BrickTracker/bricktracker/socket_decorator.py

94 lines
2.8 KiB
Python

from functools import wraps
from threading import Thread
from typing import Callable, ParamSpec, TYPE_CHECKING, Union
from flask import copy_current_request_context
from .configuration_list import BrickConfigurationList
from .login import LoginManager
if TYPE_CHECKING:
from .socket import BrickSocket
# What a threaded function can return (None or Thread)
SocketReturn = Union[None, Thread]
# Threaded signature (*arg, **kwargs -> (None or Thread)
P = ParamSpec('P')
SocketCallable = Callable[P, SocketReturn]
# Fail if not authenticated
def authenticated_socket(
self: 'BrickSocket',
/,
*,
threaded: bool = True,
) -> Callable[[SocketCallable], SocketCallable]:
def outer(function: SocketCallable, /) -> SocketCallable:
@wraps(function)
def wrapper(*args, **kwargs) -> SocketReturn:
# Needs to be authenticated
if LoginManager.is_not_authenticated():
self.fail(message='You need to be authenticated')
return
# Apply threading
if threaded:
return threaded_socket(self)(function)(*args, **kwargs)
else:
return function(*args, **kwargs)
return wrapper
return outer
# Fail if not ready for Rebrickable (authenticated, API key)
# Automatically makes it threaded
def rebrickable_socket(
self: 'BrickSocket',
/,
*,
threaded: bool = True,
) -> Callable[[SocketCallable], SocketCallable]:
def outer(function: SocketCallable, /) -> SocketCallable:
@wraps(function)
# Automatically authenticated
@authenticated_socket(self, threaded=False)
def wrapper(*args, **kwargs) -> SocketReturn:
# Needs the Rebrickable API key
try:
BrickConfigurationList.error_unless_is_set('REBRICKABLE_API_KEY') # noqa: E501
except Exception as e:
self.fail(message=str(e))
return
# Apply threading
if threaded:
return threaded_socket(self)(function)(*args, **kwargs)
else:
return function(*args, **kwargs)
return wrapper
return outer
# Start the function in a thread if the socket is threaded
def threaded_socket(
self: 'BrickSocket',
/
) -> Callable[[SocketCallable], SocketCallable]:
def outer(function: SocketCallable, /) -> SocketCallable:
@wraps(function)
def wrapper(*args, **kwargs) -> SocketReturn:
# Start it in a thread if requested
if self.threaded:
@copy_current_request_context
def do_function() -> None:
function(*args, **kwargs)
return self.socket.start_background_task(do_function)
else:
return function(*args, **kwargs)
return wrapper
return outer