forked from LiveCarta/BookConverter
Annotations for Docx Converter
This commit is contained in:
@@ -3,38 +3,41 @@ import logging
|
||||
import pathlib
|
||||
import subprocess
|
||||
from subprocess import PIPE
|
||||
from typing import Union
|
||||
from threading import Event
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
|
||||
from src.util.helpers import BookLogger
|
||||
|
||||
|
||||
class Docx2LibreHTML:
|
||||
def __init__(self, book_id=0, file_path=None, access=None, logger=None, libre_locker=None):
|
||||
def __init__(self, book_id: int = 0, file_path: Union[pathlib.PosixPath, str] = None,
|
||||
access=None, logger: BookLogger = None, libre_locker: Event = None):
|
||||
self.book_id = book_id if book_id != 0 else pathlib.Path(
|
||||
file_path).stem
|
||||
self.file_path = file_path
|
||||
self.access = access
|
||||
self.logger_object: BookLogger = logger
|
||||
# critical section for occupying libreoffice by one thread
|
||||
self.libre_locker: Event() = libre_locker
|
||||
self.libre_locker = libre_locker
|
||||
|
||||
# path to html file, file appears after libre-conversion
|
||||
self.html_path = self.convert_docx_to_html()
|
||||
self.html_soup = self.read_html(self.html_path)
|
||||
|
||||
def _libre_run(self, out_dir_path):
|
||||
def _libre_run(self, out_dir_path: str):
|
||||
command = ["libreoffice", "--headless",
|
||||
"--convert-to", "html", f"{str(self.file_path)}",
|
||||
"--outdir", f"{out_dir_path}"]
|
||||
print(command)
|
||||
# print(command)
|
||||
result = subprocess.run(command, stdout=PIPE, stderr=PIPE)
|
||||
self.logger_object.log(f"Result of libre conversion for book_{self.book_id}:"
|
||||
f" {result.returncode}, {result.stdout}", logging.DEBUG)
|
||||
self.logger_object.log(f"Any error while libre conversion for book_"
|
||||
f"{self.book_id}: {result.stderr}", logging.DEBUG)
|
||||
|
||||
def convert_docx_to_html(self):
|
||||
def convert_docx_to_html(self) -> pathlib.Path:
|
||||
"""
|
||||
Function converts .docx document to .html file.
|
||||
Steps
|
||||
@@ -44,18 +47,18 @@ class Docx2LibreHTML:
|
||||
|
||||
Returns
|
||||
----------
|
||||
html_path: str
|
||||
html_path: pathlib.Path
|
||||
path to html file, file appears after libre-conversion
|
||||
|
||||
"""
|
||||
def get_and_clear_flag(out_dir_path: str):
|
||||
def get_and_clear_flag(html_file_path: str):
|
||||
self.libre_locker.clear()
|
||||
self.logger_object.log(f"Got flag!", logging.DEBUG)
|
||||
self._libre_run(out_dir_path)
|
||||
self._libre_run(html_file_path)
|
||||
self.libre_locker.set()
|
||||
self.logger_object.log("Cleared flag...", logging.DEBUG)
|
||||
|
||||
def check_file_exists(path, error_string: str):
|
||||
def check_file_exists(path: pathlib.Path, error_string: str):
|
||||
try:
|
||||
f = open(path)
|
||||
f.close()
|
||||
@@ -73,19 +76,20 @@ class Docx2LibreHTML:
|
||||
|
||||
folder_path = os.path.dirname(
|
||||
os.path.dirname(os.path.abspath(__file__)))
|
||||
out_dir_path = os.path.join(folder_path, f"../books/html/{self.book_id}")
|
||||
out_dir_path = os.path.join(
|
||||
folder_path, f"../books/html/{self.book_id}")
|
||||
pathlib.Path(out_dir_path).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
if self.libre_locker.isSet():
|
||||
if self.libre_locker.is_set():
|
||||
get_and_clear_flag(out_dir_path)
|
||||
else:
|
||||
while not self.libre_locker.isSet():
|
||||
while not self.libre_locker.is_set():
|
||||
self.logger_object.log(
|
||||
"Waiting for libre...", logging.DEBUG)
|
||||
flag = self.libre_locker.wait(50)
|
||||
if flag:
|
||||
if self.libre_locker.isSet():
|
||||
if self.libre_locker.is_set():
|
||||
get_and_clear_flag(out_dir_path)
|
||||
break
|
||||
except Exception as exc:
|
||||
@@ -105,7 +109,7 @@ class Docx2LibreHTML:
|
||||
f"Input file path after conversion: {html_path}.")
|
||||
return html_path
|
||||
|
||||
def read_html(self, html_path):
|
||||
def read_html(self, html_path: pathlib.Path) -> BeautifulSoup:
|
||||
"""Method for reading .html file into beautiful soup tag."""
|
||||
try:
|
||||
html_text = open(html_path, "r", encoding="utf8").read()
|
||||
|
||||
@@ -12,7 +12,7 @@ from src.docx_converter.libre_html2json_converter import LibreHTML2JSONConverter
|
||||
class DocxBook(BookSolver):
|
||||
"""Class of .docx type book - child of BookSolver"""
|
||||
|
||||
def __init__(self, book_id=0, access=None, main_logger=None, libre_locker=None):
|
||||
def __init__(self, book_id: int = 0, access=None, main_logger=None, libre_locker=None):
|
||||
super().__init__(book_id, access, main_logger)
|
||||
self.book_type = "docx"
|
||||
# critical section for occupying libreoffice by one thread
|
||||
@@ -60,7 +60,7 @@ if __name__ == "__main__":
|
||||
locker.set()
|
||||
|
||||
html_converter = Docx2LibreHTML(file_path=docx_file_path,
|
||||
logger=logger_object, libre_locker=locker)
|
||||
logger=logger_object, libre_locker=locker)
|
||||
|
||||
parser = HTMLDocxPreprocessor(html_converter.html_soup, logger_object)
|
||||
content, footnotes, top_level_headers = parser.process_html(
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
import re
|
||||
from bs4 import BeautifulSoup, NavigableString
|
||||
from typing import List
|
||||
from bs4 import BeautifulSoup, Tag, NavigableString
|
||||
|
||||
|
||||
def _clean_footnote_content(content):
|
||||
def _clean_footnote_content(content: str) -> str:
|
||||
content = content.strip()
|
||||
return content.strip()
|
||||
|
||||
|
||||
def process_footnotes(body_tag):
|
||||
def process_footnotes(body_tag: Tag) -> List[str]:
|
||||
"""Function returns list of footnotes and delete them from html_soup."""
|
||||
footnote_anchors = body_tag.find_all("a", class_="sdfootnoteanc")
|
||||
footnote_content = body_tag.find_all(
|
||||
@@ -32,7 +33,7 @@ def process_footnotes(body_tag):
|
||||
|
||||
new_tag = BeautifulSoup(features="lxml").new_tag("sup")
|
||||
new_tag["class"] = "footnote-element"
|
||||
new_tag["data-id"] = i + 1
|
||||
new_tag["data-id"] = f"{i + 1}"
|
||||
new_tag["id"] = f"footnote-{i + 1}"
|
||||
new_tag.string = "*"
|
||||
anc_tag.replace_with(new_tag)
|
||||
@@ -67,7 +68,6 @@ def process_footnotes(body_tag):
|
||||
|
||||
content = _clean_footnote_content(unicode_string)
|
||||
cont_tag.decompose()
|
||||
|
||||
footnotes.append(content)
|
||||
|
||||
return footnotes
|
||||
|
||||
@@ -1,27 +1,25 @@
|
||||
import re
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from bs4 import BeautifulSoup, NavigableString, Tag
|
||||
import pathlib
|
||||
from typing import List, Dict, Union
|
||||
from bs4 import BeautifulSoup, Tag, NavigableString
|
||||
|
||||
from src.livecarta_config import LiveCartaConfig
|
||||
from src.util.helpers import BookLogger, BookStatusWrapper
|
||||
from src.util.helpers import BookLogger
|
||||
from src.docx_converter.footnotes_processing import process_footnotes
|
||||
from src.docx_converter.image_processing import process_images
|
||||
|
||||
|
||||
class HTMLDocxPreprocessor:
|
||||
|
||||
def __init__(self, html_soup, logger_object, status_wrapper=None):
|
||||
|
||||
def __init__(self, html_soup: BeautifulSoup, logger_object: BookLogger):
|
||||
self.body_tag = html_soup.body
|
||||
self.html_soup = html_soup
|
||||
self.logger_object: BookLogger = logger_object
|
||||
self.status_wrapper: BookStatusWrapper = status_wrapper
|
||||
self.logger_object = logger_object
|
||||
self.top_level_headers = None
|
||||
self.content = list()
|
||||
|
||||
def _process_toc_links(self):
|
||||
def _check_parent_link_exist_in_toc(tag_with_link):
|
||||
def _check_parent_link_exist_in_toc(tag_with_link: Tag) -> bool:
|
||||
toc_links = []
|
||||
for a_tag in tag_with_link.find_all("a", {"name": re.compile(r"^_Toc\d+")}):
|
||||
link_name = a_tag.attrs["name"]
|
||||
@@ -90,7 +88,7 @@ class HTMLDocxPreprocessor:
|
||||
u[0].unwrap()
|
||||
|
||||
@classmethod
|
||||
def convert_pt_to_px(cls, value):
|
||||
def convert_pt_to_px(cls, value: float) -> float:
|
||||
value = float(value)
|
||||
if value == LiveCartaConfig.WORD_DEFAULT_FONT_SIZE:
|
||||
return LiveCartaConfig.LIVECARTA_DEFAULT_FONT_SIZE
|
||||
@@ -344,11 +342,11 @@ class HTMLDocxPreprocessor:
|
||||
for div in divs:
|
||||
div.unwrap()
|
||||
|
||||
def _get_top_level_headers(self):
|
||||
def _get_top_level_headers(self) -> List[Dict[str, Union[str, bool]]]:
|
||||
"""
|
||||
Function for gathering info about top-level chapters.
|
||||
|
||||
Assume:
|
||||
Assume: _
|
||||
- Headers with the smallest outline(or digit in <h>) are top level chapters.
|
||||
[ It is consistent with a recursive algorithm
|
||||
for saving content to a resulted json structure,
|
||||
@@ -375,7 +373,7 @@ class HTMLDocxPreprocessor:
|
||||
number = re.match(r"^(?:\.?\d+\.? ?)+", title)
|
||||
is_numbered = number is not None
|
||||
|
||||
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
|
||||
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
|
||||
is_introduction = cleaned_title.lower() == "introduction"
|
||||
|
||||
headers_info.append({
|
||||
@@ -422,7 +420,7 @@ class HTMLDocxPreprocessor:
|
||||
features="lxml"), cleaned, NavigableString)
|
||||
tag.replace_with(this)
|
||||
|
||||
def apply_func_to_last_child(self, tag, func=None):
|
||||
def apply_func_to_last_child(self, tag: Union[NavigableString, Tag], func=None):
|
||||
"""
|
||||
works only with constructions like (((child to work with)))
|
||||
where child is object of NavigableString
|
||||
@@ -457,10 +455,9 @@ class HTMLDocxPreprocessor:
|
||||
[tag.unwrap() for tag in b_tags]
|
||||
|
||||
spans = tag.find_all("span")
|
||||
|
||||
if spans:
|
||||
for span in spans:
|
||||
style = span.attrs.get("style")
|
||||
span.unwrap()
|
||||
[span.unwrap() for span in spans]
|
||||
tag.attrs = {}
|
||||
|
||||
header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
|
||||
@@ -472,7 +469,7 @@ class HTMLDocxPreprocessor:
|
||||
while tag.parent.name == "ol":
|
||||
tag.parent.unwrap()
|
||||
|
||||
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
|
||||
cleaned_title = re.sub(r"[\s\xa0]", " ", tag.text)
|
||||
if cleaned_title == "":
|
||||
tag.unwrap()
|
||||
else:
|
||||
@@ -488,7 +485,7 @@ class HTMLDocxPreprocessor:
|
||||
|
||||
content[0] = "" if content[0] == " " else content[0]
|
||||
content = [item for item in content if item != ""]
|
||||
|
||||
|
||||
for i, item in enumerate(content):
|
||||
if type(content[i]) is NavigableString:
|
||||
cleaned = re.sub(r"(\s+)+", " ", content[i])
|
||||
@@ -526,62 +523,54 @@ class HTMLDocxPreprocessor:
|
||||
ind = self.content.index(toc_tag) + 1
|
||||
self.content = self.content[ind:]
|
||||
|
||||
def process_html(self, access=None, html_path="", book_id=0):
|
||||
def process_html(self, access=None, html_path: pathlib.Path = "", book_id: int = 0):
|
||||
"""Process html code to satisfy LiveCarta formatting."""
|
||||
self.logger_object.log("Beginning of processing .html file.")
|
||||
try:
|
||||
self.logger_object.log(f"Processing TOC and headers.")
|
||||
self._process_toc_links()
|
||||
|
||||
self.clean_trash()
|
||||
self.logger_object.log(f"Processing TOC and headers.")
|
||||
self._process_toc_links()
|
||||
|
||||
# process main elements of the .html doc
|
||||
self.logger_object.log(f"Processing main elements of html.")
|
||||
self._preprocessing_headings()
|
||||
self._process_paragraph()
|
||||
self._process_two_columns()
|
||||
self.clean_trash()
|
||||
|
||||
self.logger_object.log("Block quotes processing.")
|
||||
self._process_quotes()
|
||||
# process main elements of the .html doc
|
||||
self.logger_object.log(f"Processing main elements of html.")
|
||||
self._preprocessing_headings()
|
||||
self._process_paragraph()
|
||||
self._process_two_columns()
|
||||
|
||||
self.logger_object.log("Tables processing.")
|
||||
self._process_tables()
|
||||
self.logger_object.log(
|
||||
f"{self.tables_amount} tables have been processed.")
|
||||
self.logger_object.log("Block quotes processing.")
|
||||
self._process_quotes()
|
||||
|
||||
self.logger_object.log("Hrefs processing.")
|
||||
self._process_hrefs()
|
||||
self.logger_object.log("Tables processing.")
|
||||
self._process_tables()
|
||||
self.logger_object.log(
|
||||
f"{self.tables_amount} tables have been processed.")
|
||||
|
||||
self.logger_object.log("Footnotes processing.")
|
||||
self.footnotes = process_footnotes(self.body_tag)
|
||||
self.logger_object.log(
|
||||
f"{len(self.footnotes)} footnotes have been processed.")
|
||||
self.logger_object.log("Hrefs processing.")
|
||||
self._process_hrefs()
|
||||
|
||||
self.logger_object.log("Image processing.")
|
||||
self.images = process_images(access=access, html_path=html_path,
|
||||
book_id=book_id, body_tag=self.body_tag)
|
||||
self.logger_object.log(
|
||||
f"{len(self.images)} images have been processed.")
|
||||
self.logger_object.log("Footnotes processing.")
|
||||
self.footnotes = process_footnotes(self.body_tag)
|
||||
self.logger_object.log(
|
||||
f"{len(self.footnotes)} footnotes have been processed.")
|
||||
|
||||
self._process_footer()
|
||||
self._process_div()
|
||||
self.logger_object.log("Image processing.")
|
||||
self.images = process_images(access, path_to_html=html_path,
|
||||
book_id=book_id, body_tag=self.body_tag)
|
||||
self.logger_object.log(
|
||||
f"{len(self.images)} images have been processed.")
|
||||
|
||||
self.top_level_headers = self._get_top_level_headers()
|
||||
self._mark_introduction_headers()
|
||||
self._process_footer()
|
||||
self._process_div()
|
||||
|
||||
self._process_headings()
|
||||
self.top_level_headers = self._get_top_level_headers()
|
||||
self._mark_introduction_headers()
|
||||
|
||||
self._process_lists()
|
||||
# delete text before table of content if exists
|
||||
self.delete_content_before_toc()
|
||||
self._process_headings()
|
||||
|
||||
except Exception as exc:
|
||||
self.logger_object.log(
|
||||
"Error has occurred while processing html.", logging.ERROR)
|
||||
self.logger_object.log_error_to_main_log()
|
||||
if self.status_wrapper:
|
||||
self.status_wrapper.set_error()
|
||||
raise exc
|
||||
self._process_lists()
|
||||
# delete text before table of content if exists
|
||||
self.delete_content_before_toc()
|
||||
|
||||
self.logger_object.log("End of processing .html file.")
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
import pathlib
|
||||
from bs4 import Tag
|
||||
from typing import Union, List
|
||||
from shutil import copyfile
|
||||
|
||||
|
||||
@@ -22,7 +23,7 @@ def save_image_locally(img_file_path: str, book_id: int) -> pathlib.Path:
|
||||
return img_folder_path
|
||||
|
||||
|
||||
def process_images(access, path_to_html: str, book_id: int, body_tag: Tag):
|
||||
def process_images(access, path_to_html: Union[pathlib.Path, str], book_id: int, body_tag: Tag) -> List:
|
||||
"""
|
||||
Function to process <img> tag.
|
||||
Img should be sent Amazon S3 and then return new tag with valid link.
|
||||
@@ -33,8 +34,8 @@ def process_images(access, path_to_html: str, book_id: int, body_tag: Tag):
|
||||
for img in img_tags:
|
||||
path_to_img_from_html = img.attrs.get("src")
|
||||
# quick fix for bad links
|
||||
if (len(path_to_img_from_html) >= 3) and path_to_img_from_html [:3] == "../":
|
||||
path_to_img_from_html = path_to_img_from_html [3:]
|
||||
if (len(path_to_img_from_html) >= 3) and path_to_img_from_html[:3] == "../":
|
||||
path_to_img_from_html = path_to_img_from_html[3:]
|
||||
html_folder = os.path.dirname(path_to_html)
|
||||
path_to_img_from_root = os.path.normpath(os.path.join(
|
||||
html_folder, path_to_img_from_html)).replace("\\", "/")
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
import re
|
||||
import logging
|
||||
from copy import copy
|
||||
from typing import List, Tuple, Dict, Union
|
||||
from bs4 import Tag
|
||||
|
||||
from src.livecarta_config import LiveCartaConfig
|
||||
|
||||
|
||||
class LibreHTML2JSONConverter:
|
||||
def __init__(self, content, footnotes, top_level_headers, logger_object, book_api_status=None):
|
||||
def __init__(self, content: List[Tag], footnotes: List[str], top_level_headers: List[Dict[str, Union[str, bool]]],
|
||||
logger_object, book_api_status=None):
|
||||
self.content_dict = None
|
||||
self.content = content
|
||||
self.footnotes = footnotes
|
||||
@@ -33,7 +36,7 @@ class LibreHTML2JSONConverter:
|
||||
return new_text
|
||||
|
||||
# TODO: rethink the function structure without indexes.
|
||||
def header_to_livecarta_chapter_item(self, ind) -> (dict, int):
|
||||
def header_to_livecarta_chapter_item(self, ind: int) -> Union[Tuple[Dict[str, Union[str, List]], int], str]:
|
||||
"""
|
||||
Function process header and collects all content for it.
|
||||
Parameters
|
||||
@@ -90,7 +93,7 @@ class LibreHTML2JSONConverter:
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def _is_empty_p_tag(tag):
|
||||
def _is_empty_p_tag(tag: Tag) -> bool:
|
||||
if tag.name != "p":
|
||||
return False
|
||||
|
||||
@@ -102,7 +105,6 @@ class LibreHTML2JSONConverter:
|
||||
text = re.sub(r"\s+", "", temp_tag.text)
|
||||
if text:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def convert_to_dict(self):
|
||||
@@ -148,9 +150,7 @@ class LibreHTML2JSONConverter:
|
||||
# Add is_introduction field to json structure
|
||||
# after deleting content before toc, some chapters can be deleted
|
||||
if self.top_level_headers:
|
||||
same_first_titles = self.top_level_headers[0]["title"] == json_strc[0]["title"]
|
||||
is_first_header_introduction = not self.top_level_headers[0]["should_be_numbered"]
|
||||
|
||||
json_strc[0]["is_introduction"] = is_first_header_introduction
|
||||
|
||||
self.content_dict = {
|
||||
|
||||
Reference in New Issue
Block a user