From 66fb1cb5d6d72d7c210e3676561fbd61ba67320c Mon Sep 17 00:00:00 2001 From: Jeniamakarchik Date: Fri, 10 Apr 2020 17:02:39 +0300 Subject: [PATCH] fix consumer for run with queue --- src/consumer.py | 64 ++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/src/consumer.py b/src/consumer.py index 0d5a449..f39b9c3 100644 --- a/src/consumer.py +++ b/src/consumer.py @@ -92,36 +92,36 @@ def local_run(filename): if __name__ == '__main__': - # folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - # # config_path = Path(os.path.join(folder_path, "config/config.json")) - # config_path = Path(os.path.join(folder_path, "config/queue_config.json")) - # with open(config_path, "r") as f: - # conf_param = json.load(f) - # - # host = conf_param.get('host') or pika.ConnectionParameters().DEFAULT_HOST - # port = conf_param.get('port') or pika.ConnectionParameters().DEFAULT_PORT - # - # credentials = pika.PlainCredentials(username=conf_param['username'], password=conf_param['password']) - # parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials) - # connection = pika.BlockingConnection(parameters) - # channel = connection.channel() - # - # logger = configure_file_logger('consumer', logging_format='%(asctime)s - %(levelname)s - %(message)s') - # - # try: - # channel.queue_declare(queue=conf_param['queue'], durable=True, arguments={'x-max-priority': 10}) - # except ValueError as exc: - # logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.') - # raise exc - # - # acs = Access() - # channel.basic_consume(queue=conf_param['queue'], - # auto_ack=True, - # on_message_callback=partial(callback, access=acs, logger=logger)) - # logger.info('Connection has been established.') - # print('Waiting for messages...') - # logger.info('Waiting for messages...') - # - # channel.start_consuming() + folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + # config_path = Path(os.path.join(folder_path, "config/config.json")) + config_path = Path(os.path.join(folder_path, "config/queue_config.json")) + with open(config_path, "r") as f: + conf_param = json.load(f) - local_run('music') + host = conf_param.get('host') or pika.ConnectionParameters().DEFAULT_HOST + port = conf_param.get('port') or pika.ConnectionParameters().DEFAULT_PORT + + credentials = pika.PlainCredentials(username=conf_param['username'], password=conf_param['password']) + parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + logger = configure_file_logger('consumer', logging_format='%(asctime)s - %(levelname)s - %(message)s') + + try: + channel.queue_declare(queue=conf_param['queue'], durable=True, arguments={'x-max-priority': 10}) + except ValueError as exc: + logger.log(logging.ERROR, f'Queue {conf_param["queue"]} is not declared.') + raise exc + + acs = Access() + channel.basic_consume(queue=conf_param['queue'], + auto_ack=True, + on_message_callback=partial(callback, access=acs, logger=logger)) + logger.info('Connection has been established.') + print('Waiting for messages...') + logger.info('Waiting for messages...') + + channel.start_consuming() + + # local_run('quote')