import hashlib
import importlib
import json
from typing import Optional

import mongoengine
from pydantic import BaseModel, Extra

from components.SenderComponent import SenderComponent
from configs.config import config
from models.MetaData import MetaData
from sources.source_types.AbstractSource import AbstractSource


class BaseSource(BaseModel):
    source_name: str
    source: dict
    parser_type: dict
    sourceObject: Optional[AbstractSource] = None
    senderComponent: Optional[SenderComponent] = None

    class Config:
        extra = Extra.allow
        # Needed if AbstractSource / SenderComponent are plain classes
        # rather than pydantic models; harmless otherwise.
        arbitrary_types_allowed = True

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.senderComponent = SenderComponent(**config.get_main_app_creds())

        # The parameter is named `cfg` so it does not shadow the imported
        # `config` module used elsewhere in this class.
        def get_source_object(cfg):
            # `source` is typed as a dict, so its keys are read with item
            # access, not attribute access.
            class_name = cfg['type'].capitalize() + "Source"
            module_name = 'sources.source_types.' + class_name
            module = importlib.import_module(module_name)
            module_class = getattr(module, class_name)
            return module_class(source_name=self.source_name, **cfg)

        def get_parser(cfg):
            class_name = cfg['format'].capitalize() + "Parser"
            module_name = 'sources.file_types.' + class_name
            module = importlib.import_module(module_name)
            module_class = getattr(module, class_name)
            return module_class(**cfg)

        self.sourceObject = get_source_object(self.source)
        if self.sourceObject.is_parser_needed():
            self.sourceObject.file_parser = get_parser(self.parser_type)

    def check_is_update_needed(self):
        return self.sourceObject.check_is_update_needed()

    def check_is_new_row(self, row_hash):
        # A row is "new" if its hash has never been persisted to MetaData.
        try:
            MetaData.objects.get(row_hash=row_hash)
            return False
        except MetaData.DoesNotExist:
            return True

    def save_row_hash(self, row_hash, data, file_hash=None):
        # TODO: is original_data needed?
        MetaData(file_hash=file_hash, row_hash=row_hash, original_data=data).save()

    def __bulk_save(self, hashes, data):
        # Ship the batch to the main application first; only record the
        # hashes once the send has been acknowledged.
        if self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
            instances = [MetaData(**doc) for doc in hashes]
            try:
                MetaData.objects.insert(instances, load_bulk=False)
            except mongoengine.errors.BulkWriteError as error:
                # A single duplicate aborts the whole bulk insert, so fall
                # back to saving the batch one document at a time.
                print(error)
                self.__insert_row_by_row(instances)
        else:
            raise RuntimeError("Error on saving data.")

    def __insert_row_by_row(self, instances):
        for model in instances:
            try:
                model.save()
            except mongoengine.errors.NotUniqueError:
                # An individual save() raises NotUniqueError (not
                # BulkWriteError) on a duplicate key; skip and keep going.
                print("Duplicate row with id: {id}".format(id=model.row_hash))

    def update_data(self):
        data_iterator = self.sourceObject.parse_data()
        inserted_rows = 0
        data_to_send = []
        hashes = []
        for row in data_iterator:
            # Hash the canonical JSON form (sorted keys) so logically
            # identical rows always produce the same digest.
            dhash = hashlib.sha1()
            encoded = json.dumps(row, sort_keys=True).encode()
            dhash.update(encoded)
            row_hash = dhash.hexdigest()
            if self.check_is_new_row(row_hash):
                data_to_send.append(row)
                hashes.append({"row_hash": row_hash,
                               "file_hash": self.sourceObject.active_file_hash()})
                # Flush a full batch as soon as it reaches the insert limit.
                if len(hashes) >= config.get_bulk_insert_limit():
                    self.__bulk_save(hashes, data_to_send)
                    inserted_rows += len(hashes)
                    data_to_send = []
                    hashes = []
        if len(hashes) > 0:
            # Flush the final, partially filled batch and count it as well.
            self.__bulk_save(hashes, data_to_send)
            inserted_rows += len(hashes)
        self.sourceObject.after_save()
        return inserted_rows
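

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): drives one update cycle for a source.
# It assumes a hypothetical "csv" source type (sources.source_types.CsvSource)
# and "csv" parser (sources.file_types.CsvParser) exist in this repo; apart
# from "type" and "format", which the constructor actually reads, the config
# keys below are made-up placeholders.
if __name__ == "__main__":
    base_source = BaseSource(
        source_name="example_feed",
        source={"type": "csv", "path": "/data/feed.csv"},   # "path" is hypothetical
        parser_type={"format": "csv", "delimiter": ","},    # "delimiter" is hypothetical
    )
    if base_source.check_is_update_needed():
        print("Inserted {} new rows".format(base_source.update_data()))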