forked from LiveCarta/LiveCartaMeta
59 lines
1.7 KiB
Python
59 lines
1.7 KiB
Python
import json
|
|
|
|
import requests
|
|
from dynaconf import Dynaconf, Validator
|
|
from mongoengine import connect
|
|
|
|
from configs.configs import CONFIGS
|
|
|
|
|
|
class AppConfig:
    """Application settings backed by Dynaconf, plus MongoDB connection setup.

    Creating an instance loads the JSON settings files into ``self.config``
    and then calls mongoengine's ``connect`` with the values found in
    ``CONFIGS['db']``.
    """

    # Seconds to wait for the sources API before giving up; without a timeout
    # ``requests`` may block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self) -> None:
        """Load the settings files, then connect to MongoDB."""
        self.refresh()
        db_settings = CONFIGS['db']
        connect(
            db=db_settings['name'],
            username=db_settings['username'],
            password=db_settings['password'],
            host="mongodb://{host}:27017/{database}".format(
                host=db_settings['host'],
                database=db_settings['name'],
            ),
        )

    def refresh(self) -> None:
        """Rebuild ``self.config`` from the JSON settings files on disk."""
        self.config = Dynaconf(settings_files=[
            "/app/configs/main.json",
            "/app/configs/sources.json",
        ])

    def _sources(self):
        """Return the loaded ``sources`` mapping, or an empty dict if absent."""
        # ``.get`` instead of attribute access so a settings set without a
        # ``sources`` key yields "no sources" rather than an AttributeError.
        return self.config.get("sources") or {}

    def get_bulk_insert_limit(self):
        """Return the configured ``bulk_limit``, or 1 when it is unset or falsy."""
        # Attribute access on a missing Dynaconf key raises, which would make
        # the fallback unreachable; ``.get`` returns None for a missing key.
        return self.config.get("bulk_limit") or 1

    def get_source_by_name(self, name: str):
        """Return the settings entry for the source called ``name``.

        Raises:
            ValueError: if no source with that name is configured.
        """
        sources = self._sources()
        if name not in sources:
            raise ValueError(f'"{name}" source not exists!')
        return sources[name]

    def get_sources_list(self):
        """Return a keys view of the configured source names."""
        return self._sources().keys()

    def update_sources(self) -> None:
        """Fetch the source list from the application API and persist it.

        The response is expected to be a JSON list of objects that each carry
        a ``source_name`` key; they are re-keyed by that name, written to
        ``./configs/sources.json`` and the settings are then reloaded.

        Raises:
            Exception: if the API answers with a status code other than 200.
        """
        credentials = CONFIGS['application_credentials']
        headers = {
            'Content-type': 'application/json',
            'Authorization': 'Bearer {key}'.format(key=credentials['api_key']),
        }
        response = requests.get(
            credentials['api_url'] + "sources",
            headers=headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise Exception('Bad app response')

        # Build the full payload before opening the file so that a malformed
        # response cannot leave a truncated sources.json behind.
        new_config = {
            "sources": {source["source_name"]: source for source in response.json()}
        }

        # NOTE(review): this writes to a path relative to the working
        # directory, while refresh() reads "/app/configs/sources.json"; the
        # two only coincide when the process runs from /app - confirm.
        with open("./configs/sources.json", "w") as outfile:
            json.dump(new_config, outfile)

        self.refresh()
|
|
|
|
# Shared module-level instance. Constructing it runs AppConfig.__init__, so
# importing this module loads the settings files and connects to MongoDB.
config = AppConfig()