forked from LiveCarta/BookConverter
Update consumer.py
- add libra locker - update local run func for running multiple threads
This commit is contained in:
@@ -5,6 +5,7 @@ import sys
|
|||||||
from functools import partial
|
from functools import partial
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from threading import Thread, active_count
|
from threading import Thread, active_count
|
||||||
|
from threading import Event
|
||||||
|
|
||||||
import pika
|
import pika
|
||||||
|
|
||||||
@@ -12,7 +13,8 @@ from access import Access
|
|||||||
from book import Book
|
from book import Book
|
||||||
|
|
||||||
|
|
||||||
def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+', logging_level=logging.INFO,
|
def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+',
|
||||||
|
logging_level=logging.INFO,
|
||||||
logging_format='%(asctime)s - %(message)s'):
|
logging_format='%(asctime)s - %(message)s'):
|
||||||
logger = logging.getLogger(name)
|
logger = logging.getLogger(name)
|
||||||
|
|
||||||
@@ -28,12 +30,12 @@ def configure_file_logger(name, filename='logs/converter_log.log', filemode='w+'
|
|||||||
return logger
|
return logger
|
||||||
|
|
||||||
|
|
||||||
def convert_book(book_id, access, logger):
|
def convert_book(book_id, access, logger, libra_locker):
|
||||||
logger.info(f'Start processing book-{book_id}.')
|
logger.info(f'Start processing book-{book_id}.')
|
||||||
logging_format = '%(asctime)s - %(levelname)s - %(message)s'
|
logging_format = '%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
book = Book(book_id, access, main_logger=logger)
|
book = Book(book_id, access, main_logger=logger, libra_locker=libra_locker)
|
||||||
book.conversion(logging_format=logging_format)
|
book.conversion(logging_format=logging_format)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
raise exc
|
raise exc
|
||||||
@@ -42,7 +44,7 @@ def convert_book(book_id, access, logger):
|
|||||||
# print('Book has been proceeded.')
|
# print('Book has been proceeded.')
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body, logger):
|
def callback(ch, method, properties, body, logger, libra_locker):
|
||||||
print(f'Message: {body}.')
|
print(f'Message: {body}.')
|
||||||
logger.info(f'Message: {body}.')
|
logger.info(f'Message: {body}.')
|
||||||
try:
|
try:
|
||||||
@@ -52,7 +54,8 @@ def callback(ch, method, properties, body, logger):
|
|||||||
params = {
|
params = {
|
||||||
'book_id': data['id'],
|
'book_id': data['id'],
|
||||||
'access': Access(url=data['apiURL']),
|
'access': Access(url=data['apiURL']),
|
||||||
'logger': logger
|
'logger': logger,
|
||||||
|
'libra_locker': libra_locker
|
||||||
}
|
}
|
||||||
|
|
||||||
thread = Thread(target=convert_book, kwargs=params)
|
thread = Thread(target=convert_book, kwargs=params)
|
||||||
@@ -71,19 +74,18 @@ def callback(ch, method, properties, body, logger):
|
|||||||
# thread.join()
|
# thread.join()
|
||||||
# print('Waiting for the message...')
|
# print('Waiting for the message...')
|
||||||
|
|
||||||
def local_run(filename):
|
|
||||||
|
def local_convert_book(filename, locker):
|
||||||
logger = configure_file_logger('consumer', logging_format='%(asctime)s - %(levelname)s - %(message)s')
|
logger = configure_file_logger('consumer', logging_format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
logger.info(f'Start processing book-{filename}.')
|
logger.info(f'Start processing book-{filename}.')
|
||||||
logging_format = '%(asctime)s - %(levelname)s - %(message)s'
|
|
||||||
|
|
||||||
|
folder_path = os.path.dirname(os.path.dirname(os.path.abspath("__file__")))
|
||||||
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
||||||
file_path = Path(os.path.join(folder_path, f'docx/{filename}.docx'))
|
|
||||||
output_path = Path(os.path.join(folder_path, f'json/{filename}.json'))
|
output_path = Path(os.path.join(folder_path, f'json/{filename}.json'))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
book = Book(file_path=file_path, output_path=output_path, main_logger=logger)
|
book = Book(book_id=filename,
|
||||||
|
output_path=output_path,
|
||||||
|
main_logger=logger,
|
||||||
|
libra_locker=locker)
|
||||||
book.test_conversion()
|
book.test_conversion()
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
@@ -92,8 +94,20 @@ def local_run(filename):
|
|||||||
logger.info(f'Book-{filename} has been proceeded.')
|
logger.info(f'Book-{filename} has been proceeded.')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def local_run(books):
|
||||||
|
locker = Event()
|
||||||
|
locker.set()
|
||||||
|
|
||||||
|
threads = []
|
||||||
|
for book_name in books:
|
||||||
|
logging.info("Main : create and start thread %d.", book_name)
|
||||||
|
x = Thread(target=local_convert_book, args=(book_name, locker))
|
||||||
|
threads.append(x)
|
||||||
|
|
||||||
|
[x.start() for x in threads]
|
||||||
|
|
||||||
|
|
||||||
|
def server_run():
|
||||||
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
# config_path = Path(os.path.join(folder_path, "config/config.json"))
|
# config_path = Path(os.path.join(folder_path, "config/config.json"))
|
||||||
config_path = Path(os.path.join(folder_path, "config/queue_config.json"))
|
config_path = Path(os.path.join(folder_path, "config/queue_config.json"))
|
||||||
@@ -116,13 +130,18 @@ if __name__ == '__main__':
|
|||||||
logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.')
|
logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.')
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
|
locker = Event()
|
||||||
|
locker.set()
|
||||||
channel.basic_consume(queue=conf_param['queue'],
|
channel.basic_consume(queue=conf_param['queue'],
|
||||||
auto_ack=True,
|
auto_ack=True,
|
||||||
on_message_callback=partial(callback, logger=logger))
|
on_message_callback=partial(callback, logger=logger, libra_locker=locker))
|
||||||
logger.info('Connection has been established.')
|
logger.info('Connection has been established.')
|
||||||
print('Waiting for messages...')
|
print('Waiting for messages...')
|
||||||
logger.info('Waiting for messages...')
|
logger.info('Waiting for messages...')
|
||||||
|
|
||||||
channel.start_consuming()
|
channel.start_consuming()
|
||||||
|
|
||||||
# local_run('quote')
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
server_run()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user