diff --git a/src/consumer.py b/src/consumer.py index 74dcfda..2fd0a34 100644 --- a/src/consumer.py +++ b/src/consumer.py @@ -5,6 +5,7 @@ import sys from functools import partial from pathlib import Path from threading import Thread, active_count +from threading import Event import pika @@ -12,7 +13,8 @@ from access import Access from book import Book -def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+', logging_level=logging.INFO, +def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+', + logging_level=logging.INFO, logging_format='%(asctime)s - %(message)s'): logger = logging.getLogger(name) @@ -28,12 +30,12 @@ def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+' return logger -def convert_book(book_id, access, logger): +def convert_book(book_id, access, logger, libra_locker): logger.info(f'Start processing book-{book_id}.') logging_format = '%(asctime)s - %(levelname)s - %(message)s' try: - book = Book(book_id, access, main_logger=logger) + book = Book(book_id, access, main_logger=logger, libra_locker=libra_locker) book.conversion(logging_format=logging_format) except Exception as exc: raise exc @@ -42,7 +44,7 @@ def convert_book(book_id, access, logger): # print('Book has been proceeded.') -def callback(ch, method, properties, body, logger): +def callback(ch, method, properties, body, logger, libra_locker): print(f'Message: {body}.') logger.info(f'Message: {body}.') try: @@ -52,7 +54,8 @@ def callback(ch, method, properties, body, logger): params = { 'book_id': data['id'], 'access': Access(url=data['apiURL']), - 'logger': logger + 'logger': logger, + 'libra_locker': libra_locker } thread = Thread(target=convert_book, kwargs=params) @@ -71,19 +74,18 @@ def callback(ch, method, properties, body, logger): # thread.join() # print('Waiting for the message...') -def local_run(filename): + +def local_convert_book(filename, locker): logger = configure_file_logger('consumer', logging_format='%(asctime)s - %(levelname)s - %(message)s') - logger.info(f'Start processing book-{filename}.') - logging_format = '%(asctime)s - %(levelname)s - %(message)s' - - folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - file_path = Path(os.path.join(folder_path, f'docx/{filename}.docx')) + folder_path = os.path.dirname(os.path.dirname(os.path.abspath("__file__"))) output_path = Path(os.path.join(folder_path, f'json/{filename}.json')) - try: - book = Book(file_path=file_path, output_path=output_path, main_logger=logger) + book = Book(book_id=filename, + output_path=output_path, + main_logger=logger, + libra_locker=locker) book.test_conversion() except Exception as exc: @@ -92,8 +94,20 @@ def local_run(filename): logger.info(f'Book-{filename} has been proceeded.') -if __name__ == '__main__': +def local_run(books): + locker = Event() + locker.set() + threads = [] + for book_name in books: + logging.info("Main : create and start thread %d.", book_name) + x = Thread(target=local_convert_book, args=(book_name, locker)) + threads.append(x) + + [x.start() for x in threads] + + +def server_run(): folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # config_path = Path(os.path.join(folder_path, "config/config.json")) config_path = Path(os.path.join(folder_path, "config/queue_config.json")) @@ -116,13 +130,18 @@ if __name__ == '__main__': logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.') raise exc + locker = Event() + locker.set() channel.basic_consume(queue=conf_param['queue'], auto_ack=True, - on_message_callback=partial(callback, logger=logger)) + on_message_callback=partial(callback, logger=logger, libra_locker=locker)) logger.info('Connection has been established.') print('Waiting for messages...') logger.info('Waiting for messages...') channel.start_consuming() - # local_run('quote') + +if __name__ == '__main__': + server_run() +