Package connpy
Connection manager
Connpy is an SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and automation module for Linux, Mac, and Docker.
Features
- Manage connections using SSH, SFTP, Telnet, kubectl, and Docker exec.
- Set contexts to manage specific nodes from specific contexts (work/home/clients/etc).
- You can generate profiles and reference them from nodes using @profilename so you don't
need to edit multiple nodes when changing passwords or other information.
- Nodes can be stored on @folder or @subfolder@folder to organize your devices. They can
be referenced using node@subfolder@folder or node@folder.
- If you have too many nodes, get a completion script using: conn config --completion.
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use AI with a multi-agent system (Engineer/Architect) to help you manage your devices.
Supports any LLM provider via litellm (OpenAI, Anthropic, Google, etc.).
- Add plugins with your own scripts, and execute them remotely.
- Fully decoupled gRPC Client/Server architecture.
- Unified UI with syntax highlighting and theming.
- Much more!
Usage
usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]
conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config,sync,context} ...
positional arguments:
node|folder node[@subfolder][@folder]
Connect to specific node or show all matching nodes
[@subfolder][@folder]
Show all available connections globally or in specified path
options:
-h, --help show this help message and exit
-v, --version Show version
-a, --add Add new node[@subfolder][@folder] or [@subfolder]@folder
-r, --del, --rm Delete node[@subfolder][@folder] or [@subfolder]@folder
-e, --mod, --edit Modify node[@subfolder][@folder]
-s, --show Show node[@subfolder][@folder]
-d, --debug Display all connection steps
-t, --sftp Connects using sftp instead of ssh
--service-mode Set the backend service mode (local or remote)
--remote Connect to a remote connpy service via gRPC
--theme UI Output theme (dark, light, or path)
Commands:
profile Manage profiles
move(mv) Move node
copy(cp) Copy node
list(ls) List profiles, nodes or folders
bulk Add nodes in bulk
export Export connection folder to Yaml file
import Import connection folder to config from Yaml file
ai Make request to an AI
run Run scripts or commands on nodes
api Start and stop connpy api
plugin Manage plugins
config Manage app config
sync Sync config with Google
context Manage contexts with regex matching
Manage profiles
usage: conn profile [-h] (--add | --del | --mod | --show) profile
positional arguments:
profile Name of profile to manage
options:
-h, --help show this help message and exit
-a, --add Add new profile
-r, --del, --rm Delete profile
-e, --mod, --edit Modify profile
-s, --show Show profile
Examples
#Add new profile
conn profile --add office-user
#Add new folder
conn --add @office
#Add new subfolder
conn --add @datacenter@office
#Add node to subfolder
conn --add server@datacenter@office
#Add node to folder
conn --add pc@office
#Show node information
conn --show server@datacenter@office
#Connect to nodes
conn pc@office
conn server
#Create and set new context
conn context -a office .*@office
conn context --set office
#Run a command in a node
conn run server ls -la
Plugin Requirements for Connpy
Remote Plugin Execution
When Connpy operates in remote mode, plugins are executed transparently on the server:
- The client automatically downloads the plugin source code (Parser class context) to generate the local argparse structure and provide autocompletion.
- The execution phase (Entrypoint class) is redirected via gRPC streams to execute in the server's memory, ensuring the plugin runs securely against the server's inventory without passing sensitive data to the client.
- You can manage remote plugins using the --remote flag (e.g. connpy plugin --add myplugin script.py --remote).
General Structure
- The plugin script must be a Python file.
- Only the following top-level elements are allowed in the plugin script:
- Class definitions
- Function definitions
- Import statements
- The `if __name__ == "__main__":` block for standalone execution
- Pass statements
Specific Class Requirements
- The plugin script must define specific classes with particular attributes and methods. Each class serves a distinct role within the plugin's architecture:
- Class `Parser`:
  - Purpose: Handles parsing of command-line arguments.
  - Requirements:
    - Must contain only one method: `__init__`.
    - The `__init__` method must initialize at least one attribute, `self.parser`: an instance of `argparse.ArgumentParser`.
- Class `Entrypoint`:
  - Purpose: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
  - Requirements:
    - Must have an `__init__` method that accepts exactly three parameters besides `self`:
      - `args`: Arguments passed to the plugin.
      - The parser instance (typically `self.parser` from the `Parser` class).
      - The Connapp instance to interact with the Connpy app.
- Class `Preload`:
  - Purpose: Performs any necessary preliminary setup or configuration independent of the main parsing and entry logic.
  - Requirements:
    - Contains at least an `__init__` method that accepts the parameter `connapp` besides `self`.
Class Dependencies and Combinations
- Dependencies:
  - `Parser` and `Entrypoint` are interdependent and must both be present if one is included.
  - `Preload` is independent and may exist alone or alongside the other classes.
- Valid Combinations:
  - `Parser` and `Entrypoint` together.
  - `Preload` alone.
  - All three classes (`Parser`, `Entrypoint`, `Preload`).
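Putting these rules together, a minimal valid plugin could look like the sketch below. The plugin name and the `--greet` flag are illustrative only, not part of Connpy:

```python
import argparse

class Parser:
    def __init__(self):
        # The only required attribute: an argparse.ArgumentParser instance
        self.parser = argparse.ArgumentParser(prog="myplugin", description="Example plugin")
        self.parser.add_argument("--greet", default="world", help="Who to greet")

class Entrypoint:
    def __init__(self, args, parser, connapp):
        # args: parsed arguments, parser: the ArgumentParser instance,
        # connapp: the Connpy app instance (None when run standalone)
        print(f"Hello {args.greet}")

if __name__ == "__main__":
    # Standalone execution for testing; empty argv keeps the demo self-contained
    p = Parser()
    Entrypoint(p.parser.parse_args([]), p.parser, None)
```

Once installed with `conn plugin --add myplugin myplugin.py`, the arguments defined in `Parser` appear under `conn myplugin` automatically.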
Preload Modifications and Hooks
In the Preload class of the plugin system, you can customize the behavior of existing classes and methods within the application through a robust hooking system. This documentation explains how to use the modify, register_pre_hook, and register_post_hook methods to tailor plugin functionality to your needs.
Modifying Classes with modify
The modify method allows you to alter instances of a class at the time they are created or after their creation. This is particularly useful for setting or modifying configuration settings, altering default behaviors, or adding new functionalities to existing classes without changing the original class definitions.
- Usage: Modify a class to include additional configurations or changes.
- Modify Method Signature:
  - `modify(modification_method)`: A function that is invoked with an instance of the class as its argument. This function should perform any modifications directly on this instance.
- Modification Method Signature:
  - Arguments:
    - `cls`: This function accepts a single argument, the class instance, which it then modifies.
- Modifiable Classes:
  - `connapp.config`
  - `connapp.node`
  - `connapp.nodes`
  - `connapp.ai`
- Example:

```python
def modify_config(cls):
    # Example modification: adding a new attribute or modifying an existing one
    cls.new_attribute = 'New Value'

class Preload:
    def __init__(self, connapp):
        # Applying modification to the config class instance
        connapp.config.modify(modify_config)
```
Implementing Method Hooks
Two methods let you define custom logic to be executed before (register_pre_hook) or after (register_post_hook) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs, or adding functionality.
- Usage: Register hooks to methods to execute additional logic before or after the main method execution.
- Registration Methods Signature:
  - `register_pre_hook(pre_hook_method)`: A function that is invoked before the main method is executed. This function should do preprocessing of the arguments.
  - `register_post_hook(post_hook_method)`: A function that is invoked after the main method is executed. This function should do postprocessing of the outputs.
- Method Signature for Pre-Hooks: `pre_hook_method(*args, **kwargs)`
  - Arguments:
    - `*args`, `**kwargs`: The arguments and keyword arguments that will be passed to the method being hooked. The pre-hook function has the opportunity to inspect and modify these arguments before they are passed to the main method.
  - Return:
    - Must return a tuple `(args, kwargs)`, which will be used as the new arguments for the main method. If the original arguments are not modified, the function should return them as received.
- Method Signature for Post-Hooks: `post_hook_method(*args, **kwargs)`
  - Arguments:
    - `*args`, `**kwargs`: The arguments and keyword arguments that were passed to the main method.
    - `kwargs["result"]`: The value returned by the main method. This allows the post-hook to inspect and even alter the result before it is returned to the original caller.
  - Return:
    - Can return a modified result, which will replace the original result of the main method, or simply return `kwargs["result"]` to return the original method result.
- Example:

```python
def pre_processing_hook(*args, **kwargs):
    print("Pre-processing logic here")
    # Modify arguments or perform any checks
    return args, kwargs  # Return modified or unmodified args and kwargs

def post_processing_hook(*args, **kwargs):
    print("Post-processing logic here")
    # Modify the result or perform any final logging or cleanup
    return kwargs["result"]  # Return the modified or unmodified result

class Preload:
    def __init__(self, connapp):
        # Registering a pre-hook
        connapp.ai.some_method.register_pre_hook(pre_processing_hook)
        # Registering a post-hook
        connapp.node.another_method.register_post_hook(post_processing_hook)
```
Executable Block
- The plugin script can include an executable block: `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
Command Completion Support
Plugins can provide intelligent tab completion by defining autocompletion logic. There are two supported methods, with the tree-based approach being the most modern and recommended.
1. Tree-based Completion (Recommended)
Define a function called _connpy_tree that returns a declarative navigation tree. This method is highly efficient, supports complex state loops, and is very simple to implement for most use cases.
def _connpy_tree(info=None):
    info = info or {}  # guard for direct calls made without context
    nodes = info.get("nodes", [])
return {
"__exclude_used__": True, # Filter out words already typed
"__extra__": nodes, # Suggest nodes at this level
"--format": ["json", "yaml", "table"], # Fixed suggestions
"*": { # Wildcard matches any positional word
"interface1": None,
"interface2": None,
"--verbose": None
}
}
- Keys: Literal completions (exact matches).
- `*` key: A wildcard that matches any positional word typed by the user.
- `__extra__`: A list, or a callable `(words) -> list`, that adds dynamic suggestions.
- `__exclude_used__`: (Boolean) If True, automatically filters out words already present in the command line.
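To make these semantics concrete, here is a rough sketch of how a completer could walk such a tree. This is an illustration of the key rules above, not Connpy's actual resolver:

```python
def walk(tree, words):
    """Return the suggestions for the current position, given the words already typed."""
    node = tree
    for word in words:
        if isinstance(node, dict):
            # Literal key wins; otherwise fall back to the "*" wildcard
            node = node.get(word, node.get("*"))
        else:
            return []
    if isinstance(node, list):
        return list(node)  # fixed suggestions, e.g. ["json", "yaml", "table"]
    if not isinstance(node, dict):
        return []
    # Literal keys, minus control keys and the wildcard
    sugg = [k for k in node if k != "*" and not k.startswith("__")]
    extra = node.get("__extra__", [])
    sugg += extra(words) if callable(extra) else list(extra)
    if node.get("__exclude_used__"):
        sugg = [s for s in sugg if s not in words]
    return sugg
```

For the example tree above, `walk(tree, [])` suggests `--format` plus the node names, and any already-typed word descends into the wildcard branch.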
2. Legacy Function-based Completion
For backward compatibility or highly custom logic, you can define _connpy_completion.
def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
return ["--help", "--verbose", "start", "stop"]
elif wordsnumber == 4 and words[2] == "start":
return info["nodes"] # Suggest node names
return []
| Parameter | Description |
|---|---|
| `wordsnumber` | Integer indicating the total number of words on the command line. For plugins, this typically starts at 3. |
| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin. |
| `info` | A dictionary of structured context data (nodes, folders, profiles, config). |
In this example, if the user types `connpy myplugin start` and presses Tab, it will suggest node names.
Handling Unknown Arguments
Plugins can choose to accept and process unknown arguments that are not explicitly defined in the parser. To enable this behavior, the plugin must define the following hidden argument in its Parser class:
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
Behavior:
- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
- If the user does not pass any unknown arguments, `args.unknown_args` will contain the default value (True, unless overridden).
Example:
If a plugin accepts unknown tcpdump flags like this:
connpy myplugin -nn -s0
And defines the hidden --unknown-args flag as shown above, then:
`args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
This allows the plugin to receive and process arguments intended for external tools (e.g.,
tcpdump) without argparse raising an error.
Note:
If a plugin does not define --unknown-args, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
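As a sketch of how an Entrypoint might consume the captured flags (the plugin name and the command assembly are illustrative, and the `build_command` helper is hypothetical, not part of Connpy):

```python
def build_command(unknown_args):
    # unknown_args is the list Connpy captured, e.g. ['-nn', '-s0'];
    # when the user passed nothing, it still holds the default True,
    # so treat anything that is not a list as "no extra flags".
    extra = unknown_args if isinstance(unknown_args, list) else []
    return "tcpdump " + " ".join(extra) if extra else "tcpdump"

class Entrypoint:
    def __init__(self, args, parser, connapp):
        command = build_command(getattr(args, "unknown_args", []))
        # The plugin could then run `command` on a node via connapp.
```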
Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.
Example Script
For a practical example of how to write a compatible plugin script, please refer to the following example:
This script demonstrates the required structure and implementation details according to the plugin system's standards.
gRPC Service Architecture
Connpy features a completely decoupled gRPC Client/Server architecture. You can run Connpy as a standalone background service and connect to it remotely via the CLI or other clients.
1. Start the Server
Start the gRPC service by running:
connpy api -s 50051
The server will handle all configurations, connections, AI sessions, and plugin execution locally on the machine it runs on.
2. Connect the Client
Configure your local CLI client to connect to the remote server:
connpy config --service-mode remote
connpy config --remote-host localhost:50051
Once configured, all commands (connpy node, connpy list, connpy ai, etc.) will execute transparently on the remote server via thin-client proxies. You can revert back to standalone execution at any time by running connpy config --service-mode local.
Programmatic Access (gRPC & SOA)
Developers can build their own applications using the Connpy backend by utilizing the ServiceProvider:
from connpy.services.provider import ServiceProvider
services = ServiceProvider(config, mode="remote", remote_host="localhost:50051")
nodes = services.nodes.list_nodes()
Automation module
The automation module lets you run commands, tests, and scripts on your devices programmatically.
Standalone module
import connpy
router = connpy.node("uniqueName","ip/host", user="user", password="pass")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
if hasip:
print("Router has ip 1.1.1.1")
else:
print("router does not have ip 1.1.1.1")
Using manager configuration
import connpy
conf = connpy.configfile()
device = conf.getitem("server@office")
server = connpy.node("unique name", **device, config=conf)
result = server.run(["cd /", "ls -la"])
print(result)
Running parallel tasks
import connpy
conf = connpy.configfile()
#You can get the nodes from the config from a folder, filtering within it
nodes = conf.getitem("@office", ["router1", "router2", "router3"])
#You can also get each node individually:
nodes = {}
nodes["router1"] = conf.getitem("router1@office")
nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.3", "user": "user", "password": "pass3"}
#Finally you run some tasks on the nodes
mynodes = connpy.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
for i in result:
print("---" + i + "---")
print(result[i])
print()
# Or for one specific node
mynodes.router1.run(["term len 0", "show run"], folder = "/home/user/logs")
Using variables
import connpy
config = connpy.configfile()
nodes = config.getitem("@office", ["router1", "router2", "router3"])
commands = []
commands.append("config t")
commands.append("interface lo {id}")
commands.append("ip add {ip} {mask}")
commands.append("end")
variables = {}
variables["router1@office"] = {"ip": "10.57.57.1"}
variables["router2@office"] = {"ip": "10.57.57.2"}
variables["router3@office"] = {"ip": "10.57.57.3"}
variables["__global__"] = {"id": "57"}
variables["__global__"]["mask"] = "255.255.255.255"
expected = "!"
routers = connpy.nodes(nodes, config = config)
routers.run(commands, variables)
routers.test("ping {ip}", expected, variables)
for key in routers.result:
print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
Using AI
import connpy
conf = connpy.configfile()
# Uses models and API keys from config, or override them:
myai = connpy.ai(conf, engineer_model="gemini/gemini-2.5-flash", engineer_api_key="your-key")
result = myai.ask("go to router1 and show me the running configuration")
print(result["response"])
# Streaming is enabled by default for CLI, disable for programmatic use:
result = myai.ask("show interfaces on all routers", stream=False)
print(result["response"])
AI Plugin Tool Registration
Plugins can register custom tools with the AI system using register_ai_tool() in their Preload class:
def _register_my_tools(ai_instance):
tool_def = {
"type": "function",
"function": {
"name": "my_custom_tool",
"description": "Does something useful.",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"]
}
}
}
ai_instance.register_ai_tool(
tool_definition=tool_def,
handler=my_handler_function,
target="engineer", # or "architect" or "both"
engineer_prompt="- My tool: does X.",
architect_prompt=" * My tool (my_custom_tool)."
)
class Preload:
def __init__(self, connapp):
connapp.ai.modify(_register_my_tools)
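The handler referenced above is a plain callable. A hypothetical `my_handler_function` matching the tool schema might look like this, assuming the handler receives the schema's properties as arguments and returns text for the LLM to read:

```python
def my_handler_function(query):
    # Receives the arguments declared in the tool's JSON schema
    # ("query" here) and returns a string result for the model.
    # The lookup logic below is purely illustrative.
    return f"Lookup result for: {query}"
```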
Developer Notes (SOA Architecture)
As of version 2.0, Connpy has migrated to a Service-Oriented Architecture (SOA):
- connpy/cli/: Contains all CLI handlers. These are responsible for argument parsing, user interaction (via inquirer), and visual output (via printer).
- connpy/services/: Contains pure logic services (Node, Profile, Execution, etc.).
- Zero-Print Policy: Services must never use print(). All output must be returned as data structures or generators to the caller (CLI handlers).
- ServiceProvider: Access services via connapp.services. This allows transparent switching between local and remote (gRPC) backends without modifying CLI logic.
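The Zero-Print Policy can be illustrated with a toy service/handler pair. The class and function names here are illustrative, not Connpy's actual implementations:

```python
class NodeService:
    """Pure logic layer: returns data structures, never prints."""
    def __init__(self, nodes):
        self._nodes = nodes

    def list_nodes(self):
        # Return data to the caller; no user-facing output here
        return sorted(self._nodes)

def handle_list(service):
    """CLI handler layer: owns all user-facing output."""
    for name in service.list_nodes():
        print(name)
```

Because the service only returns data, the same logic works unchanged whether it runs in-process or behind the gRPC backend.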
Sub-modules
- `connpy.cli`
- `connpy.grpc`
- `connpy.services`
- `connpy.tests`
Classes
class Plugins
```python
class Plugins:
    def __init__(self):
        self.plugins = {}
        self.plugin_parsers = {}
        self.preloads = {}
        self.remote_plugins = {}
        self.preferences = {}

    def _load_preferences(self, config_dir):
        import json
        path = os.path.join(config_dir, "plugin_preferences.json")
        try:
            with open(path) as f:
                self.preferences = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.preferences = {}

    def _save_preferences(self, config_dir):
        import json
        path = os.path.join(config_dir, "plugin_preferences.json")
        try:
            with open(path, "w") as f:
                json.dump(self.preferences, f, indent=4)
        except OSError as e:
            printer.error(f"Failed to save plugin preferences: {e}")

    def verify_script(self, file_path):
        """
        Verifies that a given Python script meets specific structural requirements.

        This function checks a Python script for compliance with predefined
        structural rules. It ensures that the script contains only allowed
        top-level elements (functions, classes, imports, pass statements, and
        a specific if __name__ block) and that it includes mandatory classes
        with specific attributes and methods.

        ### Arguments:
        - file_path (str): The file path of the Python script to be verified.

        ### Returns:
        - str: A message indicating the type of violation if the script
          doesn't meet the requirements, or False if all requirements are met.

        ### Verifications:
        - The presence of only allowed top-level elements.
        - The existence of two specific classes: 'Parser' and 'Entrypoint',
          and/or the specific class: Preload.
        - 'Parser' class must only have an '__init__' method and must assign
          'self.parser'.
        - 'Entrypoint' class must have an '__init__' method accepting
          specific arguments.

        If any of these checks fail, the function returns an error message
        indicating the reason. If the script passes all checks, the function
        returns False, indicating successful verification.

        ### Exceptions:
        - SyntaxError: If the script contains a syntax error, it is caught
          and returned as a part of the error message.
        """
        with open(file_path, 'r') as file:
            source_code = file.read()
        try:
            tree = ast.parse(source_code)
        except SyntaxError as e:
            return f"Syntax error in file: {e}"
        has_parser = False
        has_entrypoint = False
        has_preload = False
        for node in tree.body:
            # Allow only function definitions, class definitions, and pass statements at top-level
            if isinstance(node, ast.If):
                # Check for the 'if __name__ == "__main__":' block
                if not (isinstance(node.test, ast.Compare)
                        and isinstance(node.test.left, ast.Name)
                        and node.test.left.id == '__name__'
                        and ((hasattr(ast, 'Str') and isinstance(node.test.comparators[0], getattr(ast, 'Str')) and node.test.comparators[0].s == '__main__')
                             or (hasattr(ast, 'Constant') and isinstance(node.test.comparators[0], getattr(ast, 'Constant')) and node.test.comparators[0].value == '__main__'))):
                    return "Only __name__ == __main__ If is allowed"
            elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)):
                # Reject any other AST types
                return f"Plugin can only have pass, functions, classes and imports. {node} is not allowed"
            if isinstance(node, ast.ClassDef):
                if node.name == 'Parser':
                    has_parser = True
                    # Ensure Parser class has only the __init__ method and assigns self.parser
                    if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
                        return "Parser class should only have __init__ method"
                    # Check if 'self.parser' is assigned in __init__ method
                    init_method = node.body[0]
                    assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign)
                                      for target in expr.targets
                                      if isinstance(target, ast.Attribute)
                                      and isinstance(target.value, ast.Name)
                                      and target.value.id == 'self']
                    if 'parser' not in assigned_attrs:
                        return "Parser class should set self.parser"
                elif node.name == 'Entrypoint':
                    has_entrypoint = True
                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                    if not init_method or len(init_method.args.args) != 4:  # self, args, parser, connapp
                        # 'Entrypoint' __init__ does not have correct signature
                        return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp"
                elif node.name == 'Preload':
                    has_preload = True
                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                    if not init_method or len(init_method.args.args) != 2:  # self, connapp
                        # 'Preload' __init__ does not have correct signature
                        return "Preload class should have method __init__ and accept only argument: connapp"
        # Applying the combination logic based on class presence
        if has_parser and not has_entrypoint:
            return "Parser requires Entrypoint class to be present."
        elif has_entrypoint and not has_parser:
            return "Entrypoint requires Parser class to be present."
        if not (has_parser or has_entrypoint or has_preload):
            return "No valid class (Parser, Entrypoint, or Preload) found."
        return False  # All requirements met, no error

    def _import_from_path(self, path):
        spec = importlib.util.spec_from_file_location("module.name", path)
        module = importlib.util.module_from_spec(spec)
        sys.modules["module.name"] = module
        spec.loader.exec_module(module)
        return module

    def _import_plugins_to_argparse(self, directory, subparsers, remote_enabled=False):
        if not os.path.exists(directory):
            return
        for filename in os.listdir(directory):
            commands = subparsers.choices.keys()
            if filename.endswith(".py"):
                root_filename = os.path.splitext(filename)[0]
                if root_filename in commands:
                    continue
                # Check preferences: if remote is preferred AND remote is enabled, skip local loading
                if remote_enabled and self.preferences.get(root_filename) == "remote":
                    continue
                # Construct the full path
                filepath = os.path.join(directory, filename)
                check_file = self.verify_script(filepath)
                if check_file:
                    printer.error(f"Failed to load plugin: (unknown). Reason: {check_file}")
                    continue
                else:
                    self.plugins[root_filename] = self._import_from_path(filepath)
                    if hasattr(self.plugins[root_filename], "Parser"):
                        self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
                        plugin = self.plugin_parsers[root_filename]
                        # Default to RichHelpFormatter if plugin doesn't set one
                        try:
                            from rich_argparse import RichHelpFormatter as _RHF
                            fmt = plugin.parser.formatter_class
                            if fmt is argparse.HelpFormatter or fmt is argparse.RawTextHelpFormatter or fmt is argparse.RawDescriptionHelpFormatter:
                                fmt = _RHF
                        except ImportError:
                            fmt = plugin.parser.formatter_class
                        subparsers.add_parser(root_filename,
                                              parents=[self.plugin_parsers[root_filename].parser],
                                              add_help=False,
                                              help=plugin.parser.description,
                                              usage=plugin.parser.usage,
                                              description=plugin.parser.description,
                                              epilog=plugin.parser.epilog,
                                              formatter_class=fmt)
                    if hasattr(self.plugins[root_filename], "Preload"):
                        self.preloads[root_filename] = self.plugins[root_filename]

    def _import_remote_plugins_to_argparse(self, plugin_stub, subparsers, cache_dir, force_sync=False):
        import hashlib
        os.makedirs(cache_dir, exist_ok=True)
        try:
            remote_plugins_info = plugin_stub.list_plugins()
        except Exception:
            return
        # Pruning: Remove local cached files that are no longer on the server
        for local_file in os.listdir(cache_dir):
            if local_file.endswith(".py"):
                name = local_file[:-3]
                if name not in remote_plugins_info:
                    try:
                        os.remove(os.path.join(cache_dir, local_file))
                    except Exception:
                        pass
        for name, info in remote_plugins_info.items():
            if not info.get("enabled", True):
                continue
            pref = self.preferences.get(name, "local")
            if pref != "remote" and name in self.plugins:
                continue
            if not force_sync and name in subparsers.choices:
                continue
            cache_path = os.path.join(cache_dir, f"{name}.py")
            # Hash comparison
            remote_hash = info.get("hash", "")
            local_hash = ""
            if os.path.exists(cache_path):
                try:
                    with open(cache_path, "rb") as f:
                        local_hash = hashlib.md5(f.read()).hexdigest()
                except Exception:
                    pass
            # Update only if hash differs or force_sync is True
            if force_sync or remote_hash != local_hash or not os.path.exists(cache_path):
                try:
                    source = plugin_stub.get_plugin_source(name)
                    with open(cache_path, "w") as f:
                        f.write(source)
                except Exception as e:
                    printer.warning(f"Failed to sync remote plugin {name}: {e}")
                    continue
            # Verify and load
            check_file = self.verify_script(cache_path)
            if check_file:
                printer.warning(f"Remote plugin {name} failed verification: {check_file}")
                continue
            module = self._import_from_path(cache_path)
            if hasattr(module, "Parser"):
                self.plugin_parsers[name] = module.Parser()
                self.remote_plugins[name] = True
                plugin = self.plugin_parsers[name]
                try:
                    from rich_argparse import RichHelpFormatter as _RHF
                    fmt = plugin.parser.formatter_class
                    if fmt is argparse.HelpFormatter or fmt is argparse.RawTextHelpFormatter or fmt is argparse.RawDescriptionHelpFormatter:
                        fmt = _RHF
                except ImportError:
                    fmt = plugin.parser.formatter_class
                # If force_sync, we might be re-registering, but argparse subparsers.add_parser
                # might fail if it exists. We check if it's already there.
                if name not in subparsers.choices:
                    subparsers.add_parser(
                        name, parents=[plugin.parser], add_help=False,
                        help=f"[remote] {plugin.parser.description}",
                        usage=plugin.parser.usage,
                        description=plugin.parser.description,
                        epilog=plugin.parser.epilog,
                        formatter_class=fmt
                    )
```

Methods
def verify_script(self, file_path)
def verify_script(self, file_path): """ Verifies that a given Python script meets specific structural requirements. This function checks a Python script for compliance with predefined structural rules. It ensures that the script contains only allowed top-level elements (functions, classes, imports, pass statements, and a specific if __name__ block) and that it includes mandatory classes with specific attributes and methods. ### Arguments: - file_path (str): The file path of the Python script to be verified. ### Returns: - str: A message indicating the type of violation if the script doesn't meet the requirements, or False if all requirements are met. ### Verifications: - The presence of only allowed top-level elements. - The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload. - 'Parser' class must only have an '__init__' method and must assign 'self.parser'. - 'Entrypoint' class must have an '__init__' method accepting specific arguments. If any of these checks fail, the function returns an error message indicating the reason. If the script passes all checks, the function returns False, indicating successful verification. ### Exceptions: - SyntaxError: If the script contains a syntax error, it is caught and returned as a part of the error message. 
""" with open(file_path, 'r') as file: source_code = file.read() try: tree = ast.parse(source_code) except SyntaxError as e: return f"Syntax error in file: {e}" has_parser = False has_entrypoint = False has_preload = False for node in tree.body: # Allow only function definitions, class definitions, and pass statements at top-level if isinstance(node, ast.If): # Check for the 'if __name__ == "__main__":' block if not (isinstance(node.test, ast.Compare) and isinstance(node.test.left, ast.Name) and node.test.left.id == '__name__' and ((hasattr(ast, 'Str') and isinstance(node.test.comparators[0], getattr(ast, 'Str')) and node.test.comparators[0].s == '__main__') or (hasattr(ast, 'Constant') and isinstance(node.test.comparators[0], getattr(ast, 'Constant')) and node.test.comparators[0].value == '__main__'))): return "Only __name__ == __main__ If is allowed" elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)): return f"Plugin can only have pass, functions, classes and imports. 
{node} is not allowed" # Reject any other AST types if isinstance(node, ast.ClassDef): if node.name == 'Parser': has_parser = True # Ensure Parser class has only the __init__ method and assigns self.parser if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body): return "Parser class should only have __init__ method" # Check if 'self.parser' is assigned in __init__ method init_method = node.body[0] assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self'] if 'parser' not in assigned_attrs: return "Parser class should set self.parser" elif node.name == 'Entrypoint': has_entrypoint = True init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None) if not init_method or len(init_method.args.args) != 4: # self, args, parser, conapp return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp" # 'Entrypoint' __init__ does not have correct signature elif node.name == 'Preload': has_preload = True init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None) if not init_method or len(init_method.args.args) != 2: # self, connapp return "Preload class should have method __init__ and accept only argument: connapp" # 'Preload' __init__ does not have correct signature # Applying the combination logic based on class presence if has_parser and not has_entrypoint: return "Parser requires Entrypoint class to be present." elif has_entrypoint and not has_parser: return "Entrypoint requires Parser class to be present." if not (has_parser or has_entrypoint or has_preload): return "No valid class (Parser, Entrypoint, or Preload) found." 
return False # All requirements met, no error

Verifies that a given Python script meets specific structural requirements.
This function checks a Python script for compliance with predefined structural rules. It ensures that the script contains only allowed top-level elements (functions, classes, imports, pass statements, and a specific if __name__ == "__main__" block) and that it includes the mandatory classes with their required attributes and methods.
Arguments:
- file_path (str): The file path of the Python script to be verified.

Returns:
- str: A message indicating the type of violation if the script doesn't meet the requirements, or False if all requirements are met.

Verifications:
- The presence of only allowed top-level elements.
- The existence of the two classes 'Parser' and 'Entrypoint', and/or the 'Preload' class.
- The 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
- The 'Entrypoint' class must have an '__init__' method accepting specific arguments.

If any of these checks fail, the function returns an error message indicating the reason. If the script passes all checks, the function returns False, indicating successful verification.
Exceptions:
- SyntaxError: If the script contains a syntax error, it is caught and returned as a part of the error message.
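As an illustration, a minimal plugin skeleton that would pass these checks might look like the following. This is a hedged sketch derived from the rules above: only the class/method structure is mandated by the verifier; the argparse wiring inside Parser is an assumption for the example.

```python
import ast

# A minimal plugin source that satisfies the verifier's rules:
# the top level may contain only imports, functions, classes, pass,
# and an `if __name__ == "__main__":` block.
PLUGIN_SOURCE = '''
import argparse

class Parser:
    def __init__(self):
        # Required: __init__ only, and it must assign self.parser
        self.parser = argparse.ArgumentParser(description="demo plugin")

class Entrypoint:
    def __init__(self, args, parser, connapp):
        # Required signature: exactly (self, args, parser, connapp)
        pass

class Preload:
    def __init__(self, connapp):
        # Required signature: exactly (self, connapp)
        pass

if __name__ == "__main__":
    pass
'''

# Confirm the top level contains only node types the verifier allows
tree = ast.parse(PLUGIN_SOURCE)
top_level = sorted({type(node).__name__ for node in tree.body})
print(top_level)  # ['ClassDef', 'If', 'Import']
```

Parser/Entrypoint must be present together; Preload may appear with them or on its own.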
class ai (config,
org=None,
api_key=None,
engineer_model=None,
architect_model=None,
engineer_api_key=None,
architect_api_key=None,
console=None,
confirm_handler=None,
trust=False)
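When trust is False, every command the AI wants to run is matched against a list of safe, read-only regex prefixes (the SAFE_COMMANDS class attribute, extendable via the trusted_commands config key); anything that fails the match triggers an interactive confirmation. A standalone sketch of that check, with the pattern list abbreviated from the class source:

```python
import re

# Abbreviated subset of ai.SAFE_COMMANDS: regex prefixes for
# read-only commands that may run without user confirmation.
SAFE_COMMANDS = [
    r'^show\s+', r'^ping\s+', r'^cat\s+',
    r'^kubectl\s+(get|describe|logs)\s+',
]

def is_safe_command(cmd):
    """Return True if the command matches any safe pattern."""
    return any(re.match(p, cmd.strip(), re.IGNORECASE) for p in SAFE_COMMANDS)

print(is_safe_command("show ip route"))       # True
print(is_safe_command("configure terminal"))  # False
```

Answering "a" at the confirmation prompt (or passing trust=True) enables trust mode for the whole session, skipping this gate entirely.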
Expand source code
@ClassHook class ai: """Hybrid Multi-Agent System: Selective Escalation with Role Persistence.""" SAFE_COMMANDS = [ r'^show\s+', r'^ls\s*', r'^cat\s+', r'^ip\s+', r'^pwd$', r'^hostname$', r'^uname', r'^df\s*', r'^free\s*', r'^ps\s*', r'^ping\s+', r'^traceroute\s+', r'^whois\s+', r'^kubectl\s+(get|describe|version|logs|top|explain|cluster-info|api-resources|api-versions)\s+', r'^systemctl\s+status\s+', r'^journalctl\s+' ] def __init__(self, config, org=None, api_key=None, engineer_model=None, architect_model=None, engineer_api_key=None, architect_api_key=None, console=None, confirm_handler=None, trust=False): self.config = config self.console = console or printer.console self.confirm_handler = confirm_handler or self._local_confirm_handler self.trusted_session = trust # Trust mode for the entire session self.interrupted = False # 1. Cargar configuración genérica aiconfig = self.config.config.get("ai", {}) # Modelos (Prioridad: Argumento -> Config -> Default) self.engineer_model = engineer_model or aiconfig.get("engineer_model") or "gemini/gemini-3.1-flash-lite-preview" self.architect_model = architect_model or aiconfig.get("architect_model") or "anthropic/claude-sonnet-4-6" # API Keys (Prioridad: Argumento -> Config) self.engineer_key = engineer_api_key or aiconfig.get("engineer_api_key") self.architect_key = architect_api_key or aiconfig.get("architect_api_key") # Custom Trusted Commands Regexes custom_trusted = aiconfig.get("trusted_commands", []) if isinstance(custom_trusted, str): custom_trusted = [c.strip() for c in custom_trusted.split(",") if c.strip()] self.safe_commands = list(self.SAFE_COMMANDS) + (custom_trusted if isinstance(custom_trusted, list) else []) # Límites self.max_history = 30 self.max_truncate = 50000 self.soft_limit_iterations = 20 # Show warning and suggest Ctrl+C self.hard_limit_iterations = 50 # Force stop # External tool registry (populated by plugins via ClassHook.modify) self.external_engineer_tools = [] # Tool defs for Engineer LLM 
self.external_architect_tools = [] # Tool defs for Architect LLM self.external_tool_handlers = {} # {"tool_name": handler_callable} self.tool_status_formatters = {} # {"tool_name": formatter_callable} self.engineer_prompt_extensions = [] # Extra text for engineer prompt self.architect_prompt_extensions = [] # Extra text for architect prompt # Long-term memory self.memory_path = os.path.join(self.config.defaultdir, "ai_memory.md") self.long_term_memory = "" if os.path.exists(self.memory_path): try: with open(self.memory_path, "r") as f: self.long_term_memory = f.read() except FileNotFoundError: self.long_term_memory = "" except PermissionError as e: self.console.print(f"[warning]Warning: Cannot read AI memory file: {e}[/warning]") except Exception as e: self.console.print(f"[warning]Warning: Failed to load AI memory: {e}[/warning]") # Session Management self.sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions") os.makedirs(self.sessions_dir, exist_ok=True) self.session_id = None self.session_path = None # Prompts base agnósticos architect_instructions = "" if self.architect_key: architect_instructions = """ CRITICAL - CONSULT vs ESCALATE: - ALWAYS use 'consult_architect' for: Configuration planning, design decisions, complex troubleshooting. Examples: "consultalo con el arquitecto", "preguntale al arquitecto", "que opina el arquitecto" You stay in control and present the advice to the user. - ONLY use 'escalate_to_architect' when user EXPLICITLY asks to TALK to the Architect: Examples: "quiero hablar con el arquitecto", "pasame con el arquitecto", "que me atienda el arquitecto" After escalation, you hand over control completely. - DEFAULT: When in doubt, use 'consult_architect'. Escalation is rare. """ else: architect_instructions = """ CRITICAL - ARCHITECT UNAVAILABLE: - The Strategic Reasoning Engine (Architect) is currently UNAVAILABLE because its API key is not configured. - DO NOT attempt to consult or escalate to the architect. 
- If the user asks to consult the architect, inform them that the Architect is offline and offer to help them directly to the best of your abilities. """ self._engineer_base_prompt = dedent(f""" Role: TECHNICAL EXECUTION ENGINE. Expertise: Universal Networking (Cisco, Nokia, Juniper, 6wind, etc.). Rules: - BE FAST AND EXTREMELY CONCISE: Provide direct answers. No filler words, no decorative language, no polite pleasantries. Save output tokens at all costs. - KNOWLEDGE FIRST: For general networking questions (AS numbers, protocol details, standards, generic commands), use your internal knowledge. ONLY use tools when the user's specific infrastructure data is required. - INVENTORY ONLY: 'run_commands', 'list_nodes', and 'get_node_info' are ONLY for interacting with the user's inventory. - BROADCAST RESTRICTION: Avoid using filter '.*' in 'run_commands' unless the user explicitly requests a global action. Try to target specific nodes or groups based on the conversation. - AUTONOMY: Proactively use iterative tool calls to find the root cause of infrastructure issues. - BATCH OPERATIONS: When working on multiple devices, call tools in parallel. - COMPLETE MISSIONS: Execute ALL steps of a mission before reporting back. - DIAGRAM: Use ASCII art or Unicode box-drawing characters directly in your responses to visualize topologies or paths when helpful. - EVIDENCE: Include 'Key Snippets' from tool outputs. Be token-efficient. - NO WANDERING: Do not speculate. If stuck, report attempts. - SAFETY: When you use 'run_commands' with configuration commands, the system automatically prompts the user for confirmation. Just execute - don't ask permission first. {architect_instructions} Network Context: {{self.long_term_memory if self.long_term_memory else "Empty."}} """).strip() self._architect_base_prompt = dedent(f""" Role: STRATEGIC REASONING ENGINE. Expertise: Network Architecture, Complex Troubleshooting, and Design Validation. 
Rules: - CONCISENESS IS MANDATORY: Strip out fluff, decorative language, and filler words. Provide direct, tactical instructions and analysis to save output tokens. - STRATEGY: Define technical missions for the Engineer. - DIAGRAM: Use ASCII art or Unicode box-drawing characters in your responses to visualize topologies, traffic paths, or logic flows. - ENGINEER CAPABILITIES: Your Engineer can: * Filter nodes (list_nodes), Run CLI commands (run_commands), Get metadata (get_node_info). - ANALYSIS: Review technical findings to identify patterns or design failures. - MEMORY: Update long-term facts ONLY when the user explicitly requests it. CRITICAL - EFFICIENT DELEGATION: - Plan ALL tasks upfront before delegating. - Delegate ONCE with a complete, detailed mission including ALL steps. - Example: "List all routers matching 'border.*', then run 'show ip bgp summary' and 'show ip route' on each, then analyze the outputs." - DO NOT delegate multiple times for the same goal. Batch everything into ONE mission. - Wait for Engineer's complete report before responding to user. CRITICAL - RETURNING CONTROL: - When your strategic analysis is complete and no further architectural decisions are needed, use 'return_to_engineer' to hand control back. - The Engineer is better suited for ongoing technical execution and troubleshooting. - Only stay in control if the user explicitly needs strategic oversight for multiple interactions. 
Network Context: {self.long_term_memory if self.long_term_memory else "Empty."} """).strip() def _local_confirm_handler(self, prompt, default="n"): """Default confirmation handler using rich.prompt.""" from rich.prompt import Prompt return Prompt.ask(prompt, default=default) @property def engineer_system_prompt(self): """Build engineer system prompt with plugin extensions.""" if self.engineer_prompt_extensions: extensions = "\n".join(self.engineer_prompt_extensions) return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}" return self._engineer_base_prompt @property def architect_system_prompt(self): """Build architect system prompt with plugin extensions.""" if self.architect_prompt_extensions: extensions = "\n".join(self.architect_prompt_extensions) return self._architect_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}" return self._architect_base_prompt def register_ai_tool(self, tool_definition, handler, target="engineer", engineer_prompt=None, architect_prompt=None, status_formatter=None): """Register an external tool for the AI system. Args: tool_definition (dict): OpenAI-compatible tool definition. handler (callable): Function(ai_instance, **tool_args) -> str. target (str): 'engineer', 'architect', or 'both'. engineer_prompt (str): Extra text for engineer system prompt. architect_prompt (str): Extra text for architect system prompt. status_formatter (callable): Function(args_dict) -> status string. 
""" name = tool_definition["function"]["name"] if target in ("engineer", "both"): self.external_engineer_tools.append(tool_definition) if target in ("architect", "both"): self.external_architect_tools.append(tool_definition) self.external_tool_handlers[name] = handler if engineer_prompt: self.engineer_prompt_extensions.append(engineer_prompt) if architect_prompt: self.architect_prompt_extensions.append(architect_prompt) if status_formatter: self.tool_status_formatters[name] = status_formatter def _stream_completion(self, model, messages, tools, api_key, status=None, label="", debug=False, chunk_callback=None, **kwargs): """Stream a completion call, rendering styled Markdown in real-time. Returns (response, streamed) where: - response: reconstructed ModelResponse (same as non-streaming) - streamed: True if text was rendered to console during streaming """ from rich.live import Live stream_resp = completion(model=model, messages=messages, tools=tools, api_key=api_key, stream=True, **kwargs) chunks = [] full_content = "" is_streaming_text = False has_tool_calls = False live_display = None # Determine styling based on current brain role_label = "Network Architect" if "architect" in label.lower() else "Network Engineer" alias = "architect" if "architect" in label.lower() else "engineer" title = f"[bold {alias}]{role_label}[/bold {alias}]" border = alias try: for chunk in stream_resp: chunks.append(chunk) delta = chunk.choices[0].delta # Detect tool calls if hasattr(delta, 'tool_calls') and delta.tool_calls: has_tool_calls = True # Stream text content with styled rendering if hasattr(delta, 'content') and delta.content: full_content += delta.content if chunk and chunk_callback: # Check for remote interruption during streaming if hasattr(self, "interrupted") and self.interrupted: raise KeyboardInterrupt chunk_callback(delta.content) if not debug and not chunk_callback: if not is_streaming_text: # Stop spinner before starting live display if status: status.stop() 
live_display = Live( Panel(Markdown(full_content), title=title, border_style=border, expand=False), console=self.console, refresh_per_second=8, transient=False ) live_display.start() is_streaming_text = True else: live_display.update( Panel(Markdown(full_content), title=title, border_style=border, expand=False) ) except Exception as e: if not chunks: raise finally: if live_display: # Render final state with complete content try: live_display.update( Panel(Markdown(full_content), title=title, border_style=border, expand=False) ) except Exception: pass live_display.stop() # Rebuild complete response from chunks try: response = stream_chunk_builder(chunks, messages=messages) except Exception: # Fallback: manual reconstruction if stream_chunk_builder fails full_content_rebuilt = "" tool_calls_map = {} for c in chunks: d = c.choices[0].delta if hasattr(d, 'content') and d.content: full_content_rebuilt += d.content if hasattr(d, 'tool_calls') and d.tool_calls: for tc in d.tool_calls: idx = tc.index if idx not in tool_calls_map: tool_calls_map[idx] = {"id": tc.id or "", "type": "function", "function": {"name": getattr(tc.function, 'name', '') or '', "arguments": getattr(tc.function, 'arguments', '') or ''}} else: if tc.id: tool_calls_map[idx]["id"] = tc.id if tc.function: if tc.function.name: tool_calls_map[idx]["function"]["name"] = tc.function.name if tc.function.arguments: tool_calls_map[idx]["function"]["arguments"] += tc.function.arguments # Build a minimal response-like object class FakeFunc: def __init__(self, name, arguments): self.name = name; self.arguments = arguments class FakeTC: def __init__(self, d): self.id = d["id"]; self.function = FakeFunc(d["function"]["name"], d["function"]["arguments"]) def model_dump(self, **kw): return {"id": self.id, "type": "function", "function": {"name": self.function.name, "arguments": self.function.arguments}} class FakeMsg: def __init__(self, content, tcs): self.content = content or None; self.tool_calls = tcs if tcs else 
None; self.role = "assistant" def model_dump(self, **kw): d = {"role": "assistant", "content": self.content} if self.tool_calls: d["tool_calls"] = [tc.model_dump() for tc in self.tool_calls] return d class FakeChoice: def __init__(self, msg): self.message = msg class FakeResp: def __init__(self, choice): self.choices = [choice]; self.usage = None tcs = [FakeTC(tool_calls_map[i]) for i in sorted(tool_calls_map)] if tool_calls_map else None response = FakeResp(FakeChoice(FakeMsg(full_content_rebuilt or full_content, tcs))) # Only count as "streamed" if we rendered text AND it was the final response (no tool calls) streamed = is_streaming_text and not has_tool_calls return response, streamed def _sanitize_messages(self, messages): """Sanitize message list for strict providers like Gemini. Ensures that: 1. Every assistant message with tool_calls is followed by ALL its tool responses 2. No user/system messages appear between tool_calls and tool responses 3. Orphaned tool_calls at the end are removed 4. Orphaned tool responses without a preceding tool_call are removed 5. Incompatible metadata like cache_control is stripped for non-Anthropic models 6. Enforces strict alternating history to prevent BadRequestError on Gemini. """ if not messages: return messages # Pre-process messages to pull text from list contents (Anthropic cache format) # and remove explicit cache keys. 
pre_sanitized = [] for msg in messages: m = msg.copy() if isinstance(msg, dict) else msg.model_dump(exclude_none=True) # Convert content list to plain string if it's a system message with caching metadata if m.get('role') == 'system' and isinstance(m.get('content'), list): if m['content'] and isinstance(m['content'][0], dict) and m['content'][0].get('text'): m['content'] = m['content'][0]['text'] else: m['content'] = "" # Remove any explicit cache_control key anywhere if 'cache_control' in m: del m['cache_control'] if isinstance(m.get('content'), list): for item in m['content']: if isinstance(item, dict) and 'cache_control' in item: del item['cache_control'] pre_sanitized.append(m) sanitized = [] last_role = None i = 0 while i < len(pre_sanitized): msg = pre_sanitized[i] role = msg.get('role', '') if role == 'system': sanitized.append(msg) last_role = 'system' i += 1 elif role == 'user': if last_role == 'user' and sanitized: # Combine consecutive user messages sanitized[-1]['content'] = str(sanitized[-1].get('content', '') or '') + '\n' + str(msg.get('content', '') or '') else: sanitized.append(msg) last_role = 'user' i += 1 elif role == 'assistant': has_tools = bool(msg.get('tool_calls')) # Gemini strict sequence: Assistant MUST be preceded by user or tool. # If preceded by system, assistant, or if it's the very first message... 
if last_role not in ('user', 'tool'): sanitized.append({"role": "user", "content": "[System sequence separator: History Truncated/Merged]"}) last_role = 'user' if has_tools: # Look ahead for matching tool responses tool_responses = [] j = i + 1 while j < len(pre_sanitized): next_msg = pre_sanitized[j] if next_msg.get('role') == 'tool': tool_responses.append(next_msg) j += 1 else: break if tool_responses: sanitized.append(msg) sanitized.extend(tool_responses) last_role = 'tool' i = j else: # Orphaned tool_calls with no responses - skip the assistant message # If we just added a dummy user message for this assistant, remove it too if sanitized and sanitized[-1].get('content') == "[System sequence separator: History Truncated/Merged]": sanitized.pop() last_role = sanitized[-1].get('role', '') if sanitized else None i += 1 else: sanitized.append(msg) last_role = 'assistant' i += 1 elif role == 'tool': # Orphaned tool response (no preceding assistant with tool_calls) - skip i += 1 else: sanitized.append(msg) last_role = role i += 1 return sanitized def _truncate(self, text, limit=None): """Truncate text to specified limit, keeping head (60%) and tail (40%).""" final_limit = limit or self.max_truncate if len(text) <= final_limit: return text head_limit = int(final_limit * 0.6) tail_limit = int(final_limit * 0.4) return (text[:head_limit] + f"\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:]) def manage_memory_tool(self, content, action="append"): """Save or update long-term memory. Only use when user explicitly requests it.""" if not content or not content.strip(): return "Error: Cannot save empty content to memory." 
try: mode = "a" if action == "append" else "w" os.makedirs(os.path.dirname(self.memory_path), exist_ok=True) with open(self.memory_path, mode) as f: timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content) # Reload memory after update with open(self.memory_path, "r") as f: self.long_term_memory = f.read() return "Memory updated successfully." except PermissionError as e: return f"Error: Permission denied writing to memory file: {e}" except Exception as e: return f"Error updating memory: {str(e)}" def list_nodes_tool(self, filter_pattern=".*"): """List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more.""" try: matched_names = self.config._getallnodes(filter_pattern) if not matched_names: return "No nodes found." if len(matched_names) <= 5: matched_data = self.config.getitems(matched_names, extract=True) res = {} for name, data in matched_data.items(): os_tag = "unknown" if isinstance(data, dict): ts = data.get("tags") if isinstance(ts, dict): os_tag = ts.get("os", "unknown") res[name] = {"os": os_tag} return json.dumps(res) return json.dumps({"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."}) except Exception as e: return f"Error listing nodes: {str(e)}" def _is_safe_command(self, cmd): """Check if a command matches safe patterns.""" return any(re.match(pattern, cmd.strip(), re.IGNORECASE) for pattern in self.safe_commands) def run_commands_tool(self, nodes_filter, commands, status=None): """Execute commands on nodes matching the filter. 
Native interactive confirmation for unsafe commands.""" # Handle if commands is a JSON string if isinstance(commands, str): try: commands = json.loads(commands) except ValueError: commands = [c.strip() for c in commands.split('\n') if c.strip()] # Expand multi-line commands within a list (in case the AI packs them) if isinstance(commands, list): expanded_commands = [] for cmd in commands: expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()]) commands = expanded_commands else: commands = [str(commands)] # Check command safety natively if not self.trusted_session: unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)] if unsafe_commands: # Stop the spinner so prompt doesn't get messed up if status: status.stop() # Show ALL commands with unsafe ones highlighted formatted_cmds = [] for cmd in commands: if cmd in unsafe_commands: formatted_cmds.append(f" • [warning]{cmd}[/warning]") else: formatted_cmds.append(f" • {cmd}") panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds) # Use print_important if available (for remote bridges) fallback to standard print print_fn = getattr(self.console, "print_important", self.console.print) print_fn(Panel(panel_content, title="[bold warning]⚠️ UNSAFE COMMANDS DETECTED[/bold warning]", border_style="warning")) try: user_resp = self.confirm_handler("[bold warning]Execute? (y: yes / n: no / a: allow all this session / <text>: feedback)[/bold warning]", default="n") except KeyboardInterrupt: if status: status.update("[ai_status]Engineer: Resuming...") self.console.print("[fail]✗ Aborted by user (Ctrl+C).[/fail]") raise # Resume the spinner if status: status.update("[ai_status]Engineer: Processing user response...") user_resp_lower = user_resp.strip().lower() if user_resp_lower in ['a', 'allow']: self.trusted_session = True self.console.print("[pass]✓ Trust Mode Enabled. 
All future commands in this session will execute without confirmation.[/pass]") elif user_resp_lower in ['y', 'yes']: self.console.print("[pass]✓ Executing...[/pass]") elif user_resp_lower in ['n', 'no', '']: self.console.print("[fail]✗ Execution rejected by user.[/fail]") return "Error: User rejected execution." else: self.console.print(f"[user_prompt]User feedback: [/user_prompt]{user_resp}") return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again." try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: return "No nodes found matching filter." thisnodes_dict = self.config.getitems(matched_names, extract=True) result = nodes(thisnodes_dict, config=self.config).run(commands) return self._truncate(json.dumps(result)) except Exception as e: return f"Error executing commands: {str(e)}" def get_node_info_tool(self, node_name): """Get detailed metadata for a specific node. Passwords are masked.""" try: d = self.config.getitem(node_name, extract=True) if 'password' in d: d['password'] = '***' return json.dumps(d) except Exception as e: return f"Error getting node info: {str(e)}" def _engineer_loop(self, task, status=None, debug=False, chat_history=None): """Internal loop where the Engineer executes technical tasks for the Architect.""" # Optimización de caché para el Ingeniero (Solo para Anthropic directo, Vertex tiene reglas distintas) if "claude" in self.engineer_model.lower() and "vertex" not in self.engineer_model.lower(): messages = [{"role": "system", "content": [{"type": "text", "text": self.engineer_system_prompt, "cache_control": {"type": "ephemeral"}}]}] else: messages = [{"role": "system", "content": self.engineer_system_prompt}] if chat_history: # Clean chat history from caching metadata if engineer is not a compatible Claude model if "claude" not in self.engineer_model.lower() or "vertex" in self.engineer_model.lower(): messages.extend(self._sanitize_messages(chat_history[-5:])) 
else: messages.extend(chat_history[-5:]) messages.append({"role": "user", "content": f"MISSION: {task}"}) tools = self._get_engineer_tools() usage = {"input": 0, "output": 0, "total": 0} iteration = 0 soft_limit_warned = False try: # Set up remote interrupt callback if bridge is provided if status and hasattr(status, "on_interrupt"): status.on_interrupt = lambda: setattr(self, "interrupted", True) while iteration < self.hard_limit_iterations: iteration += 1 # Check for interruption if self.interrupted: raise KeyboardInterrupt # Soft limit warning if iteration == self.soft_limit_iterations and not soft_limit_warned: self.console.print(f"[warning]⚠ Engineer has performed {iteration} steps. This is taking longer than expected.[/warning]") self.console.print(f"[warning] You can press Ctrl+C to interrupt and get a summary.[/warning]") soft_limit_warned = True if status: status.update(f"[ai_status]Engineer: Analyzing mission... (step {iteration})") try: safe_messages = self._sanitize_messages(messages) response = completion(model=self.engineer_model, messages=safe_messages, tools=tools, api_key=self.engineer_key) except Exception as e: if status: status.stop() raise ValueError(f"Engineer failed to connect: {str(e)}") if hasattr(response, "usage") and response.usage: usage["input"] += getattr(response.usage, "prompt_tokens", 0) usage["output"] += getattr(response.usage, "completion_tokens", 0) usage["total"] += getattr(response.usage, "total_tokens", 0) resp_msg = response.choices[0].message msg_dict = resp_msg.model_dump(exclude_none=True) if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None messages.append(msg_dict) if not resp_msg.tool_calls: break for tc in resp_msg.tool_calls: fn, args = tc.function.name, json.loads(tc.function.arguments) # Notificación en tiempo real de la tarea técnica if status: if fn == "list_nodes": status.update(f"[ai_status]Engineer: [SEARCH] {args.get('filter_pattern','.*')}") elif fn == "run_commands": 
cmds = args.get('commands', []) cmd_str = cmds[0] if cmds else "" status.update(f"[ai_status]Engineer: [CMD] {cmd_str}") elif fn == "get_node_info": status.update(f"[ai_status]Engineer: [INSPECT] {args.get('node_name','')}") elif fn in self.tool_status_formatters: status.update(self.tool_status_formatters[fn](args)) if debug: self.console.print(Panel(Text(json.dumps(args, indent=2)), title=f"[bold engineer]Engineer Tool: {fn}[/bold engineer]", border_style="engineer")) if fn == "list_nodes": obs = self.list_nodes_tool(**args) elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status) elif fn == "get_node_info": obs = self.get_node_info_tool(**args) elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args) else: obs = f"Error: Unknown tool '{fn}'." if debug: self.console.print(Panel(Text(str(obs)), title=f"[bold pass]Engineer Observation: {fn}[/bold pass]", border_style="success")) messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs}) if iteration >= self.hard_limit_iterations: self.console.print(f"[error]⛔ Engineer reached hard limit ({self.hard_limit_iterations} steps). Forcing stop.[/error]") if debug and resp_msg.content: self.console.print(Panel(Text(resp_msg.content), title="[bold engineer]Engineer Final Report to Architect[/bold engineer]", border_style="engineer")) return resp_msg.content, usage except Exception as e: return f"Engineer failed: {str(e)}", usage def _get_engineer_tools(self): """Define tools available to the Engineer.""" tools = [ {"type": "function", "function": {"name": "list_nodes", "description": "Lists available nodes in the inventory.", "parameters": {"type": "object", "properties": {"filter_pattern": {"type": "string", "description": "Regex to filter nodes (e.g. '.*', 'border.*')."}}}}}, {"type": "function", "function": {"name": "run_commands", "description": "Runs one or more commands on matched nodes. 
MANDATORY: You MUST call 'list_nodes' first to verify the target list.", "parameters": {"type": "object", "properties": {"nodes_filter": {"type": "string", "description": "Exact node name or verified filter pattern."}, "commands": {"type": "array", "items": {"type": "string"}, "description": "List of commands (e.g. ['show ip route', 'show int desc'])."}}, "required": ["nodes_filter", "commands"]}}}, {"type": "function", "function": {"name": "get_node_info", "description": "Gets full metadata for a specific node.", "parameters": {"type": "object", "properties": {"node_name": {"type": "string"}}, "required": ["node_name"]}}} ] if self.architect_key: tools.extend([ {"type": "function", "function": {"name": "consult_architect", "description": "Ask the Strategic Reasoning Engine for advice on complex design, architecture, or troubleshooting decisions. You remain in control and will present the response to the user. Use this for: configuration planning, design validation, complex troubleshooting.", "parameters": {"type": "object", "properties": {"question": {"type": "string", "description": "Strategic question or decision needed."}, "technical_summary": {"type": "string", "description": "Technical findings and context gathered so far."}}, "required": ["question", "technical_summary"]}}}, {"type": "function", "function": {"name": "escalate_to_architect", "description": "Transfer full control to the Strategic Reasoning Engine. Use ONLY when the user explicitly requests the Architect or when the problem requires strategic oversight beyond consultation. After escalation, the Architect takes over the conversation.", "parameters": {"type": "object", "properties": {"reason": {"type": "string", "description": "Why you're escalating (e.g. 
            'User requested Architect', 'Complex multi-site design needed')."},
                "context": {"type": "string", "description": "Full context and findings to hand over."}},
                "required": ["reason", "context"]}}}
        ])
        tools.extend(self.external_engineer_tools)
        return tools

    def _get_architect_tools(self):
        """Define tools available to the Strategic Reasoning Engine."""
        tools = [
            {"type": "function", "function": {"name": "delegate_to_engineer",
                "description": "Delegates a technical mission to the Engineer.",
                "parameters": {"type": "object", "properties": {
                    "task": {"type": "string", "description": "Detailed technical mission or goal."}},
                    "required": ["task"]}}},
            {"type": "function", "function": {"name": "return_to_engineer",
                "description": "Return control to the Engineer. Use this when your strategic analysis is complete and the Engineer should handle the rest of the conversation.",
                "parameters": {"type": "object", "properties": {
                    "summary": {"type": "string", "description": "Brief summary of your analysis to hand over to the Engineer."}},
                    "required": ["summary"]}}},
            {"type": "function", "function": {"name": "manage_memory_tool",
                "description": "Saves information to long-term memory. MANDATORY: Only use this if the user explicitly asks to remember or save something.",
                "parameters": {"type": "object", "properties": {
                    "content": {"type": "string"},
                    "action": {"type": "string", "enum": ["append", "replace"]}},
                    "required": ["content"]}}}
        ]
        tools.extend(self.external_architect_tools)
        return tools

    def _get_sessions(self):
        """Returns a list of session metadata sorted by date."""
        sessions = []
        if not os.path.exists(self.sessions_dir):
            return []
        for f in os.listdir(self.sessions_dir):
            if f.endswith(".json"):
                path = os.path.join(self.sessions_dir, f)
                try:
                    with open(path, "r") as fs:
                        data = json.load(fs)
                        sessions.append({
                            "id": f[:-5],
                            "title": data.get("title", "Untitled Session"),
                            "created_at": data.get("created_at", "Unknown"),
                            "model": data.get("model", "Unknown"),
                            "path": path
                        })
                except Exception:
                    continue
        return sorted(sessions, key=lambda x: x["created_at"], reverse=True)

    def list_sessions(self):
        """Prints a list of sessions using printer.table."""
        sessions = self._get_sessions()
        if not sessions:
            printer.info("No saved AI sessions found.")
            return
        columns = ["ID", "Title", "Created At", "Model"]
        rows = [[s["id"], s["title"], s["created_at"], s["model"]] for s in sessions]
        printer.table("AI Persisted Sessions", columns, rows)

    def load_session_data(self, session_id):
        """Loads a session's raw data by ID."""
        path = os.path.join(self.sessions_dir, f"{session_id}.json")
        if os.path.exists(path):
            try:
                with open(path, "r") as f:
                    data = json.load(f)
                self.session_id = session_id
                self.session_path = path
                return data
            except Exception as e:
                printer.error(f"Failed to load session {session_id}: {e}")
        return None

    def delete_session(self, session_id):
        """Deletes a session by ID."""
        path = os.path.join(self.sessions_dir, f"{session_id}.json")
        if os.path.exists(path):
            os.remove(path)
            printer.success(f"Session {session_id} deleted.")
        else:
            printer.error(f"Session {session_id} not found.")

    def get_last_session_id(self):
        """Returns the ID of the most recent session."""
        sessions = self._get_sessions()
        return sessions[0]["id"] if sessions else None

    def _generate_session_id(self, query):
        """Generates a unique session ID based on timestamp."""
        return datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    def save_session(self, history, title=None, model=None):
        """Saves current history to the session file."""
        if not self.session_id:
            # Generate ID from first user query if available
            first_user_msg = next((m["content"] for m in history if m["role"] == "user"), "new-session")
            self.session_id = self._generate_session_id(first_user_msg)
            self.session_path = os.path.join(self.sessions_dir, f"{self.session_id}.json")
        # If it's a new file, we might want to set a better title
        if not os.path.exists(self.session_path) and not title:
            raw_title = next((m["content"] for m in history if m["role"] == "user"), "New Session")
            # Clean title: remove newlines, multiple spaces
            clean_title = " ".join(raw_title.split())
            if len(clean_title) > 40:
                title = clean_title[:37].strip() + "..."
            else:
                title = clean_title
        try:
            # Read existing metadata if it exists
            metadata = {}
            if os.path.exists(self.session_path):
                with open(self.session_path, "r") as f:
                    metadata = json.load(f)
            metadata.update({
                "id": self.session_id,
                "title": title or metadata.get("title", "New Session"),
                "created_at": metadata.get("created_at", datetime.datetime.now().isoformat()),
                "updated_at": datetime.datetime.now().isoformat(),
                "model": model or metadata.get("model", self.engineer_model),
                "history": history
            })
            with open(self.session_path, "w") as f:
                json.dump(metadata, f, indent=4)
        except Exception as e:
            printer.error(f"Failed to save session: {e}")

    @MethodHook
    def ask(self, user_input, dryrun=False, chat_history=None, status=None,
            debug=False, stream=True, session_id=None, chunk_callback=None):
        if not self.engineer_key:
            raise ValueError("Engineer API key not configured. Use 'connpy config --engineer-api-key <key>' to set it.")
        if chat_history is None:
            chat_history = []
        # Load session if provided and history is empty
        if session_id and not chat_history:
            session_data = self.load_session_data(session_id)
            if session_data:
                chat_history = session_data.get("history", [])
                # If we loaded history, the caller might need it back,
                # but typically ask() is called in a loop with an external history object
        usage = {"input": 0, "output": 0, "total": 0}

        # 1. Initial role selector (Sticky Brain)
        explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
        explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
        if explicit_architect:
            current_brain = "architect"
        elif explicit_engineer:
            current_brain = "engineer"
        else:
            # Sticky Brain: detect whether the Architect was in control in recent history
            is_architect_active = False
            for msg in reversed(chat_history[-5:]):
                tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
                if tcs:
                    for tc in tcs:
                        fn = tc.get('function', {}).get('name') if isinstance(tc, dict) else getattr(getattr(tc, 'function', None), 'name', '')
                        # The Architect stays in control if delegating tasks or if the Engineer escalated;
                        # consult_architect is just the Engineer asking for advice, so the Engineer keeps control
                        if fn in ['delegate_to_engineer', 'escalate_to_architect']:
                            is_architect_active = True
                            break
                if is_architect_active:
                    break
            current_brain = "architect" if is_architect_active else "engineer"

        # 2. Message preparation and cleanup
        clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
        system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
        tools = self._get_architect_tools() if current_brain == "architect" else self._get_engineer_tools()
        model = self.architect_model if current_brain == "architect" else self.engineer_model
        key = self.architect_key if current_brain == "architect" else self.engineer_key
        # Structure optimized for prompt caching (direct Anthropic only; Vertex has different rules)
        if "claude" in model.lower() and "vertex" not in model.lower():
            messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
        else:
            messages = [{"role": "system", "content": system_prompt}]
        # Interleave the history
        last_role = "system"
        # Sanitize history if the current target model is not compatible with cache_control
        history_to_process = chat_history[-self.max_history:]
        if "claude" not in model.lower() or "vertex" in model.lower():
            history_to_process = self._sanitize_messages(history_to_process)
        for msg in history_to_process:
            m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
            role = m.get('role')
            if role == last_role and role == 'user':
                messages[-1]['content'] += "\n" + (m.get('content') or "")
                continue
            if role == 'assistant' and m.get('tool_calls') and m.get('content') == "":
                m['content'] = None
            messages.append(m)
            last_role = role
        if last_role == 'user':
            messages[-1]['content'] += "\n" + clean_input
        else:
            messages.append({"role": "user", "content": clean_input})

        # 3. Execution loop
        iteration = 0
        soft_limit_warned = False
        try:
            # Set up remote interrupt callback if a bridge is provided
            if status and hasattr(status, "on_interrupt"):
                status.on_interrupt = lambda: setattr(self, "interrupted", True)
            while iteration < self.hard_limit_iterations:
                iteration += 1
                # Check for interruption
                if self.interrupted:
                    raise KeyboardInterrupt
                # Soft limit warning
                if iteration == self.soft_limit_iterations and not soft_limit_warned:
                    self.console.print(f"[warning]⚠ Agent has performed {iteration} steps. This is taking longer than expected.[/warning]")
                    self.console.print(f"[warning]  You can press Ctrl+C to interrupt and get a summary of progress.[/warning]")
                    soft_limit_warned = True
                label = "[architect][bold]Architect[/bold][/architect]" if current_brain == "architect" else "[engineer][bold]Engineer[/bold][/engineer]"
                if status:
                    status.update(f"{label} is thinking... (step {iteration})")
                streamed_response = False
                try:
                    safe_messages = self._sanitize_messages(messages)
                    if stream and not debug:
                        response, streamed_response = self._stream_completion(
                            model=model, messages=safe_messages, tools=tools, api_key=key,
                            status=status, label=label, debug=debug, num_retries=3,
                            chunk_callback=chunk_callback
                        )
                    else:
                        response = completion(model=model, messages=safe_messages, tools=tools, api_key=key, num_retries=3)
                except Exception as e:
                    if current_brain == "architect":
                        if status:
                            status.update("[unavailable]Architect unavailable! Falling back to Engineer...")
                        # Preserve context when falling back - use clean_input directly
                        current_brain = "engineer"
                        model = self.engineer_model
                        tools = self._get_engineer_tools()
                        key = self.engineer_key
                        # Rebuild messages with the Engineer system prompt and the original user request
                        messages = [{"role": "system", "content": self.engineer_system_prompt}]
                        # Add chat history if it exists (excluding system prompts)
                        if chat_history:
                            for msg in chat_history[-self.max_history:]:
                                if msg.get('role') != 'system':
                                    messages.append(msg)
                        # Add the current user request
                        messages.append({"role": "user", "content": clean_input})
                        continue
                    else:
                        return {"response": f"Error: Both engines failed. {str(e)}", "chat_history": messages[1:], "usage": usage}
                if hasattr(response, "usage") and response.usage:
                    usage["input"] += getattr(response.usage, "prompt_tokens", 0)
                    usage["output"] += getattr(response.usage, "completion_tokens", 0)
                    usage["total"] += getattr(response.usage, "total_tokens", 0)
                resp_msg = response.choices[0].message
                msg_dict = resp_msg.model_dump(exclude_none=True)
                if msg_dict.get("tool_calls") and msg_dict.get("content") == "":
                    msg_dict["content"] = None
                messages.append(msg_dict)
                if debug and resp_msg.content:
                    self.console.print(Panel(Markdown(resp_msg.content), title=f"{label} Reasoning", border_style="architect" if current_brain == "architect" else "engineer"))
                if not resp_msg.tool_calls:
                    break
                # Track if we need to inject a user message after all tool responses
                pending_user_message = None
                for tc in resp_msg.tool_calls:
                    fn, args = tc.function.name, json.loads(tc.function.arguments)
                    # Validate tool access based on the current brain
                    if fn in ['delegate_to_engineer'] and current_brain != "architect":
                        obs = f"Error: Tool '{fn}' is only available to the Architect. You are the Engineer. Use 'run_commands' directly to execute configuration."
                        messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
                        continue
                    if status:
                        if fn == "delegate_to_engineer":
                            status.update(f"[architect]Architect: [DELEGATING MISSION] {args.get('task','')[:40]}...")
                        elif fn == "manage_memory_tool":
                            status.update(f"[architect]Architect: [UPDATING MEMORY]")
                    if debug:
                        self.console.print(Panel(Text(json.dumps(args, indent=2)), title=f"{label} Decision: {fn}", border_style="debug"))
                    if fn == "delegate_to_engineer":
                        obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
                        usage["input"] += eng_usage["input"]
                        usage["output"] += eng_usage["output"]
                        usage["total"] += eng_usage["total"]
                    elif fn == "consult_architect":
                        if status:
                            status.update("[architect]Engineer consulting Architect...")
                        try:
                            # Consultation only - Engineer stays in control
                            claude_resp = completion(
                                model=self.architect_model,
                                messages=[
                                    {"role": "system", "content": self.architect_system_prompt},
                                    {"role": "user", "content": f"The Engineer needs your strategic advice.\n\nTECHNICAL SUMMARY: {args['technical_summary']}\n\nQUESTION: {args['question']}\n\nProvide strategic guidance. The Engineer will continue handling the user."}
                                ],
                                api_key=self.architect_key, num_retries=3
                            )
                            obs = claude_resp.choices[0].message.content
                            if debug:
                                self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
                        except Exception as e:
                            if status:
                                status.update("[unavailable]Architect unavailable! Engineer continuing alone...")
                            obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
                    elif fn == "escalate_to_architect":
                        if status:
                            status.update("[architect]Transferring control to Architect...")
                        # Full escalation - Architect takes over
                        current_brain = "architect"
                        model = self.architect_model
                        tools = self._get_architect_tools()
                        key = self.architect_key
                        messages[0] = {"role": "system", "content": self.architect_system_prompt}
                        # Prepare handover context to inject AFTER all tool responses
                        handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
                        pending_user_message = handover_msg
                        obs = "Control transferred to Architect. Handover context will be provided."
                        if debug:
                            self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
                    elif fn == "return_to_engineer":
                        if status:
                            status.update("[engineer]Transferring control back to Engineer...")
                        # Architect returns control to the Engineer
                        current_brain = "engineer"
                        model = self.engineer_model
                        tools = self._get_engineer_tools()
                        key = self.engineer_key
                        messages[0] = {"role": "system", "content": self.engineer_system_prompt}
                        # Prepare handover context to inject AFTER all tool responses
                        handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
                        pending_user_message = handover_msg
                        obs = "Control returned to Engineer. Handover summary will be provided."
                        if debug:
                            self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
                    elif fn == "list_nodes":
                        obs = self.list_nodes_tool(**args)
                    elif fn == "run_commands":
                        obs = self.run_commands_tool(**args, status=status)
                    elif fn == "get_node_info":
                        obs = self.get_node_info_tool(**args)
                    elif fn == "manage_memory_tool":
                        obs = self.manage_memory_tool(**args)
                    elif fn in self.external_tool_handlers:
                        obs = self.external_tool_handlers[fn](self, **args)
                    else:
                        obs = f"Error: {fn} unknown."
                    messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
                # Inject the pending user message AFTER all tool responses are added
                if pending_user_message:
                    messages.append({"role": "user", "content": pending_user_message})
            if iteration >= self.hard_limit_iterations:
                self.console.print(f"[error]⛔ Agent reached hard limit ({self.hard_limit_iterations} steps). Forcing stop to prevent infinite loop.[/error]")
                # Only inject a user message if we're not in the middle of tool calls
                last_msg = messages[-1] if messages else {}
                if last_msg.get("role") != "assistant" or not last_msg.get("tool_calls"):
                    messages.append({"role": "user", "content": "Hard iteration limit reached. Please provide a summary of your findings so far."})
                try:
                    safe_messages = self._sanitize_messages(messages)
                    response = completion(model=model, messages=safe_messages, tools=[], api_key=key)
                    resp_msg = response.choices[0].message
                    messages.append(resp_msg.model_dump(exclude_none=True))
                except Exception as e:
                    if status:
                        status.update(f"[error]Error fetching summary: {e}[/error]")
                    printer.warning(f"Failed to fetch final summary from LLM: {e}")
        except KeyboardInterrupt:
            if status:
                status.update("[error]Interrupted! Closing pending tasks...")
            last_msg = messages[-1]
            if last_msg.get("tool_calls"):
                for tc in last_msg["tool_calls"]:
                    messages.append({"tool_call_id": tc.get("id"), "role": "tool", "name": tc.get("function", {}).get("name"), "content": "Operation cancelled by user."})
            messages.append({"role": "user", "content": "USER INTERRUPTED. Briefly summarize what you were doing and stop."})
            try:
                safe_messages = self._sanitize_messages(messages)
                # Use tools=None to force a text summary during interruption
                response = completion(model=model, messages=safe_messages, tools=None, api_key=key)
                resp_msg = response.choices[0].message
                messages.append(resp_msg.model_dump(exclude_none=True))
            except Exception:
                pass
        finally:
            # Auto-save the session
            self.save_session(messages, model=model)
        return {
            "response": messages[-1].get("content"),
            "chat_history": messages[1:],
            "app_related": True,
            "usage": usage,
            "responder": current_brain,  # "architect" or "engineer"
            "streamed": streamed_response
        }

    @MethodHook
    def confirm(self, user_input):
        return True

Hybrid Multi-Agent System: Selective Escalation with Role Persistence.
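The role selection at the top of ask() can be exercised in isolation. The regexes below are the ones the method uses; the wrapper function is illustrative only, a sketch of the "explicit prefix wins, otherwise sticky default" behavior:

```python
import re

def select_brain(user_input, default="engineer"):
    """Sketch of ask()'s role selector: a message starting with
    'architect:'/'@architect' (or the Spanish 'arquitecto') forces the
    Architect brain, 'engineer:'/'@engineer' forces the Engineer;
    otherwise the sticky default (whoever drove recent history) is kept."""
    if re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I):
        return "architect"
    if re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I):
        return "engineer"
    return default

print(select_brain("@architect: review this multi-site design"))  # architect
```

Note the prefixes require a following `:` or whitespace, so a message like "architecture review" does not accidentally trigger the Architect.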
Class variables
var SAFE_COMMANDS
Instance variables
prop architect_system_prompt-
@property
def architect_system_prompt(self):
    """Build architect system prompt with plugin extensions."""
    if self.architect_prompt_extensions:
        extensions = "\n".join(self.architect_prompt_extensions)
        return self._architect_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
    return self._architect_base_prompt

Build architect system prompt with plugin extensions.
prop engineer_system_prompt-
@property
def engineer_system_prompt(self):
    """Build engineer system prompt with plugin extensions."""
    if self.engineer_prompt_extensions:
        extensions = "\n".join(self.engineer_prompt_extensions)
        return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
    return self._engineer_base_prompt

Build engineer system prompt with plugin extensions.
Methods
def ask(self,
user_input,
dryrun=False,
chat_history=None,
status=None,
debug=False,
stream=True,
session_id=None,
chunk_callback=None)-
def confirm(self, user_input)-
def delete_session(self, session_id)-
Deletes a session by ID.
def get_last_session_id(self)-
Returns the ID of the most recent session.
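Taken together, the session helpers persist each conversation as one JSON file under `sessions_dir`, named `<id>.json`. A minimal standalone sketch of the on-disk layout save_session() produces (the directory, title, model name, and history here are invented stand-ins):

```python
import datetime
import json
import os
import tempfile

sessions_dir = tempfile.mkdtemp()  # stand-in for the real sessions_dir
# Same timestamp format as _generate_session_id
session_id = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
session = {
    "id": session_id,
    "title": "show version on router1",  # derived from the first user message
    "created_at": datetime.datetime.now().isoformat(),
    "updated_at": datetime.datetime.now().isoformat(),
    "model": "gpt-4o",  # hypothetical model name
    "history": [{"role": "user", "content": "show version on router1"}],
}
path = os.path.join(sessions_dir, f"{session_id}.json")
with open(path, "w") as f:
    json.dump(session, f, indent=4)

# load_session_data() simply reads this file back by ID:
with open(path) as f:
    print(json.load(f)["title"])  # show version on router1
```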
def get_node_info_tool(self, node_name)-
def get_node_info_tool(self, node_name):
    """Get detailed metadata for a specific node. Passwords are masked."""
    try:
        d = self.config.getitem(node_name, extract=True)
        if 'password' in d:
            d['password'] = '***'
        return json.dumps(d)
    except Exception as e:
        return f"Error getting node info: {str(e)}"

Get detailed metadata for a specific node. Passwords are masked.
def list_nodes_tool(self, filter_pattern='.*')-
def list_nodes_tool(self, filter_pattern=".*"):
    """List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more."""
    try:
        matched_names = self.config._getallnodes(filter_pattern)
        if not matched_names:
            return "No nodes found."
        if len(matched_names) <= 5:
            matched_data = self.config.getitems(matched_names, extract=True)
            res = {}
            for name, data in matched_data.items():
                os_tag = "unknown"
                if isinstance(data, dict):
                    ts = data.get("tags")
                    if isinstance(ts, dict):
                        os_tag = ts.get("os", "unknown")
                res[name] = {"os": os_tag}
            return json.dumps(res)
        return json.dumps({"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."})
    except Exception as e:
        return f"Error listing nodes: {str(e)}"

List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more.
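The two response shapes above keep tool observations compact for the model. This self-contained sketch illustrates them against an invented inventory (the real method queries the connpy config; regex matching here is an assumption for illustration):

```python
import json
import re

# Invented inventory; real data comes from the connpy config
inventory = {"router1@home": {"os": "ios"}, "router2@home": {"os": "eos"},
             "sw1@work": {"os": "nxos"}, "sw2@work": {"os": "nxos"},
             "fw1@work": {"os": "asa"}, "fw2@work": {"os": "asa"}}

def list_nodes(filter_pattern=".*"):
    names = [n for n in inventory if re.search(filter_pattern, n)]
    if not names:
        return "No nodes found."
    if len(names) <= 5:  # small match: include per-node metadata
        return json.dumps({n: {"os": inventory[n].get("os", "unknown")} for n in names})
    # large match: names only, with a pointer to get_node_info for details
    return json.dumps({"count": len(names), "nodes": names,
                       "note": "Use 'get_node_info' for details."})

print(list_nodes("@home"))
```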
def list_sessions(self)-
Prints a list of sessions using printer.table.
def load_session_data(self, session_id)-
Loads a session's raw data by ID.
def manage_memory_tool(self, content, action='append')-
def manage_memory_tool(self, content, action="append"):
    """Save or update long-term memory. Only use when user explicitly requests it."""
    if not content or not content.strip():
        return "Error: Cannot save empty content to memory."
    try:
        mode = "a" if action == "append" else "w"
        os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
        with open(self.memory_path, mode) as f:
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
            f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content)
        # Reload memory after update
        with open(self.memory_path, "r") as f:
            self.long_term_memory = f.read()
        return "Memory updated successfully."
    except PermissionError as e:
        return f"Error: Permission denied writing to memory file: {e}"
    except Exception as e:
        return f"Error updating memory: {str(e)}"

Save or update long-term memory. Only use when user explicitly requests it.
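The entry format is simple enough to show directly. This standalone helper mirrors the formatting above (the wrapper function itself is illustrative; only the two write modes come from the source):

```python
import datetime

def format_memory_entry(content, action="append"):
    # append: a timestamped "## ..." section, as manage_memory_tool appends it
    # replace: the raw content, which overwrites the whole memory file
    if action == "append":
        ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
        return f"\n\n## {ts}\n{content.strip()}\n"
    return content

entry = format_memory_entry("router1 uses port 2222 for SSH")
```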
def register_ai_tool(self, tool_definition, handler, target='engineer', engineer_prompt=None, architect_prompt=None, status_formatter=None)

def register_ai_tool(self, tool_definition, handler, target="engineer",
                     engineer_prompt=None, architect_prompt=None, status_formatter=None):
    """Register an external tool for the AI system.

    Args:
        tool_definition (dict): OpenAI-compatible tool definition.
        handler (callable): Function(ai_instance, **tool_args) -> str.
        target (str): 'engineer', 'architect', or 'both'.
        engineer_prompt (str): Extra text for engineer system prompt.
        architect_prompt (str): Extra text for architect system prompt.
        status_formatter (callable): Function(args_dict) -> status string.
    """
    name = tool_definition["function"]["name"]
    if target in ("engineer", "both"):
        self.external_engineer_tools.append(tool_definition)
    if target in ("architect", "both"):
        self.external_architect_tools.append(tool_definition)
    self.external_tool_handlers[name] = handler
    if engineer_prompt:
        self.engineer_prompt_extensions.append(engineer_prompt)
    if architect_prompt:
        self.architect_prompt_extensions.append(architect_prompt)
    if status_formatter:
        self.tool_status_formatters[name] = status_formatter

Register an external tool for the AI system.
Args
tool_definition (dict): OpenAI-compatible tool definition.
handler (callable): Function(ai_instance, **tool_args) -> str.
target (str): 'engineer', 'architect', or 'both'.
engineer_prompt (str): Extra text for engineer system prompt.
architect_prompt (str): Extra text for architect system prompt.
status_formatter (callable): Function(args_dict) -> status string.
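A minimal registration sketch, using a hypothetical "ping_node" tool: the tool name, handler body, and prompt text below are illustrative, not part of connpy. The tool definition follows the OpenAI function-calling schema that `register_ai_tool` expects.

```python
# Hypothetical tool definition in the OpenAI function-calling format.
ping_tool = {
    "type": "function",
    "function": {
        "name": "ping_node",
        "description": "Ping a node and report reachability.",
        "parameters": {
            "type": "object",
            "properties": {"node": {"type": "string"}},
            "required": ["node"],
        },
    },
}

def ping_handler(ai_instance, node):
    # A real handler would run a ping via connpy; this stub just echoes.
    return f"pinged {node}"

# With an ai instance available, registration would look like:
# ai.register_ai_tool(ping_tool, ping_handler, target="engineer",
#                     engineer_prompt="You can check reachability with ping_node.",
#                     status_formatter=lambda args: f"Pinging {args['node']}...")
```

The handler always receives the ai instance first, followed by the tool arguments as keyword arguments.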
def run_commands_tool(self, nodes_filter, commands, status=None)

def run_commands_tool(self, nodes_filter, commands, status=None):
    """Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands."""
    # Handle if commands is a JSON string
    if isinstance(commands, str):
        try:
            commands = json.loads(commands)
        except ValueError:
            commands = [c.strip() for c in commands.split('\n') if c.strip()]
    # Expand multi-line commands within a list (in case the AI packs them)
    if isinstance(commands, list):
        expanded_commands = []
        for cmd in commands:
            expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()])
        commands = expanded_commands
    else:
        commands = [str(commands)]
    # Check command safety natively
    if not self.trusted_session:
        unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)]
        if unsafe_commands:
            # Stop the spinner so prompt doesn't get messed up
            if status:
                status.stop()
            # Show ALL commands with unsafe ones highlighted
            formatted_cmds = []
            for cmd in commands:
                if cmd in unsafe_commands:
                    formatted_cmds.append(f"  • [warning]{cmd}[/warning]")
                else:
                    formatted_cmds.append(f"  • {cmd}")
            panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds)
            # Use print_important if available (for remote bridges) fallback to standard print
            print_fn = getattr(self.console, "print_important", self.console.print)
            print_fn(Panel(panel_content, title="[bold warning]⚠️ UNSAFE COMMANDS DETECTED[/bold warning]", border_style="warning"))
            try:
                user_resp = self.confirm_handler("[bold warning]Execute?\n(y: yes / n: no / a: allow all this session / <text>: feedback)[/bold warning]", default="n")
            except KeyboardInterrupt:
                if status:
                    status.update("[ai_status]Engineer: Resuming...")
                self.console.print("[fail]✗ Aborted by user (Ctrl+C).[/fail]")
                raise
            # Resume the spinner
            if status:
                status.update("[ai_status]Engineer: Processing user response...")
            user_resp_lower = user_resp.strip().lower()
            if user_resp_lower in ['a', 'allow']:
                self.trusted_session = True
                self.console.print("[pass]✓ Trust Mode Enabled. All future commands in this session will execute without confirmation.[/pass]")
            elif user_resp_lower in ['y', 'yes']:
                self.console.print("[pass]✓ Executing...[/pass]")
            elif user_resp_lower in ['n', 'no', '']:
                self.console.print("[fail]✗ Execution rejected by user.[/fail]")
                return "Error: User rejected execution."
            else:
                self.console.print(f"[user_prompt]User feedback: [/user_prompt]{user_resp}")
                return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again."
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names:
            return "No nodes found matching filter."
        thisnodes_dict = self.config.getitems(matched_names, extract=True)
        result = nodes(thisnodes_dict, config=self.config).run(commands)
        return self._truncate(json.dumps(result))
    except Exception as e:
        return f"Error executing commands: {str(e)}"

Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands.
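The input-normalization step at the top of run_commands_tool accepts a JSON string, a newline-joined string, or a list, and flattens everything into one command list. A standalone sketch of that rule (re-implemented here for illustration, mirroring the logic above):

```python
import json

def normalize_commands(commands):
    """Accept a JSON array string, a newline-joined string, or a list,
    and return a flat list of stripped command strings."""
    if isinstance(commands, str):
        try:
            commands = json.loads(commands)
        except ValueError:
            # Not JSON: treat as newline-separated plain text
            commands = [c.strip() for c in commands.split("\n") if c.strip()]
    if isinstance(commands, list):
        expanded = []
        for cmd in commands:
            # Expand multi-line entries packed into a single list item
            expanded.extend(c.strip() for c in str(cmd).split("\n") if c.strip())
        return expanded
    return [str(commands)]
```

This is why an AI agent can pass commands in whichever shape it produces, and the tool still receives one command per list entry.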
def save_session(self, history, title=None, model=None)

def save_session(self, history, title=None, model=None):
    """Saves current history to the session file."""
    if not self.session_id:
        # Generate ID from first user query if available
        first_user_msg = next((m["content"] for m in history if m["role"] == "user"), "new-session")
        self.session_id = self._generate_session_id(first_user_msg)
        self.session_path = os.path.join(self.sessions_dir, f"{self.session_id}.json")
    # If it's a new file, we might want to set a better title
    if not os.path.exists(self.session_path) and not title:
        raw_title = next((m["content"] for m in history if m["role"] == "user"), "New Session")
        # Clean title: remove newlines, multiple spaces
        clean_title = " ".join(raw_title.split())
        if len(clean_title) > 40:
            title = clean_title[:37].strip() + "..."
        else:
            title = clean_title
    try:
        # Read existing metadata if it exists
        metadata = {}
        if os.path.exists(self.session_path):
            with open(self.session_path, "r") as f:
                metadata = json.load(f)
        metadata.update({
            "id": self.session_id,
            "title": title or metadata.get("title", "New Session"),
            "created_at": metadata.get("created_at", datetime.datetime.now().isoformat()),
            "updated_at": datetime.datetime.now().isoformat(),
            "model": model or metadata.get("model", self.engineer_model),
            "history": history
        })
        with open(self.session_path, "w") as f:
            json.dump(metadata, f, indent=4)
    except Exception as e:
        printer.error(f"Failed to save session: {e}")

Saves current history to the session file.
class configfile (conf=None, key=None)

@ClassHook
class configfile:
    '''
    This class generates a configfile object. Contains a dictionary storing config, nodes and profiles, normally used by the connection manager.

    ### Attributes:
    - file (str): Path/file to config file.
    - key (str): Path/file to RSA key file.
    - config (dict): Dictionary containing information of connection manager configuration.
    - connections (dict): Dictionary containing all the nodes added to connection manager.
    - profiles (dict): Dictionary containing all the profiles added to connection manager.
    - privatekey (obj): Object containing the private key to encrypt passwords.
    - publickey (obj): Object containing the public key to decrypt passwords.
    '''

    def __init__(self, conf=None, key=None):
        '''
        ### Optional Parameters:
        - conf (str): Path/file to config file. If left empty default path is ~/.config/conn/config.yaml
        - key (str): Path/file to RSA key file. If left empty default path is ~/.config/conn/.osk
        '''
        home = os.path.expanduser("~")
        defaultdir = home + '/.config/conn'
        if conf is None:
            # Standard path: use ~/.config/conn and respect .folder redirection
            self.anchor_path = defaultdir
            self.defaultdir = defaultdir
            Path(defaultdir).mkdir(parents=True, exist_ok=True)
            pathfile = defaultdir + '/.folder'
            try:
                with open(pathfile, "r") as f:
                    configdir = f.read().strip()
            except (FileNotFoundError, IOError):
                with open(pathfile, "w") as f:
                    f.write(str(defaultdir))
                configdir = defaultdir
            self.defaultdir = configdir
            self.file = configdir + '/config.yaml'
            self.key = key or (configdir + '/.osk')
            # Ensure redirected directories exist
            Path(configdir).mkdir(parents=True, exist_ok=True)
            Path(f"{configdir}/plugins").mkdir(parents=True, exist_ok=True)
            # Backwards compatibility: migrate from JSON to YAML only for default path
            legacy_json = configdir + '/config.json'
            legacy_noext = configdir + '/config'
            legacy_file = None
            if os.path.exists(legacy_json):
                legacy_file = legacy_json
            elif os.path.exists(legacy_noext):
                legacy_file = legacy_noext
            if not os.path.exists(self.file) and legacy_file:
                try:
                    with open(legacy_file, 'r') as f:
                        old_data = json.load(f)
                    if not self._validate_config(old_data):
                        printer.warning(f"Legacy config {legacy_file} has invalid structure, skipping migration.")
                    else:
                        with open(self.file, 'w') as f:
                            yaml.dump(old_data, f, Dumper=NoAliasDumper, default_flow_style=False, sort_keys=False)
                        # Verify the written YAML can be read back correctly
                        with open(self.file, 'r') as f:
                            verify = yaml.safe_load(f)
                        if not self._validate_config(verify):
                            os.remove(self.file)
                            printer.warning("YAML verification failed after migration, keeping legacy config.")
                        else:
                            # Note: cachefile is derived later, we use a temp one for migration sync
                            temp_cache = configdir + '/.config.cache.json'
                            with open(temp_cache, 'w') as f:
                                json.dump(old_data, f)
                            shutil.move(legacy_file, legacy_file + ".backup")
                            printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!")
                except Exception as e:
                    if os.path.exists(self.file):
                        try:
                            os.remove(self.file)
                        except OSError:
                            pass
                    printer.warning(f"Failed to migrate legacy config: {e}")
        else:
            # Custom path (common in tests): isolate everything to the conf parent directory
            self.file = os.path.abspath(conf)
            configdir = os.path.dirname(self.file)
            self.anchor_path = configdir
            self.defaultdir = configdir
            self.key = os.path.abspath(key) if key else (configdir + '/.osk')
        # Sidecar files always live next to the config file (or in the redirected configdir)
        self.cachefile = configdir + '/.config.cache.json'
        self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt'
        self.folders_cachefile = configdir + '/.folders_cache.txt'
        self.profiles_cachefile = configdir + '/.profiles_cache.txt'
        if os.path.exists(self.file):
            config = self._loadconfig(self.file)
        else:
            config = self._createconfig(self.file)
        self.config = config["config"]
        self.connections = config["connections"]
        self.profiles = config["profiles"]
        if not os.path.exists(self.key):
            self._createkey(self.key)
        with open(self.key) as f:
            self.privatekey = RSA.import_key(f.read())
        self.publickey = self.privatekey.publickey()
        # Self-heal text caches if they are missing
        if not os.path.exists(self.fzf_cachefile) or not os.path.exists(self.folders_cachefile) or not os.path.exists(self.profiles_cachefile):
            self._generate_nodes_cache()

    def _validate_config(self, data):
        """Verify config data has the required structure."""
        if not isinstance(data, dict):
            return False
        required = {"config", "connections", "profiles"}
        return required.issubset(data.keys())

    def _loadconfig(self, conf):
        # Loads config file using dual cache
        cache_exists = os.path.exists(self.cachefile)
        yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0
        cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0
        if not cache_exists or yaml_time > cache_time:
            with open(conf, 'r') as f:
                data = yaml.safe_load(f)
            if not self._validate_config(data):
                # YAML is broken, try to recover from cache
                if cache_exists:
                    printer.warning("Config file appears corrupt, recovering from cache...")
                    with open(self.cachefile, 'r') as f:
                        data = json.load(f)
                    if self._validate_config(data):
                        # Re-write the YAML from good cache
                        with open(conf, 'w') as f:
                            yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False, sort_keys=False)
                        return data
                # Both broken or no cache - create fresh
                printer.error("Config file is corrupt and no valid cache exists. Creating default config.")
                return self._createconfig(conf)
            try:
                with open(self.cachefile, 'w') as f:
                    json.dump(data, f)
            except Exception:
                pass
            return data
        else:
            with open(self.cachefile, 'r') as f:
                data = json.load(f)
            if not self._validate_config(data):
                # Cache broken, try yaml
                with open(conf, 'r') as f:
                    data = yaml.safe_load(f)
                if self._validate_config(data):
                    return data
                # Both broken
                printer.error("Both config and cache are corrupt. Creating default config.")
                return self._createconfig(conf)
            return data

    def _createconfig(self, conf):
        # Create config file (always writes defaults, safe for recovery)
        defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False},
                         'connections': {},
                         'profiles': {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}}
        with open(conf, "w") as f:
            yaml.dump(defaultconfig, f, Dumper=NoAliasDumper, default_flow_style=False, sort_keys=False)
        os.chmod(conf, 0o600)
        try:
            with open(self.cachefile, 'w') as f:
                json.dump(defaultconfig, f)
        except Exception:
            pass
        return defaultconfig

    @MethodHook
    def _saveconfig(self, conf):
        # Save config file atomically to prevent corruption
        newconfig = {"config": {}, "connections": {}, "profiles": {}}
        newconfig["config"] = self.config
        newconfig["connections"] = self.connections
        newconfig["profiles"] = self.profiles
        tmpfile = conf + '.tmp'
        try:
            with open(tmpfile, "w") as f:
                yaml.dump(newconfig, f, Dumper=NoAliasDumper, default_flow_style=False, sort_keys=False)
            # Atomic replace: only overwrite original if write succeeded
            shutil.move(tmpfile, conf)
            with open(self.cachefile, "w") as f:
                json.dump(newconfig, f)
            self._generate_nodes_cache()
        except (IOError, OSError) as e:
            printer.error(f"Failed to save config: {e}")
            # Clean up temp file if it exists
            if os.path.exists(tmpfile):
                try:
                    os.remove(tmpfile)
                except OSError:
                    pass
            return 1
        return 0

    def _generate_nodes_cache(self, nodes=None, folders=None, profiles=None):
        try:
            if nodes is None:
                nodes = self._getallnodes()
            if folders is None:
                folders = self._getallfolders()
            if profiles is None:
                profiles = list(self.profiles.keys())
            with open(self.fzf_cachefile, "w") as f:
                f.write("\n".join(nodes))
            with open(self.folders_cachefile, "w") as f:
                f.write("\n".join(folders))
            with open(self.profiles_cachefile, "w") as f:
                f.write("\n".join(profiles))
        except Exception:
            pass

    def _createkey(self, keyfile):
        # Create key file
        key = RSA.generate(2048)
        with open(keyfile, 'wb') as f:
            f.write(key.export_key('PEM'))
        os.chmod(keyfile, 0o600)
        return key

    @MethodHook
    def _explode_unique(self, unique):
        # Divide unique name into folder, subfolder and id
        uniques = unique.split("@")
        if not unique.startswith("@"):
            result = {"id": uniques[0]}
        else:
            result = {}
        if len(uniques) == 2:
            result["folder"] = uniques[1]
            if result["folder"] == "":
                return False
        elif len(uniques) == 3:
            result["folder"] = uniques[2]
            result["subfolder"] = uniques[1]
            if result["folder"] == "" or result["subfolder"] == "":
                return False
        elif len(uniques) > 3:
            return False
        return result

    @MethodHook
    def getitem(self, unique, keys=None, extract=False):
        '''
        Get a node or a group of nodes from configfile which can be passed to node/nodes class

        ### Parameters:
        - unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder

        ### Optional Parameters:
        - keys (list): In case you pass a folder as unique, you can filter nodes inside the folder passing a list.
        - extract (bool): If True, extract information from profiles. Default False.

        ### Returns:
        dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
        '''
        uniques = self._explode_unique(unique)
        if unique.startswith("@"):
            if uniques.keys() >= {"folder", "subfolder"}:
                folder = self.connections[uniques["folder"]][uniques["subfolder"]]
            else:
                folder = self.connections[uniques["folder"]]
            newfolder = deepcopy(folder)
            newfolder.pop("type")
            for node_name in folder.keys():
                if node_name == "type":
                    continue
                if "type" in newfolder[node_name].keys():
                    if newfolder[node_name]["type"] == "subfolder":
                        newfolder.pop(node_name)
                    else:
                        newfolder[node_name].pop("type")
            if keys != None:
                newfolder = dict((k, newfolder[k]) for k in keys)
            if extract:
                for node_name, node_keys in newfolder.items():
                    for key, value in node_keys.items():
                        profile = re.search("^@(.*)", str(value))
                        if profile:
                            try:
                                newfolder[node_name][key] = self.profiles[profile.group(1)][key]
                            except KeyError:
                                newfolder[node_name][key] = ""
                        elif value == '' and key == "protocol":
                            try:
                                newfolder[node_name][key] = self.profiles["default"][key]
                            except KeyError:
                                newfolder[node_name][key] = "ssh"
            newfolder = {"{}{}".format(k, unique): v for k, v in newfolder.items()}
            return newfolder
        else:
            if uniques.keys() >= {"folder", "subfolder"}:
                node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
            elif "folder" in uniques.keys():
                node = self.connections[uniques["folder"]][uniques["id"]]
            else:
                node = self.connections[uniques["id"]]
            newnode = deepcopy(node)
            newnode.pop("type")
            if extract:
                for key, value in newnode.items():
                    profile = re.search("^@(.*)", str(value))
                    if profile:
                        try:
                            newnode[key] = self.profiles[profile.group(1)][key]
                        except KeyError:
                            newnode[key] = ""
                    elif value == '' and key == "protocol":
                        try:
                            newnode[key] = self.profiles["default"][key]
                        except KeyError:
                            newnode[key] = "ssh"
            return newnode

    @MethodHook
    def getitems(self, uniques, extract=False):
        '''
        Get a group of nodes from configfile which can be passed to node/nodes class

        ### Parameters:
        - uniques (str/list): String name that will match hostnames from the connection manager. It can be a list of strings.

        ### Optional Parameters:
        - extract (bool): If True, extract information from profiles. Default False.

        ### Returns:
        dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
        '''
        nodes = {}
        if isinstance(uniques, str):
            uniques = [uniques]
        for i in uniques:
            if isinstance(i, dict):
                name = list(i.keys())[0]
                mylist = i[name]
                if not self.config["case"]:
                    name = name.lower()
                    mylist = [item.lower() for item in mylist]
                this = self.getitem(name, mylist, extract=extract)
                nodes.update(this)
            elif i.startswith("@"):
                if not self.config["case"]:
                    i = i.lower()
                this = self.getitem(i, extract=extract)
                nodes.update(this)
            else:
                if not self.config["case"]:
                    i = i.lower()
                this = self.getitem(i, extract=extract)
                nodes[i] = this
        return nodes

    @MethodHook
    def _connections_add(self, *, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost='', type="connection"):
        # Add connection to config
        if folder == '':
            self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type}
        elif folder != '' and subfolder == '':
            self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type}
        elif folder != '' and subfolder != '':
            self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost, "type": type}

    @MethodHook
    def _connections_del(self, *, id, folder='', subfolder=''):
        # Delete connection from config
        if folder == '':
            del self.connections[id]
        elif folder != '' and subfolder == '':
            del self.connections[folder][id]
        elif folder != '' and subfolder != '':
            del self.connections[folder][subfolder][id]

    @MethodHook
    def _folder_add(self, *, folder, subfolder=''):
        # Add folder to config
        if subfolder == '':
            if folder not in self.connections:
                self.connections[folder] = {"type": "folder"}
        else:
            if subfolder not in self.connections[folder]:
                self.connections[folder][subfolder] = {"type": "subfolder"}

    @MethodHook
    def _folder_del(self, *, folder, subfolder=''):
        # Delete folder from config
        if subfolder == '':
            del self.connections[folder]
        else:
            del self.connections[folder][subfolder]

    @MethodHook
    def _profiles_add(self, *, id, host='', options='', logs='', password='', port='', protocol='', user='', tags='', jumphost=''):
        # Add profile to config
        self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost}

    @MethodHook
    def _profiles_del(self, *, id):
        # Delete profile from config
        del self.profiles[id]

    @MethodHook
    def _getallnodes(self, filter=None):
        # Get all nodes on configfile
        nodes = []
        layer1 = [k for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"]
        folders = [k for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        nodes.extend(layer1)
        for f in folders:
            layer2 = [k + "@" + f for k, v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"]
            nodes.extend(layer2)
            subfolders = [k for k, v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            for s in subfolders:
                layer3 = [k + "@" + s + "@" + f for k, v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"]
                nodes.extend(layer3)
        if filter:
            if isinstance(filter, str):
                nodes = [item for item in nodes if re.search(filter, item)]
            elif isinstance(filter, list):
                nodes = [item for item in nodes if any(re.search(pattern, item) for pattern in filter)]
            else:
                printer.error("Invalid filter: must be a string or a list of strings.")
                sys.exit(1)
        return nodes

    @MethodHook
    def _getallnodesfull(self, filter=None, extract=True):
        # Get all nodes on configfile with all their attributes
        nodes = {}
        layer1 = {k: v for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"}
        folders = [k for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        nodes.update(layer1)
        for f in folders:
            layer2 = {k + "@" + f: v for k, v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"}
            nodes.update(layer2)
            subfolders = [k for k, v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            for s in subfolders:
                layer3 = {k + "@" + s + "@" + f: v for k, v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"}
                nodes.update(layer3)
        if filter:
            if isinstance(filter, str):
                filter = "^(?!.*@).+$" if filter == "@" else filter
                nodes = {k: v for k, v in nodes.items() if re.search(filter, k)}
            elif isinstance(filter, list):
                filter = ["^(?!.*@).+$" if item == "@" else item for item in filter]
                nodes = {k: v for k, v in nodes.items() if any(re.search(pattern, k) for pattern in filter)}
            else:
                printer.error("Invalid filter: must be a string or a list of strings.")
                sys.exit(1)
        if extract:
            for node, keys in nodes.items():
                for key, value in keys.items():
                    profile = re.search("^@(.*)", str(value))
                    if profile:
                        try:
                            nodes[node][key] = self.profiles[profile.group(1)][key]
                        except KeyError:
                            nodes[node][key] = ""
                    elif value == '' and key == "protocol":
                        try:
                            nodes[node][key] = self.profiles["default"][key]
                        except KeyError:
                            nodes[node][key] = "ssh"
        return nodes

    @MethodHook
    def _getallfolders(self):
        # Get all folders on configfile
        folders = ["@" + k for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        subfolders = []
        for f in folders:
            s = ["@" + k + f for k, v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            subfolders.extend(s)
        folders.extend(subfolders)
        return folders

    @MethodHook
    def _profileused(self, profile):
        # Return all the nodes that use this profile
        nodes = []
        layer1 = [k for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or (isinstance(v.get("password"), list) and "@" + profile in v.get("password")))]
        folders = [k for k, v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
        nodes.extend(layer1)
        for f in folders:
            layer2 = [k + "@" + f for k, v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or (isinstance(v.get("password"), list) and "@" + profile in v.get("password")))]
            nodes.extend(layer2)
            subfolders = [k for k, v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
            for s in subfolders:
                layer3 = [k + "@" + s + "@" + f for k, v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or (isinstance(v.get("password"), list) and "@" + profile in v.get("password")))]
                nodes.extend(layer3)
        return nodes

    @MethodHook
    def encrypt(self, password, keyfile=None):
        '''
        Encrypts password using RSA keyfile

        ### Parameters:
        - password (str): Plaintext password to encrypt.

        ### Optional Parameters:
        - keyfile (str): Path/file to keyfile. Default is config keyfile.

        ### Returns:
        str: Encrypted password.
        '''
        if keyfile is None:
            keyfile = self.key
        with open(keyfile) as f:
            key = RSA.import_key(f.read())
        publickey = key.publickey()
        encryptor = PKCS1_OAEP.new(publickey)
        password = encryptor.encrypt(password.encode("utf-8"))
        return str(password)

This class generates a configfile object. Contains a dictionary storing config, nodes and profiles, normally used by the connection manager.
Attributes:
- file (str): Path/file to config file.
- key (str): Path/file to RSA key file.
- config (dict): Dictionary containing information of connection manager configuration.
- connections (dict): Dictionary containing all the nodes added to connection manager.
- profiles (dict): Dictionary containing all the profiles added to connection manager.
- privatekey (obj): Object containing the private key to encrypt passwords.
- publickey (obj): Object containing the public key to decrypt passwords.

Optional Parameters:
- conf (str): Path/file to config file. If left empty default path is ~/.config/conn/config.yaml
- key (str): Path/file to RSA key file. If left empty default path is ~/.config/conn/.osk

Methods
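When no conf path is given, the constructor honors a .folder file that redirects the config directory. A standalone sketch of that resolution step (re-implemented for illustration, mirroring the __init__ logic above):

```python
import os
from pathlib import Path

def resolve_configdir(defaultdir):
    """Return the effective config directory: the contents of
    defaultdir/.folder if present, otherwise defaultdir itself
    (creating the .folder marker on first run)."""
    Path(defaultdir).mkdir(parents=True, exist_ok=True)
    pathfile = os.path.join(defaultdir, ".folder")
    try:
        with open(pathfile) as f:
            return f.read().strip()
    except OSError:
        # First run: record the default location
        with open(pathfile, "w") as f:
            f.write(defaultdir)
        return defaultdir
```

Writing a different path into ~/.config/conn/.folder therefore relocates config.yaml, the RSA key, and all sidecar caches to that directory.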
def encrypt(self, password, keyfile=None)

@MethodHook
def encrypt(self, password, keyfile=None):
    '''
    Encrypts password using RSA keyfile

    ### Parameters:
    - password (str): Plaintext password to encrypt.

    ### Optional Parameters:
    - keyfile (str): Path/file to keyfile. Default is config keyfile.

    ### Returns:
    str: Encrypted password.
    '''
    if keyfile is None:
        keyfile = self.key
    with open(keyfile) as f:
        key = RSA.import_key(f.read())
    publickey = key.publickey()
    encryptor = PKCS1_OAEP.new(publickey)
    password = encryptor.encrypt(password.encode("utf-8"))
    return str(password)

Encrypts password using RSA keyfile
Parameters:
- password (str): Plaintext password to encrypt.

Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.

Returns:
str: Encrypted password.

def getitem(self, unique, keys=None, extract=False)
@MethodHook
def getitem(self, unique, keys=None, extract=False):
    '''
    Get a node or a group of nodes from configfile which can be passed to node/nodes class

    ### Parameters:
    - unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder

    ### Optional Parameters:
    - keys (list): In case you pass a folder as unique, you can filter nodes inside the folder passing a list.
    - extract (bool): If True, extract information from profiles. Default False.

    ### Returns:
    dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
    '''
    uniques = self._explode_unique(unique)
    if unique.startswith("@"):
        if uniques.keys() >= {"folder", "subfolder"}:
            folder = self.connections[uniques["folder"]][uniques["subfolder"]]
        else:
            folder = self.connections[uniques["folder"]]
        newfolder = deepcopy(folder)
        newfolder.pop("type")
        for node_name in folder.keys():
            if node_name == "type":
                continue
            if "type" in newfolder[node_name].keys():
                if newfolder[node_name]["type"] == "subfolder":
                    newfolder.pop(node_name)
                else:
                    newfolder[node_name].pop("type")
        if keys != None:
            newfolder = dict((k, newfolder[k]) for k in keys)
        if extract:
            for node_name, node_keys in newfolder.items():
                for key, value in node_keys.items():
                    profile = re.search("^@(.*)", str(value))
                    if profile:
                        try:
                            newfolder[node_name][key] = self.profiles[profile.group(1)][key]
                        except KeyError:
                            newfolder[node_name][key] = ""
                    elif value == '' and key == "protocol":
                        try:
                            newfolder[node_name][key] = self.profiles["default"][key]
                        except KeyError:
                            newfolder[node_name][key] = "ssh"
        newfolder = {"{}{}".format(k, unique): v for k, v in newfolder.items()}
        return newfolder
    else:
        if uniques.keys() >= {"folder", "subfolder"}:
            node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
        elif "folder" in uniques.keys():
            node = self.connections[uniques["folder"]][uniques["id"]]
        else:
            node = self.connections[uniques["id"]]
        newnode = deepcopy(node)
        newnode.pop("type")
        if extract:
            for key, value in newnode.items():
                profile = re.search("^@(.*)", str(value))
                if profile:
                    try:
                        newnode[key] = self.profiles[profile.group(1)][key]
                    except KeyError:
                        newnode[key] = ""
                elif value == '' and key == "protocol":
                    try:
                        newnode[key] = self.profiles["default"][key]
                    except KeyError:
                        newnode[key] = "ssh"
        return newnode

Get a node or a group of nodes from configfile which can be passed to node/nodes class
Parameters:
- unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder

Optional Parameters:
- keys (list): In case you pass a folder as unique, you can filter nodes inside the folder passing a list.
- extract (bool): If True, extract information from profiles. Default False.

Returns:
dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.

def getitems(self, uniques, extract=False)
@MethodHook
def getitems(self, uniques, extract=False):
    '''
    Get a group of nodes from configfile which can be passed to node/nodes class

    ### Parameters:
    - uniques (str/list): String name that will match hostnames from the connection manager. It can be a list of strings.

    ### Optional Parameters:
    - extract (bool): If True, extract information from profiles. Default False.

    ### Returns:
    dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
    '''
    nodes = {}
    if isinstance(uniques, str):
        uniques = [uniques]
    for i in uniques:
        if isinstance(i, dict):
            name = list(i.keys())[0]
            mylist = i[name]
            if not self.config["case"]:
                name = name.lower()
                mylist = [item.lower() for item in mylist]
            this = self.getitem(name, mylist, extract=extract)
            nodes.update(this)
        elif i.startswith("@"):
            if not self.config["case"]:
                i = i.lower()
            this = self.getitem(i, extract=extract)
            nodes.update(this)
        else:
            if not self.config["case"]:
                i = i.lower()
            this = self.getitem(i, extract=extract)
            nodes[i] = this
    return nodes

Get a group of nodes from configfile which can be passed to node/nodes class
Parameters:
- uniques (str/list): String name that will match hostnames from the connection manager. It can be a list of strings.

Optional Parameters:
- extract (bool): If True, extract information from profiles. Default False.

Returns:
dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
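With extract=True, any field whose value is an @profilename reference is replaced by the matching field from that profile, and an empty protocol falls back to the default profile. A standalone sketch of that per-field rule (re-implemented for illustration; the profile names below are hypothetical):

```python
import re

def extract_value(key, value, profiles):
    """Resolve one node field the way getitem does with extract=True."""
    m = re.search(r"^@(.*)", str(value))
    if m:
        # @profilename reference: pull the same field from that profile,
        # or an empty string if the profile lacks it (mirrors the KeyError path)
        return profiles.get(m.group(1), {}).get(key, "")
    if value == "" and key == "protocol":
        # Empty protocol falls back to the default profile, then to ssh
        return profiles.get("default", {}).get("protocol", "ssh")
    return value

profiles = {"default": {"protocol": "ssh"},
            "lab": {"user": "admin", "password": "secret"}}
```

This is what lets many nodes share one profile: updating the profile's password updates every node that references it with @lab.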
class node (unique,
host,
options='',
logs='',
password='',
port='',
protocol='',
user='',
config='',
tags='',
jumphost='')
Expand source code
@ClassHook
class node:
    '''
    This class generates a node object. Contains all the information and methods
    to connect and interact with a device using ssh or telnet.

    ### Attributes:

    - output (str): Output of the commands you ran with run or test method.

    - result (bool): True if expected value is found after running the commands
      using test method.

    - status (int): 0 if the method run or test ran successfully.
      1 if connection failed.
      2 if expect timeouts without prompt or EOF.
    '''

    def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags='', jumphost=''):
        '''
        ### Parameters:

        - unique (str): Unique name to assign to the node.

        - host (str): IP address or hostname of the node.

        ### Optional Parameters:

        - options (str): Additional options to pass to ssh/telnet for the connection.

        - logs (str): Path/file for storing the logs. You can use ${unique}, ${host},
          ${port}, ${user}, ${protocol} as variables.

        - password (str): Encrypted or plaintext password.

        - port (str): Port to connect to the node, default 22 for ssh and 23 for telnet.

        - protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh.

        - user (str): Username of the node.

        - config (obj): Pass the object created with class configfile with key for
          decryption and extra configuration if you are using connection manager.

        - tags (dict): Tags useful for automation and personal purposes, like "os",
          "prompt" and "screen_length_command".

        - jumphost (str): Reference another node to be used as a jumphost.
        '''
        if config == '':
            self.idletime = 0
            self.key = None
        else:
            self.idletime = config.config["idletime"]
            self.key = config.key
        self.unique = unique
        attr = {"host": host, "logs": logs, "options": options, "port": port, "protocol": protocol, "user": user, "tags": tags, "jumphost": jumphost}
        for key in attr:
            profile = re.search("^@(.*)", str(attr[key]))
            if profile and config != '':
                try:
                    setattr(self, key, config.profiles[profile.group(1)][key])
                except KeyError:
                    setattr(self, key, "")
            elif attr[key] == '' and key == "protocol":
                try:
                    setattr(self, key, config.profiles["default"][key])
                except (KeyError, AttributeError):
                    setattr(self, key, "ssh")
            else:
                setattr(self, key, attr[key])
        if isinstance(password, list):
            self.password = []
            for i, s in enumerate(password):
                profile = re.search("^@(.*)", password[i])
                if profile and config != '':
                    self.password.append(config.profiles[profile.group(1)]["password"])
                else:
                    self.password.append(password[i])
        else:
            self.password = [password]
        if self.jumphost != "" and config != '':
            self.jumphost = config.getitem(self.jumphost)
            for key in self.jumphost:
                profile = re.search("^@(.*)", str(self.jumphost[key]))
                if profile:
                    try:
                        self.jumphost[key] = config.profiles[profile.group(1)][key]
                    except KeyError:
                        self.jumphost[key] = ""
                elif self.jumphost[key] == '' and key == "protocol":
                    try:
                        self.jumphost[key] = config.profiles["default"][key]
                    except KeyError:
                        self.jumphost[key] = "ssh"
            if isinstance(self.jumphost["password"], list):
                jumphost_password = []
                for i, s in enumerate(self.jumphost["password"]):
                    profile = re.search("^@(.*)", self.jumphost["password"][i])
                    if profile:
                        jumphost_password.append(config.profiles[profile.group(1)]["password"])
                    else:
                        jumphost_password.append(self.jumphost["password"][i])
                self.jumphost["password"] = jumphost_password
            else:
                self.jumphost["password"] = [self.jumphost["password"]]
            if self.jumphost["password"] != [""]:
                self.password = self.jumphost["password"] + self.password
            if self.jumphost["protocol"] == "ssh":
                jumphost_cmd = self.jumphost["protocol"] + " -W %h:%p"
                if self.jumphost["port"] != '':
                    jumphost_cmd = jumphost_cmd + " -p " + self.jumphost["port"]
                if self.jumphost["options"] != '':
                    jumphost_cmd = jumphost_cmd + " " + self.jumphost["options"]
                if self.jumphost["user"] == '':
                    jumphost_cmd = jumphost_cmd + " {}".format(self.jumphost["host"])
                else:
                    jumphost_cmd = jumphost_cmd + " {}".format("@".join([self.jumphost["user"], self.jumphost["host"]]))
                self.jumphost = f"-o ProxyCommand=\"{jumphost_cmd}\""
            else:
                self.jumphost = ""

    @MethodHook
    def _passtx(self, passwords, *, keyfile=None):
        # decrypts passwords, used by other methods.
        dpass = []
        if keyfile is None:
            keyfile = self.key
        if keyfile is not None:
            with open(keyfile) as f:
                key = RSA.import_key(f.read())
            decryptor = PKCS1_OAEP.new(key)
        for passwd in passwords:
            if not re.match('^b[\"\'].+[\"\']$', passwd):
                dpass.append(passwd)
            else:
                try:
                    decrypted = decryptor.decrypt(ast.literal_eval(passwd)).decode("utf-8")
                    dpass.append(decrypted)
                except Exception:
                    printer.error("Decryption failed: Missing or corrupted key.")
                    printer.info("Verify your RSA key and configuration settings.")
                    sys.exit(1)
        return dpass

    @MethodHook
    def _logfile(self, logfile = None):
        # translate logs variables and generate logs path.
        if logfile == None:
            logfile = self.logs
        logfile = logfile.replace("${unique}", self.unique)
        logfile = logfile.replace("${host}", self.host)
        logfile = logfile.replace("${port}", self.port)
        logfile = logfile.replace("${user}", self.user)
        logfile = logfile.replace("${protocol}", self.protocol)
        now = datetime.datetime.now()
        dateconf = re.search(r'\$\{date \'(.*)\'}', logfile)
        if dateconf:
            logfile = re.sub(r'\$\{date (.*)}', now.strftime(dateconf.group(1)), logfile)
        return logfile

    @MethodHook
    def _logclean(self, logfile, var = False):
        # Remove special ascii characters and other stuff from logfile.
        if var == False:
            t = open(logfile, "r").read()
        else:
            t = logfile
        while t.find("\b") != -1:
            t = re.sub('[^\b]\b', '', t)
        t = t.replace("\n", "", 1)
        t = t.replace("\a", "")
        t = t.replace('\n\n', '\n')
        t = re.sub(r'.\[K', '', t)
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/ ]*[@-~])')
        t = ansi_escape.sub('', t)
        t = t.lstrip(" \n\r")
        t = t.replace("\r", "")
        t = t.replace("\x0E", "")
        t = t.replace("\x0F", "")
        if var == False:
            d = open(logfile, "w")
            d.write(t)
            d.close()
            return
        else:
            return t

    @MethodHook
    def _savelog(self):
        '''Save the log buffer to the file at regular intervals if there are changes.'''
        t = threading.current_thread()
        prev_size = 0  # Store the previous size of the buffer
        while getattr(t, "do_run", True):  # Check if thread is signaled to stop
            current_size = self.mylog.tell()  # Current size of the buffer
            # Only save if the buffer size has changed
            if current_size != prev_size:
                with open(self.logfile, "w") as f:  # Use "w" to overwrite the file
                    f.write(self._logclean(self.mylog.getvalue().decode(), True))
                prev_size = current_size  # Update the previous size
            sleep(5)

    @MethodHook
    def _filter(self, a):
        # Set time for last input when using interact
        self.lastinput = time()
        return a

    @MethodHook
    def _keepalive(self):
        # Send keepalive ctrl+e when idletime passed without new inputs on interact
        self.lastinput = time()
        t = threading.current_thread()
        while True:
            if time() - self.lastinput >= self.idletime:
                self.child.sendcontrol("e")
                self.lastinput = time()
            sleep(1)

    @MethodHook
    def interact(self, debug = False, logger = None):
        '''
        Allow user to interact with the node directly, mostly used by connection manager.

        ### Optional Parameters:

        - debug (bool): If True, display all the connecting information before
          interact. Default False.

        - logger (callable): Optional callback for status reporting.
        '''
        connect = self._connect(debug = debug, logger = logger)
        if connect == True:
            size = re.search('columns=([0-9]+).*lines=([0-9]+)', str(os.get_terminal_size()))
            self.child.setwinsize(int(size.group(2)), int(size.group(1)))
            if logger:
                logger("success", "Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
            if 'logfile' in dir(self):
                # Initialize self.mylog
                if not 'mylog' in dir(self):
                    self.mylog = io.BytesIO()
                    self.child.logfile_read = self.mylog
                # Start the _savelog thread
                log_thread = threading.Thread(target=self._savelog)
                log_thread.daemon = True
                log_thread.start()
            if 'missingtext' in dir(self):
                print(self.child.after.decode(), end='')
            if self.idletime > 0:
                x = threading.Thread(target=self._keepalive)
                x.daemon = True
                x.start()
            if debug:
                print(self.mylog.getvalue().decode())
            self.child.interact(input_filter=self._filter)
            if 'logfile' in dir(self):
                with open(self.logfile, "w") as f:
                    f.write(self._logclean(self.mylog.getvalue().decode(), True))
        else:
            if logger:
                logger("error", str(connect))
            else:
                printer.error(f"Connection failed: {str(connect)}")
            sys.exit(1)

    @MethodHook
    def run(self, commands, vars = None, *, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10, logger = None):
        '''
        Run a command or list of commands on the node and return the output.

        ### Parameters:

        - commands (str/list): Commands to run on the node. Should be str or a
          list of str. You can use variables as {varname} and define them in the
          optional parameter vars.

        ### Optional Parameters:

        - vars (dict): Dictionary containing the definition of variables used in
          the commands parameter. Keys: variable names. Values: strings.

        ### Optional Named Parameters:

        - folder (str): Path where the output log should be stored; leave empty
          to disable logging.

        - prompt (str): Prompt to be expected after a command finishes running.
          Usually linux uses ">" or EOF while routers use ">" or "#". The default
          value should work for most nodes. Change it if your connection needs
          some special symbol.

        - stdout (bool): Set True to send the command output to stdout. Default False.

        - timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

        ### Returns:

        str: Output of the commands you ran on the node.
        '''
        connect = self._connect(timeout = timeout, logger = logger)
        now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
        if connect == True:
            if logger:
                logger("success", "Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
            # Attempt to set the terminal size
            try:
                self.child.setwinsize(65535, 65535)
            except Exception:
                try:
                    self.child.setwinsize(10000, 10000)
                except Exception:
                    pass
            if "prompt" in self.tags:
                prompt = self.tags["prompt"]
            expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
            output = ''
            status = ''
            if not isinstance(commands, list):
                commands = [commands]
            if "screen_length_command" in self.tags:
                commands.insert(0, self.tags["screen_length_command"])
            self.mylog = io.BytesIO()
            self.child.logfile_read = self.mylog
            for c in commands:
                if vars is not None:
                    c = c.format(**vars)
                result = self.child.expect(expects, timeout = timeout)
                self.child.sendline(c)
                if result == 2:
                    break
            if not result == 2:
                result = self.child.expect(expects, timeout = timeout)
            self.child.close()
            output = self._logclean(self.mylog.getvalue().decode(), True)
            if logger:
                logger("output", output)
            if folder != '':
                with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                    f.write(output)
                    f.close()
            self.output = output
            if result == 2:
                self.status = 2
            else:
                self.status = 0
            return output
        else:
            self.output = connect
            self.status = 1
            if logger:
                logger("error", f"Connection failed: {connect}")
            if folder != '':
                with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                    f.write(connect)
                    f.close()
            return connect

    @MethodHook
    def test(self, commands, expected, vars = None, *, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10, logger = None):
        '''
        Run a command or list of commands on the node, then check if expected
        value appears on the output after the last command.

        ### Parameters:

        - commands (str/list): Commands to run on the node. Should be str or a
          list of str. You can use variables as {varname} and define them in the
          optional parameter vars.

        - expected (str): Expected text to appear after running all the commands
          on the node. You can use variables as {varname} and define them in the
          optional parameter vars.

        ### Optional Parameters:

        - vars (dict): Dictionary containing the definition of variables used in
          the commands and expected parameters. Keys: variable names. Values: strings.

        ### Optional Named Parameters:

        - prompt (str): Prompt to be expected after a command finishes running.
          Usually linux uses ">" or EOF while routers use ">" or "#". The default
          value should work for most nodes. Change it if your connection needs
          some special symbol.

        - timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

        ### Returns:

        bool: True if the expected value is found after running the commands,
        False if the prompt is found before.
        '''
        connect = self._connect(timeout = timeout, logger = logger)
        if connect == True:
            if logger:
                logger("success", "Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
            # Attempt to set the terminal size
            try:
                self.child.setwinsize(65535, 65535)
            except Exception:
                try:
                    self.child.setwinsize(10000, 10000)
                except Exception:
                    pass
            if "prompt" in self.tags:
                prompt = self.tags["prompt"]
            expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
            output = ''
            if not isinstance(commands, list):
                commands = [commands]
            if not isinstance(expected, list):
                expected = [expected]
            if "screen_length_command" in self.tags:
                commands.insert(0, self.tags["screen_length_command"])
            self.mylog = io.BytesIO()
            self.child.logfile_read = self.mylog
            for c in commands:
                if vars is not None:
                    c = c.format(**vars)
                result = self.child.expect(expects, timeout = timeout)
                self.child.sendline(c)
                if result == 2:
                    break
            if not result == 2:
                result = self.child.expect(expects, timeout = timeout)
            self.child.close()
            output = self._logclean(self.mylog.getvalue().decode(), True)
            self.output = output
            if result in [0, 1]:
                # lastcommand = commands[-1]
                # if vars is not None:
                #     lastcommand = lastcommand.format(**vars)
                # last_command_index = output.rfind(lastcommand)
                # cleaned_output = output[last_command_index + len(lastcommand):].strip()
                self.result = {}
                for e in expected:
                    if vars is not None:
                        e = e.format(**vars)
                    updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)
                    newpattern = f".*({updatedprompt}).*{e}.*"
                    cleaned_output = output
                    cleaned_output = re.sub(newpattern, '', cleaned_output)
                    if e in cleaned_output:
                        self.result[e] = True
                    else:
                        self.result[e] = False
                self.status = 0
                return self.result
            if result == 2:
                self.result = None
                self.status = 2
                return output
        else:
            self.result = None
            self.output = connect
            self.status = 1
            return connect

    @MethodHook
    def _generate_ssh_sftp_cmd(self):
        cmd = self.protocol
        if self.idletime > 0:
            cmd += " -o ServerAliveInterval=" + str(self.idletime)
        if self.port:
            if self.protocol == "ssh":
                cmd += " -p " + self.port
            elif self.protocol == "sftp":
                cmd += " -P " + self.port
        if self.options:
            cmd += " " + self.options
        if self.jumphost:
            cmd += " " + self.jumphost
        user_host = f"{self.user}@{self.host}" if self.user else self.host
        cmd += f" {user_host}"
        return cmd

    @MethodHook
    def _generate_telnet_cmd(self):
        cmd = f"telnet {self.host}"
        if self.port:
            cmd += f" {self.port}"
        if self.options:
            cmd += f" {self.options}"
        return cmd

    @MethodHook
    def _generate_kube_cmd(self):
        cmd = f"kubectl exec {self.options} {self.host} -it --"
        kube_command = self.tags.get("kube_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash"
        cmd += f" {kube_command}"
        return cmd

    @MethodHook
    def _generate_docker_cmd(self):
        cmd = f"docker {self.options} exec -it {self.host}"
        docker_command = self.tags.get("docker_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash"
        cmd += f" {docker_command}"
        return cmd

    @MethodHook
    def _get_cmd(self):
        if self.protocol in ["ssh", "sftp"]:
            return self._generate_ssh_sftp_cmd()
        elif self.protocol == "telnet":
            return self._generate_telnet_cmd()
        elif self.protocol == "kubectl":
            return self._generate_kube_cmd()
        elif self.protocol == "docker":
            return self._generate_docker_cmd()
        else:
            printer.error(f"Invalid protocol: {self.protocol}")
            sys.exit(1)

    @MethodHook
    def _connect(self, debug=False, timeout=10, max_attempts=3, logger=None):
        cmd = self._get_cmd()
        passwords = self._passtx(self.password) if self.password and any(self.password) else []
        if self.logs != '':
            self.logfile = self._logfile()
        default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
        prompt = self.tags.get("prompt", default_prompt) if isinstance(self.tags, dict) else default_prompt
        password_prompt = '[p|P]assword:|[u|U]sername:' if self.protocol != 'telnet' else '[p|P]assword:'
        expects = {
            "ssh": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: ssh', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
            "sftp": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: sftp', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
            "telnet": ['[u|U]sername:', 'refused', 'supported', 'invalid|unrecognized option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
            "kubectl": ['[u|U]sername:', '[r|R]efused', '[E|e]rror', 'DEPRECATED', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, "expired|invalid"],
            "docker": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'not a docker command', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
        }
        error_indices = {
            "ssh": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
            "sftp": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
            "telnet": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
            "kubectl": [1, 2, 3, 4, 8],  # Define error indices for kube
            "docker": [1, 2, 3, 4, 5, 6, 7]  # Define error indices for docker
        }
        eof_indices = {
            "ssh": [8, 9, 10, 11],
            "sftp": [8, 9, 10, 11],
            "telnet": [8, 9, 10, 11],
            "kubectl": [5, 6, 7],  # Define eof indices for kube
            "docker": [8, 9, 10]  # Define eof indices for docker
        }
        initial_indices = {
            "ssh": [0],
            "sftp": [0],
            "telnet": [0],
            "kubectl": [0],  # Define special indices for kube
            "docker": [0]  # Define special indices for docker
        }
        attempts = 1
        while attempts <= max_attempts:
            child = pexpect.spawn(cmd)
            if isinstance(self.tags, dict) and self.tags.get("console"):
                child.sendline()
            if debug:
                if logger:
                    logger("debug", f"Command:\n{cmd}")
                self.mylog = io.BytesIO()
                child.logfile_read = self.mylog
            endloop = False
            for i in range(len(passwords) if passwords else 1):
                while True:
                    results = child.expect(expects[self.protocol], timeout=timeout)
                    results_value = expects[self.protocol][results]
                    if results in initial_indices[self.protocol]:
                        if self.protocol in ["ssh", "sftp"]:
                            child.sendline('yes')
                        elif self.protocol in ["telnet", "kubectl", "docker"]:
                            if self.user:
                                child.sendline(self.user)
                            else:
                                self.missingtext = True
                                break
                    elif results in error_indices[self.protocol]:
                        child.terminate()
                        if results_value == pexpect.TIMEOUT and attempts != max_attempts:
                            attempts += 1
                            endloop = True
                            break
                        else:
                            after = "Connection timeout" if results_value == pexpect.TIMEOUT else child.after.decode()
                            return f"Connection failed code: {results}\n{child.before.decode().lstrip()}{after}{child.readline().decode()}".rstrip()
                    elif results in eof_indices[self.protocol]:
                        if results_value == password_prompt:
                            if passwords:
                                child.sendline(passwords[i])
                            else:
                                self.missingtext = True
                            break
                        elif results_value == "suspend":
                            child.sendline("\r")
                            sleep(2)
                        else:
                            endloop = True
                            child.sendline()
                            break
                if endloop:
                    break
            if results_value == pexpect.TIMEOUT:
                continue
            else:
                break
        if isinstance(self.tags, dict) and self.tags.get("post_connect_commands"):
            cmds = self.tags.get("post_connect_commands")
            commands = [cmds] if isinstance(cmds, str) else cmds
            for command in commands:
                child.sendline(command)
                sleep(1)
            child.readline(0)
        self.child = child
        from pexpect import fdpexpect
        self.raw_child = fdpexpect.fdspawn(self.child.child_fd)
        return True

This class generates a node object. Contains all the information and methods to connect and interact with a device using ssh or telnet.
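The `${...}` log-path variables handled by _logfile above can be reproduced standalone. This sketch mirrors the same replace calls and the ${date '...'} strftime handling; the sample path and node values are hypothetical:

```python
import re
import datetime

# Mirrors node._logfile's variable expansion for the "logs" path.
def expand_logfile(template, unique, host, port, user, protocol):
    logfile = template
    logfile = logfile.replace("${unique}", unique)
    logfile = logfile.replace("${host}", host)
    logfile = logfile.replace("${port}", port)
    logfile = logfile.replace("${user}", user)
    logfile = logfile.replace("${protocol}", protocol)
    now = datetime.datetime.now()
    # ${date 'FORMAT'} is expanded with strftime, as in the source above.
    dateconf = re.search(r"\$\{date \'(.*)\'}", logfile)
    if dateconf:
        logfile = re.sub(r"\$\{date (.*)}", now.strftime(dateconf.group(1)), logfile)
    return logfile

path = expand_logfile("/tmp/logs/${unique}_${protocol}_${date '%Y'}.log",
                      unique="router1", host="10.0.0.1", port="22",
                      user="admin", protocol="ssh")
```

The same substitution applies whether the logs value comes from the config file or is passed directly to the node constructor.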
Attributes:
- output (str): Output of the commands you ran with run or test method.
- result (bool): True if expected value is found after running the commands using test method.
- status (int): 0 if the method run or test ran successfully. 1 if connection failed. 2 if expect timeouts without prompt or EOF.

Parameters:
- unique (str): Unique name to assign to the node.
- host (str): IP address or hostname of the node.

Optional Parameters:
- options (str): Additional options to pass to ssh/telnet for the connection.
- logs (str): Path/file for storing the logs. You can use ${unique}, ${host}, ${port}, ${user}, ${protocol} as variables.
- password (str): Encrypted or plaintext password.
- port (str): Port to connect to the node, default 22 for ssh and 23 for telnet.
- protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh.
- user (str): Username of the node.
- config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager.
- tags (dict): Tags useful for automation and personal purposes, like "os", "prompt" and "screen_length_command".
- jumphost (str): Reference another node to be used as a jumphost.

Methods
def interact(self, debug=False, logger=None)
Expand source code
@MethodHook
def interact(self, debug = False, logger = None):
    '''
    Allow user to interact with the node directly, mostly used by connection manager.

    ### Optional Parameters:

    - debug (bool): If True, display all the connecting information before
      interact. Default False.

    - logger (callable): Optional callback for status reporting.
    '''
    connect = self._connect(debug = debug, logger = logger)
    if connect == True:
        size = re.search('columns=([0-9]+).*lines=([0-9]+)', str(os.get_terminal_size()))
        self.child.setwinsize(int(size.group(2)), int(size.group(1)))
        if logger:
            logger("success", "Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        if 'logfile' in dir(self):
            # Initialize self.mylog
            if not 'mylog' in dir(self):
                self.mylog = io.BytesIO()
                self.child.logfile_read = self.mylog
            # Start the _savelog thread
            log_thread = threading.Thread(target=self._savelog)
            log_thread.daemon = True
            log_thread.start()
        if 'missingtext' in dir(self):
            print(self.child.after.decode(), end='')
        if self.idletime > 0:
            x = threading.Thread(target=self._keepalive)
            x.daemon = True
            x.start()
        if debug:
            print(self.mylog.getvalue().decode())
        self.child.interact(input_filter=self._filter)
        if 'logfile' in dir(self):
            with open(self.logfile, "w") as f:
                f.write(self._logclean(self.mylog.getvalue().decode(), True))
    else:
        if logger:
            logger("error", str(connect))
        else:
            printer.error(f"Connection failed: {str(connect)}")
        sys.exit(1)

Allow user to interact with the node directly, mostly used by connection manager.
Optional Parameters:
- debug (bool): If True, display all the connecting information before interact. Default False.
- logger (callable): Optional callback for status reporting.

def run(self,
commands,
vars=None,
*,
folder='',
prompt='>$|#$|\\$$|>.$|#.$|\\$.$',
stdout=False,
timeout=10,
logger=None)
Expand source code
@MethodHook
def run(self, commands, vars = None, *, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False, timeout = 10, logger = None):
    '''
    Run a command or list of commands on the node and return the output.

    ### Parameters:

    - commands (str/list): Commands to run on the node. Should be str or a list
      of str. You can use variables as {varname} and define them in the optional
      parameter vars.

    ### Optional Parameters:

    - vars (dict): Dictionary containing the definition of variables used in the
      commands parameter. Keys: variable names. Values: strings.

    ### Optional Named Parameters:

    - folder (str): Path where the output log should be stored; leave empty to
      disable logging.

    - prompt (str): Prompt to be expected after a command finishes running.
      Usually linux uses ">" or EOF while routers use ">" or "#". The default
      value should work for most nodes. Change it if your connection needs some
      special symbol.

    - stdout (bool): Set True to send the command output to stdout. Default False.

    - timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

    ### Returns:

    str: Output of the commands you ran on the node.
    '''
    connect = self._connect(timeout = timeout, logger = logger)
    now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
    if connect == True:
        if logger:
            logger("success", "Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        # Attempt to set the terminal size
        try:
            self.child.setwinsize(65535, 65535)
        except Exception:
            try:
                self.child.setwinsize(10000, 10000)
            except Exception:
                pass
        if "prompt" in self.tags:
            prompt = self.tags["prompt"]
        expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
        output = ''
        status = ''
        if not isinstance(commands, list):
            commands = [commands]
        if "screen_length_command" in self.tags:
            commands.insert(0, self.tags["screen_length_command"])
        self.mylog = io.BytesIO()
        self.child.logfile_read = self.mylog
        for c in commands:
            if vars is not None:
                c = c.format(**vars)
            result = self.child.expect(expects, timeout = timeout)
            self.child.sendline(c)
            if result == 2:
                break
        if not result == 2:
            result = self.child.expect(expects, timeout = timeout)
        self.child.close()
        output = self._logclean(self.mylog.getvalue().decode(), True)
        if logger:
            logger("output", output)
        if folder != '':
            with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                f.write(output)
                f.close()
        self.output = output
        if result == 2:
            self.status = 2
        else:
            self.status = 0
        return output
    else:
        self.output = connect
        self.status = 1
        if logger:
            logger("error", f"Connection failed: {connect}")
        if folder != '':
            with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
                f.write(connect)
                f.close()
        return connect

Run a command or list of commands on the node and return the output.
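As the loop above shows, run expands {varname} placeholders with str.format(**vars) before sending each command. A minimal standalone illustration of that step (the command strings and values are hypothetical):

```python
# Mirrors the "c = c.format(**vars)" step in node.run.
commands = ["ping {target} count {count}", "show version"]
vars = {"target": "10.0.0.1", "count": "5"}

rendered = [c.format(**vars) for c in commands]
# Commands without placeholders pass through unchanged.
```

Note that every key referenced by any command must exist in vars, or str.format raises KeyError; extra keys are ignored.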
Parameters:
- commands (str/list): Commands to run on the node. Should be str or a list of str. You can use variables as {varname} and define them in the optional parameter vars.

Optional Parameters:
- vars (dict): Dictionary containing the definition of variables used in the commands parameter. Keys: variable names. Values: strings.

Optional Named Parameters:
- folder (str): Path where the output log should be stored; leave empty to disable logging.
- prompt (str): Prompt to be expected after a command finishes running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

Returns:
str: Output of the commands you ran on the node.

def test(self,
commands,
expected,
vars=None,
*,
prompt='>$|#$|\\$$|>.$|#.$|\\$.$',
timeout=10,
logger=None)
Expand source code
@MethodHook
def test(self, commands, expected, vars = None, *, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10, logger = None):
    '''
    Run a command or list of commands on the node, then check if expected value
    appears on the output after the last command.

    ### Parameters:

    - commands (str/list): Commands to run on the node. Should be str or a list
      of str. You can use variables as {varname} and define them in the optional
      parameter vars.

    - expected (str): Expected text to appear after running all the commands on
      the node. You can use variables as {varname} and define them in the
      optional parameter vars.

    ### Optional Parameters:

    - vars (dict): Dictionary containing the definition of variables used in the
      commands and expected parameters. Keys: variable names. Values: strings.

    ### Optional Named Parameters:

    - prompt (str): Prompt to be expected after a command finishes running.
      Usually linux uses ">" or EOF while routers use ">" or "#". The default
      value should work for most nodes. Change it if your connection needs some
      special symbol.

    - timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

    ### Returns:

    bool: True if the expected value is found after running the commands,
    False if the prompt is found before.
    '''
    connect = self._connect(timeout = timeout, logger = logger)
    if connect == True:
        if logger:
            logger("success", "Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        # Attempt to set the terminal size
        try:
            self.child.setwinsize(65535, 65535)
        except Exception:
            try:
                self.child.setwinsize(10000, 10000)
            except Exception:
                pass
        if "prompt" in self.tags:
            prompt = self.tags["prompt"]
        expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
        output = ''
        if not isinstance(commands, list):
            commands = [commands]
        if not isinstance(expected, list):
            expected = [expected]
        if "screen_length_command" in self.tags:
            commands.insert(0, self.tags["screen_length_command"])
        self.mylog = io.BytesIO()
        self.child.logfile_read = self.mylog
        for c in commands:
            if vars is not None:
                c = c.format(**vars)
            result = self.child.expect(expects, timeout = timeout)
            self.child.sendline(c)
            if result == 2:
                break
        if not result == 2:
            result = self.child.expect(expects, timeout = timeout)
        self.child.close()
        output = self._logclean(self.mylog.getvalue().decode(), True)
        self.output = output
        if result in [0, 1]:
            # lastcommand = commands[-1]
            # if vars is not None:
            #     lastcommand = lastcommand.format(**vars)
            # last_command_index = output.rfind(lastcommand)
            # cleaned_output = output[last_command_index + len(lastcommand):].strip()
            self.result = {}
            for e in expected:
                if vars is not None:
                    e = e.format(**vars)
                updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)
                newpattern = f".*({updatedprompt}).*{e}.*"
                cleaned_output = output
                cleaned_output = re.sub(newpattern, '', cleaned_output)
                if e in cleaned_output:
                    self.result[e] = True
                else:
                    self.result[e] = False
            self.status = 0
            return self.result
        if result == 2:
            self.result = None
            self.status = 2
            return output
    else:
        self.result = None
        self.output = connect
        self.status = 1
        return connect

Run a command or list of commands on the node, then check if expected value appears on the output after the last command.
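The matching step in the source above first un-anchors the prompt regex, then strips any line where the expected text appears after a prompt character (the echoed command), and only then checks for the expected text in what remains. A standalone sketch of that logic, with hypothetical device output:

```python
import re

# Mirrors node.test's check: drop lines where the expected text is just the
# echoed command (prompt followed by the text), then look for the text in
# the remaining output.
def expected_found(output, expected, prompt=r'>$|#$|\$$|>.$|#.$|\$.$'):
    updatedprompt = re.sub(r'(?<!\\)\$', '', prompt)   # remove unescaped $ anchors
    newpattern = f".*({updatedprompt}).*{expected}.*"
    cleaned = re.sub(newpattern, '', output)           # strip command-echo lines
    return expected in cleaned
```

Because `.` does not match newlines by default, only single lines containing both a prompt character and the expected text are removed, so a genuine match elsewhere in the output survives.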
Parameters:
- commands (str/list): Commands to run on the node. Should be str or a list of str. You can use variables as {varname} and define them in the optional parameter vars.
- expected (str): Expected text to appear after running all the commands on the node. You can use variables as {varname} and define them in the optional parameter vars.

Optional Parameters:
- vars (dict): Dictionary containing the definition of variables used in the commands and expected parameters. Keys: variable names. Values: strings.

Optional Named Parameters:
- prompt (str): Prompt to be expected after a command finishes running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

Returns:
bool: True if the expected value is found after running the commands, False if the prompt is found before.
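The default prompt pattern shared by run and test matches a device prompt at the end of the received output: a >, # or $ at the very end, or followed by one trailing character such as a space. A quick standalone check of what it accepts (sample buffers are hypothetical):

```python
import re

# Default prompt regex from node.run/node.test.
default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'

def ends_in_prompt(buf):
    return re.search(default_prompt, buf) is not None

checks = {
    "router1> ": ends_in_prompt("router1> "),          # router prompt + space
    "router1#": ends_in_prompt("router1#"),            # enable prompt
    "user@host:~$ ": ends_in_prompt("user@host:~$ "),  # linux shell prompt
    "still running...": ends_in_prompt("still running..."),
}
```

If your device prompt does not end in one of these symbols, override it via the prompt parameter or a "prompt" tag on the node, as documented above.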
class nodes (nodes: dict, config='')
Expand source code
@ClassHook
class nodes:
    '''
    This class generates a nodes object. Contains a list of node class objects
    and methods to run multiple tasks on nodes simultaneously.

    ### Attributes:

    - nodelist (list): List of node class objects passed to the init function.

    - output (dict): Dictionary formed by nodes unique as keys, output of the
      commands you ran on the node as value. Created after running methods run
      or test.

    - result (dict): Dictionary formed by nodes unique as keys, value is True
      if expected value is found after running the commands, False if prompt is
      found before. Created after running method test.

    - status (dict): Dictionary formed by nodes unique as keys, value: 0 if
      method run or test ended successfully. 1 if connection failed. 2 if
      expect timeouts without prompt or EOF.

    - <unique> (obj): For each item in nodelist, there is an attribute generated
      with the node unique.
    '''

    def __init__(self, nodes: dict, config = ''):
        '''
        ### Parameters:

        - nodes (dict): Dictionary formed by node information:
          Keys: Unique name for each node.
          Mandatory Subkeys: host(str).
          Optional Subkeys: options(str), logs(str), password(str), port(str),
          protocol(str), user(str).
          For reference on subkeys check node class.

        ### Optional Parameters:

        - config (obj): Pass the object created with class configfile with key
          for decryption and extra configuration if you are using connection
          manager.
        '''
        self.nodelist = []
        self.config = config
        for n in nodes:
            this = node(n, **nodes[n], config = config)
            self.nodelist.append(this)
            setattr(self, n, this)

    @MethodHook
    def _splitlist(self, lst, n):
        # split a list in lists of n members.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    @MethodHook
    def run(self, commands, vars = None, *, folder = None, prompt = None, stdout = None, parallel = 10, timeout = None, on_complete = None, logger = None):
        '''
        Run a command or list of commands on all the nodes in nodelist.

        ### Parameters:

        - commands (str/list): Commands to run on the nodes. Should be str or
          list of str. You can use variables as {varname} and define them in the
          optional parameter vars.

        ### Optional Parameters:

        - vars (dict): Dictionary containing the definition of variables for
          each node, used in the commands parameter. Keys should be the nodes'
          unique names. Use the special key name __global__ for global
          variables. Subkeys: variable names. Values: strings.

        ### Optional Named Parameters:

        - folder (str): Path where the output log should be stored; leave empty
          to disable logging.

        - prompt (str): Prompt to be expected after a command finishes running.
          Usually linux uses ">" or EOF while routers use ">" or "#". The
          default value should work for most nodes. Change it if your connection
          needs some special symbol.

        - stdout (bool): Set True to send the command output to stdout. Default False.

        - parallel (int): Number of nodes to run the commands simultaneously.
          Default is 10; if there are more nodes than this value, they are
          grouped in batches with at most this number of members.

        - timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

        - on_complete (callable): Optional callback called when each node
          finishes. Receives (unique, output, status). Called from the node's
          thread so it must be thread-safe.

        ### Returns:

        dict: Dictionary formed by nodes unique as keys, output of the commands
        you ran on the node as value.
        '''
        args = {}
        nodesargs = {}
        args["commands"] = commands
        if folder != None:
            args["folder"] = folder
            Path(folder).mkdir(parents=True, exist_ok=True)
        if prompt != None:
            args["prompt"] = prompt
        if stdout != None and on_complete is None:
            args["stdout"] = stdout
        if timeout != None:
            args["timeout"] = timeout
        output = {}
        status = {}
        tasks = []

        def _run_node(node_obj, node_args, callback):
            """Wrapper that runs a node and fires the callback on completion."""
            node_obj.run(**node_args)
            if callback:
                callback(node_obj.unique, node_obj.output, node_obj.status)

        for n in self.nodelist:
            nodesargs[n.unique] = deepcopy(args)
            if vars != None:
                nodesargs[n.unique]["vars"] = {}
                if "__global__" in vars.keys():
                    nodesargs[n.unique]["vars"].update(vars["__global__"])
                if n.unique in vars.keys():
                    nodesargs[n.unique]["vars"].update(vars[n.unique])
            # Pass the logger to the node
            nodesargs[n.unique]["logger"] = logger
            if on_complete:
                tasks.append(threading.Thread(target=_run_node, args=(n, nodesargs[n.unique], on_complete)))
            else:
                tasks.append(threading.Thread(target=n.run, kwargs=nodesargs[n.unique]))
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            output[i.unique] = i.output
            status[i.unique] = i.status
        self.output = output
        self.status = status
        return output

    @MethodHook
    def test(self, commands, expected, vars = None, *, prompt = None, parallel = 10, timeout = None, on_complete = None, logger = None):
        '''
        Run a command or list of commands on all the nodes in nodelist, then
        check if expected value appears on the output after the last command.

        ### Parameters:

        - commands (str/list): Commands to run on the node. Should be str or
          list of str.

        - expected (str): Expected text to appear after running all the commands
          on the node.

        ### Optional Parameters:

        - vars (dict): Dictionary containing the definition of variables for
          each node, used in the commands and expected parameters. Keys should
          be the nodes' unique names. Use the special key name __global__ for
          global variables. Subkeys: variable names. Values: strings.

        ### Optional Named Parameters:

        - prompt (str): Prompt to be expected after a command finishes running.
          Usually linux uses ">" or EOF while routers use ">" or "#". The
          default value should work for most nodes. Change it if your connection
          needs some special symbol.

        - parallel (int): Number of nodes to run the commands simultaneously.
          Default is 10; if there are more nodes than this value, they are
          grouped in batches with at most this number of members.

        - timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.

        - on_complete (callable): Optional callback called when each node
          finishes. Receives (unique, output, status). Called from the node's
          thread so it must be thread-safe.

        ### Returns:

        dict: Dictionary formed by nodes unique as keys, value is True if
        expected value is found after running the commands, False if prompt is
        found before.
''' args = {} nodesargs = {} args["commands"] = commands args["expected"] = expected if prompt != None: args["prompt"] = prompt if timeout != None: args["timeout"] = timeout output = {} result = {} status = {} tasks = [] def _test_node(node_obj, node_args, callback): """Wrapper that runs a node test and fires the callback on completion.""" node_obj.test(**node_args) if callback: callback(node_obj.unique, node_obj.output, node_obj.status, node_obj.result) for n in self.nodelist: nodesargs[n.unique] = deepcopy(args) if vars != None: nodesargs[n.unique]["vars"] = {} if "__global__" in vars.keys(): nodesargs[n.unique]["vars"].update(vars["__global__"]) if n.unique in vars.keys(): nodesargs[n.unique]["vars"].update(vars[n.unique]) nodesargs[n.unique]["logger"] = logger if on_complete: tasks.append(threading.Thread(target=_test_node, args=(n, nodesargs[n.unique], on_complete))) else: tasks.append(threading.Thread(target=n.test, kwargs=nodesargs[n.unique])) taskslist = list(self._splitlist(tasks, parallel)) for t in taskslist: for i in t: i.start() for i in t: i.join() for i in self.nodelist: result[i.unique] = i.result output[i.unique] = i.output status[i.unique] = i.status self.output = output self.result = result self.status = status return resultThis class generates a nodes object. Contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.
Attributes:
- nodelist (list): List of node class objects passed to the init function.
- output (dict): Dictionary formed by the nodes' unique names as keys and the output of the commands you ran on each node as values. Created after running the run or test methods.
- result (dict): Dictionary formed by the nodes' unique names as keys; the value is True if the expected value is found after running the commands, False if the prompt is found first. Created after running the test method.
- status (dict): Dictionary formed by the nodes' unique names as keys; the value is 0 if run or test ended successfully, 1 if the connection failed, 2 if expect timed out without finding the prompt or EOF.
- <unique> (obj): For each item in nodelist, an attribute is generated with the node's unique name.

Parameters:

- nodes (dict): Dictionary formed by node information. Keys: unique name for each node. Mandatory subkeys: host (str). Optional subkeys: options (str), logs (str), password (str), port (str), protocol (str), user (str). For reference on the subkeys check the node class.

Optional Parameters:

- config (obj): Pass the object created with the configfile class, including the key for decryption and extra configuration, if you are using the connection manager.

Methods
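For illustration, a minimal sketch of the dictionary shape the constructor expects; the node names, hosts, and credentials below are hypothetical, and the final line only indicates how the dict would be passed to connpy:

```python
# Hypothetical example data; every entry must at least provide the
# mandatory "host" subkey, all other subkeys are optional.
nodes_dict = {
    "router1": {"host": "10.0.0.1", "user": "admin", "protocol": "ssh", "port": "22"},
    "server1@lab": {"host": "192.168.1.10", "user": "root", "logs": "/tmp/logs"},
}

assert all("host" in info for info in nodes_dict.values())
# Would then be passed as: connpy.nodes(nodes_dict, config=myconf)
```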
def run(self,
commands,
vars=None,
*,
folder=None,
prompt=None,
stdout=None,
parallel=10,
timeout=None,
on_complete=None,
logger=None)
Run a command or list of commands on all the nodes in nodelist.
Parameters:
- commands (str/list): Commands to run on the nodes. Should be str or a list of str. You can use variables as {varname} and define them in the optional parameter vars.

Optional Parameters:

- vars (dict): Dictionary containing the definition of variables for each node, used in the commands parameter. Keys should be the nodes' unique names. Use the special key __global__ for global variables. Subkeys: variable names. Values: strings.

Optional Named Parameters:

- folder (str): Path where the output log should be stored; leave empty to disable logging.
- prompt (str): Prompt expected after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs some special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
- parallel (int): Number of nodes on which to run the commands simultaneously. Default 10; if there are more nodes than this value, they are run in groups of at most this many members.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.
- on_complete (callable): Optional callback called when each node finishes. Receives (unique, output, status). Called from the node's thread, so it must be thread-safe.

Returns:

dict: Dictionary formed by the nodes' unique names as keys and the output of the commands you ran on each node as values.

def test(self,
commands,
expected,
vars=None,
*,
prompt=None,
parallel=10,
timeout=None,
on_complete=None,
logger=None)
Run a command or list of commands on all the nodes in nodelist, then check if the expected value appears in the output after the last command.
Parameters:
- commands (str/list): Commands to run on the node. Should be str or a list of str.
- expected (str): Expected text to appear after running all the commands on the node.

Optional Parameters:

- vars (dict): Dictionary containing the definition of variables for each node, used in the commands and expected parameters. Keys should be the nodes' unique names. Use the special key __global__ for global variables. Subkeys: variable names. Values: strings.

Optional Named Parameters:

- prompt (str): Prompt expected after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs some special symbol.
- parallel (int): Number of nodes on which to run the commands simultaneously. Default 10; if there are more nodes than this value, they are run in groups of at most this many members.
- timeout (int): Time in seconds for expect to wait for prompt/EOF. Default 10.
- on_complete (callable): Optional callback called when each node finishes. Receives (unique, output, status, result). Called from the node's thread, so it must be thread-safe.

Returns:

dict: Dictionary formed by the nodes' unique names as keys; the value is True if the expected value is found after running the commands, False if the prompt is found first.
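To make the behaviour documented above concrete, here is a plain-Python sketch of three things run() and test() do: merging the vars dictionary (__global__ first, then per-node overrides), batching threads according to the parallel parameter, and a thread-safe on_complete callback. This mirrors the documented logic for illustration only; it is not connpy's own API, and the node names, variables, and command are hypothetical.

```python
import threading

def resolve_vars(unique, vars):
    # Merge order documented above: __global__ entries first,
    # then the node-specific entry, so per-node values win.
    merged = {}
    merged.update(vars.get("__global__", {}))
    merged.update(vars.get(unique, {}))
    return merged

vars = {"__global__": {"domain": "lab.local"}, "router1": {"iface": "eth0"}}
command = "ping {iface}.{domain}".format(**resolve_vars("router1", vars))

def splitlist(lst, n):
    # Same batching rule as the parallel parameter: groups of at most n.
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

batches = list(splitlist(list(range(25)), 10))  # sizes: 10, 10, 5

# on_complete is called from each node's thread, so shared state needs a lock.
lock = threading.Lock()
finished = []

def on_complete(unique, output, status):
    with lock:
        finished.append((unique, status))

# Would be passed to run() as: mynodes.run([command], vars=vars, on_complete=on_complete)
```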