Coverage for queue_cli/core/config.py: 0.00%
22 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 14:11 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 14:11 +0000
1"""
2Configuration management for queue CLI.
4Handles loading configuration from environment variables and config files.
5"""
7import os
8from dataclasses import dataclass
11@dataclass
12class QueueConfig:
13 """Queue CLI configuration."""
15 broker_url: str
16 result_backend: str
17 default_queue: str = "matches"
18 default_exchange: str = "matches"
19 default_routing_key: str = "matches.process"
21 @classmethod
22 def from_env(cls, env: str | None = None) -> "QueueConfig":
23 """
24 Load configuration from environment variables.
26 Args:
27 env: Environment name (local, prod). Defaults to current environment.
29 Returns:
30 QueueConfig instance
31 """
32 # Get broker URL from environment
33 broker_url = os.getenv(
34 "CELERY_BROKER_URL",
35 os.getenv(
36 "RABBITMQ_URL",
37 "amqp://admin:admin123@localhost:5672//", # pragma: allowlist secret
38 ),
39 )
41 # Get result backend from environment
42 result_backend = os.getenv("CELERY_RESULT_BACKEND", os.getenv("REDIS_URL", "redis://localhost:6379/0"))
44 return cls(
45 broker_url=broker_url,
46 result_backend=result_backend,
47 default_queue="matches",
48 default_exchange="matches",
49 default_routing_key="matches.process",
50 )
52 def get_sanitized_broker_url(self) -> str:
53 """Get broker URL with credentials hidden."""
54 if "@" in self.broker_url:
55 # Split on @ and return everything after it
56 return self.broker_url.split("@")[-1]
57 return self.broker_url
59 def get_sanitized_result_backend(self) -> str:
60 """Get result backend URL with credentials hidden."""
61 if "@" in self.result_backend:
62 return self.result_backend.split("@")[-1]
63 return self.result_backend