This repository has been archived on 2026-04-06. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BookConverter/consumer.py
2021-10-01 10:33:38 +03:00

155 lines
5.0 KiB
Python

import json
import logging
import os
import sys
from functools import partial
from pathlib import Path
from threading import Thread, active_count
from threading import Event
import pika
from src.access import Access
from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook
def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+',
                          logging_level=logging.INFO,
                          logging_format='%(asctime)s - %(levelname)s - %(message)s'):
    """Return the logger called ``name`` with a file handler attached.

    ``filename`` is resolved against the directory two levels above this
    module; an absolute ``filename`` is used as given. Missing parent
    directories of the log file are created.

    Calling the function again for the same logger and the same file reuses
    the handler that is already attached instead of stacking a new one, which
    would duplicate every record and (with a truncating ``filemode``) wipe the
    file a second time.

    :param name: logger name passed to ``logging.getLogger``.
    :param filename: log file path, relative to the base folder or absolute.
    :param filemode: mode used to open the file when the handler is created.
    :param logging_level: level set on the returned logger.
    :param logging_format: format string applied to the file handler.
    :return: the configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    # FileHandler keeps its target as an absolute path in ``baseFilename``,
    # so normalise the same way to make the lookup below reliable.
    file_path = os.path.abspath(os.path.join(folder_path, filename))

    file_handler = next(
        (handler for handler in logger.handlers
         if isinstance(handler, logging.FileHandler)
         and handler.baseFilename == file_path),
        None,
    )
    if file_handler is None:
        # FileHandler does not create missing parent directories itself.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        file_handler = logging.FileHandler(file_path, mode=filemode)
        logger.addHandler(file_handler)

    file_handler.setFormatter(logging.Formatter(fmt=logging_format))
    logger.setLevel(logging_level)
    return logger
def convert_book(book_type: "type[DocxBook] | type[EpubBook]", params: dict, logger, book_id):
    """Build a converter of ``book_type`` for ``book_id`` and run its conversion.

    :param book_type: converter class; it is instantiated with ``book_id``,
        ``main_logger`` and every entry of ``params`` as keyword arguments.
    :param params: extra keyword arguments for the converter constructor.
    :param logger: logger used for progress messages and handed to the converter.
    :param book_id: identifier of the book being converted.
    :raises Exception: whatever the constructor or ``conversion()`` raises; the
        failure is written to ``logger`` with its traceback before re-raising.
    """
    logger.info(f'Start processing book-{book_id}.')
    try:
        book = book_type(book_id=book_id, main_logger=logger, **params)
        book.conversion()
    except Exception:
        # This function is used as a thread target, so without this record the
        # failure would never appear in the converter log.
        logger.exception(f'Conversion of book-{book_id} failed.')
        raise
    logger.info(f'Book-{book_id} has been proceeded.')
    print('Book has been proceeded.')
def callback(ch, method, properties, body, logger, libra_locker):
    """Handle one queue message by starting a conversion thread for it.

    ``body`` is parsed as JSON and must provide ``apiURL``, ``id`` and a
    ``fileExtension`` of ``'epub'`` or ``'docx'``. Any problem with the message
    (or with starting the thread) is logged as an error and swallowed, so
    nothing is raised to the caller.

    :param ch: first positional argument supplied by the consumer (unused).
    :param method: second positional argument supplied by the consumer (unused).
    :param properties: third positional argument supplied by the consumer (unused).
    :param body: raw message payload.
    :param logger: consumer logger; also handed to the worker thread.
    :param libra_locker: forwarded to docx conversions as ``libra_locker``.
    """
    print(f'Message: {body}.')
    logger.info(f'Message: {body}.')
    try:
        data = json.loads(body)
        # Explicit checks instead of ``assert``: asserts are removed when
        # Python runs with -O, which would let malformed messages through.
        if 'apiURL' not in data:
            raise ValueError('No apiURL field in received message.')
        file_extension = data.get('fileExtension')
        if file_extension not in ('epub', 'docx'):
            raise ValueError('Wrong book type received.')
        if 'id' not in data:
            raise ValueError('No id field in received message.')

        book_params = {
            'access': Access(url=data['apiURL']),
        }
        if file_extension == 'docx':
            book_params['libra_locker'] = libra_locker

        thread = Thread(
            target=convert_book,
            kwargs={
                'book_type': EpubBook if file_extension == 'epub' else DocxBook,
                'book_id': data['id'],
                'logger': logger,
                'params': book_params,
            },
        )
        thread.start()
        # Use the consumer logger: the root logger used before drops INFO
        # records unless it has been configured separately.
        logger.info(f'Active threads: {active_count()}.')
    except Exception as exc:
        message = getattr(exc, 'message', str(exc))
        logger.error(f'{type(exc)}: {message}')
def local_convert_book(filename, locker):
    """Convert one local book with ``DocxBook.test_conversion``.

    :param filename: book name; passed to ``DocxBook`` as ``book_id`` and used
        in the log messages.
    :param locker: passed to ``DocxBook`` as ``libra_locker``.
    :raises Exception: whatever ``DocxBook`` or ``test_conversion()`` raises;
        the failure is logged with its traceback before re-raising.
    """
    logger = configure_file_logger('consumer', logging_format='%(asctime)s - %(levelname)s - %(message)s')
    logger.info(f'Start processing book-{filename}.')
    # The previous version also built an output path from the string literal
    # "__file__" (not the module path) and never used it; it has been removed.
    try:
        book = DocxBook(book_id=filename,
                        main_logger=logger,
                        libra_locker=locker)
        book.test_conversion()
    except Exception:
        # Runs as a thread target, so record the failure in the file log.
        logger.exception(f'Conversion of book-{filename} failed.')
        raise
    logger.info(f'Book-{filename} has been proceeded.')
def local_run(books):
    """Start one ``local_convert_book`` thread per entry of ``books``.

    All threads share a single ``Event`` that is set before any of them
    starts. The threads are created first and started afterwards; the function
    does not wait for them to finish.

    :param books: iterable of book names to convert.
    """
    locker = Event()
    locker.set()
    threads = []
    for book_name in books:
        # %s rather than %d: the name is also interpolated into file names, so
        # it need not be an integer, and %d would break formatting of the record.
        logging.info("Main : create and start thread %s.", book_name)
        threads.append(Thread(target=local_convert_book, args=(book_name, locker)))
    for thread in threads:
        thread.start()
def server_run():
    """Connect to the message queue and dispatch incoming messages to ``callback``.

    Settings are read from ``config/queue_config.json`` under the directory two
    levels above this module (``username``, ``password`` and ``queue`` are
    required; ``host`` and ``port`` fall back to pika's defaults). The queue is
    declared durable with ``x-max-priority`` 10, a consumer with ``auto_ack``
    is registered, and control is handed to ``channel.start_consuming()``.

    :raises Exception: re-raised after logging when the connection or channel
        cannot be set up.
    :raises ValueError: re-raised after logging when raised by the queue
        declaration.
    """
    logger = configure_file_logger('consumer')

    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    with open(Path(base_dir, "config", "queue_config.json"), "r") as config_file:
        conf_param = json.load(config_file)

    # Missing or empty host/port entries fall back to pika's defaults.
    host = conf_param.get('host') or pika.ConnectionParameters.DEFAULT_HOST
    port = conf_param.get('port') or pika.ConnectionParameters.DEFAULT_PORT

    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=host,
                port=port,
                credentials=pika.PlainCredentials(
                    username=conf_param['username'],
                    password=conf_param['password'],
                ),
            )
        )
        channel = connection.channel()
    except Exception as exc:
        logger.error('Problems with queue connection.\n' + str(exc))
        raise exc

    queue_name = conf_param['queue']
    try:
        channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments={'x-max-priority': 10},
        )
    except ValueError as exc:
        logger.error(f'Queue {queue_name} is not declared.')
        raise exc

    # One shared, initially set Event is bound into every callback invocation.
    locker = Event()
    locker.set()
    on_message = partial(callback, logger=logger, libra_locker=locker)
    channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message)

    logger.info('Connection has been established.')
    print('Waiting for messages...')
    logger.info('Waiting for messages...')
    channel.start_consuming()
# Script entry point: start the queue consumer when this file is run directly.
if __name__ == '__main__':
    server_run()