This repository has been archived on 2026-04-06. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BookConverter/consumer.py
2022-10-27 12:19:35 +03:00

117 lines
4.1 KiB
Python

import os
import sys
import json
import pika
import logging
from typing import Dict
from pathlib import Path
from threading import Event
from functools import partial
from threading import Thread, active_count
from src.access import Access
from src.util.helpers import MainLogger
from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook
def local_convert_book(
    book_type: type,
    book_id: int,
    logger: logging.Logger,
    params: dict,
    json_file_path: str = "books/json/9781614382264.json",
):
    """Instantiate a book converter and run its local conversion.

    Args:
        book_type: Converter class to instantiate (``DocxBook`` or
            ``EpubBook``). It is called with ``book_id``, ``main_logger`` and
            every entry of ``params`` as keyword arguments.
        book_id: Identifier of the book; also used in the log messages.
        logger: Logger that receives progress and failure messages and is
            handed to the converter as ``main_logger``.
        params: Extra keyword arguments forwarded to ``book_type``.
        json_file_path: Path passed to ``book.conversion_local``. Defaults to
            the sample file the function previously had hard-coded.

    Raises:
        Exception: Whatever the converter raises is logged with its traceback
            and then re-raised unchanged.
    """
    logger.info(f"Start processing book-{book_id}.")
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion_local(json_file_path)
    except Exception:
        # Record the failure (with traceback) before propagating it, so the
        # application log shows why the "has been proceeded" line is missing.
        logger.exception(f"Book-{book_id} conversion failed.")
        raise
    logger.info(f"Book-{book_id} has been proceeded.")
def convert_book(book_type: type, book_id: int, logger: logging.Logger, params: dict):
    """Instantiate a book converter and run its conversion.

    Args:
        book_type: Converter class to instantiate (``DocxBook`` or
            ``EpubBook``). It is called with ``book_id``, ``main_logger`` and
            every entry of ``params`` as keyword arguments.
        book_id: Identifier of the book; also used in the log messages.
        logger: Logger that receives progress and failure messages and is
            handed to the converter as ``main_logger``.
        params: Extra keyword arguments forwarded to ``book_type`` (the
            caller in this file passes ``access`` and, for docx books,
            ``libre_locker``).

    Raises:
        Exception: Whatever the converter raises is logged with its traceback
            and then re-raised unchanged.
    """
    logger.info(f"Start processing book-{book_id}.")
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion()
    except Exception:
        # This function is used as a thread target; without this log call a
        # failure would never be written to the application log.
        logger.exception(f"Book-{book_id} conversion failed.")
        raise
    logger.info(f"Book-{book_id} has been proceeded.")
def callback(ch, method, properties, body: bytes, logger: logging.Logger, libre_locker: Event):
    """Handle one queue message by starting a conversion thread for it.

    The message body must be a JSON object with the fields ``apiURL``,
    ``fileExtension`` (``"epub"`` or ``"docx"``) and ``id``. Any problem with
    the message, or with starting the thread, is logged as an error and
    swallowed, so the function never raises.

    Args:
        ch: Unused; part of the consumer callback signature.
        method: Unused; part of the consumer callback signature.
        properties: Unused; part of the consumer callback signature.
        body: Raw message payload.
        logger: Logger for this callback; also passed on to the worker.
        libre_locker: Event forwarded to the converter for docx books only.
    """
    print(f"Message: {body}.")
    logger.info(f"Message: {body}.")
    try:
        data = json.loads(body)
        # Validate with explicit raises rather than ``assert``: assertions are
        # removed when Python runs with -O, which would let bad messages through.
        if not isinstance(data, dict):
            raise ValueError("Received message is not a JSON object.")
        if "apiURL" not in data:
            raise ValueError("No apiURL field in received message.")
        file_extension = data.get("fileExtension")
        if file_extension not in ("epub", "docx"):
            raise ValueError("Wrong book type received.")
        if "id" not in data:
            raise ValueError("No id field in received message.")

        book_params = {"access": Access(url=data["apiURL"])}
        if file_extension == "docx":
            book_params["libre_locker"] = libre_locker

        worker_kwargs = {
            "book_type": EpubBook if file_extension == "epub" else DocxBook,
            "book_id": data["id"],
            "logger": logger,
            "params": book_params,
        }
        thread = Thread(target=convert_book, kwargs=worker_kwargs)
        thread.start()
        # Use the supplied logger (not the root logger) so this line ends up
        # in the same place as every other message of the consumer.
        logger.info(f"Active threads: {active_count()}.")
    except Exception as exc:
        # Some exception classes carry a ``message`` attribute; prefer it and
        # fall back to the standard string form otherwise.
        message = getattr(exc, "message", str(exc))
        logger.error(f"{type(exc)}: {message}")
def server_run():
    """Connect to the message queue and consume conversion requests.

    Reads ``config/queue_config.json`` from the directory of this file. The
    keys used are ``username``, ``password`` and ``queue`` (required) and
    ``host`` and ``port`` (optional, falling back to pika's defaults). The
    queue is declared as durable with ``x-max-priority`` 10, and every
    delivered message is handed to ``callback``. The call blocks in
    ``channel.start_consuming()``.

    Raises:
        TypeError, ValueError: Raised while building the connection
            parameters or declaring the queue; reported and re-raised.
        Exception: Errors from reading/parsing the config file or from the
            connection itself propagate unchanged.
    """
    logger = MainLogger(name="consumer")
    logger_object = logger.configure_main_logger()

    # Load the config before (and outside of) the connection try-block: a
    # malformed JSON file raises a ValueError subclass, and must not be
    # reported by the handler below, which needs an already-loaded config.
    folder_path = os.path.dirname(os.path.abspath(__file__))
    config_path = Path(folder_path) / "config" / "queue_config.json"
    with open(config_path, "r") as f:
        conf_param = json.load(f)

    try:
        # The defaults are class attributes; no throwaway instance is needed.
        host = conf_param.get("host") or pika.ConnectionParameters.DEFAULT_HOST
        port = conf_param.get("port") or pika.ConnectionParameters.DEFAULT_PORT
        credentials = pika.PlainCredentials(
            username=conf_param["username"], password=conf_param["password"])
        parameters = pika.ConnectionParameters(
            host=host, port=port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.queue_declare(
            queue=conf_param["queue"],
            durable=True,
            arguments={"x-max-priority": 10},
        )
    except TypeError as exc:
        print("TypeError: problem with config, " + str(exc))
        logger_object.error("TypeError: problem with config, " + str(exc))
        # Without a channel there is nothing to consume from; fail here with
        # the real cause instead of an AttributeError on ``None`` further down.
        raise
    except ValueError:
        logger_object.log(logging.ERROR,
                          f"Queue {conf_param.get('queue')} is not declared.")
        raise

    try:
        locker = Event()
        locker.set()
        # NOTE(review): with auto_ack=True a message is acknowledged on
        # delivery, so a conversion that later fails in its worker thread is
        # presumably not redelivered - confirm this is intended.
        channel.basic_consume(
            queue=conf_param["queue"],
            auto_ack=True,
            on_message_callback=partial(
                callback, logger=logger_object, libre_locker=locker),
        )
        logger_object.info("Connection has been established.")
        print("Waiting for messages...")
        logger_object.info("Waiting for messages...")
        channel.start_consuming()
    finally:
        # Release the broker connection however consuming ends (error,
        # Ctrl+C, ...); skip if the connection is already closed.
        if connection.is_open:
            connection.close()
if __name__ == "__main__":
    # Start the consumer only when this file is executed as a script,
    # not when it is imported as a module.
    server_run()