Coverage for auth.py: 67.55%

226 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-13 14:26 +0000

1""" 

2Authentication and authorization utilities for the sports league backend. 

3""" 

4 

5import logging 

6import os 

7import secrets 

8from datetime import UTC, datetime, timedelta 

9from functools import wraps 

10from typing import Any 

11 

12import jwt 

13from dotenv import load_dotenv 

14from fastapi import Depends, HTTPException 

15from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

16from jwt import PyJWKClient 

17 

18from supabase import Client 

19 

# Load variables from a local .env file before any code below reads the environment.
load_dotenv()

# Module-wide logger and the bearer-token scheme shared by the auth dependencies.
logger = logging.getLogger(__name__)
security = HTTPBearer()

# JWKS client for ES256 token verification; created on first use and then reused.
_jwks_client: PyJWKClient | None = None

27 

28 

29# ============================================================================ 

30# Username Authentication Helper Functions 

31# ============================================================================ 

32 

33 

def username_to_internal_email(username: str) -> str:
    """
    Build the internal email address used for Supabase Auth from a username.

    The username is lower-cased and the internal domain is appended.

    Example: Gabe_IFA_35 -> gabe_ifa_35@missingtable.local
    """
    local_part = username.lower()
    return local_part + "@missingtable.local"

41 

42 

43def internal_email_to_username(email: str) -> str | None: 

44 """ 

45 Extract username from internal email format. 

46 

47 Example: gabe_ifa_35@missingtable.local -> gabe_ifa_35 

48 Returns None if not an internal email. 

49 """ 

50 if email.endswith("@missingtable.local"): 

51 return email.replace("@missingtable.local", "") 

52 return None 

53 

54 

def is_internal_email(email: str) -> bool:
    """Return True when the address ends with the internal "@missingtable.local" domain."""
    internal_suffix = "@missingtable.local"
    return email.endswith(internal_suffix)

58 

59 

async def check_username_available(supabase_client: Client, username: str) -> bool:
    """
    Report whether a username is still free.

    Looks up the lower-cased username in the "user_profiles" table.

    Returns True if no row matches, False if at least one does. Any error
    raised by the lookup is logged and then re-raised to the caller.
    """
    try:
        profiles = supabase_client.table("user_profiles").select("id")
        response = profiles.eq("username", username.lower()).execute()
        matching_rows = response.data
        return len(matching_rows) == 0
    except Exception as e:
        logger.error(f"Error checking username availability: {e}")
        raise

73 

74 

class AuthManager:
    """
    JWT verification and role/permission checks backed by a Supabase client.

    User tokens are verified either against the Supabase JWKS endpoint (ES256)
    or with SUPABASE_JWT_SECRET (HS256). Service-account and password-reset
    tokens are issued and verified locally with SERVICE_ACCOUNT_SECRET.
    """

    def __init__(self, supabase_client: Client):
        """
        Store the Supabase client and load signing secrets from the environment.

        Raises ValueError if SUPABASE_JWT_SECRET is unset or empty.
        """
        self.supabase = supabase_client
        self.jwt_secret = os.getenv("SUPABASE_JWT_SECRET")

        if not self.jwt_secret:
            raise ValueError("SUPABASE_JWT_SECRET environment variable is required. Please set it in your .env file.")

        # Treat an empty value like an unset one: an empty string must never
        # be used as the HMAC signing key.
        configured_secret = os.getenv("SERVICE_ACCOUNT_SECRET")
        if not configured_secret:
            logger.warning(
                "SERVICE_ACCOUNT_SECRET is not set; using a random per-process secret. "
                "Service account and password reset tokens will not remain valid across restarts."
            )
        self.service_account_secret = configured_secret or secrets.token_urlsafe(32)

    def verify_token(self, token: str) -> dict[str, Any] | None:
        """
        Verify a user JWT and return the caller's identity and profile data.

        Returns a dict with user_id, username, email, role, team_id, club_id
        and display_name. Returns None when the token is expired or invalid,
        has no "sub" claim, no profile row exists, or any other error occurs.
        """
        global _jwks_client
        try:
            # The header is read unverified only to choose the key type; each
            # branch below pins the single algorithm it is willing to accept.
            unverified_header = jwt.get_unverified_header(token)
            alg = unverified_header.get("alg", "HS256")

            if alg == "ES256":
                # Use JWKS for ES256 tokens (new Supabase CLI)
                supabase_url = os.getenv("SUPABASE_URL", "http://127.0.0.1:54331")
                jwks_url = f"{supabase_url}/auth/v1/.well-known/jwks.json"

                if _jwks_client is None:
                    _jwks_client = PyJWKClient(jwks_url)

                signing_key = _jwks_client.get_signing_key_from_jwt(token)
                payload = jwt.decode(
                    token,
                    signing_key.key,
                    algorithms=["ES256"],
                    audience="authenticated",
                )
            else:
                # Use symmetric secret for HS256 tokens (legacy/cloud)
                payload = jwt.decode(
                    token,
                    self.jwt_secret,
                    algorithms=["HS256"],
                    audience="authenticated",
                )

            user_id = payload.get("sub")
            if not user_id:
                return None

            # Get user profile with role
            logger.debug(f"Querying user_profiles for user_id: {user_id}")
            profile_response = self.supabase.table("user_profiles").select("*").eq("id", user_id).execute()

            if not profile_response.data:
                logger.warning(f"No profile found for user {user_id}")
                return None

            # If multiple profiles exist, take the first one (should be fixed by cleanup script)
            profile = profile_response.data[0]
            if len(profile_response.data) > 1:
                logger.warning(f"Multiple profiles found for user {user_id}, using first one")

            # Username from the profile is the primary identifier
            username = profile.get("username")

            # Email from the JWT payload (might be in the internal email format)
            jwt_email = payload.get("email")

            # Internal email (username@missingtable.local) lives in the JWT,
            # while the real email, if any, is stored on the profile.
            if jwt_email and is_internal_email(jwt_email):
                if not username:
                    username = internal_email_to_username(jwt_email)
                real_email = profile.get("email")  # Optional real email from profile
            else:
                # Legacy user with real email in JWT
                real_email = jwt_email

            return {
                "user_id": user_id,
                "username": username,  # Primary identifier for username auth users
                "email": real_email,  # Real email (optional, for notifications)
                # A profile without "role" raises KeyError, which the generic
                # handler below turns into a None result.
                "role": profile["role"],
                "team_id": profile.get("team_id"),
                "club_id": profile.get("club_id"),  # For club managers
                "display_name": profile.get("display_name"),
            }

        except jwt.ExpiredSignatureError:
            logger.warning("Token has expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid token: {e}")
            return None
        except Exception as e:
            logger.error(f"Error verifying token: {e}")
            return None

    def create_service_account_token(self, service_name: str, permissions: list[str], expires_days: int = 365) -> str:
        """
        Create a service account JWT token for automated systems.

        The token is signed with the service account secret (HS256), carries
        the audience "service-account" and expires after expires_days days.
        """
        # One timestamp for both claims so "iat" and "exp" are exactly expires_days apart.
        issued_at = datetime.now(UTC)
        expiration = issued_at + timedelta(days=expires_days)

        payload = {
            "sub": f"service-{service_name}",
            "iss": "missing-table",
            "aud": "service-account",
            "exp": int(expiration.timestamp()),
            "iat": int(issued_at.timestamp()),
            "service_name": service_name,
            "permissions": permissions,
            "role": "service_account",
        }

        return jwt.encode(payload, self.service_account_secret, algorithm="HS256")

    def verify_service_account_token(self, token: str) -> dict[str, Any] | None:
        """
        Verify service account JWT token and return service data.

        Returns a dict with service_id, service_name, permissions, role and
        is_service_account, or None when the token is expired, invalid, lacks
        a service_name claim, or any other error occurs.
        """
        try:
            payload = jwt.decode(token, self.service_account_secret, algorithms=["HS256"], audience="service-account")

            service_name = payload.get("service_name")
            permissions = payload.get("permissions", [])

            if not service_name:
                return None

            return {
                "service_id": payload.get("sub"),
                "service_name": service_name,
                "permissions": permissions,
                "role": "service_account",
                "is_service_account": True,
            }

        except jwt.ExpiredSignatureError:
            logger.warning("Service account token has expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid service account token: {e}")
            return None
        except Exception as e:
            logger.error(f"Error verifying service account token: {e}")
            return None

    def create_password_reset_token(self, user_id: str) -> str:
        """Create a short-lived JWT for password reset (1 hour)."""
        # One timestamp for both claims so "iat" and "exp" are exactly one hour apart.
        issued_at = datetime.now(UTC)
        expiration = issued_at + timedelta(hours=1)

        payload = {
            "sub": user_id,
            "iss": "missing-table",
            "aud": "password-reset",
            "exp": int(expiration.timestamp()),
            "iat": int(issued_at.timestamp()),
        }

        return jwt.encode(payload, self.service_account_secret, algorithm="HS256")

    def verify_password_reset_token(self, token: str) -> str | None:
        """Verify a password reset JWT and return the user_id, or None if invalid/expired."""
        try:
            payload = jwt.decode(
                token,
                self.service_account_secret,
                algorithms=["HS256"],
                audience="password-reset",
            )
            # An empty or missing "sub" claim is reported as None.
            return payload.get("sub") or None
        except jwt.ExpiredSignatureError:
            logger.warning("Password reset token has expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid password reset token: {e}")
            return None
        except Exception as e:
            logger.error(f"Error verifying password reset token: {e}")
            return None

    def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict[str, Any]:
        """
        FastAPI dependency to get current authenticated user or service account.

        The token is tried as a user token first and as a service account
        token second. Raises HTTPException(401) when credentials are missing
        or neither verification succeeds.
        """
        if not credentials:
            logger.warning("auth_failed: no_credentials")
            raise HTTPException(status_code=401, detail="Authentication required")

        # Try regular user token first
        user_data = self.verify_token(credentials.credentials)
        if user_data:
            logger.debug(f"auth_success: user={user_data.get('username')}")
            return user_data

        # Try service account token
        service_data = self.verify_service_account_token(credentials.credentials)
        if service_data:
            logger.debug(f"auth_success: service={service_data.get('service_name')}")
            return service_data

        logger.warning("auth_failed: invalid_token")
        raise HTTPException(status_code=401, detail="Invalid or expired token")

    def require_role(self, required_roles: list):
        """
        Decorator to require specific roles.

        The wrapped coroutine must receive the current user as a keyword
        argument: the first keyword value that is a dict containing "role" is
        treated as the user. Raises HTTPException(401) when no such argument
        exists and HTTPException(403) when its role is not in required_roles.
        """

        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Get current user from kwargs (injected by FastAPI dependency)
                current_user = next(
                    (value for value in kwargs.values() if isinstance(value, dict) and "role" in value),
                    None,
                )

                if not current_user:
                    raise HTTPException(status_code=401, detail="Authentication required")

                user_role = current_user.get("role")
                if user_role not in required_roles:
                    raise HTTPException(status_code=403, detail=f"Access denied. Required roles: {required_roles}")

                return await func(*args, **kwargs)

            return wrapper

        return decorator

    def can_manage_team(self, user_data: dict[str, Any], team_id: int) -> bool:
        """
        Check if user can manage a specific team.

        Admins may manage any team, team managers only their own team, and
        club managers any team whose club_id matches their own.
        """
        role = user_data.get("role")
        user_team_id = user_data.get("team_id")
        user_club_id = user_data.get("club_id")

        # Admins can manage any team
        if role == "admin":
            return True

        # Team managers can only manage their own team. A manager with no
        # team assigned must never match, even if team_id is also None.
        if role == "team-manager" and user_team_id is not None and user_team_id == team_id:
            return True

        # Club managers can manage any team in their club
        if role == "club_manager" and user_club_id:
            team_club_id = self._get_team_club_id(team_id)
            if team_club_id and team_club_id == user_club_id:
                return True

        return False

    def _get_team_club_id(self, team_id: int) -> int | None:
        """
        Get the club_id for a team.

        Returns None when the team row is missing or the lookup fails (the
        failure is logged rather than raised).
        """
        try:
            result = self.supabase.table("teams").select("club_id").eq("id", team_id).execute()
            if result.data:
                return result.data[0].get("club_id")
            return None
        except Exception as e:
            logger.error(f"Error getting team club_id: {e}")
            return None

    def can_edit_match(self, user_data: dict[str, Any], home_team_id: int, away_team_id: int) -> bool:
        """
        Check if user can edit a match between specific teams.

        Access is granted to admins, to service accounts holding the
        "manage_matches" permission, to team managers whose team plays in the
        match, and to club managers whose club owns either team.
        """
        role = user_data.get("role")
        user_team_id = user_data.get("team_id")
        user_club_id = user_data.get("club_id")
        username = user_data.get("username", "unknown")

        logger.debug(
            f"can_edit_match check - User: {username}, Role: {role}, "
            f"user_team_id: {user_team_id}, user_club_id: {user_club_id}, "
            f"home_team_id: {home_team_id}, away_team_id: {away_team_id}"
        )

        # Admins can edit any match
        if role == "admin":
            logger.debug(f"can_edit_match: GRANTED - {username} is admin")
            return True

        # Service accounts with manage_matches permission can edit any match
        if role == "service_account":
            permissions = user_data.get("permissions", [])
            if "manage_matches" in permissions:
                logger.debug("can_edit_match: GRANTED - service account has manage_matches")
                return True

        # Team managers can edit matches involving their team. The None guard
        # keeps a manager without a team from matching an unset team id.
        if (
            role == "team-manager"
            and user_team_id is not None
            and user_team_id in (home_team_id, away_team_id)
        ):
            logger.debug(f"can_edit_match: GRANTED - team manager's team ({user_team_id}) is in match")
            return True

        # Club managers can edit matches involving any team in their club
        if role == "club_manager" and user_club_id:
            home_club_id = self._get_team_club_id(home_team_id)
            away_club_id = self._get_team_club_id(away_team_id)
            logger.debug(
                f"can_edit_match: club_manager check - user_club_id: {user_club_id}, "
                f"home_team ({home_team_id}) club: {home_club_id}, "
                f"away_team ({away_team_id}) club: {away_club_id}"
            )
            if user_club_id in (home_club_id, away_club_id):
                logger.debug("can_edit_match: GRANTED - club manager's club matches")
                return True
            logger.warning(
                f"can_edit_match: DENIED - club_manager {username} (club {user_club_id}) "
                f"cannot edit match between teams {home_team_id} (club {home_club_id}) "
                f"and {away_team_id} (club {away_club_id})"
            )
        elif role == "club_manager" and not user_club_id:
            logger.warning(f"can_edit_match: DENIED - club_manager {username} has no club_id set")

        logger.debug(f"can_edit_match: DENIED - {username} ({role}) lacks permission")
        return False

384 

385 

386# Auth dependency functions for FastAPI 

def get_current_user_optional(
    credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)),
) -> dict[str, Any] | None:
    """
    Resolve the caller's user data when a bearer token was supplied.

    Returns None when no credentials were sent. Otherwise returns whatever
    auth_manager.verify_token yields for the token (a user dict or None).
    """
    if credentials:
        # Imported at call time; auth_manager is provided by the main app module.
        from app import auth_manager

        return auth_manager.verify_token(credentials.credentials)

    return None

398 

399 

def get_current_user_required(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict[str, Any]:
    """
    Resolve the authenticated caller through auth_manager.get_current_user.

    Any exception that call raises for missing or invalid credentials
    propagates to the caller unchanged.
    """
    # Imported at call time; auth_manager is provided by the main app module.
    from app import auth_manager as manager

    authenticated = manager.get_current_user(credentials)
    return authenticated

408 

409 

def require_admin(
    current_user: dict[str, Any] = Depends(get_current_user_required),
) -> dict[str, Any]:
    """
    Allow the request only when the caller's role is "admin".

    Returns the user dict unchanged; raises HTTPException(403) otherwise.
    """
    is_admin = current_user.get("role") == "admin"
    if is_admin:
        return current_user
    raise HTTPException(status_code=403, detail="Admin access required")

417 

418 

def require_team_manager_or_admin(
    current_user: dict[str, Any] = Depends(get_current_user_required),
) -> dict[str, Any]:
    """
    Allow the request for admin, club_manager, or team-manager roles.

    Returns the user dict unchanged; raises HTTPException(403) for any other role.
    """
    allowed_roles = ("admin", "club_manager", "team-manager")
    if current_user.get("role") in allowed_roles:
        return current_user
    raise HTTPException(status_code=403, detail="Team manager, club manager, or admin access required")

427 

428 

def require_admin_or_service_account(
    current_user: dict[str, Any] = Depends(get_current_user_required),
) -> dict[str, Any]:
    """
    Allow the request for admins and for service accounts that hold the
    "manage_matches" permission.

    Returns the user dict unchanged; raises HTTPException(403) for everyone else.
    """
    role = current_user.get("role")

    # Service accounts are accepted only with the match management permission.
    if role == "service_account":
        granted = current_user.get("permissions", [])
        if "manage_matches" not in granted:
            raise HTTPException(status_code=403, detail="Service account requires 'manage_matches' permission")
        return current_user

    # Every remaining caller must be an admin.
    if role != "admin":
        raise HTTPException(status_code=403, detail="Admin or authorized service account access required")
    return current_user

448 

449 

def require_match_management_permission(
    current_user: dict[str, Any] = Depends(get_current_user_required),
) -> dict[str, Any]:
    """
    Require permission to manage matches (admin, club_manager, team-manager, or service account).

    Service accounts additionally need the "manage_matches" permission.
    Returns the user dict unchanged; raises HTTPException(403) otherwise.
    """
    role = current_user.get("role")
    # Use username for regular users, service_name for service accounts
    user_identifier = current_user.get("username") or current_user.get("service_name", "unknown")

    # Log only identifier and role. The full user dict can include the email
    # address and should not be written to INFO-level logs on every request.
    logger.info(f"Match management permission check - User: {user_identifier}, Role: {role}")

    # Allow admin, club managers, and team managers
    if role in ("admin", "club_manager", "team-manager"):
        logger.info(f"Access granted - User {user_identifier} has role {role}")
        return current_user

    # Allow service accounts with match management permissions
    if role == "service_account":
        permissions = current_user.get("permissions", [])
        if "manage_matches" in permissions:
            logger.info(f"Access granted - Service account {user_identifier} has manage_matches permission")
            return current_user
        logger.warning(
            f"Access denied - Service account {user_identifier} missing manage_matches permission. "
            f"Has: {permissions}"
        )
        raise HTTPException(status_code=403, detail="Service account requires 'manage_matches' permission")

    logger.warning(f"Access denied - User {user_identifier} has insufficient role: {role}")
    raise HTTPException(
        status_code=403,
        detail="Admin, club manager, team manager, or authorized service account access required",
    )

485 detail="Admin, club manager, team manager, or authorized service account access required", 

486 )