From 2bde5c709a86c77ce13d8be78bbd2eceb65cbfe3 Mon Sep 17 00:00:00 2001
From: ekazak
Date: Wed, 14 Dec 2022 12:36:37 +0100
Subject: [PATCH] first commit

---
 Dockerfile                             |  12 +++
 README.md                              |  75 ++++++++++++++-
 components/FileComponent.py            |  35 +++++++
 components/FtpDownloader.py            |  86 ++++++++++++++++++
 components/SenderComponent.py          |  33 +++++++
 configs/application_credentials.json   |   6 ++
 configs/config.py                      |  83 +++++++++++++++++
 configs/db.json                        |   6 ++
 configs/main.json                      |   3 +
 configs/sources.json                   |   0
 models/File.py                         |  13 +++
 models/MetaData.py                     |  11 +++
 sources/BaseSource.py                  | 119 +++++++++++++++++++++++++
 sources/file_types/AbstractParser.py   |  14 +++
 sources/file_types/CsvParser.py        |  13 +++
 sources/source_types/AbstractSource.py |  42 +++++++++
 sources/source_types/FtpSource.py      |  63 +++++++++++++
 update.py                              |  54 +++++++++++
 18 files changed, 666 insertions(+), 2 deletions(-)
 create mode 100644 Dockerfile
 create mode 100644 components/FileComponent.py
 create mode 100644 components/FtpDownloader.py
 create mode 100644 components/SenderComponent.py
 create mode 100644 configs/application_credentials.json
 create mode 100644 configs/config.py
 create mode 100644 configs/db.json
 create mode 100644 configs/main.json
 create mode 100644 configs/sources.json
 create mode 100644 models/File.py
 create mode 100644 models/MetaData.py
 create mode 100644 sources/BaseSource.py
 create mode 100644 sources/file_types/AbstractParser.py
 create mode 100644 sources/file_types/CsvParser.py
 create mode 100644 sources/source_types/AbstractSource.py
 create mode 100644 sources/source_types/FtpSource.py
 create mode 100644 update.py

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..6339c6c
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,12 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+RUN pip install --no-cache-dir \
+    pymongo \
+    dynaconf \
+    "pydantic<2" \
+    pymysql \
+    mongoengine \
+    dict_hash \
+    requests
diff --git a/README.md b/README.md
index 10199ce..028b29c 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,73 @@
-# LiveCarta-microservices
-microservices
+# Book Meta Data Parser
+
+A microservice that solves a single problem: parsing book metadata from our publishers. Regardless of the format in which a publisher stores this data, the service must grab it and send it to the main application as an array of data, without any reformatting. The main idea is to add components for parsing different formats and to be able to add publishers just by updating the config files.
+
+
+## Version 1.0
+
+Added two components for working with CSV and FTP.
+
+
+## Tech Stack
+
+- Docker
+- Python 3.11
+- MongoDB 6.0.2
+- Dynaconf
+- Pydantic
+- MongoEngine
+
+
+## Folder structure
+
+- app
+  - components
+  - configs
+    - application_credentials.json – API key and URL for connecting to our main app
+    - db.json – credentials for the service DB
+    - main.json – main config
+    - sources.json – list of sources with the components that they use
+  - models
+  - sources
+    - file_types
+    - source_types
+
+
+## Sources configuration
+
+To configure a new source, update the sources config by adding the params below:
+
+- source_name
+- source // with the necessary params for the component
+- parser_type // with the necessary params for the component
+
+Example for CSV files from FTP:
+```json
+{
+    "sources": {
+        "McGrawHill": {
+            "source_name": "McGrawHill",
+            "source": {
+                "type": "ftp",
+                "ftp_url": "127.0.0.1",
+                "ftp_login": "frp_login",
+                "ftp_password": "frp_pass",
+                "local_files_path": "/app/files/McGrawHill/",
+                "file_regex": "*.csv"
+            },
+            "parser_type": {
+                "format": "csv"
+            }
+        }
+    }
+}
+```
+
+Each source parser is started from crontab with the command
+
+`python update.py --source {source_name}`
+
+
+To see the list of configured sources, use the command
+
+`python update.py -h`
diff --git a/components/FileComponent.py b/components/FileComponent.py
new file mode 100644
index 0000000..d70a00a
--- /dev/null
+++ b/components/FileComponent.py
@@ -0,0 +1,35 @@
+import os
+
+from pydantic import BaseModel
+
+from models.File import File
+
+
+class FileComponent(BaseModel):
+    """Keeps track of which source files have already been parsed."""
+
+    local_files_path: str
+
+    def get_parsed_files(self, source_name):
+        """Return the files already recorded as parsed for ``source_name``.
+
+        Each entry is a dict with the local path under ``file`` and the
+        stored checksum under ``hash``.
+        """
+        return [
+            {"file": self.local_files_path + f.file_path, "hash": f.file_hash}
+            for f in File.objects(source=source_name)
+        ]
+
+    def add_parsed_file(self, source, path, hash, time):
+        """Store a ``File`` record marking ``path`` as parsed and return it."""
+        return File(
+            source=source,
+            file_path=path,
+            file_hash=hash,
+            file_updated_at=time,
+        ).save()
+
+    def remove_file(self, file):
+        """Delete the local file at path ``file``."""
+        os.remove(file)
diff --git a/components/FtpDownloader.py b/components/FtpDownloader.py
new file mode 100644
index 0000000..5eb62d2
--- /dev/null
+++ b/components/FtpDownloader.py
@@ -0,0 +1,86 @@
+import hashlib
+from ftplib import FTP
+from urllib.parse import urlparse
+
+from pydantic import BaseModel, Extra
+
+
+class FtpDownloader(BaseModel):
+    """FTP client that lists and downloads files together with SHA-1 checksums."""
+
+    url: str
+    login: str = ''
+    password: str = ''
+
+    class Config:
+        # The open FTP connection is kept on the instance as an extra attribute.
+        extra = Extra.allow
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.__connect()
+
+    def __del__(self):
+        # ``ftp`` is missing when validation or connecting failed in __init__.
+        ftp = getattr(self, 'ftp', None)
+        if ftp is not None:
+            ftp.close()
+
+    def __connect(self):
+        """Open the connection, log in and change to the path given in the URL."""
+        # urlparse() only fills in the host when a scheme is present, so a bare
+        # "127.0.0.1" (as in the README example) needs one added first.
+        url = self.url if '://' in self.url else 'ftp://' + self.url
+        parsed_uri = urlparse(url)
+        self.ftp = FTP()
+        self.ftp.connect(parsed_uri.hostname, parsed_uri.port or 21)
+        self.ftp.login(self.login, self.password)
+        if parsed_uri.path:
+            self.ftp.cwd(parsed_uri.path)
+
+    def get_files_with_checksum(self, path, filetypeRegex=None):
+        """List the remote files under ``path`` with checksum and timestamp.
+
+        ``filetypeRegex`` is appended to ``path`` as-is before listing.  Every
+        file is streamed once to compute its SHA-1.  Returns a list of dicts
+        with the keys ``file``, ``hash`` and ``updated_at`` (the MDTM reply),
+        sorted by ``updated_at``.
+        """
+        if filetypeRegex:
+            path += filetypeRegex
+
+        file_list = []
+        for file_path in self.ftp.nlst(path):
+            m = hashlib.sha1()
+            self.ftp.retrbinary('RETR %s' % file_path, m.update)
+            updated_at = self.ftp.voidcmd("MDTM " + file_path)[4:].strip()
+            file_dict = {'file': file_path, 'hash': m.hexdigest(), 'updated_at': updated_at}
+            file_list.append(file_dict)
+            print(file_dict)
+
+        return sorted(file_list, key=lambda d: d['updated_at'])
+
+    def download_file(self, origin_file, to):
+        """Download ``origin_file`` to the local path ``to`` and verify it.
+
+        The file is transferred once; the bytes received are hashed while they
+        are written and compared with what ended up on disk.  Returns True on
+        success and raises OSError on a checksum mismatch.
+        """
+        received = hashlib.sha1()
+
+        with open(to, "wb") as local_file:
+            def write_chunk(chunk):
+                received.update(chunk)
+                local_file.write(chunk)
+
+            self.ftp.retrbinary(f"RETR {origin_file}", write_chunk)
+
+        with open(to, "rb") as written_file:
+            local_checksum = hashlib.sha1(written_file.read()).hexdigest()
+
+        if local_checksum != received.hexdigest():
+            raise OSError(f"Wrong checksum for file: {origin_file}")
+
+        print("file downloaded " + origin_file)
+        return True
diff --git a/components/SenderComponent.py b/components/SenderComponent.py
new file mode 100644
index 0000000..c0d414d
--- /dev/null
+++ b/components/SenderComponent.py
@@ -0,0 +1,33 @@
+import json
+
+import requests
+from pydantic import BaseModel
+
+# Seconds to wait for the main application before a request is abandoned.
+REQUEST_TIMEOUT = 30
+
+
+class SenderComponent(BaseModel):
+    """Sends parsed rows to the main application's API."""
+
+    api_url: str
+    api_key: str
+
+    def __generate_key(self):
+        return 'Bearer {key}'.format(key=self.api_key)
+
+    def __headers(self):
+        return {
+            'Authorization': self.__generate_key(),
+            'Content-type': 'application/json',
+        }
+
+    def send_data(self, data):
+        """POST ``data`` as JSON to ``<api_url>data``; return True on HTTP 200."""
+        response = requests.post(
+            self.api_url + 'data',
+            data=json.dumps(data),
+            headers=self.__headers(),
+            timeout=REQUEST_TIMEOUT,
+        )
+        return response.status_code == 200
diff --git a/configs/application_credentials.json b/configs/application_credentials.json
new file mode 100644
index 0000000..d3a1be3
--- /dev/null
+++ b/configs/application_credentials.json
@@ -0,0 +1,6 @@
+{
+    "application_credentials": {
+        "api_url":"http://app.livecarta.loc/meta/api/",
+        "api_key":"695e513c-xxxx-xxxx-a666-xxxxxxxxxx"
+    }
+}
\ No newline at end of file
diff --git a/configs/config.py b/configs/config.py
new file mode 100644
index 0000000..1b09ae5
--- /dev/null
+++ b/configs/config.py
@@ -0,0 +1,83 @@
+import json
+import os
+
+import requests
+from dynaconf import Dynaconf, Validator
+from mongoengine import connect
+
+# Directory the settings files are loaded from and refreshed sources are written to.
+CONFIG_DIR = "/app/configs"
+SOURCES_FILE = os.path.join(CONFIG_DIR, "sources.json")
+
+# Seconds to wait for the main application before a request is abandoned.
+REQUEST_TIMEOUT = 30
+
+
+class AppConfig:
+    """Loads the JSON settings files and opens the MongoDB connection."""
+
+    def __init__(self):
+        self.config = Dynaconf(settings_files=[
+            os.path.join(CONFIG_DIR, "main.json"),
+            os.path.join(CONFIG_DIR, "application_credentials.json"),
+            os.path.join(CONFIG_DIR, "db.json"),
+            SOURCES_FILE,
+        ])
+        self.config.validators.register(
+            Validator('db', 'db.host', 'db.database', must_exist=True),
+        )
+        self.config.validators.validate()
+
+        creds = self.get_db_config()
+        connect(
+            db=creds.database,
+            host="mongodb://{host}:27017/{database}".format(host=creds.host, database=creds.database)
+        )
+
+    def __sources(self):
+        # sources.json ships empty, so the "sources" key may not exist yet.
+        return self.config.get('sources') or {}
+
+    def get_bulk_insert_limit(self):
+        """Return the configured ``bulk_limit``, or 1 when it is unset or zero."""
+        return self.config.get('bulk_limit') or 1
+
+    def get_db_config(self):
+        return self.config.db
+
+    def get_main_app_creds(self):
+        return self.config.application_credentials
+
+    def get_source_by_name(self, name: str):
+        """Return the config of source ``name``; raise ValueError if unknown."""
+        sources = self.__sources()
+        if name not in sources:
+            raise ValueError(f'"{name}" source not exists!')
+        return sources[name]
+
+    def get_sources_list(self):
+        """Return the names of all configured sources."""
+        return self.__sources().keys()
+
+    def update_sources(self):
+        """Fetch the sources from the main application and rewrite sources.json.
+
+        Raises Exception when the application does not answer with HTTP 200.
+        """
+        creds = self.get_main_app_creds()
+        headers = {
+            'Content-type': 'application/json',
+            'Authorization': 'Bearer {key}'.format(key=creds.api_key)
+        }
+        r = requests.get(creds.api_url + "sources", headers=headers, timeout=REQUEST_TIMEOUT)
+        if r.status_code != 200:
+            raise Exception('Bad app response')
+
+        new_config = {"sources": {source["source_name"]: source for source in r.json()}}
+
+        # Write to the same file Dynaconf loads, independent of the working directory.
+        with open(SOURCES_FILE, "w") as outfile:
+            json.dump(new_config, outfile)
+
+
+config = AppConfig()
diff --git a/configs/db.json b/configs/db.json
new file mode 100644
index 0000000..2d3e8a6
--- /dev/null
+++ b/configs/db.json
@@ -0,0 +1,6 @@
+{
+    "db": {
+        "host": "mongo_book_meta",
+        "database": "mongo_book_meta"
+    }
+}
\ No newline at end of file
diff --git a/configs/main.json b/configs/main.json
new file mode 100644
index 0000000..0b762b3
--- /dev/null
+++ b/configs/main.json
@@ -0,0 +1,3 @@
+{
+    "bulk_limit": 100
+}
\ No newline at end of file
diff --git a/configs/sources.json b/configs/sources.json
new file mode 100644
index 0000000..e69de29
diff --git a/models/File.py b/models/File.py
new file mode 100644
index 0000000..37da8e1
--- /dev/null
+++ b/models/File.py
@@ -0,0 +1,13 @@
+from datetime import datetime
+
+from mongoengine import DateTimeField, Document, IntField, StringField
+
+
+class File(Document):
+    """Record of a source file that has already been parsed."""
+
+    file_hash = StringField(required=True, max_length=40, primary_key=True)
+    source = StringField(required=True, max_length=40)
+    file_path = StringField(required=True, max_length=255)
+    file_updated_at = IntField(required=False)
+    created_at = DateTimeField(default=datetime.utcnow)
diff --git a/models/MetaData.py b/models/MetaData.py
new file mode 100644
index 0000000..3381215
--- /dev/null
+++ b/models/MetaData.py
@@ -0,0 +1,11 @@
+from datetime import datetime
+
+from mongoengine import DateTimeField, Document, StringField
+
+
+class MetaData(Document):
+    """Record of a data row that has already been sent, keyed by its hash."""
+
+    row_hash = StringField(required=True, max_length=40, primary_key=True)
+    file_hash = StringField(max_length=40)
+    created_at = DateTimeField(default=datetime.utcnow)
diff --git a/sources/BaseSource.py b/sources/BaseSource.py
new file mode 100644
index 0000000..aac4d9d
--- /dev/null
+++ b/sources/BaseSource.py
@@ -0,0 +1,119 @@
+import hashlib
+import importlib
+import json
+
+import mongoengine
+from pydantic import BaseModel, Extra
+
+from components.SenderComponent import SenderComponent
+from configs.config import config
+from models.MetaData import MetaData
+from sources.source_types.AbstractSource import AbstractSource
+
+
+class BaseSource(BaseModel):
+    """Connects a configured source with its parser and the main application."""
+
+    source_name: str
+    source: dict
+    parser_type: dict
+
+    sourceObject: AbstractSource = None
+    senderComponent: SenderComponent = None
+
+    class Config:
+        extra = Extra.allow
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.senderComponent = SenderComponent(**config.get_main_app_creds())
+
+        def load_class(package, class_name):
+            # Each component lives in a module named after the class it defines.
+            module = importlib.import_module(package + '.' + class_name)
+            return getattr(module, class_name)
+
+        # Item access works for plain dicts as well as attribute-style mappings.
+        source_class = load_class('sources.source_types', self.source['type'].capitalize() + "Source")
+        self.sourceObject = source_class(source_name=self.source_name, **self.source)
+
+        if self.sourceObject.is_parser_needed():
+            parser_class = load_class('sources.file_types', self.parser_type['format'].capitalize() + "Parser")
+            self.sourceObject.file_parser = parser_class(**self.parser_type)
+
+    def check_is_update_needed(self):
+        return self.sourceObject.check_is_update_needed()
+
+    def check_is_new_row(self, hash):
+        """Return True when no ``MetaData`` record exists for ``hash``."""
+        try:
+            MetaData.objects().get(row_hash=hash)
+            return False
+        except MetaData.DoesNotExist:
+            return True
+
+    def save_row_hash(self, row_hash, data, file_hash=None):
+        """Record ``row_hash`` as processed.
+
+        ``data`` is accepted for backward compatibility but not stored:
+        ``MetaData`` declares no field for it.
+        """
+        MetaData(file_hash=file_hash, row_hash=row_hash).save()
+
+    def __bulk_save(self, hashes, data):
+        """Send ``data`` to the main app, then record ``hashes`` as processed."""
+        if not self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
+            raise Exception("Error on saving data.")
+
+        instances = [MetaData(**item) for item in hashes]
+        try:
+            MetaData.objects.insert(instances, load_bulk=False)
+        except mongoengine.errors.BulkWriteError as error:
+            print(error)
+            self.__insert_row_by_row(instances)
+
+    def __insert_row_by_row(self, instances):
+        """Fallback after a failed bulk insert: save the records one by one."""
+        for model in instances:
+            try:
+                model.save()
+            except (mongoengine.errors.NotUniqueError, mongoengine.errors.BulkWriteError):
+                # save() signals a duplicate key with NotUniqueError.
+                print("Duplicate row with id: {id}".format(id=model.row_hash))
+
+    def update_data(self):
+        """Send all rows not seen before to the main app, in batches.
+
+        A row is identified by the SHA-1 of its key-sorted JSON form.
+        Returns the number of rows sent.
+        """
+        bulk_limit = config.get_bulk_insert_limit()
+        inserted_rows = 0
+        data_to_send = []
+        hashes = []
+        pending = set()  # hashes queued in the current batch, not yet stored
+
+        for row in self.sourceObject.parse_data():
+            encoded = json.dumps(row, sort_keys=True).encode()
+            row_hash = hashlib.sha1(encoded).hexdigest()
+            if row_hash in pending or not self.check_is_new_row(row_hash):
+                continue
+
+            pending.add(row_hash)
+            data_to_send.append(row)
+            hashes.append({"row_hash": row_hash, "file_hash": self.sourceObject.active_file_hash()})
+            if len(hashes) >= bulk_limit:
+                self.__bulk_save(hashes, data_to_send)
+                inserted_rows += len(hashes)
+                data_to_send = []
+                hashes = []
+                pending = set()
+
+        if hashes:
+            self.__bulk_save(hashes, data_to_send)
+            # Count the final, partially filled batch as well.
+            inserted_rows += len(hashes)
+
+        # Let the source finish up, e.g. FtpSource records and removes its files.
+        self.sourceObject.after_save()
+        return inserted_rows
diff --git a/sources/file_types/AbstractParser.py b/sources/file_types/AbstractParser.py
new file mode 100644
index 0000000..ce35862
--- /dev/null
+++ b/sources/file_types/AbstractParser.py
@@ -0,0 +1,14 @@
+from abc import ABC, abstractmethod
+
+from pydantic import BaseModel, Extra
+
+
+class AbstractParser(BaseModel, ABC):
+    """Base class for file parsers; extra config keys are kept as attributes."""
+
+    class Config:
+        extra = Extra.allow
+
+    @abstractmethod
+    def parse(self, source):
+        """Parse ``source`` and return an iterable over its rows."""
diff --git a/sources/file_types/CsvParser.py b/sources/file_types/CsvParser.py
new file mode 100644
index 0000000..8acefe3
--- /dev/null
+++ b/sources/file_types/CsvParser.py
@@ -0,0 +1,13 @@
+import csv
+
+from sources.file_types.AbstractParser import AbstractParser
+
+
+class CsvParser(AbstractParser):
+    """Parses CSV files whose first line holds the column names."""
+
+    def parse(self, source):
+        """Yield one dict per data row of the CSV file at path ``source``."""
+        # newline='' lets the csv module handle line endings itself.
+        with open(source, 'r', newline='') as item:
+            yield from csv.DictReader(item)
diff --git a/sources/source_types/AbstractSource.py b/sources/source_types/AbstractSource.py
new file mode 100644
index 0000000..3208b7f
--- /dev/null
+++ b/sources/source_types/AbstractSource.py
@@ -0,0 +1,42 @@
+from abc import ABC, abstractmethod
+
+from pydantic import BaseModel, Extra
+
+from sources.file_types.AbstractParser import AbstractParser
+
+
+class AbstractSource(BaseModel, ABC):
+    """Base class for the places data can be fetched from."""
+
+    file_hash: str = None
+    source_name: str = None
+    file_parser: AbstractParser = None
+
+    class Config:
+        extra = Extra.allow
+
+    def __init__(self, source_name, **kwargs):
+        super().__init__(**kwargs)
+        if not source_name:
+            raise Exception('You have to add "source_name" to your config!')
+        self.source_name = source_name
+
+    @abstractmethod
+    def check_is_update_needed(self):
+        """Return True when the source has data that was not processed yet."""
+
+    @abstractmethod
+    def parse_data(self):
+        """Return an iterable over the rows of the data to process."""
+
+    def is_parser_needed(self):
+        """Return True when ``file_parser`` must be set before parsing."""
+        return False
+
+    def active_file_hash(self):
+        """Return the hash of the file currently being parsed, if any."""
+        return self.file_hash
+
+    def after_save(self):
+        """Hook called once all rows were saved; does nothing by default."""
+        pass
diff --git a/sources/source_types/FtpSource.py b/sources/source_types/FtpSource.py
new file mode 100644
index 0000000..727c18f
--- /dev/null
+++ b/sources/source_types/FtpSource.py
@@ -0,0 +1,63 @@
+import os
+
+from components.FileComponent import FileComponent
+from components.FtpDownloader import FtpDownloader
+from sources.source_types.AbstractSource import AbstractSource
+
+
+class FtpSource(AbstractSource):
+    """Source that fetches not-yet-parsed files from an FTP server."""
+
+    # config
+    ftp_url: str
+    ftp_login: str
+    ftp_password: str
+    file_regex: str
+    local_files_path: str
+
+    # additional params
+    source_files: list = []
+    files_for_update: list = []
+    local_files: list = []
+
+    def __init__(self, source_name, **kwargs):
+        super().__init__(source_name, **kwargs)
+        self.fileComponent = FileComponent(local_files_path=self.local_files_path)
+        self.local_files = self.fileComponent.get_parsed_files(source_name)
+        self.ftpClient = FtpDownloader(url=self.ftp_url, login=self.ftp_login, password=self.ftp_password)
+        self.source_files = self.ftpClient.get_files_with_checksum('/', self.file_regex)
+        self.__files_for_update()
+
+    def __files_for_update(self):
+        """Select the remote files whose checksum is not recorded as parsed."""
+        local_hashes = {f.get('hash') for f in self.local_files}
+        self.files_for_update = [f for f in self.source_files if f.get('hash') not in local_hashes]
+        return self.files_for_update
+
+    def __local_path(self, file):
+        # Downloading, parsing and cleanup must all agree on this location.
+        return os.path.join(self.local_files_path, os.path.basename(file))
+
+    def is_parser_needed(self):
+        return True
+
+    def download_file(self, file):
+        """Download the remote ``file`` into ``local_files_path``."""
+        os.makedirs(self.local_files_path, exist_ok=True)
+        self.ftpClient.download_file(file, self.__local_path(file))
+
+    def check_is_update_needed(self):
+        return len(self.files_for_update) > 0
+
+    def parse_data(self):
+        """Download each pending file and yield its parsed rows."""
+        for f in self.files_for_update:
+            self.download_file(f.get('file'))
+            self.file_hash = f.get('hash')
+            yield from self.file_parser.parse(self.__local_path(f.get('file')))
+
+    def after_save(self):
+        """Record every pending file as parsed and delete its local copy."""
+        for f in self.files_for_update:
+            self.fileComponent.add_parsed_file(self.source_name, f.get('file'), f.get('hash'), f.get('updated_at'))
+            self.fileComponent.remove_file(self.__local_path(f.get('file')))
diff --git a/update.py b/update.py
new file mode 100644
index 0000000..145e99d
--- /dev/null
+++ b/update.py
@@ -0,0 +1,54 @@
+import argparse
+
+from configs.config import config
+from sources.BaseSource import BaseSource
+
+
+class Updater:
+    """Command-line entry point: refresh the config or parse one source."""
+
+    def __init__(self):
+        parser = argparse.ArgumentParser(description="Source parser.")
+        parser.add_argument("--config", help='Update config action', action='store_true')
+        parser.add_argument("--source", type=str, help='List of sources: ' + ', '.join(config.get_sources_list()))
+
+        self.args = parser.parse_args()
+
+    def is_source_exists(self, source):
+        try:
+            config.get_source_by_name(source)
+        except ValueError:
+            return False
+
+        return True
+
+    def parse_source(self, source):
+        """Parse ``source`` and print how many rows were added."""
+        source_config = config.get_source_by_name(source)
+        source_model = BaseSource(**source_config)
+
+        if not source_model.check_is_update_needed():
+            print('Nothing to update')
+            return
+
+        updated = source_model.update_data()
+        print('Rows added: ' + str(updated))
+
+    def update_config(self):
+        config.update_sources()
+
+    def do_action(self):
+        """Run the action selected by the command-line arguments."""
+        if self.args.config:
+            self.update_config()
+            return
+
+        if not self.is_source_exists(self.args.source):
+            print("This source not exists, list of sources: " + ', '.join(config.get_sources_list()))
+            return
+
+        self.parse_source(self.args.source)
+
+
+if __name__ == "__main__":
+    Updater().do_action()