Add subspace-tv FastAPI service with Unraid deployment

Includes FastAPI application with device authentication, Docker configuration,
Bruno API tests, and git-based deployment scripts for Unraid.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Michael Simard
2026-01-22 09:22:32 -06:00
parent 171d55bd4e
commit 57ad4b0f33
21 changed files with 904 additions and 0 deletions

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Subspace TV Configuration Server."""

67
src/auth.py Normal file
View File

@@ -0,0 +1,67 @@
"""Authorization management for device access control."""
import json
import os
from pathlib import Path
from typing import Set
class AuthorizationManager:
"""Manages device authorization with hot-reload capability."""
def __init__(self, file_path: str):
"""Initialize authorization manager with file path.
Args:
file_path: Path to the authorized_devices.json file
"""
self.file_path = Path(file_path)
self._authorized_devices: Set[str] = set()
self._last_modified: float = 0.0
self._load_authorization_list()
def _load_authorization_list(self) -> None:
"""Load authorization list from JSON file."""
try:
if not self.file_path.exists():
self._authorized_devices = set()
self._last_modified = 0.0
return
stat = os.stat(self.file_path)
self._last_modified = stat.st_mtime
with open(self.file_path, "r") as f:
data = json.load(f)
self._authorized_devices = set(data.get("authorized_devices", []))
except (json.JSONDecodeError, OSError) as e:
self._authorized_devices = set()
self._last_modified = 0.0
def _check_reload(self) -> None:
"""Check if authorization file has been modified and reload if necessary."""
try:
if not self.file_path.exists():
if self._last_modified != 0.0:
self._authorized_devices = set()
self._last_modified = 0.0
return
stat = os.stat(self.file_path)
if stat.st_mtime > self._last_modified:
self._load_authorization_list()
except OSError:
pass
def is_authorized(self, device_id: str) -> bool:
"""Check if a device ID is authorized.
Args:
device_id: The device identifier to check
Returns:
True if authorized, False otherwise
"""
self._check_reload()
return device_id in self._authorized_devices

88
src/config.py Normal file
View File

@@ -0,0 +1,88 @@
"""Application configuration settings."""
import json
from typing import Literal, TypedDict
from pydantic_settings import BaseSettings, SettingsConfigDict
ConfigType = Literal["vod", "live", "all"]
class ConfigItem(TypedDict):
"""Configuration item structure."""
type: ConfigType
url: str
username: str
password: str
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
# Server Configuration
host: str = "0.0.0.0"
port: int = 8000
debug: bool = False
# Authorization
authorization_file: str = "/app/config/authorized_devices.json"
# Privileged Configuration (JSON array)
configs_json: str = ""
# Legacy config fields for backwards compatibility
config_url: str = "https://your-server.example.com"
config_username: str = ""
config_password: str = ""
# CORS
cors_origins: list[str] = ["*"]
@property
def configs(self) -> list[ConfigItem]:
"""Parse configs from JSON string or fall back to legacy fields."""
if self.configs_json:
try:
parsed = json.loads(self.configs_json)
if isinstance(parsed, list):
return [
ConfigItem(
type=item.get("type", "vod"),
url=item.get("url", ""),
username=item.get("username", ""),
password=item.get("password", ""),
)
for item in parsed
]
except json.JSONDecodeError:
pass
# Fall back to legacy single config as "vod" type
return [
ConfigItem(
type="vod",
url=self.config_url,
username=self.config_username,
password=self.config_password,
)
]
@property
def cors_origins_list(self) -> list[str]:
"""Parse CORS origins from string if necessary."""
if isinstance(self.cors_origins, str):
try:
return json.loads(self.cors_origins)
except json.JSONDecodeError:
return [self.cors_origins]
return self.cors_origins
def get_settings() -> Settings:
"""Returns the application settings singleton."""
return Settings()

65
src/main.py Normal file
View File

@@ -0,0 +1,65 @@
"""FastAPI application entry point."""
from fastapi import FastAPI, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from src.auth import AuthorizationManager
from src.config import get_settings
settings = get_settings()
app = FastAPI(
title="Subspace TV Configuration Server",
version="1.0.0",
debug=settings.debug,
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize authorization manager
auth_manager = AuthorizationManager(settings.authorization_file)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
@app.get("/api/v1/config")
async def get_config(x_password: str = Header(None)):
"""Get device configuration based on authorization.
Args:
x_password: Password from request header
Returns:
Configuration object with authorization status and config data
Raises:
HTTPException: If X-Password header is missing
"""
if x_password is None:
raise HTTPException(
status_code=400,
detail="X-Password header is required"
)
is_authorized = auth_manager.is_authorized(x_password)
if is_authorized:
return {
"authorized": True,
"configs": settings.configs,
}
else:
return {
"authorized": False,
"configs": [],
}