This repository has been archived on 2026-04-06. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BookConverter/src/book.py
2020-04-16 13:26:19 +03:00

801 lines
29 KiB
Python

import codecs
import json
import logging
import os
import pathlib
import re
from copy import copy
from shutil import copyfile
from bs4 import BeautifulSoup
class Book:
    """Converts a .docx book (via LibreOffice-generated HTML) into the LawCarta JSON format."""
    # Main constant values
    DEFAULT_FONT_NAME = 'Times New Roman'   # font face treated as the default (no face attribute emitted)
    DEFAULT_ALIGN_STYLE = 'left'            # alignment treated as the default (no style emitted)
    WORD_DEFAULT_FONT_SIZE = 11             # MS Word default font size, in pt
    LAWCARTA_DEFAULT_FONT_SIZE = 18         # LawCarta default font size, in px
    # pt -> px scale factor chosen so Word's default size maps onto LawCarta's default.
    FONT_CONVERT_RATIO = LAWCARTA_DEFAULT_FONT_SIZE / WORD_DEFAULT_FONT_SIZE
    # Maps a Word font face name to the equivalent CSS font-family stack.
    font_correspondence_table = {
        "Arial": "arial,helvetica,sans-serif",
        "Comic Sans MS": "comic sans ms,cursive",
        "Courier New": "courier new,courier,monospace",
        "Georgia": "georgia,serif",
        "Lucida Sans Unicode": "lucida sans unicode,lucida grande,sans-serif",
        "Tahoma": "tahoma,geneva,sans-serif",
        "Times New Roman": "times new roman,times,serif",
        "Trebuchet MS": "trebuchet ms,helvetica,sans-serif",
        "Verdana": "verdana,geneva,sans-serif"
    }
    SUPPORTED_LEVELS = 3                    # heading nesting depth kept as chapter structure
    SUPPORTED_HEADERS = {"h1", "h2", "h3"}  # header tags that start a chapter/section
    # All header tags that may appear in the converted HTML.
    # NOTE(review): h7-h9 are not valid HTML tags — presumably included to
    # cover deeper LibreOffice outline levels; confirm they can actually occur.
    HEADERS_LEVELS = {"h1", "h2", "h3", "h4", "h5", "h6", "h7", "h8", "h9"}
def __init__(self, book_id=0, access=None, file_path=None, output_path=None, main_logger=None):
self.book_id = book_id
self.access = access
self.file_path = file_path
self.output_path = output_path
self.main_logger = main_logger
self.logger = None
self.html_soup = None
self.body_tag = None
self.content = list()
self.footnotes = list()
self.images = list()
self.content_dict = dict()
assert self.SUPPORTED_LEVELS == len(self.SUPPORTED_HEADERS), \
"Length of headers doesn't match allowd levels."
    def configure_file_logger(self, name, attr_name='logger', filename='logs/book_log.log', filemode='w+',
                              logging_level=logging.INFO, logging_format='%(asctime)s - %(message)s'):
        """
        Method for Logger configuration. Logger will write in file.
        :param name: name of the Logger.
        :param attr_name: name of attribute that will be added to self.
        :param filename: name of the log file, relative to the project root
            (ignored when self.book_id is set — a per-book file is used instead).
        :param filemode: mode of opening log file.
        :param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical.
        :param logging_format: format of record in log file.
        """
        logger = logging.getLogger(name)
        # Project root = parent of the directory containing this source file.
        folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        if self.book_id:
            # Per-book log file takes precedence over the filename argument.
            filename = f'logs/{self.book_id}_log.log'
        file_path = os.path.join(folder_path, filename)
        # NOTE(review): assumes the logs/ directory already exists — FileHandler
        # does not create parent directories. TODO confirm.
        file_handler = logging.FileHandler(file_path, mode=filemode)
        # file_format = logging.Formatter(fmt=logging_format, datefmt=date_format)
        file_format = logging.Formatter(fmt=logging_format)
        file_handler.setFormatter(file_format)
        # NOTE(review): calling this repeatedly with the same name keeps adding
        # handlers to the same logger, duplicating output lines.
        logger.addHandler(file_handler)
        logger.setLevel(logging_level)
        setattr(self, attr_name, logger)
def log(self, message, logging_level=20):
"""
Method for logging.
:param message: body of the message
:param logging_level: level of logging
"""
self.logger.log(msg=message, level=logging_level)
def log_error_to_main_log(self, message=''):
"""
Method for logging error to main log file.
"""
if self.main_logger:
if not message:
message = f'Error in book conversion. Check {self.book_id}_log.log file.'
self.main_logger.error(message)
    def save_docx(self, content):
        """
        Save binary content of file to .docx.
        :param content: binary content of the file.
        :raises Exception: re-raises any write error after logging it.
        """
        # Target path: <project root>/docx/<book_id>.docx
        folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        folder_path = os.path.join(folder_path, 'docx')
        file_path = os.path.join(folder_path, f'{self.book_id}.docx')
        try:
            # NOTE(review): assumes the docx/ directory already exists — TODO confirm.
            with open(file_path, 'wb+') as file:
                file.write(content)
            self.log(f'File was saved to folder: {folder_path}.')
        except Exception as exc:
            self.log("Error in writing docx file.", logging.ERROR)
            self.log_error_to_main_log()
            raise exc
        # Subsequent pipeline steps (docx -> html conversion) read from this path.
        self.file_path = pathlib.Path(file_path)
def get_docx(self):
"""
Method for getting and saving book from queue.
"""
try:
self.log(f'Start receiving file from server. URL: {self.access.url}/doc-convert/{self.book_id}/file')
content = self.access.get_doc(self.book_id)
self.log('File was received from server.')
self.save_docx(content)
except FileNotFoundError as ferr:
self.log("Can't get docx from server.", logging.ERROR)
self.log_error_to_main_log()
raise ferr
except Exception as exc:
raise exc
def set_process_status(self):
try:
if self.access:
self.access.update_status(self.book_id, self.access.PROCESS)
self.log(f'Status has been updated to [PROCESS].')
except Exception as exc:
self.log("Can't update status of the book [PROCESS].", logging.ERROR)
self.log_error_to_main_log()
raise exc
def set_generate_status(self):
try:
if self.access:
self.access.update_status(self.book_id, self.access.GENERATE)
self.log(f'Status has been updated to [GENERATE].')
except Exception as exc:
self.log("Can't update status of the book [GENERATE].", logging.ERROR)
self.log_error_to_main_log()
raise exc
def set_error_status(self):
try:
if self.access:
self.access.update_status(self.book_id, self.access.ERROR)
self.log(f'Status has been updated to [ERROR].')
except Exception as exc:
self.log("Can't update status of the book [ERROR].", logging.ERROR)
self.log_error_to_main_log()
raise exc
    def convert_doc_to_html(self):
        """
        Method for convert .docx document to .html file.

        Spawns a headless LibreOffice process; on success ``self.file_path``
        is re-pointed at the generated .html file.
        :raises FileNotFoundError: when input or converted output is missing.
        """
        self.log(f'File - {self.file_path}.')
        print(f'{self.file_path}')  # NOTE(review): leftover debug output?
        self.log('Beginning of conversion from .docx to .html.')
        # Probe that the input file exists before spawning LibreOffice.
        try:
            f = open(self.file_path)
            f.close()
        except FileNotFoundError as error:
            self.log('Invalid path to input data.', logging.ERROR)
            self.set_error_status()
            raise error
        folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        out_dir_path = os.path.join(folder_path, f'html/{self.book_id}')
        try:
            command = f'libreoffice --headless --convert-to html "{str(self.file_path)}" --outdir {out_dir_path}'
            # NOTE(review): os.system does not raise on non-zero exit codes, so
            # this except branch is very unlikely to trigger; failure is instead
            # caught by the existence check below.
            os.system(command)
        except Exception as exc:
            self.log("Conversion has gone wrong. Libreoffice is not installed.", logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        # LibreOffice writes <stem>.html into the output directory.
        out_dir_path = os.path.join(out_dir_path, f'{self.file_path.stem}.html')
        self.file_path = pathlib.Path(out_dir_path)
        # Verify the converted file actually appeared.
        try:
            f = open(self.file_path)
            f.close()
        except FileNotFoundError as exc:
            self.log("Conversion has gone wrong. HTML file doesn't exist.", logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        self.log('End of conversion from .docx to .html.')
        self.log(f'Input file path after conversion: {self.file_path}.')
def check_output_directory(self):
if self.output_path is None:
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
output_path = os.path.join(folder_path, f'json/{self.file_path.stem}.json')
self.output_path = output_path
self.output_path = pathlib.Path(self.output_path)
self.log(f'Output file path: {self.output_path}')
pathlib.Path(self.output_path).parent.mkdir(parents=True, exist_ok=True)
self.output_path.touch(exist_ok=True)
def read_html(self):
"""
Method for reading .html file into beautiful soup tag.
"""
try:
html_text = open(self.file_path, 'r', encoding='utf8').read()
self.log('HTML for book has been loaded.')
except FileNotFoundError as exc:
self.log('There is no html to process. Conversion went wrong or you specified wrong paths.', logging.ERROR)
self.log_error_to_main_log()
self.set_error_status()
raise exc
self.html_soup = BeautifulSoup(html_text, features='lxml')
self.body_tag = self.html_soup.body
def _clean_tag(self, tag, attr_name, attr_value):
"""
Function to clean tags by its name and attribute value.
:param tag: Tag name to clean.
:param attr_name: Attribute name.
:param attr_value: Attribute value.
"""
tags = self.body_tag.find_all(tag, {attr_name: attr_value})
for tag in tags:
if len(tag.attrs) == 1:
tag.unwrap()
def _clean_underline_links(self):
"""
Function cleans meaningless <u> tags before links.
"""
underlines = self.body_tag.find_all("u")
for u in underlines:
if u.find_all('a'):
u.unwrap()
links = self.body_tag.find_all('a')
for link in links:
u = link.find_all('u')
if u and len(u) == 1:
u[0].unwrap()
@classmethod
def convert_pt_to_px(cls, style):
"""
Method converts point in the font-size to pixels.
:param style: Str with style to process.
:return: Str with converted style.
"""
size = re.search(r"font-size: (\d{1,3})pt", style)
if size is None:
return style
size = size.group(1)
new_size = round(cls.FONT_CONVERT_RATIO * float(size))
if new_size == cls.LAWCARTA_DEFAULT_FONT_SIZE:
return ""
return re.sub(size + "pt", str(new_size) + "px", style)
    def _font_to_span(self):
        """
        Function to convert <font> tag to <span>. If font style is default, then remove this tag.
        """
        fonts = self.body_tag.find_all("font")
        for font in fonts:
            face = font.get("face")
            style = font.get("style")
            # Rename the tag and rebuild its attributes from scratch.
            font.attrs = {}
            font.name = "span"
            if style:
                # Normalise pt sizes to px; an empty result means "default
                # size" and the style attribute is dropped entirely.
                style = self.convert_pt_to_px(style)
                if style != "":
                    font.attrs["style"] = style
            if face is not None:
                # Keep only the first family of a "Name, fallback, ..." list.
                face = re.sub(r",[\w,\- ]*$", "", face)
                # The default face needs no attribute; unknown faces are dropped.
                if face != self.DEFAULT_FONT_NAME and self.font_correspondence_table.get(face):
                    font.attrs["face"] = self.font_correspondence_table[face]
            if len(font.attrs) == 0:
                # Nothing left to express — remove the wrapper, keep children.
                font.unwrap()
        assert len(self.body_tag.find_all("font")) == 0  # on this step there should be no more <font> tags
    def _remove_table_of_contents(self):
        """
        Function to remove table of content from file.
        (Currently unused — see _change_table_of_contents, which is what
        clean_trash() calls.)
        """
        tables = self.body_tag.find_all("div", id=re.compile(r'^Table of Contents\d+'))
        for table in tables:
            table.decompose()

    def _change_table_of_contents(self):
        # Replace each TOC <div> with an empty <TOC> marker tag; the marker
        # is later used by delete_content_before_toc() to cut front matter.
        tables = self.body_tag.find_all("div", id=re.compile(r'^Table of Contents\d+'))
        for table in tables:
            table.wrap(self.html_soup.new_tag("TOC"))
            table.decompose()

    def delete_content_before_toc(self):
        # Drop everything up to and including the <TOC> marker inserted by
        # _change_table_of_contents().  NOTE(review): the membership test
        # relies on BeautifulSoup comparing tags by content, so a freshly
        # created empty <TOC> equals the empty marker — TODO confirm.
        toc_tag = self.html_soup.new_tag('TOC')
        if toc_tag in self.content:
            ind = self.content.index(toc_tag) + 1
            self.content = self.content[ind:]
        self.write_html_from_list()
    def clean_trash(self):
        """
        Function to remove all styles and tags we don't need.

        Order matters: attribute-level cleanups run first, then <font> tags
        are converted to <span>, then the table of contents is marked.
        """
        self._clean_tag('span', 'style', re.compile(r'^background: #[0-9a-fA-F]{6}$'))
        self._clean_tag('span', 'lang', re.compile(r'^ru-RU$'))  # todo: check for another languages
        self._clean_tag('span', 'style', re.compile('^letter-spacing: -?[\d\.]+pt$'))
        self._clean_tag('font', 'color', re.compile(r'^#[0-9a-fA-F]{6}$'))
        self._clean_tag('font', 'face', re.compile(r'^Times New Roman[\w, ]+$'))
        self._clean_tag("a", "name", "_GoBack")  # Word bookmark artefact
        self._clean_underline_links()
        self._font_to_span()
        # self._remove_table_of_contents()
        self._change_table_of_contents()
    def _process_paragraph(self):
        """
        Function to process <p> tags (text-align and text-indent value).

        Rebuilds each paragraph's attributes so that only a non-default
        text-align and an explicit text-indent survive; paragraphs whose
        margins match the quotation layout are wrapped in <blockquote>.
        """
        paragraphs = self.body_tag.find_all('p')
        for p in paragraphs:
            align = p.get('align')
            style = p.get('style')
            if style:
                indent = re.search(r'text-indent: ([\d\.]{1,4})in', style)
                margin_left = re.search(r'margin-left: ([\d\.]{1,4})in', style)
                margin_right = re.search(r'margin-right: ([\d\.]{1,4})in', style)
                margin_top = re.search(r'margin-top: ([\d\.]{1,4})in', style)
                margin_bottom = re.search(r'margin-bottom: ([\d\.]{1,4})in', style)
            else:
                indent = None
                margin_left = None
                margin_right = None
                margin_top = None
                margin_bottom = None
            # Magic margins (0.6in sides, 0.14in top, 0.11in bottom) identify
            # quotation-style paragraphs — NOTE(review): presumably the values
            # LibreOffice emits for quotations; confirm against sample docs.
            if margin_left and margin_right and margin_top and margin_bottom and \
                    margin_left.group(1) == '0.6' and margin_right.group(1) == '0.6' and \
                    margin_top.group(1) == '0.14' and margin_bottom.group(1) == '0.11':
                p.wrap(BeautifulSoup(features='lxml').new_tag('blockquote'))
            # Drop all original attributes and rebuild the style from scratch.
            p.attrs = {}
            style = ''
            if align is not None and align != self.DEFAULT_ALIGN_STYLE:
                style += f'text-align: {align};'
            if indent is not None:
                indent = indent.group(1)
                style += f'text-indent: {indent}in;'
            if style:
                p.attrs['style'] = style

    def _process_two_columns(self):
        """
        Function to process paragraphs which has two columns layout.

        The two-column <div> wrapper is replaced by a "columns2" class on the
        contained paragraphs.
        """
        two_columns = self.body_tag.find_all("div", style="column-count: 2")
        for div in two_columns:
            for child in div.children:
                if child.name == "p":
                    child["class"] = "columns2"
            div.unwrap()
# def _process_quotes(self):
# """
# Function to process <dl> tags. All tags will be replaced with <blockquote> tags.
# """
# dls = self.body_tag.find_all('dl')
#
# for dl in dls:
# pars = dl.find_all('p')
# for p in pars:
# p.wrap(BeautifulSoup(features='lxml').new_tag('blockquote'))
# new_div = BeautifulSoup(features='lxml').new_tag('div')
# for p in pars:
# new_div.append(p.parent)
# dl.replaceWith(new_div)
@staticmethod
def _clean_footnote_content(content):
content = content.strip()
content = re.sub(r'^\d+ ?', '', content)
return content.strip()
    def _process_footnotes(self):
        """
        Function returns list of footnotes and delete them from html_soup.

        Replaces each LibreOffice footnote anchor with a
        <sup class="footnote-element"> marker and collects the footnote body
        texts into ``self.footnotes`` (order matches the 1-based data-id).
        """
        footnote_ancors = self.body_tag.find_all('a', class_='sdfootnoteanc')
        footnote_content = self.body_tag.find_all('div', id=re.compile(r'^sdfootnote\d+$'))
        footnote_amt = len(footnote_ancors)
        # Every anchor must have a matching content block, in document order.
        assert footnote_amt == len(footnote_content)
        footnotes = []
        for i, (anc_tag, cont_tag) in enumerate(zip(footnote_ancors, footnote_content)):
            # The anchor's name must equal the content block's back-link target.
            assert anc_tag['name'] == cont_tag.find('a')['href'][1:]
            new_tag = BeautifulSoup(features='lxml').new_tag('sup')
            new_tag['class'] = 'footnote-element'
            new_tag['data-id'] = i + 1
            new_tag['id'] = f'footnote-{i + 1}'
            new_tag.string = '*'
            anc_tag.replace_with(new_tag)
            content = self._clean_footnote_content(cont_tag.text)
            footnotes.append(content)
        self.footnotes = footnotes

    def _process_images(self):
        """
        Function to process <img> tag. Img should be sent to Amazon S3 and then return new tag with valid link.
        For now images are moved to one folder.
        """
        imgs = self.body_tag.find_all('img')
        if len(imgs):
            if self.access is None:
                # Offline mode: copy images next to the json output instead of uploading.
                folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                new_path = pathlib.Path(os.path.join(folder_path, f'json/img_{self.file_path.stem}/'))
                new_path.mkdir(exist_ok=True)
            for img in imgs:
                img_name = img.attrs.get('src')
                img_path = pathlib.Path(f'{self.file_path.parent}/{img_name}')
                if self.access is not None:
                    # Online mode: upload and rewrite src to the returned URL.
                    link = self.access.send_image(img_path, self.book_id)
                    img.attrs['src'] = link
                    self.log(f'{img_name} successfully uploaded.')
                else:
                    img_size = os.path.getsize(img_path)
                    # NOTE(review): leftover debug print.
                    print(f'{img_name} successfully loaded. Image size: {img_size}.')
                    new_img_path = new_path / img_name
                    copyfile(img_path, new_img_path)
                    img.attrs["src"] = str(new_img_path)
        self.images = imgs
def _process_footer(self):
"""
Function to process <div title="footer"> tags.
All the tags will be deleted from file.
"""
divs = self.body_tag.find_all('div', {'title': 'footer'})
for div in divs:
div.decompose()
def _process_div(self):
"""
Function to process <div> tags. All the tags will be deleted from file, all content of the tags will stay.
"""
divs = self.body_tag.find_all("div")
for div in divs:
div.unwrap()
    def _process_toc_links(self):
        """
        Function to extract nodes which contains TOC links, remove links from file and detect headers.

        Paragraphs that are targets of TOC links but not yet marked up as
        headers are promoted to <h1>.
        """
        toc_links = self.body_tag.find_all("a", {'name': re.compile(r'^_Toc\d+')})
        headers = [link.parent for link in toc_links]
        outline_level = "1"  # All the unknown outlines will be predicted as <h1>
        for tag in headers:
            if re.search(r"^h\d$", tag.name):
                # Already a header: just strip the inner TOC anchor.
                tag.a.unwrap()
                # outline_level = tag.name[-1]  # TODO: add prediction of the outline level
            # TODO: escape from recounting paragraphs every time
            elif tag.name == "p":
                if tag in self.body_tag.find_all("p"):
                    # Promote the paragraph to a header, keeping only its text.
                    new_tag = BeautifulSoup(features="lxml").new_tag("h" + outline_level)
                    text = tag.text
                    tag.replaceWith(new_tag)
                    new_tag.string = text
            else:
                # rethink document structure when you have toc_links, other cases?
                self.logger.warning(f'Something went wrong in processing toc_links. Check the structure of the file. '
                                    f'Tag name: {tag.name}')
@staticmethod
def clean_header_title(title):
"""
Function to remove digits and extra spaces from headers.
:param title: Title to process.
"""
title = re.sub(r'\s+', ' ', title).strip()
title = re.sub(r'^(?:\.?\d+\.? ?)+', '', title)
# title = re.sub(r'^(?:\.?[MDCLXVIclxvi]+\.? ?)+ ', '', title) # delete chapter numbering from the title
title = re.sub(r'^(?:[A-Za-z]\. ?)+', '', title)
return title.strip()
    def _preprocessing_headings(self):
        """
        Function to convert all lower level headings to p tags
        (h4-h9 become plain paragraphs, leaving only SUPPORTED_HEADERS).
        """
        # Character-class range works because SUPPORTED_LEVELS is one digit.
        pattern = f'^h[{self.SUPPORTED_LEVELS + 1}-9]$'
        header_tags = self.body_tag.find_all(re.compile(pattern))
        for tag in header_tags:
            tag.name = 'p'

    def _process_headings(self):
        """
        Function to process tags <h>.

        Unwraps headers out of list markup, cleans their titles, and replaces
        each header with a fresh tag holding only the cleaned title text.
        Headers whose title cleans to nothing are unwrapped away.
        """
        header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
        for tag in header_tags:
            # Headers sometimes end up nested in list markup — lift them out.
            if tag.parent.name == "li":
                tag.parent.unwrap()
            while tag.parent.name == "ol":
                tag.parent.unwrap()
            title = tag.text
            title = self.clean_header_title(title)
            if title == "":
                tag.unwrap()
            else:
                # _preprocessing_headings must have run first.
                assert tag.name in self.SUPPORTED_HEADERS, \
                    f'Preprocessing went wrong, there is still h{self.SUPPORTED_LEVELS + 1}-h9 headings.'
                new_tag = BeautifulSoup(features='lxml').new_tag(name=tag.name)
                new_tag.string = title
                tag.replace_with(new_tag)
def write_html_from_list(self, file_name='url_test.html'):
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
file_path = pathlib.Path(os.path.join(folder_path, file_name))
with open(file_path, 'w', encoding='utf-8') as f_out:
# f_out.write("".join([tag.prettify() for tag in self.content]))
f_out.write(self.body_tag.prettify())
self.logger.info(f'Check test file - url_test.html.')
    def process_html(self):
        """
        Process html code to satisfy LawCarta formatting.

        Pipeline order matters: cleanup -> headings/paragraphs -> footnotes &
        images -> structural flattening -> header detection -> front-matter
        trimming.
        :raises Exception: re-raises any processing error after logging and
            flipping the server status to ERROR.
        """
        self.log('Beginning of processing .html file.')
        try:
            self.clean_trash()
            # process main elements of the .html doc
            self.log(f'Processing main elements of html.')
            self._preprocessing_headings()
            self._process_paragraph()
            self._process_two_columns()
            # self._process_quotes()
            self.log('Footnotes processing.')
            self._process_footnotes()
            self.log(f'{len(self.footnotes)} footnotes have been processed.')
            self.log('Image processing.')
            self._process_images()
            self.log(f'{len(self.images)} images have been processed.')
            self._process_footer()
            self._process_div()
            # Snapshot the top-level nodes before header detection...
            self.content = self.body_tag.find_all(recursive=False)
            self.log(f'Processing TOC and headers.')
            self._process_toc_links()
            self._process_headings()
            # ...and again afterwards, since headers were replaced in place.
            self.content = self.body_tag.find_all(recursive=False)
            # delete text before table of content if exists
            self.delete_content_before_toc()
        except Exception as exc:
            self.log('Error has occurred while processing html.', logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        self.log('End of processing .html file.')
@staticmethod
def format_html(html_text):
"""
Function to remove useless symbols from html code.
:param html_text: Text to process.
:return: Cleaned text.
"""
new_text = re.sub(r'([\n\t])', ' ', html_text)
return new_text
    # TODO: rethink the function structure without indexes.
    def header_to_json(self, ind):
        """
        Function process header and collects all content for it.
        :param ind: Index of header in content list.
        :return: Tuple (section dict, index of first unconsumed node) when
            self.content[ind] is a supported header; empty string otherwise.
        """
        if self.content[ind].name in self.SUPPORTED_HEADERS:
            title = self.content[ind].text
            curr_outline = int(re.sub(r"^h", "", self.content[ind].name))  # extract outline from tag
            result = {title: []}
            ch_content = []  # html fragments accumulated for the current section
            ind += 1
            while ind < len(self.content):
                if self.content[ind].name in self.SUPPORTED_HEADERS:
                    outline = int(re.sub(r"^h", "", self.content[ind].name))
                    if outline > curr_outline:
                        # Deeper header: recurse to build the nested section;
                        # the recursion returns the next unconsumed index.
                        res, ind = self.header_to_json(ind)
                        if ch_content:
                            # Flush text collected before the sub-section.
                            result[title].append("".join(ch_content))
                            ch_content = []
                        result[title].append(res)
                    else:
                        # Same or higher outline level ends this section.
                        break
                else:
                    # Plain content node: keep its (whitespace-normalised) html.
                    res = self.format_html(str(self.content[ind]))
                    ch_content.append(res)
                    ind += 1
            if ch_content:
                result[title].append("".join(ch_content))
            return result, ind
        # NOTE(review): inconsistent return type — callers must only invoke
        # this with ind pointing at a supported header tag.
        return ''
@staticmethod
def _is_empty_p_tag(tag):
if tag.name != 'p':
return False
temp_tag = copy(tag)
brs = temp_tag.find_all('br')
for br in brs:
br.decompose()
text = re.sub(r'\s+', '', temp_tag.text)
if text:
return False
return True
def convert_to_json(self):
"""
Function which convert list of html nodes to appropriate json structure.
"""
json_strc = []
ind = 0
ch_num = 0
ch_amt = 0
try:
while ind < len(self.content):
res = {}
if self.content[ind].name in self.SUPPORTED_HEADERS:
res, ind = self.header_to_json(ind)
else:
chapter_title = f'Untitled chapter {ch_num}'
chapter = []
while ind < len(self.content) and self.content[ind].name not in self.SUPPORTED_HEADERS:
if not self._is_empty_p_tag(self.content[ind]):
chapter.append(self.format_html(str(self.content[ind])))
ind += 1
if chapter:
res = {chapter_title: ["".join(chapter)]}
ch_num += 1
if res:
json_strc.append(res)
ch_amt += 1
self.log(f'Chapter {ch_amt} has been added to structure.')
except Exception as exc:
self.log('Error has occurred while making json structure.', logging.ERROR)
self.log_error_to_main_log()
self.set_error_status()
raise exc
self.content_dict = {
"content": json_strc,
"footnotes": self.footnotes
}
def write_json(self):
try:
with codecs.open(self.output_path, 'w', encoding='utf-8') as f:
json.dump(self.content_dict, f, ensure_ascii=False)
self.log('Data has been saved to .json file.')
except Exception as exc:
self.log('Error has occurred while writing json file.', logging.ERROR)
# self.log_error_to_main_log()
# self.set_error_status()
# raise exc
def send_json_content(self):
try:
self.access.send_book(self.book_id, self.content_dict)
self.log(f'JSON data has been sent to server.')
except Exception as exc:
self.log('Error has occurred while sending json content.', logging.ERROR)
self.log_error_to_main_log()
self.set_error_status()
raise exc
def convert_from_html(self, logging_format):
self.configure_file_logger(f'{__name__}_{self.book_id}', logging_format=logging_format, filemode='w+')
self.read_html()
self.process_html()
self.convert_to_json()
self.write_json()
def test_conversion(self):
self.configure_file_logger(self.book_id, filemode='w+')
self.log('Beginning of the test.')
self.convert_doc_to_html()
self.check_output_directory()
self.read_html()
self.process_html()
self.convert_to_json()
self.write_json()
self.log('End of the test.')
def conversion(self, logging_format, filemode='w+'):
self.configure_file_logger(f'{__name__}_{self.book_id}', logging_format=logging_format, filemode=filemode)
self.log('Beginning of conversion from .docx to .json.')
self.get_docx()
self.set_process_status()
self.convert_doc_to_html()
self.check_output_directory()
self.read_html()
self.process_html()
self.set_generate_status()
self.convert_to_json()
self.write_json()
self.send_json_content()
self.log(f'End of the conversion to LawCarta format. Check {self.output_path}.')
if __name__ == "__main__":
    # Ad-hoc manual run: convert a previously generated html file to json,
    # offline (no server access object).
    folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    file_path = pathlib.Path(os.path.join(folder_path, 'html/0/quote_img.html'))
    out_path = pathlib.Path(os.path.join(folder_path, 'json/quote_img.json'))
    logging_format = '%(asctime)s - %(levelname)s - %(message)s'
    book = Book(file_path=file_path, output_path=out_path)
    book.convert_from_html(logging_format=logging_format)