Files
connpy/connpy/connapp.py
T

1676 lines
78 KiB
Python
Raw Normal View History

2022-03-19 20:41:35 -03:00
#!/usr/bin/env python3
#Imports
import os
import re
import ast
import argparse
2022-03-22 19:54:05 -03:00
import sys
import inquirer
2022-05-11 14:25:43 -03:00
from .core import node,nodes
2022-04-05 20:04:18 -03:00
from ._version import __version__
from . import printer
2024-07-02 16:53:07 -03:00
from .api import start_api,stop_api,debug_api,app
from .ai import ai
2023-12-14 16:56:59 -03:00
from .plugins import Plugins
2022-05-11 14:25:43 -03:00
import yaml
2023-12-14 16:56:59 -03:00
import shutil
class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True
import ast
from rich.markdown import Markdown
from rich.console import Console, Group
from rich.panel import Panel
from rich.text import Text
from rich.rule import Rule
from rich.style import Style
from rich.prompt import Prompt
mdprint = Console().print
console = Console()
2022-04-18 19:19:25 -03:00
try:
from pyfzf.pyfzf import FzfPrompt
except:
FzfPrompt = None
2023-04-06 18:47:29 -03:00
2022-04-02 23:25:53 -03:00
2022-03-19 20:41:35 -03:00
#functions and classes
class connapp:
2022-04-03 18:25:58 -03:00
''' This class starts the connection manager app. It's normally used by connection manager but you can use it on a script to run the connection manager your way and use a different configfile and key.
'''
def __init__(self, config):
'''
### Parameters:
- config (obj): Object generated with configfile class, it contains
the nodes configuration and the methods to manage
the config file.
'''
2024-07-02 16:53:07 -03:00
self.app = app
2022-03-19 20:41:35 -03:00
self.node = node
2024-04-22 18:17:11 -03:00
self.nodes = nodes
self.start_api = start_api
self.stop_api = stop_api
self.debug_api = debug_api
self.ai = ai
2022-03-19 20:41:35 -03:00
self.config = config
2024-04-22 18:17:11 -03:00
self.nodes_list = self.config._getallnodes()
2023-04-14 11:44:56 -03:00
self.folders = self.config._getallfolders()
2022-03-22 19:54:05 -03:00
self.profiles = list(self.config.profiles.keys())
2022-03-25 12:25:59 -03:00
self.case = self.config.config["case"]
2022-04-18 19:19:25 -03:00
try:
self.fzf = self.config.config["fzf"]
except:
self.fzf = False
def start(self,argv = sys.argv[1:]):
'''
### Parameters:
- argv (list): List of arguments to pass to the app.
Default: sys.argv[1:]
'''
2022-03-22 19:54:05 -03:00
#DEFAULTPARSER
defaultparser = argparse.ArgumentParser(prog = "connpy", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
2023-12-14 16:56:59 -03:00
subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
2022-03-22 19:54:05 -03:00
#NODEPARSER
2023-12-14 16:56:59 -03:00
nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
2022-03-22 19:54:05 -03:00
nodecrud = nodeparser.add_mutually_exclusive_group()
2023-10-26 17:33:44 -03:00
nodeparser.add_argument("node", metavar="node|folder", nargs='?', default=None, action=self._store_type, help=self._help("node"))
2022-04-18 19:19:25 -03:00
nodecrud.add_argument("-v","--version", dest="action", action="store_const", help="Show version", const="version", default="connect")
2022-04-04 14:41:51 -03:00
nodecrud.add_argument("-a","--add", dest="action", action="store_const", help="Add new node[@subfolder][@folder] or [@subfolder]@folder", const="add", default="connect")
nodecrud.add_argument("-r","--del", "--rm", dest="action", action="store_const", help="Delete node[@subfolder][@folder] or [@subfolder]@folder", const="del", default="connect")
nodecrud.add_argument("-e","--mod", "--edit", dest="action", action="store_const", help="Modify node[@subfolder][@folder]", const="mod", default="connect")
nodecrud.add_argument("-s","--show", dest="action", action="store_const", help="Show node[@subfolder][@folder]", const="show", default="connect")
2023-11-03 11:59:00 -03:00
nodecrud.add_argument("-d","--debug", dest="debug", action="store_true", help="Display all conections steps")
nodeparser.add_argument("-t","--sftp", dest="sftp", action="store_true", help="Connects using sftp instead of ssh")
2022-03-22 19:54:05 -03:00
nodeparser.set_defaults(func=self._func_node)
#PROFILEPARSER
2023-12-14 16:56:59 -03:00
profileparser = subparsers.add_parser("profile", description="Manage profiles")
2022-04-02 23:25:53 -03:00
profileparser.add_argument("profile", nargs=1, action=self._store_type, type=self._type_profile, help="Name of profile to manage")
2022-03-22 19:54:05 -03:00
profilecrud = profileparser.add_mutually_exclusive_group(required=True)
2022-04-04 14:41:51 -03:00
profilecrud.add_argument("-a", "--add", dest="action", action="store_const", help="Add new profile", const="add")
profilecrud.add_argument("-r", "--del", "--rm", dest="action", action="store_const", help="Delete profile", const="del")
profilecrud.add_argument("-e", "--mod", "--edit", dest="action", action="store_const", help="Modify profile", const="mod")
profilecrud.add_argument("-s", "--show", dest="action", action="store_const", help="Show profile", const="show")
2022-03-22 19:54:05 -03:00
profileparser.set_defaults(func=self._func_profile)
#MOVEPARSER
2023-12-14 16:56:59 -03:00
moveparser = subparsers.add_parser("move", aliases=["mv"], description="Move node")
2022-04-02 23:25:53 -03:00
moveparser.add_argument("move", nargs=2, action=self._store_type, help="Move node[@subfolder][@folder] dest_node[@subfolder][@folder]", default="move", type=self._type_node)
2022-03-22 19:54:05 -03:00
moveparser.set_defaults(func=self._func_others)
#COPYPARSER
2023-12-14 16:56:59 -03:00
copyparser = subparsers.add_parser("copy", aliases=["cp"], description="Copy node")
2022-04-02 23:25:53 -03:00
copyparser.add_argument("cp", nargs=2, action=self._store_type, help="Copy node[@subfolder][@folder] new_node[@subfolder][@folder]", default="cp", type=self._type_node)
2022-03-22 19:54:05 -03:00
copyparser.set_defaults(func=self._func_others)
#LISTPARSER
2023-12-14 16:56:59 -03:00
lsparser = subparsers.add_parser("list", aliases=["ls"], description="List profiles, nodes or folders")
2022-04-02 23:25:53 -03:00
lsparser.add_argument("ls", action=self._store_type, choices=["profiles","nodes","folders"], help="List profiles, nodes or folders", default=False)
2023-10-26 17:33:44 -03:00
lsparser.add_argument("--filter", nargs=1, help="Filter results")
lsparser.add_argument("--format", nargs=1, help="Format of the output of nodes using {name}, {NAME}, {location}, {LOCATION}, {host} and {HOST}")
2022-03-22 19:54:05 -03:00
lsparser.set_defaults(func=self._func_others)
#BULKPARSER
2023-12-14 16:56:59 -03:00
bulkparser = subparsers.add_parser("bulk", description="Add nodes in bulk")
2022-04-02 23:25:53 -03:00
bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
bulkparser.add_argument("-f", "--file", nargs=1, help="Import nodes from a file. First line nodes, second line hosts")
2022-03-22 19:54:05 -03:00
bulkparser.set_defaults(func=self._func_others)
# EXPORTPARSER
2023-12-14 16:56:59 -03:00
exportparser = subparsers.add_parser("export", description="Export connection folder to Yaml file")
2023-09-21 17:28:09 -03:00
exportparser.add_argument("export", nargs="+", action=self._store_type, help="Export /path/to/file.yml [@subfolder1][@folder1] [@subfolderN][@folderN]")
exportparser.set_defaults(func=self._func_export)
# IMPORTPARSER
2023-12-14 16:56:59 -03:00
importparser = subparsers.add_parser("import", description="Import connection folder to config from Yaml file")
importparser.add_argument("file", nargs=1, action=self._store_type, help="Import /path/to/file.yml")
importparser.set_defaults(func=self._func_import)
# AIPARSER
2023-12-14 16:56:59 -03:00
aiparser = subparsers.add_parser("ai", description="Make request to an AI")
aiparser.add_argument("ask", nargs='*', help="Ask connpy AI something")
aiparser.add_argument("--engineer-model", nargs=1, help="Override engineer model")
aiparser.add_argument("--engineer-api-key", nargs=1, help="Override engineer api key")
aiparser.add_argument("--architect-model", nargs=1, help="Override architect model")
aiparser.add_argument("--architect-api-key", nargs=1, help="Override architect api key")
aiparser.add_argument("--debug", action="store_true", help="Show AI reasoning and tool calls")
aiparser.set_defaults(func=self._func_ai)
2022-05-11 14:25:43 -03:00
#RUNPARSER
2023-12-14 16:56:59 -03:00
runparser = subparsers.add_parser("run", description="Run scripts or commands on nodes", formatter_class=argparse.RawTextHelpFormatter)
2023-01-05 16:39:22 -03:00
runparser.add_argument("run", nargs='+', action=self._store_type, help=self._help("run"), default="run")
2022-05-11 14:25:43 -03:00
runparser.add_argument("-g","--generate", dest="action", action="store_const", help="Generate yaml file template", const="generate", default="run")
runparser.set_defaults(func=self._func_run)
2023-04-06 18:47:29 -03:00
#APIPARSER
2023-12-14 16:56:59 -03:00
apiparser = subparsers.add_parser("api", description="Start and stop connpy api")
2023-04-06 18:47:29 -03:00
apicrud = apiparser.add_mutually_exclusive_group(required=True)
2023-04-15 22:38:52 -03:00
apicrud.add_argument("-s","--start", dest="start", nargs="?", action=self._store_type, help="Start conppy api", type=int, default=8048, metavar="PORT")
apicrud.add_argument("-r","--restart", dest="restart", nargs=0, action=self._store_type, help="Restart conppy api")
apicrud.add_argument("-x","--stop", dest="stop", nargs=0, action=self._store_type, help="Stop conppy api")
apicrud.add_argument("-d", "--debug", dest="debug", nargs="?", action=self._store_type, help="Run connpy server on debug mode", type=int, default=8048, metavar="PORT")
2023-04-06 18:47:29 -03:00
apiparser.set_defaults(func=self._func_api)
2023-12-14 16:56:59 -03:00
#PLUGINSPARSER
pluginparser = subparsers.add_parser("plugin", description="Manage plugins")
plugincrud = pluginparser.add_mutually_exclusive_group(required=True)
plugincrud.add_argument("--add", metavar=("PLUGIN", "FILE"), nargs=2, help="Add new plugin")
plugincrud.add_argument("--update", metavar=("PLUGIN", "FILE"), nargs=2, help="Update plugin")
2023-12-14 16:56:59 -03:00
plugincrud.add_argument("--del", dest="delete", metavar="PLUGIN", nargs=1, help="Delete plugin")
plugincrud.add_argument("--enable", metavar="PLUGIN", nargs=1, help="Enable plugin")
plugincrud.add_argument("--disable", metavar="PLUGIN", nargs=1, help="Disable plugin")
plugincrud.add_argument("--list", dest="list", action="store_true", help="Disable plugin")
pluginparser.set_defaults(func=self._func_plugin)
2022-03-25 12:25:59 -03:00
#CONFIGPARSER
2023-12-14 16:56:59 -03:00
configparser = subparsers.add_parser("config", description="Manage app config")
2022-04-18 19:19:25 -03:00
configcrud = configparser.add_mutually_exclusive_group(required=True)
configcrud.add_argument("--allow-uppercase", dest="case", nargs=1, action=self._store_type, help="Allow case sensitive names", choices=["true","false"])
configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"])
configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
2023-04-14 11:44:56 -03:00
configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
configcrud.add_argument("--engineer-model", dest="engineer_model", nargs=1, action=self._store_type, help="Set engineer model", metavar="MODEL")
configcrud.add_argument("--engineer-api-key", dest="engineer_api_key", nargs=1, action=self._store_type, help="Set engineer api_key", metavar="API_KEY")
configcrud.add_argument("--architect-model", dest="architect_model", nargs=1, action=self._store_type, help="Set architect model", metavar="MODEL")
configcrud.add_argument("--architect-api-key", dest="architect_api_key", nargs=1, action=self._store_type, help="Set architect api_key", metavar="API_KEY")
2022-03-25 12:25:59 -03:00
configparser.set_defaults(func=self._func_others)
2023-12-14 16:56:59 -03:00
#Add plugins
self.plugins = Plugins()
2024-04-17 16:27:02 -03:00
try:
core_path = os.path.dirname(os.path.realpath(__file__)) + "/core_plugins"
self.plugins._import_plugins_to_argparse(core_path, subparsers)
except:
pass
try:
file_path = self.config.defaultdir + "/plugins"
self.plugins._import_plugins_to_argparse(file_path, subparsers)
except:
pass
for preload in self.plugins.preloads.values():
preload.Preload(self)
2023-12-14 16:56:59 -03:00
#Generate helps
nodeparser.usage = self._help("usage", subparsers)
nodeparser.epilog = self._help("end", subparsers)
nodeparser.help = self._help("node")
2022-05-11 14:25:43 -03:00
#Manage sys arguments
2023-12-14 16:56:59 -03:00
self.commands = list(subparsers.choices.keys())
profilecmds = []
for action in profileparser._actions:
profilecmds.extend(action.option_strings)
if len(argv) >= 2 and argv[1] == "profile" and argv[0] in profilecmds:
argv[1] = argv[0]
argv[0] = "profile"
2023-12-14 16:56:59 -03:00
if len(argv) < 1 or argv[0] not in self.commands:
argv.insert(0,"node")
args, unknown_args = defaultparser.parse_known_args(argv)
if hasattr(args, "unknown_args"):
args.unknown_args = unknown_args
else:
args = defaultparser.parse_args(argv)
2023-12-14 16:56:59 -03:00
if args.subcommand in self.plugins.plugins:
2023-12-14 18:36:22 -03:00
self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
2023-12-14 16:56:59 -03:00
else:
return args.func(args)
2022-03-22 19:54:05 -03:00
2022-04-02 23:25:53 -03:00
class _store_type(argparse.Action):
2022-04-03 12:00:35 -03:00
#Custom store type for cli app.
2022-03-25 12:25:59 -03:00
def __call__(self, parser, args, values, option_string=None):
setattr(args, "data", values)
delattr(args,self.dest)
setattr(args, "command", self.dest)
2022-03-22 19:54:05 -03:00
def _func_node(self, args):
2022-04-03 12:00:35 -03:00
#Function called when connecting or managing nodes.
2022-03-25 12:25:59 -03:00
if not self.case and args.data != None:
args.data = args.data.lower()
2023-11-03 11:59:00 -03:00
actions = {"version": self._version, "connect": self._connect, "add": self._add, "del": self._del, "mod": self._mod, "show": self._show}
2022-06-10 13:24:26 -03:00
return actions.get(args.action)(args)
def _version(self, args):
printer.info(f"Connpy {__version__}")
2022-06-10 13:24:26 -03:00
def _connect(self, args):
if args.data == None:
2024-04-22 18:17:11 -03:00
matches = self.nodes_list
2022-03-22 19:54:05 -03:00
if len(matches) == 0:
printer.warning("There are no nodes created")
printer.info("try: connpy --help")
2022-06-10 13:24:26 -03:00
exit(9)
else:
if args.data.startswith("@"):
2024-04-22 18:17:11 -03:00
matches = list(filter(lambda k: args.data in k, self.nodes_list))
2022-03-22 19:54:05 -03:00
else:
2024-04-22 18:17:11 -03:00
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
2022-06-10 13:24:26 -03:00
if len(matches) == 0:
printer.error("{} not found".format(args.data))
2022-06-10 13:24:26 -03:00
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
if matches[0] == None:
exit(7)
node = self.config.getitem(matches[0])
node = self.node(matches[0],**node, config = self.config)
2023-11-03 11:59:00 -03:00
if args.sftp:
node.protocol = "sftp"
if args.debug:
2022-06-10 13:24:26 -03:00
node.interact(debug = True)
else:
node.interact()
def _del(self, args):
if args.data == None:
printer.error("Missing argument node")
2022-06-10 13:24:26 -03:00
exit(3)
elif args.data.startswith("@"):
matches = list(filter(lambda k: k == args.data, self.folders))
else:
2023-10-26 17:33:44 -03:00
matches = self.config._getallnodes(args.data)
2022-06-10 13:24:26 -03:00
if len(matches) == 0:
printer.error("{} not found".format(args.data))
2022-06-10 13:24:26 -03:00
exit(2)
printer.info("Removing: {}".format(matches))
2023-10-26 17:33:44 -03:00
question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
2022-06-10 13:24:26 -03:00
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["delete"]:
if args.data.startswith("@"):
2023-10-26 17:33:44 -03:00
uniques = self.config._explode_unique(matches[0])
2022-06-10 13:24:26 -03:00
self.config._folder_del(**uniques)
2022-03-22 19:54:05 -03:00
else:
2023-10-26 17:33:44 -03:00
for node in matches:
nodeuniques = self.config._explode_unique(node)
self.config._connections_del(**nodeuniques)
2022-06-10 13:24:26 -03:00
self.config._saveconfig(self.config.file)
2023-10-26 17:33:44 -03:00
if len(matches) == 1:
printer.success("{} deleted successfully".format(matches[0]))
2023-10-26 17:33:44 -03:00
else:
printer.success(f"{len(matches)} nodes deleted successfully")
2022-06-10 13:24:26 -03:00
def _add(self, args):
2023-10-26 17:33:44 -03:00
args.data = self._type_node(args.data)
2022-06-10 13:24:26 -03:00
if args.data == None:
printer.error("Missing argument node")
2022-06-10 13:24:26 -03:00
exit(3)
elif args.data.startswith("@"):
type = "folder"
matches = list(filter(lambda k: k == args.data, self.folders))
2024-04-22 18:17:11 -03:00
reversematches = list(filter(lambda k: "@" + k == args.data, self.nodes_list))
2022-06-10 13:24:26 -03:00
else:
type = "node"
2024-04-22 18:17:11 -03:00
matches = list(filter(lambda k: k == args.data, self.nodes_list))
2022-06-10 13:24:26 -03:00
reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
if len(matches) > 0:
printer.error("{} already exist".format(matches[0]))
2022-06-10 13:24:26 -03:00
exit(4)
if len(reversematches) > 0:
printer.error("{} already exist".format(reversematches[0]))
2022-06-10 13:24:26 -03:00
exit(4)
else:
if type == "folder":
uniques = self.config._explode_unique(args.data)
if uniques == False:
printer.error("Invalid folder {}".format(args.data))
2022-06-10 13:24:26 -03:00
exit(5)
if "subfolder" in uniques.keys():
parent = "@" + uniques["folder"]
if parent not in self.folders:
printer.error("Folder {} not found".format(uniques["folder"]))
2022-06-10 13:24:26 -03:00
exit(2)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
printer.success("{} added successfully".format(args.data))
2022-06-10 13:24:26 -03:00
if type == "node":
nodefolder = args.data.partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
printer.error(nodefolder + " not found")
2022-06-10 13:24:26 -03:00
exit(2)
uniques = self.config._explode_unique(args.data)
if uniques == False:
printer.error("Invalid node {}".format(args.data))
2022-06-10 13:24:26 -03:00
exit(5)
2024-06-17 15:58:28 -03:00
self._print_instructions()
2022-06-10 13:24:26 -03:00
newnode = self._questions_nodes(args.data, uniques)
if newnode == False:
exit(7)
self.config._connections_add(**newnode)
2022-04-03 18:25:58 -03:00
self.config._saveconfig(self.config.file)
printer.success("{} added successfully".format(args.data))
2022-06-10 13:24:26 -03:00
def _show(self, args):
if args.data == None:
printer.error("Missing argument node")
2022-06-10 13:24:26 -03:00
exit(3)
if args.data.startswith("@"):
matches = list(filter(lambda k: args.data in k, self.nodes_list))
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
2022-06-10 13:24:26 -03:00
if len(matches) == 0:
printer.error("{} not found".format(args.data))
2022-06-10 13:24:26 -03:00
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
if matches[0] == None:
exit(7)
2022-06-10 13:24:26 -03:00
node = self.config.getitem(matches[0])
yaml_output = yaml.dump(node, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
2022-06-10 13:24:26 -03:00
def _mod(self, args):
if args.data == None:
printer.error("Missing argument node")
2022-06-10 13:24:26 -03:00
exit(3)
matches = self.config._getallnodes(args.data)
2022-06-10 13:24:26 -03:00
if len(matches) == 0:
printer.error("No connection found with filter: {}".format(args.data))
2022-06-10 13:24:26 -03:00
exit(2)
elif len(matches) == 1:
uniques = self.config._explode_unique(matches[0])
unique = matches[0]
else:
uniques = {"id": None, "folder": None}
unique = None
printer.info("Editing: {}".format(matches))
node = {}
for i in matches:
node[i] = self.config.getitem(i)
2022-06-10 13:24:26 -03:00
edits = self._questions_edit()
if edits == None:
exit(7)
updatenode = self._questions_nodes(unique, uniques, edit=edits)
2022-06-10 13:24:26 -03:00
if not updatenode:
exit(7)
if len(matches) == 1:
uniques.update(node[matches[0]])
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatenode)
self.config._saveconfig(self.config.file)
printer.success("{} edited successfully".format(args.data))
2022-06-10 13:24:26 -03:00
else:
for k in node:
updatednode = self.config._explode_unique(k)
updatednode["type"] = "connection"
updatednode.update(node[k])
editcount = 0
for key, should_edit in edits.items():
if should_edit:
editcount += 1
updatednode[key] = updatenode[key]
if not editcount:
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatednode)
2022-06-10 13:24:26 -03:00
self.config._saveconfig(self.config.file)
printer.success("{} edited successfully".format(matches))
return
2022-03-22 19:54:05 -03:00
def _func_profile(self, args):
2022-04-03 12:00:35 -03:00
#Function called when managing profiles
2022-03-25 12:25:59 -03:00
if not self.case:
args.data[0] = args.data[0].lower()
2022-06-10 13:24:26 -03:00
actions = {"add": self._profile_add, "del": self._profile_del, "mod": self._profile_mod, "show": self._profile_show}
return actions.get(args.action)(args)
def _profile_del(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
printer.error("{} not found".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit(2)
if matches[0] == "default":
printer.error("Can't delete default profile")
2022-06-10 13:24:26 -03:00
exit(6)
2023-04-14 11:44:56 -03:00
usedprofile = self.config._profileused(matches[0])
2022-06-10 13:24:26 -03:00
if len(usedprofile) > 0:
printer.error(f"Profile {matches[0]} used in the following nodes:\n{', '.join(usedprofile)}")
2022-06-10 13:24:26 -03:00
exit(8)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
confirm = inquirer.prompt(question)
if confirm["delete"]:
self.config._profiles_del(id = matches[0])
2022-04-03 18:25:58 -03:00
self.config._saveconfig(self.config.file)
printer.success("{} deleted successfully".format(matches[0]))
2022-06-10 13:24:26 -03:00
def _profile_show(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
printer.error("{} not found".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit(2)
profile = self.config.profiles[matches[0]]
yaml_output = yaml.dump(profile, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
2022-06-10 13:24:26 -03:00
def _profile_add(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) > 0:
printer.error("Profile {} Already exist".format(matches[0]))
2022-06-10 13:24:26 -03:00
exit(4)
newprofile = self._questions_profiles(args.data[0])
if newprofile == False:
exit(7)
self.config._profiles_add(**newprofile)
self.config._saveconfig(self.config.file)
printer.success("{} added successfully".format(args.data[0]))
2022-06-10 13:24:26 -03:00
def _profile_mod(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
printer.error("{} not found".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit(2)
profile = self.config.profiles[matches[0]]
oldprofile = {"id": matches[0]}
oldprofile.update(profile)
edits = self._questions_edit()
if edits == None:
exit(7)
updateprofile = self._questions_profiles(matches[0], edit=edits)
if not updateprofile:
exit(7)
if sorted(updateprofile.items()) == sorted(oldprofile.items()):
printer.info("Nothing to do here")
2022-06-10 13:24:26 -03:00
return
else:
self.config._profiles_add(**updateprofile)
self.config._saveconfig(self.config.file)
printer.success("{} edited successfully".format(args.data[0]))
2022-03-22 19:54:05 -03:00
def _func_others(self, args):
2022-04-03 12:00:35 -03:00
#Function called when using other commands
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config}
2022-06-10 13:24:26 -03:00
return actions.get(args.command)(args)
def _ai_config(self, args):
if "ai" in self.config.config:
aiconfig = self.config.config["ai"]
else:
aiconfig = {}
aiconfig[args.command] = args.data[0]
self._change_settings("ai", aiconfig)
2022-06-10 13:24:26 -03:00
def _ls(self, args):
2024-06-10 15:45:04 -03:00
if args.data == "nodes":
attribute = "nodes_list"
else:
attribute = args.data
items = getattr(self, attribute)
2023-10-26 17:33:44 -03:00
if args.filter:
items = [ item for item in items if re.search(args.filter[0], item)]
if args.format and args.data == "nodes":
newitems = []
for i in items:
formated = {}
info = self.config.getitem(i)
if "@" in i:
name_part, location_part = i.split("@", 1)
formated["location"] = "@" + location_part
else:
name_part = i
formated["location"] = ""
formated["name"] = name_part
formated["host"] = info["host"]
items_copy = list(formated.items())
for key, value in items_copy:
upper_key = key.upper()
upper_value = value.upper()
formated[upper_key] = upper_value
newitems.append(args.format[0].format(**formated))
items = newitems
yaml_output = yaml.dump(items, sort_keys=False, default_flow_style=False)
printer.custom(args.data,"")
print(yaml_output)
2022-06-10 13:24:26 -03:00
def _mvcp(self, args):
if not self.case:
args.data[0] = args.data[0].lower()
args.data[1] = args.data[1].lower()
2024-04-22 18:17:11 -03:00
source = list(filter(lambda k: k == args.data[0], self.nodes_list))
dest = list(filter(lambda k: k == args.data[1], self.nodes_list))
2022-06-10 13:24:26 -03:00
if len(source) != 1:
printer.error("{} not found".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit(2)
if len(dest) > 0:
printer.error("Node {} Already exist".format(args.data[1]))
2022-06-10 13:24:26 -03:00
exit(4)
nodefolder = args.data[1].partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
printer.error("{} not found".format(nodefolder))
2022-06-10 13:24:26 -03:00
exit(2)
olduniques = self.config._explode_unique(args.data[0])
newuniques = self.config._explode_unique(args.data[1])
if newuniques == False:
printer.error("Invalid node {}".format(args.data[1]))
2022-06-10 13:24:26 -03:00
exit(5)
node = self.config.getitem(source[0])
newnode = {**newuniques, **node}
self.config._connections_add(**newnode)
if args.command == "move":
self.config._connections_del(**olduniques)
self.config._saveconfig(self.config.file)
action = "moved" if args.command == "move" else "copied"
printer.success("{} {} successfully to {}".format(args.data[0],action, args.data[1]))
2022-06-10 13:24:26 -03:00
def _bulk(self, args):
if args.file and os.path.isfile(args.file[0]):
with open(args.file[0], 'r') as f:
lines = f.readlines()
# Expecting exactly 2 lines
if len(lines) < 2:
printer.error("The file must contain at least two lines: one for nodes, one for hosts.")
exit(11)
nodes = lines[0].strip()
hosts = lines[1].strip()
newnodes = self._questions_bulk(nodes, hosts)
else:
newnodes = self._questions_bulk()
2022-06-10 13:24:26 -03:00
if newnodes == False:
exit(7)
if not self.case:
newnodes["location"] = newnodes["location"].lower()
newnodes["ids"] = newnodes["ids"].lower()
ids = newnodes["ids"].split(",")
hosts = newnodes["host"].split(",")
count = 0
for n in ids:
unique = n + newnodes["location"]
2024-04-22 18:17:11 -03:00
matches = list(filter(lambda k: k == unique, self.nodes_list))
2022-06-10 13:24:26 -03:00
reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
if len(matches) > 0:
printer.info("Node {} already exist, ignoring it".format(unique))
2022-06-10 13:24:26 -03:00
continue
if len(reversematches) > 0:
printer.info("Folder with name {} already exist, ignoring it".format(unique))
2022-06-10 13:24:26 -03:00
continue
newnode = {"id": n}
if newnodes["location"] != "":
location = self.config._explode_unique(newnodes["location"])
newnode.update(location)
if len(hosts) > 1:
index = ids.index(n)
newnode["host"] = hosts[index]
else:
newnode["host"] = hosts[0]
newnode["protocol"] = newnodes["protocol"]
newnode["port"] = newnodes["port"]
newnode["options"] = newnodes["options"]
newnode["logs"] = newnodes["logs"]
newnode["tags"] = newnodes["tags"]
2023-12-01 18:30:29 -03:00
newnode["jumphost"] = newnodes["jumphost"]
2022-06-10 13:24:26 -03:00
newnode["user"] = newnodes["user"]
newnode["password"] = newnodes["password"]
count +=1
2022-03-23 17:28:53 -03:00
self.config._connections_add(**newnode)
2024-04-22 18:17:11 -03:00
self.nodes_list = self.config._getallnodes()
2022-06-10 13:24:26 -03:00
if count > 0:
2022-04-03 18:25:58 -03:00
self.config._saveconfig(self.config.file)
printer.success("Successfully added {} nodes".format(count))
2022-03-23 17:28:53 -03:00
else:
printer.info("0 nodes added")
2022-06-10 13:24:26 -03:00
def _completion(self, args):
if args.data[0] == "bash":
print(self._help("bashcompletion"))
elif args.data[0] == "zsh":
print(self._help("zshcompletion"))
def _case(self, args):
if args.data[0] == "true":
args.data[0] = True
elif args.data[0] == "false":
args.data[0] = False
self._change_settings(args.command, args.data[0])
def _fzf(self, args):
if args.data[0] == "true":
args.data[0] = True
elif args.data[0] == "false":
args.data[0] = False
self._change_settings(args.command, args.data[0])
def _idletime(self, args):
if args.data[0] < 0:
args.data[0] = 0
self._change_settings(args.command, args.data[0])
2023-04-14 11:44:56 -03:00
def _configfolder(self, args):
if not os.path.isdir(args.data[0]):
raise argparse.ArgumentTypeError(f"readable_dir:{args.data[0]} is not a valid path")
else:
2023-12-14 16:56:59 -03:00
pathfile = self.config.defaultdir + "/.folder"
2023-04-14 11:44:56 -03:00
folder = os.path.abspath(args.data[0]).rstrip('/')
with open(pathfile, "w") as f:
f.write(str(folder))
printer.success("Config saved")
def _openai(self, args):
if "openai" in self.config.config:
openaikeys = self.config.config["openai"]
else:
openaikeys = {}
openaikeys[args.command] = args.data[0]
self._change_settings("openai", openaikeys)
def _anthropic(self, args):
if "anthropic" in self.config.config:
anthropickeys = self.config.config["anthropic"]
else:
anthropickeys = {}
# Mapear el nombre del argumento al nombre de la clave en el config (sin el prefijo 'anthropic_')
key_name = args.command.replace("anthropic_", "")
anthropickeys[key_name] = args.data[0]
self._change_settings("anthropic", anthropickeys)
def _google(self, args):
if "google" in self.config.config:
googlekeys = self.config.config["google"]
else:
googlekeys = {}
# Mapear el nombre del argumento al nombre de la clave en el config (sin el prefijo 'google_')
key_name = args.command.replace("google_", "")
googlekeys[key_name] = args.data[0]
self._change_settings("google", googlekeys)
2023-04-14 11:44:56 -03:00
2022-06-10 13:24:26 -03:00
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
printer.success("Config saved")
2022-03-22 19:54:05 -03:00
2023-12-14 16:56:59 -03:00
def _func_plugin(self, args):
if args.add:
if not os.path.exists(args.add[1]):
printer.error("File {} dosn't exists.".format(args.add[1]))
2023-12-14 16:56:59 -03:00
exit(14)
if args.add[0].isalpha() and args.add[0].islower() and len(args.add[0]) <= 15:
disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py.bkp")
if args.add[0] in self.commands or os.path.exists(disabled_dest_file):
printer.error("Plugin name can't be the same as other commands.")
2023-12-14 16:56:59 -03:00
exit(15)
else:
check_bad_script = self.plugins.verify_script(args.add[1])
if check_bad_script:
printer.error(check_bad_script)
2023-12-14 16:56:59 -03:00
exit(16)
else:
try:
dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py")
shutil.copy2(args.add[1], dest_file)
printer.success(f"Plugin {args.add[0]} added successfully.")
except Exception as e:
printer.error(f"Failed importing plugin file. {e}")
2023-12-14 16:56:59 -03:00
exit(17)
else:
printer.error("Plugin name should be lowercase letters up to 15 characters.")
2023-12-14 16:56:59 -03:00
exit(15)
elif args.update:
if not os.path.exists(args.update[1]):
printer.error("File {} dosn't exists.".format(args.update[1]))
exit(14)
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
plugin_exist = os.path.exists(plugin_file)
disabled_plugin_exist = os.path.exists(disabled_plugin_file)
if plugin_exist or disabled_plugin_exist:
check_bad_script = self.plugins.verify_script(args.update[1])
if check_bad_script:
printer.error(check_bad_script)
exit(16)
else:
try:
disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
dest_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
if disabled_plugin_exist:
shutil.copy2(args.update[1], disabled_dest_file)
else:
shutil.copy2(args.update[1], dest_file)
printer.success(f"Plugin {args.update[0]} updated successfully.")
except Exception as e:
printer.error(f"Failed updating plugin file. {e}")
exit(17)
else:
printer.error("Plugin {} dosn't exist.".format(args.update[0]))
exit(14)
2023-12-14 16:56:59 -03:00
elif args.delete:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py.bkp")
plugin_exist = os.path.exists(plugin_file)
disabled_plugin_exist = os.path.exists(disabled_plugin_file)
if not plugin_exist and not disabled_plugin_exist:
printer.error("Plugin {} dosn't exist.".format(args.delete[0]))
2023-12-14 16:56:59 -03:00
exit(14)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {} plugin?".format(args.delete[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["delete"]:
try:
if plugin_exist:
os.remove(plugin_file)
elif disabled_plugin_exist:
os.remove(disabled_plugin_file)
printer.success(f"plugin {args.delete[0]} deleted successfully.")
except Exception as e:
printer.error(f"Failed deleting plugin file. {e}")
2023-12-14 16:56:59 -03:00
exit(17)
elif args.disable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py.bkp")
if not os.path.exists(plugin_file) or os.path.exists(disabled_plugin_file):
printer.error("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
2023-12-14 16:56:59 -03:00
exit(14)
try:
os.rename(plugin_file, disabled_plugin_file)
printer.success(f"plugin {args.disable[0]} disabled successfully.")
except Exception as e:
printer.error(f"Failed disabling plugin file. {e}")
2023-12-14 16:56:59 -03:00
exit(17)
elif args.enable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py.bkp")
if os.path.exists(plugin_file) or not os.path.exists(disabled_plugin_file):
printer.error("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
2023-12-14 16:56:59 -03:00
exit(14)
try:
os.rename(disabled_plugin_file, plugin_file)
printer.success(f"plugin {args.enable[0]} enabled successfully.")
except Exception as e:
printer.error(f"Failed enabling plugin file. {e}")
2023-12-14 16:56:59 -03:00
exit(17)
elif args.list:
enabled_files = []
disabled_files = []
plugins = {}
# Iterate over all files in the specified folder
for file in os.listdir(self.config.defaultdir + "/plugins"):
# Check if the file is a Python file
if file.endswith('.py'):
enabled_files.append(os.path.splitext(file)[0])
# Check if the file is a Python backup file
elif file.endswith('.py.bkp'):
disabled_files.append(os.path.splitext(os.path.splitext(file)[0])[0])
if enabled_files:
plugins["Enabled"] = enabled_files
if disabled_files:
plugins["Disabled"] = disabled_files
if plugins:
printer.custom("plugins","")
2023-12-14 16:56:59 -03:00
print(yaml.dump(plugins, sort_keys=False))
else:
printer.warning("There are no plugins added.")
2023-12-14 16:56:59 -03:00
def _func_import(self, args):
if not os.path.exists(args.data[0]):
printer.error("File {} dosn't exist".format(args.data[0]))
exit(14)
printer.warning("This could overwrite your current configuration!")
question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["import"]:
try:
with open(args.data[0]) as file:
imported = yaml.load(file, Loader=yaml.FullLoader)
except:
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for k,v in imported.items():
uniques = self.config._explode_unique(k)
2023-09-21 17:28:09 -03:00
if "folder" in uniques:
folder = f"@{uniques['folder']}"
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0:
uniquefolder = self.config._explode_unique(folder)
self.config._folder_add(**uniquefolder)
if "subfolder" in uniques:
subfolder = f"@{uniques['subfolder']}@{uniques['folder']}"
matches = list(filter(lambda k: k == subfolder, self.folders))
if len(matches) == 0:
uniquesubfolder = self.config._explode_unique(subfolder)
self.config._folder_add(**uniquesubfolder)
uniques.update(v)
self.config._connections_add(**uniques)
self.config._saveconfig(self.config.file)
printer.success("File {} imported successfully".format(args.data[0]))
return
def _func_export(self, args):
2023-09-21 17:28:09 -03:00
if os.path.exists(args.data[0]):
printer.error("File {} already exists".format(args.data[0]))
exit(14)
2023-09-21 17:28:09 -03:00
if len(args.data[1:]) == 0:
foldercons = self.config._getallnodesfull(extract = False)
else:
2023-09-21 17:28:09 -03:00
for folder in args.data[1:]:
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0 and folder != "@":
printer.error("{} folder not found".format(folder))
2023-09-21 17:28:09 -03:00
exit(2)
foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
with open(args.data[0], "w") as file:
yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
file.close()
printer.success("File {} generated successfully".format(args.data[0]))
2023-09-21 17:28:09 -03:00
exit()
return
2022-05-11 14:25:43 -03:00
def _func_run(self, args):
if len(args.data) > 1:
2022-06-10 13:24:26 -03:00
args.action = "noderun"
actions = {"noderun": self._node_run, "generate": self._yaml_generate, "run": self._yaml_run}
return actions.get(args.action)(args)
def _func_ai(self, args):
arguments = {}
if args.engineer_model:
arguments["engineer_model"] = args.engineer_model[0]
if args.engineer_api_key:
arguments["engineer_api_key"] = args.engineer_api_key[0]
if args.architect_model:
arguments["architect_model"] = args.architect_model[0]
if args.architect_api_key:
arguments["architect_api_key"] = args.architect_api_key[0]
2024-04-22 18:17:11 -03:00
self.myai = self.ai(self.config, **arguments)
if args.ask:
# Single question mode
query = " ".join(args.ask)
with console.status("[bold green]Agent is thinking and analyzing...") as status:
result = self.myai.ask(query, status=status, debug=args.debug)
# Determine title and color based on responder
responder = result.get("responder", "engineer")
if responder == "architect":
title = "[bold purple]Network Architect[/bold purple]"
border_style = "purple"
else:
title = "[bold blue]Network Engineer[/bold blue]"
border_style = "blue"
# Only render in panel if response wasn't already streamed
if not result.get("streamed"):
mdprint(Panel(Markdown(result["response"]), title=title, border_style=border_style, expand=False))
# Mostrar tokens consumidos
if "usage" in result:
u = result["usage"]
console.print(f"[dim]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/dim]")
print("\r")
else:
# Interactive chat mode
history = None
mdprint(Rule(style="bold blue"))
mdprint(Markdown("**Networking Expert Agent**: Hi! I'm your assistant. I can help you diagnose issues, run commands, and manage your nodes.\nType 'exit' to quit.\n"))
mdprint(Rule(style="bold blue"))
while True:
try:
user_query = Prompt.ask("[bold cyan]User[/bold cyan]")
if not user_query.strip():
continue
if user_query.lower() in ['exit', 'quit', 'bye']:
break
# User message is already in the prompt, no need to print it again
try:
with console.status("[bold green]Agent is thinking...") as status:
result = self.myai.ask(user_query, chat_history=history, status=status, debug=args.debug)
except KeyboardInterrupt:
# La interrupción ahora se maneja dentro de myai.ask para no perder el contexto
# y generar un resumen de lo que se estaba haciendo.
continue
history = result.get("chat_history")
# Determine title and color based on responder
responder = result.get("responder", "engineer")
if responder == "architect":
title = "[bold purple]Network Architect[/bold purple]"
border_style = "purple"
else:
title = "[bold blue]Network Engineer[/bold blue]"
border_style = "blue"
# Only render in panel if response wasn't already streamed
if not result.get("streamed"):
mdprint(Panel(Markdown(result["response"]), title=title, border_style=border_style, expand=False))
# Mostrar tokens consumidos
if "usage" in result:
u = result["usage"]
console.print(f"[dim]Tokens: {u['total']} (Input: {u['input']}, Output: {u['output']})[/dim]")
print("\r")
except KeyboardInterrupt:
break
return
def _ai_validation(self, answers, current, regex = "^.+$"):
#Validate ai user chat.
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Can't send empty messages")
return True
2023-04-06 18:47:29 -03:00
def _func_api(self, args):
2023-04-14 11:44:56 -03:00
if args.command == "stop" or args.command == "restart":
2024-04-22 18:17:11 -03:00
args.data = self.stop_api()
2023-04-14 11:44:56 -03:00
if args.command == "start" or args.command == "restart":
2023-04-15 22:38:52 -03:00
if args.data:
2024-04-22 18:17:11 -03:00
self.start_api(args.data)
2023-04-15 22:38:52 -03:00
else:
2024-04-22 18:17:11 -03:00
self.start_api()
2023-04-15 22:38:52 -03:00
if args.command == "debug":
if args.data:
2024-04-22 18:17:11 -03:00
self.debug_api(args.data)
2023-04-15 22:38:52 -03:00
else:
2024-04-22 18:17:11 -03:00
self.debug_api()
2023-04-06 18:47:29 -03:00
return
2022-06-10 13:24:26 -03:00
def _node_run(self, args):
command = " ".join(args.data[1:])
2023-10-26 17:33:44 -03:00
script = {}
script["name"] = "Output"
script["action"] = "run"
script["nodes"] = args.data[0]
script["commands"] = [command]
script["output"] = "stdout"
self._cli_run(script)
2022-06-10 13:24:26 -03:00
def _yaml_generate(self, args):
if os.path.exists(args.data[0]):
printer.error("File {} already exists".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit(14)
2022-05-11 14:25:43 -03:00
else:
2022-06-10 13:24:26 -03:00
with open(args.data[0], "w") as file:
file.write(self._help("generate"))
file.close()
printer.success("File {} generated successfully".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit()
def _yaml_run(self, args):
try:
with open(args.data[0]) as file:
scripts = yaml.load(file, Loader=yaml.FullLoader)
except:
printer.error("failed reading file {}".format(args.data[0]))
2022-06-10 13:24:26 -03:00
exit(10)
for script in scripts["tasks"]:
self._cli_run(script)
def _cli_run(self, script):
import threading as _threading
args = {}
try:
action = script["action"]
nodelist = script["nodes"]
args["commands"] = script["commands"]
output = script["output"]
if action == "test":
args["expected"] = script["expected"]
except KeyError as e:
printer.error("'{}' is mandatory".format(e.args[0]))
exit(11)
2023-10-26 17:33:44 -03:00
nodes = self.config._getallnodes(nodelist)
if len(nodes) == 0:
printer.error("{} don't match any node".format(nodelist))
2023-10-26 17:33:44 -03:00
exit(2)
2024-04-22 18:17:11 -03:00
nodes = self.nodes(self.config.getitems(nodes), config = self.config)
stdout = False
if output is None:
pass
elif output == "stdout":
stdout = True
elif isinstance(output, str) and action == "run":
args["folder"] = output
if "variables" in script:
args["vars"] = script["variables"]
if "vars" in script:
args["vars"] = script["vars"]
try:
options = script["options"]
thisoptions = {k: v for k, v in options.items() if k in ["prompt", "parallel", "timeout"]}
args.update(thisoptions)
except:
options = None
2023-10-26 17:33:44 -03:00
try:
size = str(os.get_terminal_size())
p = re.search(r'.*columns=([0-9]+)', size)
columns = int(p.group(1))
except:
columns = 80
PANEL_WIDTH = columns
header = f"{script['name'].upper()}"
# Streaming mode: print each node's panel as it completes
if action == "run" and stdout:
mdprint(Rule(header, style="bold cyan"))
print_lock = _threading.Lock()
def _on_node_complete(unique, node_output, node_status):
if node_status == 0:
status_str = "[bold green]✓ PASS[/bold green]"
border = "green"
title_line = f"[bold]{unique}[/bold] — {status_str}"
else:
status_str = f"[bold red]✗ FAIL({node_status})[/bold red]"
border = "red"
title_line = f"[bold]{unique}[/bold] — {status_str}"
stripped = node_output.strip() if node_output else ""
code_block = Text(stripped + "\n") if stripped else Text()
panel_content = Group(Text(), Text(""), code_block)
with print_lock:
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style=border))
nodes.run(**args, on_complete=_on_node_complete)
return
# Batch mode: wait for all nodes, then print
if action == "run":
nodes.run(**args)
elif action == "test":
nodes.test(**args)
else:
printer.error(f"Wrong action '{action}'")
exit(13)
2022-05-11 14:25:43 -03:00
mdprint(Rule(header, style="bold cyan"))
for node in nodes.status:
if nodes.status[node] == 0:
status_str = "[bold green]✓ PASS[/bold green]"
border = "green"
else:
status_str = f"[bold red]✗ FAIL({nodes.status[node]})[/bold red]"
border = "red"
title_line = f"[bold]{node}[/bold] — {status_str}"
test_output = Text()
if action == "test" and nodes.status[node] == 0:
results = nodes.result[node]
test_output.append("TEST RESULTS:\n", style="bold cyan")
max_key_len = max(len(k) for k in results.keys())
for k, v in results.items():
if str(v).upper() == "TRUE":
test_output.append(f" {k.ljust(max_key_len)}\n", style="green")
else:
test_output.append(f" {k.ljust(max_key_len)}\n", style="red")
output = nodes.output[node].strip()
code_block = Text()
if stdout and output:
code_block = Text(output + "\n")
if action == "test" and nodes.status[node] == 0:
highlight_words = [k for k, v in nodes.result[node].items() if str(v).upper() == "TRUE"]
code_block.highlight_words(highlight_words, style=Style(color="green", bold=True, underline=True))
panel_content = Group(test_output, Text(""), code_block)
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style=border))
2022-03-22 19:54:05 -03:00
def _choose(self, list, name, action):
2022-04-03 12:00:35 -03:00
#Generates an inquirer list to pick
2022-04-18 19:19:25 -03:00
if FzfPrompt and self.fzf:
fzf = FzfPrompt(executable_path="fzf-tmux")
2023-03-21 18:23:29 -03:00
if not self.case:
fzf = FzfPrompt(executable_path="fzf-tmux -i")
2022-04-18 19:19:25 -03:00
answer = fzf.prompt(list, fzf_options="-d 25%")
if len(answer) == 0:
return
else:
return answer[0]
2022-03-22 19:54:05 -03:00
else:
2022-04-18 19:19:25 -03:00
questions = [inquirer.List(name, message="Pick {} to {}:".format(name,action), choices=list, carousel=True)]
answer = inquirer.prompt(questions)
if answer == None:
return
else:
return answer[name]
2022-03-22 19:54:05 -03:00
def _host_validation(self, answers, current, regex = "^.+$"):
2022-04-03 12:00:35 -03:00
#Validate hostname in inquirer when managing nodes
2022-03-22 19:54:05 -03:00
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
2024-06-17 15:58:28 -03:00
def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^kubectl$|^docker$|^$)"):
2022-04-03 12:00:35 -03:00
#Validate protocol in inquirer when managing profiles
2022-03-23 17:28:53 -03:00
if not re.match(regex, current):
2024-06-17 15:58:28 -03:00
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, kubectl, docker or leave empty")
2022-03-23 17:28:53 -03:00
return True
2024-06-17 15:58:28 -03:00
def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^kubectl$|^docker$|^$|^@.+$)"):
2022-04-03 12:00:35 -03:00
#Validate protocol in inquirer when managing nodes
2022-03-22 19:54:05 -03:00
if not re.match(regex, current):
2024-06-17 15:58:28 -03:00
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, kubectl, docker leave empty or @profile")
2022-03-22 19:54:05 -03:00
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
2022-03-23 17:28:53 -03:00
def _profile_port_validation(self, answers, current, regex = "(^[0-9]*$)"):
2022-04-03 12:00:35 -03:00
#Validate port in inquirer when managing profiles
2022-03-22 19:54:05 -03:00
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile o leave empty")
try:
port = int(current)
except:
port = 0
2022-03-23 17:28:53 -03:00
if current != "" and not 1 <= int(port) <= 65535:
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
return True
def _port_validation(self, answers, current, regex = "(^[0-9]*$|^@.+$)"):
2022-04-03 12:00:35 -03:00
#Validate port in inquirer when managing nodes
2022-03-23 17:28:53 -03:00
if not re.match(regex, current):
2024-06-17 15:58:28 -03:00
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-6553/app5, @profile or leave empty")
2022-03-23 17:28:53 -03:00
try:
port = int(current)
except:
port = 0
2022-03-22 19:54:05 -03:00
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
elif current != "" and not 1 <= int(port) <= 65535:
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile o leave empty")
return True
def _pass_validation(self, answers, current, regex = "(^@.+$)"):
2022-04-03 12:00:35 -03:00
#Validate password in inquirer
2022-03-22 19:54:05 -03:00
profiles = current.split(",")
for i in profiles:
if not re.match(regex, i) or i[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(i))
return True
def _tags_validation(self, answers, current):
#Validation for Tags in inquirer when managing nodes
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
elif current != "":
isdict = False
try:
isdict = ast.literal_eval(current)
except:
pass
if not isinstance (isdict, dict):
raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current))
return True
def _profile_tags_validation(self, answers, current):
#Validation for Tags in inquirer when managing profiles
if current != "":
isdict = False
try:
isdict = ast.literal_eval(current)
except:
pass
if not isinstance (isdict, dict):
raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current))
return True
2023-12-01 18:30:29 -03:00
def _jumphost_validation(self, answers, current):
#Validation for Jumphost in inquirer when managing nodes
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
elif current != "":
2024-04-22 18:17:11 -03:00
if current not in self.nodes_list :
2023-12-01 18:30:29 -03:00
raise inquirer.errors.ValidationError("", reason="Node {} don't exist.".format(current))
return True
def _profile_jumphost_validation(self, answers, current):
#Validation for Jumphost in inquirer when managing profiles
if current != "":
2024-04-22 18:17:11 -03:00
if current not in self.nodes_list :
2023-12-01 18:30:29 -03:00
raise inquirer.errors.ValidationError("", reason="Node {} don't exist.".format(current))
return True
2022-03-22 19:54:05 -03:00
def _default_validation(self, answers, current):
2022-04-03 12:00:35 -03:00
#Default validation type used in multiples questions in inquirer
2022-03-22 19:54:05 -03:00
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
2022-03-23 19:33:56 -03:00
def _bulk_node_validation(self, answers, current, regex = "^[0-9a-zA-Z_.,$#-]+$"):
2022-04-03 12:00:35 -03:00
#Validation of nodes when running bulk command
2022-03-23 19:33:56 -03:00
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
def _bulk_folder_validation(self, answers, current):
2022-04-03 12:00:35 -03:00
#Validation of folders when running bulk command
2022-03-25 12:25:59 -03:00
if not self.case:
current = current.lower()
2022-03-23 19:33:56 -03:00
matches = list(filter(lambda k: k == current, self.folders))
if current != "" and len(matches) == 0:
raise inquirer.errors.ValidationError("", reason="Location {} don't exist".format(current))
return True
def _bulk_host_validation(self, answers, current, regex = "^.+$"):
2022-04-03 12:00:35 -03:00
#Validate hostname when running bulk command
2022-03-23 19:33:56 -03:00
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
hosts = current.split(",")
nodes = answers["ids"].split(",")
if len(hosts) > 1 and len(hosts) != len(nodes):
raise inquirer.errors.ValidationError("", reason="Hosts list should be the same length of nodes list")
return True
2022-03-22 19:54:05 -03:00
def _questions_edit(self):
2022-04-03 12:00:35 -03:00
#Inquirer questions when editing nodes or profiles
2022-03-22 19:54:05 -03:00
questions = []
questions.append(inquirer.Confirm("host", message="Edit Hostname/IP?"))
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Confirm("protocol", message="Edit Protocol/app?"))
2022-03-22 19:54:05 -03:00
questions.append(inquirer.Confirm("port", message="Edit Port?"))
questions.append(inquirer.Confirm("options", message="Edit Options?"))
questions.append(inquirer.Confirm("logs", message="Edit logging path/file?"))
questions.append(inquirer.Confirm("tags", message="Edit tags?"))
2023-12-01 18:30:29 -03:00
questions.append(inquirer.Confirm("jumphost", message="Edit jumphost?"))
2022-03-22 19:54:05 -03:00
questions.append(inquirer.Confirm("user", message="Edit User?"))
questions.append(inquirer.Confirm("password", message="Edit password?"))
answers = inquirer.prompt(questions)
return answers
2022-03-23 17:28:53 -03:00
def _questions_nodes(self, unique, uniques = None, edit = None):
2022-04-03 12:00:35 -03:00
#Questions when adding or editing nodes
2022-03-22 19:54:05 -03:00
try:
2022-03-25 12:25:59 -03:00
defaults = self.config.getitem(unique)
if "tags" not in defaults:
defaults["tags"] = ""
2023-12-01 18:30:29 -03:00
if "jumphost" not in defaults:
defaults["jumphost"] = ""
2022-03-22 19:54:05 -03:00
except:
2023-12-01 18:30:29 -03:00
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" , "tags":"", "password":"", "jumphost":""}
2022-03-22 19:54:05 -03:00
node = {}
if edit == None:
2023-12-01 18:30:29 -03:00
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True, "jumphost":True }
2022-03-22 19:54:05 -03:00
questions = []
if edit["host"]:
questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"]))
else:
node["host"] = defaults["host"]
if edit["protocol"]:
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Text("protocol", message="Select Protocol/app", validate=self._protocol_validation, default=defaults["protocol"]))
2022-03-22 19:54:05 -03:00
else:
node["protocol"] = defaults["protocol"]
if edit["port"]:
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation, default=defaults["port"]))
else:
node["port"] = defaults["port"]
if edit["options"]:
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Text("options", message="Pass extra options to protocol/app", validate=self._default_validation, default=defaults["options"]))
2022-03-22 19:54:05 -03:00
else:
node["options"] = defaults["options"]
if edit["logs"]:
2023-04-14 18:30:58 -03:00
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"].replace("{","{{").replace("}","}}")))
2022-03-22 19:54:05 -03:00
else:
node["logs"] = defaults["logs"]
if edit["tags"]:
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
else:
node["tags"] = defaults["tags"]
2023-12-01 18:30:29 -03:00
if edit["jumphost"]:
questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._jumphost_validation, default=str(defaults["jumphost"]).replace("{","{{").replace("}","}}")))
else:
node["jumphost"] = defaults["jumphost"]
2022-03-22 19:54:05 -03:00
if edit["user"]:
questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"]))
else:
node["user"] = defaults["user"]
if edit["password"]:
questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
else:
node["password"] = defaults["password"]
answer = inquirer.prompt(questions)
if answer == None:
return False
if "password" in answer.keys():
if answer["password"] == "Local Password":
passq = [inquirer.Password("password", message="Set Password")]
passa = inquirer.prompt(passq)
2022-03-25 17:55:43 -03:00
if passa == None:
return False
answer["password"] = self.config.encrypt(passa["password"])
2022-03-22 19:54:05 -03:00
elif answer["password"] == "Profiles":
passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
passa = inquirer.prompt(passq)
2022-03-25 17:55:43 -03:00
if passa == None:
return False
2022-03-22 19:54:05 -03:00
answer["password"] = passa["password"].split(",")
elif answer["password"] == "No Password":
answer["password"] = ""
if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
2022-03-22 19:54:05 -03:00
result = {**uniques, **answer, **node}
2022-03-23 17:28:53 -03:00
result["type"] = "connection"
2022-03-22 19:54:05 -03:00
return result
2022-03-23 17:28:53 -03:00
def _questions_profiles(self, unique, edit = None):
2022-04-03 12:00:35 -03:00
#Questions when adding or editing profiles
2022-03-23 17:28:53 -03:00
try:
defaults = self.config.profiles[unique]
if "tags" not in defaults:
defaults["tags"] = ""
2023-12-01 18:30:29 -03:00
if "jumphost" not in defaults:
defaults["jumphost"] = ""
2022-03-23 17:28:53 -03:00
except:
2023-12-01 18:30:29 -03:00
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"", "tags": "", "jumphost": ""}
2022-03-23 17:28:53 -03:00
profile = {}
if edit == None:
2023-12-01 18:30:29 -03:00
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True, "jumphost":True }
2022-03-23 17:28:53 -03:00
questions = []
if edit["host"]:
questions.append(inquirer.Text("host", message="Add Hostname or IP", default=defaults["host"]))
else:
profile["host"] = defaults["host"]
if edit["protocol"]:
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Text("protocol", message="Select Protocol/app", validate=self._profile_protocol_validation, default=defaults["protocol"]))
2022-03-23 17:28:53 -03:00
else:
profile["protocol"] = defaults["protocol"]
if edit["port"]:
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._profile_port_validation, default=defaults["port"]))
else:
profile["port"] = defaults["port"]
if edit["options"]:
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Text("options", message="Pass extra options to protocol/app", default=defaults["options"]))
2022-03-23 17:28:53 -03:00
else:
profile["options"] = defaults["options"]
if edit["logs"]:
2023-04-14 18:30:58 -03:00
questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"].replace("{","{{").replace("}","}}")))
2022-03-23 17:28:53 -03:00
else:
profile["logs"] = defaults["logs"]
if edit["tags"]:
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._profile_tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
else:
profile["tags"] = defaults["tags"]
2023-12-01 18:30:29 -03:00
if edit["jumphost"]:
questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._profile_jumphost_validation, default=str(defaults["jumphost"]).replace("{","{{").replace("}","}}")))
else:
profile["jumphost"] = defaults["jumphost"]
2022-03-23 17:28:53 -03:00
if edit["user"]:
questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"]))
else:
profile["user"] = defaults["user"]
if edit["password"]:
questions.append(inquirer.Password("password", message="Set Password"))
else:
profile["password"] = defaults["password"]
answer = inquirer.prompt(questions)
if answer == None:
return False
if "password" in answer.keys():
if answer["password"] != "":
answer["password"] = self.config.encrypt(answer["password"])
if "tags" in answer.keys() and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
2022-03-23 17:28:53 -03:00
result = {**answer, **profile}
result["id"] = unique
return result
2022-03-22 19:54:05 -03:00
def _questions_bulk(self, nodes="", hosts=""):
2022-04-03 12:00:35 -03:00
#Questions when using bulk command
2022-03-23 19:33:56 -03:00
questions = []
questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", default=nodes, validate=self._bulk_node_validation))
2022-03-23 19:33:56 -03:00
questions.append(inquirer.Text("location", message="Add a @folder, @subfolder@folder or leave empty", validate=self._bulk_folder_validation))
questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", default=hosts, validate=self._bulk_host_validation))
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Text("protocol", message="Select Protocol/app", validate=self._protocol_validation))
2022-03-23 19:33:56 -03:00
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation))
2024-06-17 15:58:28 -03:00
questions.append(inquirer.Text("options", message="Pass extra options to protocol/app", validate=self._default_validation))
2022-03-23 19:33:56 -03:00
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation))
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation))
2023-12-01 18:30:29 -03:00
questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._jumphost_validation))
2022-03-23 19:33:56 -03:00
questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation))
questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
answer = inquirer.prompt(questions)
if answer == None:
return False
if "password" in answer.keys():
if answer["password"] == "Local Password":
passq = [inquirer.Password("password", message="Set Password")]
passa = inquirer.prompt(passq)
answer["password"] = self.config.encrypt(passa["password"])
2022-03-23 19:33:56 -03:00
elif answer["password"] == "Profiles":
passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
passa = inquirer.prompt(passq)
answer["password"] = passa["password"].split(",")
elif answer["password"] == "No Password":
answer["password"] = ""
answer["type"] = "connection"
if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
2022-03-23 19:33:56 -03:00
return answer
2022-03-22 19:54:05 -03:00
def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")):
2023-12-01 18:30:29 -03:00
if arg_value == None:
raise ValueError("Missing argument node")
2022-03-19 20:41:35 -03:00
if not pat.match(arg_value):
2023-10-26 17:33:44 -03:00
raise ValueError(f"Argument error: {arg_value}")
2022-03-22 19:54:05 -03:00
return arg_value
2022-03-19 20:41:35 -03:00
2022-03-22 19:54:05 -03:00
def _type_profile(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$#-]+$")):
if not pat.match(arg_value):
2023-10-26 17:33:44 -03:00
raise ValueError
2022-03-22 19:54:05 -03:00
return arg_value
2023-12-14 16:56:59 -03:00
def _help(self, type, parsers = None):
2022-04-03 12:00:35 -03:00
#Store text for help and other commands
2022-03-22 19:54:05 -03:00
if type == "node":
2023-12-01 13:40:49 -03:00
return "node[@subfolder][@folder]\nConnect to specific node or show all matching nodes\n[@subfolder][@folder]\nShow all available connections globally or in specified path"
2022-03-26 16:30:37 -03:00
if type == "usage":
2023-12-14 16:56:59 -03:00
commands = []
for subcommand, subparser in parsers.choices.items():
if subparser.description != None:
commands.append(subcommand)
commands = ",".join(commands)
usage_help = f"connpy [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n connpy {{{commands}}} ..."
2023-12-14 16:56:59 -03:00
return usage_help
2022-03-26 16:30:37 -03:00
if type == "end":
2023-12-14 16:56:59 -03:00
help_dict = {}
for subcommand, subparser in parsers.choices.items():
if subparser.description == None and help_dict:
previous_key = next(reversed(help_dict.keys()))
help_dict[f"{previous_key}({subcommand})"] = help_dict.pop(previous_key)
else:
help_dict[subcommand] = subparser.description
subparser.description = None
commands_help = "Commands:\n"
commands_help += "\n".join([f" {cmd:<15} {help_text}" for cmd, help_text in help_dict.items() if help_text != None])
return commands_help
2022-04-04 19:09:00 -03:00
if type == "bashcompletion":
return '''
#Here starts bash completion for conn
_conn()
{
mapfile -t strings < <(connpy-completion-helper "bash" "${#COMP_WORDS[@]}" "${COMP_WORDS[@]}")
2023-10-05 15:21:17 -03:00
local IFS=$'\t\n'
local home_dir=$(eval echo ~)
local last_word=${COMP_WORDS[-1]/\~/$home_dir}
COMPREPLY=($(compgen -W "$(printf '%s' "${strings[@]}")" -- "$last_word"))
if [ "$last_word" != "${COMP_WORDS[-1]}" ]; then
COMPREPLY=(${COMPREPLY[@]/$home_dir/\~})
fi
}
complete -o nospace -o nosort -F _conn conn
complete -o nospace -o nosort -F _conn connpy
2022-04-18 19:19:25 -03:00
#Here ends bash completion for conn
'''
2022-04-04 19:09:00 -03:00
if type == "zshcompletion":
return '''
2022-04-18 19:19:25 -03:00
#Here starts zsh completion for conn
2022-04-04 19:09:00 -03:00
autoload -U compinit && compinit
_conn()
{
2023-10-05 15:21:17 -03:00
local home_dir=$(eval echo ~)
last_word=${words[-1]/\~/$home_dir}
strings=($(connpy-completion-helper "zsh" ${#words} $words[1,-2] $last_word))
for string in "${strings[@]}"; do
2023-10-05 15:21:17 -03:00
#Replace the expanded home directory with ~
if [ "$last_word" != "$words[-1]" ]; then
string=${string/$home_dir/\~}
fi
if [[ "${string}" =~ .*/$ ]]; then
# If the string ends with a '/', do not append a space
2023-10-05 15:21:17 -03:00
compadd -Q -S '' -- "$string"
else
# If the string does not end with a '/', append a space
2023-10-05 15:21:17 -03:00
compadd -Q -S ' ' -- "$string"
fi
done
2022-04-04 19:09:00 -03:00
}
compdef _conn conn
compdef _conn connpy
2022-04-18 19:19:25 -03:00
#Here ends zsh completion for conn
2022-04-04 19:09:00 -03:00
'''
2022-05-11 14:25:43 -03:00
if type == "run":
return "node[@subfolder][@folder] commmand to run\nRun the specific command on the node and print output\n/path/to/file.yaml\nUse a yaml file to run an automation script"
if type == "generate":
return '''---
tasks:
- name: "Config"
action: 'run' #Action can be test or run. Mandatory
nodes: #List of nodes to work on. Mandatory
- 'router1@office' #You can add specific nodes
- '@aws' #entire folders or subfolders
- '@office': #or filter inside a folder or subfolder
2022-05-11 14:25:43 -03:00
- 'router2'
- 'router7'
commands: #List of commands to send, use {name} to pass variables
- 'term len 0'
- 'conf t'
- 'interface {if}'
- 'ip address 10.100.100.{id} 255.255.255.255'
- '{commit}'
- 'end'
variables: #Variables to use on commands and expected. Optional
__global__: #Global variables to use on all nodes, fallback if missing in the node.
commit: ''
if: 'loopback100'
router1@office:
id: 1
router2@office:
id: 2
commit: 'commit'
router3@office:
id: 3
vrouter1@aws:
id: 4
vrouterN@aws:
id: 5
output: /home/user/logs #Type of output, if null you only get Connection and test result. Choices are: null,stdout,/path/to/folder. Folder path only works on 'run' action.
options:
prompt: r'>$|#$|\$$|>.$|#.$|\$.$' #Optional prompt to check on your devices, default should work on most devices.
parallel: 10 #Optional number of nodes to run commands on parallel. Default 10.
timeout: 20 #Optional time to wait in seconds for prompt, expected or EOF. Default 20.
- name: "TestConfig"
action: 'test'
nodes:
- 'router1@office'
- '@aws'
- '@office':
- 'router2'
- 'router7'
commands:
- 'ping 10.100.100.{id}'
expected: '!' #Expected text to find when running test action. Mandatory for 'test'
variables:
router1@office:
id: 1
router2@office:
id: 2
commit: 'commit'
router3@office:
id: 3
vrouter1@aws:
id: 4
vrouterN@aws:
id: 5
output: null
...'''
2022-03-19 20:41:35 -03:00
2024-06-17 15:58:28 -03:00
def _print_instructions(self):
instructions = """
Welcome to Connpy node Addition Wizard!
Here are some important instructions and tips for configuring your new node:
1. **Profiles**:
- You can use the configured settings in a profile using `@profilename`.
2. **Available Protocols and Apps**:
- ssh
- telnet
- kubectl (`kubectl exec`)
- docker (`docker exec`)
3. **Optional Values**:
- You can leave any value empty except for the hostname/IP.
4. **Passwords**:
- You can pass one or more passwords using comma-separated `@profiles`.
5. **Logging**:
- You can use the following variables in the logging file name:
- `${id}`
- `${unique}`
- `${host}`
- `${port}`
- `${user}`
- `${protocol}`
6. **Well-Known Tags**:
- `os`: Identified by AI to generate commands based on the operating system.
- `screen_length_command`: Used by automation to avoid pagination on different devices (e.g., `terminal length 0` for Cisco devices).
- `prompt`: Replaces default app prompt to identify the end of output or where the user can start inputting commands.
- `kube_command`: Replaces the default command (`/bin/bash`) for `kubectl exec`.
- `docker_command`: Replaces the default command for `docker exec`.
Please follow these instructions carefully to ensure proper configuration of your new node.
"""
mdprint(Markdown(instructions))