forked from LiveCarta/LiveCartaMeta
first commit
This commit is contained in:
108
sources/BaseSource.py
Normal file
108
sources/BaseSource.py
Normal file
@@ -0,0 +1,108 @@
|
||||
import hashlib
|
||||
import importlib
|
||||
import json
|
||||
|
||||
import mongoengine
|
||||
from pydantic import BaseModel, Extra
|
||||
|
||||
from components.SenderComponent import SenderComponent
|
||||
from configs.config import config
|
||||
from models.MetaData import MetaData
|
||||
from sources.source_types.AbstractSource import AbstractSource
|
||||
|
||||
|
||||
class BaseSource(BaseModel):
    """Coordinates one configured data source.

    Builds the concrete source object (and its file parser when needed),
    deduplicates incoming rows by content hash, and ships new rows to the
    main application in bulk batches.
    """

    source_name: str
    source: dict       # raw source config; must contain a 'type' key
    parser_type: dict  # parser config; must contain a 'format' key

    sourceObject: AbstractSource = None
    senderComponent: SenderComponent = None

    class Config:
        extra = Extra.allow

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.senderComponent = SenderComponent(**config.get_main_app_creds())

        def get_source_object(source_config):
            # Resolve e.g. type == "ftp" -> sources.source_types.FtpSource.
            # NOTE(review): the original read source_config.type, but
            # `source` is declared as a plain dict, so attribute access
            # raises AttributeError; key access is the intended behavior.
            class_name = source_config['type'].capitalize() + "Source"
            module = importlib.import_module('sources.source_types.' + class_name)
            module_class = getattr(module, class_name)
            return module_class(source_name=self.source_name, **source_config)

        def get_parser(parser_config):
            # Resolve e.g. format == "csv" -> sources.file_types.CsvParser.
            class_name = parser_config['format'].capitalize() + "Parser"
            module = importlib.import_module('sources.file_types.' + class_name)
            module_class = getattr(module, class_name)
            return module_class(**parser_config)

        self.sourceObject = get_source_object(self.source)
        if self.sourceObject.is_parser_needed():
            self.sourceObject.file_parser = get_parser(self.parser_type)

    def check_is_update_needed(self):
        """Delegate the update check to the concrete source."""
        return self.sourceObject.check_is_update_needed()

    def check_is_new_row(self, hash):
        """Return True when no MetaData row with this hash exists yet (EAFP)."""
        try:
            MetaData.objects().get(row_hash=hash)
            return False
        except MetaData.DoesNotExist:
            return True

    def save_row_hash(self, row_hash, data, file_hash=None):
        # TODO: is original_data needed?
        MetaData(file_hash=file_hash, row_hash=row_hash, original_data=data).save()

    def __bulk_save(self, hashes, data):
        """Send `data` to the main app, then persist the hash records.

        Falls back to a row-by-row insert when the bulk insert reports
        duplicate keys. Raises when the remote send fails, so hashes are
        never recorded for data that was not delivered.
        """
        if not self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
            raise Exception("Error on saving data.")
        instances = [MetaData(**row) for row in hashes]
        try:
            MetaData.objects.insert(instances, load_bulk=False)
        except mongoengine.errors.BulkWriteError as error:
            print(error)
            self.__insert_row_by_row(instances)

    def __insert_row_by_row(self, instances):
        # Fallback after a failed bulk insert: save one by one and skip
        # duplicates. NOTE(review): Document.save() signals duplicate keys
        # with NotUniqueError (not BulkWriteError, which the original
        # caught); both are handled to stay backward compatible.
        for model in instances:
            try:
                model.save()
            except (mongoengine.errors.NotUniqueError,
                    mongoengine.errors.BulkWriteError):
                print("Duplicate row with id: {id}".format(id=model.row_hash))

    def update_data(self):
        """Pull rows from the source, keep only unseen ones, persist in bulk.

        Rows are identified by the SHA-1 of their canonical JSON form
        (sorted keys) so logically equal rows hash identically.

        Returns the number of new rows saved.
        """
        data_iterator = self.sourceObject.parse_data()

        inserted_rows = 0
        data_to_send = []
        hashes = []
        for row in data_iterator:
            # Content-addressed dedup key over the canonical JSON encoding.
            encoded = json.dumps(row, sort_keys=True).encode()
            row_hash = hashlib.sha1(encoded).hexdigest()
            if self.check_is_new_row(row_hash):
                data_to_send.append(row)
                hashes.append({"row_hash": row_hash,
                               "file_hash": self.sourceObject.active_file_hash()})
                if len(hashes) >= config.get_bulk_insert_limit():
                    self.__bulk_save(hashes, data_to_send)
                    inserted_rows += len(hashes)
                    data_to_send = []
                    hashes = []

        if hashes:
            self.__bulk_save(hashes, data_to_send)
            # Bugfix: the final partial batch was saved but never counted,
            # so the returned total undercounted inserted rows.
            inserted_rows += len(hashes)
        self.sourceObject.after_save()

        return inserted_rows
|
||||
13
sources/file_types/AbstractParser.py
Normal file
13
sources/file_types/AbstractParser.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from pydantic import Extra, BaseModel
|
||||
|
||||
|
||||
class AbstractParser(BaseModel, ABC):
    """Contract for file parsers: a single `parse` entry point."""

    class Config:
        # Accept arbitrary extra fields coming from parser configs.
        extra = Extra.allow

    @abstractmethod
    def parse(self, source):
        """Yield parsed records read from *source*."""
|
||||
12
sources/file_types/CsvParser.py
Normal file
12
sources/file_types/CsvParser.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import csv
|
||||
|
||||
from sources.file_types.AbstractParser import AbstractParser
|
||||
|
||||
|
||||
class CsvParser(AbstractParser):
    """Parses a CSV file into dict rows keyed by the header line."""

    def parse(self, source):
        """Yield each CSV row as a dict (via csv.DictReader).

        source: path to a CSV file whose first line is the header row.
        """
        # Per the csv module docs, the file must be opened with newline=''
        # so quoted fields containing embedded newlines are parsed
        # correctly on every platform.
        with open(source, 'r', newline='') as item:
            yield from csv.DictReader(item)
|
||||
37
sources/source_types/AbstractSource.py
Normal file
37
sources/source_types/AbstractSource.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pydantic import Extra, BaseModel
|
||||
|
||||
from sources.file_types.AbstractParser import AbstractParser
|
||||
|
||||
|
||||
class AbstractSource(BaseModel, ABC):
    """Base contract for data sources.

    Concrete sources decide whether an update is needed and yield rows;
    optional hooks cover parser wiring and post-save bookkeeping.
    """

    file_hash: str = None    # hash of the file currently being parsed, if any
    source_name: str = None  # logical name of the source; set in __init__
    file_parser: AbstractParser = None  # injected by the owner when is_parser_needed()

    class Config:
        extra = Extra.allow  # concrete sources carry extra config fields

    def __init__(self, source_name, **kwargs):
        """Validate and store the mandatory source name.

        Raises:
            ValueError: when source_name is missing or empty.
        """
        super().__init__(**kwargs)
        if not source_name:
            # ValueError is more precise than the original bare Exception
            # and is still caught by existing `except Exception` handlers.
            raise ValueError('You have to add "source_name" to your config!')
        self.source_name = source_name

    @abstractmethod
    def check_is_update_needed(self):
        """Return True when the source has new data to pull."""

    @abstractmethod
    def parse_data(self):
        """Yield rows from the source."""

    def is_parser_needed(self):
        # Default: the source parses its own data; override to request a parser.
        return False

    def active_file_hash(self):
        """Hash of the file the current rows came from (None if not file-based)."""
        return self.file_hash

    def after_save(self):
        """Hook invoked after rows are persisted; default is a no-op."""
|
||||
58
sources/source_types/FtpSource.py
Normal file
58
sources/source_types/FtpSource.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import os
|
||||
|
||||
from components.FileComponent import FileComponent
|
||||
from components.FtpDownloader import FtpDownloader
|
||||
from sources.source_types.AbstractSource import AbstractSource
|
||||
|
||||
|
||||
class FtpSource(AbstractSource):
    """Source that pulls files from an FTP server and parses them locally.

    Files already parsed (tracked by FileComponent per source name, by
    checksum) are skipped; only files with an unseen checksum are
    downloaded, parsed, recorded and then removed.
    """

    # config
    ftp_url: str
    ftp_login: str
    ftp_password: str
    file_regex: str        # regex selecting remote files to consider
    local_files_path: str  # local directory for downloaded files

    # additional params (populated in __init__)
    source_files: list = []      # [{'file': ..., 'hash': ...}, ...] from the FTP listing
    files_for_update: list = []  # subset of source_files not yet parsed locally
    local_files: list = []       # previously parsed files for this source

    def __init__(self, source_name, **kwargs):
        super().__init__(source_name, **kwargs)
        self.fileComponent = FileComponent(local_files_path=self.local_files_path)
        self.local_files = self.fileComponent.get_parsed_files(source_name)
        self.ftpClient = FtpDownloader(url=self.ftp_url, login=self.ftp_login,
                                       password=self.ftp_password)
        self.source_files = self.ftpClient.get_files_with_checksum('/', self.file_regex)
        self.__files_for_update()

    def __files_for_update(self):
        """Select remote files whose checksum has not been parsed locally yet."""
        # Set membership instead of a list scan per file.
        known_hashes = {f.get('hash') for f in self.local_files}
        self.files_for_update = [f for f in self.source_files
                                 if f.get('hash') not in known_hashes]
        return self.files_for_update

    def is_parser_needed(self):
        # Downloaded files are raw; a file parser is always required.
        return True

    def __local_path(self, file):
        # Single place that maps a remote file name to its local path.
        # os.path.join fixes the original '+' concatenation, which broke
        # whenever local_files_path lacked a trailing separator.
        return os.path.join(self.local_files_path, os.path.basename(file))

    def download_file(self, file):
        """Download one remote file into local_files_path."""
        # exist_ok avoids the check-then-create race of the original
        # os.path.exists() guard.
        os.makedirs(self.local_files_path, exist_ok=True)
        self.ftpClient.download_file(file, self.__local_path(file))

    def check_is_update_needed(self):
        return len(self.files_for_update) > 0

    def parse_data(self):
        """Download each pending file and yield its parsed rows."""
        for f in self.files_for_update:
            self.download_file(f.get('file'))
            self.file_hash = f.get('hash')
            # Read from where download_file actually stored the file
            # (the original read local_files_path + f['file'], which could
            # diverge from the basename-based download destination).
            yield from self.file_parser.parse(self.__local_path(f.get('file')))

    def after_save(self):
        """Record parsed files and delete the local copies."""
        for f in self.files_for_update:
            self.fileComponent.add_parsed_file(self.source_name, f.get('file'),
                                               f.get('hash'), f.get('updated_at'))
            self.fileComponent.remove_file(self.__local_path(f.get('file')))
|
||||
Reference in New Issue
Block a user