diff --git a/bricktracker/instructions.py b/bricktracker/instructions.py
index cc5cee7..12e5171 100644
--- a/bricktracker/instructions.py
+++ b/bricktracker/instructions.py
@@ -1,6 +1,7 @@
 from datetime import datetime, timezone
 import logging
 import os
+from urllib.parse import urljoin
 from shutil import copyfileobj
 import traceback
 from typing import Tuple, TYPE_CHECKING
@@ -11,6 +12,8 @@ import humanize
 import requests
 from werkzeug.datastructures import FileStorage
 from werkzeug.utils import secure_filename
+import re
+import cloudscraper
 from .exceptions import ErrorException, DownloadException
 
 if TYPE_CHECKING:
@@ -90,85 +93,44 @@ class BrickInstructions(object):
     # Download an instruction file
     def download(self, path: str, /) -> None:
         try:
-            # Just to make sure that the progress is initiated
-            self.socket.progress(
-                message='Downloading {file}'.format(
-                    file=self.filename,
-                )
-            )
-
+            # start progress
+            self.socket.progress(message=f'Downloading {self.filename}')
             target = self.path(filename=secure_filename(self.filename))
 
-            # Skipping rather than failing here
+            # skip if already exists
             if os.path.isfile(target):
-                self.socket.complete(
-                    message='File {file} already exists, skipped'.format(
-                        file=self.filename,
-                    )
+                return self.socket.complete(
+                    message=f'File {self.filename} already exists, skipped'
                 )
-            else:
-                url = current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format(  # noqa: E501
-                    path=path
-                )
-                trimmed_url = current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format(  # noqa: E501
-                    path=path.partition('/')[0]
-                )
+            # path is already a full URL from find_instructions()
+            url = path
+            self.socket.progress(message=f'Requesting {url}')
+            # use cloudscraper to pass the CF challenge here too
+            scraper = cloudscraper.create_scraper()
+            scraper.headers.update({'User-Agent': current_app.config['REBRICKABLE_USER_AGENT']})
+            response = scraper.get(url, stream=True)
+            if not response.ok:
+                raise DownloadException(f'Failed to download: HTTP {response.status_code}')
 
-                # Request the file
-                self.socket.progress(
-                    message='Requesting {url}'.format(
-                        url=trimmed_url,
-                    )
-                )
+            # record size if available
+            try:
+                self.size = int(response.headers.get('Content-Length', 0))
+            except ValueError:
+                self.size = 0
 
-                response = requests.get(url, stream=True)
-                if response.ok:
+            # download to disk
+            self.socket.progress(message=f'Downloading {self.filename} ({self.human_size()})')
+            with open(target, 'wb') as f:
+                copyfileobj(response.raw, f)
 
-                    # Store the content header as size
-                    try:
-                        self.size = int(
-                            response.headers.get('Content-length', 0)
-                        )
-                    except Exception:
-                        self.size = 0
-
-                    # Downloading the file
-                    self.socket.progress(
-                        message='Downloading {url} ({size})'.format(
-                            url=trimmed_url,
-                            size=self.human_size(),
-                        )
-                    )
-
-                    with open(target, 'wb') as f:
-                        copyfileobj(response.raw, f)
-                else:
-                    raise DownloadException('failed to download: {code}'.format(  # noqa: E501
-                        code=response.status_code
-                    ))
-
-            # Info
-            logger.info('The instruction file {file} has been downloaded'.format(  # noqa: E501
-                file=self.filename
-            ))
-
-            # Complete
-            self.socket.complete(
-                message='File {file} downloaded ({size})'.format(  # noqa: E501
-                    file=self.filename,
-                    size=self.human_size()
-                )
-            )
+            logger.info(f'The instruction file {self.filename} has been downloaded')
+            self.socket.complete(message=f'File {self.filename} downloaded ({self.human_size()})')
         except Exception as e:
             self.socket.fail(
-                message='Error while downloading instruction {file}: {error}'.format(  # noqa: E501
-                    file=self.filename,
-                    error=e,
-                )
+                message=f'Error downloading {self.filename}: {e}'
             )
-
             logger.debug(traceback.format_exc())
 
     # Display the size in a human format
@@ -250,40 +212,52 @@ class BrickInstructions(object):
     # Find the instructions for a set
     @staticmethod
     def find_instructions(set: str, /) -> list[Tuple[str, str]]:
-        response = requests.get(
-            current_app.config['REBRICKABLE_LINK_INSTRUCTIONS_PATTERN'].format(
-                path=set,
-            ),
-            headers={
-                'User-Agent': current_app.config['REBRICKABLE_USER_AGENT']
-            }
-        )
+        """
+        Scrape Rebrickable’s HTML and return a list of
+        (filename_slug, download_url). Duplicate slugs get _1, _2, …
+        """
+        page_url = f"https://rebrickable.com/instructions/{set}/"
+        logger.debug(f"[find_instructions] fetching HTML from {page_url!r}")
 
-        if not response.ok:
-            raise ErrorException('Failed to load the Rebrickable instructions page. Status code: {code}'.format(  # noqa: E501
-                code=response.status_code
-            ))
+        # Solve Cloudflare’s challenge
+        scraper = cloudscraper.create_scraper()
+        scraper.headers.update({'User-Agent': current_app.config['REBRICKABLE_USER_AGENT']})
+        resp = scraper.get(page_url)
+        if not resp.ok:
+            raise ErrorException(f'Failed to load instructions page for {set}. HTTP {resp.status_code}')
 
-        # Parse the HTML content
-        soup = BeautifulSoup(response.content, 'html.parser')
+        soup = BeautifulSoup(resp.content, 'html.parser')
+        link_re = re.compile(r'^/instructions/\d+/.+/download/')
 
-        # Collect all tags with "LEGO Building Instructions" in the
-        # alt attribute
-        found_tags: list[Tuple[str, str]] = []
-        for a_tag in soup.find_all('a', href=True):
-            img_tag = a_tag.find('img', alt=True)
-            if img_tag and "LEGO Building Instructions" in img_tag['alt']:
-                found_tags.append(
-                    (
-                        img_tag['alt'].removeprefix('LEGO Building Instructions for '),  # noqa: E501
-                        a_tag['href']
-                    )
-                )  # Save alt and href
+        raw: list[tuple[str, str]] = []
+        for a in soup.find_all('a', href=link_re):
+            img = a.find('img', alt=True)
+            if not img or set not in img['alt']:
+                continue
 
-        # Raise an error if nothing found
-        if not len(found_tags):
-            raise ErrorException('No instruction found for set {set}'.format(
-                set=set
-            ))
+            # Turn the alt text into a slug
+            alt_text = img['alt'].removeprefix('LEGO Building Instructions for ')
+            slug = re.sub(r'[^A-Za-z0-9]+', '-', alt_text).strip('-')
 
-        return found_tags
+            # Build the absolute download URL
+            download_url = urljoin('https://rebrickable.com', a['href'])
+            raw.append((slug, download_url))
+
+        if not raw:
+            raise ErrorException(f'No download links found on instructions page for {set}')
+
+        # Disambiguate duplicate slugs by appending _1, _2, …
+        from collections import Counter, defaultdict
+        counts = Counter(name for name, _ in raw)
+        seen: dict[str, int] = defaultdict(int)
+        unique: list[tuple[str, str]] = []
+        for name, url in raw:
+            idx = seen[name]
+            if counts[name] > 1 and idx > 0:
+                final_name = f"{name}_{idx}"
+            else:
+                final_name = name
+            seen[name] += 1
+            unique.append((final_name, url))
+
+        return unique
diff --git a/requirements.txt b/requirements.txt
index d2ca909..cfdb1dd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ rebrick
 requests
 tzdata
 bs4
+cloudscraper
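For review purposes, here is a minimal standalone sketch (not part of the patch) of the duplicate-slug pass that `find_instructions()` now performs; the slugs and URLs below are made up for illustration:

```python
from collections import Counter, defaultdict


def disambiguate(raw: list[tuple[str, str]]) -> list[tuple[str, str]]:
    # Same logic as the new block in find_instructions(): the first
    # occurrence of a duplicated slug keeps its bare name, later
    # occurrences get an _1, _2, ... suffix.
    counts = Counter(name for name, _ in raw)
    seen: dict[str, int] = defaultdict(int)
    unique: list[tuple[str, str]] = []
    for name, url in raw:
        idx = seen[name]
        final = f'{name}_{idx}' if counts[name] > 1 and idx > 0 else name
        seen[name] += 1
        unique.append((final, url))
    return unique


# Hypothetical slugs and URLs, for illustration only
links = [
    ('10270-1-Bookshop', 'https://rebrickable.com/instructions/1/a/download/'),
    ('10270-1-Bookshop', 'https://rebrickable.com/instructions/2/b/download/'),
]
print(disambiguate(links))
# [('10270-1-Bookshop', '.../1/a/download/'),
#  ('10270-1-Bookshop_1', '.../2/b/download/')]
```

Leaving the first occurrence unsuffixed keeps filenames stable in the common single-booklet case, so files downloaded before this change are still caught by the skip-if-exists check in `download()`.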