Module connpy.services.ai_service

Classes

class AIService (config=None)
Expand source code
class AIService(BaseService):
    """Business logic for interacting with AI agents and LLM configurations."""

    def _clean_cisco_scrolling(self, text: str) -> str:
        """Resolves horizontal scrolling artifacts (backspaces, \r, ANSI) by merging overlapping segments."""
        def merge_overlapping(s1, s2):
            s2_clean = s2.lstrip(' $')
            max_overlap = min(len(s1), len(s2_clean))
            for i in range(max_overlap, 0, -1):
                if s1[-i:] == s2_clean[:i]:
                    return s1 + s2_clean[i:]
            return s1 + s2_clean

        scroll_re = re.compile(r'(\x08{5,}\s*\$?|\$\r|\x1b\[\d+[GD]\s*\$?)')
        parts = scroll_re.split(text)
        merged = ""
        
        for part in parts:
            if scroll_re.match(part):
                continue
                
            cleaned = log_cleaner(part)
            if not merged:
                merged = cleaned
            else:
                merged_lines = merged.split('\n')
                cleaned_lines = cleaned.split('\n')
                
                merged_lines[-1] = merge_overlapping(merged_lines[-1], cleaned_lines[0])
                merged_lines.extend(cleaned_lines[1:])
                merged = "\n".join(merged_lines)
                
        return merged

    def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
        """Identifies command blocks in the terminal history."""
        blocks = []
        if not raw_bytes:
            return blocks
            
        default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
        device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
        prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt)
        try:
            prompt_re = re.compile(prompt_re_str)
        except Exception:
            prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
            
        parsed_positions = []
        if cmd_byte_positions and len(cmd_byte_positions) >= 1:
            for i in range(1, len(cmd_byte_positions)):
                pos, known_cmd = cmd_byte_positions[i]
                prev_pos = cmd_byte_positions[i-1][0]
                
                if known_cmd:
                    if known_cmd == "CANCELLED":
                        parsed_positions.append({"pos": pos, "type": "CANCELLED", "preview": ""})
                    else:
                        prev_chunk = raw_bytes[prev_pos:pos]
                        prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace'))
                        prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
                        prompt_text = prev_lines[-1].strip() if prev_lines else ""
                        preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
                        
                        if len(preview) > 80:
                            preview = preview[:77] + "..."
                        parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview})
                else:
                    chunk = raw_bytes[prev_pos:pos]
                    
                    cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace'))
                    lines = [l for l in cleaned.split('\n') if l.strip()]
                    
                    found_in_pass1 = False
                    if lines:
                        # Search backwards through the last few lines for the prompt
                        for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
                            match = prompt_re.search(lines[idx])
                            if match:
                                ptxt = match.group(0).strip()
                                cmd_first_line = lines[idx][match.end():].strip()
                                cmd_rest = [l.strip() for l in lines[idx+1:]]
                                cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
                                
                                if cmd_text:
                                    pv = f"{ptxt} {cmd_text}".strip()
                                    if len(pv) > 80:
                                        pv = pv[:77] + "..."
                                    parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                                else:
                                    parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
                                found_in_pass1 = True
                                break
                        
                        if not found_in_pass1:
                            # Fallback: The prompt might have been isolated in the previous chunk 
                            # due to asynchronous network delays splitting the output exactly at the newline.
                            prev_was_valid_cmd = i >= 2 and parsed_positions[i-2]["type"] == "VALID_CMD"
                            if prev_pos > 0 and not prev_was_valid_cmd:
                                # Fetch the very last chunk that we just processed
                                prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0
                                prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace'))
                                prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()]
                                
                                if prev_lines_text:
                                    prev_match = prompt_re.search(prev_lines_text[-1])
                                    if prev_match:
                                        ptxt = prev_match.group(0).strip()
                                        cmd_text = " ".join([l.strip() for l in lines]).strip()
                                        if cmd_text:
                                            pv = f"{ptxt} {cmd_text}".strip()
                                            if len(pv) > 80:
                                                pv = pv[:77] + "..."
                                            parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                                            found_in_pass1 = True
                            
                            if not found_in_pass1:
                                parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
                    else:
                        parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})

        last_newline = raw_bytes.rfind(b'\n')
        current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
        current_end = len(raw_bytes)

        for i, item in enumerate(parsed_positions):
            if item["type"] == "VALID_CMD":
                start_pos = item["pos"]
                preview = item["preview"]
                
                # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED
                end_pos = current_prompt_pos
                for j in range(i + 1, len(parsed_positions)):
                    next_item = parsed_positions[j]
                    if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"):
                        end_pos = next_item["pos"]
                        break
                
                blocks.append((start_pos, end_pos, preview))

        # Always ensure there is a final block representing the current prompt
        if not blocks:
            blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
        elif blocks[-1][0] < current_prompt_pos:
            blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))

        return blocks

    def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
        """Parses slash commands and manages session state. Returns directive dict."""
        text = input_text.strip()
        if not text.startswith('/'):
            return {"action": "execute", "clean_prompt": text, "overrides": {}}
            
        parts = text.split(maxsplit=1)
        cmd = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ""
        
        # 1. State Commands (Persistent)
        if cmd == "/os":
            if args:
                session_state['os'] = args
                return {"action": "state_update", "message": f"OS context changed to {args}"}
        elif cmd == "/prompt":
            if args:
                session_state['prompt'] = args
                return {"action": "state_update", "message": f"Prompt regex changed to {args}"}
        elif cmd == "/memorize":
            if args:
                session_state['memories'].append(args)
                return {"action": "state_update", "message": f"Memory added: {args}"}
        elif cmd == "/clear":
            session_state['memories'] = []
            return {"action": "state_update", "message": "Memory cleared"}
            
        # 2. Hybrid Commands
        elif cmd == "/architect":
            if not args:
                session_state['persona'] = 'architect'
                return {"action": "state_update", "message": "Persona set to Architect"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "architect"}}
                
        elif cmd == "/engineer":
            if not args:
                session_state['persona'] = 'engineer'
                return {"action": "state_update", "message": "Persona set to Engineer"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "engineer"}}
                
        elif cmd == "/trust":
            if not args:
                session_state['trust_mode'] = True
                return {"action": "state_update", "message": "Auto-execute (trust) enabled for session"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"trust": True}}
                
        elif cmd == "/untrust":
            if not args:
                session_state['trust_mode'] = False
                return {"action": "state_update", "message": "Auto-execute (trust) disabled for session"}
            else:
                return {"action": "execute", "clean_prompt": args, "overrides": {"trust": False}}

        # Unknown command, execute normally
        return {"action": "execute", "clean_prompt": text, "overrides": {}}

    def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
        """Send a prompt to the AI agent."""
        from connpy.ai import ai
        agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides)
        return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)


    def confirm(self, input_text, console=None):
        """Ask for a safe confirmation of an action."""
        from connpy.ai import ai
        agent = ai(self.config, console=console)
        return agent.confirm(input_text)

    def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
        """Ask the AI copilot for terminal assistance."""
        from connpy.ai import ai, run_ai_async
        agent = ai(self.config)
        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
        return future.result()

    async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
        """Ask the AI copilot for terminal assistance asynchronously."""
        from connpy.ai import ai, run_ai_async
        import asyncio
        agent = ai(self.config)
        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
        return await asyncio.wrap_future(future)


    def list_sessions(self, limit=None):
        """Return a list of saved AI sessions, optionally limited."""
        from connpy.ai import ai
        agent = ai(self.config)
        sessions = agent._get_sessions()
        if limit and len(sessions) > limit:
            return sessions[:limit], len(sessions)
        return sessions, len(sessions)

    def delete_session(self, session_id):
        """Delete an AI session by ID."""
        import os
        sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions")
        path = os.path.join(sessions_dir, f"{session_id}.json")
        if os.path.exists(path):
            os.remove(path)
        else:
            raise InvalidConfigurationError(f"Session '{session_id}' not found.")

    def configure_provider(self, provider, model=None, api_key=None, auth=None):
        """Update AI provider settings in the configuration."""
        settings = self.config.config.get("ai", {})
        if model:
            settings[f"{provider}_model"] = model
        if api_key:
            settings[f"{provider}_api_key"] = api_key
        if auth is not None:
            settings[f"{provider}_auth"] = auth
            
        self.config.config["ai"] = settings
        self.config._saveconfig(self.config.file)

    def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False):
        """Update MCP server settings in the configuration with smart merging."""
        ai_settings = self.config.config.get("ai", {})
        mcp_servers = ai_settings.get("mcp_servers", {})
        
        if remove:
            if name in mcp_servers:
                del mcp_servers[name]
        else:
            # Get existing or new
            server_cfg = mcp_servers.get(name, {})
            
            # Partial updates
            if url is not None:
                server_cfg["url"] = url
            
            if enabled is not None:
                server_cfg["enabled"] = bool(enabled)
            elif "enabled" not in server_cfg:
                server_cfg["enabled"] = True # Default for new entries
                
            if auto_load_on_os is not None:
                if auto_load_on_os == "": # Explicit clear
                    if "auto_load_on_os" in server_cfg:
                        del server_cfg["auto_load_on_os"]
                else:
                    server_cfg["auto_load_on_os"] = auto_load_on_os
            
            mcp_servers[name] = server_cfg
            
        ai_settings["mcp_servers"] = mcp_servers
        self.config.config["ai"] = ai_settings
        self.config._saveconfig(self.config.file)

    def list_mcp_servers(self) -> dict:
        """Get the configured MCP servers."""
        if hasattr(self.config, "get_effective_setting"):
            ai_settings = self.config.get_effective_setting("ai", {})
        else:
            ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}
        return ai_settings.get("mcp_servers", {})

    def load_session_data(self, session_id):
        """Load a session's raw data by ID."""
        from connpy.ai import ai
        agent = ai(self.config)
        return agent.load_session_data(session_id)

    def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None):
        """Interact with the specialized Playbook Builder Agent."""
        from connpy.ai import PlaybookBuilderAgent
        agent = PlaybookBuilderAgent(self.config)
        return agent.ask(user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)

    def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None):
        """Analyze actual command execution results using Network Architect 1-shot."""
        import json
        results_str = json.dumps(results, indent=2)
        
        prompt = f"@architect: Please analyze the following actual execution results. Diagnose any issues, highlight successful actions, and suggest strategic remediation steps if needed."
        if query:
            prompt += f"\nSpecific user request: {query}"
        prompt += f"\n\nResults Data:\n{results_str}"
        prompt += "\n\nCRITICAL DIRECTIVE: You are running in a strictly 1-shot offline diagnostics mode (--analyze). There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately. DO NOT suggest, mention, or attempt to delegate the session back to the engineer."
        
        # Delegate to self.ask, setting stream=True and forwarding callback/status.
        # This will invoke standard ai.ask with '@architect:' prefix, forcing 1-shot architect brain.
        return self.ask(prompt, status=status, chunk_callback=chunk_callback, one_shot=True)

    def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None):
        """Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot)."""
        nodes_str = ", ".join(target_nodes)
        commands_str = "\n".join(f"- {cmd}" for cmd in commands)
        
        prompt = f"@engineer: Act as a Preflight Simulation Agent. Simulate and predict the expected outputs and behaviors of the following commands on the target nodes. Alert about potential safety or configuration risks based on node profiles."
        prompt += f"\n\nTarget Nodes: {nodes_str}"
        prompt += f"\nCommands to simulate:\n{commands_str}"
        prompt += "\n\nCRITICAL SCALABILITY DIRECTIVE: If there are many target nodes, DO NOT list predictions node-by-node. Instead, group them by Operating System, vendor, or platform, and provide a highly concise Executive Summary. Detail individual risks only for nodes that present specific anomalies or security concerns. Focus on overall impact."
        
        # Delegate to self.ask, using the standard engineer brain but with the simulated preflight prompt.
        return self.ask(prompt, status=status, chunk_callback=chunk_callback)

Business logic for interacting with AI agents and LLM configurations.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)
Expand source code
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Ask the AI copilot for terminal assistance asynchronously."""
    from connpy.ai import ai, run_ai_async
    import asyncio
    agent = ai(self.config)
    future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
    return await asyncio.wrap_future(future)

Ask the AI copilot for terminal assistance asynchronously.

def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None)
Expand source code
def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None):
    """Analyze actual command execution results using Network Architect 1-shot."""
    import json
    results_str = json.dumps(results, indent=2)
    
    prompt = f"@architect: Please analyze the following actual execution results. Diagnose any issues, highlight successful actions, and suggest strategic remediation steps if needed."
    if query:
        prompt += f"\nSpecific user request: {query}"
    prompt += f"\n\nResults Data:\n{results_str}"
    prompt += "\n\nCRITICAL DIRECTIVE: You are running in a strictly 1-shot offline diagnostics mode (--analyze). There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately. DO NOT suggest, mention, or attempt to delegate the session back to the engineer."
    
    # Delegate to self.ask, setting stream=True and forwarding callback/status.
    # This will invoke standard ai.ask with '@architect:' prefix, forcing 1-shot architect brain.
    return self.ask(prompt, status=status, chunk_callback=chunk_callback, one_shot=True)

Analyze actual command execution results using Network Architect 1-shot.

def ask(self,
input_text,
dryrun=False,
chat_history=None,
status=None,
debug=False,
session_id=None,
console=None,
chunk_callback=None,
confirm_handler=None,
trust=False,
**overrides)
Expand source code
def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
    """Send a prompt to the AI agent."""
    from connpy.ai import ai
    agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides)
    return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)

Send a prompt to the AI agent.

def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)
Expand source code
def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Ask the AI copilot for terminal assistance."""
    from connpy.ai import ai, run_ai_async
    agent = ai(self.config)
    future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
    return future.result()

Ask the AI copilot for terminal assistance.

def build_context_blocks(self,
raw_bytes: bytes,
cmd_byte_positions: list,
node_info: dict,
last_line: str = '') ‑> list
Expand source code
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
    """Identifies command blocks in the terminal history."""
    blocks = []
    if not raw_bytes:
        return blocks
        
    default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
    device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
    prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt)
    try:
        prompt_re = re.compile(prompt_re_str)
    except Exception:
        prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
        
    parsed_positions = []
    if cmd_byte_positions and len(cmd_byte_positions) >= 1:
        for i in range(1, len(cmd_byte_positions)):
            pos, known_cmd = cmd_byte_positions[i]
            prev_pos = cmd_byte_positions[i-1][0]
            
            if known_cmd:
                if known_cmd == "CANCELLED":
                    parsed_positions.append({"pos": pos, "type": "CANCELLED", "preview": ""})
                else:
                    prev_chunk = raw_bytes[prev_pos:pos]
                    prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace'))
                    prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
                    prompt_text = prev_lines[-1].strip() if prev_lines else ""
                    preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
                    
                    if len(preview) > 80:
                        preview = preview[:77] + "..."
                    parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview})
            else:
                chunk = raw_bytes[prev_pos:pos]
                
                cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace'))
                lines = [l for l in cleaned.split('\n') if l.strip()]
                
                found_in_pass1 = False
                if lines:
                    # Search backwards through the last few lines for the prompt
                    for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
                        match = prompt_re.search(lines[idx])
                        if match:
                            ptxt = match.group(0).strip()
                            cmd_first_line = lines[idx][match.end():].strip()
                            cmd_rest = [l.strip() for l in lines[idx+1:]]
                            cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
                            
                            if cmd_text:
                                pv = f"{ptxt} {cmd_text}".strip()
                                if len(pv) > 80:
                                    pv = pv[:77] + "..."
                                parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                            else:
                                parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
                            found_in_pass1 = True
                            break
                    
                    if not found_in_pass1:
                        # Fallback: The prompt might have been isolated in the previous chunk 
                        # due to asynchronous network delays splitting the output exactly at the newline.
                        prev_was_valid_cmd = i >= 2 and parsed_positions[i-2]["type"] == "VALID_CMD"
                        if prev_pos > 0 and not prev_was_valid_cmd:
                            # Fetch the very last chunk that we just processed
                            prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0
                            prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace'))
                            prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()]
                            
                            if prev_lines_text:
                                prev_match = prompt_re.search(prev_lines_text[-1])
                                if prev_match:
                                    ptxt = prev_match.group(0).strip()
                                    cmd_text = " ".join([l.strip() for l in lines]).strip()
                                    if cmd_text:
                                        pv = f"{ptxt} {cmd_text}".strip()
                                        if len(pv) > 80:
                                            pv = pv[:77] + "..."
                                        parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
                                        found_in_pass1 = True
                        
                        if not found_in_pass1:
                            parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
                else:
                    parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})

    last_newline = raw_bytes.rfind(b'\n')
    current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
    current_end = len(raw_bytes)

    for i, item in enumerate(parsed_positions):
        if item["type"] == "VALID_CMD":
            start_pos = item["pos"]
            preview = item["preview"]
            
            # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED
            end_pos = current_prompt_pos
            for j in range(i + 1, len(parsed_positions)):
                next_item = parsed_positions[j]
                if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"):
                    end_pos = next_item["pos"]
                    break
            
            blocks.append((start_pos, end_pos, preview))

    # Always ensure there is a final block representing the current prompt
    if not blocks:
        blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
    elif blocks[-1][0] < current_prompt_pos:
        blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))

    return blocks

Identifies command blocks in the terminal history.

def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None)
Expand source code
def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None):
    """Interact with the specialized Playbook Builder Agent."""
    from connpy.ai import PlaybookBuilderAgent
    agent = PlaybookBuilderAgent(self.config)
    return agent.ask(user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)

Interact with the specialized Playbook Builder Agent.

def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False)
Expand source code
def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False):
    """Update MCP server settings in the configuration with smart merging."""
    ai_settings = self.config.config.get("ai", {})
    mcp_servers = ai_settings.get("mcp_servers", {})
    
    if remove:
        if name in mcp_servers:
            del mcp_servers[name]
    else:
        # Get existing or new
        server_cfg = mcp_servers.get(name, {})
        
        # Partial updates
        if url is not None:
            server_cfg["url"] = url
        
        if enabled is not None:
            server_cfg["enabled"] = bool(enabled)
        elif "enabled" not in server_cfg:
            server_cfg["enabled"] = True # Default for new entries
            
        if auto_load_on_os is not None:
            if auto_load_on_os == "": # Explicit clear
                if "auto_load_on_os" in server_cfg:
                    del server_cfg["auto_load_on_os"]
            else:
                server_cfg["auto_load_on_os"] = auto_load_on_os
        
        mcp_servers[name] = server_cfg
        
    ai_settings["mcp_servers"] = mcp_servers
    self.config.config["ai"] = ai_settings
    self.config._saveconfig(self.config.file)

Update MCP server settings in the configuration with smart merging.

def configure_provider(self, provider, model=None, api_key=None, auth=None)
Expand source code
def configure_provider(self, provider, model=None, api_key=None, auth=None):
    """Update AI provider settings in the configuration."""
    settings = self.config.config.get("ai", {})
    if model:
        settings[f"{provider}_model"] = model
    if api_key:
        settings[f"{provider}_api_key"] = api_key
    if auth is not None:
        settings[f"{provider}_auth"] = auth
        
    self.config.config["ai"] = settings
    self.config._saveconfig(self.config.file)

Update AI provider settings in the configuration.

def confirm(self, input_text, console=None)
Expand source code
def confirm(self, input_text, console=None):
    """Ask for a safe confirmation of an action."""
    from connpy.ai import ai
    agent = ai(self.config, console=console)
    return agent.confirm(input_text)

Ask for a safe confirmation of an action.

def delete_session(self, session_id)
Expand source code
def delete_session(self, session_id):
    """Delete an AI session by ID."""
    import os
    sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions")
    path = os.path.join(sessions_dir, f"{session_id}.json")
    if os.path.exists(path):
        os.remove(path)
    else:
        raise InvalidConfigurationError(f"Session '{session_id}' not found.")

Delete an AI session by ID.

def list_mcp_servers(self) ‑> dict
Expand source code
def list_mcp_servers(self) -> dict:
    """Get the configured MCP servers."""
    if hasattr(self.config, "get_effective_setting"):
        ai_settings = self.config.get_effective_setting("ai", {})
    else:
        ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}
    return ai_settings.get("mcp_servers", {})

Get the configured MCP servers.

def list_sessions(self, limit=None)
Expand source code
def list_sessions(self, limit=None):
    """Return a list of saved AI sessions, optionally limited."""
    from connpy.ai import ai
    agent = ai(self.config)
    sessions = agent._get_sessions()
    if limit and len(sessions) > limit:
        return sessions[:limit], len(sessions)
    return sessions, len(sessions)

Return a list of saved AI sessions, optionally limited.

def load_session_data(self, session_id)
Expand source code
def load_session_data(self, session_id):
    """Load a session's raw data by ID."""
    from connpy.ai import ai
    agent = ai(self.config)
    return agent.load_session_data(session_id)

Load a session's raw data by ID.

def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None)
Expand source code
def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None):
    """Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot)."""
    nodes_str = ", ".join(target_nodes)
    commands_str = "\n".join(f"- {cmd}" for cmd in commands)
    
    prompt = f"@engineer: Act as a Preflight Simulation Agent. Simulate and predict the expected outputs and behaviors of the following commands on the target nodes. Alert about potential safety or configuration risks based on node profiles."
    prompt += f"\n\nTarget Nodes: {nodes_str}"
    prompt += f"\nCommands to simulate:\n{commands_str}"
    prompt += "\n\nCRITICAL SCALABILITY DIRECTIVE: If there are many target nodes, DO NOT list predictions node-by-node. Instead, group them by Operating System, vendor, or platform, and provide a highly concise Executive Summary. Detail individual risks only for nodes that present specific anomalies or security concerns. Focus on overall impact."
    
    # Delegate to self.ask, using the standard engineer brain but with the simulated preflight prompt.
    return self.ask(prompt, status=status, chunk_callback=chunk_callback)

Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot).

def process_copilot_input(self, input_text: str, session_state: dict) ‑> dict
Expand source code
def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
    """Parses slash commands and manages session state. Returns directive dict."""
    text = input_text.strip()
    if not text.startswith('/'):
        return {"action": "execute", "clean_prompt": text, "overrides": {}}
        
    parts = text.split(maxsplit=1)
    cmd = parts[0].lower()
    args = parts[1] if len(parts) > 1 else ""
    
    # 1. State Commands (Persistent)
    if cmd == "/os":
        if args:
            session_state['os'] = args
            return {"action": "state_update", "message": f"OS context changed to {args}"}
    elif cmd == "/prompt":
        if args:
            session_state['prompt'] = args
            return {"action": "state_update", "message": f"Prompt regex changed to {args}"}
    elif cmd == "/memorize":
        if args:
            session_state['memories'].append(args)
            return {"action": "state_update", "message": f"Memory added: {args}"}
    elif cmd == "/clear":
        session_state['memories'] = []
        return {"action": "state_update", "message": "Memory cleared"}
        
    # 2. Hybrid Commands
    elif cmd == "/architect":
        if not args:
            session_state['persona'] = 'architect'
            return {"action": "state_update", "message": "Persona set to Architect"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "architect"}}
            
    elif cmd == "/engineer":
        if not args:
            session_state['persona'] = 'engineer'
            return {"action": "state_update", "message": "Persona set to Engineer"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "engineer"}}
            
    elif cmd == "/trust":
        if not args:
            session_state['trust_mode'] = True
            return {"action": "state_update", "message": "Auto-execute (trust) enabled for session"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"trust": True}}
            
    elif cmd == "/untrust":
        if not args:
            session_state['trust_mode'] = False
            return {"action": "state_update", "message": "Auto-execute (trust) disabled for session"}
        else:
            return {"action": "execute", "clean_prompt": args, "overrides": {"trust": False}}

    # Unknown command, execute normally
    return {"action": "execute", "clean_prompt": text, "overrides": {}}

Parses slash commands and manages session state. Returns directive dict.

Inherited members