feat(core,grpc): add regex support for node expectations and secure thread context sharing

- Implement dynamic regex matching fallback (re.search) in `node.test` with safe handling of invalid patterns.
- Refactor terminal window resizing (setwinsize) to trigger only on non-router devices and handle SIGWINCH re-renders.
- Introduce `contextvars` context copying for background worker threads in gRPC execution and AI servicers.
- Add unit tests for regex validation, malformed expression fallbacks, and variable formatting in node testing.
- Optimize Playbook Builder AI guidelines for single-task test evaluations.
- Unify codebase comments to English.
This commit is contained in:
2026-06-03 16:49:52 -03:00
parent 2b8e637298
commit 61a44d004f
11 changed files with 295 additions and 158 deletions
+8 -8
View File
@@ -90,7 +90,7 @@ el.replaceWith(d);
if args.mcp is not None:
return self.configure_mcp(args)
# Determinar session_id para retomar
# Determine session_id to resume
session_id = None
if args.resume:
sessions, _ = self.app.services.ai.list_sessions()
@@ -100,8 +100,8 @@ el.replaceWith(d);
elif args.session:
session_id = args.session[0]
# Configurar argumentos adicionales para el servicio de AI
# Prioridad: CLI Args > Configuración Local
# Configure additional arguments for the AI service
# Priority: CLI Args > Local Config
settings = self.app.services.config_svc.get_settings().get("ai", {})
arguments = {}
@@ -129,7 +129,7 @@ el.replaceWith(d);
printer.warning("Architect API key/auth not configured. Architect will be unavailable.")
printer.info("Use 'connpy config --architect-api-key <key>' or 'connpy config --architect-auth <auth>' to enable it.")
# El resto de la interacción el CLI la maneja con el agente subyacente
# The rest of the interaction is handled by the CLI with the underlying agent
self.app.myai = self.app.services.ai
self.ai_overrides = arguments
@@ -502,7 +502,7 @@ el.replaceWith(d);
if args.mcp is not None:
return self.configure_mcp(args)
# Determinar session_id para retomar
# Determine session_id to resume
session_id = None
if args.resume:
sessions, _ = self.app.services.ai.list_sessions()
@@ -512,8 +512,8 @@ el.replaceWith(d);
elif args.session:
session_id = args.session[0]
# Configurar argumentos adicionales para el servicio de AI
# Prioridad: CLI Args > Configuración Local
# Configure additional arguments for the AI service
# Priority: CLI Args > Local Config
settings = self.app.services.config_svc.get_settings().get("ai", {})
arguments = {}
@@ -541,7 +541,7 @@ el.replaceWith(d);
printer.warning("Architect API key/auth not configured. Architect will be unavailable.")
printer.info("Use 'connpy config --architect-api-key <key>' or 'connpy config --architect-auth <auth>' to enable it.")
# El resto de la interacción el CLI la maneja con el agente subyacente
# The rest of the interaction is handled by the CLI with the underlying agent
self.app.myai = self.app.services.ai
self.ai_overrides = arguments
+22 -22
View File
@@ -121,14 +121,14 @@ el.replaceWith(d);
}
# 1. Visual Separation
self.console.print("") # Salto de línea real
self.console.print("") # Real line break
self.console.print(Rule(title="[bold cyan] AI TERMINAL COPILOT [/bold cyan]", style="cyan"))
self.console.print(Panel(
"[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel. Type / for commands.\n"
"Tab to change context mode. Ctrl+\u2191/\u2193 to adjust context. \u2191\u2193 for question history.[/dim]",
border_style="cyan"
))
self.console.print("\n") # Pequeño espacio antes del prompt del copilot
self.console.print("\n") # Small space before the copilot prompt
bindings = KeyBindings()
@bindings.add('c-up')
@@ -195,7 +195,7 @@ el.replaceWith(d);
if app and app.current_buffer:
text = app.current_buffer.text
# Solo mostrar ayuda de comandos si estamos escribiendo el primer comando y no hay espacios
# Only show command help if typing the first command and there are no spaces
if text.startswith('/') and ' ' not in text:
commands = ['/os', '/prompt', '/architect', '/engineer', '/trust', '/untrust', '/memorize', '/clear']
matches = [c for c in commands if c.startswith(text.lower())]
@@ -210,19 +210,19 @@ el.replaceWith(d);
idx = max(0, state['total_cmds'] - state['context_cmd'])
def clean_preview(text):
# Limpia saltos de línea y el prompt inicial (todo hasta #, > o $) para que quede solo el comando
# Clean newlines and the initial prompt (all up to #, > or $) to leave only the command
original = text.strip().replace('\r', '').replace('\n', ' ')
cleaned = re.sub(r'^.*?[#>\$]\s*', '', original)
# Si limpiar el prompt nos deja con un string vacío (ej: era solo "iol#"), devolvemos el original
# If cleaning the prompt leaves us with an empty string (e.g. it was just "iol#"), return the original
return cleaned if cleaned else original
if state['context_mode'] == self.mode_range:
range_blocks = blocks[idx:]
# Si hay más de un bloque, el último es siempre el prompt vacío/actual. Lo omitimos visualmente.
# If there is more than one block, the last one is always the empty/current prompt. We omit it visually.
if len(range_blocks) > 1:
range_blocks = range_blocks[:-1]
# Limpiar y truncar comandos muy largos para que no rompan la UI
# Clean and truncate very long commands so they don't break the UI
previews = []
for b in range_blocks:
p = clean_preview(b[2])
@@ -300,8 +300,8 @@ el.replaceWith(d);
style=ui_style
)
try:
# Usamos un try/finally interno para asegurar que si algo falla en prompt_async,
# no nos quedemos con la terminal en un estado extraño.
# We use an internal try/finally to ensure that if something fails in prompt_async,
# we don't leave the terminal in a strange state.
question = await session.prompt_async(
get_prompt_text,
key_bindings=bindings,
@@ -333,12 +333,12 @@ el.replaceWith(d);
except: pass
asyncio.create_task(delayed_refresh())
# Mover el cursor arriba y limpiar la línea para que el nuevo prompt reemplace al anterior
# Move the cursor up and clean the line so the new prompt replaces the previous one
sys.stdout.write('\x1b[1A\x1b[2K')
sys.stdout.flush()
continue
else:
# Limpiar el mensaje de la barra cuando se hace una pregunta real
# Clean the toolbar message when a real question is asked
state['toolbar_msg'] = ''
clean_question = directive.get("clean_prompt", question)
@@ -575,14 +575,14 @@ el.replaceWith(d);
}
# 1. Visual Separation
self.console.print("") # Salto de línea real
self.console.print("") # Real line break
self.console.print(Rule(title="[bold cyan] AI TERMINAL COPILOT [/bold cyan]", style="cyan"))
self.console.print(Panel(
"[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel. Type / for commands.\n"
"Tab to change context mode. Ctrl+\u2191/\u2193 to adjust context. \u2191\u2193 for question history.[/dim]",
border_style="cyan"
))
self.console.print("\n") # Pequeño espacio antes del prompt del copilot
self.console.print("\n") # Small space before the copilot prompt
bindings = KeyBindings()
@bindings.add('c-up')
@@ -649,7 +649,7 @@ el.replaceWith(d);
if app and app.current_buffer:
text = app.current_buffer.text
# Solo mostrar ayuda de comandos si estamos escribiendo el primer comando y no hay espacios
# Only show command help if typing the first command and there are no spaces
if text.startswith('/') and ' ' not in text:
commands = ['/os', '/prompt', '/architect', '/engineer', '/trust', '/untrust', '/memorize', '/clear']
matches = [c for c in commands if c.startswith(text.lower())]
@@ -664,19 +664,19 @@ el.replaceWith(d);
idx = max(0, state['total_cmds'] - state['context_cmd'])
def clean_preview(text):
# Limpia saltos de línea y el prompt inicial (todo hasta #, > o $) para que quede solo el comando
# Clean newlines and the initial prompt (all up to #, > or $) to leave only the command
original = text.strip().replace('\r', '').replace('\n', ' ')
cleaned = re.sub(r'^.*?[#>\$]\s*', '', original)
# Si limpiar el prompt nos deja con un string vacío (ej: era solo "iol#"), devolvemos el original
# If cleaning the prompt leaves us with an empty string (e.g. it was just "iol#"), return the original
return cleaned if cleaned else original
if state['context_mode'] == self.mode_range:
range_blocks = blocks[idx:]
# Si hay más de un bloque, el último es siempre el prompt vacío/actual. Lo omitimos visualmente.
# If there is more than one block, the last one is always the empty/current prompt. We omit it visually.
if len(range_blocks) > 1:
range_blocks = range_blocks[:-1]
# Limpiar y truncar comandos muy largos para que no rompan la UI
# Clean and truncate very long commands so they don't break the UI
previews = []
for b in range_blocks:
p = clean_preview(b[2])
@@ -754,8 +754,8 @@ el.replaceWith(d);
style=ui_style
)
try:
# Usamos un try/finally interno para asegurar que si algo falla en prompt_async,
# no nos quedemos con la terminal en un estado extraño.
# We use an internal try/finally to ensure that if something fails in prompt_async,
# we don't leave the terminal in a strange state.
question = await session.prompt_async(
get_prompt_text,
key_bindings=bindings,
@@ -787,12 +787,12 @@ el.replaceWith(d);
except: pass
asyncio.create_task(delayed_refresh())
# Mover el cursor arriba y limpiar la línea para que el nuevo prompt reemplace al anterior
# Move the cursor up and clean the line so the new prompt replaces the previous one
sys.stdout.write('\x1b[1A\x1b[2K')
sys.stdout.flush()
continue
else:
# Limpiar el mensaje de la barra cuando se hace una pregunta real
# Clean the toolbar message when a real question is asked
state['toolbar_msg'] = ''
clean_question = directive.get("clean_prompt", question)
+17 -8
View File
@@ -177,6 +177,7 @@ el.replaceWith(d);
def _handle_chat_stream(self, request_iterator, context, service_method):
import queue
import threading
import contextvars
chunk_queue = queue.Queue()
request_queue = queue.Queue()
@@ -209,6 +210,7 @@ el.replaceWith(d);
session_id=session_id,
debug=debug,
status=bridge,
console=bridge,
confirm_handler=bridge.confirm,
chunk_callback=callback,
trust=trust,
@@ -270,10 +272,10 @@ el.replaceWith(d);
if req.HasField("engineer_auth"): overrides["engineer_auth"] = from_struct(req.engineer_auth)
if req.HasField("architect_auth"): overrides["architect_auth"] = from_struct(req.architect_auth)
# Start AI in its own thread so we can keep listening for interrupts
# Start AI in its own thread with a fresh copy of context so we can keep listening for interrupts
ctx_ai = contextvars.copy_context()
ai_thread = threading.Thread(
target=run_ai_task,
args=(req.input_text, req.session_id, req.debug, overrides, req.trust),
target=lambda: ctx_ai.run(run_ai_task, req.input_text, req.session_id, req.debug, overrides, req.trust),
daemon=True
)
ai_thread.start()
@@ -285,8 +287,9 @@ el.replaceWith(d);
# When client closes stream, send sentinel
chunk_queue.put((None, None))
# Start listening for client requests/signals
threading.Thread(target=request_listener, daemon=True).start()
# Start listening for client requests/signals with a copied context
ctx_listener = contextvars.copy_context()
threading.Thread(target=lambda: ctx_listener.run(request_listener), daemon=True).start()
# Main response loop (yields to gRPC)
while True:
@@ -333,7 +336,9 @@ el.replaceWith(d);
finally:
chunk_queue.put((None, None))
threading.Thread(target=_worker, daemon=True).start()
import contextvars
ctx = contextvars.copy_context()
threading.Thread(target=lambda: ctx.run(_worker), daemon=True).start()
while True:
item = chunk_queue.get()
@@ -858,7 +863,9 @@ def service(self):
finally:
q.put(None)
threading.Thread(target=_worker, daemon=True).start()
import contextvars
ctx = contextvars.copy_context()
threading.Thread(target=lambda: ctx.run(_worker), daemon=True).start()
while True:
item = q.get()
@@ -907,7 +914,9 @@ def service(self):
finally:
q.put(None)
threading.Thread(target=_worker, daemon=True).start()
import contextvars
ctx = contextvars.copy_context()
threading.Thread(target=lambda: ctx.run(_worker), daemon=True).start()
while True:
item = q.get()
+103 -59
View File
@@ -649,7 +649,7 @@ class ai:
self.one_shot = kwargs.get("one_shot", False)
# 1. Cargar configuración genérica con herencia/merge global
# 1. Load generic configuration with global inheritance/merge
if hasattr(self.config, "get_effective_setting"):
aiconfig = self.config.get_effective_setting("ai", {})
else:
@@ -692,7 +692,7 @@ class ai:
custom_trusted = [c.strip() for c in custom_trusted.split(",") if c.strip()]
self.safe_commands = list(self.SAFE_COMMANDS) + (custom_trusted if isinstance(custom_trusted, list) else [])
# Límites
# Limits
self.max_history = 30
self.max_truncate = 50000
self.soft_limit_iterations = 20 # Show warning and suggest Ctrl+C
@@ -729,7 +729,7 @@ class ai:
self.session_id = getattr(self.config, "session_id", None)
self.session_path = os.path.join(self.sessions_dir, f"{self.session_id}.json") if self.session_id else None
# Prompts base agnósticos
# Agnostic base prompts
architect_instructions = ""
if self.has_architect:
architect_instructions = """
@@ -1269,7 +1269,7 @@ class ai:
def _engineer_loop(self, task, status=None, debug=False, chat_history=None):
"""Internal loop where the Engineer executes technical tasks for the Architect."""
# Optimización de caché para el Ingeniero (Solo para Anthropic directo, Vertex tiene reglas distintas)
# Cache optimization for the Engineer (Only for direct Anthropic, Vertex has different rules)
if "claude" in self.engineer_model.lower() and "vertex" not in self.engineer_model.lower():
messages = [{"role": "system", "content": [{"type": "text", "text": self.engineer_system_prompt, "cache_control": {"type": "ephemeral"}}]}]
else:
@@ -1328,7 +1328,7 @@ class ai:
for tc in resp_msg.tool_calls:
fn, args = tc.function.name, json.loads(tc.function.arguments)
# Notificación en tiempo real de la tarea técnica (Only if not in Architect loop)
# Real-time notification of the technical task (Only if not in Architect loop)
if status and not chat_history:
s_text = ""
if fn == "list_nodes": s_text = f"[ai_status]Engineer: [SEARCH] {args.get('filter_pattern','.*')}"
@@ -1583,7 +1583,7 @@ class ai:
usage = {"input": 0, "output": 0, "total": 0}
# 1. Selector de Rol inicial (Sticky Brain)
# 1. Initial Role Selector (Sticky Brain)
explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
@@ -1592,7 +1592,7 @@ class ai:
elif explicit_engineer:
current_brain = "engineer"
else:
# Sticky Brain: Detectar si el Arquitecto estaba al mando en el historial reciente
# Sticky Brain: Detect if the Architect was in control in recent history
is_architect_active = False
for msg in reversed(chat_history[-5:]):
tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
@@ -1606,7 +1606,7 @@ class ai:
if is_architect_active: break
current_brain = "architect" if is_architect_active else "engineer"
# 2. Preparación de mensajes y limpieza
# 2. Message preparation and cleaning
clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
@@ -1615,13 +1615,13 @@ class ai:
key = self.architect_key if current_brain == "architect" else self.engineer_key
current_auth = self.architect_auth if current_brain == "architect" else self.engineer_auth
# Estructura optimizada para Prompt Caching (Solo para Anthropic directo, Vertex tiene reglas distintas)
# Optimized structure for Prompt Caching (Only for direct Anthropic, Vertex has different rules)
if "claude" in model.lower() and "vertex" not in model.lower():
messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
else:
messages = [{"role": "system", "content": system_prompt}]
# Interleaving de historial
# History interleaving
last_role = "system"
# Sanitize history if the current target model is not compatible with cache_control
history_to_process = chat_history[-self.max_history:]
@@ -1641,7 +1641,7 @@ class ai:
if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
else: messages.append({"role": "user", "content": clean_input})
# 3. Bucle de ejecución
# 3. Execution loop
iteration = 0
try:
# Set up remote interrupt callback if bridge is provided
@@ -2536,7 +2536,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
usage = {"input": 0, "output": 0, "total": 0}
# 1. Selector de Rol inicial (Sticky Brain)
# 1. Initial Role Selector (Sticky Brain)
explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
@@ -2545,7 +2545,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
elif explicit_engineer:
current_brain = "engineer"
else:
# Sticky Brain: Detectar si el Arquitecto estaba al mando en el historial reciente
# Sticky Brain: Detect if the Architect was in control in recent history
is_architect_active = False
for msg in reversed(chat_history[-5:]):
tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
@@ -2559,7 +2559,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
if is_architect_active: break
current_brain = "architect" if is_architect_active else "engineer"
# 2. Preparación de mensajes y limpieza
# 2. Message preparation and cleaning
clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
@@ -2568,13 +2568,13 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
key = self.architect_key if current_brain == "architect" else self.engineer_key
current_auth = self.architect_auth if current_brain == "architect" else self.engineer_auth
# Estructura optimizada para Prompt Caching (Solo para Anthropic directo, Vertex tiene reglas distintas)
# Optimized structure for Prompt Caching (Only for direct Anthropic, Vertex has different rules)
if "claude" in model.lower() and "vertex" not in model.lower():
messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
else:
messages = [{"role": "system", "content": system_prompt}]
# Interleaving de historial
# History interleaving
last_role = "system"
# Sanitize history if the current target model is not compatible with cache_control
history_to_process = chat_history[-self.max_history:]
@@ -2594,7 +2594,7 @@ def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=Fa
if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
else: messages.append({"role": "user", "content": clean_input})
# 3. Bucle de ejecución
# 3. Execution loop
iteration = 0
try:
# Set up remote interrupt callback if bridge is provided
@@ -4778,12 +4778,12 @@ class node:
# Get raw bytes from BytesIO
raw_bytes = self.mylog.getvalue()
# Detener el lector de la terminal para que prompt_toolkit (en run_session)
# tenga control exclusivo del stdin sin interferencias de LocalStream.
# Stop terminal reading so prompt_toolkit (in run_session)
# has exclusive control of stdin without LocalStream interference.
if hasattr(stream, 'stop_reading'):
stream.stop_reading()
elif hasattr(stream, '_loop') and hasattr(stream, 'stdin_fd'):
# Fallback si no tiene el método (en LocalStream)
# Fallback if the method is missing (in LocalStream)
stream._loop.remove_reader(stream.stdin_fd)
try:
@@ -4800,7 +4800,7 @@ class node:
break
finally:
print("\033[2m Returning to session...\033[0m", flush=True)
# Reiniciar el lector de la terminal para volver al modo interactivo SSH/Telnet
# Restart terminal reading to return to interactive SSH/Telnet mode
if hasattr(stream, 'start_reading'):
stream.start_reading()
elif hasattr(stream, '_loop') and hasattr(stream, 'stdin_fd'):
@@ -4868,14 +4868,6 @@ class node:
port_str = f":{self.port}" if self.port and self.protocol not in ["ssm", "kubectl", "docker"] else ""
logger("success", f"Connected to {self.unique} at {self.host}{port_str} via: {self.protocol}")
# Attempt to set the terminal size
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
@@ -4896,6 +4888,20 @@ class node:
self.status = 1
return self.output
result = self.child.expect(expects, timeout = timeout)
# Only set terminal size on devices without a
# screen_length_command (e.g. Linux/bash servers).
# Routers already disable pagination via that command.
# After setwinsize, consume any SIGWINCH re-render
# prompt (~40ms on bash) with a short timeout.
if c == commands[0] and "screen_length_command" not in self.tags:
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
self.child.expect(expects, timeout = 1)
self.child.sendline(c)
if result == 2:
break
@@ -4978,14 +4984,6 @@ class node:
port_str = f":{self.port}" if self.port and self.protocol not in ["ssm", "kubectl", "docker"] else ""
logger("success", f"Connected to {self.unique} at {self.host}{port_str} via: {self.protocol}")
# Attempt to set the terminal size
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
@@ -5007,6 +5005,15 @@ class node:
self.status = 1
return self.output
result = self.child.expect(expects, timeout = timeout)
if c == commands[0] and "screen_length_command" not in self.tags:
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
self.child.expect(expects, timeout = 1)
self.child.sendline(c)
if result == 2:
break
@@ -5032,13 +5039,28 @@ class node:
if vars is not None:
e = e.format(**vars)
updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)
newpattern = f".*({updatedprompt}).*{e}.*"
cleaned_output = output
cleaned_output = re.sub(newpattern, '', cleaned_output)
try:
newpattern = f".*({updatedprompt}).*{e}.*"
cleaned_output = re.sub(newpattern, '', cleaned_output)
except re.error:
try:
escaped_e = re.escape(e)
newpattern = f".*({updatedprompt}).*{escaped_e}.*"
cleaned_output = re.sub(newpattern, '', cleaned_output)
except re.error:
pass
if e in cleaned_output:
self.result[e] = True
else:
self.result[e]= False
try:
if re.search(e, cleaned_output):
self.result[e] = True
else:
self.result[e] = False
except re.error:
self.result[e] = False
self.status = 0
return self.result
if result == 2:
@@ -5446,14 +5468,6 @@ def run(self, commands, vars = None,*, folder = '', prompt = r'>$
port_str = f":{self.port}" if self.port and self.protocol not in ["ssm", "kubectl", "docker"] else ""
logger("success", f"Connected to {self.unique} at {self.host}{port_str} via: {self.protocol}")
# Attempt to set the terminal size
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
@@ -5474,6 +5488,20 @@ def run(self, commands, vars = None,*, folder = '', prompt = r'>$
self.status = 1
return self.output
result = self.child.expect(expects, timeout = timeout)
# Only set terminal size on devices without a
# screen_length_command (e.g. Linux/bash servers).
# Routers already disable pagination via that command.
# After setwinsize, consume any SIGWINCH re-render
# prompt (~40ms on bash) with a short timeout.
if c == commands[0] and "screen_length_command" not in self.tags:
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
self.child.expect(expects, timeout = 1)
self.child.sendline(c)
if result == 2:
break
@@ -5597,14 +5625,6 @@ def test(self, commands, expected, vars = None,*, folder = '', prompt =
port_str = f":{self.port}" if self.port and self.protocol not in ["ssm", "kubectl", "docker"] else ""
logger("success", f"Connected to {self.unique} at {self.host}{port_str} via: {self.protocol}")
# Attempt to set the terminal size
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
@@ -5626,6 +5646,15 @@ def test(self, commands, expected, vars = None,*, folder = '', prompt =
self.status = 1
return self.output
result = self.child.expect(expects, timeout = timeout)
if c == commands[0] and "screen_length_command" not in self.tags:
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
self.child.expect(expects, timeout = 1)
self.child.sendline(c)
if result == 2:
break
@@ -5651,13 +5680,28 @@ def test(self, commands, expected, vars = None,*, folder = '', prompt =
if vars is not None:
e = e.format(**vars)
updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)
newpattern = f".*({updatedprompt}).*{e}.*"
cleaned_output = output
cleaned_output = re.sub(newpattern, '', cleaned_output)
try:
newpattern = f".*({updatedprompt}).*{e}.*"
cleaned_output = re.sub(newpattern, '', cleaned_output)
except re.error:
try:
escaped_e = re.escape(e)
newpattern = f".*({updatedprompt}).*{escaped_e}.*"
cleaned_output = re.sub(newpattern, '', cleaned_output)
except re.error:
pass
if e in cleaned_output:
self.result[e] = True
else:
self.result[e]= False
try:
if re.search(e, cleaned_output):
self.result[e] = True
else:
self.result[e] = False
except re.error:
self.result[e] = False
self.status = 0
return self.result
if result == 2: