Coverage for celery_tasks/match_tasks.py: 11.27%

200 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-15 12:31 +0000

1""" 

2Match Processing Tasks 

3 

4Tasks for processing match data from the match-scraper service. 

5These tasks run asynchronously in Celery workers, allowing for: 

6- Non-blocking match data ingestion 

7- Automatic retries on failure 

8- Horizontal scaling of processing capacity 

9""" 

10 

11from datetime import datetime 

12from typing import Any 

13from zoneinfo import ZoneInfo 

14 

15from celery import Task 

16 

17from celery_app import app 

18from celery_tasks.validation_tasks import validate_match_data 

19from dao.league_dao import LeagueDAO 

20from dao.match_dao import MatchDAO, SupabaseConnection 

21from dao.season_dao import SeasonDAO 

22from dao.team_dao import TeamDAO 

23from logging_config import get_logger 

24 

25logger = get_logger(__name__) 

26 

27 

class DatabaseTask(Task):
    """
    Base task class that provides lazily-created database access objects.

    The Supabase connection and the DAOs are created on first use and then
    cached, so importing this module never opens a connection. All DAOs are
    built on the same ``SupabaseConnection``.

    NOTE(review): the caches are assigned through ``self``, so they live on the
    task object rather than on the class. Celery is documented to register a
    single instance per task per worker process, which is presumably what
    makes them shared across executions - confirm before reusing this base
    class for several tasks that are expected to share one connection.
    """

    # Lazily populated caches; None means "not created yet".
    _connection = None
    _dao = None
    _team_dao = None
    _season_dao = None
    _league_dao = None

    def _ensure_connection(self):
        """Return the cached SupabaseConnection, creating it on first use."""
        if self._connection is None:
            self._connection = SupabaseConnection()
        return self._connection

    @property
    def dao(self):
        """Lazy initialization of MatchDAO to avoid creating connections at import time."""
        if self._dao is None:
            self._dao = MatchDAO(self._ensure_connection())
        return self._dao

    @property
    def team_dao(self):
        """Lazy initialization of TeamDAO for team lookups."""
        if self._team_dao is None:
            self._team_dao = TeamDAO(self._ensure_connection())
        return self._team_dao

    @property
    def season_dao(self):
        """Lazy initialization of SeasonDAO for season/age-group lookups."""
        if self._season_dao is None:
            self._season_dao = SeasonDAO(self._ensure_connection())
        return self._season_dao

    @property
    def league_dao(self):
        """Lazy initialization of LeagueDAO for division lookups."""
        if self._league_dao is None:
            self._league_dao = LeagueDAO(self._ensure_connection())
        return self._league_dao

    @staticmethod
    def _build_scheduled_kickoff(match_data: dict[str, Any]) -> str | None:
        """Combine match_date + match_time into a UTC ISO 8601 timestamp for scheduled_kickoff.

        MLS Next displays all times in US Eastern. We interpret match_time as
        Eastern, convert to UTC, and return an ISO string suitable for a
        Supabase ``timestamptz`` column.

        Args:
            match_data: Incoming match payload. ``match_date`` is parsed as
                ``YYYY-MM-DD`` and ``match_time`` as 24-hour ``HH:MM``.

        Returns:
            A ``YYYY-MM-DDTHH:MM:SS+00:00`` string, or None if either
            ``match_time`` or ``match_date`` is absent, null or empty.

        Raises:
            ValueError: If the combined value does not match
                ``%Y-%m-%d %H:%M`` (for example a time that carries seconds).
        """
        match_time = match_data.get("match_time")
        match_date = match_data.get("match_date")
        if not (match_time and match_date):
            return None

        naive = datetime.strptime(f"{match_date} {match_time}", "%Y-%m-%d %H:%M")
        # Attaching a ZoneInfo via replace() is correct (unlike pytz): the UTC
        # offset is resolved for the concrete date, so DST is honoured.
        eastern_dt = naive.replace(tzinfo=ZoneInfo("America/New_York"))
        utc_dt = eastern_dt.astimezone(ZoneInfo("UTC"))
        # Fixed "+00:00" suffix so the value can be compared as a string with
        # the stored scheduled_kickoff (see _check_needs_update).
        return utc_dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")

    def _check_needs_update(
        self,
        existing_match: dict[str, Any],
        new_data: dict[str, Any],
        home_team_id: int,
        away_team_id: int,
    ) -> bool:
        """
        Check if the existing match needs to be updated based on new data.

        Returns True if any of these conditions are met:
        - Home/away teams swapped (home_team_id or away_team_id differ)
        - Status changed (scheduled → tbd, tbd → completed, etc.); only
          evaluated when the new data actually carries a match_status
        - Both scores are present in the new data and differ from the stored ones
        - match_date changed (rescheduled match)
        - A scheduled_kickoff can be derived from the new data and differs
          from the stored one

        Status transition examples:
        - scheduled → tbd: Match played, awaiting score
        - tbd → tbd: No change (skip)
        - tbd → completed: Score posted (update with scores)
        - scheduled → completed: Direct completion (skip tbd)

        Args:
            existing_match: Row currently stored for the match.
            new_data: Incoming match payload.
            home_team_id: Resolved ID of the incoming home team.
            away_team_id: Resolved ID of the incoming away team.

        Raises:
            ValueError: Propagated from _build_scheduled_kickoff when
                match_date/match_time cannot be parsed.
        """
        # Check home/away team swap
        if existing_match.get("home_team_id") != home_team_id or existing_match.get("away_team_id") != away_team_id:
            logger.debug(
                f"Team assignment changed: home {existing_match.get('home_team_id')} → {home_team_id}, "
                f"away {existing_match.get('away_team_id')} → {away_team_id}"
            )
            return True

        # Check status change. A payload without match_status carries no status
        # information: _update_match_scores would not write one, so flagging it
        # here could produce an update with nothing to write, which the caller
        # treats as a failure.
        existing_status = existing_match.get("match_status", "scheduled")
        new_status = new_data.get("match_status")
        if new_status and existing_status != new_status:
            logger.debug(f"Status changed: {existing_status} → {new_status}")
            return True

        # Check score changes
        existing_home_score = existing_match.get("home_score")
        existing_away_score = existing_match.get("away_score")
        new_home_score = new_data.get("home_score")
        new_away_score = new_data.get("away_score")

        # If new data has scores and they differ from existing
        if new_home_score is not None and new_away_score is not None:
            if existing_home_score != new_home_score or existing_away_score != new_away_score:
                logger.debug(
                    f"Scores changed: {existing_home_score}-{existing_away_score} → "
                    f"{new_home_score}-{new_away_score}"
                )
                return True

        # Check if match_date changed (rescheduled match)
        existing_date = existing_match.get("match_date")
        new_date = new_data.get("match_date")
        if new_date and existing_date and new_date != existing_date:
            logger.debug(f"match_date changed: {existing_date} → {new_date}")
            return True

        # Check if scheduled_kickoff can be set/updated from match_time
        new_kickoff = self._build_scheduled_kickoff(new_data)
        existing_kickoff = existing_match.get("scheduled_kickoff")
        if new_kickoff and new_kickoff != existing_kickoff:
            logger.debug(f"scheduled_kickoff changed: {existing_kickoff} → {new_kickoff}")
            return True

        return False

    def _update_match_scores(
        self,
        existing_match: dict[str, Any],
        new_data: dict[str, Any],
        home_team_id: int | None = None,
        away_team_id: int | None = None,
    ) -> bool:
        """
        Update an existing match's scores, status, and team assignments.

        Updates scores, status, match_date, scheduled_kickoff, and
        home_team_id/away_team_id when a home/away swap is detected. Only
        fields that are present in ``new_data`` (or differ from the stored
        row) are written.

        Args:
            existing_match: Row currently stored for the match; must contain ``id``.
            new_data: Incoming match payload.
            home_team_id: Resolved incoming home team ID, or None to leave it untouched.
            away_team_id: Resolved incoming away team ID, or None to leave it untouched.

        Returns:
            True if the update was executed and returned data. False if there
            was nothing to write, the update returned no data, or any
            exception occurred (the exception is logged, not re-raised).
        """
        try:
            match_id = existing_match["id"]

            # Prepare update data
            update_data: dict[str, Any] = {}

            # Correct home/away team swap if team IDs differ
            if home_team_id is not None and home_team_id != existing_match.get("home_team_id"):
                update_data["home_team_id"] = home_team_id
                logger.info(
                    f"Match {match_id} home_team_id corrected: {existing_match.get('home_team_id')} → {home_team_id}"
                )
            if away_team_id is not None and away_team_id != existing_match.get("away_team_id"):
                update_data["away_team_id"] = away_team_id
                logger.info(
                    f"Match {match_id} away_team_id corrected: {existing_match.get('away_team_id')} → {away_team_id}"
                )

            # Update scores if provided
            if new_data.get("home_score") is not None:
                update_data["home_score"] = new_data["home_score"]
            if new_data.get("away_score") is not None:
                update_data["away_score"] = new_data["away_score"]

            # Update status if provided
            if new_data.get("match_status"):
                update_data["match_status"] = new_data["match_status"]

            # Update match_date if changed (rescheduled match)
            new_date = new_data.get("match_date")
            existing_date = existing_match.get("match_date")
            if new_date and existing_date and new_date != existing_date:
                update_data["match_date"] = new_date
                logger.info(f"Match {match_id} rescheduled: {existing_date} → {new_date}")

            # Update scheduled_kickoff if match_time provided and different
            new_kickoff = self._build_scheduled_kickoff(new_data)
            if new_kickoff and new_kickoff != existing_match.get("scheduled_kickoff"):
                update_data["scheduled_kickoff"] = new_kickoff

            # Note: updated_by field expects UUID, not string.
            # For match-scraper updates, we'll skip this field since it's optional.
            # If we need to track match-scraper updates, we should use 'source' field instead.

            if not update_data:
                logger.warning(f"No update data provided for match {match_id}")
                return False

            # Execute update directly on matches table
            response = self.dao.client.table("matches").update(update_data).eq("id", match_id).execute()

            if response.data:
                logger.info(f"Successfully updated match {match_id}: {update_data}")
                return True

            logger.error(f"Update returned no data for match {match_id}")
            return False

        except Exception as e:
            # Deliberate best-effort boundary: the caller turns False into a
            # task failure, so the error is logged here rather than re-raised.
            logger.error(f"Error updating match scores: {e}", exc_info=True)
            return False

239 

240 

@app.task(
    bind=True,
    base=DatabaseTask,
    name="celery_tasks.match_tasks.process_match_data",
    max_retries=3,
    default_retry_delay=60,  # Retry after 1 minute
    autoretry_for=(Exception,),  # Auto-retry on any exception
    retry_backoff=True,  # Exponential backoff
    retry_backoff_max=600,  # Max 10 minutes backoff
    retry_jitter=True,  # Add random jitter to prevent thundering herd
)
def process_match_data(self: DatabaseTask, match_data: dict[str, Any]) -> dict[str, Any]:
    """
    Process match data from match-scraper and insert into database.

    This task:
    1. Validates the match data
    2. Looks up both teams (a missing team is an error; teams are not created)
    3. Finds an existing match by external ID, falling back to
       teams + date + age group
    4. Updates the existing match if anything changed, or creates a new one

    Args:
        match_data: Dictionary containing match information
            Required fields:
            - home_team: str
            - away_team: str
            - match_date: str (ISO format)
            - season: str
            - age_group: str
            - division: str
            Optional fields:
            - home_score: int
            - away_score: int
            - match_status: str
            - match_type: str
            - location: str
            Also read by this task when present:
            - external_match_id: scraper-side match identifier
            - match_time: kickoff time used to build scheduled_kickoff

    Returns:
        Dict containing:
        - db_id: The database ID of the created/updated/unchanged match
        - mls_id: The external_match_id from match_data (None if absent)
        - status: str - 'created', 'updated' or 'skipped'
        - message: str - Human-readable summary

    Raises:
        ValueError: If validation fails or a team cannot be found.
        Exception: If the update or the insert reports failure.

    NOTE(review): autoretry_for=(Exception,) means every error above is
    retried, including validation failures that look deterministic - confirm
    whether that is intended.
    """
    try:
        mls_id = match_data.get("external_match_id", "N/A")
        logger.info(
            f"Processing match data: {match_data.get('home_team')} vs {match_data.get('away_team')} (MLS ID: {mls_id})"
        )

        # Step 1: Validate the match data
        validation_result = validate_match_data(match_data)
        if not validation_result["valid"]:
            error_msg = f"Invalid match data: {validation_result['errors']}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Step 2: Extract team information
        home_team_name = match_data["home_team"]
        away_team_name = match_data["away_team"]

        logger.debug(f"Looking up teams: {home_team_name}, {away_team_name}")

        # Look up teams. No placeholder is created: a missing team aborts the task.
        home_team = self.team_dao.get_team_by_name(home_team_name)
        if not home_team:
            logger.warning(f"Home team not found: {home_team_name}")
            raise ValueError(f"Team not found: {home_team_name}")

        away_team = self.team_dao.get_team_by_name(away_team_name)
        if not away_team:
            logger.warning(f"Away team not found: {away_team_name}")
            raise ValueError(f"Team not found: {away_team_name}")

        # Step 3: Check if match already exists
        external_match_id = match_data.get("external_match_id")
        existing_match = None

        # Age-group record from the fallback lookup, kept so the create path
        # below does not have to query it a second time.
        age_group_record = None

        # First try: Look up by external ID (fast path for previously scraped matches)
        if external_match_id:
            existing_match = self.dao.get_match_by_external_id(external_match_id)
            logger.debug(
                f"Lookup by external_match_id '{external_match_id}': {'found' if existing_match else 'not found'}"
            )

        # Second try: Fallback to lookup by teams + date + age_group (for manually-entered matches)
        if not existing_match:
            # Look up age_group_id if age_group is provided
            age_group_id = None
            if match_data.get("age_group"):
                age_group_record = self.season_dao.get_age_group_by_name(match_data["age_group"])
                if age_group_record:
                    age_group_id = age_group_record["id"]
                else:
                    logger.warning(f"Age group not found: {match_data['age_group']}")

            existing_match = self.dao.get_match_by_teams_and_date(
                home_team_id=home_team["id"],
                away_team_id=away_team["id"],
                match_date=match_data["match_date"],
                age_group_id=age_group_id,
            )

            if existing_match:
                logger.info(
                    f"Found manually-entered match via fallback lookup: "
                    f"{home_team_name} vs {away_team_name} on {match_data['match_date']}"
                    + (f" ({match_data['age_group']})" if match_data.get("age_group") else "")
                )

                # If found a match without external_match_id, populate it
                if external_match_id and not existing_match.get("match_id"):
                    logger.info(f"Populating match_id '{external_match_id}' on existing match {existing_match['id']}")
                    success = self.dao.update_match_external_id(
                        match_id=existing_match["id"], external_match_id=external_match_id
                    )
                    if success:
                        # Update the existing_match dict to reflect the new match_id
                        existing_match["match_id"] = external_match_id
                        existing_match["source"] = "match-scraper"
                        logger.info(f"Successfully populated match_id on match {existing_match['id']}")
                    else:
                        logger.warning(f"Failed to populate match_id on match {existing_match['id']}")

        # Step 4: Insert or update match
        if existing_match:
            # Check if this is a score update
            needs_update = self._check_needs_update(existing_match, match_data, home_team["id"], away_team["id"])

            if needs_update:
                logger.info(
                    f"Updating existing match DB ID {existing_match['id']} (MLS ID: {external_match_id}): "
                    f"{home_team_name} vs {away_team_name}"
                )
                success = self._update_match_scores(existing_match, match_data, home_team["id"], away_team["id"])

                if not success:
                    raise Exception(f"Failed to update match {existing_match['id']}")

                result = {
                    "db_id": existing_match["id"],
                    "mls_id": external_match_id,
                    "status": "updated",
                    "message": f"Updated match scores: {home_team_name} vs {away_team_name}",
                }
            else:
                logger.info(
                    f"Match already exists with DB ID {existing_match['id']} (MLS ID: {external_match_id}). "
                    "No changes needed."
                )
                result = {
                    "db_id": existing_match["id"],
                    "mls_id": external_match_id,
                    "status": "skipped",
                    "message": f"Match unchanged: {home_team_name} vs {away_team_name}",
                }
        else:
            logger.info(f"Creating new match (MLS ID: {external_match_id}): {home_team_name} vs {away_team_name}")

            # Resolve names to IDs before calling create_match
            current_season = self.season_dao.get_current_season()
            season_id = current_season["id"] if current_season else 1  # falls back to season ID 1

            # This branch is only reached after the fallback lookup above ran,
            # so age_group_record already holds the lookup result (or None).
            age_group_id_for_create = 1  # Default fallback
            if match_data.get("age_group"):
                if age_group_record:
                    age_group_id_for_create = age_group_record["id"]
                else:
                    logger.warning(f"Age group '{match_data['age_group']}' not found, using default ID 1")

            division_id_for_create = None
            if match_data.get("division"):
                div_record = self.league_dao.get_division_by_name(match_data["division"])
                if div_record:
                    division_id_for_create = div_record["id"]
                else:
                    # The match is still created, just without a division.
                    logger.error(f"Division '{match_data['division']}' not found in database")

            scheduled_kickoff = self._build_scheduled_kickoff(match_data)

            match_id = self.dao.create_match(
                home_team_id=home_team["id"],
                away_team_id=away_team["id"],
                match_date=match_data["match_date"],
                season_id=season_id,
                home_score=match_data.get("home_score"),
                away_score=match_data.get("away_score"),
                match_status=match_data.get("match_status", "scheduled"),
                source="match-scraper",
                match_id=external_match_id,
                age_group_id=age_group_id_for_create,
                division_id=division_id_for_create,
                scheduled_kickoff=scheduled_kickoff,
            )
            if not match_id:
                raise Exception("Failed to create match")

            result = {
                "db_id": match_id,
                "mls_id": external_match_id,
                "status": "created",
                "message": f"Created match: {home_team_name} vs {away_team_name}",
            }

        logger.info(f"Successfully processed match: {result}")
        return result

    except Exception as e:
        logger.error(f"Error processing match data: {e}", exc_info=True)
        # Celery will auto-retry based on configuration
        raise