Module connpy.services

Sub-modules

connpy.services.ai_service
connpy.services.base
connpy.services.config_service
connpy.services.context_service
connpy.services.exceptions
connpy.services.execution_service
connpy.services.import_export_service
connpy.services.node_service
connpy.services.plugin_service
connpy.services.profile_service
connpy.services.provider
connpy.services.sync_service
connpy.services.system_service
connpy.services.user_service

Classes

class AIService (config=None)
Expand source code
class AIService(BaseService):
    """Business logic for interacting with AI agents and LLM configurations."""

    def _clean_cisco_scrolling(self, text: str) -> str:
        """Resolves horizontal scrolling artifacts (backspaces, \r, ANSI) by merging overlapping segments."""
        def merge_overlapping(s1, s2):
            s2_clean = s2.lstrip(' $')
            max_overlap = min(len(s1), len(s2_clean))
            for i in range(max_overlap, 0, -1):
                if s1[-i:] == s2_clean[:i]:
                    return s1 + s2_clean[i:]
            return s1 + s2_clean

        scroll_re = re.compile(r'(\x08{5,}\s*\$?|\$\r|\x1b\[\d+[GD]\s*\$?)')
        parts = scroll_re.split(text)
        merged = ""
        
        for part in parts:
            if scroll_re.match(part):
                continue
                
            cleaned = log_cleaner(part)
            if not merged:
                merged = cleaned
            else:
                merged_lines = merged.split('\n')
                cleaned_lines = cleaned.split('\n')
                
                merged_lines[-1] = merge_overlapping(merged_lines[-1], cleaned_lines[0])
                merged_lines.extend(cleaned_lines[1:])
                merged = "\n".join(merged_lines)
                
        return merged

    def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
        """Identifies command blocks in the terminal history."""
        blocks = []
        if not raw_bytes:
            return blocks
            
        default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
        device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
        prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt)
        try:
            prompt_re = re.compile(prompt_re_str)
        except Exception:
            prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
            
        parsed_positions = []
        if cmd_byte_positions and len(cmd_byte_positions) >= 1:
            for i in range(1, len(cmd_byte_positions)):
                pos, known_cmd = cmd_byte_positions[i]
                prev_pos = cmd_byte_positions[i-1][0]
                
                if known_cmd:
                    if known_cmd == "CANCELLED":
                        parsed_positions.append({"pos": pos, "type": "CANCELLED", "preview": ""})
                    else:
                        prev_chunk = raw_bytes[prev_pos:pos]
                        prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace'))
                        prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
                        prompt_text = prev_lines[-1].strip() if prev_lines else ""
                        preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
                        
                        if len(preview) > 80:
                            preview = preview[:77] + "..."
                        parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview})
                else:
                    chunk = raw_bytes[prev_pos:pos]
                    
                    cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace'))
                    lines = [l for l in cleaned.split('\n') if l.strip()]
                    
                    found_in_pass1 = False
                    if lines:
                        # Search backwards through the last few lines for the prompt
                        for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
                            match = prompt_re.search(lines[idx])
                            if match:
                                ptxt = match.group(0).strip()
                                cmd_first_line = lines[idx][match.end():].strip()
                                cmd_rest = [l.strip() for l in lines[idx+1:]]
                                cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
                                
                                if cmd_text:
                                    pv = f"{ptxt} {cmd_text}".strip()
                                    if len(pv) > 80:
                                        pv = pv[:77] + "..."
                                    parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                                else:
                                    parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
                                found_in_pass1 = True
                                break
                        
                        if not found_in_pass1:
                            # Fallback: The prompt might have been isolated in the previous chunk 
                            # due to asynchronous network delays splitting the output exactly at the newline.
                            prev_was_valid_cmd = i >= 2 and parsed_positions[i-2]["type"] == "VALID_CMD"
                            if prev_pos > 0 and not prev_was_valid_cmd:
                                # Fetch the very last chunk that we just processed
                                prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0
                                prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace'))
                                prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()]
                                
                                if prev_lines_text:
                                    prev_match = prompt_re.search(prev_lines_text[-1])
                                    if prev_match:
                                        ptxt = prev_match.group(0).strip()
                                        cmd_text = " ".join([l.strip() for l in lines]).strip()
                                        if cmd_text:
                                            pv = f"{ptxt} {cmd_text}".strip()
                                            if len(pv) > 80:
                                                pv = pv[:77] + "..."
                                            parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                                            found_in_pass1 = True
                            
                            if not found_in_pass1:
                                parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
                    else:
                        parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})

        last_newline = raw_bytes.rfind(b'\n')
        current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
        current_end = len(raw_bytes)

        for i, item in enumerate(parsed_positions):
            if item["type"] == "VALID_CMD":
                start_pos = item["pos"]
                preview = item["preview"]
                
                # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED
                end_pos = current_prompt_pos
                for j in range(i + 1, len(parsed_positions)):
                    next_item = parsed_positions[j]
                    if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"):
                        end_pos = next_item["pos"]
                        break
                
                blocks.append((start_pos, end_pos, preview))

        # Always ensure there is a final block representing the current prompt
        if not blocks:
            blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
        elif blocks[-1][0] < current_prompt_pos:
            blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))

        return blocks

    def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
        """Parses slash commands and manages session state. Returns directive dict."""
        text = input_text.strip()
        if not text.startswith('/'):
            return {"action": "execute", "clean_prompt": text, "overrides": {}}
            
        parts = text.split(maxsplit=1)
        cmd = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ""
        
        # 1. State Commands (Persistent)
        if cmd == "/os":
            if args:
                session_state['os'] = args
                return {"action": "state_update", "message": f"OS context changed to {args}"}
        elif cmd == "/prompt":
            if args:
                session_state['prompt'] = args
                return {"action": "state_update", "message": f"Prompt regex changed to {args}"}
        elif cmd == "/memorize":
            if args:
                session_state['memories'].append(args)
                return {"action": "state_update", "message": f"Memory added: {args}"}
        elif cmd == "/clear":
            session_state['memories'] = []
            return {"action": "state_update", "message": "Memory cleared"}
            
        # 2. Hybrid Commands
        elif cmd == "/architect":
            if not args:
                session_state['persona'] = 'architect'
                return {"action": "state_update", "message": "Persona set to Architect"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "architect"}}
                
        elif cmd == "/engineer":
            if not args:
                session_state['persona'] = 'engineer'
                return {"action": "state_update", "message": "Persona set to Engineer"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "engineer"}}
                
        elif cmd == "/trust":
            if not args:
                session_state['trust_mode'] = True
                return {"action": "state_update", "message": "Auto-execute (trust) enabled for session"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"trust": True}}
                
        elif cmd == "/untrust":
            if not args:
                session_state['trust_mode'] = False
                return {"action": "state_update", "message": "Auto-execute (trust) disabled for session"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"trust": False}}

        # Unknown command, execute normally
        return {"action": "execute", "clean_prompt": text, "overrides": {}}

    def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
        """Send a prompt to the AI agent."""
        from connpy.ai import ai
        agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides)
        return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)


    def confirm(self, input_text, console=None):
        """Ask for a safe confirmation of an action."""
        from connpy.ai import ai
        agent = ai(self.config, console=console)
        return agent.confirm(input_text)

    def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
        """Ask the AI copilot for terminal assistance."""
        from connpy.ai import ai, run_ai_async
        agent = ai(self.config)
        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
        return future.result()

    async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
        """Ask the AI copilot for terminal assistance asynchronously."""
        from connpy.ai import ai, run_ai_async
        import asyncio
        agent = ai(self.config)
        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
        return await asyncio.wrap_future(future)


    def list_sessions(self, limit=None):
        """Return a list of saved AI sessions, optionally limited."""
        from connpy.ai import ai
        agent = ai(self.config)
        sessions = agent._get_sessions()
        if limit and len(sessions) > limit:
            return sessions[:limit], len(sessions)
        return sessions, len(sessions)

    def delete_session(self, session_id):
        """Delete an AI session by ID."""
        import os
        sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions")
        path = os.path.join(sessions_dir, f"{session_id}.json")
        if os.path.exists(path):
            os.remove(path)
        else:
            raise InvalidConfigurationError(f"Session '{session_id}' not found.")

    def configure_provider(self, provider, model=None, api_key=None, auth=None):
        """Update AI provider settings in the configuration."""
        settings = self.config.config.get("ai", {})
        if model:
            settings[f"{provider}_model"] = model
        if api_key:
            settings[f"{provider}_api_key"] = api_key
        if auth is not None:
            settings[f"{provider}_auth"] = auth
            
        self.config.config["ai"] = settings
        self.config._saveconfig(self.config.file)

    def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False):
        """Update MCP server settings in the configuration with smart merging."""
        ai_settings = self.config.config.get("ai", {})
        mcp_servers = ai_settings.get("mcp_servers", {})
        
        if remove:
            if name in mcp_servers:
                del mcp_servers[name]
        else:
            # Get existing or new
            server_cfg = mcp_servers.get(name, {})
            
            # Partial updates
            if url is not None:
                server_cfg["url"] = url
            
            if enabled is not None:
                server_cfg["enabled"] = bool(enabled)
            elif "enabled" not in server_cfg:
                server_cfg["enabled"] = True # Default for new entries
                
            if auto_load_on_os is not None:
                if auto_load_on_os == "": # Explicit clear
                    if "auto_load_on_os" in server_cfg:
                        del server_cfg["auto_load_on_os"]
                else:
                    server_cfg["auto_load_on_os"] = auto_load_on_os
            
            mcp_servers[name] = server_cfg
            
        ai_settings["mcp_servers"] = mcp_servers
        self.config.config["ai"] = ai_settings
        self.config._saveconfig(self.config.file)

    def list_mcp_servers(self) -> dict:
        """Get the configured MCP servers."""
        if hasattr(self.config, "get_effective_setting"):
            ai_settings = self.config.get_effective_setting("ai", {})
        else:
            ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}
        return ai_settings.get("mcp_servers", {})

    def load_session_data(self, session_id):
        """Load a session's raw data by ID."""
        from connpy.ai import ai
        agent = ai(self.config)
        return agent.load_session_data(session_id)

    def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None):
        """Interact with the specialized Playbook Builder Agent."""
        from connpy.ai import PlaybookBuilderAgent
        agent = PlaybookBuilderAgent(self.config)
        return agent.ask(user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)

    def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None):
        """Analyze actual command execution results using Network Architect 1-shot."""
        import json
        results_str = json.dumps(results, indent=2)
        
        prompt = f"@architect: Please analyze the following actual execution results. Diagnose any issues, highlight successful actions, and suggest strategic remediation steps if needed."
        if query:
            prompt += f"\nSpecific user request: {query}"
        prompt += f"\n\nResults Data:\n{results_str}"
        prompt += "\n\nCRITICAL DIRECTIVE: You are running in a strictly 1-shot offline diagnostics mode (--analyze). There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately. DO NOT suggest, mention, or attempt to delegate the session back to the engineer."
        
        # Delegate to self.ask, setting stream=True and forwarding callback/status.
        # This will invoke standard ai.ask with '@architect:' prefix, forcing 1-shot architect brain.
        return self.ask(prompt, status=status, chunk_callback=chunk_callback, one_shot=True)

    def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None):
        """Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot)."""
        nodes_str = ", ".join(target_nodes)
        commands_str = "\n".join(f"- {cmd}" for cmd in commands)
        
        prompt = f"@engineer: Act as a Preflight Simulation Agent. Simulate and predict the expected outputs and behaviors of the following commands on the target nodes. Alert about potential safety or configuration risks based on node profiles."
        prompt += f"\n\nTarget Nodes: {nodes_str}"
        prompt += f"\nCommands to simulate:\n{commands_str}"
        prompt += "\n\nCRITICAL SCALABILITY DIRECTIVE: If there are many target nodes, DO NOT list predictions node-by-node. Instead, group them by Operating System, vendor, or platform, and provide a highly concise Executive Summary. Detail individual risks only for nodes that present specific anomalies or security concerns. Focus on overall impact."
        
        # Delegate to self.ask, using the standard engineer brain but with the simulated preflight prompt.
        return self.ask(prompt, status=status, chunk_callback=chunk_callback)

Business logic for interacting with AI agents and LLM configurations.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)
Expand source code
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Ask the AI copilot for terminal assistance asynchronously."""
    from connpy.ai import ai, run_ai_async
    import asyncio
    agent = ai(self.config)
    future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
    return await asyncio.wrap_future(future)

Ask the AI copilot for terminal assistance asynchronously.

def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None)
Expand source code
def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None):
    """Analyze actual command execution results using Network Architect 1-shot."""
    import json
    results_str = json.dumps(results, indent=2)
    
    prompt = f"@architect: Please analyze the following actual execution results. Diagnose any issues, highlight successful actions, and suggest strategic remediation steps if needed."
    if query:
        prompt += f"\nSpecific user request: {query}"
    prompt += f"\n\nResults Data:\n{results_str}"
    prompt += "\n\nCRITICAL DIRECTIVE: You are running in a strictly 1-shot offline diagnostics mode (--analyze). There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately. DO NOT suggest, mention, or attempt to delegate the session back to the engineer."
    
    # Delegate to self.ask, setting stream=True and forwarding callback/status.
    # This will invoke standard ai.ask with '@architect:' prefix, forcing 1-shot architect brain.
    return self.ask(prompt, status=status, chunk_callback=chunk_callback, one_shot=True)

Analyze actual command execution results using Network Architect 1-shot.

def ask(self,
input_text,
dryrun=False,
chat_history=None,
status=None,
debug=False,
session_id=None,
console=None,
chunk_callback=None,
confirm_handler=None,
trust=False,
**overrides)
Expand source code
def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
    """Send a prompt to the AI agent."""
    from connpy.ai import ai
    agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides)
    return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)

Send a prompt to the AI agent.

def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)
Expand source code
def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Ask the AI copilot for terminal assistance."""
    from connpy.ai import ai, run_ai_async
    agent = ai(self.config)
    future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
    return future.result()

Ask the AI copilot for terminal assistance.

def build_context_blocks(self,
raw_bytes: bytes,
cmd_byte_positions: list,
node_info: dict,
last_line: str = '') ‑> list
Expand source code
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
    """Identifies command blocks in the terminal history."""
    blocks = []
    if not raw_bytes:
        return blocks
        
    default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
    device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
    prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt)
    try:
        prompt_re = re.compile(prompt_re_str)
    except Exception:
        prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
        
    parsed_positions = []
    if cmd_byte_positions and len(cmd_byte_positions) >= 1:
        for i in range(1, len(cmd_byte_positions)):
            pos, known_cmd = cmd_byte_positions[i]
            prev_pos = cmd_byte_positions[i-1][0]
            
            if known_cmd:
                if known_cmd == "CANCELLED":
                    parsed_positions.append({"pos": pos, "type": "CANCELLED", "preview": ""})
                else:
                    prev_chunk = raw_bytes[prev_pos:pos]
                    prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace'))
                    prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
                    prompt_text = prev_lines[-1].strip() if prev_lines else ""
                    preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
                    
                    if len(preview) > 80:
                        preview = preview[:77] + "..."
                    parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview})
            else:
                chunk = raw_bytes[prev_pos:pos]
                
                cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace'))
                lines = [l for l in cleaned.split('\n') if l.strip()]
                
                found_in_pass1 = False
                if lines:
                    # Search backwards through the last few lines for the prompt
                    for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
                        match = prompt_re.search(lines[idx])
                        if match:
                            ptxt = match.group(0).strip()
                            cmd_first_line = lines[idx][match.end():].strip()
                            cmd_rest = [l.strip() for l in lines[idx+1:]]
                            cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
                            
                            if cmd_text:
                                pv = f"{ptxt} {cmd_text}".strip()
                                if len(pv) > 80:
                                    pv = pv[:77] + "..."
                                parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                            else:
                                parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
                            found_in_pass1 = True
                            break
                    
                    if not found_in_pass1:
                        # Fallback: The prompt might have been isolated in the previous chunk 
                        # due to asynchronous network delays splitting the output exactly at the newline.
                        prev_was_valid_cmd = i >= 2 and parsed_positions[i-2]["type"] == "VALID_CMD"
                        if prev_pos > 0 and not prev_was_valid_cmd:
                            # Fetch the very last chunk that we just processed
                            prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0
                            prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace'))
                            prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()]
                            
                            if prev_lines_text:
                                prev_match = prompt_re.search(prev_lines_text[-1])
                                if prev_match:
                                    ptxt = prev_match.group(0).strip()
                                    cmd_text = " ".join([l.strip() for l in lines]).strip()
                                    if cmd_text:
                                        pv = f"{ptxt} {cmd_text}".strip()
                                        if len(pv) > 80:
                                            pv = pv[:77] + "..."
                                        parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                                        found_in_pass1 = True
                        
                        if not found_in_pass1:
                            parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
                else:
                    parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})

    last_newline = raw_bytes.rfind(b'\n')
    current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
    current_end = len(raw_bytes)

    for i, item in enumerate(parsed_positions):
        if item["type"] == "VALID_CMD":
            start_pos = item["pos"]
            preview = item["preview"]
            
            # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED
            end_pos = current_prompt_pos
            for j in range(i + 1, len(parsed_positions)):
                next_item = parsed_positions[j]
                if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"):
                    end_pos = next_item["pos"]
                    break
            
            blocks.append((start_pos, end_pos, preview))

    # Always ensure there is a final block representing the current prompt
    if not blocks:
        blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
    elif blocks[-1][0] < current_prompt_pos:
        blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))

    return blocks

Identifies command blocks in the terminal history.

def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None)
Expand source code
def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None):
    """Interact with the specialized Playbook Builder Agent."""
    from connpy.ai import PlaybookBuilderAgent
    agent = PlaybookBuilderAgent(self.config)
    return agent.ask(user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)

Interact with the specialized Playbook Builder Agent.

def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False)
Expand source code
def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False):
    """Update MCP server settings in the configuration with smart merging."""
    ai_settings = self.config.config.get("ai", {})
    mcp_servers = ai_settings.get("mcp_servers", {})
    
    if remove:
        if name in mcp_servers:
            del mcp_servers[name]
    else:
        # Get existing or new
        server_cfg = mcp_servers.get(name, {})
        
        # Partial updates
        if url is not None:
            server_cfg["url"] = url
        
        if enabled is not None:
            server_cfg["enabled"] = bool(enabled)
        elif "enabled" not in server_cfg:
            server_cfg["enabled"] = True # Default for new entries
            
        if auto_load_on_os is not None:
            if auto_load_on_os == "": # Explicit clear
                if "auto_load_on_os" in server_cfg:
                    del server_cfg["auto_load_on_os"]
            else:
                server_cfg["auto_load_on_os"] = auto_load_on_os
        
        mcp_servers[name] = server_cfg
        
    ai_settings["mcp_servers"] = mcp_servers
    self.config.config["ai"] = ai_settings
    self.config._saveconfig(self.config.file)

Update MCP server settings in the configuration with smart merging.

def configure_provider(self, provider, model=None, api_key=None, auth=None)
Expand source code
def configure_provider(self, provider, model=None, api_key=None, auth=None):
    """Update AI provider settings in the configuration."""
    settings = self.config.config.get("ai", {})
    if model:
        settings[f"{provider}_model"] = model
    if api_key:
        settings[f"{provider}_api_key"] = api_key
    if auth is not None:
        settings[f"{provider}_auth"] = auth
        
    self.config.config["ai"] = settings
    self.config._saveconfig(self.config.file)

Update AI provider settings in the configuration.

def confirm(self, input_text, console=None)
Expand source code
def confirm(self, input_text, console=None):
    """Ask for a safe confirmation of an action."""
    from connpy.ai import ai
    agent = ai(self.config, console=console)
    return agent.confirm(input_text)

Ask for a safe confirmation of an action.

def delete_session(self, session_id)
Expand source code
def delete_session(self, session_id):
    """Delete an AI session by ID."""
    import os
    sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions")
    path = os.path.join(sessions_dir, f"{session_id}.json")
    if os.path.exists(path):
        os.remove(path)
    else:
        raise InvalidConfigurationError(f"Session '{session_id}' not found.")

Delete an AI session by ID.

def list_mcp_servers(self) ‑> dict
Expand source code
def list_mcp_servers(self) -> dict:
    """Get the configured MCP servers."""
    if hasattr(self.config, "get_effective_setting"):
        ai_settings = self.config.get_effective_setting("ai", {})
    else:
        ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}
    return ai_settings.get("mcp_servers", {})

Get the configured MCP servers.

def list_sessions(self, limit=None)
Expand source code
def list_sessions(self, limit=None):
    """Return a list of saved AI sessions, optionally limited."""
    from connpy.ai import ai
    agent = ai(self.config)
    sessions = agent._get_sessions()
    if limit and len(sessions) > limit:
        return sessions[:limit], len(sessions)
    return sessions, len(sessions)

Return a list of saved AI sessions, optionally limited.

def load_session_data(self, session_id)
Expand source code
def load_session_data(self, session_id):
    """Load a session's raw data by ID."""
    from connpy.ai import ai
    agent = ai(self.config)
    return agent.load_session_data(session_id)

Load a session's raw data by ID.

def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None)
Expand source code
def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None):
    """Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot)."""
    nodes_str = ", ".join(target_nodes)
    commands_str = "\n".join(f"- {cmd}" for cmd in commands)
    
    prompt = f"@engineer: Act as a Preflight Simulation Agent. Simulate and predict the expected outputs and behaviors of the following commands on the target nodes. Alert about potential safety or configuration risks based on node profiles."
    prompt += f"\n\nTarget Nodes: {nodes_str}"
    prompt += f"\nCommands to simulate:\n{commands_str}"
    prompt += "\n\nCRITICAL SCALABILITY DIRECTIVE: If there are many target nodes, DO NOT list predictions node-by-node. Instead, group them by Operating System, vendor, or platform, and provide a highly concise Executive Summary. Detail individual risks only for nodes that present specific anomalies or security concerns. Focus on overall impact."
    
    # Delegate to self.ask, using the standard engineer brain but with the simulated preflight prompt.
    return self.ask(prompt, status=status, chunk_callback=chunk_callback)

Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot).

def process_copilot_input(self, input_text: str, session_state: dict) ‑> dict
Expand source code
def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
    """Parses slash commands and manages session state. Returns directive dict."""
    text = input_text.strip()
    if not text.startswith('/'):
        return {"action": "execute", "clean_prompt": text, "overrides": {}}
        
    parts = text.split(maxsplit=1)
    cmd = parts[0].lower()
    args = parts[1] if len(parts) > 1 else ""
    
    # 1. State Commands (Persistent)
    if cmd == "/os":
        if args:
            session_state['os'] = args
            return {"action": "state_update", "message": f"OS context changed to {args}"}
    elif cmd == "/prompt":
        if args:
            session_state['prompt'] = args
            return {"action": "state_update", "message": f"Prompt regex changed to {args}"}
    elif cmd == "/memorize":
        if args:
            session_state['memories'].append(args)
            return {"action": "state_update", "message": f"Memory added: {args}"}
    elif cmd == "/clear":
        session_state['memories'] = []
        return {"action": "state_update", "message": "Memory cleared"}
        
    # 2. Hybrid Commands
    elif cmd == "/architect":
        if not args:
            session_state['persona'] = 'architect'
            return {"action": "state_update", "message": "Persona set to Architect"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "architect"}}
            
    elif cmd == "/engineer":
        if not args:
            session_state['persona'] = 'engineer'
            return {"action": "state_update", "message": "Persona set to Engineer"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "engineer"}}
            
    elif cmd == "/trust":
        if not args:
            session_state['trust_mode'] = True
            return {"action": "state_update", "message": "Auto-execute (trust) enabled for session"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"trust": True}}
            
    elif cmd == "/untrust":
        if not args:
            session_state['trust_mode'] = False
            return {"action": "state_update", "message": "Auto-execute (trust) disabled for session"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"trust": False}}

    # Unknown command, execute normally
    return {"action": "execute", "clean_prompt": text, "overrides": {}}

Parses slash commands and manages session state. Returns directive dict.

Inherited members

class ConfigService (config=None)
Expand source code
class ConfigService(BaseService):
    """Business logic for general application settings and state configuration."""

    def get_settings(self) -> Dict[str, Any]:
        """Get the global configuration settings block."""
        settings = self.config.config.copy()
        settings["configfolder"] = self.config.defaultdir
        return settings

    def get_default_dir(self) -> str:
        """Get the default configuration directory."""
        return self.config.defaultdir

    def set_config_folder(self, folder_path: str):
        """Set the default location for config file by writing to ~/.config/conn/.folder"""
        if not os.path.isdir(folder_path):
            raise ConnpyError(f"readable_dir:{folder_path} is not a valid path")
            
        pathfile = os.path.join(self.config.anchor_path, ".folder")
        folder = os.path.abspath(folder_path).rstrip('/')
        
        try:
            with open(pathfile, "w") as f:
                f.write(str(folder))
        except Exception as e:
            raise ConnpyError(f"Failed to save config folder: {e}")

    def update_setting(self, key, value):
        """Update a setting in the configuration file."""
        self.config.config[key] = value
        self.config._saveconfig(self.config.file)

    def encrypt_password(self, password):
        """Encrypt a password using the application's configuration encryption key."""
        return self.config.encrypt(password)
        
    def apply_theme_from_file(self, theme_input):
        """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration."""
        import yaml
        from ..printer import STYLES, LIGHT_THEME
        
        if theme_input == "dark":
            valid_styles = {}
            self.update_setting("theme", valid_styles)
            return valid_styles
        elif theme_input == "light":
            valid_styles = LIGHT_THEME.copy()
            self.update_setting("theme", valid_styles)
            return valid_styles
            
        if not os.path.exists(theme_input):
            raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.")
            
        try:
            with open(theme_input, 'r') as f:
                user_styles = yaml.safe_load(f)
        except Exception as e:
            raise InvalidConfigurationError(f"Failed to parse theme file: {e}")
            
        if not isinstance(user_styles, dict):
            raise InvalidConfigurationError("Theme file must be a YAML dictionary.")
            
        # Support both direct styles and nested under 'theme' key
        if "theme" in user_styles and isinstance(user_styles["theme"], dict):
            user_styles = user_styles["theme"]
            
        # Filter for valid styles only (prevent junk in config)
        valid_styles = {k: v for k, v in user_styles.items() if k in STYLES}
        
        if not valid_styles:
            raise InvalidConfigurationError("No valid style keys found in theme file.")
            
        # Persist and return merged styles
        self.update_setting("theme", valid_styles)
        return valid_styles

Business logic for general application settings and state configuration.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def apply_theme_from_file(self, theme_input)
Expand source code
def apply_theme_from_file(self, theme_input):
    """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration."""
    import yaml
    from ..printer import STYLES, LIGHT_THEME
    
    if theme_input == "dark":
        valid_styles = {}
        self.update_setting("theme", valid_styles)
        return valid_styles
    elif theme_input == "light":
        valid_styles = LIGHT_THEME.copy()
        self.update_setting("theme", valid_styles)
        return valid_styles
        
    if not os.path.exists(theme_input):
        raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.")
        
    try:
        with open(theme_input, 'r') as f:
            user_styles = yaml.safe_load(f)
    except Exception as e:
        raise InvalidConfigurationError(f"Failed to parse theme file: {e}")
        
    if not isinstance(user_styles, dict):
        raise InvalidConfigurationError("Theme file must be a YAML dictionary.")
        
    # Support both direct styles and nested under 'theme' key
    if "theme" in user_styles and isinstance(user_styles["theme"], dict):
        user_styles = user_styles["theme"]
        
    # Filter for valid styles only (prevent junk in config)
    valid_styles = {k: v for k, v in user_styles.items() if k in STYLES}
    
    if not valid_styles:
        raise InvalidConfigurationError("No valid style keys found in theme file.")
        
    # Persist and return merged styles
    self.update_setting("theme", valid_styles)
    return valid_styles

Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.

def encrypt_password(self, password)
Expand source code
def encrypt_password(self, password):
    """Encrypt a password using the application's configuration encryption key."""
    return self.config.encrypt(password)

Encrypt a password using the application's configuration encryption key.

def get_default_dir(self) ‑> str
Expand source code
def get_default_dir(self) -> str:
    """Get the default configuration directory."""
    return self.config.defaultdir

Get the default configuration directory.

def get_settings(self) ‑> Dict[str, Any]
Expand source code
def get_settings(self) -> Dict[str, Any]:
    """Get the global configuration settings block."""
    settings = self.config.config.copy()
    settings["configfolder"] = self.config.defaultdir
    return settings

Get the global configuration settings block.

def set_config_folder(self, folder_path: str)
Expand source code
def set_config_folder(self, folder_path: str):
    """Set the default location for config file by writing to ~/.config/conn/.folder"""
    if not os.path.isdir(folder_path):
        raise ConnpyError(f"readable_dir:{folder_path} is not a valid path")
        
    pathfile = os.path.join(self.config.anchor_path, ".folder")
    folder = os.path.abspath(folder_path).rstrip('/')
    
    try:
        with open(pathfile, "w") as f:
            f.write(str(folder))
    except Exception as e:
        raise ConnpyError(f"Failed to save config folder: {e}")

Set the default location for config file by writing to ~/.config/conn/.folder

def update_setting(self, key, value)
Expand source code
def update_setting(self, key, value):
    """Update a setting in the configuration file."""
    self.config.config[key] = value
    self.config._saveconfig(self.config.file)

Update a setting in the configuration file.

Inherited members

class ConnpyError (*args, **kwargs)
Expand source code
class ConnpyError(Exception):
    """Base exception for all connpy services."""
    pass

Base exception for all connpy services.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class ExecutionError (*args, **kwargs)
Expand source code
class ExecutionError(ConnpyError):
    """Raised when an execution fails or returns error."""
    pass

Raised when an execution fails or returns error.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class ExecutionService (config=None)
Expand source code
class ExecutionService(BaseService):
    """Business logic for executing commands on nodes and running automation scripts."""

    def run_commands(
        self, 
        nodes_filter: str, 
        commands: List[str], 
        variables: Optional[Dict[str, Any]] = None,
        parallel: int = 10,
        timeout: int = 20,
        folder: Optional[str] = None,
        prompt: Optional[str] = None,
        on_node_complete: Optional[Callable] = None,
        logger: Optional[Callable] = None,
        name: Optional[str] = None
    ) -> Dict[str, str]:

        """Execute commands on a set of nodes."""
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names:
                raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
            
            node_data = self.config.getitems(matched_names, extract=True)
            executor = Nodes(node_data, config=self.config)
            self.last_executor = executor
            
            results = executor.run(
                commands=commands,
                vars=variables,
                parallel=parallel,
                timeout=timeout,
                folder=folder,
                prompt=prompt,
                on_complete=on_node_complete,
                logger=logger
            )

            # Combine output and status for the caller
            full_results = {}
            for unique in results:
                full_results[unique] = {
                    "output": results[unique],
                    "status": executor.status.get(unique, 1)
                }

            return full_results
        except Exception as e:
            raise ConnpyError(f"Execution failed: {e}")

    def test_commands(
        self,
        nodes_filter: str,
        commands: List[str],
        expected: List[str],
        variables: Optional[Dict[str, Any]] = None,
        parallel: int = 10,
        timeout: int = 20,
        folder: Optional[str] = None,
        prompt: Optional[str] = None,
        on_node_complete: Optional[Callable] = None,
        logger: Optional[Callable] = None,
        name: Optional[str] = None
    ) -> Dict[str, Dict[str, bool]]:

        """Run commands and verify expected output on a set of nodes."""
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names:
                raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
            
            node_data = self.config.getitems(matched_names, extract=True)
            executor = Nodes(node_data, config=self.config)
            self.last_executor = executor
            
            results = executor.test(
                commands=commands,
                expected=expected,
                vars=variables,
                parallel=parallel,
                timeout=timeout,
                folder=folder,
                prompt=prompt,
                on_complete=on_node_complete,
                logger=logger
            )
            return results
        except Exception as e:
            raise ConnpyError(f"Testing failed: {e}")

    def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]:
        """Run a plain-text script containing one command per line."""
        if not os.path.exists(script_path):
            raise ConnpyError(f"Script file not found: {script_path}")
            
        try:
            with open(script_path, "r") as f:
                commands = [line.strip() for line in f if line.strip()]
        except Exception as e:
            raise ConnpyError(f"Failed to read script {script_path}: {e}")
            
        return self.run_commands(nodes_filter, commands, parallel=parallel)

Business logic for executing commands on nodes and running automation scripts.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) ‑> Dict[str, str]
Expand source code
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]:
    """Run a plain-text script containing one command per line."""
    if not os.path.exists(script_path):
        raise ConnpyError(f"Script file not found: {script_path}")
        
    try:
        with open(script_path, "r") as f:
            commands = [line.strip() for line in f if line.strip()]
    except Exception as e:
        raise ConnpyError(f"Failed to read script {script_path}: {e}")
        
    return self.run_commands(nodes_filter, commands, parallel=parallel)

Run a plain-text script containing one command per line.

def run_commands(self,
nodes_filter: str,
commands: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 20,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, str]
Expand source code
def run_commands(
    self, 
    nodes_filter: str, 
    commands: List[str], 
    variables: Optional[Dict[str, Any]] = None,
    parallel: int = 10,
    timeout: int = 20,
    folder: Optional[str] = None,
    prompt: Optional[str] = None,
    on_node_complete: Optional[Callable] = None,
    logger: Optional[Callable] = None,
    name: Optional[str] = None
) -> Dict[str, str]:

    """Execute commands on a set of nodes."""
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names:
            raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
        
        node_data = self.config.getitems(matched_names, extract=True)
        executor = Nodes(node_data, config=self.config)
        self.last_executor = executor
        
        results = executor.run(
            commands=commands,
            vars=variables,
            parallel=parallel,
            timeout=timeout,
            folder=folder,
            prompt=prompt,
            on_complete=on_node_complete,
            logger=logger
        )

        # Combine output and status for the caller
        full_results = {}
        for unique in results:
            full_results[unique] = {
                "output": results[unique],
                "status": executor.status.get(unique, 1)
            }

        return full_results
    except Exception as e:
        raise ConnpyError(f"Execution failed: {e}")

Execute commands on a set of nodes.

def test_commands(self,
nodes_filter: str,
commands: List[str],
expected: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 20,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, Dict[str, bool]]
Expand source code
def test_commands(
    self,
    nodes_filter: str,
    commands: List[str],
    expected: List[str],
    variables: Optional[Dict[str, Any]] = None,
    parallel: int = 10,
    timeout: int = 20,
    folder: Optional[str] = None,
    prompt: Optional[str] = None,
    on_node_complete: Optional[Callable] = None,
    logger: Optional[Callable] = None,
    name: Optional[str] = None
) -> Dict[str, Dict[str, bool]]:

    """Run commands and verify expected output on a set of nodes."""
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names:
            raise ConnpyError(f"No nodes found matching filter: {nodes_filter}")
        
        node_data = self.config.getitems(matched_names, extract=True)
        executor = Nodes(node_data, config=self.config)
        self.last_executor = executor
        
        results = executor.test(
            commands=commands,
            expected=expected,
            vars=variables,
            parallel=parallel,
            timeout=timeout,
            folder=folder,
            prompt=prompt,
            on_complete=on_node_complete,
            logger=logger
        )
        return results
    except Exception as e:
        raise ConnpyError(f"Testing failed: {e}")

Run commands and verify expected output on a set of nodes.

Inherited members

class ImportExportService (config=None)
Expand source code
class ImportExportService(BaseService):
    """Business logic for YAML/JSON inventory import and export."""

    def export_to_file(self, file_path, folders=None):
        """Export nodes/folders to a YAML file."""
        if os.path.exists(file_path):
            raise InvalidConfigurationError(f"File '{file_path}' already exists.")
            
        data = self.export_to_dict(folders)
        try:
            with open(file_path, "w") as f:
                yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False)
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}")

    def export_to_dict(self, folders=None):
        """Export nodes/folders to a dictionary."""
        if not folders:
            return deepcopy(self.config.connections)
        else:
            # Validate folders exist
            for f in folders:
                if f != "@" and f not in self.config._getallfolders():
                    raise NodeNotFoundError(f"Folder '{f}' not found.")
            
            flat = self.config._getallnodesfull(folders, extract=False)
            nested = {}
            for k, v in flat.items():
                uniques = self.config._explode_unique(k)
                if not uniques:
                    continue
                
                if "folder" in uniques and "subfolder" in uniques:
                    f_name = uniques["folder"]
                    s_name = uniques["subfolder"]
                    i_name = uniques["id"]
                    
                    if f_name not in nested:
                        nested[f_name] = {"type": "folder"}
                    if s_name not in nested[f_name]:
                        nested[f_name][s_name] = {"type": "subfolder"}
                        
                    nested[f_name][s_name][i_name] = v
                    
                elif "folder" in uniques:
                    f_name = uniques["folder"]
                    i_name = uniques["id"]
                    
                    if f_name not in nested:
                        nested[f_name] = {"type": "folder"}
                        
                    nested[f_name][i_name] = v
                else:
                    i_name = uniques["id"]
                    nested[i_name] = v
                    
            return nested

    def import_from_file(self, file_path):
        """Import nodes/folders from a YAML file."""
        if not os.path.exists(file_path):
            raise InvalidConfigurationError(f"File '{file_path}' does not exist.")
            
        try:
            with open(file_path, "r") as f:
                data = yaml.load(f, Loader=yaml.FullLoader)
            self.import_from_dict(data)
        except Exception as e:
            raise InvalidConfigurationError(f"Failed to read/parse import file: {e}")

    def import_from_dict(self, data):
        """Import nodes/folders from a dictionary."""
        if not isinstance(data, dict):
            raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")

        def _traverse_import(node_data, current_folder='', current_subfolder=''):
            for k, v in node_data.items():
                if k == "type":
                    continue
                if isinstance(v, dict):
                    node_type = v.get("type", "connection")
                    if node_type == "folder":
                        self.config._folder_add(folder=k)
                        _traverse_import(v, current_folder=k, current_subfolder='')
                    elif node_type == "subfolder":
                        self.config._folder_add(folder=current_folder, subfolder=k)
                        _traverse_import(v, current_folder=current_folder, current_subfolder=k)
                    elif node_type == "connection":
                        unique_id = k
                        if current_subfolder:
                            unique_id = f"{k}@{current_subfolder}@{current_folder}"
                        elif current_folder:
                            unique_id = f"{k}@{current_folder}"
                        self._validate_node_name(unique_id)
                        
                        kwargs = deepcopy(v)
                        kwargs['id'] = k
                        kwargs['folder'] = current_folder
                        kwargs['subfolder'] = current_subfolder
                        
                        self.config._connections_add(**kwargs)
                else:
                    # Invalid format skip
                    pass

        _traverse_import(data)
        self.config._saveconfig(self.config.file)

Business logic for YAML/JSON inventory import and export.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def export_to_dict(self, folders=None)
Expand source code
def export_to_dict(self, folders=None):
    """Export nodes/folders to a dictionary."""
    if not folders:
        return deepcopy(self.config.connections)
    else:
        # Validate folders exist
        for f in folders:
            if f != "@" and f not in self.config._getallfolders():
                raise NodeNotFoundError(f"Folder '{f}' not found.")
        
        flat = self.config._getallnodesfull(folders, extract=False)
        nested = {}
        for k, v in flat.items():
            uniques = self.config._explode_unique(k)
            if not uniques:
                continue
            
            if "folder" in uniques and "subfolder" in uniques:
                f_name = uniques["folder"]
                s_name = uniques["subfolder"]
                i_name = uniques["id"]
                
                if f_name not in nested:
                    nested[f_name] = {"type": "folder"}
                if s_name not in nested[f_name]:
                    nested[f_name][s_name] = {"type": "subfolder"}
                    
                nested[f_name][s_name][i_name] = v
                
            elif "folder" in uniques:
                f_name = uniques["folder"]
                i_name = uniques["id"]
                
                if f_name not in nested:
                    nested[f_name] = {"type": "folder"}
                    
                nested[f_name][i_name] = v
            else:
                i_name = uniques["id"]
                nested[i_name] = v
                
        return nested

Export nodes/folders to a dictionary.

def export_to_file(self, file_path, folders=None)
Expand source code
def export_to_file(self, file_path, folders=None):
    """Export nodes/folders to a YAML file."""
    if os.path.exists(file_path):
        raise InvalidConfigurationError(f"File '{file_path}' already exists.")
        
    data = self.export_to_dict(folders)
    try:
        with open(file_path, "w") as f:
            yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False)
    except OSError as e:
        raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}")

Export nodes/folders to a YAML file.

def import_from_dict(self, data)
Expand source code
def import_from_dict(self, data):
    """Import nodes/folders from a dictionary."""
    if not isinstance(data, dict):
        raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.")

    def _traverse_import(node_data, current_folder='', current_subfolder=''):
        for k, v in node_data.items():
            if k == "type":
                continue
            if isinstance(v, dict):
                node_type = v.get("type", "connection")
                if node_type == "folder":
                    self.config._folder_add(folder=k)
                    _traverse_import(v, current_folder=k, current_subfolder='')
                elif node_type == "subfolder":
                    self.config._folder_add(folder=current_folder, subfolder=k)
                    _traverse_import(v, current_folder=current_folder, current_subfolder=k)
                elif node_type == "connection":
                    unique_id = k
                    if current_subfolder:
                        unique_id = f"{k}@{current_subfolder}@{current_folder}"
                    elif current_folder:
                        unique_id = f"{k}@{current_folder}"
                    self._validate_node_name(unique_id)
                    
                    kwargs = deepcopy(v)
                    kwargs['id'] = k
                    kwargs['folder'] = current_folder
                    kwargs['subfolder'] = current_subfolder
                    
                    self.config._connections_add(**kwargs)
            else:
                # Invalid format skip
                pass

    _traverse_import(data)
    self.config._saveconfig(self.config.file)

Import nodes/folders from a dictionary.

def import_from_file(self, file_path)
Expand source code
def import_from_file(self, file_path):
    """Import nodes/folders from a YAML file."""
    if not os.path.exists(file_path):
        raise InvalidConfigurationError(f"File '{file_path}' does not exist.")
        
    try:
        with open(file_path, "r") as f:
            data = yaml.load(f, Loader=yaml.FullLoader)
        self.import_from_dict(data)
    except Exception as e:
        raise InvalidConfigurationError(f"Failed to read/parse import file: {e}")

Import nodes/folders from a YAML file.

Inherited members

class InvalidConfigurationError (*args, **kwargs)
Expand source code
class InvalidConfigurationError(ConnpyError):
    """Raised when data or configuration input is invalid."""
    pass

Raised when data or configuration input is invalid.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class NodeAlreadyExistsError (*args, **kwargs)
Expand source code
class NodeAlreadyExistsError(ConnpyError):
    """Raised when a node or folder already exists."""
    pass

Raised when a node or folder already exists.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class NodeNotFoundError (*args, **kwargs)
Expand source code
class NodeNotFoundError(ConnpyError):
    """Raised when a connection or folder is not found."""
    pass

Raised when a connection or folder is not found.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class NodeService (config=None)
Expand source code
class NodeService(BaseService):
    def __init__(self, config=None):
        super().__init__(config)


    def list_nodes(self, filter_str=None, format_str=None):
        """Return a listed filtered by regex match and formatted if needed."""
        nodes = self.config._getallnodes()
        case_sensitive = self.config.config.get("case", False)
        
        if filter_str:
            flags = re.IGNORECASE if not case_sensitive else 0
            nodes = [n for n in nodes if re.search(filter_str, n, flags)]
            
        if not format_str:
            return nodes
            
        from .profile_service import ProfileService
        profile_service = ProfileService(self.config)
        
        formatted_nodes = []
        for n_id in nodes:
            # Use ProfileService to resolve profiles for dynamic formatting
            details = self.config.getitem(n_id, extract=False)
            if details:
                details = profile_service.resolve_node_data(details)
                
                name = n_id.split("@")[0]
                location = n_id.partition("@")[2] or "root"
                
                # Prepare context for .format() with all details
                context = details.copy()
                context.update({
                    "name": name,
                    "NAME": name.upper(),
                    "location": location,
                    "LOCATION": location.upper(),
                })
                
                # Add exploded uniques (id, folder, subfolder)
                uniques = self.config._explode_unique(n_id)
                if uniques:
                    context.update(uniques)
                
                # Add uppercase versions of all keys for convenience
                for k, v in list(context.items()):
                    if isinstance(v, str):
                        context[k.upper()] = v.upper()
                
                try:
                    formatted_nodes.append(format_str.format(**context))
                except (KeyError, IndexError, ValueError):
                    # Fallback to original string if format fails
                    formatted_nodes.append(n_id)
        return formatted_nodes

    def list_folders(self, filter_str=None):
        """Return all unique folders, optionally filtered by regex."""
        folders = self.config._getallfolders()
        case_sensitive = self.config.config.get("case", False)
        
        if filter_str:
            if filter_str.startswith("@"):
                if not case_sensitive:
                    folders = [f for f in folders if f.lower() == filter_str.lower()]
                else:
                    folders = [f for f in folders if f == filter_str]
            else:
                flags = re.IGNORECASE if not case_sensitive else 0
                folders = [f for f in folders if re.search(filter_str, f, flags)]
        return folders

    def get_node_details(self, unique_id):
        """Return full configuration dictionary for a specific node."""
        try:
            details = self.config.getitem(unique_id)
            if not details:
                raise NodeNotFoundError(f"Node '{unique_id}' not found.")
            return details
        except (KeyError, TypeError):
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")

    def explode_unique(self, unique_id):
        """Explode a unique ID into a dictionary of its parts."""
        return self.config._explode_unique(unique_id)

    def generate_cache(self, nodes=None, folders=None, profiles=None):
        """Generate and update the internal nodes cache."""
        self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

    def validate_parent_folder(self, unique_id, is_folder=False):
        """Check if parent folder exists for a given node unique ID."""
        if is_folder:
            uniques = self.config._explode_unique(unique_id)
            if uniques and "subfolder" in uniques and "folder" in uniques:
                parent_folder = f"@{uniques['folder']}"
                if parent_folder not in self.config._getallfolders():
                    raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
        else:
            node_folder = unique_id.partition("@")[2]
            if node_folder:
                parent_folder = f"@{node_folder}"
                if parent_folder not in self.config._getallfolders():
                    raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")


    def add_node(self, unique_id, data, is_folder=False):
        """Logic for adding a new node or folder to configuration."""
        if not is_folder:
            self._validate_node_name(unique_id)
            
        all_nodes = self.config._getallnodes()
        all_folders = self.config._getallfolders()
        
        if is_folder:
            if unique_id in all_folders:
                raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.")
            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.")
            
            # Check if parent folder exists when creating a subfolder
            if "subfolder" in uniques:
                self.validate_parent_folder(unique_id, is_folder=True)
                    
            self.config._folder_add(**uniques)
            self.config._saveconfig(self.config.file)
        else:
            if unique_id in all_nodes:
                raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.")
                
            # Check if parent folder exists when creating a node in a folder
            self.validate_parent_folder(unique_id)
                    
            # Ensure 'id' is in data for config._connections_add
            if "id" not in data:
                uniques = self.config._explode_unique(unique_id)
                if uniques and "id" in uniques:
                    data["id"] = uniques["id"]
            
            self.config._connections_add(**data)
            self.config._saveconfig(self.config.file)

    def update_node(self, unique_id, data, save=True):
        """Explicitly update an existing node."""
        all_nodes = self.config._getallnodes()
        if unique_id not in all_nodes:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")
            
        # Ensure 'id' is in data for config._connections_add
        if "id" not in data:
            uniques = self.config._explode_unique(unique_id)
            if uniques:
                data["id"] = uniques["id"]
            
        # config._connections_add actually handles updates if ID exists correctly
        self.config._connections_add(**data)
        if save:
            self.config._saveconfig(self.config.file)

    def delete_node(self, unique_id, is_folder=False, save=True):
        """Logic for deleting a node or folder."""
        if is_folder:
            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.")
            self.config._folder_del(**uniques)
        else:
            uniques = self.config._explode_unique(unique_id)
            if not uniques:
                raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.")
            self.config._connections_del(**uniques)
            
        if save:
            self.config._saveconfig(self.config.file)

    def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
        """Interact with a node directly."""
        from connpy.core import node
        from .profile_service import ProfileService
        
        node_data = self.config.getitem(unique_id, extract=False)
        if not node_data:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")
            
        # Resolve profiles
        profile_service = ProfileService(self.config)
        resolved_data = profile_service.resolve_node_data(node_data)
            
        n = node(unique_id, **resolved_data, config=self.config)
        if sftp:
            n.protocol = "sftp"

        n.interact(debug=debug, logger=logger)

    def move_node(self, src_id, dst_id, copy=False):
        """Move or copy a node."""
        self._validate_node_name(dst_id)
        
        node_data = self.config.getitem(src_id)
        if not node_data:
            raise NodeNotFoundError(f"Source node '{src_id}' not found.")
            
        if dst_id in self.config._getallnodes():
            raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.")
            
        new_uniques = self.config._explode_unique(dst_id)
        if not new_uniques:
            raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.")
            
        new_node_data = node_data.copy()
        new_node_data.update(new_uniques)
        
        self.config._connections_add(**new_node_data)
        
        if not copy:
            src_uniques = self.config._explode_unique(src_id)
            self.config._connections_del(**src_uniques)
            
        self.config._saveconfig(self.config.file)

    def bulk_add(self, ids, hosts, common_data):
        """Add multiple nodes with shared common configuration."""
        count = 0
        all_nodes = self.config._getallnodes()
        
        for i, uid in enumerate(ids):
            if uid in all_nodes:
                continue
                
            try:
                self._validate_node_name(uid)
            except ReservedNameError:
                # For bulk, we might want to just skip or log. 
                # CLI caller will handle if it wants to be strict.
                continue
                
            host = hosts[i] if i < len(hosts) else hosts[0]
            uniques = self.config._explode_unique(uid)
            if not uniques:
                continue
                
            node_data = common_data.copy()
            node_data.pop("ids", None)
            node_data.pop("location", None)
            node_data.update(uniques)
            node_data["host"] = host
            node_data["type"] = "connection"

            self.config._connections_add(**node_data)
            count += 1
            
        if count > 0:
            self.config._saveconfig(self.config.file)
        return count

    def full_replace(self, connections, profiles):
        """Replace all connections and profiles with new data."""
        self.config.connections = connections
        self.config.profiles = profiles
        self.config._saveconfig(self.config.file)

    def get_inventory(self):
        """Return a full snapshot of connections and profiles."""
        return {
            "connections": self.config.connections,
            "profiles": self.config.profiles
        }

Base class for all connpy services, providing common configuration access.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_node(self, unique_id, data, is_folder=False)
Expand source code
def add_node(self, unique_id, data, is_folder=False):
    """Logic for adding a new node or folder to configuration."""
    if not is_folder:
        self._validate_node_name(unique_id)
        
    all_nodes = self.config._getallnodes()
    all_folders = self.config._getallfolders()
    
    if is_folder:
        if unique_id in all_folders:
            raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.")
        uniques = self.config._explode_unique(unique_id)
        if not uniques:
            raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.")
        
        # Check if parent folder exists when creating a subfolder
        if "subfolder" in uniques:
            self.validate_parent_folder(unique_id, is_folder=True)
                
        self.config._folder_add(**uniques)
        self.config._saveconfig(self.config.file)
    else:
        if unique_id in all_nodes:
            raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.")
            
        # Check if parent folder exists when creating a node in a folder
        self.validate_parent_folder(unique_id)
                
        # Ensure 'id' is in data for config._connections_add
        if "id" not in data:
            uniques = self.config._explode_unique(unique_id)
            if uniques and "id" in uniques:
                data["id"] = uniques["id"]
        
        self.config._connections_add(**data)
        self.config._saveconfig(self.config.file)

Logic for adding a new node or folder to configuration.

def bulk_add(self, ids, hosts, common_data)
Expand source code
def bulk_add(self, ids, hosts, common_data):
    """Add multiple nodes with shared common configuration."""
    count = 0
    all_nodes = self.config._getallnodes()
    
    for i, uid in enumerate(ids):
        if uid in all_nodes:
            continue
            
        try:
            self._validate_node_name(uid)
        except ReservedNameError:
            # For bulk, we might want to just skip or log. 
            # CLI caller will handle if it wants to be strict.
            continue
            
        host = hosts[i] if i < len(hosts) else hosts[0]
        uniques = self.config._explode_unique(uid)
        if not uniques:
            continue
            
        node_data = common_data.copy()
        node_data.pop("ids", None)
        node_data.pop("location", None)
        node_data.update(uniques)
        node_data["host"] = host
        node_data["type"] = "connection"

        self.config._connections_add(**node_data)
        count += 1
        
    if count > 0:
        self.config._saveconfig(self.config.file)
    return count

Add multiple nodes with shared common configuration.

def connect_node(self, unique_id, sftp=False, debug=False, logger=None)
Expand source code
def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
    """Interact with a node directly."""
    from connpy.core import node
    from .profile_service import ProfileService
    
    node_data = self.config.getitem(unique_id, extract=False)
    if not node_data:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")
        
    # Resolve profiles
    profile_service = ProfileService(self.config)
    resolved_data = profile_service.resolve_node_data(node_data)
        
    n = node(unique_id, **resolved_data, config=self.config)
    if sftp:
        n.protocol = "sftp"

    n.interact(debug=debug, logger=logger)

Interact with a node directly.

def delete_node(self, unique_id, is_folder=False, save=True)
Expand source code
def delete_node(self, unique_id, is_folder=False, save=True):
    """Logic for deleting a node or folder."""
    if is_folder:
        uniques = self.config._explode_unique(unique_id)
        if not uniques:
            raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.")
        self.config._folder_del(**uniques)
    else:
        uniques = self.config._explode_unique(unique_id)
        if not uniques:
            raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.")
        self.config._connections_del(**uniques)
        
    if save:
        self.config._saveconfig(self.config.file)

Logic for deleting a node or folder.

def explode_unique(self, unique_id)
Expand source code
def explode_unique(self, unique_id):
    """Explode a unique ID into a dictionary of its parts."""
    return self.config._explode_unique(unique_id)

Explode a unique ID into a dictionary of its parts.

def full_replace(self, connections, profiles)
Expand source code
def full_replace(self, connections, profiles):
    """Replace all connections and profiles with new data."""
    self.config.connections = connections
    self.config.profiles = profiles
    self.config._saveconfig(self.config.file)

Replace all connections and profiles with new data.

def generate_cache(self, nodes=None, folders=None, profiles=None)
Expand source code
def generate_cache(self, nodes=None, folders=None, profiles=None):
    """Generate and update the internal nodes cache."""
    self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

Generate and update the internal nodes cache.

def get_inventory(self)
Expand source code
def get_inventory(self):
    """Return a full snapshot of connections and profiles."""
    return {
        "connections": self.config.connections,
        "profiles": self.config.profiles
    }

Return a full snapshot of connections and profiles.

def get_node_details(self, unique_id)
Expand source code
def get_node_details(self, unique_id):
    """Return full configuration dictionary for a specific node."""
    try:
        details = self.config.getitem(unique_id)
        if not details:
            raise NodeNotFoundError(f"Node '{unique_id}' not found.")
        return details
    except (KeyError, TypeError):
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")

Return full configuration dictionary for a specific node.

def list_folders(self, filter_str=None)
Expand source code
def list_folders(self, filter_str=None):
    """Return all unique folders, optionally filtered by regex."""
    folders = self.config._getallfolders()
    case_sensitive = self.config.config.get("case", False)
    
    if filter_str:
        if filter_str.startswith("@"):
            if not case_sensitive:
                folders = [f for f in folders if f.lower() == filter_str.lower()]
            else:
                folders = [f for f in folders if f == filter_str]
        else:
            flags = re.IGNORECASE if not case_sensitive else 0
            folders = [f for f in folders if re.search(filter_str, f, flags)]
    return folders

Return all unique folders, optionally filtered by regex.

def list_nodes(self, filter_str=None, format_str=None)
Expand source code
def list_nodes(self, filter_str=None, format_str=None):
    """Return a listed filtered by regex match and formatted if needed."""
    nodes = self.config._getallnodes()
    case_sensitive = self.config.config.get("case", False)
    
    if filter_str:
        flags = re.IGNORECASE if not case_sensitive else 0
        nodes = [n for n in nodes if re.search(filter_str, n, flags)]
        
    if not format_str:
        return nodes
        
    from .profile_service import ProfileService
    profile_service = ProfileService(self.config)
    
    formatted_nodes = []
    for n_id in nodes:
        # Use ProfileService to resolve profiles for dynamic formatting
        details = self.config.getitem(n_id, extract=False)
        if details:
            details = profile_service.resolve_node_data(details)
            
            name = n_id.split("@")[0]
            location = n_id.partition("@")[2] or "root"
            
            # Prepare context for .format() with all details
            context = details.copy()
            context.update({
                "name": name,
                "NAME": name.upper(),
                "location": location,
                "LOCATION": location.upper(),
            })
            
            # Add exploded uniques (id, folder, subfolder)
            uniques = self.config._explode_unique(n_id)
            if uniques:
                context.update(uniques)
            
            # Add uppercase versions of all keys for convenience
            for k, v in list(context.items()):
                if isinstance(v, str):
                    context[k.upper()] = v.upper()
            
            try:
                formatted_nodes.append(format_str.format(**context))
            except (KeyError, IndexError, ValueError):
                # Fallback to original string if format fails
                formatted_nodes.append(n_id)
    return formatted_nodes

Return a listed filtered by regex match and formatted if needed.

def move_node(self, src_id, dst_id, copy=False)
Expand source code
def move_node(self, src_id, dst_id, copy=False):
    """Move or copy a node."""
    self._validate_node_name(dst_id)
    
    node_data = self.config.getitem(src_id)
    if not node_data:
        raise NodeNotFoundError(f"Source node '{src_id}' not found.")
        
    if dst_id in self.config._getallnodes():
        raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.")
        
    new_uniques = self.config._explode_unique(dst_id)
    if not new_uniques:
        raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.")
        
    new_node_data = node_data.copy()
    new_node_data.update(new_uniques)
    
    self.config._connections_add(**new_node_data)
    
    if not copy:
        src_uniques = self.config._explode_unique(src_id)
        self.config._connections_del(**src_uniques)
        
    self.config._saveconfig(self.config.file)

Move or copy a node.

def update_node(self, unique_id, data, save=True)
Expand source code
def update_node(self, unique_id, data, save=True):
    """Explicitly update an existing node."""
    all_nodes = self.config._getallnodes()
    if unique_id not in all_nodes:
        raise NodeNotFoundError(f"Node '{unique_id}' not found.")
        
    # Ensure 'id' is in data for config._connections_add
    if "id" not in data:
        uniques = self.config._explode_unique(unique_id)
        if uniques:
            data["id"] = uniques["id"]
        
    # config._connections_add actually handles updates if ID exists correctly
    self.config._connections_add(**data)
    if save:
        self.config._saveconfig(self.config.file)

Explicitly update an existing node.

def validate_parent_folder(self, unique_id, is_folder=False)
Expand source code
def validate_parent_folder(self, unique_id, is_folder=False):
    """Check if parent folder exists for a given node unique ID."""
    if is_folder:
        uniques = self.config._explode_unique(unique_id)
        if uniques and "subfolder" in uniques and "folder" in uniques:
            parent_folder = f"@{uniques['folder']}"
            if parent_folder not in self.config._getallfolders():
                raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")
    else:
        node_folder = unique_id.partition("@")[2]
        if node_folder:
            parent_folder = f"@{node_folder}"
            if parent_folder not in self.config._getallfolders():
                raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")

Check if parent folder exists for a given node unique ID.

Inherited members

class PluginService (config=None)
Expand source code
class PluginService(BaseService):
    """Business logic for enabling, disabling, and listing plugins."""

    def _get_plugin_path(self, name, include_disabled=True):
        """Resolves the physical path of a plugin by name. Priority: user, shared/global, core."""
        import os
        
        # 1. User directory
        user_dir = os.path.join(self.config.defaultdir, "plugins")
        if os.path.exists(user_dir):
            p_file = os.path.join(user_dir, f"{name}.py")
            if os.path.exists(p_file):
                return p_file, "user", True
            if include_disabled:
                bkp_file = os.path.join(user_dir, f"{name}.py.bkp")
                if os.path.exists(bkp_file):
                    return bkp_file, "user", False
                    
        # 2. Shared/Global directory
        if hasattr(self.config, "_shared_config") and self.config._shared_config:
            shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins")
            if os.path.exists(shared_dir):
                p_file = os.path.join(shared_dir, f"{name}.py")
                if os.path.exists(p_file):
                    return p_file, "shared", True
                if include_disabled:
                    bkp_file = os.path.join(shared_dir, f"{name}.py.bkp")
                    if os.path.exists(bkp_file):
                        return bkp_file, "shared", False
                        
        # 3. Core plugins
        core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
        p_file = os.path.join(core_dir, f"{name}.py")
        if os.path.exists(p_file):
            return p_file, "core", True
            
        return None, None, False


    def list_plugins(self):
        """List all core and user-defined plugins with their status and hash."""
        import os
        import hashlib
        
        all_plugin_info = {}

        def get_hash(path):
            try:
                with open(path, "rb") as f:
                    return hashlib.md5(f.read()).hexdigest()
            except Exception:
                return ""

        # 1. Scan core plugins (lowest priority)
        core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
        if os.path.exists(core_dir):
            for f in os.listdir(core_dir):
                if f.endswith(".py"):
                    name = f[:-3]
                    path = os.path.join(core_dir, f)
                    all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}

        # 2. Scan shared plugins (medium priority)
        if hasattr(self.config, "_shared_config") and self.config._shared_config:
            shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins")
            if os.path.exists(shared_dir):
                for f in os.listdir(shared_dir):
                    if f.endswith(".py"):
                        name = f[:-3]
                        path = os.path.join(shared_dir, f)
                        all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
                    elif f.endswith(".py.bkp"):
                        name = f[:-7]
                        all_plugin_info[name] = {"enabled": False}

        # 3. Scan user plugins (highest priority)
        user_dir = os.path.join(self.config.defaultdir, "plugins")
        if os.path.exists(user_dir):
            for f in os.listdir(user_dir):
                if f.endswith(".py"):
                    name = f[:-3]
                    path = os.path.join(user_dir, f)
                    all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
                elif f.endswith(".py.bkp"):
                    name = f[:-7]
                    all_plugin_info[name] = {"enabled": False}

        return all_plugin_info


    def add_plugin(self, name, source_file, update=False):
        """Add or update a plugin from a local file."""
        import os
        import shutil
        from connpy.plugins import Plugins

        if not name.isalpha() or not name.islower() or len(name) > 15:
            raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

        p_manager = Plugins()
        # Check for bad script
        error = p_manager.verify_script(source_file)
        if error:
            raise InvalidConfigurationError(f"Invalid plugin script: {error}")

        self._save_plugin_file(name, source_file, update, is_path=True)

    def add_plugin_from_bytes(self, name, content, update=False):
        """Add or update a plugin from bytes (gRPC)."""
        import tempfile
        import os
        
        if not name.isalpha() or not name.islower() or len(name) > 15:
            raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

        # Write to temp file to verify script
        with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
            tmp.write(content)
            tmp_path = tmp.name

        try:
            from connpy.plugins import Plugins
            p_manager = Plugins()
            error = p_manager.verify_script(tmp_path)
            if error:
                raise InvalidConfigurationError(f"Invalid plugin script: {error}")
            
            self._save_plugin_file(name, tmp_path, update, is_path=True)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def _save_plugin_file(self, name, source, update=False, is_path=True):
        import os
        import shutil
        
        plugin_dir = os.path.join(self.config.defaultdir, "plugins")
        os.makedirs(plugin_dir, exist_ok=True)
        
        target_file = os.path.join(plugin_dir, f"{name}.py")
        backup_file = f"{target_file}.bkp"

        if not update and (os.path.exists(target_file) or os.path.exists(backup_file)):
            raise InvalidConfigurationError(f"Plugin '{name}' already exists.")

        try:
            if is_path:
                shutil.copy2(source, target_file)
            else:
                with open(target_file, "wb") as f:
                    f.write(source)
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to save plugin file: {e}")

    def delete_plugin(self, name):
        """Remove a plugin file permanently."""
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        disabled_file = f"{plugin_file}.bkp"

        deleted = False
        for f in [plugin_file, disabled_file]:
            if os.path.exists(f):
                try:
                    os.remove(f)
                    deleted = True
                except OSError as e:
                    raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}")
        
        if not deleted:
            # If not deleted from user directory, check if it's in shared or core
            path, origin, enabled = self._get_plugin_path(name, include_disabled=True)
            if origin in ["shared", "core"]:
                raise InvalidConfigurationError("Global and core plugins are read-only and cannot be deleted by users.")
            raise InvalidConfigurationError(f"Plugin '{name}' not found.")

    def enable_plugin(self, name):
        """Activate a plugin by renaming its backup file."""
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        disabled_file = f"{plugin_file}.bkp"
        
        if os.path.exists(disabled_file):
            # Check if it is a shadow bkp file (0 bytes shadowing shared/core)
            is_shadow = False
            if os.path.getsize(disabled_file) == 0:
                # Resolve without the local bkp file to verify if shared/core has it
                path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
                if origin in ["shared", "core"]:
                    is_shadow = True
            
            if is_shadow:
                # Remove shadow file to restore inheritance
                try:
                    os.remove(disabled_file)
                    return True
                except OSError as e:
                    raise InvalidConfigurationError(f"Failed to remove shadow file '{disabled_file}': {e}")
            else:
                try:
                    os.rename(disabled_file, plugin_file)
                    return True
                except OSError as e:
                    raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")
        
        if os.path.exists(plugin_file):
            return False # Already enabled
            
        # If it doesn't exist locally, check if it's already an active shared/core plugin
        path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
        if origin in ["shared", "core"]:
            return False # Already active/enabled through inheritance
            
        raise InvalidConfigurationError(f"Plugin '{name}' not found.")

    def disable_plugin(self, name):
        """Deactivate a plugin by renaming it to a backup file."""
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        disabled_file = f"{plugin_file}.bkp"
        
        if os.path.exists(plugin_file):
            # Regular user-level plugin exists. Rename to bkp
            try:
                os.rename(plugin_file, disabled_file)
                return True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")
                
        if os.path.exists(disabled_file):
            return False # Already disabled
            
        # Check if it exists in shared or core
        path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
        if origin in ["shared", "core"]:
            # Shadow disable it by creating an empty .py.bkp in user plugins dir
            plugin_dir = os.path.dirname(plugin_file)
            os.makedirs(plugin_dir, exist_ok=True)
            try:
                with open(disabled_file, "w") as f:
                    f.write("")
                return True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to create shadow disable file: {e}")
                
        raise InvalidConfigurationError(f"Plugin '{name}' not found or is already disabled.")

    def get_plugin_source(self, name):
        import os
        from ..services.exceptions import InvalidConfigurationError
        
        path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
        if not path:
            raise InvalidConfigurationError(f"Plugin '{name}' not found")
        
        with open(path, "r") as f:
            return f.read()

    def invoke_plugin(self, name, args_dict):
        import sys, io
        from argparse import Namespace
        from ..services.exceptions import InvalidConfigurationError
        from connpy.plugins import Plugins
        class MockApp:
            is_mock = True
            def __init__(self, config):
                from ..core import node, nodes
                from ..ai import ai
                from ..services.provider import ServiceProvider
                
                self.config = config
                self.node = node
                self.nodes = nodes
                self.ai = ai
                
                self.services = ServiceProvider(config, mode="local")
                
                # Get settings for CLI behavior
                settings = self.services.config_svc.get_settings()
                self.case = settings.get("case", False)
                self.fzf = settings.get("fzf", False)
                
                try:
                    self.nodes_list = self.services.nodes.list_nodes()
                    self.folders = self.services.nodes.list_folders()
                    self.profiles = self.services.profiles.list_profiles()
                except Exception:
                    self.nodes_list = []
                    self.folders = []
                    self.profiles = []
        
        args = Namespace(**args_dict)
        
        p_manager = Plugins()
        import os
        
        path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
        if not path:
            raise InvalidConfigurationError(f"Plugin '{name}' not found")
            
        module = p_manager._import_from_path(path)
        parser = module.Parser().parser if hasattr(module, "Parser") else None
        
        if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
            args.func = getattr(module, args_dict["__func_name__"])
        
        app = MockApp(self.config)
        
        from .. import printer
        from rich.console import Console
        
        from rich.console import Console
        import queue
        import threading
        
        q = queue.Queue()
        
        class QueueIO(io.StringIO):
            def write(self, s):
                q.put(s)
                return len(s)
            def flush(self):
                pass
                
        buf = QueueIO()
        old_console = printer._get_console()
        old_err_console = printer._get_err_console()
        
        def run_plugin():
            printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
            printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
            printer.set_thread_stream(buf)
            try:
                if hasattr(module, "Entrypoint"):
                    module.Entrypoint(args, parser, app)
            except BaseException as e:
                if not isinstance(e, SystemExit):
                    import traceback
                    printer.err_console.print(traceback.format_exc())
            finally:
                printer.set_thread_console(old_console)
                printer.set_thread_err_console(old_err_console)
                printer.set_thread_stream(None)
                q.put(None)
                
        t = threading.Thread(target=run_plugin, daemon=True)
        t.start()
        
        while True:
            item = q.get()
            if item is None:
                break
            yield item

Business logic for enabling, disabling, and listing plugins.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_plugin(self, name, source_file, update=False)
Expand source code
def add_plugin(self, name, source_file, update=False):
    """Add or update a plugin from a local file."""
    import os
    import shutil
    from connpy.plugins import Plugins

    if not name.isalpha() or not name.islower() or len(name) > 15:
        raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

    p_manager = Plugins()
    # Check for bad script
    error = p_manager.verify_script(source_file)
    if error:
        raise InvalidConfigurationError(f"Invalid plugin script: {error}")

    self._save_plugin_file(name, source_file, update, is_path=True)

Add or update a plugin from a local file.

def add_plugin_from_bytes(self, name, content, update=False)
Expand source code
def add_plugin_from_bytes(self, name, content, update=False):
    """Add or update a plugin from bytes (gRPC)."""
    import tempfile
    import os
    
    if not name.isalpha() or not name.islower() or len(name) > 15:
        raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

    # Write to temp file to verify script
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
        tmp.write(content)
        tmp_path = tmp.name

    try:
        from connpy.plugins import Plugins
        p_manager = Plugins()
        error = p_manager.verify_script(tmp_path)
        if error:
            raise InvalidConfigurationError(f"Invalid plugin script: {error}")
        
        self._save_plugin_file(name, tmp_path, update, is_path=True)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

Add or update a plugin from bytes (gRPC).

def delete_plugin(self, name)
Expand source code
def delete_plugin(self, name):
    """Remove a plugin file permanently."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"

    deleted = False
    for f in [plugin_file, disabled_file]:
        if os.path.exists(f):
            try:
                os.remove(f)
                deleted = True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}")
    
    if not deleted:
        # If not deleted from user directory, check if it's in shared or core
        path, origin, enabled = self._get_plugin_path(name, include_disabled=True)
        if origin in ["shared", "core"]:
            raise InvalidConfigurationError("Global and core plugins are read-only and cannot be deleted by users.")
        raise InvalidConfigurationError(f"Plugin '{name}' not found.")

Remove a plugin file permanently.

def disable_plugin(self, name)
Expand source code
def disable_plugin(self, name):
    """Deactivate a plugin by renaming it to a backup file."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"
    
    if os.path.exists(plugin_file):
        # Regular user-level plugin exists. Rename to bkp
        try:
            os.rename(plugin_file, disabled_file)
            return True
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")
            
    if os.path.exists(disabled_file):
        return False # Already disabled
        
    # Check if it exists in shared or core
    path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
    if origin in ["shared", "core"]:
        # Shadow disable it by creating an empty .py.bkp in user plugins dir
        plugin_dir = os.path.dirname(plugin_file)
        os.makedirs(plugin_dir, exist_ok=True)
        try:
            with open(disabled_file, "w") as f:
                f.write("")
            return True
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to create shadow disable file: {e}")
            
    raise InvalidConfigurationError(f"Plugin '{name}' not found or is already disabled.")

Deactivate a plugin by renaming it to a backup file.

def enable_plugin(self, name)
Expand source code
def enable_plugin(self, name):
    """Activate a plugin by renaming its backup file."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"
    
    if os.path.exists(disabled_file):
        # Check if it is a shadow bkp file (0 bytes shadowing shared/core)
        is_shadow = False
        if os.path.getsize(disabled_file) == 0:
            # Resolve without the local bkp file to verify if shared/core has it
            path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
            if origin in ["shared", "core"]:
                is_shadow = True
        
        if is_shadow:
            # Remove shadow file to restore inheritance
            try:
                os.remove(disabled_file)
                return True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to remove shadow file '{disabled_file}': {e}")
        else:
            try:
                os.rename(disabled_file, plugin_file)
                return True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")
    
    if os.path.exists(plugin_file):
        return False # Already enabled
        
    # If it doesn't exist locally, check if it's already an active shared/core plugin
    path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
    if origin in ["shared", "core"]:
        return False # Already active/enabled through inheritance
        
    raise InvalidConfigurationError(f"Plugin '{name}' not found.")

Activate a plugin by renaming its backup file.

def get_plugin_source(self, name)
Expand source code
def get_plugin_source(self, name):
    import os
    from ..services.exceptions import InvalidConfigurationError
    
    path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
    if not path:
        raise InvalidConfigurationError(f"Plugin '{name}' not found")
    
    with open(path, "r") as f:
        return f.read()
def invoke_plugin(self, name, args_dict)
Expand source code
def invoke_plugin(self, name, args_dict):
    import sys, io
    from argparse import Namespace
    from ..services.exceptions import InvalidConfigurationError
    from connpy.plugins import Plugins
    class MockApp:
        is_mock = True
        def __init__(self, config):
            from ..core import node, nodes
            from ..ai import ai
            from ..services.provider import ServiceProvider
            
            self.config = config
            self.node = node
            self.nodes = nodes
            self.ai = ai
            
            self.services = ServiceProvider(config, mode="local")
            
            # Get settings for CLI behavior
            settings = self.services.config_svc.get_settings()
            self.case = settings.get("case", False)
            self.fzf = settings.get("fzf", False)
            
            try:
                self.nodes_list = self.services.nodes.list_nodes()
                self.folders = self.services.nodes.list_folders()
                self.profiles = self.services.profiles.list_profiles()
            except Exception:
                self.nodes_list = []
                self.folders = []
                self.profiles = []
    
    args = Namespace(**args_dict)
    
    p_manager = Plugins()
    import os
    
    path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
    if not path:
        raise InvalidConfigurationError(f"Plugin '{name}' not found")
        
    module = p_manager._import_from_path(path)
    parser = module.Parser().parser if hasattr(module, "Parser") else None
    
    if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
        args.func = getattr(module, args_dict["__func_name__"])
    
    app = MockApp(self.config)
    
    from .. import printer
    from rich.console import Console
    
    from rich.console import Console
    import queue
    import threading
    
    q = queue.Queue()
    
    class QueueIO(io.StringIO):
        def write(self, s):
            q.put(s)
            return len(s)
        def flush(self):
            pass
            
    buf = QueueIO()
    old_console = printer._get_console()
    old_err_console = printer._get_err_console()
    
    def run_plugin():
        printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
        printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True))
        printer.set_thread_stream(buf)
        try:
            if hasattr(module, "Entrypoint"):
                module.Entrypoint(args, parser, app)
        except BaseException as e:
            if not isinstance(e, SystemExit):
                import traceback
                printer.err_console.print(traceback.format_exc())
        finally:
            printer.set_thread_console(old_console)
            printer.set_thread_err_console(old_err_console)
            printer.set_thread_stream(None)
            q.put(None)
            
    t = threading.Thread(target=run_plugin, daemon=True)
    t.start()
    
    while True:
        item = q.get()
        if item is None:
            break
        yield item
def list_plugins(self)
Expand source code
def list_plugins(self):
    """List all core and user-defined plugins with their status and hash."""
    import os
    import hashlib
    
    all_plugin_info = {}

    def get_hash(path):
        try:
            with open(path, "rb") as f:
                return hashlib.md5(f.read()).hexdigest()
        except Exception:
            return ""

    # 1. Scan core plugins (lowest priority)
    core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
    if os.path.exists(core_dir):
        for f in os.listdir(core_dir):
            if f.endswith(".py"):
                name = f[:-3]
                path = os.path.join(core_dir, f)
                all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}

    # 2. Scan shared plugins (medium priority)
    if hasattr(self.config, "_shared_config") and self.config._shared_config:
        shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins")
        if os.path.exists(shared_dir):
            for f in os.listdir(shared_dir):
                if f.endswith(".py"):
                    name = f[:-3]
                    path = os.path.join(shared_dir, f)
                    all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
                elif f.endswith(".py.bkp"):
                    name = f[:-7]
                    all_plugin_info[name] = {"enabled": False}

    # 3. Scan user plugins (highest priority)
    user_dir = os.path.join(self.config.defaultdir, "plugins")
    if os.path.exists(user_dir):
        for f in os.listdir(user_dir):
            if f.endswith(".py"):
                name = f[:-3]
                path = os.path.join(user_dir, f)
                all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
            elif f.endswith(".py.bkp"):
                name = f[:-7]
                all_plugin_info[name] = {"enabled": False}

    return all_plugin_info

List all core and user-defined plugins with their status and hash.

Inherited members

class ProfileAlreadyExistsError (*args, **kwargs)
Expand source code
class ProfileAlreadyExistsError(ConnpyError):
    """Raised when a profile with the same name already exists."""
    pass

Raised when a profile with the same name already exists.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class ProfileNotFoundError (*args, **kwargs)
Expand source code
class ProfileNotFoundError(ConnpyError):
    """Raised when a profile is not found."""
    pass

Raised when a profile is not found.

Ancestors

  • ConnpyError
  • builtins.Exception
  • builtins.BaseException
class ProfileService (config=None)
Expand source code
class ProfileService(BaseService):
    """Business logic for node profiles management."""

    def list_profiles(self, filter_str=None):
        """List all profile names, optionally filtered."""
        profiles = list(self.config.profiles.keys())
        case_sensitive = self.config.config.get("case", False)
        
        if filter_str:
            if not case_sensitive:
                f_str = filter_str.lower()
                return [p for p in profiles if f_str in p.lower()]
            else:
                return [p for p in profiles if filter_str in p]
        return profiles

    def get_profile(self, name, resolve=True):
        """Get the profile dictionary, optionally resolved."""
        profile = self.config.profiles.get(name)
        if not profile:
            raise ProfileNotFoundError(f"Profile '{name}' not found.")
        
        if resolve:
            return self.resolve_node_data(profile)
        return profile

    def add_profile(self, name, data):
        """Add a new profile."""
        if name in self.config.profiles:
            raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.")
            
        # Filter data to match _profiles_add signature and ensure id is passed
        allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
        filtered_data = {k: v for k, v in data.items() if k in allowed_keys}
        
        self.config._profiles_add(id=name, **filtered_data)
        self.config._saveconfig(self.config.file)

    def resolve_node_data(self, node_data):
        """Resolve profile references (@profile) in node data and handle inheritance."""
        resolved = node_data.copy()
        
        # 1. Identify all referenced profiles to support inheritance
        referenced_profiles = []
        for value in resolved.values():
            if isinstance(value, str) and value.startswith("@"):
                referenced_profiles.append(value[1:])
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, str) and item.startswith("@"):
                        referenced_profiles.append(item[1:])
        
        # 2. Resolve explicit references
        for key, value in resolved.items():
            if isinstance(value, str) and value.startswith("@"):
                profile_name = value[1:]
                try:
                    profile = self.get_profile(profile_name, resolve=True)
                    resolved[key] = profile.get(key, "")
                except ProfileNotFoundError:
                    resolved[key] = ""
            elif isinstance(value, list):
                resolved_list = []
                for item in value:
                    if isinstance(item, str) and item.startswith("@"):
                        profile_name = item[1:]
                        try:
                            profile = self.get_profile(profile_name, resolve=True)
                            if "password" in profile:
                                resolved_list.append(profile["password"])
                        except ProfileNotFoundError:
                            pass
                    else:
                        resolved_list.append(item)
                resolved[key] = resolved_list
        
        # 3. Inheritance: Fill empty keys from the first referenced profile
        if referenced_profiles:
            base_profile_name = referenced_profiles[0]
            try:
                base_profile = self.get_profile(base_profile_name, resolve=True)
                for key, value in base_profile.items():
                    # Fill if key is missing or empty
                    if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None:
                        resolved[key] = value
            except ProfileNotFoundError:
                pass

        # 4. Handle default protocol
        if resolved.get("protocol") == "" or resolved.get("protocol") is None:
            try:
                default_profile = self.get_profile("default", resolve=True)
                resolved["protocol"] = default_profile.get("protocol", "ssh")
            except ProfileNotFoundError:
                resolved["protocol"] = "ssh"
                
        return resolved

    def delete_profile(self, name):
        """Delete an existing profile, with safety checks."""
        if name not in self.config.profiles:
            raise ProfileNotFoundError(f"Profile '{name}' not found.")
            
        if name == "default":
            raise InvalidConfigurationError("Cannot delete the 'default' profile.")
            
        used_by = self.config._profileused(name)
        if used_by:
            # We return the list of nodes using it so the UI can inform the user
            raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}")
            
        self.config._profiles_del(id=name)
        self.config._saveconfig(self.config.file)

    def update_profile(self, name, data):
        """Update an existing profile."""
        if name not in self.config.profiles:
            raise ProfileNotFoundError(f"Profile '{name}' not found.")
            
        # Merge with existing data
        existing = self.get_profile(name, resolve=False)
        updated_data = existing.copy()
        updated_data.update(data)
        
        # Filter data to match _profiles_add signature
        allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
        filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys}
        
        self.config._profiles_add(id=name, **filtered_data)
        self.config._saveconfig(self.config.file)

Business logic for node profiles management.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_profile(self, name, data)
Expand source code
def add_profile(self, name, data):
    """Add a new profile."""
    if name in self.config.profiles:
        raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.")
        
    # Filter data to match _profiles_add signature and ensure id is passed
    allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
    filtered_data = {k: v for k, v in data.items() if k in allowed_keys}
    
    self.config._profiles_add(id=name, **filtered_data)
    self.config._saveconfig(self.config.file)

Add a new profile.

def delete_profile(self, name)
Expand source code
def delete_profile(self, name):
    """Delete an existing profile, with safety checks."""
    if name not in self.config.profiles:
        raise ProfileNotFoundError(f"Profile '{name}' not found.")
        
    if name == "default":
        raise InvalidConfigurationError("Cannot delete the 'default' profile.")
        
    used_by = self.config._profileused(name)
    if used_by:
        # We return the list of nodes using it so the UI can inform the user
        raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}")
        
    self.config._profiles_del(id=name)
    self.config._saveconfig(self.config.file)

Delete an existing profile, with safety checks.

def get_profile(self, name, resolve=True)
Expand source code
def get_profile(self, name, resolve=True):
    """Get the profile dictionary, optionally resolved."""
    profile = self.config.profiles.get(name)
    if not profile:
        raise ProfileNotFoundError(f"Profile '{name}' not found.")
    
    if resolve:
        return self.resolve_node_data(profile)
    return profile

Get the profile dictionary, optionally resolved.

def list_profiles(self, filter_str=None)
Expand source code
def list_profiles(self, filter_str=None):
    """List all profile names, optionally filtered."""
    profiles = list(self.config.profiles.keys())
    case_sensitive = self.config.config.get("case", False)
    
    if filter_str:
        if not case_sensitive:
            f_str = filter_str.lower()
            return [p for p in profiles if f_str in p.lower()]
        else:
            return [p for p in profiles if filter_str in p]
    return profiles

List all profile names, optionally filtered.

def resolve_node_data(self, node_data)
Expand source code
def resolve_node_data(self, node_data):
    """Resolve profile references (@profile) in node data and handle inheritance."""
    resolved = node_data.copy()
    
    # 1. Identify all referenced profiles to support inheritance
    referenced_profiles = []
    for value in resolved.values():
        if isinstance(value, str) and value.startswith("@"):
            referenced_profiles.append(value[1:])
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, str) and item.startswith("@"):
                    referenced_profiles.append(item[1:])
    
    # 2. Resolve explicit references
    for key, value in resolved.items():
        if isinstance(value, str) and value.startswith("@"):
            profile_name = value[1:]
            try:
                profile = self.get_profile(profile_name, resolve=True)
                resolved[key] = profile.get(key, "")
            except ProfileNotFoundError:
                resolved[key] = ""
        elif isinstance(value, list):
            resolved_list = []
            for item in value:
                if isinstance(item, str) and item.startswith("@"):
                    profile_name = item[1:]
                    try:
                        profile = self.get_profile(profile_name, resolve=True)
                        if "password" in profile:
                            resolved_list.append(profile["password"])
                    except ProfileNotFoundError:
                        pass
                else:
                    resolved_list.append(item)
            resolved[key] = resolved_list
    
    # 3. Inheritance: Fill empty keys from the first referenced profile
    if referenced_profiles:
        base_profile_name = referenced_profiles[0]
        try:
            base_profile = self.get_profile(base_profile_name, resolve=True)
            for key, value in base_profile.items():
                # Fill if key is missing or empty
                if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None:
                    resolved[key] = value
        except ProfileNotFoundError:
            pass

    # 4. Handle default protocol
    if resolved.get("protocol") == "" or resolved.get("protocol") is None:
        try:
            default_profile = self.get_profile("default", resolve=True)
            resolved["protocol"] = default_profile.get("protocol", "ssh")
        except ProfileNotFoundError:
            resolved["protocol"] = "ssh"
            
    return resolved

Resolve profile references (@profile) in node data and handle inheritance.

def update_profile(self, name, data)
Expand source code
def update_profile(self, name, data):
    """Update an existing profile."""
    if name not in self.config.profiles:
        raise ProfileNotFoundError(f"Profile '{name}' not found.")
        
    # Merge with existing data
    existing = self.get_profile(name, resolve=False)
    updated_data = existing.copy()
    updated_data.update(data)
    
    # Filter data to match _profiles_add signature
    allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"}
    filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys}
    
    self.config._profiles_add(id=name, **filtered_data)
    self.config._saveconfig(self.config.file)

Update an existing profile.

Inherited members

class SystemService (config=None)
Expand source code
class SystemService(BaseService):
    """Business logic for application lifecycle (API, processes)."""

    def start_api(self, port=None):
        """Start the Connpy REST API."""
        from connpy.api import start_api
        try:
            start_api(port, config=self.config)
        except Exception as e:
            raise ConnpyError(f"Failed to start API: {e}")

    def debug_api(self, port=None):
        """Start the Connpy REST API in debug mode."""
        from connpy.api import debug_api
        try:
            debug_api(port, config=self.config)
        except Exception as e:
            raise ConnpyError(f"Failed to start API in debug mode: {e}")


    def stop_api(self):
        """Stop the Connpy REST API."""
        try:
            import os
            import signal
            
            pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
            stopped = False
            for pid_file in pids:
                if os.path.exists(pid_file):
                    try:
                        with open(pid_file, "r") as f:
                            # Read only the first line (PID)
                            line = f.readline().strip()
                            if not line:
                                continue
                            pid = int(line)
                        os.kill(pid, signal.SIGTERM)
                        # Remove the PID file after successful kill
                        os.remove(pid_file)
                        stopped = True
                    except (ValueError, OSError, ProcessLookupError):
                        # If process is already dead, just remove the stale PID file
                        try:
                            os.remove(pid_file)
                        except OSError:
                            pass
                        continue
            return stopped
        except Exception as e:
            raise ConnpyError(f"Failed to stop API: {e}")

    def restart_api(self, port=None):
        """Restart the Connpy REST API, maintaining the current port if none provided."""
        if port is None:
            status = self.get_api_status()
            if status["running"] and status.get("port"):
                port = status["port"]
        
        self.stop_api()
        import time
        time.sleep(1)
        self.start_api(port)

    def get_api_status(self):
        """Check if the API is currently running."""
        import os
        pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
        for pid_file in pids:
            if os.path.exists(pid_file):
                try:
                    with open(pid_file, "r") as f:
                        pid_line = f.readline().strip()
                        port_line = f.readline().strip()
                        if not pid_line:
                            continue
                        pid = int(pid_line)
                        port = int(port_line) if port_line else None
                    # Signal 0 checks for process existence without killing it
                    os.kill(pid, 0)
                    return {"running": True, "pid": pid, "port": port, "pid_file": pid_file}
                except (ValueError, OSError, ProcessLookupError):
                    continue
        return {"running": False}

Business logic for application lifecycle (API, processes).

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def debug_api(self, port=None)
Expand source code
def debug_api(self, port=None):
    """Start the Connpy REST API in debug mode."""
    from connpy.api import debug_api
    try:
        debug_api(port, config=self.config)
    except Exception as e:
        raise ConnpyError(f"Failed to start API in debug mode: {e}")

Start the Connpy REST API in debug mode.

def get_api_status(self)
Expand source code
def get_api_status(self):
    """Check if the API is currently running."""
    import os
    pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
    for pid_file in pids:
        if os.path.exists(pid_file):
            try:
                with open(pid_file, "r") as f:
                    pid_line = f.readline().strip()
                    port_line = f.readline().strip()
                    if not pid_line:
                        continue
                    pid = int(pid_line)
                    port = int(port_line) if port_line else None
                # Signal 0 checks for process existence without killing it
                os.kill(pid, 0)
                return {"running": True, "pid": pid, "port": port, "pid_file": pid_file}
            except (ValueError, OSError, ProcessLookupError):
                continue
    return {"running": False}

Check if the API is currently running.

def restart_api(self, port=None)
Expand source code
def restart_api(self, port=None):
    """Restart the Connpy REST API, maintaining the current port if none provided."""
    if port is None:
        status = self.get_api_status()
        if status["running"] and status.get("port"):
            port = status["port"]
    
    self.stop_api()
    import time
    time.sleep(1)
    self.start_api(port)

Restart the Connpy REST API, maintaining the current port if none provided.

def start_api(self, port=None)
Expand source code
def start_api(self, port=None):
    """Start the Connpy REST API."""
    from connpy.api import start_api
    try:
        start_api(port, config=self.config)
    except Exception as e:
        raise ConnpyError(f"Failed to start API: {e}")

Start the Connpy REST API.

def stop_api(self)
Expand source code
def stop_api(self):
    """Stop the Connpy REST API."""
    try:
        import os
        import signal
        
        pids = ["/run/connpy.pid", "/tmp/connpy.pid"]
        stopped = False
        for pid_file in pids:
            if os.path.exists(pid_file):
                try:
                    with open(pid_file, "r") as f:
                        # Read only the first line (PID)
                        line = f.readline().strip()
                        if not line:
                            continue
                        pid = int(line)
                    os.kill(pid, signal.SIGTERM)
                    # Remove the PID file after successful kill
                    os.remove(pid_file)
                    stopped = True
                except (ValueError, OSError, ProcessLookupError):
                    # If process is already dead, just remove the stale PID file
                    try:
                        os.remove(pid_file)
                    except OSError:
                        pass
                    continue
        return stopped
    except Exception as e:
        raise ConnpyError(f"Failed to stop API: {e}")

Stop the Connpy REST API.

Inherited members