This repository has been archived on 2026-04-06. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BookConverter/consumer.py
2022-03-28 18:43:27 +03:00

117 lines
4.0 KiB
Python

import os
import sys
import json
import pika
import logging
from pathlib import Path
from threading import Event
from functools import partial
from threading import Thread, active_count
from src.access import Access
from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook
def configure_file_logger(name, filename='logs/converter.log', filemode='w+',
                          logging_level=logging.INFO):
    """Return the logger called *name* with a file handler attached.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        filename: Log file path. A relative path is resolved against the
            directory containing this module; an absolute path is used as is.
        filemode: Mode used to open the log file.
        logging_level: Level set on the returned logger.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    folder_path = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.abspath(os.path.join(folder_path, filename))

    # FileHandler does not create missing parent directories, so make sure
    # the target folder exists before opening the file.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # getLogger returns the same object for the same name; attaching a second
    # handler for the same file would write every record twice.
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == file_path
        for handler in logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(file_path, mode=filemode)
        file_handler.setFormatter(logging.Formatter(
            fmt='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]'))
        logger.addHandler(file_handler)

    logger.setLevel(logging_level)
    return logger
def convert_book(book_type: 'type[DocxBook] | type[EpubBook]', book_id, logger, params: dict,):
    """Build a book of the given type and run its conversion.

    Args:
        book_type: Class to instantiate; called as
            ``book_type(book_id=..., main_logger=..., **params)``.
        book_id: Identifier forwarded to the book and used in log messages.
        logger: Logger that receives the start, finish and failure messages.
        params: Extra keyword arguments forwarded to the ``book_type``
            constructor.

    Raises:
        Exception: Whatever the constructor or ``conversion()`` raises is
            logged and then re-raised unchanged.
    """
    logger.info(f'Start processing book-{book_id}.')
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion()
    except Exception:
        # Record the failure with its traceback in the converter log before
        # propagating; otherwise nothing about it reaches this logger.
        logger.exception(f'Conversion of book-{book_id} failed.')
        raise
    logger.info(f'Book-{book_id} has been proceeded.')
def callback(ch, method, properties, body, logger, libra_locker):
    """Handle one queue message by starting a conversion thread.

    The message body is parsed as JSON and validated; a valid message starts
    a ``Thread`` running ``convert_book``. Any error raised while handling
    the message is logged and swallowed, so this function does not raise.

    Args:
        ch: Unused by this handler.
        method: Unused by this handler.
        properties: Unused by this handler.
        body: Raw message payload, expected to be a JSON object with the
            keys ``apiURL``, ``fileExtension`` (``'epub'`` or ``'docx'``)
            and ``id``.
        logger: Logger for message, thread-count and error records; also
            passed on to ``convert_book``.
        libra_locker: Object forwarded to the book constructor as
            ``libra_locker`` for ``docx`` messages only.
    """
    print(f'Message: {body}.')
    logger.info(f'Message: {body}.')
    try:
        data = json.loads(body)
        # Explicit raises instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently disable validation.
        if not isinstance(data, dict):
            raise ValueError('Message payload must be a JSON object.')
        if 'apiURL' not in data:
            raise ValueError('No apiURL field in received message.')
        file_extension = data.get('fileExtension')
        if file_extension not in ('epub', 'docx'):
            raise ValueError('Wrong book type received.')

        book_params = {
            'access': Access(url=data['apiURL']),
        }
        if file_extension == 'docx':
            book_params['libra_locker'] = libra_locker

        params = {
            'book_type': EpubBook if file_extension == 'epub' else DocxBook,
            'book_id': data['id'],
            'logger': logger,
            'params': book_params,
        }
        thread = Thread(target=convert_book, kwargs=params)
        thread.start()
        # Use the supplied logger: the root logger has no handler configured
        # in this file, so an INFO record sent there would be dropped.
        logger.info(f'Active threads: {active_count()}.')
    except Exception as exc:
        # Prefer an exception's own `message` attribute when it has one.
        detail = getattr(exc, 'message', str(exc))
        logger.error(f'{type(exc)}: {detail}')
def server_run():
    """Connect to the message queue and consume messages until stopped.

    Reads ``config/queue_config.json`` (relative to this module) for the
    ``host``, ``port``, ``username``, ``password`` and ``queue`` settings,
    opens a blocking pika connection, declares the queue and dispatches every
    message to ``callback``. The connection is closed when consuming ends,
    whether normally or through an exception.

    Raises:
        Exception: Any error raised while connecting is logged and re-raised.
        ValueError: Raised by ``queue_declare``; logged and re-raised.
    """
    logger = configure_file_logger('consumer')

    folder_path = os.path.dirname(os.path.abspath(__file__))
    config_path = Path(folder_path) / 'config' / 'queue_config.json'
    with open(config_path, 'r', encoding='utf-8') as f:
        conf_param = json.load(f)

    # The defaults are class-level constants; no instance is needed.
    host = conf_param.get('host') or pika.ConnectionParameters.DEFAULT_HOST
    port = conf_param.get('port') or pika.ConnectionParameters.DEFAULT_PORT

    try:
        credentials = pika.PlainCredentials(username=conf_param['username'],
                                            password=conf_param['password'])
        parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
    except Exception as exc:
        logger.log(logging.ERROR, 'Problems with queue connection.\n' + str(exc))
        raise

    try:
        try:
            channel.queue_declare(queue=conf_param['queue'], durable=True,
                                  arguments={'x-max-priority': 10})
        except ValueError:
            logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.')
            raise

        # Event handed to every docx conversion; it starts in the set state.
        locker = Event()
        locker.set()

        channel.basic_consume(queue=conf_param['queue'],
                              auto_ack=True,
                              on_message_callback=partial(callback, logger=logger,
                                                          libra_locker=locker))
        logger.info('Connection has been established.')
        print('Waiting for messages...')
        logger.info('Waiting for messages...')
        channel.start_consuming()
    finally:
        # Release the broker connection however the consume loop ended.
        if connection.is_open:
            connection.close()
# Start the queue consumer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    server_run()