"""Application configuration: Dynaconf-backed settings plus the MongoEngine connection."""

import json

import requests
from dynaconf import Dynaconf, Validator
from mongoengine import connect

from configs.configs import CONFIGS


class AppConfig:
    """Load settings from the JSON config files and expose helpers to query them.

    Creating an instance loads the settings and then calls MongoEngine's
    ``connect`` with the credentials found in ``CONFIGS['db']``.
    """

    # Upper bound (seconds) for the sources API call so that a stalled server
    # cannot block ``update_sources`` forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self) -> None:
        """Load the settings files, then register the MongoDB connection."""
        self.refresh()

        db_settings = CONFIGS['db']
        connect(
            db=db_settings['name'],
            username=db_settings['username'],
            password=db_settings['password'],
            host="mongodb://{host}:27017/{database}".format(
                host=db_settings['host'],
                database=db_settings['name'],
            ),
        )

    def refresh(self) -> None:
        """Rebuild ``self.config`` from the main and sources JSON files."""
        self.config = Dynaconf(settings_files=[
            "/app/configs/main.json",
            "/app/configs/sources.json",
        ])

    def get_bulk_insert_limit(self):
        """Return the configured ``bulk_limit``, or ``1`` when it is falsy."""
        return self.config.bulk_limit or 1

    def get_source_by_name(self, name: str):
        """Return the settings entry of the source called ``name``.

        Raises:
            ValueError: if ``name`` is not among the configured sources.
        """
        if name not in self.config.sources:
            raise ValueError(f'"{name}" source not exists!')
        return self.config.sources[name]

    def get_sources_list(self):
        """Return the names (keys) of all configured sources."""
        return self.config.sources.keys()

    def update_sources(self) -> None:
        """Fetch the sources from the application API, persist them, and reload.

        The response is expected to be a JSON list of objects that each carry a
        ``source_name`` key; the objects are stored under that name.

        Raises:
            Exception: if the API answers with a status code other than 200.
            requests.exceptions.RequestException: on network failure or when
                the request exceeds ``REQUEST_TIMEOUT_SECONDS``.
        """
        credentials = CONFIGS['application_credentials']
        headers = {
            'Content-type': 'application/json',
            'Authorization': f"Bearer {credentials['api_key']}",
        }

        response = requests.get(
            credentials['api_url'] + "sources",
            headers=headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise Exception('Bad app response')

        new_config = {
            "sources": {source["source_name"]: source for source in response.json()}
        }

        # NOTE(review): this relative path only matches the absolute
        # "/app/configs/sources.json" read by refresh() when the process runs
        # with /app as its working directory - confirm against the deployment.
        with open("./configs/sources.json", "w", encoding="utf-8") as outfile:
            json.dump(new_config, outfile)

        self.refresh()


# Module-level shared instance; importing this module loads the settings and
# registers the database connection as a side effect.
config = AppConfig()