fix bugs long commands in cisco

This commit is contained in:
2026-05-20 17:23:57 -03:00
parent dce9982454
commit 4f3af7ca12
3 changed files with 127 additions and 17 deletions
+9 -2
View File
@@ -440,9 +440,16 @@ class node:
if clean_data: if clean_data:
# Track command boundaries when user hits Enter # Track command boundaries when user hits Enter
if hasattr(self, 'mylog') and (b'\r' in clean_data or b'\n' in clean_data): if hasattr(self, 'mylog') and (b'\r' in clean_data or b'\n' in clean_data):
self.cmd_byte_positions.append((self.mylog.tell(), None)) # Introduce a tiny 20ms delay to allow late-arriving tab-completion bytes
# to be written to mylog before finalizing the boundary marker.
async def delayed_marker():
await asyncio.sleep(0.02)
if hasattr(self, 'mylog'):
self.cmd_byte_positions.append((self.mylog.tell(), None))
asyncio.create_task(delayed_marker())
try: os.write(child_fd, clean_data) try:
os.write(child_fd, clean_data)
except OSError: except OSError:
break break
self.lastinput = time() self.lastinput = time()
+83 -15
View File
@@ -6,6 +6,37 @@ from connpy.utils import log_cleaner
class AIService(BaseService): class AIService(BaseService):
"""Business logic for interacting with AI agents and LLM configurations.""" """Business logic for interacting with AI agents and LLM configurations."""
def _clean_cisco_scrolling(self, text: str) -> str:
"""Resolves horizontal scrolling artifacts (backspaces, \r, ANSI) by merging overlapping segments."""
def merge_overlapping(s1, s2):
s2_clean = s2.lstrip(' $')
max_overlap = min(len(s1), len(s2_clean))
for i in range(max_overlap, 0, -1):
if s1[-i:] == s2_clean[:i]:
return s1 + s2_clean[i:]
return s1 + s2_clean
scroll_re = re.compile(r'(\x08{5,}\s*\$?|\$\r|\x1b\[\d+[GD]\s*\$?)')
parts = scroll_re.split(text)
merged = ""
for part in parts:
if scroll_re.match(part):
continue
cleaned = log_cleaner(part)
if not merged:
merged = cleaned
else:
merged_lines = merged.split('\n')
cleaned_lines = cleaned.split('\n')
merged_lines[-1] = merge_overlapping(merged_lines[-1], cleaned_lines[0])
merged_lines.extend(cleaned_lines[1:])
merged = "\n".join(merged_lines)
return merged
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list: def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
"""Identifies command blocks in the terminal history.""" """Identifies command blocks in the terminal history."""
blocks = [] blocks = []
@@ -28,27 +59,64 @@ class AIService(BaseService):
if known_cmd: if known_cmd:
prev_chunk = raw_bytes[prev_pos:pos] prev_chunk = raw_bytes[prev_pos:pos]
prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace')) prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace'))
prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()] prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
prompt_text = prev_lines[-1].strip() if prev_lines else "" prompt_text = prev_lines[-1].strip() if prev_lines else ""
preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
if len(preview) > 80:
preview = preview[:77] + "..."
parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview})
else: else:
chunk = raw_bytes[prev_pos:pos] chunk = raw_bytes[prev_pos:pos]
cleaned = log_cleaner(chunk.decode(errors='replace'))
lines = [l for l in cleaned.split('\n') if l.strip()]
preview = lines[-1].strip() if lines else ""
if preview: cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace'))
match = prompt_re.search(preview) lines = [l for l in cleaned.split('\n') if l.strip()]
if match:
cmd_text = preview[match.end():].strip() found_in_pass1 = False
if cmd_text: if lines:
parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]}) # Search backwards through the last few lines for the prompt
else: for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1):
parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}) match = prompt_re.search(lines[idx])
else: if match:
parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) ptxt = match.group(0).strip()
cmd_first_line = lines[idx][match.end():].strip()
cmd_rest = [l.strip() for l in lines[idx+1:]]
cmd_text = " ".join([cmd_first_line] + cmd_rest).strip()
if cmd_text:
pv = f"{ptxt} {cmd_text}".strip()
if len(pv) > 80:
pv = pv[:77] + "..."
parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
else:
parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
found_in_pass1 = True
break
if not found_in_pass1:
# Fallback: The prompt might have been isolated in the previous chunk
# due to asynchronous network delays splitting the output exactly at the newline.
if prev_pos > 0:
# Fetch the very last chunk that we just processed
prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0
prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace'))
prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()]
if prev_lines_text:
prev_match = prompt_re.search(prev_lines_text[-1])
if prev_match:
ptxt = prev_match.group(0).strip()
cmd_text = " ".join([l.strip() for l in lines]).strip()
if cmd_text:
pv = f"{ptxt} {cmd_text}".strip()
if len(pv) > 80:
pv = pv[:77] + "..."
parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv})
found_in_pass1 = True
if not found_in_pass1:
parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
else: else:
parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+35
View File
@@ -158,3 +158,38 @@ def test_ingress_task_interception():
assert called_copilot assert called_copilot
asyncio.run(run_test()) asyncio.run(run_test())
def test_build_context_blocks_horizontal_scrolling():
from connpy.services.ai_service import AIService
svc = AIService(None)
node_info = {"prompt": "RP/0/RP0/CPU0:xrd#"}
part1 = 'RP/0/RP0/CPU0:xrd#s show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|t$'
part2 = '|escr|test1|test2|test3|test4|test5|teest8|test7|te s998"show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|$'
# Test with \r (classic IOS)
raw_bytes = (part1 + '\r' + part2).encode()
cmd_byte_positions = [(0, None), (len(raw_bytes), None)]
blocks = svc.build_context_blocks(raw_bytes, cmd_byte_positions, node_info)
assert len(blocks) >= 1
start, end, preview = blocks[0]
assert "RP/0/RP0/CPU0:xrd# s show interfaces * | inc" in preview
def test_build_context_blocks_horizontal_scrolling_ansi():
"""Test with CSI cursor repositioning (\\x1B[1G) instead of raw \\r, as used by Cisco IOS XR."""
from connpy.services.ai_service import AIService
svc = AIService(None)
node_info = {"prompt": "RP/0/RP0/CPU0:xrd#"}
part1 = 'RP/0/RP0/CPU0:xrd#s show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|t'
part2 = '$|escr|test1|test2|test3|test4|test5|teest8|test7|te s998"show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|$'
# Test with \x1B[1G (CSI Cursor Horizontal Absolute - IOS XR)
raw_bytes = (part1 + '\x1b[1G' + part2).encode()
cmd_byte_positions = [(0, None), (len(raw_bytes), None)]
blocks = svc.build_context_blocks(raw_bytes, cmd_byte_positions, node_info)
assert len(blocks) >= 1
start, end, preview = blocks[0]
assert "RP/0/RP0/CPU0:xrd# s show interfaces * | inc" in preview