# forked from LiveCarta/LiveCartaMeta
import hashlib
import importlib
import json

import mongoengine
from pydantic import BaseModel, Extra

from components.SenderComponent import SenderComponent
from configs.config import config
from configs.configs import CONFIGS
from models.MetaData import MetaData
from sources.source_types.AbstractSource import AbstractSource


class BaseSource(BaseModel):
    """Glue between a concrete data source and the metadata store.

    Dynamically loads the source class named by the ``source`` config,
    optionally attaches a file parser, deduplicates rows by a SHA-1 of
    their canonical JSON, and forwards unseen rows to the remote API in
    batches via :class:`SenderComponent`.
    """

    source_name: str
    source: dict       # raw source configuration, handed to the source class
    parser_type: dict  # raw parser configuration, handed to the parser class

    # Resolved during __init__, hence the None placeholders.
    sourceObject: AbstractSource = None
    senderComponent: SenderComponent = None

    class Config:
        # Let pydantic keep any extra keys supplied by the caller.
        extra = Extra.allow

    def __init__(self, **kwargs):
        """Build the sender, load the source class named by ``source.type``,
        and attach a parser when the source reports it needs one.
        """
        super().__init__(**kwargs)
        self.senderComponent = SenderComponent(
            api_url=CONFIGS['application_credentials']['api_url'],
            api_key=CONFIGS['application_credentials']['api_key'],
        )

        def load_class(module_name, class_name):
            # Single place for the importlib dance shared by both factories.
            module = importlib.import_module(module_name)
            return getattr(module, class_name)

        def get_source_object(config):
            # NOTE(review): `config` is attribute-accessed (`config.type`)
            # yet also **-unpacked, so it must be a dict-like that supports
            # both — confirm against the callers that populate `self.source`.
            class_name = config.type.capitalize() + "Source"
            module_class = load_class('sources.source_types.' + class_name, class_name)
            return module_class(source_name=self.source_name, **config)

        def get_parser(config):
            class_name = config.format.capitalize() + "Parser"
            module_class = load_class('sources.file_types.' + class_name, class_name)
            return module_class(**config)

        self.sourceObject = get_source_object(self.source)

        if self.sourceObject.is_parser_needed():
            self.sourceObject.file_parser = get_parser(self.parser_type)

    def check_is_update_needed(self):
        """Delegate the freshness check to the concrete source."""
        return self.sourceObject.check_is_update_needed()

    def check_is_new_row(self, hash):  # parameter name kept for backward compat (shadows builtin)
        """Return True when no MetaData document carries this row hash."""
        try:
            MetaData.objects().get(row_hash=hash)
        except MetaData.DoesNotExist:
            return True
        return False

    def save_row_hash(self, row_hash, data, file_hash=None):
        """Persist a single row hash.  TODO: is original_data needed?"""
        MetaData(file_hash=file_hash, row_hash=row_hash, original_data=data).save()

    def __bulk_save(self, hashes, data):
        """Send ``data`` to the API, then record its row hashes in bulk.

        Falls back to row-by-row insertion when the bulk write trips over
        duplicates.  Raises when the API rejects the payload, so callers
        never record hashes for data that was not delivered.
        """
        if not self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
            raise Exception("Error on saving data.")
        instances = [MetaData(**fields) for fields in hashes]
        try:
            MetaData.objects.insert(instances, load_bulk=False)
        except (mongoengine.errors.BulkWriteError, mongoengine.errors.NotUniqueError) as error:
            print(error)
            self.__insert_row_by_row(instances)

    def __insert_row_by_row(self, instances):
        """Insert documents one at a time, skipping duplicates.

        BUG FIX: ``Document.save()`` reports a duplicate key with
        ``NotUniqueError``, not ``BulkWriteError``; the old except clause
        never matched, so a single duplicate aborted the whole fallback.
        """
        for model in instances:
            try:
                model.save()
            except (mongoengine.errors.NotUniqueError, mongoengine.errors.BulkWriteError):
                print("Duplicate row with id: {id}".format(id=model.row_hash))

    def update_data(self):
        """Pull rows from the source and forward previously-unseen ones.

        Each row is content-addressed by the SHA-1 of its sorted-key JSON
        so reruns skip data already stored.  Rows are flushed in batches of
        ``config.get_bulk_insert_limit()``.

        Returns the total number of rows inserted.
        """
        data_iterator = self.sourceObject.parse_data()

        inserted_rows = 0
        data_to_send = []
        hashes = []
        # Hoisted: the batch limit does not change mid-run.
        bulk_limit = config.get_bulk_insert_limit()

        for row in data_iterator:
            encoded = json.dumps(row, sort_keys=True).encode()
            row_hash = hashlib.sha1(encoded).hexdigest()
            if self.check_is_new_row(row_hash):
                data_to_send.append(row)
                hashes.append({
                    "row_hash": row_hash,
                    "file_hash": self.sourceObject.active_file_hash(),
                })
            if len(hashes) >= bulk_limit:
                self.__bulk_save(hashes, data_to_send)
                inserted_rows += len(hashes)
                data_to_send = []
                hashes = []

        if hashes:
            self.__bulk_save(hashes, data_to_send)
            # BUG FIX: the final partial batch was previously omitted from
            # the returned count.
            inserted_rows += len(hashes)
        self.sourceObject.after_save()

        return inserted_rows