forked from FrederikBaerentsen/BrickTracker
Dowloads instructions through a socket
This commit is contained in:
@@ -2,6 +2,7 @@ from datetime import datetime, timezone
|
||||
import logging
|
||||
import os
|
||||
from shutil import copyfileobj
|
||||
import traceback
|
||||
from typing import Tuple, TYPE_CHECKING
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
@@ -12,14 +13,16 @@ from werkzeug.datastructures import FileStorage
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from .exceptions import ErrorException, DownloadException
|
||||
from .parser import parse_set
|
||||
if TYPE_CHECKING:
|
||||
from .rebrickable_set import RebrickableSet
|
||||
from .socket import BrickSocket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BrickInstructions(object):
|
||||
socket: 'BrickSocket'
|
||||
|
||||
allowed: bool
|
||||
rebrickable: 'RebrickableSet | None'
|
||||
extension: str
|
||||
@@ -29,9 +32,22 @@ class BrickInstructions(object):
|
||||
name: str
|
||||
size: int
|
||||
|
||||
def __init__(self, file: os.DirEntry | str, /):
|
||||
def __init__(
|
||||
self,
|
||||
file: os.DirEntry | str,
|
||||
/,
|
||||
*,
|
||||
socket: 'BrickSocket | None' = None,
|
||||
):
|
||||
# Save the socket
|
||||
if socket is not None:
|
||||
self.socket = socket
|
||||
|
||||
if isinstance(file, str):
|
||||
self.filename = file
|
||||
|
||||
if self.filename == '':
|
||||
raise ErrorException('An instruction filename cannot be empty')
|
||||
else:
|
||||
self.filename = file.name
|
||||
|
||||
@@ -73,31 +89,84 @@ class BrickInstructions(object):
|
||||
|
||||
# Download an instruction file
|
||||
def download(self, path: str, /) -> None:
|
||||
target = self.path(filename=secure_filename(self.filename))
|
||||
try:
|
||||
# Just to make sure that the progress is initiated
|
||||
self.socket.progress(
|
||||
message='Downloading {file}'.format(
|
||||
file=self.filename,
|
||||
)
|
||||
)
|
||||
|
||||
if os.path.isfile(target):
|
||||
raise ErrorException('Cannot download {target} as it already exists'.format( # noqa: E501
|
||||
target=self.filename
|
||||
))
|
||||
target = self.path(filename=secure_filename(self.filename))
|
||||
|
||||
url = current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format( # noqa: E501
|
||||
path=path
|
||||
)
|
||||
# Skipping rather than failing here
|
||||
if os.path.isfile(target):
|
||||
self.socket.complete(
|
||||
message='File {file} already exists, skipped'.format(
|
||||
file=self.filename,
|
||||
)
|
||||
)
|
||||
|
||||
response = requests.get(url, stream=True)
|
||||
if response.ok:
|
||||
with open(target, 'wb') as f:
|
||||
copyfileobj(response.raw, f)
|
||||
else:
|
||||
raise DownloadException('Failed to download {file}. Status code: {code}'.format( # noqa: E501
|
||||
file=self.filename,
|
||||
code=response.status_code
|
||||
))
|
||||
else:
|
||||
url = current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format( # noqa: E501
|
||||
path=path
|
||||
)
|
||||
|
||||
# Info
|
||||
logger.info('The instruction file {file} has been downloaded'.format(
|
||||
file=self.filename
|
||||
))
|
||||
# Request the file
|
||||
self.socket.progress(
|
||||
message='Requesting {url}'.format(
|
||||
url=url,
|
||||
)
|
||||
)
|
||||
|
||||
response = requests.get(url, stream=True)
|
||||
if response.ok:
|
||||
|
||||
# Store the content header as size
|
||||
try:
|
||||
self.size = int(
|
||||
response.headers.get('Content-length', 0)
|
||||
)
|
||||
except Exception:
|
||||
self.size = 0
|
||||
|
||||
# Downloading the file
|
||||
self.socket.progress(
|
||||
message='Downloading {url} ({size})'.format(
|
||||
url=url,
|
||||
size=self.human_size(),
|
||||
)
|
||||
)
|
||||
|
||||
with open(target, 'wb') as f:
|
||||
copyfileobj(response.raw, f)
|
||||
else:
|
||||
raise DownloadException('failed to download: {code}'.format( # noqa: E501
|
||||
code=response.status_code
|
||||
))
|
||||
|
||||
# Info
|
||||
logger.info('The instruction file {file} has been downloaded'.format( # noqa: E501
|
||||
file=self.filename
|
||||
))
|
||||
|
||||
# Complete
|
||||
self.socket.complete(
|
||||
message='File {file} downloaded ({size})'.format( # noqa: E501
|
||||
file=self.filename,
|
||||
size=self.human_size()
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.socket.fail(
|
||||
message='Error while downloading instruction {file}: {error}'.format( # noqa: E501
|
||||
file=self.filename,
|
||||
error=e,
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
# Display the size in a human format
|
||||
def human_size(self) -> str:
|
||||
@@ -175,36 +244,9 @@ class BrickInstructions(object):
|
||||
else:
|
||||
return 'file-line'
|
||||
|
||||
# Download selected instructions for a set
|
||||
@staticmethod
|
||||
def download_instructions(form: dict[str, str], /) -> None:
|
||||
selected_instructions: list[Tuple[str, str]] = []
|
||||
|
||||
# Get the list of instructions
|
||||
for key in form:
|
||||
if key.startswith('instruction-') and form.get(key) == 'on':
|
||||
_, _, index = key.partition('-')
|
||||
alt_text = form.get(f'instruction-alt-text-{index}', '')
|
||||
href_text = form.get(f'instruction-href-text-{index}', '').removeprefix('/instructions/') # Remove the /instructions/ part # noqa: E501
|
||||
selected_instructions.append((href_text, alt_text))
|
||||
|
||||
# Raise if nothing selected
|
||||
if not len(selected_instructions):
|
||||
raise ErrorException('No instruction was selected to download')
|
||||
|
||||
# Loop over selected instructions and download them
|
||||
for href, filename in selected_instructions:
|
||||
BrickInstructions(f"{filename}.pdf").download(href)
|
||||
|
||||
# Find the instructions for a set
|
||||
@staticmethod
|
||||
def find_instructions(form: dict[str, str], /) -> list[Tuple[str, str]]:
|
||||
# Grab the set ID
|
||||
set: str = form.get('add-set', '')
|
||||
|
||||
# Parse it
|
||||
set = parse_set(set)
|
||||
|
||||
def find_instructions(set: str, /) -> list[Tuple[str, str]]:
|
||||
response = requests.get(
|
||||
current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format(
|
||||
path=set,
|
||||
|
||||
@@ -5,6 +5,8 @@ from flask import copy_current_request_context, Flask, request
|
||||
from flask_socketio import SocketIO
|
||||
|
||||
from .configuration_list import BrickConfigurationList
|
||||
from .instructions import BrickInstructions
|
||||
from .instructions_list import BrickInstructionsList
|
||||
from .login import LoginManager
|
||||
from .set import BrickSet
|
||||
from .sql import close as sql_close
|
||||
@@ -17,6 +19,7 @@ MESSAGES: Final[dict[str, str]] = {
|
||||
'COMPLETE': 'complete',
|
||||
'CONNECT': 'connect',
|
||||
'DISCONNECT': 'disconnect',
|
||||
'DOWNLOAD_INSTRUCTIONS': 'download_instructions',
|
||||
'FAIL': 'fail',
|
||||
'IMPORT_SET': 'import_set',
|
||||
'LOAD_SET': 'load_set',
|
||||
@@ -84,6 +87,41 @@ class BrickSocket(object):
|
||||
def disconnect() -> None:
|
||||
self.disconnected()
|
||||
|
||||
@self.socket.on(MESSAGES['DOWNLOAD_INSTRUCTIONS'], namespace=self.namespace) # noqa: E501
|
||||
def download_instructions(data: dict[str, Any], /) -> None:
|
||||
# Needs to be authenticated
|
||||
if LoginManager.is_not_authenticated():
|
||||
self.fail(message='You need to be authenticated')
|
||||
return
|
||||
|
||||
instructions = BrickInstructions(
|
||||
'{name}.pdf'.format(name=data.get('alt', '')),
|
||||
socket=self
|
||||
)
|
||||
|
||||
path = data.get('href', '').removeprefix('/instructions/')
|
||||
|
||||
# Update the progress
|
||||
try:
|
||||
self.progress_total = int(data.get('total', 0))
|
||||
self.progress_count = int(data.get('current', 0))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Start it in a thread if requested
|
||||
if self.threaded:
|
||||
@copy_current_request_context
|
||||
def do_download() -> None:
|
||||
instructions.download(path)
|
||||
|
||||
BrickInstructionsList(force=True)
|
||||
|
||||
self.socket.start_background_task(do_download)
|
||||
else:
|
||||
instructions.download(path)
|
||||
|
||||
BrickInstructionsList(force=True)
|
||||
|
||||
@self.socket.on(MESSAGES['IMPORT_SET'], namespace=self.namespace)
|
||||
def import_set(data: dict[str, Any], /) -> None:
|
||||
# Needs to be authenticated
|
||||
|
||||
@@ -14,6 +14,7 @@ from .exceptions import exception_handler
|
||||
from ..instructions import BrickInstructions
|
||||
from ..instructions_list import BrickInstructionsList
|
||||
from ..parser import parse_set
|
||||
from ..socket import MESSAGES
|
||||
from .upload import upload_helper
|
||||
|
||||
instructions_page = Blueprint(
|
||||
@@ -149,24 +150,22 @@ def download() -> str:
|
||||
|
||||
|
||||
# Show search results
|
||||
@instructions_page.route('/download/select', methods=['POST'])
|
||||
@login_required
|
||||
@exception_handler(__file__, post_redirect='instructions.download')
|
||||
def select_download() -> str:
|
||||
return render_template(
|
||||
'instructions.html',
|
||||
download=True,
|
||||
instructions=BrickInstructions.find_instructions(request.form)
|
||||
)
|
||||
|
||||
|
||||
# Download files
|
||||
@instructions_page.route('/download', methods=['POST'])
|
||||
@login_required
|
||||
@exception_handler(__file__, post_redirect='instructions.download')
|
||||
def do_download() -> Response:
|
||||
BrickInstructions.download_instructions(request.form)
|
||||
def do_download() -> str:
|
||||
# Grab the set number
|
||||
try:
|
||||
set = parse_set(request.form.get('download-set', ''))
|
||||
except Exception:
|
||||
set = ''
|
||||
|
||||
BrickInstructionsList(force=True)
|
||||
|
||||
return redirect(url_for('instructions.list'))
|
||||
return render_template(
|
||||
'instructions.html',
|
||||
download=True,
|
||||
instructions=BrickInstructions.find_instructions(set),
|
||||
set=set,
|
||||
path=current_app.config['SOCKET_PATH'],
|
||||
namespace=current_app.config['SOCKET_NAMESPACE'],
|
||||
messages=MESSAGES
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user