import json
import logging
import os
import sys
from functools import partial
from pathlib import Path
from threading import Event, Lock, Thread, active_count
from typing import Type, Union

import pika

from src.access import Access
from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook

# Serialises handler registration: local_run() starts several threads that
# each call configure_file_logger() for the same logger name.
_LOGGER_SETUP_LOCK = Lock()


def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+',
                          logging_level=logging.INFO,
                          logging_format='%(asctime)s - %(levelname)s - %(message)s'):
    """Return the logger called *name*, writing to a file next to this module.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        filename: Log file path, relative to the directory of this module.
        filemode: Mode used to open the log file.
        logging_level: Level set on the logger on every call.
        logging_format: Format string for the file handler.

    Returns:
        The configured ``logging.Logger``.

    A file handler for a given path is attached only once per logger. A
    repeated call reuses it instead of re-opening (and, with ``'w+'``,
    truncating) the file and duplicating every record; in that case the
    ``filemode`` and ``logging_format`` of the first call stay in effect.
    """
    logger = logging.getLogger(name)
    folder_path = os.path.dirname(os.path.abspath(__file__))
    # FileHandler stores its target as an absolute path in ``baseFilename``,
    # so compare against the absolute form.
    file_path = os.path.abspath(os.path.join(folder_path, filename))

    with _LOGGER_SETUP_LOCK:
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == file_path
            for handler in logger.handlers
        )
        if not already_attached:
            # FileHandler does not create missing parent directories.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            file_handler = logging.FileHandler(file_path, mode=filemode)
            file_handler.setFormatter(logging.Formatter(fmt=logging_format))
            logger.addHandler(file_handler)
        logger.setLevel(logging_level)
    return logger


def convert_book(book_type: Union[Type[DocxBook], Type[EpubBook]], params: dict,
                 logger: logging.Logger, book_id):
    """Build a book of *book_type* and run its conversion.

    Args:
        book_type: Book class to instantiate (``DocxBook`` or ``EpubBook``).
        params: Extra keyword arguments forwarded to the book constructor.
        logger: Logger handed to the book as ``main_logger`` and used for
            progress messages.
        book_id: Identifier forwarded to the book constructor.

    Raises:
        Exception: Whatever the constructor or ``conversion()`` raises; it is
            logged with its traceback first, then re-raised unchanged.
    """
    logger.info(f'Start processing book-{book_id}.')
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion()
    except Exception:
        # This normally runs in a worker thread; without this record the
        # failure would never reach the log file.
        logger.exception(f'Book-{book_id} conversion failed.')
        raise

    logger.info(f'Book-{book_id} has been proceeded.')
    print('Book has been proceeded.')


def callback(ch, method, properties, body, logger: logging.Logger,
             libra_locker: Event):
    """Handle one queue message by starting a conversion thread.

    The message body must be JSON containing ``apiURL``, ``id`` and a
    ``fileExtension`` of ``'epub'`` or ``'docx'``. Any error while parsing,
    validating or starting the thread is logged and swallowed, so one bad
    message does not stop the consumer.

    Args:
        ch, method, properties: Passed by the queue client; unused here.
        body: Raw message payload.
        logger: Logger for this consumer; also handed to the conversion.
        libra_locker: Event forwarded to ``DocxBook`` for docx messages
            (presumably guards a shared converter process - confirm).
    """
    print(f'Message: {body}.')
    logger.info(f'Message: {body}.')
    try:
        data = json.loads(body)
        # Explicit raises instead of ``assert``: asserts are stripped under -O.
        if 'apiURL' not in data:
            raise ValueError('No apiURL field in received message.')
        file_extension = data.get('fileExtension')
        if file_extension not in ('epub', 'docx'):
            raise ValueError('Wrong book type received.')

        book_params = {
            'access': Access(url=data['apiURL']),
        }
        if file_extension == 'docx':
            book_params.update({'libra_locker': libra_locker})

        params = {
            'book_type': EpubBook if file_extension == 'epub' else DocxBook,
            'book_id': data['id'],
            'logger': logger,
            'params': book_params
        }

        thread = Thread(target=convert_book, kwargs=params)
        thread.start()
        # Use the consumer logger: the module-level ``logging.log`` targets
        # the root logger, which drops INFO by default and implicitly
        # installs a stderr handler.
        logger.info(f'Active threads: {active_count()}.')
    except Exception as exc:
        message = getattr(exc, 'message', str(exc))
        logger.error(f'{sys.exc_info()[0]}: {message}')


def local_convert_book(filename, locker: Event):
    """Convert one local docx book using ``DocxBook.test_conversion``.

    Args:
        filename: Used as the ``book_id`` of the book.
        locker: Event forwarded to ``DocxBook`` as ``libra_locker``.

    Raises:
        Exception: Whatever the constructor or ``test_conversion()`` raises;
            it is logged with its traceback first, then re-raised unchanged.
    """
    logger = configure_file_logger('consumer',
                                   logging_format='%(asctime)s - %(levelname)s - %(message)s')
    logger.info(f'Start processing book-{filename}.')
    try:
        book = DocxBook(book_id=filename, main_logger=logger, libra_locker=locker)
        book.test_conversion()
    except Exception:
        logger.exception(f'Book-{filename} conversion failed.')
        raise

    logger.info(f'Book-{filename} has been proceeded.')


def local_run(books):
    """Start one ``local_convert_book`` thread per entry of *books*.

    All threads are created first and then started; the function does not
    wait for them to finish.

    Args:
        books: Iterable of book names.
    """
    locker = Event()
    locker.set()

    threads = []
    for book_name in books:
        # %s, not %d: the value is interpolated into a file name downstream,
        # so it is not guaranteed to be an integer.
        logging.info("Main    : create and start thread %s.", book_name)
        threads.append(Thread(target=local_convert_book, args=(book_name, locker)))

    for thread in threads:
        thread.start()


def server_run():
    """Connect to the message queue and consume conversion requests forever.

    Connection settings are read from ``config/queue_config.json`` next to
    this module: ``username``, ``password`` and ``queue`` are required,
    ``host`` and ``port`` fall back to the pika defaults.

    Raises:
        Exception: If the connection or channel cannot be opened.
        ValueError: If declaring the queue fails with a ``ValueError``.
    """
    logger = configure_file_logger('consumer')

    folder_path = os.path.dirname(os.path.abspath(__file__))
    config_path = Path(os.path.join(folder_path, "config/queue_config.json"))
    with open(config_path, "r", encoding="utf-8") as f:
        conf_param = json.load(f)

    # The defaults are class attributes; no instance is needed to read them.
    host = conf_param.get('host') or pika.ConnectionParameters.DEFAULT_HOST
    port = conf_param.get('port') or pika.ConnectionParameters.DEFAULT_PORT

    try:
        credentials = pika.PlainCredentials(username=conf_param['username'],
                                            password=conf_param['password'])
        parameters = pika.ConnectionParameters(host=host, port=port,
                                               credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
    except Exception as exc:
        logger.log(logging.ERROR, 'Problems with queue connection.\n' + str(exc))
        raise

    try:
        channel.queue_declare(queue=conf_param['queue'], durable=True,
                              arguments={'x-max-priority': 10})
    except ValueError:
        logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.')
        raise

    locker = Event()
    locker.set()
    # NOTE(review): with auto_ack=True a message is acknowledged on delivery,
    # so a conversion that later fails in its worker thread is not
    # redelivered - confirm this is intended.
    channel.basic_consume(queue=conf_param['queue'],
                          auto_ack=True,
                          on_message_callback=partial(callback, logger=logger,
                                                      libra_locker=locker))
    logger.info('Connection has been established.')
    print('Waiting for messages...')
    logger.info('Waiting for messages...')
    channel.start_consuming()


if __name__ == '__main__':
    server_run()