import os import sys import json import pika import logging from typing import Dict from pathlib import Path from threading import Event from functools import partial from threading import Thread, active_count from src.access import Access from src.docx_converter.docx_solver import DocxBook from src.epub_converter.epub_solver import EpubBook def configure_file_logger(name: str, filename: str = "logs/converter.log", filemode: str ="w+", logging_level: int = logging.INFO) -> logging.Logger: logger = logging.getLogger(name) folder_path = os.path.dirname(os.path.abspath(__file__)) file_path = os.path.join(folder_path, filename) file_handler = logging.FileHandler(file_path, mode=filemode) logger.addHandler(file_handler) file_format = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s " "[%(filename)s:%(lineno)d in %(funcName)s]") file_handler.setFormatter(file_format) logger.setLevel(logging_level) return logger def local_convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: dict): logger.info(f"Start processing book-{book_id}.") try: json_file_path = "books/json/9781614382264.json" book = book_type(book_id=book_id, main_logger=logger, **params) book.conversion_local(json_file_path) except Exception as exc: raise exc logger.info(f"Book-{book_id} has been proceeded.") def convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: Dict[str, Access]): logger.info(f"Start processing book-{book_id}.") try: book = book_type(book_id=book_id, main_logger=logger, **params) book.conversion() except Exception as exc: raise exc logger.info(f"Book-{book_id} has been proceeded.") def callback(ch, method, properties, body: bytes, logger: logging.Logger, libre_locker: Event): print(f"Message: {body}.") logger.info(f"Message: {body}.") try: data = json.loads(body) assert "apiURL" in data, "No apiURL field in received message." assert data.get("fileExtension") in [ "epub", "docx"], "Wrong book type received." book_params = { "access": Access(url=data["apiURL"]), } if data.get("fileExtension") == "docx": book_params.update({"libre_locker": libre_locker}) params = { "book_type": EpubBook if data.get("fileExtension") == "epub" else DocxBook, "book_id": data["id"], "logger": logger, "params": book_params } thread = Thread(target=convert_book, kwargs=params) thread.start() logging.log(logging.INFO, f"Active threads: {active_count()}.") # print(f"Active threads: {active_count()}.") except Exception as exc: if hasattr(exc, "message"): logger.error(f"{sys.exc_info()[0]}: {exc.message}") else: logger.error(f"{sys.exc_info()[0]}: {str(exc)}") finally: pass def server_run(): logger = configure_file_logger("consumer") channel = None try: folder_path = os.path.dirname(os.path.abspath(__file__)) config_path = Path(os.path.join( folder_path, "config/queue_config.json")) with open(config_path, "r") as f: conf_param = json.load(f) host = conf_param.get( "host") or pika.ConnectionParameters().DEFAULT_HOST port = conf_param.get( "port") or pika.ConnectionParameters().DEFAULT_PORT credentials = pika.PlainCredentials( username=conf_param["username"], password=conf_param["password"]) parameters = pika.ConnectionParameters( host=host, port=port, credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue=conf_param["queue"], durable=True, arguments={ "x-max-priority": 10}) except TypeError as exc: print("TypeError: problem with config, " + str(exc)) except ValueError as exc: logger.log(logging.ERROR, f"Queue {conf_param['queue']} is not declared.") raise exc locker = Event() locker.set() channel.basic_consume(queue=conf_param["queue"], auto_ack=True, on_message_callback=partial(callback, logger=logger, libre_locker=locker)) logger.info("Connection has been established.") print("Waiting for messages...") logger.info("Waiting for messages...") channel.start_consuming() if __name__ == "__main__": server_run()