Coverage for celery_app.py: 100.00%

7 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-15 13:38 +0000

1""" 

2Celery Application Configuration 

3 

4This module configures the Celery application for distributed task processing. 

5Workers can run separately from the FastAPI application, allowing for: 

6- Async processing of match data from match-scraper 

7- Horizontal scaling of workers 

8- Fault tolerance and automatic retries 

9- Task result tracking via Redis 

10 

11Usage: 

12 # Start worker 

13 celery -A celery_app worker --loglevel=info 

14 

15 # Submit task from application 

16 from celery_app import process_match_data 

17 task = process_match_data.delay(match_data) 

18""" 

19 

20import os 

21 

22from celery import Celery 

23from kombu import Exchange, Queue 

24 

25# Celery broker and result backend configuration 

26# These can be overridden with environment variables 

27# Precedence: CELERY_* variable first, then RABBITMQ_URL / REDIS_URL, then a localhost default for local development 

28BROKER_URL = os.getenv( 

29 "CELERY_BROKER_URL", 

30 os.getenv("RABBITMQ_URL", "amqp://admin:admin123@localhost:5672//"), # pragma: allowlist secret 

31) 

32RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", os.getenv("REDIS_URL", "redis://localhost:6379/0")) 

33 

34# Create Celery app instance 

35app = Celery( 

36 "missing_table", 

37 broker=BROKER_URL, 

38 backend=RESULT_BACKEND, 

39) 

40 

41# Celery configuration 

42app.conf.update( 

43 # Serialization 

44 task_serializer="json", 

45 accept_content=["json"], 

46 result_serializer="json", 

47 # Timezone 

48 timezone="UTC", 

49 enable_utc=True, 

50 # Task tracking 

51 task_track_started=True, 

52 task_time_limit=300, # 5 minutes hard limit 

53 task_soft_time_limit=240, # 4 minutes soft limit 

54 # Results 

55 result_expires=3600, # Results expire after 1 hour 

56 result_extended=True, # Store additional task metadata 

57 # Acknowledgement and redelivery 

58 task_acks_late=True, # Acknowledge task after completion, not on receive 

59 task_reject_on_worker_lost=True, # Requeue task if worker dies 

60 # Worker configuration 

61 worker_prefetch_multiplier=1, # Worker fetches 1 task at a time (fair distribution) 

62 worker_max_tasks_per_child=1000, # Replace each pool child process after 1000 tasks (limits memory leaks) 

63 # Task routing 

64 task_routes={ 

65 "celery_tasks.match_tasks.*": {"queue": "match_processing"}, 

66 "celery_tasks.validation_tasks.*": {"queue": "validation"}, 

67 }, 

68 # Queue configuration 

69 task_queues=( 

70 Queue("match_processing", Exchange("match_processing"), routing_key="match.*"), 

71 Queue("validation", Exchange("validation"), routing_key="validation.*"), 

72 Queue("celery", Exchange("celery"), routing_key="celery"), # Default queue 

73 # Legacy direct queue: MSA pods configured with AGENT_QUEUE_NAME=matches.prod 

74 # bypass the fanout exchange and publish here. Worker must consume both until 

75 # all pods are switched back to using the matches-fanout exchange. 

76 Queue("matches.prod"), 

77 ), 

78 # Monitoring 

79 task_send_sent_event=True, # Enable task-sent events 

80 worker_send_task_events=True, # Enable worker task events 

81) 

82 

83# Auto-discover tasks in the celery_tasks package (by default Celery imports each listed package's "tasks" submodule) 

84app.autodiscover_tasks(["celery_tasks"]) 

85 

86if __name__ == "__main__": 

87 app.start()