diff --git a/consumer.py b/consumer.py index 4c67d6e..2ea307c 100644 --- a/consumer.py +++ b/consumer.py @@ -13,7 +13,7 @@ from src.docx_converter.docx_solver import DocxBook from src.epub_converter.epub_solver import EpubBook -def configure_file_logger(name, filename='logs/converter.log', filemode='w+', +def configure_file_logger(name, filename="logs/converter.log", filemode="w+", logging_level=logging.INFO): logger = logging.getLogger(name) @@ -23,113 +23,108 @@ def configure_file_logger(name, filename='logs/converter.log', filemode='w+', file_handler = logging.FileHandler(file_path, mode=filemode) logger.addHandler(file_handler) - file_format = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s ' - '[%(filename)s:%(lineno)d in %(funcName)s]') + file_format = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s " + "[%(filename)s:%(lineno)d in %(funcName)s]") file_handler.setFormatter(file_format) logger.setLevel(logging_level) return logger def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict): - logger.info(f'Start processing book-{book_id}.') + logger.info(f"Start processing book-{book_id}.") try: - json_file_path = 'json/9781614382264.json' + json_file_path = "json/9781614382264.json" book = book_type(book_id=book_id, main_logger=logger, **params) book.conversion_local(json_file_path) except Exception as exc: raise exc - logger.info(f'Book-{book_id} has been proceeded.') + logger.info(f"Book-{book_id} has been proceeded.") def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict): - logger.info(f'Start processing book-{book_id}.') + logger.info(f"Start processing book-{book_id}.") try: book = book_type(book_id=book_id, main_logger=logger, **params) book.conversion() except Exception as exc: raise exc - logger.info(f'Book-{book_id} has been proceeded.') + logger.info(f"Book-{book_id} has been proceeded.") def callback(ch, method, properties, body, logger, libre_locker): - print(f'Message: {body}.') - logger.info(f'Message: {body}.') + print(f"Message: {body}.") + logger.info(f"Message: {body}.") try: data = json.loads(body) - assert 'apiURL' in data, 'No apiURL field in received message.' - assert data.get('fileExtension') in [ - 'epub', 'docx'], 'Wrong book type received.' + assert "apiURL" in data, "No apiURL field in received message." + assert data.get("fileExtension") in [ + "epub", "docx"], "Wrong book type received." book_params = { - 'access': Access(url=data['apiURL']), + "access": Access(url=data["apiURL"]), } - if data.get('fileExtension') == 'docx': - book_params.update({'libre_locker': libre_locker}) + if data.get("fileExtension") == "docx": + book_params.update({"libre_locker": libre_locker}) params = { - 'book_type': EpubBook if data.get('fileExtension') == 'epub' else DocxBook, - 'book_id': data['id'], - 'logger': logger, - 'params': book_params + "book_type": EpubBook if data.get("fileExtension") == "epub" else DocxBook, + "book_id": data["id"], + "logger": logger, + "params": book_params } thread = Thread(target=convert_book, kwargs=params) thread.start() - logging.log(logging.INFO, f'Active threads: {active_count()}.') - # print(f'Active threads: {active_count()}.') + logging.log(logging.INFO, f"Active threads: {active_count()}.") + # print(f"Active threads: {active_count()}.") except Exception as exc: - if hasattr(exc, 'message'): - logger.error(f'{sys.exc_info()[0]}: {exc.message}') + if hasattr(exc, "message"): + logger.error(f"{sys.exc_info()[0]}: {exc.message}") else: - logger.error(f'{sys.exc_info()[0]}: {str(exc)}') + logger.error(f"{sys.exc_info()[0]}: {str(exc)}") finally: pass def server_run(): - logger = configure_file_logger('consumer') - - folder_path = os.path.dirname(os.path.abspath(__file__)) - config_path = Path(os.path.join(folder_path, "config/queue_config.json")) - with open(config_path, "r") as f: - conf_param = json.load(f) - - host = conf_param.get('host') or pika.ConnectionParameters().DEFAULT_HOST - port = conf_param.get('port') or pika.ConnectionParameters().DEFAULT_PORT - channel = None + logger = configure_file_logger("consumer") try: + folder_path = os.path.dirname(os.path.abspath(__file__)) + config_path = Path(os.path.join(folder_path, "config/queue_config.json")) + with open(config_path, "r") as f: + conf_param = json.load(f) + + host = conf_param.get("host") or pika.ConnectionParameters().DEFAULT_HOST + port = conf_param.get("port") or pika.ConnectionParameters().DEFAULT_PORT + channel = None credentials = pika.PlainCredentials( - username=conf_param['username'], password=conf_param['password']) + username=conf_param["username"], password=conf_param["password"]) parameters = pika.ConnectionParameters( host=host, port=port, credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - except Exception as exc: - logger.log(logging.ERROR, - f'Problems with queue connection.\n' + str(exc)) - raise exc - - try: - channel.queue_declare(queue=conf_param['queue'], durable=True, arguments={ - 'x-max-priority': 10}) + channel.queue_declare(queue=conf_param["queue"], durable=True, arguments={ + "x-max-priority": 10}) + except TypeError as exc: + print("TypeError: problem with config, " + str(exc)) except ValueError as exc: logger.log(logging.ERROR, - f'Queue {conf_param["queue"]} is not declared.') + f"Queue {conf_param['queue']} is not declared.") raise exc locker = Event() locker.set() - channel.basic_consume(queue=conf_param['queue'], + channel.basic_consume(queue=conf_param["queue"], auto_ack=True, on_message_callback=partial(callback, logger=logger, libre_locker=locker)) - logger.info('Connection has been established.') - print('Waiting for messages...') - logger.info('Waiting for messages...') + logger.info("Connection has been established.") + print("Waiting for messages...") + logger.info("Waiting for messages...") channel.start_consuming() -if __name__ == '__main__': +if __name__ == "__main__": server_run()