diff --git a/mux-0kmtui/README.md b/mux-0kmtui/README.md new file mode 100644 index 0000000..2378db2 --- /dev/null +++ b/mux-0kmtui/README.md @@ -0,0 +1,95 @@ +# mux-0kmtui + +TUI (Text User Interface) for running commands on multiple VPS simultaneously via `0km vps-mux`. + +## Features + +- **Multi-server selection**: Select multiple servers with the spacebar +- **Filter by name or tag**: Quickly search through your servers +- **Multi-line commands**: Write complete bash scripts in the editor +- **Script import**: Load `.sh` files to execute +- **Parallel execution**: Run commands on all selected servers at once +- **Real-time logs**: View output from each server in dedicated tabs +- **Closable tabs**: Close individual tabs when done + +## Installation + +```bash +cd mux-0kmtui +pip install -e . +``` + +## Usage + +```bash +mux-0kmtui +``` + +### Workflow + +1. **Filter** (optional): Type in the search bar to filter by name or tag +2. **Select**: Check the servers (Space to toggle, or All/None buttons) +3. **Command**: Type a command or load a script (Ctrl+O) +4. **Execute**: Press `Ctrl+R` or click "Run" +5. **Monitor**: View logs from each server in the tabs +6. **Clean up**: Close tabs with the X button + +### Keyboard Shortcuts + +| Key | Action | +|-----|--------| +| `F1` | Show help | +| `Ctrl+Q` | Quit | +| `Ctrl+R` | Run command | +| `Ctrl+O` | Load .sh file | +| `Ctrl+L` | Close all tabs | +| `Ctrl+A` | Select all visible servers | +| `Ctrl+D` | Deselect all visible servers | +| `Escape` | Focus command input | +| `Space` | Toggle server selection | + +## Configuration + +Configuration is read from (in order of priority): + +1. `~/.config/0kmtui/config.yml` +2. `~/.config/0km/config.yml` +3. `/etc/0km/config.yml` + +### Format + +```yaml +hosts: + my-server.com: + host: my-server.com + tags: [prod, web] + + staging.example.com: + host: staging.example.com + tags: [staging, api] + + db-server: + host: 192.168.1.100 + user: admin + port: 2222 + tags: [prod, database] +``` + +### Fields + +| Field | Description | Default | +|-------|-------------|---------| +| `host` | Hostname or IP | (entry name) | +| `user` | SSH user | `root` | +| `port` | SSH port | `22` | +| `tags` | List of tags for filtering | `[]` | + +## Requirements + +- Python 3.10+ +- `0km` command installed and in PATH (from [myc-manage](https://git.myceliandre.fr/Myceliandre/myc-manage)) +- SSH access to configured servers + +## License + +AGPL-3.0 - See [LICENSE](../LICENSE) diff --git a/mux-0kmtui/pyproject.toml b/mux-0kmtui/pyproject.toml new file mode 100644 index 0000000..08a8d2d --- /dev/null +++ b/mux-0kmtui/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "mux-0kmtui" +version = "0.1.0" +description = "TUI for running commands on multiple VPS via 0km vps-mux" +readme = "README.md" +requires-python = ">=3.10" +license = "AGPL-3.0-or-later" +authors = [ + { name = "Stéphan Sainléger", email = "stephan@sainleger.fr" } +] +dependencies = [ + "textual>=0.47.0", + "pyyaml>=6.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", + "textual-dev>=1.0", +] + +[project.scripts] +mux-0kmtui = "mux_0kmtui.app:main" + +[tool.hatch.build.targets.wheel] +packages = ["src/mux_0kmtui"] + +[tool.hatch.build.targets.sdist] +include = [ + "/src", +] diff --git a/mux-0kmtui/src/mux_0kmtui/__init__.py b/mux-0kmtui/src/mux_0kmtui/__init__.py new file mode 100644 index 0000000..9275c61 --- /dev/null +++ b/mux-0kmtui/src/mux_0kmtui/__init__.py @@ -0,0 +1,3 @@ +"""mux-0kmtui - TUI for running commands on multiple VPS.""" + +__version__ = "0.1.0" diff --git a/mux-0kmtui/src/mux_0kmtui/app.py b/mux-0kmtui/src/mux_0kmtui/app.py new file mode 100644 index 0000000..81f8c5d --- /dev/null +++ b/mux-0kmtui/src/mux_0kmtui/app.py @@ -0,0 +1,444 @@ +"""Main application for mux-0kmtui.""" + +import asyncio +import re +from pathlib import Path + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Horizontal, Vertical, VerticalScroll +from textual.screen import ModalScreen +from textual.widgets import Footer, Header, Button, Static, TextArea, Input, Markdown + +from .config import Config +from .widgets import ServerSelector, LogTabs + + +def _build_help_text(tags: list[str]) -> str: + """Build the help text with available tags.""" + tags_str = ", ".join(f"`{t}`" for t in sorted(tags)) if tags else "*none*" + + return f"""\ +# mux-0kmtui - Help + +## Global + +| Shortcut | Action | +|----------|--------| +| `Ctrl+R` | Run command | +| `Ctrl+O` | Load .sh file | +| `Ctrl+L` | Close all log tabs | +| `F1` | Show this help | +| `Escape` | Focus command input | +| `Ctrl+Q` | Quit | + +## Server Selection + +| Shortcut | Action | +|----------|--------| +| `Space` | Toggle server selection | +| `Ctrl+A` | Select all visible servers | +| `Ctrl+D` | Deselect all visible servers | +| `Up/Down` | Navigate the list | + +### Filtering + +Type in the search bar to filter by: +- **Server name**: e.g. `elabore` → servers containing "elabore" +- **Tag**: e.g. `prod` → servers with the "prod" tag + +Selected servers remain selected even when filtered out. + +**Available tags**: {tags_str} + +## Command Area + +| Shortcut | Action | +|----------|--------| +| `Ctrl+R` | Run command | +| Multi-line | Supported (write a complete script) | + +## Log Tabs + +| Action | Description | +|--------|-------------| +| Click tab | Show server logs | +| `X` button | Close server tab | +| Scroll | Navigate through logs | + +--- +*Press Escape or q to close* +""" + + +class HelpScreen(ModalScreen): + """Modal screen displaying help information.""" + + BINDINGS = [ + Binding("escape", "dismiss", "Close"), + Binding("q", "dismiss", "Close"), + ] + + DEFAULT_CSS = """ + HelpScreen { + align: center middle; + } + + HelpScreen > Vertical { + width: 70; + height: 80%; + border: thick $primary; + background: $surface; + padding: 1 2; + } + + HelpScreen Markdown { + margin: 1 0; + } + """ + + def __init__(self, tags: list[str] | None = None) -> None: + super().__init__() + self._tags = tags or [] + + def compose(self) -> ComposeResult: + help_text = _build_help_text(self._tags) + with Vertical(): + yield VerticalScroll(Markdown(help_text)) + + +class MuxApp(App): + """TUI for running commands on multiple VPS via 0km vps-mux.""" + + TITLE = "mux-0kmtui" + SUB_TITLE = "Multi-VPS Command Runner" + + CSS = """ + Screen { + background: $surface; + } + + #main-container { + height: 1fr; + } + + #left-panel { + width: 40; + height: 100%; + } + + #right-panel { + width: 1fr; + height: 100%; + } + + #command-area { + height: auto; + min-height: 10; + max-height: 18; + dock: bottom; + padding: 0 1; + background: $panel; + border-top: solid $primary; + } + + #command-header { + height: 1; + padding: 0; + margin-bottom: 1; + } + + #command-header > Static { + width: 1fr; + color: $text-muted; + } + + #command-header > Button { + min-width: 12; + margin-left: 1; + border: none; + height: 1; + background: $primary; + color: $text; + text-style: bold; + padding: 0 1; + } + + #command-input { + height: 1fr; + min-height: 3; + } + + #file-input-area { + height: 3; + display: none; + padding: 0; + } + + #file-input-area.visible { + display: block; + } + + #file-path-input { + width: 1fr; + } + + #selection-actions { + height: 3; + padding: 0 1; + } + + #selection-actions Button { + margin-right: 1; + } + + Header { + dock: top; + } + + Footer { + dock: bottom; + } + """ + + BINDINGS = [ + Binding("ctrl+q", "quit", "Quit"), + Binding("ctrl+a", "select_all", "Select All", show=False), + Binding("ctrl+d", "deselect_all", "Deselect All", show=False), + Binding("ctrl+l", "clear_logs", "Clear Logs"), + Binding("ctrl+r", "run_command", "Run", show=True), + Binding("ctrl+o", "load_file", "Load .sh"), + Binding("escape", "focus_command", "Focus Command", show=False), + Binding("f1", "show_help", "Help"), + Binding("question_mark", "show_help", "Help", show=False), + ] + + def __init__(self) -> None: + super().__init__() + self.config = Config.load() + self._running_process: asyncio.subprocess.Process | None = None + + def compose(self) -> ComposeResult: + yield Header() + + with Horizontal(id="main-container"): + with Vertical(id="left-panel"): + yield ServerSelector(self.config.hosts, id="server-selector") + with Horizontal(id="selection-actions"): + yield Button("All", id="btn-all", variant="default") + yield Button("None", id="btn-none", variant="default") + + with Vertical(id="right-panel"): + yield LogTabs(id="log-tabs") + + with Vertical(id="command-area"): + with Horizontal(id="command-header"): + yield Static("Command (multi-line supported, Ctrl+R to run)") + yield Button("Load .sh", id="btn-load", variant="default") + yield Button("Run", id="run-btn", variant="primary") + + yield TextArea(id="command-input") + + with Horizontal(id="file-input-area"): + yield Input( + placeholder="Enter path to .sh file...", + id="file-path-input", + ) + yield Button("OK", id="btn-load-ok", variant="primary") + yield Button("Cancel", id="btn-load-cancel", variant="default") + + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle button presses.""" + if event.button.id == "run-btn": + self.action_run_command() + elif event.button.id == "btn-all": + self.action_select_all() + elif event.button.id == "btn-none": + self.action_deselect_all() + elif event.button.id == "btn-load": + self.action_load_file() + elif event.button.id == "btn-load-ok": + self._do_load_file() + elif event.button.id == "btn-load-cancel": + self._hide_file_input() + + def on_input_submitted(self, event: Input.Submitted) -> None: + """Handle Enter in file path input.""" + if event.input.id == "file-path-input": + self._do_load_file() + + def action_select_all(self) -> None: + """Select all servers.""" + self.query_one("#server-selector", ServerSelector).select_all() + + def action_deselect_all(self) -> None: + """Deselect all servers.""" + self.query_one("#server-selector", ServerSelector).deselect_all() + + def action_clear_logs(self) -> None: + """Clear all log tabs.""" + self.query_one("#log-tabs", LogTabs).clear_all() + + def action_focus_command(self) -> None: + """Focus the command input.""" + self.query_one("#command-input", TextArea).focus() + + def action_show_help(self) -> None: + """Show help screen.""" + tags = set() + for host in self.config.hosts: + tags.update(host.tags) + self.push_screen(HelpScreen(tags=list(tags))) + + def action_load_file(self) -> None: + """Show file input area.""" + file_area = self.query_one("#file-input-area") + file_area.add_class("visible") + file_input = self.query_one("#file-path-input", Input) + file_input.value = "" + file_input.focus() + + def _hide_file_input(self) -> None: + """Hide file input area.""" + file_area = self.query_one("#file-input-area") + file_area.remove_class("visible") + self.query_one("#command-input", TextArea).focus() + + def _do_load_file(self) -> None: + """Load the specified file.""" + file_input = self.query_one("#file-path-input", Input) + file_path = file_input.value.strip() + + if not file_path: + self.notify("Please enter a file path", severity="warning") + return + + path = Path(file_path).expanduser().resolve() + + if not path.exists(): + self.notify(f"File not found: {path}", severity="error") + return + + if not path.is_file(): + self.notify(f"Not a file: {path}", severity="error") + return + + try: + content = path.read_text(encoding="utf-8") + text_area = self.query_one("#command-input", TextArea) + text_area.load_text(content) + self.notify(f"Loaded: {path.name}", severity="information") + self._hide_file_input() + except Exception as e: + self.notify(f"Error reading file: {e}", severity="error") + + def action_run_command(self) -> None: + """Run command on selected servers.""" + text_area = self.query_one("#command-input", TextArea) + command = text_area.text.strip() + if not command: + self.notify("Please enter a command", severity="warning") + return + + selector = self.query_one("#server-selector", ServerSelector) + selected = selector.get_selected() + if not selected: + self.notify("Please select at least one server", severity="warning") + return + + asyncio.create_task(self._run_mux_command(command, [h.name for h in selected])) + + async def _run_mux_command(self, command: str, servers: list[str]) -> None: + """Execute 0km vps-mux command and stream output.""" + log_tabs = self.query_one("#log-tabs", LogTabs) + + first_line = command.split("\n")[0][:50] + if len(command.split("\n")) > 1 or len(command) > 50: + first_line += "..." + + for server in servers: + panel = log_tabs.add_server(server) + panel.clear() + panel.write(f"[cyan]Running: {first_line}[/cyan]") + + cmd = ["0km", "vps-mux"] + servers + + self.notify(f"Running on {len(servers)} server(s)...") + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + self._running_process = proc + + if proc.stdin: + proc.stdin.write(command.encode("utf-8")) + if not command.endswith("\n"): + proc.stdin.write(b"\n") + await proc.stdin.drain() + proc.stdin.close() + await proc.stdin.wait_closed() + + line_pattern = re.compile(r"^(.+?)\s+([|!])\s*(.*)$") + + if proc.stdout: + async for line in proc.stdout: + decoded = line.decode("utf-8", errors="replace").rstrip("\n") + clean_line = re.sub(r"\x1b\[[0-9;]*m", "", decoded) + match = line_pattern.match(clean_line) + + if match: + server_name = match.group(1).strip() + is_stderr = match.group(2) == "!" + content = match.group(3) + + panel = log_tabs.get_panel(server_name) + if panel: + if is_stderr: + panel.write_stderr(content) + else: + panel.write_stdout(content) + else: + for server in servers: + panel = log_tabs.get_panel(server) + if panel: + panel.write(decoded) + + await proc.wait() + + for server in servers: + panel = log_tabs.get_panel(server) + if panel: + panel.write("[dim]--- Command completed ---[/dim]") + + self.notify("Command completed", severity="information") + + except FileNotFoundError: + self.notify("Error: 0km command not found", severity="error") + for server in servers: + panel = log_tabs.get_panel(server) + if panel: + panel.write_stderr("Error: 0km command not found in PATH") + except Exception as e: + self.notify(f"Error: {e}", severity="error") + for server in servers: + panel = log_tabs.get_panel(server) + if panel: + panel.write_stderr(f"Error: {e}") + finally: + self._running_process = None + + +def main() -> None: + """Entry point for the application.""" + app = MuxApp() + app.run() + + +if __name__ == "__main__": + main() diff --git a/mux-0kmtui/src/mux_0kmtui/config.py b/mux-0kmtui/src/mux_0kmtui/config.py new file mode 100644 index 0000000..686de14 --- /dev/null +++ b/mux-0kmtui/src/mux_0kmtui/config.py @@ -0,0 +1,167 @@ +"""Configuration management for 0kmtui.""" + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml + + +@dataclass +class VPSHost: + """Represents a VPS host configuration.""" + + name: str + host: str + user: str = "root" + port: int = 22 + tags: list[str] = field(default_factory=list) + + @classmethod + def from_dict(cls, name: str, data: dict[str, Any] | str) -> "VPSHost": + """Create VPSHost from dict or string.""" + if isinstance(data, str): + return cls(name=name, host=data) + return cls( + name=name, + host=data.get("host", name), + user=data.get("user", "root"), + port=data.get("port", 22), + tags=data.get("tags", []), + ) + + +@dataclass +class Config: + """Application configuration.""" + + hosts: list[VPSHost] = field(default_factory=list) + ssh_public_keys: list[str] = field(default_factory=list) + default_backup_server: str | None = None + + @classmethod + def get_config_paths(cls) -> list[Path]: + """Return list of possible config file paths (first found takes priority for hosts).""" + return [ + Path.home() + / ".config" + / "0kmtui" + / "config.yml", # App-specific config first + Path.home() / ".config" / "0km" / "config.yml", # Shared 0km config + Path("/etc/0km/config.yml"), # System-wide config + ] + + @classmethod + def load(cls, path: Path | None = None) -> "Config": + """Load configuration from YAML files, merging all found configs.""" + if path is not None: + paths = [path] + else: + paths = cls.get_config_paths() + + # Merge all config files found + merged_config = cls() + for config_path in paths: + if config_path.exists(): + file_config = cls._load_from_file(config_path) + # Merge hosts (add new ones, don't overwrite existing) + existing_names = {h.name for h in merged_config.hosts} + for host in file_config.hosts: + if host.name not in existing_names: + merged_config.hosts.append(host) + # Merge SSH keys + for key in file_config.ssh_public_keys: + if key not in merged_config.ssh_public_keys: + merged_config.ssh_public_keys.append(key) + # Use first found backup server + if ( + not merged_config.default_backup_server + and file_config.default_backup_server + ): + merged_config.default_backup_server = ( + file_config.default_backup_server + ) + + return merged_config + + @classmethod + def _load_from_file(cls, path: Path) -> "Config": + """Load config from a specific file.""" + with open(path) as f: + data = yaml.safe_load(f) or {} + + hosts = [] + + # Parse hosts section + hosts_data = data.get("hosts", {}) + if isinstance(hosts_data, dict): + for name, host_data in hosts_data.items(): + hosts.append(VPSHost.from_dict(name, host_data)) + elif isinstance(hosts_data, list): + for item in hosts_data: + if isinstance(item, str): + hosts.append(VPSHost(name=item, host=item)) + elif isinstance(item, dict): + for name, host_data in item.items(): + hosts.append(VPSHost.from_dict(name, host_data)) + + # Parse SSH keys + ssh_keys = [] + ssh_access = data.get("ssh-access", {}) + if isinstance(ssh_access, dict): + ssh_keys = ssh_access.get("public-keys", []) + + return cls( + hosts=hosts, + ssh_public_keys=ssh_keys, + default_backup_server=data.get("default-backup-server"), + ) + + def save(self, path: Path | None = None) -> None: + """Save configuration to YAML file.""" + if path is None: + path = Path.home() / ".config" / "0kmtui" / "config.yml" + + path.parent.mkdir(parents=True, exist_ok=True) + + data: dict[str, Any] = {} + + if self.hosts: + data["hosts"] = {} + for host in self.hosts: + host_data: dict[str, Any] = {"host": host.host} + if host.user != "root": + host_data["user"] = host.user + if host.port != 22: + host_data["port"] = host.port + if host.tags: + host_data["tags"] = host.tags + data["hosts"][host.name] = host_data + + if self.ssh_public_keys: + data["ssh-access"] = {"public-keys": self.ssh_public_keys} + + if self.default_backup_server: + data["default-backup-server"] = self.default_backup_server + + with open(path, "w") as f: + yaml.dump(data, f, default_flow_style=False, allow_unicode=True) + + def add_host(self, host: VPSHost) -> None: + """Add a new host to configuration.""" + # Remove existing host with same name + self.hosts = [h for h in self.hosts if h.name != host.name] + self.hosts.append(host) + + def remove_host(self, name: str) -> bool: + """Remove a host by name. Returns True if found and removed.""" + initial_len = len(self.hosts) + self.hosts = [h for h in self.hosts if h.name != name] + return len(self.hosts) < initial_len + + def get_host(self, name: str) -> VPSHost | None: + """Get a host by name.""" + for host in self.hosts: + if host.name == name: + return host + return None diff --git a/mux-0kmtui/src/mux_0kmtui/widgets/__init__.py b/mux-0kmtui/src/mux_0kmtui/widgets/__init__.py new file mode 100644 index 0000000..e488b3f --- /dev/null +++ b/mux-0kmtui/src/mux_0kmtui/widgets/__init__.py @@ -0,0 +1,6 @@ +"""Widgets for mux-0kmtui.""" + +from .server_selector import ServerSelector +from .log_panel import LogPanel, LogTabs + +__all__ = ["ServerSelector", "LogPanel", "LogTabs"] diff --git a/mux-0kmtui/src/mux_0kmtui/widgets/log_panel.py b/mux-0kmtui/src/mux_0kmtui/widgets/log_panel.py new file mode 100644 index 0000000..232d17f --- /dev/null +++ b/mux-0kmtui/src/mux_0kmtui/widgets/log_panel.py @@ -0,0 +1,214 @@ +"""Log panel widgets for displaying server output.""" + +import re + +from textual.app import ComposeResult +from textual.containers import Vertical, Horizontal +from textual.message import Message +from textual.widgets import Static, RichLog, Button, TabbedContent, TabPane, Label + + +def _sanitize_id(name: str) -> str: + """Convert a server name to a valid Textual ID.""" + return re.sub(r"[^a-zA-Z0-9_-]", "-", name) + + +class LogPanel(Vertical): + """A single server's log output panel.""" + + DEFAULT_CSS = """ + LogPanel { + height: 100%; + border: solid $primary; + } + + LogPanel .log-header { + height: 1; + background: $primary; + color: $text; + padding: 0 1; + } + + LogPanel .log-header Label { + width: 1fr; + } + + LogPanel .log-header Button { + min-width: 3; + height: 1; + border: none; + background: $primary; + } + + LogPanel RichLog { + height: 1fr; + background: $surface; + } + """ + + class CloseRequested(Message): + """Message sent when close is requested.""" + + def __init__(self, server_name: str) -> None: + super().__init__() + self.server_name = server_name + + def __init__( + self, + server_name: str, + *, + name: str | None = None, + id: str | None = None, + classes: str | None = None, + ) -> None: + super().__init__(name=name, id=id, classes=classes) + self.server_name = server_name + self._log: RichLog | None = None + self._pending_writes: list[tuple[str, str | None]] = [] + + def compose(self) -> ComposeResult: + with Horizontal(classes="log-header"): + yield Label(self.server_name) + yield Button("X", id="close-btn", variant="error") + yield RichLog(id="log", highlight=True, markup=True, wrap=True) + + def on_mount(self) -> None: + """Cache the log widget and flush pending writes.""" + self._log = self.query_one("#log", RichLog) + for text, style in self._pending_writes: + self._do_write(text, style) + self._pending_writes.clear() + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle close button.""" + if event.button.id == "close-btn": + self.post_message(self.CloseRequested(self.server_name)) + + def _do_write(self, text: str, style: str | None = None) -> None: + """Actually write to the log.""" + if self._log: + if style: + self._log.write(f"[{style}]{text}[/{style}]") + else: + self._log.write(text) + + def write(self, text: str, style: str | None = None) -> None: + """Write text to the log.""" + if self._log: + self._do_write(text, style) + else: + self._pending_writes.append((text, style)) + + def write_stdout(self, text: str) -> None: + """Write stdout text.""" + self.write(text) + + def write_stderr(self, text: str) -> None: + """Write stderr text (in red).""" + self.write(text, "red") + + def clear(self) -> None: + """Clear the log.""" + self._pending_writes.clear() + if self._log: + self._log.clear() + + +class LogTabs(Vertical): + """Container with closable tabs for multiple server logs.""" + + DEFAULT_CSS = """ + LogTabs { + height: 100%; + } + + LogTabs .no-logs { + height: 100%; + content-align: center middle; + color: $text-muted; + } + + LogTabs TabbedContent { + height: 100%; + } + + LogTabs TabPane { + height: 100%; + padding: 0; + } + + LogTabs TabPane > LogPanel { + height: 100%; + } + + LogTabs ContentSwitcher { + height: 1fr; + } + """ + + def __init__( + self, + *, + name: str | None = None, + id: str | None = None, + classes: str | None = None, + ) -> None: + super().__init__(name=name, id=id, classes=classes) + self._panels: dict[str, LogPanel] = {} + + def compose(self) -> ComposeResult: + yield Static( + "Run a command to see server logs here", + classes="no-logs", + id="placeholder", + ) + yield TabbedContent(id="log-tabs") + + def on_mount(self) -> None: + """Hide tabs initially.""" + self.query_one("#log-tabs", TabbedContent).display = False + + def add_server(self, server_name: str) -> LogPanel: + """Add a new server log tab.""" + if server_name in self._panels: + return self._panels[server_name] + + self.query_one("#placeholder").display = False + tabs = self.query_one("#log-tabs", TabbedContent) + tabs.display = True + + safe_id = _sanitize_id(server_name) + panel = LogPanel(server_name, id=f"panel-{safe_id}") + self._panels[server_name] = panel + + tab_pane = TabPane(server_name, panel, id=f"tab-{safe_id}") + tabs.add_pane(tab_pane) + + return panel + + def remove_server(self, server_name: str) -> None: + """Remove a server log tab.""" + if server_name not in self._panels: + return + + safe_id = _sanitize_id(server_name) + tabs = self.query_one("#log-tabs", TabbedContent) + tabs.remove_pane(f"tab-{safe_id}") + del self._panels[server_name] + + if not self._panels: + tabs.display = False + self.query_one("#placeholder").display = True + + def get_panel(self, server_name: str) -> LogPanel | None: + """Get panel for a server.""" + return self._panels.get(server_name) + + def clear_all(self) -> None: + """Remove all server tabs.""" + for name in list(self._panels.keys()): + self.remove_server(name) + + def on_log_panel_close_requested(self, event: LogPanel.CloseRequested) -> None: + """Handle panel close request.""" + self.remove_server(event.server_name) diff --git a/mux-0kmtui/src/mux_0kmtui/widgets/server_selector.py b/mux-0kmtui/src/mux_0kmtui/widgets/server_selector.py new file mode 100644 index 0000000..91a74f2 --- /dev/null +++ b/mux-0kmtui/src/mux_0kmtui/widgets/server_selector.py @@ -0,0 +1,172 @@ +"""Server selection widget with multi-select support.""" + +from textual.app import ComposeResult +from textual.containers import Vertical +from textual.message import Message +from textual.widgets import Static, SelectionList, Input +from textual.widgets.selection_list import Selection + +from ..config import VPSHost + + +class ServerSelector(Vertical): + """Widget for selecting multiple VPS servers with search/filter support.""" + + DEFAULT_CSS = """ + ServerSelector { + border: solid $primary; + height: 100%; + } + + ServerSelector .selector-header { + height: 1; + background: $primary; + color: $text; + padding: 0 1; + } + + ServerSelector #search-input { + height: 1; + border: none; + padding: 0 1; + background: $surface; + } + + ServerSelector #search-input:focus { + border: none; + } + + ServerSelector .selector-info { + height: 1; + background: $surface; + color: $text-muted; + padding: 0 1; + } + + ServerSelector SelectionList { + height: 1fr; + } + """ + + class SelectionChanged(Message): + """Message sent when selection changes.""" + + def __init__(self, selected: list[VPSHost]) -> None: + super().__init__() + self.selected = selected + + def __init__( + self, + hosts: list[VPSHost], + *, + name: str | None = None, + id: str | None = None, + classes: str | None = None, + ) -> None: + super().__init__(name=name, id=id, classes=classes) + self._hosts = {h.name: h for h in hosts} + self._selected_names: set[str] = set() + self._current_filter: str = "" + + def compose(self) -> ComposeResult: + yield Static("Servers (Space=toggle)", classes="selector-header") + yield Input(placeholder="Filter by name or tag...", id="search-input") + yield Static("0 selected", id="selection-info", classes="selector-info") + yield SelectionList[str]( + *self._build_selections(), + id="server-list", + ) + + def _build_selections(self) -> list[Selection[str]]: + """Build selection items, filtered by current search.""" + selections = [] + filter_lower = self._current_filter.lower() + + for host in self._hosts.values(): + if filter_lower: + name_match = filter_lower in host.name.lower() + tag_match = any(filter_lower in tag.lower() for tag in host.tags) + if not (name_match or tag_match): + continue + + if host.tags: + tags_str = " [" + ", ".join(host.tags) + "]" + label = f"{host.name}{tags_str}" + else: + label = host.name + + is_selected = host.name in self._selected_names + selections.append(Selection(label, host.name, is_selected)) + + return selections + + def _rebuild_list(self) -> None: + """Rebuild the selection list with current filter.""" + selection_list = self.query_one("#server-list", SelectionList) + selection_list.clear_options() + for selection in self._build_selections(): + selection_list.add_option(selection) + self._update_info() + + def on_input_changed(self, event: Input.Changed) -> None: + """Handle search input changes.""" + if event.input.id == "search-input": + self._current_filter = event.value + self._rebuild_list() + + def on_selection_list_selected_changed( + self, event: SelectionList.SelectedChanged + ) -> None: + """Handle selection changes.""" + visible_selected = set(event.selection_list.selected) + + for option in event.selection_list._options: + name = option.value + if name in visible_selected: + self._selected_names.add(name) + elif name in self._selected_names: + self._selected_names.discard(name) + + self._update_info() + self.post_message(self.SelectionChanged(self.get_selected())) + + def _update_info(self) -> None: + """Update the selection info label.""" + info = self.query_one("#selection-info", Static) + count = len(self._selected_names) + visible_count = len(self.query_one("#server-list", SelectionList)._options) + total_count = len(self._hosts) + + if self._current_filter: + info.update(f"{count} selected ({visible_count}/{total_count} shown)") + else: + info.update(f"{count} selected") + + def get_selected(self) -> list[VPSHost]: + """Get currently selected hosts (including filtered-out ones).""" + return [ + self._hosts[name] for name in self._selected_names if name in self._hosts + ] + + def select_all(self) -> None: + """Select all visible servers.""" + selection_list = self.query_one("#server-list", SelectionList) + selection_list.select_all() + for option in selection_list._options: + self._selected_names.add(option.value) + self._update_info() + + def deselect_all(self) -> None: + """Deselect all visible servers.""" + selection_list = self.query_one("#server-list", SelectionList) + for option in selection_list._options: + self._selected_names.discard(option.value) + selection_list.deselect_all() + self._update_info() + + def get_all_tags(self) -> set[str]: + """Get all unique tags from all hosts.""" + tags = set() + for host in self._hosts.values(): + tags.update(host.tags) + return tags