Coverage for celery_app.py: 100.00%
7 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 14:26 +0000
1"""
2Celery Application Configuration
4This module configures the Celery application for distributed task processing.
5Workers can run separately from the FastAPI application, allowing for:
6- Async processing of match data from match-scraper
7- Horizontal scaling of workers
8- Fault tolerance and automatic retries
9- Task result tracking via Redis
11Usage:
12 # Start worker
13 celery -A celery_app worker --loglevel=info
15 # Submit task from application
16 from celery_app import process_match_data
17 task = process_match_data.delay(match_data)
18"""
20import os
22from celery import Celery
23from kombu import Exchange, Queue
# Broker / result-backend endpoints.
# Resolution order for each: explicit CELERY_* variable, then the generic
# service URL, then a localhost default for local development.
def _first_env(primary: str, fallback: str, default: str) -> str:
    """Return *primary* from the environment, else *fallback*, else *default*."""
    return os.getenv(primary, os.getenv(fallback, default))


BROKER_URL = _first_env(
    "CELERY_BROKER_URL",
    "RABBITMQ_URL",
    "amqp://admin:admin123@localhost:5672//",  # pragma: allowlist secret
)
RESULT_BACKEND = _first_env("CELERY_RESULT_BACKEND", "REDIS_URL", "redis://localhost:6379/0")
# The shared Celery application instance, wired to the broker and result
# backend resolved above. Workers and producers both import this object.
app = Celery("missing_table", broker=BROKER_URL, backend=RESULT_BACKEND)
# Runtime configuration, collected into one declarative table and applied
# with a single update() call.
_CELERY_SETTINGS = {
    # Serialization: JSON only, for both task payloads and results.
    "task_serializer": "json",
    "accept_content": ["json"],
    "result_serializer": "json",
    # Timezone handling
    "timezone": "UTC",
    "enable_utc": True,
    # Task tracking and execution limits
    "task_track_started": True,
    "task_time_limit": 300,  # hard kill after 5 minutes
    "task_soft_time_limit": 240,  # soft limit raised after 4 minutes
    # Result backend behaviour
    "result_expires": 3600,  # results expire after 1 hour
    "result_extended": True,  # store additional task metadata
    # Delivery / retry semantics
    "task_acks_late": True,  # acknowledge after completion, not on receive
    "task_reject_on_worker_lost": True,  # requeue the task if the worker dies
    # Worker tuning
    "worker_prefetch_multiplier": 1,  # fetch one task at a time (fair distribution)
    "worker_max_tasks_per_child": 1000,  # recycle workers to prevent memory leaks
    # Route tasks to dedicated queues by task-module path.
    "task_routes": {
        "celery_tasks.match_tasks.*": {"queue": "match_processing"},
        "celery_tasks.validation_tasks.*": {"queue": "validation"},
    },
    # Queues the worker consumes from.
    "task_queues": (
        Queue("match_processing", Exchange("match_processing"), routing_key="match.*"),
        Queue("validation", Exchange("validation"), routing_key="validation.*"),
        Queue("celery", Exchange("celery"), routing_key="celery"),  # default queue
        # Legacy direct queue: MSA pods configured with AGENT_QUEUE_NAME=matches.prod
        # bypass the fanout exchange and publish here. Worker must consume both until
        # all pods are switched back to using the matches-fanout exchange.
        Queue("matches.prod"),
    ),
    # Event emission for monitoring tools (task-sent and worker task events).
    "task_send_sent_event": True,
    "worker_send_task_events": True,
}
app.conf.update(_CELERY_SETTINGS)
# Register task modules: Celery lazily imports celery_tasks.* and picks up
# any @app.task definitions it finds there.
app.autodiscover_tasks(packages=["celery_tasks"])

# Allow running this module directly as the Celery command-line entry point.
if __name__ == "__main__":
    app.start()