Module connpy.services.ai_service

Classes

class AIService (config=None)
Expand source code
class AIService(BaseService):
    """Business logic for interacting with AI agents and LLM configurations."""

    def _clean_cisco_scrolling(self, text: str) -> str:
        """Resolve horizontal-scrolling artifacts by merging overlapping segments.

        The text is split on scroll markers: runs of five or more backspaces,
        a ``$`` followed by a carriage return, or an ``ESC[<n>G`` / ``ESC[<n>D``
        sequence (each optionally followed by whitespace and a ``$``). Every
        remaining segment is passed through ``log_cleaner`` and stitched onto
        the text collected so far.

        Args:
            text: Decoded terminal output.

        Returns:
            The cleaned text with the segments merged.
        """
        def merge_overlapping(s1, s2):
            # Drop the leading spaces / `$` truncation markers, then glue s2
            # onto s1 at the longest suffix-of-s1 / prefix-of-s2 overlap.
            s2_clean = s2.lstrip(' $')
            max_overlap = min(len(s1), len(s2_clean))
            for i in range(max_overlap, 0, -1):
                if s1[-i:] == s2_clean[:i]:
                    return s1 + s2_clean[i:]
            return s1 + s2_clean

        scroll_re = re.compile(r'(\x08{5,}\s*\$?|\$\r|\x1b\[\d+[GD]\s*\$?)')
        # The pattern is one capturing group, so split() also returns the
        # separators themselves; they are skipped in the loop below.
        parts = scroll_re.split(text)
        merged = ""

        for part in parts:
            if scroll_re.match(part):
                continue

            cleaned = log_cleaner(part)
            if not merged:
                merged = cleaned
            else:
                merged_lines = merged.split('\n')
                cleaned_lines = cleaned.split('\n')

                # Only the line being scrolled (last collected line / first
                # new line) can overlap; later lines are appended untouched.
                merged_lines[-1] = merge_overlapping(merged_lines[-1], cleaned_lines[0])
                merged_lines.extend(cleaned_lines[1:])
                merged = "\n".join(merged_lines)

        return merged

    def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
        """Identify command blocks in the terminal history.

        Args:
            raw_bytes: Raw terminal history.
            cmd_byte_positions: Sequence of ``(pos, known_cmd)`` pairs. The
                first pair only provides the start offset of the first chunk;
                every later pair is classified using the bytes between the
                previous ``pos`` and its own. ``known_cmd`` may be falsy
                (unknown), the literal ``"CANCELLED"``, or the command text.
            node_info: Optional dict; its ``"prompt"`` entry, when present,
                is used as the prompt regex.
            last_line: Label for the trailing "current prompt" block.

        Returns:
            A list of ``(start_pos, end_pos, preview)`` tuples. Empty when
            ``raw_bytes`` is empty.
        """
        blocks = []
        if not raw_bytes:
            return blocks

        default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
        device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
        try:
            # Unescaped `$` anchors are removed so the prompt can be located
            # mid-line, in front of the typed command. The substitution lives
            # inside the try so a non-string prompt (e.g. None) falls back to
            # the default instead of raising TypeError.
            prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', device_prompt))
        except Exception:
            # Deliberate best-effort: any unusable prompt means "use default".
            prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))

        def shorten(preview):
            # Previews are capped at 80 characters, ellipsis included.
            return preview[:77] + "..." if len(preview) > 80 else preview

        def clean_lines(start, end):
            # Decode a slice of the history, clean it, keep non-blank lines.
            cleaned = self._clean_cisco_scrolling(raw_bytes[start:end].decode(errors='replace'))
            return [ln for ln in cleaned.split('\n') if ln.strip()]

        # Exactly one entry is appended per iteration, so at iteration `i`
        # the previous iteration's entry is parsed_positions[i - 2].
        parsed_positions = []
        for i in range(1, len(cmd_byte_positions or ())):
            pos, known_cmd = cmd_byte_positions[i]
            prev_pos = cmd_byte_positions[i - 1][0]
            entry = None

            if known_cmd:
                if known_cmd == "CANCELLED":
                    entry = {"pos": pos, "type": "CANCELLED", "preview": ""}
                else:
                    # The last non-blank line before the command is shown as
                    # its prompt.
                    prev_lines = clean_lines(prev_pos, pos)
                    prompt_text = prev_lines[-1].strip() if prev_lines else ""
                    preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
                    entry = {"pos": pos, "type": "VALID_CMD", "preview": shorten(preview)}
            else:
                lines = clean_lines(prev_pos, pos)
                if lines:
                    # Search backwards through the last few lines for the prompt
                    for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
                        match = prompt_re.search(lines[idx])
                        if not match:
                            continue
                        ptxt = match.group(0).strip()
                        cmd_first_line = lines[idx][match.end():].strip()
                        cmd_rest = [ln.strip() for ln in lines[idx + 1:]]
                        cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
                        if cmd_text:
                            entry = {"pos": pos, "type": "VALID_CMD", "preview": shorten(f"{ptxt} {cmd_text}".strip())}
                        else:
                            entry = {"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}
                        break

                    if entry is None:
                        # Fallback: The prompt might have been isolated in the previous chunk
                        # due to asynchronous network delays splitting the output exactly at the newline.
                        prev_was_valid_cmd = i >= 2 and parsed_positions[i - 2]["type"] == "VALID_CMD"
                        if prev_pos > 0 and not prev_was_valid_cmd:
                            prev_prev_pos = cmd_byte_positions[i - 2][0] if i >= 2 else 0
                            prev_lines_text = clean_lines(prev_prev_pos, prev_pos)
                            prev_match = prompt_re.search(prev_lines_text[-1]) if prev_lines_text else None
                            if prev_match:
                                ptxt = prev_match.group(0).strip()
                                cmd_text = " ".join(ln.strip() for ln in lines).strip()
                                if cmd_text:
                                    entry = {"pos": pos, "type": "VALID_CMD", "preview": shorten(f"{ptxt} {cmd_text}".strip())}

                if entry is None:
                    entry = {"pos": pos, "type": "SCROLLING", "preview": ""}

            parsed_positions.append(entry)

        # rfind() returns -1 when there is no newline, which maps to offset 0.
        current_prompt_pos = raw_bytes.rfind(b'\n') + 1
        current_end = len(raw_bytes)

        for i, item in enumerate(parsed_positions):
            if item["type"] != "VALID_CMD":
                continue

            # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED
            end_pos = current_prompt_pos
            for j in range(i + 1, len(parsed_positions)):
                next_item = parsed_positions[j]
                if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"):
                    end_pos = next_item["pos"]
                    break

            blocks.append((item["pos"], end_pos, item["preview"]))

        # Always ensure there is a final block representing the current prompt
        if not blocks or blocks[-1][0] < current_prompt_pos:
            blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))

        return blocks

    def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
        """Parse slash commands and update the session state.

        Args:
            input_text: Raw user input.
            session_state: Mutable session dict; updated in place by the
                persistent commands.

        Returns:
            A directive dict. ``{"action": "state_update", "message": ...}``
            when only the session changed, otherwise
            ``{"action": "execute", "clean_prompt": ..., "overrides": {...}}``.
            Unknown commands, and ``/os``, ``/prompt`` or ``/memorize``
            without an argument, are returned unchanged for execution.
        """
        text = input_text.strip()
        if not text.startswith('/'):
            return {"action": "execute", "clean_prompt": text, "overrides": {}}

        parts = text.split(maxsplit=1)
        cmd = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ""

        # 1. State Commands (Persistent)
        if cmd == "/clear":
            session_state['memories'] = []
            return {"action": "state_update", "message": "Memory cleared"}
        if args:
            if cmd == "/os":
                session_state['os'] = args
                return {"action": "state_update", "message": f"OS context changed to {args}"}
            if cmd == "/prompt":
                session_state['prompt'] = args
                return {"action": "state_update", "message": f"Prompt regex changed to {args}"}
            if cmd == "/memorize":
                # setdefault: a session that never stored a memory has no
                # 'memories' key yet and must not raise KeyError.
                session_state.setdefault('memories', []).append(args)
                return {"action": "state_update", "message": f"Memory added: {args}"}

        # 2. Hybrid Commands: bare -> persist in the session,
        #    with a prompt -> one-shot override for that prompt only.
        hybrid = {
            "/architect": ("persona", "persona", "architect", "Persona set to Architect"),
            "/engineer": ("persona", "persona", "engineer", "Persona set to Engineer"),
            "/trust": ("trust_mode", "trust", True, "Auto-execute (trust) enabled for session"),
            "/untrust": ("trust_mode", "trust", False, "Auto-execute (trust) disabled for session"),
        }
        if cmd in hybrid:
            state_key, override_key, value, message = hybrid[cmd]
            if args:
                return {"action": "execute", "clean_prompt": args, "overrides": {override_key: value}}
            session_state[state_key] = value
            return {"action": "state_update", "message": message}

        # Unknown command, execute normally
        return {"action": "execute", "clean_prompt": text, "overrides": {}}

    def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
        """Send a prompt to the AI agent.

        ``console``, ``confirm_handler``, ``trust`` and any extra ``overrides``
        configure the agent; the remaining arguments are forwarded to its
        ``ask`` call, whose result is returned.
        """
        from connpy.ai import ai
        agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides)
        return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)


    def confirm(self, input_text, console=None):
        """Ask for a safe confirmation of an action."""
        from connpy.ai import ai
        agent = ai(self.config, console=console)
        return agent.confirm(input_text)

    def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
        """Ask the AI copilot for terminal assistance.

        Blocks until the future returned by ``run_ai_async`` completes.
        """
        from connpy.ai import ai, run_ai_async
        agent = ai(self.config)
        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
        return future.result()

    async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
        """Ask the AI copilot for terminal assistance asynchronously.

        The future returned by ``run_ai_async`` is wrapped so it can be
        awaited from the caller's event loop.
        """
        from connpy.ai import ai, run_ai_async
        import asyncio
        agent = ai(self.config)
        future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback))
        return await asyncio.wrap_future(future)


    def list_sessions(self, limit=None):
        """Return ``(sessions, total)`` for the saved AI sessions.

        ``sessions`` is cut to the first ``limit`` entries when a truthy
        limit smaller than the total is given; ``total`` is always the full
        count.
        """
        from connpy.ai import ai
        agent = ai(self.config)
        sessions = agent._get_sessions()
        if limit and len(sessions) > limit:
            return sessions[:limit], len(sessions)
        return sessions, len(sessions)

    def delete_session(self, session_id):
        """Delete an AI session by ID.

        Args:
            session_id: Name of the session; the file removed is
                ``<defaultdir>/ai_sessions/<session_id>.json``.

        Raises:
            InvalidConfigurationError: If no such session file exists, or the
                id contains a path component (it can never name a session).
        """
        import os
        filename = f"{session_id}.json"
        # An id carrying a directory part (e.g. "../x") would resolve outside
        # the sessions directory; treat it as an unknown session.
        if os.path.basename(filename) != filename:
            raise InvalidConfigurationError(f"Session '{session_id}' not found.")

        path = os.path.join(self.config.defaultdir, "ai_sessions", filename)
        try:
            # Remove directly instead of exists()+remove() so a concurrent
            # deletion cannot slip in between the check and the removal.
            os.remove(path)
        except (FileNotFoundError, NotADirectoryError):
            raise InvalidConfigurationError(f"Session '{session_id}' not found.") from None

    def configure_provider(self, provider, model=None, api_key=None, auth=None):
        """Update AI provider settings in the configuration.

        Stores ``<provider>_model`` / ``<provider>_api_key`` when the value is
        truthy and ``<provider>_auth`` when it is not ``None``, then saves the
        configuration file.
        """
        settings = self.config.config.get("ai", {})
        if model:
            settings[f"{provider}_model"] = model
        if api_key:
            settings[f"{provider}_api_key"] = api_key
        if auth is not None:
            settings[f"{provider}_auth"] = auth

        self.config.config["ai"] = settings
        self.config._saveconfig(self.config.file)

    def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False):
        """Update MCP server settings in the configuration with smart merging.

        Args:
            name: Key of the server entry.
            url: New URL, or ``None`` to keep the current one.
            enabled: New enabled flag (coerced to bool), or ``None`` to keep
                it; new entries default to enabled.
            auto_load_on_os: New value, ``""`` to clear the setting, or
                ``None`` to keep it.
            remove: When true, delete the entry instead of updating it.
        """
        ai_settings = self.config.config.get("ai", {})
        mcp_servers = ai_settings.get("mcp_servers", {})

        if remove:
            if name in mcp_servers:
                del mcp_servers[name]
        else:
            # Get existing or new
            server_cfg = mcp_servers.get(name, {})

            # Partial updates
            if url is not None:
                server_cfg["url"] = url

            if enabled is not None:
                server_cfg["enabled"] = bool(enabled)
            elif "enabled" not in server_cfg:
                server_cfg["enabled"] = True # Default for new entries

            if auto_load_on_os is not None:
                if auto_load_on_os == "": # Explicit clear
                    if "auto_load_on_os" in server_cfg:
                        del server_cfg["auto_load_on_os"]
                else:
                    server_cfg["auto_load_on_os"] = auto_load_on_os

            mcp_servers[name] = server_cfg

        ai_settings["mcp_servers"] = mcp_servers
        self.config.config["ai"] = ai_settings
        self.config._saveconfig(self.config.file)

    def list_mcp_servers(self) -> dict:
        """Get the configured MCP servers.

        Reads the ``"ai"`` settings through ``get_effective_setting`` when the
        config object offers it, otherwise from its ``config`` dict; returns
        an empty dict when neither is available.
        """
        if hasattr(self.config, "get_effective_setting"):
            ai_settings = self.config.get_effective_setting("ai", {})
        else:
            ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}
        return ai_settings.get("mcp_servers", {})

    def load_session_data(self, session_id):
        """Load a session's raw data by ID."""
        from connpy.ai import ai
        agent = ai(self.config)
        return agent.load_session_data(session_id)

Business logic for interacting with AI agents and LLM configurations.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)
Expand source code
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Awaitable variant of the copilot query.

    Schedules the agent's ``aask_copilot`` coroutine through ``run_ai_async``
    and awaits the resulting future from the caller's event loop.
    """
    from connpy.ai import ai, run_ai_async
    import asyncio

    copilot_coro = ai(self.config).aask_copilot(
        terminal_buffer,
        user_question,
        node_info,
        chunk_callback=chunk_callback,
    )
    pending = run_ai_async(copilot_coro)
    return await asyncio.wrap_future(pending)

Ask the AI copilot for terminal assistance asynchronously.

def ask(self,
input_text,
dryrun=False,
chat_history=None,
status=None,
debug=False,
session_id=None,
console=None,
chunk_callback=None,
confirm_handler=None,
trust=False,
**overrides)
Expand source code
def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides):
    """Forward a prompt to a freshly built AI agent and return its answer.

    ``console``, ``confirm_handler``, ``trust`` and ``overrides`` go to the
    agent constructor; everything else is handed to the agent's ``ask``.
    """
    from connpy.ai import ai

    agent_options = {
        "console": console,
        "confirm_handler": confirm_handler,
        "trust": trust,
        **overrides,
    }
    request_options = {
        "status": status,
        "debug": debug,
        "session_id": session_id,
        "chunk_callback": chunk_callback,
    }
    return ai(self.config, **agent_options).ask(input_text, dryrun, chat_history, **request_options)

Send a prompt to the AI agent.

def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)
Expand source code
def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
    """Blocking variant of the copilot query.

    Schedules the agent's ``aask_copilot`` coroutine through ``run_ai_async``
    and waits for the resulting future to finish.
    """
    from connpy.ai import ai, run_ai_async

    copilot_coro = ai(self.config).aask_copilot(
        terminal_buffer,
        user_question,
        node_info,
        chunk_callback=chunk_callback,
    )
    return run_ai_async(copilot_coro).result()

Ask the AI copilot for terminal assistance.

def build_context_blocks(self,
raw_bytes: bytes,
cmd_byte_positions: list,
node_info: dict,
last_line: str = '') ‑> list
Expand source code
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
    """Identify command blocks in the terminal history.

    Args:
        raw_bytes: Raw terminal history.
        cmd_byte_positions: Sequence of ``(pos, known_cmd)`` pairs. The first
            pair only provides the start offset of the first chunk; every
            later pair is classified using the bytes between the previous
            ``pos`` and its own. ``known_cmd`` may be falsy (unknown), the
            literal ``"CANCELLED"``, or the command text.
        node_info: Optional dict; its ``"prompt"`` entry, when present, is
            used as the prompt regex.
        last_line: Label for the trailing "current prompt" block.

    Returns:
        A list of ``(start_pos, end_pos, preview)`` tuples. Empty when
        ``raw_bytes`` is empty.
    """
    blocks = []
    if not raw_bytes:
        return blocks

    default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
    device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt
    try:
        # Unescaped `$` anchors are removed so the prompt can be located
        # mid-line, in front of the typed command. The substitution lives
        # inside the try so a non-string prompt (e.g. None) falls back to
        # the default instead of raising TypeError.
        prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', device_prompt))
    except Exception:
        # Deliberate best-effort: any unusable prompt means "use default".
        prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))

    def shorten(preview):
        # Previews are capped at 80 characters, ellipsis included.
        return preview[:77] + "..." if len(preview) > 80 else preview

    def clean_lines(start, end):
        # Decode a slice of the history, clean it, keep non-blank lines.
        cleaned = self._clean_cisco_scrolling(raw_bytes[start:end].decode(errors='replace'))
        return [ln for ln in cleaned.split('\n') if ln.strip()]

    # Exactly one entry is appended per iteration, so at iteration `i` the
    # previous iteration's entry is parsed_positions[i - 2].
    parsed_positions = []
    for i in range(1, len(cmd_byte_positions or ())):
        pos, known_cmd = cmd_byte_positions[i]
        prev_pos = cmd_byte_positions[i - 1][0]
        entry = None

        if known_cmd:
            if known_cmd == "CANCELLED":
                entry = {"pos": pos, "type": "CANCELLED", "preview": ""}
            else:
                # The last non-blank line before the command is shown as its
                # prompt.
                prev_lines = clean_lines(prev_pos, pos)
                prompt_text = prev_lines[-1].strip() if prev_lines else ""
                preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
                entry = {"pos": pos, "type": "VALID_CMD", "preview": shorten(preview)}
        else:
            lines = clean_lines(prev_pos, pos)
            if lines:
                # Search backwards through the last few lines for the prompt
                for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
                    match = prompt_re.search(lines[idx])
                    if not match:
                        continue
                    ptxt = match.group(0).strip()
                    cmd_first_line = lines[idx][match.end():].strip()
                    cmd_rest = [ln.strip() for ln in lines[idx + 1:]]
                    cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
                    if cmd_text:
                        entry = {"pos": pos, "type": "VALID_CMD", "preview": shorten(f"{ptxt} {cmd_text}".strip())}
                    else:
                        entry = {"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}
                    break

                if entry is None:
                    # Fallback: The prompt might have been isolated in the previous chunk
                    # due to asynchronous network delays splitting the output exactly at the newline.
                    prev_was_valid_cmd = i >= 2 and parsed_positions[i - 2]["type"] == "VALID_CMD"
                    if prev_pos > 0 and not prev_was_valid_cmd:
                        prev_prev_pos = cmd_byte_positions[i - 2][0] if i >= 2 else 0
                        prev_lines_text = clean_lines(prev_prev_pos, prev_pos)
                        prev_match = prompt_re.search(prev_lines_text[-1]) if prev_lines_text else None
                        if prev_match:
                            ptxt = prev_match.group(0).strip()
                            cmd_text = " ".join(ln.strip() for ln in lines).strip()
                            if cmd_text:
                                entry = {"pos": pos, "type": "VALID_CMD", "preview": shorten(f"{ptxt} {cmd_text}".strip())}

            if entry is None:
                entry = {"pos": pos, "type": "SCROLLING", "preview": ""}

        parsed_positions.append(entry)

    # rfind() returns -1 when there is no newline, which maps to offset 0.
    current_prompt_pos = raw_bytes.rfind(b'\n') + 1
    current_end = len(raw_bytes)

    for i, item in enumerate(parsed_positions):
        if item["type"] != "VALID_CMD":
            continue

        # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED
        end_pos = current_prompt_pos
        for j in range(i + 1, len(parsed_positions)):
            next_item = parsed_positions[j]
            if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"):
                end_pos = next_item["pos"]
                break

        blocks.append((item["pos"], end_pos, item["preview"]))

    # Always ensure there is a final block representing the current prompt
    if not blocks or blocks[-1][0] < current_prompt_pos:
        blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))

    return blocks

Identifies command blocks in the terminal history.

def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False)
Expand source code
def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False):
    """Create, patch or delete one MCP server entry and save the configuration.

    Only the fields that were actually passed are touched: ``url`` and
    ``auto_load_on_os`` are left alone when ``None``, ``enabled`` defaults to
    ``True`` for entries that do not have it yet, and an empty-string
    ``auto_load_on_os`` removes that setting. With ``remove`` set, the entry
    is dropped instead.
    """
    store = self.config.config
    ai_section = store.get("ai", {})
    servers = ai_section.get("mcp_servers", {})

    if remove:
        servers.pop(name, None)
    else:
        entry = servers.get(name, {})

        if url is not None:
            entry["url"] = url

        if enabled is None:
            # Keep an existing flag; new entries start enabled.
            entry.setdefault("enabled", True)
        else:
            entry["enabled"] = bool(enabled)

        if auto_load_on_os is not None:
            if auto_load_on_os == "":
                # Empty string is the explicit "clear" request.
                entry.pop("auto_load_on_os", None)
            else:
                entry["auto_load_on_os"] = auto_load_on_os

        servers[name] = entry

    ai_section["mcp_servers"] = servers
    store["ai"] = ai_section
    self.config._saveconfig(self.config.file)

Update MCP server settings in the configuration with smart merging.

def configure_provider(self, provider, model=None, api_key=None, auth=None)
Expand source code
def configure_provider(self, provider, model=None, api_key=None, auth=None):
    """Store provider-specific AI settings and save the configuration.

    ``<provider>_model`` and ``<provider>_api_key`` are written only for
    truthy values; ``<provider>_auth`` is written for anything but ``None``.
    """
    ai_section = self.config.config.get("ai", {})

    # (key suffix, value, whether the value should be written)
    candidates = (
        ("model", model, bool(model)),
        ("api_key", api_key, bool(api_key)),
        ("auth", auth, auth is not None),
    )
    for suffix, value, wanted in candidates:
        if wanted:
            ai_section[f"{provider}_{suffix}"] = value

    self.config.config["ai"] = ai_section
    self.config._saveconfig(self.config.file)

Update AI provider settings in the configuration.

def confirm(self, input_text, console=None)
Expand source code
def confirm(self, input_text, console=None):
    """Delegate a confirmation request to a freshly built AI agent.

    Returns whatever the agent's ``confirm`` call returns.
    """
    from connpy.ai import ai

    return ai(self.config, console=console).confirm(input_text)

Ask for a safe confirmation of an action.

def delete_session(self, session_id)
Expand source code
def delete_session(self, session_id):
    """Delete an AI session by ID.

    Args:
        session_id: Name of the session; the file removed is
            ``<defaultdir>/ai_sessions/<session_id>.json``.

    Raises:
        InvalidConfigurationError: If no such session file exists, or the id
            contains a path component (it can never name a session).
    """
    import os
    filename = f"{session_id}.json"
    # An id carrying a directory part (e.g. "../x") would resolve outside the
    # sessions directory; treat it as an unknown session.
    if os.path.basename(filename) != filename:
        raise InvalidConfigurationError(f"Session '{session_id}' not found.")

    path = os.path.join(self.config.defaultdir, "ai_sessions", filename)
    try:
        # Remove directly instead of exists()+remove() so a concurrent
        # deletion cannot slip in between the check and the removal.
        os.remove(path)
    except (FileNotFoundError, NotADirectoryError):
        raise InvalidConfigurationError(f"Session '{session_id}' not found.") from None

Delete an AI session by ID.

def list_mcp_servers(self) ‑> dict
Expand source code
def list_mcp_servers(self) -> dict:
    """Return the ``mcp_servers`` mapping from the AI settings.

    The settings come from ``get_effective_setting`` when the config object
    has it, else from its ``config`` dict; with neither, the result is empty.
    """
    cfg = self.config
    if hasattr(cfg, "get_effective_setting"):
        ai_section = cfg.get_effective_setting("ai", {})
    elif hasattr(cfg, "config"):
        ai_section = cfg.config.get("ai", {})
    else:
        ai_section = {}
    return ai_section.get("mcp_servers", {})

Get the configured MCP servers.

def list_sessions(self, limit=None)
Expand source code
def list_sessions(self, limit=None):
    """Return ``(sessions, total)`` for the saved AI sessions.

    ``sessions`` holds at most ``limit`` entries when a truthy limit is
    given; ``total`` is always the number of sessions before cutting.
    """
    from connpy.ai import ai

    found = ai(self.config)._get_sessions()
    total = len(found)
    if limit and total > limit:
        found = found[:limit]
    return found, total

Return a list of saved AI sessions, optionally limited.

def load_session_data(self, session_id)
Expand source code
def load_session_data(self, session_id):
    """Fetch the raw data of one session through a freshly built AI agent."""
    from connpy.ai import ai

    return ai(self.config).load_session_data(session_id)

Load a session's raw data by ID.

def process_copilot_input(self, input_text: str, session_state: dict) ‑> dict
Expand source code
def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
    """Parse slash commands and update the session state.

    Args:
        input_text: Raw user input.
        session_state: Mutable session dict; updated in place by the
            persistent commands.

    Returns:
        A directive dict. ``{"action": "state_update", "message": ...}`` when
        only the session changed, otherwise
        ``{"action": "execute", "clean_prompt": ..., "overrides": {...}}``.
        Unknown commands, and ``/os``, ``/prompt`` or ``/memorize`` without
        an argument, are returned unchanged for execution.
    """
    text = input_text.strip()
    if not text.startswith('/'):
        return {"action": "execute", "clean_prompt": text, "overrides": {}}

    parts = text.split(maxsplit=1)
    cmd = parts[0].lower()
    args = parts[1] if len(parts) > 1 else ""

    # 1. State Commands (Persistent)
    if cmd == "/clear":
        session_state['memories'] = []
        return {"action": "state_update", "message": "Memory cleared"}
    if args:
        if cmd == "/os":
            session_state['os'] = args
            return {"action": "state_update", "message": f"OS context changed to {args}"}
        if cmd == "/prompt":
            session_state['prompt'] = args
            return {"action": "state_update", "message": f"Prompt regex changed to {args}"}
        if cmd == "/memorize":
            # setdefault: a session that never stored a memory has no
            # 'memories' key yet and must not raise KeyError.
            session_state.setdefault('memories', []).append(args)
            return {"action": "state_update", "message": f"Memory added: {args}"}

    # 2. Hybrid Commands: bare -> persist in the session,
    #    with a prompt -> one-shot override for that prompt only.
    hybrid = {
        "/architect": ("persona", "persona", "architect", "Persona set to Architect"),
        "/engineer": ("persona", "persona", "engineer", "Persona set to Engineer"),
        "/trust": ("trust_mode", "trust", True, "Auto-execute (trust) enabled for session"),
        "/untrust": ("trust_mode", "trust", False, "Auto-execute (trust) disabled for session"),
    }
    if cmd in hybrid:
        state_key, override_key, value, message = hybrid[cmd]
        if args:
            return {"action": "execute", "clean_prompt": args, "overrides": {override_key: value}}
        session_state[state_key] = value
        return {"action": "state_update", "message": message}

    # Unknown command, execute normally
    return {"action": "execute", "clean_prompt": text, "overrides": {}}

Parses slash commands and manages session state. Returns directive dict.

Inherited members