Package connpy

Connection manager

Connpy is an SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and automation module for Linux, Mac, and Docker.

Features

- Manage connections using SSH, SFTP, Telnet, kubectl, and Docker exec.
- Set contexts to limit the manager to a specific subset of nodes (work/home/clients/etc).
- Generate profiles and reference them from nodes using @profilename, so you don't
  need to edit multiple nodes when changing passwords or other information.
- Nodes can be stored in @folder or @subfolder@folder to organize your devices. They can
  be referenced using node@subfolder@folder or node@folder.
- If you have too many nodes, get a completion script using: conn config --completion.
  Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use AI with a multi-agent system (Engineer/Architect) to help you manage your devices.
  Supports any LLM provider via litellm (OpenAI, Anthropic, Google, etc.).
- Add plugins with your own scripts.
- Much more!
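
The node@subfolder@folder naming scheme described above can be illustrated with a small parser. This is only a sketch of the naming convention, not Connpy's own parsing code; the function name split_reference and the returned dictionary keys are hypothetical.

```python
def split_reference(reference):
    """Split 'node@subfolder@folder' style names into their parts.

    Hypothetical helper for illustration; missing parts are returned as None.
    """
    parts = reference.split("@")
    node = parts[0] or None          # empty string means no node was given
    location = parts[1:]             # everything after the first '@'
    if len(location) == 0:
        subfolder, folder = None, None
    elif len(location) == 1:
        subfolder, folder = None, location[0]
    elif len(location) == 2:
        subfolder, folder = location
    else:
        raise ValueError(f"Too many '@' separators in {reference!r}")
    return {"node": node, "subfolder": subfolder, "folder": folder}


full = split_reference("server@datacenter@office")
folder_only = split_reference("@office")
```

Here full describes a node inside a subfolder, while folder_only has no node part at all, matching the [@subfolder]@folder form used by the CLI.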

Usage

usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]
       conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config,sync,context} ...

positional arguments:
  node|folder        node[@subfolder][@folder]
                     Connect to specific node or show all matching nodes
                     [@subfolder][@folder]
                     Show all available connections globally or in specified path

options:
  -h, --help         show this help message and exit
  -v, --version      Show version
  -a, --add          Add new node[@subfolder][@folder] or [@subfolder]@folder
  -r, --del, --rm    Delete node[@subfolder][@folder] or [@subfolder]@folder
  -e, --mod, --edit  Modify node[@subfolder][@folder]
  -s, --show         Show node[@subfolder][@folder]
  -d, --debug        Display all connection steps
  -t, --sftp         Connects using sftp instead of ssh

Commands:
  profile         Manage profiles
  move(mv)        Move node
  copy(cp)        Copy node
  list(ls)        List profiles, nodes or folders
  bulk            Add nodes in bulk
  export          Export connection folder to Yaml file
  import          Import connection folder to config from Yaml file
  ai              Make request to an AI
  run             Run scripts or commands on nodes
  api             Start and stop connpy api
  plugin          Manage plugins
  config          Manage app config
  sync            Sync config with Google
  context         Manage contexts with regex matching

Manage profiles

usage: conn profile [-h] (--add | --del | --mod | --show) profile

positional arguments:
  profile        Name of profile to manage

options:
  -h, --help         show this help message and exit
  -a, --add          Add new profile
  -r, --del, --rm    Delete profile
  -e, --mod, --edit  Modify profile
  -s, --show         Show profile

Examples

   #Add new profile
   conn profile --add office-user
   #Add new folder
   conn --add @office
   #Add new subfolder
   conn --add @datacenter@office
   #Add node to subfolder
   conn --add server@datacenter@office
   #Add node to folder
   conn --add pc@office
   #Show node information
   conn --show server@datacenter@office
   #Connect to nodes
   conn pc@office
   conn server
   #Create and set new context
   conn context -a office .*@office
   conn context --set office
   #Run a command in a node
   conn run server ls -la
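
The context example above uses the regular expression .*@office to select nodes. A rough sketch of that kind of filtering with Python's re module follows. Whether Connpy matches the whole name (as fullmatch does here) or only part of it is an assumption, and nodes_in_context is a hypothetical helper name.

```python
import re


def nodes_in_context(node_names, patterns):
    """Return the node names that fully match at least one regex pattern.

    Assumption: a context is a list of regex patterns and a node belongs
    to the context when any pattern matches its full name.
    """
    compiled = [re.compile(pattern) for pattern in patterns]
    return [
        name for name in node_names
        if any(regex.fullmatch(name) for regex in compiled)
    ]


all_nodes = ["pc@office", "server@datacenter@office", "router@home"]
office_nodes = nodes_in_context(all_nodes, [r".*@office"])
```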

Plugin Requirements for Connpy

General Structure

  • The plugin script must be a Python file.
  • Only the following top-level elements are allowed in the plugin script:
    • Class definitions
    • Function definitions
    • Import statements
    • The if __name__ == "__main__": block for standalone execution
    • Pass statements

Specific Class Requirements

  • The plugin script must define specific classes with particular attributes and methods. Each class serves a distinct role within the plugin's architecture:
  • Class Parser:
    • Purpose: Handles parsing of command-line arguments.
    • Requirements:
      • Must contain only one method: __init__.
      • The __init__ method must initialize at least one attribute:
        • self.parser: An instance of argparse.ArgumentParser.
  • Class Entrypoint:
    • Purpose: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
    • Requirements:
      • Must have an __init__ method that accepts exactly three parameters besides self:
        • args: Arguments passed to the plugin.
        • The parser instance (typically self.parser from the Parser class).
        • The Connapp instance to interact with the Connpy app.
  • Class Preload:
    • Purpose: Performs any necessary preliminary setup or configuration independent of the main parsing and entry logic.
    • Requirements:
      • Contains at least an __init__ method that accepts parameter connapp besides self.

Class Dependencies and Combinations

  • Dependencies:
    • Parser and Entrypoint are interdependent and must both be present if one is included.
    • Preload is independent and may exist alone or alongside the other classes.
  • Valid Combinations:
    • Parser and Entrypoint together.
    • Preload alone.
    • All three classes (Parser, Entrypoint, Preload).
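
The class requirements above can be put together into a minimal plugin skeleton. This is a sketch written to follow the listed rules, not an official template: the target argument, the message attribute, and the description text are hypothetical, and the connapp object is not used here.

```python
import argparse


class Parser:
    def __init__(self):
        # self.parser must be assigned inside __init__
        self.parser = argparse.ArgumentParser(description="Example plugin (hypothetical)")
        self.parser.add_argument("target", help="Hypothetical positional argument")


class Entrypoint:
    def __init__(self, args, parser, connapp):
        # Exactly three parameters besides self: args, parser, connapp
        self.message = f"Would act on {args.target}"
        print(self.message)


class Preload:
    def __init__(self, connapp):
        # Setup that does not depend on parsing would go here
        pass


if __name__ == "__main__":
    # Standalone test run with a fixed argument list and no Connpy app
    standalone = Parser()
    Entrypoint(standalone.parser.parse_args(["demo-node"]), standalone.parser, None)
```

Only classes, an import, and the if __name__ == "__main__": block appear at the top level, and Parser contains nothing but __init__, which is why the explanatory notes are written as comments inside the methods.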

Preload Modifications and Hooks

In the Preload class, a plugin can customize the behavior of existing classes and methods in the application through a hooking system. This section explains how to use the modify, register_pre_hook, and register_post_hook methods to do so.

Modifying Classes with modify

The modify method allows you to alter instances of a class at the time they are created or after their creation. This is particularly useful for setting or modifying configuration settings, altering default behaviors, or adding new functionalities to existing classes without changing the original class definitions.

  • Usage: Modify a class to include additional configurations or changes.
  • Modify Method Signature:
    • modify(modification_method): modification_method is a function that is invoked with an instance of the class as its argument. This function should perform any modifications directly on this instance.
  • Modification Method Signature:
    • Arguments:
      • cls: The class instance. It is the only argument the function receives, and the function modifies it in place.
  • Modifiable Classes:
    • connapp.config
    • connapp.node
    • connapp.nodes
    • connapp.ai
  • Example:

    ```python
    def modify_config(cls):
        # Example modification: adding a new attribute or modifying an existing one
        cls.new_attribute = 'New Value'

    class Preload:
        def __init__(self, connapp):
            # Applying modification to the config class instance
            connapp.config.modify(modify_config)
    ```

Implementing Method Hooks

Two methods let you define custom logic to run before (register_pre_hook) or after (register_post_hook) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs, or adding functionality.

  • Usage: Register hooks to methods to execute additional logic before or after the main method execution.
  • Registration Methods Signature:
    • register_pre_hook(pre_hook_method): A function that is invoked before the main method is executed. This function should do preprocessing of the arguments.
    • register_post_hook(post_hook_method): A function that is invoked after the main method is executed. This function should do postprocessing of the outputs.
  • Method Signatures for Pre-Hooks:
    • pre_hook_method(*args, **kwargs)
    • Arguments:
      • *args, **kwargs: The arguments and keyword arguments that will be passed to the method being hooked. The pre-hook function has the opportunity to inspect and modify these arguments before they are passed to the main method.
    • Return:
      • Must return a tuple (args, kwargs), which will be used as the new arguments for the main method. If the original arguments are not modified, the function should return them as received.
  • Method Signatures for Post-Hooks:
    • post_hook_method(*args, **kwargs)
    • Arguments:
      • *args, **kwargs: The arguments and keyword arguments that were passed to the main method.
      • kwargs["result"]: The value returned by the main method. This allows the post-hook to inspect and even alter the result before it is returned to the original caller.
    • Return:
      • Can return a modified result, which will replace the original result of the main method, or simply return kwargs["result"] to return the original method result.
  • Example:

    ```python
    def pre_processing_hook(*args, **kwargs):
        print("Pre-processing logic here")
        # Modify arguments or perform any checks
        return args, kwargs  # Return modified or unmodified args and kwargs

    def post_processing_hook(*args, **kwargs):
        print("Post-processing logic here")
        # Modify the result or perform any final logging or cleanup
        return kwargs["result"]  # Return the modified or unmodified result

    class Preload:
        def __init__(self, connapp):
            # Registering a pre-hook
            connapp.ai.some_method.register_pre_hook(pre_processing_hook)

            # Registering a post-hook
            connapp.node.another_method.register_post_hook(post_processing_hook)
    ```
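
To see the hook contract in action without a running Connpy app, the following stand-in decorator mimics the behavior described above: pre-hooks return (args, kwargs), and post-hooks receive the result in kwargs["result"]. The hookable decorator is a hypothetical illustration, not Connpy's actual hook implementation, and greet is a made-up method.

```python
import functools


def hookable(func):
    """Hypothetical stand-in that adds pre/post hook registration to a function."""
    pre_hooks = []
    post_hooks = []

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Each pre-hook may replace the arguments passed to the main function
        for hook in pre_hooks:
            args, kwargs = hook(*args, **kwargs)
        result = func(*args, **kwargs)
        # Each post-hook sees the result under kwargs["result"] and may replace it
        for hook in post_hooks:
            result = hook(*args, **{**kwargs, "result": result})
        return result

    wrapper.register_pre_hook = pre_hooks.append
    wrapper.register_post_hook = post_hooks.append
    return wrapper


@hookable
def greet(name, punctuation="!"):
    return f"hello {name}{punctuation}"


def upper_name(*args, **kwargs):
    # Pre-hook: upper-case the first positional argument
    return (args[0].upper(),) + args[1:], kwargs


def add_suffix(*args, **kwargs):
    # Post-hook: append a marker to the original result
    return kwargs["result"] + " [checked]"


greet.register_pre_hook(upper_name)
greet.register_post_hook(add_suffix)
output = greet("router1")
```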

Executable Block

  • The plugin script can include an executable block:
  • if __name__ == "__main__":
  • This block allows the plugin to be run as a standalone script for testing or independent use.

Command Completion Support

Plugins can provide intelligent tab completion by defining a function called _connpy_completion in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.

Function Signature

def _connpy_completion(wordsnumber, words, info=None):
    ...

Parameters

| Parameter | Description |
| --- | --- |
| wordsnumber | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., connpy <plugin> ...). |
| words | A list of tokens (words) already typed. words[0] is always the name of the plugin, followed by any subcommands or arguments. |
| info | A dictionary of structured context data provided by Connpy to help with suggestions. |

Contents of info

The info dictionary contains helpful context to generate completions:

info = {
    "config": config_dict,     # The full loaded configuration
    "nodes": node_list,        # List of all known node names
    "folders": folder_list,    # List of all defined folder names
    "profiles": profile_list,  # List of all profile names
    "plugins": plugin_list     # List of all plugin names
}

You can use this data to generate suggestions based on the current input.

Return Value

The function must return a list of suggestion strings to be presented to the user.

Example

def _connpy_completion(wordsnumber, words, info=None):
    if wordsnumber == 3:
        return ["--help", "--verbose", "start", "stop"]

    elif wordsnumber == 4 and words[2] == "start":
        return info["nodes"]  # Suggest node names

    return []

In this example, if the user types connpy myplugin start and presses Tab, it will suggest node names.

Handling Unknown Arguments

Plugins can choose to accept and process unknown arguments that are not explicitly defined in the parser. To enable this behavior, the plugin must define the following hidden argument in its Parser class:

self.parser.add_argument(
    "--unknown-args",
    action="store_true",
    default=True,
    help=argparse.SUPPRESS
)

Behavior:

  • When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
  • These unknown arguments will be passed to the plugin as args.unknown_args inside the Entrypoint.
  • If the user does not pass any unknown arguments, args.unknown_args will contain the default value (True, unless overridden).

Example:

If a plugin accepts unknown tcpdump flags like this:

connpy myplugin -nn -s0

And defines the hidden --unknown-args flag as shown above, then:

  • args.unknown_args inside Entrypoint.__init__() will be: ['-nn', '-s0']

This allows the plugin to receive and process arguments intended for external tools (e.g., tcpdump) without argparse raising an error.

Note:

If a plugin does not define --unknown-args, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
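
The difference between known and unknown arguments can be tried with the standard library alone. The sketch below uses argparse's parse_known_args to separate them; that Connpy relies on this exact call internally is an assumption, and the --verbose flag is a hypothetical plugin option.

```python
import argparse

parser = argparse.ArgumentParser(prog="myplugin")
parser.add_argument("--verbose", action="store_true")  # hypothetical known flag
parser.add_argument(
    "--unknown-args",
    action="store_true",
    default=True,
    help=argparse.SUPPRESS,
)

# parse_known_args returns the parsed namespace plus the leftover tokens
known, unknown = parser.parse_known_args(["--verbose", "-nn", "-s0"])
print(known.verbose)  # True
print(unknown)        # ['-nn', '-s0']
```

Calling parser.parse_args with the same list would instead exit with an unrecognized arguments error, which is the failure described in the note above.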

Script Verification

  • The verify_script method in plugins.py is used to check the plugin script's compliance with these standards.
  • Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.

Example Script

For a practical example of how to write a compatible plugin script, please refer to the following example:

Example Plugin Script

This script demonstrates the required structure and implementation details according to the plugin system's standards.

HTTP API

With the Connpy API you can run commands on devices using HTTP requests.

1. List Nodes

Endpoint: /list_nodes

Method: POST

Description: This route returns a list of nodes. It can also filter the list based on a given keyword.

Request Body:

{
  "filter": "<keyword>"
}
  • filter (optional): A keyword to filter the list of nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.

Response:

  • A JSON array containing the filtered list of nodes.
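
A request to this route can be prepared with the Python standard library as sketched below. The base URL (host and port) is an assumption and must be replaced with wherever your Connpy API is listening; build_post is a hypothetical helper. The request is only built here, not sent.

```python
import json
import urllib.request


def build_post(base_url, endpoint, payload):
    """Build (but do not send) a JSON POST request for a Connpy API route."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed address; adjust to your own API host and port
request = build_post("http://localhost:8048", "/list_nodes", {"filter": "office"})

# With the API running, the request could be sent like this:
# with urllib.request.urlopen(request) as response:
#     nodes = json.loads(response.read())
```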

2. Get Nodes

Endpoint: /get_nodes

Method: POST

Description: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword.

Request Body:

{
  "filter": "<keyword>"
}
  • filter (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.

Response:

  • A JSON object containing the filtered nodes with their attributes.

3. Run Commands

Endpoint: /run_commands

Method: POST

Description: This route runs commands on selected nodes based on the provided action, nodes, and commands. It also supports executing tests by providing expected results.

Request Body:

{
  "action": "<action>",
  "nodes": "<nodes>",
  "commands": "<commands>",
  "expected": "<expected>",
  "options": "<options>"
}
  • action (required): The action to be performed. Possible values: run or test.
  • nodes (required): A list of nodes or a single node on which the commands will be executed. The nodes can be specified as individual node names or a node group with the @ prefix. Node groups can also be specified as arrays with a list of nodes inside the group.
  • commands (required): A list of commands to be executed on the specified nodes.
  • expected (optional, only used when the action is test): A single expected result for the test.
  • options (optional): An array of options to pass to the run command. Supported options: prompt, parallel, timeout.

Response:

  • A JSON object with the results of the executed commands on the nodes.
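
The request body described above can be assembled and checked before sending. The helper below is a hypothetical sketch that only enforces what this section states (action must be run or test, expected is only used with test); it leaves out options because their exact shape is not detailed here.

```python
import json

VALID_ACTIONS = {"run", "test"}


def build_run_payload(action, nodes, commands, expected=None):
    """Build the JSON body for /run_commands (hypothetical helper)."""
    if action not in VALID_ACTIONS:
        raise ValueError(f"action must be one of {sorted(VALID_ACTIONS)}")
    payload = {"action": action, "nodes": nodes, "commands": commands}
    # 'expected' is only meaningful when the action is 'test'
    if action == "test" and expected is not None:
        payload["expected"] = expected
    return payload


payload = build_run_payload(
    action="test",
    nodes=["router1@office", "router2@office"],
    commands=["show ip int brief"],
    expected="1.1.1.1",
)
body = json.dumps(payload)
```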

4. Ask AI

Endpoint: /ask_ai

Method: POST

Description: This route sends a request to the ChatGPT AI, which parses it into output the application can understand and then runs the request.

Request Body:

{
  "input": "<user input request>",
  "dryrun": true or false
}
  • input (required): The user input requesting the AI to perform an action on some devices or get the devices list.
  • dryrun (optional): If set to true, it returns the parameters needed to run the request without running it. Defaults to false.

Response:

  • A JSON array containing the action to run, its parameters, and the result of the action.

Automation module

Standalone module

import connpy
router = connpy.node("uniqueName","ip/host", user="user", password="pass")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
if hasip:
    print("Router has ip 1.1.1.1")
else:
    print("router does not have ip 1.1.1.1")

Using manager configuration

import connpy
conf = connpy.configfile()
device = conf.getitem("server@office")
server = connpy.node("unique name", **device, config=conf)
result = server.run(["cd /", "ls -la"])
print(result)

Running parallel tasks

import connpy
conf = connpy.configfile()
# You can get nodes from a folder in the config, optionally filtering them by name
nodes = conf.getitem("@office", ["router1", "router2", "router3"])
#You can also get each node individually:
nodes = {}
nodes["router1"] = conf.getitem("router1@office")
nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.3", "user": "user", "password": "pass3"}
#Finally you run some tasks on the nodes
mynodes = connpy.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
for i in result:
    print("---" + i + "---")
    print(result[i])
    print()
# Or for one specific node
mynodes.router1.run(["term len 0", "show run"], folder = "/home/user/logs")

Using variables

import connpy
config = connpy.configfile()
nodes = config.getitem("@office", ["router1", "router2", "router3"])
commands = []
commands.append("config t")
commands.append("interface lo {id}")
commands.append("ip add {ip} {mask}")
commands.append("end")
variables = {}
variables["router1@office"] = {"ip": "10.57.57.1"}
variables["router2@office"] = {"ip": "10.57.57.2"}
variables["router3@office"] = {"ip": "10.57.57.3"}
variables["__global__"] = {"id": "57"}
variables["__global__"]["mask"] =  "255.255.255.255"
expected = "!"
routers = connpy.nodes(nodes, config = config)
routers.run(commands, variables)
routers.test("ping {ip}", expected, variables)
for key in routers.result:
    print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
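
How the {id}, {ip}, and {mask} placeholders might be filled in can be sketched with plain string formatting. This is an illustration only: that per-node values take precedence over __global__ values, and that substitution behaves like str.format, are assumptions about Connpy's behavior, and render_commands is a hypothetical helper.

```python
def render_commands(commands, variables, node):
    """Fill {placeholders} in commands for one node (hypothetical helper).

    Assumption: values under "__global__" apply to every node and are
    overridden by values defined for the specific node.
    """
    values = dict(variables.get("__global__", {}))
    values.update(variables.get(node, {}))
    return [command.format(**values) for command in commands]


commands = ["config t", "interface lo {id}", "ip add {ip} {mask}", "end"]
variables = {
    "router1@office": {"ip": "10.57.57.1"},
    "router2@office": {"ip": "10.57.57.2"},
    "__global__": {"id": "57", "mask": "255.255.255.255"},
}
rendered = render_commands(commands, variables, "router1@office")
```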

Using AI

import connpy
conf = connpy.configfile()
# Uses models and API keys from config, or override them:
myai = connpy.ai(conf, engineer_model="gemini/gemini-2.5-flash", engineer_api_key="your-key")
result = myai.ask("go to router1 and show me the running configuration")
print(result["response"])
# Streaming is enabled by default for CLI, disable for programmatic use:
result = myai.ask("show interfaces on all routers", stream=False)
print(result["response"])

AI Plugin Tool Registration

Plugins can register custom tools with the AI system using register_ai_tool() in their Preload class:

def _register_my_tools(ai_instance):
    tool_def = {
        "type": "function",
        "function": {
            "name": "my_custom_tool",
            "description": "Does something useful.",
            "parameters": {
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"]
            }
        }
    }
    ai_instance.register_ai_tool(
        tool_definition=tool_def,
        handler=my_handler_function,
        target="engineer",  # or "architect" or "both"
        engineer_prompt="- My tool: does X.",
        architect_prompt="  * My tool (my_custom_tool)."
    )

class Preload:
    def __init__(self, connapp):
        connapp.ai.modify(_register_my_tools)

Sub-modules

connpy.tests

Classes

class Plugins
class Plugins:
    def __init__(self):
        self.plugins = {}
        self.plugin_parsers = {}
        self.preloads = {}

    def verify_script(self, file_path):
        """
        Verifies that a given Python script meets specific structural requirements.

        This function checks a Python script for compliance with predefined structural 
        rules. It ensures that the script contains only allowed top-level elements 
        (functions, classes, imports, pass statements, and a specific if __name__ block) 
        and that it includes mandatory classes with specific attributes and methods.

        ### Arguments:
            - file_path (str): The file path of the Python script to be verified.

        ### Returns:
            - str: A message indicating the type of violation if the script doesn't meet 
                 the requirements, or False if all requirements are met.

        ### Verifications:
            - The presence of only allowed top-level elements.
            - The existence of the 'Parser' and 'Entrypoint' classes, the 'Preload' class, or all three.
            - 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
            - 'Entrypoint' class must have an '__init__' method accepting specific arguments.

        If any of these checks fail, the function returns an error message indicating 
        the reason. If the script passes all checks, the function returns False, 
        indicating successful verification.

        ### Exceptions:
                - SyntaxError: If the script contains a syntax error, it is caught and 
                               returned as a part of the error message.
        """
        with open(file_path, 'r') as file:
            source_code = file.read()

        try:
            tree = ast.parse(source_code)
        except SyntaxError as e:
            return f"Syntax error in file: {e}"


        has_parser = False
        has_entrypoint = False
        has_preload = False

        for node in tree.body:
            # Allow only function definitions, class definitions, imports, and pass statements at top-level
            if isinstance(node, ast.If):
                # Check for the 'if __name__ == "__main__":' block
                if not (isinstance(node.test, ast.Compare) and
                        isinstance(node.test.left, ast.Name) and
                        node.test.left.id == '__name__' and
                        ((hasattr(ast, 'Str') and isinstance(node.test.comparators[0], getattr(ast, 'Str')) and node.test.comparators[0].s == '__main__') or
                         (hasattr(ast, 'Constant') and isinstance(node.test.comparators[0], getattr(ast, 'Constant')) and node.test.comparators[0].value == '__main__'))):
                    return "Only __name__ == __main__ If is allowed"

            elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)):
                return f"Plugin can only have pass, functions, classes and imports. {node} is not allowed"  # Reject any other AST types

            if isinstance(node, ast.ClassDef):

                if node.name == 'Parser':
                    has_parser = True
                    # Ensure Parser class has only the __init__ method and assigns self.parser
                    if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
                        return "Parser class should only have __init__ method"

                    # Check if 'self.parser' is assigned in __init__ method
                    init_method = node.body[0]
                    assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self']
                    if 'parser' not in assigned_attrs:
                        return "Parser class should set self.parser"


                elif node.name == 'Entrypoint':
                    has_entrypoint = True
                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                    if not init_method or len(init_method.args.args) != 4:  # self, args, parser, connapp
                        return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp"  # 'Entrypoint' __init__ does not have correct signature

                elif node.name == 'Preload':
                    has_preload = True
                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                    if not init_method or len(init_method.args.args) != 2:  # self, connapp
                        return "Preload class should have method __init__ and accept only argument: connapp"  # 'Preload' __init__ does not have correct signature

        # Applying the combination logic based on class presence
        if has_parser and not has_entrypoint:
            return "Parser requires Entrypoint class to be present."
        elif has_entrypoint and not has_parser:
            return "Entrypoint requires Parser class to be present."
    
        if not (has_parser or has_entrypoint or has_preload):
            return "No valid class (Parser, Entrypoint, or Preload) found."

        return False  # All requirements met, no error

    def _import_from_path(self, path):
        spec = importlib.util.spec_from_file_location("module.name", path)
        module = importlib.util.module_from_spec(spec)
        sys.modules["module.name"] = module
        spec.loader.exec_module(module)
        return module

    def _import_plugins_to_argparse(self, directory, subparsers):
        for filename in os.listdir(directory):
            commands = subparsers.choices.keys()
            if filename.endswith(".py"):
                root_filename = os.path.splitext(filename)[0]
                if root_filename in commands:
                    continue
                # Construct the full path
                filepath = os.path.join(directory, filename)
                check_file = self.verify_script(filepath)
                if check_file:
                    printer.error(f"Failed to load plugin: {filename}. Reason: {check_file}")
                    continue
                else:
                    self.plugins[root_filename] = self._import_from_path(filepath)
                    if hasattr(self.plugins[root_filename], "Parser"):
                        self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
                        plugin = self.plugin_parsers[root_filename]
                        subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, usage=plugin.parser.usage, description=plugin.parser.description, epilog=plugin.parser.epilog, formatter_class=plugin.parser.formatter_class)
                    if hasattr(self.plugins[root_filename], "Preload"):
                        self.preloads[root_filename] = self.plugins[root_filename]

Methods

def verify_script(self, file_path)
Verifies that a given Python script meets specific structural requirements.

This function checks a Python script for compliance with predefined structural rules. It ensures that the script contains only allowed top-level elements (functions, classes, imports, pass statements, and an if __name__ == "__main__": block) and that it includes the mandatory classes with the required attributes and methods.

Arguments:

- file_path (str): The file path of the Python script to be verified.

Returns:

- str or bool: A message describing the violation if the script doesn't meet
     the requirements, or False if all requirements are met.

Verifications:

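- The presence of only allowed top-level elements.
- The existence of both the 'Parser' and 'Entrypoint' classes, of a 'Preload' class, or of all three. 'Parser' and 'Entrypoint' must appear together.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
- 'Entrypoint' class must have an '__init__' method accepting exactly three arguments besides 'self': args, parser, and connapp.
- 'Preload' class must have an '__init__' method accepting exactly one argument besides 'self': connapp.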

If any of these checks fail, the function returns an error message indicating the reason. If the script passes all checks, the function returns False, indicating successful verification.

Exceptions:

- SyntaxError: If the script contains a syntax error, it is caught and
     returned as part of the error message.
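
As an illustration of the rules above, the following is a minimal sketch of a plugin skeleton that should satisfy them, together with a small AST walk that collects the same facts the checks look at. PLUGIN_SOURCE and summarize are hypothetical names introduced here; the snippet mirrors the checks with the standard ast module and does not call connpy itself.

```python
import ast

# Hypothetical plugin source, used only for illustration.
PLUGIN_SOURCE = '''
import argparse

class Parser:
    def __init__(self):
        self.parser = argparse.ArgumentParser(description="Example plugin")

class Entrypoint:
    def __init__(self, args, parser, connapp):
        print("plugin called")
'''

def summarize(source):
    """Collect, per top-level class, the __init__ arguments and self.* assignments."""
    tree = ast.parse(source)
    summary = {}
    for node in tree.body:
        if not isinstance(node, ast.ClassDef):
            continue
        # Locate __init__ the same way the Entrypoint/Preload checks do.
        init = next(
            (item for item in node.body
             if isinstance(item, ast.FunctionDef) and item.name == "__init__"),
            None,
        )
        self_attrs = []
        if init is not None:
            for stmt in init.body:
                if isinstance(stmt, ast.Assign):
                    for target in stmt.targets:
                        if (isinstance(target, ast.Attribute)
                                and isinstance(target.value, ast.Name)
                                and target.value.id == "self"):
                            self_attrs.append(target.attr)
        summary[node.name] = {
            "init_args": [a.arg for a in init.args.args] if init else None,
            "self_attrs": self_attrs,
        }
    return summary

summary = summarize(PLUGIN_SOURCE)
```

Here 'Parser' assigns self.parser and has no other methods, and 'Entrypoint.__init__' takes four positional parameters including self, which is what the argument-count check expects.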
class ai (config,
org=None,
api_key=None,
engineer_model=None,
architect_model=None,
engineer_api_key=None,
architect_api_key=None)
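
Every constructor argument other than config defaults to None. In the source for this class, each model is resolved in the order argument, then the "ai" section of the configuration, then a built-in default, while the per-role API keys are resolved from argument, then configuration, with no default. A minimal sketch of that lookup, assuming a plain dict in place of the real configuration; resolve and the model names are hypothetical:

```python
def resolve(argument, config, key, default=None):
    """Return the first truthy value among argument, config[key], and default."""
    return argument or config.get(key) or default

# Hypothetical stand-in for the "ai" section of the configuration.
aiconfig = {"engineer_model": "provider/model-from-config"}

explicit = resolve("provider/explicit-model", aiconfig, "engineer_model", "provider/default-model")
from_config = resolve(None, aiconfig, "engineer_model", "provider/default-model")
fallback = resolve(None, {}, "engineer_model", "provider/default-model")
missing_key = resolve(None, {}, "engineer_api_key")  # API keys have no default
```

Because the chain uses "or", an empty string in the configuration is treated the same as a missing value.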
@ClassHook
class ai:
    """Hybrid Multi-Agent System: Selective Escalation with Role Persistence."""

    SAFE_COMMANDS = [r'^show\s+', r'^ls\s*', r'^cat\s+', r'^ip\s+route\s+show', r'^ip\s+addr\s+show', r'^ip\s+link\s+show', r'^pwd$', r'^hostname$', r'^uname', r'^df\s*', r'^free\s*', r'^ps\s*', r'^ping\s+', r'^traceroute\s+']

    def __init__(self, config, org=None, api_key=None, engineer_model=None, architect_model=None, engineer_api_key=None, architect_api_key=None):
        self.config = config
        self.trusted_session = False  # Trust mode for the entire session
        
        # 1. Load generic configuration
        aiconfig = self.config.config.get("ai", {})
        
        # Models (priority: argument -> config -> default)
        self.engineer_model = engineer_model or aiconfig.get("engineer_model") or "gemini/gemini-3.1-flash-lite-preview"
        self.architect_model = architect_model or aiconfig.get("architect_model") or "anthropic/claude-sonnet-4-6"
        
        # API keys (priority: argument -> config)
        self.engineer_key = engineer_api_key or aiconfig.get("engineer_api_key")
        self.architect_key = architect_api_key or aiconfig.get("architect_api_key")
        
        # Validate configuration
        if not self.engineer_key:
            raise ValueError("Engineer API key not configured. Use 'conn config ai engineer_api_key <key>' to set it.")
        if not self.architect_key:
            console.print("[yellow]Warning: Architect API key not configured. Architect will be unavailable.[/yellow]")
            console.print("[yellow]Use 'conn config ai architect_api_key <key>' to enable it.[/yellow]")
        
        # Limits
        self.max_history = 30
        self.max_truncate = 50000
        self.soft_limit_iterations = 20  # Show warning and suggest Ctrl+C
        self.hard_limit_iterations = 50  # Force stop

        # External tool registry (populated by plugins via ClassHook.modify)
        self.external_engineer_tools = []     # Tool defs for Engineer LLM
        self.external_architect_tools = []    # Tool defs for Architect LLM
        self.external_tool_handlers = {}      # {"tool_name": handler_callable}
        self.tool_status_formatters = {}      # {"tool_name": formatter_callable}
        self.engineer_prompt_extensions = []  # Extra text for engineer prompt
        self.architect_prompt_extensions = [] # Extra text for architect prompt

        # Long-term memory
        self.memory_path = os.path.expanduser("~/.config/conn/ai_memory.md")
        self.long_term_memory = ""
        if os.path.exists(self.memory_path):
            try:
                with open(self.memory_path, "r") as f:
                    self.long_term_memory = f.read()
            except FileNotFoundError:
                self.long_term_memory = ""
            except PermissionError as e:
                console.print(f"[yellow]Warning: Cannot read AI memory file: {e}[/yellow]")
            except Exception as e:
                console.print(f"[yellow]Warning: Failed to load AI memory: {e}[/yellow]")

        # Provider-agnostic base prompts
        self._engineer_base_prompt = dedent(f"""
            Role: TECHNICAL EXECUTION ENGINE.
            Expertise: Universal Networking (Cisco, Nokia, Juniper, 6wind, etc.).
            
            Rules:
            - BE FAST: Execute tools directly to provide swift technical answers.
            - AUTONOMY: Proactively use iterative tool calls (list_nodes, run_commands) to find the root cause.
            - BATCH OPERATIONS: When working on multiple devices, call tools in parallel (multiple tool_calls in same response).
            - COMPLETE MISSIONS: Execute ALL steps of a mission before reporting back. Don't stop halfway.
            - DIAGRAM: Use ASCII art or Unicode box-drawing characters directly in your responses to visualize topologies or paths when helpful.
            - EVIDENCE: Include 'Key Snippets' from tool outputs. Be token-efficient.
            - NO WANDERING: Do not speculate. If stuck, report attempts.
            - SAFETY: When you use 'run_commands' with configuration commands, the system automatically prompts the user for confirmation. Just execute - don't ask permission first.
            
            CRITICAL - CONSULT vs ESCALATE:
            - ALWAYS use 'consult_architect' for: Configuration planning, design decisions, complex troubleshooting.
              Examples: "consultalo con el arquitecto", "preguntale al arquitecto", "que opina el arquitecto"
              You stay in control and present the advice to the user.
            
            - ONLY use 'escalate_to_architect' when user EXPLICITLY asks to TALK to the Architect:
              Examples: "quiero hablar con el arquitecto", "pasame con el arquitecto", "que me atienda el arquitecto"
              After escalation, you hand over control completely.
            
            - DEFAULT: When in doubt, use 'consult_architect'. Escalation is rare.
            
            Network Context: {self.long_term_memory if self.long_term_memory else "Empty."}
        """).strip()

        self._architect_base_prompt = dedent(f"""
            Role: STRATEGIC REASONING ENGINE.
            Expertise: Network Architecture, Complex Troubleshooting, and Design Validation.
            
            Rules:
            - STRATEGY: Define technical missions for the Engineer. 
            - DIAGRAM: Use ASCII art or Unicode box-drawing characters in your responses to visualize topologies, traffic paths, or logic flows.
            - ENGINEER CAPABILITIES: Your Engineer can:
                * Filter nodes (list_nodes), Run CLI commands (run_commands), Get metadata (get_node_info).
            - ANALYSIS: Review technical findings to identify patterns or design failures.
            - MEMORY: Update long-term facts ONLY when the user explicitly requests it.
            
            CRITICAL - EFFICIENT DELEGATION:
            - Plan ALL tasks upfront before delegating.
            - Delegate ONCE with a complete, detailed mission including ALL steps.
            - Example: "List all routers matching 'border.*', then run 'show ip bgp summary' and 'show ip route' on each, then analyze the outputs."
            - DO NOT delegate multiple times for the same goal. Batch everything into ONE mission.
            - Wait for Engineer's complete report before responding to user.
            
            CRITICAL - RETURNING CONTROL:
            - When your strategic analysis is complete and no further architectural decisions are needed, use 'return_to_engineer' to hand control back.
            - The Engineer is better suited for ongoing technical execution and troubleshooting.
            - Only stay in control if the user explicitly needs strategic oversight for multiple interactions.
            
            Network Context: {self.long_term_memory if self.long_term_memory else "Empty."}
        """).strip()

    @property
    def engineer_system_prompt(self):
        """Build engineer system prompt with plugin extensions."""
        if self.engineer_prompt_extensions:
            extensions = "\n".join(self.engineer_prompt_extensions)
            return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
        return self._engineer_base_prompt

    @property
    def architect_system_prompt(self):
        """Build architect system prompt with plugin extensions."""
        if self.architect_prompt_extensions:
            extensions = "\n".join(self.architect_prompt_extensions)
            return self._architect_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
        return self._architect_base_prompt

    def register_ai_tool(self, tool_definition, handler, target="engineer", engineer_prompt=None, architect_prompt=None, status_formatter=None):
        """Register an external tool for the AI system.

        Args:
            tool_definition (dict): OpenAI-compatible tool definition.
            handler (callable): Function(ai_instance, **tool_args) -> str.
            target (str): 'engineer', 'architect', or 'both'.
            engineer_prompt (str): Extra text for engineer system prompt.
            architect_prompt (str): Extra text for architect system prompt.
            status_formatter (callable): Function(args_dict) -> status string.
        """
        name = tool_definition["function"]["name"]
        if target in ("engineer", "both"):
            self.external_engineer_tools.append(tool_definition)
        if target in ("architect", "both"):
            self.external_architect_tools.append(tool_definition)
        self.external_tool_handlers[name] = handler
        if engineer_prompt:
            self.engineer_prompt_extensions.append(engineer_prompt)
        if architect_prompt:
            self.architect_prompt_extensions.append(architect_prompt)
        if status_formatter:
            self.tool_status_formatters[name] = status_formatter

    def _stream_completion(self, model, messages, tools, api_key, status=None, label="", debug=False, **kwargs):
        """Stream a completion call, rendering styled Markdown in real-time.
        
        Returns (response, streamed) where:
        - response: reconstructed ModelResponse (same as non-streaming)
        - streamed: True if text was rendered to console during streaming
        """
        from rich.live import Live
        
        stream_resp = completion(model=model, messages=messages, tools=tools, api_key=api_key, stream=True, **kwargs)
        
        chunks = []
        full_content = ""
        is_streaming_text = False
        has_tool_calls = False
        live_display = None
        
        # Determine styling based on current brain
        role_label = "Network Architect" if "architect" in label.lower() else "Network Engineer"
        border = "purple" if "architect" in label.lower() else "blue"
        title = f"[bold {border}]{role_label}[/bold {border}]"
        
        try:
            for chunk in stream_resp:
                chunks.append(chunk)
                delta = chunk.choices[0].delta
                
                # Detect tool calls
                if hasattr(delta, 'tool_calls') and delta.tool_calls:
                    has_tool_calls = True
                
                # Stream text content with styled rendering
                if hasattr(delta, 'content') and delta.content and not debug:
                    full_content += delta.content
                    
                    if not is_streaming_text:
                        # Stop spinner before starting live display
                        if status:
                            status.stop()
                        live_display = Live(
                            Panel(Markdown(full_content), title=title, border_style=border, expand=False),
                            console=console,
                            refresh_per_second=8,
                            transient=False
                        )
                        live_display.start()
                        is_streaming_text = True
                    else:
                        live_display.update(
                            Panel(Markdown(full_content), title=title, border_style=border, expand=False)
                        )
        except Exception:
            # Re-raise only if nothing was received; otherwise continue with the partial chunks
            if not chunks:
                raise
        finally:
            if live_display:
                # Render final state with complete content
                try:
                    live_display.update(
                        Panel(Markdown(full_content), title=title, border_style=border, expand=False)
                    )
                except Exception:
                    pass
                live_display.stop()
        
        # Rebuild complete response from chunks
        try:
            response = stream_chunk_builder(chunks, messages=messages)
        except Exception:
            # Fallback: manual reconstruction if stream_chunk_builder fails
            full_content_rebuilt = ""
            tool_calls_map = {}
            for c in chunks:
                d = c.choices[0].delta
                if hasattr(d, 'content') and d.content:
                    full_content_rebuilt += d.content
                if hasattr(d, 'tool_calls') and d.tool_calls:
                    for tc in d.tool_calls:
                        idx = tc.index
                        if idx not in tool_calls_map:
                            tool_calls_map[idx] = {"id": tc.id or "", "type": "function", "function": {"name": getattr(tc.function, 'name', '') or '', "arguments": getattr(tc.function, 'arguments', '') or ''}}
                        else:
                            if tc.id: tool_calls_map[idx]["id"] = tc.id
                            if tc.function:
                                if tc.function.name: tool_calls_map[idx]["function"]["name"] = tc.function.name
                                if tc.function.arguments: tool_calls_map[idx]["function"]["arguments"] += tc.function.arguments
            
            # Build a minimal response-like object
            class FakeFunc:
                def __init__(self, name, arguments): self.name = name; self.arguments = arguments
            class FakeTC:
                def __init__(self, d): self.id = d["id"]; self.function = FakeFunc(d["function"]["name"], d["function"]["arguments"])
                def model_dump(self, **kw): return {"id": self.id, "type": "function", "function": {"name": self.function.name, "arguments": self.function.arguments}}
            class FakeMsg:
                def __init__(self, content, tcs): self.content = content or None; self.tool_calls = tcs if tcs else None; self.role = "assistant"
                def model_dump(self, **kw):
                    d = {"role": "assistant", "content": self.content}
                    if self.tool_calls: d["tool_calls"] = [tc.model_dump() for tc in self.tool_calls]
                    return d
            class FakeChoice:
                def __init__(self, msg): self.message = msg
            class FakeResp:
                def __init__(self, choice): self.choices = [choice]; self.usage = None
            
            tcs = [FakeTC(tool_calls_map[i]) for i in sorted(tool_calls_map)] if tool_calls_map else None
            response = FakeResp(FakeChoice(FakeMsg(full_content_rebuilt or full_content, tcs)))
        
        # Only count as "streamed" if we rendered text AND it was the final response (no tool calls)
        streamed = is_streaming_text and not has_tool_calls
        return response, streamed

    def _sanitize_messages(self, messages):
        """Sanitize message list for strict providers like Gemini.
        
        Ensures that:
        1. Every assistant message with tool_calls is followed by ALL its tool responses
        2. No user/system messages appear between tool_calls and tool responses
        3. Orphaned tool_calls at the end are removed
        4. Orphaned tool responses without a preceding tool_call are removed
        """
        if not messages:
            return messages
        
        sanitized = []
        i = 0
        while i < len(messages):
            msg = messages[i]
            role = msg.get('role', '')
            
            if role == 'assistant' and msg.get('tool_calls'):
                # Collect all expected tool_call_ids
                expected_ids = set()
                for tc in msg['tool_calls']:
                    tc_id = tc.get('id') if isinstance(tc, dict) else getattr(tc, 'id', None)
                    if tc_id:
                        expected_ids.add(tc_id)
                
                # Look ahead for matching tool responses
                tool_responses = []
                j = i + 1
                while j < len(messages):
                    next_msg = messages[j]
                    if next_msg.get('role') == 'tool':
                        tool_responses.append(next_msg)
                        j += 1
                    else:
                        break
                
                # Only include this assistant+tools block if we have responses
                if tool_responses:
                    sanitized.append(msg)
                    sanitized.extend(tool_responses)
                    i = j
                else:
                    # Orphaned tool_calls with no responses - skip the assistant message
                    i += 1
            elif role == 'tool':
                # Orphaned tool response (no preceding assistant with tool_calls) - skip
                i += 1
            else:
                sanitized.append(msg)
                i += 1
        
        return sanitized

    def _truncate(self, text, limit=None):
        """Truncate text to specified limit, keeping head (60%) and tail (40%)."""
        final_limit = limit or self.max_truncate
        if len(text) <= final_limit: return text
        head_limit = int(final_limit * 0.6)
        tail_limit = int(final_limit * 0.4)
        return (text[:head_limit] + "\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:])

    def manage_memory_tool(self, content, action="append"):
        """Save or update long-term memory. Only use when user explicitly requests it."""
        if not content or not content.strip():
            return "Error: Cannot save empty content to memory."
        
        try:
            mode = "a" if action == "append" else "w"
            os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
            with open(self.memory_path, mode) as f:
                timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
                f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content)
            
            # Reload memory after update
            with open(self.memory_path, "r") as f:
                self.long_term_memory = f.read()
            
            return "Memory updated successfully."
        except PermissionError as e:
            return f"Error: Permission denied writing to memory file: {e}"
        except Exception as e:
            return f"Error updating memory: {str(e)}"


    def list_nodes_tool(self, filter_pattern=".*"):
        """List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more."""
        try:
            matched_names = self.config._getallnodes(filter_pattern)
            if not matched_names: return "No nodes found."
            if len(matched_names) <= 5:
                matched_data = self.config.getitems(matched_names, extract=True)
                res = {}
                for name, data in matched_data.items():
                    os_tag = "unknown"
                    if isinstance(data, dict):
                        ts = data.get("tags")
                        if isinstance(ts, dict): os_tag = ts.get("os", "unknown")
                    res[name] = {"os": os_tag}
                return json.dumps(res)
            return json.dumps({"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."})
        except Exception as e: 
            return f"Error listing nodes: {str(e)}"

    def _is_safe_command(self, cmd):
        """Check if a command matches safe patterns."""
        return any(re.match(pattern, cmd.strip(), re.IGNORECASE) for pattern in self.SAFE_COMMANDS)
    
    def run_commands_tool(self, nodes_filter, commands, status=None):
        """Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands."""
        # Handle if commands is a JSON string
        if isinstance(commands, str):
            try:
                commands = json.loads(commands)
            except ValueError:
                commands = [c.strip() for c in commands.split('\n') if c.strip()]
        
        # Expand multi-line commands within a list (in case the AI packs them)
        if isinstance(commands, list):
            expanded_commands = []
            for cmd in commands:
                expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()])
            commands = expanded_commands
        else:
            commands = [str(commands)]
        
        # Check command safety natively
        if not self.trusted_session:
            unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)]
            if unsafe_commands:
                # Stop the spinner so prompt doesn't get messed up
                if status: status.stop()
                
                # Show ALL commands with unsafe ones highlighted
                formatted_cmds = []
                for cmd in commands:
                    if cmd in unsafe_commands:
                        formatted_cmds.append(f"  • [yellow]{cmd}[/yellow]")
                    else:
                        formatted_cmds.append(f"  • {cmd}")
                
                panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds)
                console.print(Panel(panel_content, title="[bold yellow]⚠️ UNSAFE COMMANDS DETECTED[/bold yellow]", border_style="yellow"))
                
                try:
                    from rich.prompt import Prompt
                    user_resp = Prompt.ask("[bold yellow]Execute? (y: yes / n: no / a: allow all this session / <text>: feedback)[/bold yellow]", default="n")
                except KeyboardInterrupt:
                    if status: status.update("[bold blue]Engineer: Resuming...")
                    console.print("[bold red]✗ Aborted by user (Ctrl+C).[/bold red]")
                    return "Error: User cancelled execution (Ctrl+C)."
                
                # Resume the spinner
                if status: status.update("[bold blue]Engineer: Processing user response...")
                
                user_resp_lower = user_resp.strip().lower()
                if user_resp_lower in ['a', 'allow']:
                    self.trusted_session = True
                    console.print("[bold green]✓ Trust Mode Enabled. All future commands in this session will execute without confirmation.[/bold green]")
                elif user_resp_lower in ['y', 'yes']:
                    console.print("[bold green]✓ Executing...[/bold green]")
                elif user_resp_lower in ['n', 'no', '']:
                    console.print("[bold red]✗ Execution rejected by user.[/bold red]")
                    return "Error: User rejected execution."
                else:
                    console.print(f"[bold cyan]User feedback: [/bold cyan]{user_resp}")
                    return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again."
        
        try:
            matched_names = self.config._getallnodes(nodes_filter)
            if not matched_names: return "No nodes found matching filter."
            thisnodes_dict = self.config.getitems(matched_names, extract=True)
            result = nodes(thisnodes_dict, config=self.config).run(commands)
            return self._truncate(json.dumps(result))
        except Exception as e: 
            return f"Error executing commands: {str(e)}"

    def get_node_info_tool(self, node_name):
        """Get detailed metadata for a specific node. Passwords are masked."""
        try:
            d = self.config.getitem(node_name, extract=True)
            if 'password' in d: d['password'] = '***'
            return json.dumps(d)
        except Exception as e: 
            return f"Error getting node info: {str(e)}"

    def _engineer_loop(self, task, status=None, debug=False, chat_history=None):
        """Internal loop where the Engineer executes technical tasks for the Architect."""
        # Cache optimization for the Engineer
        if "claude" in self.engineer_model.lower():
            messages = [{"role": "system", "content": [{"type": "text", "text": self.engineer_system_prompt, "cache_control": {"type": "ephemeral"}}]}]
        else:
            messages = [{"role": "system", "content": self.engineer_system_prompt}]
            
        if chat_history:
            # Clean chat history from caching metadata if engineer is not Claude
            if "claude" not in self.engineer_model.lower():
                cleaned_history = []
                for msg in chat_history[-5:]:
                    m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
                    # Remove cache_control from system messages
                    if m.get('role') == 'system' and isinstance(m.get('content'), list):
                        m['content'] = m['content'][0]['text'] if m['content'] else ""
                    cleaned_history.append(m)
                messages.extend(cleaned_history)
            else:
                messages.extend(chat_history[-5:])
        
        messages.append({"role": "user", "content": f"MISSION: {task}"})
        
        tools = self._get_engineer_tools()
        usage = {"input": 0, "output": 0, "total": 0}
        iteration = 0
        soft_limit_warned = False
        
        try:
            while iteration < self.hard_limit_iterations:
                iteration += 1
                
                # Soft limit warning
                if iteration == self.soft_limit_iterations and not soft_limit_warned:
                    console.print(f"[yellow]⚠ Engineer has performed {iteration} steps. This is taking longer than expected.[/yellow]")
                    console.print(f"[yellow]  You can press Ctrl+C to interrupt and get a summary.[/yellow]")
                    soft_limit_warned = True
                
                if status: status.update(f"[bold blue]Engineer: Analyzing mission... (step {iteration})")
                
                try:
                    safe_messages = self._sanitize_messages(messages)
                    response = completion(model=self.engineer_model, messages=safe_messages, tools=tools, api_key=self.engineer_key)
                except Exception as e:
                    return f"Engineer failed to connect: {str(e)}", usage
                
                if hasattr(response, "usage") and response.usage:
                    usage["input"] += getattr(response.usage, "prompt_tokens", 0)
                    usage["output"] += getattr(response.usage, "completion_tokens", 0)
                    usage["total"] += getattr(response.usage, "total_tokens", 0)

                resp_msg = response.choices[0].message
                msg_dict = resp_msg.model_dump(exclude_none=True)
                if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
                messages.append(msg_dict)

                if not resp_msg.tool_calls: break
                for tc in resp_msg.tool_calls:
                    fn, args = tc.function.name, json.loads(tc.function.arguments)
                    
                    # Real-time notification of the technical task
                    if status:
                        if fn == "list_nodes": status.update(f"[bold blue]Engineer: [SEARCH] {args.get('filter_pattern','.*')}")
                        elif fn == "run_commands": 
                            cmds = args.get('commands', [])
                            cmd_str = cmds[0] if cmds else ""
                            status.update(f"[bold blue]Engineer: [CMD] {cmd_str}")
                        elif fn == "get_node_info": status.update(f"[bold blue]Engineer: [INSPECT] {args.get('node_name','')}")
                        elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args))

                    if debug: console.print(Panel(Text(json.dumps(args, indent=2)), title=f"[bold blue]Engineer Tool: {fn}[/bold blue]", border_style="blue"))
                    
                    if fn == "list_nodes": obs = self.list_nodes_tool(**args)
                    elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
                    elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
                    elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
                    else: obs = f"Error: Unknown tool '{fn}'."
                    
                    if debug: console.print(Panel(Text(str(obs)), title=f"[bold green]Engineer Observation: {fn}[/bold green]", border_style="green"))
                    messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
            
            if iteration >= self.hard_limit_iterations:
                console.print(f"[red]⛔ Engineer reached hard limit ({self.hard_limit_iterations} steps). Forcing stop.[/red]")
            
            if debug and resp_msg.content:
                console.print(Panel(Text(resp_msg.content), title="[bold blue]Engineer Final Report to Architect[/bold blue]", border_style="blue"))
            
            return resp_msg.content, usage
        except Exception as e:
            return f"Engineer failed: {str(e)}", usage

    def _get_engineer_tools(self):
        """Define tools available to the Engineer."""
        tools = [
            {"type": "function", "function": {"name": "list_nodes", "description": "Lists available nodes in the inventory.", "parameters": {"type": "object", "properties": {"filter_pattern": {"type": "string", "description": "Regex to filter nodes (e.g. '.*', 'border.*')."}}}}},
            {"type": "function", "function": {"name": "run_commands", "description": "Runs one or more commands on matched nodes. MANDATORY: You MUST call 'list_nodes' first to verify the target list.", "parameters": {"type": "object", "properties": {"nodes_filter": {"type": "string", "description": "Exact node name or verified filter pattern."}, "commands": {"type": "array", "items": {"type": "string"}, "description": "List of commands (e.g. ['show ip route', 'show int desc'])."}}, "required": ["nodes_filter", "commands"]}}},
            {"type": "function", "function": {"name": "get_node_info", "description": "Gets full metadata for a specific node.", "parameters": {"type": "object", "properties": {"node_name": {"type": "string"}}, "required": ["node_name"]}}},
            {"type": "function", "function": {"name": "consult_architect", "description": "Ask the Strategic Reasoning Engine for advice on complex design, architecture, or troubleshooting decisions. You remain in control and will present the response to the user. Use this for: configuration planning, design validation, complex troubleshooting.", "parameters": {"type": "object", "properties": {"question": {"type": "string", "description": "Strategic question or decision needed."}, "technical_summary": {"type": "string", "description": "Technical findings and context gathered so far."}}, "required": ["question", "technical_summary"]}}},
            {"type": "function", "function": {"name": "escalate_to_architect", "description": "Transfer full control to the Strategic Reasoning Engine. Use ONLY when the user explicitly requests the Architect or when the problem requires strategic oversight beyond consultation. After escalation, the Architect takes over the conversation.", "parameters": {"type": "object", "properties": {"reason": {"type": "string", "description": "Why you're escalating (e.g. 'User requested Architect', 'Complex multi-site design needed')."}, "context": {"type": "string", "description": "Full context and findings to hand over."}}, "required": ["reason", "context"]}}}
        ]
        tools.extend(self.external_engineer_tools)
        return tools

    def _get_architect_tools(self):
        """Define tools available to the Strategic Reasoning Engine."""
        tools = [
            {"type": "function", "function": {"name": "delegate_to_engineer", "description": "Delegates a technical mission to the Engineer.", "parameters": {"type": "object", "properties": {"task": {"type": "string", "description": "Detailed technical mission or goal."}}, "required": ["task"]}}},
            {"type": "function", "function": {"name": "return_to_engineer", "description": "Return control to the Engineer. Use this when your strategic analysis is complete and the Engineer should handle the rest of the conversation.", "parameters": {"type": "object", "properties": {"summary": {"type": "string", "description": "Brief summary of your analysis to hand over to the Engineer."}}, "required": ["summary"]}}},
            {"type": "function", "function": {"name": "manage_memory_tool", "description": "Saves information to long-term memory. MANDATORY: Only use this if the user explicitly asks to remember or save something.", "parameters": {"type": "object", "properties": {"content": {"type": "string"}, "action": {"type": "string", "enum": ["append", "replace"]}}, "required": ["content"]}}}
        ]
        tools.extend(self.external_architect_tools)
        return tools

    @MethodHook
    def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=False, stream=True):
        """Route a request to the Engineer or the Architect and run the tool-calling loop.

        Returns a dict with the keys "response", "chat_history", "app_related",
        "usage", "responder" and "streamed". If the Engineer's completion call
        fails, only "response", "chat_history" and "usage" are returned.
        """
        if chat_history is None: chat_history = []
        usage = {"input": 0, "output": 0, "total": 0}
        
        # 1. Initial role selection (Sticky Brain)
        explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
        explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
        
        if explicit_architect:
            current_brain = "architect"
        elif explicit_engineer:
            current_brain = "engineer"
        else:
            # Sticky Brain: detect whether the Architect was in control in recent history
            is_architect_active = False
            for msg in reversed(chat_history[-5:]):
                tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
                if tcs:
                    for tc in tcs:
                        fn = tc.get('function', {}).get('name') if isinstance(tc, dict) else getattr(getattr(tc, 'function', None), 'name', '')
                        # Architect stays in control if delegating tasks or if Engineer escalated to them
                        # consult_architect is just Engineer asking for advice - Engineer keeps control
                        if fn in ['delegate_to_engineer', 'escalate_to_architect']:
                            is_architect_active = True; break
                if is_architect_active: break
            current_brain = "architect" if is_architect_active else "engineer"
        
        # 2. Message preparation and input cleanup
        clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
        
        system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
        tools = self._get_architect_tools() if current_brain == "architect" else self._get_engineer_tools()
        model = self.architect_model if current_brain == "architect" else self.engineer_model
        key = self.architect_key if current_brain == "architect" else self.engineer_key

        # Structure optimized for prompt caching
        if "claude" in model.lower():
            messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
        else:
            messages = [{"role": "system", "content": system_prompt}]
        
        # Interleave history (consecutive user messages are merged)
        last_role = "system"
        for msg in chat_history[-self.max_history:]:
            m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
            role = m.get('role')
            if role == last_role and role == 'user':
                messages[-1]['content'] += "\n" + (m.get('content') or "")
                continue
            if role == 'assistant' and m.get('tool_calls') and m.get('content') == "": m['content'] = None
            messages.append(m)
            last_role = role

        if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
        else: messages.append({"role": "user", "content": clean_input})

        # 3. Execution loop
        iteration = 0
        soft_limit_warned = False
        streamed_response = False
        
        try:
            while iteration < self.hard_limit_iterations:
                iteration += 1
                
                # Soft limit warning
                if iteration == self.soft_limit_iterations and not soft_limit_warned:
                    console.print(f"[yellow]⚠ Agent has performed {iteration} steps. This is taking longer than expected.[/yellow]")
                    console.print(f"[yellow]  You can press Ctrl+C to interrupt and get a summary of progress.[/yellow]")
                    soft_limit_warned = True
                
                label = "[bold purple]Architect" if current_brain == "architect" else "[bold blue]Engineer"
                if status: status.update(f"{label} is thinking... (step {iteration})")
                
                streamed_response = False
                try:
                    safe_messages = self._sanitize_messages(messages)
                    if stream and not debug:
                        response, streamed_response = self._stream_completion(
                            model=model, messages=safe_messages, tools=tools, api_key=key,
                            status=status, label=label, debug=debug, num_retries=3
                        )
                    else:
                        response = completion(model=model, messages=safe_messages, tools=tools, api_key=key, num_retries=3)
                except Exception as e:
                    if current_brain == "architect":
                        if status: status.update("[bold orange3]Architect unavailable! Falling back to Engineer...")
                        # Preserve context when falling back - use clean_input directly
                        current_brain = "engineer"
                        model = self.engineer_model
                        tools = self._get_engineer_tools()
                        key = self.engineer_key
                        # Rebuild messages with Engineer system prompt and original user request
                        messages = [{"role": "system", "content": self.engineer_system_prompt}]
                        # Add chat history if exists (excluding system prompt)
                        if chat_history:
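                            for msg in chat_history[-self.max_history:]:
                                # History entries may be dicts or message objects, as handled above
                                m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
                                if m.get('role') != 'system':
                                    messages.append(m)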
                        # Add current user request
                        messages.append({"role": "user", "content": clean_input})
                        continue
                    else: 
                        return {"response": f"Error: Both engines failed. {str(e)}", "chat_history": messages[1:], "usage": usage}
                
                if hasattr(response, "usage") and response.usage:
                    usage["input"] += getattr(response.usage, "prompt_tokens", 0)
                    usage["output"] += getattr(response.usage, "completion_tokens", 0)
                    usage["total"] += getattr(response.usage, "total_tokens", 0)

                resp_msg = response.choices[0].message
                msg_dict = resp_msg.model_dump(exclude_none=True)
                if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
                messages.append(msg_dict)

                if debug and resp_msg.content:
                    console.print(Panel(Markdown(resp_msg.content), title=f"{label} Reasoning", border_style="purple" if current_brain == "architect" else "blue"))

                if not resp_msg.tool_calls: break
                
                # Track if we need to inject a user message after all tool responses
                pending_user_message = None
                
                for tc in resp_msg.tool_calls:
                    fn, args = tc.function.name, json.loads(tc.function.arguments)
                    
                    # Validate tool access based on current brain
                    if fn in ['delegate_to_engineer'] and current_brain != "architect":
                        obs = f"Error: Tool '{fn}' is only available to the Architect. You are the Engineer. Use 'run_commands' directly to execute configuration."
                        messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
                        continue
                    
                    if status:
                        if fn == "delegate_to_engineer": status.update(f"[bold purple]Architect: [DELEGATING MISSION] {args.get('task','')[:40]}...")
                        elif fn == "manage_memory_tool": status.update(f"[bold purple]Architect: [UPDATING MEMORY]")

                    if debug: console.print(Panel(Text(json.dumps(args, indent=2)), title=f"{label} Decision: {fn}", border_style="white"))

                    if fn == "delegate_to_engineer":
                        obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
                        usage["input"] += eng_usage["input"]; usage["output"] += eng_usage["output"]; usage["total"] += eng_usage["total"]
                    elif fn == "consult_architect":
                        if status: status.update("[bold purple]Engineer consulting Architect...")
                        try:
                            # Consultation only - Engineer stays in control
                            architect_resp = completion(
                                model=self.architect_model,
                                messages=[
                                    {"role": "system", "content": self.architect_system_prompt},
                                    {"role": "user", "content": f"The Engineer needs your strategic advice.\n\nTECHNICAL SUMMARY: {args['technical_summary']}\n\nQUESTION: {args['question']}\n\nProvide strategic guidance. The Engineer will continue handling the user."}
                                ],
                                api_key=self.architect_key,
                                num_retries=3
                            )
                            obs = architect_resp.choices[0].message.content
                            if debug: console.print(Panel(Markdown(obs), title="[bold purple]Architect Consultation[/bold purple]", border_style="purple"))
                        except Exception as e:
                            if status: status.update("[bold orange3]Architect unavailable! Engineer continuing alone...")
                            obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
                    
                    elif fn == "escalate_to_architect":
                        if status: status.update("[bold purple]Transferring control to Architect...")
                        # Full escalation - Architect takes over
                        current_brain = "architect"
                        model = self.architect_model
                        tools = self._get_architect_tools()
                        key = self.architect_key
                        messages[0] = {"role": "system", "content": self.architect_system_prompt}
                        # Prepare handover context to inject AFTER all tool responses
                        handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
                        pending_user_message = handover_msg
                        obs = "Control transferred to Architect. Handover context will be provided."
                        if debug: console.print(Panel(Text(handover_msg), title="[bold purple]Escalation to Architect[/bold purple]", border_style="purple"))
                    
                    elif fn == "return_to_engineer":
                        if status: status.update("[bold blue]Transferring control back to Engineer...")
                        # Architect returns control to Engineer
                        current_brain = "engineer"
                        model = self.engineer_model
                        tools = self._get_engineer_tools()
                        key = self.engineer_key
                        messages[0] = {"role": "system", "content": self.engineer_system_prompt}
                        # Prepare handover context to inject AFTER all tool responses
                        handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
                        pending_user_message = handover_msg
                        obs = "Control returned to Engineer. Handover summary will be provided."
                        if debug: console.print(Panel(Text(handover_msg), title="[bold blue]Return to Engineer[/bold blue]", border_style="blue"))
                    
                    elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
                    elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
                    elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
                    elif fn == "manage_memory_tool": obs = self.manage_memory_tool(**args)
                    elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
                    else: obs = f"Error: {fn} unknown."
                    
                    messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
                
                # Inject pending user message AFTER all tool responses are added
                if pending_user_message:
                    messages.append({"role": "user", "content": pending_user_message})
            
            if iteration >= self.hard_limit_iterations:
                console.print(f"[red]⛔ Agent reached hard limit ({self.hard_limit_iterations} steps). Forcing stop to prevent infinite loop.[/red]")
                # Only inject user message if we're not in the middle of tool calls
                last_msg = messages[-1] if messages else {}
                if last_msg.get("role") != "assistant" or not last_msg.get("tool_calls"):
                    messages.append({"role": "user", "content": "Hard iteration limit reached. Please provide a summary of your findings so far."})
                    try:
                        safe_messages = self._sanitize_messages(messages)
                        response = completion(model=model, messages=safe_messages, tools=[], api_key=key)
                        resp_msg = response.choices[0].message
                        messages.append(resp_msg.model_dump(exclude_none=True))
                    except Exception as e:
                        if status:
                            status.update(f"[bold red]Error fetching summary: {e}[/bold red]")
                        printer.warning(f"Failed to fetch final summary from LLM: {e}")
        except KeyboardInterrupt:
            if status: status.update("[bold red]Interrupted! Closing pending tasks...")
            last_msg = messages[-1]
            if last_msg.get("tool_calls"):
                for tc in last_msg["tool_calls"]:
                    messages.append({"tool_call_id": tc.get("id"), "role": "tool", "name": tc.get("function", {}).get("name"), "content": "Operation cancelled by user."})
            messages.append({"role": "user", "content": "USER INTERRUPTED. Briefly summarize what you were doing and stop."})
            try:
                safe_messages = self._sanitize_messages(messages)
                response = completion(model=model, messages=safe_messages, tools=tools, api_key=key)
                resp_msg = response.choices[0].message
                messages.append(resp_msg.model_dump(exclude_none=True))
            except Exception: pass
        finally:
            try:
                log_dir = self.config.defaultdir
                os.makedirs(log_dir, exist_ok=True)
                log_path = os.path.join(log_dir, "ai_debug.json")
                hist = []
                if os.path.exists(log_path):
                    try:
                        with open(log_path, "r") as f: hist = json.load(f)
                    except (IOError, json.JSONDecodeError): hist = []
                hist.append({"timestamp": datetime.datetime.now().isoformat(), "roles": {"strategic_engine": self.architect_model, "execution_engine": self.engineer_model}, "session": messages})
                with open(log_path, "w") as f: json.dump(hist[-10:], f, indent=4)
            except Exception as e:
                if debug: console.print(f"[dim red]Debug log failed: {e}[/dim red]")

        return {
            "response": messages[-1].get("content"), 
            "chat_history": messages[1:], 
            "app_related": True, 
            "usage": usage,
            "responder": current_brain,  # "architect" or "engineer"
            "streamed": streamed_response
        }

    @MethodHook
    def confirm(self, user_input): return True

Hybrid Multi-Agent System: Selective Escalation with Role Persistence.
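
The role persistence described above can be illustrated with a minimal sketch of the selection step in ask(). This is an assumption-laden simplification, not the library's API: select_brain and ARCHITECT_MARKERS are hypothetical names, and history entries are assumed to be plain dicts (the real method also accepts message objects).

```python
import re

# Tool names that, per the source of ask(), mean the Architect holds control.
ARCHITECT_MARKERS = {"delegate_to_engineer", "escalate_to_architect"}

def select_brain(user_input, chat_history=None, lookback=5):
    """Hypothetical helper mirroring the role-selection step of ask()."""
    chat_history = chat_history or []
    # An explicit prefix always wins.
    if re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I):
        return "architect"
    if re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I):
        return "engineer"
    # Otherwise, scan the most recent messages for Architect-owned tool calls.
    for msg in reversed(chat_history[-lookback:]):
        for tc in msg.get("tool_calls") or []:
            if tc.get("function", {}).get("name") in ARCHITECT_MARKERS:
                return "architect"
    # Default: the Engineer handles the request.
    return "engineer"
```

Under these assumptions, a recent escalate_to_architect call keeps the Architect in charge of the next turn, while consult_architect leaves the Engineer in control.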

Class variables

var SAFE_COMMANDS

The type of the None singleton.

Instance variables

prop architect_system_prompt
@property
def architect_system_prompt(self):
    """Build architect system prompt with plugin extensions."""
    if self.architect_prompt_extensions:
        extensions = "\n".join(self.architect_prompt_extensions)
        return self._architect_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
    return self._architect_base_prompt

Build architect system prompt with plugin extensions.

prop engineer_system_prompt
@property
def engineer_system_prompt(self):
    """Build engineer system prompt with plugin extensions."""
    if self.engineer_prompt_extensions:
        extensions = "\n".join(self.engineer_prompt_extensions)
        return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
    return self._engineer_base_prompt

Build engineer system prompt with plugin extensions.
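
Both prompt properties follow the same pattern, which the sketch below reproduces in isolation. PromptHolder is a hypothetical stand-in for the agent class, and the backup_config extension line is invented purely for illustration.

```python
class PromptHolder:
    """Hypothetical stand-in for the agent class; only the prompt logic is modeled."""

    def __init__(self, base_prompt):
        self._engineer_base_prompt = base_prompt
        self.engineer_prompt_extensions = []  # plugins would append text here

    @property
    def engineer_system_prompt(self):
        # Same logic as the documented property: append extensions only if present.
        if self.engineer_prompt_extensions:
            extensions = "\n".join(self.engineer_prompt_extensions)
            return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
        return self._engineer_base_prompt

holder = PromptHolder("You are the Engineer.")
plain = holder.engineer_system_prompt  # base prompt only
holder.engineer_prompt_extensions.append("- backup_config: saves the running config")
extended = holder.engineer_system_prompt  # base prompt plus "Plugin Capabilities" block
```

Because the prompt is rebuilt on every access, an extension registered after construction should take effect on the next call that reads the property.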

Methods

def ask(self,
user_input,
dryrun=False,
chat_history=None,
status=None,
debug=False,
stream=True)
@MethodHook
def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=False, stream=True):
    """Route a request to the Engineer or the Architect and run the tool-calling loop.

    Returns a dict with the keys "response", "chat_history", "app_related",
    "usage", "responder" and "streamed". If the Engineer's completion call
    fails, only "response", "chat_history" and "usage" are returned.
    """
    if chat_history is None: chat_history = []
    usage = {"input": 0, "output": 0, "total": 0}
    
    # 1. Initial role selection (Sticky Brain)
    explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
    explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
    
    if explicit_architect:
        current_brain = "architect"
    elif explicit_engineer:
        current_brain = "engineer"
    else:
        # Sticky Brain: detect whether the Architect was in control in recent history
        is_architect_active = False
        for msg in reversed(chat_history[-5:]):
            tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
            if tcs:
                for tc in tcs:
                    fn = tc.get('function', {}).get('name') if isinstance(tc, dict) else getattr(getattr(tc, 'function', None), 'name', '')
                    # Architect stays in control if delegating tasks or if Engineer escalated to them
                    # consult_architect is just Engineer asking for advice - Engineer keeps control
                    if fn in ['delegate_to_engineer', 'escalate_to_architect']:
                        is_architect_active = True; break
            if is_architect_active: break
        current_brain = "architect" if is_architect_active else "engineer"
    
    # 2. Message preparation and input cleanup
    clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
    
    system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
    tools = self._get_architect_tools() if current_brain == "architect" else self._get_engineer_tools()
    model = self.architect_model if current_brain == "architect" else self.engineer_model
    key = self.architect_key if current_brain == "architect" else self.engineer_key

    # Structure optimized for prompt caching
    if "claude" in model.lower():
        messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
    else:
        messages = [{"role": "system", "content": system_prompt}]
    
    # Interleave history (consecutive user messages are merged)
    last_role = "system"
    for msg in chat_history[-self.max_history:]:
        m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
        role = m.get('role')
        if role == last_role and role == 'user':
            messages[-1]['content'] += "\n" + (m.get('content') or "")
            continue
        if role == 'assistant' and m.get('tool_calls') and m.get('content') == "": m['content'] = None
        messages.append(m)
        last_role = role

    if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
    else: messages.append({"role": "user", "content": clean_input})

    # 3. Execution loop
    iteration = 0
    soft_limit_warned = False
    streamed_response = False
    
    try:
        while iteration < self.hard_limit_iterations:
            iteration += 1
            
            # Soft limit warning
            if iteration == self.soft_limit_iterations and not soft_limit_warned:
                console.print(f"[yellow]⚠ Agent has performed {iteration} steps. This is taking longer than expected.[/yellow]")
                console.print(f"[yellow]  You can press Ctrl+C to interrupt and get a summary of progress.[/yellow]")
                soft_limit_warned = True
            
            label = "[bold purple]Architect" if current_brain == "architect" else "[bold blue]Engineer"
            if status: status.update(f"{label} is thinking... (step {iteration})")
            
            streamed_response = False
            try:
                safe_messages = self._sanitize_messages(messages)
                if stream and not debug:
                    response, streamed_response = self._stream_completion(
                        model=model, messages=safe_messages, tools=tools, api_key=key,
                        status=status, label=label, debug=debug, num_retries=3
                    )
                else:
                    response = completion(model=model, messages=safe_messages, tools=tools, api_key=key, num_retries=3)
            except Exception as e:
                if current_brain == "architect":
                    if status: status.update("[bold orange3]Architect unavailable! Falling back to Engineer...")
                    # Preserve context when falling back - use clean_input directly
                    current_brain = "engineer"
                    model = self.engineer_model
                    tools = self._get_engineer_tools()
                    key = self.engineer_key
                    # Rebuild messages with Engineer system prompt and original user request
                    messages = [{"role": "system", "content": self.engineer_system_prompt}]
                    # Add chat history if exists (excluding system prompt)
                    if chat_history:
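                        for msg in chat_history[-self.max_history:]:
                            # History entries may be dicts or message objects, as handled above
                            m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
                            if m.get('role') != 'system':
                                messages.append(m)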
                    # Add current user request
                    messages.append({"role": "user", "content": clean_input})
                    continue
                else: 
                    return {"response": f"Error: Both engines failed. {str(e)}", "chat_history": messages[1:], "usage": usage}
            
            if hasattr(response, "usage") and response.usage:
                usage["input"] += getattr(response.usage, "prompt_tokens", 0)
                usage["output"] += getattr(response.usage, "completion_tokens", 0)
                usage["total"] += getattr(response.usage, "total_tokens", 0)

            resp_msg = response.choices[0].message
            msg_dict = resp_msg.model_dump(exclude_none=True)
            if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
            messages.append(msg_dict)

            if debug and resp_msg.content:
                console.print(Panel(Markdown(resp_msg.content), title=f"{label} Reasoning", border_style="purple" if current_brain == "architect" else "blue"))

            if not resp_msg.tool_calls: break
            
            # Track if we need to inject a user message after all tool responses
            pending_user_message = None
            
            for tc in resp_msg.tool_calls:
                fn, args = tc.function.name, json.loads(tc.function.arguments)
                
                # Validate tool access based on current brain
                if fn in ['delegate_to_engineer'] and current_brain != "architect":
                    obs = f"Error: Tool '{fn}' is only available to the Architect. You are the Engineer. Use 'run_commands' directly to execute configuration."
                    messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
                    continue
                
                if status:
                    if fn == "delegate_to_engineer": status.update(f"[bold purple]Architect: [DELEGATING MISSION] {args.get('task','')[:40]}...")
                    elif fn == "manage_memory_tool": status.update(f"[bold purple]Architect: [UPDATING MEMORY]")

                if debug: console.print(Panel(Text(json.dumps(args, indent=2)), title=f"{label} Decision: {fn}", border_style="white"))

                if fn == "delegate_to_engineer":
                    obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
                    usage["input"] += eng_usage["input"]; usage["output"] += eng_usage["output"]; usage["total"] += eng_usage["total"]
                elif fn == "consult_architect":
                    if status: status.update("[bold purple]Engineer consulting Architect...")
                    try:
                        # Consultation only - Engineer stays in control
                        architect_resp = completion(
                            model=self.architect_model,
                            messages=[
                                {"role": "system", "content": self.architect_system_prompt},
                                {"role": "user", "content": f"The Engineer needs your strategic advice.\n\nTECHNICAL SUMMARY: {args['technical_summary']}\n\nQUESTION: {args['question']}\n\nProvide strategic guidance. The Engineer will continue handling the user."}
                            ],
                            api_key=self.architect_key,
                            num_retries=3
                        )
                        obs = architect_resp.choices[0].message.content
                        if debug: console.print(Panel(Markdown(obs), title="[bold purple]Architect Consultation[/bold purple]", border_style="purple"))
                    except Exception as e:
                        if status: status.update("[bold orange3]Architect unavailable! Engineer continuing alone...")
                        obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
                
                elif fn == "escalate_to_architect":
                    if status: status.update("[bold purple]Transferring control to Architect...")
                    # Full escalation - Architect takes over
                    current_brain = "architect"
                    model = self.architect_model
                    tools = self._get_architect_tools()
                    key = self.architect_key
                    messages[0] = {"role": "system", "content": self.architect_system_prompt}
                    # Prepare handover context to inject AFTER all tool responses
                    handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
                    pending_user_message = handover_msg
                    obs = "Control transferred to Architect. Handover context will be provided."
                    if debug: console.print(Panel(Text(handover_msg), title="[bold purple]Escalation to Architect[/bold purple]", border_style="purple"))
                
                elif fn == "return_to_engineer":
                    if status: status.update("[bold blue]Transferring control back to Engineer...")
                    # Architect returns control to Engineer
                    current_brain = "engineer"
                    model = self.engineer_model
                    tools = self._get_engineer_tools()
                    key = self.engineer_key
                    messages[0] = {"role": "system", "content": self.engineer_system_prompt}
                    # Prepare handover context to inject AFTER all tool responses
                    handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
                    pending_user_message = handover_msg
                    obs = "Control returned to Engineer. Handover summary will be provided."
                    if debug: console.print(Panel(Text(handover_msg), title="[bold blue]Return to Engineer[/bold blue]", border_style="blue"))
                
                elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
                elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
                elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
                elif fn == "manage_memory_tool": obs = self.manage_memory_tool(**args)
                elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
                else: obs = f"Error: {fn} unknown."
                
                messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
            
            # Inject pending user message AFTER all tool responses are added
            if pending_user_message:
                messages.append({"role": "user", "content": pending_user_message})
        
        if iteration >= self.hard_limit_iterations:
            console.print(f"[red]⛔ Agent reached hard limit ({self.hard_limit_iterations} steps). Forcing stop to prevent infinite loop.[/red]")
            # Only inject user message if we're not in the middle of tool calls
            last_msg = messages[-1] if messages else {}
            if last_msg.get("role") != "assistant" or not last_msg.get("tool_calls"):
                messages.append({"role": "user", "content": "Hard iteration limit reached. Please provide a summary of your findings so far."})
                try:
                    safe_messages = self._sanitize_messages(messages)
                    response = completion(model=model, messages=safe_messages, tools=[], api_key=key)
                    resp_msg = response.choices[0].message
                    messages.append(resp_msg.model_dump(exclude_none=True))
                except Exception as e:
                    if status:
                        status.update(f"[bold red]Error fetching summary: {e}[/bold red]")
                    printer.warning(f"Failed to fetch final summary from LLM: {e}")
    except KeyboardInterrupt:
        if status: status.update("[bold red]Interrupted! Closing pending tasks...")
        last_msg = messages[-1]
        if last_msg.get("tool_calls"):
            for tc in last_msg["tool_calls"]:
                messages.append({"tool_call_id": tc.get("id"), "role": "tool", "name": tc.get("function", {}).get("name"), "content": "Operation cancelled by user."})
        messages.append({"role": "user", "content": "USER INTERRUPTED. Briefly summarize what you were doing and stop."})
        try:
            safe_messages = self._sanitize_messages(messages)
            response = completion(model=model, messages=safe_messages, tools=tools, api_key=key)
            resp_msg = response.choices[0].message
            messages.append(resp_msg.model_dump(exclude_none=True))
        except Exception: pass
    finally:
        try:
            log_dir = self.config.defaultdir
            os.makedirs(log_dir, exist_ok=True)
            log_path = os.path.join(log_dir, "ai_debug.json")
            hist = []
            if os.path.exists(log_path):
                try:
                    with open(log_path, "r") as f: hist = json.load(f)
                except (IOError, json.JSONDecodeError): hist = []
            hist.append({"timestamp": datetime.datetime.now().isoformat(), "roles": {"strategic_engine": self.architect_model, "execution_engine": self.engineer_model}, "session": messages})
            with open(log_path, "w") as f: json.dump(hist[-10:], f, indent=4)
        except Exception as e:
            if debug: console.print(f"[dim red]Debug log failed: {e}[/dim red]")

    return {
        "response": messages[-1].get("content"), 
        "chat_history": messages[1:], 
        "app_related": True, 
        "usage": usage,
        "responder": current_brain,  # "architect" or "engineer"
        "streamed": streamed_response
    }
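The loop above holds the handover text in pending_user_message and appends it only after every tool call in the turn has its "tool" reply, since OpenAI-style chat APIs typically expect each tool call to be answered before another user turn appears. The following is a minimal sketch of that ordering, not connpy's implementation; apply_tool_results and the sample calls are hypothetical names used for illustration.

```python
def apply_tool_results(messages, tool_calls, results, pending_user_message=None):
    """Append one tool reply per call, then the deferred user message (if any)."""
    for tc in tool_calls:
        messages.append({
            "tool_call_id": tc["id"],
            "role": "tool",
            "name": tc["name"],
            "content": results.get(tc["id"], "Error: no result."),
        })
    # The handover text is added only once all tool replies are in place.
    if pending_user_message:
        messages.append({"role": "user", "content": pending_user_message})
    return messages


# Hypothetical tool calls from a single assistant turn.
calls = [
    {"id": "call_1", "name": "list_nodes"},
    {"id": "call_2", "name": "return_to_engineer"},
]
results = {"call_1": "No nodes found.", "call_2": "Control returned to Engineer."}
history = apply_tool_results([], calls, results, "HANDOVER FROM ARCHITECT\n\nSummary: done")
roles = [m["role"] for m in history]
print(roles)
```

Appending the user message inside the loop instead would place it between two tool replies, which is the situation the deferred injection avoids.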
def confirm(self, user_input)
@MethodHook
def confirm(self, user_input): return True
def get_node_info_tool(self, node_name)
def get_node_info_tool(self, node_name):
    """Get detailed metadata for a specific node. Passwords are masked."""
    try:
        d = self.config.getitem(node_name, extract=True)
        if 'password' in d: d['password'] = '***'
        return json.dumps(d)
    except Exception as e: 
        return f"Error getting node info: {str(e)}"

Get detailed metadata for a specific node. Passwords are masked.
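As an illustration of the masking step, the sketch below replaces secret fields on a copy of a node dictionary before serializing it. It is a stand-alone example under assumed data; mask_secrets and the sample node are hypothetical and not part of connpy.

```python
import json


def mask_secrets(node, secret_keys=("password",)):
    """Return a shallow copy of node with secret fields replaced by '***'."""
    masked = dict(node)
    for key in secret_keys:
        if key in masked:
            masked[key] = "***"
    return masked


# Hypothetical node data in the shape used by the config file.
node = {"host": "10.0.0.1", "protocol": "ssh", "user": "admin", "password": "s3cret"}
payload = json.dumps(mask_secrets(node))
print(payload)
```

Working on a copy keeps the original dictionary intact, so the masked value never leaks back into data that is later used to connect.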

def list_nodes_tool(self, filter_pattern='.*')
def list_nodes_tool(self, filter_pattern=".*"):
    """List nodes matching the filter pattern. Returns the OS tag of each node when 5 or fewer match; otherwise returns only the count and names."""
    try:
        matched_names = self.config._getallnodes(filter_pattern)
        if not matched_names: return "No nodes found."
        if len(matched_names) <= 5:
            matched_data = self.config.getitems(matched_names, extract=True)
            res = {}
            for name, data in matched_data.items():
                os_tag = "unknown"
                if isinstance(data, dict):
                    ts = data.get("tags")
                    if isinstance(ts, dict): os_tag = ts.get("os", "unknown")
                res[name] = {"os": os_tag}
            return json.dumps(res)
        return json.dumps({"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."})
    except Exception as e: 
        return f"Error listing nodes: {str(e)}"

List nodes matching the filter pattern. Returns the OS tag of each node when 5 or fewer match; otherwise returns only the count and names.
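The two response shapes can be sketched as follows. This is a simplified stand-in that takes an already matched dictionary instead of querying the config; summarize_nodes, max_detailed, and the sample nodes are hypothetical.

```python
import json


def summarize_nodes(matched, max_detailed=5):
    """matched maps node name -> node dict (assumed input shape)."""
    if not matched:
        return "No nodes found."
    if len(matched) <= max_detailed:
        res = {}
        for name, data in matched.items():
            tags = data.get("tags") if isinstance(data, dict) else None
            # Tags may be an empty string in the config, so check the type first.
            os_tag = tags.get("os", "unknown") if isinstance(tags, dict) else "unknown"
            res[name] = {"os": os_tag}
        return json.dumps(res)
    # Too many matches: return names only to keep the payload small.
    return json.dumps({"count": len(matched), "nodes": list(matched)})


small = summarize_nodes({"r1@dc": {"tags": {"os": "ios"}}, "r2@dc": {"tags": ""}})
large = summarize_nodes({f"sw{i}": {} for i in range(6)})
print(small)
print(large)
```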

def manage_memory_tool(self, content, action='append')
def manage_memory_tool(self, content, action="append"):
    """Save or update long-term memory. Only use when user explicitly requests it."""
    if not content or not content.strip():
        return "Error: Cannot save empty content to memory."
    
    try:
        mode = "a" if action == "append" else "w"
        os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
        with open(self.memory_path, mode) as f:
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
            f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content)
        
        # Reload memory after update
        with open(self.memory_path, "r") as f:
            self.long_term_memory = f.read()
        
        return "Memory updated successfully."
    except PermissionError as e:
        return f"Error: Permission denied writing to memory file: {e}"
    except Exception as e:
        return f"Error updating memory: {str(e)}"

Save or update long-term memory. Only use when user explicitly requests it.
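The difference between the two write modes can be sketched with a temporary file. Note that, as in the source above, any action other than "append" overwrites the file. The helper write_memory, the file name, and the sample notes are hypothetical.

```python
import datetime
import os
import tempfile


def write_memory(path, content, action="append"):
    """Append a timestamped entry, or overwrite the file for any other action."""
    mode = "a" if action == "append" else "w"
    with open(path, mode) as f:
        if action == "append":
            stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
            f.write(f"\n\n## {stamp}\n{content.strip()}\n")
        else:
            f.write(content)
    # Read the file back, as the tool does to refresh its in-memory copy.
    with open(path) as f:
        return f.read()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "memory.md")
    first = write_memory(path, "Core routers use OSPF area 0.")
    second = write_memory(path, "Lab switches reboot on Sundays.")
    replaced = write_memory(path, "Fresh start.", action="replace")
```

After the two appends the file holds two timestamped sections; the final call discards both and leaves only the new text.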

def register_ai_tool(self,
tool_definition,
handler,
target='engineer',
engineer_prompt=None,
architect_prompt=None,
status_formatter=None)
def register_ai_tool(self, tool_definition, handler, target="engineer", engineer_prompt=None, architect_prompt=None, status_formatter=None):
    """Register an external tool for the AI system.

    Args:
        tool_definition (dict): OpenAI-compatible tool definition.
        handler (callable): Function(ai_instance, **tool_args) -> str.
        target (str): 'engineer', 'architect', or 'both'.
        engineer_prompt (str): Extra text for engineer system prompt.
        architect_prompt (str): Extra text for architect system prompt.
        status_formatter (callable): Function(args_dict) -> status string.
    """
    name = tool_definition["function"]["name"]
    if target in ("engineer", "both"):
        self.external_engineer_tools.append(tool_definition)
    if target in ("architect", "both"):
        self.external_architect_tools.append(tool_definition)
    self.external_tool_handlers[name] = handler
    if engineer_prompt:
        self.engineer_prompt_extensions.append(engineer_prompt)
    if architect_prompt:
        self.architect_prompt_extensions.append(architect_prompt)
    if status_formatter:
        self.tool_status_formatters[name] = status_formatter

Register an external tool for the AI system.

Args

tool_definition : dict
OpenAI-compatible tool definition.
handler : callable
Function(ai_instance, **tool_args) -> str.
target : str
'engineer', 'architect', or 'both'.
engineer_prompt : str
Extra text for engineer system prompt.
architect_prompt : str
Extra text for architect system prompt.
status_formatter : callable
Function(args_dict) -> status string.
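A registration might look like the sketch below. To keep it runnable without connpy installed, it uses a stand-in ToolRegistry class that mirrors the registration logic shown above (prompt extensions omitted); the ping_host tool, its handler, and the allow-list are hypothetical.

```python
class ToolRegistry:
    """Stand-in mirroring the registration logic above; not connpy's ai class."""

    def __init__(self):
        self.external_engineer_tools = []
        self.external_architect_tools = []
        self.external_tool_handlers = {}
        self.tool_status_formatters = {}

    def register_ai_tool(self, tool_definition, handler, target="engineer", status_formatter=None):
        name = tool_definition["function"]["name"]
        if target in ("engineer", "both"):
            self.external_engineer_tools.append(tool_definition)
        if target in ("architect", "both"):
            self.external_architect_tools.append(tool_definition)
        self.external_tool_handlers[name] = handler
        if status_formatter:
            self.tool_status_formatters[name] = status_formatter


# Hypothetical OpenAI-compatible tool definition.
ping_tool = {
    "type": "function",
    "function": {
        "name": "ping_host",
        "description": "Report whether a host label is in a known allow-list.",
        "parameters": {
            "type": "object",
            "properties": {"host": {"type": "string"}},
            "required": ["host"],
        },
    },
}


def ping_handler(ai_instance, host):
    """Handler signature: (ai_instance, **tool_args) -> str."""
    return f"{host}: reachable" if host in {"router1", "router2"} else f"{host}: unknown"


registry = ToolRegistry()
registry.register_ai_tool(
    ping_tool,
    ping_handler,
    target="both",
    status_formatter=lambda args: f"Pinging {args['host']}...",
)
# Dispatch the same way the agent loop does: handler(self, **args).
result = registry.external_tool_handlers["ping_host"](registry, host="router1")
print(result)
```

Because handlers are stored by tool name, registering a second tool with the same name replaces the earlier handler.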
def run_commands_tool(self, nodes_filter, commands, status=None)
def run_commands_tool(self, nodes_filter, commands, status=None):
    """Execute commands on nodes matching the filter. Prompts interactively for confirmation before running unsafe commands."""
    # Handle if commands is a JSON string
    if isinstance(commands, str):
        try:
            commands = json.loads(commands)
        except ValueError:
            commands = [c.strip() for c in commands.split('\n') if c.strip()]
    
    # Expand multi-line commands within a list (in case the AI packs them)
    if isinstance(commands, list):
        expanded_commands = []
        for cmd in commands:
            expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()])
        commands = expanded_commands
    else:
        commands = [str(commands)]
    
    # Check command safety natively
    if not self.trusted_session:
        unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)]
        if unsafe_commands:
            # Stop the spinner so prompt doesn't get messed up
            if status: status.stop()
            
            # Show ALL commands with unsafe ones highlighted
            formatted_cmds = []
            for cmd in commands:
                if cmd in unsafe_commands:
                    formatted_cmds.append(f"  • [yellow]{cmd}[/yellow]")
                else:
                    formatted_cmds.append(f"  • {cmd}")
            
            panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds)
            console.print(Panel(panel_content, title="[bold yellow]⚠️ UNSAFE COMMANDS DETECTED[/bold yellow]", border_style="yellow"))
            
            try:
                from rich.prompt import Prompt
                user_resp = Prompt.ask("[bold yellow]Execute? (y: yes / n: no / a: allow all this session / <text>: feedback)[/bold yellow]", default="n")
            except KeyboardInterrupt:
                if status: status.update("[bold blue]Engineer: Resuming...")
                console.print("[bold red]✗ Aborted by user (Ctrl+C).[/bold red]")
                return "Error: User cancelled execution (Ctrl+C)."
            
            # Resume the spinner
            if status: status.update("[bold blue]Engineer: Processing user response...")
            
            user_resp_lower = user_resp.strip().lower()
            if user_resp_lower in ['a', 'allow']:
                self.trusted_session = True
                console.print("[bold green]✓ Trust Mode Enabled. All future commands in this session will execute without confirmation.[/bold green]")
            elif user_resp_lower in ['y', 'yes']:
                console.print("[bold green]✓ Executing...[/bold green]")
            elif user_resp_lower in ['n', 'no', '']:
                console.print("[bold red]✗ Execution rejected by user.[/bold red]")
                return "Error: User rejected execution."
            else:
                console.print(f"[bold cyan]User feedback: [/bold cyan]{user_resp}")
                return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again."
    
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names: return "No nodes found matching filter."
        thisnodes_dict = self.config.getitems(matched_names, extract=True)
        result = nodes(thisnodes_dict, config=self.config).run(commands)
        return self._truncate(json.dumps(result))
    except Exception as e: 
        return f"Error executing commands: {str(e)}"

Execute commands on nodes matching the filter. Prompts interactively for confirmation before running unsafe commands.
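The input normalization at the start of this tool accepts a JSON string, a newline-separated string, or a list whose items may themselves contain several lines. A stand-alone sketch of that step follows; normalize_commands is a hypothetical name, and the safety check and execution are left out.

```python
import json


def normalize_commands(commands):
    """Turn a JSON string, multi-line string, or list into a flat command list."""
    if isinstance(commands, str):
        try:
            commands = json.loads(commands)
        except ValueError:
            # Not JSON: treat each non-empty line as one command.
            commands = [c.strip() for c in commands.split("\n") if c.strip()]
    if isinstance(commands, list):
        expanded = []
        for cmd in commands:
            # Split items that pack several commands into one string.
            expanded.extend(c.strip() for c in str(cmd).split("\n") if c.strip())
        return expanded
    return [str(commands)]


from_json = normalize_commands('["show version", "show ip route"]')
from_text = normalize_commands("show version\n show clock \n")
from_list = normalize_commands(["conf t\ninterface lo0", "end"])
print(from_json, from_text, from_list)
```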

class configfile (conf=None, key=None)
@ClassHook
class configfile:
    ''' This class generates a configfile object. It contains a dictionary storing config, nodes and profiles, normally used by the connection manager.

    ### Attributes:  

        - file         (str): Path/file to config file.

        - key          (str): Path/file to RSA key file.

        - config      (dict): Dictionary containing information of connection
                              manager configuration.

        - connections (dict): Dictionary containing all the nodes added to
                              connection manager.

        - profiles    (dict): Dictionary containing all the profiles added to
                              connection manager.

        - privatekey   (obj): Object containing the private key to decrypt 
                              passwords.

        - publickey    (obj): Object containing the public key to encrypt 
                              passwords.
        '''

    def __init__(self, conf = None, key = None):
        ''' 
            
        ### Optional Parameters:  

            - conf (str): Path/file to config file. If left empty default
                          path is ~/.config/conn/config.yaml

            - key  (str): Path/file to RSA key file. If left empty default
                          path is ~/.config/conn/.osk

        '''
        home = os.path.expanduser("~")
        defaultdir = home + '/.config/conn'
        self.defaultdir = defaultdir
        Path(defaultdir).mkdir(parents=True, exist_ok=True)
        Path(f"{defaultdir}/plugins").mkdir(parents=True, exist_ok=True)
        pathfile = defaultdir + '/.folder'
        try:
            with open(pathfile, "r") as f:
                configdir = f.read().strip()
        except (FileNotFoundError, IOError):
            with open(pathfile, "w") as f:
                f.write(str(defaultdir))
            configdir = defaultdir
        defaultfile = configdir + '/config.yaml'
        self.cachefile = configdir + '/.config.cache.json'
        self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt'
        defaultkey = configdir + '/.osk'
        if conf is None:
            self.file = defaultfile
            
            # Backwards compatibility: Migrate from JSON to YAML
            legacy_json = configdir + '/config.json'
            legacy_noext = configdir + '/config'
            legacy_file = None
            if os.path.exists(legacy_json): legacy_file = legacy_json
            elif os.path.exists(legacy_noext): legacy_file = legacy_noext
            
            if not os.path.exists(self.file) and legacy_file:
                try:
                    with open(legacy_file, 'r') as f:
                        old_data = json.load(f)
                    with open(self.file, 'w') as f:
                        yaml.dump(old_data, f, default_flow_style=False, sort_keys=False)
                    with open(self.cachefile, 'w') as f:
                        json.dump(old_data, f)
                    shutil.move(legacy_file, legacy_file + ".backup")
                    printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!")
                except Exception as e:
                    printer.warning(f"Failed to migrate legacy config: {e}")
        else:
            self.file = conf
            
        if key is None:
            self.key = defaultkey
        else:
            self.key = key
            
        if os.path.exists(self.file):
            config = self._loadconfig(self.file)
        else:
            config = self._createconfig(self.file)
        self.config = config["config"]
        self.connections = config["connections"]
        self.profiles = config["profiles"]
        if not os.path.exists(self.key):
            self._createkey(self.key)
        with open(self.key) as f:
            self.privatekey = RSA.import_key(f.read())
        self.publickey = self.privatekey.publickey()


    def _loadconfig(self, conf):
        #Loads config file using dual cache
        cache_exists = os.path.exists(self.cachefile)
        yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0
        cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0

        if not cache_exists or yaml_time > cache_time:
            with open(conf, 'r') as f:
                data = yaml.safe_load(f)
            try:
                with open(self.cachefile, 'w') as f:
                    json.dump(data, f)
            except Exception:
                pass
            return data
        else:
            with open(self.cachefile, 'r') as f:
                return json.load(f)

    def _createconfig(self, conf):
        #Create config file
        defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}}
        if not os.path.exists(conf):
            with open(conf, "w") as f:
                yaml.dump(defaultconfig, f, default_flow_style=False, sort_keys=False)
                os.chmod(conf, 0o600)
            try:
                with open(self.cachefile, 'w') as f:
                    json.dump(defaultconfig, f)
            except Exception:
                pass
        with open(conf, 'r') as f:
            jsondata = yaml.safe_load(f)
        return jsondata

    @MethodHook
    def _saveconfig(self, conf):
        #Save config file
        newconfig = {"config":{}, "connections": {}, "profiles": {}}
        newconfig["config"] = self.config
        newconfig["connections"] = self.connections
        newconfig["profiles"] = self.profiles
        try:
            with open(conf, "w") as f:
                yaml.dump(newconfig, f, default_flow_style=False, sort_keys=False)
            with open(self.cachefile, "w") as f:
                json.dump(newconfig, f)
            self._generate_nodes_cache()
        except (IOError, OSError) as e:
            printer.error(f"Failed to save config: {e}")
            return 1
        return 0

    def _generate_nodes_cache(self):
        try:
            nodes = self._getallnodes()
            with open(self.fzf_cachefile, "w") as f:
                f.write("\n".join(nodes))
        except Exception:
            pass

    def _createkey(self, keyfile):
        #Create key file
        key = RSA.generate(2048)
        with open(keyfile,'wb') as f:
            f.write(key.export_key('PEM'))
            os.chmod(keyfile, 0o600)
        return key

    @MethodHook
    def _explode_unique(self, unique):
        #Divide unique name into folder, subfolder and id
        uniques = unique.split("@")
        if not unique.startswith("@"):
            result = {"id": uniques[0]}
        else:
            result = {}
        if len(uniques) == 2:
            result["folder"] = uniques[1]
            if result["folder"] == "":
                return False
        elif len(uniques) == 3:
            result["folder"] = uniques[2]
            result["subfolder"] = uniques[1]
            if result["folder"] == "" or result["subfolder"] == "":
                return False
        elif len(uniques) > 3:
            return False
        return result

    @MethodHook
    def getitem(self, unique, keys = None, extract = False):
        '''
        Get a node or a group of nodes from configfile which can be passed to node/nodes class

        ### Parameters:  

            - unique (str): Unique name of the node or folder in config using
                            connection manager style: node[@subfolder][@folder]
                            or [@subfolder]@folder

        ### Optional Parameters:  

            - keys (list): In case you pass a folder as unique, you can filter
                           nodes inside the folder passing a list.
            - extract (bool): If True, extract information from profiles. 
                              Default False.

        ### Returns:  

            dict: Dictionary containing information of node or multiple 
                  dictionaries of multiple nodes.

        '''
        uniques = self._explode_unique(unique)
        if unique.startswith("@"):
            if uniques.keys() >= {"folder", "subfolder"}:
                folder = self.connections[uniques["folder"]][uniques["subfolder"]]
            else:
                folder = self.connections[uniques["folder"]]
            newfolder = deepcopy(folder)
            newfolder.pop("type")
            for node_name in folder.keys():
                if node_name == "type":
                    continue
                if "type" in newfolder[node_name].keys():
                    if newfolder[node_name]["type"] == "subfolder":
                        newfolder.pop(node_name)
                    else:
                        newfolder[node_name].pop("type")
            
            if keys != None:
                newfolder = dict((k, newfolder[k]) for k in keys)
            
            if extract:
                for node_name, node_keys in newfolder.items():
                    for key, value in node_keys.items():
                        profile = re.search("^@(.*)", str(value))
                        if profile:
                            try:
                                newfolder[node_name][key] = self.profiles[profile.group(1)][key]
                            except KeyError:
                                newfolder[node_name][key] = ""
                        elif value == '' and key == "protocol":
                            try:
                                newfolder[node_name][key] = self.profiles["default"][key]
                            except KeyError:
                                newfolder[node_name][key] = "ssh"
            
            newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
            return newfolder
        else:
            if uniques.keys() >= {"folder", "subfolder"}:
                node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
            elif "folder" in uniques.keys():
                node = self.connections[uniques["folder"]][uniques["id"]]
            else:
                node = self.connections[uniques["id"]]
            newnode = deepcopy(node)
            newnode.pop("type")
            
            if extract:
                for key, value in newnode.items():
                    profile = re.search("^@(.*)", str(value))
                    if profile:
                        try:
                            newnode[key] = self.profiles[profile.group(1)][key]
                        except KeyError:
                            newnode[key] = ""
                    elif value == '' and key == "protocol":
                        try:
                            newnode[key] = self.profiles["default"][key]
                        except KeyError:
                            newnode[key] = "ssh"
            return newnode

    @MethodHook
    def getitems(self, uniques, extract = False):
        '''
        Get a group of nodes from configfile which can be passed to node/nodes class

        ### Parameters:  

            - uniques (str/list): String name that will match hostnames 
                                  from the connection manager. It can be a 
                                  list of strings.

        ### Optional Parameters:

            - extract (bool): If True, extract information from profiles. 
                              Default False.

        ### Returns:  

            dict: Dictionary containing information of node or multiple 
                  dictionaries of multiple nodes.

        '''
        nodes = {}
        if isinstance(uniques, str):
            uniques = [uniques]
        for i in uniques:
            if isinstance(i, dict):
                name = list(i.keys())[0]
                mylist = i[name]
                if not self.config["case"]:
                    name = name.lower()
                    mylist = [item.lower() for item in mylist]
                this = self.getitem(name, mylist, extract = extract)
                nodes.update(this)
            elif i.startswith("@"):
                if not self.config["case"]:
                    i = i.lower()
                this = self.getitem(i, extract = extract)
                nodes.update(this)
            else:
                if not self.config["case"]:
                    i = i.lower()
                this = self.getitem(i, extract = extract)
                nodes[i] = this
        return nodes


    @MethodHook
    def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='', type = "connection" ):
        #Add connection to config
        if folder == '':
            self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags,"jumphost": jumphost,"type": type}
        elif folder != '' and subfolder == '':
            self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type}
        elif folder != '' and subfolder != '':
            self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags,  "jumphost": jumphost, "type": type}
            

    @MethodHook
    def _connections_del(self,*, id, folder='', subfolder=''):
        #Delete connection from config
        if folder == '':
            del self.connections[id]
        elif folder != '' and subfolder == '':
            del self.connections[folder][id]
        elif folder != '' and subfolder != '':
            del self.connections[folder][subfolder][id]

    @MethodHook
    def _folder_add(self,*, folder, subfolder = ''):
        #Add folder to config
        if subfolder == '':
            if folder not in self.connections:
                self.connections[folder] = {"type": "folder"}
        else:
            if subfolder not in self.connections[folder]:
                self.connections[folder][subfolder] = {"type": "subfolder"}

    @MethodHook
    def _folder_del(self,*, folder, subfolder=''):
        #Delete folder from config
        if subfolder == '':
            del self.connections[folder]
        else:
            del self.connections[folder][subfolder]


    @MethodHook
    def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='' ):
        #Add profile to config
        self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost}
            

    @MethodHook
    def _profiles_del(self,*, id ):
        #Delete profile from config
        del self.profiles[id]
        
    @MethodHook
    def _getallnodes(self, filter = None):
        #get all nodes on configfile
        nodes = []
        layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"]
        folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        nodes.extend(layer1)
        for f in folders:
            layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"]
            nodes.extend(layer2)
            subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            for s in subfolders:
                layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"]
                nodes.extend(layer3)
        if filter:
            if isinstance(filter, str):
                nodes = [item for item in nodes if re.search(filter, item)]
            elif isinstance(filter, list):
                nodes = [item for item in nodes if any(re.search(pattern, item) for pattern in filter)]
            else:
                raise ValueError("filter must be a string or a list of strings")
        return nodes

    @MethodHook
    def _getallnodesfull(self, filter = None, extract = True):
        #get all nodes on configfile with all their attributes.
        nodes = {}
        layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"}
        folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        nodes.update(layer1)
        for f in folders:
            layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"}
            nodes.update(layer2)
            subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            for s in subfolders:
                layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"}
                nodes.update(layer3)
        if filter:
            if isinstance(filter, str):
                filter = "^(?!.*@).+$" if filter == "@" else filter
                nodes = {k: v for k, v in nodes.items() if re.search(filter, k)}
            elif isinstance(filter, list):
                filter = ["^(?!.*@).+$" if item == "@" else item for item in filter]
                nodes = {k: v for k, v in nodes.items() if any(re.search(pattern, k) for pattern in filter)}
            else:
                raise ValueError("filter must be a string or a list of strings")
        if extract:
            for node, keys in nodes.items():
                for key, value in keys.items():
                    profile = re.search("^@(.*)", str(value))
                    if profile:
                        try:
                            nodes[node][key] = self.profiles[profile.group(1)][key]
                        except KeyError:
                            nodes[node][key] = ""
                    elif value == '' and key == "protocol":
                        try:
                            nodes[node][key] = self.profiles["default"][key]
                        except KeyError:
                            nodes[node][key] = "ssh"
        return nodes


    @MethodHook
    def _getallfolders(self):
        #get all folders on configfile
        folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        subfolders = []
        for f in folders:
            s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            subfolders.extend(s)
        folders.extend(subfolders)
        return folders

    @MethodHook
    def _profileused(self, profile):
        #Return all the nodes that use this profile.
        nodes = []
        layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
        folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        nodes.extend(layer1)
        for f in folders:
            layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
            nodes.extend(layer2)
            subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            for s in subfolders:
                layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
                nodes.extend(layer3)
        return nodes

    @MethodHook
    def encrypt(self, password, keyfile=None):
        '''
        Encrypts password using RSA keyfile

        ### Parameters:  

            - password (str): Plaintext password to encrypt.

        ### Optional Parameters:  

            - keyfile  (str): Path/file to keyfile. Default is config keyfile.
                              

        ### Returns:  

            str: Encrypted password.

        '''
        if keyfile is None:
            keyfile = self.key
        with open(keyfile) as f:
            key = RSA.import_key(f.read())
        publickey = key.publickey()
        encryptor = PKCS1_OAEP.new(publickey)
        password = encryptor.encrypt(password.encode("utf-8"))
        return str(password)

This class generates a configfile object. It contains a dictionary storing config, nodes and profiles, normally used by the connection manager.

Attributes:

- file         (str): Path/file to config file.

- key          (str): Path/file to RSA key file.

- config      (dict): Dictionary containing information of connection
                      manager configuration.

- connections (dict): Dictionary containing all the nodes added to
                      connection manager.

- profiles    (dict): Dictionary containing all the profiles added to
                      connection manager.

- privatekey   (obj): Object containing the private key to decrypt 
                      passwords.

- publickey    (obj): Object containing the public key to encrypt 
                      passwords.

Optional Parameters:

- conf (str): Path/file to config file. If left empty default
              path is ~/.config/conn/config.json

- key  (str): Path/file to RSA key file. If left empty default
              path is ~/.config/conn/.osk
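
The layout of the `connections` dictionary can be inferred from how the class walks it: top-level entries are either nodes (`"type": "connection"`) or folders (`"type": "folder"`), and folders may hold subfolders (`"type": "subfolder"`). The following is a minimal sketch under that assumption. The sample data and the standalone `list_folders` helper are hypothetical; they only mirror what `_getallfolders` does and are not part of connpy.

```python
# Hypothetical sample data shaped like configfile.connections.
connections = {
    "router1": {"type": "connection", "host": "10.0.0.1"},
    "office": {
        "type": "folder",
        "switch1": {"type": "connection", "host": "10.0.1.1"},
        "datacenter": {
            "type": "subfolder",
            "server1": {"type": "connection", "host": "10.0.2.1"},
        },
    },
}


def list_folders(connections):
    """Return folders as "@folder" and subfolders as "@subfolder@folder"."""
    folders = [
        "@" + name
        for name, value in connections.items()
        if isinstance(value, dict) and value.get("type") == "folder"
    ]
    subfolders = []
    for folder in folders:
        # folder[1:] strips the leading "@" to get the dictionary key back.
        subfolders.extend(
            "@" + name + folder
            for name, value in connections[folder[1:]].items()
            if isinstance(value, dict) and value.get("type") == "subfolder"
        )
    return folders + subfolders


print(list_folders(connections))
```

Plain string entries such as `"type": "folder"` are skipped by the `isinstance(value, dict)` check, which is why the type marker can live in the same dictionary as the nodes.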

Methods

def encrypt(self, password, keyfile=None)
@MethodHook
def encrypt(self, password, keyfile=None):
    '''
    Encrypts password using RSA keyfile

    ### Parameters:  

        - password (str): Plaintext password to encrypt.

    ### Optional Parameters:  

        - keyfile  (str): Path/file to keyfile. Default is config keyfile.
                          

    ### Returns:  

        str: Encrypted password.

    '''
    if keyfile is None:
        keyfile = self.key
    with open(keyfile) as f:
        key = RSA.import_key(f.read())
    publickey = key.publickey()
    encryptor = PKCS1_OAEP.new(publickey)
    password = encryptor.encrypt(password.encode("utf-8"))
    return str(password)

Encrypts password using RSA keyfile

Parameters:

- password (str): Plaintext password to encrypt.

Optional Parameters:

- keyfile  (str): Path/file to keyfile. Default is config keyfile.

Returns:

str: Encrypted password.
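
Note that `encrypt` returns `str(password)`, so the stored value is the text form of a bytes literal (it starts with `b'` or `b"`). The `_passtx` helper of the `node` class relies on this: it treats any string matching `^b["'].+["']$` as encrypted and converts it back to bytes with `ast.literal_eval` before decrypting. The sketch below illustrates only that string round trip using the standard library. The bytes are a placeholder for real ciphertext, no RSA is involved, and `looks_encrypted` is a hypothetical helper name.

```python
import ast
import re


def looks_encrypted(value):
    """Apply the same pattern _passtx uses to spot encrypted values."""
    return re.match(r'^b["\'].+["\']$', value) is not None


# Placeholder bytes standing in for the output of PKCS1_OAEP encryption.
fake_ciphertext = b"\x8f\x02secret-bytes\xff"

stored = str(fake_ciphertext)         # the form encrypt() returns
recovered = ast.literal_eval(stored)  # the bytes handed to the decryptor

print(looks_encrypted(stored))
print(looks_encrypted("plain-password"))
print(recovered == fake_ciphertext)
```

A consequence of this design is that a plaintext password which itself looks like a bytes literal would be treated as encrypted.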
def getitem(self, unique, keys=None, extract=False)
@MethodHook
def getitem(self, unique, keys = None, extract = False):
    '''
    Get a node or a group of nodes from configfile which can be passed to node/nodes class

    ### Parameters:  

        - unique (str): Unique name of the node or folder in config using
                        connection manager style: node[@subfolder][@folder]
                        or [@subfolder]@folder

    ### Optional Parameters:  

        - keys (list): In case you pass a folder as unique, you can filter
                       nodes inside the folder passing a list.
        - extract (bool): If True, extract information from profiles. 
                          Default False.

    ### Returns:  

        dict: Dictionary containing information of node or multiple 
              dictionaries of multiple nodes.

    '''
    uniques = self._explode_unique(unique)
    if unique.startswith("@"):
        if uniques.keys() >= {"folder", "subfolder"}:
            folder = self.connections[uniques["folder"]][uniques["subfolder"]]
        else:
            folder = self.connections[uniques["folder"]]
        newfolder = deepcopy(folder)
        newfolder.pop("type")
        for node_name in folder.keys():
            if node_name == "type":
                continue
            if "type" in newfolder[node_name].keys():
                if newfolder[node_name]["type"] == "subfolder":
                    newfolder.pop(node_name)
                else:
                    newfolder[node_name].pop("type")
        
        if keys is not None:
            newfolder = dict((k, newfolder[k]) for k in keys)
        
        if extract:
            for node_name, node_keys in newfolder.items():
                for key, value in node_keys.items():
                    profile = re.search("^@(.*)", str(value))
                    if profile:
                        try:
                            newfolder[node_name][key] = self.profiles[profile.group(1)][key]
                        except KeyError:
                            newfolder[node_name][key] = ""
                    elif value == '' and key == "protocol":
                        try:
                            newfolder[node_name][key] = self.profiles["default"][key]
                        except KeyError:
                            newfolder[node_name][key] = "ssh"
        
        newfolder = {"{}{}".format(k,unique):v for k,v in newfolder.items()}
        return newfolder
    else:
        if uniques.keys() >= {"folder", "subfolder"}:
            node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
        elif "folder" in uniques.keys():
            node = self.connections[uniques["folder"]][uniques["id"]]
        else:
            node = self.connections[uniques["id"]]
        newnode = deepcopy(node)
        newnode.pop("type")
        
        if extract:
            for key, value in newnode.items():
                profile = re.search("^@(.*)", str(value))
                if profile:
                    try:
                        newnode[key] = self.profiles[profile.group(1)][key]
                    except KeyError:
                        newnode[key] = ""
                elif value == '' and key == "protocol":
                    try:
                        newnode[key] = self.profiles["default"][key]
                    except KeyError:
                        newnode[key] = "ssh"
        return newnode

Get a node or a group of nodes from configfile which can be passed to node/nodes class

Parameters:

- unique (str): Unique name of the node or folder in config using
                connection manager style: node[@subfolder][@folder]
                or [@subfolder]@folder

Optional Parameters:

- keys (list): In case you pass a folder as unique, you can filter
               nodes inside the folder passing a list.
- extract (bool): If True, extract information from profiles. 
                  Default False.

Returns:

dict: Dictionary containing information of node or multiple 
      dictionaries of multiple nodes.
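
With `extract=True`, any value of the form `@profilename` is replaced by the value stored under the same key in that profile, and an empty `protocol` falls back to the `default` profile (or to `ssh` if that is missing). Below is a minimal standalone sketch of that substitution, assuming hand-made sample data. `resolve_profiles` is a hypothetical helper that mirrors the logic in the source above; it is not a connpy function.

```python
import re
from copy import deepcopy


def resolve_profiles(node, profiles):
    """Replace "@profile" references in a node dict with profile values."""
    resolved = deepcopy(node)
    for key, value in node.items():
        match = re.search(r"^@(.*)", str(value))
        if match:
            # Unknown profile or missing key resolves to an empty string.
            resolved[key] = profiles.get(match.group(1), {}).get(key, "")
        elif value == "" and key == "protocol":
            # Empty protocol: use the default profile, else fall back to ssh.
            resolved[key] = profiles.get("default", {}).get(key, "ssh")
    return resolved


# Hypothetical sample data.
profiles = {
    "default": {"protocol": "ssh"},
    "office": {"user": "admin", "password": "secret"},
}
node = {"host": "10.0.0.1", "user": "@office", "protocol": "", "port": "@missing"}

print(resolve_profiles(node, profiles))
```

The input dictionary is left untouched because the function works on a deep copy, in the same way `getitem` copies the stored node before modifying it.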
def getitems(self, uniques, extract=False)
@MethodHook
def getitems(self, uniques, extract = False):
    '''
    Get a group of nodes from configfile which can be passed to node/nodes class

    ### Parameters:  

        - uniques (str/list): String name that will match hostnames 
                              from the connection manager. It can be a 
                              list of strings.

    ### Optional Parameters:

        - extract (bool): If True, extract information from profiles. 
                          Default False.

    ### Returns:  

        dict: Dictionary containing information of node or multiple 
              dictionaries of multiple nodes.

    '''
    nodes = {}
    if isinstance(uniques, str):
        uniques = [uniques]
    for i in uniques:
        if isinstance(i, dict):
            name = list(i.keys())[0]
            mylist = i[name]
            if not self.config["case"]:
                name = name.lower()
                mylist = [item.lower() for item in mylist]
            this = self.getitem(name, mylist, extract = extract)
            nodes.update(this)
        elif i.startswith("@"):
            if not self.config["case"]:
                i = i.lower()
            this = self.getitem(i, extract = extract)
            nodes.update(this)
        else:
            if not self.config["case"]:
                i = i.lower()
            this = self.getitem(i, extract = extract)
            nodes[i] = this
    return nodes

Get a group of nodes from configfile which can be passed to node/nodes class

Parameters:

- uniques (str/list): String name that will match hostnames 
                      from the connection manager. It can be a 
                      list of strings.

Optional Parameters:

- extract (bool): If True, extract information from profiles. 
                  Default False.

Returns:

dict: Dictionary containing information of node or multiple 
      dictionaries of multiple nodes.
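
The source above accepts three shapes for each entry in `uniques`: a node name, a folder name starting with `@`, or a dictionary mapping a folder to the list of node names to keep. Names are lowercased unless the `case` setting is enabled. The sketch below reproduces only that normalization step so the accepted inputs are easier to see. `normalize_uniques` and its `case_sensitive` argument are hypothetical stand-ins for the loop in `getitems` and for `config["case"]`.

```python
def normalize_uniques(uniques, case_sensitive=False):
    """Return (name, keys) pairs in the order getitems would look them up."""
    if isinstance(uniques, str):
        uniques = [uniques]
    plan = []
    for item in uniques:
        if isinstance(item, dict):
            # {"@folder": ["node1", "node2"]}: folder plus a node filter.
            name = list(item.keys())[0]
            keys = item[name]
            if not case_sensitive:
                name = name.lower()
                keys = [key.lower() for key in keys]
            plan.append((name, keys))
        else:
            # Plain node or folder name, no filter.
            plan.append((item if case_sensitive else item.lower(), None))
    return plan


print(normalize_uniques("Router1"))
print(normalize_uniques(["@Office", {"@DC@Office": ["SW1", "SW2"]}]))
```

In the real method each pair is then passed to `getitem`; folder results are merged into the returned dictionary, while a single node is stored under its own unique name.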
class node (unique,
host,
options='',
logs='',
password='',
port='',
protocol='',
user='',
config='',
tags='',
jumphost='')
@ClassHook
class node:
    ''' This class generates a node object. Contains all the information and methods to connect and interact with a device using ssh or telnet.

    ### Attributes:  

        - output (str): Output of the commands you ran with run or test 
                        method.  

        - result (dict): Maps each expected value to True if it is found
                         after running the commands using test method,
                         False otherwise. None if the test could not
                         complete.

        - status (int): 0 if the run or test method ran successfully.
                        1 if connection failed.
                        2 if expect times out without prompt or EOF.

        '''
    
    def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags='', jumphost=''):
        ''' 
            
        ### Parameters:  

            - unique (str): Unique name to assign to the node.

            - host   (str): IP address or hostname of the node.

        ### Optional Parameters:  

            - options  (str): Additional options to pass to ssh/telnet for the
                              connection.  

            - logs     (str): Path/file for storing the logs. You can use 
                              ${unique},${host}, ${port}, ${user}, ${protocol} 
                              as variables.  

            - password (str): Encrypted or plaintext password.  

            - port     (str): Port to connect to node, default 22 for ssh and 23 
                              for telnet.  

            - protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh.  

            - user     (str): Username of the node.  

            - config   (obj): Pass the object created with class configfile with 
                              key for decryption and extra configuration if you 
                              are using connection manager.  

            - tags   (dict) : Tags useful for automation and personal purposes,
                              like "os", "prompt" and "screen_length_command"
                              
            - jumphost (str): Reference another node to be used as a jumphost
        '''
        if config == '':
            self.idletime = 0
            self.key = None
        else:
            self.idletime = config.config["idletime"]
            self.key = config.key
        self.unique = unique
        attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost}
        for key in attr:
            profile = re.search("^@(.*)", str(attr[key]))
            if profile and config != '':
                try:
                    setattr(self,key,config.profiles[profile.group(1)][key])
                except KeyError:
                    setattr(self,key,"")
            elif attr[key] == '' and key == "protocol":
                try:
                    setattr(self,key,config.profiles["default"][key])
                except (KeyError, AttributeError):
                    setattr(self,key,"ssh")
            else: 
                setattr(self,key,attr[key])
        if isinstance(password,list):
            self.password = []
            for i, s in enumerate(password):
                profile = re.search("^@(.*)", password[i])
                if profile and config != '':
                    self.password.append(config.profiles[profile.group(1)]["password"])
        else:
            self.password = [password]
        if self.jumphost != "" and config != '':
            self.jumphost = config.getitem(self.jumphost)
            for key in self.jumphost:
                profile = re.search("^@(.*)", str(self.jumphost[key]))
                if profile:
                    try:
                        self.jumphost[key] = config.profiles[profile.group(1)][key]
                    except KeyError:
                        self.jumphost[key] = ""
                elif self.jumphost[key] == '' and key == "protocol":
                    try:
                        self.jumphost[key] = config.profiles["default"][key]
                    except KeyError:
                        self.jumphost[key] = "ssh"
            if isinstance(self.jumphost["password"],list):
                jumphost_password = []
                for i, s in enumerate(self.jumphost["password"]):
                    profile = re.search("^@(.*)", self.jumphost["password"][i])
                    if profile:
                        jumphost_password.append(config.profiles[profile.group(1)]["password"])
                self.jumphost["password"] = jumphost_password
            else:
                self.jumphost["password"] = [self.jumphost["password"]]
            if self.jumphost["password"] != [""]:
                self.password = self.jumphost["password"] + self.password

            if self.jumphost["protocol"] == "ssh":
                jumphost_cmd = self.jumphost["protocol"] + " -W %h:%p"
                if self.jumphost["port"] != '':
                    jumphost_cmd = jumphost_cmd + " -p " + self.jumphost["port"]
                if self.jumphost["options"] != '':
                    jumphost_cmd = jumphost_cmd + " " + self.jumphost["options"]
                if self.jumphost["user"] == '':
                    jumphost_cmd = jumphost_cmd + " {}".format(self.jumphost["host"])
                else:
                    jumphost_cmd = jumphost_cmd + " {}".format("@".join([self.jumphost["user"],self.jumphost["host"]]))
                self.jumphost = f"-o ProxyCommand=\"{jumphost_cmd}\""
            else:
                self.jumphost = ""

    @MethodHook
    def _passtx(self, passwords, *, keyfile=None):
        # decrypts passwords, used by other methods.
        dpass = []
        if keyfile is None:
            keyfile = self.key
        if keyfile is not None:
            with open(keyfile) as f:
                key = RSA.import_key(f.read())
            decryptor = PKCS1_OAEP.new(key)
        for passwd in passwords:
            if not re.match('^b[\"\'].+[\"\']$', passwd):
                dpass.append(passwd)
            else:
                try:
                    decrypted = decryptor.decrypt(ast.literal_eval(passwd)).decode("utf-8")
                    dpass.append(decrypted)
                except Exception:
                    raise ValueError("Missing or corrupted key")
        return dpass

    

    @MethodHook
    def _logfile(self, logfile = None):
        # translate logs variables and generate logs path.
        if logfile is None:
            logfile = self.logs
        logfile = logfile.replace("${unique}", self.unique)
        logfile = logfile.replace("${host}", self.host)
        logfile = logfile.replace("${port}", self.port)
        logfile = logfile.replace("${user}", self.user)
        logfile = logfile.replace("${protocol}", self.protocol)
        now = datetime.datetime.now()
        dateconf = re.search(r'\$\{date \'(.*)\'}', logfile)
        if dateconf:
            logfile = re.sub(r'\$\{date (.*)}',now.strftime(dateconf.group(1)), logfile)
        return logfile

    @MethodHook
    def _logclean(self, logfile, var = False):
        #Remove special ascii characters and other stuff from logfile.
        if var == False:
            # Read through a context manager so the file handle is closed.
            with open(logfile, "r") as f:
                t = f.read()
        else:
            t = logfile
        while t.find("\b") != -1:
            t = re.sub('[^\b]\b', '', t)
        t = t.replace("\n","",1)
        t = t.replace("\a","")
        t = t.replace('\n\n', '\n')
        t = re.sub(r'.\[K', '', t)
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/ ]*[@-~])')
        t = ansi_escape.sub('', t)
        t = t.lstrip(" \n\r")
        t = t.replace("\r","")
        t = t.replace("\x0E","")
        t = t.replace("\x0F","")
        if var == False:
            d = open(logfile, "w")
            d.write(t)
            d.close()
            return
        else:
            return t

    @MethodHook
    def _savelog(self):
        '''Save the log buffer to the file at regular intervals if there are changes.'''
        t = threading.current_thread()
        prev_size = 0  # Store the previous size of the buffer

        while getattr(t, "do_run", True):  # Check if thread is signaled to stop
            current_size = self.mylog.tell()  # Current size of the buffer

            # Only save if the buffer size has changed
            if current_size != prev_size:
                with open(self.logfile, "w") as f:  # Use "w" to overwrite the file
                    f.write(self._logclean(self.mylog.getvalue().decode(), True))
                prev_size = current_size  # Update the previous size
            sleep(5)

    @MethodHook
    def _filter(self, a):
        #Set time for last input when using interact
        self.lastinput = time()
        return a

    @MethodHook
    def _keepalive(self):
        #Send keepalive ctrl+e when idletime passed without new inputs on interact
        self.lastinput = time()
        t = threading.current_thread()
        while True:
            if time() - self.lastinput >= self.idletime:
                self.child.sendcontrol("e")
                self.lastinput = time()
            sleep(1)


    @MethodHook
    def interact(self, debug = False):
        '''
        Allow user to interact with the node directly, mostly used by connection manager.

        ### Optional Parameters:  

            - debug (bool): If True, display all the connecting information 
                            before interact. Default False.  
        '''
        connect = self._connect(debug = debug)
        if connect == True:
            size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
            self.child.setwinsize(int(size.group(2)),int(size.group(1)))
            printer.success("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
            if 'logfile' in dir(self):
                # Initialize self.mylog
                if not 'mylog' in dir(self):
                    self.mylog = io.BytesIO()
                self.child.logfile_read = self.mylog
                
                # Start the _savelog thread
                log_thread = threading.Thread(target=self._savelog)
                log_thread.daemon = True
                log_thread.start()
            if 'missingtext' in dir(self):
                print(self.child.after.decode(), end='')
            if self.idletime > 0:
                x = threading.Thread(target=self._keepalive)
                x.daemon = True
                x.start()
            if debug:
                print(self.mylog.getvalue().decode())
            self.child.interact(input_filter=self._filter)
            if 'logfile' in dir(self):
                with open(self.logfile, "w") as f:
                    f.write(self._logclean(self.mylog.getvalue().decode(), True))

        else:
            printer.error(connect)
            exit(1)

    @MethodHook
    def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10):
        '''
        Run a command or list of commands on the node and return the output.

        ### Parameters:  

            - commands (str/list): Commands to run on the node. Should be 
                                   str or a list of str. You can use variables
                                   as {varname} and define them in the optional
                                   parameter vars.

        ### Optional Parameters:  

            - vars  (dict): Dictionary containing the definition of variables
                            used in commands parameter.
                            Keys: Variable names.
                            Values: strings.

        ### Optional Named Parameters:  

            - folder (str): Path where output log should be stored, leave 
                            empty to disable logging.  

            - prompt (str): Prompt to be expected after a command is finished 
                            running. Usually linux uses ">" or EOF while 
                            routers use ">" or "#". The default value should 
                            work for most nodes. Change it if your connection 
                            needs some special symbol.  

            - stdout (bool): Set True to send the command output to stdout. 
                             Default False.

            - timeout (int): Time in seconds for expect to wait for prompt/EOF.
                             Default 10.

        ### Returns:  

            str: Output of the commands you ran on the node.

        '''
        connect = self._connect(timeout = timeout)
        now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
        if connect == True:
            # Attempt to set the terminal size
            try:
                self.child.setwinsize(65535, 65535)
            except Exception:
                try:
                    self.child.setwinsize(10000, 10000)
                except Exception:
                    pass
            if "prompt" in self.tags:
                prompt = self.tags["prompt"]
            expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
            output = ''
            status = ''
            if not isinstance(commands, list):
                commands = [commands]
            if "screen_length_command" in self.tags:
                commands.insert(0, self.tags["screen_length_command"])
            self.mylog = io.BytesIO()
            self.child.logfile_read = self.mylog
            for c in commands:
                if vars is not None:
                    c = c.format(**vars)
                result = self.child.expect(expects, timeout = timeout)
                self.child.sendline(c)
                if result == 2:
                    break
            if not result == 2:
                result = self.child.expect(expects, timeout = timeout)
            self.child.close()
            output = self._logclean(self.mylog.getvalue().decode(), True)
            if stdout == True:
                print(output)
            if folder != '':
                with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                    f.write(output)
            self.output = output
            if result == 2:
                self.status = 2
            else:
                self.status = 0
            return output
        else:
            self.output = connect
            self.status = 1
            if stdout == True:
                print(connect)
            if folder != '':
                with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                    f.write(connect)
            return connect

    @MethodHook
    def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10):
        '''
        Run a command or list of commands on the node, then check if expected value appears on the output after the last command.

        ### Parameters:  

            - commands (str/list): Commands to run on the node. Should be
                                   str or a list of str. You can use variables
                                   as {varname} and define them in the optional
                                   parameter vars.

            - expected (str/list): Expected text to appear after running 
                                   all the commands on the node. You can use
                                   variables as {varname} and define them
                                   in the optional parameter vars.

        ### Optional Parameters:  

            - vars  (dict): Dictionary containing the definition of variables
                            used in commands and expected parameters.
                            Keys: Variable names.
                            Values: strings.

        ### Optional Named Parameters: 

            - prompt (str): Prompt to be expected after a command is finished
                            running. Usually linux uses ">" or EOF while 
                            routers use ">" or "#". The default value should 
                            work for most nodes. Change it if your connection 
                            needs some special symbol.

            - timeout (int): Time in seconds for expect to wait for prompt/EOF.
                             Default 10.

        ### Returns: 
            dict: Maps each expected value to True if it is found after
                  running the commands, False otherwise. If the connection
                  fails or expect times out, the error message or the
                  captured output (str) is returned instead.

        '''
        connect = self._connect(timeout = timeout)
        if connect == True:
            # Attempt to set the terminal size
            try:
                self.child.setwinsize(65535, 65535)
            except Exception:
                try:
                    self.child.setwinsize(10000, 10000)
                except Exception:
                    pass
            if "prompt" in self.tags:
                prompt = self.tags["prompt"]
            expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
            output = ''
            if not isinstance(commands, list):
                commands = [commands]
            if not isinstance(expected, list):
                expected = [expected]
            if "screen_length_command" in self.tags:
                commands.insert(0, self.tags["screen_length_command"])
            self.mylog = io.BytesIO()
            self.child.logfile_read = self.mylog
            for c in commands:
                if vars is not None:
                    c = c.format(**vars)
                result = self.child.expect(expects, timeout = timeout)
                self.child.sendline(c)
                if result == 2:
                    break
            if not result == 2:
                result = self.child.expect(expects, timeout = timeout)
            self.child.close()
            output = self._logclean(self.mylog.getvalue().decode(), True)
            self.output = output
            if result in [0, 1]:
                # lastcommand = commands[-1]
                # if vars is not None:
                    # lastcommand = lastcommand.format(**vars)
                # last_command_index = output.rfind(lastcommand)
                # cleaned_output = output[last_command_index + len(lastcommand):].strip()
                self.result = {}
                for e in expected:
                    if vars is not None:
                        e = e.format(**vars)
                    updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)
                    newpattern = f".*({updatedprompt}).*{e}.*"
                    cleaned_output = output
                    cleaned_output = re.sub(newpattern, '', cleaned_output)
                    if e in cleaned_output:
                        self.result[e] = True
                    else:
                        self.result[e]= False
                self.status = 0
                return self.result
            if result == 2:
                self.result = None
                self.status = 2
                return output
        else:
            self.result = None
            self.output = connect
            self.status = 1
            return connect

    @MethodHook
    def _generate_ssh_sftp_cmd(self):
        cmd = self.protocol
        if self.idletime > 0:
            cmd += " -o ServerAliveInterval=" + str(self.idletime)
        if self.port:
            if self.protocol == "ssh":
                cmd += " -p " + self.port
            elif self.protocol == "sftp":
                cmd += " -P " + self.port
        if self.options:
            cmd += " " + self.options
        if self.jumphost:
            cmd += " " + self.jumphost
        user_host = f"{self.user}@{self.host}" if self.user else self.host
        cmd += f" {user_host}"
        return cmd

    @MethodHook
    def _generate_telnet_cmd(self):
        cmd = f"telnet {self.host}"
        if self.port:
            cmd += f" {self.port}"
        if self.options:
            cmd += f" {self.options}"
        return cmd

    @MethodHook
    def _generate_kube_cmd(self):
        cmd = f"kubectl exec {self.options} {self.host} -it --"
        kube_command = self.tags.get("kube_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash"
        cmd += f" {kube_command}"
        return cmd

    @MethodHook
    def _generate_docker_cmd(self):
        cmd = f"docker {self.options} exec -it {self.host}"
        docker_command = self.tags.get("docker_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash"
        cmd += f" {docker_command}"
        return cmd

    @MethodHook
    def _get_cmd(self):
        if self.protocol in ["ssh", "sftp"]:
            return self._generate_ssh_sftp_cmd()
        elif self.protocol == "telnet":
            return self._generate_telnet_cmd()
        elif self.protocol == "kubectl":
            return self._generate_kube_cmd()
        elif self.protocol == "docker":
            return self._generate_docker_cmd()
        else:
            raise ValueError(f"Invalid protocol: {self.protocol}")

    @MethodHook
    def _connect(self, debug=False, timeout=10, max_attempts=3):
        cmd = self._get_cmd()
        passwords = self._passtx(self.password) if self.password[0] else []
        if self.logs != '':
            self.logfile = self._logfile()
        default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
        prompt = self.tags.get("prompt", default_prompt) if isinstance(self.tags, dict) else default_prompt
        password_prompt = '[p|P]assword:|[u|U]sername:' if self.protocol != 'telnet' else '[p|P]assword:'

        expects = {
            "ssh": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: ssh', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
            "sftp": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: sftp', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
            "telnet": ['[u|U]sername:', 'refused', 'supported', 'invalid|unrecognized option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
            "kubectl": ['[u|U]sername:', '[r|R]efused', '[E|e]rror', 'DEPRECATED', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, "expired|invalid"],
            "docker": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'not a docker command', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
        }

        error_indices = {
            "ssh": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
            "sftp": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
            "telnet": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
            "kubectl": [1, 2, 3, 4, 8],  # Define error indices for kube
            "docker": [1, 2, 3, 4, 5, 6, 7]  # Define error indices for docker
        }

        eof_indices = {
            "ssh": [8, 9, 10, 11],
            "sftp": [8, 9, 10, 11],
            "telnet": [8, 9, 10, 11],
            "kubectl": [5, 6, 7],  # Define eof indices for kube
            "docker": [8, 9, 10]  # Define eof indices for docker
        }

        initial_indices = {
            "ssh": [0],
            "sftp": [0],
            "telnet": [0],
            "kubectl": [0],  # Define special indices for kube
            "docker": [0]  # Define special indices for docker
        }

        attempts = 1
        while attempts <= max_attempts:
            child = pexpect.spawn(cmd)
            if isinstance(self.tags, dict) and self.tags.get("console"):
                child.sendline()
            if debug:
                printer.debug(f"Command:\n{cmd}")
                self.mylog = io.BytesIO()
                child.logfile_read = self.mylog

            endloop = False
            for i in range(len(passwords) if passwords else 1):
                while True:
                    results = child.expect(expects[self.protocol], timeout=timeout)
                    results_value = expects[self.protocol][results]
                    
                    if results in initial_indices[self.protocol]:
                        if self.protocol in ["ssh", "sftp"]:
                            child.sendline('yes')
                        elif self.protocol in ["telnet", "kubectl", "docker"]:
                            if self.user:
                                child.sendline(self.user)
                            else:
                                self.missingtext = True
                                break
                    
                    elif results in error_indices[self.protocol]:
                        child.terminate()
                        if results_value == pexpect.TIMEOUT and attempts != max_attempts:
                            attempts += 1
                            endloop = True
                            break
                        else:
                            after = "Connection timeout" if results_value == pexpect.TIMEOUT else child.after.decode()
                            return f"Connection failed code: {results}\n{child.before.decode().lstrip()}{after}{child.readline().decode()}".rstrip()
                    
                    elif results in eof_indices[self.protocol]:
                        if results_value == password_prompt:
                            if passwords:
                                child.sendline(passwords[i])
                            else:
                                self.missingtext = True
                            break
                        elif results_value == "suspend":
                            child.sendline("\r")
                            sleep(2)
                        else:
                            endloop = True
                            child.sendline()
                            break
                    
                if endloop:
                    break
            if results_value == pexpect.TIMEOUT:
                continue
            else:
                break

        if isinstance(self.tags, dict) and self.tags.get("post_connect_commands"):
            cmds = self.tags.get("post_connect_commands")
            commands = [cmds] if isinstance(cmds, str) else cmds
            for command in commands:
                child.sendline(command)
                sleep(1)
        child.readline(0)
        self.child = child
        from pexpect import fdpexpect
        self.raw_child = fdpexpect.fdspawn(self.child.child_fd)
        return True
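
The three index tables in the listing above partition the positions of each protocol's expects list into groups that are handled differently. A minimal sketch of that lookup follows, with the kubectl and docker tables copied from the listing. The helper name classify_match and the returned labels are hypothetical and exist only for illustration; they are not part of connpy.

```python
# Index tables copied from the _connect listing (kubectl and docker only).
ERROR_INDICES = {"kubectl": [1, 2, 3, 4, 8], "docker": [1, 2, 3, 4, 5, 6, 7]}
EOF_INDICES = {"kubectl": [5, 6, 7], "docker": [8, 9, 10]}
INITIAL_INDICES = {"kubectl": [0], "docker": [0]}


def classify_match(protocol, index):
    """Return the table a matched pattern index belongs to (hypothetical helper)."""
    if index in INITIAL_INDICES[protocol]:
        return "initial"  # username prompt: send the user or flag missing text
    if index in ERROR_INDICES[protocol]:
        return "error"    # terminate the child, then retry or report the failure
    if index in EOF_INDICES[protocol]:
        return "eof"      # password prompt, device prompt, or EOF
    raise ValueError(f"index {index} is not mapped for {protocol}")


# Index 4 is pexpect.TIMEOUT in the kubectl expects list.
print(classify_match("kubectl", 4))
```

Note that the table named eof_indices also covers the password and device prompts, so "eof" here means "connection progressed" rather than a literal end of file.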

This class generates a node object. It contains all the information and methods to connect and interact with a device using ssh or telnet.

Attributes:

- output (str): Output of the commands you ran with run or test 
                method.

- result (dict): Maps each expected value to True if it is found 
                 after running the commands using test method, 
                 False otherwise. None if the connection fails 
                 or expect times out.

- status (int): 0 if the run or test method ran successfully.
                1 if connection failed.
                2 if expect times out without prompt or EOF.

Parameters:

- unique (str): Unique name to assign to the node.

- host   (str): IP address or hostname of the node.

Optional Parameters:

- options  (str): Additional options to pass to ssh/telnet for the
                  connection.

- logs     (str): Path/file for storing the logs. You can use 
                  ${unique},${host}, ${port}, ${user}, ${protocol} 
                  as variables.

- password (str): Encrypted or plaintext password.

- port     (str): Port to connect to node, default 22 for ssh and 23 
                  for telnet.

- protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh.

- user     (str): Username of the node.

- config   (obj): Pass the object created with class configfile with 
                  key for decryption and extra configuration if you 
                  are using connection manager.

- tags   (dict) : Tags useful for automation and personal purposes,
                  like "os", "prompt" and "screen_length_command".

- jumphost (str): Reference another node to be used as a jumphost

Methods

def interact(self, debug=False)
@MethodHook
def interact(self, debug = False):
    '''
    Allow user to interact with the node directly, mostly used by connection manager.

    ### Optional Parameters:  

        - debug (bool): If True, display all the connecting information 
                        before interact. Default False.  
    '''
    connect = self._connect(debug = debug)
    if connect == True:
        size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
        self.child.setwinsize(int(size.group(2)),int(size.group(1)))
        printer.success("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        if 'logfile' in dir(self):
            # Initialize self.mylog
            if not 'mylog' in dir(self):
                self.mylog = io.BytesIO()
            self.child.logfile_read = self.mylog
            
            # Start the _savelog thread
            log_thread = threading.Thread(target=self._savelog)
            log_thread.daemon = True
            log_thread.start()
        if 'missingtext' in dir(self):
            print(self.child.after.decode(), end='')
        if self.idletime > 0:
            x = threading.Thread(target=self._keepalive)
            x.daemon = True
            x.start()
        if debug:
            print(self.mylog.getvalue().decode())
        self.child.interact(input_filter=self._filter)
        if 'logfile' in dir(self):
            with open(self.logfile, "w") as f:
                f.write(self._logclean(self.mylog.getvalue().decode(), True))

    else:
        printer.error(connect)
        exit(1)

Allow user to interact with the node directly, mostly used by connection manager.

Optional Parameters:

- debug (bool): If True, display all the connecting information 
                before interact. Default False.
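
Before handing control to the user, interact resizes the child to match the local terminal by parsing the string form of os.get_terminal_size(). The sketch below isolates that parsing step using the same regular expression as the listing. The helper name rows_and_columns is hypothetical, and a fixed os.terminal_size value stands in for a real terminal.

```python
import os
import re


def rows_and_columns(size):
    """Extract (rows, columns) from an os.terminal_size value."""
    match = re.search(r"columns=([0-9]+).*lines=([0-9]+)", str(size))
    # setwinsize expects rows first, then columns, hence the swapped groups.
    return int(match.group(2)), int(match.group(1))


# A fixed size stands in for os.get_terminal_size(), which needs a real terminal.
rows, cols = rows_and_columns(os.terminal_size((120, 40)))
print(rows, cols)
```

The same values are also available directly as the lines and columns attributes of the os.terminal_size object.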
def run(self, commands, vars=None, *, folder='', prompt='>$|#$|\\$$|>.$|#.$|\\$.$', stdout=False, timeout=10)
@MethodHook
def run(self, commands, vars = None,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10):
    '''
    Run a command or list of commands on the node and return the output.

    ### Parameters:  

        - commands (str/list): Commands to run on the node. Should be 
                               str or a list of str. You can use variables
                               as {varname} and define them in the optional
                               parameter vars.

    ### Optional Parameters:  

        - vars  (dict): Dictionary containing the definition of variables
                        used in commands parameter.
                        Keys: Variable names.
                        Values: strings.

    ### Optional Named Parameters:  

        - folder (str): Path where output log should be stored, leave 
                        empty to disable logging.  

        - prompt (str): Prompt to be expected after a command is finished 
                        running. Usually linux uses  ">" or EOF while 
                        routers use ">" or "#". The default value should 
                        work for most nodes. Change it if your connection 
                        needs a special symbol.  

        - stdout (bool):Set True to send the command output to stdout. 
                        default False.

        - timeout (int):Time in seconds for expect to wait for prompt/EOF.
                        default 10.

    ### Returns:  

        str: Output of the commands you ran on the node.

    '''
    connect = self._connect(timeout = timeout)
    now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
    if connect == True:
        # Attempt to set the terminal size
        try:
            self.child.setwinsize(65535, 65535)
        except Exception:
            try:
                self.child.setwinsize(10000, 10000)
            except Exception:
                pass
        if "prompt" in self.tags:
            prompt = self.tags["prompt"]
        expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
        output = ''
        status = ''
        if not isinstance(commands, list):
            commands = [commands]
        if "screen_length_command" in self.tags:
            commands.insert(0, self.tags["screen_length_command"])
        self.mylog = io.BytesIO()
        self.child.logfile_read = self.mylog
        for c in commands:
            if vars is not None:
                c = c.format(**vars)
            result = self.child.expect(expects, timeout = timeout)
            self.child.sendline(c)
            if result == 2:
                break
        if not result == 2:
            result = self.child.expect(expects, timeout = timeout)
        self.child.close()
        output = self._logclean(self.mylog.getvalue().decode(), True)
        if stdout == True:
            print(output)
        if folder != '':
            with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                f.write(output)
                f.close()
        self.output = output
        if result == 2:
            self.status = 2
        else:
            self.status = 0
        return output
    else:
        self.output = connect
        self.status = 1
        if stdout == True:
            print(connect)
        if folder != '':
            with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                f.write(connect)
                f.close()
        return connect

Run a command or list of commands on the node and return the output.

Parameters:

- commands (str/list): Commands to run on the node. Should be 
                       str or a list of str. You can use variables
                       as {varname} and define them in the optional
                       parameter vars.

Optional Parameters:

- vars  (dict): Dictionary containing the definition of variables
                used in commands parameter.
                Keys: Variable names.
                Values: strings.

Optional Named Parameters:

- folder (str): Path where output log should be stored, leave 
                empty to disable logging.

- prompt (str): Prompt to be expected after a command is finished 
                running. Usually linux uses  ">" or EOF while 
                routers use ">" or "#". The default value should 
                work for most nodes. Change it if your connection 
                needs a special symbol.

- stdout (bool):Set True to send the command output to stdout. 
                default False.

- timeout (int):Time in seconds for expect to wait for prompt/EOF.
                default 10.

Returns:

str: Output of the commands you ran on the node.
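
The {varname} placeholders in commands are filled with str.format before each command is sent, as the listing above shows. A minimal sketch of that preparation step, without any connection involved, could look like the following. The function render_commands is a hypothetical name used for illustration and does not exist in connpy.

```python
def render_commands(commands, variables=None):
    """Normalize commands to a list and fill {varname} placeholders."""
    if not isinstance(commands, list):
        commands = [commands]
    if variables is None:
        return list(commands)
    return [c.format(**variables) for c in commands]


rendered = render_commands(
    ["show interface {iface}", "show vlan {vlan}"],
    {"iface": "eth0", "vlan": "10"},
)
print(rendered)
```

Because str.format is used, a command that contains literal braces needs them doubled ({{ and }}) whenever vars is supplied.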
def test(self, commands, expected, vars=None, *, prompt='>$|#$|\\$$|>.$|#.$|\\$.$', timeout=10)
@MethodHook
def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10):
    '''
    Run a command or list of commands on the node, then check if expected value appears on the output after the last command.

    ### Parameters:  

        - commands (str/list): Commands to run on the node. Should be
                               str or a list of str. You can use variables
                               as {varname} and define them in the optional
                               parameter vars.

        - expected (str)     : Expected text to appear after running 
                               all the commands on the node. You can use
                               variables as {varname} and define them
                               in the optional parameter vars.

    ### Optional Parameters:  

        - vars  (dict): Dictionary containing the definition of variables
                        used in commands and expected parameters.
                        Keys: Variable names.
                        Values: strings.

    ### Optional Named Parameters: 

        - prompt (str): Prompt to be expected after a command is finished
                        running. Usually linux uses  ">" or EOF while 
                        routers use ">" or "#". The default value should 
                        work for most nodes. Change it if your connection 
                        needs a special symbol.

        - timeout (int):Time in seconds for expect to wait for prompt/EOF.
                        default 10.

    ### Returns: 
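        dict: Maps each expected value to True if it is found after running 
              the commands, False otherwise. If the connection fails or 
              expect times out, the error text or captured output (str) 
              is returned instead.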

    '''
    connect = self._connect(timeout = timeout)
    if connect == True:
        # Attempt to set the terminal size
        try:
            self.child.setwinsize(65535, 65535)
        except Exception:
            try:
                self.child.setwinsize(10000, 10000)
            except Exception:
                pass
        if "prompt" in self.tags:
            prompt = self.tags["prompt"]
        expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
        output = ''
        if not isinstance(commands, list):
            commands = [commands]
        if not isinstance(expected, list):
            expected = [expected]
        if "screen_length_command" in self.tags:
            commands.insert(0, self.tags["screen_length_command"])
        self.mylog = io.BytesIO()
        self.child.logfile_read = self.mylog
        for c in commands:
            if vars is not None:
                c = c.format(**vars)
            result = self.child.expect(expects, timeout = timeout)
            self.child.sendline(c)
            if result == 2:
                break
        if not result == 2:
            result = self.child.expect(expects, timeout = timeout)
        self.child.close()
        output = self._logclean(self.mylog.getvalue().decode(), True)
        self.output = output
        if result in [0, 1]:
            # lastcommand = commands[-1]
            # if vars is not None:
                # lastcommand = lastcommand.format(**vars)
            # last_command_index = output.rfind(lastcommand)
            # cleaned_output = output[last_command_index + len(lastcommand):].strip()
            self.result = {}
            for e in expected:
                if vars is not None:
                    e = e.format(**vars)
                updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)
                newpattern = f".*({updatedprompt}).*{e}.*"
                cleaned_output = output
                cleaned_output = re.sub(newpattern, '', cleaned_output)
                if e in cleaned_output:
                    self.result[e] = True
                else:
                    self.result[e]= False
            self.status = 0
            return self.result
        if result == 2:
            self.result = None
            self.status = 2
            return output
    else:
        self.result = None
        self.output = connect
        self.status = 1
        return connect

Run a command or list of commands on the node, then check if expected value appears on the output after the last command.

Parameters:

- commands (str/list): Commands to run on the node. Should be
                       str or a list of str. You can use variables
                       as {varname} and define them in the optional
                       parameter vars.

- expected (str)     : Expected text to appear after running 
                       all the commands on the node. You can use
                       variables as {varname} and define them
                       in the optional parameter vars.

Optional Parameters:

- vars  (dict): Dictionary containing the definition of variables
                used in commands and expected parameters.
                Keys: Variable names.
                Values: strings.

Optional Named Parameters:

- prompt (str): Prompt to be expected after a command is finished
                running. Usually linux uses  ">" or EOF while 
                routers use ">" or "#". The default value should 
                work for most nodes. Change it if your connection 
                needs a special symbol.

- timeout (int):Time in seconds for expect to wait for prompt/EOF.
                default 10.

Returns:

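dict: Maps each expected value to True if it is found after running 
      the commands, False otherwise. If the connection fails or expect 
      times out, the error text or captured output (str) is returned 
      instead.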
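
The check in the listing above does more than a plain substring search: it first removes lines where the expected text appears after a prompt, so the echoed command itself does not count as a match. The sketch below reproduces that step on already-cleaned text. The function name expected_found and the sample outputs are made up for this example.

```python
import re

DEFAULT_PROMPT = r'>$|#$|\$$|>.$|#.$|\$.$'


def expected_found(output, expected, prompt=DEFAULT_PROMPT):
    """Return True if expected appears outside the echoed command line."""
    # Drop unescaped "$" anchors so the prompt can match in the middle of a line.
    unanchored = re.sub(r'(?<!\\)\$', '', prompt)
    # Remove lines where the expected text follows a prompt (the echoed command).
    echoed = f".*({unanchored}).*{expected}.*"
    cleaned = re.sub(echoed, '', output)
    return expected in cleaned


ok = expected_found("router# ping 1.1.1.1\nReply from 1.1.1.1: bytes=32\nrouter# ", "1.1.1.1")
failed = expected_found("router# ping 1.1.1.1\nRequest timed out.\nrouter# ", "1.1.1.1")
print(ok, failed)
```

As in the listing, the expected text is interpolated into the regular expression without escaping, so characters such as "." act as wildcards during the removal step.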
class nodes (nodes: dict, config='')
@ClassHook
class nodes:
    ''' This class generates a nodes object. Contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.

    ### Attributes:  

        - nodelist (list): List of node class objects passed to the init 
                           function.  

        - output   (dict): Dictionary formed by nodes unique as keys, 
                           output of the commands you ran on the node as 
                           value. Created after running methods run or test.  

        - result   (dict): Dictionary formed by nodes unique as keys, value 
                           is True if expected value is found after running 
                           the commands, False if prompt is found before. 
                           Created after running method test.  

        - status   (dict): Dictionary formed by nodes unique as keys, value: 
                           0 if method run or test ended successfully.
                           1 if connection failed.
                           2 if expect times out without prompt or EOF.

        - <unique> (obj):  For each item in nodelist, there is an attribute
                           generated with the node unique.
        '''

    def __init__(self, nodes: dict, config = ''):
        ''' 
        ### Parameters:  

            - nodes (dict): Dictionary formed by node information:  
                            Keys: Unique name for each node.  
                            Mandatory Subkeys: host(str).  
                            Optional Subkeys: options(str), logs(str), password(str),
                            port(str), protocol(str), user(str).  
                            For reference on subkeys check node class.

        ### Optional Parameters:  

            - config (obj): Pass the object created with class configfile with key 
                            for decryption and extra configuration if you are using 
                            connection manager.
        '''
        self.nodelist = []
        self.config = config
        for n in nodes:
            this = node(n, **nodes[n], config = config)
            self.nodelist.append(this)
            setattr(self,n,this)

    
    @MethodHook
    def _splitlist(self, lst, n):
        #split a list in lists of n members.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]


    @MethodHook
    def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None, on_complete = None):
        '''
        Run a command or list of commands on all the nodes in nodelist.

        ### Parameters:  

            - commands (str/list): Commands to run on the nodes. Should be str or 
                                   list of str. You can use variables as {varname}
                                   and define them in the optional parameter vars.

        ### Optional Parameters:  

            - vars  (dict): Dictionary containing the definition of variables for
                            each node, used in commands parameter.
                            Keys should be formed by nodes unique names. Use
                            special key name __global__ for global variables.
                            Subkeys: Variable names.
                            Values: strings.

        ### Optional Named Parameters:  

            - folder   (str): Path where output log should be stored, leave empty 
                              to disable logging.  

            - prompt   (str): Prompt to be expected after a command is finished 
                              running. Usually linux uses  ">" or EOF while routers 
                              use ">" or "#". The default value should work for 
                              most nodes. Change it if your connection needs a 
                              special symbol.  

            - stdout  (bool): Set True to send the command output to stdout. 
                              Default False.  

            - parallel (int): Number of nodes to run the commands on simultaneously. 
                              Default is 10. If there are more nodes than this 
                              value, they are split into groups of at most this 
                              many members.
            
            - timeout  (int): Time in seconds for expect to wait for prompt/EOF.
                              default 10.

            - on_complete (callable): Optional callback called when each node 
                                      finishes. Receives (unique, output, status).
                                      Called from the node's thread so it must
                                      be thread-safe.

        ### Returns:  

            dict: Dictionary formed by nodes unique as keys, Output of the 
                  commands you ran on the node as value.

        '''
        args = {}
        nodesargs = {}
        args["commands"] = commands
        if folder != None:
            args["folder"] = folder
            Path(folder).mkdir(parents=True, exist_ok=True)
        if prompt != None:
            args["prompt"] = prompt
        if stdout != None and on_complete is None:
            args["stdout"] = stdout
        if timeout != None:
            args["timeout"] = timeout
        output = {}
        status = {}
        tasks = []

        def _run_node(node_obj, node_args, callback):
            """Wrapper that runs a node and fires the callback on completion."""
            node_obj.run(**node_args)
            if callback:
                callback(node_obj.unique, node_obj.output, node_obj.status)

        for n in self.nodelist:
            nodesargs[n.unique] = deepcopy(args)
            if vars != None:
                nodesargs[n.unique]["vars"] = {}
                if "__global__" in vars.keys():
                    nodesargs[n.unique]["vars"].update(vars["__global__"])
                if n.unique in vars.keys():
                    nodesargs[n.unique]["vars"].update(vars[n.unique])
            if on_complete:
                tasks.append(threading.Thread(target=_run_node, args=(n, nodesargs[n.unique], on_complete)))
            else:
                tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            output[i.unique] = i.output
            status[i.unique] = i.status
        self.output = output
        self.status = status
        return output

    @MethodHook
    def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10, timeout = None):
        '''
        Run a command or list of commands on all the nodes in nodelist, then check if expected value appears on the output after the last command.

        ### Parameters:  

            - commands (str/list): Commands to run on the node. Should be str or 
                                   list of str.  

            - expected (str)     : Expected text to appear after running all the 
                                   commands on the node.

        ### Optional Parameters:  

            - vars  (dict): Dictionary containing the definition of variables for
                            each node, used in commands and expected parameters.
                            Keys should be formed by nodes unique names. Use
                            special key name __global__ for global variables.
                            Subkeys: Variable names.
                            Values: strings.

        ### Optional Named Parameters:  

            - prompt   (str): Prompt to be expected after a command is finished 
                              running. Usually linux uses  ">" or EOF while 
                              routers use ">" or "#". The default value should 
                              work for most nodes. Change it if your connection 
                              needs a special symbol.


            - parallel (int): Number of nodes to run the commands on simultaneously. 
                              Default is 10. If there are more nodes than this 
                              value, they are split into groups of at most this 
                              many members.

            - timeout  (int): Time in seconds for expect to wait for prompt/EOF.
                              default 10.

        ### Returns:  

            dict: Dictionary formed by nodes unique as keys, value is True if 
                  expected value is found after running the commands, False 
                  if prompt is found before.

        '''
        args = {}
        nodesargs = {}
        args["commands"] = commands
        args["expected"] = expected
        if prompt != None:
            args["prompt"] = prompt
        if timeout != None:
            args["timeout"] = timeout
        output = {}
        result = {}
        status = {}
        tasks = []
        for n in self.nodelist:
            nodesargs[n.unique] = deepcopy(args)
            if vars != None:
                nodesargs[n.unique]["vars"] = {}
                if "__global__" in vars.keys():
                    nodesargs[n.unique]["vars"].update(vars["__global__"])
                if n.unique in vars.keys():
                    nodesargs[n.unique]["vars"].update(vars[n.unique])
            tasks.append(threading.Thread(target=n.test, kwargs=nodesargs[n.unique]))
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            result[i.unique] = i.result
            output[i.unique] = i.output
            status[i.unique] = i.status
        self.output = output
        self.result = result
        self.status = status
        return result
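
Both run and test in the listing above build a per-node vars dictionary by applying the __global__ entries first and the node's own entries second, so node-specific values override global ones. A minimal sketch of that merge follows. The helper vars_for_node and the sample values are hypothetical.

```python
def vars_for_node(unique, variables):
    """Merge __global__ values with the node's own; node values win."""
    merged = {}
    merged.update(variables.get("__global__", {}))
    merged.update(variables.get(unique, {}))
    return merged


variables = {
    "__global__": {"vlan": "10", "iface": "eth0"},
    "router1": {"iface": "ge-0/0/0"},
}
print(vars_for_node("router1", variables))
print(vars_for_node("router2", variables))
```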

This class generates a nodes object. Contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.

Attributes:

- nodelist (list): List of node class objects passed to the init 
                   function.

- output   (dict): Dictionary formed by nodes unique as keys, 
                   output of the commands you ran on the node as 
                   value. Created after running methods run or test.

- result   (dict): Dictionary formed by nodes unique as keys, value 
                   is True if expected value is found after running 
                   the commands, False if prompt is found before. 
                   Created after running method test.

- status   (dict): Dictionary formed by nodes unique as keys, value: 
                   0 if method run or test ended successfully.
                   1 if connection failed.
                   2 if expect times out without prompt or EOF.

- <unique> (obj):  For each item in nodelist, there is an attribute
                   generated with the node unique.

Parameters:

- nodes (dict): Dictionary formed by node information:  
                Keys: Unique name for each node.  
                Mandatory Subkeys: host(str).  
                Optional Subkeys: options(str), logs(str), password(str),
                port(str), protocol(str), user(str).  
                For reference on subkeys check node class.

Optional Parameters:

- config (obj): Pass the object created with class configfile with key 
                for decryption and extra configuration if you are using 
                connection manager.
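
The class runs its per-node work in threads, started and joined one batch at a time, with the batch size set by the parallel argument of run and test. The sketch below shows that batching pattern in isolation, with a plain function standing in for the node connection. The names split_list and run_in_batches, the sample nodes dictionary and the lock around the shared result are assumptions for this example, not connpy code.

```python
import threading


def split_list(items, n):
    """Yield consecutive chunks of at most n items."""
    for i in range(0, len(items), n):
        yield items[i:i + n]


def run_in_batches(names, work, parallel=10):
    """Start one thread per name, one batch at a time, joining each batch."""
    results = {}
    lock = threading.Lock()

    def task(name):
        value = work(name)
        with lock:  # guard the dictionary shared between threads
            results[name] = value

    threads = [threading.Thread(target=task, args=(n,)) for n in names]
    for batch in split_list(threads, parallel):
        for t in batch:
            t.start()
        for t in batch:
            t.join()
    return results


# Same shape as the nodes parameter: unique names as keys, host as mandatory subkey.
nodes = {"r1": {"host": "192.0.2.1"}, "r2": {"host": "192.0.2.2"}, "r3": {"host": "192.0.2.3"}}
out = run_in_batches(list(nodes), lambda name: nodes[name]["host"], parallel=2)
print(out)
```

With parallel=2 and three nodes, the first two threads finish before the third one starts, which keeps the number of simultaneous connections bounded.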

Methods

def run(self, commands, vars=None, *, folder=None, prompt=None, stdout=None, parallel=10, timeout=None, on_complete=None)
@MethodHook
def run(self, commands, vars = None,*, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None, on_complete = None):
    '''
    Run a command or list of commands on all the nodes in nodelist.

    ### Parameters:  

        - commands (str/list): Commands to run on the nodes. Should be str or 
                               list of str. You can use variables as {varname}
                               and define them in the optional parameter vars.

    ### Optional Parameters:  

        - vars  (dict): Dictionary containing the definition of variables for
                        each node, used in commands parameter.
                        Keys should be formed by nodes unique names. Use
                        special key name __global__ for global variables.
                        Subkeys: Variable names.
                        Values: strings.

    ### Optional Named Parameters:  

        - folder   (str): Path where output log should be stored, leave empty 
                          to disable logging.  

        - prompt   (str): Prompt to be expected after a command is finished 
                          running. Usually linux uses  ">" or EOF while routers 
                          use ">" or "#". The default value should work for 
                          most nodes. Change it if your connection needs a 
                          special symbol.  

        - stdout  (bool): Set True to send the command output to stdout. 
                          Default False.  

        - parallel (int): Number of nodes to run the commands on simultaneously. 
                          Default is 10. If there are more nodes than this 
                          value, they are split into groups of at most this 
                          many members.
        
        - timeout  (int): Time in seconds for expect to wait for prompt/EOF.
                          default 10.

        - on_complete (callable): Optional callback called when each node 
                                  finishes. Receives (unique, output, status).
                                  Called from the node's thread so it must
                                  be thread-safe.

    ### Returns:  

        dict: Dictionary with each node's unique name as key and the output 
              of the commands run on that node as value.

    '''
    args = {}
    nodesargs = {}
    args["commands"] = commands
    if folder is not None:
        args["folder"] = folder
        Path(folder).mkdir(parents=True, exist_ok=True)
    if prompt is not None:
        args["prompt"] = prompt
    # stdout is only forwarded to the nodes when no callback is set
    if stdout is not None and on_complete is None:
        args["stdout"] = stdout
    if timeout is not None:
        args["timeout"] = timeout
    output = {}
    status = {}
    tasks = []

    def _run_node(node_obj, node_args, callback):
        """Wrapper that runs a node and fires the callback on completion."""
        node_obj.run(**node_args)
        if callback:
            callback(node_obj.unique, node_obj.output, node_obj.status)

    for n in self.nodelist:
        nodesargs[n.unique] = deepcopy(args)
        if vars is not None:
            # Global variables are applied first, then node-specific ones override them
            nodesargs[n.unique]["vars"] = {}
            if "__global__" in vars:
                nodesargs[n.unique]["vars"].update(vars["__global__"])
            if n.unique in vars:
                nodesargs[n.unique]["vars"].update(vars[n.unique])
        if on_complete:
            tasks.append(threading.Thread(target=_run_node, args=(n, nodesargs[n.unique], on_complete)))
        else:
            tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
    taskslist = list(self._splitlist(tasks, parallel))
    for t in taskslist:
        for i in t:
            i.start()
        for i in t:
            i.join()
    for i in self.nodelist:
        output[i.unique] = i.output
        status[i.unique] = i.status
    self.output = output
    self.status = status
    return output

Run a command or list of commands on all the nodes in nodelist.

Parameters:

- commands (str/list): Commands to run on the nodes. Should be str or 
                       list of str. You can use variables as {varname}
                       and define them in the optional parameter vars.

Optional Parameters:

- vars  (dict): Dictionary containing the definition of variables for
                each node, used in the commands parameter.
                Keys should be the nodes' unique names. Use the
                special key name __global__ for global variables.
                Subkeys: Variable names.
                Values: strings.
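
As a rough illustration of how these keys combine, the sketch below mirrors the merge order visible in the method source: values under __global__ are applied first, then the node's own entry overrides them. The helper name resolve_vars and the node names are hypothetical, and the final str.format call is only an assumption about how {varname} placeholders get filled in.

```python
def resolve_vars(unique, node_vars):
    """Merge global and node-specific variables for one node (hypothetical helper)."""
    merged = {}
    if "__global__" in node_vars:
        merged.update(node_vars["__global__"])
    if unique in node_vars:
        # Node-specific values win over global ones
        merged.update(node_vars[unique])
    return merged


# Made-up node names and variables, for illustration only
node_vars = {
    "__global__": {"iface": "eth0", "count": "3"},
    "router1@office": {"iface": "Gi0/1"},
}

r1 = resolve_vars("router1@office", node_vars)
r2 = resolve_vars("router2@office", node_vars)

command = "show interface {iface}"
# Assumption: placeholders are filled in str.format style
print(command.format(**r1))
print(command.format(**r2))
```

A node without its own entry, such as router2@office here, simply falls back to the global values.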

Optional Named Parameters:

- folder   (str): Path where output log should be stored, leave empty 
                  to disable logging.

- prompt   (str): Prompt to be expected after a command is finished 
                  running. Usually Linux uses ">" or EOF while routers 
                  use ">" or "#". The default value should work for 
                  most nodes. Change it if your connection needs some 
                  special symbol.

- stdout  (bool): Set True to send the command output to stdout. 
                  Default False.

- parallel (int): Number of nodes to run the commands on simultaneously. 
                  Default is 10. If there are more nodes than this 
                  value, they are split into groups of at most this 
                  many members.

- timeout  (int): Time in seconds for expect to wait for prompt/EOF.
                  Default 10.

- on_complete (callable): Optional callback called when each node 
                          finishes. Receives (unique, output, status).
                          Called from the node's thread so it must
                          be thread-safe.
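
Since on_complete is called from each node's thread, a callback that touches shared state would normally guard it with a lock. The following is a minimal sketch of that pattern only: the threads are stand-ins for the per-node threads, the node names and outputs are invented, and the status value 0 is a placeholder rather than a documented code.

```python
import threading

results = {}
results_lock = threading.Lock()


def on_complete(unique, output, status):
    """Record one node's result; the lock serializes concurrent updates."""
    with results_lock:
        results[unique] = {"status": status, "lines": len(output.splitlines())}


# Stand-ins for the per-node threads: each one fires the callback once.
fake_nodes = {"router1": "line1\nline2", "router2": "line1"}
threads = [
    threading.Thread(target=on_complete, args=(name, out, 0))
    for name, out in fake_nodes.items()
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)
```

With the real method, a callback like this would be passed as on_complete=on_complete. Judging from the source listing, stdout is only forwarded to the nodes when on_complete is None, so a callback replaces direct printing rather than adding to it.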

Returns:

dict: Dictionary with each node's unique name as key and the output 
      of the commands run on that node as value.
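
The grouping behind the parallel parameter can be sketched as follows. In the source listing, threads are split into groups, and each group is started and joined before the next one begins, so a slow node delays the start of the following group. The split_batches helper is a hypothetical stand-in for the internal _splitlist method, whose implementation is not shown here.

```python
import threading


def split_batches(items, size):
    """Yield consecutive slices of at most `size` items (hypothetical stand-in)."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


order = []
order_lock = threading.Lock()


def work(name):
    # Placeholder for running commands on one node
    with order_lock:
        order.append(name)


tasks = [threading.Thread(target=work, args=(f"node{i}",)) for i in range(5)]
batches = list(split_batches(tasks, 2))

for batch in batches:
    for t in batch:
        t.start()
    for t in batch:
        t.join()  # wait for the whole group before starting the next one

print([len(b) for b in batches])  # [2, 2, 1]
```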
def test(self, commands, expected, vars=None, *, prompt=None, parallel=10, timeout=None)
@MethodHook
def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10, timeout = None):
    '''
    Run a command or list of commands on all the nodes in nodelist, then check if expected value appears on the output after the last command.

    ### Parameters:  

        - commands (str/list): Commands to run on the node. Should be str or 
                               list of str.  

        - expected (str)     : Expected text to appear after running all the 
                               commands on the node.

    ### Optional Parameters:  

        - vars  (dict): Dictionary containing the definition of variables for
                        each node, used in the commands and expected parameters.
                        Keys should be the nodes' unique names. Use the
                        special key name __global__ for global variables.
                        Subkeys: Variable names.
                        Values: strings.

    ### Optional Named Parameters:  

        - prompt   (str): Prompt to be expected after a command is finished 
                          running. Usually Linux uses ">" or EOF while 
                          routers use ">" or "#". The default value should 
                          work for most nodes. Change it if your connection 
                          needs some special symbol.

        - parallel (int): Number of nodes to run the commands on simultaneously. 
                          Default is 10. If there are more nodes than this 
                          value, they are split into groups of at most this 
                          many members.

        - timeout  (int): Time in seconds for expect to wait for prompt/EOF.
                          Default 10.

    ### Returns:  

        dict: Dictionary with each node's unique name as key. The value is 
              True if the expected value is found after running the 
              commands, False if the prompt is found first.

    '''
    args = {}
    nodesargs = {}
    args["commands"] = commands
    args["expected"] = expected
    if prompt is not None:
        args["prompt"] = prompt
    if timeout is not None:
        args["timeout"] = timeout
    output = {}
    result = {}
    status = {}
    tasks = []
    for n in self.nodelist:
        nodesargs[n.unique] = deepcopy(args)
        if vars is not None:
            # Global variables are applied first, then node-specific ones override them
            nodesargs[n.unique]["vars"] = {}
            if "__global__" in vars:
                nodesargs[n.unique]["vars"].update(vars["__global__"])
            if n.unique in vars:
                nodesargs[n.unique]["vars"].update(vars[n.unique])
        tasks.append(threading.Thread(target=n.test, kwargs=nodesargs[n.unique]))
    taskslist = list(self._splitlist(tasks, parallel))
    for t in taskslist:
        for i in t:
            i.start()
        for i in t:
            i.join()
    for i in self.nodelist:
        result[i.unique] = i.result
        output[i.unique] = i.output
        status[i.unique] = i.status
    self.output = output
    self.result = result
    self.status = status
    return result

Run a command or list of commands on all the nodes in nodelist, then check if expected value appears on the output after the last command.

Parameters:

- commands (str/list): Commands to run on the node. Should be str or 
                       list of str.

- expected (str)     : Expected text to appear after running all the 
                       commands on the node.

Optional Parameters:

- vars  (dict): Dictionary containing the definition of variables for
                each node, used in the commands and expected parameters.
                Keys should be the nodes' unique names. Use the
                special key name __global__ for global variables.
                Subkeys: Variable names.
                Values: strings.

Optional Named Parameters:

- prompt   (str): Prompt to be expected after a command is finished 
                  running. Usually Linux uses ">" or EOF while 
                  routers use ">" or "#". The default value should 
                  work for most nodes. Change it if your connection 
                  needs some special symbol.

- parallel (int): Number of nodes to run the commands on simultaneously. 
                  Default is 10. If there are more nodes than this 
                  value, they are split into groups of at most this 
                  many members.

- timeout  (int): Time in seconds for expect to wait for prompt/EOF.
                  Default 10.

Returns:

dict: Dictionary with each node's unique name as key. The value is 
      True if the expected value is found after running the 
      commands, False if the prompt is found first.
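
Because vars applies to both commands and expected, each node can be checked against its own value. The sketch below only imitates that idea offline: the str.format substitution is an assumption about the placeholder mechanics, the node names, addresses, and outputs are made up, and the plain substring check is a simplification of the real prompt-driven matching.

```python
# Made-up variables: one shared value plus a per-node expected address
node_vars = {
    "__global__": {"iface": "loopback0"},
    "router1": {"ip": "10.0.0.1"},
    "router2": {"ip": "10.0.0.2"},
}
commands = ["show ip interface {iface}"]
expected = "{ip}"

# Invented outputs standing in for what each node would return
fake_output = {
    "router1": "Internet address is 10.0.0.1/32",
    "router2": "Internet address is 10.0.9.9/32",
}

rendered = {}
result = {}
for unique, output in fake_output.items():
    # Global variables first, node-specific ones override them
    merged = {**node_vars.get("__global__", {}), **node_vars.get(unique, {})}
    rendered[unique] = [c.format(**merged) for c in commands]
    result[unique] = expected.format(**merged) in output

# The returned dictionary makes it easy to list the nodes that failed
failed = [unique for unique, ok in result.items() if not ok]
print(result)
print(failed)
```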