forked from LiveCarta/BookConverter
Modify local consumer.py
This commit is contained in:
48
consumer.py
48
consumer.py
@@ -23,36 +23,48 @@ def configure_file_logger(name, filename='logs/converter.log', filemode='w+',
|
||||
file_handler = logging.FileHandler(file_path, mode=filemode)
|
||||
logger.addHandler(file_handler)
|
||||
|
||||
file_format = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]')
|
||||
file_format = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s '
|
||||
'[%(filename)s:%(lineno)d in %(funcName)s]')
|
||||
file_handler.setFormatter(file_format)
|
||||
logger.setLevel(logging_level)
|
||||
return logger
|
||||
|
||||
def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict,):
|
||||
logger.info(f'Start processing book-{book_id}.')
|
||||
|
||||
def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
|
||||
logger.info(f'Start processing book-{book_id}.')
|
||||
try:
|
||||
json_file_path = 'json/9781614382264.json'
|
||||
book = book_type(book_id=book_id, main_logger=logger, **params)
|
||||
book.conversion_local(json_file_path)
|
||||
except Exception as exc:
|
||||
raise exc
|
||||
logger.info(f'Book-{book_id} has been proceeded.')
|
||||
|
||||
|
||||
def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
|
||||
logger.info(f'Start processing book-{book_id}.')
|
||||
try:
|
||||
book = book_type(book_id=book_id, main_logger=logger, **params)
|
||||
# book.conversion_local('9781641051217')
|
||||
book.conversion()
|
||||
except Exception as exc:
|
||||
raise exc
|
||||
|
||||
logger.info(f'Book-{book_id} has been proceeded.')
|
||||
|
||||
def callback(ch, method, properties, body, logger, libra_locker):
|
||||
|
||||
def callback(ch, method, properties, body, logger, libre_locker):
|
||||
print(f'Message: {body}.')
|
||||
logger.info(f'Message: {body}.')
|
||||
try:
|
||||
data = json.loads(body)
|
||||
assert 'apiURL' in data, 'No apiURL field in received message.'
|
||||
assert data.get('fileExtension') in ['epub', 'docx'], 'Wrong book type received.'
|
||||
assert data.get('fileExtension') in [
|
||||
'epub', 'docx'], 'Wrong book type received.'
|
||||
|
||||
book_params = {
|
||||
'access': Access(url=data['apiURL']),
|
||||
}
|
||||
if data.get('fileExtension') == 'docx':
|
||||
book_params.update({'libra_locker': libra_locker})
|
||||
book_params.update({'libre_locker': libre_locker})
|
||||
|
||||
params = {
|
||||
'book_type': EpubBook if data.get('fileExtension') == 'epub' else DocxBook,
|
||||
@@ -75,6 +87,7 @@ def callback(ch, method, properties, body, logger, libra_locker):
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
def server_run():
|
||||
logger = configure_file_logger('consumer')
|
||||
|
||||
@@ -87,25 +100,30 @@ def server_run():
|
||||
port = conf_param.get('port') or pika.ConnectionParameters().DEFAULT_PORT
|
||||
channel = None
|
||||
try:
|
||||
credentials = pika.PlainCredentials(username=conf_param['username'], password=conf_param['password'])
|
||||
parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials)
|
||||
credentials = pika.PlainCredentials(
|
||||
username=conf_param['username'], password=conf_param['password'])
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=host, port=port, credentials=credentials)
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
except Exception as exc:
|
||||
logger.log(logging.ERROR, f'Problems with queue connection.\n' + str(exc))
|
||||
logger.log(logging.ERROR,
|
||||
f'Problems with queue connection.\n' + str(exc))
|
||||
raise exc
|
||||
|
||||
try:
|
||||
channel.queue_declare(queue=conf_param['queue'], durable=True, arguments={'x-max-priority': 10})
|
||||
channel.queue_declare(queue=conf_param['queue'], durable=True, arguments={
|
||||
'x-max-priority': 10})
|
||||
except ValueError as exc:
|
||||
logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.')
|
||||
logger.log(logging.ERROR,
|
||||
f'Queue {conf_param["queue"]} is not declared.')
|
||||
raise exc
|
||||
|
||||
locker = Event()
|
||||
locker.set()
|
||||
channel.basic_consume(queue=conf_param['queue'],
|
||||
auto_ack=True,
|
||||
on_message_callback=partial(callback, logger=logger, libra_locker=locker))
|
||||
on_message_callback=partial(callback, logger=logger, libre_locker=locker))
|
||||
logger.info('Connection has been established.')
|
||||
print('Waiting for messages...')
|
||||
logger.info('Waiting for messages...')
|
||||
@@ -114,4 +132,4 @@ def server_run():
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
server_run()
|
||||
server_run()
|
||||
|
||||
Reference in New Issue
Block a user