# Forked from LiveCarta/BookConverter (201 lines, 6.5 KiB, Python).
import functools
import json
import os
import time
from io import BytesIO
from threading import Event

import requests


class Access:
    """Class accessing our platform.

    Obtains an access token with the credentials read from
    ``config/api_config.json`` and uses it to download files, upload images
    and books, and update the conversion status of a document.

    The ``refreshing`` event is cleared while credentials or tokens are being
    replaced and set again afterwards; every request method waits on it
    before using ``self.headers``.
    """

    # Status codes passed to update_status().
    PENDING = 1
    PROCESS = 2
    GENERATE = 3
    FINISH = 4
    ERROR = 5

    def __init__(self, url=None):
        """
        :param url: str, url received from queue message, if field apiURL exists
            else None
        """
        self.username = None
        self.password = None

        self.token = None
        self.refresh = None
        self.refresh_time = None
        self.headers = None
        self.refreshing = Event()

        self.set_credentials(url)
        self.get_token()
        self.refreshing.set()

    def set_credentials(self, url):
        """Store the API url and load username/password from the config file.

        The config file is ``config/api_config.json`` located one directory
        above the directory containing this module.

        :param url: base url of the API, stored as ``self.url``
        :raises OSError: if the config file cannot be opened
        :raises KeyError: if ``username`` or ``password`` is missing
        """
        folder_path = os.path.dirname(
            os.path.dirname(os.path.abspath(__file__)))
        config_path = os.path.join(folder_path, "config/api_config.json")
        with open(config_path, "r") as f:
            params = json.load(f)

        # Read the values before clearing the event so a malformed config
        # cannot leave ``refreshing`` cleared.
        username = params["username"]
        password = params["password"]

        self.refreshing.clear()
        try:
            self.url = url
            self.username = username
            self.password = password
        finally:
            self.refreshing.set()

    def format_header(self):
        """Rebuild ``self.headers`` from the current access token."""
        self.headers = {
            'Authorization': f'Bearer {self.token}'
        }

    def is_time_for_refreshing(self):
        """Return True when ``refresh_time`` is earlier than the current time."""
        # NOTE(review): refresh_time holds the raw 'expiresIn' value from the
        # token response and is compared with time.time() as if it were an
        # absolute epoch timestamp. If the API returns a lifetime in seconds
        # this is always True and every request refreshes the token - confirm.
        return self.refresh_time < time.time()

    def _store_tokens(self, response):
        """Save tokens from a successful token response and rebuild headers."""
        # Parse everything first: a malformed payload must raise before the
        # event is cleared, otherwise later wait() calls would block forever.
        payload = response.json()
        token = payload['accessToken']
        refresh = payload['refreshToken']
        refresh_time = payload['expiresIn']

        self.refreshing.clear()
        try:
            self.token = token
            self.refresh = refresh
            self.refresh_time = refresh_time
            self.format_header()
        finally:
            self.refreshing.set()

    def _ensure_fresh_token(self):
        """Refresh the token if it is due, then wait until tokens are usable."""
        if self.is_time_for_refreshing():
            self.refresh_token()
        self.refreshing.wait()

    @staticmethod
    def _bad_request_error(response):
        """Build the exception raised for a failed API call."""
        try:
            message = response.json()["message"]
        except (ValueError, KeyError, TypeError):
            # The body is not the expected JSON object; report it verbatim.
            message = response.text
        return Exception(f'{response.status_code} Bad request: {message}.')

    def get_token(self):
        """Request a new token pair using the username and password.

        :raises Exception: on any response status other than 200
        """
        json_form = {
            'grantType': 'password',
            'username': self.username,
            'password': self.password
        }
        response = requests.post(f'{self.url}/token', json=json_form)

        if response.status_code == 200:
            self._store_tokens(response)
        elif response.status_code == 400:
            raise Exception('400 Bad request: invalid login and/or password.')
        else:
            raise Exception(f'{response.status_code}')

    def refresh_token(self):
        """Exchange the refresh token for a new token pair.

        On a 401 response a full login is performed via get_token().

        :raises Exception: on a 400 response or any other unexpected status
        """
        json_form = {
            'grantType': 'refresh_token',
            'refreshToken': self.refresh
        }
        response = requests.post(f'{self.url}/token', json=json_form)

        if response.status_code == 200:
            self._store_tokens(response)
        elif response.status_code == 401:
            self.get_token()
        elif response.status_code == 400:
            raise Exception(
                '400 Bad request: wrong request parameters for refreshing.')
        else:
            raise Exception(f'{response.status_code}')

    def get_file(self, file_path):
        """Function downloads the file[book, preset] from site.

        :param file_path: url of the file to download
        :return: bytes, body of the response
        :raises FileNotFoundError: on a 404 response
        :raises Exception: on any other status except 200
        """
        self._ensure_fresh_token()
        response = requests.get(file_path, headers=self.headers)

        if response.status_code == 404:
            raise FileNotFoundError('404 Not Found: file have not found.')
        if response.status_code != 200:
            raise Exception(f'Error in getting preset from url: {file_path}, '
                            f'status code:{response.status_code}')
        return response.content

    def sleep(timeout: float, retry=3):
        """Decorator factory: call the function up to ``retry`` times.

        An attempt fails when the function raises an ``Exception`` or returns
        ``None``; the wrapper then sleeps ``timeout`` seconds before the next
        attempt. The first non-None result is returned. When every attempt
        fails the wrapper returns ``None`` (the exception is not re-raised).

        :param timeout: seconds to sleep between attempts
        :param retry: maximum number of attempts
        """
        def decorator(function):
            @functools.wraps(function)
            def wrapper(*args, **kwargs):
                for attempt in range(1, retry + 1):
                    try:
                        value = function(*args, **kwargs)
                    except Exception:
                        value = None
                    if value is not None:
                        return value
                    # Every failed attempt counts, so a function that keeps
                    # returning None cannot loop forever.
                    if attempt < retry:
                        time.sleep(timeout)
                return None
            return wrapper
        return decorator

    @sleep(3)
    def send_image(self, img_path, doc_id, img_content: bytes = None):
        """Function sends images to site.

        :param img_path: path of the image; its basename is the upload name,
            and the file is read from it when ``img_content`` is empty
        :param doc_id: id of the document, used to report ERROR on failure
        :param img_content: optional raw bytes of the image
        :return: value of ``imageUrl`` from the response; ``None`` when all
            retries of the ``sleep`` decorator failed
        """
        self._ensure_fresh_token()

        source = BytesIO(img_content) if img_content else open(img_path, 'rb')
        # The context manager closes the stream even if the request raises.
        with source as img_obj:
            files = {
                'image': (os.path.basename(img_path), img_obj)
            }
            response = requests.post(
                f'{self.url}/doc-convert/image', files=files,
                headers=self.headers)

        if response.status_code != 200:
            self.update_status(doc_id, self.ERROR)
            raise self._bad_request_error(response)
        return response.json()['imageUrl']

    def send_book(self, doc_id, content):
        """Function sends the book to site.

        :param doc_id: id of the document being imported
        :param content: JSON-serializable book content
        :raises Exception: on any status except 200, after the document
            status has been set to ERROR
        """
        self._ensure_fresh_token()
        json_content = {
            'json': json.dumps(content)
        }
        response = requests.post(
            f'{self.url}/doc-convert/{doc_id}/import', json=json_content,
            headers=self.headers)

        if response.status_code != 200:
            self.update_status(doc_id, self.ERROR)
            raise self._bad_request_error(response)

    def update_status(self, doc_id, status):
        """Function updates status of the book on site.

        :param doc_id: id of the document
        :param status: one of PENDING, PROCESS, GENERATE, FINISH, ERROR
        :raises Exception: on any status except 200
        """
        self._ensure_fresh_token()
        response = requests.patch(
            f'{self.url}/doc-convert/{doc_id}/status/{status}',
            headers=self.headers)

        if response.status_code != 200:
            raise self._bad_request_error(response)