Coverage for queue_cli/core/rabbitmq.py: 0.00%
83 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-13 00:07 +0000
1"""
2RabbitMQ connection and message publishing.
4Uses kombu (the same library Celery uses) for educational consistency.
5"""
7from typing import Any
9from celery import Celery
10from kombu import Connection, Exchange, Producer, Queue
12from .config import QueueConfig
class RabbitMQClient:
    """RabbitMQ client for publishing messages via kombu.

    The client can be used as a context manager: entering it calls
    :meth:`connect` (raising ``ConnectionError`` on failure) and leaving it
    calls :meth:`disconnect`.
    """

    def __init__(self, config: QueueConfig):
        """
        Initialize RabbitMQ client.

        Only stores the configuration; no connection is opened until
        :meth:`connect` is called.

        Args:
            config: Queue configuration
        """
        self.config = config
        self.connection: Connection | None = None
        self.producer: Producer | None = None

    @staticmethod
    def _release_quietly(connection: Connection | None) -> None:
        """Release ``connection`` if there is one, ignoring cleanup errors."""
        if connection is None:
            return
        try:
            connection.release()
        except Exception:
            # Best-effort cleanup: the caller is already reporting the outcome
            # that matters (a failed or a replaced connection), and a release
            # error must not replace that report.
            pass

    def _queue_counts(self, queue: Queue) -> tuple[int, int]:
        """
        Look up a queue's counters with a passive declare on a fresh channel.

        Args:
            queue: Queue to inspect

        Returns:
            Tuple of (message_count, consumer_count)
        """
        with self.connection.channel() as channel:
            info = queue.queue_declare(passive=True, channel=channel)
        return info.message_count, info.consumer_count

    def connect(self) -> tuple[bool, str]:
        """
        Connect to RabbitMQ broker.

        The new connection and producer are stored on the client only once
        everything has succeeded. On failure the half-open connection is
        released and the client keeps whatever state it had before the call.
        On success a previously held connection is released so that repeated
        calls do not leak connections.

        Returns:
            Tuple of (success, message)
        """
        connection = None
        try:
            connection = Connection(self.config.broker_url)
            connection.connect()

            # Create producer
            producer = Producer(connection)

            message = f"Connected to {self.config.get_sanitized_broker_url()}"
        except Exception as e:
            self._release_quietly(connection)
            return False, f"Connection failed: {e}"

        stale = self.connection
        self.connection = connection
        self.producer = producer
        self._release_quietly(stale)
        return True, message

    def disconnect(self) -> None:
        """
        Disconnect from RabbitMQ broker.

        Does nothing when no connection is held. An error raised while
        releasing the connection still propagates, but the client is reset to
        the disconnected state first so it never keeps a dead connection.
        """
        connection = self.connection
        if not connection:
            return
        try:
            connection.release()
        finally:
            self.connection = None
            self.producer = None

    def publish_message(
        self,
        message: dict[str, Any],
        queue_name: str | None = None,
        exchange_name: str | None = None,
        routing_key: str | None = None,
    ) -> tuple[bool, str, dict[str, Any]]:
        """
        Publish a message to RabbitMQ.

        The message is published as JSON to a durable topic exchange, after
        declaring the target queue bound to that exchange.

        Args:
            message: Message payload
            queue_name: Target queue name (defaults to config)
            exchange_name: Exchange name (defaults to config)
            routing_key: Routing key (defaults to config)

        Returns:
            Tuple of (success, status_message, publish_info).

            * Not connected: ``(False, "", {"error": ...})``.
            * Declare/publish error: ``(False, "Publish failed: ...", {})``.
            * Success: ``(True, "Message published successfully", info)``
              where ``info`` holds the queue, exchange, routing key,
              serializer, content type and the queue's ``message_count`` /
              ``consumer_count``. If only the counter lookup fails after the
              publish went through, both counters are ``None`` and
              ``info["stats_error"]`` explains why; the call still reports
              success so callers do not re-send an already published message.
        """
        if not self.connection or not self.producer:
            return False, "", {"error": "Not connected to RabbitMQ"}

        queue_name = queue_name or self.config.default_queue
        exchange_name = exchange_name or self.config.default_exchange
        routing_key = routing_key or self.config.default_routing_key

        try:
            # Create exchange
            exchange = Exchange(exchange_name, type="topic", durable=True)

            # Create queue
            queue = Queue(queue_name, exchange=exchange, routing_key=routing_key)

            # Bind queue (ensure it exists)
            queue.maybe_bind(self.connection)
            queue.declare()

            # Publish message
            self.producer.publish(
                message,
                exchange=exchange,
                routing_key=routing_key,
                serializer="json",
                content_type="application/json",
                declare=[queue],  # Ensure queue is declared before publishing
            )
        except Exception as e:
            return False, f"Publish failed: {e}", {}

        publish_info: dict[str, Any] = {
            "queue": queue_name,
            "exchange": exchange_name,
            "routing_key": routing_key,
            "message_count": None,
            "consumer_count": None,
            "serializer": "json",
            "content_type": "application/json",
        }

        # Get queue stats. This happens after the publish, so a failure here
        # must not be reported as a failed publish.
        try:
            message_count, consumer_count = self._queue_counts(queue)
        except Exception as e:
            publish_info["stats_error"] = f"Failed to get queue stats: {e}"
        else:
            publish_info["message_count"] = message_count
            publish_info["consumer_count"] = consumer_count

        return True, "Message published successfully", publish_info

    def get_queue_stats(self, queue_name: str | None = None) -> tuple[bool, dict[str, Any]]:
        """
        Get queue statistics.

        Args:
            queue_name: Queue name (defaults to config)

        Returns:
            Tuple of (success, stats_dict). On success the dict has
            ``queue_name``, ``message_count`` and ``consumer_count``; on
            failure it has a single ``error`` entry.
        """
        if not self.connection:
            return False, {"error": "Not connected to RabbitMQ"}

        queue_name = queue_name or self.config.default_queue

        try:
            exchange = Exchange(self.config.default_exchange, type="topic", durable=True)
            queue = Queue(queue_name, exchange=exchange, routing_key=self.config.default_routing_key)

            message_count, consumer_count = self._queue_counts(queue)

            stats = {
                "queue_name": queue_name,
                "message_count": message_count,
                "consumer_count": consumer_count,
            }

            return True, stats

        except Exception as e:
            return False, {"error": f"Failed to get queue stats: {e}"}

    def __enter__(self) -> "RabbitMQClient":
        """Context manager entry: connect, raising ConnectionError on failure."""
        success, message = self.connect()
        if not success:
            raise ConnectionError(message)
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit: disconnect; exceptions from the body are not suppressed."""
        self.disconnect()
class CeleryClient:
    """Celery client for submitting tasks directly."""

    def __init__(self, config: QueueConfig):
        """
        Initialize Celery client.

        Builds a Celery app named ``queue_cli`` from the configured broker and
        result backend, using JSON serialization and UTC.

        Args:
            config: Queue configuration
        """
        self.config = config
        self.app = Celery("queue_cli", broker=config.broker_url, backend=config.result_backend)
        self.app.conf.update(
            task_serializer="json",
            accept_content=["json"],
            result_serializer="json",
            timezone="UTC",
            enable_utc=True,
        )

    def send_task(
        self, task_name: str, args: tuple = (), kwargs: dict | None = None
    ) -> tuple[bool, str, dict[str, Any]]:
        """
        Send a task to Celery.

        Args:
            task_name: Full task name (e.g., 'celery_tasks.match_tasks.process_match_data')
            args: Positional arguments
            kwargs: Keyword arguments

        Returns:
            Tuple of (success, task_id, task_info). On success ``task_info``
            has ``task_id``, ``task_name`` and ``state``; on failure the task
            id is ``""`` and ``task_info`` has a single ``error`` entry.

            ``state`` is ``"UNKNOWN"`` when the task was sent but its state
            could not be read back. The state lookup is a separate step from
            sending, so its failure is not reported as a failed send.
        """
        try:
            task = self.app.send_task(task_name, args=args, kwargs=kwargs or {})
            task_id = task.id
        except Exception as e:
            return False, "", {"error": f"Failed to send task: {e}"}

        # The task is already on the broker at this point. Reading the state
        # can fail independently (presumably when no usable result backend is
        # configured), and that must not hide the id of a task that was sent.
        try:
            state = task.state
        except Exception:
            state = "UNKNOWN"

        task_info = {
            "task_id": task_id,
            "task_name": task_name,
            "state": state,
        }

        return True, task_id, task_info

    def get_task_result(self, task_id: str, timeout: float = 1.0) -> dict[str, Any]:
        """
        Get task result.

        This is a non-blocking snapshot: readiness and success are each read
        once, so the returned fields cannot contradict each other if the task
        finishes while this method runs.

        Args:
            task_id: Task ID
            timeout: Timeout in seconds. Currently unused: the method never
                waits for the task. Kept so existing callers keep working.

        Returns:
            Task result info with ``task_id``, ``state``, ``ready`` and
            ``successful`` (``None`` while the task is not ready). A ready
            task additionally has ``result`` when it succeeded or ``error``
            when it did not. If the lookup itself fails, the dict holds only
            ``task_id`` and ``error``.
        """
        try:
            result = self.app.AsyncResult(task_id)

            state = result.state
            ready = result.ready()
            successful = result.successful() if ready else None

            info = {
                "task_id": task_id,
                "state": state,
                "ready": ready,
                "successful": successful,
            }

            if ready:
                if successful:
                    info["result"] = result.result
                else:
                    info["error"] = str(result.info)

            return info

        except Exception as e:
            return {"task_id": task_id, "error": f"Failed to get result: {e}"}