Coverage for auth.py: 67.55%
226 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
1"""
2Authentication and authorization utilities for the sports league backend.
3"""
5import logging
6import os
7import secrets
8from datetime import UTC, datetime, timedelta
9from functools import wraps
10from typing import Any
12import jwt
13from dotenv import load_dotenv
14from fastapi import Depends, HTTPException
15from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
16from jwt import PyJWKClient
18from supabase import Client
20load_dotenv()
22# JWKS client for ES256 token verification (cached)
23_jwks_client: PyJWKClient | None = None
25logger = logging.getLogger(__name__)
26security = HTTPBearer()
29# ============================================================================
30# Username Authentication Helper Functions
31# ============================================================================
34def username_to_internal_email(username: str) -> str:
35 """
36 Convert username to internal email format for Supabase Auth.
38 Example: gabe_ifa_35 -> gabe_ifa_35@missingtable.local
39 """
40 return f"{username.lower()}@missingtable.local"
43def internal_email_to_username(email: str) -> str | None:
44 """
45 Extract username from internal email format.
47 Example: gabe_ifa_35@missingtable.local -> gabe_ifa_35
48 Returns None if not an internal email.
49 """
50 if email.endswith("@missingtable.local"):
51 return email.replace("@missingtable.local", "")
52 return None
55def is_internal_email(email: str) -> bool:
56 """Check if email is an internal format."""
57 return email.endswith("@missingtable.local")
60async def check_username_available(supabase_client: Client, username: str) -> bool:
61 """
62 Check if username is available.
64 Returns True if username is available, False if taken.
65 """
66 try:
67 result = supabase_client.table("user_profiles").select("id").eq("username", username.lower()).execute()
69 return len(result.data) == 0
70 except Exception as e:
71 logger.error(f"Error checking username availability: {e}")
72 raise
75class AuthManager:
76 def __init__(self, supabase_client: Client):
77 self.supabase = supabase_client
78 self.jwt_secret = os.getenv("SUPABASE_JWT_SECRET")
79 self.service_account_secret = os.getenv("SERVICE_ACCOUNT_SECRET", secrets.token_urlsafe(32))
81 if not self.jwt_secret:
82 raise ValueError("SUPABASE_JWT_SECRET environment variable is required. Please set it in your .env file.")
84 def verify_token(self, token: str) -> dict[str, Any] | None:
85 """Verify JWT token and return user data."""
86 global _jwks_client
87 try:
88 # Check token header to determine algorithm
89 unverified_header = jwt.get_unverified_header(token)
90 alg = unverified_header.get("alg", "HS256")
92 if alg == "ES256": 92 ↛ 94line 92 didn't jump to line 94 because the condition on line 92 was never true
93 # Use JWKS for ES256 tokens (new Supabase CLI)
94 supabase_url = os.getenv("SUPABASE_URL", "http://127.0.0.1:54331")
95 jwks_url = f"{supabase_url}/auth/v1/.well-known/jwks.json"
97 if _jwks_client is None:
98 _jwks_client = PyJWKClient(jwks_url)
100 signing_key = _jwks_client.get_signing_key_from_jwt(token)
101 payload = jwt.decode(
102 token,
103 signing_key.key,
104 algorithms=["ES256"],
105 audience="authenticated",
106 )
107 else:
108 # Use symmetric secret for HS256 tokens (legacy/cloud)
109 payload = jwt.decode(
110 token,
111 self.jwt_secret,
112 algorithms=["HS256"],
113 audience="authenticated",
114 )
116 user_id = payload.get("sub")
117 if not user_id:
118 return None
120 # Get user profile with role
121 logger.debug(f"Querying user_profiles for user_id: {user_id}")
122 profile_response = self.supabase.table("user_profiles").select("*").eq("id", user_id).execute()
124 if not profile_response.data or len(profile_response.data) == 0: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true
125 logger.warning(f"No profile found for user {user_id}")
126 return None
128 # If multiple profiles exist, take the first one (should be fixed by cleanup script)
129 profile = profile_response.data[0]
130 if len(profile_response.data) > 1:
131 logger.warning(f"Multiple profiles found for user {user_id}, using first one")
133 # Extract username from profile (primary identifier)
134 username = profile.get("username")
136 # Get email from JWT payload (might be internal email format)
137 jwt_email = payload.get("email")
139 # Distinguish between internal email (username@missingtable.local) and real email
140 # Real email is stored in profile.email, internal email is in JWT
141 if jwt_email and is_internal_email(jwt_email):
142 # This is an internal email, extract username if not in profile
143 if not username:
144 username = internal_email_to_username(jwt_email)
145 real_email = profile.get("email") # Optional real email from profile
146 else:
147 # Legacy user with real email in JWT
148 real_email = jwt_email
150 return {
151 "user_id": user_id,
152 "username": username, # Primary identifier for username auth users
153 "email": real_email, # Real email (optional, for notifications)
154 "role": profile["role"],
155 "team_id": profile.get("team_id"),
156 "club_id": profile.get("club_id"), # For club managers
157 "display_name": profile.get("display_name"),
158 }
160 except jwt.ExpiredSignatureError:
161 logger.warning("Token has expired")
162 return None
163 except jwt.InvalidTokenError as e:
164 logger.warning(f"Invalid token: {e}")
165 return None
166 except Exception as e:
167 logger.error(f"Error verifying token: {e}")
168 return None
170 def create_service_account_token(self, service_name: str, permissions: list[str], expires_days: int = 365) -> str:
171 """Create a service account JWT token for automated systems."""
172 expiration = datetime.now(UTC) + timedelta(days=expires_days)
174 payload = {
175 "sub": f"service-{service_name}",
176 "iss": "missing-table",
177 "aud": "service-account",
178 "exp": int(expiration.timestamp()),
179 "iat": int(datetime.now(UTC).timestamp()),
180 "service_name": service_name,
181 "permissions": permissions,
182 "role": "service_account",
183 }
185 return jwt.encode(payload, self.service_account_secret, algorithm="HS256")
187 def verify_service_account_token(self, token: str) -> dict[str, Any] | None:
188 """Verify service account JWT token and return service data."""
189 try:
190 payload = jwt.decode(token, self.service_account_secret, algorithms=["HS256"], audience="service-account")
192 service_name = payload.get("service_name")
193 permissions = payload.get("permissions", [])
195 if not service_name:
196 return None
198 return {
199 "service_id": payload.get("sub"),
200 "service_name": service_name,
201 "permissions": permissions,
202 "role": "service_account",
203 "is_service_account": True,
204 }
206 except jwt.ExpiredSignatureError:
207 logger.warning("Service account token has expired")
208 return None
209 except jwt.InvalidTokenError as e:
210 logger.warning(f"Invalid service account token: {e}")
211 return None
212 except Exception as e:
213 logger.error(f"Error verifying service account token: {e}")
214 return None
216 def create_password_reset_token(self, user_id: str) -> str:
217 """Create a short-lived JWT for password reset (1 hour)."""
218 expiration = datetime.now(UTC) + timedelta(hours=1)
220 payload = {
221 "sub": user_id,
222 "iss": "missing-table",
223 "aud": "password-reset",
224 "exp": int(expiration.timestamp()),
225 "iat": int(datetime.now(UTC).timestamp()),
226 }
228 return jwt.encode(payload, self.service_account_secret, algorithm="HS256")
230 def verify_password_reset_token(self, token: str) -> str | None:
231 """Verify a password reset JWT and return the user_id, or None if invalid/expired."""
232 try:
233 payload = jwt.decode(
234 token,
235 self.service_account_secret,
236 algorithms=["HS256"],
237 audience="password-reset",
238 )
239 user_id = payload.get("sub")
240 return user_id if user_id else None
241 except jwt.ExpiredSignatureError:
242 logger.warning("Password reset token has expired")
243 return None
244 except jwt.InvalidTokenError as e:
245 logger.warning(f"Invalid password reset token: {e}")
246 return None
247 except Exception as e:
248 logger.error(f"Error verifying password reset token: {e}")
249 return None
251 def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict[str, Any]:
252 """FastAPI dependency to get current authenticated user or service account."""
253 if not credentials:
254 logger.warning("auth_failed: no_credentials")
255 raise HTTPException(status_code=401, detail="Authentication required")
257 # Try regular user token first
258 user_data = self.verify_token(credentials.credentials)
259 if user_data:
260 logger.debug(f"auth_success: user={user_data.get('username')}")
261 return user_data
263 # Try service account token
264 service_data = self.verify_service_account_token(credentials.credentials)
265 if service_data:
266 logger.debug(f"auth_success: service={service_data.get('service_name')}")
267 return service_data
269 logger.warning("auth_failed: invalid_token")
270 raise HTTPException(status_code=401, detail="Invalid or expired token")
272 def require_role(self, required_roles: list):
273 """Decorator to require specific roles."""
275 def decorator(func):
276 @wraps(func)
277 async def wrapper(*args, **kwargs):
278 # Get current user from kwargs (injected by FastAPI dependency)
279 current_user = None
280 for _key, value in kwargs.items():
281 if isinstance(value, dict) and "role" in value:
282 current_user = value
283 break
285 if not current_user:
286 raise HTTPException(status_code=401, detail="Authentication required")
288 user_role = current_user.get("role")
289 if user_role not in required_roles:
290 raise HTTPException(status_code=403, detail=f"Access denied. Required roles: {required_roles}")
292 return await func(*args, **kwargs)
294 return wrapper
296 return decorator
298 def can_manage_team(self, user_data: dict[str, Any], team_id: int) -> bool:
299 """Check if user can manage a specific team."""
300 role = user_data.get("role")
301 user_team_id = user_data.get("team_id")
302 user_club_id = user_data.get("club_id")
304 # Admins can manage any team
305 if role == "admin":
306 return True
308 # Team managers can only manage their own team
309 if role == "team-manager" and user_team_id == team_id:
310 return True
312 # Club managers can manage any team in their club
313 if role == "club_manager" and user_club_id:
314 team_club_id = self._get_team_club_id(team_id)
315 if team_club_id and team_club_id == user_club_id:
316 return True
318 return False
320 def _get_team_club_id(self, team_id: int) -> int | None:
321 """Get the club_id for a team."""
322 try:
323 result = self.supabase.table("teams").select("club_id").eq("id", team_id).execute()
324 if result.data and len(result.data) > 0:
325 return result.data[0].get("club_id")
326 return None
327 except Exception as e:
328 logger.error(f"Error getting team club_id: {e}")
329 return None
331 def can_edit_match(self, user_data: dict[str, Any], home_team_id: int, away_team_id: int) -> bool:
332 """Check if user can edit a match between specific teams."""
333 role = user_data.get("role")
334 user_team_id = user_data.get("team_id")
335 user_club_id = user_data.get("club_id")
336 username = user_data.get("username", "unknown")
338 logger.debug(
339 f"can_edit_match check - User: {username}, Role: {role}, "
340 f"user_team_id: {user_team_id}, user_club_id: {user_club_id}, "
341 f"home_team_id: {home_team_id}, away_team_id: {away_team_id}"
342 )
344 # Admins can edit any match
345 if role == "admin":
346 logger.debug(f"can_edit_match: GRANTED - {username} is admin")
347 return True
349 # Service accounts with manage_matches permission can edit any match
350 if role == "service_account":
351 permissions = user_data.get("permissions", [])
352 if "manage_matches" in permissions:
353 logger.debug("can_edit_match: GRANTED - service account has manage_matches")
354 return True
356 # Team managers can edit matches involving their team
357 if role == "team-manager" and user_team_id in [home_team_id, away_team_id]:
358 logger.debug(f"can_edit_match: GRANTED - team manager's team ({user_team_id}) is in match")
359 return True
361 # Club managers can edit matches involving any team in their club
362 if role == "club_manager" and user_club_id:
363 home_club_id = self._get_team_club_id(home_team_id)
364 away_club_id = self._get_team_club_id(away_team_id)
365 logger.debug(
366 f"can_edit_match: club_manager check - user_club_id: {user_club_id}, "
367 f"home_team ({home_team_id}) club: {home_club_id}, "
368 f"away_team ({away_team_id}) club: {away_club_id}"
369 )
370 if user_club_id in [home_club_id, away_club_id]:
371 logger.debug("can_edit_match: GRANTED - club manager's club matches")
372 return True
373 else:
374 logger.warning(
375 f"can_edit_match: DENIED - club_manager {username} (club {user_club_id}) "
376 f"cannot edit match between teams {home_team_id} (club {home_club_id}) "
377 f"and {away_team_id} (club {away_club_id})"
378 )
379 elif role == "club_manager" and not user_club_id:
380 logger.warning(f"can_edit_match: DENIED - club_manager {username} has no club_id set")
382 logger.debug(f"can_edit_match: DENIED - {username} ({role}) lacks permission")
383 return False
386# Auth dependency functions for FastAPI
387def get_current_user_optional(
388 credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)),
389) -> dict[str, Any] | None:
390 """Get current user if authenticated, None otherwise."""
391 if not credentials:
392 return None
394 # This will be injected by the main app
395 from app import auth_manager
397 return auth_manager.verify_token(credentials.credentials)
400def get_current_user_required(
401 credentials: HTTPAuthorizationCredentials = Depends(security),
402) -> dict[str, Any]:
403 """Get current user, raise exception if not authenticated."""
404 # This will be injected by the main app
405 from app import auth_manager
407 return auth_manager.get_current_user(credentials)
410def require_admin(
411 current_user: dict[str, Any] = Depends(get_current_user_required),
412) -> dict[str, Any]:
413 """Require admin role."""
414 if current_user.get("role") != "admin":
415 raise HTTPException(status_code=403, detail="Admin access required")
416 return current_user
419def require_team_manager_or_admin(
420 current_user: dict[str, Any] = Depends(get_current_user_required),
421) -> dict[str, Any]:
422 """Require team-manager, club_manager, or admin role."""
423 role = current_user.get("role")
424 if role not in ["admin", "club_manager", "team-manager"]:
425 raise HTTPException(status_code=403, detail="Team manager, club manager, or admin access required")
426 return current_user
429def require_admin_or_service_account(
430 current_user: dict[str, Any] = Depends(get_current_user_required),
431) -> dict[str, Any]:
432 """Require admin role or service account with appropriate permissions."""
433 role = current_user.get("role")
435 # Allow admin users
436 if role == "admin":
437 return current_user
439 # Allow service accounts with match management permissions
440 if role == "service_account":
441 permissions = current_user.get("permissions", [])
442 if "manage_matches" in permissions:
443 return current_user
444 else:
445 raise HTTPException(status_code=403, detail="Service account requires 'manage_matches' permission")
447 raise HTTPException(status_code=403, detail="Admin or authorized service account access required")
450def require_match_management_permission(
451 current_user: dict[str, Any] = Depends(get_current_user_required),
452) -> dict[str, Any]:
453 """Require permission to manage matches (admin, club_manager, team-manager, or service account)."""
454 import logging
456 logger = logging.getLogger(__name__)
458 role = current_user.get("role")
459 # Use username for regular users, service_name for service accounts
460 user_identifier = current_user.get("username") or current_user.get("service_name", "unknown")
462 logger.info(f"Match management permission check - User: {user_identifier}, Role: {role}, User data: {current_user}")
464 # Allow admin, club managers, and team managers
465 if role in ["admin", "club_manager", "team-manager"]:
466 logger.info(f"Access granted - User {user_identifier} has role {role}")
467 return current_user
469 # Allow service accounts with match management permissions
470 if role == "service_account":
471 permissions = current_user.get("permissions", [])
472 if "manage_matches" in permissions:
473 logger.info(f"Access granted - Service account {user_identifier} has manage_matches permission")
474 return current_user
475 else:
476 logger.warning(
477 f"Access denied - Service account {user_identifier} missing manage_matches permission. "
478 f"Has: {permissions}"
479 )
480 raise HTTPException(status_code=403, detail="Service account requires 'manage_matches' permission")
482 logger.warning(f"Access denied - User {user_identifier} has insufficient role: {role}")
483 raise HTTPException(
484 status_code=403,
485 detail="Admin, club manager, team manager, or authorized service account access required",
486 )