diff --git a/consumer.py b/consumer.py index dfa0b16..7a78525 100644 --- a/consumer.py +++ b/consumer.py @@ -3,6 +3,7 @@ import sys import json import pika import logging +from typing import Dict from pathlib import Path from threading import Event from functools import partial @@ -13,8 +14,8 @@ from src.docx_converter.docx_solver import DocxBook from src.epub_converter.epub_solver import EpubBook -def configure_file_logger(name, filename="logs/converter.log", filemode="w+", - logging_level=logging.INFO): +def configure_file_logger(name: str, filename: str = "logs/converter.log", + filemode: str = "w+", logging_level: int = logging.INFO) -> logging.Logger: logger = logging.getLogger(name) folder_path = os.path.dirname(os.path.abspath(__file__)) @@ -30,7 +31,7 @@ def configure_file_logger(name, filename="logs/converter.log", filemode="w+", return logger -def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict): +def local_convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: dict): logger.info(f"Start processing book-{book_id}.") try: json_file_path = "books/json/9781614382264.json" @@ -41,7 +42,7 @@ def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: logger.info(f"Book-{book_id} has been proceeded.") -def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict): +def convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: Dict[str, Access]): logger.info(f"Start processing book-{book_id}.") try: book = book_type(book_id=book_id, main_logger=logger, **params) @@ -51,7 +52,7 @@ def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict) logger.info(f"Book-{book_id} has been proceeded.") -def callback(ch, method, properties, body, logger, libre_locker): +def callback(ch, method, properties, body: bytes, logger: logging.Logger, libre_locker: Event): print(f"Message: {body}.") logger.info(f"Message: {body}.") try: @@ -82,7 +83,6 @@ def callback(ch, method, properties, body, logger, libre_locker): logger.error(f"{sys.exc_info()[0]}: {exc.message}") else: logger.error(f"{sys.exc_info()[0]}: {str(exc)}") - finally: pass diff --git a/presets/.gitignore b/presets/.gitignore index d6b7ef3..c3bf4b1 100644 --- a/presets/.gitignore +++ b/presets/.gitignore @@ -1,2 +1,3 @@ * !.gitignore +!*.json \ No newline at end of file diff --git a/presets/docx_presets.json b/presets/docx_presets.json new file mode 100644 index 0000000..fed2d62 --- /dev/null +++ b/presets/docx_presets.json @@ -0,0 +1,152 @@ +[ + { + "preset_name": "wrapper", + "rules": [ + { + "tags": ["^div$"], + "condition": { + "parent_tags": null, + "child_tags": null, + "attrs": [ + { + "name": "id", + "value": "^Table of Contents\\d+" + } + ] + }, + "tag_to_wrap": "TOC" + } + ] + }, + { + "preset_name": "decomposer", + "rules": [ + { + "tags": ["^div$"], + "condition": { + "parent_tags": null, + "child_tags": null, + "attrs": [ + { + "name": "title", + "value": "footer" + }, + { + "name": "id", + "value": "^Table of Contents\\d+" + } + ] + } + } + ] + }, + { + "preset_name": "replacer", + "rules": [ + { + "tags": ["^h[6-9]$"], + "condition": null, + "tag_to_replace": "p" + }, + { + "tags": ["^div$"], + "condition": { + "parent_tags": null, + "child_tags": null, + "attrs": [ + { + "name": "style", + "value": "column-count: 2" + } + ] + }, + "tag_to_replace": "p" + } + ] + }, + { + "preset_name": "attr_replacer", + "rules": [ + { + "attr": { + "name": "style", + "value": "column-count: 2" + }, + "condition": { + "tags": ["^p$"] + }, + "attr_to_replace": { + "name": "class", + "value": "columns2" + } + } + ] + }, + { + "preset_name": "unwrapper", + "rules": [ + { + "tags": ["^span$"], + "condition": { + "parent_tags": ":is(h1, h2, h3, h4, h5, h6, h7, h8, h9)", + "child_tags": null, + "attrs": [ + { + "name": "style", + "value": "(^background: #[\\da-fA-F]{6}$)|(^letter-spacing: -?[\\d.]+pt$)" + }, + { + "name": "lang", + "value": "^ru-RU$" + }, + { + "name": "face", + "value": "^Times New Roman[\\w, ]+$" + } + ] + } + }, + { + "tags": ["^p$"], + "condition": { + "parent_tags": ":is(li)", + "child_tags": null, + "attrs": null + } + }, + { + "tags": ["^a$"], + "condition": { + "parent_tags": null, + "child_tags": null, + "attrs": [ + { + "name": "name", + "value": "_GoBack" + } + ] + } + }, + { + "tags": ["^u$"], + "condition": { + "parent_tags": ":is(a)", + "child_tags": ":is(a)", + "attrs": null + } + }, + { + "tags": ["^b$"], + "condition": { + "parent_tags": ":is(h1, h2, h3, h4, h5, h6, h7, h8, h9)", + "child_tags": null, + "attrs": null + } + }, + { + "tags": ["^div$"], + "condition": null + } + ] + } +] diff --git a/presets/presets.json b/presets/epub_presets.json similarity index 51% rename from presets/presets.json rename to presets/epub_presets.json index 7272038..1ff62a8 100644 --- a/presets/presets.json +++ b/presets/epub_presets.json @@ -3,30 +3,38 @@ "preset_name": "table_wrapper", "rules": [ { - "tags": ["div"], - "attrs": [ - { - "name": "width", - "value": ".*" - }, - { - "name": "border", - "value": ".*" - }, - { - "name": "bgcolor", - "value": ".*" - } - ] + "tags": ["^div$"], + "condition": { + "parent_tags": null, + "child_tags": null, + "attrs": [ + { + "name": "width", + "value": ".*" + }, + { + "name": "border", + "value": ".*" + }, + { + "name": "bgcolor", + "value": ".*" + } + ] + } }, { - "tags": ["section", "blockquote"], - "attrs": [ + "tags": ["^section$", "^blockquote$"], + "condition": { + "parent_tags": null, + "child_tags": null, + "attrs": [ { "name": "class", "value": "feature[1234]" } ] + } } ] }, @@ -73,37 +81,53 @@ "preset_name": "attr_replacer", "rules": [ { - "attr": "xlink:href", - "condition": { - "tags": ["img"] + "attr": { + "name": "xlink:href", + "value": ".*" }, - "attr_to_replace": "src" + "condition": { + "tags": ["^img$"] + }, + "attr_to_replace": { + "name": "src", + "value": null + } } ] }, { "preset_name": "unwrapper", - "rules": { - "tags": [ - "section", - "article", - "figcaption", - "main", - "body", - "html", - "svg", - "li > p" - ] - } + "rules": [ + { + "tags": [ + "^section$", + "^article$", + "^figcaption$", + "^main$", + "^body$", + "^html$", + "^svg$" + ], + "condition": null + }, + { + "tags": ["^p$"], + "condition": { + "parent_tags": "li", + "child_tags": null, + "attrs": null + } + } + ] }, { "preset_name": "inserter", "rules": [ { - "tags": ["pre"], + "tags": ["^pre$"], "condition": { "parent_tags": null, - "child_tags": ":not(code, kbd, var)", + "child_tags": ":not(:has(code, kbd, var))", "attrs": null }, "tag_to_insert": "code" diff --git a/src/access.py b/src/access.py index 6d22202..379f47c 100644 --- a/src/access.py +++ b/src/access.py @@ -1,17 +1,23 @@ -import json import os +import json import time import requests -from threading import Event from io import BytesIO +from threading import Event +from typing import List, Tuple, Dict, Union class Access: """Class accessing our platform""" - def __init__(self, url=None): + def __init__(self, url: str = None): """ - :param url: str, url received from queue message, if field apiURL exists - else None + Parameters + ---------- + url: str + url received from queue message, + if field apiURL exists + else None + """ self.PENDING = 1 self.PROCESS = 2 @@ -19,6 +25,7 @@ class Access: self.FINISH = 4 self.ERROR = 5 + self.url = None self.username = None self.password = None @@ -32,12 +39,12 @@ class Access: self.get_token() self.refreshing.set() - def set_credentials(self, url): - folder_path = os.path.dirname( + def set_credentials(self, url: str): + folder_path: str = os.path.dirname( os.path.dirname(os.path.abspath(__file__))) - config_path = os.path.join(folder_path, "config/api_config.json") + config_path: str = os.path.join(folder_path, "config/api_config.json") with open(config_path, "r") as f: - params = json.load(f) + params: Dict[str, str] = json.load(f) self.refreshing.clear() self.url = url @@ -64,7 +71,7 @@ class Access: } response = requests.post( f'{self.url}/token', json=json_form, - # auth=('kiryl.miatselitsa', 'iK4yXCvdyHFEEOvG2v3F') + # auth=('kiryl.miatselitsa', 'iK4yXCvdyHFEEOvG2v3F') ) if response.status_code == 400: @@ -104,7 +111,7 @@ class Access: else: raise Exception(f'{response.status_code}') - def get_file(self, file_path): + def get_file(self, file_path: str) -> bytes: """Function downloads the file[book, preset] from site""" if self.is_time_for_refreshing(): self.refresh_token() @@ -124,10 +131,10 @@ class Access: f'status code:{response.status_code}') return content - def sleep(timeout: float, retry=3): + def sleep(timeout: float, retry: int = 3): def decorator(function): """Decorator sleeping timeout sec and makes 3 retries""" - def wrapper(*args, **kwargs): + def wrapper(*args, **kwargs) -> str: retries = 0 while retries < retry: try: @@ -141,14 +148,14 @@ class Access: return decorator @sleep(3) - def send_image(self, img_path, doc_id, img_content: bytes = None): + def send_image(self, img_path: str, doc_id: str, img_content: bytes = None) -> str: """Function sends images to site""" if self.is_time_for_refreshing(): self.refresh_token() self.refreshing.wait() - img_obj = BytesIO(img_content) if img_content else open(img_path, 'rb') - files = { + img_obj: BytesIO = BytesIO(img_content) if img_content else open(img_path, 'rb') + files: Dict[str, Tuple[str, BytesIO]] = { 'image': (os.path.basename(img_path), img_obj) } response = requests.post( @@ -165,7 +172,7 @@ class Access: f'{response.status_code} Bad request: {response.json()["message"]}.') return img_url - def send_book(self, doc_id, content): + def send_book(self, doc_id: int, content: Dict[str, List[Dict[str, Union[List, str]]]]): """Function sends the book to site""" if self.is_time_for_refreshing(): self.refresh_token() @@ -184,7 +191,7 @@ class Access: raise Exception( f'{response.status_code} Bad request: {response.json()["message"]}.') - def update_status(self, doc_id, status): + def update_status(self, doc_id: Union[int, str], status: int): """Function updates status of the book on site""" if self.is_time_for_refreshing(): self.refresh_token() diff --git a/src/book_solver.py b/src/book_solver.py index a7625d5..4c42f3f 100644 --- a/src/book_solver.py +++ b/src/book_solver.py @@ -3,6 +3,7 @@ import json import codecs import logging import pathlib +from typing import List, Dict, Union from abc import abstractmethod, ABCMeta from src.livecarta_config import LiveCartaConfig @@ -20,7 +21,7 @@ class BookSolver: __metaclass__ = ABCMeta - def __init__(self, book_id=0, access=None, main_logger=None): + def __init__(self, book_id: int = 0, access=None, main_logger=None): self.book_type = None self.book_id = book_id self.access = access @@ -36,22 +37,30 @@ class BookSolver: assert LiveCartaConfig.SUPPORTED_LEVELS == len(LiveCartaConfig.SUPPORTED_HEADERS), \ "Length of headers doesn't match allowed levels." - def save_file(self, content: bytes, path_to_save, file_type): + def save_file(self, content: bytes, path_to_save: str, file_type: str) -> str: """ Function saves binary content of file to folder(path_to_save) Parameters ---------- + content: bytes str binary content of the file + path_to_save: str + path to the folder + file_type: str + Returns + ---------- + file_path: str + path to file on local """ - folder_path = os.path.dirname( + folder_path: str = os.path.dirname( os.path.dirname(os.path.abspath(__file__))) folder_path = os.path.join( folder_path, path_to_save) pathlib.Path(folder_path).mkdir(parents=True, exist_ok=True) - file_path = os.path.join( + file_path: str = os.path.join( folder_path, f"{self.book_id}.{file_type}") try: with open(file_path, "wb+") as file: @@ -68,13 +77,15 @@ class BookSolver: def get_preset_file(self): """Method for getting and saving preset from server""" try: - self.logger_object.log(f"Start receiving preset file from server. URL:" - f" {self.access.url}/doc-convert/{self.book_id}/presets") - content = self.access.get_file( - file_path=f"{self.access.url}/doc-convert/{self.book_id}/presets") - self.logger_object.log("Preset file was received from server.") - self.preset_path = pathlib.Path( - str(self.save_file(content, path_to_save="presets", file_type="json"))) + pass + self.preset_path = "presets/docx_presets.json" + # self.logger_object.log(f"Start receiving preset file from server. URL:" + # f" {self.access.url}/doc-convert/{self.book_id}/presets") + # content = self.access.get_file( + # file_path=f"{self.access.url}/doc-convert/{self.book_id}/presets") + # self.logger_object.log("Preset file was received from server.") + # self.preset_path = pathlib.Path( + # str(self.save_file(content, path_to_save="presets", file_type="json"))) except FileNotFoundError as f_err: self.logger_object.log( "Can't get preset file from server.", logging.ERROR) @@ -116,7 +127,7 @@ class BookSolver: parents=True, exist_ok=True) self.book_output_path.touch(exist_ok=True) - def write_to_json(self, content: dict): + def write_to_json(self, content: Dict[str, List[Dict[str, Union[List, str]]]]): self.check_output_directory() try: with codecs.open(self.book_output_path, "w", encoding="utf-8") as f: @@ -127,7 +138,7 @@ class BookSolver: self.logger_object.log( "Error has occurred while writing .json file." + str(exc), logging.ERROR) - def send_json_content_to_server(self, content: dict): + def send_json_content_to_server(self, content: Dict[str, List[Dict[str, Union[List, str]]]]): """Function sends json_content to site""" try: self.access.send_book(self.book_id, content) @@ -140,7 +151,7 @@ class BookSolver: raise exc @abstractmethod - def get_converted_book(self): + def get_converted_book(self) -> Dict[str, List[Dict[str, Union[List, str]]]]: self.logger_object.log("Beginning of processing .json output.") self.status_wrapper.set_generating() return {} @@ -158,8 +169,9 @@ class BookSolver: self.logger_object.log( f"Beginning of conversion from .{self.book_type} to .json.") self.status_wrapper.set_processing() - content_dict = self.get_converted_book() - [os.remove(path) for path in [self.preset_path, self.book_path]] + content_dict: Dict[str, List[Dict[Union[str, List]]]] = self.get_converted_book() + # todo add delete of preset path + [os.remove(path) for path in [self.book_path]] self.logger_object.log("Beginning of processing .json output.") self.status_wrapper.set_generating() self.write_to_json(content_dict) diff --git a/src/data_objects.py b/src/data_objects.py index 110db8d..f1ca163 100644 --- a/src/data_objects.py +++ b/src/data_objects.py @@ -1,5 +1,5 @@ import re -from typing import Union +from typing import List, Dict, Union from ebooklib.epub import Section, Link from src.livecarta_config import LiveCartaConfig @@ -11,7 +11,7 @@ class NavPoint: These are data structures which form mapping from NCX to python data structures. """ - def __init__(self, obj: Union[Link, Section] = None, ): + def __init__(self, obj: Union[Link, Section] = None): self.href, self.id = self.parse_href_id(obj) self.title = obj.title @@ -52,15 +52,15 @@ def flatten(x): class ChapterItem: """ Class of Chapter that could have subchapters - These are data structures which form mapping to livecarta json structure. + These are data structures which form mapping to LiveCarta json structure. """ - def __init__(self, title, content, sub_items): + def __init__(self, title: str, content: str, sub_items: List): self.title = title self.content = content self.sub_items = sub_items - def to_dict(self, lvl=1): + def to_dict(self, lvl: int = 1) -> Dict[str, Union[str, List]]: """Function returns dictionary of chapter""" sub_dicts = [] if self.sub_items: diff --git a/src/docx_converter/docx2libre_html.py b/src/docx_converter/docx2libre_html.py index 56fe2f7..e28d98f 100644 --- a/src/docx_converter/docx2libre_html.py +++ b/src/docx_converter/docx2libre_html.py @@ -3,38 +3,40 @@ import logging import pathlib import subprocess from subprocess import PIPE +from typing import Union from threading import Event from bs4 import BeautifulSoup from src.util.helpers import BookLogger -class Docx2LibreHTML: - def __init__(self, book_id=0, file_path=None, access=None, logger=None, libre_locker=None): +class Docx2LibreHtml: + def __init__(self, book_id: int = 0, file_path: Union[pathlib.PosixPath, str] = None, + access=None, logger: BookLogger = None, libre_locker: Event = None): self.book_id = book_id if book_id != 0 else pathlib.Path( file_path).stem self.file_path = file_path self.access = access self.logger_object: BookLogger = logger # critical section for occupying libreoffice by one thread - self.libre_locker: Event() = libre_locker + self.libre_locker = libre_locker # path to html file, file appears after libre-conversion self.html_path = self.convert_docx_to_html() self.html_soup = self.read_html(self.html_path) - def _libre_run(self, out_dir_path): + def _libre_run(self, out_dir_path: str): command = ["libreoffice", "--headless", "--convert-to", "html", f"{str(self.file_path)}", "--outdir", f"{out_dir_path}"] - print(command) + # print(command) result = subprocess.run(command, stdout=PIPE, stderr=PIPE) self.logger_object.log(f"Result of libre conversion for book_{self.book_id}:" f" {result.returncode}, {result.stdout}", logging.DEBUG) self.logger_object.log(f"Any error while libre conversion for book_" f"{self.book_id}: {result.stderr}", logging.DEBUG) - def convert_docx_to_html(self): + def convert_docx_to_html(self) -> pathlib.Path: """ Function converts .docx document to .html file. Steps @@ -44,18 +46,18 @@ class Docx2LibreHTML: Returns ---------- - html_path: str + html_path: pathlib.Path path to html file, file appears after libre-conversion """ - def get_and_clear_flag(out_dir_path: str): + def get_and_clear_flag(html_file_path: str): self.libre_locker.clear() self.logger_object.log(f"Got flag!", logging.DEBUG) - self._libre_run(out_dir_path) + self._libre_run(html_file_path) self.libre_locker.set() self.logger_object.log("Cleared flag...", logging.DEBUG) - def check_file_exists(path, error_string: str): + def check_file_exists(path: pathlib.Path, error_string: str): try: f = open(path) f.close() @@ -73,19 +75,20 @@ class Docx2LibreHTML: folder_path = os.path.dirname( os.path.dirname(os.path.abspath(__file__))) - out_dir_path = os.path.join(folder_path, f"../books/html/{self.book_id}") + out_dir_path = os.path.join( + folder_path, f"../books/html/{self.book_id}") pathlib.Path(out_dir_path).mkdir(parents=True, exist_ok=True) try: - if self.libre_locker.isSet(): + if self.libre_locker.is_set(): get_and_clear_flag(out_dir_path) else: - while not self.libre_locker.isSet(): + while not self.libre_locker.is_set(): self.logger_object.log( "Waiting for libre...", logging.DEBUG) flag = self.libre_locker.wait(50) if flag: - if self.libre_locker.isSet(): + if self.libre_locker.is_set(): get_and_clear_flag(out_dir_path) break except Exception as exc: @@ -105,11 +108,11 @@ class Docx2LibreHTML: f"Input file path after conversion: {html_path}.") return html_path - def read_html(self, html_path): + def read_html(self, html_path: pathlib.Path) -> BeautifulSoup: """Method for reading .html file into beautiful soup tag.""" try: html_text = open(html_path, "r", encoding="utf8").read() - self.logger_object.log("HTML for book has been loaded.") + self.logger_object.log("Html for book has been loaded.") except FileNotFoundError as exc: self.logger_object.log("There is no html to process." "Conversion went wrong or you specified wrong paths.", logging.ERROR) diff --git a/src/docx_converter/docx_solver.py b/src/docx_converter/docx_solver.py index 5edeb46..3cd324d 100644 --- a/src/docx_converter/docx_solver.py +++ b/src/docx_converter/docx_solver.py @@ -1,22 +1,25 @@ import json import codecs +import logging from threading import Event from src.book_solver import BookSolver from src.util.helpers import BookLogger -from src.docx_converter.docx2libre_html import Docx2LibreHTML -from src.docx_converter.html_docx_preprocessor import HTMLDocxPreprocessor -from src.docx_converter.libre_html2json_converter import LibreHTML2JSONConverter +from src.html_presets_processor import HtmlPresetsProcessor +from src.style_reader import StyleReader +from src.docx_converter.docx2libre_html import Docx2LibreHtml +from src.docx_converter.html_docx_processor import HtmlDocxProcessor +from src.docx_converter.libre_html2json_converter import LibreHtml2JsonConverter class DocxBook(BookSolver): """Class of .docx type book - child of BookSolver""" - def __init__(self, book_id=0, access=None, main_logger=None, libre_locker=None): + def __init__(self, book_id: int = 0, access=None, main_logger=None, libre_locker: Event = None): super().__init__(book_id, access, main_logger) self.book_type = "docx" # critical section for occupying libreoffice by one thread - self.libre_locker: Event() = libre_locker + self.libre_locker = libre_locker def get_converted_book(self): """ @@ -34,39 +37,67 @@ class DocxBook(BookSolver): """ # 1. Converts docx to html with LibreOffice - html_converter = Docx2LibreHTML(self.book_id, self.book_path, self.access, - self.logger_object, self.libre_locker) - # todo presets + try: + html_converter = Docx2LibreHtml(self.book_id, self.book_path, self.access, + self.logger_object, self.libre_locker) + except Exception as exc: + self.logger_object.log( + "Error has occurred while converting .docx to .html.", logging.ERROR) + self.logger_object.log_error_to_main_log() + self.status_wrapper.set_error() + raise exc # 2. Parses and cleans html, gets list of tags, gets footnotes - parser = HTMLDocxPreprocessor( - html_converter.html_soup, self.logger_object) - bs_tags, footnotes, top_level_headers = parser.process_html( - self.access, html_converter.html_path, self.book_id) + try: + html_preprocessor = HtmlPresetsProcessor( + logger=self.logger_object, preset_path="presets/docx_presets.json") + style_preprocessor = StyleReader() + html_processor = HtmlDocxProcessor(html_soup=html_converter.html_soup, + logger=self.logger_object, + html_preprocessor=html_preprocessor, + style_preprocessor=style_preprocessor) + bs_tags, footnotes, top_level_headers = html_processor.process_html( + self.access, html_converter.html_path, self.book_id) + except Exception as exc: + self.logger_object.log( + "Error has occurred while processing .html", logging.ERROR) + self.logger_object.log_error_to_main_log() + self.status_wrapper.set_error() + raise exc # 3. Parses from line structure to nested structure with JSONConverter - json_converter = LibreHTML2JSONConverter(bs_tags, footnotes, top_level_headers, - self.logger_object) - content_dict = json_converter.convert_to_dict() - + try: + json_converter = LibreHtml2JsonConverter(bs_tags, footnotes, top_level_headers, + self.logger_object) + content_dict = json_converter.convert_to_dict() + except Exception as exc: + self.logger_object.log( + "Error has occurred while converting .html to .json", logging.ERROR) + self.logger_object.log_error_to_main_log() + self.status_wrapper.set_error() + raise exc return content_dict if __name__ == "__main__": - docx_file_path = "../../books/docx/music_inquiry.docx" + docx_file_path = "../../books/docx/AmericanGovernment3e-WEB.docx" logger_object = BookLogger( name="docx", book_id=docx_file_path.split("/")[-1]) locker = Event() locker.set() - html_converter = Docx2LibreHTML(file_path=docx_file_path, - logger=logger_object, libre_locker=locker) + html_converter = Docx2LibreHtml(file_path=docx_file_path, + logger=logger_object, libre_locker=locker) - parser = HTMLDocxPreprocessor(html_converter.html_soup, logger_object) - content, footnotes, top_level_headers = parser.process_html( + html_preprocessor = HtmlPresetsProcessor( + logger=logger_object, preset_path="../../presets/docx_presets.json") + style_preprocessor = StyleReader() + html_processor = HtmlDocxProcessor(html_soup=html_converter.html_soup, logger=logger_object, + html_preprocessor=html_preprocessor, style_preprocessor=style_preprocessor) + content, footnotes, top_level_headers = html_processor.process_html( html_path=html_converter.html_path, book_id=html_converter.book_id) - json_converter = LibreHTML2JSONConverter( + json_converter = LibreHtml2JsonConverter( content, footnotes, top_level_headers, logger_object) content_dict = json_converter.convert_to_dict() diff --git a/src/docx_converter/footnotes_processing.py b/src/docx_converter/footnotes_processing.py index bda6733..d5e3265 100644 --- a/src/docx_converter/footnotes_processing.py +++ b/src/docx_converter/footnotes_processing.py @@ -1,13 +1,14 @@ import re -from bs4 import BeautifulSoup, NavigableString +from typing import List +from bs4 import BeautifulSoup, Tag, NavigableString -def _clean_footnote_content(content): +def clean_footnote_content(content: str) -> str: content = content.strip() return content.strip() -def process_footnotes(body_tag): +def process_footnotes(body_tag: Tag) -> List[str]: """Function returns list of footnotes and delete them from html_soup.""" footnote_anchors = body_tag.find_all("a", class_="sdfootnoteanc") footnote_content = body_tag.find_all( @@ -32,7 +33,7 @@ def process_footnotes(body_tag): new_tag = BeautifulSoup(features="lxml").new_tag("sup") new_tag["class"] = "footnote-element" - new_tag["data-id"] = i + 1 + new_tag["data-id"] = f"{i + 1}" new_tag["id"] = f"footnote-{i + 1}" new_tag.string = "*" anc_tag.replace_with(new_tag) @@ -65,9 +66,8 @@ def process_footnotes(body_tag): else: unicode_string += child.decode_contents() - content = _clean_footnote_content(unicode_string) + content = clean_footnote_content(unicode_string) cont_tag.decompose() - footnotes.append(content) return footnotes diff --git a/src/docx_converter/html_docx_preprocessor.py b/src/docx_converter/html_docx_preprocessor.py deleted file mode 100644 index a44df01..0000000 --- a/src/docx_converter/html_docx_preprocessor.py +++ /dev/null @@ -1,588 +0,0 @@ -import re -import logging -from typing import List - -from bs4 import BeautifulSoup, NavigableString, Tag - -from src.livecarta_config import LiveCartaConfig -from src.util.helpers import BookLogger, BookStatusWrapper -from src.docx_converter.footnotes_processing import process_footnotes -from src.docx_converter.image_processing import process_images - - -class HTMLDocxPreprocessor: - - def __init__(self, html_soup, logger_object, status_wrapper=None): - self.body_tag = html_soup.body - self.html_soup = html_soup - self.logger_object: BookLogger = logger_object - self.status_wrapper: BookStatusWrapper = status_wrapper - self.top_level_headers = None - self.content = list() - - def _process_toc_links(self): - def _check_parent_link_exist_in_toc(tag_with_link): - toc_links = [] - for a_tag in tag_with_link.find_all("a", {"name": re.compile(r"^_Toc\d+")}): - link_name = a_tag.attrs["name"] - toc_item = self.body_tag.find("a", {"href": "#" + link_name}) - if toc_item: - toc_links.append(toc_item) - return len(toc_links) > 0 - """Function to extract nodes which contains TOC links, remove links from file and detect headers.""" - toc_links = self.body_tag.find_all( - "a", {"name": re.compile(r"^_Toc\d+")}) - headers = [link.parent for link in toc_links] - outline_level = "1" # All the unknown outlines will be predicted as

- for h_tag in headers: - if re.search(r"^h\d$", h_tag.name): - h_tag.a.unwrap() - # outline_level = tag.name[-1] # TODO: add prediction of the outline level - elif h_tag.name == "p": - exist_in_toc = _check_parent_link_exist_in_toc(h_tag) - if h_tag in self.body_tag.find_all("p") and exist_in_toc: - new_tag = BeautifulSoup( - features="lxml").new_tag("h" + outline_level) - text = h_tag.text - h_tag.replaceWith(new_tag) - new_tag.string = text - else: - # rethink document structure when you have toc_links, other cases? - self.logger_object.log(f"Something went wrong in processing toc_links." - f" Check the structure of the file. " - f"Tag name: {h_tag.name}") - - def _clean_tag(self, tag: str, attr_name: str, attr_value: re): - # todo regex - """ - Function to clean tags by its name and attribute value. - Parameters - ---------- - tag: str - tag name to clean - attr_name: str - attribute name - attr_value: [str,re] - attribute value - - Returns - ------- - clean tag - - """ - tags = self.body_tag.find_all(tag, {attr_name: attr_value}) - for tag in tags: - if len(tag.attrs) == 1: - tag.unwrap() - - def _clean_underline_links(self): - # todo regex - """Function cleans meaningless tags before links.""" - underlines = self.body_tag.find_all("u") - for u in underlines: - if u.find_all("a"): - u.unwrap() - - links = self.body_tag.find_all("a") - for link in links: - u = link.find_all("u") - if u and len(u) == 1: - u[0].unwrap() - - @classmethod - def convert_pt_to_px(cls, value): - value = float(value) - if value == LiveCartaConfig.WORD_DEFAULT_FONT_SIZE: - return LiveCartaConfig.LIVECARTA_DEFAULT_FONT_SIZE - else: - return value - - @classmethod - def convert_font_pt_to_px(cls, style: str) -> str: - """ - Function converts point in the font-size to pixels. - Parameters - ---------- - style: str - str with style to proces - - Returns - ------- - : str - str with converted style - - """ - size = re.search(r"font-size: (\d{1,3})pt", style) - if size is None: - return style - size = size.group(1) - new_size = cls.convert_pt_to_px(size) - if new_size == LiveCartaConfig.LIVECARTA_DEFAULT_FONT_SIZE: - return "" - return re.sub(size + "pt", str(new_size) + "px", style) - - def _font_to_span(self): - """ - Function to convert tag to . - If font style is default, then remove this tag. - """ - fonts = self.body_tag.find_all("font") - for font in fonts: - face, style, color =\ - font.get("face"), font.get("style"), font.get("color") - - font.attrs, font.name = {}, "span" - if style: - style = self.convert_font_pt_to_px(style) - if style != "": - if color and color in LiveCartaConfig.COLORS_MAP: - style += f"; color: {color};" - font.attrs["style"] = style - elif color and color in LiveCartaConfig.COLORS_MAP: - font.attrs["style"] = f"color: {color};" - - if len(font.attrs) == 0: - font.unwrap() - - # on this step there should be no more tags - assert len(self.body_tag.find_all("font")) == 0 - - def clean_trash(self): - # todo make it regex dict - """Function to remove all styles and tags we don"t need.""" - self._clean_tag("span", "style", re.compile( - r"^background: #[\da-fA-F]{6}$")) - # todo: check for another languages - self._clean_tag("span", "lang", re.compile(r"^ru-RU$")) - self._clean_tag("span", "style", re.compile( - "^letter-spacing: -?[\d.]+pt$")) - - self._clean_tag("font", "face", re.compile( - r"^Times New Roman[\w, ]+$")) - - self._clean_tag("a", "name", "_GoBack") - self._clean_underline_links() - - self._font_to_span() - - # replace toc with empty tag - tables = self.body_tag.find_all( - "div", id=re.compile(r"^Table of Contents\d+")) - for table in tables: - table.wrap(self.html_soup.new_tag("TOC")) - table.decompose() - - def _preprocessing_headings(self): - # todo regex - """Function to convert all lower level headings to p tags""" - pattern = f"^h[{LiveCartaConfig.SUPPORTED_LEVELS + 1}-9]$" - header_tags = self.body_tag.find_all(re.compile(pattern)) - for tag in header_tags: - tag.name = "p" - - def _process_paragraph(self): - """Function to process

tags (text-align and text-indent value).""" - paragraphs = self.body_tag.find_all("p") - - for p in paragraphs: - # libre converts some \n into

with 2
- # there we remove 1 unnecessary
- brs = p.find_all("br") - text = p.text - - if brs and text == "\n\n" and len(brs) == 2: - brs[0].decompose() - - indent_should_be_added = False - if text and ((text[0:1] == "\t") or (text[:2] == "\n\t")): - indent_should_be_added = True - - align = p.get("align") - style = p.get("style") - - if style: - indent = re.search(r"text-indent: ([\d.]{1,4})in", style) - margin_left = re.search(r"margin-left: ([\d.]{1,4})in", style) - margin_right = re.search( - r"margin-right: ([\d.]{1,4})in", style) - margin_top = re.search(r"margin-top: ([\d.]{1,4})in", style) - margin_bottom = re.search( - r"margin-bottom: ([\d.]{1,4})in", style) - else: - indent = margin_left = margin_right = \ - margin_top = margin_bottom = None - - if margin_left and margin_right and margin_top and margin_bottom and \ - margin_left.group(1) == "0.6" and margin_right.group(1) == "0.6" and \ - margin_top.group(1) == "0.14" and margin_bottom.group(1) == "0.11": - p.wrap(BeautifulSoup(features="lxml").new_tag("blockquote")) - - p.attrs = {} - style = "" - - if align is not None and align != LiveCartaConfig.DEFAULT_ALIGN_STYLE: - style += f"text-align: {align};" - - if indent is not None or indent_should_be_added: - # indent = indent.group(1) - style += f"text-indent: {LiveCartaConfig.INDENT};" - - if style: - p.attrs["style"] = style - - def _process_two_columns(self): - """Function to process paragraphs which has two columns layout.""" - two_columns = self.body_tag.find_all("div", style="column-count: 2") - for div in two_columns: - for child in div.children: - if child.name == "p": - child["class"] = "columns2" - div.unwrap() - - def _process_quotes(self): - """ - Function to process block quotes. - After docx to html conversion block quotes are stored inside table with 1 cell. - All text is wrapped in a tag. - Such tables will be replaced with

tags. - - - - - - -
-

aaaaa

-


-
- - """ - tables = self.body_tag.find_all("table") - for table in tables: - trs = table.find_all("tr") - tds = table.find_all("td") - if len(trs) == 1 and len(tds) == 1 and tds[0].get("width") == "600": - td = tds[0] - is_zero_border = "border: none;" in td.get("style") - paragraphs = td.find_all("p") - has_i_tag_or_br = [(p.i, p.br) for p in paragraphs] - has_i_tag_or_br = [x[0] is not None or x[1] is not None - for x in has_i_tag_or_br] - - if all(has_i_tag_or_br) and is_zero_border: - new_div = BeautifulSoup( - features="lxml").new_tag("blockquote") - for p in paragraphs: - new_div.append(p) - - table.replaceWith(new_div) - - def _process_tables(self): - """Function to process tables. Set "border" attribute.""" - tables = self.body_tag.find_all("table") - for table in tables: - tds = table.find_all("td") - - sizes = [] - for td in tds: - style = td.get("style") - - if style: - match = re.search(r"border: ?(\d+\.?\d*)(p[tx])", style) - - if match: - size = match.group(1) - units = match.group(2) - - if units == "pt": - size = self.convert_pt_to_px(size) - - sizes.append(float(size)) - - width = td.get("width") - - td.attrs = {} - if width: - td.attrs["width"] = width - - if sizes: - border_size = sum(sizes) / len(sizes) - table.attrs["border"] = f"{border_size:.2}" - - self.tables_amount = len(tables) - - def _process_hrefs(self): - a_tags_with_href = self.body_tag.find_all( - "a", {"href": re.compile("^.*http.+")}) - - # remove char=end of file for some editors - for tag in a_tags_with_href: - tag.string = tag.text.replace("\u200c", "") - tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "") - - a_tags_with_href = self.body_tag.find_all( - "a", {"href": re.compile("^(?!#sdfootnote)")}) - for tag in a_tags_with_href: - tag.string = tag.text.replace("\u200c", "") - tag.string = tag.text.replace("\u200b", "") # zero-width-space - tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "") - - def _process_footer(self): - # todo regex - """ - Function to process
tags. - All the tags will be deleted from file. - """ - divs = self.body_tag.find_all("div", {"title": "footer"}) - for div in divs: - div.decompose() - - def _process_div(self): - # todo regex - """Function to process
tags. All the tags will be deleted from file, all content of the tags will stay.""" - divs = self.body_tag.find_all("div") - for div in divs: - div.unwrap() - - def _get_top_level_headers(self): - """ - Function for gathering info about top-level chapters. - - Assume: - - Headers with the smallest outline(or digit in ) are top level chapters. - [ It is consistent with a recursive algorithm - for saving content to a resulted json structure, - which happens in header_to_json()] - - """ - headers_info = [] - header_tags = self.body_tag.find_all(re.compile("^h[1-9]$")) - headers_outline = [int(re.sub(r"^h", "", tag.name)) - for tag in header_tags] - if headers_outline: - top_level_outline = min(headers_outline) - top_level_headers = [tag for tag in header_tags - if int(re.sub(r"^h", "", tag.name)) == top_level_outline] - - for tag in top_level_headers: - if tag.parent.name == "li": - tag.parent.unwrap() - while tag.parent.name == "ol": - tag.parent.unwrap() - - title = tag.text - title = re.sub(r"\s+", " ", title).strip() - number = re.match(r"^(?:\.?\d+\.? ?)+", title) - is_numbered = number is not None - - cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text) - is_introduction = cleaned_title.lower() == "introduction" - - headers_info.append({ - "title": cleaned_title, - "is_numbered": is_numbered, - "is_introduction": is_introduction}) - return headers_info - - def _mark_introduction_headers(self): - """ - Function to find out: - what header shouldn"t be numbered and can be treated as introduction chapter - Assume header(s) to be introduction if: - 1. one header not numbered, before 1 numbered header - 2. it is first header from the top level list, and it equals to "introduction" - - Returns - ------- - None - mark each top-level header with flag should_be_numbered = true/false - - """ - is_numbered_header = [header["is_numbered"] - for header in self.top_level_headers] - is_title = [header["is_introduction"] - for header in self.top_level_headers] - - first_not_numbered = is_numbered_header and is_numbered_header[0] == 0 - second_is_numbered_or_not_exist = all(is_numbered_header[1:2]) - first_header_is_introduction = is_title and is_title[0] - - if (first_not_numbered and second_is_numbered_or_not_exist) or first_header_is_introduction: - self.top_level_headers[0]["should_be_numbered"] = False - for i in range(1, len(self.top_level_headers)): - self.top_level_headers[i]["should_be_numbered"] = True - else: - for i in range(0, len(self.top_level_headers)): - self.top_level_headers[i]["should_be_numbered"] = True - - @staticmethod - def clean_title_from_tabs(tag: NavigableString): - cleaned = re.sub(r"[\s\xa0]", " ", tag) - this = BeautifulSoup.new_string(BeautifulSoup( - features="lxml"), cleaned, NavigableString) - tag.replace_with(this) - - def apply_func_to_last_child(self, tag, func=None): - """ - works only with constructions like (((child to work with))) - where child is object of NavigableString - """ - if type(tag) is NavigableString: - func(tag) - else: - children = list(tag.children) - if children: - self.apply_func_to_last_child(children[0], func) - - def _process_headings(self): - # todo regex - """ - Function to process tags . - Steps - ---------- - 1. remove , - 2. clean text in header from numbering and \n - - Returns - ------- - None - processed tags - - """ - header_tags = self.body_tag.find_all(re.compile("^h[1-9]$")) - - # 1. remove , - for tag in header_tags: - b_tags = tag.find_all("b") - [tag.unwrap() for tag in b_tags] - - spans = tag.find_all("span") - if spans: - for span in spans: - style = span.attrs.get("style") - span.unwrap() - tag.attrs = {} - - header_tags = self.body_tag.find_all(re.compile("^h[1-9]$")) - - # 2. clean text in header from numbering and \n - for tag in header_tags: - if tag.parent.name == "li": - tag.parent.unwrap() - while tag.parent.name == "ol": - tag.parent.unwrap() - - cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text) - if cleaned_title == "": - tag.unwrap() - else: - assert tag.name in LiveCartaConfig.SUPPORTED_HEADERS, \ - f"Preprocessing went wrong, there is still h{LiveCartaConfig.SUPPORTED_LEVELS + 1}-h9 headings." - - content = list(tag.children) - - # do not take into account rubbish empty tags like , but don"t remove them - content = [item for item in content if - (type(item) is not NavigableString and item.text != "") - or (type(item) is NavigableString)] - - content[0] = "" if content[0] == " " else content[0] - content = [item for item in content if item != ""] - - for i, item in enumerate(content): - if type(content[i]) is NavigableString: - cleaned = re.sub(r"(\s+)+", " ", content[i]) - this = BeautifulSoup.new_string(BeautifulSoup( - features="lxml"), cleaned, NavigableString) - content[i].replace_with(this) - content[i] = this - else: - self.apply_func_to_last_child( - content[i], self.clean_title_from_tabs) - - def _process_lists(self): - # todo regex - """ - Function - - process tags
  • . - - unwrap

    tags. - - Returns - ------- - None - uwrap

    tag with li - - """ - li_tags = self.body_tag.find_all("li") - for li_tag in li_tags: - li_tag.attrs.update(li_tag.p.attrs) - li_tag.p.unwrap() - - def delete_content_before_toc(self): - # remove all tag upper the only in content !!! body tag is not updated - toc_tag = self.html_soup.new_tag("TOC") - self.content: List[Tag] = self.body_tag.find_all(recursive=False) - if toc_tag in self.content: - ind = self.content.index(toc_tag) + 1 - self.content = self.content[ind:] - - def process_html(self, access=None, html_path="", book_id=0): - """Process html code to satisfy LiveCarta formatting.""" - self.logger_object.log("Beginning of processing .html file.") - try: - self.logger_object.log(f"Processing TOC and headers.") - self._process_toc_links() - - self.clean_trash() - - # process main elements of the .html doc - self.logger_object.log(f"Processing main elements of html.") - self._preprocessing_headings() - self._process_paragraph() - self._process_two_columns() - - self.logger_object.log("Block quotes processing.") - self._process_quotes() - - self.logger_object.log("Tables processing.") - self._process_tables() - self.logger_object.log( - f"{self.tables_amount} tables have been processed.") - - self.logger_object.log("Hrefs processing.") - self._process_hrefs() - - self.logger_object.log("Footnotes processing.") - self.footnotes = process_footnotes(self.body_tag) - self.logger_object.log( - f"{len(self.footnotes)} footnotes have been processed.") - - self.logger_object.log("Image processing.") - self.images = process_images(access=access, html_path=html_path, - book_id=book_id, body_tag=self.body_tag) - self.logger_object.log( - f"{len(self.images)} images have been processed.") - - self._process_footer() - self._process_div() - - self.top_level_headers = self._get_top_level_headers() - self._mark_introduction_headers() - - self._process_headings() - - self._process_lists() - # delete text before table of content if exists - self.delete_content_before_toc() - - except Exception as exc: - self.logger_object.log( - "Error has occurred while processing html.", logging.ERROR) - self.logger_object.log_error_to_main_log() - if self.status_wrapper: - self.status_wrapper.set_error() - raise exc - - self.logger_object.log("End of processing .html file.") - - return self.content, self.footnotes, self.top_level_headers diff --git a/src/docx_converter/html_docx_processor.py b/src/docx_converter/html_docx_processor.py new file mode 100644 index 0000000..8650865 --- /dev/null +++ b/src/docx_converter/html_docx_processor.py @@ -0,0 +1,266 @@ +import re +import pathlib +from typing import List, Tuple, Dict, Union +from bs4 import BeautifulSoup, Tag, NavigableString + +from src.util.helpers import BookLogger +from src.html_presets_processor import _process_presets +from src.docx_converter.image_processing import process_images +from src.docx_converter.footnotes_processing import process_footnotes +from src.inline_style_processor import modify_html_soup_with_css_styles + + +class HtmlDocxProcessor: + def __init__(self, logger: BookLogger, html_soup: BeautifulSoup, html_preprocessor, style_preprocessor): + self.logger = logger + self.html_soup = html_soup + self.body_tag = self.html_soup.body + self.html_preprocessor = html_preprocessor + self.style_preprocessor = style_preprocessor + self.content: List[Tag] = [] + + def _font_to_span(self): + for font in self.body_tag.find_all("font"): + font.name = "span" + + + def _process_hrefs(self): + a_tags_with_href = self.body_tag.find_all( + "a", {"href": re.compile("^.*http.+")}) + + # remove char=end of file for some editors + for tag in a_tags_with_href: + tag.string = tag.text.replace("\u200c", "") + tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "") + + a_tags_with_href = self.body_tag.find_all( + "a", {"href": re.compile("^(?!#sdfootnote)")}) + for tag in a_tags_with_href: + tag.string = tag.text.replace("\u200c", "") + tag.string = tag.text.replace("\u200b", "") # zero-width-space + tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "") + + def _process_toc_links(self): + """Function to extract nodes which contains TOC links, remove links from file and detect headers.""" + def _check_parent_link_exist_in_toc(tag_with_link: Tag) -> bool: + toc_links = [] + for a_tag in tag_with_link.find_all("a", {"name": re.compile(r"^_Toc\d+")}): + link_name = a_tag.attrs["name"] + toc_item = self.body_tag.find("a", {"href": "#" + link_name}) + if toc_item: + toc_links.append(toc_item) + return len(toc_links) > 0 + toc_links = self.body_tag.find_all( + "a", {"name": re.compile(r"^_Toc\d+")}) + headers = [link.parent for link in toc_links] + outline_level = "1" # All the unknown outlines will be predicted as

    + for tag in headers: + if re.search(r"^h\d$", tag.name): + tag.a.unwrap() + elif tag.name == "p": + exist_in_toc = _check_parent_link_exist_in_toc(tag) + if tag in self.body_tag.find_all("p") and exist_in_toc: + new_tag = BeautifulSoup( + features="lxml").new_tag("h" + outline_level) + text = tag.text + tag.replaceWith(new_tag) + new_tag.string = text + else: + # rethink document structure when you have toc_links, other cases? + self.logger.log(f"Something went wrong in processing toc_links." + f"Check the structure of the file." + f"Tag name: {tag.name}") + + def _get_top_level_headers(self) -> List[Dict[str, Union[str, bool]]]: + """ + Function for gathering info about top-level chapters. + + Assume: _ + - Headers with the smallest outline(or digit in ) are top level chapters. + [It is consistent with a recursive algorithm + for saving content to a resulted json structure, + which happens in header_to_json()] + + """ + headers_info = [] + header_tags = self.body_tag.find_all(re.compile("^h[1-9]$")) + headers_outline = [int(re.sub(r"^h", "", tag.name)) + for tag in header_tags] + if headers_outline: + top_level_outline = min(headers_outline) + top_level_headers = [tag for tag in header_tags + if int(re.sub(r"^h", "", tag.name)) == top_level_outline] + + for tag in top_level_headers: + if tag.parent.name == "li": + tag.parent.unwrap() + while tag.parent.name == "ol": + tag.parent.unwrap() + + title = tag.text + title = re.sub(r"\s+", " ", title).strip() + number = re.match(r"^(?:\.?\d+\.? ?)+", title) + is_numbered = number is not None + + cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text) + is_introduction = cleaned_title.lower() == "introduction" + + headers_info.append({ + "title": cleaned_title, + "is_numbered": is_numbered, + "is_introduction": is_introduction}) + return headers_info + + @staticmethod + def _mark_introduction_headers(top_level_headers: List[Dict[str, Union[str, bool]]]): + """ + Function to find out: + what header shouldn't be numbered and can be treated as introduction chapter + Assume header(s) to be introduction if: + 1. one header not numbered, before 1 numbered header + 2. it is first header from the top level list, and it equals to "introduction" + + Returns + ------- + None + mark each top-level header with flag should_be_numbered = true/false + + """ + is_numbered_header = [header["is_numbered"] + for header in top_level_headers] + is_title = [header["is_introduction"] + for header in top_level_headers] + + first_not_numbered = is_numbered_header and is_numbered_header[0] == 0 + second_is_numbered_or_not_exist = all(is_numbered_header[1:2]) + first_header_is_introduction = is_title and is_title[0] + + if (first_not_numbered and second_is_numbered_or_not_exist) or first_header_is_introduction: + top_level_headers[0]["should_be_numbered"] = False + for i in range(1, len(top_level_headers)): + top_level_headers[i]["should_be_numbered"] = True + else: + for i in range(0, len(top_level_headers)): + top_level_headers[i]["should_be_numbered"] = True + + @staticmethod + def clean_title_from_tabs(tag: NavigableString): + cleaned = re.sub(r"[\s\xa0]", " ", tag) + this = BeautifulSoup.new_string(BeautifulSoup( + features="lxml"), cleaned, NavigableString) + tag.replace_with(this) + + def apply_func_to_last_child(self, tag: Union[NavigableString, Tag], func=None): + """ + works only with constructions like (((child to work with))) + where child is object of NavigableString + """ + if type(tag) is NavigableString: + func(tag) + elif list(tag.children): + self.apply_func_to_last_child(list(tag.children)[0], func) + + def _process_headings(self): + """ + Function to process tags . + Clean header from attrs and text in header from numbering and \n + + Returns + ------- + None + processed tags + + """ + header_tags = self.body_tag.find_all(re.compile("^h[1-5]$")) + # clean header from attrs and text in header from numbering and \n + for h_tag in header_tags: + h_tag.attrs = {} + for tag in h_tag.find_all(): + tag.attrs = {} + if h_tag.parent.name == "li": + h_tag.parent.unwrap() + while h_tag.parent.name == "ol": + h_tag.parent.unwrap() + + cleaned_title = re.sub(r"[\s\xa0]", " ", h_tag.text) + if cleaned_title != "": + content = list(h_tag.children) + # do not take into account rubbish empty tags like , but don"t remove them + content = [item for item in content if + (type(item) is not NavigableString and item.text != "") + or (type(item) is NavigableString)] + + content[0] = "" if content[0] == " " else content[0] + content = [item for item in content if item != ""] + + for i, item in enumerate(content): + if type(content[i]) is NavigableString: + cleaned = re.sub(r"(\s+)+", " ", content[i]) + this = BeautifulSoup.new_string(BeautifulSoup( + features="lxml"), cleaned, NavigableString) + content[i].replace_with(this) + content[i] = this + else: + self.apply_func_to_last_child( + content[i], self.clean_title_from_tabs) + else: + h_tag.unwrap() + + + def delete_content_before_toc(self): + # remove all tag upper the only in content !!! body tag is not updated + toc_tag = self.html_soup.new_tag("TOC") + if toc_tag in self.content: + ind = self.content.index(toc_tag) + 1 + self.content = self.content[ind:] + + def process_html(self, + access=None, + html_path: pathlib.Path = "", + book_id: int = 0) -> Tuple[List[Tag], List[str], List[Dict[str, Union[str, bool]]]]: + """Process html to satisfy LiveCarta formatting.""" + self.logger.log("Beginning of processing .html file.") + + # Process styles doesn't see because they aren't supported by html + self._font_to_span() + + self.logger.log("Inline style reading.") + self.style_preprocessor.process_inline_styles_in_html_soup( + self.body_tag) + + self.logger.log("Inline style processing.") + modify_html_soup_with_css_styles(self.body_tag) + + self.logger.log("Image processing.") + images = process_images(access, path_to_html=html_path, + book_id=book_id, body_tag=self.body_tag) + self.logger.log( + f"{len(images)} images have been processed.") + + self.logger.log("Footnotes processing.") + footnotes: List[str] = process_footnotes(self.body_tag) + self.logger.log( + f"{len(footnotes)} footnotes have been processed.") + + self.logger.log("Hrefs processing.") + self._process_hrefs() + + self.logger.log(f"TOC processing.") + self._process_toc_links() + + top_level_headers: List[Dict[str, Union[str, bool]]]\ + = self._get_top_level_headers() + self._mark_introduction_headers(top_level_headers) + + self._process_headings() + + self.logger.log(f".html using presets processing.") + _process_presets(html_preprocessor=self.html_preprocessor, + html_soup=self.html_soup) + + self.content = self.body_tag.find_all(recursive=False) + # delete text before table of content if exists + self.delete_content_before_toc() + + self.logger.log("End of processing .html file.") + return self.content, footnotes, top_level_headers diff --git a/src/docx_converter/image_processing.py b/src/docx_converter/image_processing.py index 9c5fdab..6f4112c 100644 --- a/src/docx_converter/image_processing.py +++ b/src/docx_converter/image_processing.py @@ -1,9 +1,29 @@ import os import pathlib +from bs4 import Tag +from typing import Union, List from shutil import copyfile -def process_images(access, html_path, book_id, body_tag): +def save_image_to_aws(access, img_file_path: str, book_id: int) -> str: + """Function saves all images to Amazon web service""" + link_path: str = access.send_image(img_file_path, doc_id=book_id) + return link_path + + +def save_image_locally(img_file_path: str, book_id: int) -> pathlib.Path: + """Function saves all images locally""" + folder_path = os.path.dirname( + os.path.dirname(os.path.abspath(__file__))) + new_path = pathlib.Path(os.path.join( + folder_path, f"../books/json/img_{book_id}/")) + new_path.mkdir(exist_ok=True) + img_folder_path = new_path / os.path.basename(img_file_path) + copyfile(img_file_path, img_folder_path) + return img_folder_path + + +def process_images(access, path_to_html: Union[pathlib.Path, str], book_id: int, body_tag: Tag) -> List: """ Function to process tag. Img should be sent Amazon S3 and then return new tag with valid link. @@ -12,23 +32,18 @@ def process_images(access, html_path, book_id, body_tag): """ img_tags = body_tag.find_all("img") for img in img_tags: - img_name = img.attrs.get("src") + path_to_img_from_html = img.attrs.get("src") # quick fix for bad links - if (len(img_name) >= 3) and img_name[:3] == "../": - img_name = img_name[3:] - img_path = pathlib.Path(f"{html_path.parent}", f"{img_name}") - + if (len(path_to_img_from_html) >= 3) and path_to_img_from_html[:3] == "../": + path_to_img_from_html = path_to_img_from_html[3:] + html_folder = os.path.dirname(path_to_html) + path_to_img_from_root = os.path.normpath(os.path.join( + html_folder, path_to_img_from_html)).replace("\\", "/") if access is not None: - link = access.send_image(img_path, doc_id=book_id) - img.attrs["src"] = link + img_folder_path = save_image_to_aws( + access, path_to_img_from_root, book_id) else: - if img_tags.index(img) == 0: - folder_path = os.path.dirname( - os.path.dirname(os.path.abspath(__file__))) - new_path = pathlib.Path(os.path.join( - folder_path, f"../books/json/img_{book_id}/")) - new_path.mkdir(exist_ok=True) - new_img_path = new_path / img_name - copyfile(img_path, new_img_path) - img.attrs["src"] = str(new_img_path) + img_folder_path = save_image_locally( + path_to_img_from_root, book_id) + img.attrs["src"] = str(img_folder_path) return img_tags diff --git a/src/docx_converter/libre_html2json_converter.py b/src/docx_converter/libre_html2json_converter.py index eb5f0a2..50d936c 100644 --- a/src/docx_converter/libre_html2json_converter.py +++ b/src/docx_converter/libre_html2json_converter.py @@ -1,12 +1,15 @@ import re import logging from copy import copy +from typing import List, Tuple, Dict, Union +from bs4 import Tag from src.livecarta_config import LiveCartaConfig -class LibreHTML2JSONConverter: - def __init__(self, content, footnotes, top_level_headers, logger_object, book_api_status=None): +class LibreHtml2JsonConverter: + def __init__(self, content: List[Tag], footnotes: List[str], top_level_headers: List[Dict[str, Union[str, bool]]], + logger_object, book_api_status=None): self.content_dict = None self.content = content self.footnotes = footnotes @@ -33,7 +36,7 @@ class LibreHTML2JSONConverter: return new_text # TODO: rethink the function structure without indexes. - def header_to_livecarta_chapter_item(self, ind) -> (dict, int): + def header_to_livecarta_chapter_item(self, ind: int) -> Union[Tuple[Dict[str, Union[str, List]], int], str]: """ Function process header and collects all content for it. Parameters @@ -90,7 +93,7 @@ class LibreHTML2JSONConverter: return "" @staticmethod - def _is_empty_p_tag(tag): + def _is_empty_p_tag(tag: Tag) -> bool: if tag.name != "p": return False @@ -102,7 +105,6 @@ class LibreHTML2JSONConverter: text = re.sub(r"\s+", "", temp_tag.text) if text: return False - return True def convert_to_dict(self): @@ -148,9 +150,7 @@ class LibreHTML2JSONConverter: # Add is_introduction field to json structure # after deleting content before toc, some chapters can be deleted if self.top_level_headers: - same_first_titles = self.top_level_headers[0]["title"] == json_strc[0]["title"] is_first_header_introduction = not self.top_level_headers[0]["should_be_numbered"] - json_strc[0]["is_introduction"] = is_first_header_introduction self.content_dict = { diff --git a/src/epub_converter/epub_converter.py b/src/epub_converter/epub_converter.py index fb3b786..dbf3509 100644 --- a/src/epub_converter/epub_converter.py +++ b/src/epub_converter/epub_converter.py @@ -1,34 +1,32 @@ import re -import json -import codecs import ebooklib -from ebooklib import epub -from ebooklib.epub import Link, Section from os import path from pathlib import Path +from ebooklib import epub +from ebooklib.epub import Link, Section from itertools import chain -from premailer import transform from collections import defaultdict -from typing import Dict, Union, List -from bs4 import BeautifulSoup, NavigableString, Tag +from typing import List, Tuple, Dict, Union +from bs4 import BeautifulSoup, Tag, NavigableString from src.util.helpers import BookLogger -from src.epub_converter.css_processor import CSSPreprocessor -from src.epub_converter.html_epub_processor import HtmlEpubPreprocessor from src.livecarta_config import LiveCartaConfig from src.data_objects import ChapterItem, NavPoint +from src.style_reader import StyleReader +from src.epub_converter.html_epub_processor import HtmlEpubProcessor from src.epub_converter.image_processing import update_images_src_links from src.epub_converter.footnotes_processing import preprocess_footnotes -from src.epub_converter.tag_inline_style_processor import TagInlineStyleProcessor +from src.inline_style_processor import modify_html_soup_with_css_styles class EpubConverter: - def __init__(self, book_path, access=None, logger=None, css_processor=None, html_processor=None): + def __init__(self, book_path, access=None, logger: BookLogger = None, + style_processor: StyleReader = None, html_processor: HtmlEpubProcessor = None): self.book_path = book_path self.access = access self.logger: BookLogger = logger self.ebooklib_book = epub.read_epub(book_path) - self.css_processor = css_processor + self.style_processor = style_processor self.html_processor = html_processor # main container for all epub .xhtml files @@ -39,7 +37,8 @@ class EpubConverter: # toc tree structure stored as adj.list (NavPoint to list of NavPoints) # key = -1 for top level NavPoints - self.adjacency_list: Dict[Union[NavPoint, -1], Union[list, None]] = {} + self.adjacency_list: Dict[Union[NavPoint, -1], + Union[List[NavPoint], None]] = {} # list to offset Chapter_i on 1st level self.offset_sub_nodes = [] @@ -58,6 +57,18 @@ class EpubConverter: self.noterefs: List[Tag] = [] # start of the footnote self.footnotes: List[Tag] = [] # end of the footnote + self.logger.log("HTML files reading.") + self.html_href2html_body_soup: Dict[str, + BeautifulSoup] = self.build_href2soup_content() + + self.logger.log("CSS inline style processing.") + [self.style_processor.process_inline_styles_in_html_soup( + self.html_href2html_body_soup[html_href]) for html_href in self.html_href2html_body_soup] + self.logger.log("CSS files processing.") + self.html_href2css_href, self.css_href2css_content = self.build_html_and_css_relations() + self.logger.log("CSS styles fusion(inline+file).") + self.add_css_styles_to_html_soup() + self.logger.log("Image processing.") for x in chain(self.ebooklib_book.get_items_of_type(ebooklib.ITEM_IMAGE), self.ebooklib_book.get_items_of_type(ebooklib.ITEM_COVER)): @@ -65,17 +76,6 @@ class EpubConverter: content = x.content self.img_href2img_bytes[file_name] = content - self.logger.log("HTML files reading.") - self.html_href2html_body_soup: Dict[str, - BeautifulSoup] = self.build_href2soup_content() - - self.logger.log("CSS inline style processing.") - self.css_processor.process_inline_styles_in_html_soup(self.html_href2html_body_soup) - self.logger.log("CSS files processing.") - self.html_href2css_href, self.css_href2css_content = self.build_html_and_css_relations() - self.logger.log("CSS styles fusion(inline+file).") - self.add_css_styles_to_html_soup() - self.logger.log("Footnotes processing.") for href in self.html_href2html_body_soup: self.footnotes_contents, self.noterefs, self.footnotes =\ @@ -107,7 +107,6 @@ class EpubConverter: def build_href2soup_content(self) -> Dict[str, BeautifulSoup]: # using EpubElements # for now just for HTML objects, as it is the simplest chapter - nodes = dict() for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT): html_body_text = item.get_body_content() @@ -116,7 +115,7 @@ class EpubConverter: nodes[item.file_name] = soup return nodes - def build_html_and_css_relations(self) -> tuple[dict, dict]: + def build_html_and_css_relations(self) -> Tuple[Dict[str, List[str]], Dict[str, str]]: """ Function is designed to get 2 dictionaries: The first is html_href2css_href. It is created to connect href of html to css files(hrefs of them @@ -130,8 +129,8 @@ class EpubConverter: """ # dictionary: href of html to related css files - html_href2css_href: defaultdict = defaultdict(list) - css_href2css_content: dict = {} + html_href2css_href: Dict[str, List[str]] = defaultdict(list) + css_href2css_content: Dict[str, str] = {} for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT): html_content = item.content @@ -146,54 +145,16 @@ class EpubConverter: html_href2css_href[html_href].append(css_href) if css_href not in css_href2css_content: # css_href not in css_href2css_content, add to this dict - css_href2css_content[css_href] = self.css_processor.build_css_file_content( - self.css_processor.get_css_content(css_href, html_href, self.ebooklib_book)) + css_href2css_content[css_href] = self.style_processor.build_css_file_content( + self.style_processor.get_css_content(css_href, html_href, self.ebooklib_book)) for i, tag in enumerate(soup_html_content.find_all("style")): css_content = tag.string html_href2css_href[html_href].append(f"href{i}") - css_href2css_content[f"href{i}"] = self.css_processor.build_css_file_content( + css_href2css_content[f"href{i}"] = self.style_processor.build_css_file_content( css_content) return html_href2css_href, css_href2css_content - @staticmethod - def modify_html_soup_with_css_styles(html_soup: BeautifulSoup, css_text: str) -> BeautifulSoup: - """ - Function adds styles from .css to inline style. - Parameters - ---------- - html_soup: BeautifulSoup - html page with inline style - css_text: str - css content from css file - Returns - ------- - inline_soup: BeautifulSoup - soup with styles from css - - """ - # remove this specification because it causes problems - css_text = css_text.replace( - '@namespace epub "http://www.idpf.org/2007/ops";', '') - # here we add css styles to inline style - html_with_css_styles: str = transform(str(html_soup), css_text=css_text, - remove_classes=False, - external_styles=False, - allow_network=False, - disable_validation=True, - ) - # soup with converted styles from css - inline_soup = BeautifulSoup(html_with_css_styles, features="lxml") - - tags_with_inline_style = inline_soup.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp, - attrs={"style": re.compile(".*")}) - - # go through the tags with inline style + style parsed from css file - for tag_inline_style in tags_with_inline_style: - style_converter = TagInlineStyleProcessor(tag_inline_style) - style_converter.convert_initial_tag() - return inline_soup - def add_css_styles_to_html_soup(self): """ This function is designed to update html_href2html_body_soup @@ -209,11 +170,13 @@ class EpubConverter: for css_href in self.html_href2css_href[html_href]: css += self.css_href2css_content[css_href] html_content: BeautifulSoup = self.html_href2html_body_soup[html_href] - html_content = self.modify_html_soup_with_css_styles( + html_content = modify_html_soup_with_css_styles( html_content, css) self.html_href2html_body_soup[html_href] = html_content - def build_adjacency_list_from_toc(self, element: [Link, tuple, list], lvl=0): + def build_adjacency_list_from_toc(self, + element: Union[Link, Tuple[Section, List], List[Union[Link, Tuple]]], + lvl: int = 0) -> NavPoint: """ Function self.adjacency_list builds based on TOC nested structure, got from self.ebooklib.toc @@ -254,7 +217,7 @@ class EpubConverter: sub_nodes = [] for elem in second: - if (bool(re.search('^section$|^part$', first.title.lower()))) and lvl == 1: + if (bool(re.search("^section$|^part$", first.title.lower()))) and lvl == 1: self.offset_sub_nodes.append( self.build_adjacency_list_from_toc(elem, lvl)) else: @@ -288,7 +251,7 @@ class EpubConverter: return False def build_adjacency_list_from_spine(self): - def build_manifest_id2html_href() -> dict: + def build_manifest_id2html_href() -> Dict[int, str]: links = dict() for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT): links[item.id] = item.file_name @@ -304,7 +267,7 @@ class EpubConverter: self.adjacency_list[-1].append(nav_point) self.hrefs_added_to_toc.add(nav_point.href) - def add_not_added_files_to_adjacency_list(self, not_added: list): + def add_not_added_files_to_adjacency_list(self, not_added: List[str]): """Function add files that not added to adjacency list""" for i, file in enumerate(not_added): nav_point = NavPoint( @@ -315,7 +278,7 @@ class EpubConverter: def label_subchapters_with_lc_tag(self): for html_href in self.html_href2html_body_soup: ids, soup = self.html_href2subchapters_ids[html_href], \ - self.html_href2html_body_soup[html_href] + self.html_href2html_body_soup[html_href] for i in ids: tag = soup.find(id=i) tmp_tag = soup.new_tag("lc_tmp") @@ -345,10 +308,13 @@ class EpubConverter: mark.parent.unwrap() @staticmethod - def create_unique_id(href, id_): + def create_unique_id(href: str, id_: str) -> str: return re.sub(r"([^\w\s])|_|-", "", href) + re.sub(r"[_-]", "0", id_) - def match_href_to_path_from_toc(self, cur_file_path: str, href_in_link: str, internal_link_tag: Tag) -> [None, str]: + def match_href_to_path_from_toc(self, + cur_file_path: str, + href_in_link: str, + internal_link_tag: Tag) -> Union[None, str]: """ Function used to find full path to file that is parsed from tag link TOC: a/b/c.xhtml @@ -387,7 +353,7 @@ class EpubConverter: return full_path[0] @staticmethod - def create_new_anchor_span(soup, id_): + def create_new_anchor_span(soup: BeautifulSoup, id_: str) -> Tag: new_anchor_span = soup.new_tag("span") new_anchor_span.attrs["id"] = id_ new_anchor_span.attrs["class"] = "link-anchor" @@ -415,7 +381,8 @@ class EpubConverter: for toc_href in self.hrefs_added_to_toc: for tag in self.html_href2html_body_soup[toc_href].find_all(attrs={"id": re.compile(r".+")}): if tag.attrs.get("class") not in ["converter-chapter-mark", "footnote-element"]: - new_id = self.create_unique_id(toc_href, tag.attrs["id"]) + new_id = self.create_unique_id( + toc_href, tag.attrs["id"]) tag.attrs["id"] = new_id def process_file_anchor(): @@ -427,11 +394,13 @@ class EpubConverter: a_tag_href_matched_to_toc = self.match_href_to_path_from_toc( toc_href, a_tag_href, internal_link_tag) if a_tag_href_matched_to_toc: - new_id = self.create_unique_id(a_tag_href_matched_to_toc, "") + new_id = self.create_unique_id( + a_tag_href_matched_to_toc, "") internal_link_tag.attrs["placeholder"] = "{{tempStyleToAnchor-" + new_id + "}}" if new_id not in self.internal_anchors: anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc] - new_anchor_span = self.create_new_anchor_span(soup, new_id) + new_anchor_span = self.create_new_anchor_span( + soup, new_id) # insert a new span to the beginning of the file anchor_soup.insert(0, new_anchor_span) self.internal_anchors.add(new_id) @@ -442,7 +411,8 @@ class EpubConverter: soup = self.html_href2html_body_soup[toc_href] # process_file_element_anchor for internal_link_tag in soup.find_all("a", {"href": re.compile(r"(^.+\.(htm|html|xhtml)#.+)|(^#.+)")}): - a_tag_href, a_tag_id = internal_link_tag.attrs["href"].split("#") + a_tag_href, a_tag_id = internal_link_tag.attrs["href"].split( + "#") a_tag_href_matched_to_toc = self.match_href_to_path_from_toc( toc_href, a_tag_href, internal_link_tag) if a_tag_href \ else path.normpath(toc_href).replace("\\", "/") @@ -452,7 +422,8 @@ class EpubConverter: anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc] anchor_tags = anchor_soup.find_all(attrs={"id": new_id}) or \ - anchor_soup.find_all(attrs={"id": a_tag_id}) # if link is a footnote + anchor_soup.find_all( + attrs={"id": a_tag_id}) # if link is a footnote if anchor_tags: if len(anchor_tags) > 1: self.logger.log(f"Warning in {toc_href}: multiple anchors:" @@ -487,7 +458,9 @@ class EpubConverter: process_file_element_anchor() @staticmethod - def get_tags_between_chapter_marks(first_id: str, href: str, html_soup: BeautifulSoup) -> list: + def get_tags_between_chapter_marks(first_id: str, + href: str, + html_soup: BeautifulSoup) -> List[Union[Tag, NavigableString]]: """ Get tags between LiveCarta chapter marks Parameters @@ -568,7 +541,7 @@ class EpubConverter: for tl_nav_point in top_level_nav_points: self.detect_one_chapter(tl_nav_point) - def html_node_to_livecarta_chapter_item(self, nav_point: NavPoint, lvl=1) -> ChapterItem: + def html_node_to_livecarta_chapter_item(self, nav_point: NavPoint, lvl: int = 1) -> ChapterItem: """ Function prepare style, tags to json structure Parameters @@ -584,18 +557,18 @@ class EpubConverter: built chapter """ - title = nav_point.title + title: str = nav_point.title content: BeautifulSoup = self.href_chapter_id2soup_html[(nav_point.href, nav_point.id)] \ if nav_point.id else self.html_href2html_body_soup[nav_point.href] - indent = " " * lvl + indent: str = " " * lvl self.logger.log(indent + f"Chapter: {title} is processing.") - is_chapter = lvl <= LiveCartaConfig.SUPPORTED_LEVELS + is_chapter: bool = lvl <= LiveCartaConfig.SUPPORTED_LEVELS self.logger.log(indent + "Process title.") - title_preprocessed = self.html_processor.prepare_title(title) + title_preprocessed: str = self.html_processor.prepare_title(title) self.logger.log(indent + "Process content.") - content_preprocessed = self.html_processor.prepare_content(title_preprocessed, content, - remove_title_from_chapter=is_chapter) + content_preprocessed: Union[Tag, BeautifulSoup] = self.html_processor.prepare_content( + title_preprocessed, content, remove_title_from_chapter=is_chapter) self.book_image_src_path2aws_path = update_images_src_links(content_preprocessed, self.img_href2img_bytes, @@ -613,7 +586,7 @@ class EpubConverter: sub_nodes.append(sub_chapter_item) return ChapterItem(title_preprocessed, str(content_preprocessed), sub_nodes) - def convert_to_dict(self) -> dict: + def convert_to_dict(self) -> Dict[str, List[Dict[str, Union[List, str]]]]: """Function which convert list of html nodes to appropriate json structure""" top_level_nav_points = self.adjacency_list[-1] top_level_chapters = [] @@ -630,19 +603,3 @@ class EpubConverter: "content": top_level_dict_chapters, "footnotes": self.footnotes_contents } - - -if __name__ == "__main__": - epub_file_path = "../../books/epub/9780763774134.epub" - logger_object = BookLogger( - name="epub", book_id=epub_file_path.split("/")[-1]) - - css_processor = CSSPreprocessor() - html_processor = HtmlEpubPreprocessor(logger=logger_object) - - json_converter = EpubConverter(epub_file_path, logger=logger_object, - css_processor=css_processor, html_processor=html_processor) - content_dict = json_converter.convert_to_dict() - - with codecs.open(epub_file_path.replace("epub", "json"), "w", encoding="utf-8") as f_json: - json.dump(content_dict, f_json, ensure_ascii=False) diff --git a/src/epub_converter/epub_solver.py b/src/epub_converter/epub_solver.py index 9131eda..90c3b95 100644 --- a/src/epub_converter/epub_solver.py +++ b/src/epub_converter/epub_solver.py @@ -1,13 +1,18 @@ +import json +import codecs + from src.book_solver import BookSolver -from src.epub_converter.css_processor import CSSPreprocessor -from src.epub_converter.html_epub_processor import HtmlEpubPreprocessor +from src.util.helpers import BookLogger +from src.html_presets_processor import HtmlPresetsProcessor +from src.style_reader import StyleReader +from src.epub_converter.html_epub_processor import HtmlEpubProcessor from src.epub_converter.epub_converter import EpubConverter class EpubBook(BookSolver): """Class of .epub type book - child of BookSolver""" - def __init__(self, book_id=0, access=None, main_logger=None): + def __init__(self, book_id: int = 0, access=None, main_logger=None): super().__init__(book_id, access, main_logger) self.book_type = "epub" @@ -16,10 +21,8 @@ class EpubBook(BookSolver): Function Steps ---------- - 1. Gets data from preset structure - 2. Add preset to html preprocessor - 3. Converts .epub to .html - 4. Parses from line structure to nested structure + 1. Converts .epub to .html + 2. Parses from line structure to nested structure Returns ---------- @@ -27,10 +30,32 @@ class EpubBook(BookSolver): json for LiveCarta platform """ - css_processor = CSSPreprocessor() - html_processor = HtmlEpubPreprocessor(self.preset_path, logger=self.logger_object) + html_preprocessor = HtmlPresetsProcessor( + logger=self.logger_object, preset_path="presets/epub_presets.json") + style_preprocessor = StyleReader() + html_processor = HtmlEpubProcessor(logger=self.logger_object, + html_preprocessor=html_preprocessor) json_converter = EpubConverter( self.book_path, access=self.access, logger=self.logger_object, - css_processor=css_processor, html_processor=html_processor) + style_processor=style_preprocessor, html_processor=html_processor) content_dict = json_converter.convert_to_dict() return content_dict + + +if __name__ == "__main__": + epub_file_path = "../../books/epub/9780763774134.epub" + logger_object = BookLogger( + name="epub", book_id=epub_file_path.split("/")[-1]) + + html_preprocessor = HtmlPresetsProcessor( + logger=logger_object, preset_path="../../presets/epub_presets.json") + style_preprocessor = StyleReader() + html_processor = HtmlEpubProcessor(logger=logger_object, + html_preprocessor=html_preprocessor) + + json_converter = EpubConverter(epub_file_path, logger=logger_object, + style_processor=style_preprocessor, html_processor=html_processor) + content_dict = json_converter.convert_to_dict() + + with codecs.open(epub_file_path.replace("epub", "json"), "w", encoding="utf-8") as f_json: + json.dump(content_dict, f_json, ensure_ascii=False) diff --git a/src/epub_converter/footnotes_processing.py b/src/epub_converter/footnotes_processing.py index 34cd1fb..8f7ed77 100644 --- a/src/epub_converter/footnotes_processing.py +++ b/src/epub_converter/footnotes_processing.py @@ -1,9 +1,9 @@ import re -from typing import Tuple +from typing import List, Tuple from bs4 import BeautifulSoup, Tag -def _replace_with_livecarta_anchor_tag(anchor, i): +def replace_with_livecarta_anchor_tag(anchor, i): """Function replace noteref_tag(anchor) with new livecarta tag""" new_tag = BeautifulSoup(features="lxml").new_tag("sup") new_tag["class"] = "footnote-element" @@ -16,8 +16,8 @@ def _replace_with_livecarta_anchor_tag(anchor, i): return new_tag -def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, noteref_attr_name="epub:type") \ - -> Tuple[list, list, list]: +def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, noteref_attr_name: str = "epub:type") \ + -> Tuple[List, List, List]: """ This function preprocessing footnotes This function should be earlier that adding fonts in pipeline. @@ -75,7 +75,7 @@ def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, note if footnote_tag.parent.attrs.get("role") and footnote_tag.parent.attrs.get("role") == "docs-endnote": footnote_tag = footnote_tag.parent new_noterefs_tags.append( - _replace_with_livecarta_anchor_tag(noteref_tag, i)) + replace_with_livecarta_anchor_tag(noteref_tag, i)) content = footnote_tag.text # footnote_tag.decompose() footnotes.append(content) @@ -87,5 +87,4 @@ def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, note noteref.attrs["data-id"] = i + 1 noteref.attrs["id"] = f"footnote-{i + 1}" footnote.attrs["href"] = f"#footnote-{i + 1}" - return footnotes, new_noterefs_tags, new_footnotes_tags diff --git a/src/epub_converter/html_epub_processor.py b/src/epub_converter/html_epub_processor.py index da2a6c0..e92ac8b 100644 --- a/src/epub_converter/html_epub_processor.py +++ b/src/epub_converter/html_epub_processor.py @@ -1,52 +1,16 @@ import re -import json -from bs4 import BeautifulSoup, NavigableString, Comment, Tag +from typing import Union +from bs4.element import PageElement +from bs4 import BeautifulSoup, Tag, NavigableString, Comment from src.util.helpers import BookLogger +from src.html_presets_processor import _process_presets -class HtmlEpubPreprocessor: - def __init__(self, preset_path="../../presets/presets.json", logger=None): - self.preset = json.load(open(preset_path)) - self.logger: BookLogger = logger - self.name2function = { - "table_wrapper": self._wrap_tags_with_table, - "replacer": self._tags_to_correspond_livecarta_tag, - "attr_replacer": self._replace_attrs_in_tags, - "unwrapper": self._unwrap_tags, - "inserter": self._insert_tags_into_correspond_tags - } - - @staticmethod - def _add_span_to_save_ids_for_links(tag_to_be_removed, chapter_tag: BeautifulSoup): - """ - Function adds span with id from tag_to_be_removed - because this tag will be removed(unwrapped/extract) - Parameters - ---------- - tag_to_be_removed: Soup object - chapter_tag: BeautifulSoup - - Returns - ------- - None - updated body tag - - """ - - def _insert_span_with_attrs_before_tag(chapter_tag: BeautifulSoup, tag_to_be_removed: Tag, id_: str, - class_: list): - """Function inserts span before tag aren't supported by LiveCarta""" - new_tag = chapter_tag.new_tag("span") - new_tag.attrs["id"] = id_ or "" - new_tag.attrs["class"] = class_ or "" - new_tag.string = "\xa0" - tag_to_be_removed.insert_before(new_tag) - - if tag_to_be_removed.attrs.get("id"): - _insert_span_with_attrs_before_tag(chapter_tag=chapter_tag, tag_to_be_removed=tag_to_be_removed, - id_=tag_to_be_removed.attrs["id"], - class_=tag_to_be_removed.attrs.get("class")) +class HtmlEpubProcessor: + def __init__(self, logger: BookLogger = None, html_preprocessor=None): + self.logger = logger + self.html_preprocessor = html_preprocessor @staticmethod def prepare_title(title_of_chapter: str) -> str: @@ -78,7 +42,7 @@ class HtmlEpubPreprocessor: Returns ------- - None + NoReturn Chapter Tag without comments """ @@ -110,202 +74,28 @@ class HtmlEpubPreprocessor: p_tag.append(str(node)) node.replace_with(p_tag) - def _wrap_tags_with_table(self, chapter_tag: BeautifulSoup, rules: list): - """ - Function wraps with - Parameters - ---------- - chapter_tag: BeautifulSoup - Tag & contents of the chapter tag - - Returns - ------- - None - Chapter Tag with wrapped certain tags with
    - - """ - - def _wrap_tag_with_table(width="100", border="", bg_color=None): - table = chapter_tag.new_tag("table") - table.attrs["border"], table.attrs["align"], table.attrs["style"] \ - = border, "center", f"width:{width}%;" - tbody, tr, td = \ - chapter_tag.new_tag("tbody"), chapter_tag.new_tag("tr"), chapter_tag.new_tag("td") - td.attrs["bgcolor"] = bg_color - tag_to_wrap.wrap(td) - td.wrap(tr) - tr.wrap(tbody) - tbody.wrap(table) - table.insert_after(BeautifulSoup(features="lxml").new_tag("br")) - return table - - def process_tag_using_table(): - _wrap_tag_with_table( - width=tag_to_wrap.attrs["width"] if tag_to_wrap.attrs.get("width") else "100", - border=tag_to_wrap.attrs["border"] if tag_to_wrap.attrs.get("border") else None, - bg_color=tag_to_wrap.attrs["bgcolor"] if tag_to_wrap.attrs.get("bgcolor") else None) - self._add_span_to_save_ids_for_links(tag_to_wrap, chapter_tag) - tag_to_wrap.unwrap() - - for rule in rules: - tags = rule["tags"] - for attr in rule["attrs"]: - for tag_to_wrap in chapter_tag.find_all([re.compile(tag) for tag in tags], - {attr["name"]: re.compile(fr"{attr['value']}")}): - process_tag_using_table() - - @staticmethod - def _tags_to_correspond_livecarta_tag(chapter_tag: BeautifulSoup, rules: list): - """ - Function to replace all tags to correspond LiveCarta tags - Parameters - ---------- - chapter_tag: BeautifulSoup - Tag & contents of the chapter tag - - Returns - ------- - None - Chapter Tag with all tags replaced with LiveCarta tags - - """ - for rule in rules: - tags = rule["tags"] - tag_to_replace = rule["tag_to_replace"] - if rule["condition"]: - for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v): - if condition_on_tag[0] == 'parent_tags': - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]): - if tag.parent.select(condition_on_tag[1]): - tag.name = tag_to_replace - elif condition_on_tag[0] == 'child_tags': - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]): - if not tag.select(re.sub('[():]|not', '', condition_on_tag[1])): - tag.name = tag_to_replace - elif condition_on_tag[0] == "attrs": - for attr in rule["condition"]["attrs"]: - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags], - {attr["name"]: re.compile(fr"{attr['value']}")}): - tag.name = tag_to_replace - else: - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]): - # todo can cause appearance of \n

    ...

    ->

    \n

    ...

    \n

    (section) - tag.name = tag_to_replace - - @staticmethod - def _replace_attrs_in_tags(chapter_tag: BeautifulSoup, rules: list): - """ - Function to replace all tags to correspond LiveCarta tags - Parameters - ---------- - chapter_tag: BeautifulSoup - Tag & contents of the chapter tag - - Returns - ------- - None - Chapter Tag with all tags replaced with LiveCarta tags - - """ - for rule in rules: - attr = rule["attr"] - tags = rule["condition"]["tags"] - attr_to_replace = rule["attr_to_replace"] - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags], - {attr: re.compile(r".*")}): - tag[attr_to_replace] = tag[attr] - del tag[attr] - - def _unwrap_tags(self, chapter_tag: BeautifulSoup, rules: dict): - """ - Function unwrap tags and moves id to span - Parameters - ---------- - chapter_tag: BeautifulSoup - Tag & contents of the chapter tag - - Returns - ------- - None - Chapter Tag with unwrapped certain tags - - """ - for tag_name in rules["tags"]: - for tag in chapter_tag.select(tag_name): - # if tag is a subtag - if ">" in tag_name: - tag.parent.attrs.update(tag.attrs) - self._add_span_to_save_ids_for_links(tag, chapter_tag) - tag.unwrap() - - @staticmethod - def _insert_tags_into_correspond_tags(chapter_tag: BeautifulSoup, rules: list): - """ - Function inserts tags into correspond tags - Parameters - ---------- - chapter_tag: BeautifulSoup - Tag & contents of the chapter tag - - Returns - ------- - None - Chapter Tag with inserted tags - - """ - def insert(tag): - tag_to_insert = \ - chapter_tag.new_tag(rule["tag_to_insert"]) - # insert all items that was in tag to subtag and remove from tag - for content in reversed(tag.contents): - tag_to_insert.insert(0, content.extract()) - # wrap subtag with items - tag.append(tag_to_insert) - - for rule in rules: - tags = rule["tags"] - if rule["condition"]: - for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v): - if condition_on_tag[0] == 'parent_tags': - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]): - if tag.parent.select(condition_on_tag[1]): - insert(tag) - elif condition_on_tag[0] == 'child_tags': - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]): - if not tag.select(re.sub('[():]|not', '', condition_on_tag[1])): - insert(tag) - elif condition_on_tag[0] == "attrs": - for attr in rule["condition"]["attrs"]: - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags], - {attr["name"]: re.compile(fr"{attr['value']}")}): - insert(tag) - else: - for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]): - insert(tag) - - def _remove_headings_content(self, chapter_tag, title_of_chapter: str): + def _remove_headings_content(self, chapter_tag: Union[BeautifulSoup, PageElement], title_of_chapter: str): """ Function - cleans/removes headings from chapter in order to avoid duplication of chapter titles in the content - adds span with id in order to Parameters ---------- - chapter_tag: soup object + chapter_tag: Union[BeautifulSoup, PageElement] Tag of the page title_of_chapter: str Chapter title Returns ------- - None + NoReturn clean/remove headings & add span with id """ title_of_chapter = title_of_chapter.lower() - if title_of_chapter == "chapter 1": - pass for tag in chapter_tag.contents: - text = tag if isinstance(tag, NavigableString) else tag.text + tag: PageElement + text: str = tag if isinstance(tag, NavigableString) else tag.text if re.sub(r"[\s\xa0]", "", text): text = re.sub(r"[\s\xa0]", " ", text).lower() text = text.strip() # delete extra spaces @@ -313,7 +103,8 @@ class HtmlEpubPreprocessor: if title_of_chapter == text or \ (title_of_chapter in text and re.findall(r"^h[1-3]$", tag.name or chapter_tag.name)): - self._add_span_to_save_ids_for_links(tag, chapter_tag) + self.html_preprocessor._add_span_to_save_ids_for_links( + tag, chapter_tag) tag.extract() return elif not self._remove_headings_content(tag, title_of_chapter): @@ -322,43 +113,6 @@ class HtmlEpubPreprocessor: tag.extract() return - @staticmethod - def _process_tables(chapter_tag: BeautifulSoup): - """ - Function preprocesses tables and tags(td|th|tr) - Parameters - ---------- - chapter_tag: BeautifulSoup - Tag & contents of the chapter tag - - Returns - ------- - None - Chapter Tag with processed tables - - """ - tables = chapter_tag.find_all("table") - for table in tables: - for t_tag in table.find_all(re.compile("td|th|tr")): - width = "" - if t_tag.get("style"): - width_match = re.search( - r"[^-]width: ?(\d+\.?\d*)(p[tx])", t_tag["style"]) - if width_match: - size = width_match.group(1) - width = size + "px" - - t_tag.attrs["width"] = t_tag.get("width") or width - - if t_tag.attrs.get("style"): - t_tag.attrs["style"] = t_tag.attrs["style"].replace( - "border:0;", "") - if re.sub(r"[\s\xa0]", "", t_tag.attrs.get("style")) == "": - del t_tag.attrs["style"] - - if not table.attrs.get("border") or table.attrs.get("border") in ["0", "0px"]: - table.attrs["border"] = "1" - @staticmethod def _class_removing(chapter_tag: BeautifulSoup): """ @@ -370,7 +124,7 @@ class HtmlEpubPreprocessor: Returns ------- - None + NoReturn Chapter Tag without original classes of the book """ @@ -379,14 +133,14 @@ class HtmlEpubPreprocessor: and (tag.attrs.get("class") not in ["link-anchor", "footnote-element"]): del tag.attrs["class"] - def prepare_content(self, title_str: str, content_tag: BeautifulSoup, remove_title_from_chapter: bool) -> Tag: + def prepare_content(self, title_str: str, chapter_tag: BeautifulSoup, remove_title_from_chapter: bool) -> Tag: """ Function finalise processing/cleaning content Parameters ---------- title_str: str - content_tag: Tag, soup object + chapter_tag: BeautifulSoup, soup object remove_title_from_chapter: bool @@ -394,13 +148,13 @@ class HtmlEpubPreprocessor: ---------- 1. comments removal 2. wrap NavigableString with tag

    - 3-6. wrap tags with

    + 3. heading removal + 4. wrap tags with
    replace tags with correspond LiveCarta tags + replace/remove attrs, values of attrs unwrap tags insert tags into correspond tags - 7. heading removal - 8. process_tables - 9. class removal + 5. class removal Returns ------- @@ -409,18 +163,15 @@ class HtmlEpubPreprocessor: """ # 1. remove comments - self._remove_comments(content_tag) + self._remove_comments(chapter_tag) # 2. - self._wrap_strings_with_p(content_tag) - # 3-6. - for dict in self.preset: - func = self.name2function[dict["preset_name"]] - func(content_tag, dict['rules']) - # 7. + self._wrap_strings_with_p(chapter_tag) + # 3. if remove_title_from_chapter: - self._remove_headings_content(content_tag, title_str) - # 8. - self._process_tables(content_tag) - # 9. remove classes that weren't created by converter - self._class_removing(content_tag) - return content_tag + self._remove_headings_content(chapter_tag, title_str) + # 4. + _process_presets( + html_preprocessor=self.html_preprocessor, html_soup=chapter_tag) + # 5. remove classes that weren't created by converter + self._class_removing(chapter_tag) + return chapter_tag diff --git a/src/epub_converter/image_processing.py b/src/epub_converter/image_processing.py index 6f35c3a..da4e8a7 100644 --- a/src/epub_converter/image_processing.py +++ b/src/epub_converter/image_processing.py @@ -1,37 +1,38 @@ import os import pathlib +from typing import Dict from bs4 import BeautifulSoup from src.access import Access -def save_image_to_aws(access: Access, img_file_path: str, img_content: bytes, book_id: str): +def save_image_to_aws(access: Access, img_file_path: str, img_content: bytes, book_id: str) -> str: """Function saves all images to Amazon web service""" - link_path = access.send_image( + link_path: str = access.send_image( img_file_path, doc_id=book_id, img_content=img_content) return link_path -def save_image_locally(img_file_path: str, img_content: bytes, book_id: str): +def save_image_locally(img_file_path: str, img_content: bytes, book_id: str) -> pathlib.Path: """Function saves all images locally""" folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) new_path = pathlib.Path(os.path.join( folder_path, f"../books/json/img_{book_id}/")) new_path.mkdir(exist_ok=True) - new_img_path = new_path / os.path.basename(img_file_path) - f = open(new_img_path, "wb+") + img_folder_path = new_path / os.path.basename(img_file_path) + f = open(img_folder_path, "wb+") f.write(img_content) f.close() - return new_img_path + return img_folder_path def update_images_src_links(body_tag: BeautifulSoup, - img_href2img_content: dict, + img_href2img_content: Dict[str, bytes], path_to_html: str, - access=None, - path2aws_path: dict = None, - book_id: str = None) -> dict: + access: Access = None, + path2aws_path: Dict[str, str] = None, + book_id: str = None) -> Dict[str, str]: """Function makes dictionary image_src_path -> Amazon web service_path""" img_tags = body_tag.find_all("img") for img in img_tags: @@ -43,23 +44,20 @@ def update_images_src_links(body_tag: BeautifulSoup, assert path_to_img_from_root in img_href2img_content, \ f"Image {path_to_img_from_html} in file {path_to_html} was not added to manifest." - img_content = img_href2img_content[path_to_img_from_root] + img_content: bytes = img_href2img_content[path_to_img_from_root] if access is not None: if path_to_img_from_root in path2aws_path: - new_folder = path2aws_path[path_to_img_from_root] + img_folder_path = path2aws_path[path_to_img_from_root] else: - new_folder = save_image_to_aws( + img_folder_path = save_image_to_aws( access, path_to_img_from_root, img_content, book_id) - path2aws_path[path_to_img_from_root] = new_folder + path2aws_path[path_to_img_from_root] = img_folder_path else: - new_folder = save_image_locally( + img_folder_path = save_image_locally( path_to_img_from_root, img_content, book_id) - img.attrs["src"] = str(new_folder) - if img.attrs.get("width"): - del img.attrs["width"] - if img.attrs.get("height"): - del img.attrs["height"] - if img.attrs.get("style"): - del img.attrs["style"] + img.attrs["src"] = str(img_folder_path) + for attr in ["width", "height", "style"]: + if img.attrs.get(attr): + del img.attrs[attr] return path2aws_path diff --git a/src/html_presets_processor.py b/src/html_presets_processor.py new file mode 100644 index 0000000..30f7906 --- /dev/null +++ b/src/html_presets_processor.py @@ -0,0 +1,182 @@ +import re +import json +from bs4 import BeautifulSoup, Tag +from bs4.element import PageElement +from typing import List, Dict, Union + +from src.util.helpers import BookLogger + + +class HtmlPresetsProcessor: + def __init__(self, logger: BookLogger, preset_path): + self.preset = json.load(open(preset_path)) + self.logger = logger + self.name2action = { + "wrapper": self._wrap_tag, + "table_wrapper": self._process_tag_using_table, + "decomposer": self._decompose_tag, + "replacer": self._replace_tag, + "attr_replacer": self._replace_attr, + "unwrapper": self._unwrap_tag, + "inserter": self._insert_tag + } + + @staticmethod + def _wrap_tag(**kwargs): + kwargs["tag"].wrap(kwargs["body_tag"].new_tag( + kwargs["rule"]["tag_to_wrap"])) + + @staticmethod + def _decompose_tag(**kwargs): + kwargs["tag"].decompose() + + @staticmethod + def _add_span_to_save_ids_for_links(tag_to_be_removed: Union[PageElement, BeautifulSoup], + chapter_tag: BeautifulSoup): + """ + Function adds span with id from tag_to_be_removed + because this tag will be removed(unwrapped/extract) + Parameters + ---------- + tag_to_be_removed: Union[PageElement, BeautifulSoup] + + chapter_tag: BeautifulSoup + + Returns + ------- + NoReturn + updated body tag + + """ + def _insert_span_with_attrs_before_tag(chapter_tag: BeautifulSoup, + tag_to_be_removed: Tag, + id_: str, + class_: Union[List[str], str]): + """Function inserts span before tag aren't supported by LiveCarta""" + new_tag: Tag = chapter_tag.new_tag("span") + new_tag.attrs["id"] = id_ or "" + new_tag.attrs["class"] = class_ or "" + new_tag.string = "\xa0" + tag_to_be_removed.insert_before(new_tag) + + if tag_to_be_removed.attrs.get("id"): + _insert_span_with_attrs_before_tag(chapter_tag=chapter_tag, + tag_to_be_removed=tag_to_be_removed, + id_=tag_to_be_removed.attrs["id"], + class_=tag_to_be_removed.attrs.get("class")) + + def _process_tag_using_table(self, **kwargs): + def _wrap_tag_with_table(width: str = "100", border: str = "", bg_color: str = None) -> Tag: + table = kwargs["body_tag"].new_tag("table") + table.attrs["border"], table.attrs["align"], table.attrs["style"] \ + = border, "center", f"width:{width}%;" + tbody, tr, td = \ + kwargs["body_tag"].new_tag("tbody"), kwargs["body_tag"].new_tag( + "tr"), kwargs["body_tag"].new_tag("td") + td.attrs["bgcolor"] = bg_color + kwargs["tag"].wrap(td) + td.wrap(tr) + tr.wrap(tbody) + tbody.wrap(table) + table.insert_after(BeautifulSoup(features="lxml").new_tag("br")) + return table + _wrap_tag_with_table( + width=kwargs["tag"].attrs["width"] if kwargs["tag"].attrs.get( + "width") else "100", + border=kwargs["tag"].attrs["border"] if kwargs["tag"].attrs.get( + "border") else None, + bg_color=kwargs["tag"].attrs["bgcolor"] if kwargs["tag"].attrs.get("bgcolor") else None) + self._add_span_to_save_ids_for_links(kwargs["tag"], kwargs["body_tag"]) + kwargs["tag"].unwrap() + + @staticmethod + def _replace_tag(**kwargs): + tag_to_replace: str = kwargs["rule"]["tag_to_replace"] + kwargs["tag"].name = tag_to_replace + + @staticmethod + def _replace_attr(**kwargs): + attr, attr_value =\ + kwargs["rule"]["attr"]["name"], kwargs["rule"]["attr"]["value"] + attr_to_replace, attr_value_to_replace =\ + kwargs["rule"]["attr_to_replace"]["name"], kwargs["rule"]["attr_to_replace"]["value"] + if attr_to_replace: + kwargs["tag"][attr_to_replace] = kwargs["tag"][attr] + if attr_value_to_replace: + kwargs["tag"].attrs[attr_to_replace] = attr_value_to_replace + del kwargs["tag"][attr] + elif attr_value_to_replace: + kwargs["tag"].attrs[attr] = attr_value_to_replace + elif attr: + del kwargs["tag"][attr] + + @staticmethod + def _unwrap_tag(**kwargs): + kwargs["tag"].unwrap() + + @staticmethod + def _insert_tag(**kwargs): + tag_to_insert = \ + kwargs["body_tag"].new_tag(kwargs["rule"]["tag_to_insert"]) + # insert all items that was in tag to subtag and remove from tag + for content in reversed(kwargs["tag"].contents): + tag_to_insert.insert(0, content.extract()) + # wrap subtag with items + kwargs["tag"].append(tag_to_insert) + + @staticmethod + def _process_tags(body_tag: BeautifulSoup, + rules: List[Dict[str, Union[List[str], str, Dict[str, Union[List[Dict[str, str]], int, str]]]]], + action): + """ + Function does action with tags + Parameters + ---------- + body_tag: BeautifulSoup + Tag & contents of the body tag + rules: List[Dict[str, Union[List[str], str, Dict[str, Union[List[Dict[str, str]], int, str]]]]] + list of conditions when fire function + action: function + action what to do with tag + Returns + ------- + NoReturn + Body Tag with processed certain tags + + """ + for rule in rules: + tags: List[str] = rule["tags"] if rule.get( + "tags") else rule["condition"]["tags"] + if rule["condition"]: + for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v): + if condition_on_tag[0] == "parent_tags": + for parent_tag in body_tag.select(condition_on_tag[1]): + for tag in parent_tag.find_all([re.compile(tag) for tag in tags]): + # parent_tag != tag.parent + tag.parent.attrs.update(tag.attrs) + action(body_tag=body_tag, tag=tag, rule=rule) + elif condition_on_tag[0] == "child_tags": + for tag in body_tag.find_all([re.compile(tag) for tag in tags]): + if tag.select(condition_on_tag[1]): + action(body_tag=body_tag, tag=tag, rule=rule) + elif condition_on_tag[0] == "attrs": + for attr in rule["condition"]["attrs"]: + for tag in body_tag.find_all([re.compile(tag) for tag in tags], + {attr["name"]: re.compile(fr"{attr['value']}")}): + action(body_tag=body_tag, tag=tag, rule=rule) + # attr replacer + elif condition_on_tag[0] == "tags": + attr = rule["attr"] + for tag in body_tag.find_all([re.compile(tag) for tag in tags], + {attr['name']: re.compile(fr"{attr['value']}")}): + action(body_tag=body_tag, tag=tag, rule=rule) + else: + for tag in body_tag.find_all([re.compile(tag) for tag in tags]): + action(body_tag=body_tag, tag=tag, rule=rule) + + +def _process_presets(html_preprocessor: HtmlPresetsProcessor, html_soup: BeautifulSoup): + for rule in html_preprocessor.preset: + # html_preprocessor.logger.log(rule["preset_name"].title() + " process.") + action = html_preprocessor.name2action[rule["preset_name"]] + html_preprocessor._process_tags(html_soup, rule["rules"], action) diff --git a/src/epub_converter/tag_inline_style_processor.py b/src/inline_style_processor.py similarity index 82% rename from src/epub_converter/tag_inline_style_processor.py rename to src/inline_style_processor.py index 30d7e50..d63122a 100644 --- a/src/epub_converter/tag_inline_style_processor.py +++ b/src/inline_style_processor.py @@ -1,23 +1,23 @@ import re import cssutils from typing import List - from logging import CRITICAL -from bs4 import BeautifulSoup +from premailer import transform +from bs4 import BeautifulSoup, Tag from src.livecarta_config import LiveCartaConfig cssutils.log.setLevel(CRITICAL) -class TagInlineStyleProcessor: - def __init__(self, tag_inline_style): +class InlineStyleProcessor: + def __init__(self, tag_inline_style: Tag): # tag with inline style + style parsed from css file self.tag_inline_style = tag_inline_style - self.tag_inline_style.attrs['style'] = self.process_inline_style() + self.tag_inline_style.attrs['style']: str = self.process_inline_style() @staticmethod - def remove_white_if_no_bgcolor(style_, tag): + def remove_white_if_no_bgcolor(style_: str, tag: Tag) -> str: """Function remove text white color if there is no bg color""" if "background" in style_: style_ = style_.replace( @@ -62,13 +62,13 @@ class TagInlineStyleProcessor: # return split_style @staticmethod - def indents_processing(split_style: list) -> str: + def indents_processing(split_style: List[str]) -> str: """ Function process indents from left using formula_of_indent: indent = abs(margin - text_indent) Parameters ---------- - split_style: list + split_style: List[str] list of styles split by ";" Returns @@ -111,7 +111,7 @@ class TagInlineStyleProcessor: return processed_style return processed_style - def process_inline_style(self): + def process_inline_style(self) -> str: """ Function processes final(css+initial inline) inline style Steps @@ -180,7 +180,7 @@ class TagInlineStyleProcessor: self.tag_inline_style.append(correspond_tag) @staticmethod - def wrap_span_in_tag_to_save_style_attrs(initial_tag): + def wrap_span_in_tag_to_save_style_attrs(initial_tag: Tag): """Function designed to save style attrs that cannot be in tag.name -> span""" dictkeys_pattern = re.compile("|".join(LiveCartaConfig.LIVECARTA_STYLES_CAN_BE_IN_TAG)) if re.findall(dictkeys_pattern, initial_tag.name) and initial_tag.attrs.get("style"): @@ -212,7 +212,45 @@ class TagInlineStyleProcessor: initial_tag.attrs["style"] = span_style initial_tag.wrap(tag) - def convert_initial_tag(self): + def convert_initial_tag(self) -> Tag: self.change_attrs_with_corresponding_tags() self.wrap_span_in_tag_to_save_style_attrs(self.tag_inline_style) return self.tag_inline_style + + +def modify_html_soup_with_css_styles(html_soup: BeautifulSoup, css_text: str = "") -> BeautifulSoup: + """ + Function adds styles from .css to inline style. + Parameters + ---------- + html_soup: BeautifulSoup + html page with inline style + css_text: str + css content from css file + Returns + ------- + inline_soup: BeautifulSoup + soup with styles from css + + """ + # remove this specification because it causes problems + css_text = css_text.replace( + '@namespace epub "http://www.idpf.org/2007/ops";', '') + # here we add css styles to inline style + html_with_css_styles: str = transform(str(html_soup), css_text=css_text, + remove_classes=False, + external_styles=False, + allow_network=False, + disable_validation=True, + ) + # soup with converted styles from css + inline_soup = BeautifulSoup(html_with_css_styles, features="lxml") + + tags_with_inline_style = inline_soup.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp, + attrs={"style": re.compile(".*")}) + + # go through the tags with inline style + style parsed from css file + for tag_inline_style in tags_with_inline_style: + style_converter = InlineStyleProcessor(tag_inline_style) + style_converter.convert_initial_tag() + return inline_soup diff --git a/src/epub_converter/css_processor.py b/src/style_reader.py similarity index 86% rename from src/epub_converter/css_processor.py rename to src/style_reader.py index 2be0dab..9810caf 100644 --- a/src/epub_converter/css_processor.py +++ b/src/style_reader.py @@ -1,13 +1,13 @@ import re import cssutils -from bs4 import BeautifulSoup +from typing import Tuple from os.path import dirname, normpath, join from src.util.color_reader import str2hex from src.livecarta_config import LiveCartaConfig -class CSSPreprocessor: +class StyleReader: def __init__(self): """ Dictionary LIVECARTA_STYLE_ATTRS_MAPPING = { property: mapping function } @@ -41,13 +41,13 @@ class CSSPreprocessor: } @staticmethod - def get_text_color(x): + def get_text_color(x: str) -> str: color = str2hex(x) color = color if color not in ["#000000", "#000", "black"] else "" return color @staticmethod - def get_bg_color(x): + def get_bg_color(x: str) -> str: color = str2hex(x) color = color if color not in ["#ffffff", "#fff", "white"] else "" return color @@ -56,7 +56,7 @@ class CSSPreprocessor: def convert_tag_style_values(size_value: str, is_indent: bool = False) -> str: """ Function - - converts values of tags from em/%/pt to px + - converts values of tags from em/%/pt/in to px - find closest font-size px Parameters ---------- @@ -70,20 +70,23 @@ class CSSPreprocessor: converted value size """ size_regexp = re.compile( - r"(^-*(\d*\.*\d+)%$)|(^-*(\d*\.*\d+)em$)|(^-*(\d*\.*\d+)pt$)") + r"(^-*(\d*\.*\d+)%$)|(^-*(\d*\.*\d+)em$)|(^-*(\d*\.*\d+)pt$)|(^-*(\d*\.*\d+)in$)") has_style_attrs = re.search(size_regexp, size_value) if has_style_attrs: if has_style_attrs.group(1): multiplier = 5.76 if is_indent else 0.16 size_value = float(size_value.replace("%", "")) * multiplier - return str(size_value)+'px' + return str(size_value) + "px" elif has_style_attrs.group(3): multiplier = 18 if is_indent else 16 size_value = float(size_value.replace("em", "")) * multiplier - return str(size_value)+'px' + return str(size_value) + "px" elif has_style_attrs.group(5): size_value = float(size_value.replace("pt", "")) * 4/3 - return str(size_value)+'px' + return str(size_value) + "px" + elif has_style_attrs.group(7): + size_value = float(size_value.replace("in", "")) * 96 + return str(size_value) + "px" else: return "" return size_value @@ -114,7 +117,7 @@ class CSSPreprocessor: return cleaned_value @staticmethod - def style_conditions(style_value: str, style_name: str) -> tuple[bool, bool]: + def style_conditions(style_value: str, style_name: str) -> Tuple[bool, bool]: constraints_on_value = LiveCartaConfig.LIVECARTA_STYLE_ATTRS.get( style_name) value_not_in_possible_values_list = style_value not in LiveCartaConfig.LIVECARTA_STYLE_ATTRS[ @@ -156,20 +159,20 @@ class CSSPreprocessor: style = "; ".join(split_style) return style - def process_inline_styles_in_html_soup(self, html_href2html_body_soup: dict): + def process_inline_styles_in_html_soup(self, html_content): """This function is designed to convert inline html styles""" - for html_href in html_href2html_body_soup: - html_content: BeautifulSoup = html_href2html_body_soup[html_href] - tags_with_inline_style = html_content.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp, - attrs={"style": re.compile(".*")}) + tags_with_inline_style = html_content.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp, + attrs={"style": re.compile(".*")}) - for tag_initial_inline_style in tags_with_inline_style: - inline_style = tag_initial_inline_style.attrs["style"] - tag_initial_inline_style.attrs["style"] = \ - self.build_inline_style_content(inline_style) + for tag_initial_inline_style in tags_with_inline_style: + inline_style = tag_initial_inline_style.attrs["style"] + if tag_initial_inline_style.attrs.get("align"): + inline_style += f";text-align: {tag_initial_inline_style.attrs['align']};" + tag_initial_inline_style.attrs["style"] = \ + self.build_inline_style_content(inline_style) @staticmethod - def get_css_content(css_href, html_href, ebooklib_book): + def get_css_content(css_href: str, html_href: str, ebooklib_book) -> str: path_to_css_from_html = css_href html_folder = dirname(html_href) path_to_css_from_root = normpath( diff --git a/src/util/check_dirs.py b/src/util/check_dirs.py index 542763d..f7a0af0 100644 --- a/src/util/check_dirs.py +++ b/src/util/check_dirs.py @@ -4,13 +4,12 @@ import argparse def parse_args(): parser = argparse.ArgumentParser(description="Utility for folders's clean up.") - parser.add_argument('-f', '--folders', type=str, nargs='*', help='Names of the folders to be cleaned.') - + parser.add_argument("-f", "--folders", type=str, nargs="*", help="Names of the folders to be cleaned.") args = parser.parse_args() return args -def check_dir(dir_path): +def check_dir(dir_path: str): if not os.path.exists(dir_path): try: os.mkdir(dir_path) @@ -18,18 +17,16 @@ def check_dir(dir_path): raise exc -if __name__ == '__main__': +if __name__ == "__main__": folders = parse_args().folders if not folders: - folders = ['docx', 'html', 'json', 'logs', 'config'] + folders = ["books/epub", "books/docx", "books/html", "books/json", "logs", "config"] folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) folders = [os.path.join(folder_path, folder) for folder in folders] try: - for folder in folders: - check_dir(folder) - + [check_dir(folder) for folder in folders] except OSError as exc: print(exc) raise diff --git a/src/util/check_packs.py b/src/util/check_packs.py index 1b2f2e9..512f9a8 100644 --- a/src/util/check_packs.py +++ b/src/util/check_packs.py @@ -6,15 +6,15 @@ import subprocess def parse_args(): parser = argparse.ArgumentParser(description="Utility for checking installed packages.") - parser.add_argument('-p', '--packages', type=str, nargs='*', help='Names of the packages.') + parser.add_argument("-p", "--packages", type=str, nargs="*", help="Names of the packages.") args = parser.parse_args() return args def check_packages(required_packs): - inst = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']) - installed_packages = [r.decode().split('==')[0] for r in inst.split()] + inst = subprocess.check_output([sys.executable, "-m", "pip", "freeze"]) + installed_packages = [r.decode().split("==")[0] for r in inst.split()] to_be_installed = [] for package in required_packs: @@ -24,19 +24,19 @@ def check_packages(required_packs): return to_be_installed -if __name__ == '__main__': +if __name__ == "__main__": required_packs = parse_args().packages if not required_packs: folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - req_path = os.path.join(folder_path, 'requirements.txt') + req_path = os.path.join(folder_path, "requirements.txt") - with open(req_path, 'r') as f: + with open(req_path, "r") as f: packs = f.readlines() - required_packs = [pack.split('>=')[0] for pack in packs] + required_packs = [pack.split(">=")[0] for pack in packs] not_inst_packs = check_packages(required_packs) if not_inst_packs: - raise Exception(f'{" ".join(not_inst_packs)} are not installed.') + raise Exception(f"{' '.join(not_inst_packs)} are not installed.") else: - print('All required packages has been installed.') + print("All required packages has been installed.") diff --git a/src/util/color_reader.py b/src/util/color_reader.py index 82fb451..92b3ee7 100644 --- a/src/util/color_reader.py +++ b/src/util/color_reader.py @@ -1,10 +1,10 @@ import re - +from typing import Tuple from colorsys import hls_to_rgb from webcolors import html4_hex_to_names, hex_to_rgb, rgb_to_name, rgb_percent_to_hex, rgb_to_hex, css3_names_to_hex -def closest_colour_rgb(requested_color): +def closest_colour_rgb(requested_color: Tuple[int, ...]) -> str: """ Function finds closes colour rgb """ min_colours = {} for key, name in html4_hex_to_names.items(): @@ -17,10 +17,10 @@ def closest_colour_rgb(requested_color): return min_colours[min(min_colours.keys())] -def rgb2color_name(color): +def rgb2color_name(color: Tuple[int, ...]) -> str: """ Transform rgb -> color name """ try: - closest_name = actual_name = rgb_to_name(color, 'html4') + closest_name = actual_name = rgb_to_name(color, "html4") except ValueError: closest_name = closest_colour_rgb(color) actual_name = None @@ -30,15 +30,15 @@ def rgb2color_name(color): return closest_name -def hex2color_name(color): +def hex2color_name(color: str) -> str: """ Transform hex -> color name """ try: color = hex_to_rgb(color) except ValueError: - return '' + return "" try: - closest_name = actual_name = rgb_to_name(color, 'html4') + closest_name = actual_name = rgb_to_name(color, "html4") except ValueError: closest_name = closest_colour_rgb(color) actual_name = None @@ -48,41 +48,41 @@ def hex2color_name(color): return closest_name -def str2closest_html_color_name(s: str): +def str2closest_html_color_name(s: str) -> str: """ Transform str -> closest color name """ - if 'rgb' in s: - rgb_str = 'rgba' if ('rgba' in s) else 'rgb' - s = s.replace(rgb_str, '').replace('(', '').replace(')', '') + if "rgb" in s: + rgb_str = "rgba" if ("rgba" in s) else "rgb" + s = s.replace(rgb_str, "").replace("(", "").replace(")", "") try: - rgb = [int(x) for x in s.split(',')[:3]] + rgb = [int(x) for x in s.split(",")[:3]] rgb = tuple(rgb) except ValueError: - return '' + return "" if len(rgb) != 3: - return '' + return "" name = rgb2color_name(rgb) return name - elif '#' in s: - if s in ['#996A95', '#D5C9D3', '#E9E2E8', '#70416F']: - return 'purple' - if s in ['#FFD472', '#F47B4D', '#FFFBEF', '#F47B4D']: - return 'olive' - if s in ['#B0DFD7', '#EFF8F6', '#5CC4B7']: - return 'teal' + elif "#" in s: + if s in ["#996A95", "#D5C9D3", "#E9E2E8", "#70416F"]: + return "purple" + if s in ["#FFD472", "#F47B4D", "#FFFBEF", "#F47B4D"]: + return "olive" + if s in ["#B0DFD7", "#EFF8F6", "#5CC4B7"]: + return "teal" name = hex2color_name(s) - if (name == 'white') and (s.lower() not in ['#ffffff', '#fff']): - name = 'gray' + if (name == "white") and (s.lower() not in ["#ffffff", "#fff"]): + name = "gray" return name elif s in html4_hex_to_names.items(): return s else: - return '' + return "" -def rgba2rgb(r, g, b, alpha): +def rgba2rgb(r: int, g: int, b: int, alpha: float) -> Tuple[int, int, int]: """ Transform rgba -> rgb """ r_background, g_background, b_background = 255, 255, 255 r_new = int((1 - alpha) * r_background + alpha * r) @@ -91,28 +91,28 @@ def rgba2rgb(r, g, b, alpha): return r_new, g_new, b_new -def str2hex(s: str): +def str2hex(s: str) -> str: """ Transform str -> hex """ - if '#' in s and (len(s) <= 7): + if "#" in s and (len(s) <= 7): return s.lower() - if ('rgb' in s.lower()) and ('%' in s): - match = re.search(r'rgba*\(((\d+)%, *(\d+)%, *(\d+)%(, \d\.\d+)*)\)', s) + if ("rgb" in s.lower()) and ("%" in s): + match = re.search(r"rgba*\(((\d+)%, *(\d+)%, *(\d+)%(, \d\.\d+)*)\)", s) if match: r, g, b = int(match.group(2)), int(match.group(3)), int(match.group(4)) return rgb_percent_to_hex((r, g, b)) - if 'rgb' in s.lower(): - rgba = re.findall('([0-9] *\.?[0-9]+)', s) + if "rgb" in s.lower(): + rgba = re.findall("([0-9] *\.?[0-9]+)", s) r, g, b = int(rgba[0]), int(rgba[1]), int(rgba[2]) if len(rgba) == 4: alpha = float(rgba[3]) r, g, b = rgba2rgb(r, g, b, alpha) return rgb_to_hex((r, g, b)) - if 'hsl' in s.lower(): + if "hsl" in s.lower(): # hsl(hue in {0,360}, saturation [0, 100%], lightness [0, 100%]) - match = re.search(r'hsla*\(((\d+), *(\d+)%, *(\d+)%, (\d\.\d+)*)\)', s) + match = re.search(r"hsla*\(((\d+), *(\d+)%, *(\d+)%, (\d\.\d+)*)\)", s) if match: h, s, l = int(match.group(2)), int(match.group(3)), int(match.group(4)) h /= 360 @@ -127,11 +127,10 @@ def str2hex(s: str): if s.lower() in css3_names_to_hex: return css3_names_to_hex[s.lower()] - return '' + return "" -if __name__ == '__main__': - +if __name__ == "__main__": colors = [ (75, 0, 130), (255, 0, 255), (139, 69, 19), (46, 139, 87), @@ -139,7 +138,7 @@ if __name__ == '__main__': ] hex_colors = [ - '#96F', '#000', '#4C4C4C', '#A00', '#99F' + "#96F", "#000", "#4C4C4C", "#A00", "#99F" ] for c in colors: diff --git a/src/util/helpers.py b/src/util/helpers.py index aafb632..ca95606 100644 --- a/src/util/helpers.py +++ b/src/util/helpers.py @@ -1,51 +1,60 @@ import os import logging +from typing import Union class ColoredFormatter(logging.Formatter): """ Class to prettify logger and command line output """ MAPPING = { - 'DEBUG': 37, # white - 'INFO': 36, # cyan - 'WARNING': 33, # yellow - 'ERROR': 31, # red - 'CRITICAL': 41, # white on red bg + "DEBUG": 37, # white + "INFO": 36, # cyan + "WARNING": 33, # yellow + "ERROR": 31, # red + "CRITICAL": 41, # white on red bg } - PREFIX = '\033[' - SUFFIX = '\033[0m' + PREFIX = "\033[" + SUFFIX = "\033[0m" def __init__(self, pattern): logging.Formatter.__init__(self, pattern) def format(self, record): seq = self.MAPPING.get(record.levelname, 37) # default white - record.levelname = '{0}{1}m{2}{3}' \ + record.levelname = "{0}{1}m{2}{3}" \ .format(self.PREFIX, seq, record.levelname, self.SUFFIX) return logging.Formatter.format(self, record) class BookLogger: - def __init__(self, name, book_id, main_logger=None, - filemode='w+', logging_level=logging.INFO, - logging_format='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]'): + def __init__(self, name: str, book_id: Union[int, str], main_logger: logging.Logger = None, + filemode: str = "w+", logging_level: int = logging.INFO, + logging_format: str = "%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]"): """ Method for Logger configuration. Logger will write to file. - :param name: name of the Logger. - :param attr_name: name of attribute that will be added to self. - :param filename: name of the log file. - :param filemode: mode of opening log file. - :param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical. - :param logging_format: format of record in log file. + Parameters + ---------- + name: str + name of the Logger + book_id: Union[int, str] + id of the book + main_logger: Logger + main logger of the converter + filemode: str + mode of opening log file. + logging_level: int + logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical + logging_format: str + format of record in log file + """ self.main_logger = main_logger - self.logger = logging.getLogger(name) self.logger.propagate = False folder_path = os.path.dirname( os.path.dirname(os.path.abspath(__file__))) folder_path = os.path.dirname(folder_path) - filename = f'logs/{book_id}.log' + filename = f"logs/{book_id}.log" file_path = os.path.join(folder_path, filename) file_handler = logging.FileHandler(file_path, mode=filemode) file_format = logging.Formatter(logging_format) @@ -58,42 +67,46 @@ class BookLogger: self.logger.addHandler(stream_handler) self.logger.setLevel(logging_level) - def log(self, message, logging_level=20): + def log(self, message: str, logging_level: int = 20): """ Method for logging. + Parameters + ---------- + message: str + body of the message + logging_level: int + level of logging - :param message: body of the message - :param logging_level: level of logging """ self.logger.log(msg=message, level=logging_level, stacklevel=2) - def log_error_to_main_log(self, message=''): + def log_error_to_main_log(self, message: str = ""): """ Method for logging error to main log file. """ if self.main_logger: if not message: - message = f'Error in book conversion. Check log file.' + message = f"Error in book conversion. Check log file." self.main_logger.error(message) class BookStatusWrapper: """Class sets/updates statuses of Converter on Platform""" - def __init__(self, access, logger_object, book_id=0): + def __init__(self, access, logger_object: BookLogger, book_id: int = 0): self.access = access self.logger_object = logger_object self.book_id = book_id def set_status(self, status: str): str_2_status = { - '[PROCESS]': self.access.PROCESS, - '[GENERATE]': self.access.GENERATE, - '[ERROR]': self.access.ERROR + "[PROCESS]": self.access.PROCESS, + "[GENERATE]": self.access.GENERATE, + "[ERROR]": self.access.ERROR } try: if self.access: self.access.update_status(self.book_id, str_2_status[status]) - self.logger_object.log(f'Status has been updated to {status}.') + self.logger_object.log(f"Status has been updated to {status}.") except Exception as exc: self.logger_object.log( f"Can't update status of the book {status}.", logging.ERROR) @@ -101,10 +114,10 @@ class BookStatusWrapper: raise exc def set_processing(self): - self.set_status('[PROCESS]') + self.set_status("[PROCESS]") def set_generating(self): - self.set_status('[GENERATE]') + self.set_status("[GENERATE]") def set_error(self): - self.set_status('[ERROR]') + self.set_status("[ERROR]") diff --git a/src/util/rgb2closest_color.py b/src/util/rgb2closest_color.py index 6770684..92b16f9 100644 --- a/src/util/rgb2closest_color.py +++ b/src/util/rgb2closest_color.py @@ -1,4 +1,5 @@ -from webcolors import html4_hex_to_names, hex_to_rgb +from typing import Tuple +from webcolors import hex_to_rgb # 16 основных цветов, hex соответвуют hex цветам livecarta # названия другие @@ -8,7 +9,7 @@ html4_hex_to_names = {'#00ffff': 'aqua', '#000000': 'black', '#0000ff': 'blue', '#ffffff': 'white', '#ffff00': 'yellow'} -def rgb2hsv(r, g, b): +def rgb2hsv(r: int, g: int, b: int) -> Tuple[float, float, float]: r /= 255 g /= 255 b /= 255 @@ -42,18 +43,23 @@ for key, name in html4_hex_to_names.items(): HTML_COLORS_HSV[name] = (h, s, v) -def rgb2closest_html_color_name(color): +def rgb2closest_html_color_name(color: str) -> str: """ - - get color in hsv (hue, saturation, value) - try to match with black, grey, silver (black, darkGray, lightGray) as this colors matches badly even in hsv model - calc hue difference between color and all base colors - if for new base color hue diff same as for any other, try to measure saturation and value (it happens for similar colors like red - pink, blue - dark blue) + Parameters + ---------- + color: str + color in hex + + Returns + ------- + base color name that matches best to a given color - :param color: str, color in hex - :return: base color name that matches best to a given color """ if color == (255, 255, 255):