forked from LiveCarta/BookConverter
Add TypeError Exception
This commit is contained in:
95
consumer.py
95
consumer.py
@@ -13,7 +13,7 @@ from src.docx_converter.docx_solver import DocxBook
|
|||||||
from src.epub_converter.epub_solver import EpubBook
|
from src.epub_converter.epub_solver import EpubBook
|
||||||
|
|
||||||
|
|
||||||
def configure_file_logger(name, filename='logs/converter.log', filemode='w+',
|
def configure_file_logger(name, filename="logs/converter.log", filemode="w+",
|
||||||
logging_level=logging.INFO):
|
logging_level=logging.INFO):
|
||||||
logger = logging.getLogger(name)
|
logger = logging.getLogger(name)
|
||||||
|
|
||||||
@@ -23,113 +23,108 @@ def configure_file_logger(name, filename='logs/converter.log', filemode='w+',
|
|||||||
file_handler = logging.FileHandler(file_path, mode=filemode)
|
file_handler = logging.FileHandler(file_path, mode=filemode)
|
||||||
logger.addHandler(file_handler)
|
logger.addHandler(file_handler)
|
||||||
|
|
||||||
file_format = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s '
|
file_format = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s "
|
||||||
'[%(filename)s:%(lineno)d in %(funcName)s]')
|
"[%(filename)s:%(lineno)d in %(funcName)s]")
|
||||||
file_handler.setFormatter(file_format)
|
file_handler.setFormatter(file_format)
|
||||||
logger.setLevel(logging_level)
|
logger.setLevel(logging_level)
|
||||||
return logger
|
return logger
|
||||||
|
|
||||||
|
|
||||||
def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
|
def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
|
||||||
logger.info(f'Start processing book-{book_id}.')
|
logger.info(f"Start processing book-{book_id}.")
|
||||||
try:
|
try:
|
||||||
json_file_path = 'json/9781614382264.json'
|
json_file_path = "json/9781614382264.json"
|
||||||
book = book_type(book_id=book_id, main_logger=logger, **params)
|
book = book_type(book_id=book_id, main_logger=logger, **params)
|
||||||
book.conversion_local(json_file_path)
|
book.conversion_local(json_file_path)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
raise exc
|
raise exc
|
||||||
logger.info(f'Book-{book_id} has been proceeded.')
|
logger.info(f"Book-{book_id} has been proceeded.")
|
||||||
|
|
||||||
|
|
||||||
def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
|
def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
|
||||||
logger.info(f'Start processing book-{book_id}.')
|
logger.info(f"Start processing book-{book_id}.")
|
||||||
try:
|
try:
|
||||||
book = book_type(book_id=book_id, main_logger=logger, **params)
|
book = book_type(book_id=book_id, main_logger=logger, **params)
|
||||||
book.conversion()
|
book.conversion()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
raise exc
|
raise exc
|
||||||
logger.info(f'Book-{book_id} has been proceeded.')
|
logger.info(f"Book-{book_id} has been proceeded.")
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body, logger, libre_locker):
|
def callback(ch, method, properties, body, logger, libre_locker):
|
||||||
print(f'Message: {body}.')
|
print(f"Message: {body}.")
|
||||||
logger.info(f'Message: {body}.')
|
logger.info(f"Message: {body}.")
|
||||||
try:
|
try:
|
||||||
data = json.loads(body)
|
data = json.loads(body)
|
||||||
assert 'apiURL' in data, 'No apiURL field in received message.'
|
assert "apiURL" in data, "No apiURL field in received message."
|
||||||
assert data.get('fileExtension') in [
|
assert data.get("fileExtension") in [
|
||||||
'epub', 'docx'], 'Wrong book type received.'
|
"epub", "docx"], "Wrong book type received."
|
||||||
|
|
||||||
book_params = {
|
book_params = {
|
||||||
'access': Access(url=data['apiURL']),
|
"access": Access(url=data["apiURL"]),
|
||||||
}
|
}
|
||||||
if data.get('fileExtension') == 'docx':
|
if data.get("fileExtension") == "docx":
|
||||||
book_params.update({'libre_locker': libre_locker})
|
book_params.update({"libre_locker": libre_locker})
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
'book_type': EpubBook if data.get('fileExtension') == 'epub' else DocxBook,
|
"book_type": EpubBook if data.get("fileExtension") == "epub" else DocxBook,
|
||||||
'book_id': data['id'],
|
"book_id": data["id"],
|
||||||
'logger': logger,
|
"logger": logger,
|
||||||
'params': book_params
|
"params": book_params
|
||||||
}
|
}
|
||||||
|
|
||||||
thread = Thread(target=convert_book, kwargs=params)
|
thread = Thread(target=convert_book, kwargs=params)
|
||||||
thread.start()
|
thread.start()
|
||||||
logging.log(logging.INFO, f'Active threads: {active_count()}.')
|
logging.log(logging.INFO, f"Active threads: {active_count()}.")
|
||||||
# print(f'Active threads: {active_count()}.')
|
# print(f"Active threads: {active_count()}.")
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
if hasattr(exc, 'message'):
|
if hasattr(exc, "message"):
|
||||||
logger.error(f'{sys.exc_info()[0]}: {exc.message}')
|
logger.error(f"{sys.exc_info()[0]}: {exc.message}")
|
||||||
else:
|
else:
|
||||||
logger.error(f'{sys.exc_info()[0]}: {str(exc)}')
|
logger.error(f"{sys.exc_info()[0]}: {str(exc)}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def server_run():
|
def server_run():
|
||||||
logger = configure_file_logger('consumer')
|
logger = configure_file_logger("consumer")
|
||||||
|
|
||||||
folder_path = os.path.dirname(os.path.abspath(__file__))
|
|
||||||
config_path = Path(os.path.join(folder_path, "config/queue_config.json"))
|
|
||||||
with open(config_path, "r") as f:
|
|
||||||
conf_param = json.load(f)
|
|
||||||
|
|
||||||
host = conf_param.get('host') or pika.ConnectionParameters().DEFAULT_HOST
|
|
||||||
port = conf_param.get('port') or pika.ConnectionParameters().DEFAULT_PORT
|
|
||||||
channel = None
|
|
||||||
try:
|
try:
|
||||||
|
folder_path = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
config_path = Path(os.path.join(folder_path, "config/queue_config.json"))
|
||||||
|
with open(config_path, "r") as f:
|
||||||
|
conf_param = json.load(f)
|
||||||
|
|
||||||
|
host = conf_param.get("host") or pika.ConnectionParameters().DEFAULT_HOST
|
||||||
|
port = conf_param.get("port") or pika.ConnectionParameters().DEFAULT_PORT
|
||||||
|
channel = None
|
||||||
credentials = pika.PlainCredentials(
|
credentials = pika.PlainCredentials(
|
||||||
username=conf_param['username'], password=conf_param['password'])
|
username=conf_param["username"], password=conf_param["password"])
|
||||||
parameters = pika.ConnectionParameters(
|
parameters = pika.ConnectionParameters(
|
||||||
host=host, port=port, credentials=credentials)
|
host=host, port=port, credentials=credentials)
|
||||||
connection = pika.BlockingConnection(parameters)
|
connection = pika.BlockingConnection(parameters)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
except Exception as exc:
|
channel.queue_declare(queue=conf_param["queue"], durable=True, arguments={
|
||||||
logger.log(logging.ERROR,
|
"x-max-priority": 10})
|
||||||
f'Problems with queue connection.\n' + str(exc))
|
except TypeError as exc:
|
||||||
raise exc
|
print("TypeError: problem with config, " + str(exc))
|
||||||
|
|
||||||
try:
|
|
||||||
channel.queue_declare(queue=conf_param['queue'], durable=True, arguments={
|
|
||||||
'x-max-priority': 10})
|
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
logger.log(logging.ERROR,
|
logger.log(logging.ERROR,
|
||||||
f'Queue {conf_param["queue"]} is not declared.')
|
f"Queue {conf_param['queue']} is not declared.")
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
locker = Event()
|
locker = Event()
|
||||||
locker.set()
|
locker.set()
|
||||||
channel.basic_consume(queue=conf_param['queue'],
|
channel.basic_consume(queue=conf_param["queue"],
|
||||||
auto_ack=True,
|
auto_ack=True,
|
||||||
on_message_callback=partial(callback, logger=logger, libre_locker=locker))
|
on_message_callback=partial(callback, logger=logger, libre_locker=locker))
|
||||||
logger.info('Connection has been established.')
|
logger.info("Connection has been established.")
|
||||||
print('Waiting for messages...')
|
print("Waiting for messages...")
|
||||||
logger.info('Waiting for messages...')
|
logger.info("Waiting for messages...")
|
||||||
|
|
||||||
channel.start_consuming()
|
channel.start_consuming()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
server_run()
|
server_run()
|
||||||
|
|||||||
Reference in New Issue
Block a user