Coverage for dao/audit_dao.py: 9.52%

106 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-13 14:11 +0000

1""" 

2Audit data access object for match data-integrity auditing. 

3 

4Provides operations for the audit_teams scheduling table and audit_events 

5findings table. Used exclusively by the match-scraper-agent (service key). 

6""" 

7 

8from datetime import UTC, datetime, timedelta 

9 

10import structlog 

11 

12from dao.base_dao import BaseDAO 

13 

# A team whose most recent audit is newer than this many days is treated as
# up-to-date; older (or never-audited) teams are due for another audit.
_AUDIT_WINDOW_DAYS = 7

logger = structlog.get_logger()

18 

19 

class AuditDAO(BaseDAO):
    """Data Access Object for audit scheduling and findings.

    Operates on two tables through ``self.client`` (supplied by ``BaseDAO``):

    * ``audit_teams`` — scheduling state, one row per
      team/age_group/league/division/season, holding the time, outcome and
      findings count of the most recent audit.
    * ``audit_events`` — one row per audit run that produced findings.
    """

    # ── Helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _parse_timestamp(value: object) -> datetime | None:
        """Parse an ISO-8601 timestamp into a timezone-aware datetime.

        A trailing ``Z`` is accepted as UTC. A timestamp without an offset is
        treated as UTC so it can be compared with ``datetime.now(tz=UTC)``;
        comparing a naive datetime with an aware one raises ``TypeError``.

        Args:
            value: Raw ``last_audited_at`` value as returned by the client.

        Returns:
            The aware datetime, or None if *value* is not a parseable string.
        """
        if not isinstance(value, str):
            return None
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
        if parsed.tzinfo is None:
            # NOTE(review): assumes offset-less DB timestamps are UTC, which is
            # what submit_audit_event writes — confirm against the column type.
            parsed = parsed.replace(tzinfo=UTC)
        return parsed

    @staticmethod
    def _audit_cutoff() -> datetime:
        """Return the UTC instant before which an audit counts as stale."""
        return datetime.now(tz=UTC) - timedelta(days=_AUDIT_WINDOW_DAYS)

    # ── Scheduling ────────────────────────────────────────────────────────────

    def get_next_team(self, season: str, division: str, league: str) -> dict | None:
        """Return the next team to audit, or None if all teams are current.

        Selects the row with the oldest last_audited_at (NULL first = highest
        priority). Returns None when even that oldest audit is still within
        the ``_AUDIT_WINDOW_DAYS`` window, meaning every team has been seen
        recently. A last_audited_at that cannot be parsed is treated as due,
        so the team is never silently skipped.

        Args:
            season: Season name, e.g. "2025-2026".
            division: Division name, e.g. "Northeast".
            league: League name, e.g. "Homegrown".

        Returns:
            Dict with team/age_group/league/division/season/last_audited_at,
            or None if no teams are registered or all are up-to-date.

        Raises:
            Exception: Any client/query error is logged and re-raised.
        """
        try:
            response = (
                self.client.table("audit_teams")
                .select("team, age_group, league, division, season, last_audited_at")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .order("last_audited_at", desc=False, nullsfirst=True)
                .limit(1)
                .execute()
            )
        except Exception:
            logger.exception("audit_dao.get_next_team.error")
            raise

        if not response.data:
            logger.info("audit_dao.get_next_team.no_teams", season=season)
            return None

        row = response.data[0]
        last_audited = row.get("last_audited_at")
        audited_dt = self._parse_timestamp(last_audited)

        # This row is the least-recently audited team; if even it is inside
        # the window, every team in the group is current.
        if audited_dt is not None and audited_dt >= self._audit_cutoff():
            logger.info(
                "audit_dao.get_next_team.all_current",
                season=season,
                oldest_audit=last_audited,
            )
            return None

        logger.info(
            "audit_dao.get_next_team.selected",
            team=row["team"],
            age_group=row["age_group"],
            last_audited_at=last_audited,
        )
        return row

    def get_audit_teams(self, season: str, division: str, league: str) -> list[dict]:
        """Return all teams registered for auditing in a season/division/league.

        Rows are ordered by age_group, then team. Returns an empty list when
        nothing is registered.

        Raises:
            Exception: Any client/query error is logged and re-raised.
        """
        try:
            response = (
                self.client.table("audit_teams")
                .select("*")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .order("age_group")
                .order("team")
                .execute()
            )
            return response.data or []
        except Exception:
            logger.exception("audit_dao.get_audit_teams.error")
            raise

    # ── Event submission ──────────────────────────────────────────────────────

    def submit_audit_event(self, event_data: dict) -> None:
        """Record a completed audit run and update the team's scheduling state.

        Always upserts audit_teams.last_audited_at (records that an audit ran).
        Only inserts an audit_events row when findings exist.

        Args:
            event_data: Dict matching the POST /api/agent/audit/events body:
                event_id, audit_run_id, team, age_group, league,
                division, season, findings (list of dicts). A missing or
                null ``findings`` is treated as a clean audit;
                ``audit_run_id`` defaults to ``event_id``.

        Raises:
            KeyError: If a required identifying field is missing.
            Exception: Any client error from either write is logged and re-raised.
        """
        event_id = event_data["event_id"]
        team = event_data["team"]
        age_group = event_data["age_group"]
        league = event_data["league"]
        division = event_data["division"]
        season = event_data["season"]
        # `or []` also covers an explicit null, which .get(..., []) would pass
        # through and later break len().
        findings = event_data.get("findings") or []
        now_iso = datetime.now(tz=UTC).isoformat()

        audit_status = "findings" if findings else "clean"

        # NOTE: the two writes below are not transactional. If the
        # audit_events insert fails, audit_teams has already been marked as
        # audited; the error is re-raised so the caller can retry.

        # 1. Upsert audit_teams scheduling state
        try:
            self.client.table("audit_teams").upsert(
                {
                    "team": team,
                    "age_group": age_group,
                    "league": league,
                    "division": division,
                    "season": season,
                    "last_audited_at": now_iso,
                    "last_audit_status": audit_status,
                    "findings_count": len(findings),
                },
                on_conflict="team,age_group,league,division,season",
            ).execute()
        except Exception:
            logger.exception(
                "audit_dao.submit_event.upsert_team_error",
                team=team,
                age_group=age_group,
            )
            raise

        # 2. Insert audit_events row only when there are findings
        if not findings:
            logger.info(
                "audit_dao.submit_event.clean",
                team=team,
                age_group=age_group,
                event_id=event_id,
            )
            return

        try:
            self.client.table("audit_events").insert(
                {
                    "event_id": event_id,
                    "audit_run_id": event_data.get("audit_run_id", event_id),
                    "team": team,
                    "age_group": age_group,
                    "league": league,
                    "division": division,
                    "season": season,
                    "findings": findings,
                    "status": "pending",
                }
            ).execute()
        except Exception:
            logger.exception(
                "audit_dao.submit_event.insert_error",
                event_id=event_id,
                team=team,
            )
            raise

        logger.info(
            "audit_dao.submit_event.done",
            event_id=event_id,
            team=team,
            age_group=age_group,
            findings=len(findings),
        )

    # ── Event processing ──────────────────────────────────────────────────────

    def get_events(
        self,
        season: str,
        status: str = "pending",
        team: str | None = None,
        age_group: str | None = None,
    ) -> list[dict]:
        """Return audit events filtered by status (and optionally team/age_group).

        Events are ordered oldest-first by created_at. Falsy ``team`` /
        ``age_group`` values (None or "") disable that filter.

        Raises:
            Exception: Any client/query error is logged and re-raised.
        """
        try:
            query = (
                self.client.table("audit_events")
                .select("*")
                .eq("season", season)
                .eq("status", status)
                .order("created_at", desc=False)
            )
            if team:
                query = query.eq("team", team)
            if age_group:
                query = query.eq("age_group", age_group)

            response = query.execute()
            return response.data or []
        except Exception:
            logger.exception("audit_dao.get_events.error", season=season, status=status)
            raise

    def update_event_status(
        self,
        event_id: str,
        status: str,
        processed_at: str | None = None,
    ) -> None:
        """Update the processing status of an audit event.

        Args:
            event_id: The event to update.
            status: New status value.
            processed_at: Optional timestamp string; written only when truthy.

        Raises:
            Exception: Any client error is logged and re-raised.
        """
        payload: dict = {"status": status}
        if processed_at:
            payload["processed_at"] = processed_at

        try:
            self.client.table("audit_events").update(payload).eq("event_id", event_id).execute()
        except Exception:
            logger.exception(
                "audit_dao.update_event_status.error",
                event_id=event_id,
                status=status,
            )
            raise

        logger.info("audit_dao.update_event_status", event_id=event_id, status=status)

    # ── Summary ───────────────────────────────────────────────────────────────

    def get_audit_summary(self, season: str, division: str, league: str) -> dict:
        """Return audit coverage metrics for a season/division/league.

        ``audited_this_week`` counts teams whose last audit falls inside the
        same ``_AUDIT_WINDOW_DAYS`` UTC window that get_next_team uses, so the
        summary's overdue count matches what the scheduler considers due.
        ``findings_by_type`` tallies findings from *pending* events of the same
        season/division/league; it is best-effort and left empty (with a
        warning logged) if that lookup fails.

        Raises:
            Exception: If the audit_teams query fails (logged and re-raised).
        """
        try:
            teams_resp = (
                self.client.table("audit_teams")
                .select("team, age_group, last_audited_at, last_audit_status, findings_count")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .execute()
            )
        except Exception:
            logger.exception("audit_dao.get_audit_summary.error")
            raise

        teams = teams_resp.data or []
        total = len(teams)

        cutoff = self._audit_cutoff()
        audit_times = [self._parse_timestamp(t.get("last_audited_at")) for t in teams]
        audited_this_week = sum(1 for ts in audit_times if ts is not None and ts >= cutoff)
        overdue = total - audited_this_week
        clean = sum(1 for t in teams if t.get("last_audit_status") == "clean")
        with_findings = sum(1 for t in teams if t.get("last_audit_status") == "findings")

        # Findings breakdown from pending events, scoped like the team query.
        findings_by_type: dict[str, int] = {}
        try:
            events_resp = (
                self.client.table("audit_events")
                .select("findings")
                .eq("season", season)
                .eq("division", division)
                .eq("league", league)
                .eq("status", "pending")
                .execute()
            )
            for ev in events_resp.data or []:
                # `or []` tolerates a stored null findings column.
                for finding in ev.get("findings") or []:
                    ft = finding.get("finding_type", "unknown")
                    findings_by_type[ft] = findings_by_type.get(ft, 0) + 1
        except Exception:
            # Best-effort: the coverage numbers are still useful without the
            # breakdown, but keep the traceback for diagnosis.
            logger.warning("audit_dao.get_audit_summary.findings_error", exc_info=True)

        return {
            "season": season,
            "total_teams": total,
            "audited_this_week": audited_this_week,
            "overdue_teams": overdue,
            "findings_by_type": findings_by_type,
            "teams_with_findings": with_findings,
            "clean_teams": clean,
        }
297 "clean_teams": clean, 

298 }