refactor: optimize bulk ops, prioritize exact node matches & fix remote AI deadlock
- Priority Matching: Prioritize exact node matches in connect, delete, show, and modify actions to bypass disambiguation prompts and prevent accidental bulk mutations on partial matches. - Bulk Operations: Optimize NodeService delete and update operations by deferring configuration writes, syncs, and cache updates to the final element of a batch. - Remote AI: Prevent client-side CLI deadlocks when the gRPC server encounters AI configuration ValueErrors by returning a clean error state and final stream marker. - Testing: Add unit test to verify exact-match priority behavior and update existing CLI tests to match new NodeService signatures.
This commit is contained in:
@@ -14,6 +14,23 @@ class NodeHandler:
|
||||
self.app = app
|
||||
self.forms = Forms(app)
|
||||
|
||||
def _filter_exact_match(self, matches, query):
|
||||
if not query or len(matches) <= 1:
|
||||
return matches
|
||||
|
||||
exact_matches = []
|
||||
for m in matches:
|
||||
if self.app.case:
|
||||
if m == query:
|
||||
exact_matches.append(m)
|
||||
else:
|
||||
if m.lower() == query.lower():
|
||||
exact_matches.append(m)
|
||||
|
||||
if len(exact_matches) == 1:
|
||||
return exact_matches
|
||||
return matches
|
||||
|
||||
def dispatch(self, args):
|
||||
if not self.app.case and args.data != None:
|
||||
args.data = args.data.lower()
|
||||
@@ -39,6 +56,7 @@ class NodeHandler:
|
||||
else:
|
||||
try:
|
||||
matches = self.app.services.nodes.list_nodes(args.data)
|
||||
matches = self._filter_exact_match(matches, args.data)
|
||||
except Exception:
|
||||
matches = []
|
||||
|
||||
@@ -73,6 +91,7 @@ class NodeHandler:
|
||||
matches = self.app.services.nodes.list_folders(args.data)
|
||||
else:
|
||||
matches = self.app.services.nodes.list_nodes(args.data)
|
||||
matches = self._filter_exact_match(matches, args.data)
|
||||
except Exception:
|
||||
matches = []
|
||||
|
||||
@@ -87,8 +106,9 @@ class NodeHandler:
|
||||
sys.exit(7)
|
||||
|
||||
try:
|
||||
for item in matches:
|
||||
self.app.services.nodes.delete_node(item, is_folder=is_folder)
|
||||
for i, item in enumerate(matches):
|
||||
save_on_last = (i == len(matches) - 1)
|
||||
self.app.services.nodes.delete_node(item, is_folder=is_folder, save=save_on_last)
|
||||
|
||||
if len(matches) == 1:
|
||||
printer.success(f"{matches[0]} deleted successfully")
|
||||
@@ -144,6 +164,7 @@ class NodeHandler:
|
||||
|
||||
try:
|
||||
matches = self.app.services.nodes.list_nodes(args.data)
|
||||
matches = self._filter_exact_match(matches, args.data)
|
||||
except Exception:
|
||||
matches = []
|
||||
|
||||
@@ -171,6 +192,7 @@ class NodeHandler:
|
||||
|
||||
try:
|
||||
matches = self.app.services.nodes.list_nodes(args.data)
|
||||
matches = self._filter_exact_match(matches, args.data)
|
||||
except Exception:
|
||||
matches = []
|
||||
|
||||
@@ -209,7 +231,7 @@ class NodeHandler:
|
||||
self.app.services.nodes.update_node(matches[0], updatenode)
|
||||
printer.success(f"{args.data} edited successfully")
|
||||
else:
|
||||
editcount = 0
|
||||
changed_items = []
|
||||
for k in matches:
|
||||
updated_item = self.app.services.nodes.explode_unique(k)
|
||||
updated_item["type"] = "connection"
|
||||
@@ -222,8 +244,12 @@ class NodeHandler:
|
||||
updated_item[key] = updatenode[key]
|
||||
|
||||
if this_item_changed:
|
||||
editcount += 1
|
||||
self.app.services.nodes.update_node(k, updated_item)
|
||||
changed_items.append((k, updated_item))
|
||||
|
||||
editcount = len(changed_items)
|
||||
for i, (k, updated_item) in enumerate(changed_items):
|
||||
save_on_last = (i == editcount - 1)
|
||||
self.app.services.nodes.update_node(k, updated_item, save=save_on_last)
|
||||
|
||||
if editcount == 0:
|
||||
printer.info("Nothing to do here")
|
||||
|
||||
@@ -815,8 +815,13 @@ class StatusBridge:
|
||||
return default
|
||||
|
||||
class AIServicer(connpy_pb2_grpc.AIServiceServicer):
|
||||
def __init__(self, config):
|
||||
def __init__(self, config, debug=False):
|
||||
self.service = AIService(config)
|
||||
self.server_debug = debug
|
||||
if debug:
|
||||
from rich.console import Console
|
||||
from ..printer import connpy_theme, get_original_stdout
|
||||
self.server_console = Console(theme=connpy_theme, file=get_original_stdout())
|
||||
|
||||
@handle_errors
|
||||
def ask(self, request_iterator, context):
|
||||
@@ -859,6 +864,16 @@ class AIServicer(connpy_pb2_grpc.AIServiceServicer):
|
||||
|
||||
# Send final chunk marker
|
||||
chunk_queue.put(("final_mark", res))
|
||||
except ValueError as e:
|
||||
# Configuration or LLM provider connection errors are expected, only print in debug mode
|
||||
if debug or getattr(self, "server_debug", False):
|
||||
from rich.console import Console
|
||||
from ..printer import connpy_theme, get_original_stdout
|
||||
c = getattr(self, "server_console", None) or Console(theme=connpy_theme, file=get_original_stdout())
|
||||
c.print(f"[debug][DEBUG][/debug] AI Task Error: {e}")
|
||||
chunk_queue.put(("status", f"Error: {str(e)}"))
|
||||
# Crucial: always send final_mark to avoid client deadlock
|
||||
chunk_queue.put(("final_mark", {"response": f"Error: {str(e)}", "chat_history": history, "error": True}))
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f"AI Task Error: {e}")
|
||||
@@ -1058,7 +1073,7 @@ def serve(config, port=8048, debug=False):
|
||||
remote_plugin_pb2_grpc.add_RemotePluginServiceServicer_to_server(plugin_servicer, server)
|
||||
connpy_pb2_grpc.add_ExecutionServiceServicer_to_server(ExecutionServicer(config), server)
|
||||
connpy_pb2_grpc.add_ImportExportServiceServicer_to_server(ImportExportServicer(config), server)
|
||||
connpy_pb2_grpc.add_AIServiceServicer_to_server(AIServicer(config), server)
|
||||
connpy_pb2_grpc.add_AIServiceServicer_to_server(AIServicer(config, debug=debug), server)
|
||||
connpy_pb2_grpc.add_SystemServiceServicer_to_server(SystemServicer(config), server)
|
||||
|
||||
server.add_insecure_port(f'[::]:{port}')
|
||||
|
||||
@@ -462,15 +462,17 @@ class NodeStub:
|
||||
self._trigger_local_cache_sync()
|
||||
|
||||
@handle_errors
|
||||
def update_node(self, unique_id, data):
|
||||
def update_node(self, unique_id, data, save=True):
|
||||
req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False)
|
||||
self.stub.update_node(req)
|
||||
if save:
|
||||
self._trigger_local_cache_sync()
|
||||
|
||||
@handle_errors
|
||||
def delete_node(self, unique_id, is_folder=False):
|
||||
def delete_node(self, unique_id, is_folder=False, save=True):
|
||||
req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder)
|
||||
self.stub.delete_node(req)
|
||||
if save:
|
||||
self._trigger_local_cache_sync()
|
||||
|
||||
@handle_errors
|
||||
@@ -895,9 +897,6 @@ class AIStub:
|
||||
from ..printer import connpy_theme, get_original_stdout
|
||||
stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
|
||||
stable_console.print(Rule(style=alias))
|
||||
elif not full_content and final_result.get("response"):
|
||||
# If nothing streamed but we have response (e.g. error or direct guide)
|
||||
printer.console.print(Panel(Markdown(final_result["response"]), title=title, border_style=alias, expand=False))
|
||||
break
|
||||
except Exception as e:
|
||||
# Check if it was a gRPC error that we should let handle_errors catch
|
||||
|
||||
@@ -148,7 +148,7 @@ class NodeService(BaseService):
|
||||
self.config._connections_add(**data)
|
||||
self.config._saveconfig(self.config.file)
|
||||
|
||||
def update_node(self, unique_id, data):
|
||||
def update_node(self, unique_id, data, save=True):
|
||||
"""Explicitly update an existing node."""
|
||||
all_nodes = self.config._getallnodes()
|
||||
if unique_id not in all_nodes:
|
||||
@@ -162,9 +162,10 @@ class NodeService(BaseService):
|
||||
|
||||
# config._connections_add actually handles updates if ID exists correctly
|
||||
self.config._connections_add(**data)
|
||||
if save:
|
||||
self.config._saveconfig(self.config.file)
|
||||
|
||||
def delete_node(self, unique_id, is_folder=False):
|
||||
def delete_node(self, unique_id, is_folder=False, save=True):
|
||||
"""Logic for deleting a node or folder."""
|
||||
if is_folder:
|
||||
uniques = self.config._explode_unique(unique_id)
|
||||
@@ -177,6 +178,7 @@ class NodeService(BaseService):
|
||||
raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.")
|
||||
self.config._connections_del(**uniques)
|
||||
|
||||
if save:
|
||||
self.config._saveconfig(self.config.file)
|
||||
|
||||
def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
|
||||
|
||||
@@ -40,7 +40,7 @@ def test_node_del(mock_prompt, mock_delete_node, mock_list_nodes, app):
|
||||
mock_list_nodes.return_value = ["router1"]
|
||||
mock_prompt.return_value = {"delete": True}
|
||||
app.start(["node", "-r", "router1"])
|
||||
mock_delete_node.assert_called_once_with("router1", is_folder=False)
|
||||
mock_delete_node.assert_called_once_with("router1", is_folder=False, save=True)
|
||||
|
||||
@patch("connpy.services.node_service.NodeService.list_nodes")
|
||||
@patch("connpy.services.node_service.NodeService.get_node_details")
|
||||
@@ -314,3 +314,13 @@ def test_config_auth_file_path(mock_get_settings, mock_update_setting, mock_open
|
||||
assert args[1]["engineer_auth"] == {"vertex_project": "file-project"}
|
||||
|
||||
|
||||
@patch("connpy.services.node_service.NodeService.list_nodes")
|
||||
@patch("connpy.services.node_service.NodeService.connect_node")
|
||||
def test_node_connect_exact_match_priority(mock_connect_node, mock_list_nodes, app):
|
||||
"""Test that exact matches are prioritized over partial/regex matches when connecting."""
|
||||
mock_list_nodes.return_value = ["pe1@ctx", "qro1pe1@ctx"]
|
||||
app.start(["node", "pe1@ctx"])
|
||||
mock_connect_node.assert_called_once_with("pe1@ctx", sftp=False, debug=False, logger=app._service_logger)
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user