forked from LiveCarta/BookConverter
420 lines
18 KiB
Python
420 lines
18 KiB
Python
import codecs
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from os.path import dirname, normpath, join
|
|
from collections import defaultdict
|
|
from typing import Dict, Union, List
|
|
from itertools import chain
|
|
|
|
import ebooklib
|
|
from bs4 import BeautifulSoup, Tag
|
|
from ebooklib import epub
|
|
from ebooklib.epub import Link, Section
|
|
|
|
from data_objects import ChapterItem, NavPoint
|
|
from html_epub_preprocessor import unwrap_structural_tags, get_tags_between_chapter_marks, prepare_title_and_content, \
|
|
update_src_links_in_images, preprocess_footnotes
|
|
|
|
from css_reader import clean_css, add_inline_style_to_html_soup
|
|
from livecarta_config import LawCartaConfig, BookLogger
|
|
|
|
|
|
class EpubPostprocessor:
|
|
def __init__(self, file, access=None, logger=None):
|
|
self.file = file
|
|
self.access = access
|
|
self.logger: BookLogger = logger
|
|
self.ebooklib_book = epub.read_epub(file)
|
|
self.internal_anchors = set()
|
|
self.logger.log('Image processing.')
|
|
self.href2img_bytes = {}
|
|
self.old_image_path2_aws_path = {}
|
|
for x in chain(self.ebooklib_book.get_items_of_type(ebooklib.ITEM_IMAGE),
|
|
self.ebooklib_book.get_items_of_type(ebooklib.ITEM_COVER)):
|
|
file_name = x.file_name
|
|
content = x.content
|
|
self.href2img_bytes[file_name] = content
|
|
|
|
self.logger.log('HTML files reading.')
|
|
self.id_anchor_exist_in_nav_points = False
|
|
self.href2soup_html: Dict[str, BeautifulSoup] = self.build_href2soup_content()
|
|
|
|
self.logger.log('CSS files processing.')
|
|
self.css_href2content, self.html_href2css_href = self.build_css_content()
|
|
self.logger.log('CSS styles adding.')
|
|
self.add_css_styles2soup()
|
|
|
|
self.logger.log('Footnotes processing.')
|
|
self.footnotes_contents: List[str] = []
|
|
self.noterefs = []
|
|
self.footnotes: List[Tag] = []
|
|
for href in self.href2soup_html:
|
|
content, noterefs, footnotes_tags = preprocess_footnotes(self.href2soup_html[href],
|
|
self.href2soup_html)
|
|
self.footnotes_contents.extend(content)
|
|
self.noterefs.extend(noterefs)
|
|
self.footnotes.extend(footnotes_tags)
|
|
|
|
for i, (noteref, footnote) in enumerate(zip(self.noterefs, self.footnotes)):
|
|
noteref.attrs['data-id'] = i + 1
|
|
noteref.attrs['id'] = f'footnote-{i + 1}'
|
|
footnote.attrs['href'] = f'#footnote-{i + 1}'
|
|
|
|
self.logger.log(f'Added {len(self.footnotes_contents)} footnotes.')
|
|
self.logger.log('TOC processing.')
|
|
self.href2subchapter_ids = defaultdict(list)
|
|
self.added_to_toc_hrefs = set()
|
|
self.adjacency_list: Dict[Union[NavPoint, -1], Union[list, None]] = {} # nav_point2nav_points
|
|
self.build_adjacency_list_from_toc(self.ebooklib_book.toc)
|
|
# build simple toc from spine if needed
|
|
if not self.is_toc_valid():
|
|
self.build_adjacency_list_from_spine()
|
|
not_added = [x for x in self.href2soup_html if x not in self.added_to_toc_hrefs]
|
|
self.logger.log(f'Html documents not added to TOC: {not_added}.')
|
|
self.add_not_added_files_to_adjacency_list(not_added)
|
|
self.process_html_soup_structure_to_line() # used only after parsed toc, ids from toc needed
|
|
self.process_internal_links()
|
|
self.href_chapter_id2soup_html: Dict[tuple, BeautifulSoup] = {}
|
|
self.define_chapters_content()
|
|
|
|
def build_href2soup_content(self) -> Dict[str, BeautifulSoup]:
|
|
# using EpubElements
|
|
# for now just for HTML objects, as it is simplest chapter
|
|
# todo: check if other chapters exist
|
|
nodes = dict()
|
|
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
|
|
html_body_text = item.get_body_content()
|
|
# html.parser closes tags if needed
|
|
soup = BeautifulSoup(html_body_text, features='html.parser')
|
|
nodes[item.file_name] = soup
|
|
|
|
return nodes
|
|
|
|
def _read_css(self, css_href, html_path):
|
|
path_to_css_from_html = css_href
|
|
html_folder = dirname(html_path)
|
|
path_to_css_from_root = normpath(join(html_folder, path_to_css_from_html))
|
|
css_obj = self.ebooklib_book.get_item_with_href(path_to_css_from_root)
|
|
assert css_obj, f'Css style {css_href} was not in manifest.'
|
|
css_content: str = css_obj.get_content().decode()
|
|
return css_content
|
|
|
|
def build_css_content(self):
|
|
css_href2content, html_href2css_href = {}, {}
|
|
# html_href2css_href 1-to-1, todo: 1-to-many
|
|
|
|
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
|
|
html_text = item.content
|
|
html_path = item.file_name
|
|
soup = BeautifulSoup(html_text, features='lxml')
|
|
for tag in soup.find_all('link', attrs={"type": "text/css"}):
|
|
if tag.attrs.get('rel') and ('alternate' in tag.attrs['rel']):
|
|
continue
|
|
css_href = tag.attrs.get('href')
|
|
html_href2css_href[html_path] = css_href
|
|
if css_href not in css_href2content:
|
|
css_href2content[css_href] = clean_css(self._read_css(css_href, html_path))
|
|
|
|
for i, tag in enumerate(soup.find_all('style')):
|
|
css_content = tag.string
|
|
html_href2css_href[html_path] = f'href{i}'
|
|
css_href2content[f'href{i}'] = clean_css(css_content)
|
|
|
|
return css_href2content, html_href2css_href
|
|
|
|
def add_css_styles2soup(self):
|
|
for href in self.href2soup_html:
|
|
if self.html_href2css_href.get(href):
|
|
css: str = self.css_href2content[self.html_href2css_href[href]]
|
|
content = self.href2soup_html[href]
|
|
content = add_inline_style_to_html_soup(content, css)
|
|
self.href2soup_html[href] = content
|
|
|
|
def build_manifest_id2href(self):
|
|
links = dict()
|
|
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
|
|
links[item.id] = item.file_name
|
|
|
|
return links
|
|
|
|
def build_adjacency_list_from_toc(self, element, lvl=0):
|
|
"""
|
|
self.adjacency_list builds based on TOC nested structure
|
|
|
|
key = -1 if root, value = None if leaf
|
|
|
|
:param element: [Link, tuple, list] - element that appears in TOC( usually parsed from nav.ncx)
|
|
:param lvl: level of depth
|
|
"""
|
|
|
|
if isinstance(element, Link):
|
|
# todo: check if link exists
|
|
node = NavPoint(element)
|
|
if node.id:
|
|
self.id_anchor_exist_in_nav_points = True
|
|
self.href2subchapter_ids[node.href].append(node.id)
|
|
self.adjacency_list[node] = None
|
|
self.added_to_toc_hrefs.add(node.href)
|
|
return node
|
|
|
|
elif isinstance(element, tuple):
|
|
first, second = element
|
|
assert isinstance(first, Section)
|
|
node = NavPoint(first)
|
|
if node.id:
|
|
self.id_anchor_exist_in_nav_points = True
|
|
self.href2subchapter_ids[node.href].append(node.id)
|
|
|
|
sub_nodes = []
|
|
for i in second:
|
|
sub_nodes.append(self.build_adjacency_list_from_toc(i, lvl + 1))
|
|
|
|
self.adjacency_list[node] = sub_nodes
|
|
self.added_to_toc_hrefs.add(node.href)
|
|
return node
|
|
|
|
elif isinstance(element, list) and (lvl == 0):
|
|
sub_nodes = []
|
|
for i in element:
|
|
sub_nodes.append(self.build_adjacency_list_from_toc(i, lvl + 1))
|
|
|
|
self.adjacency_list[-1] = sub_nodes
|
|
|
|
else:
|
|
assert 0, f'Error. Element is not tuple/Link instance: {type(element)}'
|
|
|
|
def is_toc_valid(self):
|
|
if (self.ebooklib_book.toc is None) or (self.adjacency_list.get(-1) is None):
|
|
return False
|
|
return True
|
|
|
|
def build_adjacency_list_from_spine(self):
|
|
manifest_id2href = self.build_manifest_id2href()
|
|
self.adjacency_list = {
|
|
-1: []
|
|
}
|
|
for id_, _ in self.ebooklib_book.spine:
|
|
node = NavPoint(Section(manifest_id2href[id_], manifest_id2href[id_]))
|
|
self.adjacency_list[-1].append(node)
|
|
self.added_to_toc_hrefs.add(node.href)
|
|
|
|
def add_not_added_files_to_adjacency_list(self, not_added):
|
|
for i, file in enumerate(not_added):
|
|
node = NavPoint(Section(f'To check #{i}, filename: {file}', file))
|
|
self.adjacency_list[-1].append(node)
|
|
self.added_to_toc_hrefs.add(file)
|
|
|
|
def process_html_soup_structure_to_line(self):
|
|
# mark
|
|
for href in self.href2soup_html:
|
|
ids = self.href2subchapter_ids[href]
|
|
for i in ids:
|
|
soup = self.href2soup_html[href]
|
|
tag = soup.find(id=i)
|
|
new_h = soup.new_tag('tmp')
|
|
new_h.attrs['class'] = 'converter-chapter-mark'
|
|
new_h.attrs['id'] = i
|
|
tag.insert_before(new_h)
|
|
|
|
# go to line structure
|
|
for href in self.href2soup_html:
|
|
soup = self.href2soup_html[href]
|
|
self.href2soup_html[href] = unwrap_structural_tags(soup)
|
|
|
|
@staticmethod
|
|
def _create_unique_id(href, id_):
|
|
return re.sub(r'([^\w\s])|_|-', '', href) + re.sub(r'[_-]', '0', id_)
|
|
|
|
@staticmethod
|
|
def _create_new_anchor_span(soup, id_):
|
|
new_anchor_span = soup.new_tag("span")
|
|
new_anchor_span.attrs['id'] = id_
|
|
new_anchor_span.attrs['class'] = 'link-anchor'
|
|
new_anchor_span.string = "\xa0"
|
|
return new_anchor_span
|
|
|
|
def match_href_to_path_from_toc(self, href, href_in_link, internal_link_tag):
|
|
dir_name = os.path.dirname(href)
|
|
normed_path = os.path.normpath(os.path.join(dir_name, href_in_link))
|
|
full_path = [path for path in self.added_to_toc_hrefs if normed_path in path]
|
|
if not full_path:
|
|
self.logger.log(f'Error in {href} file. No {normed_path} file found in added to TOC documents. '
|
|
f'While processing href in {internal_link_tag}.')
|
|
internal_link_tag.attrs['converter-mark'] = 'bad-link'
|
|
return None
|
|
|
|
if len(full_path) > 1:
|
|
self.logger.log(f'Warning in {href}. Multiple paths found {full_path} for file {href_in_link}'
|
|
f' while {internal_link_tag} processing. The first one will be chosen.')
|
|
|
|
return full_path[0]
|
|
|
|
def process_internal_links(self):
|
|
# 1. rebuild ids to be unique in all documents
|
|
for toc_href in self.added_to_toc_hrefs:
|
|
for tag in self.href2soup_html[toc_href].find_all(attrs={'id': re.compile(r'.+')}):
|
|
if tag.attrs.get('class') == 'converter-chapter-mark':
|
|
continue
|
|
|
|
if tag.attrs.get('class') == 'footnote-element':
|
|
continue
|
|
|
|
new_id = self._create_unique_id(toc_href, tag.attrs['id'])
|
|
tag.attrs['id'] = new_id
|
|
|
|
# 2.a) process anchor which is a whole xhtml file
|
|
internal_link_reg1 = re.compile(r'(^(?!https?://).+\.(html|xhtml)$)')
|
|
for toc_href in self.added_to_toc_hrefs:
|
|
soup = self.href2soup_html[toc_href]
|
|
for internal_link_tag in soup.find_all('a', {'href': internal_link_reg1}):
|
|
a_tag_href = internal_link_tag.attrs['href']
|
|
# find full path
|
|
a_tag_href_matched_to_toc = self.match_href_to_path_from_toc(toc_href, a_tag_href, internal_link_tag)
|
|
if not a_tag_href_matched_to_toc:
|
|
continue
|
|
new_id = self._create_unique_id(a_tag_href_matched_to_toc, '')
|
|
internal_link_tag.attrs['placeholder'] = '{{tempStyleToAnchor-' + new_id + '}}'
|
|
if new_id not in self.internal_anchors:
|
|
anchor_soup = self.href2soup_html[a_tag_href_matched_to_toc]
|
|
new_anchor_span = self._create_new_anchor_span(soup, new_id)
|
|
anchor_soup.insert(0, new_anchor_span) # insert a new span to the begin of the file
|
|
self.internal_anchors.add(new_id)
|
|
|
|
del internal_link_tag.attrs['href']
|
|
|
|
# 2.a) process anchor which is a an element in xhtml file
|
|
internal_link_reg2 = re.compile(r'(^.+\.(html|xhtml)\#.+)|(^\#.+)')
|
|
for toc_href in self.added_to_toc_hrefs:
|
|
soup = self.href2soup_html[toc_href]
|
|
for internal_link_tag in soup.find_all('a', {'href': internal_link_reg2}):
|
|
a_tag_href, a_tag_id = internal_link_tag.attrs['href'].split('#')
|
|
a_tag_href = a_tag_href or toc_href
|
|
# find full path
|
|
a_tag_href_matched_to_toc = self.match_href_to_path_from_toc(toc_href, a_tag_href, internal_link_tag)
|
|
if not a_tag_href_matched_to_toc:
|
|
continue
|
|
new_id = self._create_unique_id(a_tag_href_matched_to_toc, a_tag_id)
|
|
|
|
anchor_soup = self.href2soup_html[a_tag_href_matched_to_toc]
|
|
anchor_tags = anchor_soup.find_all(attrs={'id': new_id})
|
|
anchor_tags = anchor_tags or anchor_soup.find_all(attrs={'id': a_tag_id}) # if link is a footnote
|
|
|
|
if anchor_tags:
|
|
if len(anchor_tags) > 1:
|
|
self.logger.log(f'Warning in {toc_href}: multiple anchors: {len(anchor_tags)} found.'
|
|
f' While processing {internal_link_tag}')
|
|
|
|
anchor_tag = anchor_tags[0]
|
|
assert anchor_tag.attrs['id'] in [new_id, a_tag_id]
|
|
# if anchor is found we could add placeholder for link creation on server side.
|
|
internal_link_tag.attrs['placeholder'] = '{{tempStyleToAnchor-' + new_id + '}}'
|
|
# create span to have cyclic links, link has 1 type of class, anchor another
|
|
if anchor_tag.attrs['id'] not in self.internal_anchors:
|
|
new_anchor_span = self._create_new_anchor_span(soup, new_id)
|
|
anchor_tag.insert_before(new_anchor_span)
|
|
self.internal_anchors.add(new_id)
|
|
del anchor_tag.attrs['id']
|
|
del internal_link_tag.attrs['href']
|
|
|
|
else:
|
|
internal_link_tag.attrs['converter-mark'] = 'bad-link'
|
|
self.logger.log(f'Error in {toc_href}. While processing {internal_link_tag} no anchor found.'
|
|
f' Should be anchor with new id={new_id} in {a_tag_href_matched_to_toc} file.'
|
|
f' Old id={a_tag_id}')
|
|
|
|
def build_one_chapter(self, node):
|
|
"""
|
|
Updates self.href_chapter_id2soup_html (mapping from (href,id) to chapter content/html soup object)
|
|
|
|
3 cases:
|
|
id wraps all chapter content,
|
|
id wraps chapter's content + subchapters' content
|
|
id points to the start of title of a chapter
|
|
|
|
In all cases we know where chapter starts. Therefore chapter is all tags between chapter's id
|
|
and id of the next chapter/subchapter
|
|
|
|
"""
|
|
if node.id:
|
|
soup = self.href2soup_html[node.href]
|
|
chapter_tags = get_tags_between_chapter_marks(first_id=node.id, href=node.href, html_soup=soup)
|
|
new_tree = BeautifulSoup('', 'html.parser')
|
|
for tag in chapter_tags:
|
|
new_tree.append(tag)
|
|
self.href_chapter_id2soup_html[(node.href, node.id)] = new_tree
|
|
|
|
if self.adjacency_list.get(node):
|
|
for sub_node in self.adjacency_list[node]:
|
|
self.build_one_chapter(sub_node)
|
|
|
|
def define_chapters_content(self):
|
|
nav_points = self.adjacency_list[-1]
|
|
if self.id_anchor_exist_in_nav_points:
|
|
for point in nav_points:
|
|
self.build_one_chapter(point)
|
|
|
|
def node2livecarta_chapter_item(self, node: NavPoint, lvl=1) -> ChapterItem:
|
|
title = node.title
|
|
if node.id:
|
|
content: BeautifulSoup = self.href_chapter_id2soup_html[(node.href, node.id)]
|
|
else:
|
|
content: BeautifulSoup = self.href2soup_html[node.href]
|
|
|
|
self.old_image_path2_aws_path = update_src_links_in_images(content,
|
|
self.href2img_bytes,
|
|
path_to_html=node.href,
|
|
access=self.access,
|
|
path2aws_path=self.old_image_path2_aws_path)
|
|
|
|
is_chapter = lvl <= LawCartaConfig.SUPPORTED_LEVELS
|
|
title_preprocessed, content_preprocessed = prepare_title_and_content(title, content,
|
|
remove_title_from_chapter=is_chapter)
|
|
|
|
sub_nodes = []
|
|
# warning! not EpubHtmlItems won;t be added to chapter
|
|
if self.adjacency_list.get(node):
|
|
for sub_node in self.adjacency_list[node]:
|
|
sub_chapter_item = self.node2livecarta_chapter_item(sub_node, lvl + 1)
|
|
sub_nodes.append(sub_chapter_item)
|
|
|
|
if self.logger:
|
|
indent = ' ' * lvl
|
|
self.logger.log(f'{indent}Chapter: {title} is prepared.')
|
|
return ChapterItem(title_preprocessed, content_preprocessed, sub_nodes)
|
|
|
|
def convert_to_dict(self):
|
|
top_level_nav_points = self.adjacency_list[-1]
|
|
top_level_chapters = []
|
|
|
|
for nav_point in top_level_nav_points:
|
|
chapter = self.node2livecarta_chapter_item(nav_point)
|
|
top_level_chapters.append(chapter)
|
|
|
|
top_level_dict_chapters = [x.to_dict() for x in top_level_chapters]
|
|
self.logger.log(f'Anchors found: {len(self.internal_anchors)}.')
|
|
self.logger.log('End conversion.')
|
|
|
|
return {
|
|
"content": top_level_dict_chapters,
|
|
"footnotes": self.footnotes_contents
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logger = logging.getLogger('epub')
|
|
file_handler = logging.StreamHandler()
|
|
logger.addHandler(file_handler)
|
|
file_handler = logging.FileHandler('epub.log', mode='w+')
|
|
logger.addHandler(file_handler)
|
|
|
|
logger_object = BookLogger(name=f'epub', main_logger=logger, book_id=0)
|
|
|
|
json_converter = EpubPostprocessor('/home/katerina/PycharmProjects/Jenia/converter/epub/9781119682387_pre_code2.epub',
|
|
logger=logger_object)
|
|
tmp = json_converter.convert_to_dict()
|
|
|
|
with codecs.open('tmp.json', 'w', encoding='utf-8') as f:
|
|
json.dump(tmp, f, ensure_ascii=False)
|