forked from LiveCarta/BookConverter
add functionality for api
This commit is contained in:
169
src/book.py
169
src/book.py
@@ -28,14 +28,12 @@ class Book:
|
|||||||
}
|
}
|
||||||
SUPPORTED_HEADERS = ["h1", "h2", "h3"]
|
SUPPORTED_HEADERS = ["h1", "h2", "h3"]
|
||||||
|
|
||||||
def __init__(self, file_path, output=None, recreate=False, train_mode=False, convert=False, model_location=None):
|
def __init__(self, book_id, access=None):
|
||||||
self.file_path = pathlib.Path(file_path)
|
self.book_id = book_id
|
||||||
self.output_path = output
|
self.access = access
|
||||||
self.recreate = recreate
|
|
||||||
self.train_mode = train_mode
|
|
||||||
self.convert = convert
|
|
||||||
self.model_location = model_location
|
|
||||||
|
|
||||||
|
self.file_path = None
|
||||||
|
self.output_path = None
|
||||||
self.logger = None
|
self.logger = None
|
||||||
self.html_soup = None
|
self.html_soup = None
|
||||||
self.body_tag = None
|
self.body_tag = None
|
||||||
@@ -43,37 +41,8 @@ class Book:
|
|||||||
self.footnotes = list()
|
self.footnotes = list()
|
||||||
self.images = list()
|
self.images = list()
|
||||||
self.content_dict = dict()
|
self.content_dict = dict()
|
||||||
# self.model = HeaderDetector(self.model_location, self.file_path.name)
|
|
||||||
|
|
||||||
def parse_args(self):
|
def configure_file_logger(self, name, attr_name='logger', filename='logs/converter_log.log', filemode='w+',
|
||||||
"""
|
|
||||||
Method for parsing arguments from command line.
|
|
||||||
"""
|
|
||||||
parser = argparse.ArgumentParser(description='Converts .docx/.html documents to .json file with '
|
|
||||||
'LiveCarta book structure.')
|
|
||||||
parser.add_argument('-f', dest='file_path', type=str, required=True, help='Path to file to be processed.')
|
|
||||||
parser.add_argument('-o', dest='output', help='Path to output file.', default="")
|
|
||||||
parser.add_argument('--recreate', nargs='?', const=True, default=False,
|
|
||||||
help='If output file exist, will overwrite it.')
|
|
||||||
parser.add_argument('--convert', dest='convert', nargs='?', const=True, default=False,
|
|
||||||
help='Conversion from .docx to .html with "libreoffice".')
|
|
||||||
parser.add_argument('--train', dest='train_mode', nargs='?', const=True, default=False,
|
|
||||||
help='Train mode, takes labeled file (with highlighted paragraphs) and fitted model')
|
|
||||||
parser.add_argument('-m', dest='model_location',
|
|
||||||
help='Path to file with fitted model. If does not exist, will be created')
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
folder_path = os.path.dirname(os.path.abspath(__file__))
|
|
||||||
self.file_path = os.path.join(folder_path, "..", args.file_path)
|
|
||||||
self.output_path = args.output
|
|
||||||
self.recreate = args.recreate
|
|
||||||
self.train_mode = args.train_mode
|
|
||||||
self.convert = args.convert
|
|
||||||
self.model_location = args.model_location
|
|
||||||
# self.model = HeaderDetector(self.model_location, self.file_path.name)
|
|
||||||
|
|
||||||
def configure_file_logger(self, name, attr_name='logger', filename='logs/converter_log.log', filemode='w',
|
|
||||||
logging_level=logging.INFO, logging_format='%(asctime)s - %(message)s'):
|
logging_level=logging.INFO, logging_format='%(asctime)s - %(message)s'):
|
||||||
"""
|
"""
|
||||||
Method for Logger configuration. Logger will write in file.
|
Method for Logger configuration. Logger will write in file.
|
||||||
@@ -84,14 +53,16 @@ class Book:
|
|||||||
:param filemode: mode of opening log file.
|
:param filemode: mode of opening log file.
|
||||||
:param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical.
|
:param logging_level: logging level: 10 - debug, 20 - info, 30 - warning, 40 - error, 50 - critical.
|
||||||
:param logging_format: format of record in log file.
|
:param logging_format: format of record in log file.
|
||||||
:param date_format: format of the date that will be used in record.
|
|
||||||
"""
|
"""
|
||||||
logger = logging.getLogger(name)
|
logger = logging.getLogger(name)
|
||||||
|
|
||||||
if self.file_path:
|
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
filename = f'logs/{self.file_path.stem}_log.log'
|
if self.book_id:
|
||||||
|
filename = f'logs/{self.book_id}_log.log'
|
||||||
|
|
||||||
file_handler = logging.FileHandler(filename, mode=filemode)
|
file_path = os.path.join(folder_path, filename)
|
||||||
|
|
||||||
|
file_handler = logging.FileHandler(file_path, mode=filemode)
|
||||||
# file_format = logging.Formatter(fmt=logging_format, datefmt=date_format)
|
# file_format = logging.Formatter(fmt=logging_format, datefmt=date_format)
|
||||||
file_format = logging.Formatter(fmt=logging_format)
|
file_format = logging.Formatter(fmt=logging_format)
|
||||||
file_handler.setFormatter(file_format)
|
file_handler.setFormatter(file_format)
|
||||||
@@ -101,6 +72,58 @@ class Book:
|
|||||||
|
|
||||||
setattr(self, attr_name, logger)
|
setattr(self, attr_name, logger)
|
||||||
|
|
||||||
|
def log(self, message, logging_level=20):
    """
    Write a single record to the logger stored on this instance.

    :param message: text of the record.
    :param logging_level: numeric severity of the record; the default, 20, is the INFO level.
    """
    # Logger.log expects the level first and the message second.
    self.logger.log(logging_level, message)
|
||||||
|
|
||||||
|
def save_docx(self, content):
    """
    Save binary content of file to .docx.

    The file is written to ``docx/<book_id>.docx`` inside the directory that is one level
    above the directory containing this module. ``self.file_path`` is then pointed at it.

    :param content: binary content of the file.
    """
    folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    file_path = pathlib.Path(folder_path) / 'docx' / f'{self.book_id}.docx'

    # open() does not create missing directories, so make sure the target folder exists first.
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # The file is only written here, so plain 'wb' is enough (no need for read/write 'wb+').
    with open(file_path, 'wb') as file:
        file.write(content)

    self.file_path = file_path
|
||||||
|
|
||||||
|
def get_docx(self):
    """
    Method for getting and saving book from queue.

    The content is requested from ``self.access.get_doc`` by ``self.book_id`` and handed to
    ``self.save_docx``.

    :raises FileNotFoundError: re-raised after being logged at error level.
        Every other exception propagates to the caller untouched.
    """
    try:
        content = self.access.get_doc(self.book_id)
        self.save_docx(content)
    except FileNotFoundError:
        # This is a failure, so record it with error severity rather than the default INFO.
        self.log('File have not found', logging.ERROR)
        # Bare raise keeps the original traceback intact.
        raise
|
||||||
|
|
||||||
|
def set_process_status(self):
    """
    Report through the access object that this book has the ``PROCESS`` status.

    Calls ``self.access.update_status`` with ``self.book_id`` and ``self.access.PROCESS``.
    Any exception raised by that call propagates to the caller unchanged.
    """
    # No try/except here: catching an exception only to re-raise it adds nothing.
    self.access.update_status(self.book_id, self.access.PROCESS)
|
||||||
|
|
||||||
|
def set_generate_status(self):
    """
    Report through the access object that this book has the ``GENERATE`` status.

    Calls ``self.access.update_status`` with ``self.book_id`` and ``self.access.GENERATE``.
    Any exception raised by that call propagates to the caller unchanged.
    """
    # No try/except here: catching an exception only to re-raise it adds nothing.
    self.access.update_status(self.book_id, self.access.GENERATE)
|
||||||
|
|
||||||
|
def set_error_status(self):
    """
    Report through the access object that this book has the ``ERROR`` status.

    Calls ``self.access.update_status`` with ``self.book_id`` and ``self.access.ERROR``.
    Any exception raised by that call propagates to the caller unchanged.
    """
    # No try/except here: catching an exception only to re-raise it adds nothing.
    self.access.update_status(self.book_id, self.access.ERROR)
|
||||||
|
|
||||||
def convert_doc_to_html(self):
|
def convert_doc_to_html(self):
|
||||||
"""
|
"""
|
||||||
Method for convert .docx document to .html file.
|
Method for convert .docx document to .html file.
|
||||||
@@ -114,27 +137,34 @@ class Book:
|
|||||||
f.close()
|
f.close()
|
||||||
except FileNotFoundError as error:
|
except FileNotFoundError as error:
|
||||||
self.logger.error('Invalid path to input data.')
|
self.logger.error('Invalid path to input data.')
|
||||||
|
self.set_error_status()
|
||||||
raise error
|
raise error
|
||||||
|
|
||||||
command = f'libreoffice --headless --convert-to html "{str(self.file_path)}" --outdir html'
|
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
out_dir_path = os.path.join(folder_path, f'html/{self.book_id}')
|
||||||
|
|
||||||
|
command = f'libreoffice --headless --convert-to html "{str(self.file_path)}" --outdir {out_dir_path}'
|
||||||
os.system(command)
|
os.system(command)
|
||||||
|
|
||||||
self.file_path = pathlib.Path(f'html/{self.file_path.stem}.html')
|
out_dir_path = os.path.join(out_dir_path, f'{self.file_path.stem}.html')
|
||||||
|
self.file_path = pathlib.Path(out_dir_path)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
f = open(self.file_path)
|
f = open(self.file_path)
|
||||||
f.close()
|
f.close()
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as exc:
|
||||||
self.logger.error('Conversion has gone wrong.')
|
self.logger.error('Conversion has gone wrong.')
|
||||||
raise e
|
self.set_error_status()
|
||||||
|
raise exc
|
||||||
|
|
||||||
self.log('End of conversion from .docx to .html.')
|
self.log('End of conversion from .docx to .html.')
|
||||||
self.log(f'Input file path after conversion: {self.file_path}.')
|
self.log(f'Input file path after conversion: {self.file_path}.')
|
||||||
|
|
||||||
def check_output_directory(self):
|
def check_output_directory(self):
|
||||||
if not self.output_path:
|
if self.output_path is None:
|
||||||
filename = f'{self.file_path.stem}.json'
|
folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
self.output_path = f'json/{filename}'
|
output_path = os.path.join(folder_path, f'json/{self.file_path.stem}.json')
|
||||||
|
self.output_path = output_path
|
||||||
|
|
||||||
self.output_path = pathlib.Path(self.output_path)
|
self.output_path = pathlib.Path(self.output_path)
|
||||||
self.log(f'Output file path: {self.output_path}')
|
self.log(f'Output file path: {self.output_path}')
|
||||||
@@ -148,9 +178,10 @@ class Book:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
html_text = open(self.file_path, 'r', encoding='utf8').read()
|
html_text = open(self.file_path, 'r', encoding='utf8').read()
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as exc:
|
||||||
self.logger.error('There is no html to process. Conversion went wrong or you specified wrong paths.')
|
self.logger.error('There is no html to process. Conversion went wrong or you specified wrong paths.')
|
||||||
raise e
|
self.set_error_status()
|
||||||
|
raise exc
|
||||||
|
|
||||||
self.html_soup = BeautifulSoup(html_text, features='lxml')
|
self.html_soup = BeautifulSoup(html_text, features='lxml')
|
||||||
self.body_tag = self.html_soup.body
|
self.body_tag = self.html_soup.body
|
||||||
@@ -354,19 +385,21 @@ class Book:
|
|||||||
imgs = self.body_tag.find_all('img')
|
imgs = self.body_tag.find_all('img')
|
||||||
|
|
||||||
if len(imgs):
|
if len(imgs):
|
||||||
new_path = pathlib.Path(f'json/img_{self.file_path.stem}/')
|
# new_path = pathlib.Path(f'json/img_{self.file_path.stem}/')
|
||||||
new_path.mkdir(exist_ok=True)
|
# new_path.mkdir(exist_ok=True)
|
||||||
|
|
||||||
for img in imgs:
|
for img in imgs:
|
||||||
img_name = img.attrs.get("src")
|
img_name = img.attrs.get('src')
|
||||||
img_path = pathlib.Path(f'html/{img_name}')
|
img_path = pathlib.Path(f'{self.file_path.parent}/{img_name}')
|
||||||
|
|
||||||
# img_size = os.path.getsize(img_path) # TODO: Implement loading to S3 and then getting link to it.
|
link = self.access.send_image(img_path, self.book_id)
|
||||||
|
img.attrs['src'] = link
|
||||||
|
|
||||||
|
# img_size = os.path.getsize(img_path)
|
||||||
# print(f'{img_name} successfully loaded. Image size: {img_size}.')
|
# print(f'{img_name} successfully loaded. Image size: {img_size}.')
|
||||||
|
# new_img_path = new_path / img_name
|
||||||
new_img_path = new_path / img_name
|
# copyfile(img_path, new_img_path)
|
||||||
copyfile(img_path, new_img_path)
|
# img.attrs["src"] = str(new_img_path)
|
||||||
img.attrs["src"] = str(new_img_path)
|
|
||||||
|
|
||||||
self.images = imgs
|
self.images = imgs
|
||||||
|
|
||||||
@@ -544,7 +577,7 @@ class Book:
|
|||||||
else:
|
else:
|
||||||
chapter_title = f'Untitled chapter {ch_num}'
|
chapter_title = f'Untitled chapter {ch_num}'
|
||||||
chapter = []
|
chapter = []
|
||||||
while self.content[ind].name not in self.SUPPORTED_HEADERS:
|
while ind < len(self.content) and self.content[ind].name not in self.SUPPORTED_HEADERS:
|
||||||
chapter.append(self.format_html(str(self.content[ind])))
|
chapter.append(self.format_html(str(self.content[ind])))
|
||||||
ind += 1
|
ind += 1
|
||||||
res = {chapter_title: ["".join(chapter)]}
|
res = {chapter_title: ["".join(chapter)]}
|
||||||
@@ -560,18 +593,24 @@ class Book:
|
|||||||
with codecs.open(self.output_path, 'w', encoding='utf-8') as f:
|
with codecs.open(self.output_path, 'w', encoding='utf-8') as f:
|
||||||
json.dump(self.content_dict, f, ensure_ascii=False)
|
json.dump(self.content_dict, f, ensure_ascii=False)
|
||||||
|
|
||||||
def send_json_content(self):
    """
    Send the converted book through the access object.

    Calls ``self.access.send_book`` with ``self.book_id`` and ``self.content_dict``.
    Any exception raised by that call propagates to the caller unchanged.
    """
    # No try/except here: catching an exception only to re-raise it adds nothing.
    self.access.send_book(self.book_id, self.content_dict)
|
||||||
|
|
||||||
def conversion(self, logging_format, filemode='w+'):
    """
    Run the whole conversion pipeline for this book, step by step.

    The file logger is configured first, then every stage below is invoked in order.
    Nothing is caught here, so an exception raised by a stage stops the pipeline and
    reaches the caller.

    :param logging_format: format of record in log file, forwarded to ``configure_file_logger``.
    :param filemode: mode of opening log file, forwarded to ``configure_file_logger``.
    """
    self.configure_file_logger(__name__, logging_format=logging_format, filemode=filemode)
    self.log('Beginning of conversion from .docx to .json.')

    # Stages are looked up by name right before they run and are called without arguments.
    stages = (
        'get_docx',
        'set_process_status',
        'convert_doc_to_html',
        'check_output_directory',
        'read_html',
        'clean_trash',
        'process_html',
        'set_generate_status',
        'convert_to_json',
        'write_json',
        'send_json_content',
    )
    for stage in stages:
        getattr(self, stage)()

    self.log(f'End of the conversion to LawCarta format. Check {self.output_path}.')
|
||||||
|
|||||||
Reference in New Issue
Block a user