From d95168205451e30aa3394bb171a8861f8ebc4705 Mon Sep 17 00:00:00 2001 From: Jeniamakarchik Date: Wed, 5 Feb 2020 16:35:09 +0300 Subject: [PATCH] Update consumer.py add fixes for api access --- src/consumer.py | 55 ++++++++++++++++++------------------------------- 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/src/consumer.py b/src/consumer.py index 8120042..da145b5 100644 --- a/src/consumer.py +++ b/src/consumer.py @@ -1,50 +1,38 @@ import json import os +from functools import partial from threading import Thread, active_count import pika +from access import Access from book import Book -# from src.book import Book -class Consumer: - def __init__(self, url): - self._connection = None - self._channel = None - self._closing = False - self._url = url - - def run(self): - pass - - def close_connection(self): - pass - - -def convert_book(file_path, output=None, recreate=True, train_mode=False, convert=False, model_location=None): +def convert_book(book_id, access): logging_format = '%(asctime)s - %(levelname)s - %(message)s' - folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - file_path = os.path.join(folder_path, file_path) - book = Book(file_path, output, recreate, train_mode, convert, model_location) + book = Book(book_id, access) book.conversion(logging_format=logging_format) + print('Book has been proceeded.') - range(10) - -def callback(ch, method, properties, body): +def callback(ch, method, properties, body, access): print(f'Message: {body}.') try: data = json.loads(body) - thread = Thread(target=convert_book, kwargs=data) + params = { + 'book_id': data['id'], + 'access': access + } + + thread = Thread(target=convert_book, kwargs=params) thread.start() print(f'Active threads: {active_count()}.') - except Exception as e: - print(e) - pass + except Exception as exc: + print(exc) finally: # thread.join() @@ -54,13 +42,11 @@ def callback(ch, method, properties, body): if __name__ == '__main__': folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - config_path = os.path.join(folder_path, "test_config/config.json") + config_path = os.path.join(folder_path, "config/config.json") + # config_path = os.path.join(folder_path, "config/queue_config.json") with open(config_path, "r") as f: conf_param = json.load(f) - # credentials = pika.PlainCredentials('admin', 'admin') - # parameters = pika.ConnectionParameters('10.40.10.173', credentials=credentials) - credentials = pika.PlainCredentials(username=conf_param['username'], password=conf_param['password']) parameters = pika.ConnectionParameters(host=conf_param['host'], credentials=credentials) connection = pika.BlockingConnection(parameters) @@ -68,14 +54,13 @@ if __name__ == '__main__': try: channel.queue_declare(queue=conf_param['queue'], passive=True) - except Exception as e: - print(e) - raise + except ValueError as exc: + raise exc + acs = Access() channel.basic_consume(queue=conf_param['queue'], auto_ack=True, - on_message_callback=callback) + on_message_callback=partial(callback, access=acs)) print('Waiting for messages...') channel.start_consuming() -