From 1bd9bd62c50be4dd81c52b49b2503e0f81ea587d Mon Sep 17 00:00:00 2001
From: Federico Luzzi

+In the `Preload` class you can use the `modify` method to alter the application's objects, for example the config:
+
+```python
+def modify_config(cls):
+    # Example modification: adding a new attribute or modifying an existing one
+    cls.new_attribute = 'New Value'
+
+class Preload:
+    def __init__(self, connapp):
+        # Applying modification to the config class instance
+        connapp.config.modify(modify_config)
+```
+
+There are two methods that allow you to define custom logic to be executed before (`register_pre_hook`) or after (`register_post_hook`) the main logic of a method:
+
+```python
+def pre_processing_hook(*args, **kwargs):
+    print("Pre-processing logic here")
+    # Modify arguments or perform any checks
+    # Return modified or unmodified args and kwargs
+    return args, kwargs
+
+def post_processing_hook(*args, **kwargs):
+    print("Post-processing logic here")
+    # Modify the result or perform any final logging or cleanup
+    # Return the modified or unmodified result
+    return kwargs["result"]
+
+class Preload:
+    def __init__(self, connapp):
+        # Registering a pre-hook
+        connapp.ai.some_method.register_pre_hook(pre_processing_hook)
+```

Manage profiles
@@ -655,6 +862,7 @@ __pdoc__ = {
def __init__(self):
self.plugins = {}
self.plugin_parsers = {}
+ self.preloads = {}
def verify_script(self, file_path):
"""
@@ -674,7 +882,7 @@ __pdoc__ = {
### Verifications:
- The presence of only allowed top-level elements.
- - The existence of two specific classes: 'Parser' and 'Entrypoint'.
+ - The existence of the classes 'Parser' and 'Entrypoint' and/or the class 'Preload'.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'
and 'self.description'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
@@ -695,8 +903,10 @@ __pdoc__ = {
except SyntaxError as e:
return f"Syntax error in file: {e}"
- required_classes = {'Parser', 'Entrypoint'}
- found_classes = set()
+
+ has_parser = False
+ has_entrypoint = False
+ has_preload = False
for node in tree.body:
# Allow only function definitions, class definitions, and pass statements at top-level
@@ -712,10 +922,10 @@ __pdoc__ = {
elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)):
return f"Plugin can only have pass, functions, classes and imports. {node} is not allowed" # Reject any other AST types
- if isinstance(node, ast.ClassDef) and node.name in required_classes:
- found_classes.add(node.name)
+ if isinstance(node, ast.ClassDef):
if node.name == 'Parser':
+ has_parser = True
# Ensure Parser class has only the __init__ method and assigns self.parser
if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
return "Parser class should only have __init__ method"
@@ -727,14 +937,27 @@ __pdoc__ = {
return "Parser class should set self.parser and self.description" # 'self.parser' or 'self.description' not assigned in __init__
elif node.name == 'Entrypoint':
+ has_entrypoint = True
init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
if not init_method or len(init_method.args.args) != 4: # self, args, parser, conapp
- return "Entrypoint class should accept only arguments: args, parser and connapp" # 'Entrypoint' __init__ does not have correct signature
+ return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp" # 'Entrypoint' __init__ does not have correct signature
- if required_classes == found_classes:
- return False
- else:
- return "Classes Entrypoint and Parser are mandatory"
+ elif node.name == 'Preload':
+ has_preload = True
+ init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
+ if not init_method or len(init_method.args.args) != 2: # self, connapp
+ return "Preload class should have method __init__ and accept only argument: connapp" # 'Preload' __init__ does not have correct signature
+
+ # Applying the combination logic based on class presence
+ if has_parser and not has_entrypoint:
+ return "Parser requires Entrypoint class to be present."
+ elif has_entrypoint and not has_parser:
+ return "Entrypoint requires Parser class to be present."
+
+ if not (has_parser or has_entrypoint or has_preload):
+ return "No valid class (Parser, Entrypoint, or Preload) found."
+
+ return False # All requirements met, no error
def _import_from_path(self, path):
spec = importlib.util.spec_from_file_location("module.name", path)
@@ -754,11 +977,15 @@ __pdoc__ = {
filepath = os.path.join(directory, filename)
check_file = self.verify_script(filepath)
if check_file:
+ print(f"Failed to load plugin: {filename}. Reason: {check_file}")
continue
else:
self.plugins[root_filename] = self._import_from_path(filepath)
- self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
- subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
+ if hasattr(self.plugins[root_filename], "Parser"):
+ self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
+ subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
+ if hasattr(self.plugins[root_filename], "Preload"):
+ self.preloads[root_filename] = self.plugins[root_filename]
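
For context, the loader above only records `Preload` plugins in `self.preloads`; the call site that instantiates them is not part of this hunk, but given the documented `__init__(self, connapp)` signature it presumably reduces to something like this sketch:

```python
# Hedged sketch (not from this diff): inside connapp, after plugins are
# loaded, each stored Preload plugin would be handed the app instance.
for name, plugin in self.plugins.preloads.items():
    plugin.Preload(self)  # here `self` is the connapp instance
```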
usage: conn profile [-h] (--add | --del | --mod | --show) profile
@@ -107,22 +108,132 @@ options:
Specific Class Requirements

- `Parser`:
  - Must define an `__init__` method that initializes at least two attributes:
    - `self.parser`: An instance of `argparse.ArgumentParser`.
    - `self.description`: A string containing the description of the parser.
- `Entrypoint`:
  - Must define an `__init__` method that accepts exactly three parameters besides `self`:
    - `args`: Arguments passed to the plugin.
    - `parser`: The plugin's argument parser (`self.parser` from the `Parser` class).
    - `connapp`: The instance of the connection manager application.
- `Preload`:
  - Must define an `__init__` method that accepts the parameter `connapp` besides `self`.

Class Dependencies and Combinations

- `Parser` and `Entrypoint` are interdependent and must both be present if one is included.
- `Preload` is independent and may exist alone or alongside the other classes.

Accepted combinations are: `Parser` and `Entrypoint` together, `Preload` alone, or all three classes (`Parser`, `Entrypoint`, `Preload`). A minimal skeleton combining all three is sketched below.
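Below is a minimal sketch of a plugin file that satisfies these rules. The `--name` option, the greeting, and the config attribute are illustrative assumptions; the class names and `__init__` signatures are what `verify_script` actually checks:

```python
import argparse

def modify_config(cls):
    # Hypothetical modification applied via Preload below
    cls.new_attribute = 'New Value'

class Parser:
    def __init__(self):
        # Mandatory attributes checked by verify_script
        self.parser = argparse.ArgumentParser(description="Example plugin")
        self.parser.add_argument("--name", default="world")
        self.description = "Example plugin"

class Entrypoint:
    def __init__(self, args, parser, connapp):
        # args: parsed arguments, parser: the Parser's argparse instance,
        # connapp: the running connection manager app
        print(f"Hello {args.name}")

class Preload:
    def __init__(self, connapp):
        # connapp exposes config, node, nodes and ai for customization
        connapp.config.modify(modify_config)
```
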
Preload Modifications and Hooks

Within the `Preload` class of the plugin system you have the ability to customize the behavior of existing classes and methods within the application through a robust hooking system. This documentation explains how to use the `modify`, `register_pre_hook`, and `register_post_hook` methods to tailor plugin functionality to your needs.

Modifying Classes with `modify`

The `modify` method allows you to alter instances of a class at the time they are created or after their creation. This is particularly useful for setting or modifying configuration settings, altering default behaviors, or adding new functionalities to existing classes without changing the original class definitions.

- `modify(modification_method)`: A function that is invoked with an instance of the class as its argument. This function should perform any modifications directly on this instance.
- The modification method receives a single argument, `cls`: the class instance, which it then modifies.

The objects that support `modify` are:

- `connapp.config`
- `connapp.node`
- `connapp.nodes`
- `connapp.ai`
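
A hedged variant of the earlier `connapp.config` example, showing that the same pattern applies to the other objects in this list; the attribute name is purely illustrative:

```python
def tag_node(cls):
    # Hypothetical attribute, added only to illustrate the mechanism
    cls.managed_by_plugin = True

class Preload:
    def __init__(self, connapp):
        # connapp.node, connapp.nodes and connapp.ai accept modification
        # functions exactly like connapp.config does
        connapp.node.modify(tag_node)
```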
Implementing Method Hooks

Method hooks let you register custom logic to run before (`register_pre_hook`) or after (`register_post_hook`) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs or adding functionalities.

- `register_pre_hook(pre_hook_method)`: A function that is invoked before the main method is executed. This function should do preprocessing of the arguments.
- `register_post_hook(post_hook_method)`: A function that is invoked after the main method is executed. This function should do postprocessing of the outputs.

`pre_hook_method(*args, **kwargs)`

- `*args`, `**kwargs`: The arguments and keyword arguments that will be passed to the method being hooked. The pre-hook function has the opportunity to inspect and modify these arguments before they are passed to the main method.
- It must return a tuple `(args, kwargs)`, which will be used as the new arguments for the main method. If the original arguments are not modified, the function should return them as received.

`post_hook_method(*args, **kwargs)`

- `*args`, `**kwargs`: The arguments and keyword arguments that were passed to the main method.
- `kwargs["result"]`: The value returned by the main method. This allows the post-hook to inspect and even alter the result before it is returned to the original caller.
- It must return the modified result, or `kwargs["result"]` to return the original method result.

    # Registering a post-hook
    connapp.node.another_method.register_post_hook(post_processing_hook)
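As a hedged illustration of a post-hook, the sketch below logs the type of a hooked method's return value before passing it through unchanged; `connapp.ai.ask` is used only because it is decorated with `@MethodHook` elsewhere in this diff, and any hookable method works the same way:

```python
def log_result(*args, **kwargs):
    # kwargs["result"] holds whatever the hooked method returned
    result = kwargs["result"]
    print(f"ask() returned an object of type {type(result).__name__}")
    # Return the (possibly modified) result to the original caller
    return result

class Preload:
    def __init__(self, connapp):
        # Registering a post-hook on ai.ask (illustrative choice)
        connapp.ai.ask.register_post_hook(log_result)
```
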
Executable Block
Methods
@@ -780,7 +1007,7 @@ and that it includes mandatory classes with specific attributes and methods.
- The presence of only allowed top-level elements.
-- The existence of two specific classes: 'Parser' and 'Entrypoint'.
+- The existence of the classes 'Parser' and 'Entrypoint' and/or the class 'Preload'.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'
and 'self.description'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
@@ -814,7 +1041,7 @@ indicating successful verification.
### Verifications:
- The presence of only allowed top-level elements.
- - The existence of two specific classes: 'Parser' and 'Entrypoint'.
+ - The existence of the classes 'Parser' and 'Entrypoint' and/or the class 'Preload'.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'
and 'self.description'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
@@ -835,8 +1062,10 @@ indicating successful verification.
except SyntaxError as e:
return f"Syntax error in file: {e}"
- required_classes = {'Parser', 'Entrypoint'}
- found_classes = set()
+
+ has_parser = False
+ has_entrypoint = False
+ has_preload = False
for node in tree.body:
# Allow only function definitions, class definitions, and pass statements at top-level
@@ -852,10 +1081,10 @@ indicating successful verification.
elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)):
return f"Plugin can only have pass, functions, classes and imports. {node} is not allowed" # Reject any other AST types
- if isinstance(node, ast.ClassDef) and node.name in required_classes:
- found_classes.add(node.name)
+ if isinstance(node, ast.ClassDef):
if node.name == 'Parser':
+ has_parser = True
# Ensure Parser class has only the __init__ method and assigns self.parser
if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
return "Parser class should only have __init__ method"
@@ -867,14 +1096,27 @@ indicating successful verification.
return "Parser class should set self.parser and self.description" # 'self.parser' or 'self.description' not assigned in __init__
elif node.name == 'Entrypoint':
+ has_entrypoint = True
init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
if not init_method or len(init_method.args.args) != 4: # self, args, parser, conapp
- return "Entrypoint class should accept only arguments: args, parser and connapp" # 'Entrypoint' __init__ does not have correct signature
+ return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp" # 'Entrypoint' __init__ does not have correct signature
- if required_classes == found_classes:
- return False
- else:
- return "Classes Entrypoint and Parser are mandatory"
+ elif node.name == 'Preload':
+ has_preload = True
+ init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
+ if not init_method or len(init_method.args.args) != 2: # self, connapp
+ return "Preload class should have method __init__ and accept only argument: connapp" # 'Preload' __init__ does not have correct signature
+
+ # Applying the combination logic based on class presence
+ if has_parser and not has_entrypoint:
+ return "Parser requires Entrypoint class to be present."
+ elif has_entrypoint and not has_parser:
+ return "Entrypoint requires Parser class to be present."
+
+ if not (has_parser or has_entrypoint or has_preload):
+ return "No valid class (Parser, Entrypoint, or Preload) found."
+
+ return False # All requirements met, no error
@@ -914,7 +1156,8 @@ indicating successful verification.
-class ai:
+@ClassHook
+class ai:
''' This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application.
### Attributes:
@@ -1082,16 +1325,7 @@ Categorize the user's request based on the operation they want to perform on
self.__prompt["confirmation_function"]["parameters"]["properties"]["response"]["type"] = "string"
self.__prompt["confirmation_function"]["parameters"]["required"] = ["result"]
- def process_string(self, s):
- if s.startswith('[') and s.endswith(']') and not (s.startswith("['") and s.endswith("']")) and not (s.startswith('["') and s.endswith('"]')):
- # Extract the content inside square brackets and split by comma
- content = s[1:-1].split(',')
- # Add single quotes around each item and join them back together with commas
- new_content = ', '.join(f"'{item.strip()}'" for item in content)
- # Replace the old content with the new content
- s = '[' + new_content + ']'
- return s
-
+ @MethodHook
def _retry_function(self, function, max_retries, backoff_num, *args):
#Retry openai requests
retries = 0
@@ -1108,6 +1342,7 @@ Categorize the user's request based on the operation they want to perform on
myfunction = False
return myfunction
+ @MethodHook
def _clean_command_response(self, raw_response, node_list):
#Parse response for command request to openAI GPT.
info_dict = {}
@@ -1125,6 +1360,7 @@ Categorize the user's request based on the operation they want to perform on
info_dict["variables"][key] = newvalue
return info_dict
+ @MethodHook
def _get_commands(self, user_input, nodes):
#Send the request for commands for each device to openAI GPT.
output_list = []
@@ -1164,6 +1400,7 @@ Categorize the user's request based on the operation they want to perform on
output["response"] = self._clean_command_response(json_result, node_list)
return output
+ @MethodHook
def _get_filter(self, user_input, chat_history = None):
#Send the request to identify the filter and other attributes from the user input to GPT.
message = []
@@ -1205,6 +1442,7 @@ Categorize the user's request based on the operation they want to perform on
output["chat_history"] = chat_history
return output
+ @MethodHook
def _get_confirmation(self, user_input):
#Send the request to identify if user is confirming or denying the task
message = []
@@ -1229,6 +1467,7 @@ Categorize the user's request based on the operation they want to perform on
output["result"] = json_result["response"]
return output
+ @MethodHook
def confirm(self, user_input, max_retries=3, backoff_num=1):
'''
Send the user input to openAI GPT and verify if response is afirmative or negative.
@@ -1254,6 +1493,7 @@ Categorize the user's request based on the operation they want to perform on
output = f"{self.model} api is not responding right now, please try again later."
return output
+ @MethodHook
def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1):
'''
Send the user input to openAI GPT and parse the response to run an action in the application.
@@ -1419,7 +1659,8 @@ Categorize the user's request based on the operation they want to perform on
Expand source code
-def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1):
+@MethodHook
+def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1):
'''
Send the user input to openAI GPT and parse the response to run an action in the application.
@@ -1555,7 +1796,8 @@ Categorize the user's request based on the operation they want to perform on
Expand source code
-def confirm(self, user_input, max_retries=3, backoff_num=1):
+@MethodHook
+def confirm(self, user_input, max_retries=3, backoff_num=1):
'''
Send the user input to openAI GPT and verify if response is afirmative or negative.
@@ -1581,26 +1823,6 @@ Categorize the user's request based on the operation they want to perform on
return output
-
-def process_string(self, s)
-
-
-
-
-
-Expand source code
-
-def process_string(self, s):
- if s.startswith('[') and s.endswith(']') and not (s.startswith("['") and s.endswith("']")) and not (s.startswith('["') and s.endswith('"]')):
- # Extract the content inside square brackets and split by comma
- content = s[1:-1].split(',')
- # Add single quotes around each item and join them back together with commas
- new_content = ', '.join(f"'{item.strip()}'" for item in content)
- # Replace the old content with the new content
- s = '[' + new_content + ']'
- return s
-
-
@@ -1640,7 +1862,8 @@ Categorize the user's request based on the operation they want to perform on
Expand source code
-class configfile:
+@ClassHook
+class configfile:
''' This class generates a configfile object. Containts a dictionary storing, config, nodes and profiles, normaly used by connection manager.
### Attributes:
@@ -1735,15 +1958,20 @@ Categorize the user's request based on the operation they want to perform on
jsonconf.close()
return jsondata
+ @MethodHook
def _saveconfig(self, conf):
#Save config file
newconfig = {"config":{}, "connections": {}, "profiles": {}}
newconfig["config"] = self.config
newconfig["connections"] = self.connections
newconfig["profiles"] = self.profiles
- with open(conf, "w") as f:
- json.dump(newconfig, f, indent = 4)
- f.close()
+ try:
+ with open(conf, "w") as f:
+ json.dump(newconfig, f, indent = 4)
+ f.close()
+ except:
+ return 1
+ return 0
def _createkey(self, keyfile):
#Create key file
@@ -1754,6 +1982,7 @@ Categorize the user's request based on the operation they want to perform on
os.chmod(keyfile, 0o600)
return key
+ @MethodHook
def _explode_unique(self, unique):
#Divide unique name into folder, subfolder and id
uniques = unique.split("@")
@@ -1774,6 +2003,7 @@ Categorize the user's request based on the operation they want to perform on
return False
return result
+ @MethodHook
def getitem(self, unique, keys = None):
'''
Get an node or a group of nodes from configfile which can be passed to node/nodes class
@@ -1829,6 +2059,7 @@ Categorize the user's request based on the operation they want to perform on
newnode.pop("type")
return newnode
+ @MethodHook
def getitems(self, uniques):
'''
Get a group of nodes from configfile which can be passed to node/nodes class
@@ -1870,6 +2101,7 @@ Categorize the user's request based on the operation they want to perform on
return nodes
+ @MethodHook
def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='', type = "connection" ):
#Add connection from config
if folder == '':
@@ -1880,6 +2112,7 @@ Categorize the user's request based on the operation they want to perform on
self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type}
+ @MethodHook
def _connections_del(self,*, id, folder='', subfolder=''):
#Delete connection from config
if folder == '':
@@ -1889,6 +2122,7 @@ Categorize the user's request based on the operation they want to perform on
elif folder != '' and subfolder != '':
del self.connections[folder][subfolder][id]
+ @MethodHook
def _folder_add(self,*, folder, subfolder = ''):
#Add Folder from config
if subfolder == '':
@@ -1898,6 +2132,7 @@ Categorize the user's request based on the operation they want to perform on
if subfolder not in self.connections[folder]:
self.connections[folder][subfolder] = {"type": "subfolder"}
+ @MethodHook
def _folder_del(self,*, folder, subfolder=''):
#Delete folder from config
if subfolder == '':
@@ -1906,15 +2141,18 @@ Categorize the user's request based on the operation they want to perform on
del self.connections[folder][subfolder]
+ @MethodHook
def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='' ):
#Add profile from config
self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost}
+ @MethodHook
def _profiles_del(self,*, id ):
#Delete profile from config
del self.profiles[id]
+ @MethodHook
def _getallnodes(self, filter = None):
#get all nodes on configfile
nodes = []
@@ -1937,6 +2175,7 @@ Categorize the user's request based on the operation they want to perform on
raise ValueError("filter must be a string or a list of strings")
return nodes
+ @MethodHook
def _getallnodesfull(self, filter = None, extract = True):
#get all nodes on configfile with all their attributes.
nodes = {}
@@ -1976,6 +2215,7 @@ Categorize the user's request based on the operation they want to perform on
return nodes
+ @MethodHook
def _getallfolders(self):
#get all folders on configfile
folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
@@ -1986,6 +2226,7 @@ Categorize the user's request based on the operation they want to perform on
folders.extend(subfolders)
return folders
+ @MethodHook
def _profileused(self, profile):
#Check if profile is used before deleting it
nodes = []
@@ -1999,10 +2240,87 @@ Categorize the user's request based on the operation they want to perform on
for s in subfolders:
layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
nodes.extend(layer3)
- return nodes
+ return nodes
+
+ @MethodHook
+ def encrypt(self, password, keyfile=None):
+ '''
+ Encrypts password using RSA keyfile
+
+ ### Parameters:
+
+ - password (str): Plaintext password to encrypt.
+
+ ### Optional Parameters:
+
+ - keyfile (str): Path/file to keyfile. Default is config keyfile.
+
+
+ ### Returns:
+
+ str: Encrypted password.
+
+ '''
+ if keyfile is None:
+ keyfile = self.key
+ with open(keyfile) as f:
+ key = RSA.import_key(f.read())
+ f.close()
+ publickey = key.publickey()
+ encryptor = PKCS1_OAEP.new(publickey)
+ password = encryptor.encrypt(password.encode("utf-8"))
+ return str(password)
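
A quick usage sketch of the new method, assuming an existing `configfile` instance named `config` whose key has already been generated:

```python
# Encrypt with the default key stored in the config folder
secret = config.encrypt("my-plaintext-password")
print(secret)  # str() of the PKCS1-OAEP encrypted bytes, e.g. "b'...'"

# Or point to an alternate RSA keyfile explicitly
secret = config.encrypt("my-plaintext-password", keyfile="/path/to/key")
```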
Methods
+
+def encrypt(self, password, keyfile=None)
+
+-
+
Encrypts password using RSA keyfile
+Parameters:
+- password (str): Plaintext password to encrypt.
+
+Optional Parameters:
+- keyfile (str): Path/file to keyfile. Default is config keyfile.
+
+Returns:
+str: Encrypted password.
+
+
+
+Expand source code
+
+@MethodHook
+def encrypt(self, password, keyfile=None):
+ '''
+ Encrypts password using RSA keyfile
+
+ ### Parameters:
+
+ - password (str): Plaintext password to encrypt.
+
+ ### Optional Parameters:
+
+ - keyfile (str): Path/file to keyfile. Default is config keyfile.
+
+
+ ### Returns:
+
+ str: Encrypted password.
+
+ '''
+ if keyfile is None:
+ keyfile = self.key
+ with open(keyfile) as f:
+ key = RSA.import_key(f.read())
+ f.close()
+ publickey = key.publickey()
+ encryptor = PKCS1_OAEP.new(publickey)
+ password = encryptor.encrypt(password.encode("utf-8"))
+ return str(password)
+
+
def getitem(self, unique, keys=None)
@@ -2025,7 +2343,8 @@ Categorize the user's request based on the operation they want to perform on
Expand source code
-def getitem(self, unique, keys = None):
+@MethodHook
+def getitem(self, unique, keys = None):
'''
Get an node or a group of nodes from configfile which can be passed to node/nodes class
@@ -2099,7 +2418,8 @@ Categorize the user's request based on the operation they want to perform on
Expand source code
-def getitems(self, uniques):
+@MethodHook
+def getitems(self, uniques):
'''
Get a group of nodes from configfile which can be passed to node/nodes class
@@ -2142,1744 +2462,6 @@ Categorize the user's request based on the operation they want to perform on
-
-class connapp
-(config)
-
-
-This class starts the connection manager app. It's normally used by connection manager but you can use it on a script to run the connection manager your way and use a different configfile and key.
-Parameters:
-- config (obj): Object generated with configfile class, it contains
- the nodes configuration and the methods to manage
- the config file.
-
-
-
-Expand source code
-
-class connapp:
- ''' This class starts the connection manager app. It's normally used by connection manager but you can use it on a script to run the connection manager your way and use a different configfile and key.
- '''
-
- def __init__(self, config):
- '''
-
- ### Parameters:
-
- - config (obj): Object generated with configfile class, it contains
- the nodes configuration and the methods to manage
- the config file.
-
- '''
- self.node = node
- self.connnodes = nodes
- self.config = config
- self.nodes = self.config._getallnodes()
- self.folders = self.config._getallfolders()
- self.profiles = list(self.config.profiles.keys())
- self.case = self.config.config["case"]
- try:
- self.fzf = self.config.config["fzf"]
- except:
- self.fzf = False
-
-
- def start(self,argv = sys.argv[1:]):
- '''
-
- ### Parameters:
-
- - argv (list): List of arguments to pass to the app.
- Default: sys.argv[1:]
-
- '''
- #DEFAULTPARSER
- defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
- subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
- #NODEPARSER
- nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
- nodecrud = nodeparser.add_mutually_exclusive_group()
- nodeparser.add_argument("node", metavar="node|folder", nargs='?', default=None, action=self._store_type, help=self._help("node"))
- nodecrud.add_argument("-v","--version", dest="action", action="store_const", help="Show version", const="version", default="connect")
- nodecrud.add_argument("-a","--add", dest="action", action="store_const", help="Add new node[@subfolder][@folder] or [@subfolder]@folder", const="add", default="connect")
- nodecrud.add_argument("-r","--del", "--rm", dest="action", action="store_const", help="Delete node[@subfolder][@folder] or [@subfolder]@folder", const="del", default="connect")
- nodecrud.add_argument("-e","--mod", "--edit", dest="action", action="store_const", help="Modify node[@subfolder][@folder]", const="mod", default="connect")
- nodecrud.add_argument("-s","--show", dest="action", action="store_const", help="Show node[@subfolder][@folder]", const="show", default="connect")
- nodecrud.add_argument("-d","--debug", dest="debug", action="store_true", help="Display all conections steps")
- nodeparser.add_argument("-t","--sftp", dest="sftp", action="store_true", help="Connects using sftp instead of ssh")
- nodeparser.set_defaults(func=self._func_node)
- #PROFILEPARSER
- profileparser = subparsers.add_parser("profile", description="Manage profiles")
- profileparser.add_argument("profile", nargs=1, action=self._store_type, type=self._type_profile, help="Name of profile to manage")
- profilecrud = profileparser.add_mutually_exclusive_group(required=True)
- profilecrud.add_argument("-a", "--add", dest="action", action="store_const", help="Add new profile", const="add")
- profilecrud.add_argument("-r", "--del", "--rm", dest="action", action="store_const", help="Delete profile", const="del")
- profilecrud.add_argument("-e", "--mod", "--edit", dest="action", action="store_const", help="Modify profile", const="mod")
- profilecrud.add_argument("-s", "--show", dest="action", action="store_const", help="Show profile", const="show")
- profileparser.set_defaults(func=self._func_profile)
- #MOVEPARSER
- moveparser = subparsers.add_parser("move", aliases=["mv"], description="Move node")
- moveparser.add_argument("move", nargs=2, action=self._store_type, help="Move node[@subfolder][@folder] dest_node[@subfolder][@folder]", default="move", type=self._type_node)
- moveparser.set_defaults(func=self._func_others)
- #COPYPARSER
- copyparser = subparsers.add_parser("copy", aliases=["cp"], description="Copy node")
- copyparser.add_argument("cp", nargs=2, action=self._store_type, help="Copy node[@subfolder][@folder] new_node[@subfolder][@folder]", default="cp", type=self._type_node)
- copyparser.set_defaults(func=self._func_others)
- #LISTPARSER
- lsparser = subparsers.add_parser("list", aliases=["ls"], description="List profiles, nodes or folders")
- lsparser.add_argument("ls", action=self._store_type, choices=["profiles","nodes","folders"], help="List profiles, nodes or folders", default=False)
- lsparser.add_argument("--filter", nargs=1, help="Filter results")
- lsparser.add_argument("--format", nargs=1, help="Format of the output of nodes using {name}, {NAME}, {location}, {LOCATION}, {host} and {HOST}")
- lsparser.set_defaults(func=self._func_others)
- #BULKPARSER
- bulkparser = subparsers.add_parser("bulk", description="Add nodes in bulk")
- bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
- bulkparser.set_defaults(func=self._func_others)
- # EXPORTPARSER
- exportparser = subparsers.add_parser("export", description="Export connection folder to Yaml file")
- exportparser.add_argument("export", nargs="+", action=self._store_type, help="Export /path/to/file.yml [@subfolder1][@folder1] [@subfolderN][@folderN]")
- exportparser.set_defaults(func=self._func_export)
- # IMPORTPARSER
- importparser = subparsers.add_parser("import", description="Import connection folder to config from Yaml file")
- importparser.add_argument("file", nargs=1, action=self._store_type, help="Import /path/to/file.yml")
- importparser.set_defaults(func=self._func_import)
- # AIPARSER
- aiparser = subparsers.add_parser("ai", description="Make request to an AI")
- aiparser.add_argument("ask", nargs='*', help="Ask connpy AI something")
- aiparser.add_argument("--model", nargs=1, help="Set the OPENAI model id")
- aiparser.add_argument("--org", nargs=1, help="Set the OPENAI organization id")
- aiparser.add_argument("--api_key", nargs=1, help="Set the OPENAI API key")
- aiparser.set_defaults(func=self._func_ai)
- #RUNPARSER
- runparser = subparsers.add_parser("run", description="Run scripts or commands on nodes", formatter_class=argparse.RawTextHelpFormatter)
- runparser.add_argument("run", nargs='+', action=self._store_type, help=self._help("run"), default="run")
- runparser.add_argument("-g","--generate", dest="action", action="store_const", help="Generate yaml file template", const="generate", default="run")
- runparser.set_defaults(func=self._func_run)
- #APIPARSER
- apiparser = subparsers.add_parser("api", description="Start and stop connpy api")
- apicrud = apiparser.add_mutually_exclusive_group(required=True)
- apicrud.add_argument("-s","--start", dest="start", nargs="?", action=self._store_type, help="Start conppy api", type=int, default=8048, metavar="PORT")
- apicrud.add_argument("-r","--restart", dest="restart", nargs=0, action=self._store_type, help="Restart conppy api")
- apicrud.add_argument("-x","--stop", dest="stop", nargs=0, action=self._store_type, help="Stop conppy api")
- apicrud.add_argument("-d", "--debug", dest="debug", nargs="?", action=self._store_type, help="Run connpy server on debug mode", type=int, default=8048, metavar="PORT")
- apiparser.set_defaults(func=self._func_api)
- #PLUGINSPARSER
- pluginparser = subparsers.add_parser("plugin", description="Manage plugins")
- plugincrud = pluginparser.add_mutually_exclusive_group(required=True)
- plugincrud.add_argument("--add", metavar=("PLUGIN", "FILE"), nargs=2, help="Add new plugin")
- plugincrud.add_argument("--update", metavar=("PLUGIN", "FILE"), nargs=2, help="Update plugin")
- plugincrud.add_argument("--del", dest="delete", metavar="PLUGIN", nargs=1, help="Delete plugin")
- plugincrud.add_argument("--enable", metavar="PLUGIN", nargs=1, help="Enable plugin")
- plugincrud.add_argument("--disable", metavar="PLUGIN", nargs=1, help="Disable plugin")
- plugincrud.add_argument("--list", dest="list", action="store_true", help="Disable plugin")
- pluginparser.set_defaults(func=self._func_plugin)
- #CONFIGPARSER
- configparser = subparsers.add_parser("config", description="Manage app config")
- configcrud = configparser.add_mutually_exclusive_group(required=True)
- configcrud.add_argument("--allow-uppercase", dest="case", nargs=1, action=self._store_type, help="Allow case sensitive names", choices=["true","false"])
- configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"])
- configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
- configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
- configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
- configcrud.add_argument("--openai-org", dest="organization", nargs=1, action=self._store_type, help="Set openai organization", metavar="ORGANIZATION")
- configcrud.add_argument("--openai-api-key", dest="api_key", nargs=1, action=self._store_type, help="Set openai api_key", metavar="API_KEY")
- configcrud.add_argument("--openai-model", dest="model", nargs=1, action=self._store_type, help="Set openai model", metavar="MODEL")
- configparser.set_defaults(func=self._func_others)
- #Add plugins
- file_path = self.config.defaultdir + "/plugins"
- self.plugins = Plugins()
- self.plugins._import_plugins_to_argparse(file_path, subparsers)
- #Generate helps
- nodeparser.usage = self._help("usage", subparsers)
- nodeparser.epilog = self._help("end", subparsers)
- nodeparser.help = self._help("node")
- #Manage sys arguments
- self.commands = list(subparsers.choices.keys())
- profilecmds = []
- for action in profileparser._actions:
- profilecmds.extend(action.option_strings)
- if len(argv) >= 2 and argv[1] == "profile" and argv[0] in profilecmds:
- argv[1] = argv[0]
- argv[0] = "profile"
- if len(argv) < 1 or argv[0] not in self.commands:
- argv.insert(0,"node")
- args = defaultparser.parse_args(argv)
- if args.subcommand in self.plugins.plugins:
- self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
- else:
- return args.func(args)
-
- class _store_type(argparse.Action):
- #Custom store type for cli app.
- def __call__(self, parser, args, values, option_string=None):
- setattr(args, "data", values)
- delattr(args,self.dest)
- setattr(args, "command", self.dest)
-
- def _func_node(self, args):
- #Function called when connecting or managing nodes.
- if not self.case and args.data != None:
- args.data = args.data.lower()
- actions = {"version": self._version, "connect": self._connect, "add": self._add, "del": self._del, "mod": self._mod, "show": self._show}
- return actions.get(args.action)(args)
-
- def _version(self, args):
- print(__version__)
-
- def _connect(self, args):
- if args.data == None:
- matches = self.nodes
- if len(matches) == 0:
- print("There are no nodes created")
- print("try: conn --help")
- exit(9)
- else:
- if args.data.startswith("@"):
- matches = list(filter(lambda k: args.data in k, self.nodes))
- else:
- matches = list(filter(lambda k: k.startswith(args.data), self.nodes))
- if len(matches) == 0:
- print("{} not found".format(args.data))
- exit(2)
- elif len(matches) > 1:
- matches[0] = self._choose(matches,"node", "connect")
- if matches[0] == None:
- exit(7)
- node = self.config.getitem(matches[0])
- node = self.node(matches[0],**node, config = self.config)
- if args.sftp:
- node.protocol = "sftp"
- if args.debug:
- node.interact(debug = True)
- else:
- node.interact()
-
- def _del(self, args):
- if args.data == None:
- print("Missing argument node")
- exit(3)
- elif args.data.startswith("@"):
- matches = list(filter(lambda k: k == args.data, self.folders))
- else:
- matches = self.config._getallnodes(args.data)
- if len(matches) == 0:
- print("{} not found".format(args.data))
- exit(2)
- print("Removing: {}".format(matches))
- question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
- confirm = inquirer.prompt(question)
- if confirm == None:
- exit(7)
- if confirm["delete"]:
- if args.data.startswith("@"):
- uniques = self.config._explode_unique(matches[0])
- self.config._folder_del(**uniques)
- else:
- for node in matches:
- nodeuniques = self.config._explode_unique(node)
- self.config._connections_del(**nodeuniques)
- self.config._saveconfig(self.config.file)
- if len(matches) == 1:
- print("{} deleted succesfully".format(matches[0]))
- else:
- print(f"{len(matches)} nodes deleted succesfully")
-
- def _add(self, args):
- args.data = self._type_node(args.data)
- if args.data == None:
- print("Missing argument node")
- exit(3)
- elif args.data.startswith("@"):
- type = "folder"
- matches = list(filter(lambda k: k == args.data, self.folders))
- reversematches = list(filter(lambda k: "@" + k == args.data, self.nodes))
- else:
- type = "node"
- matches = list(filter(lambda k: k == args.data, self.nodes))
- reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
- if len(matches) > 0:
- print("{} already exist".format(matches[0]))
- exit(4)
- if len(reversematches) > 0:
- print("{} already exist".format(reversematches[0]))
- exit(4)
- else:
- if type == "folder":
- uniques = self.config._explode_unique(args.data)
- if uniques == False:
- print("Invalid folder {}".format(args.data))
- exit(5)
- if "subfolder" in uniques.keys():
- parent = "@" + uniques["folder"]
- if parent not in self.folders:
- print("Folder {} not found".format(uniques["folder"]))
- exit(2)
- self.config._folder_add(**uniques)
- self.config._saveconfig(self.config.file)
- print("{} added succesfully".format(args.data))
- if type == "node":
- nodefolder = args.data.partition("@")
- nodefolder = "@" + nodefolder[2]
- if nodefolder not in self.folders and nodefolder != "@":
- print(nodefolder + " not found")
- exit(2)
- uniques = self.config._explode_unique(args.data)
- if uniques == False:
- print("Invalid node {}".format(args.data))
- exit(5)
- print("You can use the configured setting in a profile using @profilename.")
- print("You can also leave empty any value except hostname/IP.")
- print("You can pass 1 or more passwords using comma separated @profiles")
- print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}")
- print("Some useful tags to set for automation are 'os', 'screen_length_command', and 'prompt'.")
- newnode = self._questions_nodes(args.data, uniques)
- if newnode == False:
- exit(7)
- self.config._connections_add(**newnode)
- self.config._saveconfig(self.config.file)
- print("{} added succesfully".format(args.data))
-
- def _show(self, args):
- if args.data == None:
- print("Missing argument node")
- exit(3)
- matches = list(filter(lambda k: k == args.data, self.nodes))
- if len(matches) == 0:
- print("{} not found".format(args.data))
- exit(2)
- node = self.config.getitem(matches[0])
- for k, v in node.items():
- if isinstance(v, str):
- print(k + ": " + v)
- elif isinstance(v, list):
- print(k + ":")
- for i in v:
- print(" - " + i)
- elif isinstance(v, dict):
- print(k + ":")
- for i,d in v.items():
- print(" - " + i + ": " + d)
-
- def _mod(self, args):
- if args.data == None:
- print("Missing argument node")
- exit(3)
- matches = self.config._getallnodes(args.data)
- if len(matches) == 0:
- print("No connection found with filter: {}".format(args.data))
- exit(2)
- elif len(matches) == 1:
- uniques = self.config._explode_unique(args.data)
- unique = matches[0]
- else:
- uniques = {"id": None, "folder": None}
- unique = None
- print("Editing: {}".format(matches))
- node = {}
- for i in matches:
- node[i] = self.config.getitem(i)
- edits = self._questions_edit()
- if edits == None:
- exit(7)
- updatenode = self._questions_nodes(unique, uniques, edit=edits)
- if not updatenode:
- exit(7)
- if len(matches) == 1:
- uniques.update(node[matches[0]])
- uniques["type"] = "connection"
- if sorted(updatenode.items()) == sorted(uniques.items()):
- print("Nothing to do here")
- return
- else:
- self.config._connections_add(**updatenode)
- self.config._saveconfig(self.config.file)
- print("{} edited succesfully".format(args.data))
- else:
- for k in node:
- updatednode = self.config._explode_unique(k)
- updatednode["type"] = "connection"
- updatednode.update(node[k])
- editcount = 0
- for key, should_edit in edits.items():
- if should_edit:
- editcount += 1
- updatednode[key] = updatenode[key]
- if not editcount:
- print("Nothing to do here")
- return
- else:
- self.config._connections_add(**updatednode)
- self.config._saveconfig(self.config.file)
- print("{} edited succesfully".format(matches))
- return
-
-
- def _func_profile(self, args):
- #Function called when managing profiles
- if not self.case:
- args.data[0] = args.data[0].lower()
- actions = {"add": self._profile_add, "del": self._profile_del, "mod": self._profile_mod, "show": self._profile_show}
- return actions.get(args.action)(args)
-
- def _profile_del(self, args):
- matches = list(filter(lambda k: k == args.data[0], self.profiles))
- if len(matches) == 0:
- print("{} not found".format(args.data[0]))
- exit(2)
- if matches[0] == "default":
- print("Can't delete default profile")
- exit(6)
- usedprofile = self.config._profileused(matches[0])
- if len(usedprofile) > 0:
- print("Profile {} used in the following nodes:".format(matches[0]))
- print(", ".join(usedprofile))
- exit(8)
- question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
- confirm = inquirer.prompt(question)
- if confirm["delete"]:
- self.config._profiles_del(id = matches[0])
- self.config._saveconfig(self.config.file)
- print("{} deleted succesfully".format(matches[0]))
-
- def _profile_show(self, args):
- matches = list(filter(lambda k: k == args.data[0], self.profiles))
- if len(matches) == 0:
- print("{} not found".format(args.data[0]))
- exit(2)
- profile = self.config.profiles[matches[0]]
- for k, v in profile.items():
- if isinstance(v, str):
- print(k + ": " + v)
- elif isinstance(v, list):
- print(k + ":")
- for i in v:
- print(" - " + i)
- elif isinstance(v, dict):
- print(k + ":")
- for i,d in v.items():
- print(" - " + i + ": " + d)
-
- def _profile_add(self, args):
- matches = list(filter(lambda k: k == args.data[0], self.profiles))
- if len(matches) > 0:
- print("Profile {} Already exist".format(matches[0]))
- exit(4)
- newprofile = self._questions_profiles(args.data[0])
- if newprofile == False:
- exit(7)
- self.config._profiles_add(**newprofile)
- self.config._saveconfig(self.config.file)
- print("{} added succesfully".format(args.data[0]))
-
- def _profile_mod(self, args):
- matches = list(filter(lambda k: k == args.data[0], self.profiles))
- if len(matches) == 0:
- print("{} not found".format(args.data[0]))
- exit(2)
- profile = self.config.profiles[matches[0]]
- oldprofile = {"id": matches[0]}
- oldprofile.update(profile)
- edits = self._questions_edit()
- if edits == None:
- exit(7)
- updateprofile = self._questions_profiles(matches[0], edit=edits)
- if not updateprofile:
- exit(7)
- if sorted(updateprofile.items()) == sorted(oldprofile.items()):
- print("Nothing to do here")
- return
- else:
- self.config._profiles_add(**updateprofile)
- self.config._saveconfig(self.config.file)
- print("{} edited succesfully".format(args.data[0]))
-
- def _func_others(self, args):
- #Function called when using other commands
- actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "organization": self._openai, "api_key": self._openai, "model": self._openai}
- return actions.get(args.command)(args)
-
- def _ls(self, args):
- items = getattr(self, args.data)
- if args.filter:
- items = [ item for item in items if re.search(args.filter[0], item)]
- if args.format and args.data == "nodes":
- newitems = []
- for i in items:
- formated = {}
- info = self.config.getitem(i)
- if "@" in i:
- name_part, location_part = i.split("@", 1)
- formated["location"] = "@" + location_part
- else:
- name_part = i
- formated["location"] = ""
- formated["name"] = name_part
- formated["host"] = info["host"]
- items_copy = list(formated.items())
- for key, value in items_copy:
- upper_key = key.upper()
- upper_value = value.upper()
- formated[upper_key] = upper_value
- newitems.append(args.format[0].format(**formated))
- items = newitems
- print(*items, sep="\n")
-
- def _mvcp(self, args):
- if not self.case:
- args.data[0] = args.data[0].lower()
- args.data[1] = args.data[1].lower()
- source = list(filter(lambda k: k == args.data[0], self.nodes))
- dest = list(filter(lambda k: k == args.data[1], self.nodes))
- if len(source) != 1:
- print("{} not found".format(args.data[0]))
- exit(2)
- if len(dest) > 0:
- print("Node {} Already exist".format(args.data[1]))
- exit(4)
- nodefolder = args.data[1].partition("@")
- nodefolder = "@" + nodefolder[2]
- if nodefolder not in self.folders and nodefolder != "@":
- print("{} not found".format(nodefolder))
- exit(2)
- olduniques = self.config._explode_unique(args.data[0])
- newuniques = self.config._explode_unique(args.data[1])
- if newuniques == False:
- print("Invalid node {}".format(args.data[1]))
- exit(5)
- node = self.config.getitem(source[0])
- newnode = {**newuniques, **node}
- self.config._connections_add(**newnode)
- if args.command == "move":
- self.config._connections_del(**olduniques)
- self.config._saveconfig(self.config.file)
- action = "moved" if args.command == "move" else "copied"
- print("{} {} succesfully to {}".format(args.data[0],action, args.data[1]))
-
- def _bulk(self, args):
- newnodes = self._questions_bulk()
- if newnodes == False:
- exit(7)
- if not self.case:
- newnodes["location"] = newnodes["location"].lower()
- newnodes["ids"] = newnodes["ids"].lower()
- ids = newnodes["ids"].split(",")
- hosts = newnodes["host"].split(",")
- count = 0
- for n in ids:
- unique = n + newnodes["location"]
- matches = list(filter(lambda k: k == unique, self.nodes))
- reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
- if len(matches) > 0:
- print("Node {} already exist, ignoring it".format(unique))
- continue
- if len(reversematches) > 0:
- print("Folder with name {} already exist, ignoring it".format(unique))
- continue
- newnode = {"id": n}
- if newnodes["location"] != "":
- location = self.config._explode_unique(newnodes["location"])
- newnode.update(location)
- if len(hosts) > 1:
- index = ids.index(n)
- newnode["host"] = hosts[index]
- else:
- newnode["host"] = hosts[0]
- newnode["protocol"] = newnodes["protocol"]
- newnode["port"] = newnodes["port"]
- newnode["options"] = newnodes["options"]
- newnode["logs"] = newnodes["logs"]
- newnode["tags"] = newnodes["tags"]
- newnode["jumphost"] = newnodes["jumphost"]
- newnode["user"] = newnodes["user"]
- newnode["password"] = newnodes["password"]
- count +=1
- self.config._connections_add(**newnode)
- self.nodes = self.config._getallnodes()
- if count > 0:
- self.config._saveconfig(self.config.file)
- print("Succesfully added {} nodes".format(count))
- else:
- print("0 nodes added")
-
- def _completion(self, args):
- if args.data[0] == "bash":
- print(self._help("bashcompletion"))
- elif args.data[0] == "zsh":
- print(self._help("zshcompletion"))
-
- def _case(self, args):
- if args.data[0] == "true":
- args.data[0] = True
- elif args.data[0] == "false":
- args.data[0] = False
- self._change_settings(args.command, args.data[0])
-
- def _fzf(self, args):
- if args.data[0] == "true":
- args.data[0] = True
- elif args.data[0] == "false":
- args.data[0] = False
- self._change_settings(args.command, args.data[0])
-
- def _idletime(self, args):
- if args.data[0] < 0:
- args.data[0] = 0
- self._change_settings(args.command, args.data[0])
-
- def _configfolder(self, args):
- if not os.path.isdir(args.data[0]):
- raise argparse.ArgumentTypeError(f"readable_dir:{args.data[0]} is not a valid path")
- else:
- pathfile = self.config.defaultdir + "/.folder"
- folder = os.path.abspath(args.data[0]).rstrip('/')
- with open(pathfile, "w") as f:
- f.write(str(folder))
- print("Config saved")
-
- def _openai(self, args):
- if "openai" in self.config.config:
- openaikeys = self.config.config["openai"]
- else:
- openaikeys = {}
- openaikeys[args.command] = args.data[0]
- self._change_settings("openai", openaikeys)
-
-
- def _change_settings(self, name, value):
- self.config.config[name] = value
- self.config._saveconfig(self.config.file)
- print("Config saved")
-
- def _func_plugin(self, args):
- if args.add:
- if not os.path.exists(args.add[1]):
- print("File {} dosn't exists.".format(args.add[1]))
- exit(14)
- if args.add[0].isalpha() and args.add[0].islower() and len(args.add[0]) <= 15:
- disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py.bkp")
- if args.add[0] in self.commands or os.path.exists(disabled_dest_file):
- print("Plugin name can't be the same as other commands.")
- exit(15)
- else:
- check_bad_script = self.plugins.verify_script(args.add[1])
- if check_bad_script:
- print(check_bad_script)
- exit(16)
- else:
- try:
- dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py")
- shutil.copy2(args.add[1], dest_file)
- print(f"Plugin {args.add[0]} added succesfully.")
- except Exception as e:
- print(f"Failed importing plugin file. {e}")
- exit(17)
- else:
- print("Plugin name should be lowercase letters up to 15 characters.")
- exit(15)
- elif args.update:
- if not os.path.exists(args.update[1]):
- print("File {} dosn't exists.".format(args.update[1]))
- exit(14)
- plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
- disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
- plugin_exist = os.path.exists(plugin_file)
- disabled_plugin_exist = os.path.exists(disabled_plugin_file)
- if plugin_exist or disabled_plugin_exist:
- check_bad_script = self.plugins.verify_script(args.update[1])
- if check_bad_script:
- print(check_bad_script)
- exit(16)
- else:
- try:
- disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
- dest_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
- if disabled_plugin_exist:
- shutil.copy2(args.update[1], disabled_dest_file)
- else:
- shutil.copy2(args.update[1], dest_file)
- print(f"Plugin {args.update[0]} updated succesfully.")
- except Exception as e:
- print(f"Failed updating plugin file. {e}")
- exit(17)
-
- else:
- print("Plugin {} dosn't exist.".format(args.update[0]))
- exit(14)
- elif args.delete:
- plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py")
- disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py.bkp")
- plugin_exist = os.path.exists(plugin_file)
- disabled_plugin_exist = os.path.exists(disabled_plugin_file)
- if not plugin_exist and not disabled_plugin_exist:
- print("Plugin {} dosn't exist.".format(args.delete[0]))
- exit(14)
- question = [inquirer.Confirm("delete", message="Are you sure you want to delete {} plugin?".format(args.delete[0]))]
- confirm = inquirer.prompt(question)
- if confirm == None:
- exit(7)
- if confirm["delete"]:
- try:
- if plugin_exist:
- os.remove(plugin_file)
- elif disabled_plugin_exist:
- os.remove(disabled_plugin_file)
- print(f"plugin {args.delete[0]} deleted succesfully.")
- except Exception as e:
- print(f"Failed deleting plugin file. {e}")
- exit(17)
- elif args.disable:
- plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py")
- disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py.bkp")
- if not os.path.exists(plugin_file) or os.path.exists(disabled_plugin_file):
- print("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
- exit(14)
- try:
- os.rename(plugin_file, disabled_plugin_file)
- print(f"plugin {args.disable[0]} disabled succesfully.")
- except Exception as e:
- print(f"Failed disabling plugin file. {e}")
- exit(17)
- elif args.enable:
- plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py")
- disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py.bkp")
- if os.path.exists(plugin_file) or not os.path.exists(disabled_plugin_file):
- print("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
- exit(14)
- try:
- os.rename(disabled_plugin_file, plugin_file)
- print(f"plugin {args.enable[0]} enabled succesfully.")
- except Exception as e:
- print(f"Failed enabling plugin file. {e}")
- exit(17)
- elif args.list:
- enabled_files = []
- disabled_files = []
- plugins = {}
-
- # Iterate over all files in the specified folder
- for file in os.listdir(self.config.defaultdir + "/plugins"):
- # Check if the file is a Python file
- if file.endswith('.py'):
- enabled_files.append(os.path.splitext(file)[0])
- # Check if the file is a Python backup file
- elif file.endswith('.py.bkp'):
- disabled_files.append(os.path.splitext(os.path.splitext(file)[0])[0])
- if enabled_files:
- plugins["Enabled"] = enabled_files
- if disabled_files:
- plugins["Disabled"] = disabled_files
- if plugins:
- print(yaml.dump(plugins, sort_keys=False))
- else:
- print("There are no plugins added.")
-
-
-
-
- def _func_import(self, args):
- if not os.path.exists(args.data[0]):
- print("File {} dosn't exist".format(args.data[0]))
- exit(14)
- print("This could overwrite your current configuration!")
- question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
- confirm = inquirer.prompt(question)
- if confirm == None:
- exit(7)
- if confirm["import"]:
- try:
- with open(args.data[0]) as file:
- imported = yaml.load(file, Loader=yaml.FullLoader)
- except:
- print("failed reading file {}".format(args.data[0]))
- exit(10)
- for k,v in imported.items():
- uniques = self.config._explode_unique(k)
- if "folder" in uniques:
- folder = f"@{uniques['folder']}"
- matches = list(filter(lambda k: k == folder, self.folders))
- if len(matches) == 0:
- uniquefolder = self.config._explode_unique(folder)
- self.config._folder_add(**uniquefolder)
- if "subfolder" in uniques:
- subfolder = f"@{uniques['subfolder']}@{uniques['folder']}"
- matches = list(filter(lambda k: k == subfolder, self.folders))
- if len(matches) == 0:
- uniquesubfolder = self.config._explode_unique(subfolder)
- self.config._folder_add(**uniquesubfolder)
- uniques.update(v)
- self.config._connections_add(**uniques)
- self.config._saveconfig(self.config.file)
- print("File {} imported succesfully".format(args.data[0]))
- return
-
- def _func_export(self, args):
- if os.path.exists(args.data[0]):
- print("File {} already exists".format(args.data[0]))
- exit(14)
- if len(args.data[1:]) == 0:
- foldercons = self.config._getallnodesfull(extract = False)
- else:
- for folder in args.data[1:]:
- matches = list(filter(lambda k: k == folder, self.folders))
- if len(matches) == 0 and folder != "@":
- print("{} folder not found".format(folder))
- exit(2)
- foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
- with open(args.data[0], "w") as file:
- yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
- file.close()
- print("File {} generated succesfully".format(args.data[0]))
- exit()
- return
-
- def _func_run(self, args):
- if len(args.data) > 1:
- args.action = "noderun"
- actions = {"noderun": self._node_run, "generate": self._yaml_generate, "run": self._yaml_run}
- return actions.get(args.action)(args)
-
- def _func_ai(self, args):
- arguments = {}
- if args.model:
- arguments["model"] = args.model[0]
- if args.org:
- arguments["org"] = args.org[0]
- if args.api_key:
- arguments["api_key"] = args.api_key[0]
- self.myai = ai(self.config, **arguments)
- if args.ask:
- input = " ".join(args.ask)
- request = self.myai.ask(input, dryrun = True)
- if not request["app_related"]:
- mdprint(Markdown(request["response"]))
- print("\r")
- else:
- if request["action"] == "list_nodes":
- if request["filter"]:
- nodes = self.config._getallnodes(request["filter"])
- else:
- nodes = self.config._getallnodes()
- list = "\n".join(nodes)
- print(list)
- else:
- yaml_data = yaml.dump(request["task"])
- confirmation = f"I'm going to run the following task:\n```{yaml_data}```"
- mdprint(Markdown(confirmation))
- question = [inquirer.Confirm("task", message="Are you sure you want to continue?")]
- print("\r")
- confirm = inquirer.prompt(question)
- if confirm == None:
- exit(7)
- if confirm["task"]:
- script = {}
- script["name"] = "RESULT"
- script["output"] = "stdout"
- script["nodes"] = request["nodes"]
- script["action"] = request["action"]
- if "expected" in request:
- script["expected"] = request["expected"]
- script.update(request["args"])
- self._cli_run(script)
- else:
- history = None
- mdprint(Markdown("**Chatbot**: Hi! How can I help you today?\n\n---"))
- while True:
- questions = [
- inquirer.Text('message', message="User", validate=self._ai_validation),
- ]
- answers = inquirer.prompt(questions)
- if answers == None:
- exit(7)
- response, history = self._process_input(answers["message"], history)
- mdprint(Markdown(f"""**Chatbot**:\n{response}\n\n---"""))
- return
-
-
- def _ai_validation(self, answers, current, regex = "^.+$"):
- #Validate ai user chat.
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Can't send empty messages")
- return True
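All of the `_*_validation` methods below follow the same python-inquirer pattern: a callable receiving `(answers, current)` that either returns `True` or raises `inquirer.errors.ValidationError`. A standalone sketch of that pattern, with an illustrative question name:

```python
import inquirer

def not_empty(answers, current):
    # Reject empty input, mirroring the validators used in this class.
    if not current.strip():
        raise inquirer.errors.ValidationError("", reason="Value cannot be empty")
    return True

questions = [inquirer.Text("message", message="User", validate=not_empty)]
# answers = inquirer.prompt(questions)  # returns None if the prompt is aborted
```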
-
- def _process_input(self, input, history):
- response = self.myai.ask(input , chat_history = history, dryrun = True)
- if not response["app_related"]:
- try:
- if not history:
- history = []
- history.extend(response["chat_history"])
- except:
- if not history:
- history = None
- return response["response"], history
- else:
- history = None
- if response["action"] == "list_nodes":
- if response["filter"]:
- nodes = self.config._getallnodes(response["filter"])
- else:
- nodes = self.config._getallnodes()
- list = "\n".join(nodes)
- response = f"```{list}\n```"
- else:
- yaml_data = yaml.dump(response["task"])
- confirmresponse = f"I'm going to run the following task:\n```{yaml_data}```\nPlease confirm"
- while True:
- mdprint(Markdown(f"""**Chatbot**:\n{confirmresponse}"""))
- questions = [
- inquirer.Text('message', message="User", validate=self._ai_validation),
- ]
- answers = inquirer.prompt(questions)
- if answers == None:
- exit(7)
- confirmation = self.myai.confirm(answers["message"])
- if isinstance(confirmation, bool):
- if not confirmation:
- response = "Request cancelled"
- else:
- nodes = self.connnodes(self.config.getitems(response["nodes"]), config = self.config)
- if response["action"] == "run":
- output = nodes.run(**response["args"])
- response = ""
- elif response["action"] == "test":
- result = nodes.test(**response["args"])
- yaml_result = yaml.dump(result,default_flow_style=False, indent=4)
- output = nodes.output
- response = f"This is the result for your test:\n```\n{yaml_result}\n```"
- for k,v in output.items():
- response += f"\n***{k}***:\n```\n{v}\n```\n"
- break
- return response, history
-
- def _func_api(self, args):
- if args.command == "stop" or args.command == "restart":
- args.data = stop_api()
- if args.command == "start" or args.command == "restart":
- if args.data:
- start_api(args.data)
- else:
- start_api()
- if args.command == "debug":
- if args.data:
- debug_api(args.data)
- else:
- debug_api()
- return
-
- def _node_run(self, args):
- command = " ".join(args.data[1:])
- script = {}
- script["name"] = "Output"
- script["action"] = "run"
- script["nodes"] = args.data[0]
- script["commands"] = [command]
- script["output"] = "stdout"
- self._cli_run(script)
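For a hypothetical invocation such as `conn run router1@office show version`, `_node_run` builds roughly the following script before handing it to `_cli_run`:

```python
# Node name and command are illustrative.
script = {
    "name": "Output",
    "action": "run",
    "nodes": "router1@office",
    "commands": ["show version"],
    "output": "stdout",
}
```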
-
- def _yaml_generate(self, args):
- if os.path.exists(args.data[0]):
- print("File {} already exists".format(args.data[0]))
- exit(14)
- else:
- with open(args.data[0], "w") as file:
- file.write(self._help("generate"))
- file.close()
- print("File {} generated succesfully".format(args.data[0]))
- exit()
-
- def _yaml_run(self, args):
- try:
- with open(args.data[0]) as file:
- scripts = yaml.load(file, Loader=yaml.FullLoader)
- except:
- print("failed reading file {}".format(args.data[0]))
- exit(10)
- for script in scripts["tasks"]:
- self._cli_run(script)
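`_yaml_run` expects a top-level `tasks` list whose entries carry the keys `_cli_run` reads (`name`, `action`, `nodes`, `commands`, `output`, plus `expected` for the `test` action). A minimal, self-contained sketch with an illustrative node name:

```python
import yaml

document = """
tasks:
  - name: "Check version"
    action: "run"
    nodes:
      - "router1@office"
    commands:
      - "show version"
    output: stdout
"""
for task in yaml.load(document, Loader=yaml.FullLoader)["tasks"]:
    print(task["name"], task["action"])
```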
-
-
- def _cli_run(self, script):
- args = {}
- try:
- action = script["action"]
- nodelist = script["nodes"]
- args["commands"] = script["commands"]
- output = script["output"]
- if action == "test":
- args["expected"] = script["expected"]
- except KeyError as e:
- print("'{}' is mandatory".format(e.args[0]))
- exit(11)
- nodes = self.config._getallnodes(nodelist)
- if len(nodes) == 0:
- print("{} don't match any node".format(nodelist))
- exit(2)
- nodes = self.connnodes(self.config.getitems(nodes), config = self.config)
- stdout = False
- if output is None:
- pass
- elif output == "stdout":
- stdout = True
- elif isinstance(output, str) and action == "run":
- args["folder"] = output
- if "variables" in script:
- args["vars"] = script["variables"]
- if "vars" in script:
- args["vars"] = script["vars"]
- try:
- options = script["options"]
- thisoptions = {k: v for k, v in options.items() if k in ["prompt", "parallel", "timeout"]}
- args.update(thisoptions)
- except:
- options = None
- try:
- size = str(os.get_terminal_size())
- p = re.search(r'.*columns=([0-9]+)', size)
- columns = int(p.group(1))
- except:
- columns = 80
- if action == "run":
- nodes.run(**args)
- print(script["name"].upper() + "-" * (columns - len(script["name"])))
- for i in nodes.status.keys():
- print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
- if stdout:
- for line in nodes.output[i].splitlines():
- print(" " + line)
- elif action == "test":
- nodes.test(**args)
- print(script["name"].upper() + "-" * (columns - len(script["name"])))
- for i in nodes.status.keys():
- print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
- if nodes.status[i] == 0:
- max_length = max(len(s) for s in nodes.result[i].keys())
- for k,v in nodes.result[i].items():
- print(" TEST for '{}'".format(k) + " "*(max_length - len(k) + 1) + "--> " + str(v).upper())
- if stdout:
- if nodes.status[i] == 0:
- print(" " + "-" * (max_length + 21))
- for line in nodes.output[i].splitlines():
- print(" " + line)
- else:
- print("Wrong action '{}'".format(action))
- exit(13)
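Note that `_cli_run` forwards only `prompt`, `parallel` and `timeout` from the task's `options`; any other key is silently dropped. A tiny illustration (the `color` key is made up):

```python
options = {"prompt": r">$|#$", "parallel": 5, "timeout": 30, "color": True}
thisoptions = {k: v for k, v in options.items() if k in ["prompt", "parallel", "timeout"]}
print(thisoptions)  # {'prompt': '>$|#$', 'parallel': 5, 'timeout': 30}
```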
-
- def _choose(self, list, name, action):
- #Generates an inquirer list to pick
- if FzfPrompt and self.fzf:
- fzf = FzfPrompt(executable_path="fzf-tmux")
- if not self.case:
- fzf = FzfPrompt(executable_path="fzf-tmux -i")
- answer = fzf.prompt(list, fzf_options="-d 25%")
- if len(answer) == 0:
- return
- else:
- return answer[0]
- else:
- questions = [inquirer.List(name, message="Pick {} to {}:".format(name,action), choices=list, carousel=True)]
- answer = inquirer.prompt(questions)
- if answer == None:
- return
- else:
- return answer[name]
-
- def _host_validation(self, answers, current, regex = "^.+$"):
- #Validate hostname in inquirer when managing nodes
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- return True
-
- def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$)"):
- #Validate protocol in inquirer when managing profiles
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet or leave empty")
- return True
-
- def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$|^@.+$)"):
- #Validate protocol in inquirer when managing nodes
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, leave empty or @profile")
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- return True
-
- def _profile_port_validation(self, answers, current, regex = "(^[0-9]*$)"):
- #Validate port in inquirer when managing profiles
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
- try:
- port = int(current)
- except:
- port = 0
- if current != "" and not 1 <= int(port) <= 65535:
- raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
- return True
-
- def _port_validation(self, answers, current, regex = "(^[0-9]*$|^@.+$)"):
- #Validate port in inquirer when managing nodes
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
- try:
- port = int(current)
- except:
- port = 0
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- elif current != "" and not 1 <= int(port) <= 65535:
- raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
- return True
-
- def _pass_validation(self, answers, current, regex = "(^@.+$)"):
- #Validate password in inquirer
- profiles = current.split(",")
- for i in profiles:
- if not re.match(regex, i) or i[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(i))
- return True
-
- def _tags_validation(self, answers, current):
- #Validation for Tags in inquirer when managing nodes
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- elif current != "":
- isdict = False
- try:
- isdict = ast.literal_eval(current)
- except:
- pass
- if not isinstance (isdict, dict):
- raise inquirer.errors.ValidationError("", reason="Tags should be a Python dictionary.")
- return True
-
- def _profile_tags_validation(self, answers, current):
- #Validation for Tags in inquirer when managing profiles
- if current != "":
- isdict = False
- try:
- isdict = ast.literal_eval(current)
- except:
- pass
- if not isinstance (isdict, dict):
- raise inquirer.errors.ValidationError("", reason="Tags should be a Python dictionary.")
- return True
-
- def _jumphost_validation(self, answers, current):
- #Validation for Jumphost in inquirer when managing nodes
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- elif current != "":
- if current not in self.nodes :
- raise inquirer.errors.ValidationError("", reason="Node {} doesn't exist.".format(current))
- return True
-
- def _profile_jumphost_validation(self, answers, current):
- #Validation for Jumphost in inquirer when managing profiles
- if current != "":
- if current not in self.nodes :
- raise inquirer.errors.ValidationError("", reason="Node {} doesn't exist.".format(current))
- return True
-
- def _default_validation(self, answers, current):
- #Default validation type used in multiples questions in inquirer
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- return True
-
- def _bulk_node_validation(self, answers, current, regex = "^[0-9a-zA-Z_.,$#-]+$"):
- #Validation of nodes when running bulk command
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- return True
-
- def _bulk_folder_validation(self, answers, current):
- #Validation of folders when running bulk command
- if not self.case:
- current = current.lower()
- matches = list(filter(lambda k: k == current, self.folders))
- if current != "" and len(matches) == 0:
- raise inquirer.errors.ValidationError("", reason="Location {} doesn't exist".format(current))
- return True
-
- def _bulk_host_validation(self, answers, current, regex = "^.+$"):
- #Validate hostname when running bulk command
- if not re.match(regex, current):
- raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
- if current.startswith("@"):
- if current[1:] not in self.profiles:
- raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
- hosts = current.split(",")
- nodes = answers["ids"].split(",")
- if len(hosts) > 1 and len(hosts) != len(nodes):
- raise inquirer.errors.ValidationError("", reason="Hosts list should be the same length as the nodes list")
- return True
-
- def _questions_edit(self):
- #Inquirer questions when editing nodes or profiles
- questions = []
- questions.append(inquirer.Confirm("host", message="Edit Hostname/IP?"))
- questions.append(inquirer.Confirm("protocol", message="Edit Protocol?"))
- questions.append(inquirer.Confirm("port", message="Edit Port?"))
- questions.append(inquirer.Confirm("options", message="Edit Options?"))
- questions.append(inquirer.Confirm("logs", message="Edit logging path/file?"))
- questions.append(inquirer.Confirm("tags", message="Edit tags?"))
- questions.append(inquirer.Confirm("jumphost", message="Edit jumphost?"))
- questions.append(inquirer.Confirm("user", message="Edit User?"))
- questions.append(inquirer.Confirm("password", message="Edit password?"))
- answers = inquirer.prompt(questions)
- return answers
-
- def _questions_nodes(self, unique, uniques = None, edit = None):
- #Questions when adding or editing nodes
- try:
- defaults = self.config.getitem(unique)
- if "tags" not in defaults:
- defaults["tags"] = ""
- if "jumphost" not in defaults:
- defaults["jumphost"] = ""
- except:
- defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" , "tags":"", "password":"", "jumphost":""}
- node = {}
- if edit == None:
- edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True, "jumphost":True }
- questions = []
- if edit["host"]:
- questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"]))
- else:
- node["host"] = defaults["host"]
- if edit["protocol"]:
- questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation, default=defaults["protocol"]))
- else:
- node["protocol"] = defaults["protocol"]
- if edit["port"]:
- questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation, default=defaults["port"]))
- else:
- node["port"] = defaults["port"]
- if edit["options"]:
- questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation, default=defaults["options"]))
- else:
- node["options"] = defaults["options"]
- if edit["logs"]:
- questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"].replace("{","{{").replace("}","}}")))
- else:
- node["logs"] = defaults["logs"]
- if edit["tags"]:
- questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
- else:
- node["tags"] = defaults["tags"]
- if edit["jumphost"]:
- questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._jumphost_validation, default=str(defaults["jumphost"]).replace("{","{{").replace("}","}}")))
- else:
- node["jumphost"] = defaults["jumphost"]
- if edit["user"]:
- questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"]))
- else:
- node["user"] = defaults["user"]
- if edit["password"]:
- questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
- else:
- node["password"] = defaults["password"]
- answer = inquirer.prompt(questions)
- if answer == None:
- return False
- if "password" in answer.keys():
- if answer["password"] == "Local Password":
- passq = [inquirer.Password("password", message="Set Password")]
- passa = inquirer.prompt(passq)
- if passa == None:
- return False
- answer["password"] = self.encrypt(passa["password"])
- elif answer["password"] == "Profiles":
- passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
- passa = inquirer.prompt(passq)
- if passa == None:
- return False
- answer["password"] = passa["password"].split(",")
- elif answer["password"] == "No Password":
- answer["password"] = ""
- if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
- answer["tags"] = ast.literal_eval(answer["tags"])
- result = {**uniques, **answer, **node}
- result["type"] = "connection"
- return result
-
- def _questions_profiles(self, unique, edit = None):
- #Questions when adding or editing profiles
- try:
- defaults = self.config.profiles[unique]
- if "tags" not in defaults:
- defaults["tags"] = ""
- if "jumphost" not in defaults:
- defaults["jumphost"] = ""
- except:
- defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"", "tags": "", "jumphost": ""}
- profile = {}
- if edit == None:
- edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True, "jumphost":True }
- questions = []
- if edit["host"]:
- questions.append(inquirer.Text("host", message="Add Hostname or IP", default=defaults["host"]))
- else:
- profile["host"] = defaults["host"]
- if edit["protocol"]:
- questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._profile_protocol_validation, default=defaults["protocol"]))
- else:
- profile["protocol"] = defaults["protocol"]
- if edit["port"]:
- questions.append(inquirer.Text("port", message="Select Port Number", validate=self._profile_port_validation, default=defaults["port"]))
- else:
- profile["port"] = defaults["port"]
- if edit["options"]:
- questions.append(inquirer.Text("options", message="Pass extra options to protocol", default=defaults["options"]))
- else:
- profile["options"] = defaults["options"]
- if edit["logs"]:
- questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"].replace("{","{{").replace("}","}}")))
- else:
- profile["logs"] = defaults["logs"]
- if edit["tags"]:
- questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._profile_tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
- else:
- profile["tags"] = defaults["tags"]
- if edit["jumphost"]:
- questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._profile_jumphost_validation, default=str(defaults["jumphost"]).replace("{","{{").replace("}","}}")))
- else:
- profile["jumphost"] = defaults["jumphost"]
- if edit["user"]:
- questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"]))
- else:
- profile["user"] = defaults["user"]
- if edit["password"]:
- questions.append(inquirer.Password("password", message="Set Password"))
- else:
- profile["password"] = defaults["password"]
- answer = inquirer.prompt(questions)
- if answer == None:
- return False
- if "password" in answer.keys():
- if answer["password"] != "":
- answer["password"] = self.encrypt(answer["password"])
- if "tags" in answer.keys() and answer["tags"]:
- answer["tags"] = ast.literal_eval(answer["tags"])
- result = {**answer, **profile}
- result["id"] = unique
- return result
-
- def _questions_bulk(self):
- #Questions when using bulk command
- questions = []
- questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", validate=self._bulk_node_validation))
- questions.append(inquirer.Text("location", message="Add a @folder, @subfolder@folder or leave empty", validate=self._bulk_folder_validation))
- questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", validate=self._bulk_host_validation))
- questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation))
- questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation))
- questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation))
- questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation))
- questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation))
- questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._jumphost_validation))
- questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation))
- questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
- answer = inquirer.prompt(questions)
- if answer == None:
- return False
- if "password" in answer.keys():
- if answer["password"] == "Local Password":
- passq = [inquirer.Password("password", message="Set Password")]
- passa = inquirer.prompt(passq)
- answer["password"] = self.encrypt(passa["password"])
- elif answer["password"] == "Profiles":
- passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
- passa = inquirer.prompt(passq)
- answer["password"] = passa["password"].split(",")
- elif answer["password"] == "No Password":
- answer["password"] = ""
- answer["type"] = "connection"
- if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
- answer["tags"] = ast.literal_eval(answer["tags"])
- return answer
-
- def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")):
- if arg_value == None:
- raise ValueError("Missing argument node")
- if not pat.match(arg_value):
- raise ValueError(f"Argument error: {arg_value}")
- return arg_value
-
- def _type_profile(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$#-]+$")):
- if not pat.match(arg_value):
- raise ValueError
- return arg_value
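`_type_node` and `_type_profile` are plugged into argparse as `type=` callables further down, so invalid names are rejected while parsing. A standalone sketch of the same idea (it uses `ArgumentTypeError`; the real methods raise `ValueError`, which argparse treats the same way):

```python
import argparse
import re

def type_node(value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")):
    # Validate the raw string and return it unchanged, or make argparse fail.
    if not pat.match(value):
        raise argparse.ArgumentTypeError(f"invalid node name: {value}")
    return value

parser = argparse.ArgumentParser()
parser.add_argument("node", type=type_node)
print(parser.parse_args(["router1@office"]).node)  # router1@office
```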
-
- def _help(self, type, parsers = None):
- #Store text for help and other commands
- if type == "node":
- return "node[@subfolder][@folder]\nConnect to specific node or show all matching nodes\n[@subfolder][@folder]\nShow all available connections globally or in specified path"
- if type == "usage":
- commands = []
- for subcommand, subparser in parsers.choices.items():
- if subparser.description != None:
- commands.append(subcommand)
- commands = ",".join(commands)
- usage_help = f"conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n conn {{{commands}}} ..."
- return usage_help
- if type == "end":
- help_dict = {}
- for subcommand, subparser in parsers.choices.items():
- if subparser.description == None and help_dict:
- previous_key = next(reversed(help_dict.keys()))
- help_dict[f"{previous_key}({subcommand})"] = help_dict.pop(previous_key)
- else:
- help_dict[subcommand] = subparser.description
- subparser.description = None
- commands_help = "Commands:\n"
- commands_help += "\n".join([f" {cmd:<15} {help_text}" for cmd, help_text in help_dict.items() if help_text != None])
- return commands_help
- if type == "bashcompletion":
- return '''
-#Here starts bash completion for conn
-_conn()
-{
- mapfile -t strings < <(connpy-completion-helper "bash" "${#COMP_WORDS[@]}" "${COMP_WORDS[@]}")
- local IFS=$'\t\n'
- local home_dir=$(eval echo ~)
- local last_word=${COMP_WORDS[-1]/\~/$home_dir}
- COMPREPLY=($(compgen -W "$(printf '%s' "${strings[@]}")" -- "$last_word"))
- if [ "$last_word" != "${COMP_WORDS[-1]}" ]; then
- COMPREPLY=(${COMPREPLY[@]/$home_dir/\~})
- fi
-}
-
-complete -o nospace -o nosort -F _conn conn
-complete -o nospace -o nosort -F _conn connpy
-#Here ends bash completion for conn
- '''
- if type == "zshcompletion":
- return '''
-#Here starts zsh completion for conn
-autoload -U compinit && compinit
-_conn()
-{
- local home_dir=$(eval echo ~)
- last_word=${words[-1]/\~/$home_dir}
- strings=($(connpy-completion-helper "zsh" ${#words} $words[1,-2] $last_word))
- for string in "${strings[@]}"; do
- #Replace the expanded home directory with ~
- if [ "$last_word" != "$words[-1]" ]; then
- string=${string/$home_dir/\~}
- fi
- if [[ "${string}" =~ .*/$ ]]; then
- # If the string ends with a '/', do not append a space
- compadd -Q -S '' -- "$string"
- else
- # If the string does not end with a '/', append a space
- compadd -Q -S ' ' -- "$string"
- fi
- done
-}
-compdef _conn conn
-compdef _conn connpy
-#Here ends zsh completion for conn
- '''
- if type == "run":
- return "node[@subfolder][@folder] commmand to run\nRun the specific command on the node and print output\n/path/to/file.yaml\nUse a yaml file to run an automation script"
- if type == "generate":
- return '''---
-tasks:
-- name: "Config"
-
- action: 'run' #Action can be test or run. Mandatory
-
- nodes: #List of nodes to work on. Mandatory
- - 'router1@office' #You can add specific nodes
- - '@aws' #entire folders or subfolders
- - '@office': #or filter inside a folder or subfolder
- - 'router2'
- - 'router7'
-
- commands: #List of commands to send, use {name} to pass variables
- - 'term len 0'
- - 'conf t'
- - 'interface {if}'
- - 'ip address 10.100.100.{id} 255.255.255.255'
- - '{commit}'
- - 'end'
-
- variables: #Variables to use on commands and expected. Optional
- __global__: #Global variables to use on all nodes, fallback if missing in the node.
- commit: ''
- if: 'loopback100'
- router1@office:
- id: 1
- router2@office:
- id: 2
- commit: 'commit'
- router3@office:
- id: 3
- vrouter1@aws:
- id: 4
- vrouterN@aws:
- id: 5
-
- output: /home/user/logs #Type of output, if null you only get Connection and test result. Choices are: null,stdout,/path/to/folder. Folder path only works on 'run' action.
-
- options:
- prompt: r'>$|#$|\$$|>.$|#.$|\$.$' #Optional prompt to check on your devices, default should work on most devices.
- parallel: 10 #Optional number of nodes to run commands on parallel. Default 10.
- timeout: 20 #Optional time to wait in seconds for prompt, expected or EOF. Default 20.
-
-- name: "TestConfig"
- action: 'test'
- nodes:
- - 'router1@office'
- - '@aws'
- - '@office':
- - 'router2'
- - 'router7'
- commands:
- - 'ping 10.100.100.{id}'
- expected: '!' #Expected text to find when running test action. Mandatory for 'test'
- variables:
- router1@office:
- id: 1
- router2@office:
- id: 2
- commit: 'commit'
- router3@office:
- id: 3
- vrouter1@aws:
- id: 4
- vrouterN@aws:
- id: 5
- output: null
-...'''
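The template above relies on `{name}` placeholders in `commands` plus per-node `variables` with a `__global__` fallback. A rough, illustrative sketch of how such a substitution can be combined (not the app's internal implementation):

```python
variables = {
    "__global__": {"commit": "", "if": "loopback100"},
    "router2@office": {"id": 2, "commit": "commit"},
}
node = "router2@office"
merged = {**variables["__global__"], **variables.get(node, {})}  # node values win
commands = ["interface {if}", "ip address 10.100.100.{id} 255.255.255.255", "{commit}"]
print([c.format(**merged) for c in commands])
# ['interface loopback100', 'ip address 10.100.100.2 255.255.255.255', 'commit']
```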
-
- def encrypt(self, password, keyfile=None):
- '''
- Encrypts password using RSA keyfile
-
- ### Parameters:
-
- - password (str): Plaintext password to encrypt.
-
- ### Optional Parameters:
-
- - keyfile (str): Path/file to keyfile. Default is config keyfile.
-
-
- ### Returns:
-
- str: Encrypted password.
-
- '''
- if keyfile is None:
- keyfile = self.config.key
- with open(keyfile) as f:
- key = RSA.import_key(f.read())
- f.close()
- publickey = key.publickey()
- encryptor = PKCS1_OAEP.new(publickey)
- password = encryptor.encrypt(password.encode("utf-8"))
- return str(password)
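`encrypt` wraps standard PKCS1_OAEP encryption with the configured RSA keyfile. A standalone sketch of the same flow using a throwaway key instead of the config keyfile (requires pycryptodome):

```python
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP

key = RSA.generate(2048)                      # stand-in for the config keyfile
encryptor = PKCS1_OAEP.new(key.publickey())
ciphertext = encryptor.encrypt("my-password".encode("utf-8"))

decryptor = PKCS1_OAEP.new(key)               # decryption needs the private key
print(decryptor.decrypt(ciphertext).decode("utf-8"))  # my-password
```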
-
-Methods
-
-
-def encrypt(self, password, keyfile=None)
-
--
-
Encrypts password using RSA keyfile
-Parameters:
-- password (str): Plaintext password to encrypt.
-
-Optional Parameters:
-- keyfile (str): Path/file to keyfile. Default is config keyfile.
-
-Returns:
-str: Encrypted password.
-
-
-
-
-
-
-def start(self, argv=['--html', 'connpy', '-o', 'docs', '--force'])
-
--
-
Parameters:
-- argv (list): List of arguments to pass to the app.
- Default: sys.argv[1:]
-
-
-
-Expand source code
-
-def start(self,argv = sys.argv[1:]):
- '''
-
- ### Parameters:
-
- - argv (list): List of arguments to pass to the app.
- Default: sys.argv[1:]
-
- '''
- #DEFAULTPARSER
- defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
- subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
- #NODEPARSER
- nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
- nodecrud = nodeparser.add_mutually_exclusive_group()
- nodeparser.add_argument("node", metavar="node|folder", nargs='?', default=None, action=self._store_type, help=self._help("node"))
- nodecrud.add_argument("-v","--version", dest="action", action="store_const", help="Show version", const="version", default="connect")
- nodecrud.add_argument("-a","--add", dest="action", action="store_const", help="Add new node[@subfolder][@folder] or [@subfolder]@folder", const="add", default="connect")
- nodecrud.add_argument("-r","--del", "--rm", dest="action", action="store_const", help="Delete node[@subfolder][@folder] or [@subfolder]@folder", const="del", default="connect")
- nodecrud.add_argument("-e","--mod", "--edit", dest="action", action="store_const", help="Modify node[@subfolder][@folder]", const="mod", default="connect")
- nodecrud.add_argument("-s","--show", dest="action", action="store_const", help="Show node[@subfolder][@folder]", const="show", default="connect")
- nodecrud.add_argument("-d","--debug", dest="debug", action="store_true", help="Display all conections steps")
- nodeparser.add_argument("-t","--sftp", dest="sftp", action="store_true", help="Connects using sftp instead of ssh")
- nodeparser.set_defaults(func=self._func_node)
- #PROFILEPARSER
- profileparser = subparsers.add_parser("profile", description="Manage profiles")
- profileparser.add_argument("profile", nargs=1, action=self._store_type, type=self._type_profile, help="Name of profile to manage")
- profilecrud = profileparser.add_mutually_exclusive_group(required=True)
- profilecrud.add_argument("-a", "--add", dest="action", action="store_const", help="Add new profile", const="add")
- profilecrud.add_argument("-r", "--del", "--rm", dest="action", action="store_const", help="Delete profile", const="del")
- profilecrud.add_argument("-e", "--mod", "--edit", dest="action", action="store_const", help="Modify profile", const="mod")
- profilecrud.add_argument("-s", "--show", dest="action", action="store_const", help="Show profile", const="show")
- profileparser.set_defaults(func=self._func_profile)
- #MOVEPARSER
- moveparser = subparsers.add_parser("move", aliases=["mv"], description="Move node")
- moveparser.add_argument("move", nargs=2, action=self._store_type, help="Move node[@subfolder][@folder] dest_node[@subfolder][@folder]", default="move", type=self._type_node)
- moveparser.set_defaults(func=self._func_others)
- #COPYPARSER
- copyparser = subparsers.add_parser("copy", aliases=["cp"], description="Copy node")
- copyparser.add_argument("cp", nargs=2, action=self._store_type, help="Copy node[@subfolder][@folder] new_node[@subfolder][@folder]", default="cp", type=self._type_node)
- copyparser.set_defaults(func=self._func_others)
- #LISTPARSER
- lsparser = subparsers.add_parser("list", aliases=["ls"], description="List profiles, nodes or folders")
- lsparser.add_argument("ls", action=self._store_type, choices=["profiles","nodes","folders"], help="List profiles, nodes or folders", default=False)
- lsparser.add_argument("--filter", nargs=1, help="Filter results")
- lsparser.add_argument("--format", nargs=1, help="Format of the output of nodes using {name}, {NAME}, {location}, {LOCATION}, {host} and {HOST}")
- lsparser.set_defaults(func=self._func_others)
- #BULKPARSER
- bulkparser = subparsers.add_parser("bulk", description="Add nodes in bulk")
- bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
- bulkparser.set_defaults(func=self._func_others)
- # EXPORTPARSER
- exportparser = subparsers.add_parser("export", description="Export connection folder to Yaml file")
- exportparser.add_argument("export", nargs="+", action=self._store_type, help="Export /path/to/file.yml [@subfolder1][@folder1] [@subfolderN][@folderN]")
- exportparser.set_defaults(func=self._func_export)
- # IMPORTPARSER
- importparser = subparsers.add_parser("import", description="Import connection folder to config from Yaml file")
- importparser.add_argument("file", nargs=1, action=self._store_type, help="Import /path/to/file.yml")
- importparser.set_defaults(func=self._func_import)
- # AIPARSER
- aiparser = subparsers.add_parser("ai", description="Make request to an AI")
- aiparser.add_argument("ask", nargs='*', help="Ask connpy AI something")
- aiparser.add_argument("--model", nargs=1, help="Set the OPENAI model id")
- aiparser.add_argument("--org", nargs=1, help="Set the OPENAI organization id")
- aiparser.add_argument("--api_key", nargs=1, help="Set the OPENAI API key")
- aiparser.set_defaults(func=self._func_ai)
- #RUNPARSER
- runparser = subparsers.add_parser("run", description="Run scripts or commands on nodes", formatter_class=argparse.RawTextHelpFormatter)
- runparser.add_argument("run", nargs='+', action=self._store_type, help=self._help("run"), default="run")
- runparser.add_argument("-g","--generate", dest="action", action="store_const", help="Generate yaml file template", const="generate", default="run")
- runparser.set_defaults(func=self._func_run)
- #APIPARSER
- apiparser = subparsers.add_parser("api", description="Start and stop connpy api")
- apicrud = apiparser.add_mutually_exclusive_group(required=True)
- apicrud.add_argument("-s","--start", dest="start", nargs="?", action=self._store_type, help="Start conppy api", type=int, default=8048, metavar="PORT")
- apicrud.add_argument("-r","--restart", dest="restart", nargs=0, action=self._store_type, help="Restart conppy api")
- apicrud.add_argument("-x","--stop", dest="stop", nargs=0, action=self._store_type, help="Stop conppy api")
- apicrud.add_argument("-d", "--debug", dest="debug", nargs="?", action=self._store_type, help="Run connpy server on debug mode", type=int, default=8048, metavar="PORT")
- apiparser.set_defaults(func=self._func_api)
- #PLUGINSPARSER
- pluginparser = subparsers.add_parser("plugin", description="Manage plugins")
- plugincrud = pluginparser.add_mutually_exclusive_group(required=True)
- plugincrud.add_argument("--add", metavar=("PLUGIN", "FILE"), nargs=2, help="Add new plugin")
- plugincrud.add_argument("--update", metavar=("PLUGIN", "FILE"), nargs=2, help="Update plugin")
- plugincrud.add_argument("--del", dest="delete", metavar="PLUGIN", nargs=1, help="Delete plugin")
- plugincrud.add_argument("--enable", metavar="PLUGIN", nargs=1, help="Enable plugin")
- plugincrud.add_argument("--disable", metavar="PLUGIN", nargs=1, help="Disable plugin")
- plugincrud.add_argument("--list", dest="list", action="store_true", help="Disable plugin")
- pluginparser.set_defaults(func=self._func_plugin)
- #CONFIGPARSER
- configparser = subparsers.add_parser("config", description="Manage app config")
- configcrud = configparser.add_mutually_exclusive_group(required=True)
- configcrud.add_argument("--allow-uppercase", dest="case", nargs=1, action=self._store_type, help="Allow case sensitive names", choices=["true","false"])
- configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"])
- configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
- configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
- configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
- configcrud.add_argument("--openai-org", dest="organization", nargs=1, action=self._store_type, help="Set openai organization", metavar="ORGANIZATION")
- configcrud.add_argument("--openai-api-key", dest="api_key", nargs=1, action=self._store_type, help="Set openai api_key", metavar="API_KEY")
- configcrud.add_argument("--openai-model", dest="model", nargs=1, action=self._store_type, help="Set openai model", metavar="MODEL")
- configparser.set_defaults(func=self._func_others)
- #Add plugins
- file_path = self.config.defaultdir + "/plugins"
- self.plugins = Plugins()
- self.plugins._import_plugins_to_argparse(file_path, subparsers)
- #Generate helps
- nodeparser.usage = self._help("usage", subparsers)
- nodeparser.epilog = self._help("end", subparsers)
- nodeparser.help = self._help("node")
- #Manage sys arguments
- self.commands = list(subparsers.choices.keys())
- profilecmds = []
- for action in profileparser._actions:
- profilecmds.extend(action.option_strings)
- if len(argv) >= 2 and argv[1] == "profile" and argv[0] in profilecmds:
- argv[1] = argv[0]
- argv[0] = "profile"
- if len(argv) < 1 or argv[0] not in self.commands:
- argv.insert(0,"node")
- args = defaultparser.parse_args(argv)
- if args.subcommand in self.plugins.plugins:
- self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
- else:
- return args.func(args)
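The argv handling at the end of `start` swaps a leading profile option in front of the `profile` subcommand and otherwise falls back to the implicit `node` subcommand. A runnable sketch of that normalization; the command and option lists below are hypothetical stand-ins for the ones built at runtime:

```python
commands = ["node", "profile", "move", "copy", "list", "run", "config"]
profilecmds = ["-a", "--add", "-r", "--del", "--rm", "-e", "--mod", "--edit", "-s", "--show"]

def normalize(argv):
    argv = list(argv)
    if len(argv) >= 2 and argv[1] == "profile" and argv[0] in profilecmds:
        argv[0], argv[1] = argv[1], argv[0]      # e.g. --add profile -> profile --add
    if len(argv) < 1 or argv[0] not in commands:
        argv.insert(0, "node")                   # bare node names go to the node subcommand
    return argv

print(normalize(["--add", "profile", "office"]))  # ['profile', '--add', 'office']
print(normalize(["router1@office"]))              # ['node', 'router1@office']
```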
-
-
-
-
class node
(unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags='', jumphost='')
@@ -3932,7 +2514,8 @@ tasks:
Expand source code
-class node:
+@ClassHook
+class node:
''' This class generates a node object. Contains all the information and methods to connect and interact with a device using ssh or telnet.
### Attributes:
@@ -4055,6 +2638,7 @@ tasks:
else:
self.jumphost = ""
+ @MethodHook
def _passtx(self, passwords, *, keyfile=None):
# decrypts passwords, used by other methods.
dpass = []
@@ -4077,6 +2661,7 @@ tasks:
+ @MethodHook
def _logfile(self, logfile = None):
# translate logs variables and generate logs path.
if logfile == None:
@@ -4092,6 +2677,7 @@ tasks:
logfile = re.sub(r'\$\{date (.*)}',now.strftime(dateconf.group(1)), logfile)
return logfile
+ @MethodHook
def _logclean(self, logfile, var = False):
#Remove special ascii characters and other stuff from logfile.
if var == False:
@@ -4118,6 +2704,7 @@ tasks:
else:
return t
+ @MethodHook
def _savelog(self):
'''Save the log buffer to the file at regular intervals if there are changes.'''
t = threading.current_thread()
@@ -4133,11 +2720,13 @@ tasks:
prev_size = current_size # Update the previous size
sleep(5)
+ @MethodHook
def _filter(self, a):
#Set time for last input when using interact
self.lastinput = time()
return a
+ @MethodHook
def _keepalive(self):
#Send keepalive ctrl+e when idletime passed without new inputs on interact
self.lastinput = time()
@@ -4149,6 +2738,7 @@ tasks:
sleep(1)
+ @MethodHook
def interact(self, debug = False):
'''
Allow user to interact with the node directly, mostly used by connection manager.
@@ -4190,6 +2780,7 @@ tasks:
print(connect)
exit(1)
+ @MethodHook
def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10):
'''
Run a command or list of commands on the node and return the output.
@@ -4278,6 +2869,7 @@ tasks:
f.close()
return connect
+ @MethodHook
def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10):
'''
Run a command or list of commands on the node, then check if expected value appears on the output after the last command.
@@ -4373,6 +2965,7 @@ tasks:
self.status = 1
return connect
+ @MethodHook
def _connect(self, debug = False, timeout = 10, max_attempts = 3):
# Method to connect to the node, it parse all the information, create the ssh/telnet command and login to the node.
if self.protocol in ["ssh", "sftp"]:
@@ -4488,7 +3081,8 @@ tasks:
Expand source code
-def interact(self, debug = False):
+@MethodHook
+def interact(self, debug = False):
'''
Allow user to interact with the node directly, mostly used by connection manager.
@@ -4570,7 +3164,8 @@ tasks:
Expand source code
-def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10):
+@MethodHook
+def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10):
'''
Run a command or list of commands on the node and return the output.
@@ -4699,7 +3294,8 @@ tasks:
Expand source code
-def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10):
+@MethodHook
+def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10):
'''
Run a command or list of commands on the node, then check if expected value appears on the output after the last command.
@@ -4841,7 +3437,8 @@ tasks:
Expand source code
-class nodes:
+@ClassHook
+class nodes:
''' This class generates a nodes object. Contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.
### Attributes:
@@ -4892,12 +3489,14 @@ tasks:
setattr(self,n,this)
+ @MethodHook
def _splitlist(self, lst, n):
#split a list in lists of n members.
for i in range(0, len(lst), n):
yield lst[i:i + n]
+ @MethodHook
def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None):
'''
Run a command or list of commands on all the nodes in nodelist.
@@ -4982,6 +3581,7 @@ tasks:
self.status = status
return output
+ @MethodHook
def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10, timeout = None):
'''
Run a command or list of commands on all the nodes in nodelist, then check if expected value appears on the output after the last command.
@@ -5112,7 +3712,8 @@ tasks:
Expand source code
-def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None):
+@MethodHook
+def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None):
'''
Run a command or list of commands on all the nodes in nodelist.
@@ -5242,7 +3843,8 @@ tasks:
Expand source code
-def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10, timeout = None):
+@MethodHook
+def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10, timeout = None):
'''
Run a command or list of commands on all the nodes in nodelist, then check if expected value appears on the output after the last command.
@@ -5343,6 +3945,12 @@ tasks:
Plugin Requirements for Connpy
configfile
-connapp
-
-
-
node