Package connpy
Connection manager
Connpy is an SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and automation module for Linux, Mac, and Docker.
Features
- Manage connections using SSH, SFTP, Telnet, kubectl, and Docker exec.
- Set contexts to manage specific nodes from specific contexts (work/home/clients/etc).
- You can generate profiles and reference them from nodes using @profilename so you don't
need to edit multiple nodes when changing passwords or other information.
- Nodes can be stored on @folder or @subfolder@folder to organize your devices. They can
be referenced using node@subfolder@folder or node@folder.
- If you have too many nodes, get a completion script using: conn config --completion.
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use GPT AI to help you manage your devices.
- Add plugins with your own scripts.
- Much more!
Usage
usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]
conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config,sync,context} ...
positional arguments:
node|folder node[@subfolder][@folder]
Connect to specific node or show all matching nodes
[@subfolder][@folder]
Show all available connections globally or in specified path
options:
-h, --help show this help message and exit
-v, --version Show version
-a, --add Add new node[@subfolder][@folder] or [@subfolder]@folder
-r, --del, --rm Delete node[@subfolder][@folder] or [@subfolder]@folder
-e, --mod, --edit Modify node[@subfolder][@folder]
-s, --show Show node[@subfolder][@folder]
-d, --debug Display all connection steps
-t, --sftp Connects using sftp instead of ssh
Commands:
profile Manage profiles
move(mv) Move node
copy(cp) Copy node
list(ls) List profiles, nodes or folders
bulk Add nodes in bulk
export Export connection folder to Yaml file
import Import connection folder to config from Yaml file
ai Make request to an AI
run Run scripts or commands on nodes
api Start and stop connpy api
plugin Manage plugins
config Manage app config
sync Sync config with Google
context Manage contexts with regex matching
Manage profiles
usage: conn profile [-h] (--add | --del | --mod | --show) profile
positional arguments:
profile Name of profile to manage
options:
-h, --help show this help message and exit
-a, --add Add new profile
-r, --del, --rm Delete profile
-e, --mod, --edit Modify profile
-s, --show Show profile
Examples
#Add new profile
conn profile --add office-user
#Add new folder
conn --add @office
#Add new subfolder
conn --add @datacenter@office
#Add node to subfolder
conn --add server@datacenter@office
#Add node to folder
conn --add pc@office
#Show node information
conn --show server@datacenter@office
#Connect to nodes
conn pc@office
conn server
#Create and set new context
conn context -a office .*@office
conn context --set office
#Run a command in a node
conn run server ls -la
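The node[@subfolder][@folder] addressing and the regex-based context used in the examples above can be illustrated with a small sketch. Both helpers below (split_unique and nodes_in_context) are hypothetical names written for this illustration, and the use of a full regex match is an assumption; Connpy's own matching rules may differ.

```python
import re


def split_unique(unique):
    """Split 'node@subfolder@folder' into its parts (hypothetical helper)."""
    parts = unique.split("@")
    node = parts[0] or None  # empty when the name starts with '@'
    rest = parts[1:]
    if len(rest) > 2:
        raise ValueError(f"Too many '@' separators in {unique!r}")
    # The last element is always the folder; an optional element before it is the subfolder.
    folder = rest[-1] if rest else None
    subfolder = rest[0] if len(rest) == 2 else None
    return {"node": node, "subfolder": subfolder, "folder": folder}


def nodes_in_context(names, pattern):
    """Keep only the names that fully match the context regex (assumed semantics)."""
    regex = re.compile(pattern)
    return [name for name in names if regex.fullmatch(name)]


names = ["pc@office", "server@datacenter@office", "router@home"]
print(split_unique("server@datacenter@office"))
print(nodes_in_context(names, r".*@office"))
```

With the context regex .*@office from the example above, only the two nodes stored under @office remain visible in this sketch.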
Plugin Requirements for Connpy
General Structure
- The plugin script must be a Python file.
- Only the following top-level elements are allowed in the plugin script:
  - Class definitions
  - Function definitions
  - Import statements
  - The if __name__ == "__main__": block for standalone execution
  - Pass statements
Specific Class Requirements
- The plugin script must define specific classes with particular attributes and methods. Each class serves a distinct role within the plugin's architecture:
- Class Parser:
  - Purpose: Handles parsing of command-line arguments.
  - Requirements:
    - Must contain only one method: __init__.
    - The __init__ method must initialize at least two attributes:
      - self.parser: An instance of argparse.ArgumentParser.
      - self.description: A string containing the description of the parser.
- Class Entrypoint:
  - Purpose: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
  - Requirements:
    - Must have an __init__ method that accepts exactly three parameters besides self:
      - args: Arguments passed to the plugin.
      - The parser instance (typically self.parser from the Parser class).
      - The Connapp instance to interact with the Connpy app.
- Class Preload:
  - Purpose: Performs any necessary preliminary setup or configuration independent of the main parsing and entry logic.
  - Requirements:
    - Contains at least an __init__ method that accepts parameter connapp besides self.
Class Dependencies and Combinations
- Dependencies:
  - Parser and Entrypoint are interdependent and must both be present if one is included.
  - Preload is independent and may exist alone or alongside the other classes.
- Valid Combinations:
  - Parser and Entrypoint together.
  - Preload alone.
  - All three classes (Parser, Entrypoint, Preload).
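The class requirements and valid combinations above can be sketched as a minimal plugin file. The greeting behavior, the node argument, and the message attribute are illustrative assumptions, not part of Connpy; only the class names, the __init__ signatures, and the self.parser / self.description attributes come from the requirements listed above.

```python
import argparse


class Parser:
    # Only __init__ is allowed in this class, so it carries no docstring or other members.
    def __init__(self):
        self.parser = argparse.ArgumentParser(description="Greet a node")
        self.description = "Greet a node"
        self.parser.add_argument("node", help="Name of the node to greet")


class Entrypoint:
    def __init__(self, args, parser, connapp):
        # args: parsed arguments, parser: the argparse parser, connapp: the Connpy app instance.
        self.message = f"Hello, {args.node}!"
        print(self.message)


class Preload:
    def __init__(self, connapp):
        # Setup that does not depend on argument parsing would go here.
        pass


if __name__ == "__main__":
    # Standalone run for testing; no Connpy app is available here, so None is passed.
    demo_parser = Parser()
    demo_args = demo_parser.parser.parse_args(["router1"])
    Entrypoint(demo_args, demo_parser.parser, None)
```

The file keeps to the allowed top-level elements: one import, three class definitions, and the if __name__ == "__main__": block.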
Preload Modifications and Hooks
In the Preload class of the plugin system, you can customize the behavior of existing classes and methods within the application through a hooking system. This section explains how to use the modify, register_pre_hook, and register_post_hook methods to tailor plugin functionality to your needs.
Modifying Classes with modify
The modify method allows you to alter instances of a class at the time they are created or after their creation. This is particularly useful for setting or modifying configuration settings, altering default behaviors, or adding new functionalities to existing classes without changing the original class definitions.
- Usage: Modify a class to include additional configurations or changes.
- Modify Method Signature:
  - modify(modification_method): A function that is invoked with an instance of the class as its argument. This function should perform any modifications directly on this instance.
- Modification Method Signature:
  - Arguments:
    - cls: This function accepts a single argument, the class instance, which it then modifies.
- Modifiable Classes:
  - connapp.config
  - connapp.node
  - connapp.nodes
  - connapp.ai
```python
def modify_config(cls):
    # Example modification: adding a new attribute or modifying an existing one
    cls.new_attribute = 'New Value'

class Preload:
    def __init__(self, connapp):
        # Applying modification to the config class instance
        connapp.config.modify(modify_config)
```
Implementing Method Hooks
There are two methods that allow you to define custom logic to be executed before (register_pre_hook) or after (register_post_hook) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs, or adding functionality.
- Usage: Register hooks to methods to execute additional logic before or after the main method execution.
- Registration Methods Signature:
  - register_pre_hook(pre_hook_method): A function that is invoked before the main method is executed. This function should do preprocessing of the arguments.
  - register_post_hook(post_hook_method): A function that is invoked after the main method is executed. This function should do postprocessing of the outputs.
- Method Signatures for Pre-Hooks:
  - pre_hook_method(*args, **kwargs)
  - Arguments:
    - *args, **kwargs: The arguments and keyword arguments that will be passed to the method being hooked. The pre-hook function has the opportunity to inspect and modify these arguments before they are passed to the main method.
  - Return:
    - Must return a tuple (args, kwargs), which will be used as the new arguments for the main method. If the original arguments are not modified, the function should return them as received.
- Method Signatures for Post-Hooks:
  - post_hook_method(*args, **kwargs)
  - Arguments:
    - *args, **kwargs: The arguments and keyword arguments that were passed to the main method.
    - kwargs["result"]: The value returned by the main method. This allows the post-hook to inspect and even alter the result before it is returned to the original caller.
  - Return:
    - Can return a modified result, which will replace the original result of the main method, or simply return kwargs["result"] to return the original method result.
```python
def pre_processing_hook(*args, **kwargs):
    print("Pre-processing logic here")
    # Modify arguments or perform any checks
    return args, kwargs  # Return modified or unmodified args and kwargs

def post_processing_hook(*args, **kwargs):
    print("Post-processing logic here")
    # Modify the result or perform any final logging or cleanup
    return kwargs["result"]  # Return the modified or unmodified result

class Preload:
    def __init__(self, connapp):
        # Registering a pre-hook
        connapp.ai.some_method.register_pre_hook(pre_processing_hook)

        # Registering a post-hook
        connapp.node.another_method.register_post_hook(post_processing_hook)
```
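To make the hook contract concrete, here is a toy stand-in that follows the rules described above: pre-hooks return (args, kwargs), and post-hooks receive the result in kwargs["result"] and return the value to use. The hookable decorator and the greet function are hypothetical and are not Connpy's actual hook implementation; they only mimic the documented behavior.

```python
import functools


def hookable(func):
    """Toy decorator mimicking the documented hook contract (not Connpy's code)."""
    pre_hooks, post_hooks = [], []

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Each pre-hook may rewrite the arguments before the main call.
        for hook in pre_hooks:
            args, kwargs = hook(*args, **kwargs)
        result = func(*args, **kwargs)
        # Each post-hook sees the current result and returns the one to keep.
        for hook in post_hooks:
            result = hook(*args, **kwargs, result=result)
        return result

    wrapper.register_pre_hook = pre_hooks.append
    wrapper.register_post_hook = post_hooks.append
    return wrapper


@hookable
def greet(name, punctuation="!"):
    return f"hello {name}{punctuation}"


def upper_name(*args, **kwargs):
    # Pre-hook: upper-case the first positional argument.
    return (args[0].upper(),) + args[1:], kwargs


def add_suffix(*args, **kwargs):
    # Post-hook: extend the value returned by the main function.
    return kwargs["result"] + " (checked)"


greet.register_pre_hook(upper_name)
greet.register_post_hook(add_suffix)
print(greet("router1"))  # prints "hello ROUTER1! (checked)"
```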
Executable Block
- The plugin script can include an executable block:
  - if __name__ == "__main__":
  - This block allows the plugin to be run as a standalone script for testing or independent use.
Script Verification
- The verify_script method in plugins.py is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.
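The top-level check that this verification performs can be approximated with the standard ast module. The sketch below is a simplified illustration, not the verify_script method itself: check_top_level and is_main_guard are hypothetical names, the class-specific checks are omitted, and ast.Constant is used for the string comparison.

```python
import ast

# Node types the plugin rules allow at module level (besides the __main__ guard).
ALLOWED = (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)


def is_main_guard(node):
    """Return True for an 'if __name__ == "__main__":' statement."""
    test = node.test
    return (
        isinstance(test, ast.Compare)
        and isinstance(test.left, ast.Name)
        and test.left.id == "__name__"
        and len(test.comparators) == 1
        and isinstance(test.comparators[0], ast.Constant)
        and test.comparators[0].value == "__main__"
    )


def check_top_level(source):
    """Return an error message, or False when every top-level element is allowed."""
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        return f"Syntax error in file: {exc}"
    for node in tree.body:
        if isinstance(node, ast.If):
            if not is_main_guard(node):
                return "Only __name__ == __main__ If is allowed"
        elif not isinstance(node, ALLOWED):
            return f"{type(node).__name__} is not allowed at top level"
    return False


print(check_top_level("import os\nclass Preload:\n    pass\n"))
print(check_top_level("x = 1\n"))
```

As in the documented behavior, a falsy return value means the script passed and a string describes the first violation found.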
Example Script
For a practical example of how to write a compatible plugin script, please refer to the following example:
This script demonstrates the required structure and implementation details according to the plugin system's standards.
HTTP API
With the Connpy API you can run commands on devices using HTTP requests.
1. List Nodes
Endpoint: /list_nodes
Method: POST
Description: This route returns a list of nodes. It can also filter the list based on a given keyword.
Request Body:
{
"filter": "<keyword>"
}
- filter (optional): A keyword to filter the list of nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.
Response:
- A JSON array containing the filtered list of nodes.
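A request to this route could be prepared with the standard library as sketched below. The base URL and port, the JSON Content-Type header, and the helper names build_request and send are assumptions made for illustration; adjust them to wherever your Connpy API is actually listening. The example only builds the request and does not send it.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8048"  # assumption: replace with your API host and port


def build_request(endpoint, payload, base_url=BASE_URL):
    """Build a POST request carrying a JSON body for the given endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=10):
    """Send the request and decode the JSON response (needs a running API)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_request("/list_nodes", {"filter": "office"})
print(request.get_method(), request.full_url)
```

Calling send(request) against a running API would return the decoded JSON array of node names.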
2. Get Nodes
Endpoint: /get_nodes
Method: POST
Description: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword.
Request Body:
{
"filter": "<keyword>"
}
- filter (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.
Response:
- A JSON array containing the filtered nodes.
3. Run Commands
Endpoint: /run_commands
Method: POST
Description: This route runs commands on selected nodes based on the provided action, nodes, and commands. It also supports executing tests by providing expected results.
Request Body:
{
"action": "<action>",
"nodes": "<nodes>",
"commands": "<commands>",
"expected": "<expected>",
"options": "<options>"
}
- action (required): The action to be performed. Possible values: run or test.
- nodes (required): A list of nodes or a single node on which the commands will be executed. The nodes can be specified as individual node names or a node group with the @ prefix. Node groups can also be specified as arrays with a list of nodes inside the group.
- commands (required): A list of commands to be executed on the specified nodes.
- expected (optional, only used when the action is test): A single expected result for the test.
- options (optional): Array to pass options to the run command. Available options: prompt, parallel, timeout.
Response:
- A JSON object with the results of the executed commands on the nodes.
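The request body described above can be assembled and sanity-checked before it is sent. The helper build_run_payload is a hypothetical name, and the node names are placeholders; the options field is left out because its exact shape is not spelled out here.

```python
import json


def build_run_payload(action, nodes, commands, expected=None):
    """Assemble a /run_commands body from the documented fields (hypothetical helper)."""
    if action not in ("run", "test"):
        raise ValueError("action must be 'run' or 'test'")
    if isinstance(commands, str):
        commands = [commands]  # the route expects a list of commands
    payload = {"action": action, "nodes": nodes, "commands": commands}
    # 'expected' is only meaningful for the test action.
    if action == "test" and expected is not None:
        payload["expected"] = expected
    return payload


payload = build_run_payload(
    "test",
    ["router1@office", "@datacenter"],
    "show ip int brief",
    expected="1.1.1.1",
)
print(json.dumps(payload, indent=2))
```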
4. Ask AI
Endpoint: /ask_ai
Method: POST
Description: This route sends a request to the ChatGPT AI, which parses it into output the application can understand, and then runs the request.
Request Body:
{
"input": "<user input request>",
"dryrun": true or false
}
- input (required): The user input requesting the AI to perform an action on some devices or get the devices list.
- dryrun (optional): If set to true, it will return the parameters to run the request but it won't run it. Default is false.
Response:
- A JSON array containing the action to run and the parameters and the result of the action.
Automation module
The automation module
Standalone module
import connpy
router = connpy.node("uniqueName","ip/host", user="user", password="pass")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
if hasip:
    print("Router has ip 1.1.1.1")
else:
    print("Router does not have ip 1.1.1.1")
Using manager configuration
import connpy
conf = connpy.configfile()
device = conf.getitem("server@office")
server = connpy.node("unique name", **device, config=conf)
result = server.run(["cd /", "ls -la"])
print(result)
Running parallel tasks
import connpy
conf = connpy.configfile()
#You can get the nodes from a folder in the config, filtering within it
nodes = conf.getitem("@office", ["router1", "router2", "router3"])
#You can also get each node individually:
nodes = {}
nodes["router1"] = conf.getitem("router1@office")
nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "pass3"}
#Finally you run some tasks on the nodes
mynodes = connpy.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
for i in result:
    print("---" + i + "---")
    print(result[i])
    print()
# Or for one specific node
mynodes.router1.run(["term len 0", "show run"], folder = "/home/user/logs")
Using variables
import connpy
config = connpy.configfile()
nodes = config.getitem("@office", ["router1", "router2", "router3"])
commands = []
commands.append("config t")
commands.append("interface lo {id}")
commands.append("ip add {ip} {mask}")
commands.append("end")
variables = {}
variables["router1@office"] = {"ip": "10.57.57.1"}
variables["router2@office"] = {"ip": "10.57.57.2"}
variables["router3@office"] = {"ip": "10.57.57.3"}
variables["__global__"] = {"id": "57"}
variables["__global__"]["mask"] = "255.255.255.255"
expected = "!"
routers = connpy.nodes(nodes, config = config)
routers.run(commands, variables)
routers.test("ping {ip}", expected, variables)
for key in routers.result:
    print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
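To see what each node would receive, the {placeholder} substitution from the example above can be reproduced offline. This is a sketch under two assumptions: that placeholders behave like str.format fields, and that node-specific values take precedence over __global__ ones. The function render_commands is a hypothetical helper, not part of connpy.

```python
def render_commands(commands, variables, node):
    """Fill {placeholders} for one node: __global__ values first, node values override."""
    values = dict(variables.get("__global__", {}))
    values.update(variables.get(node, {}))
    return [command.format(**values) for command in commands]


commands = ["config t", "interface lo {id}", "ip add {ip} {mask}", "end"]
variables = {
    "router1@office": {"ip": "10.57.57.1"},
    "router2@office": {"ip": "10.57.57.2"},
    "__global__": {"id": "57", "mask": "255.255.255.255"},
}

for node in ("router1@office", "router2@office"):
    print(node, render_commands(commands, variables, node))
```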
Using AI
import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = connpy.ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)
Classes
class Plugins
class Plugins:
    def __init__(self):
        self.plugins = {}
        self.plugin_parsers = {}
        self.preloads = {}

    def verify_script(self, file_path):
        """
        Verifies that a given Python script meets specific structural requirements.

        This function checks a Python script for compliance with predefined structural
        rules. It ensures that the script contains only allowed top-level elements
        (functions, classes, imports, pass statements, and a specific if __name__ block)
        and that it includes mandatory classes with specific attributes and methods.

        ### Arguments:
            - file_path (str): The file path of the Python script to be verified.

        ### Returns:
            - str: A message indicating the type of violation if the script doesn't meet
                   the requirements, or False if all requirements are met.

        ### Verifications:
            - The presence of only allowed top-level elements.
            - The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload.
            - 'Parser' class must only have an '__init__' method and must assign 'self.parser'
              and 'self.description'.
            - 'Entrypoint' class must have an '__init__' method accepting specific arguments.

        If any of these checks fail, the function returns an error message indicating
        the reason. If the script passes all checks, the function returns False,
        indicating successful verification.

        ### Exceptions:
            - SyntaxError: If the script contains a syntax error, it is caught and
                           returned as a part of the error message.
        """
        with open(file_path, 'r') as file:
            source_code = file.read()

        try:
            tree = ast.parse(source_code)
        except SyntaxError as e:
            return f"Syntax error in file: {e}"

        has_parser = False
        has_entrypoint = False
        has_preload = False

        for node in tree.body:
            # Allow only function definitions, class definitions, and pass statements at top-level
            if isinstance(node, ast.If):
                # Check for the 'if __name__ == "__main__":' block
                if not (isinstance(node.test, ast.Compare) and
                        isinstance(node.test.left, ast.Name) and
                        node.test.left.id == '__name__' and
                        isinstance(node.test.comparators[0], ast.Str) and
                        node.test.comparators[0].s == '__main__'):
                    return "Only __name__ == __main__ If is allowed"

            elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)):
                return f"Plugin can only have pass, functions, classes and imports. {node} is not allowed"  # Reject any other AST types

            if isinstance(node, ast.ClassDef):

                if node.name == 'Parser':
                    has_parser = True
                    # Ensure Parser class has only the __init__ method and assigns self.parser
                    if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
                        return "Parser class should only have __init__ method"

                    # Check if 'self.parser' and 'self.description' are assigned in __init__ method
                    init_method = node.body[0]
                    assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self']
                    if 'parser' not in assigned_attrs or 'description' not in assigned_attrs:
                        return "Parser class should set self.parser and self.description"  # 'self.parser' or 'self.description' not assigned in __init__

                elif node.name == 'Entrypoint':
                    has_entrypoint = True
                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                    if not init_method or len(init_method.args.args) != 4:  # self, args, parser, conapp
                        return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp"  # 'Entrypoint' __init__ does not have correct signature

                elif node.name == 'Preload':
                    has_preload = True
                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                    if not init_method or len(init_method.args.args) != 2:  # self, connapp
                        return "Preload class should have method __init__ and accept only argument: connapp"  # 'Preload' __init__ does not have correct signature

        # Applying the combination logic based on class presence
        if has_parser and not has_entrypoint:
            return "Parser requires Entrypoint class to be present."
        elif has_entrypoint and not has_parser:
            return "Entrypoint requires Parser class to be present."

        if not (has_parser or has_entrypoint or has_preload):
            return "No valid class (Parser, Entrypoint, or Preload) found."

        return False  # All requirements met, no error

    def _import_from_path(self, path):
        spec = importlib.util.spec_from_file_location("module.name", path)
        module = importlib.util.module_from_spec(spec)
        sys.modules["module.name"] = module
        spec.loader.exec_module(module)
        return module

    def _import_plugins_to_argparse(self, directory, subparsers):
        for filename in os.listdir(directory):
            commands = subparsers.choices.keys()
            if filename.endswith(".py"):
                root_filename = os.path.splitext(filename)[0]
                if root_filename in commands:
                    continue
                # Construct the full path
                filepath = os.path.join(directory, filename)
                check_file = self.verify_script(filepath)
                if check_file:
                    print(f"Failed to load plugin: {filename}. Reason: {check_file}")
                    continue
                else:
                    self.plugins[root_filename] = self._import_from_path(filepath)
                    if hasattr(self.plugins[root_filename], "Parser"):
                        self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
                        subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
                    if hasattr(self.plugins[root_filename], "Preload"):
                        self.preloads[root_filename] = self.plugins[root_filename]
Methods
def verify_script(self, file_path)
Verifies that a given Python script meets specific structural requirements.
This function checks a Python script for compliance with predefined structural rules. It ensures that the script contains only allowed top-level elements (functions, classes, imports, pass statements, and a specific if __name__ block) and that it includes mandatory classes with specific attributes and methods.
Arguments:
- file_path (str): The file path of the Python script to be verified.
Returns:
- str: A message indicating the type of violation if the script doesn't meet the requirements, or False if all requirements are met.
Verifications:
- The presence of only allowed top-level elements.
- The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser' and 'self.description'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
If any of these checks fail, the function returns an error message indicating the reason. If the script passes all checks, the function returns False, indicating successful verification.
Exceptions:
- SyntaxError: If the script contains a syntax error, it is caught and returned as a part of the error message.
class ai (config, org=None, api_key=None, model=None, temp=0.7)
@ClassHook
class ai:
    ''' This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application.

    ### Attributes:

        - model (str): Model of GPT api to use. Default is gpt-4o-mini.

        - temp (float): Value between 0 and 1 that control the randomness
                        of generated text, with higher values increasing
                        creativity. Default is 0.7.

        '''

    def __init__(self, config, org = None, api_key = None, model = None, temp = 0.7):
        '''

        ### Parameters:

            - config (obj): Pass the object created with class configfile with
                            key for decryption and extra configuration if you
                            are using connection manager.

        ### Optional Parameters:

            - org (str): A unique token identifying the user organization
                         to interact with the API.

            - api_key (str): A unique authentication token required to access
                             and interact with the API.

            - model (str): Model of GPT api to use. Default is gpt-4o-mini.

            - temp (float): Value between 0 and 1 that control the randomness
                            of generated text, with higher values increasing
                            creativity. Default is 0.7.

        '''
        self.config = config
        if org:
            openai.organization = org
        else:
            try:
                openai.organization = self.config.config["openai"]["organization"]
            except:
                raise ValueError("Missing openai organization")
        if api_key:
            openai.api_key = api_key
        else:
            try:
                openai.api_key = self.config.config["openai"]["api_key"]
            except:
                raise ValueError("Missing openai api_key")
        if model:
            self.model = model
        else:
            try:
                self.model = self.config.config["openai"]["model"]
            except:
                self.model = "gpt-4o-mini"
        self.temp = temp
        self.__prompt = {}
        self.__prompt["original_system"] = """
            You are the AI chatbot and assistant of a network connection manager and automation app called connpy. When provided with user input analyze the input and extract the following information. If user wants to chat just reply and don't call a function:

            - type: Given a user input, identify the type of request they want to make. The input will represent one of two options:

                1. "command" - The user wants to get information from devices by running commands.
                2. "list_nodes" - The user wants to get a list of nodes, devices, servers, or routers.

                The 'type' field should reflect whether the user input is a command or a request for a list of nodes.

            - filter: One or more regex patterns indicating the device or group of devices the command should be run on. The filter can have different formats, such as:
                - hostname
                - hostname@folder
                - hostname@subfolder@folder
                - partofhostname
                - @folder
                - @subfolder@folder
                - regex_pattern

                The filter should be extracted from the user input exactly as it was provided.
                Always preserve the exact filter pattern provided by the user, with no modifications. Do not process any regex, the application can do that.

            """
        self.__prompt["original_user"] = "Get the IP addresses of loopback0 for all routers from w2az1 and e1.*(prod|dev) and check if they have the ip 192.168.1.1"
        self.__prompt["original_assistant"] = {"name": "get_network_device_info", "arguments": "{\n \"type\": \"command\",\n \"filter\": [\"w2az1\",\"e1.*(prod|dev)\"]\n}"}
        self.__prompt["original_function"] = {}
        self.__prompt["original_function"]["name"] = "get_network_device_info"
        self.__prompt["original_function"]["descriptions"] = "You are the AI chatbot and assistant of a network connection manager and automation app called connpy. When provided with user input analyze the input and extract the information acording to the function, If user wants to chat just reply and don't call a function",
        self.__prompt["original_function"]["parameters"] = {}
        self.__prompt["original_function"]["parameters"]["type"] = "object"
        self.__prompt["original_function"]["parameters"]["properties"] = {}
        self.__prompt["original_function"]["parameters"]["properties"]["type"] = {}
        self.__prompt["original_function"]["parameters"]["properties"]["type"]["type"] = "string"
        self.__prompt["original_function"]["parameters"]["properties"]["type"]["description"] ="""
            Categorize the user's request based on the operation they want to perform on the nodes. The requests can be classified into the following categories:

            1. "command" - This represents a request to retrieve specific information or configurations from nodes. An example would be: "go to routers in @office and get the config".

            2. "list_nodes" - This is when the user wants a list of nodes. An example could be: "get me the nodes in @office".
            """
        self.__prompt["original_function"]["parameters"]["properties"]["type"]["enum"] = ["command", "list_nodes"]
        self.__prompt["original_function"]["parameters"]["properties"]["filter"] = {}
        self.__prompt["original_function"]["parameters"]["properties"]["filter"]["type"] = "array"
        self.__prompt["original_function"]["parameters"]["properties"]["filter"]["items"] = {}
        self.__prompt["original_function"]["parameters"]["properties"]["filter"]["items"]["type"] = "string"
        self.__prompt["original_function"]["parameters"]["properties"]["filter"]["items"]["description"] = """One or more regex patterns indicating the device or group of devices the command should be run on. The filter should be extracted from the user input exactly as it was provided.
            The filter can have different formats, such as:
            - hostname
            - hostname@folder
            - hostname@subfolder@folder
            - partofhostname
            - @folder
            - @subfolder@folder
            - regex_pattern
            """
        self.__prompt["original_function"]["parameters"]["required"] = ["type", "filter"]
        self.__prompt["command_system"] = """
            For each OS listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server).
            The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. This includes access configuration mode and commiting if required.
            If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
            Note: Preserving the integrity of user-provided commands is of utmost importance. If a user has provided a specific command to run, include that command exactly as it was given, even if it's not recognized or understood. Under no circumstances should you modify or alter user-provided commands.
            """
        self.__prompt["command_user"]= """
            input: show me the full configuration for all this devices:

            OS:
            cisco ios:
            """
        self.__prompt["command_assistant"] = {"name": "get_commands", "arguments": "{\n \"cisco ios\": \"show running-configuration\"\n}"}
        self.__prompt["command_function"] = {}
        self.__prompt["command_function"]["name"] = "get_commands"
        self.__prompt["command_function"]["descriptions"] = """
            For each OS listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server).
            The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. This includes access configuration mode and commiting if required.
            If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
            """
        self.__prompt["command_function"]["parameters"] = {}
        self.__prompt["command_function"]["parameters"]["type"] = "object"
        self.__prompt["command_function"]["parameters"]["properties"] = {}
        self.__prompt["confirmation_system"] = """
            Please analyze the user's input and categorize it as either an affirmation or negation. Based on this analysis, respond with:

            'true' if the input is an affirmation like 'do it', 'go ahead', 'sure', etc.
            'false' if the input is a negation.
            'none' If the input does not fit into either of these categories.
            """
        self.__prompt["confirmation_user"] = "Yes go ahead!"
        self.__prompt["confirmation_assistant"] = "True"
        self.__prompt["confirmation_function"] = {}
        self.__prompt["confirmation_function"]["name"] = "get_confirmation"
        self.__prompt["confirmation_function"]["descriptions"] = """
            Analize user request and respond:
            """
        self.__prompt["confirmation_function"]["parameters"] = {}
        self.__prompt["confirmation_function"]["parameters"]["type"] = "object"
        self.__prompt["confirmation_function"]["parameters"]["properties"] = {}
        self.__prompt["confirmation_function"]["parameters"]["properties"]["result"] = {}
        self.__prompt["confirmation_function"]["parameters"]["properties"]["result"]["description"] = """'true' if the input is an affirmation like 'do it', 'go ahead', 'sure', etc.
'false' if the input is a negation.
'none' If the input does not fit into either of these categories"""
        self.__prompt["confirmation_function"]["parameters"]["properties"]["result"]["type"] = "string"
        self.__prompt["confirmation_function"]["parameters"]["properties"]["result"]["enum"] = ["true", "false", "none"]
        self.__prompt["confirmation_function"]["parameters"]["properties"]["response"] = {}
        self.__prompt["confirmation_function"]["parameters"]["properties"]["response"]["description"] = "If the user don't message is not an affiramtion or negation, kindly ask the user to rephrase."
        self.__prompt["confirmation_function"]["parameters"]["properties"]["response"]["type"] = "string"
        self.__prompt["confirmation_function"]["parameters"]["required"] = ["result"]

    @MethodHook
    def _retry_function(self, function, max_retries, backoff_num, *args):
        #Retry openai requests
        retries = 0
        while retries < max_retries:
            try:
                myfunction = function(*args)
                break
            except:
                wait_time = backoff_num * (2 ** retries)
                time.sleep(wait_time)
                retries += 1
                continue
        if retries == max_retries:
            myfunction = False
        return myfunction

    @MethodHook
    def _clean_command_response(self, raw_response, node_list):
        # Parse response for command request to openAI GPT.
        info_dict = {}
        info_dict["commands"] = []
        info_dict["variables"] = {}
        info_dict["variables"]["__global__"] = {}
        for key, value in node_list.items():
            newvalue = {}
            commands = raw_response[value]
            # Ensure commands is a list
            if isinstance(commands, str):
                commands = [commands]
            # Determine the number of digits required for zero-padding
            num_commands = len(commands)
            num_digits = len(str(num_commands))

            for i, e in enumerate(commands, start=1):
                # Zero-pad the command number
                command_num = f"command{str(i).zfill(num_digits)}"
                newvalue[command_num] = e
                if f"{{command{i}}}" not in info_dict["commands"]:
                    info_dict["commands"].append(f"{{{command_num}}}")
                    info_dict["variables"]["__global__"][command_num] = ""
                info_dict["variables"][key] = newvalue
        return info_dict

    @MethodHook
    def _get_commands(self, user_input, nodes):
        #Send the request for commands for each device to openAI GPT.
output_list = [] command_function = deepcopy(self.__prompt["command_function"]) node_list = {} for key, value in nodes.items(): tags = value.get('tags', {}) try: if os_value := tags.get('os'): node_list[key] = os_value output_list.append(f"{os_value}") command_function["parameters"]["properties"][os_value] = {} command_function["parameters"]["properties"][os_value]["type"] = "array" command_function["parameters"]["properties"][os_value]["description"] = f"OS: {os_value}" command_function["parameters"]["properties"][os_value]["items"] = {} command_function["parameters"]["properties"][os_value]["items"]["type"] = "string" except: pass output_str = "\n".join(list(set(output_list))) command_input = f"input: {user_input}\n\nOS:\n{output_str}" message = [] message.append({"role": "system", "content": dedent(self.__prompt["command_system"]).strip()}) message.append({"role": "user", "content": dedent(self.__prompt["command_user"]).strip()}) message.append({"role": "assistant", "content": None, "function_call": self.__prompt["command_assistant"]}) message.append({"role": "user", "content": command_input}) functions = [command_function] response = openai.ChatCompletion.create( model=self.model, messages=message, functions=functions, function_call={"name": "get_commands"}, temperature=self.temp ) output = {} result = response["choices"][0]["message"].to_dict() json_result = json.loads(result["function_call"]["arguments"]) output["response"] = self._clean_command_response(json_result, node_list) return output @MethodHook def _get_filter(self, user_input, chat_history = None): #Send the request to identify the filter and other attributes from the user input to GPT. 
message = [] message.append({"role": "system", "content": dedent(self.__prompt["original_system"]).strip()}) message.append({"role": "user", "content": dedent(self.__prompt["original_user"]).strip()}) message.append({"role": "assistant", "content": None, "function_call": self.__prompt["original_assistant"]}) functions = [self.__prompt["original_function"]] if not chat_history: chat_history = [] chat_history.append({"role": "user", "content": user_input}) message.extend(chat_history) response = openai.ChatCompletion.create( model=self.model, messages=message, functions=functions, function_call="auto", temperature=self.temp, top_p=1 ) def extract_quoted_strings(text): pattern = r'["\'](.*?)["\']' matches = re.findall(pattern, text) return matches expected = extract_quoted_strings(user_input) output = {} result = response["choices"][0]["message"].to_dict() if result["content"]: output["app_related"] = False chat_history.append({"role": "assistant", "content": result["content"]}) output["response"] = result["content"] else: json_result = json.loads(result["function_call"]["arguments"]) output["app_related"] = True output["filter"] = json_result["filter"] output["type"] = json_result["type"] chat_history.append({"role": "assistant", "content": result["content"], "function_call": {"name": result["function_call"]["name"], "arguments": json.dumps(json_result)}}) output["expected"] = expected output["chat_history"] = chat_history return output @MethodHook def _get_confirmation(self, user_input): #Send the request to identify if user is confirming or denying the task message = [] message.append({"role": "user", "content": user_input}) functions = [self.__prompt["confirmation_function"]] response = openai.ChatCompletion.create( model=self.model, messages=message, functions=functions, function_call={"name": "get_confirmation"}, temperature=self.temp, top_p=1 ) result = response["choices"][0]["message"].to_dict() json_result = json.loads(result["function_call"]["arguments"]) 
output = {} if json_result["result"] == "true": output["result"] = True elif json_result["result"] == "false": output["result"] = False elif json_result["result"] == "none": output["result"] = json_result["response"] return output @MethodHook def confirm(self, user_input, max_retries=3, backoff_num=1): ''' Send the user input to openAI GPT and verify if response is afirmative or negative. ### Parameters: - user_input (str): User response confirming or denying. ### Optional Parameters: - max_retries (int): Maximum number of retries for gpt api. - backoff_num (int): Backoff factor for exponential wait time between retries. ### Returns: bool or str: True, False or str if AI coudn't understand the response ''' result = self._retry_function(self._get_confirmation, max_retries, backoff_num, user_input) if result: output = result["result"] else: output = f"{self.model} api is not responding right now, please try again later." return output @MethodHook def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1): ''' Send the user input to openAI GPT and parse the response to run an action in the application. ### Parameters: - user_input (str): Request to send to openAI that will be parsed and returned to execute on the application. AI understands the following tasks: - Run a command on a group of devices. - List a group of devices. - Test a command on a group of devices and verify if the output contain an expected value. ### Optional Parameters: - dryrun (bool): Set to true to get the arguments to use to run in the app. Default is false and it will run the actions directly. - chat_history (list): List in gpt api format for the chat history. - max_retries (int): Maximum number of retries for gpt api. - backoff_num (int): Backoff factor for exponential wait time between retries. 
### Returns: dict: Dictionary formed with the following keys: - input: User input received - app_related: True if GPT detected the request to be related to the application. - dryrun: True/False - response: If the request is not related to the app. this key will contain chatGPT answer. - action: The action detected by the AI to run in the app. - filter: If it was detected by the AI, the filter used to get the list of nodes to work on. - nodes: If it's not a dryrun, the list of nodes matched by the filter. - args: A dictionary of arguments required to run command(s) on the nodes. - result: A dictionary with the output of the commands or the test. - chat_history: The chat history between user and chatbot. It can be used as an attribute for next request. ''' output = {} output["dryrun"] = dryrun output["input"] = user_input original = self._retry_function(self._get_filter, max_retries, backoff_num, user_input, chat_history) if not original: output["app_related"] = False output["response"] = f"{self.model} api is not responding right now, please try again later." 
return output output["app_related"] = original["app_related"] output["chat_history"] = original["chat_history"] if not output["app_related"]: output["response"] = original["response"] else: type = original["type"] if "filter" in original: output["filter"] = original["filter"] if not self.config.config["case"]: if isinstance(output["filter"], list): output["filter"] = [item.lower() for item in output["filter"]] else: output["filter"] = output["filter"].lower() if not dryrun or type == "command": thisnodes = self.config._getallnodesfull(output["filter"]) output["nodes"] = list(thisnodes.keys()) if not type == "command": output["action"] = "list_nodes" else: if thisnodes: commands = self._retry_function(self._get_commands, max_retries, backoff_num, user_input, thisnodes) else: output["app_related"] = False filterlist = ", ".join(output["filter"]) output["response"] = f"I'm sorry, I coudn't find any device with filter{'s' if len(output['filter']) != 1 else ''}: {filterlist}." return output if not commands: output["app_related"] = False output["response"] = f"{self.model} api is not responding right now, please try again later." 
return output output["args"] = {} output["args"]["commands"] = commands["response"]["commands"] output["args"]["vars"] = commands["response"]["variables"] output["nodes"] = [item for item in output["nodes"] if output["args"]["vars"].get(item)] if original.get("expected"): output["args"]["expected"] = original["expected"] output["action"] = "test" else: output["action"] = "run" if dryrun: output["task"] = [] if output["action"] == "test": output["task"].append({"Task": "Verify if expected value is in command(s) output"}) output["task"].append({"Expected value to verify": output["args"]["expected"]}) elif output["action"] == "run": output["task"].append({"Task": "Run command(s) on devices and return output"}) varstocommands = deepcopy(output["args"]["vars"]) del varstocommands["__global__"] output["task"].append({"Devices": varstocommands}) if not dryrun: mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config) if output["action"] == "test": output["result"] = mynodes.test(**output["args"]) output["logs"] = mynodes.output elif output["action"] == "run": output["result"] = mynodes.run(**output["args"]) return output
This class generates an ai object. It contains all the information and methods needed to make requests to OpenAI ChatGPT to run actions on the application.
Attributes:
- model (str): GPT API model to use. Default is gpt-4o-mini.
- temp (float): Value between 0 and 1 that controls the randomness of the generated text; higher values increase creativity. Default is 0.7.
Parameters:
- config (obj): Object created with the configfile class, which provides the key for decryption and extra configuration if you are using the connection manager.
Optional Parameters:
- org (str): A unique token identifying the user organization to interact with the API.
- api_key (str): A unique authentication token required to access and interact with the API.
- model (str): GPT API model to use. Default is gpt-4o-mini.
- temp (float): Value between 0 and 1 that controls the randomness of the generated text; higher values increase creativity. Default is 0.7.
Methods
def ask(self, user_input, dryrun=False, chat_history=None, max_retries=3, backoff_num=1)
@MethodHook def ask(self, user_input, dryrun = False, chat_history = None, max_retries=3, backoff_num=1): ''' Send the user input to openAI GPT and parse the response to run an action in the application. ### Parameters: - user_input (str): Request to send to openAI that will be parsed and returned to execute on the application. AI understands the following tasks: - Run a command on a group of devices. - List a group of devices. - Test a command on a group of devices and verify if the output contain an expected value. ### Optional Parameters: - dryrun (bool): Set to true to get the arguments to use to run in the app. Default is false and it will run the actions directly. - chat_history (list): List in gpt api format for the chat history. - max_retries (int): Maximum number of retries for gpt api. - backoff_num (int): Backoff factor for exponential wait time between retries. ### Returns: dict: Dictionary formed with the following keys: - input: User input received - app_related: True if GPT detected the request to be related to the application. - dryrun: True/False - response: If the request is not related to the app. this key will contain chatGPT answer. - action: The action detected by the AI to run in the app. - filter: If it was detected by the AI, the filter used to get the list of nodes to work on. - nodes: If it's not a dryrun, the list of nodes matched by the filter. - args: A dictionary of arguments required to run command(s) on the nodes. - result: A dictionary with the output of the commands or the test. - chat_history: The chat history between user and chatbot. It can be used as an attribute for next request. ''' output = {} output["dryrun"] = dryrun output["input"] = user_input original = self._retry_function(self._get_filter, max_retries, backoff_num, user_input, chat_history) if not original: output["app_related"] = False output["response"] = f"{self.model} api is not responding right now, please try again later." 
return output output["app_related"] = original["app_related"] output["chat_history"] = original["chat_history"] if not output["app_related"]: output["response"] = original["response"] else: type = original["type"] if "filter" in original: output["filter"] = original["filter"] if not self.config.config["case"]: if isinstance(output["filter"], list): output["filter"] = [item.lower() for item in output["filter"]] else: output["filter"] = output["filter"].lower() if not dryrun or type == "command": thisnodes = self.config._getallnodesfull(output["filter"]) output["nodes"] = list(thisnodes.keys()) if not type == "command": output["action"] = "list_nodes" else: if thisnodes: commands = self._retry_function(self._get_commands, max_retries, backoff_num, user_input, thisnodes) else: output["app_related"] = False filterlist = ", ".join(output["filter"]) output["response"] = f"I'm sorry, I coudn't find any device with filter{'s' if len(output['filter']) != 1 else ''}: {filterlist}." return output if not commands: output["app_related"] = False output["response"] = f"{self.model} api is not responding right now, please try again later." 
return output output["args"] = {} output["args"]["commands"] = commands["response"]["commands"] output["args"]["vars"] = commands["response"]["variables"] output["nodes"] = [item for item in output["nodes"] if output["args"]["vars"].get(item)] if original.get("expected"): output["args"]["expected"] = original["expected"] output["action"] = "test" else: output["action"] = "run" if dryrun: output["task"] = [] if output["action"] == "test": output["task"].append({"Task": "Verify if expected value is in command(s) output"}) output["task"].append({"Expected value to verify": output["args"]["expected"]}) elif output["action"] == "run": output["task"].append({"Task": "Run command(s) on devices and return output"}) varstocommands = deepcopy(output["args"]["vars"]) del varstocommands["__global__"] output["task"].append({"Devices": varstocommands}) if not dryrun: mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config) if output["action"] == "test": output["result"] = mynodes.test(**output["args"]) output["logs"] = mynodes.output elif output["action"] == "run": output["result"] = mynodes.run(**output["args"]) return output
Send the user input to openAI GPT and parse the response to run an action in the application.
Parameters:
- user_input (str): Request to send to OpenAI, which is parsed and returned to execute on the application. The AI understands the following tasks:
  - Run a command on a group of devices.
  - List a group of devices.
  - Test a command on a group of devices and verify that the output contains an expected value.
Optional Parameters:
- dryrun (bool): Set to true to get the arguments to use to run in the app. Default is false and it will run the actions directly.
- chat_history (list): List in GPT API format for the chat history.
- max_retries (int): Maximum number of retries for the GPT API.
- backoff_num (int): Backoff factor for exponential wait time between retries.
Returns:
dict: Dictionary with the following keys:
- input: User input received.
- app_related: True if GPT detected the request to be related to the application.
- dryrun: True/False.
- response: If the request is not related to the app, this key contains the ChatGPT answer.
- action: The action detected by the AI to run in the app.
- filter: If it was detected by the AI, the filter used to get the list of nodes to work on.
- nodes: If it's not a dryrun, the list of nodes matched by the filter.
- args: A dictionary of arguments required to run command(s) on the nodes.
- result: A dictionary with the output of the commands or the test.
- chat_history: The chat history between user and chatbot. It can be passed as chat_history in the next request.
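As a rough illustration of how a caller might consume this dictionary, the sketch below branches on app_related and action. The helper name summarize_ask_output and the sample dictionary are hypothetical and not part of connpy; only the key names come from the description above.

```python
def summarize_ask_output(output):
    """Return a one-line summary of a dictionary shaped like the ask() result.

    Hypothetical helper for illustration; it only reads the documented keys.
    """
    if not output.get("app_related"):
        # Either a chat-style answer or an "api is not responding" message.
        return f"chat: {output.get('response', '')}"
    action = output.get("action")
    nodes = output.get("nodes", [])
    if action == "list_nodes":
        return f"list_nodes: {', '.join(nodes)}"
    if action in ("run", "test"):
        commands = output.get("args", {}).get("commands", [])
        mode = "dry run" if output.get("dryrun") else "executed"
        return f"{action} ({mode}): {len(commands)} command(s) on {len(nodes)} node(s)"
    return "unknown action"


# Hand-written sample; a real dictionary would come from ai.ask(...).
example = {
    "input": "go to routers in @office and get the config",
    "app_related": True,
    "dryrun": True,
    "action": "run",
    "filter": ["@office"],
    "nodes": ["router1@office", "router2@office"],
    "args": {"commands": ["{command1}"], "vars": {}},
}
print(summarize_ask_output(example))
```

Checking app_related first matters because the error path (API not responding, or no node matching the filter) also sets app_related to False and places the message in response.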
def confirm(self, user_input, max_retries=3, backoff_num=1)
@MethodHook
def confirm(self, user_input, max_retries=3, backoff_num=1):
    '''
    Send the user input to openAI GPT and verify if response is affirmative or negative.

    ### Parameters:

        - user_input (str): User response confirming or denying.

    ### Optional Parameters:

        - max_retries (int): Maximum number of retries for gpt api.
        - backoff_num (int): Backoff factor for exponential wait time
                             between retries.

    ### Returns:

        bool or str: True, False or str if AI couldn't understand the response
    '''
    result = self._retry_function(self._get_confirmation, max_retries, backoff_num, user_input)
    if result:
        output = result["result"]
    else:
        output = f"{self.model} api is not responding right now, please try again later."
    return output
Sends the user input to OpenAI GPT and verifies whether the response is affirmative or negative.
Parameters:
- user_input (str): User response confirming or denying.
Optional Parameters:
- max_retries (int): Maximum number of retries for the GPT API.
- backoff_num (int): Backoff factor for exponential wait time between retries.
Returns:
bool or str: True, False, or a str if the AI couldn't understand the response or the API is not responding.
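The max_retries and backoff_num parameters drive an exponential backoff loop. The sketch below mirrors the wait formula used in the class source (backoff_num * 2 ** retries) as a standalone function. The name retry_with_backoff and the injectable sleep argument are assumptions added here so the example runs without real delays.

```python
import time


def retry_with_backoff(function, max_retries, backoff_num, *args, sleep=time.sleep):
    """Call function(*args) until it succeeds or max_retries attempts fail.

    After the n-th failure (n starting at 0) it waits backoff_num * 2 ** n
    seconds. Returns False if every attempt raised an exception.
    """
    for attempt in range(max_retries):
        try:
            return function(*args)
        except Exception:
            sleep(backoff_num * (2 ** attempt))
    return False


# Demo: a function that fails twice and then succeeds.
calls = {"count": 0}


def flaky(value):
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return value.upper()


waits = []  # record the requested waits instead of actually sleeping
result = retry_with_backoff(flaky, 3, 1, "yes", sleep=waits.append)
print(result, waits)
```

With the defaults max_retries=3 and backoff_num=1, a request that never succeeds waits 1, 2, and 4 seconds in turn before the caller receives the "api is not responding" message.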
class configfile (conf=None, key=None)
@ClassHook class configfile: ''' This class generates a configfile object. Containts a dictionary storing, config, nodes and profiles, normaly used by connection manager. ### Attributes: - file (str): Path/file to config file. - key (str): Path/file to RSA key file. - config (dict): Dictionary containing information of connection manager configuration. - connections (dict): Dictionary containing all the nodes added to connection manager. - profiles (dict): Dictionary containing all the profiles added to connection manager. - privatekey (obj): Object containing the private key to encrypt passwords. - publickey (obj): Object containing the public key to decrypt passwords. ''' def __init__(self, conf = None, key = None): ''' ### Optional Parameters: - conf (str): Path/file to config file. If left empty default path is ~/.config/conn/config.json - key (str): Path/file to RSA key file. If left empty default path is ~/.config/conn/.osk ''' home = os.path.expanduser("~") defaultdir = home + '/.config/conn' self.defaultdir = defaultdir Path(defaultdir).mkdir(parents=True, exist_ok=True) Path(f"{defaultdir}/plugins").mkdir(parents=True, exist_ok=True) pathfile = defaultdir + '/.folder' try: with open(pathfile, "r") as f: configdir = f.read().strip() except: with open(pathfile, "w") as f: f.write(str(defaultdir)) configdir = defaultdir defaultfile = configdir + '/config.json' defaultkey = configdir + '/.osk' if conf == None: self.file = defaultfile else: self.file = conf if key == None: self.key = defaultkey else: self.key = key if os.path.exists(self.file): config = self._loadconfig(self.file) else: config = self._createconfig(self.file) self.config = config["config"] self.connections = config["connections"] self.profiles = config["profiles"] if not os.path.exists(self.key): self._createkey(self.key) with open(self.key) as f: self.privatekey = RSA.import_key(f.read()) f.close() self.publickey = self.privatekey.publickey() def _loadconfig(self, conf): #Loads config file 
jsonconf = open(conf) jsondata = json.load(jsonconf) jsonconf.close() return jsondata def _createconfig(self, conf): #Create config file defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}} if not os.path.exists(conf): with open(conf, "w") as f: json.dump(defaultconfig, f, indent = 4) f.close() os.chmod(conf, 0o600) jsonconf = open(conf) jsondata = json.load(jsonconf) jsonconf.close() return jsondata @MethodHook def _saveconfig(self, conf): #Save config file newconfig = {"config":{}, "connections": {}, "profiles": {}} newconfig["config"] = self.config newconfig["connections"] = self.connections newconfig["profiles"] = self.profiles try: with open(conf, "w") as f: json.dump(newconfig, f, indent = 4) f.close() except: return 1 return 0 def _createkey(self, keyfile): #Create key file key = RSA.generate(2048) with open(keyfile,'wb') as f: f.write(key.export_key('PEM')) f.close() os.chmod(keyfile, 0o600) return key @MethodHook def _explode_unique(self, unique): #Divide unique name into folder, subfolder and id uniques = unique.split("@") if not unique.startswith("@"): result = {"id": uniques[0]} else: result = {} if len(uniques) == 2: result["folder"] = uniques[1] if result["folder"] == "": return False elif len(uniques) == 3: result["folder"] = uniques[2] result["subfolder"] = uniques[1] if result["folder"] == "" or result["subfolder"] == "": return False elif len(uniques) > 3: return False return result @MethodHook def getitem(self, unique, keys = None): ''' Get an node or a group of nodes from configfile which can be passed to node/nodes class ### Parameters: - unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder ### Optional Parameters: - keys (list): In case you pass a folder as unique, you can 
filter nodes inside the folder passing a list. ### Returns: dict: Dictionary containing information of node or multiple dictionaries of multiple nodes. ''' uniques = self._explode_unique(unique) if unique.startswith("@"): if uniques.keys() >= {"folder", "subfolder"}: folder = self.connections[uniques["folder"]][uniques["subfolder"]] else: folder = self.connections[uniques["folder"]] newfolder = deepcopy(folder) newfolder.pop("type") for node in folder.keys(): if node == "type": continue if "type" in newfolder[node].keys(): if newfolder[node]["type"] == "subfolder": newfolder.pop(node) else: newfolder[node].pop("type") if keys == None: newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()} return newfolder else: f_newfolder = dict((k, newfolder[k]) for k in keys) f_newfolder = {"{}{}".format(k,unique):v for k,v in f_newfolder.items()} return f_newfolder else: if uniques.keys() >= {"folder", "subfolder"}: node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]] elif "folder" in uniques.keys(): node = self.connections[uniques["folder"]][uniques["id"]] else: node = self.connections[uniques["id"]] newnode = deepcopy(node) newnode.pop("type") return newnode @MethodHook def getitems(self, uniques): ''' Get a group of nodes from configfile which can be passed to node/nodes class ### Parameters: - uniques (str/list): String name that will match hostnames from the connection manager. It can be a list of strings. ### Returns: dict: Dictionary containing information of node or multiple dictionaries of multiple nodes. 
''' nodes = {} if isinstance(uniques, str): uniques = [uniques] for i in uniques: if isinstance(i, dict): name = list(i.keys())[0] mylist = i[name] if not self.config["case"]: name = name.lower() mylist = [item.lower() for item in mylist] this = self.getitem(name, mylist) nodes.update(this) elif i.startswith("@"): if not self.config["case"]: i = i.lower() this = self.getitem(i) nodes.update(this) else: if not self.config["case"]: i = i.lower() this = self.getitem(i) nodes[i] = this return nodes @MethodHook def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='', type = "connection" ): #Add connection from config if folder == '': self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags,"jumphost": jumphost,"type": type} elif folder != '' and subfolder == '': self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type} elif folder != '' and subfolder != '': self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type} @MethodHook def _connections_del(self,*, id, folder='', subfolder=''): #Delete connection from config if folder == '': del self.connections[id] elif folder != '' and subfolder == '': del self.connections[folder][id] elif folder != '' and subfolder != '': del self.connections[folder][subfolder][id] @MethodHook def _folder_add(self,*, folder, subfolder = ''): #Add Folder from config if subfolder == '': if folder not in self.connections: self.connections[folder] = {"type": "folder"} else: if subfolder not in self.connections[folder]: self.connections[folder][subfolder] = {"type": 
"subfolder"} @MethodHook def _folder_del(self,*, folder, subfolder=''): #Delete folder from config if subfolder == '': del self.connections[folder] else: del self.connections[folder][subfolder] @MethodHook def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='' ): #Add profile from config self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost} @MethodHook def _profiles_del(self,*, id ): #Delete profile from config del self.profiles[id] @MethodHook def _getallnodes(self, filter = None): #get all nodes on configfile nodes = [] layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"] folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] nodes.extend(layer1) for f in folders: layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"] nodes.extend(layer2) subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] for s in subfolders: layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"] nodes.extend(layer3) if filter: if isinstance(filter, str): nodes = [item for item in nodes if re.search(filter, item)] elif isinstance(filter, list): nodes = [item for item in nodes if any(re.search(pattern, item) for pattern in filter)] else: raise ValueError("filter must be a string or a list of strings") return nodes @MethodHook def _getallnodesfull(self, filter = None, extract = True): #get all nodes on configfile with all their attributes. 
nodes = {} layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"} folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] nodes.update(layer1) for f in folders: layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"} nodes.update(layer2) subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] for s in subfolders: layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"} nodes.update(layer3) if filter: if isinstance(filter, str): filter = "^(?!.*@).+$" if filter == "@" else filter nodes = {k: v for k, v in nodes.items() if re.search(filter, k)} elif isinstance(filter, list): filter = ["^(?!.*@).+$" if item == "@" else item for item in filter] nodes = {k: v for k, v in nodes.items() if any(re.search(pattern, k) for pattern in filter)} else: raise ValueError("filter must be a string or a list of strings") if extract: for node, keys in nodes.items(): for key, value in keys.items(): profile = re.search("^@(.*)", str(value)) if profile: try: nodes[node][key] = self.profiles[profile.group(1)][key] except: nodes[node][key] = "" elif value == '' and key == "protocol": try: nodes[node][key] = config.profiles["default"][key] except: nodes[node][key] = "ssh" return nodes @MethodHook def _getallfolders(self): #get all folders on configfile folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] subfolders = [] for f in folders: s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"] subfolders.extend(s) folders.extend(subfolders) return folders @MethodHook def _profileused(self, profile): #Check if profile is used before deleting it nodes = [] layer1 = [k for k,v in self.connections.items() if 
isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] nodes.extend(layer1) for f in folders: layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] nodes.extend(layer2) subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] for s in subfolders: layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] nodes.extend(layer3) return nodes @MethodHook def encrypt(self, password, keyfile=None): ''' Encrypts password using RSA keyfile ### Parameters: - password (str): Plaintext password to encrypt. ### Optional Parameters: - keyfile (str): Path/file to keyfile. Default is config keyfile. ### Returns: str: Encrypted password. ''' if keyfile is None: keyfile = self.key with open(keyfile) as f: key = RSA.import_key(f.read()) f.close() publickey = key.publickey() encryptor = PKCS1_OAEP.new(publickey) password = encryptor.encrypt(password.encode("utf-8")) return str(password)
This class generates a configfile object. It contains a dictionary storing config, nodes, and profiles, and is normally used by the connection manager.
Attributes:
- file (str): Path/file to config file.
- key (str): Path/file to RSA key file.
- config (dict): Dictionary containing information of connection manager configuration.
- connections (dict): Dictionary containing all the nodes added to connection manager.
- profiles (dict): Dictionary containing all the profiles added to connection manager.
- privatekey (obj): Object containing the private key, used to decrypt passwords.
- publickey (obj): Object containing the public key, used to encrypt passwords.
Optional Parameters:
- conf (str): Path/file to config file. If left empty, the default path is ~/.config/conn/config.json
- key (str): Path/file to RSA key file. If left empty, the default path is ~/.config/conn/.osk
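The `connections` dictionary nests folders and subfolders, and every entry carries a `type` key, as the class source above shows. The following is a minimal sketch of that layout and of how nested entries map to `node@subfolder@folder` names. The sample hosts and the helper name `flatten_nodes` are invented for illustration and are not part of connpy.

```python
# Sample data in the shape the class source implies; hosts and names are invented.
connections = {
    "router1": {"type": "connection", "host": "10.0.0.1"},
    "office": {
        "type": "folder",
        "sw1": {"type": "connection", "host": "10.0.1.1"},
        "lab": {
            "type": "subfolder",
            "fw1": {"type": "connection", "host": "10.0.2.1"},
        },
    },
}


def flatten_nodes(connections):
    """Hypothetical helper: list every node by its full unique name."""
    names = []
    for key, value in connections.items():
        if not isinstance(value, dict):
            continue
        if value["type"] == "connection":
            names.append(key)
        elif value["type"] == "folder":
            for sub_key, sub_value in value.items():
                if not isinstance(sub_value, dict):
                    continue  # skips the folder's own "type" marker
                if sub_value["type"] == "connection":
                    names.append(f"{sub_key}@{key}")
                elif sub_value["type"] == "subfolder":
                    for node_key, node_value in sub_value.items():
                        if isinstance(node_value, dict) and node_value["type"] == "connection":
                            names.append(f"{node_key}@{sub_key}@{key}")
    return names


print(flatten_nodes(connections))
```

Names are built from the innermost level outward, which is why a node inside a subfolder reads `node@subfolder@folder`.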
Methods
def encrypt(self, password, keyfile=None)
@MethodHook
def encrypt(self, password, keyfile=None):
    '''
    Encrypts password using RSA keyfile

    ### Parameters:

        - password (str): Plaintext password to encrypt.

    ### Optional Parameters:

        - keyfile (str): Path/file to keyfile. Default is config keyfile.

    ### Returns:

        str: Encrypted password.

    '''
    if keyfile is None:
        keyfile = self.key
    with open(keyfile) as f:
        key = RSA.import_key(f.read())
        f.close()
    publickey = key.publickey()
    encryptor = PKCS1_OAEP.new(publickey)
    password = encryptor.encrypt(password.encode("utf-8"))
    return str(password)
Encrypts password using RSA keyfile
Parameters:
- password (str): Plaintext password to encrypt.
Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.
Returns:
str: Encrypted password.
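`encrypt` returns `str(password)`, where `password` is the ciphertext as `bytes`, so the stored value is the printable form of a bytes literal (`b'...'`). The `_passtx` method in the `node` source recognizes that form with a regular expression and uses `ast.literal_eval` to turn it back into bytes before decrypting. The sketch below shows only that string round trip. The byte string is a made-up stand-in for real RSA output, no key is involved, and the helper names are hypothetical.

```python
import ast
import re

# Pattern copied from node._passtx in the source shown on this page.
ENCRYPTED_RE = re.compile(r'^b[\"\'].+[\"\']$')


def looks_encrypted(value):
    """True if the stored string has the shape of a printed bytes literal."""
    return bool(ENCRYPTED_RE.match(value))


def stored_to_bytes(stored):
    """Turn the stored text back into the bytes it was printed from."""
    return ast.literal_eval(stored)


fake_ciphertext = b"\x8f\x01secret\xff"   # stand-in for encryptor.encrypt(...)
stored = str(fake_ciphertext)              # what encrypt() would return

print(stored)
print(looks_encrypted(stored), looks_encrypted("plaintext-password"))
print(stored_to_bytes(stored) == fake_ciphertext)
```

Passwords that do not match the pattern are treated as plaintext and passed through unchanged, which is why both encrypted and plaintext passwords are accepted.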
def getitem(self, unique, keys=None)
@MethodHook
def getitem(self, unique, keys = None):
    '''
    Get an node or a group of nodes from configfile which can be passed to node/nodes class

    ### Parameters:

        - unique (str): Unique name of the node or folder in config using
                        connection manager style: node[@subfolder][@folder]
                        or [@subfolder]@folder

    ### Optional Parameters:

        - keys (list): In case you pass a folder as unique, you can filter
                       nodes inside the folder passing a list.

    ### Returns:

        dict: Dictionary containing information of node or multiple
              dictionaries of multiple nodes.

    '''
    uniques = self._explode_unique(unique)
    if unique.startswith("@"):
        if uniques.keys() >= {"folder", "subfolder"}:
            folder = self.connections[uniques["folder"]][uniques["subfolder"]]
        else:
            folder = self.connections[uniques["folder"]]
        newfolder = deepcopy(folder)
        newfolder.pop("type")
        for node in folder.keys():
            if node == "type":
                continue
            if "type" in newfolder[node].keys():
                if newfolder[node]["type"] == "subfolder":
                    newfolder.pop(node)
                else:
                    newfolder[node].pop("type")
        if keys == None:
            newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
            return newfolder
        else:
            f_newfolder = dict((k, newfolder[k]) for k in keys)
            f_newfolder = {"{}{}".format(k,unique):v for k,v in f_newfolder.items()}
            return f_newfolder
    else:
        if uniques.keys() >= {"folder", "subfolder"}:
            node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
        elif "folder" in uniques.keys():
            node = self.connections[uniques["folder"]][uniques["id"]]
        else:
            node = self.connections[uniques["id"]]
        newnode = deepcopy(node)
        newnode.pop("type")
        return newnode
Get a node or a group of nodes from the configfile, which can be passed to the node/nodes class.
Parameters:
- unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder
Optional Parameters:
- keys (list): If you pass a folder as unique, you can filter the nodes inside the folder by passing a list of node names.
Returns:
dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
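The node branch of `getitem` splits the unique name into id, subfolder, and folder, walks the nested `connections` dictionary, and returns a copy without the internal `type` key. A minimal sketch of that lookup follows. `split_unique` is a hypothetical stand-in, since the library's own `_explode_unique` is not shown on this page, and the sample data is invented.

```python
from copy import deepcopy


def split_unique(unique):
    """Hypothetical parser for node[@subfolder][@folder] or [@subfolder]@folder."""
    parts = unique.split("@")
    result = {}
    if parts[0]:
        result["id"] = parts[0]
    if len(parts) == 2:
        result["folder"] = parts[1]
    elif len(parts) == 3:
        result["subfolder"] = parts[1]
        result["folder"] = parts[2]
    return result


def get_node(connections, unique):
    """Walk folder, then subfolder, then node id, and drop the 'type' key."""
    parts = split_unique(unique)
    level = connections
    for key in ("folder", "subfolder"):
        if key in parts:
            level = level[parts[key]]
    node = deepcopy(level[parts["id"]])   # copy so the config itself is untouched
    node.pop("type")
    return node


connections = {
    "office": {
        "type": "folder",
        "lab": {
            "type": "subfolder",
            "fw1": {"type": "connection", "host": "10.0.2.1"},
        },
    },
}

print(split_unique("fw1@lab@office"))
print(get_node(connections, "fw1@lab@office"))
```

The deep copy matters because the returned dictionary is modified (the `type` key is removed) before being handed to the node class.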
def getitems(self, uniques)
@MethodHook
def getitems(self, uniques):
    '''
    Get a group of nodes from configfile which can be passed to node/nodes class

    ### Parameters:

        - uniques (str/list): String name that will match hostnames
                              from the connection manager. It can be a
                              list of strings.

    ### Returns:

        dict: Dictionary containing information of node or multiple
              dictionaries of multiple nodes.

    '''
    nodes = {}
    if isinstance(uniques, str):
        uniques = [uniques]
    for i in uniques:
        if isinstance(i, dict):
            name = list(i.keys())[0]
            mylist = i[name]
            if not self.config["case"]:
                name = name.lower()
                mylist = [item.lower() for item in mylist]
            this = self.getitem(name, mylist)
            nodes.update(this)
        elif i.startswith("@"):
            if not self.config["case"]:
                i = i.lower()
            this = self.getitem(i)
            nodes.update(this)
        else:
            if not self.config["case"]:
                i = i.lower()
            this = self.getitem(i)
            nodes[i] = this
    return nodes
Get a group of nodes from configfile which can be passed to node/nodes class
Parameters:
- uniques (str/list): String name that will match hostnames from the connection manager. It can be a list of strings.
Returns:
dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
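Judging from the source above, `getitems` accepts a single string, a list of strings, or list entries that are one-key dictionaries mapping a folder to the node names wanted from it, and it lowercases names when the `case` setting is off. The sketch below isolates only that input handling. `normalize_uniques` and its `case_sensitive` argument are hypothetical names for illustration, not part of connpy.

```python
def normalize_uniques(uniques, case_sensitive=False):
    """Return (name, keys) pairs; keys is None unless a dict entry gives a filter list."""
    if isinstance(uniques, str):
        uniques = [uniques]            # a lone string is treated as a one-item list
    pairs = []
    for item in uniques:
        if isinstance(item, dict):
            name = list(item.keys())[0]
            keys = item[name]
        else:
            name, keys = item, None
        if not case_sensitive:
            name = name.lower()
            if keys is not None:
                keys = [k.lower() for k in keys]
        pairs.append((name, keys))
    return pairs


print(normalize_uniques("Router1"))
print(normalize_uniques(["@Office", {"@lab@office": ["FW1", "fw2"]}]))
```

Each resulting pair corresponds to one `getitem` call in the real method: folder names expand to all of their nodes, and a dict entry expands only to the listed ones.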
class node (unique,
host,
options='',
logs='',
password='',
port='',
protocol='',
user='',
config='',
tags='',
jumphost='')
@ClassHook class node: ''' This class generates a node object. Containts all the information and methods to connect and interact with a device using ssh or telnet. ### Attributes: - output (str): Output of the commands you ran with run or test method. - result(bool): True if expected value is found after running the commands using test method. - status (int): 0 if the method run or test run succesfully. 1 if connection failed. 2 if expect timeouts without prompt or EOF. ''' def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags='', jumphost=''): ''' ### Parameters: - unique (str): Unique name to assign to the node. - host (str): IP address or hostname of the node. ### Optional Parameters: - options (str): Additional options to pass the ssh/telnet for connection. - logs (str): Path/file for storing the logs. You can use ${unique},${host}, ${port}, ${user}, ${protocol} as variables. - password (str): Encrypted or plaintext password. - port (str): Port to connect to node, default 22 for ssh and 23 for telnet. - protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh. - user (str): Username to of the node. - config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager. 
- tags (dict) : Tags useful for automation and personal porpuse like "os", "prompt" and "screenleght_command" - jumphost (str): Reference another node to be used as a jumphost ''' if config == '': self.idletime = 0 self.key = None else: self.idletime = config.config["idletime"] self.key = config.key self.unique = unique attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost} for key in attr: profile = re.search("^@(.*)", str(attr[key])) if profile and config != '': try: setattr(self,key,config.profiles[profile.group(1)][key]) except: setattr(self,key,"") elif attr[key] == '' and key == "protocol": try: setattr(self,key,config.profiles["default"][key]) except: setattr(self,key,"ssh") else: setattr(self,key,attr[key]) if isinstance(password,list): self.password = [] for i, s in enumerate(password): profile = re.search("^@(.*)", password[i]) if profile and config != '': self.password.append(config.profiles[profile.group(1)]["password"]) else: self.password = [password] if self.jumphost != "" and config != '': self.jumphost = config.getitem(self.jumphost) for key in self.jumphost: profile = re.search("^@(.*)", str(self.jumphost[key])) if profile: try: self.jumphost[key] = config.profiles[profile.group(1)][key] except: self.jumphost[key] = "" elif self.jumphost[key] == '' and key == "protocol": try: self.jumphost[key] = config.profiles["default"][key] except: self.jumphost[key] = "ssh" if isinstance(self.jumphost["password"],list): jumphost_password = [] for i, s in enumerate(self.jumphost["password"]): profile = re.search("^@(.*)", self.jumphost["password"][i]) if profile: jumphost_password.append(config.profiles[profile.group(1)]["password"]) self.jumphost["password"] = jumphost_password else: self.jumphost["password"] = [self.jumphost["password"]] if self.jumphost["password"] != [""]: self.password = self.jumphost["password"] + self.password if self.jumphost["protocol"] == "ssh": 
jumphost_cmd = self.jumphost["protocol"] + " -W %h:%p" if self.jumphost["port"] != '': jumphost_cmd = jumphost_cmd + " -p " + self.jumphost["port"] if self.jumphost["options"] != '': jumphost_cmd = jumphost_cmd + " " + self.jumphost["options"] if self.jumphost["user"] == '': jumphost_cmd = jumphost_cmd + " {}".format(self.jumphost["host"]) else: jumphost_cmd = jumphost_cmd + " {}".format("@".join([self.jumphost["user"],self.jumphost["host"]])) self.jumphost = f"-o ProxyCommand=\"{jumphost_cmd}\"" else: self.jumphost = "" @MethodHook def _passtx(self, passwords, *, keyfile=None): # decrypts passwords, used by other methdos. dpass = [] if keyfile is None: keyfile = self.key if keyfile is not None: with open(keyfile) as f: key = RSA.import_key(f.read()) decryptor = PKCS1_OAEP.new(key) for passwd in passwords: if not re.match('^b[\"\'].+[\"\']$', passwd): dpass.append(passwd) else: try: decrypted = decryptor.decrypt(ast.literal_eval(passwd)).decode("utf-8") dpass.append(decrypted) except: raise ValueError("Missing or corrupted key") return dpass @MethodHook def _logfile(self, logfile = None): # translate logs variables and generate logs path. if logfile == None: logfile = self.logs logfile = logfile.replace("${unique}", self.unique) logfile = logfile.replace("${host}", self.host) logfile = logfile.replace("${port}", self.port) logfile = logfile.replace("${user}", self.user) logfile = logfile.replace("${protocol}", self.protocol) now = datetime.datetime.now() dateconf = re.search(r'\$\{date \'(.*)\'}', logfile) if dateconf: logfile = re.sub(r'\$\{date (.*)}',now.strftime(dateconf.group(1)), logfile) return logfile @MethodHook def _logclean(self, logfile, var = False): #Remove special ascii characters and other stuff from logfile. 
if var == False: t = open(logfile, "r").read() else: t = logfile while t.find("\b") != -1: t = re.sub('[^\b]\b', '', t) t = t.replace("\n","",1) t = t.replace("\a","") t = t.replace('\n\n', '\n') t = re.sub(r'.\[K', '', t) ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/ ]*[@-~])') t = ansi_escape.sub('', t) t = t.lstrip(" \n\r") t = t.replace("\r","") t = t.replace("\x0E","") t = t.replace("\x0F","") if var == False: d = open(logfile, "w") d.write(t) d.close() return else: return t @MethodHook def _savelog(self): '''Save the log buffer to the file at regular intervals if there are changes.''' t = threading.current_thread() prev_size = 0 # Store the previous size of the buffer while getattr(t, "do_run", True): # Check if thread is signaled to stop current_size = self.mylog.tell() # Current size of the buffer # Only save if the buffer size has changed if current_size != prev_size: with open(self.logfile, "w") as f: # Use "w" to overwrite the file f.write(self._logclean(self.mylog.getvalue().decode(), True)) prev_size = current_size # Update the previous size sleep(5) @MethodHook def _filter(self, a): #Set time for last input when using interact self.lastinput = time() return a @MethodHook def _keepalive(self): #Send keepalive ctrl+e when idletime passed without new inputs on interact self.lastinput = time() t = threading.current_thread() while True: if time() - self.lastinput >= self.idletime: self.child.sendcontrol("e") self.lastinput = time() sleep(1) @MethodHook def interact(self, debug = False): ''' Allow user to interact with the node directly, mostly used by connection manager. ### Optional Parameters: - debug (bool): If True, display all the connecting information before interact. Default False. 
''' connect = self._connect(debug = debug) if connect == True: size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size())) self.child.setwinsize(int(size.group(2)),int(size.group(1))) print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol) if 'logfile' in dir(self): # Initialize self.mylog if not 'mylog' in dir(self): self.mylog = io.BytesIO() self.child.logfile_read = self.mylog # Start the _savelog thread log_thread = threading.Thread(target=self._savelog) log_thread.daemon = True log_thread.start() if 'missingtext' in dir(self): print(self.child.after.decode(), end='') if self.idletime > 0: x = threading.Thread(target=self._keepalive) x.daemon = True x.start() if debug: print(self.mylog.getvalue().decode()) self.child.interact(input_filter=self._filter) if 'logfile' in dir(self): with open(self.logfile, "w") as f: f.write(self._logclean(self.mylog.getvalue().decode(), True)) else: print(connect) exit(1) @MethodHook def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10): ''' Run a command or list of commands on the node and return the output. ### Parameters: - commands (str/list): Commands to run on the node. Should be str or a list of str. You can use variables as {varname} and defining them in optional parameter vars. ### Optional Parameters: - vars (dict): Dictionary containing the definition of variables used in commands parameter. Keys: Variable names. Values: strings. ### Optional Named Parameters: - folder (str): Path where output log should be stored, leave empty to disable logging. - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol. - stdout (bool):Set True to send the command output to stdout. default False. 
- timeout (int):Time in seconds for expect to wait for prompt/EOF. default 10. ### Returns: str: Output of the commands you ran on the node. ''' connect = self._connect(timeout = timeout) now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') if connect == True: # Attempt to set the terminal size try: self.child.setwinsize(65535, 65535) except Exception: try: self.child.setwinsize(10000, 10000) except Exception: pass if "prompt" in self.tags: prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' status = '' if not isinstance(commands, list): commands = [commands] if "screen_length_command" in self.tags: commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: if vars is not None: c = c.format(**vars) result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: break if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) if stdout == True: print(output) if folder != '': with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f: f.write(output) f.close() self.output = output if result == 2: self.status = 2 else: self.status = 0 return output else: self.output = connect self.status = 1 if stdout == True: print(connect) if folder != '': with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f: f.write(connect) f.close() return connect @MethodHook def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10): ''' Run a command or list of commands on the node, then check if expected value appears on the output after the last command. ### Parameters: - commands (str/list): Commands to run on the node. Should be str or a list of str. You can use variables as {varname} and defining them in optional parameter vars. 
- expected (str) : Expected text to appear after running all the commands on the node.You can use variables as {varname} and defining them in optional parameter vars. ### Optional Parameters: - vars (dict): Dictionary containing the definition of variables used in commands and expected parameters. Keys: Variable names. Values: strings. ### Optional Named Parameters: - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol. - timeout (int):Time in seconds for expect to wait for prompt/EOF. default 10. ### Returns: bool: true if expected value is found after running the commands false if prompt is found before. ''' connect = self._connect(timeout = timeout) if connect == True: # Attempt to set the terminal size try: self.child.setwinsize(65535, 65535) except Exception: try: self.child.setwinsize(10000, 10000) except Exception: pass if "prompt" in self.tags: prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' if not isinstance(commands, list): commands = [commands] if not isinstance(expected, list): expected = [expected] if "screen_length_command" in self.tags: commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: if vars is not None: c = c.format(**vars) result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: break if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) self.output = output if result in [0, 1]: # lastcommand = commands[-1] # if vars is not None: # lastcommand = lastcommand.format(**vars) # last_command_index = output.rfind(lastcommand) # cleaned_output = output[last_command_index + len(lastcommand):].strip() self.result = 
{} for e in expected: if vars is not None: e = e.format(**vars) updatedprompt = re.sub(r'(?<!\\)\$', '', prompt) newpattern = f".*({updatedprompt}).*{e}.*" cleaned_output = output cleaned_output = re.sub(newpattern, '', cleaned_output) if e in cleaned_output: self.result[e] = True else: self.result[e]= False self.status = 0 return self.result if result == 2: self.result = None self.status = 2 return output else: self.result = None self.output = connect self.status = 1 return connect @MethodHook def _generate_ssh_sftp_cmd(self): cmd = self.protocol if self.idletime > 0: cmd += " -o ServerAliveInterval=" + str(self.idletime) if self.port: if self.protocol == "ssh": cmd += " -p " + self.port elif self.protocol == "sftp": cmd += " -P " + self.port if self.options: cmd += " " + self.options if self.jumphost: cmd += " " + self.jumphost user_host = f"{self.user}@{self.host}" if self.user else self.host cmd += f" {user_host}" return cmd @MethodHook def _generate_telnet_cmd(self): cmd = f"telnet {self.host}" if self.port: cmd += f" {self.port}" if self.options: cmd += f" {self.options}" return cmd @MethodHook def _generate_kube_cmd(self): cmd = f"kubectl exec {self.options} {self.host} -it --" kube_command = self.tags.get("kube_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash" cmd += f" {kube_command}" return cmd @MethodHook def _generate_docker_cmd(self): cmd = f"docker {self.options} exec -it {self.host}" docker_command = self.tags.get("docker_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash" cmd += f" {docker_command}" return cmd @MethodHook def _get_cmd(self): if self.protocol in ["ssh", "sftp"]: return self._generate_ssh_sftp_cmd() elif self.protocol == "telnet": return self._generate_telnet_cmd() elif self.protocol == "kubectl": return self._generate_kube_cmd() elif self.protocol == "docker": return self._generate_docker_cmd() else: raise ValueError(f"Invalid protocol: {self.protocol}") @MethodHook def _connect(self, 
debug=False, timeout=10, max_attempts=3): cmd = self._get_cmd() passwords = self._passtx(self.password) if self.password[0] else [] if self.logs != '': self.logfile = self._logfile() default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$' prompt = self.tags.get("prompt", default_prompt) if isinstance(self.tags, dict) else default_prompt password_prompt = '[p|P]assword:|[u|U]sername:' if self.protocol != 'telnet' else '[p|P]assword:' expects = { "ssh": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: ssh', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"], "sftp": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: sftp', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"], "telnet": ['[u|U]sername:', 'refused', 'supported', 'invalid|unrecognized option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"], "kubectl": ['[u|U]sername:', '[r|R]efused', '[E|e]rror', 'DEPRECATED', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, "expired|invalid"], "docker": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'not a docker command', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF] } error_indices = { "ssh": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16], "sftp": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16], "telnet": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16], "kubectl": [1, 2, 3, 4, 8], # Define error indices for kube "docker": [1, 2, 3, 4, 5, 6, 7] # Define error indices for docker } eof_indices = { "ssh": [8, 9, 10, 11], "sftp": [8, 9, 10, 11], 
"telnet": [8, 9, 10, 11], "kubectl": [5, 6, 7], # Define eof indices for kube "docker": [8, 9, 10] # Define eof indices for docker } initial_indices = { "ssh": [0], "sftp": [0], "telnet": [0], "kubectl": [0], # Define special indices for kube "docker": [0] # Define special indices for docker } attempts = 1 while attempts <= max_attempts: child = pexpect.spawn(cmd) if isinstance(self.tags, dict) and self.tags.get("console"): child.sendline() if debug: print(cmd) self.mylog = io.BytesIO() child.logfile_read = self.mylog endloop = False for i in range(len(passwords) if passwords else 1): while True: results = child.expect(expects[self.protocol], timeout=timeout) results_value = expects[self.protocol][results] if results in initial_indices[self.protocol]: if self.protocol in ["ssh", "sftp"]: child.sendline('yes') elif self.protocol in ["telnet", "kubectl", "docker"]: if self.user: child.sendline(self.user) else: self.missingtext = True break elif results in error_indices[self.protocol]: child.terminate() if results_value == pexpect.TIMEOUT and attempts != max_attempts: attempts += 1 endloop = True break else: after = "Connection timeout" if results_value == pexpect.TIMEOUT else child.after.decode() return f"Connection failed code: {results}\n{child.before.decode().lstrip()}{after}{child.readline().decode()}".rstrip() elif results in eof_indices[self.protocol]: if results_value == password_prompt: if passwords: child.sendline(passwords[i]) else: self.missingtext = True break elif results_value == "suspend": child.sendline("\r") sleep(2) else: endloop = True child.sendline() break if endloop: break if results_value == pexpect.TIMEOUT: continue else: break if isinstance(self.tags, dict) and self.tags.get("post_connect_commands"): cmds = self.tags.get("post_connect_commands") commands = [cmds] if isinstance(cmds, str) else cmds for command in commands: child.sendline(command) sleep(1) child.readline(0) self.child = child return True
This class generates a node object. It contains all the information and methods to connect and interact with a device using ssh or telnet.
Attributes:
- output (str): Output of the commands you ran with the run or test method.
- result (dict): Maps each expected string to True if it was found after running the commands with the test method, False otherwise.
- status (int): 0 if the run or test method ran successfully. 1 if the connection failed. 2 if expect timed out without finding the prompt or EOF.
Parameters:
- unique (str): Unique name to assign to the node.
- host (str): IP address or hostname of the node.
Optional Parameters:
- options (str): Additional options to pass to ssh/telnet for the connection.
- logs (str): Path/file for storing the logs. You can use ${unique}, ${host}, ${port}, ${user}, ${protocol} as variables.
- password (str): Encrypted or plaintext password.
- port (str): Port to connect to the node. Default is 22 for ssh and 23 for telnet.
- protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh.
- user (str): Username for the node.
- config (obj): Pass the object created with class configfile, with the key for decryption and extra configuration, if you are using connection manager.
- tags (dict): Tags useful for automation and personal purposes, like "os", "prompt" and "screen_length_command".
- jumphost (str): Reference another node to be used as a jumphost.
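The variables accepted in `logs` are replaced by plain string substitution, and the `_logfile` source above also handles a `${date '<format>'}` variable using `strftime`. A minimal sketch of that expansion follows, assuming the same substitution order as the source. `expand_log_path` and its `now` argument are hypothetical and exist only so the example is self-contained and repeatable.

```python
import datetime
import re


def expand_log_path(template, *, unique, host, port="", user="", protocol="ssh", now=None):
    """Replace ${...} variables in a log path template."""
    values = {"unique": unique, "host": host, "port": port,
              "user": user, "protocol": protocol}
    for name, value in values.items():
        template = template.replace("${" + name + "}", value)
    now = now or datetime.datetime.now()
    # ${date '<strftime format>'} is expanded with the current time.
    date_conf = re.search(r"\$\{date '(.*)'}", template)
    if date_conf:
        template = re.sub(r"\$\{date (.*)}", now.strftime(date_conf.group(1)), template)
    return template


fixed_time = datetime.datetime(2024, 1, 2)
print(expand_log_path("/logs/${unique}_${host}_${date '%Y%m%d'}.log",
                      unique="router1", host="10.0.0.1", now=fixed_time))
```

Because the substitution is literal, every variable value must be a string, which is consistent with `port` being documented as `str`.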
Methods
def interact(self, debug=False)
@MethodHook
def interact(self, debug = False):
    '''
    Allow user to interact with the node directly, mostly used by connection manager.

    ### Optional Parameters:

        - debug (bool): If True, display all the connecting information
                        before interact. Default False.
    '''
    connect = self._connect(debug = debug)
    if connect == True:
        size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
        self.child.setwinsize(int(size.group(2)),int(size.group(1)))
        print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        if 'logfile' in dir(self):
            # Initialize self.mylog
            if not 'mylog' in dir(self):
                self.mylog = io.BytesIO()
            self.child.logfile_read = self.mylog
            # Start the _savelog thread
            log_thread = threading.Thread(target=self._savelog)
            log_thread.daemon = True
            log_thread.start()
        if 'missingtext' in dir(self):
            print(self.child.after.decode(), end='')
        if self.idletime > 0:
            x = threading.Thread(target=self._keepalive)
            x.daemon = True
            x.start()
        if debug:
            print(self.mylog.getvalue().decode())
        self.child.interact(input_filter=self._filter)
        if 'logfile' in dir(self):
            with open(self.logfile, "w") as f:
                f.write(self._logclean(self.mylog.getvalue().decode(), True))
    else:
        print(connect)
        exit(1)
Allow user to interact with the node directly, mostly used by connection manager.
Optional Parameters:
- debug (bool): If True, display all the connecting information before interact. Default False.
def run(self,
commands,
vars=None,
*,
folder='',
prompt='>$|#$|\\$$|>.$|#.$|\\$.$',
stdout=False,
timeout=10)
@MethodHook def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10): ''' Run a command or list of commands on the node and return the output. ### Parameters: - commands (str/list): Commands to run on the node. Should be str or a list of str. You can use variables as {varname} and defining them in optional parameter vars. ### Optional Parameters: - vars (dict): Dictionary containing the definition of variables used in commands parameter. Keys: Variable names. Values: strings. ### Optional Named Parameters: - folder (str): Path where output log should be stored, leave empty to disable logging. - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol. - stdout (bool):Set True to send the command output to stdout. default False. - timeout (int):Time in seconds for expect to wait for prompt/EOF. default 10. ### Returns: str: Output of the commands you ran on the node. 
''' connect = self._connect(timeout = timeout) now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') if connect == True: # Attempt to set the terminal size try: self.child.setwinsize(65535, 65535) except Exception: try: self.child.setwinsize(10000, 10000) except Exception: pass if "prompt" in self.tags: prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' status = '' if not isinstance(commands, list): commands = [commands] if "screen_length_command" in self.tags: commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: if vars is not None: c = c.format(**vars) result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: break if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) if stdout == True: print(output) if folder != '': with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f: f.write(output) f.close() self.output = output if result == 2: self.status = 2 else: self.status = 0 return output else: self.output = connect self.status = 1 if stdout == True: print(connect) if folder != '': with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f: f.write(connect) f.close() return connect
Run a command or list of commands on the node and return the output.
Parameters:
- commands (str/list): Commands to run on the node. Should be a str or a list of str. You can use variables as {varname} and define them in the optional parameter vars.
Optional Parameters:
- vars (dict): Dictionary containing the definition of variables used in commands parameter. Keys: Variable names. Values: strings.
Optional Named Parameters:
- folder (str): Path where the output log should be stored; leave empty to disable logging.
- prompt (str): Prompt to expect after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs a special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.
Returns:
str: Output of the commands you ran on the node.
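Two parts of `run` can be illustrated without a device: variables in `commands` are filled in with `str.format(**vars)`, and the `prompt` argument is a regular expression. The sketch below uses the default prompt from the signature above and applies it with `re.search` to a few invented line endings. This is only an approximation, since the real method hands the pattern to pexpect, which matches against the incoming stream.

```python
import re

# Default value of the prompt parameter, copied from the run() signature.
DEFAULT_PROMPT = r'>$|#$|\$$|>.$|#.$|\$.$'

# Variable substitution as done for each command before it is sent.
commands = ["show interface {iface}", "ping {target}"]
variables = {"iface": "Gi0/1", "target": "10.0.0.1"}
rendered = [c.format(**variables) for c in commands]
print(rendered)

# Which invented screen endings would count as a prompt?
for tail in ["router#", "user@host:~$ ", "Switch>", "still running..."]:
    print(repr(tail), bool(re.search(DEFAULT_PROMPT, tail)))
```

The default pattern accepts `>`, `#`, or `$` at the end of the text, optionally followed by one more character such as a trailing space. A device with a different prompt needs its own pattern, passed as `prompt` or stored in the node's `prompt` tag.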
def test(self,
commands,
expected,
vars=None,
*,
prompt='>$|#$|\\$$|>.$|#.$|\\$.$',
timeout=10)
@MethodHook def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10): ''' Run a command or list of commands on the node, then check if expected value appears on the output after the last command. ### Parameters: - commands (str/list): Commands to run on the node. Should be str or a list of str. You can use variables as {varname} and defining them in optional parameter vars. - expected (str) : Expected text to appear after running all the commands on the node.You can use variables as {varname} and defining them in optional parameter vars. ### Optional Parameters: - vars (dict): Dictionary containing the definition of variables used in commands and expected parameters. Keys: Variable names. Values: strings. ### Optional Named Parameters: - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol. - timeout (int):Time in seconds for expect to wait for prompt/EOF. default 10. ### Returns: bool: true if expected value is found after running the commands false if prompt is found before. 
''' connect = self._connect(timeout = timeout) if connect == True: # Attempt to set the terminal size try: self.child.setwinsize(65535, 65535) except Exception: try: self.child.setwinsize(10000, 10000) except Exception: pass if "prompt" in self.tags: prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' if not isinstance(commands, list): commands = [commands] if not isinstance(expected, list): expected = [expected] if "screen_length_command" in self.tags: commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: if vars is not None: c = c.format(**vars) result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: break if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) self.output = output if result in [0, 1]: # lastcommand = commands[-1] # if vars is not None: # lastcommand = lastcommand.format(**vars) # last_command_index = output.rfind(lastcommand) # cleaned_output = output[last_command_index + len(lastcommand):].strip() self.result = {} for e in expected: if vars is not None: e = e.format(**vars) updatedprompt = re.sub(r'(?<!\\)\$', '', prompt) newpattern = f".*({updatedprompt}).*{e}.*" cleaned_output = output cleaned_output = re.sub(newpattern, '', cleaned_output) if e in cleaned_output: self.result[e] = True else: self.result[e]= False self.status = 0 return self.result if result == 2: self.result = None self.status = 2 return output else: self.result = None self.output = connect self.status = 1 return connect
Run a command or list of commands on the node, then check whether each expected value appears in the output after the last command.
Parameters:
- commands (str/list): Commands to run on the node. Should be a str or a list of str. You can use variables as {varname} and define them in the optional parameter vars.
- expected (str/list): Expected text to appear after running all the commands on the node. You can use variables as {varname} and define them in the optional parameter vars.
Optional Parameters:
- vars (dict): Dictionary containing the definition of variables used in the commands and expected parameters. Keys: Variable names. Values: strings.
Optional Named Parameters:
- prompt (str): Prompt to be expected after a command finishes running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.
Returns:
dict: Maps each expected string to True if it is found in the output after running the commands, False otherwise. If expect times out, the captured output is returned instead; if the connection fails, the connection error is returned.
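The matching step can be sketched without a live connection. The helper below, `check_expected`, is a hypothetical stand-in (it is not part of connpy) that mirrors the logic visible in the source of `test`: variables are substituted with `str.format`, lines where the expected text follows a prompt are discarded, and each expected string is then searched for in what remains. The sample output string and the variable names are invented for illustration.

```python
import re

# Default prompt pattern, copied from the signature of node.test
DEFAULT_PROMPT = r'>$|#$|\$$|>.$|#.$|\$.$'


def check_expected(output, expected, vars=None, prompt=DEFAULT_PROMPT):
    """Hypothetical helper: return {expected_string: bool} for already captured output."""
    if not isinstance(expected, list):
        expected = [expected]
    result = {}
    for e in expected:
        if vars is not None:
            e = e.format(**vars)
        # Remove the unescaped end-of-line anchors so the prompt can match mid-line
        updated_prompt = re.sub(r'(?<!\\)\$', '', prompt)
        # Drop any line where the expected text appears after a prompt (the echoed command)
        pattern = f".*({updated_prompt}).*{e}.*"
        cleaned = re.sub(pattern, '', output)
        result[e] = e in cleaned
    return result


# Invented session transcript: the command echo, one line of output, a final prompt
sample_output = "router#ping 10.0.0.1\nSuccess rate is 100 percent\nrouter#"
results = check_expected(
    sample_output,
    ["Success rate is {rate}", "{ip}"],
    vars={"rate": "100", "ip": "10.0.0.1"},
)
print(results)
```

In this sketch the address only appears in the echoed command, so it is reported as not found, while the text that the device actually printed is reported as found. Note that the expected string is interpolated into a regular expression unescaped, so characters such as "." act as wildcards during the cleanup step.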
class nodes (nodes: dict, config='')
@ClassHook
class nodes:
    ''' This class generates a nodes object. It contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.

    ### Attributes:

        - nodelist (list): List of node class objects created from the
                           nodes dictionary passed to the init function.

        - output (dict): Dictionary with each node's unique name as key
                         and the output of the commands you ran on that
                         node as value. Created after running methods
                         run or test.

        - result (dict): Dictionary with each node's unique name as key
                         and that node's test result as value: True for
                         each expected value found after running the
                         commands, False otherwise. Created after
                         running method test.

        - status (dict): Dictionary with each node's unique name as key.
                         Value: 0 if method run or test ended
                         successfully, 1 if the connection failed,
                         2 if expect times out without prompt or EOF.

        - <unique> (obj): For each item in nodelist, an attribute named
                          after the node's unique name is generated.

    '''

    def __init__(self, nodes: dict, config = ''):
        '''
        ### Parameters:

            - nodes (dict): Dictionary formed by node information:
                            Keys: Unique name for each node.
                            Mandatory Subkeys: host(str).
                            Optional Subkeys: options(str), logs(str), password(str),
                            port(str), protocol(str), user(str).
                            For reference on subkeys check node class.

        ### Optional Parameters:

            - config (obj): Pass the object created with class configfile with key
                            for decryption and extra configuration if you are using
                            connection manager.
        '''
        self.nodelist = []
        self.config = config
        for n in nodes:
            this = node(n, **nodes[n], config = config)
            self.nodelist.append(this)
            setattr(self,n,this)

    @MethodHook
    def _splitlist(self, lst, n):
        # Split a list into lists of at most n members.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    @MethodHook
    def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None):
        '''
        Run a command or list of commands on all the nodes in nodelist.

        ### Parameters:

            - commands (str/list): Commands to run on the nodes. Should be a str
                                   or a list of str. You can use variables as
                                   {varname} and define them in the optional
                                   parameter vars.

        ### Optional Parameters:

            - vars (dict): Dictionary containing the definition of variables for
                           each node, used in the commands parameter.
                           Keys should be the nodes' unique names. Use the
                           special key name __global__ for global variables.
                           Subkeys: Variable names.
                           Values: strings.

        ### Optional Named Parameters:

            - folder (str): Path where the output log should be stored. Leave
                            empty to disable logging.

            - prompt (str): Prompt to be expected after a command finishes
                            running. Usually linux uses ">" or EOF while routers
                            use ">" or "#". The default value should work for
                            most nodes. Change it if your connection needs some
                            special symbol.

            - stdout (bool): Set True to send the command output to stdout.
                             Default False.

            - parallel (int): Number of nodes to run the commands on
                              simultaneously. Default is 10. If there are more
                              nodes than this value, they are split into groups
                              of at most this many members.

            - timeout (int): Time in seconds for expect to wait for prompt/EOF.
                             Default 10.

        ### Returns:

            dict: Dictionary with each node's unique name as key and the
                  output of the commands you ran on that node as value.

        '''
        args = {}
        nodesargs = {}
        args["commands"] = commands
        if folder is not None:
            args["folder"] = folder
            Path(folder).mkdir(parents=True, exist_ok=True)
        if prompt is not None:
            args["prompt"] = prompt
        if stdout is not None:
            args["stdout"] = stdout
        if timeout is not None:
            args["timeout"] = timeout
        output = {}
        status = {}
        tasks = []
        for n in self.nodelist:
            nodesargs[n.unique] = deepcopy(args)
            if vars is not None:
                # Global variables first, then node-specific overrides
                nodesargs[n.unique]["vars"] = {}
                if "__global__" in vars:
                    nodesargs[n.unique]["vars"].update(vars["__global__"])
                if n.unique in vars:
                    nodesargs[n.unique]["vars"].update(vars[n.unique])
            tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
        # Start and join the threads one batch of `parallel` nodes at a time
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            output[i.unique] = i.output
            status[i.unique] = i.status
        self.output = output
        self.status = status
        return output

    @MethodHook
    def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10, timeout = None):
        '''
        Run a command or list of commands on all the nodes in nodelist, then check whether the expected value appears in the output after the last command.

        ### Parameters:

            - commands (str/list): Commands to run on the nodes. Should be a str
                                   or a list of str.

            - expected (str)     : Expected text to appear after running all the
                                   commands on the node.

        ### Optional Parameters:

            - vars (dict): Dictionary containing the definition of variables for
                           each node, used in the commands and expected
                           parameters. Keys should be the nodes' unique names.
                           Use the special key name __global__ for global
                           variables.
                           Subkeys: Variable names.
                           Values: strings.

        ### Optional Named Parameters:

            - prompt (str): Prompt to be expected after a command finishes
                            running. Usually linux uses ">" or EOF while routers
                            use ">" or "#". The default value should work for
                            most nodes. Change it if your connection needs some
                            special symbol.

            - parallel (int): Number of nodes to run the commands on
                              simultaneously. Default is 10. If there are more
                              nodes than this value, they are split into groups
                              of at most this many members.

            - timeout (int): Time in seconds for expect to wait for prompt/EOF.
                             Default 10.

        ### Returns:

            dict: Dictionary with each node's unique name as key and that
                  node's test result as value: True for each expected value
                  found after running the commands, False otherwise.

        '''
        args = {}
        nodesargs = {}
        args["commands"] = commands
        args["expected"] = expected
        if prompt is not None:
            args["prompt"] = prompt
        if timeout is not None:
            args["timeout"] = timeout
        output = {}
        result = {}
        status = {}
        tasks = []
        for n in self.nodelist:
            nodesargs[n.unique] = deepcopy(args)
            if vars is not None:
                # Global variables first, then node-specific overrides
                nodesargs[n.unique]["vars"] = {}
                if "__global__" in vars:
                    nodesargs[n.unique]["vars"].update(vars["__global__"])
                if n.unique in vars:
                    nodesargs[n.unique]["vars"].update(vars[n.unique])
            tasks.append(threading.Thread(target=n.test, kwargs=nodesargs[n.unique]))
        # Start and join the threads one batch of `parallel` nodes at a time
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            result[i.unique] = i.result
            output[i.unique] = i.output
            status[i.unique] = i.status
        self.output = output
        self.result = result
        self.status = status
        return result
This class generates a nodes object. It contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.
Attributes:
- nodelist (list): List of node class objects created from the nodes dictionary passed to the init function.
- output (dict): Dictionary with each node's unique name as key and the output of the commands you ran on that node as value. Created after running methods run or test.
- result (dict): Dictionary with each node's unique name as key and that node's test result as value: True for each expected value found after running the commands, False otherwise. Created after running method test.
- status (dict): Dictionary with each node's unique name as key. Value: 0 if method run or test ended successfully, 1 if the connection failed, 2 if expect times out without prompt or EOF.
- <unique> (obj): For each item in nodelist, an attribute named after the node's unique name is generated.
Parameters:
- nodes (dict): Dictionary formed by node information: Keys: Unique name for each node. Mandatory Subkeys: host(str). Optional Subkeys: options(str), logs(str), password(str), port(str), protocol(str), user(str). For reference on subkeys check node class.
Optional Parameters:
- config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager.
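The shape of the nodes dictionary and the per-node attributes can be illustrated with stand-ins. `StubNode` and `StubNodes` below are hypothetical classes written for this sketch (they are not connpy classes and open no connections); they only reproduce the constructor pattern shown in the source: one object per dictionary key, appended to nodelist and also exposed as an attribute. The host addresses and node names are invented.

```python
class StubNode:
    """Hypothetical stand-in for connpy's node class: stores its arguments only."""

    def __init__(self, unique, host, config='', **options):
        self.unique = unique
        self.host = host
        self.options = options


class StubNodes:
    """Hypothetical stand-in that mirrors the constructor of the nodes class."""

    def __init__(self, nodes, config=''):
        self.nodelist = []
        self.config = config
        for name in nodes:
            # Each key is the unique name; its subkeys become keyword arguments
            this = StubNode(name, **nodes[name], config=config)
            self.nodelist.append(this)
            # Expose the node as an attribute named after its unique name
            setattr(self, name, this)


inventory = {
    "router1": {"host": "10.0.0.1", "protocol": "ssh", "user": "admin"},
    "router2": {"host": "10.0.0.2", "protocol": "telnet"},
}
group = StubNodes(inventory)
print([n.unique for n in group.nodelist])
print(group.router1.host)
```

Because the attribute is created with `setattr`, unique names that are not valid Python identifiers would have to be read with `getattr(group, name)` in this sketch.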
Methods
def run(self, commands, vars=None, *, folder=None, prompt=None, stdout=None, parallel=10, timeout=None)
Run a command or list of commands on all the nodes in nodelist.
Parameters:
- commands (str/list): Commands to run on the nodes. Should be a str or a list of str. You can use variables as {varname} and define them in the optional parameter vars.
Optional Parameters:
- vars (dict): Dictionary containing the definition of variables for each node, used in the commands parameter. Keys should be the nodes' unique names. Use the special key name __global__ for global variables. Subkeys: Variable names. Values: strings.
Optional Named Parameters:
- folder (str): Path where the output log should be stored. Leave empty to disable logging.
- prompt (str): Prompt to be expected after a command finishes running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
- parallel (int): Number of nodes to run the commands on simultaneously. Default is 10. If there are more nodes than this value, they are split into groups of at most this many members.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.
Returns:
dict: Dictionary with each node's unique name as key and the output of the commands you ran on that node as value.
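How the vars dictionary is resolved for each node can be sketched in isolation. `build_node_args` below is a hypothetical helper (not a connpy function) that follows the merge order visible in the source: the __global__ entries are applied first and the entries stored under the node's unique name override them. The node names, commands and values are invented.

```python
from copy import deepcopy


def build_node_args(base_args, vars, unique):
    """Hypothetical helper: build the keyword arguments passed to one node."""
    node_args = deepcopy(base_args)
    if vars is not None:
        node_args["vars"] = {}
        # Global variables are applied first
        if "__global__" in vars:
            node_args["vars"].update(vars["__global__"])
        # Variables keyed by the node's unique name override the global ones
        if unique in vars:
            node_args["vars"].update(vars[unique])
    return node_args


variables = {
    "__global__": {"what": "version", "ip": "8.8.8.8"},
    "router1": {"ip": "10.0.0.254"},
}
args = {"commands": ["ping {ip}", "show {what}"]}

router1_args = build_node_args(args, variables, "router1")
router2_args = build_node_args(args, variables, "router2")

# Each node later substitutes its own variables with str.format
router1_commands = [c.format(**router1_args["vars"]) for c in router1_args["commands"]]
router2_commands = [c.format(**router2_args["vars"]) for c in router2_args["commands"]]
print(router1_commands)
print(router2_commands)
```

Here router1 pings its own address while router2, which has no entry of its own, falls back to the global value. The `deepcopy` keeps each node's argument set independent, so a change made for one node cannot leak into another.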
def test(self, commands, expected, vars=None, *, prompt=None, parallel=10, timeout=None)
Run a command or list of commands on all the nodes in nodelist, then check whether the expected value appears in the output after the last command.
Parameters:
- commands (str/list): Commands to run on the nodes. Should be a str or a list of str.
- expected (str): Expected text to appear after running all the commands on the node.
Optional Parameters:
- vars (dict): Dictionary containing the definition of variables for each node, used in the commands and expected parameters. Keys should be the nodes' unique names. Use the special key name __global__ for global variables. Subkeys: Variable names. Values: strings.
Optional Named Parameters:
- prompt (str): Prompt to be expected after a command finishes running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
- parallel (int): Number of nodes to run the commands on simultaneously. Default is 10. If there are more nodes than this value, they are split into groups of at most this many members.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.
Returns:
dict: Dictionary with each node's unique name as key and that node's test result as value: True for each expected value found after running the commands, False otherwise.
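The effect of the parallel parameter can be sketched with plain threads. `split_list` follows the same slicing as the `_splitlist` method in the source; `run_in_batches` and `fake_test` are hypothetical names used only in this sketch, and the worker stores a made-up result instead of connecting to a device.

```python
import threading


def split_list(lst, n):
    """Yield consecutive slices of lst with at most n members each."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def run_in_batches(names, worker, parallel=10):
    """Hypothetical helper: run worker(name) in threads, one batch at a time."""
    tasks = [threading.Thread(target=worker, args=(name,)) for name in names]
    for batch in split_list(tasks, parallel):
        # Start every thread in the batch, then wait for all of them
        for task in batch:
            task.start()
        for task in batch:
            task.join()


results = {}


def fake_test(name):
    # Stand-in for a per-node test; a real run would talk to the device
    results[name] = {"expected text": True}


names = [f"router{i}" for i in range(1, 6)]
run_in_batches(names, fake_test, parallel=2)
print(sorted(results))
```

With five nodes and parallel set to 2, the threads run in three batches of 2, 2 and 1. A batch starts only after every thread of the previous one has been joined, so a single slow node delays the whole next batch.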