forked from LiveCarta/BookConverter
Update consumer.py
add fixes for api access
This commit is contained in:
@@ -1,50 +1,38 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
from functools import partial
|
||||||
from threading import Thread, active_count
|
from threading import Thread, active_count
|
||||||
|
|
||||||
import pika
|
import pika
|
||||||
|
|
||||||
|
from access import Access
|
||||||
from book import Book
|
from book import Book
|
||||||
# from src.book import Book
|
|
||||||
|
|
||||||
|
|
||||||
class Consumer:
|
def convert_book(book_id, access):
|
||||||
def __init__(self, url):
|
|
||||||
self._connection = None
|
|
||||||
self._channel = None
|
|
||||||
self._closing = False
|
|
||||||
self._url = url
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def close_connection(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def convert_book(file_path, output=None, recreate=True, train_mode=False, convert=False, model_location=None):
|
|
||||||
logging_format = '%(asctime)s - %(levelname)s - %(message)s'
|
logging_format = '%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
|
||||||
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
book = Book(book_id, access)
|
||||||
file_path = os.path.join(folder_path, file_path)
|
|
||||||
book = Book(file_path, output, recreate, train_mode, convert, model_location)
|
|
||||||
book.conversion(logging_format=logging_format)
|
book.conversion(logging_format=logging_format)
|
||||||
|
|
||||||
print('Book has been proceeded.')
|
print('Book has been proceeded.')
|
||||||
|
|
||||||
range(10)
|
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body, access):
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f'Message: {body}.')
|
print(f'Message: {body}.')
|
||||||
try:
|
try:
|
||||||
data = json.loads(body)
|
data = json.loads(body)
|
||||||
thread = Thread(target=convert_book, kwargs=data)
|
params = {
|
||||||
|
'book_id': data['id'],
|
||||||
|
'access': access
|
||||||
|
}
|
||||||
|
|
||||||
|
thread = Thread(target=convert_book, kwargs=params)
|
||||||
thread.start()
|
thread.start()
|
||||||
print(f'Active threads: {active_count()}.')
|
print(f'Active threads: {active_count()}.')
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as exc:
|
||||||
print(e)
|
print(exc)
|
||||||
pass
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# thread.join()
|
# thread.join()
|
||||||
@@ -54,13 +42,11 @@ def callback(ch, method, properties, body):
|
|||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
||||||
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
config_path = os.path.join(folder_path, "test_config/config.json")
|
config_path = os.path.join(folder_path, "config/config.json")
|
||||||
|
# config_path = os.path.join(folder_path, "config/queue_config.json")
|
||||||
with open(config_path, "r") as f:
|
with open(config_path, "r") as f:
|
||||||
conf_param = json.load(f)
|
conf_param = json.load(f)
|
||||||
|
|
||||||
# credentials = pika.PlainCredentials('admin', 'admin')
|
|
||||||
# parameters = pika.ConnectionParameters('10.40.10.173', credentials=credentials)
|
|
||||||
|
|
||||||
credentials = pika.PlainCredentials(username=conf_param['username'], password=conf_param['password'])
|
credentials = pika.PlainCredentials(username=conf_param['username'], password=conf_param['password'])
|
||||||
parameters = pika.ConnectionParameters(host=conf_param['host'], credentials=credentials)
|
parameters = pika.ConnectionParameters(host=conf_param['host'], credentials=credentials)
|
||||||
connection = pika.BlockingConnection(parameters)
|
connection = pika.BlockingConnection(parameters)
|
||||||
@@ -68,14 +54,13 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
channel.queue_declare(queue=conf_param['queue'], passive=True)
|
channel.queue_declare(queue=conf_param['queue'], passive=True)
|
||||||
except Exception as e:
|
except ValueError as exc:
|
||||||
print(e)
|
raise exc
|
||||||
raise
|
|
||||||
|
|
||||||
|
acs = Access()
|
||||||
channel.basic_consume(queue=conf_param['queue'],
|
channel.basic_consume(queue=conf_param['queue'],
|
||||||
auto_ack=True,
|
auto_ack=True,
|
||||||
on_message_callback=callback)
|
on_message_callback=partial(callback, access=acs))
|
||||||
|
|
||||||
print('Waiting for messages...')
|
print('Waiting for messages...')
|
||||||
channel.start_consuming()
|
channel.start_consuming()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user