feat(parts): add core models for individual parts and lot management
This commit is contained in:
@@ -0,0 +1,904 @@
|
||||
import logging
|
||||
import os
|
||||
import traceback
|
||||
from typing import Any, Self, TYPE_CHECKING
|
||||
from urllib.parse import urlparse
|
||||
from uuid import uuid4
|
||||
|
||||
from flask import current_app, url_for
|
||||
import requests
|
||||
from shutil import copyfileobj
|
||||
|
||||
from .exceptions import NotFoundException, DatabaseException, ErrorException
|
||||
from .record import BrickRecord
|
||||
from .set_owner_list import BrickSetOwnerList
|
||||
from .set_purchase_location_list import BrickSetPurchaseLocationList
|
||||
from .set_storage_list import BrickSetStorageList
|
||||
from .set_tag_list import BrickSetTagList
|
||||
from .sql import BrickSQL
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .socket import BrickSocket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Individual part (standalone, not associated with a set or minifigure)
|
||||
class IndividualPart(BrickRecord):
|
||||
# Queries
|
||||
select_query: str = 'individual_part/select/by_id'
|
||||
insert_query: str = 'individual_part/insert'
|
||||
update_query: str = 'individual_part/update'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
/,
|
||||
*,
|
||||
record: Any | None = None
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
# Ingest the record if it has one
|
||||
if record is not None:
|
||||
self.ingest(record)
|
||||
|
||||
# Select a specific individual part by UUID
|
||||
def select_by_id(self, id: str, /) -> Self:
|
||||
from .set_owner_list import BrickSetOwnerList
|
||||
from .set_status_list import BrickSetStatusList
|
||||
from .set_tag_list import BrickSetTagList
|
||||
|
||||
self.fields.id = id
|
||||
if not self.select(
|
||||
override_query=self.select_query,
|
||||
owners=BrickSetOwnerList.as_columns(),
|
||||
statuses=BrickSetStatusList.as_columns(all=True),
|
||||
tags=BrickSetTagList.as_columns(),
|
||||
):
|
||||
raise NotFoundException(
|
||||
'Individual part with id "{id}" not found'.format(id=id)
|
||||
)
|
||||
return self
|
||||
|
||||
# Delete an individual part
|
||||
def delete(self, /) -> None:
|
||||
sql = BrickSQL()
|
||||
sql.executescript(
|
||||
'individual_part/delete',
|
||||
id=self.fields.id
|
||||
)
|
||||
sql.commit()
|
||||
|
||||
# Generate HTML ID for form elements
|
||||
def html_id(self, prefix: str | None = None, /) -> str:
|
||||
components: list[str] = ['individual-part']
|
||||
|
||||
if prefix is not None:
|
||||
components.append(prefix)
|
||||
|
||||
components.append(self.fields.part)
|
||||
components.append(str(self.fields.color))
|
||||
components.append(self.fields.id)
|
||||
|
||||
return '-'.join(components)
|
||||
|
||||
# URL for quantity update
|
||||
def url_for_quantity(self, /) -> str:
|
||||
return url_for('individual_part.update_quantity', id=self.fields.id)
|
||||
|
||||
# URL for description update
|
||||
def url_for_description(self, /) -> str:
|
||||
return url_for('individual_part.update_description', id=self.fields.id)
|
||||
|
||||
# URL for problem (missing/damaged) update
|
||||
def url_for_problem(self, problem_type: str, /) -> str:
|
||||
if problem_type == 'missing':
|
||||
return url_for('individual_part.update_missing', id=self.fields.id)
|
||||
elif problem_type == 'damaged':
|
||||
return url_for('individual_part.update_damaged', id=self.fields.id)
|
||||
else:
|
||||
raise ValueError(f'Invalid problem type: {problem_type}')
|
||||
|
||||
# URL for checked status update
|
||||
def url_for_checked(self, /) -> str:
|
||||
return url_for('individual_part.update_checked', id=self.fields.id)
|
||||
|
||||
# URL for purchase date update
|
||||
def url_for_purchase_date(self, /) -> str:
|
||||
return url_for('individual_part.update_purchase_date', id=self.fields.id)
|
||||
|
||||
# URL for purchase price update
|
||||
def url_for_purchase_price(self, /) -> str:
|
||||
return url_for('individual_part.update_purchase_price', id=self.fields.id)
|
||||
|
||||
# URL for this part's detail page
|
||||
def url(self, /) -> str:
|
||||
return url_for('individual_part.details', id=self.fields.id)
|
||||
|
||||
def url_for_delete(self, /) -> str:
|
||||
return url_for('individual_part.delete_part', id=self.fields.id)
|
||||
|
||||
def url_for_image(self, /) -> str:
|
||||
if current_app.config.get('USE_REMOTE_IMAGES', False):
|
||||
if hasattr(self.fields, 'image') and self.fields.image:
|
||||
return self.fields.image
|
||||
else:
|
||||
return current_app.config.get('REBRICKABLE_IMAGE_NIL', '')
|
||||
else:
|
||||
from .rebrickable_image import RebrickableImage
|
||||
|
||||
if hasattr(self.fields, 'image') and self.fields.image:
|
||||
image_id, _ = os.path.splitext(os.path.basename(urlparse(self.fields.image).path))
|
||||
|
||||
if image_id:
|
||||
return RebrickableImage.static_url(image_id, 'PARTS_FOLDER')
|
||||
|
||||
return RebrickableImage.static_url(RebrickableImage.nil_name(), 'PARTS_FOLDER')
|
||||
|
||||
# String representation for debugging
|
||||
def __repr__(self, /) -> str:
|
||||
"""String representation for debugging"""
|
||||
part_id = getattr(self.fields, 'part', 'unknown')
|
||||
color_id = getattr(self.fields, 'color', 'unknown')
|
||||
qty = getattr(self.fields, 'quantity', 0)
|
||||
return f'<IndividualPart {part_id} color:{color_id} qty:{qty}>'
|
||||
|
||||
# Get or fetch color information from rebrickable_colors table
|
||||
@staticmethod
|
||||
def get_or_fetch_color(color_id: int, /) -> dict[str, Any] | None:
|
||||
sql = BrickSQL()
|
||||
|
||||
# Check if color exists in cache
|
||||
result = sql.fetchone('rebrickable_colors/select/by_color_id', parameters={'color_id': color_id})
|
||||
|
||||
if result:
|
||||
# Color found in cache
|
||||
return {
|
||||
'color_id': result[0],
|
||||
'name': result[1],
|
||||
'rgb': result[2],
|
||||
'is_trans': result[3],
|
||||
'bricklink_color_id': result[4],
|
||||
'bricklink_color_name': result[5]
|
||||
}
|
||||
|
||||
# Color not in cache, fetch from API
|
||||
try:
|
||||
import rebrick
|
||||
import json
|
||||
|
||||
rebrick.init(current_app.config['REBRICKABLE_API_KEY'])
|
||||
color_response = rebrick.lego.get_color(color_id)
|
||||
color_data = json.loads(color_response.read())
|
||||
|
||||
# Extract BrickLink color info
|
||||
bricklink_color_id = None
|
||||
bricklink_color_name = None
|
||||
|
||||
if 'external_ids' in color_data and 'BrickLink' in color_data['external_ids']:
|
||||
bricklink_info = color_data['external_ids']['BrickLink']
|
||||
if 'ext_ids' in bricklink_info and bricklink_info['ext_ids']:
|
||||
bricklink_color_id = bricklink_info['ext_ids'][0]
|
||||
if 'ext_descrs' in bricklink_info and bricklink_info['ext_descrs']:
|
||||
bricklink_color_name = bricklink_info['ext_descrs'][0][0] if bricklink_info['ext_descrs'][0] else None
|
||||
|
||||
# Store in cache
|
||||
sql.execute('rebrickable_colors/insert', parameters={
|
||||
'color_id': color_data['id'],
|
||||
'name': color_data['name'],
|
||||
'rgb': color_data.get('rgb'),
|
||||
'is_trans': color_data.get('is_trans', False),
|
||||
'bricklink_color_id': bricklink_color_id,
|
||||
'bricklink_color_name': bricklink_color_name
|
||||
})
|
||||
sql.connection.commit()
|
||||
|
||||
logger.info('Cached color {color_id} ({color_name}) with BrickLink ID {bricklink_id}'.format(
|
||||
color_id=color_id,
|
||||
color_name=color_data["name"],
|
||||
bricklink_id=bricklink_color_id
|
||||
))
|
||||
|
||||
return {
|
||||
'color_id': color_data['id'],
|
||||
'name': color_data['name'],
|
||||
'rgb': color_data.get('rgb'),
|
||||
'is_trans': color_data.get('is_trans', False),
|
||||
'bricklink_color_id': bricklink_color_id,
|
||||
'bricklink_color_name': bricklink_color_name
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.warning('Could not fetch color {color_id} from API: {error}'.format(
|
||||
color_id=color_id,
|
||||
error=e
|
||||
))
|
||||
return None
|
||||
|
||||
# Download image for this part
|
||||
def download_image(self, image_url: str, /, *, image_filename: str | None = None) -> None:
|
||||
if not image_url:
|
||||
return
|
||||
|
||||
# Use provided filename or extract from URL
|
||||
if image_filename:
|
||||
image_id = image_filename
|
||||
else:
|
||||
image_id, _ = os.path.splitext(os.path.basename(urlparse(image_url).path))
|
||||
|
||||
if not image_id:
|
||||
return
|
||||
|
||||
# Build path (same pattern as RebrickableImage)
|
||||
parts_folder = current_app.config['PARTS_FOLDER']
|
||||
extension = 'jpg' # Everything is saved as jpg
|
||||
|
||||
# If folder is an absolute path (starts with /), use it directly
|
||||
# Otherwise, make it relative to app root (current_app.root_path)
|
||||
if parts_folder.startswith('/'):
|
||||
base_path = parts_folder
|
||||
else:
|
||||
base_path = os.path.join(current_app.root_path, parts_folder)
|
||||
|
||||
path = os.path.join(base_path, f'{image_id}.{extension}')
|
||||
|
||||
# Avoid downloading if file exists
|
||||
if os.path.exists(path):
|
||||
return
|
||||
|
||||
# Create directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
|
||||
# Download the image
|
||||
try:
|
||||
response = requests.get(image_url, stream=True)
|
||||
if response.ok:
|
||||
with open(path, 'wb') as f:
|
||||
copyfileobj(response.raw, f)
|
||||
logger.info('Downloaded image to {path}'.format(path=path))
|
||||
except Exception as e:
|
||||
logger.warning('Could not download image from {url}: {error}'.format(
|
||||
url=image_url,
|
||||
error=e
|
||||
))
|
||||
|
||||
# Load available colors for a part
|
||||
def load_colors(self, socket: 'BrickSocket', data: dict[str, Any], /) -> bool:
|
||||
# Check if individual parts are disabled
|
||||
if current_app.config.get('DISABLE_INDIVIDUAL_PARTS', False):
|
||||
socket.fail(message='Individual parts system is disabled.')
|
||||
return False
|
||||
|
||||
try:
|
||||
# Extract part number
|
||||
part_num = str(data.get('part', '')).strip()
|
||||
|
||||
if not part_num:
|
||||
raise ErrorException('Part number is required')
|
||||
|
||||
# Fetch available colors from Rebrickable
|
||||
import rebrick
|
||||
import json
|
||||
|
||||
rebrick.init(current_app.config['REBRICKABLE_API_KEY'])
|
||||
|
||||
# Setup progress tracking
|
||||
socket.progress_count = 0
|
||||
socket.progress_total = 2 # Fetch part info + fetch colors
|
||||
|
||||
try:
|
||||
# Get part information for the name
|
||||
socket.auto_progress(message='Fetching part information')
|
||||
part_response = rebrick.lego.get_part(part_num)
|
||||
part_data = json.loads(part_response.read())
|
||||
part_name = part_data.get('name', part_num)
|
||||
|
||||
# Get all available colors for this part
|
||||
socket.auto_progress(message='Fetching available colors')
|
||||
colors_response = rebrick.lego.get_part_colors(part_num)
|
||||
colors_data = json.loads(colors_response.read())
|
||||
|
||||
# Extract the results
|
||||
colors = colors_data.get('results', [])
|
||||
|
||||
if not colors:
|
||||
raise ErrorException(f'No colors found for part {part_num}')
|
||||
|
||||
# Download images locally if USE_REMOTE_IMAGES is False
|
||||
if not current_app.config.get('USE_REMOTE_IMAGES', False):
|
||||
# Add image downloads to progress
|
||||
socket.progress_total += len(colors)
|
||||
|
||||
for color in colors:
|
||||
image_url = color.get('part_img_url', '')
|
||||
element_id = color.get('elements', [])
|
||||
# Use first element_id if available, otherwise extract from URL
|
||||
if element_id and len(element_id) > 0:
|
||||
image_filename = str(element_id[0])
|
||||
else:
|
||||
# Fallback: extract from URL
|
||||
image_filename = None
|
||||
if image_url:
|
||||
image_filename, _ = os.path.splitext(os.path.basename(urlparse(image_url).path))
|
||||
|
||||
if image_url and image_filename:
|
||||
socket.auto_progress(message='Downloading image for {color}'.format(
|
||||
color=color.get("color_name", "color")
|
||||
))
|
||||
try:
|
||||
self.download_image(image_url, image_filename=image_filename)
|
||||
except Exception as e:
|
||||
logger.warning('Could not download image for part {part_num} color {color_id}: {error}'.format(
|
||||
part_num=part_num,
|
||||
color_id=color.get("color_id"),
|
||||
error=e
|
||||
))
|
||||
|
||||
# Emit the part colors loaded event
|
||||
logger.info('Emitting {count} colors for part {part_num} ({part_name})'.format(
|
||||
count=len(colors),
|
||||
part_num=part_num,
|
||||
part_name=part_name
|
||||
))
|
||||
|
||||
socket.emit(
|
||||
'PART_COLORS_LOADED',
|
||||
{
|
||||
'part': part_num,
|
||||
'part_name': part_name,
|
||||
'colors': colors,
|
||||
'count': len(colors)
|
||||
}
|
||||
)
|
||||
|
||||
logger.info('Successfully loaded {count} colors for part {part_num}'.format(
|
||||
count=len(colors),
|
||||
part_num=part_num
|
||||
))
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
|
||||
# Provide helpful error message for printed/decorated parts
|
||||
if '404' in error_msg or 'Not Found' in error_msg:
|
||||
# Check if this might be a printed part (has letters/pattern code)
|
||||
base_part = ''.join(c for c in part_num if c.isdigit())
|
||||
|
||||
if base_part and base_part != part_num:
|
||||
raise ErrorException(
|
||||
'Part {part_num} not found in Rebrickable. This appears to be a printed/decorated part. '
|
||||
'Try searching for the base part number: {base_part}'.format(
|
||||
part_num=part_num,
|
||||
base_part=base_part
|
||||
)
|
||||
)
|
||||
else:
|
||||
raise ErrorException(
|
||||
'Part {part_num} not found in Rebrickable. '
|
||||
'Please verify the part number is correct.'.format(
|
||||
part_num=part_num
|
||||
)
|
||||
)
|
||||
else:
|
||||
raise ErrorException(
|
||||
'Could not fetch colors for part {part_num}: {error}'.format(
|
||||
part_num=part_num,
|
||||
error=error_msg
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
socket.fail(message=f'Could not load part colors: {error_msg}')
|
||||
|
||||
if not isinstance(e, (NotFoundException, ErrorException)):
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
return False
|
||||
|
||||
# Add a new individual part
|
||||
def add(self, socket: 'BrickSocket', data: dict[str, Any], /) -> bool:
|
||||
# Check if individual parts are disabled
|
||||
if current_app.config.get('DISABLE_INDIVIDUAL_PARTS', False):
|
||||
socket.fail(message='Individual parts system is disabled.')
|
||||
return False
|
||||
|
||||
try:
|
||||
# Reset progress
|
||||
socket.progress_count = 0
|
||||
socket.progress_total = 3
|
||||
|
||||
socket.auto_progress(message='Validating part and color')
|
||||
|
||||
# Extract data
|
||||
part_num = str(data.get('part', '')).strip()
|
||||
color_id = int(data.get('color', -1))
|
||||
quantity = int(data.get('quantity', 1))
|
||||
|
||||
if not part_num:
|
||||
raise ErrorException('Part number is required')
|
||||
if color_id < 0:
|
||||
raise ErrorException('Valid color ID is required')
|
||||
if quantity <= 0:
|
||||
raise ErrorException('Quantity must be greater than 0')
|
||||
|
||||
# Check if color info was pre-loaded (from load_colors)
|
||||
color_data = data.get('color_info', None)
|
||||
part_name = data.get('part_name', None)
|
||||
|
||||
# Validate part+color exists in rebrickable_parts
|
||||
# If not, fetch from Rebrickable or use pre-loaded data and insert
|
||||
sql = BrickSQL()
|
||||
result = sql.fetchone('rebrickable_parts/check_exists', parameters={'part': part_num, 'color_id': color_id})
|
||||
exists = result[0] > 0
|
||||
|
||||
# Store image URL for downloading later
|
||||
image_url = None
|
||||
|
||||
if not exists:
|
||||
# Fetch full color information (with BrickLink mapping)
|
||||
socket.auto_progress(message='Fetching color information')
|
||||
full_color_info = IndividualPart.get_or_fetch_color(color_id)
|
||||
|
||||
# If we have pre-loaded color data, use it; otherwise fetch from Rebrickable
|
||||
if color_data and part_name:
|
||||
# Use pre-loaded data from get_part_colors() response
|
||||
socket.auto_progress(message='Using cached part info')
|
||||
|
||||
image_url = color_data.get('part_img_url', '')
|
||||
|
||||
# Extract image_id from element_id or URL
|
||||
element_ids = color_data.get('elements', [])
|
||||
if element_ids and len(element_ids) > 0:
|
||||
image_id = str(element_ids[0])
|
||||
elif image_url:
|
||||
image_id, _ = os.path.splitext(os.path.basename(urlparse(image_url).path))
|
||||
else:
|
||||
image_id = None
|
||||
|
||||
# Insert into rebrickable_parts using the pre-loaded data
|
||||
sql.execute('rebrickable_parts/insert_with_preloaded_data', parameters={
|
||||
'part': part_num,
|
||||
'color_id': color_id,
|
||||
'color_name': color_data.get('color_name', ''),
|
||||
'color_rgb': full_color_info.get('rgb') if full_color_info else None,
|
||||
'color_transparent': full_color_info.get('is_trans') if full_color_info else None,
|
||||
'bricklink_color_id': full_color_info.get('bricklink_color_id') if full_color_info else None,
|
||||
'bricklink_color_name': full_color_info.get('bricklink_color_name') if full_color_info else None,
|
||||
'name': part_name,
|
||||
'image': image_url,
|
||||
'image_id': image_id,
|
||||
'url': current_app.config['REBRICKABLE_LINK_PART_PATTERN'].format(part=part_num, color=color_id)
|
||||
})
|
||||
else:
|
||||
# Fetch from Rebrickable (fallback for old workflow)
|
||||
socket.auto_progress(message='Fetching part info from Rebrickable')
|
||||
import rebrick
|
||||
import json
|
||||
|
||||
# Initialize rebrick with API key
|
||||
rebrick.init(current_app.config['REBRICKABLE_API_KEY'])
|
||||
|
||||
try:
|
||||
# Get part information
|
||||
part_info = json.loads(rebrick.lego.get_part(part_num).read())
|
||||
|
||||
# Get color information (this also caches it in rebrickable_colors)
|
||||
# full_color_info already fetched above, but get again to be sure
|
||||
if not full_color_info:
|
||||
full_color_info = IndividualPart.get_or_fetch_color(color_id)
|
||||
|
||||
# Get part+color specific info (for the image and element_id)
|
||||
part_color_info = json.loads(rebrick.lego.get_part_color(part_num, color_id).read())
|
||||
|
||||
# Get image URL
|
||||
image_url = part_color_info.get('part_img_url', part_info.get('part_img_url', ''))
|
||||
|
||||
# Extract image_id from element_ids or URL
|
||||
element_ids = part_color_info.get('elements', [])
|
||||
if element_ids and len(element_ids) > 0:
|
||||
image_id = str(element_ids[0])
|
||||
elif image_url:
|
||||
image_id, _ = os.path.splitext(os.path.basename(urlparse(image_url).path))
|
||||
else:
|
||||
image_id = None
|
||||
|
||||
# Insert into rebrickable_parts with BrickLink color info
|
||||
sql.execute('rebrickable_parts/insert_with_preloaded_data', parameters={
|
||||
'part': part_info['part_num'],
|
||||
'color_id': full_color_info['color_id'] if full_color_info else color_id,
|
||||
'color_name': full_color_info['name'] if full_color_info else '',
|
||||
'color_rgb': full_color_info['rgb'] if full_color_info else None,
|
||||
'color_transparent': full_color_info['is_trans'] if full_color_info else None,
|
||||
'bricklink_color_id': full_color_info.get('bricklink_color_id') if full_color_info else None,
|
||||
'bricklink_color_name': full_color_info.get('bricklink_color_name') if full_color_info else None,
|
||||
'name': part_info['name'],
|
||||
'image': image_url,
|
||||
'image_id': image_id,
|
||||
'url': part_info['part_url']
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
|
||||
# Provide helpful error message for printed/decorated parts
|
||||
if '404' in error_msg or 'Not Found' in error_msg:
|
||||
base_part = ''.join(c for c in part_num if c.isdigit())
|
||||
|
||||
if base_part and base_part != part_num:
|
||||
raise ErrorException(
|
||||
f'Part {part_num} with color {color_id} not found in Rebrickable. '
|
||||
f'This appears to be a printed/decorated part. '
|
||||
f'Try using the base part number: {base_part}'
|
||||
)
|
||||
else:
|
||||
raise ErrorException(
|
||||
f'Part {part_num} with color {color_id} not found in Rebrickable. '
|
||||
f'Please verify the part number is correct.'
|
||||
)
|
||||
else:
|
||||
raise ErrorException(
|
||||
f'Part {part_num} with color {color_id} not found in Rebrickable: {error_msg}'
|
||||
)
|
||||
else:
|
||||
# Part already exists in rebrickable_parts, get the image URL
|
||||
result = sql.fetchone('rebrickable_parts/select/image_by_part_color', parameters={'part': part_num, 'color_id': color_id})
|
||||
if result and result[0]:
|
||||
image_url = result[0]
|
||||
|
||||
# Generate UUID and insert individual part
|
||||
socket.auto_progress(message='Adding part to collection')
|
||||
part_id = str(uuid4())
|
||||
|
||||
# Get storage and purchase location
|
||||
storage = BrickSetStorageList.get(
|
||||
data.get('storage', ''),
|
||||
allow_none=True
|
||||
)
|
||||
purchase_location = BrickSetPurchaseLocationList.get(
|
||||
data.get('purchase_location', ''),
|
||||
allow_none=True
|
||||
)
|
||||
|
||||
# Set fields
|
||||
self.fields.id = part_id
|
||||
self.fields.part = part_num
|
||||
self.fields.color = color_id
|
||||
self.fields.quantity = quantity
|
||||
self.fields.missing = 0
|
||||
self.fields.damaged = 0
|
||||
self.fields.checked = 0
|
||||
self.fields.description = data.get('description', '')
|
||||
self.fields.lot_id = None # Single parts are not in a lot
|
||||
self.fields.storage = storage.fields.id if storage else None
|
||||
self.fields.purchase_location = purchase_location.fields.id if purchase_location else None
|
||||
self.fields.purchase_date = data.get('purchase_date', None)
|
||||
self.fields.purchase_price = data.get('purchase_price', None)
|
||||
|
||||
# Insert into database
|
||||
self.insert(commit=False, no_defer=True)
|
||||
|
||||
# Save owners
|
||||
owners: list[str] = list(data.get('owners', []))
|
||||
for owner_id in owners:
|
||||
owner = BrickSetOwnerList.get(owner_id)
|
||||
owner.update_individual_part_state(self, state=True)
|
||||
|
||||
# Save tags
|
||||
tags: list[str] = list(data.get('tags', []))
|
||||
for tag_id in tags:
|
||||
tag = BrickSetTagList.get(tag_id)
|
||||
tag.update_individual_part_state(self, state=True)
|
||||
|
||||
# Commit
|
||||
sql.connection.commit()
|
||||
|
||||
# Download image if we have a URL
|
||||
if image_url:
|
||||
try:
|
||||
self.download_image(image_url)
|
||||
except Exception as e:
|
||||
# Don't fail the whole operation if image download fails
|
||||
logger.warning('Could not download image for part {part_num} color {color_id}: {error}'.format(
|
||||
part_num=part_num,
|
||||
color_id=color_id,
|
||||
error=e
|
||||
))
|
||||
|
||||
# Get color name for success message
|
||||
color_name = 'Unknown'
|
||||
if color_data and color_data.get('color_name'):
|
||||
color_name = color_data.get('color_name')
|
||||
elif full_color_info and full_color_info.get('name'):
|
||||
color_name = full_color_info.get('name')
|
||||
|
||||
# Generate link to part details page
|
||||
part_url = url_for('part.details', part=part_num, color=color_id)
|
||||
|
||||
socket.complete(
|
||||
message=f'Successfully added part {part_num} in {color_name} (<a href="{part_url}">View details</a>)'
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
if 'Individual parts system is disabled' in error_msg:
|
||||
socket.fail(message=error_msg)
|
||||
else:
|
||||
socket.fail(
|
||||
message=f'Could not add individual part: {error_msg}'
|
||||
)
|
||||
|
||||
if not isinstance(e, (NotFoundException, ErrorException)):
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
return False
|
||||
|
||||
# Create multiple individual parts (bulk mode - no lot)
|
||||
def create_bulk(self, socket: 'BrickSocket', data: dict[str, Any], /) -> bool:
|
||||
"""
|
||||
Create multiple individual parts without creating a lot.
|
||||
|
||||
Expected data format:
|
||||
{
|
||||
'cart': [
|
||||
{
|
||||
'part': '3001',
|
||||
'part_name': 'Brick 2 x 4',
|
||||
'color_id': 1,
|
||||
'color_name': 'White',
|
||||
'quantity': 10,
|
||||
'color_info': {...}
|
||||
},
|
||||
...
|
||||
],
|
||||
'storage': 'storage_id',
|
||||
'purchase_location': 'purchase_location_id',
|
||||
'purchase_date': timestamp,
|
||||
'purchase_price': 0.0,
|
||||
'owners': ['owner_id1', ...],
|
||||
'tags': ['tag_id1', ...]
|
||||
}
|
||||
"""
|
||||
try:
|
||||
# Validate cart data
|
||||
cart = data.get('cart', [])
|
||||
if not cart or not isinstance(cart, list):
|
||||
raise ErrorException('Cart is empty or invalid')
|
||||
|
||||
socket.auto_progress(
|
||||
message=f'Adding {len(cart)} individual parts',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
# Get storage
|
||||
from .set_list import BrickSetStorageList, BrickSetPurchaseLocationList, BrickSetOwnerList, BrickSetTagList
|
||||
storage = BrickSetStorageList.get(
|
||||
data.get('storage', ''),
|
||||
allow_none=True
|
||||
)
|
||||
storage_id = storage.fields.id if storage else None
|
||||
|
||||
# Get purchase location
|
||||
purchase_location = BrickSetPurchaseLocationList.get(
|
||||
data.get('purchase_location', ''),
|
||||
allow_none=True
|
||||
)
|
||||
purchase_location_id = purchase_location.fields.id if purchase_location else None
|
||||
|
||||
# Get purchase info
|
||||
purchase_date = data.get('purchase_date', None)
|
||||
purchase_price = data.get('purchase_price', None)
|
||||
|
||||
# Get owners and tags
|
||||
owners: list[str] = list(data.get('owners', []))
|
||||
tags: list[str] = list(data.get('tags', []))
|
||||
|
||||
# Add all parts from cart
|
||||
parts_added = 0
|
||||
for idx, cart_item in enumerate(cart):
|
||||
part_num = cart_item.get('part')
|
||||
color_id = cart_item.get('color_id')
|
||||
quantity = cart_item.get('quantity', 1)
|
||||
color_info = cart_item.get('color_info', {})
|
||||
|
||||
socket.auto_progress(
|
||||
message=f'Adding part {idx + 1}/{len(cart)}: {part_num} in {cart_item.get("color_name", "unknown color")}',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
# Create individual part with no lot_id
|
||||
part_uuid = str(uuid4())
|
||||
|
||||
# Ensure color exists and get full color info (including RGB)
|
||||
full_color_info = IndividualPart.get_or_fetch_color(color_id)
|
||||
|
||||
# Insert the part
|
||||
sql = BrickSQL()
|
||||
|
||||
# Ensure part/color combination exists in rebrickable_parts (same as lot creation)
|
||||
try:
|
||||
# Check if part exists
|
||||
result = sql.fetchone('rebrickable_parts/check_exists', parameters={'part': part_num, 'color_id': color_id})
|
||||
exists = result[0] > 0
|
||||
|
||||
if not exists:
|
||||
# Insert part data
|
||||
part_name = cart_item.get('part_name', '')
|
||||
color_name = cart_item.get('color_name', '')
|
||||
image_url = color_info.get('part_img_url', '')
|
||||
|
||||
# Extract image_id from element_ids or URL
|
||||
element_ids = color_info.get('elements', [])
|
||||
if element_ids and len(element_ids) > 0:
|
||||
image_id = str(element_ids[0])
|
||||
elif image_url:
|
||||
image_id, _ = os.path.splitext(os.path.basename(urlparse(image_url).path))
|
||||
else:
|
||||
image_id = None
|
||||
|
||||
# Use full_color_info for RGB and transparency data (same as single-part add)
|
||||
sql.execute('rebrickable_parts/insert_part_color', parameters={
|
||||
'part': part_num,
|
||||
'name': part_name,
|
||||
'color_id': color_id,
|
||||
'color_name': color_name,
|
||||
'color_rgb': full_color_info.get('rgb') if full_color_info else '',
|
||||
'color_transparent': full_color_info.get('is_trans') if full_color_info else False,
|
||||
'image': image_url,
|
||||
'image_id': image_id,
|
||||
'url': current_app.config['REBRICKABLE_LINK_PART_PATTERN'].format(part=part_num, color=color_id),
|
||||
'bricklink_color_id': full_color_info.get('bricklink_color_id') if full_color_info else None,
|
||||
'bricklink_color_name': full_color_info.get('bricklink_color_name') if full_color_info else None
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning('Could not ensure part data for {part_num}/{color_id}: {error}'.format(
|
||||
part_num=part_num,
|
||||
color_id=color_id,
|
||||
error=e
|
||||
))
|
||||
|
||||
# Insert individual part
|
||||
sql.execute(
|
||||
'individual_part/insert',
|
||||
parameters={
|
||||
'id': part_uuid,
|
||||
'part': part_num,
|
||||
'color': color_id,
|
||||
'quantity': quantity,
|
||||
'lot_id': None, # No lot - this is bulk individual parts mode
|
||||
'storage': storage_id,
|
||||
'purchase_location': purchase_location_id,
|
||||
'purchase_date': purchase_date,
|
||||
'purchase_price': purchase_price,
|
||||
'description': None,
|
||||
'missing': 0,
|
||||
'damaged': 0,
|
||||
'checked': False
|
||||
}
|
||||
)
|
||||
|
||||
# Add owners
|
||||
for owner_id in owners:
|
||||
owner = BrickSetOwnerList.get(owner_id)
|
||||
if owner:
|
||||
sql.execute(
|
||||
'individual_part/metadata/owner/insert',
|
||||
parameters={
|
||||
'part_id': part_uuid,
|
||||
'owner_id': owner_id
|
||||
}
|
||||
)
|
||||
|
||||
# Add tags
|
||||
for tag_id in tags:
|
||||
tag = BrickSetTagList.get(tag_id)
|
||||
if tag:
|
||||
sql.execute(
|
||||
'individual_part/metadata/tag/insert',
|
||||
parameters={
|
||||
'part_id': part_uuid,
|
||||
'tag_id': tag_id
|
||||
}
|
||||
)
|
||||
|
||||
parts_added += 1
|
||||
|
||||
# Commit all changes
|
||||
sql = BrickSQL()
|
||||
sql.commit()
|
||||
|
||||
socket.auto_progress(
|
||||
message=f'Successfully added {parts_added} individual parts',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
# Generate link to individual parts list
|
||||
from flask import url_for
|
||||
parts_url = url_for('individual_part.list_all')
|
||||
|
||||
# Send completion with message and link
|
||||
socket.complete(
|
||||
message='Successfully added {count} individual parts. <a href="{url}">View individual parts</a>'.format(
|
||||
count=parts_added,
|
||||
url=parts_url
|
||||
),
|
||||
parts_added=parts_added
|
||||
)
|
||||
return True
|
||||
|
||||
except ErrorException as error:
|
||||
socket.fail(message=str(error))
|
||||
return False
|
||||
except Exception as error:
|
||||
logger.error('Failed to create bulk individual parts: {error}'.format(error=error))
|
||||
logger.error(traceback.format_exc())
|
||||
socket.fail(message='Failed to add individual parts: {error}'.format(error=str(error)))
|
||||
return False
|
||||
|
||||
# Update a field
|
||||
def update_field(self, field: str, value: Any, /) -> Self:
|
||||
setattr(self.fields, field, value)
|
||||
|
||||
# Use a specific update query for each field
|
||||
sql = BrickSQL()
|
||||
sql.execute_and_commit('individual_part/update/field', parameters={
|
||||
'id': self.fields.id,
|
||||
'value': value
|
||||
}, field=field)
|
||||
|
||||
return self
|
||||
|
||||
# Update problem count (missing/damaged)
|
||||
def update_problem(self, problem: str, data: dict[str, Any], /) -> int:
|
||||
# Handle both 'value' key and 'amount' key
|
||||
amount: str | int = data.get('value', data.get('amount', '')) # type: ignore
|
||||
|
||||
# We need a positive integer
|
||||
try:
|
||||
if amount == '':
|
||||
amount = 0
|
||||
|
||||
amount = int(amount)
|
||||
|
||||
if amount < 0:
|
||||
amount = 0
|
||||
except Exception:
|
||||
raise ErrorException(f'"{amount}" is not a valid integer')
|
||||
|
||||
if amount < 0:
|
||||
raise ErrorException('Cannot set a negative amount')
|
||||
|
||||
setattr(self.fields, problem, amount)
|
||||
|
||||
BrickSQL().execute_and_commit(
|
||||
f'individual_part/update/{problem}',
|
||||
parameters={
|
||||
'id': self.fields.id,
|
||||
problem: amount
|
||||
}
|
||||
)
|
||||
|
||||
return amount
|
||||
|
||||
# Update checked status
|
||||
def update_checked(self, data: dict[str, Any], /) -> bool:
|
||||
# Handle both direct 'checked' key and changer.js 'value' key format
|
||||
if data:
|
||||
checked = data.get('checked', data.get('value', False))
|
||||
else:
|
||||
checked = False
|
||||
|
||||
checked = bool(checked)
|
||||
self.fields.checked = 1 if checked else 0
|
||||
|
||||
BrickSQL().execute_and_commit(
|
||||
'individual_part/update/checked',
|
||||
parameters={
|
||||
'id': self.fields.id,
|
||||
'checked': self.fields.checked
|
||||
}
|
||||
)
|
||||
|
||||
return checked
|
||||
@@ -0,0 +1,100 @@
|
||||
import logging
|
||||
from typing import Self, TYPE_CHECKING
|
||||
|
||||
from .record_list import BrickRecordList
|
||||
from .individual_part import IndividualPart
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .set_storage import BrickSetStorage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# List of individual parts
|
||||
class IndividualPartList(BrickRecordList):
|
||||
# Queries
|
||||
list_query: str = 'individual_part/list/all'
|
||||
by_part_query: str = 'individual_part/list/by_part'
|
||||
by_color_query: str = 'individual_part/list/by_color'
|
||||
by_part_and_color_query: str = 'individual_part/list/by_part_and_color'
|
||||
by_storage_query: str = 'individual_part/list/by_storage'
|
||||
using_storage_query: str = 'individual_part/list/using_storage'
|
||||
using_purchase_location_query: str = 'individual_part/list/using_purchase_location'
|
||||
without_storage_query: str = 'individual_part/list/without_storage'
|
||||
problem_query: str = 'individual_part/list/problem'
|
||||
|
||||
# Get all individual parts
|
||||
def all(self, /) -> Self:
|
||||
self.list(override_query=self.list_query)
|
||||
return self
|
||||
|
||||
# Get individual parts by part number
|
||||
def by_part(self, part: str, /) -> Self:
|
||||
self.fields.part = part
|
||||
self.list(override_query=self.by_part_query)
|
||||
return self
|
||||
|
||||
# Get individual parts by color
|
||||
def by_color(self, color_id: int, /) -> Self:
|
||||
self.fields.color = color_id
|
||||
self.list(override_query=self.by_color_query)
|
||||
return self
|
||||
|
||||
# Get individual parts by part number and color
|
||||
def by_part_and_color(self, part: str, color_id: int, /) -> Self:
|
||||
self.fields.part = part
|
||||
self.fields.color = color_id
|
||||
self.list(override_query=self.by_part_and_color_query)
|
||||
return self
|
||||
|
||||
# Get individual parts by storage location
|
||||
def by_storage(self, storage: 'BrickSetStorage', /) -> Self:
|
||||
self.fields.storage = storage.fields.id
|
||||
self.list(override_query=self.by_storage_query)
|
||||
return self
|
||||
|
||||
# Get individual parts using a specific storage location
|
||||
def using_storage(self, storage: 'BrickSetStorage', /) -> Self:
|
||||
self.fields.storage = storage.fields.id
|
||||
self.list(override_query=self.using_storage_query)
|
||||
return self
|
||||
|
||||
# Get individual parts using a specific purchase location
|
||||
def using_purchase_location(self, purchase_location: 'BrickSetPurchaseLocation', /) -> Self:
|
||||
self.fields.purchase_location = purchase_location.fields.id
|
||||
self.list(override_query=self.using_purchase_location_query)
|
||||
return self
|
||||
|
||||
# Get individual parts without storage
|
||||
def without_storage(self, /) -> Self:
|
||||
self.list(override_query=self.without_storage_query)
|
||||
return self
|
||||
|
||||
# Get individual parts with problems (missing or damaged)
|
||||
def with_problems(self, /) -> Self:
|
||||
self.list(override_query=self.problem_query)
|
||||
return self
|
||||
|
||||
# Base individual part list
|
||||
def list(
|
||||
self,
|
||||
/,
|
||||
*,
|
||||
override_query: str | None = None,
|
||||
order: str | None = None,
|
||||
limit: int | None = None,
|
||||
**context,
|
||||
) -> None:
|
||||
# Load the individual parts from the database
|
||||
for record in super().select(
|
||||
override_query=override_query,
|
||||
order=order,
|
||||
limit=limit,
|
||||
**context
|
||||
):
|
||||
individual_part = IndividualPart(record=record)
|
||||
self.records.append(individual_part)
|
||||
|
||||
# Set the record class
|
||||
def set_record_class(self, /) -> None:
|
||||
self.record_class = IndividualPart
|
||||
@@ -0,0 +1,315 @@
|
||||
import logging
|
||||
import os
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import Any, Self, TYPE_CHECKING
|
||||
from urllib.parse import urlparse
|
||||
from uuid import uuid4
|
||||
|
||||
from flask import url_for
|
||||
|
||||
from .exceptions import NotFoundException, DatabaseException, ErrorException
|
||||
from .individual_part import IndividualPart
|
||||
from .record import BrickRecord, format_timestamp
|
||||
from .set_owner_list import BrickSetOwnerList
|
||||
from .set_purchase_location_list import BrickSetPurchaseLocationList
|
||||
from .set_storage_list import BrickSetStorageList
|
||||
from .set_tag_list import BrickSetTagList
|
||||
from .sql import BrickSQL
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .socket import BrickSocket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Individual part lot (collection/batch of individual parts added together)
|
||||
class IndividualPartLot(BrickRecord):
|
||||
# Queries
|
||||
select_query: str = 'individual_part_lot/select/by_id'
|
||||
insert_query: str = 'individual_part_lot/insert'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
/,
|
||||
*,
|
||||
record: Any | None = None
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
# Ingest the record if it has one
|
||||
if record is not None:
|
||||
self.ingest(record)
|
||||
|
||||
# Select a specific lot by UUID
|
||||
def select_by_id(self, id: str, /) -> Self:
|
||||
from .set_owner_list import BrickSetOwnerList
|
||||
from .set_tag_list import BrickSetTagList
|
||||
|
||||
self.fields.id = id
|
||||
if not self.select(
|
||||
override_query=self.select_query,
|
||||
owners=BrickSetOwnerList.as_columns(),
|
||||
tags=BrickSetTagList.as_columns(),
|
||||
# Note: Part lots don't have statuses (by design)
|
||||
# Statuses are meant for tracking set completion/verification, which doesn't apply
|
||||
# to loose part collections. Individual parts within lots can still be marked as
|
||||
# missing/damaged/checked through the parts inventory system.
|
||||
):
|
||||
raise NotFoundException(
|
||||
'Individual part lot with id "{id}" not found'.format(id=id)
|
||||
)
|
||||
return self
|
||||
|
||||
# Delete a lot and all its parts
|
||||
def delete(self, /) -> None:
|
||||
BrickSQL().executescript(
|
||||
'individual_part_lot/delete',
|
||||
id=self.fields.id
|
||||
)
|
||||
|
||||
# Get the URL for this lot
|
||||
def url(self, /) -> str:
|
||||
return url_for('individual_part.lot_details', lot_id=self.fields.id)
|
||||
|
||||
# String representation for debugging
|
||||
def __repr__(self, /) -> str:
|
||||
name = getattr(self.fields, 'name', 'Unnamed') or 'Unnamed'
|
||||
lot_id = getattr(self.fields, 'id', 'unknown')
|
||||
# Try to get part_count if available (from optimized query)
|
||||
part_count = getattr(self.fields, 'part_count', '?')
|
||||
return f'<IndividualPartLot "{name}" ({part_count} parts) id:{lot_id[:8]}...>'
|
||||
|
||||
# Format created date
|
||||
def created_date_formatted(self, /) -> str:
|
||||
return format_timestamp(self.fields.created_date)
|
||||
|
||||
# Format purchase date
|
||||
def purchase_date_formatted(self, /) -> str:
|
||||
return format_timestamp(self.fields.purchase_date)
|
||||
|
||||
# Format purchase price
|
||||
def purchase_price(self, /) -> str:
|
||||
from flask import current_app
|
||||
if self.fields.purchase_price is not None:
|
||||
return '{price}{currency}'.format(
|
||||
price=self.fields.purchase_price,
|
||||
currency=current_app.config['PURCHASE_CURRENCY']
|
||||
)
|
||||
else:
|
||||
return ''
|
||||
|
||||
# Get all parts in this lot
|
||||
def parts(self, /) -> list['IndividualPart']:
|
||||
sql = BrickSQL()
|
||||
parts_data = sql.fetchall('individual_part_lot/list/parts', parameters={'lot_id': self.fields.id})
|
||||
|
||||
# Convert to list of IndividualPart objects using ingest()
|
||||
return [IndividualPart(record=record) for record in parts_data]
|
||||
|
||||
# Get total quantity of all parts in this lot
|
||||
def total_quantity(self, /) -> int:
|
||||
parts = self.parts()
|
||||
return sum(part.fields.quantity for part in parts)
|
||||
|
||||
# Create a new lot with parts from cart
|
||||
def create(self, socket: 'BrickSocket', data: dict[str, Any], /) -> bool:
|
||||
"""
|
||||
Create a new individual part lot with multiple parts.
|
||||
|
||||
Expected data format:
|
||||
{
|
||||
'cart': [
|
||||
{
|
||||
'part': '3001',
|
||||
'part_name': 'Brick 2 x 4',
|
||||
'color_id': 1,
|
||||
'color_name': 'White',
|
||||
'quantity': 10,
|
||||
'color_info': {...}
|
||||
},
|
||||
...
|
||||
],
|
||||
'name': 'Optional lot name',
|
||||
'description': 'Optional lot description',
|
||||
'storage': 'storage_id',
|
||||
'purchase_location': 'purchase_location_id',
|
||||
'purchase_date': timestamp,
|
||||
'purchase_price': 0.0,
|
||||
'owners': ['owner_id1', ...],
|
||||
'tags': ['tag_id1', ...]
|
||||
}
|
||||
"""
|
||||
try:
|
||||
# Validate cart data
|
||||
cart = data.get('cart', [])
|
||||
if not cart or not isinstance(cart, list):
|
||||
raise ErrorException('Cart is empty or invalid')
|
||||
|
||||
socket.auto_progress(
|
||||
message=f'Creating lot with {len(cart)} parts',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
# Generate UUID for the lot
|
||||
lot_id = str(uuid4())
|
||||
self.fields.id = lot_id
|
||||
|
||||
# Set lot metadata
|
||||
self.fields.name = data.get('name', None)
|
||||
self.fields.description = data.get('description', None)
|
||||
self.fields.created_date = datetime.now().timestamp()
|
||||
|
||||
# Get storage
|
||||
storage = BrickSetStorageList.get(
|
||||
data.get('storage', ''),
|
||||
allow_none=True
|
||||
)
|
||||
self.fields.storage = storage.fields.id if storage else None
|
||||
|
||||
# Get purchase location
|
||||
purchase_location = BrickSetPurchaseLocationList.get(
|
||||
data.get('purchase_location', ''),
|
||||
allow_none=True
|
||||
)
|
||||
self.fields.purchase_location = purchase_location.fields.id if purchase_location else None
|
||||
|
||||
# Set purchase info
|
||||
self.fields.purchase_date = data.get('purchase_date', None)
|
||||
self.fields.purchase_price = data.get('purchase_price', None)
|
||||
|
||||
# Insert the lot record
|
||||
socket.auto_progress(
|
||||
message='Inserting lot into database',
|
||||
increment_total=True
|
||||
)
|
||||
self.insert(commit=False)
|
||||
|
||||
# Commit the lot so parts can reference it
|
||||
sql = BrickSQL()
|
||||
sql.commit()
|
||||
|
||||
# Save owners using the metadata update methods
|
||||
owners: list[str] = list(data.get('owners', []))
|
||||
for owner_id in owners:
|
||||
owner = BrickSetOwnerList.get(owner_id)
|
||||
if owner:
|
||||
owner.update_individual_part_lot_state(self, state=True, commit=False)
|
||||
|
||||
# Save tags using the metadata update methods
|
||||
tags: list[str] = list(data.get('tags', []))
|
||||
for tag_id in tags:
|
||||
tag = BrickSetTagList.get(tag_id)
|
||||
if tag:
|
||||
tag.update_individual_part_lot_state(self, state=True, commit=False)
|
||||
|
||||
# Add all parts from cart
|
||||
socket.auto_progress(
|
||||
message=f'Adding {len(cart)} parts to lot',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
for idx, cart_item in enumerate(cart):
|
||||
part_num = cart_item.get('part')
|
||||
color_id = cart_item.get('color_id')
|
||||
quantity = cart_item.get('quantity', 1)
|
||||
color_info = cart_item.get('color_info', {})
|
||||
|
||||
socket.auto_progress(
|
||||
message=f'Adding part {idx + 1}/{len(cart)}: {part_num} in {cart_item.get("color_name", "unknown color")}',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
# Create individual part with lot_id
|
||||
part_uuid = str(uuid4())
|
||||
|
||||
# Use the add method but with lot_id
|
||||
# We need to insert the part with the lot_id
|
||||
sql = BrickSQL()
|
||||
|
||||
# First ensure the part exists in rebrickable_parts (BEFORE inserting individual part)
|
||||
IndividualPart.get_or_fetch_color(color_id)
|
||||
|
||||
# Ensure part/color combination exists in rebrickable_parts
|
||||
try:
|
||||
# Check if part exists
|
||||
result = sql.fetchone('rebrickable_parts/check_exists', parameters={'part': part_num, 'color_id': color_id})
|
||||
exists = result[0] > 0
|
||||
|
||||
if not exists:
|
||||
# Insert part data
|
||||
part_name = cart_item.get('part_name', '')
|
||||
color_name = cart_item.get('color_name', '')
|
||||
image_url = color_info.get('part_img_url', '')
|
||||
|
||||
# Extract image_id from element_ids or URL
|
||||
element_ids = color_info.get('elements', [])
|
||||
if element_ids and len(element_ids) > 0:
|
||||
image_id = str(element_ids[0])
|
||||
elif image_url:
|
||||
image_id, _ = os.path.splitext(os.path.basename(urlparse(image_url).path))
|
||||
else:
|
||||
image_id = None
|
||||
|
||||
sql.execute('rebrickable_parts/insert_part_color', parameters={
|
||||
'part': part_num,
|
||||
'name': part_name,
|
||||
'color_id': color_id,
|
||||
'color_name': color_name,
|
||||
'color_rgb': color_info.get('rgb', ''),
|
||||
'color_transparent': color_info.get('is_trans', False),
|
||||
'image': image_url,
|
||||
'image_id': image_id,
|
||||
'url': current_app.config['REBRICKABLE_LINK_PART_PATTERN'].format(part=part_num, color=color_id),
|
||||
'bricklink_color_id': color_info.get('bricklink_color_id', None),
|
||||
'bricklink_color_name': color_info.get('bricklink_color_name', None)
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning('Could not ensure part data for {part_num}/{color_id}: {error}'.format(
|
||||
part_num=part_num,
|
||||
color_id=color_id,
|
||||
error=e
|
||||
))
|
||||
|
||||
# Now insert the part with lot_id (NO individual metadata - inherited from lot)
|
||||
sql.execute('individual_part/insert_with_lot', parameters={
|
||||
'id': part_uuid,
|
||||
'part': part_num,
|
||||
'color': color_id,
|
||||
'quantity': quantity,
|
||||
'lot_id': lot_id
|
||||
})
|
||||
|
||||
# Commit all changes
|
||||
socket.auto_progress(
|
||||
message='Committing changes to database',
|
||||
increment_total=True
|
||||
)
|
||||
sql.commit()
|
||||
|
||||
socket.auto_progress(
|
||||
message=f'Lot created successfully with {len(cart)} parts',
|
||||
increment_total=True
|
||||
)
|
||||
|
||||
# Complete with success message and lot URL
|
||||
lot_url = self.url()
|
||||
socket.complete(
|
||||
message=f'Successfully created lot with {len(cart)} parts. <a href="{lot_url}">View lot</a>',
|
||||
data={
|
||||
'lot_id': lot_id,
|
||||
'lot_url': lot_url
|
||||
}
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except ErrorException as e:
|
||||
socket.fail(message=str(e))
|
||||
logger.error('Error creating lot: {error}'.format(error=e))
|
||||
return False
|
||||
except Exception as e:
|
||||
socket.fail(message='Unexpected error creating lot: {error}'.format(error=str(e)))
|
||||
logger.error('Unexpected error creating lot: {error}'.format(error=e))
|
||||
logger.error(traceback.format_exc())
|
||||
return False
|
||||
@@ -0,0 +1,86 @@
|
||||
import logging
|
||||
from typing import Self, TYPE_CHECKING
|
||||
|
||||
from .record_list import BrickRecordList
|
||||
from .individual_part_lot import IndividualPartLot
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .set_storage import BrickSetStorage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# List of individual part lots
|
||||
class IndividualPartLotList(BrickRecordList):
|
||||
# Queries
|
||||
list_query: str = 'individual_part_lot/list/all'
|
||||
by_part_and_color_query: str = 'individual_part_lot/list/by_part_and_color'
|
||||
by_storage_query: str = 'individual_part_lot/list/by_storage'
|
||||
using_storage_query: str = 'individual_part_lot/list/using_storage'
|
||||
using_purchase_location_query: str = 'individual_part_lot/list/using_purchase_location'
|
||||
without_storage_query: str = 'individual_part_lot/list/without_storage'
|
||||
problem_query: str = 'individual_part_lot/list/problem'
|
||||
|
||||
# Get all individual part lots
|
||||
def all(self, /) -> Self:
|
||||
self.list(override_query=self.list_query)
|
||||
return self
|
||||
|
||||
# Base individual part lot list
|
||||
def list(
|
||||
self,
|
||||
/,
|
||||
*,
|
||||
override_query: str | None = None,
|
||||
order: str | None = None,
|
||||
limit: int | None = None,
|
||||
**context,
|
||||
) -> None:
|
||||
# Load the individual part lots from the database
|
||||
for record in super().select(
|
||||
override_query=override_query,
|
||||
order=order,
|
||||
limit=limit,
|
||||
**context
|
||||
):
|
||||
lot = IndividualPartLot(record=record)
|
||||
self.records.append(lot)
|
||||
|
||||
# Set the record class
|
||||
def set_record_class(self, /) -> None:
|
||||
self.record_class = IndividualPartLot
|
||||
|
||||
# Get individual part lots containing a specific part and color
|
||||
def by_part_and_color(self, part: str, color_id: int, /) -> Self:
|
||||
self.fields.part = part
|
||||
self.fields.color = color_id
|
||||
self.list(override_query='individual_part_lot/list/by_part_and_color')
|
||||
return self
|
||||
|
||||
# Get individual part lots by storage location
|
||||
def by_storage(self, storage: 'BrickSetStorage', /) -> Self:
|
||||
self.fields.storage = storage.fields.id
|
||||
self.list(override_query=self.by_storage_query)
|
||||
return self
|
||||
|
||||
# Get individual part lots using a specific storage location
|
||||
def using_storage(self, storage: 'BrickSetStorage', /) -> Self:
|
||||
self.fields.storage = storage.fields.id
|
||||
self.list(override_query=self.using_storage_query)
|
||||
return self
|
||||
|
||||
# Get individual part lots using a specific purchase location
|
||||
def using_purchase_location(self, purchase_location: 'BrickSetPurchaseLocation', /) -> Self:
|
||||
self.fields.purchase_location = purchase_location.fields.id
|
||||
self.list(override_query=self.using_purchase_location_query)
|
||||
return self
|
||||
|
||||
# Get individual part lots without storage
|
||||
def without_storage(self, /) -> Self:
|
||||
self.list(override_query=self.without_storage_query)
|
||||
return self
|
||||
|
||||
# Get individual part lots with problems (containing parts with missing or damaged items)
|
||||
def with_problems(self, /) -> Self:
|
||||
self.list(override_query=self.problem_query)
|
||||
return self
|
||||
Reference in New Issue
Block a user