"""Queue consumer that starts one book-conversion thread per received message.

Connection settings are read from ``config/queue_config.json`` next to this
module; log records go to ``logs/converter.log`` by default.
"""
import os
import sys
import json
import pika
import logging
from pathlib import Path
from threading import Event
from functools import partial
from threading import Thread, active_count
from typing import Type, Union

from src.access import Access
from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook


def configure_file_logger(name, filename="logs/converter.log", filemode="w+",
                          logging_level=logging.INFO):
    """Return the logger ``name`` with a file handler attached.

    Args:
        name: Name passed to ``logging.getLogger``.
        filename: Log file path, resolved relative to this module's folder.
        filemode: Mode used to open the log file.
        logging_level: Level set on the returned logger.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    folder_path = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.abspath(os.path.join(folder_path, filename))

    # FileHandler does not create missing directories, so do it up front.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # getLogger returns the same object for the same name; attaching a second
    # handler for the same file would write every record twice.
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == file_path
        for handler in logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(file_path, mode=filemode)
        file_handler.setFormatter(logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s "
                "[%(filename)s:%(lineno)d in %(funcName)s]"))
        logger.addHandler(file_handler)

    logger.setLevel(logging_level)
    return logger


def local_convert_book(book_type: Type[Union[DocxBook, EpubBook]], book_id,
                       logger, params: dict,
                       json_file_path="books/json/9781614382264.json"):
    """Build a book object and run its ``conversion_local`` method.

    Args:
        book_type: Book class to instantiate (``DocxBook`` or ``EpubBook``).
        book_id: Identifier passed to the book constructor.
        logger: Logger handed to the book as ``main_logger``.
        params: Extra keyword arguments for the book constructor.
        json_file_path: Path passed to ``conversion_local``; defaults to the
            path that used to be hard-coded here.

    Raises:
        Exception: Whatever the constructor or conversion raises, after the
            failure has been logged.
    """
    logger.info(f"Start processing book-{book_id}.")
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion_local(json_file_path)
    except Exception:
        logger.exception(f"Book-{book_id} conversion failed.")
        raise
    logger.info(f"Book-{book_id} has been proceeded.")


def convert_book(book_type: Type[Union[DocxBook, EpubBook]], book_id, logger,
                 params: dict):
    """Build a book object and run its ``conversion`` method.

    Used as the thread target in ``callback``.

    Args:
        book_type: Book class to instantiate (``DocxBook`` or ``EpubBook``).
        book_id: Identifier passed to the book constructor.
        logger: Logger handed to the book as ``main_logger``.
        params: Extra keyword arguments for the book constructor.

    Raises:
        Exception: Whatever the constructor or conversion raises, after the
            failure has been logged.
    """
    logger.info(f"Start processing book-{book_id}.")
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion()
    except Exception:
        # Record the failure in the log file before propagating; this runs
        # inside a worker thread, so nothing upstream catches the exception.
        logger.exception(f"Book-{book_id} conversion failed.")
        raise
    logger.info(f"Book-{book_id} has been proceeded.")


def callback(ch, method, properties, body, logger, libre_locker):
    """Handle one queue message by starting a conversion thread.

    ``server_run`` registers this as the consumer callback with ``logger`` and
    ``libre_locker`` pre-bound via ``functools.partial``. The message body must
    be a JSON object with ``apiURL``, ``id`` and ``fileExtension`` (``"epub"``
    or ``"docx"``). Any error is logged and swallowed, so a bad message never
    stops the consumer.

    Args:
        ch: Channel argument supplied by the consumer (unused).
        method: Delivery argument supplied by the consumer (unused).
        properties: Properties argument supplied by the consumer (unused).
        body: Raw message payload.
        logger: Logger used here and passed on to the conversion.
        libre_locker: Object forwarded to ``DocxBook`` as ``libre_locker``.
    """
    print(f"Message: {body}.")
    logger.info(f"Message: {body}.")
    try:
        data = json.loads(body)

        # Explicit checks instead of ``assert``: asserts are removed when
        # Python runs with -O, which would disable the validation.
        if not isinstance(data, dict):
            raise ValueError("Received message is not a JSON object.")
        if "apiURL" not in data:
            raise ValueError("No apiURL field in received message.")
        if "id" not in data:
            raise ValueError("No id field in received message.")
        file_extension = data.get("fileExtension")
        if file_extension not in ("epub", "docx"):
            raise ValueError("Wrong book type received.")

        book_params = {"access": Access(url=data["apiURL"])}
        if file_extension == "docx":
            book_params["libre_locker"] = libre_locker

        thread = Thread(
            target=convert_book,
            kwargs={
                "book_type": EpubBook if file_extension == "epub" else DocxBook,
                "book_id": data["id"],
                "logger": logger,
                "params": book_params,
            },
        )
        thread.start()
        # Use the passed-in logger: the module-level logging.log() goes to the
        # root logger, where an INFO record is dropped by default.
        logger.info(f"Active threads: {active_count()}.")
    except Exception as exc:
        detail = getattr(exc, "message", str(exc))
        logger.error(f"{type(exc)}: {detail}")


def _load_queue_config():
    """Read ``config/queue_config.json`` located next to this module.

    Raises:
        TypeError: If the file's top-level JSON value is not an object.
        ValueError: If the file is not valid JSON.
        OSError: If the file cannot be opened.
    """
    folder_path = os.path.dirname(os.path.abspath(__file__))
    config_path = Path(folder_path) / "config" / "queue_config.json"
    with open(config_path, "r", encoding="utf-8") as config_file:
        conf_param = json.load(config_file)
    if not isinstance(conf_param, dict):
        raise TypeError("queue config must be a JSON object.")
    return conf_param


def server_run():
    """Connect to the queue described in the config and consume until stopped.

    Each message is dispatched to ``callback``. Setup failures are logged and
    re-raised instead of falling through to use an unset channel.

    Raises:
        TypeError: If the config has an invalid shape or value type.
        ValueError: If the config is not valid JSON or holds an invalid value.
        KeyError: If a required config key is missing.
    """
    logger = configure_file_logger("consumer")
    conf_param = None
    connection = None
    try:
        conf_param = _load_queue_config()
        host = conf_param.get("host") or pika.ConnectionParameters.DEFAULT_HOST
        port = conf_param.get("port") or pika.ConnectionParameters.DEFAULT_PORT
        credentials = pika.PlainCredentials(
            username=conf_param["username"], password=conf_param["password"])
        parameters = pika.ConnectionParameters(
            host=host, port=port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.queue_declare(queue=conf_param["queue"], durable=True,
                              arguments={"x-max-priority": 10})
    except TypeError as exc:
        print("TypeError: problem with config, " + str(exc))
        logger.error(f"TypeError: problem with config, {exc}")
        raise
    except KeyError as exc:
        logger.error(f"Missing key in queue config: {exc}")
        raise
    except ValueError as exc:
        if conf_param is None:
            # json.JSONDecodeError is a ValueError: the config never loaded,
            # so there is no queue name to report.
            logger.error(f"Queue config is not valid JSON: {exc}")
        else:
            logger.log(logging.ERROR,
                       f"Queue {conf_param.get('queue')} is not declared.")
        raise
    else:
        # The event starts in the "set" state and is shared by every docx
        # conversion through ``libre_locker``.
        locker = Event()
        locker.set()
        channel.basic_consume(
            queue=conf_param["queue"],
            auto_ack=True,
            on_message_callback=partial(
                callback, logger=logger, libre_locker=locker),
        )
        logger.info("Connection has been established.")
        print("Waiting for messages...")
        logger.info("Waiting for messages...")
        channel.start_consuming()
    finally:
        # Release the broker connection however the consumer stops.
        if connection is not None and connection.is_open:
            connection.close()


if __name__ == "__main__":
    server_run()