Coverage for dao/audit_dao.py: 9.52%
106 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
1"""
2Audit data access object for match data-integrity auditing.
4Provides operations for the audit_teams scheduling table and audit_events
5findings table. Used exclusively by the match-scraper-agent (service key).
6"""
8from datetime import UTC, datetime, timedelta
10import structlog
12from dao.base_dao import BaseDAO
# Module-level structlog logger used by every AuditDAO method.
logger = structlog.get_logger()

# Teams audited within this window are considered "up-to-date". AuditDAO uses
# it both to decide whether get_next_team has anything left to schedule and to
# count "audited_this_week" in get_audit_summary.
_AUDIT_WINDOW_DAYS: int = 7
class AuditDAO(BaseDAO):
    """Data Access Object for audit scheduling and findings.

    Works against two tables through ``self.client``:

    * ``audit_teams`` -- scheduling state keyed by
      (team, age_group, league, division, season): ``last_audited_at``,
      ``last_audit_status`` (``"clean"`` / ``"findings"``) and
      ``findings_count``.
    * ``audit_events`` -- one row per audit run that produced findings,
      inserted with ``status="pending"`` and later updated through
      :meth:`update_event_status`.
    """

    # ── Timestamp helpers ─────────────────────────────────────────────────────

    @staticmethod
    def _parse_timestamp(value: object) -> datetime | None:
        """Return *value* as a timezone-aware datetime, or None.

        Accepts ISO-8601 strings (a trailing ``Z`` is treated as UTC) and
        ``datetime`` objects. Naive values are assumed to be UTC so they can be
        compared with the aware cutoff instead of raising ``TypeError``.
        Missing, empty or unparseable values yield None.
        """
        if isinstance(value, datetime):
            parsed = value
        elif isinstance(value, str) and value:
            try:
                parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            except ValueError:
                return None
        else:
            return None
        if parsed.tzinfo is None:
            # NOTE(review): assumes naive timestamps from the DB are UTC -- confirm.
            parsed = parsed.replace(tzinfo=UTC)
        return parsed

    @staticmethod
    def _audit_cutoff() -> datetime:
        """Return the UTC instant before which an audit counts as stale."""
        return datetime.now(tz=UTC) - timedelta(days=_AUDIT_WINDOW_DAYS)

    @classmethod
    def _audited_since(cls, value: object, cutoff: datetime) -> bool:
        """Return True if timestamp *value* parses and is at or after *cutoff*."""
        audited_dt = cls._parse_timestamp(value)
        return audited_dt is not None and audited_dt >= cutoff

    # ── Scheduling ────────────────────────────────────────────────────────────

    def get_next_team(self, season: str, division: str, league: str) -> dict | None:
        """Return the next team to audit, or None if all teams are current.

        Selects the row with the oldest last_audited_at (NULL first = highest
        priority). Returns None when the oldest audit is still within the
        ``_AUDIT_WINDOW_DAYS`` (7-day) window -- meaning every team has been
        seen this week. A row whose last_audited_at cannot be parsed is
        returned (treated as due) and a warning is logged.

        Args:
            season: Season name, e.g. "2025-2026".
            division: Division name, e.g. "Northeast".
            league: League name, e.g. "Homegrown".

        Returns:
            Dict with team/age_group/league/division/season/last_audited_at,
            or None if no teams are registered or all teams are up-to-date.

        Raises:
            Exception: any error from the database client is logged and re-raised.
        """
        try:
            response = (
                self.client.table("audit_teams")
                .select("team, age_group, league, division, season, last_audited_at")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .order("last_audited_at", desc=False, nullsfirst=True)
                .limit(1)
                .execute()
            )
        except Exception:
            logger.exception("audit_dao.get_next_team.error")
            raise

        if not response.data:
            logger.info("audit_dao.get_next_team.no_teams", season=season)
            return None

        row = response.data[0]
        last_audited = row.get("last_audited_at")

        if last_audited is not None:
            audited_dt = self._parse_timestamp(last_audited)
            if audited_dt is None:
                # Unparseable timestamp: audit the team rather than stalling the
                # schedule, but surface the bad data instead of hiding it.
                logger.warning(
                    "audit_dao.get_next_team.bad_timestamp",
                    team=row.get("team"),
                    age_group=row.get("age_group"),
                    last_audited_at=last_audited,
                )
            elif audited_dt >= self._audit_cutoff():
                # Rows are sorted oldest-first (NULLs first), so if even this
                # one is inside the window, every team in scope is current.
                logger.info(
                    "audit_dao.get_next_team.all_current",
                    season=season,
                    oldest_audit=last_audited,
                )
                return None

        logger.info(
            "audit_dao.get_next_team.selected",
            team=row["team"],
            age_group=row["age_group"],
            last_audited_at=last_audited,
        )
        return row

    def get_audit_teams(self, season: str, division: str, league: str) -> list[dict]:
        """Return all teams registered for auditing in a season/division/league.

        Rows are ordered by age_group, then team. Returns an empty list when
        nothing is registered; database errors are logged and re-raised.
        """
        try:
            response = (
                self.client.table("audit_teams")
                .select("*")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .order("age_group")
                .order("team")
                .execute()
            )
            return response.data or []
        except Exception:
            logger.exception("audit_dao.get_audit_teams.error")
            raise

    # ── Event submission ──────────────────────────────────────────────────────

    def submit_audit_event(self, event_data: dict) -> None:
        """Record a completed audit run and update the team's scheduling state.

        Always upserts audit_teams.last_audited_at (records that an audit ran).
        Only inserts an audit_events row when findings exist.

        Args:
            event_data: Dict matching the POST /api/agent/audit/events body:
                event_id, audit_run_id, team, age_group, league,
                division, season, findings (list of dicts). A missing or
                null ``findings`` is treated as no findings; a missing or
                null ``audit_run_id`` falls back to ``event_id``.

        Raises:
            KeyError: if a required identifying key is absent from event_data.
            Exception: any database error is logged and re-raised.
        """
        event_id = event_data["event_id"]
        team = event_data["team"]
        age_group = event_data["age_group"]
        league = event_data["league"]
        division = event_data["division"]
        season = event_data["season"]
        # `or []` also covers an explicit JSON null, which .get(..., []) would not.
        findings = event_data.get("findings") or []
        now_iso = datetime.now(tz=UTC).isoformat()

        audit_status = "findings" if findings else "clean"

        # NOTE(review): the two writes below are not atomic; if the insert in
        # step 2 fails, the team is still marked as audited by step 1.

        # 1. Upsert audit_teams scheduling state
        try:
            self.client.table("audit_teams").upsert(
                {
                    "team": team,
                    "age_group": age_group,
                    "league": league,
                    "division": division,
                    "season": season,
                    "last_audited_at": now_iso,
                    "last_audit_status": audit_status,
                    "findings_count": len(findings),
                },
                on_conflict="team,age_group,league,division,season",
            ).execute()
        except Exception:
            logger.exception(
                "audit_dao.submit_event.upsert_team_error",
                team=team,
                age_group=age_group,
            )
            raise

        # 2. Insert audit_events row only when there are findings
        if not findings:
            logger.info(
                "audit_dao.submit_event.clean",
                team=team,
                age_group=age_group,
                event_id=event_id,
            )
            return

        try:
            self.client.table("audit_events").insert(
                {
                    "event_id": event_id,
                    "audit_run_id": event_data.get("audit_run_id") or event_id,
                    "team": team,
                    "age_group": age_group,
                    "league": league,
                    "division": division,
                    "season": season,
                    "findings": findings,
                    "status": "pending",
                }
            ).execute()
        except Exception:
            logger.exception(
                "audit_dao.submit_event.insert_error",
                event_id=event_id,
                team=team,
            )
            raise

        logger.info(
            "audit_dao.submit_event.done",
            event_id=event_id,
            team=team,
            age_group=age_group,
            findings=len(findings),
        )

    # ── Event processing ──────────────────────────────────────────────────────

    def get_events(
        self,
        season: str,
        status: str = "pending",
        team: str | None = None,
        age_group: str | None = None,
    ) -> list[dict]:
        """Return audit events filtered by status (and optionally team/age_group).

        Events are ordered by created_at ascending. Empty ``team`` /
        ``age_group`` values do not filter. Database errors are logged and
        re-raised.
        """
        try:
            query = (
                self.client.table("audit_events")
                .select("*")
                .eq("season", season)
                .eq("status", status)
                .order("created_at", desc=False)
            )
            if team:
                query = query.eq("team", team)
            if age_group:
                query = query.eq("age_group", age_group)

            response = query.execute()
            return response.data or []
        except Exception:
            logger.exception("audit_dao.get_events.error", season=season, status=status)
            raise

    def update_event_status(
        self,
        event_id: str,
        status: str,
        processed_at: str | None = None,
    ) -> None:
        """Update the processing status of an audit event.

        ``processed_at`` is written only when a non-empty value is given.
        Database errors are logged and re-raised.
        """
        payload: dict[str, str] = {"status": status}
        if processed_at:
            payload["processed_at"] = processed_at

        try:
            self.client.table("audit_events").update(payload).eq("event_id", event_id).execute()
        except Exception:
            logger.exception(
                "audit_dao.update_event_status.error",
                event_id=event_id,
                status=status,
            )
            raise

        logger.info("audit_dao.update_event_status", event_id=event_id, status=status)

    # ── Summary ───────────────────────────────────────────────────────────────

    def get_audit_summary(self, season: str, division: str, league: str) -> dict:
        """Return audit coverage metrics for a season/division/league.

        "Audited this week" uses the same UTC cutoff as :meth:`get_next_team`,
        so the summary and the scheduler agree on which teams are current.
        The findings breakdown counts ``finding_type`` across pending events in
        the same season/division/league; it is best-effort and is returned
        empty (with a logged warning) if that query fails.

        Raises:
            Exception: if the audit_teams query fails (logged and re-raised).
        """
        cutoff = self._audit_cutoff()

        try:
            teams_resp = (
                self.client.table("audit_teams")
                .select("team, age_group, last_audited_at, last_audit_status, findings_count")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .execute()
            )
        except Exception:
            logger.exception("audit_dao.get_audit_summary.error")
            raise

        teams = teams_resp.data or []
        total = len(teams)
        audited_this_week = sum(
            1 for t in teams if self._audited_since(t.get("last_audited_at"), cutoff)
        )
        overdue = total - audited_this_week
        clean = sum(1 for t in teams if t.get("last_audit_status") == "clean")
        with_findings = sum(1 for t in teams if t.get("last_audit_status") == "findings")

        # Findings breakdown from pending events, scoped like the team metrics.
        findings_by_type: dict[str, int] = {}
        try:
            events_resp = (
                self.client.table("audit_events")
                .select("findings")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .eq("status", "pending")
                .execute()
            )
            counts: dict[str, int] = {}
            for ev in events_resp.data or []:
                # `or []` guards against a NULL findings column.
                for finding in ev.get("findings") or []:
                    ft = finding.get("finding_type") or "unknown"
                    counts[ft] = counts.get(ft, 0) + 1
        except Exception:
            # Best-effort: report coverage metrics without the breakdown rather
            # than failing the whole summary or returning partial counts.
            logger.warning(
                "audit_dao.get_audit_summary.findings_error",
                season=season,
                exc_info=True,
            )
        else:
            findings_by_type = counts

        return {
            "season": season,
            "total_teams": total,
            "audited_this_week": audited_this_week,
            "overdue_teams": overdue,
            "findings_by_type": findings_by_type,
            "teams_with_findings": with_findings,
            "clean_teams": clean,
        }