This repository has been archived on 2026-04-06. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BookConverter/src/book.py
shirshasa bbe690bf80 updated book conversion
- new resulted json structure
- fixed spelling
- added asserts messages
2020-06-03 12:40:08 +03:00

996 lines
36 KiB
Python

import codecs
import json
import logging
import os
import pathlib
import re
from copy import copy
from shutil import copyfile
from bs4 import BeautifulSoup
class Book:
    """
    Converts a book from .docx (via LibreOffice-generated HTML) into the
    LawCarta JSON structure, handling footnotes, images, tables and headers.
    """
    # Main constant values
    DEFAULT_FONT_NAME = 'Times New Roman'
    DEFAULT_ALIGN_STYLE = 'left'
    # Font sizes: Word documents default to 11pt, LawCarta renders at 18px;
    # the ratio converts any pt size to the equivalent px size.
    WORD_DEFAULT_FONT_SIZE = 11
    LAWCARTA_DEFAULT_FONT_SIZE = 18
    FONT_CONVERT_RATIO = LAWCARTA_DEFAULT_FONT_SIZE / WORD_DEFAULT_FONT_SIZE
    # Map of plain font family names to full web-safe font stacks.
    font_correspondence_table = {
        "Arial": "arial,helvetica,sans-serif",
        "Comic Sans MS": "comic sans ms,cursive",
        "Courier New": "courier new,courier,monospace",
        "Georgia": "georgia,serif",
        "Lucida Sans Unicode": "lucida sans unicode,lucida grande,sans-serif",
        "Tahoma": "tahoma,geneva,sans-serif",
        "Times New Roman": "times new roman,times,serif",
        "Trebuchet MS": "trebuchet ms,helvetica,sans-serif",
        "Verdana": "verdana,geneva,sans-serif"
    }
    # Heading levels kept as chapters in the output (lower levels are demoted
    # to <p> by _preprocessing_headings).
    SUPPORTED_LEVELS = 4
    SUPPORTED_HEADERS = {"h1", "h2", "h3", "h4"}
    HEADERS_LEVELS = {"h1", "h2", "h3", "h4", "h5", "h6", "h7", "h8", "h9"}
def __init__(self, book_id=0, access=None, file_path=None, output_path=None, main_logger=None):
self.book_id = book_id
self.access = access
self.file_path = file_path
self.output_path = output_path
self.main_logger = main_logger
self.logger = None
self.html_soup = None
self.body_tag = None
self.content = list()
self.footnotes = list()
self.images = list()
self.top_level_headers = None
self.content_dict = dict()
self.tables_amount = 0
assert self.SUPPORTED_LEVELS == len(self.SUPPORTED_HEADERS), \
"Length of headers doesn't match allowed levels."
def configure_file_logger(self, name, attr_name='logger', filename='logs/book_log.log', filemode='w+',
logging_level=logging.INFO, logging_format='%(asctime)s - %(message)s'):
"""
Method for Logger configuration. Logger will write in file.
:param name: name of the Logger.
:param attr_name: name of attribute that will be added to self.
:param filename: name of the log file.
:param filemode: mode of opening log file.
:param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical.
:param logging_format: format of record in log file.
"""
logger = logging.getLogger(name)
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if self.book_id:
filename = f'logs/{self.book_id}_log.log'
file_path = os.path.join(folder_path, filename)
file_handler = logging.FileHandler(file_path, mode=filemode)
# file_format = logging.Formatter(fmt=logging_format, datefmt=date_format)
file_format = logging.Formatter(fmt=logging_format)
file_handler.setFormatter(file_format)
logger.addHandler(file_handler)
logger.setLevel(logging_level)
setattr(self, attr_name, logger)
def log(self, message, logging_level=20):
"""
Method for logging.
:param message: body of the message
:param logging_level: level of logging
"""
self.logger.log(msg=message, level=logging_level)
def log_error_to_main_log(self, message=''):
"""
Method for logging error to main log file.
"""
if self.main_logger:
if not message:
message = f'Error in book conversion. Check {self.book_id}_log.log file.'
self.main_logger.error(message)
def save_docx(self, content):
"""
Save binary content of file to .docx.
:param content: binary content of the file.
"""
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
folder_path = os.path.join(folder_path, 'docx')
file_path = os.path.join(folder_path, f'{self.book_id}.docx')
try:
with open(file_path, 'wb+') as file:
file.write(content)
self.log(f'File was saved to folder: {folder_path}.')
except Exception as exc:
self.log("Error in writing docx file.", logging.ERROR)
self.log_error_to_main_log()
raise exc
self.file_path = pathlib.Path(file_path)
def get_docx(self):
"""
Method for getting and saving book from queue.
"""
try:
self.log(f'Start receiving file from server. URL: {self.access.url}/doc-convert/{self.book_id}/file')
content = self.access.get_doc(self.book_id)
self.log('File was received from server.')
self.save_docx(content)
except FileNotFoundError as f_err:
self.log("Can't get docx from server.", logging.ERROR)
self.log_error_to_main_log()
raise f_err
except Exception as exc:
raise exc
def set_process_status(self):
try:
if self.access:
self.access.update_status(self.book_id, self.access.PROCESS)
self.log(f'Status has been updated to [PROCESS].')
except Exception as exc:
self.log("Can't update status of the book [PROCESS].", logging.ERROR)
self.log_error_to_main_log()
raise exc
def set_generate_status(self):
try:
if self.access:
self.access.update_status(self.book_id, self.access.GENERATE)
self.log(f'Status has been updated to [GENERATE].')
except Exception as exc:
self.log("Can't update status of the book [GENERATE].", logging.ERROR)
self.log_error_to_main_log()
raise exc
def set_error_status(self):
try:
if self.access:
self.access.update_status(self.book_id, self.access.ERROR)
self.log(f'Status has been updated to [ERROR].')
except Exception as exc:
self.log("Can't update status of the book [ERROR].", logging.ERROR)
self.log_error_to_main_log()
raise exc
    def convert_doc_to_html(self):
        """
        Method for convert .docx document to .html file.
        Runs headless LibreOffice and updates self.file_path to point at the
        generated html/<book_id>/<stem>.html file.
        """
        self.log(f'File - {self.file_path}.')
        print(f'{self.file_path}')
        self.log('Beginning of conversion from .docx to .html.')
        # Existence check: opening the file raises FileNotFoundError when the
        # input path is wrong.
        try:
            f = open(self.file_path)
            f.close()
        except FileNotFoundError as error:
            self.log('Invalid path to input data.', logging.ERROR)
            self.set_error_status()
            raise error
        folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        out_dir_path = os.path.join(folder_path, f'html/{self.book_id}')
        try:
            command = f'libreoffice --headless --convert-to html "{str(self.file_path)}" --outdir {out_dir_path}'
            os.system(command)
        except Exception as exc:
            # NOTE(review): os.system does not raise when the command fails
            # (it returns the exit status), so this handler is unlikely to
            # ever fire — a missing libreoffice is effectively detected by
            # the output-file check below instead.
            self.log("Conversion has gone wrong. Libreoffice is not installed.", logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        # LibreOffice names the output after the input file's stem.
        out_dir_path = os.path.join(out_dir_path, f'{self.file_path.stem}.html')
        self.file_path = pathlib.Path(out_dir_path)
        # Verify the conversion actually produced the html file.
        try:
            f = open(self.file_path)
            f.close()
        except FileNotFoundError as exc:
            self.log("Conversion has gone wrong. HTML file doesn't exist.", logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        self.log('End of conversion from .docx to .html.')
        self.log(f'Input file path after conversion: {self.file_path}.')
def check_output_directory(self):
if self.output_path is None:
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
output_path = os.path.join(folder_path, f'json/{self.file_path.stem}.json')
self.output_path = output_path
self.output_path = pathlib.Path(self.output_path)
self.log(f'Output file path: {self.output_path}')
pathlib.Path(self.output_path).parent.mkdir(parents=True, exist_ok=True)
self.output_path.touch(exist_ok=True)
def read_html(self):
"""
Method for reading .html file into beautiful soup tag.
"""
try:
html_text = open(self.file_path, 'r', encoding='utf8').read()
self.log('HTML for book has been loaded.')
except FileNotFoundError as exc:
self.log('There is no html to process. Conversion went wrong or you specified wrong paths.', logging.ERROR)
self.log_error_to_main_log()
self.set_error_status()
raise exc
self.html_soup = BeautifulSoup(html_text, features='lxml')
self.body_tag = self.html_soup.body
def _clean_tag(self, tag, attr_name, attr_value):
"""
Function to clean tags by its name and attribute value.
:param tag: Tag name to clean.
:param attr_name: Attribute name.
:param attr_value: Attribute value.
"""
tags = self.body_tag.find_all(tag, {attr_name: attr_value})
for tag in tags:
if len(tag.attrs) == 1:
tag.unwrap()
def _clean_underline_links(self):
"""
Function cleans meaningless <u> tags before links.
"""
underlines = self.body_tag.find_all("u")
for u in underlines:
if u.find_all('a'):
u.unwrap()
links = self.body_tag.find_all('a')
for link in links:
u = link.find_all('u')
if u and len(u) == 1:
u[0].unwrap()
@classmethod
def convert_pt_to_px(cls, value):
return round(cls.FONT_CONVERT_RATIO * float(value))
@classmethod
def convert_font_pt_to_px(cls, style):
"""
Method converts point in the font-size to pixels.
:param style: Str with style to process.
:return: Str with converted style.
"""
size = re.search(r"font-size: (\d{1,3})pt", style)
if size is None:
return style
size = size.group(1)
new_size = cls.convert_pt_to_px(size)
if new_size == cls.LAWCARTA_DEFAULT_FONT_SIZE:
return ""
return re.sub(size + "pt", str(new_size) + "px", style)
    def _font_to_span(self):
        """
        Function to convert <font> tag to <span>. If font style is default, then remove this tag.
        """
        fonts = self.body_tag.find_all("font")
        for font in fonts:
            face = font.get("face")
            style = font.get("style")
            font.attrs = {}
            font.name = "span"
            if style:
                # pt sizes become px; the default size converts to "" and is dropped.
                style = self.convert_font_pt_to_px(style)
                if style != "":
                    font.attrs["style"] = style
            if face is not None:
                # Keep only the first family name, e.g. "Arial, sans-serif" -> "Arial".
                face = re.sub(r",[\w,\- ]*$", "", face)
                # Map known families to their web-safe stacks; the default
                # (Times New Roman) face is dropped entirely.
                if face != self.DEFAULT_FONT_NAME and self.font_correspondence_table.get(face):
                    font.attrs["face"] = self.font_correspondence_table[face]
            # A span with no surviving attributes carries no information.
            if len(font.attrs) == 0:
                font.unwrap()
        assert len(self.body_tag.find_all("font")) == 0  # on this step there should be no more <font> tags
def _remove_table_of_contents(self):
"""
Function to remove table of content from file.
"""
tables = self.body_tag.find_all("div", id=re.compile(r'^Table of Contents\d+'))
for table in tables:
table.decompose()
    def _change_table_of_contents(self):
        # Replace each 'Table of Contents' <div> with an empty <TOC> marker:
        # wrap() inserts the <TOC> parent, decompose() then removes the div
        # and its content, leaving <TOC></TOC> as a position marker consumed
        # later by delete_content_before_toc().
        tables = self.body_tag.find_all("div", id=re.compile(r'^Table of Contents\d+'))
        for table in tables:
            table.wrap(self.html_soup.new_tag("TOC"))
            table.decompose()
    def delete_content_before_toc(self):
        # Drop everything preceding the <TOC> marker (front matter and the
        # table of contents itself), then dump the remaining html for debugging.
        # NOTE(review): bs4 tags compare equal by content, so this fresh empty
        # <TOC> tag matches the empty marker left by _change_table_of_contents().
        toc_tag = self.html_soup.new_tag('TOC')
        if toc_tag in self.content:
            ind = self.content.index(toc_tag) + 1
            self.content = self.content[ind:]
        self.write_html_from_list()
def clean_trash(self):
"""
Function to remove all styles and tags we don't need.
"""
self._clean_tag('span', 'style', re.compile(r'^background: #[0-9a-fA-F]{6}$'))
self._clean_tag('span', 'lang', re.compile(r'^ru-RU$')) # todo: check for another languages
self._clean_tag('span', 'style', re.compile('^letter-spacing: -?[\d\.]+pt$'))
self._clean_tag('font', 'color', re.compile(r'^#[0-9a-fA-F]{6}$'))
self._clean_tag('font', 'face', re.compile(r'^Times New Roman[\w, ]+$'))
self._clean_tag("a", "name", "_GoBack")
self._clean_underline_links()
self._font_to_span()
# self._remove_table_of_contents()
self._change_table_of_contents()
    def _process_paragraph(self):
        """
        Function to process <p> tags (text-align and text-indent value).
        Paragraphs carrying the characteristic 0.6in/0.6in/0.14in/0.11in
        margins are additionally wrapped in <blockquote>; every other
        attribute is discarded.
        """
        paragraphs = self.body_tag.find_all('p')
        for p in paragraphs:
            align = p.get('align')
            style = p.get('style')
            if style:
                indent = re.search(r'text-indent: ([\d\.]{1,4})in', style)
                margin_left = re.search(r'margin-left: ([\d\.]{1,4})in', style)
                margin_right= re.search(r'margin-right: ([\d\.]{1,4})in', style)
                margin_top = re.search(r'margin-top: ([\d\.]{1,4})in', style)
                margin_bottom = re.search(r'margin-bottom: ([\d\.]{1,4})in', style)
            else:
                indent = None
                margin_left = None
                margin_right = None
                margin_top = None
                margin_bottom = None
            # This exact margin combination presumably is how LibreOffice
            # renders block quotes from the source .docx — TODO confirm.
            if margin_left and margin_right and margin_top and margin_bottom and \
                    margin_left.group(1) == '0.6' and margin_right.group(1) == '0.6' and \
                    margin_top.group(1) == '0.14' and margin_bottom.group(1) == '0.11':
                p.wrap(BeautifulSoup(features='lxml').new_tag('blockquote'))
            # Rebuild attributes from scratch, keeping only non-default
            # alignment and the text indent.
            p.attrs = {}
            style = ''
            if align is not None and align != self.DEFAULT_ALIGN_STYLE:
                style += f'text-align: {align};'
            if indent is not None:
                indent = indent.group(1)
                style += f'text-indent: {indent}in;'
            if style:
                p.attrs['style'] = style
def _process_two_columns(self):
"""
Function to process paragraphs which has two columns layout.
"""
two_columns = self.body_tag.find_all("div", style="column-count: 2")
for div in two_columns:
for child in div.children:
if child.name == "p":
child["class"] = "columns2"
div.unwrap()
def _process_tables(self):
"""
Function to process tables. Set "border" attribute.
"""
tables = self.body_tag.find_all("table")
for table in tables:
tds = table.find_all("td")
sizes = []
for td in tds:
style = td.get('style')
if style:
match = re.search(r"border: ?(\d+\.?\d*)(p[tx])", style)
if match:
size = match.group(1)
units = match.group(2)
if units == "pt":
size = self.convert_pt_to_px(size)
sizes.append(float(size))
width = td.get('width')
td.attrs = {}
if width:
td.attrs['width'] = width
if sizes:
border_size = sum(sizes)/len(sizes)
table.attrs['border'] = f'{border_size:.2}'
self.tables_amount = len(tables)
def _process_quotes(self):
"""
Function to process block quotes.
After docx to html conversion block quotes are stored inside table with 1 cell.
All text is wrapped in a <i> tag.
Such tables will be replaced with <blockquote> tags.
<table cellpadding=\"7\" cellspacing=\"0\" width=\"614\">
<col width=\"600\"/>
<tr>
<td width=\"600\">
<p style=\"text-align: justify;\"><i>aaaaa</i></p>
<p style=\"text-align: justify;\"><br/></p>
</td>
</tr>
</table>
"""
tables = self.body_tag.find_all("table")
for table in tables:
trs = table.find_all("tr")
tds = table.find_all("td")
if len(trs) == 1 and len(tds) == 1 and tds[0].get('width') == '600':
td = tds[0]
is_zero_border = 'border: none;' in td.get('style')
paragraphs = td.find_all("p")
has_i_tag_or_br = [(p.i, p.br) for p in paragraphs]
has_i_tag_or_br = [x[0] is not None or x[1] is not None
for x in has_i_tag_or_br]
if all(has_i_tag_or_br) and is_zero_border:
new_div = BeautifulSoup(features='lxml').new_tag('blockquote')
for p in paragraphs:
new_div.append(p)
table.replaceWith(new_div)
# def _process_quotes(self):
# """
# Function to process <dl> tags. All tags will be replaced with <blockquote> tags.
# """
# dls = self.body_tag.find_all('dl')
#
# for dl in dls:
# pars = dl.find_all('p')
# for p in pars:
# p.wrap(BeautifulSoup(features='lxml').new_tag('blockquote'))
# new_div = BeautifulSoup(features='lxml').new_tag('div')
# for p in pars:
# new_div.append(p.parent)
# dl.replaceWith(new_div)
@staticmethod
def _clean_footnote_content(content):
content = content.strip()
return content.strip()
def _process_footnotes(self):
"""
Function returns list of footnotes and delete them from html_soup.
"""
footnote_anchors = self.body_tag.find_all('a', class_='sdfootnoteanc')
footnote_content = self.body_tag.find_all('div', id=re.compile(r'^sdfootnote\d+$'))
footnote_amt = len(footnote_anchors)
assert footnote_amt == len(footnote_content),\
'Some ting went wrong with footnotes after libra conversion'
footnotes = []
for i, (anc_tag, cont_tag) in enumerate(zip(footnote_anchors, footnote_content)):
assert anc_tag['name'] == cont_tag.find('a')['href'][1:], \
'Some ting went wrong with footnotes after libra conversion'
new_tag = BeautifulSoup(features='lxml').new_tag('sup')
new_tag['class'] = 'footnote-element'
new_tag['data-id'] = i + 1
new_tag['id'] = f'footnote-{i + 1}'
new_tag.string = '*'
anc_tag.replace_with(new_tag)
cont_tag.a.decompose()
content = self._clean_footnote_content(cont_tag.p.decode_contents())
cont_tag.decompose()
# new_tag = BeautifulSoup(features="lxml").new_tag('div')
# new_tag['class'] = 'footnote-element'
# new_tag['data-id'] = f'"{i}"'
# new_tag['id'] = f'footnote-{i}'
# new_tag.string = content
# footnotes.append(str(new_tag))
footnotes.append(content)
# i += 1
self.footnotes = footnotes
    def _process_images(self):
        """
        Function to process <img> tag. Img should be sent Amazon S3 and then return new tag with valid link.
        For now images are moved to one folder.
        """
        img_tags = self.body_tag.find_all('img')
        if len(img_tags):
            if self.access is None:
                # Local run: collect images into json/img_<stem>/ next to the output.
                folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                new_path = pathlib.Path(os.path.join(folder_path, f'json/img_{self.file_path.stem}/'))
                new_path.mkdir(exist_ok=True)
            for img in img_tags:
                img_name = img.attrs.get('src')
                # Image files sit next to the converted html file.
                img_path = pathlib.Path(f'{self.file_path.parent}/{img_name}')
                if self.access is not None:
                    # Server run: upload and point src at the returned link.
                    link = self.access.send_image(img_path, self.book_id)
                    img.attrs['src'] = link
                    self.log(f'{img_name} successfully uploaded.')
                else:
                    img_size = os.path.getsize(img_path)
                    print(f'{img_name} successfully loaded. Image size: {img_size}.')
                    new_img_path = new_path / img_name
                    copyfile(img_path, new_img_path)
                    img.attrs["src"] = str(new_img_path)
        self.images = img_tags
def _process_footer(self):
"""
Function to process <div title="footer"> tags.
All the tags will be deleted from file.
"""
divs = self.body_tag.find_all('div', {'title': 'footer'})
for div in divs:
div.decompose()
def _process_div(self):
"""
Function to process <div> tags. All the tags will be deleted from file, all content of the tags will stay.
"""
divs = self.body_tag.find_all("div")
for div in divs:
div.unwrap()
    def _process_toc_links(self):
        """
        Function to extract nodes which contains TOC links, remove links from file and detect headers.
        """
        toc_links = self.body_tag.find_all("a", {'name': re.compile(r'^_Toc\d+')})
        headers = [link.parent for link in toc_links]
        outline_level = "1"  # All the unknown outlines will be predicted as <h1>
        for tag in headers:
            if re.search(r"^h\d$", tag.name):
                # Already a heading: strip the anchor, keep the text.
                tag.a.unwrap()
                # outline_level = tag.name[-1] # TODO: add prediction of the outline level
            # TODO: escape from recounting paragraphs every time
            elif tag.name == "p":
                # Plain paragraph carrying a TOC anchor: promote it to a heading.
                if tag in self.body_tag.find_all("p"):
                    new_tag = BeautifulSoup(features="lxml").new_tag("h" + outline_level)
                    text = tag.text
                    tag.replaceWith(new_tag)
                    new_tag.string = text
            else:
                # rethink document structure when you have toc_links, other cases?
                self.logger.warning(f'Something went wrong in processing toc_links.'
                                    f' Check the structure of the file. '
                                    f'Tag name: {tag.name}')
@staticmethod
def clean_header_title(title):
"""
Function to remove digits and extra spaces from headers.
:param title: Title to process.
"""
title = re.sub(r'\s+', ' ', title).strip()
title = re.sub(r'^(?:\.?\d+\.? ?)+', '', title)
# title = re.sub(r'^(?:\.?[MDCLXVIclxvi]+\.? ?)+ ', '', title) # delete chapter numbering from the title
title = re.sub(r'^(?:[A-Za-z]\. ?)+', '', title)
return title.strip()
def _preprocessing_headings(self):
"""
Function to convert all lower level headings to p tags
"""
pattern = f'^h[{self.SUPPORTED_LEVELS + 1}-9]$'
header_tags = self.body_tag.find_all(re.compile(pattern))
for tag in header_tags:
tag.name = 'p'
    def _get_top_level_headers(self):
        """
        Function for gathering info about top-level chapters.
        Assume:
        - Headers with smallest outline(or digit in <h>) are top level chapters.
        [ It is consistent with a recursive algorithm
        for saving content to a resulted json structure,
        which happens in header_to_json()]
        :return: list of dicts with 'title', 'is_numbered', 'is_introduction'.
        """
        headers_info = []
        header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
        headers_outline = [int(re.sub(r"^h", "", tag.name)) for tag in header_tags]
        # NOTE(review): min() raises ValueError when the document contains no
        # headers at all — confirm upstream guarantees at least one.
        top_level_outline = min(headers_outline)
        top_level_headers = [tag for tag in header_tags
                             if int(re.sub(r"^h", "", tag.name)) == top_level_outline]
        for tag in top_level_headers:
            # Lift headings out of list markup so they sit at body level.
            if tag.parent.name == "li":
                tag.parent.unwrap()
            while tag.parent.name == "ol":
                tag.parent.unwrap()
            title = tag.text
            title = re.sub(r'\s+', ' ', title).strip()
            # A leading '1.'-style sequence marks a numbered chapter.
            number = re.match(r'^(?:\.?\d+\.? ?)+', title)
            is_numbered = number is not None
            cleaned_title = self.clean_header_title(tag.text)
            is_introduction = cleaned_title.lower() == 'introduction'
            headers_info.append({
                'title': cleaned_title,
                'is_numbered': is_numbered,
                'is_introduction': is_introduction})
        return headers_info
def _mark_introduction_headers(self):
"""
Function to find out:
what header shouldn't be numbered and can be treated as introduction chapter
Assume header(s) to be introduction if:
1. one header not numbered, before 1 numbered header
2. it is first header from the top level list and it equals to 'introduction'
Result :
Mark each top-level header with flag should_be_numbered = true/false
"""
is_numbered_header = [header['is_numbered'] for header in self.top_level_headers]
is_title = [header['is_introduction'] for header in self.top_level_headers]
first_not_numbered = is_numbered_header and is_numbered_header[0] == 0
second_is_numbered_or_not_exist = all(is_numbered_header[1:2])
first_header_is_introduction = is_title and is_title[0]
if (first_not_numbered and second_is_numbered_or_not_exist) or first_header_is_introduction:
self.top_level_headers[0]['should_be_numbered'] = False
for i in range(1, len(self.top_level_headers)):
self.top_level_headers[i]['should_be_numbered'] = True
else:
for i in range(0, len(self.top_level_headers)):
self.top_level_headers[i]['should_be_numbered'] = True
    def _process_headings(self):
        """
        Function to process tags <h>.
        Strips list markup around headings, cleans their titles, and replaces
        each heading with a fresh tag holding only the cleaned title text.
        """
        header_tags = self.body_tag.find_all(re.compile("^h[1-9]$"))
        for tag in header_tags:
            # Lift headings out of list markup so they sit at body level.
            if tag.parent.name == "li":
                tag.parent.unwrap()
            while tag.parent.name == "ol":
                tag.parent.unwrap()
            title = tag.text
            title = self.clean_header_title(title)
            if title == "":
                # The heading was pure numbering: drop the tag, keep children.
                tag.unwrap()
            else:
                assert tag.name in self.SUPPORTED_HEADERS, \
                    f'Preprocessing went wrong, there is still h{self.SUPPORTED_LEVELS + 1}-h9 headings.'
                # if tag.name in ["h4", "h5", "h6"]:
                #     tag.name = "h3" # All the lower level headings will be transformed to h3 headings
                new_tag = BeautifulSoup(features='lxml').new_tag(name=tag.name)
                new_tag.string = title
                tag.replace_with(new_tag)
def _process_lists(self):
"""
Function to process tags <ul>.
Unwrap <p> tags.
"""
list_tags = self.body_tag.find_all("ul")
for tag in list_tags:
for il_tag in tag.find_all("li"):
il_tag.attrs.update(il_tag.p.attrs)
il_tag.p.unwrap()
def write_html_from_list(self, file_name='url_test.html'):
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
file_path = pathlib.Path(os.path.join(folder_path, file_name))
with open(file_path, 'w', encoding='utf-8') as f_out:
# f_out.write("".join([tag.prettify() for tag in self.content]))
f_out.write(self.body_tag.prettify())
self.logger.info(f'Check test file - url_test.html.')
    def process_html(self):
        """
        Process html code to satisfy LawCarta formatting.
        Runs the full cleanup pipeline over self.body_tag and fills
        self.content / self.footnotes / self.images / self.top_level_headers.
        """
        self.log('Beginning of processing .html file.')
        try:
            self.clean_trash()
            # process main elements of the .html doc
            self.log(f'Processing main elements of html.')
            self._preprocessing_headings()
            self._process_paragraph()
            self._process_two_columns()
            self.log('Block quotes processing.')
            self._process_quotes()
            self.log('Tables processing.')
            self._process_tables()
            self.log(f'{self.tables_amount} tables have been processed.')
            self.log('Footnotes processing.')
            self._process_footnotes()
            self.log(f'{len(self.footnotes)} footnotes have been processed.')
            self.log('Image processing.')
            self._process_images()
            self.log(f'{len(self.images)} images have been processed.')
            self._process_footer()
            self._process_div()
            # Snapshot the top-level nodes before header detection.
            self.content = self.body_tag.find_all(recursive=False)
            self.log(f'Processing TOC and headers.')
            self._process_toc_links()
            self.top_level_headers = self._get_top_level_headers()
            self._mark_introduction_headers()
            self._process_headings()
            # Re-snapshot: header processing replaced tags in the tree.
            self.content = self.body_tag.find_all(recursive=False)
            # delete text before table of content if exists
            self.delete_content_before_toc()
            self._process_lists()
        except Exception as exc:
            self.log('Error has occurred while processing html.', logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        self.log('End of processing .html file.')
@staticmethod
def format_html(html_text):
"""
Function to remove useless symbols from html code.
:param html_text: Text to process.
:return: Cleaned text.
"""
new_text = re.sub(r'([\n\t])', ' ', html_text)
return new_text
    # TODO: rethink the function structure without indexes.
    def header_to_json(self, ind):
        """
        Function process header and collects all content for it.
        Recursively gathers everything under the header at self.content[ind]
        until a header of the same or higher level is met.
        :param ind: Index of header in content list.
        :return: ({title: [html_str | nested_dict, ...]}, next_index) when
            content[ind] is a supported header, '' otherwise.
        """
        if self.content[ind].name in self.SUPPORTED_HEADERS:
            title = self.content[ind].text
            curr_outline = int(re.sub(r"^h", "", self.content[ind].name))  # extract outline from tag
            result = {title: []}
            ch_content = []  # accumulates html of consecutive non-header tags
            ind += 1
            while ind < len(self.content):
                # 1. next tag is a header
                if self.content[ind].name in self.SUPPORTED_HEADERS:
                    outline = int(re.sub(r"^h", "", self.content[ind].name))
                    # - recursion step until h_i > h_initial
                    if outline > curr_outline:
                        res, ind = self.header_to_json(ind)
                        # Flush the accumulated text before nesting the sub-chapter.
                        if ch_content:
                            result[title].append("".join(ch_content))
                            ch_content = []
                        result[title].append(res)
                    # - current h_i <= h_initial, end of recursion
                    else:
                        # return result, ind
                        break
                # 2. next tag is not a header. add new paragraphs
                else:
                    res = self.format_html(str(self.content[ind]))
                    # result[title].append(res)
                    ch_content.append(res)
                    ind += 1
            # Flush any trailing text collected after the last sub-header.
            if ch_content:
                result[title].append("".join(ch_content))
            return result, ind
        return ''
    @staticmethod
    def _is_empty_p_tag(tag):
        # True when tag is a <p> whose text is empty once <br> tags and all
        # whitespace are removed. Operates on a copy so the document tree is
        # left untouched.
        if tag.name != 'p':
            return False
        temp_tag = copy(tag)
        brs = temp_tag.find_all('br')
        for br in brs:
            br.decompose()
        text = re.sub(r'\s+', '', temp_tag.text)
        if text:
            return False
        return True
    def convert_to_json(self):
        """
        Function which convert list of html nodes to appropriate json structure.
        Builds self.content_dict = {'content': [...], 'footnotes': [...]},
        where each content item is {'title': ..., 'contents': [...]}.
        """
        json_strc = []
        ind = 0
        ch_num = 0  # counter used to name 'Untitled chapter N' entries
        ch_amt = 0  # total chapters appended so far
        try:
            while ind < len(self.content):
                res = {}
                if self.content[ind].name in self.SUPPORTED_HEADERS:
                    # Chapter with a real header: recurse over its content.
                    res, ind = self.header_to_json(ind)
                    assert len(res.keys()) == 1, 'Something went wrong during header to json conversion.'
                    top_level_header = list(res.keys())[0]
                    res = {
                        'title': top_level_header,
                        'contents': res[top_level_header]
                    }
                else:
                    # Content before the first header becomes an untitled chapter.
                    chapter_title = f'Untitled chapter {ch_num}'
                    chapter = []
                    while ind < len(self.content) and self.content[ind].name not in self.SUPPORTED_HEADERS:
                        if not self._is_empty_p_tag(self.content[ind]):
                            chapter.append(self.format_html(str(self.content[ind])))
                        ind += 1
                    if chapter:
                        res = {
                            'title': chapter_title,
                            'contents': ["".join(chapter)]
                        }
                        ch_num += 1
                if res:
                    json_strc.append(res)
                    ch_amt += 1
                    self.log(f'Chapter {ch_amt} has been added to structure.')
        except Exception as exc:
            self.log('Error has occurred while making json structure.', logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
        # Add is_introduction field to json structure
        # after deleting content before toc, some chapters can be deleted
        # NOTE(review): this indexes [0] on both lists and raises IndexError
        # when either is empty — confirm upstream guarantees at least one
        # chapter and one top-level header.
        same_first_titles = self.top_level_headers[0]['title'] == json_strc[0]['title']
        is_first_header_introduction = not self.top_level_headers[0]['should_be_numbered']
        json_strc[0]['is_introduction'] = is_first_header_introduction and same_first_titles
        self.content_dict = {
            "content": json_strc,
            "footnotes": self.footnotes
        }
    def write_json(self):
        """Write self.content_dict to self.output_path as UTF-8 JSON (best effort)."""
        try:
            with codecs.open(self.output_path, 'w', encoding='utf-8') as f:
                json.dump(self.content_dict, f, ensure_ascii=False)
            self.log('Data has been saved to .json file.')
        except Exception as exc:
            # NOTE(review): failures are swallowed after logging — the
            # error-status/raise lines below look deliberately commented out,
            # presumably so a failed local dump does not abort server
            # delivery; confirm this is still intended.
            self.log('Error has occurred while writing json file.', logging.ERROR)
            # self.log_error_to_main_log()
            # self.set_error_status()
            # raise exc
    def send_json_content(self):
        """Upload self.content_dict to the server; mark the book errored on failure."""
        try:
            self.access.send_book(self.book_id, self.content_dict)
            self.log(f'JSON data has been sent to server.')
        except Exception as exc:
            self.log('Error has occurred while sending json content.', logging.ERROR)
            self.log_error_to_main_log()
            self.set_error_status()
            raise exc
    def convert_from_html(self, logging_format):
        """Run the html -> json pipeline for a book whose html already exists.

        :param logging_format: record format for the per-book log file.
        """
        self.configure_file_logger(f'{__name__}_{self.book_id}', logging_format=logging_format, filemode='w+')
        self.read_html()
        self.process_html()
        self.convert_to_json()
        self.write_json()
    def test_conversion(self):
        """Full local pipeline (docx -> html -> json) without server interaction."""
        self.configure_file_logger(self.book_id, filemode='w+')
        self.log('Beginning of the test.')
        self.convert_doc_to_html()
        self.check_output_directory()
        self.read_html()
        self.process_html()
        self.convert_to_json()
        self.write_json()
        self.log('End of the test.')
    def conversion(self, logging_format, filemode='w+'):
        """Full server pipeline: fetch the docx, convert it, and send back the
        resulting JSON, updating the book's status along the way.

        :param logging_format: record format for the per-book log file.
        :param filemode: open mode for the log file.
        """
        self.configure_file_logger(f'{__name__}_{self.book_id}', logging_format=logging_format, filemode=filemode)
        self.log('Beginning of conversion from .docx to .json.')
        self.get_docx()
        self.set_process_status()
        self.convert_doc_to_html()
        self.check_output_directory()
        self.read_html()
        self.process_html()
        self.set_generate_status()
        self.convert_to_json()
        self.write_json()
        self.send_json_content()
        self.log(f'End of the conversion to LawCarta format. Check {self.output_path}.')
if __name__ == "__main__":
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
file_path = pathlib.Path(os.path.join(folder_path, 'html/82/82.html'))
out_path = pathlib.Path(os.path.join(folder_path, 'json/82.json'))
logging_format = '%(asctime)s - %(levelname)s - %(message)s'
book = Book(file_path=file_path, output_path=out_path)
book.convert_from_html(logging_format=logging_format)