Coverage for queue_cli/commands/send.py: 0.00%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-13 11:37 +0000

1""" 

2Send command - publishes messages to RabbitMQ queue. 

3""" 

4 

5import json 

6from pathlib import Path 

7from typing import Any 

8 

9import typer 

10 

11from ..core.config import QueueConfig 

12from ..core.rabbitmq import CeleryClient, RabbitMQClient 

13from ..core.schema import SchemaValidator 

14from ..core.templates import TemplateManager 

15from ..utils.display import ( 

16 console, 

17 print_connection_info, 

18 print_debug_mode, 

19 print_divider, 

20 print_error, 

21 print_header, 

22 print_info, 

23 print_json, 

24 print_next_steps, 

25 print_publish_info, 

26 print_success, 

27 print_task_info, 

28 print_validation_result, 

29) 

30 

31 

def send_message(
    template: str | None = typer.Option(None, "--template", "-t", help="Template name to use"),
    file: Path | None = typer.Option(None, "--file", "-f", help="JSON file to send"),
    json_str: str | None = typer.Option(None, "--json", "-j", help="JSON string to send"),
    queue: str | None = typer.Option(None, "--queue", "-q", help="Target queue name"),
    use_celery: bool = typer.Option(True, "--celery/--rabbitmq", help="Use Celery task vs direct RabbitMQ publish"),
    debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug output"),
) -> None:
    """
    Send a message to the queue.

    Examples:
        # Send using a template
        uv run python queue_cli.py send --template completed-match

        # Send from a file
        uv run python queue_cli.py send --file my-match.json

        # Send JSON string
        uv run python queue_cli.py send --json '{"home_team": "..."}'

        # Use direct RabbitMQ publish (for learning)
        uv run python queue_cli.py send --template tbd-match --rabbitmq

        # Enable debug mode
        uv run python queue_cli.py send --template minimal --debug
    """
    # NOTE: the docstring above doubles as the CLI help text, so maintainer
    # notes live in comments instead.
    #
    # Flow: load config -> load payload -> validate -> translate to the
    # internal schema -> submit via Celery (default) or direct RabbitMQ.
    # Every failure path exits with status 1 via typer.Exit.
    print_header("Sending Message to Queue")

    if debug:
        print_debug_mode()

    # Load configuration
    config = QueueConfig.from_env()

    if debug:
        print_divider()
        print_info(f"Broker: {config.get_sanitized_broker_url()}")
        print_info(f"Result Backend: {config.get_sanitized_result_backend()}")
        print_info(f"Default Queue: {config.default_queue}")
        print_divider()

    # Load message data. None means no source option was supplied; a falsy
    # non-None value means a source was supplied but decoded to an empty
    # payload. Report the two cases separately so the user is not told to
    # pass an option they already passed.
    message_data = _load_message_data(template, file, json_str)
    if message_data is None:
        console.print("[red]❌ Error: Must provide --template, --file, or --json[/red]")
        raise typer.Exit(1)
    if not message_data:
        console.print("[red]❌ Error: Message payload is empty[/red]")
        raise typer.Exit(1)

    # Validate message
    validator = SchemaValidator()
    is_valid, errors, warnings = validator.validate(message_data)

    print_validation_result(is_valid, errors, warnings)

    if not is_valid:
        console.print("\n[red]❌ Message validation failed. Cannot send.[/red]")
        raise typer.Exit(1)

    # Translate to internal schema
    internal_data = validator.translate_to_internal(message_data)

    if debug:
        print_json(internal_data, "Internal Message Format")

    # Send message. queue_name is only forwarded on the direct RabbitMQ
    # path; the Celery helper is not given it.
    queue_name = queue or config.default_queue

    if use_celery:
        # The Celery helper prints the task id and next steps itself.
        _send_via_celery(config, internal_data, debug)
    else:
        _send_via_rabbitmq(config, internal_data, queue_name, debug)
        # A direct publish has no task id to report.
        print_next_steps("N/A", queue_name)

110 

111 

112def _load_message_data(template: str | None, file: Path | None, json_str: str | None) -> dict[str, Any] | None: 

113 """Load message data from various sources.""" 

114 if template: 

115 template_mgr = TemplateManager() 

116 data = template_mgr.load_template(template) 

117 if not data: 

118 console.print(f"[red]❌ Template not found: {template}[/red]") 

119 raise typer.Exit(1) 

120 return data 

121 

122 if file: 

123 if not file.exists(): 

124 console.print(f"[red]❌ File not found: {file}[/red]") 

125 raise typer.Exit(1) 

126 

127 with open(file) as f: 

128 return json.load(f) 

129 

130 if json_str: 

131 try: 

132 return json.loads(json_str) 

133 except json.JSONDecodeError as e: 

134 console.print(f"[red]❌ Invalid JSON: {e}[/red]") 

135 raise typer.Exit(1) from e 

136 

137 return None 

138 

139 

def _send_via_celery(config: QueueConfig, message_data: dict[str, Any], debug: bool) -> None:
    """Submit the message as a Celery task and report the outcome.

    Args:
        config: Queue configuration; supplies the broker URL shown to the
            user and the default queue named in the next-steps hint.
        message_data: Payload passed to the task as the ``match_data`` kwarg.
        debug: When true, print the task name before submitting.

    Raises:
        typer.Exit: With code 1 when the client reports a failed submission.
    """
    task = "celery_tasks.match_tasks.process_match_data"

    print_connection_info(config.get_sanitized_broker_url(), True)
    client = CeleryClient(config)

    if debug:
        print_info(f"Task Name: {task}")
        print_info("Submitting task...")

    ok, task_id, info = client.send_task(task, kwargs={"match_data": message_data})

    # Handle the failure case first so the success path reads straight down.
    if not ok:
        reason = info.get("error", "Unknown error")
        print_error(f"Failed to submit task: {reason}")
        raise typer.Exit(1)

    print_success("Task submitted successfully!")
    state = info.get("state", "PENDING")
    print_task_info(task_id, task, state)
    print_next_steps(task_id, config.default_queue)

162 

163 

def _send_via_rabbitmq(config: QueueConfig, message_data: dict[str, Any], queue_name: str, debug: bool) -> None:
    """Send message via direct RabbitMQ publish.

    Args:
        config: Queue configuration; supplies the broker URL and, in debug
            mode, the exchange and routing key that are displayed.
        message_data: Payload handed to RabbitMQClient.publish_message.
        queue_name: Queue to publish to.
        debug: When true, print the queue, exchange and routing key first.

    Raises:
        typer.Exit: With code 1 when the publish is reported as failed or
            any other exception is raised while connecting or publishing.
    """
    print_connection_info(config.get_sanitized_broker_url(), False)

    try:
        with RabbitMQClient(config) as client:
            print_connection_info(config.get_sanitized_broker_url(), True)

            if debug:
                print_info(f"Publishing to queue: {queue_name}")
                print_info(f"Exchange: {config.default_exchange}")
                print_info(f"Routing Key: {config.default_routing_key}")

            success, message, publish_info = client.publish_message(message_data, queue_name=queue_name)

            if success:
                print_success(message)
                print_publish_info(
                    queue=publish_info["queue"],
                    exchange=publish_info["exchange"],
                    routing_key=publish_info["routing_key"],
                    message_count=publish_info["message_count"],
                    consumer_count=publish_info["consumer_count"],
                )
            else:
                print_error(f"Failed to publish: {message}")
                raise typer.Exit(1)

    except typer.Exit:
        # typer.Exit is Click's Exit, a RuntimeError subclass. Without this
        # handler the broad clause below would catch the deliberate exit
        # above and print a misleading "Connection error" after the real
        # failure message.
        raise
    except Exception as e:
        # Boundary handler: anything else raised while connecting or
        # publishing is reported here and turned into exit code 1.
        print_error(f"Connection error: {e}")
        raise typer.Exit(1) from e