Merge pull request #1 from evgeniyKazak/develop

first commit
This commit is contained in:
bivis
2022-12-14 14:45:02 +03:00
committed by GitHub
18 changed files with 570 additions and 2 deletions

11
Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Runtime image for the book meta data parser microservice.
# Use a stable slim Python: the README documents Python 3.11, and the
# previous 3.12-rc tag was a non-reproducible release candidate.
FROM python:3.11-slim
WORKDIR /app
# --no-cache-dir keeps the pip download cache out of the image layers.
# NOTE(review): versions are unpinned; consider a requirements.txt.
RUN pip install --no-cache-dir pymongo \
    dynaconf \
    pydantic \
    pymysql \
    mongoengine \
    dict_hash \
    requests

View File

@@ -1,2 +1,74 @@
# LiveCarta-microservices # Book Meta Data Parser
microservices
Microservice that solves a single task: parsing book meta data from our publishers. Regardless of the format in which a publisher stores this data, the service must grab this information and send an array of data to the main application without any formatting. The main idea is to add components for parsing different formats, so that new publishers can be added just by updating config files.
## Version 1.0
Added two components for working with CSV and FTP.
## Tech Stack
• Docker
• Python 3.11
• MongoDb 6.0.2
• Dynaconf
• Pydantic
• MongoEngine
## Folder structure
• app
◦ components
◦ configs
▪ application_credentials.json keys and url for connection to our main app
▪ db.json creds for service db
▪ main.json main config
▪ sources.json list of sources with components that they use
◦ models
◦ sources
▪ file_types
▪ source_types
## Sources configuration
To configure a new source you need to update source config by adding the params below:
• source_name
• source //with necessary params for component
• parser_type //with necessary params for component
Example for CSV files from FTP:
```json
{
"sources": {
"McGrawHill": {
"source_name": "McGrawHill",
"source": {
"type": "ftp",
"ftp_url": "127.0.0.1",
"ftp_login": "frp_login",
"ftp_password": "frp_pass",
"local_files_path": "/app/files/McGrawHill/",
"file_regex": "*.csv"
},
"parser_type": {
"format": "csv"
}
}
}
}
```
Each source parser is started by crontab with the command
`python update.py {source_name}`
To see the list of source types, use the command
`python update.py -h`

View File

@@ -0,0 +1,21 @@
import os
from pydantic import BaseModel
from models.File import File
class FileComponent(BaseModel):
    """Tracks which source files have already been parsed, backed by the
    `File` mongoengine model."""

    # Local directory where downloaded source files are stored.
    local_files_path: str

    def get_parsed_files(self, source_name):
        """Return [{'file': <local path>, 'hash': <sha1>}] for every file
        already recorded for *source_name*."""
        parsed = []
        for record in File.objects(source=source_name):
            parsed.append({
                "file": self.local_files_path + record.file_path,
                "hash": record.file_hash,
            })
        return parsed

    def add_parsed_file(self, source, path, hash, time):
        """Persist a newly parsed file and return the saved document."""
        document = File(
            source=source,
            file_path=path,
            file_hash=hash,
            file_updated_at=time,
        )
        return document.save()

    def remove_file(self, file):
        """Delete a local file once its contents have been processed."""
        os.remove(file)

View File

@@ -0,0 +1,65 @@
import hashlib
from pydantic import BaseModel, Extra
from ftplib import FTP
from urllib.parse import urlparse
class FtpDownloader(BaseModel):
    """Small FTP client: lists files with their checksums and downloads
    them with transfer verification."""

    # Full FTP URL, e.g. "ftp://host/dir" — netloc and path are split out.
    url: str
    login = ''     # empty credentials mean anonymous login
    password = ''

    class Config:
        # Allows attaching self.ftp after pydantic validation.
        extra = Extra.allow

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.__connect()

    def __del__(self):
        # Guard: __connect() may have failed before self.ftp was assigned,
        # and raising from __del__ would be swallowed noisily.
        ftp = getattr(self, 'ftp', None)
        if ftp is not None:
            ftp.close()

    def __connect(self):
        """Open the FTP session and cd into the URL's path, if any."""
        parsed_uri = urlparse(self.url)
        self.ftp = FTP(parsed_uri.netloc)
        self.ftp.login(self.login, self.password)
        if parsed_uri.path:
            self.ftp.cwd(parsed_uri.path)

    def get_files_with_checksum(self, path, filetypeRegex=None):
        """List files under *path* (optionally filtered by a glob such as
        '*.csv'), each with its SHA-1 checksum and MDTM timestamp, sorted
        by modification time (oldest first)."""
        if filetypeRegex:
            path += filetypeRegex
        files = self.ftp.nlst(path)
        file_list = []
        for file_path in files:
            m = hashlib.sha1()
            # Stream the file through the hash without keeping it in memory.
            self.ftp.retrbinary('RETR %s' % file_path, m.update)
            # MDTM reply looks like "213 YYYYMMDDHHMMSS"; strip the code.
            updated_at = self.ftp.voidcmd("MDTM " + file_path)[4:].strip()
            file_dict = {'file': file_path, 'hash': m.hexdigest(), 'updated_at': updated_at}
            file_list.append(file_dict)
            print(file_dict)
        file_list = sorted(file_list, key=lambda d: d['updated_at'])
        return file_list

    def download_file(self, origin_file, to):
        """Download *origin_file* to local path *to* and verify the transfer.

        The file is retrieved twice on purpose: once to compute a reference
        checksum and once to write to disk; the two SHA-1 digests must match.

        Raises:
            IOError: when the local copy's checksum differs from the remote
                one.  (IOError, not BaseException, so callers using
                ``except Exception`` can handle it.)
        """
        m = hashlib.sha1()
        self.ftp.retrbinary('RETR %s' % origin_file, m.update)
        checksum = m.hexdigest()
        # Write file in binary mode; the context manager closes it.
        with open(to, "wb") as file:
            self.ftp.retrbinary(f"RETR {origin_file}", file.write)
        with open(to, "rb") as file:
            local_checksum = hashlib.sha1(file.read()).hexdigest()
        if checksum and local_checksum != checksum:
            raise IOError(f"Wrong checksum for file: {origin_file}")
        print("file downloaded " + origin_file)
        return True

View File

@@ -0,0 +1,23 @@
import json
import requests
from pydantic import BaseModel
class SenderComponent(BaseModel):
    """Posts parsed meta data to the main application's HTTP API."""

    # Base URL of the main app API; expected to end with '/'.
    api_url: str
    # Bearer token for the main application.
    api_key: str

    def __generate_key(self):
        return 'Bearer {key}'.format(key=self.api_key)

    def __headers(self):
        return {
            'Authorization': self.__generate_key(),
            'Content-type': 'application/json',
        }

    def send_data(self, data, timeout=30):
        """POST *data* (JSON-encoded) to the ``data`` endpoint.

        A request timeout is passed so a dead endpoint cannot hang the
        cron job forever; ``requests.Timeout`` propagates to the caller.

        Returns:
            bool: True when the application answered HTTP 200.
        """
        headers = self.__headers()
        response = requests.post(self.api_url + 'data', data=json.dumps(data),
                                 headers=headers, timeout=timeout)
        return response.status_code == 200

View File

@@ -0,0 +1,6 @@
{
"application_credentials": {
"api_url":"http://app.livecarta.loc/meta/api/",
"api_key":"695e513c-xxxx-xxxx-a666-xxxxxxxxxx"
}
}

64
configs/config.py Normal file
View File

@@ -0,0 +1,64 @@
import json
import requests
from dynaconf import Dynaconf, Validator
from mongoengine import connect
class AppConfig:
    """Loads all JSON configs, validates the DB section and opens the
    MongoDB connection used by the mongoengine models."""

    def __init__(self):
        self.config = Dynaconf(settings_files=[
            "/app/configs/main.json",
            "/app/configs/application_credentials.json",
            "/app/configs/db.json",
            "/app/configs/sources.json"
        ])
        self.config.validators.register(
            Validator('db', 'db.host', 'db.database', must_exist=True),
        )
        self.config.validators.validate()
        creds = self.get_db_config()
        connect(
            db=creds.database,
            host="mongodb://{host}:27017/{database}".format(host=creds.host, database=creds.database)
        )

    def get_bulk_insert_limit(self):
        """Rows per bulk insert.

        Uses ``.get`` so a missing `bulk_limit` key falls back to 1 —
        attribute access would raise for a missing key.
        """
        return self.config.get("bulk_limit") or 1

    def get_db_config(self):
        """Return the validated ``db`` section (host, database)."""
        return self.config.db

    def get_main_app_creds(self):
        """Return ``application_credentials`` (api_url, api_key)."""
        return self.config.application_credentials

    def get_source_by_name(self, name: str):
        """Return the config for *name*.

        Raises:
            ValueError: when the source is not configured.
        """
        if name not in self.config.sources:
            raise ValueError(f'"{name}" source not exists!')
        return self.config.sources[name]

    def get_sources_list(self):
        """Names of all configured sources."""
        return self.config.sources.keys()

    def update_sources(self):
        """Fetch the source list from the main app and rewrite sources.json."""
        creds = self.get_main_app_creds()
        headers = {
            'Content-type': 'application/json',
            'Authorization': 'Bearer {key}'.format(key=creds.api_key)
        }
        # Timeout so a dead main app cannot hang the cron job forever.
        r = requests.get(creds.api_url + "sources", headers=headers, timeout=30)
        if r.status_code != 200:
            raise Exception('Bad app response')
        new_config = {"sources": {}}
        for source in r.json():
            new_config["sources"][source["source_name"]] = source
        with open("./configs/sources.json", "w") as outfile:
            outfile.write(json.dumps(new_config))


# Module-level singleton; importing this module connects to MongoDB.
config = AppConfig()

6
configs/db.json Normal file
View File

@@ -0,0 +1,6 @@
{
"db": {
"host": "mongo_book_meta",
"database": "mongo_book_meta"
}
}

3
configs/main.json Normal file
View File

@@ -0,0 +1,3 @@
{
"bulk_limit": 100
}

0
configs/sources.json Normal file
View File

11
models/File.py Normal file
View File

@@ -0,0 +1,11 @@
from datetime import datetime
from mongoengine import *
class File(Document):
    """MongoDB record of a source file that has already been parsed."""

    # SHA-1 hex digest of the file contents; doubles as the primary key,
    # so the same content cannot be recorded twice.
    file_hash = StringField(required=True, max_length=40, primary_key=True)
    # Name of the configured source the file came from.
    source = StringField(required=True, max_length=40)
    # Path of the file as reported by the source (e.g. the FTP listing).
    file_path = StringField(required=True, max_length=255)
    # Remote modification timestamp — NOTE(review): callers pass the raw
    # MDTM string ("YYYYMMDDHHMMSS"); confirm it coerces to int as intended.
    file_updated_at = IntField(required=False)
    created_at = DateTimeField(default=datetime.utcnow)

9
models/MetaData.py Normal file
View File

@@ -0,0 +1,9 @@
from datetime import datetime
from mongoengine import *
class MetaData(Document):
    """Hash of a data row already sent to the main application.

    Used as a deduplication store: a row whose hash exists here is skipped.
    """

    # SHA-1 of the row's canonical JSON form; primary key, so inserting a
    # duplicate raises a write error.
    row_hash = StringField(required=True, max_length=40, primary_key=True)
    # SHA-1 of the source file the row came from (optional).
    file_hash = StringField(max_length=40)
    created_at = DateTimeField(default=datetime.utcnow)
    # NOTE(review): BaseSource.save_row_hash passes an `original_data` kwarg
    # that has no field here — confirm whether that field was meant to exist.

108
sources/BaseSource.py Normal file
View File

@@ -0,0 +1,108 @@
import hashlib
import importlib
import json
import mongoengine
from pydantic import BaseModel, Extra
from components.SenderComponent import SenderComponent
from configs.config import config
from models.MetaData import MetaData
from sources.source_types.AbstractSource import AbstractSource
class BaseSource(BaseModel):
    """Glue object for one configured source.

    Builds the concrete source (e.g. FtpSource) and parser class by name
    from the config, then streams new rows to the main application while
    recording their hashes for deduplication.
    """

    source_name: str
    source: dict        # "source" section of the config (type + its params)
    parser_type: dict   # "parser_type" section (format + its params)
    sourceObject: AbstractSource = None
    senderComponent: SenderComponent = None

    class Config:
        extra = Extra.allow

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.senderComponent = SenderComponent(**config.get_main_app_creds())

        # NOTE: the `config` parameter below shadows the module-level
        # app-config import inside these helpers.
        def get_source_object(config):
            # e.g. type "ftp" -> sources.source_types.FtpSource.FtpSource
            class_name = config.type.capitalize() + "Source"
            module_name = 'sources.source_types.' + class_name
            module = importlib.import_module(module_name)
            module_class = getattr(module, class_name)
            return module_class(source_name=self.source_name, **config)

        def get_parser(config):
            # e.g. format "csv" -> sources.file_types.CsvParser.CsvParser
            class_name = config.format.capitalize() + "Parser"
            module_name = 'sources.file_types.' + class_name
            module = importlib.import_module(module_name)
            module_class = getattr(module, class_name)
            return module_class(**config)

        self.sourceObject = get_source_object(self.source)
        if (self.sourceObject.is_parser_needed()):
            self.sourceObject.file_parser = get_parser(self.parser_type)

    def check_is_update_needed(self):
        """Delegate to the concrete source (e.g. 'are there new files?')."""
        return self.sourceObject.check_is_update_needed()

    def check_is_new_row(self, hash):
        """True when no MetaData document exists for *hash*."""
        try:
            MetaData.objects().get(row_hash=hash)
            return False
        except MetaData.DoesNotExist:
            return True

    def save_row_hash(self, row_hash, data, file_hash=None):  # TODO: is original_data needed?
        # NOTE(review): MetaData declares no `original_data` field — confirm
        # whether this kwarg is accepted or should be dropped.
        MetaData(file_hash=file_hash, row_hash=row_hash, original_data=data).save()

    def __bulk_save(self, hashes, data):
        """Send one batch to the app; on success persist the row hashes.

        Falls back to row-by-row inserts when the bulk insert hits
        duplicate keys.
        """
        if self.senderComponent.send_data({'data': data, 'source_name': self.source_name}):
            instances = [MetaData(**data) for data in hashes]
            try:
                MetaData.objects.insert(instances, load_bulk=False)
            except mongoengine.errors.BulkWriteError as error:
                print(error)
                self.__insert_row_by_row(instances)
        else:
            raise Exception("Error on saving data.")

    def __insert_row_by_row(self, instances):
        # Last-resort path: save each document individually, skipping duplicates.
        for model in instances:
            try:
                model.save()
            except mongoengine.errors.BulkWriteError:
                print("Duplicate row with id: {id}".format(id=model.row_hash))

    def update_data(self):
        """Iterate the source's rows, send unseen ones in batches and
        return the number of rows inserted."""
        data_iterator = self.sourceObject.parse_data()
        inserted_rows = 0
        data_to_send = []
        hashes = []
        for row in data_iterator:
            # Row identity = SHA-1 of its canonical (sorted-keys) JSON form.
            dhash = hashlib.sha1()
            encoded = json.dumps(row, sort_keys=True).encode()
            dhash.update(encoded)
            row_hash = dhash.hexdigest()
            if self.check_is_new_row(row_hash):
                data_to_send.append(row)
                hashes.append({"row_hash": row_hash, "file_hash": self.sourceObject.active_file_hash()})
            if len(hashes) >= config.get_bulk_insert_limit():
                self.__bulk_save(hashes, data_to_send)
                inserted_rows += len(hashes)
                data_to_send = []
                hashes = []
        if len(hashes) > 0:
            self.__bulk_save(hashes, data_to_send)
            # BUGFIX: the final partial batch was not counted, so the
            # returned total under-reported the rows actually inserted.
            inserted_rows += len(hashes)
        self.sourceObject.after_save()
        return inserted_rows

View File

@@ -0,0 +1,13 @@
from abc import ABC, abstractmethod
from pydantic import Extra, BaseModel
class AbstractParser(BaseModel, ABC):
    """Contract for file parsers: a parser turns a file path into rows."""

    class Config:
        # Concrete parsers receive arbitrary config keys from sources.json.
        extra = Extra.allow

    @abstractmethod
    def parse(self, source):
        """Yield rows (dict-like) parsed from the file at *source*."""
        pass

View File

@@ -0,0 +1,12 @@
import csv
from sources.file_types.AbstractParser import AbstractParser
class CsvParser(AbstractParser):
    """CSV parser: each data line becomes a dict keyed by the header row."""

    def parse(self, source):
        """Lazily yield one dict per CSV line in the file at *source*."""
        with open(source, 'r') as handle:
            yield from csv.DictReader(handle)

View File

@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from pydantic import Extra, BaseModel
from sources.file_types.AbstractParser import AbstractParser
class AbstractSource(BaseModel, ABC):
    """Base class for data sources (FTP, etc.)."""

    # SHA-1 of the file currently being parsed, when files are involved.
    file_hash: str = None
    source_name: str = None
    # Parser attached by BaseSource when is_parser_needed() returns True.
    file_parser: AbstractParser = None

    class Config:
        extra = Extra.allow

    def __init__(self, source_name, **kwargs):
        super().__init__(**kwargs)
        if not source_name:
            raise Exception('You have to add "source_name" to your config!')
        self.source_name = source_name

    @abstractmethod
    def check_is_update_needed(self):
        """Return True when the source has unseen data."""
        pass

    @abstractmethod
    def parse_data(self):
        """Yield parsed rows from the source."""
        pass

    def is_parser_needed(self):
        """Whether BaseSource must attach a file parser (default: no)."""
        return False

    def active_file_hash(self):
        """Hash of the file the rows currently being yielded came from."""
        return self.file_hash

    def after_save(self):
        """Hook called once all rows were sent; default is a no-op."""
        pass

View File

@@ -0,0 +1,58 @@
import os
from components.FileComponent import FileComponent
from components.FtpDownloader import FtpDownloader
from sources.source_types.AbstractSource import AbstractSource
class FtpSource(AbstractSource):
    """Source that pulls files from an FTP server, downloads the new ones
    and feeds them through the configured file parser."""

    # config
    ftp_url: str
    ftp_login: str
    ftp_password: str
    file_regex: str          # glob appended to the listing path, e.g. "*.csv"
    local_files_path: str    # local directory the files are downloaded into
    # additional params
    source_files: list = []      # everything currently on the server
    files_for_update: list = []  # server files whose hash is unknown locally
    local_files: list = []       # files already parsed (from the DB)

    def __init__(self, source_name, **kwargs):
        super().__init__(source_name, **kwargs)
        self.fileComponent = FileComponent(local_files_path=self.local_files_path)
        self.local_files = self.fileComponent.get_parsed_files(source_name)
        self.ftpClient = FtpDownloader(url=self.ftp_url, login=self.ftp_login, password=self.ftp_password)
        self.source_files = self.ftpClient.get_files_with_checksum('/', self.file_regex)
        self.__files_for_update()

    def __files_for_update(self):
        """Keep only server files whose checksum we have not parsed yet."""
        local_hashes = [f.get('hash') for f in self.local_files]
        self.files_for_update = [f for f in self.source_files if f.get('hash') not in local_hashes]
        return self.files_for_update

    def __local_path(self, remote_file):
        """Local path a remote file is stored under (basename only, since
        remote listings may include directory components)."""
        return self.local_files_path + os.path.basename(remote_file)

    def is_parser_needed(self):
        return True

    def download_file(self, file):
        if not os.path.exists(self.local_files_path):
            os.makedirs(self.local_files_path)
        self.ftpClient.download_file(file, self.__local_path(file))

    def check_is_update_needed(self):
        return len(self.files_for_update) > 0

    def parse_data(self):
        """Download each new file and yield its parsed rows."""
        for f in self.files_for_update:
            self.download_file(f.get('file'))
            self.file_hash = f.get('hash')
            # BUGFIX: download_file() stores the file under its basename,
            # so the parser must read that same path — the original
            # appended the raw remote path, which breaks when the FTP
            # listing returns paths containing directories.
            file_path = self.__local_path(f.get('file'))
            data = self.file_parser.parse(file_path)
            for line in data:
                yield line

    def after_save(self):
        """Record parsed files in the DB and delete the local copies."""
        for f in self.files_for_update:
            self.fileComponent.add_parsed_file(self.source_name, f.get('file'), f.get('hash'), f.get('updated_at'))
            # Same basename fix as in parse_data().
            self.fileComponent.remove_file(self.__local_path(f.get('file')))

49
update.py Normal file
View File

@@ -0,0 +1,49 @@
import argparse
from configs.config import config
from sources.BaseSource import BaseSource
class Updater:
    """CLI entry point: refreshes the sources config or parses one source."""

    def __init__(self):
        cli = argparse.ArgumentParser(description="Source parser.")
        cli.add_argument("--config", help='Update config action', action='store_true')
        cli.add_argument("--source", type=str, help='List of sources: ' + ', '.join(config.get_sources_list()))
        self.args = cli.parse_args()

    def is_source_exists(self, source):
        """True when *source* is present in the configuration."""
        try:
            config.get_source_by_name(source)
        except ValueError:
            return False
        return True

    def parse_source(self, source):
        """Run one source's update and report how many rows were added."""
        source_model = BaseSource(**config.get_source_by_name(source))
        if source_model.check_is_update_needed():
            updated = source_model.update_data()
            print('Rows added: ' + str(updated))
        else:
            print('Nothing to update')

    def update_config(self):
        """Pull the current source list from the main app into sources.json."""
        config.update_sources()

    def do_action(self):
        """Dispatch on the parsed CLI arguments (guard-clause style)."""
        if self.args.config:
            self.update_config()
            return
        if not self.is_source_exists(self.args.source):
            print("This source not exists, list of sources: " + ', '.join(config.get_sources_list()))
            return
        self.parse_source(self.args.source)
if __name__ == "__main__":
    # Run only when executed as a script (e.g. from crontab), not on import —
    # the unguarded module-level calls previously ran on any import.
    updater = Updater()
    updater.do_action()