Merge branch 'kiryl/converter_fix' of https://github.com/Teqniksoft/LiveCarta_add_ons into kiryl/converter_fix

This commit is contained in:
Kiryl
2022-09-12 15:26:56 +03:00
27 changed files with 1178 additions and 1287 deletions

View File

@@ -3,6 +3,7 @@ import sys
import json import json
import pika import pika
import logging import logging
from typing import Dict
from pathlib import Path from pathlib import Path
from threading import Event from threading import Event
from functools import partial from functools import partial
@@ -13,8 +14,8 @@ from src.docx_converter.docx_solver import DocxBook
from src.epub_converter.epub_solver import EpubBook from src.epub_converter.epub_solver import EpubBook
def configure_file_logger(name, filename="logs/converter.log", filemode="w+", def configure_file_logger(name: str, filename: str = "logs/converter.log",
logging_level=logging.INFO): filemode: str = "w+", logging_level: int = logging.INFO) -> logging.Logger:
logger = logging.getLogger(name) logger = logging.getLogger(name)
folder_path = os.path.dirname(os.path.abspath(__file__)) folder_path = os.path.dirname(os.path.abspath(__file__))
@@ -30,7 +31,7 @@ def configure_file_logger(name, filename="logs/converter.log", filemode="w+",
return logger return logger
def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict): def local_convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: dict):
logger.info(f"Start processing book-{book_id}.") logger.info(f"Start processing book-{book_id}.")
try: try:
json_file_path = "books/json/9781614382264.json" json_file_path = "books/json/9781614382264.json"
@@ -41,7 +42,7 @@ def local_convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params:
logger.info(f"Book-{book_id} has been proceeded.") logger.info(f"Book-{book_id} has been proceeded.")
def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict): def convert_book(book_type: [DocxBook, EpubBook], book_id: int, logger: logging.Logger, params: Dict[str, Access]):
logger.info(f"Start processing book-{book_id}.") logger.info(f"Start processing book-{book_id}.")
try: try:
book = book_type(book_id=book_id, main_logger=logger, **params) book = book_type(book_id=book_id, main_logger=logger, **params)
@@ -51,7 +52,7 @@ def convert_book(book_type: [DocxBook, EpubBook], book_id, logger, params: dict)
logger.info(f"Book-{book_id} has been proceeded.") logger.info(f"Book-{book_id} has been proceeded.")
def callback(ch, method, properties, body, logger, libre_locker): def callback(ch, method, properties, body: bytes, logger: logging.Logger, libre_locker: Event):
print(f"Message: {body}.") print(f"Message: {body}.")
logger.info(f"Message: {body}.") logger.info(f"Message: {body}.")
try: try:
@@ -82,7 +83,6 @@ def callback(ch, method, properties, body, logger, libre_locker):
logger.error(f"{sys.exc_info()[0]}: {exc.message}") logger.error(f"{sys.exc_info()[0]}: {exc.message}")
else: else:
logger.error(f"{sys.exc_info()[0]}: {str(exc)}") logger.error(f"{sys.exc_info()[0]}: {str(exc)}")
finally: finally:
pass pass

1
presets/.gitignore vendored
View File

@@ -1,2 +1,3 @@
* *
!.gitignore !.gitignore
!*.json

152
presets/docx_presets.json Normal file
View File

@@ -0,0 +1,152 @@
[
{
"preset_name": "wrapper",
"rules": [
{
"tags": ["^div$"],
"condition": {
"parent_tags": null,
"child_tags": null,
"attrs": [
{
"name": "id",
"value": "^Table of Contents\\d+"
}
]
},
"tag_to_wrap": "TOC"
}
]
},
{
"preset_name": "decomposer",
"rules": [
{
"tags": ["^div$"],
"condition": {
"parent_tags": null,
"child_tags": null,
"attrs": [
{
"name": "title",
"value": "footer"
},
{
"name": "id",
"value": "^Table of Contents\\d+"
}
]
}
}
]
},
{
"preset_name": "replacer",
"rules": [
{
"tags": ["^h[6-9]$"],
"condition": null,
"tag_to_replace": "p"
},
{
"tags": ["^div$"],
"condition": {
"parent_tags": null,
"child_tags": null,
"attrs": [
{
"name": "style",
"value": "column-count: 2"
}
]
},
"tag_to_replace": "p"
}
]
},
{
"preset_name": "attr_replacer",
"rules": [
{
"attr": {
"name": "style",
"value": "column-count: 2"
},
"condition": {
"tags": ["^p$"]
},
"attr_to_replace": {
"name": "class",
"value": "columns2"
}
}
]
},
{
"preset_name": "unwrapper",
"rules": [
{
"tags": ["^span$"],
"condition": {
"parent_tags": ":is(h1, h2, h3, h4, h5, h6, h7, h8, h9)",
"child_tags": null,
"attrs": [
{
"name": "style",
"value": "(^background: #[\\da-fA-F]{6}$)|(^letter-spacing: -?[\\d.]+pt$)"
},
{
"name": "lang",
"value": "^ru-RU$"
},
{
"name": "face",
"value": "^Times New Roman[\\w, ]+$"
}
]
}
},
{
"tags": ["^p$"],
"condition": {
"parent_tags": ":is(li)",
"child_tags": null,
"attrs": null
}
},
{
"tags": ["^a$"],
"condition": {
"parent_tags": null,
"child_tags": null,
"attrs": [
{
"name": "name",
"value": "_GoBack"
}
]
}
},
{
"tags": ["^u$"],
"condition": {
"parent_tags": ":is(a)",
"child_tags": ":is(a)",
"attrs": null
}
},
{
"tags": ["^b$"],
"condition": {
"parent_tags": ":is(h1, h2, h3, h4, h5, h6, h7, h8, h9)",
"child_tags": null,
"attrs": null
}
},
{
"tags": ["^div$"],
"condition": null
}
]
}
]

View File

@@ -3,30 +3,38 @@
"preset_name": "table_wrapper", "preset_name": "table_wrapper",
"rules": [ "rules": [
{ {
"tags": ["div"], "tags": ["^div$"],
"attrs": [ "condition": {
{ "parent_tags": null,
"name": "width", "child_tags": null,
"value": ".*" "attrs": [
}, {
{ "name": "width",
"name": "border", "value": ".*"
"value": ".*" },
}, {
{ "name": "border",
"name": "bgcolor", "value": ".*"
"value": ".*" },
} {
] "name": "bgcolor",
"value": ".*"
}
]
}
}, },
{ {
"tags": ["section", "blockquote"], "tags": ["^section$", "^blockquote$"],
"attrs": [ "condition": {
"parent_tags": null,
"child_tags": null,
"attrs": [
{ {
"name": "class", "name": "class",
"value": "feature[1234]" "value": "feature[1234]"
} }
] ]
}
} }
] ]
}, },
@@ -73,37 +81,53 @@
"preset_name": "attr_replacer", "preset_name": "attr_replacer",
"rules": [ "rules": [
{ {
"attr": "xlink:href", "attr": {
"condition": { "name": "xlink:href",
"tags": ["img"] "value": ".*"
}, },
"attr_to_replace": "src" "condition": {
"tags": ["^img$"]
},
"attr_to_replace": {
"name": "src",
"value": null
}
} }
] ]
}, },
{ {
"preset_name": "unwrapper", "preset_name": "unwrapper",
"rules": { "rules": [
"tags": [ {
"section", "tags": [
"article", "^section$",
"figcaption", "^article$",
"main", "^figcaption$",
"body", "^main$",
"html", "^body$",
"svg", "^html$",
"li > p" "^svg$"
] ],
} "condition": null
},
{
"tags": ["^p$"],
"condition": {
"parent_tags": "li",
"child_tags": null,
"attrs": null
}
}
]
}, },
{ {
"preset_name": "inserter", "preset_name": "inserter",
"rules": [ "rules": [
{ {
"tags": ["pre"], "tags": ["^pre$"],
"condition": { "condition": {
"parent_tags": null, "parent_tags": null,
"child_tags": ":not(code, kbd, var)", "child_tags": ":not(:has(code, kbd, var))",
"attrs": null "attrs": null
}, },
"tag_to_insert": "code" "tag_to_insert": "code"

View File

@@ -1,17 +1,23 @@
import json
import os import os
import json
import time import time
import requests import requests
from threading import Event
from io import BytesIO from io import BytesIO
from threading import Event
from typing import List, Tuple, Dict, Union
class Access: class Access:
"""Class accessing our platform""" """Class accessing our platform"""
def __init__(self, url=None): def __init__(self, url: str = None):
""" """
:param url: str, url received from queue message, if field apiURL exists Parameters
else None ----------
url: str
url received from queue message,
if field apiURL exists
else None
""" """
self.PENDING = 1 self.PENDING = 1
self.PROCESS = 2 self.PROCESS = 2
@@ -19,6 +25,7 @@ class Access:
self.FINISH = 4 self.FINISH = 4
self.ERROR = 5 self.ERROR = 5
self.url = None
self.username = None self.username = None
self.password = None self.password = None
@@ -32,12 +39,12 @@ class Access:
self.get_token() self.get_token()
self.refreshing.set() self.refreshing.set()
def set_credentials(self, url): def set_credentials(self, url: str):
folder_path = os.path.dirname( folder_path: str = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))) os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(folder_path, "config/api_config.json") config_path: str = os.path.join(folder_path, "config/api_config.json")
with open(config_path, "r") as f: with open(config_path, "r") as f:
params = json.load(f) params: Dict[str, str] = json.load(f)
self.refreshing.clear() self.refreshing.clear()
self.url = url self.url = url
@@ -64,7 +71,7 @@ class Access:
} }
response = requests.post( response = requests.post(
f'{self.url}/token', json=json_form, f'{self.url}/token', json=json_form,
# auth=('kiryl.miatselitsa', 'iK4yXCvdyHFEEOvG2v3F') # auth=('kiryl.miatselitsa', 'iK4yXCvdyHFEEOvG2v3F')
) )
if response.status_code == 400: if response.status_code == 400:
@@ -104,7 +111,7 @@ class Access:
else: else:
raise Exception(f'{response.status_code}') raise Exception(f'{response.status_code}')
def get_file(self, file_path): def get_file(self, file_path: str) -> bytes:
"""Function downloads the file[book, preset] from site""" """Function downloads the file[book, preset] from site"""
if self.is_time_for_refreshing(): if self.is_time_for_refreshing():
self.refresh_token() self.refresh_token()
@@ -124,10 +131,10 @@ class Access:
f'status code:{response.status_code}') f'status code:{response.status_code}')
return content return content
def sleep(timeout: float, retry=3): def sleep(timeout: float, retry: int = 3):
def decorator(function): def decorator(function):
"""Decorator sleeping timeout sec and makes 3 retries""" """Decorator sleeping timeout sec and makes 3 retries"""
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs) -> str:
retries = 0 retries = 0
while retries < retry: while retries < retry:
try: try:
@@ -141,14 +148,14 @@ class Access:
return decorator return decorator
@sleep(3) @sleep(3)
def send_image(self, img_path, doc_id, img_content: bytes = None): def send_image(self, img_path: str, doc_id: str, img_content: bytes = None) -> str:
"""Function sends images to site""" """Function sends images to site"""
if self.is_time_for_refreshing(): if self.is_time_for_refreshing():
self.refresh_token() self.refresh_token()
self.refreshing.wait() self.refreshing.wait()
img_obj = BytesIO(img_content) if img_content else open(img_path, 'rb') img_obj: BytesIO = BytesIO(img_content) if img_content else open(img_path, 'rb')
files = { files: Dict[str, Tuple[str, BytesIO]] = {
'image': (os.path.basename(img_path), img_obj) 'image': (os.path.basename(img_path), img_obj)
} }
response = requests.post( response = requests.post(
@@ -165,7 +172,7 @@ class Access:
f'{response.status_code} Bad request: {response.json()["message"]}.') f'{response.status_code} Bad request: {response.json()["message"]}.')
return img_url return img_url
def send_book(self, doc_id, content): def send_book(self, doc_id: int, content: Dict[str, List[Dict[str, Union[List, str]]]]):
"""Function sends the book to site""" """Function sends the book to site"""
if self.is_time_for_refreshing(): if self.is_time_for_refreshing():
self.refresh_token() self.refresh_token()
@@ -184,7 +191,7 @@ class Access:
raise Exception( raise Exception(
f'{response.status_code} Bad request: {response.json()["message"]}.') f'{response.status_code} Bad request: {response.json()["message"]}.')
def update_status(self, doc_id, status): def update_status(self, doc_id: Union[int, str], status: int):
"""Function updates status of the book on site""" """Function updates status of the book on site"""
if self.is_time_for_refreshing(): if self.is_time_for_refreshing():
self.refresh_token() self.refresh_token()

View File

@@ -3,6 +3,7 @@ import json
import codecs import codecs
import logging import logging
import pathlib import pathlib
from typing import List, Dict, Union
from abc import abstractmethod, ABCMeta from abc import abstractmethod, ABCMeta
from src.livecarta_config import LiveCartaConfig from src.livecarta_config import LiveCartaConfig
@@ -20,7 +21,7 @@ class BookSolver:
__metaclass__ = ABCMeta __metaclass__ = ABCMeta
def __init__(self, book_id=0, access=None, main_logger=None): def __init__(self, book_id: int = 0, access=None, main_logger=None):
self.book_type = None self.book_type = None
self.book_id = book_id self.book_id = book_id
self.access = access self.access = access
@@ -36,22 +37,30 @@ class BookSolver:
assert LiveCartaConfig.SUPPORTED_LEVELS == len(LiveCartaConfig.SUPPORTED_HEADERS), \ assert LiveCartaConfig.SUPPORTED_LEVELS == len(LiveCartaConfig.SUPPORTED_HEADERS), \
"Length of headers doesn't match allowed levels." "Length of headers doesn't match allowed levels."
def save_file(self, content: bytes, path_to_save, file_type): def save_file(self, content: bytes, path_to_save: str, file_type: str) -> str:
""" """
Function saves binary content of file to folder(path_to_save) Function saves binary content of file to folder(path_to_save)
Parameters Parameters
---------- ----------
content: bytes str content: bytes str
binary content of the file binary content of the file
path_to_save: str
path to the folder
file_type: str
Returns
----------
file_path: str
path to file on local
""" """
folder_path = os.path.dirname( folder_path: str = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))) os.path.dirname(os.path.abspath(__file__)))
folder_path = os.path.join( folder_path = os.path.join(
folder_path, path_to_save) folder_path, path_to_save)
pathlib.Path(folder_path).mkdir(parents=True, exist_ok=True) pathlib.Path(folder_path).mkdir(parents=True, exist_ok=True)
file_path = os.path.join( file_path: str = os.path.join(
folder_path, f"{self.book_id}.{file_type}") folder_path, f"{self.book_id}.{file_type}")
try: try:
with open(file_path, "wb+") as file: with open(file_path, "wb+") as file:
@@ -68,13 +77,15 @@ class BookSolver:
def get_preset_file(self): def get_preset_file(self):
"""Method for getting and saving preset from server""" """Method for getting and saving preset from server"""
try: try:
self.logger_object.log(f"Start receiving preset file from server. URL:" pass
f" {self.access.url}/doc-convert/{self.book_id}/presets") self.preset_path = "presets/docx_presets.json"
content = self.access.get_file( # self.logger_object.log(f"Start receiving preset file from server. URL:"
file_path=f"{self.access.url}/doc-convert/{self.book_id}/presets") # f" {self.access.url}/doc-convert/{self.book_id}/presets")
self.logger_object.log("Preset file was received from server.") # content = self.access.get_file(
self.preset_path = pathlib.Path( # file_path=f"{self.access.url}/doc-convert/{self.book_id}/presets")
str(self.save_file(content, path_to_save="presets", file_type="json"))) # self.logger_object.log("Preset file was received from server.")
# self.preset_path = pathlib.Path(
# str(self.save_file(content, path_to_save="presets", file_type="json")))
except FileNotFoundError as f_err: except FileNotFoundError as f_err:
self.logger_object.log( self.logger_object.log(
"Can't get preset file from server.", logging.ERROR) "Can't get preset file from server.", logging.ERROR)
@@ -116,7 +127,7 @@ class BookSolver:
parents=True, exist_ok=True) parents=True, exist_ok=True)
self.book_output_path.touch(exist_ok=True) self.book_output_path.touch(exist_ok=True)
def write_to_json(self, content: dict): def write_to_json(self, content: Dict[str, List[Dict[str, Union[List, str]]]]):
self.check_output_directory() self.check_output_directory()
try: try:
with codecs.open(self.book_output_path, "w", encoding="utf-8") as f: with codecs.open(self.book_output_path, "w", encoding="utf-8") as f:
@@ -127,7 +138,7 @@ class BookSolver:
self.logger_object.log( self.logger_object.log(
"Error has occurred while writing .json file." + str(exc), logging.ERROR) "Error has occurred while writing .json file." + str(exc), logging.ERROR)
def send_json_content_to_server(self, content: dict): def send_json_content_to_server(self, content: Dict[str, List[Dict[str, Union[List, str]]]]):
"""Function sends json_content to site""" """Function sends json_content to site"""
try: try:
self.access.send_book(self.book_id, content) self.access.send_book(self.book_id, content)
@@ -140,7 +151,7 @@ class BookSolver:
raise exc raise exc
@abstractmethod @abstractmethod
def get_converted_book(self): def get_converted_book(self) -> Dict[str, List[Dict[str, Union[List, str]]]]:
self.logger_object.log("Beginning of processing .json output.") self.logger_object.log("Beginning of processing .json output.")
self.status_wrapper.set_generating() self.status_wrapper.set_generating()
return {} return {}
@@ -158,8 +169,9 @@ class BookSolver:
self.logger_object.log( self.logger_object.log(
f"Beginning of conversion from .{self.book_type} to .json.") f"Beginning of conversion from .{self.book_type} to .json.")
self.status_wrapper.set_processing() self.status_wrapper.set_processing()
content_dict = self.get_converted_book() content_dict: Dict[str, List[Dict[Union[str, List]]]] = self.get_converted_book()
[os.remove(path) for path in [self.preset_path, self.book_path]] # todo add delete of preset path
[os.remove(path) for path in [self.book_path]]
self.logger_object.log("Beginning of processing .json output.") self.logger_object.log("Beginning of processing .json output.")
self.status_wrapper.set_generating() self.status_wrapper.set_generating()
self.write_to_json(content_dict) self.write_to_json(content_dict)

View File

@@ -1,5 +1,5 @@
import re import re
from typing import Union from typing import List, Dict, Union
from ebooklib.epub import Section, Link from ebooklib.epub import Section, Link
from src.livecarta_config import LiveCartaConfig from src.livecarta_config import LiveCartaConfig
@@ -11,7 +11,7 @@ class NavPoint:
These are data structures which form mapping from NCX to python data structures. These are data structures which form mapping from NCX to python data structures.
""" """
def __init__(self, obj: Union[Link, Section] = None, ): def __init__(self, obj: Union[Link, Section] = None):
self.href, self.id = self.parse_href_id(obj) self.href, self.id = self.parse_href_id(obj)
self.title = obj.title self.title = obj.title
@@ -52,15 +52,15 @@ def flatten(x):
class ChapterItem: class ChapterItem:
""" """
Class of Chapter that could have subchapters Class of Chapter that could have subchapters
These are data structures which form mapping to livecarta json structure. These are data structures which form mapping to LiveCarta json structure.
""" """
def __init__(self, title, content, sub_items): def __init__(self, title: str, content: str, sub_items: List):
self.title = title self.title = title
self.content = content self.content = content
self.sub_items = sub_items self.sub_items = sub_items
def to_dict(self, lvl=1): def to_dict(self, lvl: int = 1) -> Dict[str, Union[str, List]]:
"""Function returns dictionary of chapter""" """Function returns dictionary of chapter"""
sub_dicts = [] sub_dicts = []
if self.sub_items: if self.sub_items:

View File

@@ -3,38 +3,40 @@ import logging
import pathlib import pathlib
import subprocess import subprocess
from subprocess import PIPE from subprocess import PIPE
from typing import Union
from threading import Event from threading import Event
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from src.util.helpers import BookLogger from src.util.helpers import BookLogger
class Docx2LibreHTML: class Docx2LibreHtml:
def __init__(self, book_id=0, file_path=None, access=None, logger=None, libre_locker=None): def __init__(self, book_id: int = 0, file_path: Union[pathlib.PosixPath, str] = None,
access=None, logger: BookLogger = None, libre_locker: Event = None):
self.book_id = book_id if book_id != 0 else pathlib.Path( self.book_id = book_id if book_id != 0 else pathlib.Path(
file_path).stem file_path).stem
self.file_path = file_path self.file_path = file_path
self.access = access self.access = access
self.logger_object: BookLogger = logger self.logger_object: BookLogger = logger
# critical section for occupying libreoffice by one thread # critical section for occupying libreoffice by one thread
self.libre_locker: Event() = libre_locker self.libre_locker = libre_locker
# path to html file, file appears after libre-conversion # path to html file, file appears after libre-conversion
self.html_path = self.convert_docx_to_html() self.html_path = self.convert_docx_to_html()
self.html_soup = self.read_html(self.html_path) self.html_soup = self.read_html(self.html_path)
def _libre_run(self, out_dir_path): def _libre_run(self, out_dir_path: str):
command = ["libreoffice", "--headless", command = ["libreoffice", "--headless",
"--convert-to", "html", f"{str(self.file_path)}", "--convert-to", "html", f"{str(self.file_path)}",
"--outdir", f"{out_dir_path}"] "--outdir", f"{out_dir_path}"]
print(command) # print(command)
result = subprocess.run(command, stdout=PIPE, stderr=PIPE) result = subprocess.run(command, stdout=PIPE, stderr=PIPE)
self.logger_object.log(f"Result of libre conversion for book_{self.book_id}:" self.logger_object.log(f"Result of libre conversion for book_{self.book_id}:"
f" {result.returncode}, {result.stdout}", logging.DEBUG) f" {result.returncode}, {result.stdout}", logging.DEBUG)
self.logger_object.log(f"Any error while libre conversion for book_" self.logger_object.log(f"Any error while libre conversion for book_"
f"{self.book_id}: {result.stderr}", logging.DEBUG) f"{self.book_id}: {result.stderr}", logging.DEBUG)
def convert_docx_to_html(self): def convert_docx_to_html(self) -> pathlib.Path:
""" """
Function converts .docx document to .html file. Function converts .docx document to .html file.
Steps Steps
@@ -44,18 +46,18 @@ class Docx2LibreHTML:
Returns Returns
---------- ----------
html_path: str html_path: pathlib.Path
path to html file, file appears after libre-conversion path to html file, file appears after libre-conversion
""" """
def get_and_clear_flag(out_dir_path: str): def get_and_clear_flag(html_file_path: str):
self.libre_locker.clear() self.libre_locker.clear()
self.logger_object.log(f"Got flag!", logging.DEBUG) self.logger_object.log(f"Got flag!", logging.DEBUG)
self._libre_run(out_dir_path) self._libre_run(html_file_path)
self.libre_locker.set() self.libre_locker.set()
self.logger_object.log("Cleared flag...", logging.DEBUG) self.logger_object.log("Cleared flag...", logging.DEBUG)
def check_file_exists(path, error_string: str): def check_file_exists(path: pathlib.Path, error_string: str):
try: try:
f = open(path) f = open(path)
f.close() f.close()
@@ -73,19 +75,20 @@ class Docx2LibreHTML:
folder_path = os.path.dirname( folder_path = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))) os.path.dirname(os.path.abspath(__file__)))
out_dir_path = os.path.join(folder_path, f"../books/html/{self.book_id}") out_dir_path = os.path.join(
folder_path, f"../books/html/{self.book_id}")
pathlib.Path(out_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(out_dir_path).mkdir(parents=True, exist_ok=True)
try: try:
if self.libre_locker.isSet(): if self.libre_locker.is_set():
get_and_clear_flag(out_dir_path) get_and_clear_flag(out_dir_path)
else: else:
while not self.libre_locker.isSet(): while not self.libre_locker.is_set():
self.logger_object.log( self.logger_object.log(
"Waiting for libre...", logging.DEBUG) "Waiting for libre...", logging.DEBUG)
flag = self.libre_locker.wait(50) flag = self.libre_locker.wait(50)
if flag: if flag:
if self.libre_locker.isSet(): if self.libre_locker.is_set():
get_and_clear_flag(out_dir_path) get_and_clear_flag(out_dir_path)
break break
except Exception as exc: except Exception as exc:
@@ -105,11 +108,11 @@ class Docx2LibreHTML:
f"Input file path after conversion: {html_path}.") f"Input file path after conversion: {html_path}.")
return html_path return html_path
def read_html(self, html_path): def read_html(self, html_path: pathlib.Path) -> BeautifulSoup:
"""Method for reading .html file into beautiful soup tag.""" """Method for reading .html file into beautiful soup tag."""
try: try:
html_text = open(html_path, "r", encoding="utf8").read() html_text = open(html_path, "r", encoding="utf8").read()
self.logger_object.log("HTML for book has been loaded.") self.logger_object.log("Html for book has been loaded.")
except FileNotFoundError as exc: except FileNotFoundError as exc:
self.logger_object.log("There is no html to process." self.logger_object.log("There is no html to process."
"Conversion went wrong or you specified wrong paths.", logging.ERROR) "Conversion went wrong or you specified wrong paths.", logging.ERROR)

View File

@@ -1,22 +1,25 @@
import json import json
import codecs import codecs
import logging
from threading import Event from threading import Event
from src.book_solver import BookSolver from src.book_solver import BookSolver
from src.util.helpers import BookLogger from src.util.helpers import BookLogger
from src.docx_converter.docx2libre_html import Docx2LibreHTML from src.html_presets_processor import HtmlPresetsProcessor
from src.docx_converter.html_docx_preprocessor import HTMLDocxPreprocessor from src.style_reader import StyleReader
from src.docx_converter.libre_html2json_converter import LibreHTML2JSONConverter from src.docx_converter.docx2libre_html import Docx2LibreHtml
from src.docx_converter.html_docx_processor import HtmlDocxProcessor
from src.docx_converter.libre_html2json_converter import LibreHtml2JsonConverter
class DocxBook(BookSolver): class DocxBook(BookSolver):
"""Class of .docx type book - child of BookSolver""" """Class of .docx type book - child of BookSolver"""
def __init__(self, book_id=0, access=None, main_logger=None, libre_locker=None): def __init__(self, book_id: int = 0, access=None, main_logger=None, libre_locker: Event = None):
super().__init__(book_id, access, main_logger) super().__init__(book_id, access, main_logger)
self.book_type = "docx" self.book_type = "docx"
# critical section for occupying libreoffice by one thread # critical section for occupying libreoffice by one thread
self.libre_locker: Event() = libre_locker self.libre_locker = libre_locker
def get_converted_book(self): def get_converted_book(self):
""" """
@@ -34,39 +37,67 @@ class DocxBook(BookSolver):
""" """
# 1. Converts docx to html with LibreOffice # 1. Converts docx to html with LibreOffice
html_converter = Docx2LibreHTML(self.book_id, self.book_path, self.access, try:
self.logger_object, self.libre_locker) html_converter = Docx2LibreHtml(self.book_id, self.book_path, self.access,
# todo presets self.logger_object, self.libre_locker)
except Exception as exc:
self.logger_object.log(
"Error has occurred while converting .docx to .html.", logging.ERROR)
self.logger_object.log_error_to_main_log()
self.status_wrapper.set_error()
raise exc
# 2. Parses and cleans html, gets list of tags, gets footnotes # 2. Parses and cleans html, gets list of tags, gets footnotes
parser = HTMLDocxPreprocessor( try:
html_converter.html_soup, self.logger_object) html_preprocessor = HtmlPresetsProcessor(
bs_tags, footnotes, top_level_headers = parser.process_html( logger=self.logger_object, preset_path="presets/docx_presets.json")
self.access, html_converter.html_path, self.book_id) style_preprocessor = StyleReader()
html_processor = HtmlDocxProcessor(html_soup=html_converter.html_soup,
logger=self.logger_object,
html_preprocessor=html_preprocessor,
style_preprocessor=style_preprocessor)
bs_tags, footnotes, top_level_headers = html_processor.process_html(
self.access, html_converter.html_path, self.book_id)
except Exception as exc:
self.logger_object.log(
"Error has occurred while processing .html", logging.ERROR)
self.logger_object.log_error_to_main_log()
self.status_wrapper.set_error()
raise exc
# 3. Parses from line structure to nested structure with JSONConverter # 3. Parses from line structure to nested structure with JSONConverter
json_converter = LibreHTML2JSONConverter(bs_tags, footnotes, top_level_headers, try:
self.logger_object) json_converter = LibreHtml2JsonConverter(bs_tags, footnotes, top_level_headers,
content_dict = json_converter.convert_to_dict() self.logger_object)
content_dict = json_converter.convert_to_dict()
except Exception as exc:
self.logger_object.log(
"Error has occurred while converting .html to .json", logging.ERROR)
self.logger_object.log_error_to_main_log()
self.status_wrapper.set_error()
raise exc
return content_dict return content_dict
if __name__ == "__main__": if __name__ == "__main__":
docx_file_path = "../../books/docx/music_inquiry.docx" docx_file_path = "../../books/docx/AmericanGovernment3e-WEB.docx"
logger_object = BookLogger( logger_object = BookLogger(
name="docx", book_id=docx_file_path.split("/")[-1]) name="docx", book_id=docx_file_path.split("/")[-1])
locker = Event() locker = Event()
locker.set() locker.set()
html_converter = Docx2LibreHTML(file_path=docx_file_path, html_converter = Docx2LibreHtml(file_path=docx_file_path,
logger=logger_object, libre_locker=locker) logger=logger_object, libre_locker=locker)
parser = HTMLDocxPreprocessor(html_converter.html_soup, logger_object) html_preprocessor = HtmlPresetsProcessor(
content, footnotes, top_level_headers = parser.process_html( logger=logger_object, preset_path="../../presets/docx_presets.json")
style_preprocessor = StyleReader()
html_processor = HtmlDocxProcessor(html_soup=html_converter.html_soup, logger=logger_object,
html_preprocessor=html_preprocessor, style_preprocessor=style_preprocessor)
content, footnotes, top_level_headers = html_processor.process_html(
html_path=html_converter.html_path, book_id=html_converter.book_id) html_path=html_converter.html_path, book_id=html_converter.book_id)
json_converter = LibreHTML2JSONConverter( json_converter = LibreHtml2JsonConverter(
content, footnotes, top_level_headers, logger_object) content, footnotes, top_level_headers, logger_object)
content_dict = json_converter.convert_to_dict() content_dict = json_converter.convert_to_dict()

View File

@@ -1,13 +1,14 @@
import re import re
from bs4 import BeautifulSoup, NavigableString from typing import List
from bs4 import BeautifulSoup, Tag, NavigableString
def _clean_footnote_content(content): def clean_footnote_content(content: str) -> str:
content = content.strip() content = content.strip()
return content.strip() return content.strip()
def process_footnotes(body_tag): def process_footnotes(body_tag: Tag) -> List[str]:
"""Function returns list of footnotes and delete them from html_soup.""" """Function returns list of footnotes and delete them from html_soup."""
footnote_anchors = body_tag.find_all("a", class_="sdfootnoteanc") footnote_anchors = body_tag.find_all("a", class_="sdfootnoteanc")
footnote_content = body_tag.find_all( footnote_content = body_tag.find_all(
@@ -32,7 +33,7 @@ def process_footnotes(body_tag):
new_tag = BeautifulSoup(features="lxml").new_tag("sup") new_tag = BeautifulSoup(features="lxml").new_tag("sup")
new_tag["class"] = "footnote-element" new_tag["class"] = "footnote-element"
new_tag["data-id"] = i + 1 new_tag["data-id"] = f"{i + 1}"
new_tag["id"] = f"footnote-{i + 1}" new_tag["id"] = f"footnote-{i + 1}"
new_tag.string = "*" new_tag.string = "*"
anc_tag.replace_with(new_tag) anc_tag.replace_with(new_tag)
@@ -65,9 +66,8 @@ def process_footnotes(body_tag):
else: else:
unicode_string += child.decode_contents() unicode_string += child.decode_contents()
content = _clean_footnote_content(unicode_string) content = clean_footnote_content(unicode_string)
cont_tag.decompose() cont_tag.decompose()
footnotes.append(content) footnotes.append(content)
return footnotes return footnotes

View File

@@ -1,588 +0,0 @@
import re
import logging
from typing import List
from bs4 import BeautifulSoup, NavigableString, Tag
from src.livecarta_config import LiveCartaConfig
from src.util.helpers import BookLogger, BookStatusWrapper
from src.docx_converter.footnotes_processing import process_footnotes
from src.docx_converter.image_processing import process_images
class HTMLDocxPreprocessor:
def __init__(self, html_soup, logger_object, status_wrapper=None):
self.body_tag = html_soup.body
self.html_soup = html_soup
self.logger_object: BookLogger = logger_object
self.status_wrapper: BookStatusWrapper = status_wrapper
self.top_level_headers = None
self.content = list()
def _process_toc_links(self):
def _check_parent_link_exist_in_toc(tag_with_link):
toc_links = []
for a_tag in tag_with_link.find_all("a", {"name": re.compile(r"^_Toc\d+")}):
link_name = a_tag.attrs["name"]
toc_item = self.body_tag.find("a", {"href": "#" + link_name})
if toc_item:
toc_links.append(toc_item)
return len(toc_links) > 0
"""Function to extract nodes which contains TOC links, remove links from file and detect headers."""
toc_links = self.body_tag.find_all(
"a", {"name": re.compile(r"^_Toc\d+")})
headers = [link.parent for link in toc_links]
outline_level = "1" # All the unknown outlines will be predicted as <h1>
for h_tag in headers:
if re.search(r"^h\d$", h_tag.name):
h_tag.a.unwrap()
# outline_level = tag.name[-1] # TODO: add prediction of the outline level
elif h_tag.name == "p":
exist_in_toc = _check_parent_link_exist_in_toc(h_tag)
if h_tag in self.body_tag.find_all("p") and exist_in_toc:
new_tag = BeautifulSoup(
features="lxml").new_tag("h" + outline_level)
text = h_tag.text
h_tag.replaceWith(new_tag)
new_tag.string = text
else:
# rethink document structure when you have toc_links, other cases?
self.logger_object.log(f"Something went wrong in processing toc_links."
f" Check the structure of the file. "
f"Tag name: {h_tag.name}")
def _clean_tag(self, tag: str, attr_name: str, attr_value: re):
# todo regex
"""
Function to clean tags by its name and attribute value.
Parameters
----------
tag: str
tag name to clean
attr_name: str
attribute name
attr_value: [str,re]
attribute value
Returns
-------
clean tag
"""
tags = self.body_tag.find_all(tag, {attr_name: attr_value})
for tag in tags:
if len(tag.attrs) == 1:
tag.unwrap()
def _clean_underline_links(self):
# todo regex
"""Function cleans meaningless <u> tags before links."""
underlines = self.body_tag.find_all("u")
for u in underlines:
if u.find_all("a"):
u.unwrap()
links = self.body_tag.find_all("a")
for link in links:
u = link.find_all("u")
if u and len(u) == 1:
u[0].unwrap()
@classmethod
def convert_pt_to_px(cls, value):
value = float(value)
if value == LiveCartaConfig.WORD_DEFAULT_FONT_SIZE:
return LiveCartaConfig.LIVECARTA_DEFAULT_FONT_SIZE
else:
return value
@classmethod
def convert_font_pt_to_px(cls, style: str) -> str:
"""
Function converts point in the font-size to pixels.
Parameters
----------
style: str
str with style to proces
Returns
-------
: str
str with converted style
"""
size = re.search(r"font-size: (\d{1,3})pt", style)
if size is None:
return style
size = size.group(1)
new_size = cls.convert_pt_to_px(size)
if new_size == LiveCartaConfig.LIVECARTA_DEFAULT_FONT_SIZE:
return ""
return re.sub(size + "pt", str(new_size) + "px", style)
def _font_to_span(self):
"""
Function to convert <font> tag to <span>.
If font style is default, then remove this tag.
"""
fonts = self.body_tag.find_all("font")
for font in fonts:
face, style, color =\
font.get("face"), font.get("style"), font.get("color")
font.attrs, font.name = {}, "span"
if style:
style = self.convert_font_pt_to_px(style)
if style != "":
if color and color in LiveCartaConfig.COLORS_MAP:
style += f"; color: {color};"
font.attrs["style"] = style
elif color and color in LiveCartaConfig.COLORS_MAP:
font.attrs["style"] = f"color: {color};"
if len(font.attrs) == 0:
font.unwrap()
# on this step there should be no more <font> tags
assert len(self.body_tag.find_all("font")) == 0
def clean_trash(self):
# todo make it regex dict
"""Function to remove all styles and tags we don"t need."""
self._clean_tag("span", "style", re.compile(
r"^background: #[\da-fA-F]{6}$"))
# todo: check for another languages
self._clean_tag("span", "lang", re.compile(r"^ru-RU$"))
self._clean_tag("span", "style", re.compile(
"^letter-spacing: -?[\d.]+pt$"))
self._clean_tag("font", "face", re.compile(
r"^Times New Roman[\w, ]+$"))
self._clean_tag("a", "name", "_GoBack")
self._clean_underline_links()
self._font_to_span()
# replace toc with empty <TOC> tag
tables = self.body_tag.find_all(
"div", id=re.compile(r"^Table of Contents\d+"))
for table in tables:
table.wrap(self.html_soup.new_tag("TOC"))
table.decompose()
def _preprocessing_headings(self):
# todo regex
"""Function to convert all lower level headings to p tags"""
pattern = f"^h[{LiveCartaConfig.SUPPORTED_LEVELS + 1}-9]$"
header_tags = self.body_tag.find_all(re.compile(pattern))
for tag in header_tags:
tag.name = "p"
def _process_paragraph(self):
"""Function to process <p> tags (text-align and text-indent value)."""
paragraphs = self.body_tag.find_all("p")
for p in paragraphs:
# libre converts some \n into <p> with 2 </br>
# there we remove 1 unnecessary <br>
brs = p.find_all("br")
text = p.text
if brs and text == "\n\n" and len(brs) == 2:
brs[0].decompose()
indent_should_be_added = False
if text and ((text[0:1] == "\t") or (text[:2] == "\n\t")):
indent_should_be_added = True
align = p.get("align")
style = p.get("style")
if style:
indent = re.search(r"text-indent: ([\d.]{1,4})in", style)
margin_left = re.search(r"margin-left: ([\d.]{1,4})in", style)
margin_right = re.search(
r"margin-right: ([\d.]{1,4})in", style)
margin_top = re.search(r"margin-top: ([\d.]{1,4})in", style)
margin_bottom = re.search(
r"margin-bottom: ([\d.]{1,4})in", style)
else:
indent = margin_left = margin_right = \
margin_top = margin_bottom = None
if margin_left and margin_right and margin_top and margin_bottom and \
margin_left.group(1) == "0.6" and margin_right.group(1) == "0.6" and \
margin_top.group(1) == "0.14" and margin_bottom.group(1) == "0.11":
p.wrap(BeautifulSoup(features="lxml").new_tag("blockquote"))
p.attrs = {}
style = ""
if align is not None and align != LiveCartaConfig.DEFAULT_ALIGN_STYLE:
style += f"text-align: {align};"
if indent is not None or indent_should_be_added:
# indent = indent.group(1)
style += f"text-indent: {LiveCartaConfig.INDENT};"
if style:
p.attrs["style"] = style
def _process_two_columns(self):
"""Function to process paragraphs which has two columns layout."""
two_columns = self.body_tag.find_all("div", style="column-count: 2")
for div in two_columns:
for child in div.children:
if child.name == "p":
child["class"] = "columns2"
div.unwrap()
def _process_quotes(self):
"""
Function to process block quotes.
After docx to html conversion block quotes are stored inside table with 1 cell.
All text is wrapped in a <i> tag.
Such tables will be replaced with <blockquote> tags.
<table cellpadding=\"7\" cellspacing=\"0\" width=\"614\">
<col width=\"600\"/>
<tr>
<td width=\"600\">
<p style=\"text-align: justify;\"><i>aaaaa</i></p>
<p style=\"text-align: justify;\"><br/></p>
</td>
</tr>
</table>
"""
tables = self.body_tag.find_all("table")
for table in tables:
trs = table.find_all("tr")
tds = table.find_all("td")
if len(trs) == 1 and len(tds) == 1 and tds[0].get("width") == "600":
td = tds[0]
is_zero_border = "border: none;" in td.get("style")
paragraphs = td.find_all("p")
has_i_tag_or_br = [(p.i, p.br) for p in paragraphs]
has_i_tag_or_br = [x[0] is not None or x[1] is not None
for x in has_i_tag_or_br]
if all(has_i_tag_or_br) and is_zero_border:
new_div = BeautifulSoup(
features="lxml").new_tag("blockquote")
for p in paragraphs:
new_div.append(p)
table.replaceWith(new_div)
def _process_tables(self):
"""Function to process tables. Set "border" attribute."""
tables = self.body_tag.find_all("table")
for table in tables:
tds = table.find_all("td")
sizes = []
for td in tds:
style = td.get("style")
if style:
match = re.search(r"border: ?(\d+\.?\d*)(p[tx])", style)
if match:
size = match.group(1)
units = match.group(2)
if units == "pt":
size = self.convert_pt_to_px(size)
sizes.append(float(size))
width = td.get("width")
td.attrs = {}
if width:
td.attrs["width"] = width
if sizes:
border_size = sum(sizes) / len(sizes)
table.attrs["border"] = f"{border_size:.2}"
self.tables_amount = len(tables)
def _process_hrefs(self):
a_tags_with_href = self.body_tag.find_all(
"a", {"href": re.compile("^.*http.+")})
# remove char=end of file for some editors
for tag in a_tags_with_href:
tag.string = tag.text.replace("\u200c", "")
tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "")
a_tags_with_href = self.body_tag.find_all(
"a", {"href": re.compile("^(?!#sdfootnote)")})
for tag in a_tags_with_href:
tag.string = tag.text.replace("\u200c", "")
tag.string = tag.text.replace("\u200b", "") # zero-width-space
tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "")
def _process_footer(self):
# todo regex
"""
Function to process <div title="footer"> tags.
All the tags will be deleted from file.
"""
divs = self.body_tag.find_all("div", {"title": "footer"})
for div in divs:
div.decompose()
def _process_div(self):
# todo regex
"""Function to process <div> tags. All the tags will be deleted from file, all content of the tags will stay."""
divs = self.body_tag.find_all("div")
for div in divs:
div.unwrap()
def _get_top_level_headers(self):
"""
Function for gathering info about top-level chapters.
Assume:
- Headers with the smallest outline(or digit in <h>) are top level chapters.
[ It is consistent with a recursive algorithm
for saving content to a resulted json structure,
which happens in header_to_json()]
"""
headers_info = []
header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
headers_outline = [int(re.sub(r"^h", "", tag.name))
for tag in header_tags]
if headers_outline:
top_level_outline = min(headers_outline)
top_level_headers = [tag for tag in header_tags
if int(re.sub(r"^h", "", tag.name)) == top_level_outline]
for tag in top_level_headers:
if tag.parent.name == "li":
tag.parent.unwrap()
while tag.parent.name == "ol":
tag.parent.unwrap()
title = tag.text
title = re.sub(r"\s+", " ", title).strip()
number = re.match(r"^(?:\.?\d+\.? ?)+", title)
is_numbered = number is not None
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
is_introduction = cleaned_title.lower() == "introduction"
headers_info.append({
"title": cleaned_title,
"is_numbered": is_numbered,
"is_introduction": is_introduction})
return headers_info
def _mark_introduction_headers(self):
"""
Function to find out:
what header shouldn"t be numbered and can be treated as introduction chapter
Assume header(s) to be introduction if:
1. one header not numbered, before 1 numbered header
2. it is first header from the top level list, and it equals to "introduction"
Returns
-------
None
mark each top-level header with flag should_be_numbered = true/false
"""
is_numbered_header = [header["is_numbered"]
for header in self.top_level_headers]
is_title = [header["is_introduction"]
for header in self.top_level_headers]
first_not_numbered = is_numbered_header and is_numbered_header[0] == 0
second_is_numbered_or_not_exist = all(is_numbered_header[1:2])
first_header_is_introduction = is_title and is_title[0]
if (first_not_numbered and second_is_numbered_or_not_exist) or first_header_is_introduction:
self.top_level_headers[0]["should_be_numbered"] = False
for i in range(1, len(self.top_level_headers)):
self.top_level_headers[i]["should_be_numbered"] = True
else:
for i in range(0, len(self.top_level_headers)):
self.top_level_headers[i]["should_be_numbered"] = True
@staticmethod
def clean_title_from_tabs(tag: NavigableString):
cleaned = re.sub(r"[\s\xa0]", " ", tag)
this = BeautifulSoup.new_string(BeautifulSoup(
features="lxml"), cleaned, NavigableString)
tag.replace_with(this)
def apply_func_to_last_child(self, tag, func=None):
"""
works only with constructions like (((child to work with)))
where child is object of NavigableString
"""
if type(tag) is NavigableString:
func(tag)
else:
children = list(tag.children)
if children:
self.apply_func_to_last_child(children[0], func)
def _process_headings(self):
# todo regex
"""
Function to process tags <h>.
Steps
----------
1. remove <b>, <span>
2. clean text in header from numbering and \n
Returns
-------
None
processed <h> tags
"""
header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
# 1. remove <b>, <span>
for tag in header_tags:
b_tags = tag.find_all("b")
[tag.unwrap() for tag in b_tags]
spans = tag.find_all("span")
if spans:
for span in spans:
style = span.attrs.get("style")
span.unwrap()
tag.attrs = {}
header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
# 2. clean text in header from numbering and \n
for tag in header_tags:
if tag.parent.name == "li":
tag.parent.unwrap()
while tag.parent.name == "ol":
tag.parent.unwrap()
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
if cleaned_title == "":
tag.unwrap()
else:
assert tag.name in LiveCartaConfig.SUPPORTED_HEADERS, \
f"Preprocessing went wrong, there is still h{LiveCartaConfig.SUPPORTED_LEVELS + 1}-h9 headings."
content = list(tag.children)
# do not take into account rubbish empty tags like <a>, but don"t remove them
content = [item for item in content if
(type(item) is not NavigableString and item.text != "")
or (type(item) is NavigableString)]
content[0] = "" if content[0] == " " else content[0]
content = [item for item in content if item != ""]
for i, item in enumerate(content):
if type(content[i]) is NavigableString:
cleaned = re.sub(r"(\s+)+", " ", content[i])
this = BeautifulSoup.new_string(BeautifulSoup(
features="lxml"), cleaned, NavigableString)
content[i].replace_with(this)
content[i] = this
else:
self.apply_func_to_last_child(
content[i], self.clean_title_from_tabs)
def _process_lists(self):
# todo regex
"""
Function
- process tags <li>.
- unwrap <p> tags.
Returns
-------
None
uwrap <p> tag with li
"""
li_tags = self.body_tag.find_all("li")
for li_tag in li_tags:
li_tag.attrs.update(li_tag.p.attrs)
li_tag.p.unwrap()
def delete_content_before_toc(self):
# remove all tag upper the <TOC> only in content !!! body tag is not updated
toc_tag = self.html_soup.new_tag("TOC")
self.content: List[Tag] = self.body_tag.find_all(recursive=False)
if toc_tag in self.content:
ind = self.content.index(toc_tag) + 1
self.content = self.content[ind:]
def process_html(self, access=None, html_path="", book_id=0):
"""Process html code to satisfy LiveCarta formatting."""
self.logger_object.log("Beginning of processing .html file.")
try:
self.logger_object.log(f"Processing TOC and headers.")
self._process_toc_links()
self.clean_trash()
# process main elements of the .html doc
self.logger_object.log(f"Processing main elements of html.")
self._preprocessing_headings()
self._process_paragraph()
self._process_two_columns()
self.logger_object.log("Block quotes processing.")
self._process_quotes()
self.logger_object.log("Tables processing.")
self._process_tables()
self.logger_object.log(
f"{self.tables_amount} tables have been processed.")
self.logger_object.log("Hrefs processing.")
self._process_hrefs()
self.logger_object.log("Footnotes processing.")
self.footnotes = process_footnotes(self.body_tag)
self.logger_object.log(
f"{len(self.footnotes)} footnotes have been processed.")
self.logger_object.log("Image processing.")
self.images = process_images(access=access, html_path=html_path,
book_id=book_id, body_tag=self.body_tag)
self.logger_object.log(
f"{len(self.images)} images have been processed.")
self._process_footer()
self._process_div()
self.top_level_headers = self._get_top_level_headers()
self._mark_introduction_headers()
self._process_headings()
self._process_lists()
# delete text before table of content if exists
self.delete_content_before_toc()
except Exception as exc:
self.logger_object.log(
"Error has occurred while processing html.", logging.ERROR)
self.logger_object.log_error_to_main_log()
if self.status_wrapper:
self.status_wrapper.set_error()
raise exc
self.logger_object.log("End of processing .html file.")
return self.content, self.footnotes, self.top_level_headers

View File

@@ -0,0 +1,266 @@
import re
import pathlib
from typing import List, Tuple, Dict, Union
from bs4 import BeautifulSoup, Tag, NavigableString
from src.util.helpers import BookLogger
from src.html_presets_processor import _process_presets
from src.docx_converter.image_processing import process_images
from src.docx_converter.footnotes_processing import process_footnotes
from src.inline_style_processor import modify_html_soup_with_css_styles
class HtmlDocxProcessor:
def __init__(self, logger: BookLogger, html_soup: BeautifulSoup, html_preprocessor, style_preprocessor):
self.logger = logger
self.html_soup = html_soup
self.body_tag = self.html_soup.body
self.html_preprocessor = html_preprocessor
self.style_preprocessor = style_preprocessor
self.content: List[Tag] = []
def _font_to_span(self):
for font in self.body_tag.find_all("font"):
font.name = "span"
def _process_hrefs(self):
a_tags_with_href = self.body_tag.find_all(
"a", {"href": re.compile("^.*http.+")})
# remove char=end of file for some editors
for tag in a_tags_with_href:
tag.string = tag.text.replace("\u200c", "")
tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "")
a_tags_with_href = self.body_tag.find_all(
"a", {"href": re.compile("^(?!#sdfootnote)")})
for tag in a_tags_with_href:
tag.string = tag.text.replace("\u200c", "")
tag.string = tag.text.replace("\u200b", "") # zero-width-space
tag["href"] = tag.attrs.get("href").replace("%E2%80%8C", "")
def _process_toc_links(self):
"""Function to extract nodes which contains TOC links, remove links from file and detect headers."""
def _check_parent_link_exist_in_toc(tag_with_link: Tag) -> bool:
toc_links = []
for a_tag in tag_with_link.find_all("a", {"name": re.compile(r"^_Toc\d+")}):
link_name = a_tag.attrs["name"]
toc_item = self.body_tag.find("a", {"href": "#" + link_name})
if toc_item:
toc_links.append(toc_item)
return len(toc_links) > 0
toc_links = self.body_tag.find_all(
"a", {"name": re.compile(r"^_Toc\d+")})
headers = [link.parent for link in toc_links]
outline_level = "1" # All the unknown outlines will be predicted as <h1>
for tag in headers:
if re.search(r"^h\d$", tag.name):
tag.a.unwrap()
elif tag.name == "p":
exist_in_toc = _check_parent_link_exist_in_toc(tag)
if tag in self.body_tag.find_all("p") and exist_in_toc:
new_tag = BeautifulSoup(
features="lxml").new_tag("h" + outline_level)
text = tag.text
tag.replaceWith(new_tag)
new_tag.string = text
else:
# rethink document structure when you have toc_links, other cases?
self.logger.log(f"Something went wrong in processing toc_links."
f"Check the structure of the file."
f"Tag name: {tag.name}")
def _get_top_level_headers(self) -> List[Dict[str, Union[str, bool]]]:
"""
Function for gathering info about top-level chapters.
Assume: _
- Headers with the smallest outline(or digit in <h>) are top level chapters.
[It is consistent with a recursive algorithm
for saving content to a resulted json structure,
which happens in header_to_json()]
"""
headers_info = []
header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
headers_outline = [int(re.sub(r"^h", "", tag.name))
for tag in header_tags]
if headers_outline:
top_level_outline = min(headers_outline)
top_level_headers = [tag for tag in header_tags
if int(re.sub(r"^h", "", tag.name)) == top_level_outline]
for tag in top_level_headers:
if tag.parent.name == "li":
tag.parent.unwrap()
while tag.parent.name == "ol":
tag.parent.unwrap()
title = tag.text
title = re.sub(r"\s+", " ", title).strip()
number = re.match(r"^(?:\.?\d+\.? ?)+", title)
is_numbered = number is not None
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
is_introduction = cleaned_title.lower() == "introduction"
headers_info.append({
"title": cleaned_title,
"is_numbered": is_numbered,
"is_introduction": is_introduction})
return headers_info
@staticmethod
def _mark_introduction_headers(top_level_headers: List[Dict[str, Union[str, bool]]]):
"""
Function to find out:
what header shouldn't be numbered and can be treated as introduction chapter
Assume header(s) to be introduction if:
1. one header not numbered, before 1 numbered header
2. it is first header from the top level list, and it equals to "introduction"
Returns
-------
None
mark each top-level header with flag should_be_numbered = true/false
"""
is_numbered_header = [header["is_numbered"]
for header in top_level_headers]
is_title = [header["is_introduction"]
for header in top_level_headers]
first_not_numbered = is_numbered_header and is_numbered_header[0] == 0
second_is_numbered_or_not_exist = all(is_numbered_header[1:2])
first_header_is_introduction = is_title and is_title[0]
if (first_not_numbered and second_is_numbered_or_not_exist) or first_header_is_introduction:
top_level_headers[0]["should_be_numbered"] = False
for i in range(1, len(top_level_headers)):
top_level_headers[i]["should_be_numbered"] = True
else:
for i in range(0, len(top_level_headers)):
top_level_headers[i]["should_be_numbered"] = True
@staticmethod
def clean_title_from_tabs(tag: NavigableString):
cleaned = re.sub(r"[\s\xa0]", " ", tag)
this = BeautifulSoup.new_string(BeautifulSoup(
features="lxml"), cleaned, NavigableString)
tag.replace_with(this)
def apply_func_to_last_child(self, tag: Union[NavigableString, Tag], func=None):
"""
works only with constructions like (((child to work with)))
where child is object of NavigableString
"""
if type(tag) is NavigableString:
func(tag)
elif list(tag.children):
self.apply_func_to_last_child(list(tag.children)[0], func)
def _process_headings(self):
"""
Function to process tags <h>.
Clean header from attrs and text in header from numbering and \n
Returns
-------
None
processed <h> tags
"""
header_tags = self.body_tag.find_all(re.compile("^h[1-5]$"))
# clean header from attrs and text in header from numbering and \n
for h_tag in header_tags:
h_tag.attrs = {}
for tag in h_tag.find_all():
tag.attrs = {}
if h_tag.parent.name == "li":
h_tag.parent.unwrap()
while h_tag.parent.name == "ol":
h_tag.parent.unwrap()
cleaned_title = re.sub(r"[\s\xa0]", " ", h_tag.text)
if cleaned_title != "":
content = list(h_tag.children)
# do not take into account rubbish empty tags like <a>, but don"t remove them
content = [item for item in content if
(type(item) is not NavigableString and item.text != "")
or (type(item) is NavigableString)]
content[0] = "" if content[0] == " " else content[0]
content = [item for item in content if item != ""]
for i, item in enumerate(content):
if type(content[i]) is NavigableString:
cleaned = re.sub(r"(\s+)+", " ", content[i])
this = BeautifulSoup.new_string(BeautifulSoup(
features="lxml"), cleaned, NavigableString)
content[i].replace_with(this)
content[i] = this
else:
self.apply_func_to_last_child(
content[i], self.clean_title_from_tabs)
else:
h_tag.unwrap()
def delete_content_before_toc(self):
# remove all tag upper the <TOC> only in content !!! body tag is not updated
toc_tag = self.html_soup.new_tag("TOC")
if toc_tag in self.content:
ind = self.content.index(toc_tag) + 1
self.content = self.content[ind:]
def process_html(self,
access=None,
html_path: pathlib.Path = "",
book_id: int = 0) -> Tuple[List[Tag], List[str], List[Dict[str, Union[str, bool]]]]:
"""Process html to satisfy LiveCarta formatting."""
self.logger.log("Beginning of processing .html file.")
# Process styles doesn't see <fonts> because they aren't supported by html
self._font_to_span()
self.logger.log("Inline style reading.")
self.style_preprocessor.process_inline_styles_in_html_soup(
self.body_tag)
self.logger.log("Inline style processing.")
modify_html_soup_with_css_styles(self.body_tag)
self.logger.log("Image processing.")
images = process_images(access, path_to_html=html_path,
book_id=book_id, body_tag=self.body_tag)
self.logger.log(
f"{len(images)} images have been processed.")
self.logger.log("Footnotes processing.")
footnotes: List[str] = process_footnotes(self.body_tag)
self.logger.log(
f"{len(footnotes)} footnotes have been processed.")
self.logger.log("Hrefs processing.")
self._process_hrefs()
self.logger.log(f"TOC processing.")
self._process_toc_links()
top_level_headers: List[Dict[str, Union[str, bool]]]\
= self._get_top_level_headers()
self._mark_introduction_headers(top_level_headers)
self._process_headings()
self.logger.log(f".html using presets processing.")
_process_presets(html_preprocessor=self.html_preprocessor,
html_soup=self.html_soup)
self.content = self.body_tag.find_all(recursive=False)
# delete text before table of content if exists
self.delete_content_before_toc()
self.logger.log("End of processing .html file.")
return self.content, footnotes, top_level_headers

View File

@@ -1,9 +1,29 @@
import os import os
import pathlib import pathlib
from bs4 import Tag
from typing import Union, List
from shutil import copyfile from shutil import copyfile
def process_images(access, html_path, book_id, body_tag): def save_image_to_aws(access, img_file_path: str, book_id: int) -> str:
"""Function saves all images to Amazon web service"""
link_path: str = access.send_image(img_file_path, doc_id=book_id)
return link_path
def save_image_locally(img_file_path: str, book_id: int) -> pathlib.Path:
"""Function saves all images locally"""
folder_path = os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))
new_path = pathlib.Path(os.path.join(
folder_path, f"../books/json/img_{book_id}/"))
new_path.mkdir(exist_ok=True)
img_folder_path = new_path / os.path.basename(img_file_path)
copyfile(img_file_path, img_folder_path)
return img_folder_path
def process_images(access, path_to_html: Union[pathlib.Path, str], book_id: int, body_tag: Tag) -> List:
""" """
Function to process <img> tag. Function to process <img> tag.
Img should be sent Amazon S3 and then return new tag with valid link. Img should be sent Amazon S3 and then return new tag with valid link.
@@ -12,23 +32,18 @@ def process_images(access, html_path, book_id, body_tag):
""" """
img_tags = body_tag.find_all("img") img_tags = body_tag.find_all("img")
for img in img_tags: for img in img_tags:
img_name = img.attrs.get("src") path_to_img_from_html = img.attrs.get("src")
# quick fix for bad links # quick fix for bad links
if (len(img_name) >= 3) and img_name[:3] == "../": if (len(path_to_img_from_html) >= 3) and path_to_img_from_html[:3] == "../":
img_name = img_name[3:] path_to_img_from_html = path_to_img_from_html[3:]
img_path = pathlib.Path(f"{html_path.parent}", f"{img_name}") html_folder = os.path.dirname(path_to_html)
path_to_img_from_root = os.path.normpath(os.path.join(
html_folder, path_to_img_from_html)).replace("\\", "/")
if access is not None: if access is not None:
link = access.send_image(img_path, doc_id=book_id) img_folder_path = save_image_to_aws(
img.attrs["src"] = link access, path_to_img_from_root, book_id)
else: else:
if img_tags.index(img) == 0: img_folder_path = save_image_locally(
folder_path = os.path.dirname( path_to_img_from_root, book_id)
os.path.dirname(os.path.abspath(__file__))) img.attrs["src"] = str(img_folder_path)
new_path = pathlib.Path(os.path.join(
folder_path, f"../books/json/img_{book_id}/"))
new_path.mkdir(exist_ok=True)
new_img_path = new_path / img_name
copyfile(img_path, new_img_path)
img.attrs["src"] = str(new_img_path)
return img_tags return img_tags

View File

@@ -1,12 +1,15 @@
import re import re
import logging import logging
from copy import copy from copy import copy
from typing import List, Tuple, Dict, Union
from bs4 import Tag
from src.livecarta_config import LiveCartaConfig from src.livecarta_config import LiveCartaConfig
class LibreHTML2JSONConverter: class LibreHtml2JsonConverter:
def __init__(self, content, footnotes, top_level_headers, logger_object, book_api_status=None): def __init__(self, content: List[Tag], footnotes: List[str], top_level_headers: List[Dict[str, Union[str, bool]]],
logger_object, book_api_status=None):
self.content_dict = None self.content_dict = None
self.content = content self.content = content
self.footnotes = footnotes self.footnotes = footnotes
@@ -33,7 +36,7 @@ class LibreHTML2JSONConverter:
return new_text return new_text
# TODO: rethink the function structure without indexes. # TODO: rethink the function structure without indexes.
def header_to_livecarta_chapter_item(self, ind) -> (dict, int): def header_to_livecarta_chapter_item(self, ind: int) -> Union[Tuple[Dict[str, Union[str, List]], int], str]:
""" """
Function process header and collects all content for it. Function process header and collects all content for it.
Parameters Parameters
@@ -90,7 +93,7 @@ class LibreHTML2JSONConverter:
return "" return ""
@staticmethod @staticmethod
def _is_empty_p_tag(tag): def _is_empty_p_tag(tag: Tag) -> bool:
if tag.name != "p": if tag.name != "p":
return False return False
@@ -102,7 +105,6 @@ class LibreHTML2JSONConverter:
text = re.sub(r"\s+", "", temp_tag.text) text = re.sub(r"\s+", "", temp_tag.text)
if text: if text:
return False return False
return True return True
def convert_to_dict(self): def convert_to_dict(self):
@@ -148,9 +150,7 @@ class LibreHTML2JSONConverter:
# Add is_introduction field to json structure # Add is_introduction field to json structure
# after deleting content before toc, some chapters can be deleted # after deleting content before toc, some chapters can be deleted
if self.top_level_headers: if self.top_level_headers:
same_first_titles = self.top_level_headers[0]["title"] == json_strc[0]["title"]
is_first_header_introduction = not self.top_level_headers[0]["should_be_numbered"] is_first_header_introduction = not self.top_level_headers[0]["should_be_numbered"]
json_strc[0]["is_introduction"] = is_first_header_introduction json_strc[0]["is_introduction"] = is_first_header_introduction
self.content_dict = { self.content_dict = {

View File

@@ -1,34 +1,32 @@
import re import re
import json
import codecs
import ebooklib import ebooklib
from ebooklib import epub
from ebooklib.epub import Link, Section
from os import path from os import path
from pathlib import Path from pathlib import Path
from ebooklib import epub
from ebooklib.epub import Link, Section
from itertools import chain from itertools import chain
from premailer import transform
from collections import defaultdict from collections import defaultdict
from typing import Dict, Union, List from typing import List, Tuple, Dict, Union
from bs4 import BeautifulSoup, NavigableString, Tag from bs4 import BeautifulSoup, Tag, NavigableString
from src.util.helpers import BookLogger from src.util.helpers import BookLogger
from src.epub_converter.css_processor import CSSPreprocessor
from src.epub_converter.html_epub_processor import HtmlEpubPreprocessor
from src.livecarta_config import LiveCartaConfig from src.livecarta_config import LiveCartaConfig
from src.data_objects import ChapterItem, NavPoint from src.data_objects import ChapterItem, NavPoint
from src.style_reader import StyleReader
from src.epub_converter.html_epub_processor import HtmlEpubProcessor
from src.epub_converter.image_processing import update_images_src_links from src.epub_converter.image_processing import update_images_src_links
from src.epub_converter.footnotes_processing import preprocess_footnotes from src.epub_converter.footnotes_processing import preprocess_footnotes
from src.epub_converter.tag_inline_style_processor import TagInlineStyleProcessor from src.inline_style_processor import modify_html_soup_with_css_styles
class EpubConverter: class EpubConverter:
def __init__(self, book_path, access=None, logger=None, css_processor=None, html_processor=None): def __init__(self, book_path, access=None, logger: BookLogger = None,
style_processor: StyleReader = None, html_processor: HtmlEpubProcessor = None):
self.book_path = book_path self.book_path = book_path
self.access = access self.access = access
self.logger: BookLogger = logger self.logger: BookLogger = logger
self.ebooklib_book = epub.read_epub(book_path) self.ebooklib_book = epub.read_epub(book_path)
self.css_processor = css_processor self.style_processor = style_processor
self.html_processor = html_processor self.html_processor = html_processor
# main container for all epub .xhtml files # main container for all epub .xhtml files
@@ -39,7 +37,8 @@ class EpubConverter:
# toc tree structure stored as adj.list (NavPoint to list of NavPoints) # toc tree structure stored as adj.list (NavPoint to list of NavPoints)
# key = -1 for top level NavPoints # key = -1 for top level NavPoints
self.adjacency_list: Dict[Union[NavPoint, -1], Union[list, None]] = {} self.adjacency_list: Dict[Union[NavPoint, -1],
Union[List[NavPoint], None]] = {}
# list to offset Chapter_i on 1st level # list to offset Chapter_i on 1st level
self.offset_sub_nodes = [] self.offset_sub_nodes = []
@@ -58,6 +57,18 @@ class EpubConverter:
self.noterefs: List[Tag] = [] # start of the footnote self.noterefs: List[Tag] = [] # start of the footnote
self.footnotes: List[Tag] = [] # end of the footnote self.footnotes: List[Tag] = [] # end of the footnote
self.logger.log("HTML files reading.")
self.html_href2html_body_soup: Dict[str,
BeautifulSoup] = self.build_href2soup_content()
self.logger.log("CSS inline style processing.")
[self.style_processor.process_inline_styles_in_html_soup(
self.html_href2html_body_soup[html_href]) for html_href in self.html_href2html_body_soup]
self.logger.log("CSS files processing.")
self.html_href2css_href, self.css_href2css_content = self.build_html_and_css_relations()
self.logger.log("CSS styles fusion(inline+file).")
self.add_css_styles_to_html_soup()
self.logger.log("Image processing.") self.logger.log("Image processing.")
for x in chain(self.ebooklib_book.get_items_of_type(ebooklib.ITEM_IMAGE), for x in chain(self.ebooklib_book.get_items_of_type(ebooklib.ITEM_IMAGE),
self.ebooklib_book.get_items_of_type(ebooklib.ITEM_COVER)): self.ebooklib_book.get_items_of_type(ebooklib.ITEM_COVER)):
@@ -65,17 +76,6 @@ class EpubConverter:
content = x.content content = x.content
self.img_href2img_bytes[file_name] = content self.img_href2img_bytes[file_name] = content
self.logger.log("HTML files reading.")
self.html_href2html_body_soup: Dict[str,
BeautifulSoup] = self.build_href2soup_content()
self.logger.log("CSS inline style processing.")
self.css_processor.process_inline_styles_in_html_soup(self.html_href2html_body_soup)
self.logger.log("CSS files processing.")
self.html_href2css_href, self.css_href2css_content = self.build_html_and_css_relations()
self.logger.log("CSS styles fusion(inline+file).")
self.add_css_styles_to_html_soup()
self.logger.log("Footnotes processing.") self.logger.log("Footnotes processing.")
for href in self.html_href2html_body_soup: for href in self.html_href2html_body_soup:
self.footnotes_contents, self.noterefs, self.footnotes =\ self.footnotes_contents, self.noterefs, self.footnotes =\
@@ -107,7 +107,6 @@ class EpubConverter:
def build_href2soup_content(self) -> Dict[str, BeautifulSoup]: def build_href2soup_content(self) -> Dict[str, BeautifulSoup]:
# using EpubElements # using EpubElements
# for now just for HTML objects, as it is the simplest chapter # for now just for HTML objects, as it is the simplest chapter
nodes = dict() nodes = dict()
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT): for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
html_body_text = item.get_body_content() html_body_text = item.get_body_content()
@@ -116,7 +115,7 @@ class EpubConverter:
nodes[item.file_name] = soup nodes[item.file_name] = soup
return nodes return nodes
def build_html_and_css_relations(self) -> tuple[dict, dict]: def build_html_and_css_relations(self) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
""" """
Function is designed to get 2 dictionaries: Function is designed to get 2 dictionaries:
The first is html_href2css_href. It is created to connect href of html to css files(hrefs of them The first is html_href2css_href. It is created to connect href of html to css files(hrefs of them
@@ -130,8 +129,8 @@ class EpubConverter:
""" """
# dictionary: href of html to related css files # dictionary: href of html to related css files
html_href2css_href: defaultdict = defaultdict(list) html_href2css_href: Dict[str, List[str]] = defaultdict(list)
css_href2css_content: dict = {} css_href2css_content: Dict[str, str] = {}
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT): for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
html_content = item.content html_content = item.content
@@ -146,54 +145,16 @@ class EpubConverter:
html_href2css_href[html_href].append(css_href) html_href2css_href[html_href].append(css_href)
if css_href not in css_href2css_content: if css_href not in css_href2css_content:
# css_href not in css_href2css_content, add to this dict # css_href not in css_href2css_content, add to this dict
css_href2css_content[css_href] = self.css_processor.build_css_file_content( css_href2css_content[css_href] = self.style_processor.build_css_file_content(
self.css_processor.get_css_content(css_href, html_href, self.ebooklib_book)) self.style_processor.get_css_content(css_href, html_href, self.ebooklib_book))
for i, tag in enumerate(soup_html_content.find_all("style")): for i, tag in enumerate(soup_html_content.find_all("style")):
css_content = tag.string css_content = tag.string
html_href2css_href[html_href].append(f"href{i}") html_href2css_href[html_href].append(f"href{i}")
css_href2css_content[f"href{i}"] = self.css_processor.build_css_file_content( css_href2css_content[f"href{i}"] = self.style_processor.build_css_file_content(
css_content) css_content)
return html_href2css_href, css_href2css_content return html_href2css_href, css_href2css_content
@staticmethod
def modify_html_soup_with_css_styles(html_soup: BeautifulSoup, css_text: str) -> BeautifulSoup:
"""
Function adds styles from .css to inline style.
Parameters
----------
html_soup: BeautifulSoup
html page with inline style
css_text: str
css content from css file
Returns
-------
inline_soup: BeautifulSoup
soup with styles from css
"""
# remove this specification because it causes problems
css_text = css_text.replace(
'@namespace epub "http://www.idpf.org/2007/ops";', '')
# here we add css styles to inline style
html_with_css_styles: str = transform(str(html_soup), css_text=css_text,
remove_classes=False,
external_styles=False,
allow_network=False,
disable_validation=True,
)
# soup with converted styles from css
inline_soup = BeautifulSoup(html_with_css_styles, features="lxml")
tags_with_inline_style = inline_soup.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp,
attrs={"style": re.compile(".*")})
# go through the tags with inline style + style parsed from css file
for tag_inline_style in tags_with_inline_style:
style_converter = TagInlineStyleProcessor(tag_inline_style)
style_converter.convert_initial_tag()
return inline_soup
def add_css_styles_to_html_soup(self): def add_css_styles_to_html_soup(self):
""" """
This function is designed to update html_href2html_body_soup This function is designed to update html_href2html_body_soup
@@ -209,11 +170,13 @@ class EpubConverter:
for css_href in self.html_href2css_href[html_href]: for css_href in self.html_href2css_href[html_href]:
css += self.css_href2css_content[css_href] css += self.css_href2css_content[css_href]
html_content: BeautifulSoup = self.html_href2html_body_soup[html_href] html_content: BeautifulSoup = self.html_href2html_body_soup[html_href]
html_content = self.modify_html_soup_with_css_styles( html_content = modify_html_soup_with_css_styles(
html_content, css) html_content, css)
self.html_href2html_body_soup[html_href] = html_content self.html_href2html_body_soup[html_href] = html_content
def build_adjacency_list_from_toc(self, element: [Link, tuple, list], lvl=0): def build_adjacency_list_from_toc(self,
element: Union[Link, Tuple[Section, List], List[Union[Link, Tuple]]],
lvl: int = 0) -> NavPoint:
""" """
Function Function
self.adjacency_list builds based on TOC nested structure, got from self.ebooklib.toc self.adjacency_list builds based on TOC nested structure, got from self.ebooklib.toc
@@ -254,7 +217,7 @@ class EpubConverter:
sub_nodes = [] sub_nodes = []
for elem in second: for elem in second:
if (bool(re.search('^section$|^part$', first.title.lower()))) and lvl == 1: if (bool(re.search("^section$|^part$", first.title.lower()))) and lvl == 1:
self.offset_sub_nodes.append( self.offset_sub_nodes.append(
self.build_adjacency_list_from_toc(elem, lvl)) self.build_adjacency_list_from_toc(elem, lvl))
else: else:
@@ -288,7 +251,7 @@ class EpubConverter:
return False return False
def build_adjacency_list_from_spine(self): def build_adjacency_list_from_spine(self):
def build_manifest_id2html_href() -> dict: def build_manifest_id2html_href() -> Dict[int, str]:
links = dict() links = dict()
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT): for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
links[item.id] = item.file_name links[item.id] = item.file_name
@@ -304,7 +267,7 @@ class EpubConverter:
self.adjacency_list[-1].append(nav_point) self.adjacency_list[-1].append(nav_point)
self.hrefs_added_to_toc.add(nav_point.href) self.hrefs_added_to_toc.add(nav_point.href)
def add_not_added_files_to_adjacency_list(self, not_added: list): def add_not_added_files_to_adjacency_list(self, not_added: List[str]):
"""Function add files that not added to adjacency list""" """Function add files that not added to adjacency list"""
for i, file in enumerate(not_added): for i, file in enumerate(not_added):
nav_point = NavPoint( nav_point = NavPoint(
@@ -315,7 +278,7 @@ class EpubConverter:
def label_subchapters_with_lc_tag(self): def label_subchapters_with_lc_tag(self):
for html_href in self.html_href2html_body_soup: for html_href in self.html_href2html_body_soup:
ids, soup = self.html_href2subchapters_ids[html_href], \ ids, soup = self.html_href2subchapters_ids[html_href], \
self.html_href2html_body_soup[html_href] self.html_href2html_body_soup[html_href]
for i in ids: for i in ids:
tag = soup.find(id=i) tag = soup.find(id=i)
tmp_tag = soup.new_tag("lc_tmp") tmp_tag = soup.new_tag("lc_tmp")
@@ -345,10 +308,13 @@ class EpubConverter:
mark.parent.unwrap() mark.parent.unwrap()
@staticmethod @staticmethod
def create_unique_id(href, id_): def create_unique_id(href: str, id_: str) -> str:
return re.sub(r"([^\w\s])|_|-", "", href) + re.sub(r"[_-]", "0", id_) return re.sub(r"([^\w\s])|_|-", "", href) + re.sub(r"[_-]", "0", id_)
def match_href_to_path_from_toc(self, cur_file_path: str, href_in_link: str, internal_link_tag: Tag) -> [None, str]: def match_href_to_path_from_toc(self,
cur_file_path: str,
href_in_link: str,
internal_link_tag: Tag) -> Union[None, str]:
""" """
Function used to find full path to file that is parsed from tag link Function used to find full path to file that is parsed from tag link
TOC: a/b/c.xhtml TOC: a/b/c.xhtml
@@ -387,7 +353,7 @@ class EpubConverter:
return full_path[0] return full_path[0]
@staticmethod @staticmethod
def create_new_anchor_span(soup, id_): def create_new_anchor_span(soup: BeautifulSoup, id_: str) -> Tag:
new_anchor_span = soup.new_tag("span") new_anchor_span = soup.new_tag("span")
new_anchor_span.attrs["id"] = id_ new_anchor_span.attrs["id"] = id_
new_anchor_span.attrs["class"] = "link-anchor" new_anchor_span.attrs["class"] = "link-anchor"
@@ -415,7 +381,8 @@ class EpubConverter:
for toc_href in self.hrefs_added_to_toc: for toc_href in self.hrefs_added_to_toc:
for tag in self.html_href2html_body_soup[toc_href].find_all(attrs={"id": re.compile(r".+")}): for tag in self.html_href2html_body_soup[toc_href].find_all(attrs={"id": re.compile(r".+")}):
if tag.attrs.get("class") not in ["converter-chapter-mark", "footnote-element"]: if tag.attrs.get("class") not in ["converter-chapter-mark", "footnote-element"]:
new_id = self.create_unique_id(toc_href, tag.attrs["id"]) new_id = self.create_unique_id(
toc_href, tag.attrs["id"])
tag.attrs["id"] = new_id tag.attrs["id"] = new_id
def process_file_anchor(): def process_file_anchor():
@@ -427,11 +394,13 @@ class EpubConverter:
a_tag_href_matched_to_toc = self.match_href_to_path_from_toc( a_tag_href_matched_to_toc = self.match_href_to_path_from_toc(
toc_href, a_tag_href, internal_link_tag) toc_href, a_tag_href, internal_link_tag)
if a_tag_href_matched_to_toc: if a_tag_href_matched_to_toc:
new_id = self.create_unique_id(a_tag_href_matched_to_toc, "") new_id = self.create_unique_id(
a_tag_href_matched_to_toc, "")
internal_link_tag.attrs["placeholder"] = "{{tempStyleToAnchor-" + new_id + "}}" internal_link_tag.attrs["placeholder"] = "{{tempStyleToAnchor-" + new_id + "}}"
if new_id not in self.internal_anchors: if new_id not in self.internal_anchors:
anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc] anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc]
new_anchor_span = self.create_new_anchor_span(soup, new_id) new_anchor_span = self.create_new_anchor_span(
soup, new_id)
# insert a new span to the beginning of the file # insert a new span to the beginning of the file
anchor_soup.insert(0, new_anchor_span) anchor_soup.insert(0, new_anchor_span)
self.internal_anchors.add(new_id) self.internal_anchors.add(new_id)
@@ -442,7 +411,8 @@ class EpubConverter:
soup = self.html_href2html_body_soup[toc_href] soup = self.html_href2html_body_soup[toc_href]
# process_file_element_anchor # process_file_element_anchor
for internal_link_tag in soup.find_all("a", {"href": re.compile(r"(^.+\.(htm|html|xhtml)#.+)|(^#.+)")}): for internal_link_tag in soup.find_all("a", {"href": re.compile(r"(^.+\.(htm|html|xhtml)#.+)|(^#.+)")}):
a_tag_href, a_tag_id = internal_link_tag.attrs["href"].split("#") a_tag_href, a_tag_id = internal_link_tag.attrs["href"].split(
"#")
a_tag_href_matched_to_toc = self.match_href_to_path_from_toc( a_tag_href_matched_to_toc = self.match_href_to_path_from_toc(
toc_href, a_tag_href, internal_link_tag) if a_tag_href \ toc_href, a_tag_href, internal_link_tag) if a_tag_href \
else path.normpath(toc_href).replace("\\", "/") else path.normpath(toc_href).replace("\\", "/")
@@ -452,7 +422,8 @@ class EpubConverter:
anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc] anchor_soup = self.html_href2html_body_soup[a_tag_href_matched_to_toc]
anchor_tags = anchor_soup.find_all(attrs={"id": new_id}) or \ anchor_tags = anchor_soup.find_all(attrs={"id": new_id}) or \
anchor_soup.find_all(attrs={"id": a_tag_id}) # if link is a footnote anchor_soup.find_all(
attrs={"id": a_tag_id}) # if link is a footnote
if anchor_tags: if anchor_tags:
if len(anchor_tags) > 1: if len(anchor_tags) > 1:
self.logger.log(f"Warning in {toc_href}: multiple anchors:" self.logger.log(f"Warning in {toc_href}: multiple anchors:"
@@ -487,7 +458,9 @@ class EpubConverter:
process_file_element_anchor() process_file_element_anchor()
@staticmethod @staticmethod
def get_tags_between_chapter_marks(first_id: str, href: str, html_soup: BeautifulSoup) -> list: def get_tags_between_chapter_marks(first_id: str,
href: str,
html_soup: BeautifulSoup) -> List[Union[Tag, NavigableString]]:
""" """
Get tags between LiveCarta chapter marks Get tags between LiveCarta chapter marks
Parameters Parameters
@@ -568,7 +541,7 @@ class EpubConverter:
for tl_nav_point in top_level_nav_points: for tl_nav_point in top_level_nav_points:
self.detect_one_chapter(tl_nav_point) self.detect_one_chapter(tl_nav_point)
def html_node_to_livecarta_chapter_item(self, nav_point: NavPoint, lvl=1) -> ChapterItem: def html_node_to_livecarta_chapter_item(self, nav_point: NavPoint, lvl: int = 1) -> ChapterItem:
""" """
Function prepare style, tags to json structure Function prepare style, tags to json structure
Parameters Parameters
@@ -584,18 +557,18 @@ class EpubConverter:
built chapter built chapter
""" """
title = nav_point.title title: str = nav_point.title
content: BeautifulSoup = self.href_chapter_id2soup_html[(nav_point.href, nav_point.id)] \ content: BeautifulSoup = self.href_chapter_id2soup_html[(nav_point.href, nav_point.id)] \
if nav_point.id else self.html_href2html_body_soup[nav_point.href] if nav_point.id else self.html_href2html_body_soup[nav_point.href]
indent = " " * lvl indent: str = " " * lvl
self.logger.log(indent + f"Chapter: {title} is processing.") self.logger.log(indent + f"Chapter: {title} is processing.")
is_chapter = lvl <= LiveCartaConfig.SUPPORTED_LEVELS is_chapter: bool = lvl <= LiveCartaConfig.SUPPORTED_LEVELS
self.logger.log(indent + "Process title.") self.logger.log(indent + "Process title.")
title_preprocessed = self.html_processor.prepare_title(title) title_preprocessed: str = self.html_processor.prepare_title(title)
self.logger.log(indent + "Process content.") self.logger.log(indent + "Process content.")
content_preprocessed = self.html_processor.prepare_content(title_preprocessed, content, content_preprocessed: Union[Tag, BeautifulSoup] = self.html_processor.prepare_content(
remove_title_from_chapter=is_chapter) title_preprocessed, content, remove_title_from_chapter=is_chapter)
self.book_image_src_path2aws_path = update_images_src_links(content_preprocessed, self.book_image_src_path2aws_path = update_images_src_links(content_preprocessed,
self.img_href2img_bytes, self.img_href2img_bytes,
@@ -613,7 +586,7 @@ class EpubConverter:
sub_nodes.append(sub_chapter_item) sub_nodes.append(sub_chapter_item)
return ChapterItem(title_preprocessed, str(content_preprocessed), sub_nodes) return ChapterItem(title_preprocessed, str(content_preprocessed), sub_nodes)
def convert_to_dict(self) -> dict: def convert_to_dict(self) -> Dict[str, List[Dict[str, Union[List, str]]]]:
"""Function which convert list of html nodes to appropriate json structure""" """Function which convert list of html nodes to appropriate json structure"""
top_level_nav_points = self.adjacency_list[-1] top_level_nav_points = self.adjacency_list[-1]
top_level_chapters = [] top_level_chapters = []
@@ -630,19 +603,3 @@ class EpubConverter:
"content": top_level_dict_chapters, "content": top_level_dict_chapters,
"footnotes": self.footnotes_contents "footnotes": self.footnotes_contents
} }
if __name__ == "__main__":
epub_file_path = "../../books/epub/9780763774134.epub"
logger_object = BookLogger(
name="epub", book_id=epub_file_path.split("/")[-1])
css_processor = CSSPreprocessor()
html_processor = HtmlEpubPreprocessor(logger=logger_object)
json_converter = EpubConverter(epub_file_path, logger=logger_object,
css_processor=css_processor, html_processor=html_processor)
content_dict = json_converter.convert_to_dict()
with codecs.open(epub_file_path.replace("epub", "json"), "w", encoding="utf-8") as f_json:
json.dump(content_dict, f_json, ensure_ascii=False)

View File

@@ -1,13 +1,18 @@
import json
import codecs
from src.book_solver import BookSolver from src.book_solver import BookSolver
from src.epub_converter.css_processor import CSSPreprocessor from src.util.helpers import BookLogger
from src.epub_converter.html_epub_processor import HtmlEpubPreprocessor from src.html_presets_processor import HtmlPresetsProcessor
from src.style_reader import StyleReader
from src.epub_converter.html_epub_processor import HtmlEpubProcessor
from src.epub_converter.epub_converter import EpubConverter from src.epub_converter.epub_converter import EpubConverter
class EpubBook(BookSolver): class EpubBook(BookSolver):
"""Class of .epub type book - child of BookSolver""" """Class of .epub type book - child of BookSolver"""
def __init__(self, book_id=0, access=None, main_logger=None): def __init__(self, book_id: int = 0, access=None, main_logger=None):
super().__init__(book_id, access, main_logger) super().__init__(book_id, access, main_logger)
self.book_type = "epub" self.book_type = "epub"
@@ -16,10 +21,8 @@ class EpubBook(BookSolver):
Function Function
Steps Steps
---------- ----------
1. Gets data from preset structure 1. Converts .epub to .html
2. Add preset to html preprocessor 2. Parses from line structure to nested structure
3. Converts .epub to .html
4. Parses from line structure to nested structure
Returns Returns
---------- ----------
@@ -27,10 +30,32 @@ class EpubBook(BookSolver):
json for LiveCarta platform json for LiveCarta platform
""" """
css_processor = CSSPreprocessor() html_preprocessor = HtmlPresetsProcessor(
html_processor = HtmlEpubPreprocessor(self.preset_path, logger=self.logger_object) logger=self.logger_object, preset_path="presets/epub_presets.json")
style_preprocessor = StyleReader()
html_processor = HtmlEpubProcessor(logger=self.logger_object,
html_preprocessor=html_preprocessor)
json_converter = EpubConverter( json_converter = EpubConverter(
self.book_path, access=self.access, logger=self.logger_object, self.book_path, access=self.access, logger=self.logger_object,
css_processor=css_processor, html_processor=html_processor) style_processor=style_preprocessor, html_processor=html_processor)
content_dict = json_converter.convert_to_dict() content_dict = json_converter.convert_to_dict()
return content_dict return content_dict
if __name__ == "__main__":
epub_file_path = "../../books/epub/9780763774134.epub"
logger_object = BookLogger(
name="epub", book_id=epub_file_path.split("/")[-1])
html_preprocessor = HtmlPresetsProcessor(
logger=logger_object, preset_path="../../presets/epub_presets.json")
style_preprocessor = StyleReader()
html_processor = HtmlEpubProcessor(logger=logger_object,
html_preprocessor=html_preprocessor)
json_converter = EpubConverter(epub_file_path, logger=logger_object,
style_processor=style_preprocessor, html_processor=html_processor)
content_dict = json_converter.convert_to_dict()
with codecs.open(epub_file_path.replace("epub", "json"), "w", encoding="utf-8") as f_json:
json.dump(content_dict, f_json, ensure_ascii=False)

View File

@@ -1,9 +1,9 @@
import re import re
from typing import Tuple from typing import List, Tuple
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
def _replace_with_livecarta_anchor_tag(anchor, i): def replace_with_livecarta_anchor_tag(anchor, i):
"""Function replace noteref_tag(anchor) with new livecarta tag""" """Function replace noteref_tag(anchor) with new livecarta tag"""
new_tag = BeautifulSoup(features="lxml").new_tag("sup") new_tag = BeautifulSoup(features="lxml").new_tag("sup")
new_tag["class"] = "footnote-element" new_tag["class"] = "footnote-element"
@@ -16,8 +16,8 @@ def _replace_with_livecarta_anchor_tag(anchor, i):
return new_tag return new_tag
def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, noteref_attr_name="epub:type") \ def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, noteref_attr_name: str = "epub:type") \
-> Tuple[list, list, list]: -> Tuple[List, List, List]:
""" """
This function preprocessing footnotes This function preprocessing footnotes
This function should be earlier that adding fonts in pipeline. This function should be earlier that adding fonts in pipeline.
@@ -75,7 +75,7 @@ def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, note
if footnote_tag.parent.attrs.get("role") and footnote_tag.parent.attrs.get("role") == "docs-endnote": if footnote_tag.parent.attrs.get("role") and footnote_tag.parent.attrs.get("role") == "docs-endnote":
footnote_tag = footnote_tag.parent footnote_tag = footnote_tag.parent
new_noterefs_tags.append( new_noterefs_tags.append(
_replace_with_livecarta_anchor_tag(noteref_tag, i)) replace_with_livecarta_anchor_tag(noteref_tag, i))
content = footnote_tag.text content = footnote_tag.text
# footnote_tag.decompose() # footnote_tag.decompose()
footnotes.append(content) footnotes.append(content)
@@ -87,5 +87,4 @@ def preprocess_footnotes(source_html_tag: Tag, href2soup_html: dict = None, note
noteref.attrs["data-id"] = i + 1 noteref.attrs["data-id"] = i + 1
noteref.attrs["id"] = f"footnote-{i + 1}" noteref.attrs["id"] = f"footnote-{i + 1}"
footnote.attrs["href"] = f"#footnote-{i + 1}" footnote.attrs["href"] = f"#footnote-{i + 1}"
return footnotes, new_noterefs_tags, new_footnotes_tags return footnotes, new_noterefs_tags, new_footnotes_tags

View File

@@ -1,52 +1,16 @@
import re import re
import json from typing import Union
from bs4 import BeautifulSoup, NavigableString, Comment, Tag from bs4.element import PageElement
from bs4 import BeautifulSoup, Tag, NavigableString, Comment
from src.util.helpers import BookLogger from src.util.helpers import BookLogger
from src.html_presets_processor import _process_presets
class HtmlEpubPreprocessor: class HtmlEpubProcessor:
def __init__(self, preset_path="../../presets/presets.json", logger=None): def __init__(self, logger: BookLogger = None, html_preprocessor=None):
self.preset = json.load(open(preset_path)) self.logger = logger
self.logger: BookLogger = logger self.html_preprocessor = html_preprocessor
self.name2function = {
"table_wrapper": self._wrap_tags_with_table,
"replacer": self._tags_to_correspond_livecarta_tag,
"attr_replacer": self._replace_attrs_in_tags,
"unwrapper": self._unwrap_tags,
"inserter": self._insert_tags_into_correspond_tags
}
@staticmethod
def _add_span_to_save_ids_for_links(tag_to_be_removed, chapter_tag: BeautifulSoup):
"""
Function adds span with id from tag_to_be_removed
because this tag will be removed(unwrapped/extract)
Parameters
----------
tag_to_be_removed: Soup object
chapter_tag: BeautifulSoup
Returns
-------
None
updated body tag
"""
def _insert_span_with_attrs_before_tag(chapter_tag: BeautifulSoup, tag_to_be_removed: Tag, id_: str,
class_: list):
"""Function inserts span before tag aren't supported by LiveCarta"""
new_tag = chapter_tag.new_tag("span")
new_tag.attrs["id"] = id_ or ""
new_tag.attrs["class"] = class_ or ""
new_tag.string = "\xa0"
tag_to_be_removed.insert_before(new_tag)
if tag_to_be_removed.attrs.get("id"):
_insert_span_with_attrs_before_tag(chapter_tag=chapter_tag, tag_to_be_removed=tag_to_be_removed,
id_=tag_to_be_removed.attrs["id"],
class_=tag_to_be_removed.attrs.get("class"))
@staticmethod @staticmethod
def prepare_title(title_of_chapter: str) -> str: def prepare_title(title_of_chapter: str) -> str:
@@ -78,7 +42,7 @@ class HtmlEpubPreprocessor:
Returns Returns
------- -------
None NoReturn
Chapter Tag without comments Chapter Tag without comments
""" """
@@ -110,202 +74,28 @@ class HtmlEpubPreprocessor:
p_tag.append(str(node)) p_tag.append(str(node))
node.replace_with(p_tag) node.replace_with(p_tag)
def _wrap_tags_with_table(self, chapter_tag: BeautifulSoup, rules: list): def _remove_headings_content(self, chapter_tag: Union[BeautifulSoup, PageElement], title_of_chapter: str):
"""
Function wraps <tag> with <table>
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
Returns
-------
None
Chapter Tag with wrapped certain tags with <table>
"""
def _wrap_tag_with_table(width="100", border="", bg_color=None):
table = chapter_tag.new_tag("table")
table.attrs["border"], table.attrs["align"], table.attrs["style"] \
= border, "center", f"width:{width}%;"
tbody, tr, td = \
chapter_tag.new_tag("tbody"), chapter_tag.new_tag("tr"), chapter_tag.new_tag("td")
td.attrs["bgcolor"] = bg_color
tag_to_wrap.wrap(td)
td.wrap(tr)
tr.wrap(tbody)
tbody.wrap(table)
table.insert_after(BeautifulSoup(features="lxml").new_tag("br"))
return table
def process_tag_using_table():
_wrap_tag_with_table(
width=tag_to_wrap.attrs["width"] if tag_to_wrap.attrs.get("width") else "100",
border=tag_to_wrap.attrs["border"] if tag_to_wrap.attrs.get("border") else None,
bg_color=tag_to_wrap.attrs["bgcolor"] if tag_to_wrap.attrs.get("bgcolor") else None)
self._add_span_to_save_ids_for_links(tag_to_wrap, chapter_tag)
tag_to_wrap.unwrap()
for rule in rules:
tags = rule["tags"]
for attr in rule["attrs"]:
for tag_to_wrap in chapter_tag.find_all([re.compile(tag) for tag in tags],
{attr["name"]: re.compile(fr"{attr['value']}")}):
process_tag_using_table()
@staticmethod
def _tags_to_correspond_livecarta_tag(chapter_tag: BeautifulSoup, rules: list):
"""
Function to replace all tags to correspond LiveCarta tags
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
Returns
-------
None
Chapter Tag with all tags replaced with LiveCarta tags
"""
for rule in rules:
tags = rule["tags"]
tag_to_replace = rule["tag_to_replace"]
if rule["condition"]:
for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v):
if condition_on_tag[0] == 'parent_tags':
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
if tag.parent.select(condition_on_tag[1]):
tag.name = tag_to_replace
elif condition_on_tag[0] == 'child_tags':
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
if not tag.select(re.sub('[():]|not', '', condition_on_tag[1])):
tag.name = tag_to_replace
elif condition_on_tag[0] == "attrs":
for attr in rule["condition"]["attrs"]:
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags],
{attr["name"]: re.compile(fr"{attr['value']}")}):
tag.name = tag_to_replace
else:
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
# todo can cause appearance of \n <p><p>...</p></p> -> <p>\n</p> <p>...</p> <p>\n</p> (section)
tag.name = tag_to_replace
@staticmethod
def _replace_attrs_in_tags(chapter_tag: BeautifulSoup, rules: list):
"""
Function to replace all tags to correspond LiveCarta tags
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
Returns
-------
None
Chapter Tag with all tags replaced with LiveCarta tags
"""
for rule in rules:
attr = rule["attr"]
tags = rule["condition"]["tags"]
attr_to_replace = rule["attr_to_replace"]
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags],
{attr: re.compile(r".*")}):
tag[attr_to_replace] = tag[attr]
del tag[attr]
def _unwrap_tags(self, chapter_tag: BeautifulSoup, rules: dict):
"""
Function unwrap tags and moves id to span
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
Returns
-------
None
Chapter Tag with unwrapped certain tags
"""
for tag_name in rules["tags"]:
for tag in chapter_tag.select(tag_name):
# if tag is a subtag
if ">" in tag_name:
tag.parent.attrs.update(tag.attrs)
self._add_span_to_save_ids_for_links(tag, chapter_tag)
tag.unwrap()
@staticmethod
def _insert_tags_into_correspond_tags(chapter_tag: BeautifulSoup, rules: list):
"""
Function inserts tags into correspond tags
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
Returns
-------
None
Chapter Tag with inserted tags
"""
def insert(tag):
tag_to_insert = \
chapter_tag.new_tag(rule["tag_to_insert"])
# insert all items that was in tag to subtag and remove from tag
for content in reversed(tag.contents):
tag_to_insert.insert(0, content.extract())
# wrap subtag with items
tag.append(tag_to_insert)
for rule in rules:
tags = rule["tags"]
if rule["condition"]:
for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v):
if condition_on_tag[0] == 'parent_tags':
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
if tag.parent.select(condition_on_tag[1]):
insert(tag)
elif condition_on_tag[0] == 'child_tags':
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
if not tag.select(re.sub('[():]|not', '', condition_on_tag[1])):
insert(tag)
elif condition_on_tag[0] == "attrs":
for attr in rule["condition"]["attrs"]:
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags],
{attr["name"]: re.compile(fr"{attr['value']}")}):
insert(tag)
else:
for tag in chapter_tag.find_all([re.compile(tag) for tag in tags]):
insert(tag)
def _remove_headings_content(self, chapter_tag, title_of_chapter: str):
""" """
Function Function
- cleans/removes headings from chapter in order to avoid duplication of chapter titles in the content - cleans/removes headings from chapter in order to avoid duplication of chapter titles in the content
- adds span with id in order to - adds span with id in order to
Parameters Parameters
---------- ----------
chapter_tag: soup object chapter_tag: Union[BeautifulSoup, PageElement]
Tag of the page Tag of the page
title_of_chapter: str title_of_chapter: str
Chapter title Chapter title
Returns Returns
------- -------
None NoReturn
clean/remove headings & add span with id clean/remove headings & add span with id
""" """
title_of_chapter = title_of_chapter.lower() title_of_chapter = title_of_chapter.lower()
if title_of_chapter == "chapter 1":
pass
for tag in chapter_tag.contents: for tag in chapter_tag.contents:
text = tag if isinstance(tag, NavigableString) else tag.text tag: PageElement
text: str = tag if isinstance(tag, NavigableString) else tag.text
if re.sub(r"[\s\xa0]", "", text): if re.sub(r"[\s\xa0]", "", text):
text = re.sub(r"[\s\xa0]", " ", text).lower() text = re.sub(r"[\s\xa0]", " ", text).lower()
text = text.strip() # delete extra spaces text = text.strip() # delete extra spaces
@@ -313,7 +103,8 @@ class HtmlEpubPreprocessor:
if title_of_chapter == text or \ if title_of_chapter == text or \
(title_of_chapter in text and (title_of_chapter in text and
re.findall(r"^h[1-3]$", tag.name or chapter_tag.name)): re.findall(r"^h[1-3]$", tag.name or chapter_tag.name)):
self._add_span_to_save_ids_for_links(tag, chapter_tag) self.html_preprocessor._add_span_to_save_ids_for_links(
tag, chapter_tag)
tag.extract() tag.extract()
return return
elif not self._remove_headings_content(tag, title_of_chapter): elif not self._remove_headings_content(tag, title_of_chapter):
@@ -322,43 +113,6 @@ class HtmlEpubPreprocessor:
tag.extract() tag.extract()
return return
@staticmethod
def _process_tables(chapter_tag: BeautifulSoup):
"""
Function preprocesses tables and tags(td|th|tr)
Parameters
----------
chapter_tag: BeautifulSoup
Tag & contents of the chapter tag
Returns
-------
None
Chapter Tag with processed tables
"""
tables = chapter_tag.find_all("table")
for table in tables:
for t_tag in table.find_all(re.compile("td|th|tr")):
width = ""
if t_tag.get("style"):
width_match = re.search(
r"[^-]width: ?(\d+\.?\d*)(p[tx])", t_tag["style"])
if width_match:
size = width_match.group(1)
width = size + "px"
t_tag.attrs["width"] = t_tag.get("width") or width
if t_tag.attrs.get("style"):
t_tag.attrs["style"] = t_tag.attrs["style"].replace(
"border:0;", "")
if re.sub(r"[\s\xa0]", "", t_tag.attrs.get("style")) == "":
del t_tag.attrs["style"]
if not table.attrs.get("border") or table.attrs.get("border") in ["0", "0px"]:
table.attrs["border"] = "1"
@staticmethod @staticmethod
def _class_removing(chapter_tag: BeautifulSoup): def _class_removing(chapter_tag: BeautifulSoup):
""" """
@@ -370,7 +124,7 @@ class HtmlEpubPreprocessor:
Returns Returns
------- -------
None NoReturn
Chapter Tag without original classes of the book Chapter Tag without original classes of the book
""" """
@@ -379,14 +133,14 @@ class HtmlEpubPreprocessor:
and (tag.attrs.get("class") not in ["link-anchor", "footnote-element"]): and (tag.attrs.get("class") not in ["link-anchor", "footnote-element"]):
del tag.attrs["class"] del tag.attrs["class"]
def prepare_content(self, title_str: str, content_tag: BeautifulSoup, remove_title_from_chapter: bool) -> Tag: def prepare_content(self, title_str: str, chapter_tag: BeautifulSoup, remove_title_from_chapter: bool) -> Tag:
""" """
Function finalise processing/cleaning content Function finalise processing/cleaning content
Parameters Parameters
---------- ----------
title_str: str title_str: str
content_tag: Tag, soup object chapter_tag: BeautifulSoup, soup object
remove_title_from_chapter: bool remove_title_from_chapter: bool
@@ -394,13 +148,13 @@ class HtmlEpubPreprocessor:
---------- ----------
1. comments removal 1. comments removal
2. wrap NavigableString with tag <p> 2. wrap NavigableString with tag <p>
3-6. wrap tags with <table> 3. heading removal
4. wrap tags with <table>
replace tags with correspond LiveCarta tags replace tags with correspond LiveCarta tags
replace/remove attrs, values of attrs
unwrap tags unwrap tags
insert tags into correspond tags insert tags into correspond tags
7. heading removal 5. class removal
8. process_tables
9. class removal
Returns Returns
------- -------
@@ -409,18 +163,15 @@ class HtmlEpubPreprocessor:
""" """
# 1. remove comments # 1. remove comments
self._remove_comments(content_tag) self._remove_comments(chapter_tag)
# 2. # 2.
self._wrap_strings_with_p(content_tag) self._wrap_strings_with_p(chapter_tag)
# 3-6. # 3.
for dict in self.preset:
func = self.name2function[dict["preset_name"]]
func(content_tag, dict['rules'])
# 7.
if remove_title_from_chapter: if remove_title_from_chapter:
self._remove_headings_content(content_tag, title_str) self._remove_headings_content(chapter_tag, title_str)
# 8. # 4.
self._process_tables(content_tag) _process_presets(
# 9. remove classes that weren't created by converter html_preprocessor=self.html_preprocessor, html_soup=chapter_tag)
self._class_removing(content_tag) # 5. remove classes that weren't created by converter
return content_tag self._class_removing(chapter_tag)
return chapter_tag

View File

@@ -1,37 +1,38 @@
import os import os
import pathlib import pathlib
from typing import Dict
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from src.access import Access from src.access import Access
def save_image_to_aws(access: Access, img_file_path: str, img_content: bytes, book_id: str): def save_image_to_aws(access: Access, img_file_path: str, img_content: bytes, book_id: str) -> str:
"""Function saves all images to Amazon web service""" """Function saves all images to Amazon web service"""
link_path = access.send_image( link_path: str = access.send_image(
img_file_path, doc_id=book_id, img_content=img_content) img_file_path, doc_id=book_id, img_content=img_content)
return link_path return link_path
def save_image_locally(img_file_path: str, img_content: bytes, book_id: str): def save_image_locally(img_file_path: str, img_content: bytes, book_id: str) -> pathlib.Path:
"""Function saves all images locally""" """Function saves all images locally"""
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
new_path = pathlib.Path(os.path.join( new_path = pathlib.Path(os.path.join(
folder_path, f"../books/json/img_{book_id}/")) folder_path, f"../books/json/img_{book_id}/"))
new_path.mkdir(exist_ok=True) new_path.mkdir(exist_ok=True)
new_img_path = new_path / os.path.basename(img_file_path) img_folder_path = new_path / os.path.basename(img_file_path)
f = open(new_img_path, "wb+") f = open(img_folder_path, "wb+")
f.write(img_content) f.write(img_content)
f.close() f.close()
return new_img_path return img_folder_path
def update_images_src_links(body_tag: BeautifulSoup, def update_images_src_links(body_tag: BeautifulSoup,
img_href2img_content: dict, img_href2img_content: Dict[str, bytes],
path_to_html: str, path_to_html: str,
access=None, access: Access = None,
path2aws_path: dict = None, path2aws_path: Dict[str, str] = None,
book_id: str = None) -> dict: book_id: str = None) -> Dict[str, str]:
"""Function makes dictionary image_src_path -> Amazon web service_path""" """Function makes dictionary image_src_path -> Amazon web service_path"""
img_tags = body_tag.find_all("img") img_tags = body_tag.find_all("img")
for img in img_tags: for img in img_tags:
@@ -43,23 +44,20 @@ def update_images_src_links(body_tag: BeautifulSoup,
assert path_to_img_from_root in img_href2img_content, \ assert path_to_img_from_root in img_href2img_content, \
f"Image {path_to_img_from_html} in file {path_to_html} was not added to manifest." f"Image {path_to_img_from_html} in file {path_to_html} was not added to manifest."
img_content = img_href2img_content[path_to_img_from_root] img_content: bytes = img_href2img_content[path_to_img_from_root]
if access is not None: if access is not None:
if path_to_img_from_root in path2aws_path: if path_to_img_from_root in path2aws_path:
new_folder = path2aws_path[path_to_img_from_root] img_folder_path = path2aws_path[path_to_img_from_root]
else: else:
new_folder = save_image_to_aws( img_folder_path = save_image_to_aws(
access, path_to_img_from_root, img_content, book_id) access, path_to_img_from_root, img_content, book_id)
path2aws_path[path_to_img_from_root] = new_folder path2aws_path[path_to_img_from_root] = img_folder_path
else: else:
new_folder = save_image_locally( img_folder_path = save_image_locally(
path_to_img_from_root, img_content, book_id) path_to_img_from_root, img_content, book_id)
img.attrs["src"] = str(new_folder) img.attrs["src"] = str(img_folder_path)
if img.attrs.get("width"): for attr in ["width", "height", "style"]:
del img.attrs["width"] if img.attrs.get(attr):
if img.attrs.get("height"): del img.attrs[attr]
del img.attrs["height"]
if img.attrs.get("style"):
del img.attrs["style"]
return path2aws_path return path2aws_path

View File

@@ -0,0 +1,182 @@
import re
import json
from bs4 import BeautifulSoup, Tag
from bs4.element import PageElement
from typing import List, Dict, Union
from src.util.helpers import BookLogger
class HtmlPresetsProcessor:
def __init__(self, logger: BookLogger, preset_path):
self.preset = json.load(open(preset_path))
self.logger = logger
self.name2action = {
"wrapper": self._wrap_tag,
"table_wrapper": self._process_tag_using_table,
"decomposer": self._decompose_tag,
"replacer": self._replace_tag,
"attr_replacer": self._replace_attr,
"unwrapper": self._unwrap_tag,
"inserter": self._insert_tag
}
@staticmethod
def _wrap_tag(**kwargs):
kwargs["tag"].wrap(kwargs["body_tag"].new_tag(
kwargs["rule"]["tag_to_wrap"]))
@staticmethod
def _decompose_tag(**kwargs):
kwargs["tag"].decompose()
@staticmethod
def _add_span_to_save_ids_for_links(tag_to_be_removed: Union[PageElement, BeautifulSoup],
chapter_tag: BeautifulSoup):
"""
Function adds span with id from tag_to_be_removed
because this tag will be removed(unwrapped/extract)
Parameters
----------
tag_to_be_removed: Union[PageElement, BeautifulSoup]
chapter_tag: BeautifulSoup
Returns
-------
NoReturn
updated body tag
"""
def _insert_span_with_attrs_before_tag(chapter_tag: BeautifulSoup,
tag_to_be_removed: Tag,
id_: str,
class_: Union[List[str], str]):
"""Function inserts span before tag aren't supported by LiveCarta"""
new_tag: Tag = chapter_tag.new_tag("span")
new_tag.attrs["id"] = id_ or ""
new_tag.attrs["class"] = class_ or ""
new_tag.string = "\xa0"
tag_to_be_removed.insert_before(new_tag)
if tag_to_be_removed.attrs.get("id"):
_insert_span_with_attrs_before_tag(chapter_tag=chapter_tag,
tag_to_be_removed=tag_to_be_removed,
id_=tag_to_be_removed.attrs["id"],
class_=tag_to_be_removed.attrs.get("class"))
def _process_tag_using_table(self, **kwargs):
def _wrap_tag_with_table(width: str = "100", border: str = "", bg_color: str = None) -> Tag:
table = kwargs["body_tag"].new_tag("table")
table.attrs["border"], table.attrs["align"], table.attrs["style"] \
= border, "center", f"width:{width}%;"
tbody, tr, td = \
kwargs["body_tag"].new_tag("tbody"), kwargs["body_tag"].new_tag(
"tr"), kwargs["body_tag"].new_tag("td")
td.attrs["bgcolor"] = bg_color
kwargs["tag"].wrap(td)
td.wrap(tr)
tr.wrap(tbody)
tbody.wrap(table)
table.insert_after(BeautifulSoup(features="lxml").new_tag("br"))
return table
_wrap_tag_with_table(
width=kwargs["tag"].attrs["width"] if kwargs["tag"].attrs.get(
"width") else "100",
border=kwargs["tag"].attrs["border"] if kwargs["tag"].attrs.get(
"border") else None,
bg_color=kwargs["tag"].attrs["bgcolor"] if kwargs["tag"].attrs.get("bgcolor") else None)
self._add_span_to_save_ids_for_links(kwargs["tag"], kwargs["body_tag"])
kwargs["tag"].unwrap()
@staticmethod
def _replace_tag(**kwargs):
tag_to_replace: str = kwargs["rule"]["tag_to_replace"]
kwargs["tag"].name = tag_to_replace
@staticmethod
def _replace_attr(**kwargs):
attr, attr_value =\
kwargs["rule"]["attr"]["name"], kwargs["rule"]["attr"]["value"]
attr_to_replace, attr_value_to_replace =\
kwargs["rule"]["attr_to_replace"]["name"], kwargs["rule"]["attr_to_replace"]["value"]
if attr_to_replace:
kwargs["tag"][attr_to_replace] = kwargs["tag"][attr]
if attr_value_to_replace:
kwargs["tag"].attrs[attr_to_replace] = attr_value_to_replace
del kwargs["tag"][attr]
elif attr_value_to_replace:
kwargs["tag"].attrs[attr] = attr_value_to_replace
elif attr:
del kwargs["tag"][attr]
@staticmethod
def _unwrap_tag(**kwargs):
kwargs["tag"].unwrap()
@staticmethod
def _insert_tag(**kwargs):
tag_to_insert = \
kwargs["body_tag"].new_tag(kwargs["rule"]["tag_to_insert"])
# insert all items that was in tag to subtag and remove from tag
for content in reversed(kwargs["tag"].contents):
tag_to_insert.insert(0, content.extract())
# wrap subtag with items
kwargs["tag"].append(tag_to_insert)
@staticmethod
def _process_tags(body_tag: BeautifulSoup,
rules: List[Dict[str, Union[List[str], str, Dict[str, Union[List[Dict[str, str]], int, str]]]]],
action):
"""
Function does action with tags
Parameters
----------
body_tag: BeautifulSoup
Tag & contents of the body tag
rules: List[Dict[str, Union[List[str], str, Dict[str, Union[List[Dict[str, str]], int, str]]]]]
list of conditions when fire function
action: function
action what to do with tag
Returns
-------
NoReturn
Body Tag with processed certain tags
"""
for rule in rules:
tags: List[str] = rule["tags"] if rule.get(
"tags") else rule["condition"]["tags"]
if rule["condition"]:
for condition_on_tag in ((k, v) for k, v in rule["condition"].items() if v):
if condition_on_tag[0] == "parent_tags":
for parent_tag in body_tag.select(condition_on_tag[1]):
for tag in parent_tag.find_all([re.compile(tag) for tag in tags]):
# parent_tag != tag.parent
tag.parent.attrs.update(tag.attrs)
action(body_tag=body_tag, tag=tag, rule=rule)
elif condition_on_tag[0] == "child_tags":
for tag in body_tag.find_all([re.compile(tag) for tag in tags]):
if tag.select(condition_on_tag[1]):
action(body_tag=body_tag, tag=tag, rule=rule)
elif condition_on_tag[0] == "attrs":
for attr in rule["condition"]["attrs"]:
for tag in body_tag.find_all([re.compile(tag) for tag in tags],
{attr["name"]: re.compile(fr"{attr['value']}")}):
action(body_tag=body_tag, tag=tag, rule=rule)
# attr replacer
elif condition_on_tag[0] == "tags":
attr = rule["attr"]
for tag in body_tag.find_all([re.compile(tag) for tag in tags],
{attr['name']: re.compile(fr"{attr['value']}")}):
action(body_tag=body_tag, tag=tag, rule=rule)
else:
for tag in body_tag.find_all([re.compile(tag) for tag in tags]):
action(body_tag=body_tag, tag=tag, rule=rule)
def _process_presets(html_preprocessor: HtmlPresetsProcessor, html_soup: BeautifulSoup):
for rule in html_preprocessor.preset:
# html_preprocessor.logger.log(rule["preset_name"].title() + " process.")
action = html_preprocessor.name2action[rule["preset_name"]]
html_preprocessor._process_tags(html_soup, rule["rules"], action)

View File

@@ -1,23 +1,23 @@
import re import re
import cssutils import cssutils
from typing import List from typing import List
from logging import CRITICAL from logging import CRITICAL
from bs4 import BeautifulSoup from premailer import transform
from bs4 import BeautifulSoup, Tag
from src.livecarta_config import LiveCartaConfig from src.livecarta_config import LiveCartaConfig
cssutils.log.setLevel(CRITICAL) cssutils.log.setLevel(CRITICAL)
class TagInlineStyleProcessor: class InlineStyleProcessor:
def __init__(self, tag_inline_style): def __init__(self, tag_inline_style: Tag):
# tag with inline style + style parsed from css file # tag with inline style + style parsed from css file
self.tag_inline_style = tag_inline_style self.tag_inline_style = tag_inline_style
self.tag_inline_style.attrs['style'] = self.process_inline_style() self.tag_inline_style.attrs['style']: str = self.process_inline_style()
@staticmethod @staticmethod
def remove_white_if_no_bgcolor(style_, tag): def remove_white_if_no_bgcolor(style_: str, tag: Tag) -> str:
"""Function remove text white color if there is no bg color""" """Function remove text white color if there is no bg color"""
if "background" in style_: if "background" in style_:
style_ = style_.replace( style_ = style_.replace(
@@ -62,13 +62,13 @@ class TagInlineStyleProcessor:
# return split_style # return split_style
@staticmethod @staticmethod
def indents_processing(split_style: list) -> str: def indents_processing(split_style: List[str]) -> str:
""" """
Function process indents from left using Function process indents from left using
formula_of_indent: indent = abs(margin - text_indent) formula_of_indent: indent = abs(margin - text_indent)
Parameters Parameters
---------- ----------
split_style: list split_style: List[str]
list of styles split by ";" list of styles split by ";"
Returns Returns
@@ -111,7 +111,7 @@ class TagInlineStyleProcessor:
return processed_style return processed_style
return processed_style return processed_style
def process_inline_style(self): def process_inline_style(self) -> str:
""" """
Function processes final(css+initial inline) inline style Function processes final(css+initial inline) inline style
Steps Steps
@@ -180,7 +180,7 @@ class TagInlineStyleProcessor:
self.tag_inline_style.append(correspond_tag) self.tag_inline_style.append(correspond_tag)
@staticmethod @staticmethod
def wrap_span_in_tag_to_save_style_attrs(initial_tag): def wrap_span_in_tag_to_save_style_attrs(initial_tag: Tag):
"""Function designed to save style attrs that cannot be in tag.name -> span""" """Function designed to save style attrs that cannot be in tag.name -> span"""
dictkeys_pattern = re.compile("|".join(LiveCartaConfig.LIVECARTA_STYLES_CAN_BE_IN_TAG)) dictkeys_pattern = re.compile("|".join(LiveCartaConfig.LIVECARTA_STYLES_CAN_BE_IN_TAG))
if re.findall(dictkeys_pattern, initial_tag.name) and initial_tag.attrs.get("style"): if re.findall(dictkeys_pattern, initial_tag.name) and initial_tag.attrs.get("style"):
@@ -212,7 +212,45 @@ class TagInlineStyleProcessor:
initial_tag.attrs["style"] = span_style initial_tag.attrs["style"] = span_style
initial_tag.wrap(tag) initial_tag.wrap(tag)
def convert_initial_tag(self): def convert_initial_tag(self) -> Tag:
self.change_attrs_with_corresponding_tags() self.change_attrs_with_corresponding_tags()
self.wrap_span_in_tag_to_save_style_attrs(self.tag_inline_style) self.wrap_span_in_tag_to_save_style_attrs(self.tag_inline_style)
return self.tag_inline_style return self.tag_inline_style
def modify_html_soup_with_css_styles(html_soup: BeautifulSoup, css_text: str = "") -> BeautifulSoup:
"""
Function adds styles from .css to inline style.
Parameters
----------
html_soup: BeautifulSoup
html page with inline style
css_text: str
css content from css file
Returns
-------
inline_soup: BeautifulSoup
soup with styles from css
"""
# remove this specification because it causes problems
css_text = css_text.replace(
'@namespace epub "http://www.idpf.org/2007/ops";', '')
# here we add css styles to inline style
html_with_css_styles: str = transform(str(html_soup), css_text=css_text,
remove_classes=False,
external_styles=False,
allow_network=False,
disable_validation=True,
)
# soup with converted styles from css
inline_soup = BeautifulSoup(html_with_css_styles, features="lxml")
tags_with_inline_style = inline_soup.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp,
attrs={"style": re.compile(".*")})
# go through the tags with inline style + style parsed from css file
for tag_inline_style in tags_with_inline_style:
style_converter = InlineStyleProcessor(tag_inline_style)
style_converter.convert_initial_tag()
return inline_soup

View File

@@ -1,13 +1,13 @@
import re import re
import cssutils import cssutils
from bs4 import BeautifulSoup from typing import Tuple
from os.path import dirname, normpath, join from os.path import dirname, normpath, join
from src.util.color_reader import str2hex from src.util.color_reader import str2hex
from src.livecarta_config import LiveCartaConfig from src.livecarta_config import LiveCartaConfig
class CSSPreprocessor: class StyleReader:
def __init__(self): def __init__(self):
""" """
Dictionary LIVECARTA_STYLE_ATTRS_MAPPING = { property: mapping function } Dictionary LIVECARTA_STYLE_ATTRS_MAPPING = { property: mapping function }
@@ -41,13 +41,13 @@ class CSSPreprocessor:
} }
@staticmethod @staticmethod
def get_text_color(x): def get_text_color(x: str) -> str:
color = str2hex(x) color = str2hex(x)
color = color if color not in ["#000000", "#000", "black"] else "" color = color if color not in ["#000000", "#000", "black"] else ""
return color return color
@staticmethod @staticmethod
def get_bg_color(x): def get_bg_color(x: str) -> str:
color = str2hex(x) color = str2hex(x)
color = color if color not in ["#ffffff", "#fff", "white"] else "" color = color if color not in ["#ffffff", "#fff", "white"] else ""
return color return color
@@ -56,7 +56,7 @@ class CSSPreprocessor:
def convert_tag_style_values(size_value: str, is_indent: bool = False) -> str: def convert_tag_style_values(size_value: str, is_indent: bool = False) -> str:
""" """
Function Function
- converts values of tags from em/%/pt to px - converts values of tags from em/%/pt/in to px
- find closest font-size px - find closest font-size px
Parameters Parameters
---------- ----------
@@ -70,20 +70,23 @@ class CSSPreprocessor:
converted value size converted value size
""" """
size_regexp = re.compile( size_regexp = re.compile(
r"(^-*(\d*\.*\d+)%$)|(^-*(\d*\.*\d+)em$)|(^-*(\d*\.*\d+)pt$)") r"(^-*(\d*\.*\d+)%$)|(^-*(\d*\.*\d+)em$)|(^-*(\d*\.*\d+)pt$)|(^-*(\d*\.*\d+)in$)")
has_style_attrs = re.search(size_regexp, size_value) has_style_attrs = re.search(size_regexp, size_value)
if has_style_attrs: if has_style_attrs:
if has_style_attrs.group(1): if has_style_attrs.group(1):
multiplier = 5.76 if is_indent else 0.16 multiplier = 5.76 if is_indent else 0.16
size_value = float(size_value.replace("%", "")) * multiplier size_value = float(size_value.replace("%", "")) * multiplier
return str(size_value)+'px' return str(size_value) + "px"
elif has_style_attrs.group(3): elif has_style_attrs.group(3):
multiplier = 18 if is_indent else 16 multiplier = 18 if is_indent else 16
size_value = float(size_value.replace("em", "")) * multiplier size_value = float(size_value.replace("em", "")) * multiplier
return str(size_value)+'px' return str(size_value) + "px"
elif has_style_attrs.group(5): elif has_style_attrs.group(5):
size_value = float(size_value.replace("pt", "")) * 4/3 size_value = float(size_value.replace("pt", "")) * 4/3
return str(size_value)+'px' return str(size_value) + "px"
elif has_style_attrs.group(7):
size_value = float(size_value.replace("in", "")) * 96
return str(size_value) + "px"
else: else:
return "" return ""
return size_value return size_value
@@ -114,7 +117,7 @@ class CSSPreprocessor:
return cleaned_value return cleaned_value
@staticmethod @staticmethod
def style_conditions(style_value: str, style_name: str) -> tuple[bool, bool]: def style_conditions(style_value: str, style_name: str) -> Tuple[bool, bool]:
constraints_on_value = LiveCartaConfig.LIVECARTA_STYLE_ATTRS.get( constraints_on_value = LiveCartaConfig.LIVECARTA_STYLE_ATTRS.get(
style_name) style_name)
value_not_in_possible_values_list = style_value not in LiveCartaConfig.LIVECARTA_STYLE_ATTRS[ value_not_in_possible_values_list = style_value not in LiveCartaConfig.LIVECARTA_STYLE_ATTRS[
@@ -156,20 +159,20 @@ class CSSPreprocessor:
style = "; ".join(split_style) style = "; ".join(split_style)
return style return style
def process_inline_styles_in_html_soup(self, html_href2html_body_soup: dict): def process_inline_styles_in_html_soup(self, html_content):
"""This function is designed to convert inline html styles""" """This function is designed to convert inline html styles"""
for html_href in html_href2html_body_soup: tags_with_inline_style = html_content.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp,
html_content: BeautifulSoup = html_href2html_body_soup[html_href] attrs={"style": re.compile(".*")})
tags_with_inline_style = html_content.find_all(LiveCartaConfig.could_have_style_in_livecarta_regexp,
attrs={"style": re.compile(".*")})
for tag_initial_inline_style in tags_with_inline_style: for tag_initial_inline_style in tags_with_inline_style:
inline_style = tag_initial_inline_style.attrs["style"] inline_style = tag_initial_inline_style.attrs["style"]
tag_initial_inline_style.attrs["style"] = \ if tag_initial_inline_style.attrs.get("align"):
self.build_inline_style_content(inline_style) inline_style += f";text-align: {tag_initial_inline_style.attrs['align']};"
tag_initial_inline_style.attrs["style"] = \
self.build_inline_style_content(inline_style)
@staticmethod @staticmethod
def get_css_content(css_href, html_href, ebooklib_book): def get_css_content(css_href: str, html_href: str, ebooklib_book) -> str:
path_to_css_from_html = css_href path_to_css_from_html = css_href
html_folder = dirname(html_href) html_folder = dirname(html_href)
path_to_css_from_root = normpath( path_to_css_from_root = normpath(

View File

@@ -4,13 +4,12 @@ import argparse
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description="Utility for folders's clean up.") parser = argparse.ArgumentParser(description="Utility for folders's clean up.")
parser.add_argument('-f', '--folders', type=str, nargs='*', help='Names of the folders to be cleaned.') parser.add_argument("-f", "--folders", type=str, nargs="*", help="Names of the folders to be cleaned.")
args = parser.parse_args() args = parser.parse_args()
return args return args
def check_dir(dir_path): def check_dir(dir_path: str):
if not os.path.exists(dir_path): if not os.path.exists(dir_path):
try: try:
os.mkdir(dir_path) os.mkdir(dir_path)
@@ -18,18 +17,16 @@ def check_dir(dir_path):
raise exc raise exc
if __name__ == '__main__': if __name__ == "__main__":
folders = parse_args().folders folders = parse_args().folders
if not folders: if not folders:
folders = ['docx', 'html', 'json', 'logs', 'config'] folders = ["books/epub", "books/docx", "books/html", "books/json", "logs", "config"]
folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
folders = [os.path.join(folder_path, folder) for folder in folders] folders = [os.path.join(folder_path, folder) for folder in folders]
try: try:
for folder in folders: [check_dir(folder) for folder in folders]
check_dir(folder)
except OSError as exc: except OSError as exc:
print(exc) print(exc)
raise raise

View File

@@ -6,15 +6,15 @@ import subprocess
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description="Utility for checking installed packages.") parser = argparse.ArgumentParser(description="Utility for checking installed packages.")
parser.add_argument('-p', '--packages', type=str, nargs='*', help='Names of the packages.') parser.add_argument("-p", "--packages", type=str, nargs="*", help="Names of the packages.")
args = parser.parse_args() args = parser.parse_args()
return args return args
def check_packages(required_packs): def check_packages(required_packs):
inst = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']) inst = subprocess.check_output([sys.executable, "-m", "pip", "freeze"])
installed_packages = [r.decode().split('==')[0] for r in inst.split()] installed_packages = [r.decode().split("==")[0] for r in inst.split()]
to_be_installed = [] to_be_installed = []
for package in required_packs: for package in required_packs:
@@ -24,19 +24,19 @@ def check_packages(required_packs):
return to_be_installed return to_be_installed
if __name__ == '__main__': if __name__ == "__main__":
required_packs = parse_args().packages required_packs = parse_args().packages
if not required_packs: if not required_packs:
folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) folder_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
req_path = os.path.join(folder_path, 'requirements.txt') req_path = os.path.join(folder_path, "requirements.txt")
with open(req_path, 'r') as f: with open(req_path, "r") as f:
packs = f.readlines() packs = f.readlines()
required_packs = [pack.split('>=')[0] for pack in packs] required_packs = [pack.split(">=")[0] for pack in packs]
not_inst_packs = check_packages(required_packs) not_inst_packs = check_packages(required_packs)
if not_inst_packs: if not_inst_packs:
raise Exception(f'{" ".join(not_inst_packs)} are not installed.') raise Exception(f"{' '.join(not_inst_packs)} are not installed.")
else: else:
print('All required packages has been installed.') print("All required packages has been installed.")

View File

@@ -1,10 +1,10 @@
import re import re
from typing import Tuple
from colorsys import hls_to_rgb from colorsys import hls_to_rgb
from webcolors import html4_hex_to_names, hex_to_rgb, rgb_to_name, rgb_percent_to_hex, rgb_to_hex, css3_names_to_hex from webcolors import html4_hex_to_names, hex_to_rgb, rgb_to_name, rgb_percent_to_hex, rgb_to_hex, css3_names_to_hex
def closest_colour_rgb(requested_color): def closest_colour_rgb(requested_color: Tuple[int, ...]) -> str:
""" Function finds closes colour rgb """ """ Function finds closes colour rgb """
min_colours = {} min_colours = {}
for key, name in html4_hex_to_names.items(): for key, name in html4_hex_to_names.items():
@@ -17,10 +17,10 @@ def closest_colour_rgb(requested_color):
return min_colours[min(min_colours.keys())] return min_colours[min(min_colours.keys())]
def rgb2color_name(color): def rgb2color_name(color: Tuple[int, ...]) -> str:
""" Transform rgb -> color name """ """ Transform rgb -> color name """
try: try:
closest_name = actual_name = rgb_to_name(color, 'html4') closest_name = actual_name = rgb_to_name(color, "html4")
except ValueError: except ValueError:
closest_name = closest_colour_rgb(color) closest_name = closest_colour_rgb(color)
actual_name = None actual_name = None
@@ -30,15 +30,15 @@ def rgb2color_name(color):
return closest_name return closest_name
def hex2color_name(color): def hex2color_name(color: str) -> str:
""" Transform hex -> color name """ """ Transform hex -> color name """
try: try:
color = hex_to_rgb(color) color = hex_to_rgb(color)
except ValueError: except ValueError:
return '' return ""
try: try:
closest_name = actual_name = rgb_to_name(color, 'html4') closest_name = actual_name = rgb_to_name(color, "html4")
except ValueError: except ValueError:
closest_name = closest_colour_rgb(color) closest_name = closest_colour_rgb(color)
actual_name = None actual_name = None
@@ -48,41 +48,41 @@ def hex2color_name(color):
return closest_name return closest_name
def str2closest_html_color_name(s: str): def str2closest_html_color_name(s: str) -> str:
""" Transform str -> closest color name """ """ Transform str -> closest color name """
if 'rgb' in s: if "rgb" in s:
rgb_str = 'rgba' if ('rgba' in s) else 'rgb' rgb_str = "rgba" if ("rgba" in s) else "rgb"
s = s.replace(rgb_str, '').replace('(', '').replace(')', '') s = s.replace(rgb_str, "").replace("(", "").replace(")", "")
try: try:
rgb = [int(x) for x in s.split(',')[:3]] rgb = [int(x) for x in s.split(",")[:3]]
rgb = tuple(rgb) rgb = tuple(rgb)
except ValueError: except ValueError:
return '' return ""
if len(rgb) != 3: if len(rgb) != 3:
return '' return ""
name = rgb2color_name(rgb) name = rgb2color_name(rgb)
return name return name
elif '#' in s: elif "#" in s:
if s in ['#996A95', '#D5C9D3', '#E9E2E8', '#70416F']: if s in ["#996A95", "#D5C9D3", "#E9E2E8", "#70416F"]:
return 'purple' return "purple"
if s in ['#FFD472', '#F47B4D', '#FFFBEF', '#F47B4D']: if s in ["#FFD472", "#F47B4D", "#FFFBEF", "#F47B4D"]:
return 'olive' return "olive"
if s in ['#B0DFD7', '#EFF8F6', '#5CC4B7']: if s in ["#B0DFD7", "#EFF8F6", "#5CC4B7"]:
return 'teal' return "teal"
name = hex2color_name(s) name = hex2color_name(s)
if (name == 'white') and (s.lower() not in ['#ffffff', '#fff']): if (name == "white") and (s.lower() not in ["#ffffff", "#fff"]):
name = 'gray' name = "gray"
return name return name
elif s in html4_hex_to_names.items(): elif s in html4_hex_to_names.items():
return s return s
else: else:
return '' return ""
def rgba2rgb(r, g, b, alpha): def rgba2rgb(r: int, g: int, b: int, alpha: float) -> Tuple[int, int, int]:
""" Transform rgba -> rgb """ """ Transform rgba -> rgb """
r_background, g_background, b_background = 255, 255, 255 r_background, g_background, b_background = 255, 255, 255
r_new = int((1 - alpha) * r_background + alpha * r) r_new = int((1 - alpha) * r_background + alpha * r)
@@ -91,28 +91,28 @@ def rgba2rgb(r, g, b, alpha):
return r_new, g_new, b_new return r_new, g_new, b_new
def str2hex(s: str): def str2hex(s: str) -> str:
""" Transform str -> hex """ """ Transform str -> hex """
if '#' in s and (len(s) <= 7): if "#" in s and (len(s) <= 7):
return s.lower() return s.lower()
if ('rgb' in s.lower()) and ('%' in s): if ("rgb" in s.lower()) and ("%" in s):
match = re.search(r'rgba*\(((\d+)%, *(\d+)%, *(\d+)%(, \d\.\d+)*)\)', s) match = re.search(r"rgba*\(((\d+)%, *(\d+)%, *(\d+)%(, \d\.\d+)*)\)", s)
if match: if match:
r, g, b = int(match.group(2)), int(match.group(3)), int(match.group(4)) r, g, b = int(match.group(2)), int(match.group(3)), int(match.group(4))
return rgb_percent_to_hex((r, g, b)) return rgb_percent_to_hex((r, g, b))
if 'rgb' in s.lower(): if "rgb" in s.lower():
rgba = re.findall('([0-9] *\.?[0-9]+)', s) rgba = re.findall("([0-9] *\.?[0-9]+)", s)
r, g, b = int(rgba[0]), int(rgba[1]), int(rgba[2]) r, g, b = int(rgba[0]), int(rgba[1]), int(rgba[2])
if len(rgba) == 4: if len(rgba) == 4:
alpha = float(rgba[3]) alpha = float(rgba[3])
r, g, b = rgba2rgb(r, g, b, alpha) r, g, b = rgba2rgb(r, g, b, alpha)
return rgb_to_hex((r, g, b)) return rgb_to_hex((r, g, b))
if 'hsl' in s.lower(): if "hsl" in s.lower():
# hsl(hue in {0,360}, saturation [0, 100%], lightness [0, 100%]) # hsl(hue in {0,360}, saturation [0, 100%], lightness [0, 100%])
match = re.search(r'hsla*\(((\d+), *(\d+)%, *(\d+)%, (\d\.\d+)*)\)', s) match = re.search(r"hsla*\(((\d+), *(\d+)%, *(\d+)%, (\d\.\d+)*)\)", s)
if match: if match:
h, s, l = int(match.group(2)), int(match.group(3)), int(match.group(4)) h, s, l = int(match.group(2)), int(match.group(3)), int(match.group(4))
h /= 360 h /= 360
@@ -127,11 +127,10 @@ def str2hex(s: str):
if s.lower() in css3_names_to_hex: if s.lower() in css3_names_to_hex:
return css3_names_to_hex[s.lower()] return css3_names_to_hex[s.lower()]
return '' return ""
if __name__ == '__main__': if __name__ == "__main__":
colors = [ colors = [
(75, 0, 130), (255, 0, 255), (75, 0, 130), (255, 0, 255),
(139, 69, 19), (46, 139, 87), (139, 69, 19), (46, 139, 87),
@@ -139,7 +138,7 @@ if __name__ == '__main__':
] ]
hex_colors = [ hex_colors = [
'#96F', '#000', '#4C4C4C', '#A00', '#99F' "#96F", "#000", "#4C4C4C", "#A00", "#99F"
] ]
for c in colors: for c in colors:

View File

@@ -1,51 +1,60 @@
import os import os
import logging import logging
from typing import Union
class ColoredFormatter(logging.Formatter): class ColoredFormatter(logging.Formatter):
""" Class to prettify logger and command line output """ """ Class to prettify logger and command line output """
MAPPING = { MAPPING = {
'DEBUG': 37, # white "DEBUG": 37, # white
'INFO': 36, # cyan "INFO": 36, # cyan
'WARNING': 33, # yellow "WARNING": 33, # yellow
'ERROR': 31, # red "ERROR": 31, # red
'CRITICAL': 41, # white on red bg "CRITICAL": 41, # white on red bg
} }
PREFIX = '\033[' PREFIX = "\033["
SUFFIX = '\033[0m' SUFFIX = "\033[0m"
def __init__(self, pattern): def __init__(self, pattern):
logging.Formatter.__init__(self, pattern) logging.Formatter.__init__(self, pattern)
def format(self, record): def format(self, record):
seq = self.MAPPING.get(record.levelname, 37) # default white seq = self.MAPPING.get(record.levelname, 37) # default white
record.levelname = '{0}{1}m{2}{3}' \ record.levelname = "{0}{1}m{2}{3}" \
.format(self.PREFIX, seq, record.levelname, self.SUFFIX) .format(self.PREFIX, seq, record.levelname, self.SUFFIX)
return logging.Formatter.format(self, record) return logging.Formatter.format(self, record)
class BookLogger: class BookLogger:
def __init__(self, name, book_id, main_logger=None, def __init__(self, name: str, book_id: Union[int, str], main_logger: logging.Logger = None,
filemode='w+', logging_level=logging.INFO, filemode: str = "w+", logging_level: int = logging.INFO,
logging_format='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]'): logging_format: str = "%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d in %(funcName)s]"):
""" """
Method for Logger configuration. Logger will write to file. Method for Logger configuration. Logger will write to file.
:param name: name of the Logger. Parameters
:param attr_name: name of attribute that will be added to self. ----------
:param filename: name of the log file. name: str
:param filemode: mode of opening log file. name of the Logger
:param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical. book_id: Union[int, str]
:param logging_format: format of record in log file. id of the book
main_logger: Logger
main logger of the converter
filemode: str
mode of opening log file.
logging_level: int
logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical
logging_format: str
format of record in log file
""" """
self.main_logger = main_logger self.main_logger = main_logger
self.logger = logging.getLogger(name) self.logger = logging.getLogger(name)
self.logger.propagate = False self.logger.propagate = False
folder_path = os.path.dirname( folder_path = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))) os.path.dirname(os.path.abspath(__file__)))
folder_path = os.path.dirname(folder_path) folder_path = os.path.dirname(folder_path)
filename = f'logs/{book_id}.log' filename = f"logs/{book_id}.log"
file_path = os.path.join(folder_path, filename) file_path = os.path.join(folder_path, filename)
file_handler = logging.FileHandler(file_path, mode=filemode) file_handler = logging.FileHandler(file_path, mode=filemode)
file_format = logging.Formatter(logging_format) file_format = logging.Formatter(logging_format)
@@ -58,42 +67,46 @@ class BookLogger:
self.logger.addHandler(stream_handler) self.logger.addHandler(stream_handler)
self.logger.setLevel(logging_level) self.logger.setLevel(logging_level)
def log(self, message, logging_level=20): def log(self, message: str, logging_level: int = 20):
""" """
Method for logging. Method for logging.
Parameters
----------
message: str
body of the message
logging_level: int
level of logging
:param message: body of the message
:param logging_level: level of logging
""" """
self.logger.log(msg=message, level=logging_level, stacklevel=2) self.logger.log(msg=message, level=logging_level, stacklevel=2)
def log_error_to_main_log(self, message=''): def log_error_to_main_log(self, message: str = ""):
""" Method for logging error to main log file. """ """ Method for logging error to main log file. """
if self.main_logger: if self.main_logger:
if not message: if not message:
message = f'Error in book conversion. Check log file.' message = f"Error in book conversion. Check log file."
self.main_logger.error(message) self.main_logger.error(message)
class BookStatusWrapper: class BookStatusWrapper:
"""Class sets/updates statuses of Converter on Platform""" """Class sets/updates statuses of Converter on Platform"""
def __init__(self, access, logger_object, book_id=0): def __init__(self, access, logger_object: BookLogger, book_id: int = 0):
self.access = access self.access = access
self.logger_object = logger_object self.logger_object = logger_object
self.book_id = book_id self.book_id = book_id
def set_status(self, status: str): def set_status(self, status: str):
str_2_status = { str_2_status = {
'[PROCESS]': self.access.PROCESS, "[PROCESS]": self.access.PROCESS,
'[GENERATE]': self.access.GENERATE, "[GENERATE]": self.access.GENERATE,
'[ERROR]': self.access.ERROR "[ERROR]": self.access.ERROR
} }
try: try:
if self.access: if self.access:
self.access.update_status(self.book_id, str_2_status[status]) self.access.update_status(self.book_id, str_2_status[status])
self.logger_object.log(f'Status has been updated to {status}.') self.logger_object.log(f"Status has been updated to {status}.")
except Exception as exc: except Exception as exc:
self.logger_object.log( self.logger_object.log(
f"Can't update status of the book {status}.", logging.ERROR) f"Can't update status of the book {status}.", logging.ERROR)
@@ -101,10 +114,10 @@ class BookStatusWrapper:
raise exc raise exc
def set_processing(self): def set_processing(self):
self.set_status('[PROCESS]') self.set_status("[PROCESS]")
def set_generating(self): def set_generating(self):
self.set_status('[GENERATE]') self.set_status("[GENERATE]")
def set_error(self): def set_error(self):
self.set_status('[ERROR]') self.set_status("[ERROR]")

View File

@@ -1,4 +1,5 @@
from webcolors import html4_hex_to_names, hex_to_rgb from typing import Tuple
from webcolors import hex_to_rgb
# 16 основных цветов, hex соответвуют hex цветам livecarta # 16 основных цветов, hex соответвуют hex цветам livecarta
# названия другие # названия другие
@@ -8,7 +9,7 @@ html4_hex_to_names = {'#00ffff': 'aqua', '#000000': 'black', '#0000ff': 'blue',
'#ffffff': 'white', '#ffff00': 'yellow'} '#ffffff': 'white', '#ffff00': 'yellow'}
def rgb2hsv(r, g, b): def rgb2hsv(r: int, g: int, b: int) -> Tuple[float, float, float]:
r /= 255 r /= 255
g /= 255 g /= 255
b /= 255 b /= 255
@@ -42,18 +43,23 @@ for key, name in html4_hex_to_names.items():
HTML_COLORS_HSV[name] = (h, s, v) HTML_COLORS_HSV[name] = (h, s, v)
def rgb2closest_html_color_name(color): def rgb2closest_html_color_name(color: str) -> str:
""" """
- get color in hsv (hue, saturation, value) - get color in hsv (hue, saturation, value)
- try to match with black, grey, silver (black, darkGray, lightGray) as this colors matches badly even in hsv model - try to match with black, grey, silver (black, darkGray, lightGray) as this colors matches badly even in hsv model
- calc hue difference between color and all base colors - calc hue difference between color and all base colors
- if for new base color hue diff same as for any other, try to measure saturation and value - if for new base color hue diff same as for any other, try to measure saturation and value
(it happens for similar colors like red - pink, blue - dark blue) (it happens for similar colors like red - pink, blue - dark blue)
Parameters
----------
color: str
color in hex
Returns
-------
base color name that matches best to a given color
:param color: str, color in hex
:return: base color name that matches best to a given color
""" """
if color == (255, 255, 255): if color == (255, 255, 255):