Coverage for queue_cli/core/rabbitmq.py: 0.00%

83 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-13 14:11 +0000

1""" 

2RabbitMQ connection and message publishing. 

3 

4Uses kombu (the same library Celery uses) for educational consistency. 

5""" 

6 

7from typing import Any 

8 

9from celery import Celery 

10from kombu import Connection, Exchange, Producer, Queue 

11 

12from .config import QueueConfig 

13 

14 

class RabbitMQClient:
    """RabbitMQ client for publishing messages.

    Holds one kombu ``Connection`` and one ``Producer`` built from a
    ``QueueConfig``. Methods report failures through their return values
    (a success flag plus a message and/or an ``{"error": ...}`` dict)
    instead of raising; only the context-manager entry raises.
    """

    def __init__(self, config: QueueConfig):
        """
        Initialize RabbitMQ client.

        Args:
            config: Queue configuration
        """
        self.config = config
        # Both stay None until connect() succeeds.
        self.connection: Connection | None = None
        self.producer: Producer | None = None

    def connect(self) -> tuple[bool, str]:
        """
        Connect to RabbitMQ broker.

        Any connection left over from an earlier call is released first. The
        new connection and producer are stored on the instance only after
        every step succeeded, so a failed attempt leaves the client in the
        "not connected" state.

        Returns:
            Tuple of (success, message)
        """
        connection: Connection | None = None
        try:
            # Reconnecting must not leak the previous connection.
            self.disconnect()

            connection = Connection(self.config.broker_url)
            connection.connect()

            # Create producer
            producer = Producer(connection)

            status = f"Connected to {self.config.get_sanitized_broker_url()}"
        except Exception as e:
            if connection is not None:
                try:
                    connection.release()
                except Exception:
                    # Best-effort cleanup: the original failure is the one
                    # the caller needs to see.
                    pass
            return False, f"Connection failed: {e}"

        self.connection = connection
        self.producer = producer
        return True, status

    def disconnect(self) -> None:
        """Disconnect from RabbitMQ broker.

        Safe to call when not connected. The instance is marked as
        disconnected before the connection is released, so it never keeps a
        reference to a connection whose release failed.
        """
        connection = self.connection
        self.connection = None
        self.producer = None
        if connection:
            connection.release()

    def publish_message(
        self,
        message: dict[str, Any],
        queue_name: str | None = None,
        exchange_name: str | None = None,
        routing_key: str | None = None,
    ) -> tuple[bool, str, dict[str, Any]]:
        """
        Publish a message to RabbitMQ.

        Declares a durable topic exchange and a queue bound to it, publishes
        the message as JSON, then reads the queue's counters.

        Args:
            message: Message payload
            queue_name: Target queue name (defaults to config)
            exchange_name: Exchange name (defaults to config)
            routing_key: Routing key (defaults to config)

        Returns:
            Tuple of (success, status_message, publish_info). On success,
            publish_info describes the queue, exchange, routing key, counters
            and serialization. On failure it is ``{"error": <text>}``; the
            status message is empty when the client is not connected and
            repeats the error text when publishing raised.
        """
        if not self.connection or not self.producer:
            return False, "", {"error": "Not connected to RabbitMQ"}

        queue_name = queue_name or self.config.default_queue
        exchange_name = exchange_name or self.config.default_exchange
        routing_key = routing_key or self.config.default_routing_key

        try:
            # Create exchange
            exchange = Exchange(exchange_name, type="topic", durable=True)

            # Create queue
            queue = Queue(queue_name, exchange=exchange, routing_key=routing_key)

            # Bind queue (ensure it exists)
            queue.maybe_bind(self.connection)
            queue.declare()

            # Publish message
            self.producer.publish(
                message,
                exchange=exchange,
                routing_key=routing_key,
                serializer="json",
                content_type="application/json",
                declare=[queue],  # Ensure queue is declared before publishing
            )

            # Get queue stats on a short-lived channel of their own.
            with self.connection.channel() as channel:
                queue_info = queue.queue_declare(passive=True, channel=channel)
                message_count = queue_info.message_count
                consumer_count = queue_info.consumer_count

            publish_info = {
                "queue": queue_name,
                "exchange": exchange_name,
                "routing_key": routing_key,
                "message_count": message_count,
                "consumer_count": consumer_count,
                "serializer": "json",
                "content_type": "application/json",
            }

            return True, "Message published successfully", publish_info

        except Exception as e:
            # Report the failure in the info dict as well, matching the
            # not-connected path and get_queue_stats().
            error = f"Publish failed: {e}"
            return False, error, {"error": error}

    def get_queue_stats(self, queue_name: str | None = None) -> tuple[bool, dict[str, Any]]:
        """
        Get queue statistics.

        Args:
            queue_name: Queue name (defaults to config)

        Returns:
            Tuple of (success, stats_dict). stats_dict holds ``queue_name``,
            ``message_count`` and ``consumer_count`` on success, or a single
            ``error`` entry on failure.
        """
        if not self.connection:
            return False, {"error": "Not connected to RabbitMQ"}

        queue_name = queue_name or self.config.default_queue

        try:
            exchange = Exchange(self.config.default_exchange, type="topic", durable=True)
            queue = Queue(queue_name, exchange=exchange, routing_key=self.config.default_routing_key)

            with self.connection.channel() as channel:
                info = queue.queue_declare(passive=True, channel=channel)

                stats = {
                    "queue_name": queue_name,
                    "message_count": info.message_count,
                    "consumer_count": info.consumer_count,
                }

            return True, stats

        except Exception as e:
            return False, {"error": f"Failed to get queue stats: {e}"}

    def __enter__(self) -> "RabbitMQClient":
        """Context manager entry.

        Raises:
            ConnectionError: If connect() reports failure.
        """
        success, message = self.connect()
        if not success:
            raise ConnectionError(message)
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit."""
        self.disconnect()

165 

166 

class CeleryClient:
    """Celery client for submitting tasks directly.

    Builds a Celery app from a ``QueueConfig`` and uses it to send tasks by
    name and to look up their results. Failures are reported through the
    return values rather than raised.
    """

    def __init__(self, config: QueueConfig):
        """
        Initialize Celery client.

        Args:
            config: Queue configuration
        """
        self.config = config
        self.app = Celery("queue_cli", broker=config.broker_url, backend=config.result_backend)
        self.app.conf.update(
            task_serializer="json",
            accept_content=["json"],
            result_serializer="json",
            timezone="UTC",
            enable_utc=True,
        )

    def send_task(
        self, task_name: str, args: tuple = (), kwargs: dict | None = None
    ) -> tuple[bool, str, dict[str, Any]]:
        """
        Send a task to Celery.

        Args:
            task_name: Full task name (e.g., 'celery_tasks.match_tasks.process_match_data')
            args: Positional arguments
            kwargs: Keyword arguments

        Returns:
            Tuple of (success, task_id, task_info). On failure task_id is
            empty and task_info is ``{"error": <text>}``. If the task was
            sent but its state could not be read, the call still succeeds
            with ``state`` set to ``"UNKNOWN"`` and the reason stored under
            ``state_error``.
        """
        try:
            task = self.app.send_task(task_name, args=args, kwargs=kwargs or {})
        except Exception as e:
            return False, "", {"error": f"Failed to send task: {e}"}

        task_info: dict[str, Any] = {
            "task_id": task.id,
            "task_name": task_name,
        }

        # Reading the state is a separate lookup that can fail on its own.
        # The task has already been handed to the broker at this point, so a
        # failed lookup must not be reported as a failed submission.
        try:
            task_info["state"] = task.state
        except Exception as e:
            task_info["state"] = "UNKNOWN"
            task_info["state_error"] = str(e)

        return True, task.id, task_info

    def get_task_result(self, task_id: str, timeout: float = 1.0) -> dict[str, Any]:
        """
        Get task result.

        Readiness and success are each queried once and reused, so the
        returned dict is internally consistent even if the task finishes
        while this method runs.

        Args:
            task_id: Task ID
            timeout: Currently unused; accepted for interface compatibility.
                This method only inspects the task and never waits on it.

        Returns:
            Task result info with ``task_id``, ``state``, ``ready`` and
            ``successful`` (None while the task is not ready), plus
            ``result`` for a successful task or ``error`` for a failed one.
            If the lookup itself raises, only ``task_id`` and ``error`` are
            returned.
        """
        try:
            result = self.app.AsyncResult(task_id)

            state = result.state
            ready = result.ready()
            if ready:
                # The task may have finished between the two reads above;
                # re-read so the reported state matches the readiness.
                state = result.state
            successful = result.successful() if ready else None

            info: dict[str, Any] = {
                "task_id": task_id,
                "state": state,
                "ready": ready,
                "successful": successful,
            }

            if ready:
                if successful:
                    info["result"] = result.result
                else:
                    info["error"] = str(result.info)

            return info

        except Exception as e:
            return {"task_id": task_id, "error": f"Failed to get result: {e}"}