# forked from LiveCarta/BookConverter
import codecs
|
||
import json
|
||
import logging
|
||
import re
|
||
from os.path import dirname, normpath, join
|
||
from collections import defaultdict
|
||
from typing import Dict, Union
|
||
from itertools import chain
|
||
|
||
import ebooklib
|
||
from bs4 import BeautifulSoup
|
||
from ebooklib import epub
|
||
from ebooklib.epub import Link, Section
|
||
|
||
from data_objects import ChapterItem, NavPoint
|
||
from html_epub_preprocessor import unwrap_structural_tags, get_tags_between_chapter_marks, prepare_title_and_content, \
|
||
update_src_links_in_images, preprocess_footnotes
|
||
|
||
from css_reader import clean_css, add_inline_style_to_html_soup
|
||
from livecarta_config import LawCartaConfig, BookLogger
|
||
|
||
|
||
class EpubPostprocessor:
    # Converts an EPUB file into LiveCarta chapter structures: reads images,
    # HTML documents and CSS, inlines styles, extracts footnotes, builds a
    # TOC adjacency list and splits documents into per-chapter soups.

    def __init__(self, file, access=None, logger=None):
        """Read the EPUB at *file* and run the full preprocessing pipeline.

        :param file: path to the .epub file.
        :param access: access descriptor forwarded to image processing
            (presumably controls AWS upload — TODO confirm semantics).
        :param logger: BookLogger instance. NOTE(review): despite the None
            default, self.logger.log() is called unconditionally below, so a
            real logger is effectively required.
        """
        self.file = file
        self.access = access
        self.logger: BookLogger = logger
        self.ebooklib_book = epub.read_epub(file)  # todo: log error from ebooklib
        # Ids of anchor <span>s already injected, to avoid duplicates.
        self.internal_anchors = set()

        self.logger.log('Image processing.')
        # Image path inside the EPUB -> raw bytes (cover images included).
        self.href2img_bytes = {}
        # Filled lazily per chapter by update_src_links_in_images.
        self.old_image_path2_aws_path = {}
        for x in chain(self.ebooklib_book.get_items_of_type(ebooklib.ITEM_IMAGE),
                       self.ebooklib_book.get_items_of_type(ebooklib.ITEM_COVER)):
            file_name = x.file_name
            content = x.content
            self.href2img_bytes[file_name] = content

        self.logger.log('HTML files reading.')
        # Set True when any TOC entry points at an in-document anchor id.
        self.id_anchor_exist_in_nav_points = False
        self.href2soup_html: Dict[str, BeautifulSoup] = self.build_href2soup_content()

        self.logger.log('CSS files processing.')
        self.css_href2content, self.html_href2css_href = self.build_css_content()
        # add css
        self.logger.log('CSS styles adding.')
        self.add_css_styles2soup()

        self.logger.log('Footnotes processing.')
        self.footnotes = []
        self.noterefs = []
        for href in self.href2soup_html:
            footnotes, noterefs = preprocess_footnotes(self.href2soup_html[href], self.href2soup_html)
            self.footnotes.extend(footnotes)
            self.noterefs.extend(noterefs)
        # Renumber noteref anchors sequentially across the whole book.
        for i, noteref in enumerate(self.noterefs):
            noteref.attrs['data-id'] = i + 1
            noteref.attrs['id'] = f'footnote-{i + 1}'

        self.logger.log(f'Added {len(self.footnotes)} footnotes.')
        self.logger.log('TOC processing.')
        # html href -> list of anchor ids the TOC references inside it.
        self.href2subchapter_ids = defaultdict(list)
        self.added_to_toc_hrefs = set()
        self.adjacency_list: Dict[Union[NavPoint, int], Union[list, None]] = {}  # k = -1 if root, v = None if leaf
        self.build_adjacency_list_from_toc(self.ebooklib_book.toc)
        # build simple toc from spine if needed
        if not self.is_toc_valid():
            self.build_adjacency_list_from_spine()
        not_added = [x for x in self.href2soup_html if x not in self.added_to_toc_hrefs]
        self.logger.log(f'Html documents not added to TOC: {not_added}.')
        self.add_not_added_files_to_adjacency_list(not_added)
        # read anchored blocks, split html into separate block
        self.unwrap_all_html_soup()  # used only after parsed toc, ids from toc needed
        self.process_internal_links()
        # (href, anchor id) -> soup fragment of that anchored chapter.
        self.id_anchor2soup: Dict[tuple, BeautifulSoup] = {}
        self.build_anchor2soup()

        # if not self.is_all_html_epub_items_added():  # not all hrefs in adjacency_list
        #     self.add_missed_items_from_spine()  # to contents to the chapter after which it placed in spine
|
||
|
||
def build_href2soup_content(self) -> Dict[str, BeautifulSoup]:
|
||
# using EpubElements
|
||
# for now just for HTML objects, as it is simplest chapter
|
||
# todo: check if other chapters exist
|
||
nodes = dict()
|
||
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
|
||
html_body_text = item.get_body_content()
|
||
soup = BeautifulSoup(html_body_text, features='lxml')
|
||
nodes[item.file_name] = soup
|
||
|
||
return nodes
|
||
|
||
def _read_css(self, css_href, html_path):
|
||
path_to_css_from_html = css_href
|
||
html_folder = dirname(html_path)
|
||
path_to_css_from_root = normpath(join(html_folder, path_to_css_from_html))
|
||
css_obj = self.ebooklib_book.get_item_with_href(path_to_css_from_root)
|
||
assert css_obj, f'Css style {css_href} was not in manifest.'
|
||
css_content: str = css_obj.get_content().decode()
|
||
return css_content
|
||
|
||
def build_css_content(self):
|
||
css_href2content, html_href2css_href = {}, {}
|
||
# html_href2css_href 1-to-1, todo: 1-to-many
|
||
|
||
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
|
||
html_text = item.content
|
||
html_path = item.file_name
|
||
soup = BeautifulSoup(html_text, features='lxml')
|
||
for tag in soup.find_all('link', attrs={"type": "text/css"}):
|
||
if tag.attrs.get('rel') and ('alternate' in tag.attrs['rel']):
|
||
continue
|
||
css_href = tag.attrs.get('href')
|
||
html_href2css_href[html_path] = css_href
|
||
if css_href not in css_href2content:
|
||
css_href2content[css_href] = clean_css(self._read_css(css_href, html_path))
|
||
|
||
for i, tag in enumerate(soup.find_all('style')):
|
||
css_content = tag.string
|
||
html_href2css_href[html_path] = f'href{i}'
|
||
css_href2content[f'href{i}'] = clean_css(css_content)
|
||
|
||
return css_href2content, html_href2css_href
|
||
|
||
def add_css_styles2soup(self):
|
||
for href in self.href2soup_html:
|
||
if self.html_href2css_href.get(href):
|
||
css: str = self.css_href2content[self.html_href2css_href[href]]
|
||
content = self.href2soup_html[href]
|
||
content = add_inline_style_to_html_soup(content, css)
|
||
self.href2soup_html[href] = content
|
||
|
||
def build_manifest_id2href(self):
|
||
links = dict()
|
||
for item in self.ebooklib_book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
|
||
links[item.id] = item.file_name
|
||
|
||
return links
|
||
|
||
    def build_adjacency_list_from_toc(self, element, lvl=0):
        """Recursively translate ebooklib's TOC structure into self.adjacency_list.

        ebooklib represents a TOC as nested Link objects (leaves),
        (Section, children) tuples (branches) and one top-level list.
        Each element becomes a NavPoint key in self.adjacency_list: leaves
        map to None, branches map to the list of their child NavPoints, and
        the root list's children hang off the sentinel key -1.

        :param element: current TOC element (list / tuple / Link).
        :param lvl: recursion depth; 0 only for the root list.
        :return: the NavPoint created for *element* (None for the root list).
        """
        # use book.toc as a root

        if isinstance(element, Link):
            # todo: check if link exists
            node = NavPoint(element)
            if node.id:
                # Link targets an anchor inside a document; remember the id
                # so the document can later be split at that point.
                self.id_anchor_exist_in_nav_points = True
                self.href2subchapter_ids[node.href].append(node.id)
            self.adjacency_list[node] = None  # leaf
            self.added_to_toc_hrefs.add(node.href)
            return node

        elif isinstance(element, tuple):
            first, second = element
            assert isinstance(first, Section)
            node = NavPoint(first)
            if node.id:
                self.id_anchor_exist_in_nav_points = True
                self.href2subchapter_ids[node.href].append(node.id)

            sub_nodes = []
            for i in second:
                sub_nodes.append(self.build_adjacency_list_from_toc(i, lvl + 1))

            self.adjacency_list[node] = sub_nodes
            self.added_to_toc_hrefs.add(node.href)
            return node

        elif isinstance(element, list) and (lvl == 0):
            # Top-level TOC list: its entries become children of the
            # sentinel root key -1.
            sub_nodes = []
            for i in element:
                sub_nodes.append(self.build_adjacency_list_from_toc(i, lvl + 1))

            self.adjacency_list[-1] = sub_nodes

        else:
            # NOTE(review): assert is stripped under `python -O`; raising an
            # explicit exception would be safer for malformed TOCs.
            assert 0, f'Error. Element is not tuple/Link instance: {type(element)}'
|
||
|
||
def is_toc_valid(self):
|
||
if (self.ebooklib_book.toc is None) or (self.adjacency_list.get(-1) is None):
|
||
return False
|
||
return True
|
||
|
||
def build_adjacency_list_from_spine(self):
|
||
manifest_id2href = self.build_manifest_id2href()
|
||
self.adjacency_list = {
|
||
-1: []
|
||
}
|
||
for id_, _ in self.ebooklib_book.spine:
|
||
node = NavPoint(Section(manifest_id2href[id_], manifest_id2href[id_]))
|
||
self.adjacency_list[-1].append(node)
|
||
self.added_to_toc_hrefs.add(node.href)
|
||
|
||
def add_not_added_files_to_adjacency_list(self, not_added):
|
||
for i, file in enumerate(not_added):
|
||
node = NavPoint(Section(f'To check #{i}, filename: {file}', file))
|
||
self.adjacency_list[-1].append(node)
|
||
self.added_to_toc_hrefs.add(file)
|
||
|
||
def unwrap_all_html_soup(self):
|
||
# mark
|
||
for href in self.href2soup_html:
|
||
ids = self.href2subchapter_ids[href]
|
||
for i in ids:
|
||
soup = self.href2soup_html[href]
|
||
tag = soup.find(id=i)
|
||
new_h = soup.new_tag('tmp')
|
||
new_h.attrs['class'] = 'converter-chapter-mark'
|
||
new_h.attrs['id'] = i
|
||
tag.insert_before(new_h)
|
||
|
||
# go to line structure
|
||
for href in self.href2soup_html:
|
||
soup = self.href2soup_html[href]
|
||
self.href2soup_html[href] = unwrap_structural_tags(soup)
|
||
|
||
@staticmethod
|
||
def _create_unique_id(href, id_):
|
||
return re.sub(r'([^\w\s])|_|-', '', href) + re.sub(r'[_-]', '0', id_)
|
||
|
||
    def process_internal_links(self):
        """Rewrite intra-book links into server-side anchor placeholders.

        Three passes over the documents added to the TOC:
        1. make every existing id globally unique (href-derived prefix),
        2. links that point at a whole html/xhtml file get a synthetic
           anchor span inserted at the top of the target document,
        3. links of the form ``file#id`` / ``#id`` get a placeholder
           attribute and an anchor span placed before the target tag.
        Links that cannot be resolved are marked converter-mark="bad-link".
        """
        # rebuild ids to be unique in all documents
        for href in self.added_to_toc_hrefs:
            for tag in self.href2soup_html[href].find_all(attrs={'id': re.compile(r'.+')}):
                # NOTE(review): BeautifulSoup normally stores `class` as a
                # list, so these string comparisons only match tags whose
                # class was assigned as a plain string (the converter's own
                # marks are) — confirm this covers footnote-element too.
                if tag.attrs.get('class') == 'converter-chapter-mark':
                    continue

                if tag.attrs.get('class') == 'footnote-element':
                    continue

                new_id = self._create_unique_id(href, tag.attrs['id'])
                tag.attrs['id'] = new_id

        # ---------------------------------------------------------------------------------
        # Pass 2: links that target a whole html/xhtml file (no fragment).
        internal_link_reg = re.compile(r'(^.+\.(html|xhtml)$)')
        for href in self.added_to_toc_hrefs:
            soup = self.href2soup_html[href]
            tags = soup.find_all('a', {'href': internal_link_reg})
            for t in tags:
                href_in_link = t.attrs['href']
                # Resolve the (possibly relative) link against known documents
                # by substring match.
                full_path = [path for path in self.added_to_toc_hrefs if href_in_link in path]
                if not full_path:
                    self.logger.log(f'Error in {href} file. No {href_in_link} file found in added to TOC documents. '
                                    f'While processing href in {t}.')
                    continue

                href_in_link = full_path[0]
                new_id = self._create_unique_id(href_in_link, '')
                # The server later replaces this placeholder with a real link.
                t.attrs['placeholder'] = '{{tempStyleToAnchor-' + new_id + '}}'
                if new_id not in self.internal_anchors:
                    # Insert an invisible anchor span at the very top of the
                    # target document so the link has something to land on.
                    anchor_soup = self.href2soup_html[href_in_link]
                    new_anchor_span = soup.new_tag("span")
                    new_anchor_span.attrs['id'] = new_id
                    new_anchor_span.attrs['class'] = 'link-anchor'
                    new_anchor_span.string = "\xa0"
                    anchor_soup.insert(0, new_anchor_span)
                    self.internal_anchors.add(new_id)

                del t.attrs['href']

        # ------------------------------------------------------------------------
        # write placeholder to all internal links
        internal_link_reg = re.compile(r'(^.+\.(html|xhtml)\#.+)|(^\#.+)')
        for href in self.added_to_toc_hrefs:
            soup = self.href2soup_html[href]
            for internal_link_tag in soup.find_all('a', {'href': internal_link_reg}):
                href_in_link, id_in_link = internal_link_tag.attrs['href'].split('#')
                if not href_in_link:
                    # "#id" form: the anchor lives in the current document.
                    href_in_link = href
                # find full path
                full_path = [path for path in self.added_to_toc_hrefs if href_in_link in path]
                if not full_path:
                    self.logger.log(f'Error in {href} file. No {href_in_link} file found in added to TOC documents. '
                                    f'While processing href in {internal_link_tag}.')
                    internal_link_tag.attrs['converter-mark'] = 'bad-link'
                    continue

                if len(full_path) > 1:
                    self.logger.log(f'Warning in {href}. Multiple paths found {full_path} for file {href_in_link}'
                                    f' while {internal_link_tag} processing. The first one will be chosen.')

                href_in_link = full_path[0]
                new_id = self._create_unique_id(href_in_link, id_in_link)

                anchor_soup = self.href2soup_html[href_in_link]
                anchor_tags = anchor_soup.find_all(attrs={'id': new_id})
                if anchor_tags:
                    if len(anchor_tags) > 1:
                        self.logger.log(f'Warning in {href}: multiple anchors: {len(anchor_tags)} found.'
                                        f' While processing {internal_link_tag}')

                    anchor_tag = anchor_tags[0]
                    # if anchor is found we could add placeholder for link creation on server side.
                    internal_link_tag.attrs['placeholder'] = '{{tempStyleToAnchor-' + new_id + '}}'
                    # create span to have cyclic links, link has 1 type of class, anchor another
                    # NOTE(review): result below is unused — dead lookup?
                    new_anchor_already_created = soup.find_all('span',
                                                               attrs={'class': 'link-anchor',
                                                                      'id': anchor_tag.attrs['id']})
                    if anchor_tag.attrs['id'] not in self.internal_anchors:
                        new_anchor_span = soup.new_tag("span")
                        new_anchor_span.attrs['id'] = anchor_tag.attrs['id']
                        new_anchor_span.attrs['class'] = 'link-anchor'
                        new_anchor_span.string = "\xa0"
                        anchor_tag.insert_before(new_anchor_span)
                        self.internal_anchors.add(anchor_tag.attrs['id'])
                        # The id now lives on the injected span; strip it from
                        # the original tag to keep ids unique.
                        del anchor_tag.attrs['id']
                    del internal_link_tag.attrs['href']

                else:
                    internal_link_tag.attrs['converter-mark'] = 'bad-link'
                    # Missing page-number anchors are common; don't spam the log.
                    if 'page' not in id_in_link:
                        self.logger.log(f'Error in {href}. While processing {internal_link_tag} no anchor found.'
                                        f' Should be anchor with new id={new_id} in {href_in_link} file.'
                                        f' Old id={id_in_link}')
|
||
|
||
    def build_one_anchored_section(self, node):
        """
        By this point the html soup already exists in linear form
        - if it is not linear, that is not our fault

        There are 3 cases:
        the id wraps the whole content,
        the id wraps the content of the chapter and its sub-chapter,
        the id only points at the heading

        In all 3 cases we know where the heading begins, so a chapter is
        every tag from the current heading up to whatever heading comes next.

        A heading counts only when the toc specifies an id; a heading is
        then any tag with an id from the toc.
        :return:
        """
        if node.id:
            # Cut the tags of this anchored chapter out of the parent
            # document and move them into their own mini-soup.
            soup = self.href2soup_html[node.href]
            chapter_tags = get_tags_between_chapter_marks(first_id=node.id, href=node.href, html_soup=soup)
            new_tree = BeautifulSoup('', 'html.parser')
            for tag in chapter_tags:
                new_tree.append(tag)
            self.id_anchor2soup[(node.href, node.id)] = new_tree

        # Recurse into sub-chapters, if this node has any.
        if self.adjacency_list.get(node):
            for sub_node in self.adjacency_list[node]:
                self.build_one_anchored_section(sub_node)
|
||
|
||
def build_anchor2soup(self):
|
||
nav_points = self.adjacency_list[-1]
|
||
if self.id_anchor_exist_in_nav_points:
|
||
for point in nav_points:
|
||
self.build_one_anchored_section(point)
|
||
|
||
    def node2livecarta_chapter_item(self, node: NavPoint, lvl=1) -> ChapterItem:
        """Recursively convert a NavPoint (and its children) into a ChapterItem.

        :param node: TOC node to convert.
        :param lvl: nesting level, 1 for top-level chapters. Levels up to
            LawCartaConfig.SUPPORTED_LEVELS get their title stripped out of
            the content (it becomes a chapter heading instead).
        :return: ChapterItem with preprocessed title, content and sub-chapters.
        """
        title = node.title
        if node.id:
            # Anchored node: use the pre-split fragment of the document.
            content: BeautifulSoup = self.id_anchor2soup[(node.href, node.id)]
        else:
            content: BeautifulSoup = self.href2soup_html[node.href]

        # Relink images; the accumulated path mapping is threaded through
        # every call so each image is only processed once.
        self.old_image_path2_aws_path = update_src_links_in_images(content,
                                                                   self.href2img_bytes,
                                                                   path_to_html=node.href,
                                                                   access=self.access,
                                                                   path2aws_path=self.old_image_path2_aws_path)

        is_chapter = lvl <= LawCartaConfig.SUPPORTED_LEVELS
        title_preprocessed, content_preprocessed = prepare_title_and_content(title, content,
                                                                             remove_title_from_chapter=is_chapter)

        sub_nodes = []
        # warning! non-EpubHtmlItems won't be added to the chapter
        if self.adjacency_list.get(node):
            for sub_node in self.adjacency_list[node]:
                sub_chapter_item = self.node2livecarta_chapter_item(sub_node, lvl + 1)
                sub_nodes.append(sub_chapter_item)

        if self.logger:
            indent = ' ' * lvl
            self.logger.log(f'{indent}Chapter: {title} is prepared.')
        return ChapterItem(title_preprocessed, content_preprocessed, sub_nodes)
|
||
|
||
def convert_to_dict(self):
|
||
top_level_nav_points = self.adjacency_list[-1]
|
||
top_level_chapters = []
|
||
|
||
for nav_point in top_level_nav_points:
|
||
chapter = self.node2livecarta_chapter_item(nav_point)
|
||
top_level_chapters.append(chapter)
|
||
|
||
top_level_dict_chapters = [x.to_dict() for x in top_level_chapters]
|
||
self.logger.log(f'Anchors found: {len(self.internal_anchors)}.')
|
||
self.logger.log('End conversion.')
|
||
|
||
return {
|
||
"content": top_level_dict_chapters,
|
||
"footnotes": self.footnotes
|
||
}
|
||
|
||
|
||
if __name__ == "__main__":
    # Ad-hoc manual run: convert one EPUB and dump the result to tmp.json.
    logger = logging.getLogger('epub')
    stream_handler = logging.StreamHandler()  # was misleadingly named file_handler
    logger.addHandler(stream_handler)
    file_handler = logging.FileHandler('epub.log', mode='w+')
    logger.addHandler(file_handler)

    logger_object = BookLogger(name='epub', main_logger=logger, book_id=0)

    # NOTE: hard-coded developer path — adjust before running locally.
    json_converter = EpubPostprocessor('/home/katerina/PycharmProjects/Jenia/converter/epub/9781284171242.epub',
                                       logger=logger_object)
    tmp = json_converter.convert_to_dict()

    # Builtin open() with an explicit encoding supersedes legacy codecs.open.
    with open('tmp.json', 'w', encoding='utf-8') as f:
        json.dump(tmp, f, ensure_ascii=False)
|