Coverage for cache_cli.py: 0.00%
235 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-15 12:24 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-15 12:24 +0000
1#!/usr/bin/env python
2"""
3Cache inspection CLI tool.
5A command-line tool for inspecting and managing the Redis cache.
7Usage:
8 uv run python cache_cli.py --help
9 uv run python cache_cli.py list
10 uv run python cache_cli.py stats
11 uv run python cache_cli.py get mt:teams:all
12 uv run python cache_cli.py delete mt:teams:all
13 uv run python cache_cli.py flush
14"""
16import json
17import os
18from datetime import datetime
20import typer
21from rich import box
22from rich.console import Console
23from rich.panel import Panel
24from rich.table import Table
25from rich.text import Text
26from rich.tree import Tree
# Shared Rich console that every command writes its output to.
console = Console()

# Typer application object; the command functions below register on it.
app = typer.Typer(
    add_completion=False,
    help="Inspect and manage the Redis cache for MissingTable",
    name="cache-cli",
)
def get_redis_source():
    """Describe the configured Redis source based on the ``.mt-config`` file.

    The file is looked up one directory above this module and read as
    ``key=value`` lines. Blank lines, lines starting with ``#`` and lines
    without ``=`` are skipped. Whitespace around keys and values is ignored,
    so ``redis_source = cloud`` is treated the same as ``redis_source=cloud``.
    A missing or unreadable file is treated as empty, so the defaults apply.

    Returns:
        ``"cloud (<cloud_context>)"`` when ``redis_source`` is ``cloud``
        (context defaults to ``"not configured"``); otherwise
        ``"local (<local_context>)"`` (context defaults to
        ``"rancher-desktop"``).
    """
    config_path = os.path.join(os.path.dirname(__file__), "..", ".mt-config")
    config = {}
    try:
        with open(config_path, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                # Strip both sides so spaces around "=" cannot hide a setting.
                config[key.strip()] = value.strip()
    except OSError:
        # The result is only used for display, so an absent or unreadable
        # config file falls back to the defaults instead of aborting the CLI.
        pass

    if config.get("redis_source", "local") == "cloud":
        context = config.get("cloud_context", "not configured")
        return f"cloud ({context})"
    return f"local ({config.get('local_context', 'rancher-desktop')})"
def _mask_url_password(url):
    """Return *url* with any password in its userinfo part replaced by ``***``."""
    from urllib.parse import urlsplit, urlunsplit

    try:
        parts = urlsplit(url)
    except ValueError:
        # Never echo a URL we could not inspect; it may still hold a secret.
        return "<unparseable URL>"

    userinfo, at, hostport = parts.netloc.rpartition("@")
    if not at or ":" not in userinfo:
        # No "user:password@" section, so there is nothing to hide.
        return url
    user = userinfo.split(":", 1)[0]
    return urlunsplit(parts._replace(netloc=f"{user}:***@{hostport}"))


def get_redis_client():
    """Connect to Redis and return a verified client.

    The connection URL is read from ``REDIS_URL`` (default
    ``redis://localhost:6379/0``). The connection is checked with ``ping()``
    before the client is returned, and the configured source and URL are
    echoed to the console. A password embedded in the URL is masked before
    the URL is printed.

    Returns:
        The client created by ``redis.from_url(url, decode_responses=True)``.

    Raises:
        typer.Exit: With exit code 1 when creating or pinging the client
            raises ``redis.RedisError``.
    """
    import redis

    url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    shown_url = _mask_url_password(url)
    try:
        client = redis.from_url(url, decode_responses=True)
        client.ping()
    except redis.RedisError as e:
        console.print(f"[red]Error connecting to Redis:[/red] {e}")
        console.print(f"[dim]URL: {shown_url}[/dim]")
        console.print(f"[dim]Configured source: {get_redis_source()}[/dim]")
        raise typer.Exit(1) from None

    console.print(f"[dim]Redis: {get_redis_source()} ({shown_url})[/dim]")
    return client
def format_bytes(size: int) -> str:
    """Render a byte count with one decimal place and a B/KB/MB/GB/TB suffix.

    The value is divided by 1024 until it drops below 1024 or the TB unit is
    reached; anything larger is still reported in TB.
    """
    units = ("B", "KB", "MB", "GB")
    amount = size
    step = 0
    while step < len(units):
        if amount < 1024:
            return f"{amount:.1f} {units[step]}"
        amount = amount / 1024
        step += 1
    # Four divisions were not enough: report whatever is left as terabytes.
    return f"{amount:.1f} TB"
def format_ttl(ttl: int) -> str:
    """Render a TTL in seconds as a short string (may contain Rich markup).

    ``-1`` and ``-2`` are shown as "no expiry" and "expired" respectively.
    Other values are shown as seconds, minutes+seconds, or hours+minutes
    depending on their magnitude.
    """
    if ttl == -1:
        return "[dim]no expiry[/dim]"
    if ttl == -2:
        return "[red]expired[/red]"
    if ttl < 60:
        return f"{ttl}s"

    minutes, seconds = divmod(ttl, 60)
    if ttl < 3600:
        return f"{minutes}m {seconds}s"

    hours, leftover_minutes = divmod(minutes, 60)
    return f"{hours}h {leftover_minutes}m"
@app.command(name="list")
def list_keys(
    pattern: str = typer.Option("mt:*", "--pattern", "-p", help="Key pattern to match"),
    show_ttl: bool = typer.Option(True, "--ttl/--no-ttl", help="Show TTL for each key"),
    show_size: bool = typer.Option(True, "--size/--no-size", help="Show size for each key"),
):
    """List all cache keys matching a pattern."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # Prints one table row per matching key (sorted by name) with optional
    # TTL and size columns plus the Redis type. Exits with code 0 after a
    # notice when nothing matches.
    r = get_redis_client()

    keys = list(r.scan_iter(pattern))

    if not keys:
        # Build the message as Text so glob syntax such as "[a-c]" or ":name:"
        # in the pattern is shown literally instead of being parsed as Rich
        # markup or emoji codes.
        console.print(Text.assemble(("No keys found matching pattern:", "yellow"), " ", pattern))
        raise typer.Exit(0)

    table = Table(title=f"Cache Keys ({len(keys)} total)", box=box.ROUNDED)
    table.add_column("Key", style="cyan", no_wrap=True, overflow="fold")
    if show_ttl:
        table.add_column("TTL", justify="right")
    if show_size:
        table.add_column("Size", justify="right")
    table.add_column("Type", style="dim")

    for key in sorted(keys):
        # Use Text object to prevent Rich markup/emoji interpretation
        row = [Text(key, style="cyan")]

        if show_ttl:
            row.append(format_ttl(r.ttl(key)))

        if show_size:
            try:
                row.append(format_bytes(r.memory_usage(key) or 0))
            except Exception:
                # Best effort: a failing size lookup must not abort the listing.
                row.append("[dim]n/a[/dim]")

        row.append(r.type(key))
        table.add_row(*row)

    console.print(table)
@app.command()
def stats():
    """Show cache statistics."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # Prints a summary panel (key count, summed per-key memory, Redis memory
    # figures from INFO memory) followed by a per-domain breakdown, where the
    # domain is the first key segment after the "<prefix>:" part.
    r = get_redis_client()

    # Get all keys with our prefix
    prefix = os.getenv("CACHE_KEY_PREFIX", "mt")
    key_prefix = f"{prefix}:"
    keys = list(r.scan_iter(f"{key_prefix}*"))

    # Calculate stats
    total_size = 0
    by_domain = {}

    for key in keys:
        try:
            size = r.memory_usage(key) or 0
        except Exception:
            # Best effort: count the key even when its size is unavailable.
            size = 0
        total_size += size

        # Extract domain from key (e.g., "mt:clubs:all" -> "clubs").
        # Strip the whole configured prefix rather than just the first
        # segment, so a prefix that itself contains ":" (e.g. "app:mt")
        # does not get reported as the domain.
        if key.startswith(key_prefix):
            remainder = key[len(key_prefix):]
        else:
            # The prefix was not a literal match (e.g. it held glob
            # characters); fall back to dropping the first segment.
            remainder = key.partition(":")[2]
        domain = remainder.split(":", 1)[0]

        bucket = by_domain.setdefault(domain, {"count": 0, "size": 0})
        bucket["count"] += 1
        bucket["size"] += size

    # Get Redis info
    info = r.info("memory")

    # Build stats panel
    stats_table = Table(box=box.SIMPLE, show_header=False, padding=(0, 2))
    stats_table.add_column("Metric", style="bold")
    stats_table.add_column("Value", style="cyan")

    stats_table.add_row("Total Keys", str(len(keys)))
    stats_table.add_row("Cache Size", format_bytes(total_size))
    stats_table.add_row("Redis Memory", format_bytes(info.get("used_memory", 0)))
    stats_table.add_row("Peak Memory", format_bytes(info.get("used_memory_peak", 0)))

    console.print(Panel(stats_table, title="Cache Statistics", border_style="blue"))

    # Domain breakdown
    if by_domain:
        domain_table = Table(title="By Domain", box=box.ROUNDED)
        domain_table.add_column("Domain", style="cyan")
        domain_table.add_column("Keys", justify="right")
        domain_table.add_column("Size", justify="right")

        for domain, data in sorted(by_domain.items()):
            domain_table.add_row(
                # Text keeps key-derived names out of Rich markup parsing.
                Text(domain),
                str(data["count"]),
                format_bytes(data["size"]),
            )

        console.print(domain_table)
def _print_list_preview(data):
    """Print a table previewing up to the first five items of a non-empty list."""
    shown = data[:5]
    preview_table = Table(
        title=f"Data Preview (first {len(shown)} of {len(data)})",
        box=box.ROUNDED,
    )

    # Use first item to determine columns
    if isinstance(data[0], dict):
        # Show select columns for readability
        cols = list(data[0].keys())[:6]
        for col in cols:
            preview_table.add_column(Text(col), overflow="ellipsis", max_width=20)

        for item in shown:
            if isinstance(item, dict):
                cells = [str(item.get(col, ""))[:20] for col in cols]
            else:
                # Mixed list: show the bare value in the first column only.
                cells = [str(item)[:20]][: len(cols)]
            # Text cells keep cached data out of Rich markup/emoji parsing.
            preview_table.add_row(*(Text(cell) for cell in cells))
    else:
        preview_table.add_column("Value")
        for item in shown:
            preview_table.add_row(Text(str(item)[:50]))

    console.print(preview_table)


def _print_dict_tree(key, data):
    """Print up to the first twenty fields of a JSON object as a tree under *key*."""
    tree = Tree(Text(key, style="bold"))
    for k, v in list(data.items())[:20]:
        if isinstance(v, dict | list):
            # Nested containers are summarised by their type name only.
            tree.add(Text.assemble((k, "cyan"), ": ", (type(v).__name__, "dim")))
        else:
            tree.add(Text.assemble((k, "cyan"), ": ", str(v)[:50]))
    console.print(tree)


@app.command()
def get(
    key: str = typer.Argument(..., help="Cache key to retrieve"),
    raw: bool = typer.Option(False, "--raw", "-r", help="Output raw JSON without formatting"),
):
    """Get and display a cached value."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # Only string keys are rendered. The value is parsed as JSON: lists get a
    # preview table, objects a field tree, scalars are printed as-is, and
    # non-JSON strings are printed raw (first 500 characters). Exits with
    # code 1 when the key does not exist.
    #
    # Anything that originates from the cache (key name, field names, values)
    # is rendered without Rich markup/emoji interpretation so that text such
    # as "[draft]" or ":soccer:" is displayed literally.
    r = get_redis_client()

    if not r.exists(key):
        console.print(Text.assemble(("Key not found:", "red"), " ", key))
        raise typer.Exit(1)

    key_type = r.type(key)
    ttl = r.ttl(key)

    if key_type != "string":
        console.print(f"[yellow]Unsupported type:[/yellow] {key_type}")
        return

    value = r.get(key)
    if value is None:
        # The key expired or was deleted between the EXISTS check and GET.
        console.print(Text.assemble(("Key not found:", "red"), " ", key))
        raise typer.Exit(1)

    try:
        data = json.loads(value)
    except json.JSONDecodeError:
        console.print("[yellow]Raw string value:[/yellow]")
        console.print(value[:500], markup=False, emoji=False)
        return

    if raw:
        console.print_json(json.dumps(data))
        return

    # Show metadata
    meta_table = Table(box=box.SIMPLE, show_header=False)
    meta_table.add_column("", style="bold")
    meta_table.add_column("")
    meta_table.add_row("Key", Text(key, style="cyan"))
    meta_table.add_row("TTL", format_ttl(ttl))
    meta_table.add_row("Type", key_type)

    if isinstance(data, list):
        meta_table.add_row("Items", str(len(data)))
        console.print(Panel(meta_table, title="Cache Entry", border_style="green"))
        # Show preview of list items
        if data:
            _print_list_preview(data)
    elif isinstance(data, dict):
        meta_table.add_row("Fields", str(len(data)))
        console.print(Panel(meta_table, title="Cache Entry", border_style="green"))
        # Show dict contents
        _print_dict_tree(key, data)
    else:
        console.print(Panel(meta_table, title="Cache Entry", border_style="green"))
        console.print(data, markup=False, emoji=False)
@app.command()
def delete(
    pattern: str = typer.Argument(..., help="Key or pattern to delete (use * for wildcards)"),
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """Delete cache keys matching a pattern."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # A pattern containing "*" is expanded with SCAN; anything else is treated
    # as one literal key. Lists up to ten affected keys, asks for confirmation
    # unless --force is given, then deletes them in a single call.
    r = get_redis_client()

    if "*" in pattern:
        keys = list(r.scan_iter(pattern))
    else:
        keys = [pattern] if r.exists(pattern) else []

    if not keys:
        # Text output keeps glob syntax / ":name:" in the pattern from being
        # parsed as Rich markup or emoji codes.
        console.print(Text.assemble(("No keys found matching:", "yellow"), " ", pattern))
        raise typer.Exit(0)

    # Show keys to be deleted
    console.print(f"[bold]Keys to delete ({len(keys)}):[/bold]")
    for key in keys[:10]:
        # Key names are shown literally (no markup/emoji interpretation).
        console.print(Text(f" - {key}", style="red"))
    if len(keys) > 10:
        console.print(f" [dim]... and {len(keys) - 10} more[/dim]")

    if not force:
        confirm = typer.confirm(f"Delete {len(keys)} key(s)?")
        if not confirm:
            console.print("[dim]Cancelled[/dim]")
            raise typer.Exit(0)

    # Delete keys
    deleted = r.delete(*keys)
    console.print(f"[green]Deleted {deleted} key(s)[/green]")
@app.command()
def flush(
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """Flush all cache keys with our prefix."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    client = get_redis_client()

    # Collect every key under "<CACHE_KEY_PREFIX>:" (prefix defaults to "mt").
    key_prefix = os.getenv("CACHE_KEY_PREFIX", "mt")
    matched = [*client.scan_iter(f"{key_prefix}:*")]

    if len(matched) == 0:
        console.print("[yellow]Cache is already empty[/yellow]")
        raise typer.Exit(0)

    console.print(f"[bold red]This will delete {len(matched)} cache keys![/bold red]")

    # Only prompt when --force was not given; a "no" answer aborts cleanly.
    if not force and not typer.confirm("Are you sure?"):
        console.print("[dim]Cancelled[/dim]")
        raise typer.Exit(0)

    removed = client.delete(*matched)
    console.print(f"[green]Flushed {removed} key(s)[/green]")
@app.command()
def monitor(
    duration: int = typer.Option(10, "--duration", "-d", help="Duration in seconds"),
):
    """Monitor cache operations in real-time."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # Subscribes to keyspace notifications for keys under
    # "<CACHE_KEY_PREFIX>:" and prints one line per event until `duration`
    # seconds have passed or Ctrl+C is pressed. Keyspace notifications are
    # switched on for the session and the server's previous
    # "notify-keyspace-events" value is restored afterwards.
    import time

    r = get_redis_client()

    console.print(f"[bold]Monitoring Redis commands for {duration} seconds...[/bold]")
    console.print("[dim]Press Ctrl+C to stop[/dim]\n")

    prefix = os.getenv("CACHE_KEY_PREFIX", "mt")

    # Keyspace channels are named per database, so follow the database the
    # client is actually connected to instead of assuming db 0.
    pool_kwargs = getattr(getattr(r, "connection_pool", None), "connection_kwargs", None) or {}
    db = pool_kwargs.get("db") or 0

    setting = "notify-keyspace-events"
    pubsub = r.pubsub()
    # Remember the current server setting so it can be put back on exit.
    previous_events = r.config_get(setting).get(setting, "")
    # Note: MONITOR is not available via pubsub, using keyspace notifications instead
    r.config_set(setting, "KEA")

    colors = {
        "set": "green",
        "del": "red",
        "expire": "yellow",
        "get": "cyan",
    }

    try:
        pubsub.psubscribe(f"__keyspace@{db}__:{prefix}:*")

        # Monotonic clock: the run length is unaffected by wall-clock changes.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            message = pubsub.get_message(timeout=1.0)
            if not message or message["type"] != "pmessage":
                continue

            channel = message["channel"]
            operation = message["data"]
            # Channel looks like "__keyspace@<db>__:<key>"; keep the key part.
            key = channel.split(":", 1)[1] if ":" in channel else channel

            timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
            # Assemble as Text so key names are never parsed as markup/emoji.
            console.print(
                Text.assemble(
                    (timestamp, "dim"),
                    " ",
                    (f"{operation:6}", colors.get(operation, "white")),
                    " ",
                    key,
                )
            )
    except KeyboardInterrupt:
        console.print("\n[dim]Stopped monitoring[/dim]")
    finally:
        try:
            pubsub.close()
        finally:
            # Leave the server configured the way we found it.
            r.config_set(setting, previous_events)
# Script entry point: hand control to the Typer app when run directly.
if __name__ == "__main__":
    app()