Coverage for celery_tasks/match_tasks.py: 11.27%
200 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
1"""
2Match Processing Tasks
4Tasks for processing match data from the match-scraper service.
5These tasks run asynchronously in Celery workers, allowing for:
6- Non-blocking match data ingestion
7- Automatic retries on failure
8- Horizontal scaling of processing capacity
9"""
11from datetime import datetime
12from typing import Any
13from zoneinfo import ZoneInfo
15from celery import Task
17from celery_app import app
18from celery_tasks.validation_tasks import validate_match_data
19from dao.league_dao import LeagueDAO
20from dao.match_dao import MatchDAO, SupabaseConnection
21from dao.season_dao import SeasonDAO
22from dao.team_dao import TeamDAO
23from logging_config import get_logger
25logger = get_logger(__name__)
class DatabaseTask(Task):
    """
    Base task class that provides lazily-initialized database access.

    ``_connection`` and the ``*_dao`` attributes default to ``None`` at class
    level. The properties below populate them on first use by assigning to
    ``self``, so the cached objects live on the task instance. Celery keeps one
    instance per registered task in each worker process, so the cache persists
    across executions of that task within the process.
    """

    _connection = None
    _dao = None
    _team_dao = None
    _season_dao = None
    _league_dao = None

    def _get_connection(self):
        """Return the cached SupabaseConnection, creating it on first use."""
        if self._connection is None:
            self._connection = SupabaseConnection()
        return self._connection

    @property
    def dao(self):
        """Lazy initialization of MatchDAO to avoid creating connections at import time."""
        if self._dao is None:
            self._dao = MatchDAO(self._get_connection())
        return self._dao

    @property
    def team_dao(self):
        """Lazy initialization of TeamDAO for team lookups."""
        if self._team_dao is None:
            self._team_dao = TeamDAO(self._get_connection())
        return self._team_dao

    @property
    def season_dao(self):
        """Lazy initialization of SeasonDAO for season/age-group lookups."""
        if self._season_dao is None:
            self._season_dao = SeasonDAO(self._get_connection())
        return self._season_dao

    @property
    def league_dao(self):
        """Lazy initialization of LeagueDAO for division lookups."""
        if self._league_dao is None:
            self._league_dao = LeagueDAO(self._get_connection())
        return self._league_dao

    @staticmethod
    def _build_scheduled_kickoff(match_data: dict[str, Any]) -> str | None:
        """Combine match_date + match_time into a UTC ISO 8601 timestamp for scheduled_kickoff.

        MLS Next displays all times in US Eastern. We interpret match_time as
        Eastern, convert to UTC, and return an ISO string suitable for a
        Supabase ``timestamptz`` column (format ``YYYY-MM-DDTHH:MM:SS+00:00``).

        Accepted inputs: match_date as ``%Y-%m-%d``; match_time as ``%H:%M``
        or ``%H:%M:%S``.

        Returns:
            The UTC timestamp string, or None if match_time or match_date is
            absent/empty.

        Raises:
            ValueError: If match_date/match_time match neither accepted format.
        """
        match_time = match_data.get("match_time")
        match_date = match_data.get("match_date")
        if not (match_time and match_date):
            return None

        local_text = f"{match_date} {match_time}"
        try:
            naive = datetime.strptime(local_text, "%Y-%m-%d %H:%M")
        except ValueError:
            # Also accept a seconds component (e.g. "19:30:00", which is what a
            # datetime.time formats to). Re-raises ValueError if this fails too.
            naive = datetime.strptime(local_text, "%Y-%m-%d %H:%M:%S")

        # zoneinfo (unlike pytz) is safe to attach via replace(); DST offsets
        # are resolved for the given wall-clock time.
        eastern_dt = naive.replace(tzinfo=ZoneInfo("America/New_York"))
        utc_dt = eastern_dt.astimezone(ZoneInfo("UTC"))
        return utc_dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")

    def _check_needs_update(
        self,
        existing_match: dict[str, Any],
        new_data: dict[str, Any],
        home_team_id: int,
        away_team_id: int,
    ) -> bool:
        """
        Check if the existing match needs to be updated based on new data.

        Returns True if any of these conditions are met:
        - Home/away teams swapped (home_team_id or away_team_id differ)
        - New data carries a match_status that differs from the stored one
          (scheduled → tbd, tbd → completed, etc.)
        - New data carries both scores and either differs from the stored value
        - match_date differs (rescheduled match)
        - A scheduled_kickoff built from match_date + match_time differs from
          the stored one

        Each condition mirrors a field that _update_match_scores writes under
        the same condition. So, for integer team IDs, a True result always leads
        to a non-empty update.

        Status transition examples:
        - scheduled → tbd: Match played, awaiting score
        - tbd → tbd: No change (skip)
        - tbd → completed: Score posted (update with scores)
        - scheduled → completed: Direct completion (skip tbd)
        """
        # Check home/away team swap
        if existing_match.get("home_team_id") != home_team_id or existing_match.get("away_team_id") != away_team_id:
            logger.debug(
                f"Team assignment changed: home {existing_match.get('home_team_id')} → {home_team_id}, "
                f"away {existing_match.get('away_team_id')} → {away_team_id}"
            )
            return True

        # Check status change. Only compare when the scraper actually sent a
        # status: _update_match_scores writes match_status only when it is
        # truthy. Defaulting a missing status to "scheduled" here would flag
        # updates that have nothing to write, which then fail the task.
        new_status = new_data.get("match_status")
        if new_status:
            existing_status = existing_match.get("match_status", "scheduled")
            if existing_status != new_status:
                logger.debug(f"Status changed: {existing_status} → {new_status}")
                return True

        # Check score changes
        existing_home_score = existing_match.get("home_score")
        existing_away_score = existing_match.get("away_score")
        new_home_score = new_data.get("home_score")
        new_away_score = new_data.get("away_score")

        # If new data has scores and they differ from existing
        if new_home_score is not None and new_away_score is not None:
            if existing_home_score != new_home_score or existing_away_score != new_away_score:
                logger.debug(
                    f"Scores changed: {existing_home_score}-{existing_away_score} → {new_home_score}-{new_away_score}"
                )
                return True

        # Check if match_date changed (rescheduled match)
        existing_date = existing_match.get("match_date")
        new_date = new_data.get("match_date")
        if new_date and existing_date and new_date != existing_date:
            logger.debug(f"match_date changed: {existing_date} → {new_date}")
            return True

        # Check if scheduled_kickoff can be set/updated from match_time
        new_kickoff = self._build_scheduled_kickoff(new_data)
        existing_kickoff = existing_match.get("scheduled_kickoff")
        if new_kickoff and new_kickoff != existing_kickoff:
            logger.debug(f"scheduled_kickoff changed: {existing_kickoff} → {new_kickoff}")
            return True

        return False

    def _update_match_scores(
        self,
        existing_match: dict[str, Any],
        new_data: dict[str, Any],
        home_team_id: int | None = None,
        away_team_id: int | None = None,
    ) -> bool:
        """
        Update an existing match's scores, status, and team assignments.

        Writes, directly to the ``matches`` table:
        - home_team_id / away_team_id, when given and different from the
          stored value (home/away swap correction)
        - home_score / away_score, whenever present (not None) in new_data
        - match_status, when present (truthy) in new_data
        - match_date, when it differs from the stored value
        - scheduled_kickoff, when one can be built and it differs

        Returns:
            True if the update returned data; False if there was nothing to
            write, the update returned no data, or any exception occurred.
            Exceptions are logged and not re-raised (best-effort; the caller
            decides how to react to False).
        """
        try:
            match_id = existing_match["id"]

            # Prepare update data
            update_data = {}

            # Correct home/away team swap if team IDs differ
            if home_team_id is not None and home_team_id != existing_match.get("home_team_id"):
                update_data["home_team_id"] = home_team_id
                logger.info(
                    f"Match {match_id} home_team_id corrected: {existing_match.get('home_team_id')} → {home_team_id}"
                )
            if away_team_id is not None and away_team_id != existing_match.get("away_team_id"):
                update_data["away_team_id"] = away_team_id
                logger.info(
                    f"Match {match_id} away_team_id corrected: {existing_match.get('away_team_id')} → {away_team_id}"
                )

            # Update scores if provided
            if new_data.get("home_score") is not None:
                update_data["home_score"] = new_data["home_score"]
            if new_data.get("away_score") is not None:
                update_data["away_score"] = new_data["away_score"]

            # Update status if provided
            if new_data.get("match_status"):
                update_data["match_status"] = new_data["match_status"]

            # Update match_date if changed (rescheduled match)
            new_date = new_data.get("match_date")
            existing_date = existing_match.get("match_date")
            if new_date and existing_date and new_date != existing_date:
                update_data["match_date"] = new_date
                logger.info(f"Match {match_id} rescheduled: {existing_date} → {new_date}")

            # Update scheduled_kickoff if match_time provided and different
            new_kickoff = self._build_scheduled_kickoff(new_data)
            if new_kickoff and new_kickoff != existing_match.get("scheduled_kickoff"):
                update_data["scheduled_kickoff"] = new_kickoff

            # Note: updated_by field expects UUID, not string.
            # For match-scraper updates, we skip this field since it's optional.
            # If we need to track match-scraper updates, we should use the 'source' field instead.

            if not update_data:
                logger.warning(f"No update data provided for match {match_id}")
                return False

            # Execute update directly on matches table
            response = self.dao.client.table("matches").update(update_data).eq("id", match_id).execute()

            if response.data:
                logger.info(f"Successfully updated match {match_id}: {update_data}")
                return True
            logger.error(f"Update returned no data for match {match_id}")
            return False

        except Exception as e:
            logger.error(f"Error updating match scores: {e}", exc_info=True)
            return False
@app.task(
    bind=True,
    base=DatabaseTask,
    name="celery_tasks.match_tasks.process_match_data",
    max_retries=3,
    default_retry_delay=60,  # Retry after 1 minute
    autoretry_for=(Exception,),  # Auto-retry on any exception
    retry_backoff=True,  # Exponential backoff
    retry_backoff_max=600,  # Max 10 minutes backoff
    retry_jitter=True,  # Add random jitter to prevent thundering herd
)
def process_match_data(self: DatabaseTask, match_data: dict[str, Any]) -> dict[str, Any]:
    """
    Process match data from match-scraper and insert or update it in the database.

    This task:
    1. Validates the match data (validate_match_data)
    2. Resolves home/away team names to existing teams. Teams are NOT created:
       an unknown team name raises ValueError.
    3. Looks for an existing match, first by external_match_id and then by
       teams + date + age group (for manually-entered matches). A match found
       by the fallback gets the external ID populated.
    4. Updates the existing match if anything changed, skips it if not, or
       creates a new match.

    Args:
        match_data: Dictionary containing match information
            Required fields:
                - home_team: str
                - away_team: str
                - match_date: str (ISO format)
                - season: str (not read here; new matches use the current
                  season from SeasonDAO, falling back to ID 1)
                - age_group: str
                - division: str
            Optional fields:
                - external_match_id: str
                - home_score: int
                - away_score: int
                - match_status: str
                - match_time: str (HH:MM, US Eastern)
                - match_type: str
                - location: str

    Returns:
        Dict containing:
            - db_id: int - database ID of the created/updated/unchanged match
            - mls_id: external_match_id from the input (may be None)
            - status: str - 'created', 'updated', or 'skipped'
            - message: str - human-readable summary

    Raises:
        ValueError: If validation fails or a team is not found.
        Exception: If the match update or insert reports failure.

    NOTE(review): autoretry_for=(Exception,) also retries ValueErrors from
    validation/unknown teams, which a retry cannot fix.
    """
    try:
        mls_id = match_data.get("external_match_id", "N/A")
        logger.info(
            f"Processing match data: {match_data.get('home_team')} vs {match_data.get('away_team')} (MLS ID: {mls_id})"
        )

        # Step 1: Validate the match data
        validation_result = validate_match_data(match_data)
        if not validation_result["valid"]:
            error_msg = f"Invalid match data: {validation_result['errors']}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Step 2: Resolve teams (they must already exist)
        home_team_name = match_data["home_team"]
        away_team_name = match_data["away_team"]

        logger.debug(f"Looking up teams: {home_team_name}, {away_team_name}")

        home_team = self.team_dao.get_team_by_name(home_team_name)
        if not home_team:
            logger.warning(f"Home team not found: {home_team_name}. Cannot process match.")
            raise ValueError(f"Team not found: {home_team_name}")

        away_team = self.team_dao.get_team_by_name(away_team_name)
        if not away_team:
            logger.warning(f"Away team not found: {away_team_name}. Cannot process match.")
            raise ValueError(f"Team not found: {away_team_name}")

        # Step 3: Check if match already exists
        external_match_id = match_data.get("external_match_id")
        age_group_name = match_data.get("age_group")
        existing_match = None
        # Age-group record from the fallback lookup. The create path is only
        # reached after the fallback ran, so it reuses this instead of querying
        # the database a second time.
        age_group_record = None

        # First try: Look up by external ID (fast path for previously scraped matches)
        if external_match_id:
            existing_match = self.dao.get_match_by_external_id(external_match_id)
            logger.debug(
                f"Lookup by external_match_id '{external_match_id}': {'found' if existing_match else 'not found'}"
            )

        # Second try: Fallback to lookup by teams + date + age_group (for manually-entered matches)
        if not existing_match:
            if age_group_name:
                age_group_record = self.season_dao.get_age_group_by_name(age_group_name)
                if not age_group_record:
                    logger.warning(f"Age group not found: {age_group_name}")
            age_group_id = age_group_record["id"] if age_group_record else None

            existing_match = self.dao.get_match_by_teams_and_date(
                home_team_id=home_team["id"],
                away_team_id=away_team["id"],
                match_date=match_data["match_date"],
                age_group_id=age_group_id,
            )

            if existing_match:
                logger.info(
                    f"Found manually-entered match via fallback lookup: "
                    f"{home_team_name} vs {away_team_name} on {match_data['match_date']}"
                    + (f" ({age_group_name})" if age_group_name else "")
                )

                # If found a match without external_match_id, populate it
                if external_match_id and not existing_match.get("match_id"):
                    logger.info(f"Populating match_id '{external_match_id}' on existing match {existing_match['id']}")
                    success = self.dao.update_match_external_id(
                        match_id=existing_match["id"], external_match_id=external_match_id
                    )
                    if success:
                        # Keep the local dict in sync with what was written
                        existing_match["match_id"] = external_match_id
                        existing_match["source"] = "match-scraper"
                        logger.info(f"Successfully populated match_id on match {existing_match['id']}")
                    else:
                        logger.warning(f"Failed to populate match_id on match {existing_match['id']}")

        # Step 4: Insert or update match
        if existing_match:
            needs_update = self._check_needs_update(existing_match, match_data, home_team["id"], away_team["id"])

            if needs_update:
                logger.info(
                    f"Updating existing match DB ID {existing_match['id']} (MLS ID: {external_match_id}): "
                    f"{home_team_name} vs {away_team_name}"
                )
                success = self._update_match_scores(existing_match, match_data, home_team["id"], away_team["id"])
                if not success:
                    raise Exception(f"Failed to update match {existing_match['id']}")
                result = {
                    "db_id": existing_match["id"],
                    "mls_id": external_match_id,
                    "status": "updated",
                    "message": f"Updated match scores: {home_team_name} vs {away_team_name}",
                }
            else:
                logger.info(
                    f"Match already exists with DB ID {existing_match['id']} (MLS ID: {external_match_id}). "
                    "No changes needed."
                )
                result = {
                    "db_id": existing_match["id"],
                    "mls_id": external_match_id,
                    "status": "skipped",
                    "message": f"Match unchanged: {home_team_name} vs {away_team_name}",
                }
        else:
            logger.info(f"Creating new match (MLS ID: {external_match_id}): {home_team_name} vs {away_team_name}")

            # Resolve names to IDs before calling create_match
            current_season = self.season_dao.get_current_season()
            season_id = current_season["id"] if current_season else 1

            age_group_id_for_create = 1  # Default fallback
            if age_group_record:
                age_group_id_for_create = age_group_record["id"]
            elif age_group_name:
                logger.warning(f"Age group '{age_group_name}' not found, using default ID 1")

            division_id_for_create = None
            if match_data.get("division"):
                div_record = self.league_dao.get_division_by_name(match_data["division"])
                if div_record:
                    division_id_for_create = div_record["id"]
                else:
                    logger.error(f"Division '{match_data['division']}' not found in database")

            scheduled_kickoff = self._build_scheduled_kickoff(match_data)

            match_id = self.dao.create_match(
                home_team_id=home_team["id"],
                away_team_id=away_team["id"],
                match_date=match_data["match_date"],
                season_id=season_id,
                home_score=match_data.get("home_score"),
                away_score=match_data.get("away_score"),
                match_status=match_data.get("match_status", "scheduled"),
                source="match-scraper",
                match_id=external_match_id,
                age_group_id=age_group_id_for_create,
                division_id=division_id_for_create,
                scheduled_kickoff=scheduled_kickoff,
            )
            if not match_id:
                raise Exception("Failed to create match")
            result = {
                "db_id": match_id,
                "mls_id": external_match_id,
                "status": "created",
                "message": f"Created match: {home_team_name} vs {away_team_name}",
            }

        logger.info(f"Successfully processed match: {result}")
        return result

    except Exception as e:
        logger.error(f"Error processing match data: {e}", exc_info=True)
        # Celery will auto-retry based on configuration
        raise