Coverage for queue_cli/commands/send.py: 0.00%
93 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-15 17:36 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-15 17:36 +0000
1"""
2Send command - publishes messages to RabbitMQ queue.
3"""
5import json
6from pathlib import Path
7from typing import Any
9import typer
11from ..core.config import QueueConfig
12from ..core.rabbitmq import CeleryClient, RabbitMQClient
13from ..core.schema import SchemaValidator
14from ..core.templates import TemplateManager
15from ..utils.display import (
16 console,
17 print_connection_info,
18 print_debug_mode,
19 print_divider,
20 print_error,
21 print_header,
22 print_info,
23 print_json,
24 print_next_steps,
25 print_publish_info,
26 print_success,
27 print_task_info,
28 print_validation_result,
29)
32def send_message(
33 template: str | None = typer.Option(None, "--template", "-t", help="Template name to use"),
34 file: Path | None = typer.Option(None, "--file", "-f", help="JSON file to send"),
35 json_str: str | None = typer.Option(None, "--json", "-j", help="JSON string to send"),
36 queue: str | None = typer.Option(None, "--queue", "-q", help="Target queue name"),
37 use_celery: bool = typer.Option(True, "--celery/--rabbitmq", help="Use Celery task vs direct RabbitMQ publish"),
38 debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug output"),
39) -> None:
40 """
41 Send a message to the queue.
43 Examples:
44 # Send using a template
45 uv run python queue_cli.py send --template completed-match
47 # Send from a file
48 uv run python queue_cli.py send --file my-match.json
50 # Send JSON string
51 uv run python queue_cli.py send --json '{"home_team": "..."}'
53 # Use direct RabbitMQ publish (for learning)
54 uv run python queue_cli.py send --template tbd-match --rabbitmq
56 # Enable debug mode
57 uv run python queue_cli.py send --template minimal --debug
58 """
59 print_header("Sending Message to Queue")
61 if debug:
62 print_debug_mode()
64 # Load configuration
65 config = QueueConfig.from_env()
67 if debug:
68 print_divider()
69 print_info(f"Broker: {config.get_sanitized_broker_url()}")
70 print_info(f"Result Backend: {config.get_sanitized_result_backend()}")
71 print_info(f"Default Queue: {config.default_queue}")
72 print_divider()
74 # Load message data
75 message_data = _load_message_data(template, file, json_str)
76 if not message_data:
77 console.print("[red]❌ Error: Must provide --template, --file, or --json[/red]")
78 raise typer.Exit(1)
80 # Validate message
81 validator = SchemaValidator()
82 is_valid, errors, warnings = validator.validate(message_data)
84 print_validation_result(is_valid, errors, warnings)
86 if not is_valid:
87 console.print("\n[red]❌ Message validation failed. Cannot send.[/red]")
88 raise typer.Exit(1)
90 # Translate to internal schema
91 internal_data = validator.translate_to_internal(message_data)
93 if debug:
94 print_json(internal_data, "Internal Message Format")
96 # Send message
97 queue_name = queue or config.default_queue
99 if use_celery:
100 _send_via_celery(config, internal_data, debug)
101 else:
102 _send_via_rabbitmq(config, internal_data, queue_name, debug)
104 # Print next steps
105 if use_celery:
106 # Task ID will be printed in _send_via_celery
107 pass
108 else:
109 print_next_steps("N/A", queue_name)
112def _load_message_data(template: str | None, file: Path | None, json_str: str | None) -> dict[str, Any] | None:
113 """Load message data from various sources."""
114 if template:
115 template_mgr = TemplateManager()
116 data = template_mgr.load_template(template)
117 if not data:
118 console.print(f"[red]❌ Template not found: {template}[/red]")
119 raise typer.Exit(1)
120 return data
122 if file:
123 if not file.exists():
124 console.print(f"[red]❌ File not found: {file}[/red]")
125 raise typer.Exit(1)
127 with open(file) as f:
128 return json.load(f)
130 if json_str:
131 try:
132 return json.loads(json_str)
133 except json.JSONDecodeError as e:
134 console.print(f"[red]❌ Invalid JSON: {e}[/red]")
135 raise typer.Exit(1) from e
137 return None
140def _send_via_celery(config: QueueConfig, message_data: dict[str, Any], debug: bool) -> None:
141 """Send message via Celery task."""
142 print_connection_info(config.get_sanitized_broker_url(), True)
144 celery_client = CeleryClient(config)
146 task_name = "celery_tasks.match_tasks.process_match_data"
148 if debug:
149 print_info(f"Task Name: {task_name}")
150 print_info("Submitting task...")
152 success, task_id, task_info = celery_client.send_task(task_name, kwargs={"match_data": message_data})
154 if success:
155 print_success("Task submitted successfully!")
156 print_task_info(task_id, task_name, task_info.get("state", "PENDING"))
157 print_next_steps(task_id, config.default_queue)
158 else:
159 error = task_info.get("error", "Unknown error")
160 print_error(f"Failed to submit task: {error}")
161 raise typer.Exit(1)
164def _send_via_rabbitmq(config: QueueConfig, message_data: dict[str, Any], queue_name: str, debug: bool) -> None:
165 """Send message via direct RabbitMQ publish."""
166 print_connection_info(config.get_sanitized_broker_url(), False)
168 try:
169 with RabbitMQClient(config) as client:
170 print_connection_info(config.get_sanitized_broker_url(), True)
172 if debug:
173 print_info(f"Publishing to queue: {queue_name}")
174 print_info(f"Exchange: {config.default_exchange}")
175 print_info(f"Routing Key: {config.default_routing_key}")
177 success, message, publish_info = client.publish_message(message_data, queue_name=queue_name)
179 if success:
180 print_success(message)
181 print_publish_info(
182 queue=publish_info["queue"],
183 exchange=publish_info["exchange"],
184 routing_key=publish_info["routing_key"],
185 message_count=publish_info["message_count"],
186 consumer_count=publish_info["consumer_count"],
187 )
188 else:
189 print_error(f"Failed to publish: {message}")
190 raise typer.Exit(1)
192 except Exception as e:
193 print_error(f"Connection error: {e}")
194 raise typer.Exit(1) from e