Coverage for queue_cli/core/config.py: 0.00%

22 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-13 14:11 +0000

1""" 

2Configuration management for queue CLI. 

3 

4Handles loading configuration from environment variables and config files. 

5""" 

6 

7import os 

8from dataclasses import dataclass 

9 

10 

11@dataclass 

12class QueueConfig: 

13 """Queue CLI configuration.""" 

14 

15 broker_url: str 

16 result_backend: str 

17 default_queue: str = "matches" 

18 default_exchange: str = "matches" 

19 default_routing_key: str = "matches.process" 

20 

21 @classmethod 

22 def from_env(cls, env: str | None = None) -> "QueueConfig": 

23 """ 

24 Load configuration from environment variables. 

25 

26 Args: 

27 env: Environment name (local, prod). Defaults to current environment. 

28 

29 Returns: 

30 QueueConfig instance 

31 """ 

32 # Get broker URL from environment 

33 broker_url = os.getenv( 

34 "CELERY_BROKER_URL", 

35 os.getenv( 

36 "RABBITMQ_URL", 

37 "amqp://admin:admin123@localhost:5672//", # pragma: allowlist secret 

38 ), 

39 ) 

40 

41 # Get result backend from environment 

42 result_backend = os.getenv("CELERY_RESULT_BACKEND", os.getenv("REDIS_URL", "redis://localhost:6379/0")) 

43 

44 return cls( 

45 broker_url=broker_url, 

46 result_backend=result_backend, 

47 default_queue="matches", 

48 default_exchange="matches", 

49 default_routing_key="matches.process", 

50 ) 

51 

52 def get_sanitized_broker_url(self) -> str: 

53 """Get broker URL with credentials hidden.""" 

54 if "@" in self.broker_url: 

55 # Split on @ and return everything after it 

56 return self.broker_url.split("@")[-1] 

57 return self.broker_url 

58 

59 def get_sanitized_result_backend(self) -> str: 

60 """Get result backend URL with credentials hidden.""" 

61 if "@" in self.result_backend: 

62 return self.result_backend.split("@")[-1] 

63 return self.result_backend