forked from LiveCarta/BookConverter
118 lines
4.2 KiB
Python
118 lines
4.2 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import pika
|
|
import logging
|
|
from typing import Dict
|
|
from pathlib import Path
|
|
from threading import Event
|
|
from functools import partial
|
|
from threading import Thread, active_count
|
|
|
|
from src.access import Access
|
|
from src.util.helpers import MainLogger
|
|
from src.docx_converter.docx_solver import DocxBook
|
|
from src.epub_converter.epub_solver import EpubBook
|
|
|
|
|
|
def local_convert_book(
    book_type: "type[DocxBook | EpubBook]",
    book_id: int,
    main_logger: logging.Logger,
    params: dict,
    json_file_path: str = "books/json/9781839211973.json",
):
    """Build a converter and run its ``conversion_local`` step on a JSON file.

    Args:
        book_type: Converter class to instantiate (``DocxBook`` or
            ``EpubBook``). It is called with ``book_id``, ``main_logger``
            and everything in ``params`` as keyword arguments.
        book_id: Identifier of the book; used in the log messages and passed
            to the converter.
        main_logger: Logger that receives the start, finish and failure
            messages.
        params: Extra keyword arguments forwarded to ``book_type``.
        json_file_path: Path handed to ``conversion_local``. Defaults to the
            sample file that used to be hard-coded here.

    Raises:
        Exception: Any error raised while building or running the converter
            is logged with its traceback and then re-raised unchanged.
    """
    main_logger.info(f"Start processing book-{book_id}.")
    try:
        book = book_type(book_id=book_id, main_logger=main_logger, **params)
        book.conversion_local(json_file_path)
    except Exception:
        # Record the failure in the application log before propagating it, so
        # the "Start processing" entry is never left without an outcome.
        main_logger.exception(f"Book-{book_id} conversion failed.")
        raise
    main_logger.info(f"Book-{book_id} has been proceeded.")
|
|
|
|
|
|
def convert_book(
    book_type: "type[DocxBook | EpubBook]",
    book_id: int,
    main_logger: logging.Logger,
    params: dict,
):
    """Build a converter for one book and run its ``conversion`` step.

    ``callback`` uses this function as the target of a worker thread.

    Args:
        book_type: Converter class to instantiate (``DocxBook`` or
            ``EpubBook``). It is called with ``book_id``, ``main_logger``
            and everything in ``params`` as keyword arguments.
        book_id: Identifier of the book; used in the log messages and passed
            to the converter.
        main_logger: Logger that receives the start, finish and failure
            messages.
        params: Extra keyword arguments forwarded to ``book_type``
            (``callback`` supplies ``access`` and, for docx books,
            ``libre_locker``).

    Raises:
        Exception: Any error raised while building or running the converter
            is logged with its traceback and then re-raised unchanged.
    """
    main_logger.info(f"Start processing book-{book_id}.")
    try:
        book = book_type(book_id=book_id, main_logger=main_logger, **params)
        book.conversion()
    except Exception:
        # An exception escaping a thread is only reported through
        # ``threading.excepthook`` (stderr by default); log it here so the
        # failure also appears in the application log.
        main_logger.exception(f"Book-{book_id} conversion failed.")
        raise
    main_logger.info(f"Book-{book_id} has been proceeded.")
|
|
|
|
|
|
def _parse_message(body: bytes) -> dict:
    """Decode a queue message and check the fields ``callback`` relies on."""
    data = json.loads(body)
    # Explicit raises instead of ``assert``: assertions are stripped when
    # Python runs with ``-O``, which would silently disable the validation.
    if not isinstance(data, dict) or "apiURL" not in data:
        raise ValueError("No apiURL field in received message.")
    if data.get("fileExtension") not in ("epub", "docx"):
        raise ValueError("Wrong book type received.")
    if "id" not in data:
        raise ValueError("No id field in received message.")
    return data


def callback(ch, method, properties, body: bytes, main_logger: logging.Logger, libre_locker: Event):
    """Handle one queue message by starting a conversion thread for it.

    The message must be a JSON object with ``apiURL``, ``id`` and a
    ``fileExtension`` of ``"epub"`` or ``"docx"``. Invalid messages and any
    error raised while starting the thread are logged on ``main_logger`` and
    otherwise ignored, so this function does not raise for them.

    Args:
        ch: Unused; part of the consumer callback signature.
        method: Unused; part of the consumer callback signature.
        properties: Unused; part of the consumer callback signature.
        body: Raw message payload.
        main_logger: Logger for the received message, thread count and errors;
            also handed to the converter.
        libre_locker: Event passed to the converter for docx books only.
    """
    print(f"Message: {body}.")
    main_logger.info(f"Message: {body}.")
    try:
        # Validate everything first so a bad message is rejected before an
        # ``Access`` object is created for it.
        data = _parse_message(body)
        is_docx = data["fileExtension"] == "docx"

        book_params = {"access": Access(url=data["apiURL"])}
        if is_docx:
            book_params["libre_locker"] = libre_locker

        # For local debugging, ``local_convert_book`` accepts the same kwargs.
        thread = Thread(
            target=convert_book,
            kwargs={
                "book_type": DocxBook if is_docx else EpubBook,
                "book_id": data["id"],
                "main_logger": main_logger,
                "params": book_params,
            },
        )
        thread.start()
        # Report on the application logger, like every other message here,
        # rather than on the root logger.
        main_logger.info(f"Active threads: {active_count()}.")
    except Exception as exc:
        # Prefer a ``message`` attribute when the exception type provides one.
        details = getattr(exc, "message", str(exc))
        main_logger.error(f"{type(exc)}: {details}")
|
|
|
|
|
|
def server_run():
    """Connect to the message queue and consume messages until stopped.

    Connection settings are read from ``config/queue_config.json`` next to
    this file: ``username``, ``password`` and ``queue`` are required, while
    ``host`` and ``port`` fall back to pika's defaults. Every received message
    is passed to ``callback``.

    Raises:
        TypeError: Raised during setup; reported on stdout and re-raised.
        ValueError: Raised during setup (including an unparsable config
            file); logged and re-raised.
    """
    logger = MainLogger(name="consumer")
    logger_object = logger.configure_main_logger()

    folder_path = os.path.dirname(os.path.abspath(__file__))
    config_path = Path(folder_path) / "config" / "queue_config.json"

    # Bound before the ``try`` so the ``ValueError`` handler can always read
    # it, even when loading the config file is what failed.
    conf_param = {}
    try:
        with open(config_path, "r") as f:
            conf_param = json.load(f)

        # The defaults are class attributes; no instance is needed to read them.
        host = conf_param.get("host") or pika.ConnectionParameters.DEFAULT_HOST
        port = conf_param.get("port") or pika.ConnectionParameters.DEFAULT_PORT
        credentials = pika.PlainCredentials(
            username=conf_param["username"], password=conf_param["password"])
        parameters = pika.ConnectionParameters(
            host=host, port=port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.queue_declare(
            queue=conf_param["queue"],
            durable=True,
            arguments={"x-max-priority": 10},
        )
    except TypeError as exc:
        print("TypeError: problem with queue config, " + str(exc))
        # Stop here: without a usable channel the code below would only fail
        # later with an unrelated error.
        raise
    except ValueError:
        logger_object.log(
            logging.ERROR,
            f"Queue {conf_param.get('queue')} is not declared.")
        raise

    locker = Event()
    locker.set()
    channel.basic_consume(
        queue=conf_param["queue"],
        auto_ack=True,
        on_message_callback=partial(
            callback, main_logger=logger_object, libre_locker=locker),
    )
    logger_object.info("Connection has been established.")
    print("Waiting for messages...")
    logger_object.info("Waiting for messages...")

    channel.start_consuming()
|
|
|
|
|
|
# Start the queue consumer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    server_run()
|