Annotations in Epub converter

This commit is contained in:
Kiryl
2022-08-03 14:39:13 +03:00
parent 7453029295
commit 78e3ad8911
16 changed files with 259 additions and 192 deletions

View File

@@ -3,6 +3,7 @@ import sys
import json
import pika
import logging
from typing import Dict
from pathlib import Path
from threading import Event
from functools import partial
@@ -13,8 +14,7 @@ from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook
def configure_file_logger(name, filename="logs/converter.log", filemode="w+",
logging_level=logging.INFO):
def configure_file_logger(name: str, filename: str = "logs/converter.log", filemode: str ="w+", logging_level: int = logging.INFO) -> logging.Logger:
logger = logging.getLogger(name)
folder_path = os.path.dirname(os.path.abspath(__file__))
@@ -30,7 +30,7 @@ def configure_file_logger(name, filename="logs/converter.log", filemode="w+",
return logger
def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger: logging.Logger, params: dict):
logger.info(f"Start processing book-{book_id}.")
try:
json_file_path = "books/json/9781614382264.json"
@@ -41,7 +41,7 @@ def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params:
logger.info(f"Book-{book_id} has been proceeded.")
def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict):
def convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: Dict[str, Access]):
logger.info(f"Start processing book-{book_id}.")
try:
book = book_type(book_id=book_id, main_logger=logger, **params)
@@ -51,7 +51,7 @@ def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict)
logger.info(f"Book-{book_id} has been proceeded.")
def callback(ch, method, properties, body, logger, libre_locker):
def callback(ch, method, properties, body: bytes, logger: logging.Logger, libre_locker: Event):
print(f"Message: {body}.")
logger.info(f"Message: {body}.")
try:
@@ -82,7 +82,6 @@ def callback(ch, method, properties, body, logger, libre_locker):
logger.error(f"{sys.exc_info()[0]}: {exc.message}")
else:
logger.error(f"{sys.exc_info()[0]}: {str(exc)}")
finally:
pass

View File

@@ -1,17 +1,23 @@
import json
import os
import json
import time
import requests
from threading import Event
from io import BytesIO
from threading import Event
from typing import List, Tuple, Dict, Union
class Access:
"""Class accessing our platform"""
def __init__(self, url=None):
def __init__(self, url: str = None):
"""
:param url: str, url received from queue message, if field apiURL exists
Parameters
----------
url: str
url received from queue message,
if field apiURL exists
else None
"""
self.PENDING = 1
self.PROCESS = 2
@@ -19,6 +25,7 @@ class Access:
self.FINISH = 4
self.ERROR = 5
self.url = None
self.username = None
self.password = None
@@ -32,12 +39,12 @@ class Access:
self.get_token()
self.refreshing.set()
def set_credentials(self, url):
folder_path = os.path.dirname(
def set_credentials(self, url: str):
folder_path: str = os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(folder_path, "config/api_config.json")
config_path: str = os.path.join(folder_path, "config/api_config.json")
with open(config_path, "r") as f:
params = json.load(f)
params: Dict[str, str] = json.load(f)
self.refreshing.clear()
self.url = url
@@ -104,7 +111,7 @@ class Access:
else:
raise Exception(f'{response.status_code}')
def get_file(self, file_path):
def get_file(self, file_path: str) -> bytes:
"""Function downloads the file[book, preset] from site"""
if self.is_time_for_refreshing():
self.refresh_token()
@@ -124,10 +131,11 @@ class Access:
f'status code:{response.status_code}')
return content
def sleep(timeout: float, retry=3):
@staticmethod
def sleep(timeout: float, retry: int = 3):
def decorator(function):
"""Decorator sleeping timeout sec and makes 3 retries"""
def wrapper(*args, **kwargs):
def wrapper(*args, **kwargs) -> str:
retries = 0
while retries < retry:
try:
@@ -141,14 +149,14 @@ class Access:
return decorator
@sleep(3)
def send_image(self, img_path, doc_id, img_content: bytes = None):
def send_image(self, img_path: str, doc_id: str, img_content: bytes = None) -> str:
"""Function sends images to site"""
if self.is_time_for_refreshing():
self.refresh_token()
self.refreshing.wait()
img_obj = BytesIO(img_content) if img_content else open(img_path, 'rb')
files = {
img_obj: BytesIO = BytesIO(img_content) if img_content else open(img_path, 'rb')
files: Dict[str, Tuple[str, BytesIO]] = {
'image': (os.path.basename(img_path), img_obj)
}
response = requests.post(
@@ -165,7 +173,7 @@ class Access:
f'{response.status_code} Bad request: {response.json()["message"]}.')
return img_url
def send_book(self, doc_id, content):
def send_book(self, doc_id: int, content: Dict[str, List[Dict[str, Union[List, str]]]]):
"""Function sends the book to site"""
if self.is_time_for_refreshing():
self.refresh_token()
@@ -184,7 +192,7 @@ class Access:
raise Exception(
f'{response.status_code} Bad request: {response.json()["message"]}.')
def update_status(self, doc_id, status):
def update_status(self, doc_id: Union[int, str], status: int):
"""Function updates status of the book on site"""
if self.is_time_for_refreshing():
self.refresh_token()

View File

@@ -3,6 +3,7 @@ import json
import codecs
import logging
import pathlib
from typing import List, Dict, Union
from abc import abstractmethod, ABCMeta
from src.livecarta_config import LiveCartaConfig
@@ -20,7 +21,7 @@ class BookSolver:
__metaclass__ = ABCMeta
def __init__(self, book_id=0, access=None, main_logger=None):
def __init__(self, book_id: int = 0, access=None, main_logger=None):
self.book_type = None
self.book_id = book_id
self.access = access
@@ -36,22 +37,30 @@ class BookSolver:
assert LiveCartaConfig.SUPPORTED_LEVELS == len(LiveCartaConfig.SUPPORTED_HEADERS), \
"Length of headers doesn't match allowed levels."
def save_file(self, content: bytes, path_to_save, file_type):
def save_file(self, content: bytes, path_to_save: str, file_type: str) -> str:
"""
Function saves binary content of file to folder(path_to_save)
Parameters
----------
content: bytes str
binary content of the file
path_to_save: str
path to the folder
file_type: str
Returns
----------
file_path: str
path to file on local
"""
folder_path = os.path.dirname(
folder_path: str = os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))
folder_path = os.path.join(
folder_path, path_to_save)
pathlib.Path(folder_path).mkdir(parents=True, exist_ok=True)
file_path = os.path.join(
file_path: str = os.path.join(
folder_path, f"{self.book_id}.{file_type}")
try:
with open(file_path, "wb+") as file:
@@ -116,7 +125,7 @@ class BookSolver:
parents=True, exist_ok=True)
self.book_output_path.touch(exist_ok=True)
def write_to_json(self, content: dict):
def write_to_json(self, content: Dict[str, List[Dict[str, Union[List, str]]]]):
self.check_output_directory()
try:
with codecs.open(self.book_output_path, "w", encoding="utf-8") as f:
@@ -127,7 +136,7 @@ class BookSolver:
self.logger_object.log(
"Error has occurred while writing .json file." + str(exc), logging.ERROR)
def send_json_content_to_server(self, content: dict):
def send_json_content_to_server(self, content: Dict[str, List[Dict[str, Union[List, str]]]]):
"""Function sends json_content to site"""
try:
self.access.send_book(self.book_id, content)
@@ -140,7 +149,7 @@ class BookSolver:
raise exc
@abstractmethod
def get_converted_book(self):
def get_converted_book(self) -> Dict[str, List[Dict[str, Union[List, str]]]]:
self.logger_object.log("Beginning of processing .json output.")
self.status_wrapper.set_generating()
return {}
@@ -158,7 +167,7 @@ class BookSolver:
self.logger_object.log(
f"Beginning of conversion from .{self.book_type} to .json.")
self.status_wrapper.set_processing()
content_dict = self.get_converted_book()
content_dict: Dict[str, List[Dict[Union[str, List]]]] = self.get_converted_book()
[os.remove(path) for path in [self.preset_path, self.book_path]]
self.logger_object.log("Beginning of processing .json output.")
self.status_wrapper.set_generating()

View File

@@ -1,5 +1,5 @@
import re
from typing import Union
from typing import List, Dict, Union
from ebooklib.epub import Section, Link
from src.livecarta_config import LiveCartaConfig
@@ -11,7 +11,7 @@ class NavPoint:
These are data structures which form mapping from NCX to python data structures.
"""
def __init__(self, obj: Union[Link, Section] = None, ):
def __init__(self, obj: Union[Link, Section] = None):
self.href, self.id = self.parse_href_id(obj)
self.title = obj.title
@@ -52,15 +52,15 @@ def flatten(x):
class ChapterItem:
"""
Class of Chapter that could have subchapters
These are data structures which form mapping to livecarta json structure.
These are data structures which form mapping to LiveCarta json structure.
"""
def __init__(self, title, content, sub_items):
def __init__(self, title: str, content: str, sub_items: List):
self.title = title
self.content = content
self.sub_items = sub_items
def to_dict(self, lvl=1):
def to_dict(self, lvl: int = 1) -> Dict[str, Union[str, List]]:
"""Function returns dictionary of chapter"""
sub_dicts = []
if self.sub_items:

View File

@@ -1,5 +1,6 @@
import re
import cssutils
from typing import Tuple, Dict
from bs4 import BeautifulSoup
from os.path import dirname, normpath, join
@@ -41,13 +42,13 @@ class CSSPreprocessor:
}
@staticmethod
def get_text_color(x):
def get_text_color(x: str) -> str:
color = str2hex(x)
color = color if color not in ["#000000", "#000", "black"] else ""
return color
@staticmethod
def get_bg_color(x):
def get_bg_color(x: str) -> str:
color = str2hex(x)
color = color if color not in ["#ffffff", "#fff", "white"] else ""
return color
@@ -114,7 +115,7 @@ class CSSPreprocessor:
return cleaned_value
@staticmethod
def style_conditions(style_value: str, style_name: str) -> tuple[bool, bool]:
def style_conditions(style_value: str, style_name: str) -> Tuple[bool, bool]:
constraints_on_value = LiveCartaConfig.LIVECARTA_STYLE_ATTRS.get(
style_name)
value_not_in_possible_values_list = style_value not in LiveCartaConfig.LIVECARTA_STYLE_ATTRS[
@@ -156,7 +157,7 @@ class CSSPreprocessor:
style = "; ".join(split_style)
return style
def process_inline_styles_in_html_soup(self, html_href2html_body_soup: dict):
def process_inline_styles_in_html_soup(self, html_href2html_body_soup: Dict[str, BeautifulSoup]):
"""This function is designed to convert inline html styles"""
for html_href in html_href2html_body_soup:
html_content: BeautifulSoup = html_href2html_body_soup[html_href]
@@ -169,7 +170,7 @@ class CSSPreprocessor:
self.build_inline_style_content(inline_style)
@staticmethod
def get_css_content(css_href, html_href, ebooklib_book):
def get_css_content(css_href: str, html_href: str, ebooklib_book) -> str:
path_to_css_from_html = css_href
html_folder = dirname(html_href)
path_to_css_from_root = normpath(

View File

@@ -9,8 +9,8 @@ from pathlib import Path
from itertools import chain
from premailer import transform
from collections import defaultdict
from typing import Dict, Union, List
from bs4 import BeautifulSoup, NavigableString, Tag
from typing import List, Tuple, Dict, Union
from bs4 import BeautifulSoup, Tag, NavigableString
from src.util.helpers import BookLogger
from src.epub_converter.css_processor import CSSPreprocessor
@@ -39,7 +39,8 @@ class EpubConverter:
# toc tree structure stored as adj.list (NavPoint to list of NavPoints)
# key = -1 for top level NavPoints
self.adjacency_list: Dict[Union[NavPoint, -1], Union[list, None]] = {}
self.adjacency_list: Dict[Union[NavPoint, -1],
Union[List[NavPoint], None]] = {}
# list to offset Chapter_i on 1st level
self.offset_sub_nodes = []
@@ -70,7 +71,8 @@ class EpubConverter:
BeautifulSoup] = self.build_href2soup_content()
self.logger.log("CSS inline style processing.")
self.css_processor.process_inline_styles_in_html_soup(self.html_href2html_body_soup)
self.css_processor.process_inline_styles_in_html_soup(
self.html_href2html_body_soup)
self.logger.log("CSS files processing.")
self.html_href2css_href, self.css_href2css_content = self.build_html_and_css_relations()
self.logger.log("CSS styles fusion(inline+file).")
@@ -107,7 +109,6 @@ class EpubConverter:
def build_href2soup_content(self) -> Dict[str, BeautifulSoup]:
# using EpubElements
# for now just for HTML objects, as it is the simplest chapter
nodes = dict()
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
html_body_text = item.get_body_content()
@@ -116,7 +117,7 @@ class EpubConverter:
nodes[item.file_name] = soup
return nodes
def build_html_and_css_relations(self) -> tuple[dict, dict]:
def build_html_and_css_relations(self) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
"""
Function is designed to get 2 dictionaries:
The first is html_href2css_href. It is created to connect href of html to css files(hrefs of them
@@ -130,8 +131,8 @@ class EpubConverter:
"""
# dictionary: href of html to related css files
html_href2css_href: defaultdict = defaultdict(list)
css_href2css_content: dict = {}
html_href2css_href: Dict[str, List[str]] = defaultdict(list)
css_href2css_content: Dict[str, str] = {}
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
html_content = item.content
@@ -213,7 +214,9 @@ class EpubConverter:
html_content, css)
self.html_href2html_body_soup[html_href] = html_content
def build_adjacency_list_from_toc(self, element: [Link, tuple, list], lvl=0):
def build_adjacency_list_from_toc(self,
element: Union[Link, Tuple[Section, List], List[Union[Link, Tuple]]],
lvl: int = 0) -> NavPoint:
"""
Function
self.adjacency_list builds based on TOC nested structure, got from self.ebooklib.toc
@@ -304,7 +307,7 @@ class EpubConverter:
self.adjacency_list[-1].append(nav_point)
self.hrefs_added_to_toc.add(nav_point.href)
def add_not_added_files_to_adjacency_list(self, not_added: list):
def add_not_added_files_to_adjacency_list(self, not_added: List[str]):
"""Function add files that not added to adjacency list"""
for i, file in enumerate(not_added):
nav_point = NavPoint(
@@ -345,10 +348,13 @@ class EpubConverter:
mark.parent.unwrap()
@staticmethod
def create_unique_id(href, id_):
def create_unique_id(href: str, id_: str) -> str:
return re.sub(r"([^\w\s])|_|-", "", href) + re.sub(r"[_-]", "0", id_)
def match_href_to_path_from_toc(self, cur_file_path: str, href_in_link: str, internal_link_tag: Tag) -> [None, str]:
def match_href_to_path_from_toc(self,
cur_file_path: str,
href_in_link: str,
internal_link_tag: Tag) -> Union[None, str]:
"""
Function used to find full path to file that is parsed from tag link
TOC: a/b/c.xhtml
@@ -387,7 +393,7 @@ class EpubConverter:
return full_path[0]
@staticmethod
def create_new_anchor_span(soup, id_):
def create_new_anchor_span(soup: BeautifulSoup, id_: str) -> Tag:
new_anchor_span = soup.new_tag("span")
new_anchor_span.attrs["id"] = id_
new_anchor_span.attrs["class"] = "link-anchor"
@@ -415,7 +421,8 @@ class EpubConverter:
for toc_href in self.hrefs_added_to_toc:
for tag in self.html_href2html_body_soup[toc_href].find_all(attrs={"id": re.compile(r".+")}):
if tag.attrs.get("class") not in ["converter-chapter-mark", "footnote-element"]:
new_id = self.create_unique_id(toc_href, tag.attrs["id"])
new_id = self.create_unique_id(
toc_href, tag.attrs["id"])
tag.attrs["id"] = new_id
def process_file_anchor():
@@ -427,11 +434,13 @@ class EpubConverter:
a_tag_href_matched_to_toc = self.match_href_to_path_from_toc(
toc_href, a_tag_href, internal_link_tag)
if a_tag_href_matched_to_toc:
new_id = self.create_unique_id(a_tag_href_matched_to_toc, "")
new_id = self.create_unique_id(
a_tag_href_matched_to_toc, "")
internal_link_tag.attrs["placeholder"] = "{{tempStyleToAnchor-" + new_id + "}}"
if new_id not in self.internal_anchors:
anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc]
new_anchor_span = self.create_new_anchor_span(soup, new_id)
new_anchor_span = self.create_new_anchor_span(
soup, new_id)
# insert a new span to the beginning of the file
anchor_soup.insert(0, new_anchor_span)
self.internal_anchors.add(new_id)
@@ -442,7 +451,8 @@ class EpubConverter:
soup = self.html_href2html_body_soup[toc_href]
# process_file_element_anchor
for internal_link_tag in soup.find_all("a", {"href": re.compile(r"(^.+\.(htm|html|xhtml)#.+)|(^#.+)")}):
a_tag_href, a_tag_id = internal_link_tag.attrs["href"].split("#")
a_tag_href, a_tag_id = internal_link_tag.attrs["href"].split(
"#")
a_tag_href_matched_to_toc = self.match_href_to_path_from_toc(
toc_href, a_tag_href, internal_link_tag) if a_tag_href \
else path.normpath(toc_href).replace("\\", "/")
@@ -452,7 +462,8 @@ class EpubConverter:
anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc]
anchor_tags = anchor_soup.find_all(attrs={"id": new_id}) or \
anchor_soup.find_all(attrs={"id": a_tag_id}) # if link is a footnote
anchor_soup.find_all(
attrs={"id": a_tag_id}) # if link is a footnote
if anchor_tags:
if len(anchor_tags) > 1:
self.logger.log(f"Warning in {toc_href}: multiple anchors:"
@@ -487,7 +498,9 @@ class EpubConverter:
process_file_element_anchor()
@staticmethod
def get_tags_between_chapter_marks(first_id: str, href: str, html_soup: BeautifulSoup) -> list:
def get_tags_between_chapter_marks(first_id: str,
href: str,
html_soup: BeautifulSoup) -> List[Union[Tag, NavigableString]]:
"""
Get tags between LiveCarta chapter marks
Parameters
@@ -568,7 +581,7 @@ class EpubConverter:
for tl_nav_point in top_level_nav_points:
self.detect_one_chapter(tl_nav_point)
def html_node_to_livecarta_chapter_item(self, nav_point: NavPoint, lvl=1) -> ChapterItem:
def html_node_to_livecarta_chapter_item(self, nav_point: NavPoint, lvl: int = 1) -> ChapterItem:
"""
Function prepare style, tags to json structure
Parameters
@@ -584,18 +597,18 @@ class EpubConverter:
built chapter
"""
title = nav_point.title
title: str = nav_point.title
content: BeautifulSoup = self.href_chapter_id2soup_html[(nav_point.href, nav_point.id)] \
if nav_point.id else self.html_href2html_body_soup[nav_point.href]
indent = " " * lvl
indent: str = " " * lvl
self.logger.log(indent + f"Chapter: {title} is processing.")
is_chapter = lvl <= LiveCartaConfig.SUPPORTED_LEVELS
is_chapter: bool = lvl <= LiveCartaConfig.SUPPORTED_LEVELS
self.logger.log(indent + "Process title.")
title_preprocessed = self.html_processor.prepare_title(title)
title_preprocessed: str = self.html_processor.prepare_title(title)
self.logger.log(indent + "Process content.")
content_preprocessed = self.html_processor.prepare_content(title_preprocessed, content,
remove_title_from_chapter=is_chapter)
content_preprocessed: BeautifulSoup = self.html_processor.prepare_content(
title_preprocessed, content, remove_title_from_chapter=is_chapter)
self.book_image_src_path2aws_path = update_images_src_links(content_preprocessed,
self.img_href2img_bytes,
@@ -613,7 +626,7 @@ class EpubConverter:
sub_nodes.append(sub_chapter_item)
return ChapterItem(title_preprocessed, str(content_preprocessed), sub_nodes)
def convert_to_dict(self) -> dict:
def convert_to_dict(self) -> Dict[str, List[Dict[str, Union[List, str]]]]:
"""Function which convert list of html nodes to appropriate json structure"""
top_level_nav_points = self.adjacency_list[-1]
top_level_chapters = []
@@ -633,7 +646,7 @@ class EpubConverter:
if __name__ == "__main__":
epub_file_path = "../../books/epub/9780763774134.epub"
epub_file_path = "../../books/epub/9781119646044.epub"
logger_object = BookLogger(
name="epub", book_id=epub_file_path.split("/")[-1])

View File

@@ -7,7 +7,7 @@ from src.epub_converter.epub_converter import EpubConverter
class EpubBook(BookSolver):
"""Class of .epub type book - child of BookSolver"""
def __init__(self, book_id=0, access=None, main_logger=None):
def __init__(self, book_id: int = 0, access=None, main_logger=None):
super().__init__(book_id, access, main_logger)
self.book_type = "epub"
@@ -28,7 +28,8 @@ class EpubBook(BookSolver):
"""
css_processor = CSSPreprocessor()
html_processor = HtmlEpubPreprocessor(self.preset_path, logger=self.logger_object)
html_processor = HtmlEpubPreprocessor(
self.preset_path, logger=self.logger_object)
json_converter = EpubConverter(
self.book_path, access=self.access, logger=self.logger_object,
css_processor=css_processor, html_processor=html_processor)

View File

@@ -1,5 +1,5 @@
import re
from typing import Tuple
from typing import List, Tuple
from bs4 import BeautifulSoup, Tag
@@ -16,8 +16,8 @@ def _replace_with_livecarta_anchor_tag(anchor, i):
return new_tag
def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, noteref_attr_name="epub:type") \
-> Tuple[list, list, list]:
def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, noteref_attr_name: str = "epub:type") \
-> Tuple[List, List, List]:
"""
This function preprocessing footnotes
This function should be earlier that adding fonts in pipeline.
@@ -87,5 +87,4 @@ def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, note
noteref.attrs["data-id"] = i + 1
noteref.attrs["id"] = f"footnote-{i + 1}"
footnote.attrs["href"] = f"#footnote-{i + 1}"
return footnotes, new_noterefs_tags, new_footnotes_tags

View File

@@ -1,14 +1,16 @@
import re
import json
from bs4 import BeautifulSoup, NavigableString, Comment, Tag
from typing import List, Dict, Union
from bs4 import BeautifulSoup, Tag, NavigableString, Comment
from bs4.element import PageElement
from src.util.helpers import BookLogger
class HtmlEpubPreprocessor:
def __init__(self, preset_path="../../presets/presets.json", logger=None):
def __init__(self, preset_path: str = "../../presets/presets.json", logger: BookLogger = None):
self.preset = json.load(open(preset_path))
self.logger: BookLogger = logger
self.logger = logger
self.name2function = {
"table_wrapper": self._wrap_tags_with_table,
"replacer": self._tags_to_correspond_livecarta_tag,
@@ -18,33 +20,37 @@ class HtmlEpubPreprocessor:
}
@staticmethod
def _add_span_to_save_ids_for_links(tag_to_be_removed, chapter_tag: BeautifulSoup):
def _add_span_to_save_ids_for_links(tag_to_be_removed: Union[PageElement, BeautifulSoup],
chapter_tag: BeautifulSoup):
"""
Function adds span with id from tag_to_be_removed
because this tag will be removed(unwrapped/extract)
Parameters
----------
tag_to_be_removed: Soup object
tag_to_be_removed: Union[PageElement, BeautifulSoup]
chapter_tag: BeautifulSoup
Returns
-------
None
NoReturn
updated body tag
"""
def _insert_span_with_attrs_before_tag(chapter_tag: BeautifulSoup, tag_to_be_removed: Tag, id_: str,
class_: list):
def _insert_span_with_attrs_before_tag(chapter_tag: BeautifulSoup,
tag_to_be_removed: Tag,
id_: str,
class_: Union[List[str], str]):
"""Function inserts span before tag aren't supported by LiveCarta"""
new_tag = chapter_tag.new_tag("span")
new_tag: Tag = chapter_tag.new_tag("span")
new_tag.attrs["id"] = id_ or ""
new_tag.attrs["class"] = class_ or ""
new_tag.string = "\xa0"
tag_to_be_removed.insert_before(new_tag)
if tag_to_be_removed.attrs.get("id"):
_insert_span_with_attrs_before_tag(chapter_tag=chapter_tag, tag_to_be_removed=tag_to_be_removed,
_insert_span_with_attrs_before_tag(chapter_tag=chapter_tag,
tag_to_be_removed=tag_to_be_removed,
id_=tag_to_be_removed.attrs["id"],
class_=tag_to_be_removed.attrs.get("class"))
@@ -78,7 +84,7 @@ class HtmlEpubPreprocessor:
Returns
-------
None
NoReturn
Chapter Tag without comments
"""
@@ -110,27 +116,32 @@ class HtmlEpubPreprocessor:
p_tag.append(str(node))
node.replace_with(p_tag)
def _wrap_tags_with_table(self, chapter_tag: BeautifulSoup, rules: list):
def _wrap_tags_with_table(self,
chapter_tag: BeautifulSoup,
rules: List[Dict[str, List[Union[str, Dict[str, str]]]]]):
"""
Function wraps <tag> with <table>
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
rules: List[Dict[str, List[str, Dict[str, str]]]]
list of conditions when fire function
Returns
-------
None
NoReturn
Chapter Tag with wrapped certain tags with <table>
"""
def _wrap_tag_with_table(width="100", border="", bg_color=None):
def _wrap_tag_with_table(width: str = "100", border: str = "", bg_color: str = None) -> Tag:
table = chapter_tag.new_tag("table")
table.attrs["border"], table.attrs["align"], table.attrs["style"] \
= border, "center", f"width:{width}%;"
tbody, tr, td = \
chapter_tag.new_tag("tbody"), chapter_tag.new_tag("tr"), chapter_tag.new_tag("td")
chapter_tag.new_tag("tbody"), chapter_tag.new_tag(
"tr"), chapter_tag.new_tag("td")
td.attrs["bgcolor"] = bg_color
tag_to_wrap.wrap(td)
td.wrap(tr)
@@ -141,8 +152,10 @@ class HtmlEpubPreprocessor:
def process_tag_using_table():
_wrap_tag_with_table(
width=tag_to_wrap.attrs["width"] if tag_to_wrap.attrs.get("width") else "100",
border=tag_to_wrap.attrs["border"] if tag_to_wrap.attrs.get("border") else None,
width=tag_to_wrap.attrs["width"] if tag_to_wrap.attrs.get(
"width") else "100",
border=tag_to_wrap.attrs["border"] if tag_to_wrap.attrs.get(
"border") else None,
bg_color=tag_to_wrap.attrs["bgcolor"] if tag_to_wrap.attrs.get("bgcolor") else None)
self._add_span_to_save_ids_for_links(tag_to_wrap, chapter_tag)
tag_to_wrap.unwrap()
@@ -155,23 +168,26 @@ class HtmlEpubPreprocessor:
process_tag_using_table()
@staticmethod
def _tags_to_correspond_livecarta_tag(chapter_tag: BeautifulSoup, rules: list):
def _tags_to_correspond_livecarta_tag(chapter_tag: BeautifulSoup,
rules: List[Dict[str, Union[List[str], str, int, Dict[str, Union[str, int]]]]]):
"""
Function to replace all tags to correspond LiveCarta tags
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
rules: List[Dict[str, Union[List[str], str, int, Dict[str, Union[str, int]]]]]
list of conditions when fire function
Returns
-------
None
NoReturn
Chapter Tag with all tags replaced with LiveCarta tags
"""
for rule in rules:
tags = rule["tags"]
tag_to_replace = rule["tag_to_replace"]
tags: List[str] = rule["tags"]
tag_to_replace: str = rule["tag_to_replace"]
if rule["condition"]:
for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v):
if condition_on_tag[0] == 'parent_tags':
@@ -193,40 +209,44 @@ class HtmlEpubPreprocessor:
tag.name = tag_to_replace
@staticmethod
def _replace_attrs_in_tags(chapter_tag: BeautifulSoup, rules: list):
def _replace_attrs_in_tags(chapter_tag: BeautifulSoup, rules: List[Dict[str, Union[str, Dict[str, List[str]]]]]):
"""
Function to replace all tags to correspond LiveCarta tags
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
rules: List[Dict[str, Union[str, Dict[str, List[str]]]]]
list of conditions when fire function
Returns
-------
None
NoReturn
Chapter Tag with all tags replaced with LiveCarta tags
"""
for rule in rules:
attr = rule["attr"]
tags = rule["condition"]["tags"]
tags: List[str] = rule["condition"]["tags"]
attr_to_replace = rule["attr_to_replace"]
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags],
{attr: re.compile(r".*")}):
tag[attr_to_replace] = tag[attr]
del tag[attr]
def _unwrap_tags(self, chapter_tag: BeautifulSoup, rules: dict):
def _unwrap_tags(self, chapter_tag: BeautifulSoup, rules: Dict[str, List[str]]):
"""
Function unwrap tags and moves id to span
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
rules: Dict[str, List[str]]
dict of tags to unwrap
Returns
-------
None
NoReturn
Chapter Tag with unwrapped certain tags
"""
@@ -239,21 +259,23 @@ class HtmlEpubPreprocessor:
tag.unwrap()
@staticmethod
def _insert_tags_into_correspond_tags(chapter_tag: BeautifulSoup, rules: list):
def _insert_tags_into_correspond_tags(chapter_tag: BeautifulSoup, rules: List[Dict[str, Union[List[str], str, Dict[str, Union[str, int]]]]]):
"""
Function inserts tags into correspond tags
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
rules: List[Dict[str, Union[List[str], str, Dict[str, Union[str, int]]]]]
list of conditions when fire function
Returns
-------
None
NoReturn
Chapter Tag with inserted tags
"""
def insert(tag):
def insert(tag: Tag):
tag_to_insert = \
chapter_tag.new_tag(rule["tag_to_insert"])
# insert all items that was in tag to subtag and remove from tag
@@ -263,7 +285,7 @@ class HtmlEpubPreprocessor:
tag.append(tag_to_insert)
for rule in rules:
tags = rule["tags"]
tags: List[str] = rule["tags"]
if rule["condition"]:
for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v):
if condition_on_tag[0] == 'parent_tags':
@@ -283,29 +305,28 @@ class HtmlEpubPreprocessor:
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
insert(tag)
def _remove_headings_content(self, chapter_tag, title_of_chapter: str):
def _remove_headings_content(self, chapter_tag: Union[BeautifulSoup, PageElement], title_of_chapter: str):
"""
Function
- cleans/removes headings from chapter in order to avoid duplication of chapter titles in the content
- adds span with id in order to
Parameters
----------
chapter_tag: soup object
chapter_tag: Union[BeautifulSoup, PageElement]
Tag of the page
title_of_chapter: str
Chapter title
Returns
-------
None
NoReturn
clean/remove headings & add span with id
"""
title_of_chapter = title_of_chapter.lower()
if title_of_chapter == "chapter 1":
pass
for tag in chapter_tag.contents:
text = tag if isinstance(tag, NavigableString) else tag.text
tag: PageElement
text: str = tag if isinstance(tag, NavigableString) else tag.text
if re.sub(r"[\s\xa0]", "", text):
text = re.sub(r"[\s\xa0]", " ", text).lower()
text = text.strip() # delete extra spaces
@@ -333,7 +354,7 @@ class HtmlEpubPreprocessor:
Returns
-------
None
NoReturn
Chapter Tag with processed tables
"""
@@ -370,7 +391,7 @@ class HtmlEpubPreprocessor:
Returns
-------
None
NoReturn
Chapter Tag without original classes of the book
"""
@@ -413,9 +434,9 @@ class HtmlEpubPreprocessor:
# 2.
self._wrap_strings_with_p(content_tag)
# 3-6.
for dict in self.preset:
func = self.name2function[dict["preset_name"]]
func(content_tag, dict['rules'])
for rule in self.preset:
func = self.name2function[rule["preset_name"]]
func(content_tag, rule['rules'])
# 7.
if remove_title_from_chapter:
self._remove_headings_content(content_tag, title_str)

View File

@@ -1,13 +1,14 @@
import os
import pathlib
from typing import Dict
from bs4 import BeautifulSoup
from src.access import Access
def save_image_to_aws(access: Access, img_file_path: str, img_content: bytes, book_id: str):
def save_image_to_aws(access: Access, img_file_path: str, img_content: bytes, book_id: str) -> str:
"""Function saves all images to Amazon web service"""
link_path = access.send_image(
link_path: str = access.send_image(
img_file_path, doc_id=book_id, img_content=img_content)
return link_path
@@ -27,11 +28,11 @@ def save_image_locally(img_file_path: str, img_content: bytes, book_id: str):
def update_images_src_links(body_tag: BeautifulSoup,
img_href2img_content: dict,
img_href2img_content: Dict[str, bytes],
path_to_html: str,
access=None,
path2aws_path: dict = None,
book_id: str = None) -> dict:
access: Access = None,
path2aws_path: Dict[str, str] = None,
book_id: str = None) -> Dict[str, str]:
"""Function makes dictionary image_src_path -> Amazon web service_path"""
img_tags = body_tag.find_all("img")
for img in img_tags:
@@ -43,7 +44,7 @@ def update_images_src_links(body_tag: BeautifulSoup,
assert path_to_img_from_root in img_href2img_content, \
f"Image {path_to_img_from_html} in file {path_to_html} was not added to manifest."
img_content = img_href2img_content[path_to_img_from_root]
img_content: bytes = img_href2img_content[path_to_img_from_root]
if access is not None:
if path_to_img_from_root in path2aws_path:
new_folder = path2aws_path[path_to_img_from_root]

View File

@@ -1,9 +1,8 @@
import re
import cssutils
from typing import List
from logging import CRITICAL
from bs4 import BeautifulSoup
from bs4 import BeautifulSoup, Tag
from src.livecarta_config import LiveCartaConfig
@@ -11,13 +10,13 @@ cssutils.log.setLevel(CRITICAL)
class TagInlineStyleProcessor:
def __init__(self, tag_inline_style):
def __init__(self, tag_inline_style: Tag):
# tag with inline style + style parsed from css file
self.tag_inline_style = tag_inline_style
self.tag_inline_style.attrs['style'] = self.process_inline_style()
self.tag_inline_style.attrs['style']: str = self.process_inline_style()
@staticmethod
def remove_white_if_no_bgcolor(style_, tag):
def remove_white_if_no_bgcolor(style_: str, tag: Tag) -> str:
"""Function remove text white color if there is no bg color"""
if "background" in style_:
style_ = style_.replace(
@@ -62,13 +61,13 @@ class TagInlineStyleProcessor:
# return split_style
@staticmethod
def indents_processing(split_style: list) -> str:
def indents_processing(split_style: List[str]) -> str:
"""
Function process indents from left using
formula_of_indent: indent = abs(margin - text_indent)
Parameters
----------
split_style: list
split_style: List[str]
list of styles split by ";"
Returns
@@ -111,7 +110,7 @@ class TagInlineStyleProcessor:
return processed_style
return processed_style
def process_inline_style(self):
def process_inline_style(self) -> str:
"""
Function processes final(css+initial inline) inline style
Steps
@@ -180,7 +179,7 @@ class TagInlineStyleProcessor:
self.tag_inline_style.append(correspond_tag)
@staticmethod
def wrap_span_in_tag_to_save_style_attrs(initial_tag):
def wrap_span_in_tag_to_save_style_attrs(initial_tag: Tag):
"""Function designed to save style attrs that cannot be in tag.name -> span"""
dictkeys_pattern = re.compile("|".join(LiveCartaConfig.LIVECARTA_STYLES_CAN_BE_IN_TAG))
if re.findall(dictkeys_pattern, initial_tag.name) and initial_tag.attrs.get("style"):
@@ -212,7 +211,7 @@ class TagInlineStyleProcessor:
initial_tag.attrs["style"] = span_style
initial_tag.wrap(tag)
def convert_initial_tag(self):
def convert_initial_tag(self) -> Tag:
self.change_attrs_with_corresponding_tags()
self.wrap_span_in_tag_to_save_style_attrs(self.tag_inline_style)
return self.tag_inline_style

View File

@@ -4,8 +4,7 @@ import argparse
def parse_args():
parser = argparse.ArgumentParser(description="Utility for folders's clean up.")
parser.add_argument('-f', '--folders', type=str, nargs='*', help='Names of the folders to be cleaned.')
parser.add_argument("-f", "--folders", type=str, nargs="*", help="Names of the folders to be cleaned.")
args = parser.parse_args()
return args
@@ -18,10 +17,10 @@ def check_dir(dir_path):
raise exc
if __name__ == '__main__':
if __name__ == "__main__":
folders = parse_args().folders
if not folders:
folders = ['docx', 'html', 'json', 'logs', 'config']
folders = ["docx", "html", "json", "logs", "config"]
folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
folders = [os.path.join(folder_path, folder) for folder in folders]

View File

@@ -6,15 +6,15 @@ import subprocess
def parse_args():
parser = argparse.ArgumentParser(description="Utility for checking installed packages.")
parser.add_argument('-p', '--packages', type=str, nargs='*', help='Names of the packages.')
parser.add_argument("-p", "--packages", type=str, nargs="*", help="Names of the packages.")
args = parser.parse_args()
return args
def check_packages(required_packs):
inst = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'])
installed_packages = [r.decode().split('==')[0] for r in inst.split()]
inst = subprocess.check_output([sys.executable, "-m", "pip", "freeze"])
installed_packages = [r.decode().split("==")[0] for r in inst.split()]
to_be_installed = []
for package in required_packs:
@@ -24,19 +24,19 @@ def check_packages(required_packs):
return to_be_installed
if __name__ == '__main__':
if __name__ == "__main__":
required_packs = parse_args().packages
if not required_packs:
folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
req_path = os.path.join(folder_path, 'requirements.txt')
req_path = os.path.join(folder_path, "requirements.txt")
with open(req_path, 'r') as f:
with open(req_path, "r") as f:
packs = f.readlines()
required_packs = [pack.split('>=')[0] for pack in packs]
required_packs = [pack.split(">=")[0] for pack in packs]
not_inst_packs = check_packages(required_packs)
if not_inst_packs:
raise Exception(f'{" ".join(not_inst_packs)} are not installed.')
raise Exception(f"{' '.join(not_inst_packs)} are not installed.")
else:
print('All required packages has been installed.')
print("All required packages has been installed.")

View File

@@ -1,5 +1,5 @@
import re
from typing import Tuple
from colorsys import hls_to_rgb
from webcolors import html4_hex_to_names, hex_to_rgb, rgb_to_name, rgb_percent_to_hex, rgb_to_hex, css3_names_to_hex
@@ -48,7 +48,7 @@ def hex2color_name(color):
return closest_name
def str2closest_html_color_name(s: str):
def str2closest_html_color_name(s: str) -> str:
""" Transform str -> closest color name """
if 'rgb' in s:
rgb_str = 'rgba' if ('rgba' in s) else 'rgb'
@@ -82,7 +82,7 @@ def str2closest_html_color_name(s: str):
return ''
def rgba2rgb(r, g, b, alpha):
def rgba2rgb(r: int, g: int, b: int, alpha: float) -> Tuple[int, int, int]:
""" Transform rgba -> rgb """
r_background, g_background, b_background = 255, 255, 255
r_new = int((1 - alpha) * r_background + alpha * r)
@@ -91,7 +91,7 @@ def rgba2rgb(r, g, b, alpha):
return r_new, g_new, b_new
def str2hex(s: str):
def str2hex(s: str) -> str:
""" Transform str -> hex """
if '#' in s and (len(s) <= 7):
return s.lower()
@@ -131,7 +131,6 @@ def str2hex(s: str):
if __name__ == '__main__':
colors = [
(75, 0, 130), (255, 0, 255),
(139, 69, 19), (46, 139, 87),

View File

@@ -1,51 +1,60 @@
import os
import logging
from typing import Union
class ColoredFormatter(logging.Formatter):
""" Class to prettify logger and command line output """
MAPPING = {
'DEBUG': 37, # white
'INFO': 36, # cyan
'WARNING': 33, # yellow
'ERROR': 31, # red
'CRITICAL': 41, # white on red bg
"DEBUG": 37, # white
"INFO": 36, # cyan
"WARNING": 33, # yellow
"ERROR": 31, # red
"CRITICAL": 41, # white on red bg
}
PREFIX = '\033['
SUFFIX = '\033[0m'
PREFIX = "\033["
SUFFIX = "\033[0m"
def __init__(self, pattern):
logging.Formatter.__init__(self, pattern)
def format(self, record):
seq = self.MAPPING.get(record.levelname, 37) # default white
record.levelname = '{0}{1}m{2}{3}' \
record.levelname = "{0}{1}m{2}{3}" \
.format(self.PREFIX, seq, record.levelname, self.SUFFIX)
return logging.Formatter.format(self, record)
class BookLogger:
def __init__(self, name, book_id, main_logger=None,
filemode='w+', logging_level=logging.INFO,
logging_format='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]'):
def __init__(self, name: str, book_id: Union[int, str], main_logger: logging.Logger = None,
filemode: str = "w+", logging_level: int = logging.INFO,
logging_format: str = "%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]"):
"""
Method for Logger configuration. Logger will write to file.
:param name: name of the Logger.
:param attr_name: name of attribute that will be added to self.
:param filename: name of the log file.
:param filemode: mode of opening log file.
:param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical.
:param logging_format: format of record in log file.
Parameters
----------
name: str
name of the Logger
book_id: Union[int, str]
id of the book
main_logger: Logger
main logger of the converter
filemode: str
mode of opening log file.
logging_level: int
logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical
logging_format: str
format of record in log file
"""
self.main_logger = main_logger
self.logger = logging.getLogger(name)
self.logger.propagate = False
folder_path = os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))
folder_path = os.path.dirname(folder_path)
filename = f'logs/{book_id}.log'
filename = f"logs/{book_id}.log"
file_path = os.path.join(folder_path, filename)
file_handler = logging.FileHandler(file_path, mode=filemode)
file_format = logging.Formatter(logging_format)
@@ -58,42 +67,46 @@ class BookLogger:
self.logger.addHandler(stream_handler)
self.logger.setLevel(logging_level)
def log(self, message, logging_level=20):
def log(self, message: str, logging_level: int = 20):
"""
Method for logging.
Parameters
----------
message: str
body of the message
logging_level: int
level of logging
:param message: body of the message
:param logging_level: level of logging
"""
self.logger.log(msg=message, level=logging_level, stacklevel=2)
def log_error_to_main_log(self, message=''):
def log_error_to_main_log(self, message: str = ""):
""" Method for logging error to main log file. """
if self.main_logger:
if not message:
message = f'Error in book conversion. Check log file.'
message = f"Error in book conversion. Check log file."
self.main_logger.error(message)
class BookStatusWrapper:
"""Class sets/updates statuses of Converter on Platform"""
def __init__(self, access, logger_object, book_id=0):
def __init__(self, access, logger_object: BookLogger, book_id: int = 0):
self.access = access
self.logger_object = logger_object
self.book_id = book_id
def set_status(self, status: str):
str_2_status = {
'[PROCESS]': self.access.PROCESS,
'[GENERATE]': self.access.GENERATE,
'[ERROR]': self.access.ERROR
"[PROCESS]": self.access.PROCESS,
"[GENERATE]": self.access.GENERATE,
"[ERROR]": self.access.ERROR
}
try:
if self.access:
self.access.update_status(self.book_id, str_2_status[status])
self.logger_object.log(f'Status has been updated to {status}.')
self.logger_object.log(f"Status has been updated to {status}.")
except Exception as exc:
self.logger_object.log(
f"Can't update status of the book {status}.", logging.ERROR)
@@ -101,10 +114,10 @@ class BookStatusWrapper:
raise exc
def set_processing(self):
self.set_status('[PROCESS]')
self.set_status("[PROCESS]")
def set_generating(self):
self.set_status('[GENERATE]')
self.set_status("[GENERATE]")
def set_error(self):
self.set_status('[ERROR]')
self.set_status("[ERROR]")

View File

@@ -1,4 +1,4 @@
from webcolors import html4_hex_to_names, hex_to_rgb
from webcolors import hex_to_rgb
# 16 основных цветов, hex соответвуют hex цветам livecarta
# названия другие
@@ -42,18 +42,23 @@ for key, name in html4_hex_to_names.items():
HTML_COLORS_HSV[name] = (h, s, v)
def rgb2closest_html_color_name(color):
def rgb2closest_html_color_name(color: str):
"""
- get color in hsv (hue, saturation, value)
- try to match with black, grey, silver (black, darkGray, lightGray) as this colors matches badly even in hsv model
- calc hue difference between color and all base colors
- if for new base color hue diff same as for any other, try to measure saturation and value
(it happens for similar colors like red - pink, blue - dark blue)
Parameters
----------
color: str
color in hex
Returns
-------
base color name that matches best to a given color
:param color: str, color in hex
:return: base color name that matches best to a given color
"""
if color == (255, 255, 255):