add capture feature, change printing mechanics for all app, bug fixes

This commit is contained in:
2025-08-04 11:34:22 -03:00
parent 29a6a01015
commit c5e70c6070
14 changed files with 954 additions and 227 deletions
+151 -132
View File
@@ -8,6 +8,7 @@ import sys
import inquirer
from .core import node,nodes
from ._version import __version__
from . import printer
from .api import start_api,stop_api,debug_api,app
from .ai import ai
from .plugins import Plugins
@@ -17,8 +18,13 @@ class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True
import ast
from rich import print as mdprint
from rich.markdown import Markdown
from rich.console import Console, Group
from rich.panel import Panel
from rich.text import Text
from rich.rule import Rule
from rich.style import Style
mdprint = Console().print
try:
from pyfzf.pyfzf import FzfPrompt
except:
@@ -70,7 +76,7 @@ class connapp:
'''
#DEFAULTPARSER
defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
defaultparser = argparse.ArgumentParser(prog = "connpy", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
#NODEPARSER
nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
@@ -190,7 +196,11 @@ class connapp:
argv[0] = "profile"
if len(argv) < 1 or argv[0] not in self.commands:
argv.insert(0,"node")
args = defaultparser.parse_args(argv)
args, unknown_args = defaultparser.parse_known_args(argv)
if hasattr(args, "unknown_args"):
args.unknown_args = unknown_args
else:
args = defaultparser.parse_args(argv)
if args.subcommand in self.plugins.plugins:
self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
else:
@@ -211,14 +221,14 @@ class connapp:
return actions.get(args.action)(args)
def _version(self, args):
print(__version__)
printer.info(f"Connpy {__version__}")
def _connect(self, args):
if args.data == None:
matches = self.nodes_list
if len(matches) == 0:
print("There are no nodes created")
print("try: conn --help")
printer.warning("There are no nodes created")
printer.info("try: connpy --help")
exit(9)
else:
if args.data.startswith("@"):
@@ -226,7 +236,7 @@ class connapp:
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
@@ -243,16 +253,16 @@ class connapp:
def _del(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
matches = list(filter(lambda k: k == args.data, self.folders))
else:
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
print("Removing: {}".format(matches))
printer.info("Removing: {}".format(matches))
question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -267,14 +277,14 @@ class connapp:
self.config._connections_del(**nodeuniques)
self.config._saveconfig(self.config.file)
if len(matches) == 1:
print("{} deleted succesfully".format(matches[0]))
printer.success("{} deleted successfully".format(matches[0]))
else:
print(f"{len(matches)} nodes deleted succesfully")
printer.success(f"{len(matches)} nodes deleted successfully")
def _add(self, args):
args.data = self._type_node(args.data)
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
type = "folder"
@@ -285,34 +295,34 @@ class connapp:
matches = list(filter(lambda k: k == args.data, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
if len(matches) > 0:
print("{} already exist".format(matches[0]))
printer.error("{} already exist".format(matches[0]))
exit(4)
if len(reversematches) > 0:
print("{} already exist".format(reversematches[0]))
printer.error("{} already exist".format(reversematches[0]))
exit(4)
else:
if type == "folder":
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid folder {}".format(args.data))
printer.error("Invalid folder {}".format(args.data))
exit(5)
if "subfolder" in uniques.keys():
parent = "@" + uniques["folder"]
if parent not in self.folders:
print("Folder {} not found".format(uniques["folder"]))
printer.error("Folder {} not found".format(uniques["folder"]))
exit(2)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
printer.success("{} added successfully".format(args.data))
if type == "node":
nodefolder = args.data.partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print(nodefolder + " not found")
printer.error(nodefolder + " not found")
exit(2)
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid node {}".format(args.data))
printer.error("Invalid node {}".format(args.data))
exit(5)
self._print_instructions()
newnode = self._questions_nodes(args.data, uniques)
@@ -320,44 +330,43 @@ class connapp:
exit(7)
self.config._connections_add(**newnode)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
printer.success("{} added successfully".format(args.data))
def _show(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
matches = list(filter(lambda k: k == args.data, self.nodes_list))
if args.data.startswith("@"):
matches = list(filter(lambda k: args.data in k, self.nodes_list))
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
if matches[0] == None:
exit(7)
node = self.config.getitem(matches[0])
for k, v in node.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + str(d))
yaml_output = yaml.dump(node, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
def _mod(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("No connection found with filter: {}".format(args.data))
printer.error("No connection found with filter: {}".format(args.data))
exit(2)
elif len(matches) == 1:
uniques = self.config._explode_unique(args.data)
uniques = self.config._explode_unique(matches[0])
unique = matches[0]
else:
uniques = {"id": None, "folder": None}
unique = None
print("Editing: {}".format(matches))
printer.info("Editing: {}".format(matches))
node = {}
for i in matches:
node[i] = self.config.getitem(i)
@@ -371,12 +380,12 @@ class connapp:
uniques.update(node[matches[0]])
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatenode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data))
printer.success("{} edited successfully".format(args.data))
else:
for k in node:
updatednode = self.config._explode_unique(k)
@@ -388,12 +397,12 @@ class connapp:
editcount += 1
updatednode[key] = updatenode[key]
if not editcount:
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatednode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(matches))
printer.success("{} edited successfully".format(matches))
return
@@ -407,57 +416,48 @@ class connapp:
def _profile_del(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
if matches[0] == "default":
print("Can't delete default profile")
printer.error("Can't delete default profile")
exit(6)
usedprofile = self.config._profileused(matches[0])
if len(usedprofile) > 0:
print("Profile {} used in the following nodes:".format(matches[0]))
print(", ".join(usedprofile))
printer.error(f"Profile {matches[0]} used in the following nodes:\n{', '.join(usedprofile)}")
exit(8)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
confirm = inquirer.prompt(question)
if confirm["delete"]:
self.config._profiles_del(id = matches[0])
self.config._saveconfig(self.config.file)
print("{} deleted succesfully".format(matches[0]))
printer.success("{} deleted successfully".format(matches[0]))
def _profile_show(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
for k, v in profile.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + str(d))
yaml_output = yaml.dump(profile, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
def _profile_add(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) > 0:
print("Profile {} Already exist".format(matches[0]))
printer.error("Profile {} Already exist".format(matches[0]))
exit(4)
newprofile = self._questions_profiles(args.data[0])
if newprofile == False:
exit(7)
self.config._profiles_add(**newprofile)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data[0]))
printer.success("{} added successfully".format(args.data[0]))
def _profile_mod(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
oldprofile = {"id": matches[0]}
@@ -469,12 +469,12 @@ class connapp:
if not updateprofile:
exit(7)
if sorted(updateprofile.items()) == sorted(oldprofile.items()):
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._profiles_add(**updateprofile)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data[0]))
printer.success("{} edited successfully".format(args.data[0]))
def _func_others(self, args):
#Function called when using other commands
@@ -509,7 +509,9 @@ class connapp:
formated[upper_key] = upper_value
newitems.append(args.format[0].format(**formated))
items = newitems
print(*items, sep="\n")
yaml_output = yaml.dump(items, sort_keys=False, default_flow_style=False)
printer.custom(args.data,"")
print(yaml_output)
def _mvcp(self, args):
if not self.case:
@@ -518,20 +520,20 @@ class connapp:
source = list(filter(lambda k: k == args.data[0], self.nodes_list))
dest = list(filter(lambda k: k == args.data[1], self.nodes_list))
if len(source) != 1:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
if len(dest) > 0:
print("Node {} Already exist".format(args.data[1]))
printer.error("Node {} Already exist".format(args.data[1]))
exit(4)
nodefolder = args.data[1].partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print("{} not found".format(nodefolder))
printer.error("{} not found".format(nodefolder))
exit(2)
olduniques = self.config._explode_unique(args.data[0])
newuniques = self.config._explode_unique(args.data[1])
if newuniques == False:
print("Invalid node {}".format(args.data[1]))
printer.error("Invalid node {}".format(args.data[1]))
exit(5)
node = self.config.getitem(source[0])
newnode = {**newuniques, **node}
@@ -540,7 +542,7 @@ class connapp:
self.config._connections_del(**olduniques)
self.config._saveconfig(self.config.file)
action = "moved" if args.command == "move" else "copied"
print("{} {} succesfully to {}".format(args.data[0],action, args.data[1]))
printer.success("{} {} successfully to {}".format(args.data[0],action, args.data[1]))
def _bulk(self, args):
if args.file and os.path.isfile(args.file[0]):
@@ -549,7 +551,9 @@ class connapp:
# Expecting exactly 2 lines
if len(lines) < 2:
raise ValueError("The file must contain at least two lines: one for nodes, one for hosts.")
printer.error("The file must contain at least two lines: one for nodes, one for hosts.")
exit(11)
nodes = lines[0].strip()
hosts = lines[1].strip()
@@ -569,10 +573,10 @@ class connapp:
matches = list(filter(lambda k: k == unique, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
if len(matches) > 0:
print("Node {} already exist, ignoring it".format(unique))
printer.info("Node {} already exist, ignoring it".format(unique))
continue
if len(reversematches) > 0:
print("Folder with name {} already exist, ignoring it".format(unique))
printer.info("Folder with name {} already exist, ignoring it".format(unique))
continue
newnode = {"id": n}
if newnodes["location"] != "":
@@ -596,9 +600,9 @@ class connapp:
self.nodes_list = self.config._getallnodes()
if count > 0:
self.config._saveconfig(self.config.file)
print("Succesfully added {} nodes".format(count))
printer.success("Successfully added {} nodes".format(count))
else:
print("0 nodes added")
printer.info("0 nodes added")
def _completion(self, args):
if args.data[0] == "bash":
@@ -633,7 +637,7 @@ class connapp:
folder = os.path.abspath(args.data[0]).rstrip('/')
with open(pathfile, "w") as f:
f.write(str(folder))
print("Config saved")
printer.success("Config saved")
def _openai(self, args):
if "openai" in self.config.config:
@@ -647,37 +651,37 @@ class connapp:
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
print("Config saved")
printer.success("Config saved")
def _func_plugin(self, args):
if args.add:
if not os.path.exists(args.add[1]):
print("File {} dosn't exists.".format(args.add[1]))
printer.error("File {} dosn't exists.".format(args.add[1]))
exit(14)
if args.add[0].isalpha() and args.add[0].islower() and len(args.add[0]) <= 15:
disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py.bkp")
if args.add[0] in self.commands or os.path.exists(disabled_dest_file):
print("Plugin name can't be the same as other commands.")
printer.error("Plugin name can't be the same as other commands.")
exit(15)
else:
check_bad_script = self.plugins.verify_script(args.add[1])
if check_bad_script:
print(check_bad_script)
printer.error(check_bad_script)
exit(16)
else:
try:
dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py")
shutil.copy2(args.add[1], dest_file)
print(f"Plugin {args.add[0]} added succesfully.")
printer.success(f"Plugin {args.add[0]} added successfully.")
except Exception as e:
print(f"Failed importing plugin file. {e}")
printer.error(f"Failed importing plugin file. {e}")
exit(17)
else:
print("Plugin name should be lowercase letters up to 15 characters.")
printer.error("Plugin name should be lowercase letters up to 15 characters.")
exit(15)
elif args.update:
if not os.path.exists(args.update[1]):
print("File {} dosn't exists.".format(args.update[1]))
printer.error("File {} dosn't exists.".format(args.update[1]))
exit(14)
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
@@ -686,7 +690,7 @@ class connapp:
if plugin_exist or disabled_plugin_exist:
check_bad_script = self.plugins.verify_script(args.update[1])
if check_bad_script:
print(check_bad_script)
printer.error(check_bad_script)
exit(16)
else:
try:
@@ -696,13 +700,13 @@ class connapp:
shutil.copy2(args.update[1], disabled_dest_file)
else:
shutil.copy2(args.update[1], dest_file)
print(f"Plugin {args.update[0]} updated succesfully.")
printer.success(f"Plugin {args.update[0]} updated successfully.")
except Exception as e:
print(f"Failed updating plugin file. {e}")
printer.error(f"Failed updating plugin file. {e}")
exit(17)
else:
print("Plugin {} dosn't exist.".format(args.update[0]))
printer.error("Plugin {} dosn't exist.".format(args.update[0]))
exit(14)
elif args.delete:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py")
@@ -710,7 +714,7 @@ class connapp:
plugin_exist = os.path.exists(plugin_file)
disabled_plugin_exist = os.path.exists(disabled_plugin_file)
if not plugin_exist and not disabled_plugin_exist:
print("Plugin {} dosn't exist.".format(args.delete[0]))
printer.error("Plugin {} dosn't exist.".format(args.delete[0]))
exit(14)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {} plugin?".format(args.delete[0]))]
confirm = inquirer.prompt(question)
@@ -722,33 +726,33 @@ class connapp:
os.remove(plugin_file)
elif disabled_plugin_exist:
os.remove(disabled_plugin_file)
print(f"plugin {args.delete[0]} deleted succesfully.")
printer.success(f"plugin {args.delete[0]} deleted successfully.")
except Exception as e:
print(f"Failed deleting plugin file. {e}")
printer.error(f"Failed deleting plugin file. {e}")
exit(17)
elif args.disable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py.bkp")
if not os.path.exists(plugin_file) or os.path.exists(disabled_plugin_file):
print("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
printer.error("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
exit(14)
try:
os.rename(plugin_file, disabled_plugin_file)
print(f"plugin {args.disable[0]} disabled succesfully.")
printer.success(f"plugin {args.disable[0]} disabled successfully.")
except Exception as e:
print(f"Failed disabling plugin file. {e}")
printer.error(f"Failed disabling plugin file. {e}")
exit(17)
elif args.enable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py.bkp")
if os.path.exists(plugin_file) or not os.path.exists(disabled_plugin_file):
print("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
printer.error("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
exit(14)
try:
os.rename(disabled_plugin_file, plugin_file)
print(f"plugin {args.enable[0]} enabled succesfully.")
printer.success(f"plugin {args.enable[0]} enabled successfully.")
except Exception as e:
print(f"Failed enabling plugin file. {e}")
printer.error(f"Failed enabling plugin file. {e}")
exit(17)
elif args.list:
enabled_files = []
@@ -768,18 +772,19 @@ class connapp:
if disabled_files:
plugins["Disabled"] = disabled_files
if plugins:
printer.custom("plugins","")
print(yaml.dump(plugins, sort_keys=False))
else:
print("There are no plugins added.")
printer.warning("There are no plugins added.")
def _func_import(self, args):
if not os.path.exists(args.data[0]):
print("File {} dosn't exist".format(args.data[0]))
printer.error("File {} dosn't exist".format(args.data[0]))
exit(14)
print("This could overwrite your current configuration!")
printer.warning("This could overwrite your current configuration!")
question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -789,7 +794,7 @@ class connapp:
with open(args.data[0]) as file:
imported = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for k,v in imported.items():
uniques = self.config._explode_unique(k)
@@ -808,12 +813,12 @@ class connapp:
uniques.update(v)
self.config._connections_add(**uniques)
self.config._saveconfig(self.config.file)
print("File {} imported succesfully".format(args.data[0]))
printer.success("File {} imported successfully".format(args.data[0]))
return
def _func_export(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
printer.error("File {} already exists".format(args.data[0]))
exit(14)
if len(args.data[1:]) == 0:
foldercons = self.config._getallnodesfull(extract = False)
@@ -821,13 +826,13 @@ class connapp:
for folder in args.data[1:]:
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0 and folder != "@":
print("{} folder not found".format(folder))
printer.error("{} folder not found".format(folder))
exit(2)
foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
with open(args.data[0], "w") as file:
yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
file.close()
print("File {} generated succesfully".format(args.data[0]))
printer.success("File {} generated successfully".format(args.data[0]))
exit()
return
@@ -977,13 +982,13 @@ class connapp:
def _yaml_generate(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
printer.error("File {} already exists".format(args.data[0]))
exit(14)
else:
with open(args.data[0], "w") as file:
file.write(self._help("generate"))
file.close()
print("File {} generated succesfully".format(args.data[0]))
printer.success("File {} generated successfully".format(args.data[0]))
exit()
def _yaml_run(self, args):
@@ -991,7 +996,7 @@ class connapp:
with open(args.data[0]) as file:
scripts = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for script in scripts["tasks"]:
self._cli_run(script)
@@ -1007,11 +1012,11 @@ class connapp:
if action == "test":
args["expected"] = script["expected"]
except KeyError as e:
print("'{}' is mandatory".format(e.args[0]))
printer.error("'{}' is mandatory".format(e.args[0]))
exit(11)
nodes = self.config._getallnodes(nodelist)
if len(nodes) == 0:
print("{} don't match any node".format(nodelist))
printer.error("{} don't match any node".format(nodelist))
exit(2)
nodes = self.nodes(self.config.getitems(nodes), config = self.config)
stdout = False
@@ -1037,32 +1042,47 @@ class connapp:
columns = int(p.group(1))
except:
columns = 80
PANEL_WIDTH = columns
if action == "run":
nodes.run(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if stdout:
for line in nodes.output[i].splitlines():
print(" " + line)
header = f"{script['name'].upper()}"
elif action == "test":
nodes.test(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if nodes.status[i] == 0:
max_length = max(len(s) for s in nodes.result[i].keys())
for k,v in nodes.result[i].items():
print(" TEST for '{}'".format(k) + " "*(max_length - len(k) + 1) + "--> " + str(v).upper())
if stdout:
if nodes.status[i] == 0:
print(" " + "-" * (max_length + 21))
for line in nodes.output[i].splitlines():
print(" " + line)
header = f"{script['name'].upper()}"
else:
print("Wrong action '{}'".format(action))
printer.error(f"Wrong action '{action}'")
exit(13)
mdprint(Rule(header, style="white"))
for node in nodes.status:
status_str = "[✓] PASS(0)" if nodes.status[node] == 0 else f"[x] FAIL({nodes.status[node]})"
title_line = f"{node}{status_str}"
test_output = Text()
if action == "test" and nodes.status[node] == 0:
results = nodes.result[node]
test_output.append("TEST RESULTS:\n")
max_key_len = max(len(k) for k in results.keys())
for k, v in results.items():
status = "[✓]" if str(v).upper() == "TRUE" else "[x]"
test_output.append(f" {k.ljust(max_key_len)} {status}\n")
output = nodes.output[node].strip()
code_block = Text()
if stdout and output:
code_block = Text(output + "\n")
if action == "test" and nodes.status[node] == 0:
highlight_words = [k for k, v in nodes.result[node].items() if str(v).upper() == "TRUE"]
code_block.highlight_words(highlight_words, style=Style(color="green", bold=True, underline=True))
panel_content = Group(test_output, Text(""), code_block)
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style="white"))
def _choose(self, list, name, action):
#Generates an inquirer list to pick
if FzfPrompt and self.fzf:
@@ -1429,7 +1449,7 @@ class connapp:
if subparser.description != None:
commands.append(subcommand)
commands = ",".join(commands)
usage_help = f"conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n conn {{{commands}}} ..."
usage_help = f"connpy [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n connpy {{{commands}}} ..."
return usage_help
if type == "end":
help_dict = {}
@@ -1602,5 +1622,4 @@ Here are some important instructions and tips for configuring your new node:
Please follow these instructions carefully to ensure proper configuration of your new node.
"""
# print(instructions)
mdprint(Markdown(instructions))