import hashlib
import importlib
import json

import mongoengine
from pydantic import BaseModel, Extra

from components.SenderComponent import SenderComponent
from configs.config import config
from configs.configs import CONFIGS
from models.MetaData import MetaData
from sources.source_types.AbstractSource import AbstractSource


class BaseSource(BaseModel):
    source_name: str
    source: dict
    parser_type: dict
    sourceObject: AbstractSource = None
    senderComponent: SenderComponent = None

    class Config:
        extra = Extra.allow

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.senderComponent = SenderComponent(
            api_url=CONFIGS['application_credentials']['api_url'],
            api_key=CONFIGS['application_credentials']['api_key'])

        def get_source_object(source_config):
            # Resolve the concrete source class by its configured type,
            # e.g. {"type": "api"} -> sources.source_types.ApiSource.ApiSource.
            class_name = source_config['type'].capitalize() + "Source"
            module_name = 'sources.source_types.' + class_name
            module = importlib.import_module(module_name)
            module_class = getattr(module, class_name)
            return module_class(source_name=self.source_name, **source_config)

        def get_parser(parser_config):
            # Resolve the concrete parser class by its configured format,
            # e.g. {"format": "csv"} -> sources.file_types.CsvParser.CsvParser.
            class_name = parser_config['format'].capitalize() + "Parser"
            module_name = 'sources.file_types.' + class_name
            module = importlib.import_module(module_name)
            module_class = getattr(module, class_name)
            return module_class(**parser_config)

        self.sourceObject = get_source_object(self.source)
        if self.sourceObject.is_parser_needed():
            self.sourceObject.file_parser = get_parser(self.parser_type)

    def check_is_update_needed(self):
        return self.sourceObject.check_is_update_needed()

    def check_is_new_row(self, row_hash):
        # A row is new if no MetaData document with this hash exists yet.
        try:
            MetaData.objects().get(row_hash=row_hash)
            return False
        except MetaData.DoesNotExist:
            return True

    def save_row_hash(self, row_hash, data, file_hash=None):
        # TODO: is original_data needed?
        MetaData(file_hash=file_hash, row_hash=row_hash, original_data=data).save()

    def __bulk_save(self, hashes, data):
        # Send the batch to the API first; only record the hashes once sending succeeded.
        if self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
            instances = [MetaData(**row_meta) for row_meta in hashes]
            try:
                MetaData.objects.insert(instances, load_bulk=False)
            except mongoengine.errors.BulkWriteError as error:
                print(error)
                self.__insert_row_by_row(instances)
        else:
            raise Exception("Error on saving data.")

    def __insert_row_by_row(self, instances):
        # Fallback path: insert one by one, skipping duplicates.
        for model in instances:
            try:
                model.save()
            except mongoengine.errors.NotUniqueError:
                print("Duplicate row with id: {id}".format(id=model.row_hash))

    def update_data(self):
        data_iterator = self.sourceObject.parse_data()
        inserted_rows = 0
        # try:
        data_to_send = []
        hashes = []
        for row in data_iterator:
            # Hash the row deterministically so re-runs can detect rows already processed.
            dhash = hashlib.sha1()
            encoded = json.dumps(row, sort_keys=True).encode()
            dhash.update(encoded)
            row_hash = dhash.hexdigest()
            if self.check_is_new_row(row_hash):
                data_to_send.append(row)
                hashes.append({"row_hash": row_hash,
                               "file_hash": self.sourceObject.active_file_hash()})
                if len(hashes) >= config.get_bulk_insert_limit():
                    self.__bulk_save(hashes, data_to_send)
                    inserted_rows += len(hashes)
                    data_to_send = []
                    hashes = []
        if len(hashes) > 0:
            # Flush the final partial batch and count it as well.
            self.__bulk_save(hashes, data_to_send)
            inserted_rows += len(hashes)
        self.sourceObject.after_save()
        # except Exception as inst:
        #     print('Undefined error! Data not updated')
        #     print(type(inst))
        #     print(inst.args)
        return inserted_rows
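

# Example usage (a minimal sketch, not part of the module's behavior): the
# "csv" source type, the URL, and the dictionary keys below are hypothetical
# and depend on the concrete Source/Parser classes available under sources/,
# an active mongoengine connection, and the credentials configured in CONFIGS.
#
#     source = BaseSource(
#         source_name="example_source",
#         source={"type": "csv", "url": "https://example.org/data.csv"},
#         parser_type={"format": "csv"},
#     )
#     if source.check_is_update_needed():
#         print(source.update_data(), "new rows inserted")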