Module connpy.services.execution_service
Classes
class ExecutionService (config=None)-
Expand source code
class ExecutionService(BaseService): """Business logic for executing commands on nodes and running automation scripts.""" def run_commands( self, nodes_filter: str, commands: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, str]: """Execute commands on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.run( commands=commands, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) # Combine output and status for the caller full_results = {} for unique in results: full_results[unique] = { "output": results[unique], "status": executor.status.get(unique, 1) } return full_results except Exception as e: raise ConnpyError(f"Execution failed: {e}") def test_commands( self, nodes_filter: str, commands: List[str], expected: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, Dict[str, bool]]: """Run commands and verify expected output on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.test( commands=commands, expected=expected, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) return results except Exception as e: raise ConnpyError(f"Testing failed: {e}") def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]: """Run a plain-text script containing one command per line.""" if not os.path.exists(script_path): raise ConnpyError(f"Script file not found: {script_path}") try: with open(script_path, "r") as f: commands = [line.strip() for line in f if line.strip()] except Exception as e: raise ConnpyError(f"Failed to read script {script_path}: {e}") return self.run_commands(nodes_filter, commands, parallel=parallel)Business logic for executing commands on nodes and running automation scripts.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) ‑> Dict[str, str]-
Expand source code
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]: """Run a plain-text script containing one command per line.""" if not os.path.exists(script_path): raise ConnpyError(f"Script file not found: {script_path}") try: with open(script_path, "r") as f: commands = [line.strip() for line in f if line.strip()] except Exception as e: raise ConnpyError(f"Failed to read script {script_path}: {e}") return self.run_commands(nodes_filter, commands, parallel=parallel)Run a plain-text script containing one command per line.
def run_commands(self,
nodes_filter: str,
commands: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 20,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, str]-
Expand source code
def run_commands( self, nodes_filter: str, commands: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, str]: """Execute commands on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.run( commands=commands, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) # Combine output and status for the caller full_results = {} for unique in results: full_results[unique] = { "output": results[unique], "status": executor.status.get(unique, 1) } return full_results except Exception as e: raise ConnpyError(f"Execution failed: {e}")Execute commands on a set of nodes.
def test_commands(self,
nodes_filter: str,
commands: List[str],
expected: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 20,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, Dict[str, bool]]-
Expand source code
def test_commands( self, nodes_filter: str, commands: List[str], expected: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, Dict[str, bool]]: """Run commands and verify expected output on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.test( commands=commands, expected=expected, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) return results except Exception as e: raise ConnpyError(f"Testing failed: {e}")Run commands and verify expected output on a set of nodes.
Inherited members