new: create mux-0kmtui tool

This commit is contained in:
Stéphan Sainléger
2026-02-28 00:13:06 +01:00
parent 5f935e9842
commit 68dec05436
8 changed files with 1136 additions and 0 deletions

95
mux-0kmtui/README.md Normal file
View File

@@ -0,0 +1,95 @@
# mux-0kmtui
TUI (Text User Interface) for running commands on multiple VPS simultaneously via `0km vps-mux`.
## Features
- **Multi-server selection**: Select multiple servers with the spacebar
- **Filter by name or tag**: Quickly search through your servers
- **Multi-line commands**: Write complete bash scripts in the editor
- **Script import**: Load `.sh` files to execute
- **Parallel execution**: Run commands on all selected servers at once
- **Real-time logs**: View output from each server in dedicated tabs
- **Closable tabs**: Close individual tabs when done
## Installation
```bash
cd mux-0kmtui
pip install -e .
```
## Usage
```bash
mux-0kmtui
```
### Workflow
1. **Filter** (optional): Type in the search bar to filter by name or tag
2. **Select**: Check the servers (Space to toggle, or All/None buttons)
3. **Command**: Type a command or load a script (Ctrl+O)
4. **Execute**: Press `Ctrl+R` or click "Run"
5. **Monitor**: View logs from each server in the tabs
6. **Clean up**: Close tabs with the X button
### Keyboard Shortcuts
| Key | Action |
|-----|--------|
| `F1` | Show help |
| `Ctrl+Q` | Quit |
| `Ctrl+R` | Run command |
| `Ctrl+O` | Load .sh file |
| `Ctrl+L` | Close all tabs |
| `Ctrl+A` | Select all visible servers |
| `Ctrl+D` | Deselect all visible servers |
| `Escape` | Focus command input |
| `Space` | Toggle server selection |
## Configuration
Configuration is read from (in order of priority):
1. `~/.config/0kmtui/config.yml`
2. `~/.config/0km/config.yml`
3. `/etc/0km/config.yml`
### Format
```yaml
hosts:
my-server.com:
host: my-server.com
tags: [prod, web]
staging.example.com:
host: staging.example.com
tags: [staging, api]
db-server:
host: 192.168.1.100
user: admin
port: 2222
tags: [prod, database]
```
### Fields
| Field | Description | Default |
|-------|-------------|---------|
| `host` | Hostname or IP | (entry name) |
| `user` | SSH user | `root` |
| `port` | SSH port | `22` |
| `tags` | List of tags for filtering | `[]` |
## Requirements
- Python 3.10+
- `0km` command installed and in PATH (from [myc-manage](https://git.myceliandre.fr/Myceliandre/myc-manage))
- SSH access to configured servers
## License
AGPL-3.0 - See [LICENSE](../LICENSE)

35
mux-0kmtui/pyproject.toml Normal file
View File

@@ -0,0 +1,35 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mux-0kmtui"
version = "0.1.0"
description = "TUI for running commands on multiple VPS via 0km vps-mux"
readme = "README.md"
requires-python = ">=3.10"
license = "AGPL-3.0-or-later"
authors = [
{ name = "Stéphan Sainléger", email = "stephan@sainleger.fr" }
]
dependencies = [
"textual>=0.47.0",
"pyyaml>=6.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"textual-dev>=1.0",
]
[project.scripts]
mux-0kmtui = "mux_0kmtui.app:main"
[tool.hatch.build.targets.wheel]
packages = ["src/mux_0kmtui"]
[tool.hatch.build.targets.sdist]
include = [
"/src",
]

View File

@@ -0,0 +1,3 @@
"""mux-0kmtui - TUI for running commands on multiple VPS."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,444 @@
"""Main application for mux-0kmtui."""
import asyncio
import re
from pathlib import Path
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.screen import ModalScreen
from textual.widgets import Footer, Header, Button, Static, TextArea, Input, Markdown
from .config import Config
from .widgets import ServerSelector, LogTabs
def _build_help_text(tags: list[str]) -> str:
"""Build the help text with available tags."""
tags_str = ", ".join(f"`{t}`" for t in sorted(tags)) if tags else "*none*"
return f"""\
# mux-0kmtui - Help
## Global
| Shortcut | Action |
|----------|--------|
| `Ctrl+R` | Run command |
| `Ctrl+O` | Load .sh file |
| `Ctrl+L` | Close all log tabs |
| `F1` | Show this help |
| `Escape` | Focus command input |
| `Ctrl+Q` | Quit |
## Server Selection
| Shortcut | Action |
|----------|--------|
| `Space` | Toggle server selection |
| `Ctrl+A` | Select all visible servers |
| `Ctrl+D` | Deselect all visible servers |
| `Up/Down` | Navigate the list |
### Filtering
Type in the search bar to filter by:
- **Server name**: e.g. `elabore` → servers containing "elabore"
- **Tag**: e.g. `prod` → servers with the "prod" tag
Selected servers remain selected even when filtered out.
**Available tags**: {tags_str}
## Command Area
| Shortcut | Action |
|----------|--------|
| `Ctrl+R` | Run command |
| Multi-line | Supported (write a complete script) |
## Log Tabs
| Action | Description |
|--------|-------------|
| Click tab | Show server logs |
| `X` button | Close server tab |
| Scroll | Navigate through logs |
---
*Press Escape or q to close*
"""
class HelpScreen(ModalScreen):
"""Modal screen displaying help information."""
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("q", "dismiss", "Close"),
]
DEFAULT_CSS = """
HelpScreen {
align: center middle;
}
HelpScreen > Vertical {
width: 70;
height: 80%;
border: thick $primary;
background: $surface;
padding: 1 2;
}
HelpScreen Markdown {
margin: 1 0;
}
"""
def __init__(self, tags: list[str] | None = None) -> None:
super().__init__()
self._tags = tags or []
def compose(self) -> ComposeResult:
help_text = _build_help_text(self._tags)
with Vertical():
yield VerticalScroll(Markdown(help_text))
class MuxApp(App):
"""TUI for running commands on multiple VPS via 0km vps-mux."""
TITLE = "mux-0kmtui"
SUB_TITLE = "Multi-VPS Command Runner"
CSS = """
Screen {
background: $surface;
}
#main-container {
height: 1fr;
}
#left-panel {
width: 40;
height: 100%;
}
#right-panel {
width: 1fr;
height: 100%;
}
#command-area {
height: auto;
min-height: 10;
max-height: 18;
dock: bottom;
padding: 0 1;
background: $panel;
border-top: solid $primary;
}
#command-header {
height: 1;
padding: 0;
margin-bottom: 1;
}
#command-header > Static {
width: 1fr;
color: $text-muted;
}
#command-header > Button {
min-width: 12;
margin-left: 1;
border: none;
height: 1;
background: $primary;
color: $text;
text-style: bold;
padding: 0 1;
}
#command-input {
height: 1fr;
min-height: 3;
}
#file-input-area {
height: 3;
display: none;
padding: 0;
}
#file-input-area.visible {
display: block;
}
#file-path-input {
width: 1fr;
}
#selection-actions {
height: 3;
padding: 0 1;
}
#selection-actions Button {
margin-right: 1;
}
Header {
dock: top;
}
Footer {
dock: bottom;
}
"""
BINDINGS = [
Binding("ctrl+q", "quit", "Quit"),
Binding("ctrl+a", "select_all", "Select All", show=False),
Binding("ctrl+d", "deselect_all", "Deselect All", show=False),
Binding("ctrl+l", "clear_logs", "Clear Logs"),
Binding("ctrl+r", "run_command", "Run", show=True),
Binding("ctrl+o", "load_file", "Load .sh"),
Binding("escape", "focus_command", "Focus Command", show=False),
Binding("f1", "show_help", "Help"),
Binding("question_mark", "show_help", "Help", show=False),
]
def __init__(self) -> None:
super().__init__()
self.config = Config.load()
self._running_process: asyncio.subprocess.Process | None = None
def compose(self) -> ComposeResult:
yield Header()
with Horizontal(id="main-container"):
with Vertical(id="left-panel"):
yield ServerSelector(self.config.hosts, id="server-selector")
with Horizontal(id="selection-actions"):
yield Button("All", id="btn-all", variant="default")
yield Button("None", id="btn-none", variant="default")
with Vertical(id="right-panel"):
yield LogTabs(id="log-tabs")
with Vertical(id="command-area"):
with Horizontal(id="command-header"):
yield Static("Command (multi-line supported, Ctrl+R to run)")
yield Button("Load .sh", id="btn-load", variant="default")
yield Button("Run", id="run-btn", variant="primary")
yield TextArea(id="command-input")
with Horizontal(id="file-input-area"):
yield Input(
placeholder="Enter path to .sh file...",
id="file-path-input",
)
yield Button("OK", id="btn-load-ok", variant="primary")
yield Button("Cancel", id="btn-load-cancel", variant="default")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "run-btn":
self.action_run_command()
elif event.button.id == "btn-all":
self.action_select_all()
elif event.button.id == "btn-none":
self.action_deselect_all()
elif event.button.id == "btn-load":
self.action_load_file()
elif event.button.id == "btn-load-ok":
self._do_load_file()
elif event.button.id == "btn-load-cancel":
self._hide_file_input()
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle Enter in file path input."""
if event.input.id == "file-path-input":
self._do_load_file()
def action_select_all(self) -> None:
"""Select all servers."""
self.query_one("#server-selector", ServerSelector).select_all()
def action_deselect_all(self) -> None:
"""Deselect all servers."""
self.query_one("#server-selector", ServerSelector).deselect_all()
def action_clear_logs(self) -> None:
"""Clear all log tabs."""
self.query_one("#log-tabs", LogTabs).clear_all()
def action_focus_command(self) -> None:
"""Focus the command input."""
self.query_one("#command-input", TextArea).focus()
def action_show_help(self) -> None:
"""Show help screen."""
tags = set()
for host in self.config.hosts:
tags.update(host.tags)
self.push_screen(HelpScreen(tags=list(tags)))
def action_load_file(self) -> None:
"""Show file input area."""
file_area = self.query_one("#file-input-area")
file_area.add_class("visible")
file_input = self.query_one("#file-path-input", Input)
file_input.value = ""
file_input.focus()
def _hide_file_input(self) -> None:
"""Hide file input area."""
file_area = self.query_one("#file-input-area")
file_area.remove_class("visible")
self.query_one("#command-input", TextArea).focus()
def _do_load_file(self) -> None:
"""Load the specified file."""
file_input = self.query_one("#file-path-input", Input)
file_path = file_input.value.strip()
if not file_path:
self.notify("Please enter a file path", severity="warning")
return
path = Path(file_path).expanduser().resolve()
if not path.exists():
self.notify(f"File not found: {path}", severity="error")
return
if not path.is_file():
self.notify(f"Not a file: {path}", severity="error")
return
try:
content = path.read_text(encoding="utf-8")
text_area = self.query_one("#command-input", TextArea)
text_area.load_text(content)
self.notify(f"Loaded: {path.name}", severity="information")
self._hide_file_input()
except Exception as e:
self.notify(f"Error reading file: {e}", severity="error")
def action_run_command(self) -> None:
"""Run command on selected servers."""
text_area = self.query_one("#command-input", TextArea)
command = text_area.text.strip()
if not command:
self.notify("Please enter a command", severity="warning")
return
selector = self.query_one("#server-selector", ServerSelector)
selected = selector.get_selected()
if not selected:
self.notify("Please select at least one server", severity="warning")
return
asyncio.create_task(self._run_mux_command(command, [h.name for h in selected]))
async def _run_mux_command(self, command: str, servers: list[str]) -> None:
"""Execute 0km vps-mux command and stream output."""
log_tabs = self.query_one("#log-tabs", LogTabs)
first_line = command.split("\n")[0][:50]
if len(command.split("\n")) > 1 or len(command) > 50:
first_line += "..."
for server in servers:
panel = log_tabs.add_server(server)
panel.clear()
panel.write(f"[cyan]Running: {first_line}[/cyan]")
cmd = ["0km", "vps-mux"] + servers
self.notify(f"Running on {len(servers)} server(s)...")
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
self._running_process = proc
if proc.stdin:
proc.stdin.write(command.encode("utf-8"))
if not command.endswith("\n"):
proc.stdin.write(b"\n")
await proc.stdin.drain()
proc.stdin.close()
await proc.stdin.wait_closed()
line_pattern = re.compile(r"^(.+?)\s+([|!])\s*(.*)$")
if proc.stdout:
async for line in proc.stdout:
decoded = line.decode("utf-8", errors="replace").rstrip("\n")
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", decoded)
match = line_pattern.match(clean_line)
if match:
server_name = match.group(1).strip()
is_stderr = match.group(2) == "!"
content = match.group(3)
panel = log_tabs.get_panel(server_name)
if panel:
if is_stderr:
panel.write_stderr(content)
else:
panel.write_stdout(content)
else:
for server in servers:
panel = log_tabs.get_panel(server)
if panel:
panel.write(decoded)
await proc.wait()
for server in servers:
panel = log_tabs.get_panel(server)
if panel:
panel.write("[dim]--- Command completed ---[/dim]")
self.notify("Command completed", severity="information")
except FileNotFoundError:
self.notify("Error: 0km command not found", severity="error")
for server in servers:
panel = log_tabs.get_panel(server)
if panel:
panel.write_stderr("Error: 0km command not found in PATH")
except Exception as e:
self.notify(f"Error: {e}", severity="error")
for server in servers:
panel = log_tabs.get_panel(server)
if panel:
panel.write_stderr(f"Error: {e}")
finally:
self._running_process = None
def main() -> None:
"""Entry point for the application."""
app = MuxApp()
app.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,167 @@
"""Configuration management for 0kmtui."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
@dataclass
class VPSHost:
"""Represents a VPS host configuration."""
name: str
host: str
user: str = "root"
port: int = 22
tags: list[str] = field(default_factory=list)
@classmethod
def from_dict(cls, name: str, data: dict[str, Any] | str) -> "VPSHost":
"""Create VPSHost from dict or string."""
if isinstance(data, str):
return cls(name=name, host=data)
return cls(
name=name,
host=data.get("host", name),
user=data.get("user", "root"),
port=data.get("port", 22),
tags=data.get("tags", []),
)
@dataclass
class Config:
"""Application configuration."""
hosts: list[VPSHost] = field(default_factory=list)
ssh_public_keys: list[str] = field(default_factory=list)
default_backup_server: str | None = None
@classmethod
def get_config_paths(cls) -> list[Path]:
"""Return list of possible config file paths (first found takes priority for hosts)."""
return [
Path.home()
/ ".config"
/ "0kmtui"
/ "config.yml", # App-specific config first
Path.home() / ".config" / "0km" / "config.yml", # Shared 0km config
Path("/etc/0km/config.yml"), # System-wide config
]
@classmethod
def load(cls, path: Path | None = None) -> "Config":
"""Load configuration from YAML files, merging all found configs."""
if path is not None:
paths = [path]
else:
paths = cls.get_config_paths()
# Merge all config files found
merged_config = cls()
for config_path in paths:
if config_path.exists():
file_config = cls._load_from_file(config_path)
# Merge hosts (add new ones, don't overwrite existing)
existing_names = {h.name for h in merged_config.hosts}
for host in file_config.hosts:
if host.name not in existing_names:
merged_config.hosts.append(host)
# Merge SSH keys
for key in file_config.ssh_public_keys:
if key not in merged_config.ssh_public_keys:
merged_config.ssh_public_keys.append(key)
# Use first found backup server
if (
not merged_config.default_backup_server
and file_config.default_backup_server
):
merged_config.default_backup_server = (
file_config.default_backup_server
)
return merged_config
@classmethod
def _load_from_file(cls, path: Path) -> "Config":
"""Load config from a specific file."""
with open(path) as f:
data = yaml.safe_load(f) or {}
hosts = []
# Parse hosts section
hosts_data = data.get("hosts", {})
if isinstance(hosts_data, dict):
for name, host_data in hosts_data.items():
hosts.append(VPSHost.from_dict(name, host_data))
elif isinstance(hosts_data, list):
for item in hosts_data:
if isinstance(item, str):
hosts.append(VPSHost(name=item, host=item))
elif isinstance(item, dict):
for name, host_data in item.items():
hosts.append(VPSHost.from_dict(name, host_data))
# Parse SSH keys
ssh_keys = []
ssh_access = data.get("ssh-access", {})
if isinstance(ssh_access, dict):
ssh_keys = ssh_access.get("public-keys", [])
return cls(
hosts=hosts,
ssh_public_keys=ssh_keys,
default_backup_server=data.get("default-backup-server"),
)
def save(self, path: Path | None = None) -> None:
"""Save configuration to YAML file."""
if path is None:
path = Path.home() / ".config" / "0kmtui" / "config.yml"
path.parent.mkdir(parents=True, exist_ok=True)
data: dict[str, Any] = {}
if self.hosts:
data["hosts"] = {}
for host in self.hosts:
host_data: dict[str, Any] = {"host": host.host}
if host.user != "root":
host_data["user"] = host.user
if host.port != 22:
host_data["port"] = host.port
if host.tags:
host_data["tags"] = host.tags
data["hosts"][host.name] = host_data
if self.ssh_public_keys:
data["ssh-access"] = {"public-keys": self.ssh_public_keys}
if self.default_backup_server:
data["default-backup-server"] = self.default_backup_server
with open(path, "w") as f:
yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
def add_host(self, host: VPSHost) -> None:
"""Add a new host to configuration."""
# Remove existing host with same name
self.hosts = [h for h in self.hosts if h.name != host.name]
self.hosts.append(host)
def remove_host(self, name: str) -> bool:
"""Remove a host by name. Returns True if found and removed."""
initial_len = len(self.hosts)
self.hosts = [h for h in self.hosts if h.name != name]
return len(self.hosts) < initial_len
def get_host(self, name: str) -> VPSHost | None:
"""Get a host by name."""
for host in self.hosts:
if host.name == name:
return host
return None

View File

@@ -0,0 +1,6 @@
"""Widgets for mux-0kmtui."""
from .server_selector import ServerSelector
from .log_panel import LogPanel, LogTabs
__all__ = ["ServerSelector", "LogPanel", "LogTabs"]

View File

@@ -0,0 +1,214 @@
"""Log panel widgets for displaying server output."""
import re
from textual.app import ComposeResult
from textual.containers import Vertical, Horizontal
from textual.message import Message
from textual.widgets import Static, RichLog, Button, TabbedContent, TabPane, Label
def _sanitize_id(name: str) -> str:
"""Convert a server name to a valid Textual ID."""
return re.sub(r"[^a-zA-Z0-9_-]", "-", name)
class LogPanel(Vertical):
"""A single server's log output panel."""
DEFAULT_CSS = """
LogPanel {
height: 100%;
border: solid $primary;
}
LogPanel .log-header {
height: 1;
background: $primary;
color: $text;
padding: 0 1;
}
LogPanel .log-header Label {
width: 1fr;
}
LogPanel .log-header Button {
min-width: 3;
height: 1;
border: none;
background: $primary;
}
LogPanel RichLog {
height: 1fr;
background: $surface;
}
"""
class CloseRequested(Message):
"""Message sent when close is requested."""
def __init__(self, server_name: str) -> None:
super().__init__()
self.server_name = server_name
def __init__(
self,
server_name: str,
*,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self.server_name = server_name
self._log: RichLog | None = None
self._pending_writes: list[tuple[str, str | None]] = []
def compose(self) -> ComposeResult:
with Horizontal(classes="log-header"):
yield Label(self.server_name)
yield Button("X", id="close-btn", variant="error")
yield RichLog(id="log", highlight=True, markup=True, wrap=True)
def on_mount(self) -> None:
"""Cache the log widget and flush pending writes."""
self._log = self.query_one("#log", RichLog)
for text, style in self._pending_writes:
self._do_write(text, style)
self._pending_writes.clear()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle close button."""
if event.button.id == "close-btn":
self.post_message(self.CloseRequested(self.server_name))
def _do_write(self, text: str, style: str | None = None) -> None:
"""Actually write to the log."""
if self._log:
if style:
self._log.write(f"[{style}]{text}[/{style}]")
else:
self._log.write(text)
def write(self, text: str, style: str | None = None) -> None:
"""Write text to the log."""
if self._log:
self._do_write(text, style)
else:
self._pending_writes.append((text, style))
def write_stdout(self, text: str) -> None:
"""Write stdout text."""
self.write(text)
def write_stderr(self, text: str) -> None:
"""Write stderr text (in red)."""
self.write(text, "red")
def clear(self) -> None:
"""Clear the log."""
self._pending_writes.clear()
if self._log:
self._log.clear()
class LogTabs(Vertical):
"""Container with closable tabs for multiple server logs."""
DEFAULT_CSS = """
LogTabs {
height: 100%;
}
LogTabs .no-logs {
height: 100%;
content-align: center middle;
color: $text-muted;
}
LogTabs TabbedContent {
height: 100%;
}
LogTabs TabPane {
height: 100%;
padding: 0;
}
LogTabs TabPane > LogPanel {
height: 100%;
}
LogTabs ContentSwitcher {
height: 1fr;
}
"""
def __init__(
self,
*,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self._panels: dict[str, LogPanel] = {}
def compose(self) -> ComposeResult:
yield Static(
"Run a command to see server logs here",
classes="no-logs",
id="placeholder",
)
yield TabbedContent(id="log-tabs")
def on_mount(self) -> None:
"""Hide tabs initially."""
self.query_one("#log-tabs", TabbedContent).display = False
def add_server(self, server_name: str) -> LogPanel:
"""Add a new server log tab."""
if server_name in self._panels:
return self._panels[server_name]
self.query_one("#placeholder").display = False
tabs = self.query_one("#log-tabs", TabbedContent)
tabs.display = True
safe_id = _sanitize_id(server_name)
panel = LogPanel(server_name, id=f"panel-{safe_id}")
self._panels[server_name] = panel
tab_pane = TabPane(server_name, panel, id=f"tab-{safe_id}")
tabs.add_pane(tab_pane)
return panel
def remove_server(self, server_name: str) -> None:
"""Remove a server log tab."""
if server_name not in self._panels:
return
safe_id = _sanitize_id(server_name)
tabs = self.query_one("#log-tabs", TabbedContent)
tabs.remove_pane(f"tab-{safe_id}")
del self._panels[server_name]
if not self._panels:
tabs.display = False
self.query_one("#placeholder").display = True
def get_panel(self, server_name: str) -> LogPanel | None:
"""Get panel for a server."""
return self._panels.get(server_name)
def clear_all(self) -> None:
"""Remove all server tabs."""
for name in list(self._panels.keys()):
self.remove_server(name)
def on_log_panel_close_requested(self, event: LogPanel.CloseRequested) -> None:
"""Handle panel close request."""
self.remove_server(event.server_name)

View File

@@ -0,0 +1,172 @@
"""Server selection widget with multi-select support."""
from textual.app import ComposeResult
from textual.containers import Vertical
from textual.message import Message
from textual.widgets import Static, SelectionList, Input
from textual.widgets.selection_list import Selection
from ..config import VPSHost
class ServerSelector(Vertical):
"""Widget for selecting multiple VPS servers with search/filter support."""
DEFAULT_CSS = """
ServerSelector {
border: solid $primary;
height: 100%;
}
ServerSelector .selector-header {
height: 1;
background: $primary;
color: $text;
padding: 0 1;
}
ServerSelector #search-input {
height: 1;
border: none;
padding: 0 1;
background: $surface;
}
ServerSelector #search-input:focus {
border: none;
}
ServerSelector .selector-info {
height: 1;
background: $surface;
color: $text-muted;
padding: 0 1;
}
ServerSelector SelectionList {
height: 1fr;
}
"""
class SelectionChanged(Message):
"""Message sent when selection changes."""
def __init__(self, selected: list[VPSHost]) -> None:
super().__init__()
self.selected = selected
def __init__(
self,
hosts: list[VPSHost],
*,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self._hosts = {h.name: h for h in hosts}
self._selected_names: set[str] = set()
self._current_filter: str = ""
def compose(self) -> ComposeResult:
yield Static("Servers (Space=toggle)", classes="selector-header")
yield Input(placeholder="Filter by name or tag...", id="search-input")
yield Static("0 selected", id="selection-info", classes="selector-info")
yield SelectionList[str](
*self._build_selections(),
id="server-list",
)
def _build_selections(self) -> list[Selection[str]]:
"""Build selection items, filtered by current search."""
selections = []
filter_lower = self._current_filter.lower()
for host in self._hosts.values():
if filter_lower:
name_match = filter_lower in host.name.lower()
tag_match = any(filter_lower in tag.lower() for tag in host.tags)
if not (name_match or tag_match):
continue
if host.tags:
tags_str = " [" + ", ".join(host.tags) + "]"
label = f"{host.name}{tags_str}"
else:
label = host.name
is_selected = host.name in self._selected_names
selections.append(Selection(label, host.name, is_selected))
return selections
def _rebuild_list(self) -> None:
"""Rebuild the selection list with current filter."""
selection_list = self.query_one("#server-list", SelectionList)
selection_list.clear_options()
for selection in self._build_selections():
selection_list.add_option(selection)
self._update_info()
def on_input_changed(self, event: Input.Changed) -> None:
"""Handle search input changes."""
if event.input.id == "search-input":
self._current_filter = event.value
self._rebuild_list()
def on_selection_list_selected_changed(
self, event: SelectionList.SelectedChanged
) -> None:
"""Handle selection changes."""
visible_selected = set(event.selection_list.selected)
for option in event.selection_list._options:
name = option.value
if name in visible_selected:
self._selected_names.add(name)
elif name in self._selected_names:
self._selected_names.discard(name)
self._update_info()
self.post_message(self.SelectionChanged(self.get_selected()))
def _update_info(self) -> None:
"""Update the selection info label."""
info = self.query_one("#selection-info", Static)
count = len(self._selected_names)
visible_count = len(self.query_one("#server-list", SelectionList)._options)
total_count = len(self._hosts)
if self._current_filter:
info.update(f"{count} selected ({visible_count}/{total_count} shown)")
else:
info.update(f"{count} selected")
def get_selected(self) -> list[VPSHost]:
"""Get currently selected hosts (including filtered-out ones)."""
return [
self._hosts[name] for name in self._selected_names if name in self._hosts
]
def select_all(self) -> None:
"""Select all visible servers."""
selection_list = self.query_one("#server-list", SelectionList)
selection_list.select_all()
for option in selection_list._options:
self._selected_names.add(option.value)
self._update_info()
def deselect_all(self) -> None:
"""Deselect all visible servers."""
selection_list = self.query_one("#server-list", SelectionList)
for option in selection_list._options:
self._selected_names.discard(option.value)
selection_list.deselect_all()
self._update_info()
def get_all_tags(self) -> set[str]:
"""Get all unique tags from all hosts."""
tags = set()
for host in self._hosts.values():
tags.update(host.tags)
return tags