111 lines
3.4 KiB
Python
111 lines
3.4 KiB
Python
import csv
|
|
import json
|
|
import requests
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional, Dict
|
|
from dataclasses import dataclass
|
|
import logging
|
|
from requests.exceptions import RequestException
|
|
|
|
# Set up root logging once at import time: INFO and above, timestamped.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
|
|
@dataclass
class APIConfig:
    """Settings needed to talk to the Rebrickable API."""

    # Key placed in the ``Authorization`` header of every request.
    api_key: str
    # Endpoint prefix; the fetcher appends a color ID to build the request URL.
    base_url: str = 'https://rebrickable.com/api/v3/lego/colors/'
    # Seconds between API calls.
    rate_limit_delay: int = 10

    @property
    def headers(self) -> Dict[str, str]:
        """Return the HTTP headers for an authenticated JSON request."""
        authorization = f'key {self.api_key}'
        request_headers = {
            'Accept': 'application/json',
            'Authorization': authorization,
        }
        return request_headers
|
|
|
|
class ColorDataFetcher:
    """Handles fetching and saving of color data from the Rebrickable API.

    Each color is cached as ``<output_dir>/<color_id>.json``; a color whose
    cache file already holds valid JSON is never requested again.
    """

    def __init__(self, config: "APIConfig", output_dir: Path, timeout: float = 30.0):
        """Store the settings and make sure the output directory exists.

        Args:
            config: API settings (``base_url``, ``headers``, ``rate_limit_delay``).
            output_dir: Directory that receives one JSON file per color.
                Missing parent directories are created as well.
            timeout: Seconds to wait for the API before giving up on a request.
        """
        self.config = config
        self.output_dir = output_dir
        self.timeout = timeout
        # parents=True so a nested path such as ``data/colors`` works on a
        # fresh checkout instead of raising FileNotFoundError.
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def _respect_rate_limit(self) -> None:
        """Pause for the configured delay after an API call was attempted."""
        time.sleep(self.config.rate_limit_delay)

    def fetch_and_save_color(self, color_id: int) -> Optional[Dict]:
        """Fetch color data from API and save to JSON file if not already exists.

        Args:
            color_id: Color identifier; ``-1`` (unknown color) is skipped.

        Returns:
            The decoded JSON data (from the cache file or from the API), or
            ``None`` when the color is skipped or the request fails.
        """
        if color_id == -1:  # Skip unknown color
            return None

        json_path = self.output_dir / f'{color_id}.json'

        # Reuse the cached file when it holds readable JSON.
        if json_path.exists():
            try:
                with json_path.open('r', encoding='utf-8') as f:
                    data = json.load(f)
                logging.info(f"JSON file for color ID {color_id} already exists, skipping API call")
                return data
            except (json.JSONDecodeError, UnicodeDecodeError):
                # A truncated or mis-encoded file is useless; drop it and refetch.
                logging.warning(f"Corrupted JSON file for color ID {color_id}, will retry API call")
                json_path.unlink()

        # Fetch from API
        url = f'{self.config.base_url}{color_id}'
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, headers=self.config.headers, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
        except (RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            logging.error(f"Error fetching data for color ID {color_id}: {e}")
            # A failed call still counts against the rate limit, so wait here
            # too rather than immediately hitting the API again.
            self._respect_rate_limit()
            return None

        # Save to JSON file
        with json_path.open('w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

        logging.info(f"Successfully fetched and saved color ID {color_id}")
        self._respect_rate_limit()  # Respect rate limits
        return data
|
|
|
|
def process_colors(input_csv: Path, fetcher: "ColorDataFetcher") -> None:
    """Process colors from CSV and fetch their data.

    Args:
        input_csv: CSV file with a header row containing an ``id`` column
            of integer color IDs.
        fetcher: Object whose ``fetch_and_save_color`` is called once per ID,
            in file order.

    Raises:
        Exception: Any error while reading the CSV or fetching is logged and
            re-raised unchanged.
    """
    try:
        # Read every color ID up front, so a malformed row is reported before
        # any (slow, rate-limited) API call is made.
        # - 'utf-8-sig' drops a leading BOM (common in spreadsheet exports),
        #   which would otherwise turn the first header into '\ufeffid'.
        # - newline='' lets the csv module do its own newline handling.
        with input_csv.open('r', encoding='utf-8-sig', newline='') as f:
            color_ids = [int(row['id']) for row in csv.DictReader(f)]

        # Fetch and save data for each color
        for color_id in color_ids:
            fetcher.fetch_and_save_color(color_id)

    except Exception as e:
        logging.error(f"Error processing colors: {e}")
        raise
|
|
|
|
def main():
    """Main entry point.

    Reads the API key from the ``REBRICKABLE_API_KEY`` environment variable,
    then fetches every color listed in ``colors.csv`` into the ``colors``
    directory. Any error is logged and re-raised.
    """
    # Function-scope import keeps this block self-contained.
    import os

    placeholder_key = '<API KEY>'
    try:
        # Take the key from the environment so it is not committed to source.
        # The placeholder stays as the fallback so behavior is unchanged when
        # the variable is not set.
        api_key = os.environ.get('REBRICKABLE_API_KEY', placeholder_key)
        if api_key == placeholder_key:
            logging.warning(
                "REBRICKABLE_API_KEY is not set; using the placeholder API key, "
                "requests will likely be rejected"
            )

        config = APIConfig(api_key=api_key)
        fetcher = ColorDataFetcher(config, output_dir=Path('colors'))

        process_colors(
            input_csv=Path('colors.csv'),
            fetcher=fetcher
        )

    except Exception as e:
        logging.error(f"Application error: {e}")
        raise
|
|
|
|
# Run the fetch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|