This repository has been archived on 2026-04-06. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
LiveCartaMeta/sources/BaseSource.py
2023-01-30 10:45:26 +03:00

111 lines
3.9 KiB
Python

import hashlib
import importlib
import json
import mongoengine
from pydantic import BaseModel, Extra
from components.SenderComponent import SenderComponent
from configs.config import config
from configs.configs import CONFIGS
from models.MetaData import MetaData
from sources.source_types.AbstractSource import AbstractSource
class BaseSource(BaseModel):
    """Base ingestion pipeline for one configured data source.

    Responsibilities:
      * dynamically instantiate the concrete source class (and, when the
        source needs one, a file parser) from the config dicts;
      * iterate the source's rows, de-duplicating by a SHA-1 content hash
        stored in ``MetaData``;
      * push batches of new rows to the LiveCarta API via ``SenderComponent``
        and persist their hashes only after a successful send.
    """

    source_name: str
    # Source configuration; must contain a 'type' key (e.g. {'type': 'api', ...}).
    source: dict
    # Parser configuration; must contain a 'format' key (e.g. {'format': 'csv', ...}).
    parser_type: dict
    sourceObject: AbstractSource = None
    senderComponent: SenderComponent = None

    class Config:
        extra = Extra.allow

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.senderComponent = SenderComponent(
            api_url=CONFIGS['application_credentials']['api_url'],
            api_key=CONFIGS['application_credentials']['api_key'],
        )
        self.sourceObject = self._build_source_object(self.source)
        if self.sourceObject.is_parser_needed():
            self.sourceObject.file_parser = self._build_parser(self.parser_type)

    def _build_source_object(self, source_config):
        """Import and instantiate the source class named by source_config['type'].

        E.g. {'type': 'api'} -> sources.source_types.ApiSource. The whole
        config dict is forwarded to the source constructor as keyword args.
        """
        # BUG FIX: the original used attribute access (source_config.type),
        # which raises AttributeError on a plain dict — the declared type of
        # `source`. Subscript access matches the **source_config unpacking below.
        # (Parameter also renamed from `config`, which shadowed the module-level
        # `config` import used by update_data().)
        class_name = source_config['type'].capitalize() + "Source"
        module = importlib.import_module('sources.source_types.' + class_name)
        source_cls = getattr(module, class_name)
        return source_cls(source_name=self.source_name, **source_config)

    def _build_parser(self, parser_config):
        """Import and instantiate the parser class named by parser_config['format'].

        E.g. {'format': 'csv'} -> sources.file_types.CsvParser.
        """
        # BUG FIX: subscript access instead of parser_config.format (see
        # _build_source_object); local renamed to avoid shadowing `config`.
        class_name = parser_config['format'].capitalize() + "Parser"
        module = importlib.import_module('sources.file_types.' + class_name)
        parser_cls = getattr(module, class_name)
        return parser_cls(**parser_config)

    def check_is_update_needed(self):
        """Delegate to the concrete source's freshness check."""
        return self.sourceObject.check_is_update_needed()

    def check_is_new_row(self, hash):
        """Return True if no MetaData record exists for this row hash.

        NOTE(review): the parameter name shadows the builtin ``hash``; it is
        kept for backward compatibility with keyword callers.
        """
        try:
            MetaData.objects().get(row_hash=hash)
            return False
        except MetaData.DoesNotExist:
            return True

    def save_row_hash(self, row_hash, data, file_hash=None):
        """Persist a single row hash (with its original data) to MetaData."""
        # TODO: is original_data needed?
        MetaData(file_hash=file_hash, row_hash=row_hash, original_data=data).save()

    def __bulk_save(self, hashes, data):
        """Send one batch to the API, then bulk-insert its MetaData hashes.

        Hashes are only persisted after a successful send, so a failed send
        leaves the rows eligible for a retry on the next run. On a bulk-write
        conflict (e.g. a concurrent run inserted some of the same hashes) we
        fall back to row-by-row inserts so the non-duplicate rows still land.
        """
        if self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
            instances = [MetaData(**data) for data in hashes]
            try:
                MetaData.objects.insert(instances, load_bulk=False)
            except mongoengine.errors.BulkWriteError as error:
                print(error)
                self.__insert_row_by_row(instances)
        else:
            raise Exception("Error on saving data.")

    def __insert_row_by_row(self, instances):
        """Fallback insert: save each MetaData document, skipping duplicates."""
        for model in instances:
            try:
                model.save()
            # BUG FIX: Document.save() reports a duplicate key as
            # NotUniqueError, not BulkWriteError — the original except clause
            # could never match, so duplicates crashed the fallback path.
            # BulkWriteError is kept as a defensive second match.
            except (mongoengine.errors.NotUniqueError,
                    mongoengine.errors.BulkWriteError):
                print("Duplicate row with id: {id}".format(id=model.row_hash))

    def update_data(self):
        """Pull rows from the source, send the new ones in batches.

        Each row is hashed with SHA-1 over its canonical JSON form
        (sort_keys=True, so key order does not affect the hash); rows whose
        hash already exists in MetaData are skipped.

        Returns the number of rows handed to __bulk_save in *full* batches.
        NOTE(review): rows flushed in the final partial batch are sent and
        saved but not added to the returned count — preserved as-is; confirm
        whether callers rely on this before changing it.
        """
        data_iterator = self.sourceObject.parse_data()
        inserted_rows = 0
        data_to_send = []
        hashes = []
        for row in data_iterator:
            dhash = hashlib.sha1()
            encoded = json.dumps(row, sort_keys=True).encode()
            dhash.update(encoded)
            row_hash = dhash.hexdigest()
            if self.check_is_new_row(row_hash):
                data_to_send.append(row)
                hashes.append({"row_hash": row_hash,
                               "file_hash": self.sourceObject.active_file_hash()})
                if len(hashes) >= config.get_bulk_insert_limit():
                    self.__bulk_save(hashes, data_to_send)
                    inserted_rows += len(hashes)
                    data_to_send = []
                    hashes = []
        # Flush the last (partial) batch.
        if hashes:
            self.__bulk_save(hashes, data_to_send)
        self.sourceObject.after_save()
        return inserted_rows