2025-01-17 11:03:00 +01:00
|
|
|
from datetime import datetime, timezone
|
|
|
|
import logging
|
|
|
|
import os
|
2025-01-25 08:51:18 +01:00
|
|
|
from shutil import copyfileobj
|
2025-01-25 22:42:59 +01:00
|
|
|
import traceback
|
2025-01-25 08:51:18 +01:00
|
|
|
from typing import Tuple, TYPE_CHECKING
|
2025-01-17 11:03:00 +01:00
|
|
|
|
2025-01-25 08:51:18 +01:00
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from flask import current_app, g, url_for
|
2025-01-17 11:03:00 +01:00
|
|
|
import humanize
|
2025-01-25 08:51:18 +01:00
|
|
|
import requests
|
2025-01-17 11:03:00 +01:00
|
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
from werkzeug.utils import secure_filename
|
|
|
|
|
2025-01-25 08:51:18 +01:00
|
|
|
from .exceptions import ErrorException, DownloadException
|
2025-01-17 11:03:00 +01:00
|
|
|
if TYPE_CHECKING:
|
2025-01-24 10:36:24 +01:00
|
|
|
from .rebrickable_set import RebrickableSet
|
2025-01-25 22:42:59 +01:00
|
|
|
from .socket import BrickSocket
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class BrickInstructions(object):
|
2025-01-25 22:42:59 +01:00
|
|
|
socket: 'BrickSocket'
|
|
|
|
|
2025-01-17 11:03:00 +01:00
|
|
|
allowed: bool
|
2025-01-24 10:36:24 +01:00
|
|
|
rebrickable: 'RebrickableSet | None'
|
2025-01-17 11:03:00 +01:00
|
|
|
extension: str
|
|
|
|
filename: str
|
|
|
|
mtime: datetime
|
2025-01-24 10:36:24 +01:00
|
|
|
set: 'str | None'
|
2025-01-17 11:03:00 +01:00
|
|
|
name: str
|
|
|
|
size: int
|
|
|
|
|
2025-01-25 22:42:59 +01:00
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
file: os.DirEntry | str,
|
|
|
|
/,
|
|
|
|
*,
|
|
|
|
socket: 'BrickSocket | None' = None,
|
|
|
|
):
|
|
|
|
# Save the socket
|
|
|
|
if socket is not None:
|
|
|
|
self.socket = socket
|
|
|
|
|
2025-01-17 11:03:00 +01:00
|
|
|
if isinstance(file, str):
|
|
|
|
self.filename = file
|
2025-01-25 22:42:59 +01:00
|
|
|
|
|
|
|
if self.filename == '':
|
|
|
|
raise ErrorException('An instruction filename cannot be empty')
|
2025-01-17 11:03:00 +01:00
|
|
|
else:
|
|
|
|
self.filename = file.name
|
|
|
|
|
|
|
|
# Store the file stats
|
|
|
|
stat = file.stat()
|
|
|
|
self.size = stat.st_size
|
|
|
|
self.mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
|
|
|
|
|
|
|
|
# Store the name and extension, check if extension is allowed
|
|
|
|
self.name, self.extension = os.path.splitext(self.filename)
|
|
|
|
self.extension = self.extension.lower()
|
2025-01-20 15:20:07 +01:00
|
|
|
self.allowed = self.extension in current_app.config['INSTRUCTIONS_ALLOWED_EXTENSIONS'] # noqa: E501
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
# Placeholder
|
2025-01-24 10:36:24 +01:00
|
|
|
self.rebrickable = None
|
|
|
|
self.set = None
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
# Extract the set number
|
|
|
|
if self.allowed:
|
|
|
|
# Normalize special chars to improve set detection
|
|
|
|
normalized = self.name.replace('_', '-')
|
|
|
|
normalized = normalized.replace(' ', '-')
|
|
|
|
|
|
|
|
splits = normalized.split('-', 2)
|
|
|
|
|
|
|
|
if len(splits) >= 2:
|
2025-01-23 08:45:58 +01:00
|
|
|
try:
|
|
|
|
# Trying to make sense of each part as integers
|
|
|
|
int(splits[0])
|
|
|
|
int(splits[1])
|
|
|
|
|
|
|
|
self.set = '-'.join(splits[:2])
|
|
|
|
except Exception:
|
|
|
|
pass
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
# Delete an instruction file
|
|
|
|
def delete(self, /) -> None:
|
|
|
|
os.remove(self.path())
|
|
|
|
|
2025-01-25 08:51:18 +01:00
|
|
|
# Download an instruction file
|
|
|
|
def download(self, path: str, /) -> None:
|
2025-01-25 22:42:59 +01:00
|
|
|
try:
|
|
|
|
# Just to make sure that the progress is initiated
|
|
|
|
self.socket.progress(
|
|
|
|
message='Downloading {file}'.format(
|
|
|
|
file=self.filename,
|
|
|
|
)
|
|
|
|
)
|
2025-01-25 08:51:18 +01:00
|
|
|
|
2025-01-25 22:42:59 +01:00
|
|
|
target = self.path(filename=secure_filename(self.filename))
|
2025-01-25 08:51:18 +01:00
|
|
|
|
2025-01-25 22:42:59 +01:00
|
|
|
# Skipping rather than failing here
|
|
|
|
if os.path.isfile(target):
|
|
|
|
self.socket.complete(
|
|
|
|
message='File {file} already exists, skipped'.format(
|
|
|
|
file=self.filename,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
else:
|
|
|
|
url = current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format( # noqa: E501
|
|
|
|
path=path
|
|
|
|
)
|
|
|
|
|
|
|
|
# Request the file
|
|
|
|
self.socket.progress(
|
|
|
|
message='Requesting {url}'.format(
|
|
|
|
url=url,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
response = requests.get(url, stream=True)
|
|
|
|
if response.ok:
|
|
|
|
|
|
|
|
# Store the content header as size
|
|
|
|
try:
|
|
|
|
self.size = int(
|
|
|
|
response.headers.get('Content-length', 0)
|
|
|
|
)
|
|
|
|
except Exception:
|
|
|
|
self.size = 0
|
|
|
|
|
|
|
|
# Downloading the file
|
|
|
|
self.socket.progress(
|
|
|
|
message='Downloading {url} ({size})'.format(
|
|
|
|
url=url,
|
|
|
|
size=self.human_size(),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
with open(target, 'wb') as f:
|
|
|
|
copyfileobj(response.raw, f)
|
|
|
|
else:
|
|
|
|
raise DownloadException('failed to download: {code}'.format( # noqa: E501
|
|
|
|
code=response.status_code
|
|
|
|
))
|
|
|
|
|
|
|
|
# Info
|
|
|
|
logger.info('The instruction file {file} has been downloaded'.format( # noqa: E501
|
|
|
|
file=self.filename
|
|
|
|
))
|
2025-01-25 08:51:18 +01:00
|
|
|
|
2025-01-25 22:42:59 +01:00
|
|
|
# Complete
|
|
|
|
self.socket.complete(
|
|
|
|
message='File {file} downloaded ({size})'.format( # noqa: E501
|
|
|
|
file=self.filename,
|
|
|
|
size=self.human_size()
|
|
|
|
)
|
|
|
|
)
|
2025-01-25 08:51:18 +01:00
|
|
|
|
2025-01-25 22:42:59 +01:00
|
|
|
except Exception as e:
|
|
|
|
self.socket.fail(
|
|
|
|
message='Error while downloading instruction {file}: {error}'.format( # noqa: E501
|
|
|
|
file=self.filename,
|
|
|
|
error=e,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
logger.debug(traceback.format_exc())
|
2025-01-25 08:51:18 +01:00
|
|
|
|
2025-01-17 11:03:00 +01:00
|
|
|
# Display the size in a human format
|
|
|
|
def human_size(self) -> str:
|
|
|
|
return humanize.naturalsize(self.size)
|
|
|
|
|
|
|
|
# Display the time in a human format
|
|
|
|
def human_time(self) -> str:
|
|
|
|
return self.mtime.astimezone(g.timezone).strftime(
|
2025-01-20 15:20:07 +01:00
|
|
|
current_app.config['FILE_DATETIME_FORMAT']
|
2025-01-17 11:03:00 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
# Compute the path of an instruction file
|
2025-01-22 16:36:35 +01:00
|
|
|
def path(self, /, *, filename=None) -> str:
|
2025-01-17 11:03:00 +01:00
|
|
|
if filename is None:
|
|
|
|
filename = self.filename
|
|
|
|
|
|
|
|
return os.path.join(
|
|
|
|
current_app.static_folder, # type: ignore
|
2025-01-20 15:20:07 +01:00
|
|
|
current_app.config['INSTRUCTIONS_FOLDER'],
|
2025-01-17 11:03:00 +01:00
|
|
|
filename
|
|
|
|
)
|
|
|
|
|
|
|
|
# Rename an instructions file
|
|
|
|
def rename(self, filename: str, /) -> None:
|
|
|
|
# Add the extension
|
|
|
|
filename = '{name}{ext}'.format(name=filename, ext=self.extension)
|
|
|
|
|
|
|
|
if filename != self.filename:
|
|
|
|
# Check if it already exists
|
|
|
|
target = self.path(filename=filename)
|
|
|
|
if os.path.isfile(target):
|
|
|
|
raise ErrorException('Cannot rename {source} to {target} as it already exists'.format( # noqa: E501
|
|
|
|
source=self.filename,
|
|
|
|
target=filename
|
|
|
|
))
|
|
|
|
|
|
|
|
os.rename(self.path(), target)
|
|
|
|
|
|
|
|
# Upload a new instructions file
|
|
|
|
def upload(self, file: FileStorage, /) -> None:
|
2025-01-22 16:36:35 +01:00
|
|
|
target = self.path(filename=secure_filename(self.filename))
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
if os.path.isfile(target):
|
|
|
|
raise ErrorException('Cannot upload {target} as it already exists'.format( # noqa: E501
|
|
|
|
target=self.filename
|
|
|
|
))
|
|
|
|
|
|
|
|
file.save(target)
|
2025-01-22 22:41:35 +01:00
|
|
|
|
|
|
|
# Info
|
|
|
|
logger.info('The instruction file {file} has been imported'.format(
|
|
|
|
file=self.filename
|
|
|
|
))
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
# Compute the url for a set instructions file
|
|
|
|
def url(self, /) -> str:
|
|
|
|
if not self.allowed:
|
|
|
|
return ''
|
|
|
|
|
2025-01-20 15:20:07 +01:00
|
|
|
folder: str = current_app.config['INSTRUCTIONS_FOLDER']
|
2025-01-17 11:03:00 +01:00
|
|
|
|
|
|
|
# Compute the path
|
|
|
|
path = os.path.join(folder, self.filename)
|
|
|
|
|
|
|
|
return url_for('static', filename=path)
|
|
|
|
|
|
|
|
# Return the icon depending on the extension
|
|
|
|
def icon(self, /) -> str:
|
|
|
|
if self.extension == '.pdf':
|
|
|
|
return 'file-pdf-2-line'
|
|
|
|
elif self.extension in ['.doc', '.docx']:
|
|
|
|
return 'file-word-line'
|
|
|
|
elif self.extension in ['.png', '.jpg', '.jpeg']:
|
|
|
|
return 'file-image-line'
|
|
|
|
else:
|
|
|
|
return 'file-line'
|
2025-01-25 08:51:18 +01:00
|
|
|
|
|
|
|
# Find the instructions for a set
|
|
|
|
@staticmethod
|
2025-01-25 22:42:59 +01:00
|
|
|
def find_instructions(set: str, /) -> list[Tuple[str, str]]:
|
2025-01-25 08:51:18 +01:00
|
|
|
response = requests.get(
|
|
|
|
current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format(
|
|
|
|
path=set,
|
|
|
|
),
|
|
|
|
headers={
|
|
|
|
'User-Agent': current_app.config['REBRICKABLE_USER_AGENT']
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
if not response.ok:
|
|
|
|
raise ErrorException('Failed to load the Rebrickable instructions page. Status code: {code}'.format( # noqa: E501
|
|
|
|
code=response.status_code
|
|
|
|
))
|
|
|
|
|
|
|
|
# Parse the HTML content
|
|
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
|
|
|
|
|
|
# Collect all <img> tags with "LEGO Building Instructions" in the
|
|
|
|
# alt attribute
|
|
|
|
found_tags: list[Tuple[str, str]] = []
|
|
|
|
for a_tag in soup.find_all('a', href=True):
|
|
|
|
img_tag = a_tag.find('img', alt=True)
|
|
|
|
if img_tag and "LEGO Building Instructions" in img_tag['alt']:
|
|
|
|
found_tags.append(
|
|
|
|
(
|
|
|
|
img_tag['alt'].removeprefix('LEGO Building Instructions for '), # noqa: E501
|
|
|
|
a_tag['href']
|
|
|
|
)
|
|
|
|
) # Save alt and href
|
|
|
|
|
|
|
|
# Raise an error if nothing found
|
|
|
|
if not len(found_tags):
|
|
|
|
raise ErrorException('No instruction found for set {set}'.format(
|
|
|
|
set=set
|
|
|
|
))
|
|
|
|
|
|
|
|
return found_tags
|