add capture feature, change printing mechanics for all app, bug fixes

This commit is contained in:
2025-08-04 11:34:22 -03:00
parent c3f9f75f70
commit 4c76405549
14 changed files with 954 additions and 227 deletions

View File

@@ -154,9 +154,8 @@ options:
- **Purpose**: Handles parsing of command-line arguments.
- **Requirements**:
- Must contain only one method: `__init__`.
- The `__init__` method must initialize at least two attributes:
- The `__init__` method must initialize at least one attribute:
- `self.parser`: An instance of `argparse.ArgumentParser`.
- `self.description`: A string containing the description of the parser.
2. **Class `Entrypoint`**:
- **Purpose**: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
- **Requirements**:
@@ -253,6 +252,97 @@ There are 2 methods that allows you to define custom logic to be executed before
- `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
### Command Completion Support
Plugins can provide intelligent **tab completion** by defining a function called `_connpy_completion` in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.
#### Function Signature
```
def _connpy_completion(wordsnumber, words, info=None):
...
```
#### Parameters
| Parameter | Description |
|----------------|-------------|
| `wordsnumber` | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., `connpy <plugin> ...`). |
| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin, followed by any subcommands or arguments. |
| `info` | A dictionary of structured context data provided by Connpy to help with suggestions. |
#### Contents of `info`
The `info` dictionary contains helpful context to generate completions:
```
info = {
"config": config_dict, # The full loaded configuration
"nodes": node_list, # List of all known node names
"folders": folder_list, # List of all defined folder names
"profiles": profile_list, # List of all profile names
"plugins": plugin_list # List of all plugin names
}
```
You can use this data to generate suggestions based on the current input.
#### Return Value
The function must return a list of suggestion strings to be presented to the user.
#### Example
```
def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
return ["--help", "--verbose", "start", "stop"]
elif wordsnumber == 4 and words[2] == "start":
return info["nodes"] # Suggest node names
return []
```
> In this example, if the user types `connpy myplugin start ` and presses Tab, it will suggest node names.
### Handling Unknown Arguments
Plugins can choose to accept and process unknown arguments that are **not explicitly defined** in the parser. To enable this behavior, the plugin must define the following hidden argument in its `Parser` class:
```
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
```
#### Behavior:
- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
- If the user does not pass any unknown arguments, `args.unknown_args` will contain the default value (`True`, unless overridden).
#### Example:
If a plugin accepts unknown tcpdump flags like this:
```
connpy myplugin -nn -s0
```
And defines the hidden `--unknown-args` flag as shown above, then:
- `args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
> This allows the plugin to receive and process arguments intended for external tools (e.g., `tcpdump`) without argparse raising an error.
#### Note:
If a plugin does **not** define `--unknown-args`, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
### Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.

View File

@@ -112,9 +112,8 @@ options:
- **Purpose**: Handles parsing of command-line arguments.
- **Requirements**:
- Must contain only one method: `__init__`.
- The `__init__` method must initialize at least two attributes:
- The `__init__` method must initialize at least one attribute:
- `self.parser`: An instance of `argparse.ArgumentParser`.
- `self.description`: A string containing the description of the parser.
2. **Class `Entrypoint`**:
- **Purpose**: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
- **Requirements**:
@@ -210,6 +209,97 @@ There are 2 methods that allows you to define custom logic to be executed before
- `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
### Command Completion Support
Plugins can provide intelligent **tab completion** by defining a function called `_connpy_completion` in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.
#### Function Signature
```
def _connpy_completion(wordsnumber, words, info=None):
...
```
#### Parameters
| Parameter | Description |
|----------------|-------------|
| `wordsnumber` | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., `connpy <plugin> ...`). |
| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin, followed by any subcommands or arguments. |
| `info` | A dictionary of structured context data provided by Connpy to help with suggestions. |
#### Contents of `info`
The `info` dictionary contains helpful context to generate completions:
```
info = {
"config": config_dict, # The full loaded configuration
"nodes": node_list, # List of all known node names
"folders": folder_list, # List of all defined folder names
"profiles": profile_list, # List of all profile names
"plugins": plugin_list # List of all plugin names
}
```
You can use this data to generate suggestions based on the current input.
#### Return Value
The function must return a list of suggestion strings to be presented to the user.
#### Example
```
def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
return ["--help", "--verbose", "start", "stop"]
elif wordsnumber == 4 and words[2] == "start":
return info["nodes"] # Suggest node names
return []
```
> In this example, if the user types `connpy myplugin start ` and presses Tab, it will suggest node names.
### Handling Unknown Arguments
Plugins can choose to accept and process unknown arguments that are **not explicitly defined** in the parser. To enable this behavior, the plugin must define the following hidden argument in its `Parser` class:
```
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
```
#### Behavior:
- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
- If the user does not pass any unknown arguments, `args.unknown_args` will contain the default value (`True`, unless overridden).
#### Example:
If a plugin accepts unknown tcpdump flags like this:
```
connpy myplugin -nn -s0
```
And defines the hidden `--unknown-args` flag as shown above, then:
- `args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
> This allows the plugin to receive and process arguments intended for external tools (e.g., `tcpdump`) without argparse raising an error.
#### Note:
If a plugin does **not** define `--unknown-args`, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
### Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.
@@ -422,8 +512,9 @@ from .ai import ai
from .plugins import Plugins
from ._version import __version__
from pkg_resources import get_distribution
from . import printer
__all__ = ["node", "nodes", "configfile", "connapp", "ai", "Plugins"]
__all__ = ["node", "nodes", "configfile", "connapp", "ai", "Plugins", "printer"]
__author__ = "Federico Luzzi"
__pdoc__ = {
'core': False,
@@ -438,5 +529,6 @@ __pdoc__ = {
'node.deferred_class_hooks': False,
'nodes.deferred_class_hooks': False,
'connapp': False,
'connapp.encrypt': True
'connapp.encrypt': True,
'printer': False
}

View File

@@ -1,2 +1,2 @@
__version__ = "4.1.4"
__version__ = "4.2b1"

View File

@@ -1,6 +1,6 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
from connpy import configfile, node, nodes, hooks
from connpy import configfile, node, nodes, hooks, printer
from connpy.ai import ai as myai
from waitress import serve
import os
@@ -143,7 +143,7 @@ def stop_api():
port = int(f.readline().strip())
PID_FILE=PID_FILE2
except:
print("Connpy api server is not running.")
printer.warning("Connpy API server is not running.")
return
# Send a SIGTERM signal to the process
try:
@@ -152,7 +152,7 @@ def stop_api():
pass
# Delete the PID file
os.remove(PID_FILE)
print(f"Server with process ID {pid} stopped.")
printer.info(f"Server with process ID {pid} stopped.")
return port
@hooks.MethodHook
@@ -168,7 +168,7 @@ def start_server(port=8048):
@hooks.MethodHook
def start_api(port=8048):
if os.path.exists(PID_FILE1) or os.path.exists(PID_FILE2):
print("Connpy server is already running.")
printer.warning("Connpy server is already running.")
return
pid = os.fork()
if pid == 0:
@@ -182,7 +182,7 @@ def start_api(port=8048):
with open(PID_FILE2, "w") as f:
f.write(str(pid) + "\n" + str(port))
except:
print("Cound't create PID file")
return
print(f'Server is running with process ID {pid} in port {port}')
printer.error("Couldn't create PID file.")
exit(1)
printer.start(f"Server is running with process ID {pid} on port {port}")

View File

@@ -8,6 +8,7 @@ import sys
import inquirer
from .core import node,nodes
from ._version import __version__
from . import printer
from .api import start_api,stop_api,debug_api,app
from .ai import ai
from .plugins import Plugins
@@ -17,8 +18,13 @@ class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True
import ast
from rich import print as mdprint
from rich.markdown import Markdown
from rich.console import Console, Group
from rich.panel import Panel
from rich.text import Text
from rich.rule import Rule
from rich.style import Style
mdprint = Console().print
try:
from pyfzf.pyfzf import FzfPrompt
except:
@@ -70,7 +76,7 @@ class connapp:
'''
#DEFAULTPARSER
defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
defaultparser = argparse.ArgumentParser(prog = "connpy", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
#NODEPARSER
nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
@@ -190,6 +196,10 @@ class connapp:
argv[0] = "profile"
if len(argv) < 1 or argv[0] not in self.commands:
argv.insert(0,"node")
args, unknown_args = defaultparser.parse_known_args(argv)
if hasattr(args, "unknown_args"):
args.unknown_args = unknown_args
else:
args = defaultparser.parse_args(argv)
if args.subcommand in self.plugins.plugins:
self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
@@ -211,14 +221,14 @@ class connapp:
return actions.get(args.action)(args)
def _version(self, args):
print(__version__)
printer.info(f"Connpy {__version__}")
def _connect(self, args):
if args.data == None:
matches = self.nodes_list
if len(matches) == 0:
print("There are no nodes created")
print("try: conn --help")
printer.warning("There are no nodes created")
printer.info("try: connpy --help")
exit(9)
else:
if args.data.startswith("@"):
@@ -226,7 +236,7 @@ class connapp:
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
@@ -243,16 +253,16 @@ class connapp:
def _del(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
matches = list(filter(lambda k: k == args.data, self.folders))
else:
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
print("Removing: {}".format(matches))
printer.info("Removing: {}".format(matches))
question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -267,14 +277,14 @@ class connapp:
self.config._connections_del(**nodeuniques)
self.config._saveconfig(self.config.file)
if len(matches) == 1:
print("{} deleted succesfully".format(matches[0]))
printer.success("{} deleted successfully".format(matches[0]))
else:
print(f"{len(matches)} nodes deleted succesfully")
printer.success(f"{len(matches)} nodes deleted successfully")
def _add(self, args):
args.data = self._type_node(args.data)
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
type = "folder"
@@ -285,34 +295,34 @@ class connapp:
matches = list(filter(lambda k: k == args.data, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
if len(matches) > 0:
print("{} already exist".format(matches[0]))
printer.error("{} already exist".format(matches[0]))
exit(4)
if len(reversematches) > 0:
print("{} already exist".format(reversematches[0]))
printer.error("{} already exist".format(reversematches[0]))
exit(4)
else:
if type == "folder":
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid folder {}".format(args.data))
printer.error("Invalid folder {}".format(args.data))
exit(5)
if "subfolder" in uniques.keys():
parent = "@" + uniques["folder"]
if parent not in self.folders:
print("Folder {} not found".format(uniques["folder"]))
printer.error("Folder {} not found".format(uniques["folder"]))
exit(2)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
printer.success("{} added successfully".format(args.data))
if type == "node":
nodefolder = args.data.partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print(nodefolder + " not found")
printer.error(nodefolder + " not found")
exit(2)
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid node {}".format(args.data))
printer.error("Invalid node {}".format(args.data))
exit(5)
self._print_instructions()
newnode = self._questions_nodes(args.data, uniques)
@@ -320,44 +330,43 @@ class connapp:
exit(7)
self.config._connections_add(**newnode)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
printer.success("{} added successfully".format(args.data))
def _show(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
matches = list(filter(lambda k: k == args.data, self.nodes_list))
if args.data.startswith("@"):
matches = list(filter(lambda k: args.data in k, self.nodes_list))
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
if matches[0] == None:
exit(7)
node = self.config.getitem(matches[0])
for k, v in node.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + str(d))
yaml_output = yaml.dump(node, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
def _mod(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("No connection found with filter: {}".format(args.data))
printer.error("No connection found with filter: {}".format(args.data))
exit(2)
elif len(matches) == 1:
uniques = self.config._explode_unique(args.data)
uniques = self.config._explode_unique(matches[0])
unique = matches[0]
else:
uniques = {"id": None, "folder": None}
unique = None
print("Editing: {}".format(matches))
printer.info("Editing: {}".format(matches))
node = {}
for i in matches:
node[i] = self.config.getitem(i)
@@ -371,12 +380,12 @@ class connapp:
uniques.update(node[matches[0]])
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatenode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data))
printer.success("{} edited successfully".format(args.data))
else:
for k in node:
updatednode = self.config._explode_unique(k)
@@ -388,12 +397,12 @@ class connapp:
editcount += 1
updatednode[key] = updatenode[key]
if not editcount:
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatednode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(matches))
printer.success("{} edited successfully".format(matches))
return
@@ -407,57 +416,48 @@ class connapp:
def _profile_del(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
if matches[0] == "default":
print("Can't delete default profile")
printer.error("Can't delete default profile")
exit(6)
usedprofile = self.config._profileused(matches[0])
if len(usedprofile) > 0:
print("Profile {} used in the following nodes:".format(matches[0]))
print(", ".join(usedprofile))
printer.error(f"Profile {matches[0]} used in the following nodes:\n{', '.join(usedprofile)}")
exit(8)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
confirm = inquirer.prompt(question)
if confirm["delete"]:
self.config._profiles_del(id = matches[0])
self.config._saveconfig(self.config.file)
print("{} deleted succesfully".format(matches[0]))
printer.success("{} deleted successfully".format(matches[0]))
def _profile_show(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
for k, v in profile.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + str(d))
yaml_output = yaml.dump(profile, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
def _profile_add(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) > 0:
print("Profile {} Already exist".format(matches[0]))
printer.error("Profile {} Already exist".format(matches[0]))
exit(4)
newprofile = self._questions_profiles(args.data[0])
if newprofile == False:
exit(7)
self.config._profiles_add(**newprofile)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data[0]))
printer.success("{} added successfully".format(args.data[0]))
def _profile_mod(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
oldprofile = {"id": matches[0]}
@@ -469,12 +469,12 @@ class connapp:
if not updateprofile:
exit(7)
if sorted(updateprofile.items()) == sorted(oldprofile.items()):
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._profiles_add(**updateprofile)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data[0]))
printer.success("{} edited successfully".format(args.data[0]))
def _func_others(self, args):
#Function called when using other commands
@@ -509,7 +509,9 @@ class connapp:
formated[upper_key] = upper_value
newitems.append(args.format[0].format(**formated))
items = newitems
print(*items, sep="\n")
yaml_output = yaml.dump(items, sort_keys=False, default_flow_style=False)
printer.custom(args.data,"")
print(yaml_output)
def _mvcp(self, args):
if not self.case:
@@ -518,20 +520,20 @@ class connapp:
source = list(filter(lambda k: k == args.data[0], self.nodes_list))
dest = list(filter(lambda k: k == args.data[1], self.nodes_list))
if len(source) != 1:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
if len(dest) > 0:
print("Node {} Already exist".format(args.data[1]))
printer.error("Node {} Already exist".format(args.data[1]))
exit(4)
nodefolder = args.data[1].partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print("{} not found".format(nodefolder))
printer.error("{} not found".format(nodefolder))
exit(2)
olduniques = self.config._explode_unique(args.data[0])
newuniques = self.config._explode_unique(args.data[1])
if newuniques == False:
print("Invalid node {}".format(args.data[1]))
printer.error("Invalid node {}".format(args.data[1]))
exit(5)
node = self.config.getitem(source[0])
newnode = {**newuniques, **node}
@@ -540,7 +542,7 @@ class connapp:
self.config._connections_del(**olduniques)
self.config._saveconfig(self.config.file)
action = "moved" if args.command == "move" else "copied"
print("{} {} succesfully to {}".format(args.data[0],action, args.data[1]))
printer.success("{} {} successfully to {}".format(args.data[0],action, args.data[1]))
def _bulk(self, args):
if args.file and os.path.isfile(args.file[0]):
@@ -549,7 +551,9 @@ class connapp:
# Expecting exactly 2 lines
if len(lines) < 2:
raise ValueError("The file must contain at least two lines: one for nodes, one for hosts.")
printer.error("The file must contain at least two lines: one for nodes, one for hosts.")
exit(11)
nodes = lines[0].strip()
hosts = lines[1].strip()
@@ -569,10 +573,10 @@ class connapp:
matches = list(filter(lambda k: k == unique, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
if len(matches) > 0:
print("Node {} already exist, ignoring it".format(unique))
printer.info("Node {} already exist, ignoring it".format(unique))
continue
if len(reversematches) > 0:
print("Folder with name {} already exist, ignoring it".format(unique))
printer.info("Folder with name {} already exist, ignoring it".format(unique))
continue
newnode = {"id": n}
if newnodes["location"] != "":
@@ -596,9 +600,9 @@ class connapp:
self.nodes_list = self.config._getallnodes()
if count > 0:
self.config._saveconfig(self.config.file)
print("Succesfully added {} nodes".format(count))
printer.success("Successfully added {} nodes".format(count))
else:
print("0 nodes added")
printer.info("0 nodes added")
def _completion(self, args):
if args.data[0] == "bash":
@@ -633,7 +637,7 @@ class connapp:
folder = os.path.abspath(args.data[0]).rstrip('/')
with open(pathfile, "w") as f:
f.write(str(folder))
print("Config saved")
printer.success("Config saved")
def _openai(self, args):
if "openai" in self.config.config:
@@ -647,37 +651,37 @@ class connapp:
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
print("Config saved")
printer.success("Config saved")
def _func_plugin(self, args):
if args.add:
if not os.path.exists(args.add[1]):
print("File {} dosn't exists.".format(args.add[1]))
printer.error("File {} dosn't exists.".format(args.add[1]))
exit(14)
if args.add[0].isalpha() and args.add[0].islower() and len(args.add[0]) <= 15:
disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py.bkp")
if args.add[0] in self.commands or os.path.exists(disabled_dest_file):
print("Plugin name can't be the same as other commands.")
printer.error("Plugin name can't be the same as other commands.")
exit(15)
else:
check_bad_script = self.plugins.verify_script(args.add[1])
if check_bad_script:
print(check_bad_script)
printer.error(check_bad_script)
exit(16)
else:
try:
dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py")
shutil.copy2(args.add[1], dest_file)
print(f"Plugin {args.add[0]} added succesfully.")
printer.success(f"Plugin {args.add[0]} added successfully.")
except Exception as e:
print(f"Failed importing plugin file. {e}")
printer.error(f"Failed importing plugin file. {e}")
exit(17)
else:
print("Plugin name should be lowercase letters up to 15 characters.")
printer.error("Plugin name should be lowercase letters up to 15 characters.")
exit(15)
elif args.update:
if not os.path.exists(args.update[1]):
print("File {} dosn't exists.".format(args.update[1]))
printer.error("File {} dosn't exists.".format(args.update[1]))
exit(14)
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
@@ -686,7 +690,7 @@ class connapp:
if plugin_exist or disabled_plugin_exist:
check_bad_script = self.plugins.verify_script(args.update[1])
if check_bad_script:
print(check_bad_script)
printer.error(check_bad_script)
exit(16)
else:
try:
@@ -696,13 +700,13 @@ class connapp:
shutil.copy2(args.update[1], disabled_dest_file)
else:
shutil.copy2(args.update[1], dest_file)
print(f"Plugin {args.update[0]} updated succesfully.")
printer.success(f"Plugin {args.update[0]} updated successfully.")
except Exception as e:
print(f"Failed updating plugin file. {e}")
printer.error(f"Failed updating plugin file. {e}")
exit(17)
else:
print("Plugin {} dosn't exist.".format(args.update[0]))
printer.error("Plugin {} dosn't exist.".format(args.update[0]))
exit(14)
elif args.delete:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py")
@@ -710,7 +714,7 @@ class connapp:
plugin_exist = os.path.exists(plugin_file)
disabled_plugin_exist = os.path.exists(disabled_plugin_file)
if not plugin_exist and not disabled_plugin_exist:
print("Plugin {} dosn't exist.".format(args.delete[0]))
printer.error("Plugin {} dosn't exist.".format(args.delete[0]))
exit(14)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {} plugin?".format(args.delete[0]))]
confirm = inquirer.prompt(question)
@@ -722,33 +726,33 @@ class connapp:
os.remove(plugin_file)
elif disabled_plugin_exist:
os.remove(disabled_plugin_file)
print(f"plugin {args.delete[0]} deleted succesfully.")
printer.success(f"plugin {args.delete[0]} deleted successfully.")
except Exception as e:
print(f"Failed deleting plugin file. {e}")
printer.error(f"Failed deleting plugin file. {e}")
exit(17)
elif args.disable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py.bkp")
if not os.path.exists(plugin_file) or os.path.exists(disabled_plugin_file):
print("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
printer.error("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
exit(14)
try:
os.rename(plugin_file, disabled_plugin_file)
print(f"plugin {args.disable[0]} disabled succesfully.")
printer.success(f"plugin {args.disable[0]} disabled successfully.")
except Exception as e:
print(f"Failed disabling plugin file. {e}")
printer.error(f"Failed disabling plugin file. {e}")
exit(17)
elif args.enable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py.bkp")
if os.path.exists(plugin_file) or not os.path.exists(disabled_plugin_file):
print("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
printer.error("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
exit(14)
try:
os.rename(disabled_plugin_file, plugin_file)
print(f"plugin {args.enable[0]} enabled succesfully.")
printer.success(f"plugin {args.enable[0]} enabled successfully.")
except Exception as e:
print(f"Failed enabling plugin file. {e}")
printer.error(f"Failed enabling plugin file. {e}")
exit(17)
elif args.list:
enabled_files = []
@@ -768,18 +772,19 @@ class connapp:
if disabled_files:
plugins["Disabled"] = disabled_files
if plugins:
printer.custom("plugins","")
print(yaml.dump(plugins, sort_keys=False))
else:
print("There are no plugins added.")
printer.warning("There are no plugins added.")
def _func_import(self, args):
if not os.path.exists(args.data[0]):
print("File {} dosn't exist".format(args.data[0]))
printer.error("File {} dosn't exist".format(args.data[0]))
exit(14)
print("This could overwrite your current configuration!")
printer.warning("This could overwrite your current configuration!")
question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -789,7 +794,7 @@ class connapp:
with open(args.data[0]) as file:
imported = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for k,v in imported.items():
uniques = self.config._explode_unique(k)
@@ -808,12 +813,12 @@ class connapp:
uniques.update(v)
self.config._connections_add(**uniques)
self.config._saveconfig(self.config.file)
print("File {} imported succesfully".format(args.data[0]))
printer.success("File {} imported successfully".format(args.data[0]))
return
def _func_export(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
printer.error("File {} already exists".format(args.data[0]))
exit(14)
if len(args.data[1:]) == 0:
foldercons = self.config._getallnodesfull(extract = False)
@@ -821,13 +826,13 @@ class connapp:
for folder in args.data[1:]:
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0 and folder != "@":
print("{} folder not found".format(folder))
printer.error("{} folder not found".format(folder))
exit(2)
foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
with open(args.data[0], "w") as file:
yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
file.close()
print("File {} generated succesfully".format(args.data[0]))
printer.success("File {} generated successfully".format(args.data[0]))
exit()
return
@@ -977,13 +982,13 @@ class connapp:
def _yaml_generate(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
printer.error("File {} already exists".format(args.data[0]))
exit(14)
else:
with open(args.data[0], "w") as file:
file.write(self._help("generate"))
file.close()
print("File {} generated succesfully".format(args.data[0]))
printer.success("File {} generated successfully".format(args.data[0]))
exit()
def _yaml_run(self, args):
@@ -991,7 +996,7 @@ class connapp:
with open(args.data[0]) as file:
scripts = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for script in scripts["tasks"]:
self._cli_run(script)
@@ -1007,11 +1012,11 @@ class connapp:
if action == "test":
args["expected"] = script["expected"]
except KeyError as e:
print("'{}' is mandatory".format(e.args[0]))
printer.error("'{}' is mandatory".format(e.args[0]))
exit(11)
nodes = self.config._getallnodes(nodelist)
if len(nodes) == 0:
print("{} don't match any node".format(nodelist))
printer.error("{} don't match any node".format(nodelist))
exit(2)
nodes = self.nodes(self.config.getitems(nodes), config = self.config)
stdout = False
@@ -1037,32 +1042,47 @@ class connapp:
columns = int(p.group(1))
except:
columns = 80
PANEL_WIDTH = columns
if action == "run":
nodes.run(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if stdout:
for line in nodes.output[i].splitlines():
print(" " + line)
header = f"{script['name'].upper()}"
elif action == "test":
nodes.test(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if nodes.status[i] == 0:
max_length = max(len(s) for s in nodes.result[i].keys())
for k,v in nodes.result[i].items():
print(" TEST for '{}'".format(k) + " "*(max_length - len(k) + 1) + "--> " + str(v).upper())
if stdout:
if nodes.status[i] == 0:
print(" " + "-" * (max_length + 21))
for line in nodes.output[i].splitlines():
print(" " + line)
header = f"{script['name'].upper()}"
else:
print("Wrong action '{}'".format(action))
printer.error(f"Wrong action '{action}'")
exit(13)
mdprint(Rule(header, style="white"))
for node in nodes.status:
status_str = "[✓] PASS(0)" if nodes.status[node] == 0 else f"[x] FAIL({nodes.status[node]})"
title_line = f"{node}{status_str}"
test_output = Text()
if action == "test" and nodes.status[node] == 0:
results = nodes.result[node]
test_output.append("TEST RESULTS:\n")
max_key_len = max(len(k) for k in results.keys())
for k, v in results.items():
status = "[✓]" if str(v).upper() == "TRUE" else "[x]"
test_output.append(f" {k.ljust(max_key_len)} {status}\n")
output = nodes.output[node].strip()
code_block = Text()
if stdout and output:
code_block = Text(output + "\n")
if action == "test" and nodes.status[node] == 0:
highlight_words = [k for k, v in nodes.result[node].items() if str(v).upper() == "TRUE"]
code_block.highlight_words(highlight_words, style=Style(color="green", bold=True, underline=True))
panel_content = Group(test_output, Text(""), code_block)
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style="white"))
def _choose(self, list, name, action):
#Generates an inquirer list to pick
if FzfPrompt and self.fzf:
@@ -1429,7 +1449,7 @@ class connapp:
if subparser.description != None:
commands.append(subcommand)
commands = ",".join(commands)
usage_help = f"conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n conn {{{commands}}} ..."
usage_help = f"connpy [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n connpy {{{commands}}} ..."
return usage_help
if type == "end":
help_dict = {}
@@ -1602,5 +1622,4 @@ Here are some important instructions and tips for configuring your new node:
Please follow these instructions carefully to ensure proper configuration of your new node.
"""
# print(instructions)
mdprint(Markdown(instructions))

View File

@@ -13,6 +13,7 @@ import threading
from pathlib import Path
from copy import deepcopy
from .hooks import ClassHook, MethodHook
from . import printer
import io
#functions and classes
@@ -28,7 +29,7 @@ class node:
- result(bool): True if expected value is found after running
the commands using test method.
- status (int): 0 if the method run or test run succesfully.
- status (int): 0 if the method run or test run successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
@@ -254,7 +255,7 @@ class node:
if connect == True:
size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
self.child.setwinsize(int(size.group(2)),int(size.group(1)))
print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
printer.success("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
if 'logfile' in dir(self):
# Initialize self.mylog
if not 'mylog' in dir(self):
@@ -279,7 +280,7 @@ class node:
f.write(self._logclean(self.mylog.getvalue().decode(), True))
else:
print(connect)
printer.error(connect)
exit(1)
@MethodHook
@@ -585,7 +586,7 @@ class node:
if isinstance(self.tags, dict) and self.tags.get("console"):
child.sendline()
if debug:
print(cmd)
printer.debug(f"Command:\n{cmd}")
self.mylog = io.BytesIO()
child.logfile_read = self.mylog
@@ -645,6 +646,8 @@ class node:
sleep(1)
child.readline(0)
self.child = child
from pexpect import fdpexpect
self.raw_child = fdpexpect.fdspawn(self.child.child_fd)
return True
@ClassHook
@@ -666,7 +669,7 @@ class nodes:
Created after running method test.
- status (dict): Dictionary formed by nodes unique as keys, value:
0 if method run or test ended succesfully.
0 if method run or test ended successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.

View File

@@ -0,0 +1,387 @@
import argparse
import sys
import subprocess
import random
import socket
import time
import threading
from pexpect import TIMEOUT
from connpy import printer
class RemoteCapture:
def __init__(self, connapp, node_name, interface, namespace=None, use_wireshark=False, tcpdump_filter=None, tcpdump_args=None):
self.connapp = connapp
self.node_name = node_name
self.interface = interface
self.namespace = namespace
self.use_wireshark = use_wireshark
self.tcpdump_filter = tcpdump_filter or []
self.tcpdump_args = tcpdump_args if isinstance(tcpdump_args, list) else []
if node_name.startswith("@"): # fuzzy match
matches = [k for k in connapp.nodes_list if node_name in k]
else:
matches = [k for k in connapp.nodes_list if k.startswith(node_name)]
if not matches:
printer.error(f"Node '{node_name}' not found.")
sys.exit(2)
elif len(matches) > 1:
matches[0] = connapp._choose(matches, "node", "capture")
if matches[0] is None:
sys.exit(7)
node_data = connapp.config.getitem(matches[0])
self.node = connapp.node(matches[0], **node_data, config=connapp.config)
if self.node.protocol != "ssh":
printer.error(f"Node '{self.node.unique}' must be an SSH connection.")
sys.exit(2)
self.wireshark_path = connapp.config.config.get("wireshark_path")
def _start_local_listener(self, port, ws_proc=None):
self.listener_active = True
self.listener_conn = None
self.listener_connected = threading.Event()
def listen():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("localhost", port))
s.listen(1)
printer.start(f"Listening on localhost:{port}")
conn, addr = s.accept()
self.listener_conn = conn
printer.start(f"Connection from {addr}")
self.listener_connected.set()
try:
while self.listener_active:
data = conn.recv(4096)
if not data:
break
if self.use_wireshark and ws_proc:
try:
ws_proc.stdin.write(data)
ws_proc.stdin.flush()
except BrokenPipeError:
printer.info("Wireshark closed the pipe.")
break
else:
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
except Exception as e:
if isinstance(e, BrokenPipeError):
printer.info("Listener closed due to broken pipe.")
else:
printer.error(f"Listener error: {e}")
finally:
conn.close()
self.listener_conn = None
self.listener_thread = threading.Thread(target=listen)
self.listener_thread.daemon = True
self.listener_thread.start()
def _is_port_in_use(self, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
def _find_free_port(self, start=20000, end=30000):
for _ in range(10):
port = random.randint(start, end)
if not self._is_port_in_use(port):
return port
raise RuntimeError("No free port found for SSH tunnel.")
def _monitor_wireshark(self, ws_proc):
try:
while True:
try:
ws_proc.wait(timeout=1)
self.listener_active = False
if self.listener_conn:
printer.info("Wireshark exited, stopping listener.")
try:
self.listener_conn.shutdown(socket.SHUT_RDWR)
self.listener_conn.close()
except Exception:
pass
break
except subprocess.TimeoutExpired:
if not self.listener_active:
break
time.sleep(0.2)
except Exception as e:
printer.warning(f"Error in monitor_wireshark: {e}")
def _detect_sudo_requirement(self):
base_cmd = f"tcpdump -i {self.interface} -w - -U -c 1"
if self.namespace:
base_cmd = f"ip netns exec {self.namespace} {base_cmd}"
cmds = [base_cmd, f"sudo {base_cmd}"]
printer.info(f"Verifying sudo requirement")
for cmd in cmds:
try:
self.node.child.sendline(cmd)
start_time = time.time()
while time.time() - start_time < 5:
try:
index = self.node.child.expect([
r'listening on',
r'permission denied',
r'cannot',
r'No such file or directory',
], timeout=1)
if index == 0:
self.node.child.send("\x03")
return "sudo" in cmd
else:
break
except Exception:
continue
self.node.child.send("\x03")
time.sleep(0.5)
try:
self.node.child.read_nonblocking(size=1024, timeout=0.5)
except Exception:
pass
except Exception as e:
printer.warning(f"Error during sudo detection: {e}")
continue
printer.error(f"Failed to run tcpdump on remote node '{self.node.unique}'")
sys.exit(4)
def _monitor_capture_output(self):
try:
index = self.node.child.expect([
r'Broken pipe',
r'packet[s]? captured'
], timeout=None)
if index == 0:
printer.error("Tcpdump failed: Broken pipe.")
else:
printer.success("Tcpdump finished capturing packets.")
self.listener_active = False
except:
pass
def _sendline_until_connected(self, cmd, retries=5, interval=2):
for attempt in range(1, retries + 1):
printer.info(f"Attempt {attempt}/{retries} to connect listener...")
self.node.child.sendline(cmd)
try:
index = self.node.child.expect([
r'listening on',
TIMEOUT,
r'permission',
r'not permitted',
r'invalid',
r'unrecognized',
r'Unable',
r'No such',
r'illegal',
r'syntax error'
], timeout=5)
if index == 0:
self.monitor_end = threading.Thread(target=self._monitor_capture_output)
self.monitor_end.daemon = True
self.monitor_end.start()
if self.listener_connected.wait(timeout=interval):
printer.success("Listener successfully received a connection.")
return True
else:
printer.warning("No connection yet. Retrying...")
elif index == 1:
error = f"tcpdump did not respond within the expected time.\n" \
f"Command used:\n{cmd}\n" \
f"→ Please verify the command syntax."
return f"{error}"
else:
before_last_line = self.node.child.before.decode().splitlines()[-1]
error = f"Tcpdump error detected:" \
f"{before_last_line}{self.node.child.after.decode()}{self.node.child.readline().decode()}".rstrip()
return f"{error}"
except Exception as e:
printer.warning(f"Unexpected error during tcpdump startup: {e}")
return False
return False
def _build_tcpdump_command(self):
base = f"tcpdump -i {self.interface}"
if self.use_wireshark:
base += " -w - -U"
else:
base += " -l"
if self.namespace:
base = f"ip netns exec {self.namespace} {base}"
if self.requires_sudo:
base = f"sudo {base}"
if self.tcpdump_args:
base += " " + " ".join(self.tcpdump_args)
if self.tcpdump_filter:
base += " " + " ".join(self.tcpdump_filter)
base += f" | nc localhost {self.local_port}"
return base
def run(self):
if self.use_wireshark:
if not self.wireshark_path:
printer.error("Wireshark path not set in config.\nUse '--set-wireshark-path /full/path/to/wireshark' to configure it.")
sys.exit(1)
self.local_port = self._find_free_port()
self.node.options += f" -o ExitOnForwardFailure=yes -R {self.local_port}:localhost:{self.local_port}"
connection = self.node._connect()
if connection is not True:
printer.error(f"Could not connect to {self.node.unique}\n{connection}")
sys.exit(1)
self.requires_sudo = self._detect_sudo_requirement()
tcpdump_cmd = self._build_tcpdump_command()
ws_proc = None
monitor_thread = None
if self.use_wireshark:
printer.info(f"Live capture from {self.node.unique}:{self.interface}, launching Wireshark...")
try:
ws_proc = subprocess.Popen(
[self.wireshark_path, "-k", "-i", "-"],
stdin=subprocess.PIPE,
stderr=subprocess.PIPE
)
except Exception as e:
printer.error(f"Failed to launch Wireshark: {e}\nMake sure the path is correct and Wireshark is installed.")
exit(1)
monitor_thread = threading.Thread(target=self._monitor_wireshark, args=(ws_proc,))
monitor_thread.daemon = True
monitor_thread.start()
else:
printer.info(f"Live text capture from {self.node.unique}:{self.interface}")
printer.info("Press Ctrl+C to stop.\n")
try:
self._start_local_listener(self.local_port, ws_proc=ws_proc)
time.sleep(1) # small delay before retry attempts
result = self._sendline_until_connected(tcpdump_cmd, retries=5, interval=2)
if result is not True:
if isinstance(result, str):
printer.error(f"{result}")
else:
printer.error("Listener connection failed after all retries.")
self.listener_active = False
return
while self.listener_active:
time.sleep(0.5)
except KeyboardInterrupt:
print("")
printer.warning("Capture interrupted by user.")
self.listener_active = False
finally:
if self.listener_conn:
try:
self.listener_conn.shutdown(socket.SHUT_RDWR)
self.listener_conn.close()
except:
pass
if hasattr(self.node, "child"):
self.node.child.close(force=True)
if self.listener_thread.is_alive():
self.listener_thread.join()
if monitor_thread and monitor_thread.is_alive():
monitor_thread.join()
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Capture packets remotely using a saved SSH node", epilog="All unknown arguments will be passed to tcpdump.")
self.parser.add_argument("node", nargs='?', help="Name of the saved node (must use SSH)")
self.parser.add_argument("interface", nargs='?', help="Network interface to capture on")
self.parser.add_argument("--ns", "--namespace", dest="namespace", help="Optional network namespace")
self.parser.add_argument("-w","--wireshark", action="store_true", help="Open live capture in Wireshark")
self.parser.add_argument("--set-wireshark-path", metavar="PATH", help="Set the default path to Wireshark binary")
self.parser.add_argument(
"-f", "--filter",
dest="tcpdump_filter",
metavar="ARG",
nargs="*",
default=["not", "port", "22"],
help="tcpdump filter expression (e.g., -f port 443 and udp). Default: not port 22"
)
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
class Entrypoint:
def __init__(self, args, parser, connapp):
if "--" in args.unknown_args:
args.unknown_args.remove("--")
if args.set_wireshark_path:
connapp._change_settings("wireshark_path", args.set_wireshark_path)
return
if not args.node or not args.interface:
parser.error("node and interface are required unless --set-wireshark-path is used")
capture = RemoteCapture(
connapp=connapp,
node_name=args.node,
interface=args.interface,
namespace=args.namespace,
use_wireshark=args.wireshark,
tcpdump_filter=args.tcpdump_filter,
tcpdump_args=args.unknown_args
)
capture.run()
def _connpy_completion(wordsnumber, words, info = None):
if wordsnumber == 3:
result = ["--help", "--set-wireshark-path"]
result.extend(info["nodes"])
elif wordsnumber == 5 and words[1] in info["nodes"]:
result = ['--wireshark', '--namespace', '--filter', '--help']
elif wordsnumber == 6 and words[3] in ["-w", "--wireshark"]:
result = ['--namespace', '--filter', '--help']
elif wordsnumber == 7 and words[3] in ["-n", "--namespace"]:
result = ['--wireshark', '--filter', '--help']
elif wordsnumber == 8:
if any(w in words for w in ["-w", "--wireshark"]) and any(w in words for w in ["-n", "--namespace"]):
result = ['--filter', '--help']
else:
result = []
return result

View File

@@ -1,6 +1,7 @@
import argparse
import yaml
import re
from connpy import printer
class context_manager:
@@ -14,10 +15,10 @@ class context_manager:
def add_context(self, context, regex):
if not context.isalnum():
print("Context name has to be alphanumeric.")
printer.error("Context name has to be alphanumeric.")
exit(1)
elif context in self.contexts:
print(f"Context {context} already exists.")
printer.error(f"Context {context} already exists.")
exit(2)
else:
self.contexts[context] = regex
@@ -25,10 +26,10 @@ class context_manager:
def modify_context(self, context, regex):
if context == "all":
print("Can't modify default context: all")
printer.error("Can't modify default context: all")
exit(3)
elif context not in self.contexts:
print(f"Context {context} doesn't exist.")
printer.error(f"Context {context} doesn't exist.")
exit(4)
else:
self.contexts[context] = regex
@@ -36,13 +37,13 @@ class context_manager:
def delete_context(self, context):
if context == "all":
print("Can't delete default context: all")
printer.error("Can't delete default context: all")
exit(3)
elif context not in self.contexts:
print(f"Context {context} doesn't exist.")
printer.error(f"Context {context} doesn't exist.")
exit(4)
if context == self.current_context:
print(f"Can't delete current context: {self.current_context}")
printer.error(f"Can't delete current context: {self.current_context}")
exit(5)
else:
self.contexts.pop(context)
@@ -51,26 +52,27 @@ class context_manager:
def list_contexts(self):
for key in self.contexts.keys():
if key == self.current_context:
print(f"{key} * (active)")
printer.success(f"{key} (active)")
else:
print(key)
printer.custom(" ",key)
def set_context(self, context):
if context not in self.contexts:
print(f"Context {context} doesn't exist.")
printer.error(f"Context {context} doesn't exist.")
exit(4)
elif context == self.current_context:
print(f"Context {context} already set")
printer.info(f"Context {context} already set")
exit(0)
else:
self.connapp._change_settings("current_context", context)
def show_context(self, context):
if context not in self.contexts:
print(f"Context {context} doesn't exist.")
printer.error(f"Context {context} doesn't exist.")
exit(4)
else:
yaml_output = yaml.dump(self.contexts[context], sort_keys=False, default_flow_style=False)
printer.custom(context,"")
print(yaml_output)
@@ -113,18 +115,17 @@ class Preload:
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Manage contexts with regex matching", formatter_class=argparse.RawTextHelpFormatter)
self.description = "Manage contexts with regex matching"
# Define the context name as a positional argument
self.parser.add_argument("context_name", help="Name of the context", nargs='?')
group = self.parser.add_mutually_exclusive_group(required=True)
group.add_argument("-a", "--add", nargs='+', help='Add a new context with regex values. Usage: context -a name "regex1" "regex2"')
group.add_argument("-r", "--rm", "--del", action='store_true', help="Delete a context. Usage: context -d name")
group.add_argument("--ls", action='store_true', help="List all contexts. Usage: context --list")
group.add_argument("--set", action='store_true', help="Set the used context. Usage: context --set name")
group.add_argument("-s", "--show", action='store_true', help="Show the defined regex of a context. Usage: context --show name")
group.add_argument("-e", "--edit", "--mod", nargs='+', help='Modify an existing context. Usage: context --mod name "regex1" "regex2"')
group.add_argument("-a", "--add", nargs='+', help='Add a new context with regex values.\nUsage: context -a name "regex1" "regex2"')
group.add_argument("-r", "--rm", "--del", action='store_true', help="Delete a context.\nUsage: context -d name")
group.add_argument("--ls", action='store_true', help="List all contexts.\nUsage: context --ls")
group.add_argument("--set", action='store_true', help="Set the used context.\nUsage: context --set name")
group.add_argument("-s", "--show", action='store_true', help="Show the defined regex of a context.\nUsage: context --show name")
group.add_argument("-e", "--edit", "--mod", nargs='+', help='Modify an existing context.\nUsage: context --mod name "regex1" "regex2"')
class Entrypoint:
def __init__(self, args, parser, connapp):

View File

@@ -7,6 +7,7 @@ import tempfile
import io
import yaml
import threading
from connpy import printer
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
@@ -50,33 +51,33 @@ class sync:
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
print("Logged in successfully.")
printer.success("Logged in successfully.")
except RefreshError as e:
# If refresh fails, delete the invalid token file and start a new login flow
if os.path.exists(self.token_file):
os.remove(self.token_file)
print("Existing token was invalid and has been removed. Please log in again.")
printer.warning("Existing token was invalid and has been removed. Please log in again.")
flow = InstalledAppFlow.from_client_secrets_file(
self.google_client, self.scopes)
creds = flow.run_local_server(port=0, access_type='offline')
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
print("Logged in successfully after re-authentication.")
printer.success("Logged in successfully after re-authentication.")
def logout(self):
if os.path.exists(self.token_file):
os.remove(self.token_file)
print("Logged out successfully.")
printer.success("Logged out successfully.")
else:
print("No credentials file found. Already logged out.")
printer.info("No credentials file found. Already logged out.")
def get_credentials(self):
# Load credentials from token.json
if os.path.exists(self.token_file):
creds = Credentials.from_authorized_user_file(self.token_file, self.scopes)
else:
print("Credentials file not found.")
printer.error("Credentials file not found.")
return 0
# If there are no valid credentials available, ask the user to log in again
@@ -85,10 +86,10 @@ class sync:
try:
creds.refresh(Request())
except RefreshError:
print("Could not refresh access token. Please log in again.")
printer.warning("Could not refresh access token. Please log in again.")
return 0
else:
print("Credentials are missing or invalid. Please log in.")
printer.warning("Credentials are missing or invalid. Please log in.")
return 0
return creds
@@ -114,8 +115,8 @@ class sync:
return False
def status(self):
print(f"Login: {self.check_login_status()}")
print(f"Sync: {self.sync}")
printer.info(f"Login: {self.check_login_status()}")
printer.info(f"Sync: {self.sync}")
def get_appdata_files(self):
@@ -151,17 +152,18 @@ class sync:
return files_info
except HttpError as error:
print(f"An error occurred: {error}")
printer.error(f"An error occurred: {error}")
return 0
def dump_appdata_files_yaml(self):
files_info = self.get_appdata_files()
if not files_info:
print("Failed to retrieve files or no files found.")
printer.error("Failed to retrieve files or no files found.")
return
# Pretty print as YAML
yaml_output = yaml.dump(files_info, sort_keys=False, default_flow_style=False)
printer.custom("backups","")
print(yaml_output)
@@ -233,16 +235,16 @@ class sync:
oldest_file = min(app_data_files, key=lambda x: x['timestamp'])
delete_old = self.delete_file_by_id(oldest_file['id'])
if delete_old:
print(delete_old)
printer.error(delete_old)
return 1
# Upload the new file
upload_new = self.backup_file_to_drive(zip_path, timestamp)
if upload_new:
print(upload_new)
printer.error(upload_new)
return 1
print("Backup to google uploaded successfully.")
printer.success("Backup to google uploaded successfully.")
return 0
def decompress_zip(self, zip_path):
@@ -253,7 +255,7 @@ class sync:
zipf.extract(".osk", os.path.dirname(self.key))
return 0
except Exception as e:
print(f"An error occurred: {e}")
printer.error(f"An error occurred: {e}")
return 1
def download_file_by_id(self, file_id, destination_path):
@@ -282,14 +284,14 @@ class sync:
# Get the files in the app data folder
app_data_files = self.get_appdata_files()
if not app_data_files:
print("No files found in app data folder.")
printer.error("No files found in app data folder.")
return 1
# Check if a specific file_id was provided and if it exists in the list
if file_id:
selected_file = next((f for f in app_data_files if f['id'] == file_id), None)
if not selected_file:
print(f"No file found with ID: {file_id}")
printer.error(f"No file found with ID: {file_id}")
return 1
else:
# Find the latest file based on timestamp
@@ -302,10 +304,10 @@ class sync:
# Unzip the downloaded file to the destination folder
if self.decompress_zip(temp_download_path):
print("Failed to decompress the file.")
printer.error("Failed to decompress the file.")
return 1
print(f"Backup from Google Drive restored successfully: {selected_file['name']}")
printer.success(f"Backup from Google Drive restored successfully: {selected_file['name']}")
return 0
def config_listener_post(self, args, kwargs):
@@ -314,7 +316,7 @@ class sync:
if not kwargs["result"]:
self.compress_and_upload()
else:
print("Sync cannot be performed. Please check your login status.")
printer.warning("Sync cannot be performed. Please check your login status.")
return kwargs["result"]
def config_listener_pre(self, *args, **kwargs):
@@ -337,7 +339,6 @@ class Preload:
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Sync config with Google")
self.description = "Sync config with Google"
subparsers = self.parser.add_subparsers(title="Commands", dest='command',metavar="")
login_parser = subparsers.add_parser("login", help="Login to Google to enable synchronization")
logout_parser = subparsers.add_parser("logout", help="Logout from Google")

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
#Imports
from functools import wraps, partial, update_wrapper
from . import printer
#functions and classes
@@ -19,7 +20,7 @@ class MethodHook:
try:
args, kwargs = hook(*args, **kwargs)
except Exception as e:
print(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
printer.error(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
try:
result = self.func(*args, **kwargs)
@@ -30,7 +31,7 @@ class MethodHook:
try:
result = hook(*args, **kwargs, result=result) # Pass result to hooks
except Exception as e:
print(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
printer.error(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
return result

View File

@@ -4,6 +4,7 @@ import importlib.util
import sys
import argparse
import os
from connpy import printer
class Plugins:
def __init__(self):
@@ -30,8 +31,7 @@ class Plugins:
### Verifications:
- The presence of only allowed top-level elements.
- The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'
and 'self.description'.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
If any of these checks fail, the function returns an error message indicating
@@ -77,11 +77,12 @@ class Plugins:
if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
return "Parser class should only have __init__ method"
# Check if 'self.parser' and 'self.description' are assigned in __init__ method
# Check if 'self.parser' is assigned in __init__ method
init_method = node.body[0]
assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self']
if 'parser' not in assigned_attrs or 'description' not in assigned_attrs:
return "Parser class should set self.parser and self.description" # 'self.parser' or 'self.description' not assigned in __init__
if 'parser' not in assigned_attrs:
return "Parser class should set self.parser"
elif node.name == 'Entrypoint':
has_entrypoint = True
@@ -124,13 +125,14 @@ class Plugins:
filepath = os.path.join(directory, filename)
check_file = self.verify_script(filepath)
if check_file:
print(f"Failed to load plugin: {filename}. Reason: {check_file}")
printer.error(f"Failed to load plugin: {filename}. Reason: {check_file}")
continue
else:
self.plugins[root_filename] = self._import_from_path(filepath)
if hasattr(self.plugins[root_filename], "Parser"):
self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
plugin = self.plugin_parsers[root_filename]
subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, usage=plugin.parser.usage, description=plugin.parser.description, epilog=plugin.parser.epilog, formatter_class=plugin.parser.formatter_class)
if hasattr(self.plugins[root_filename], "Preload"):
self.preloads[root_filename] = self.plugins[root_filename]

33
connpy/printer.py Normal file
View File

@@ -0,0 +1,33 @@
import sys
def _format_multiline(tag, message):
lines = message.splitlines()
if not lines:
return f"[{tag}]"
formatted = [f"[{tag}] {lines[0]}"]
indent = " " * (len(tag) + 3)
for line in lines[1:]:
formatted.append(f"{indent}{line}")
return "\n".join(formatted)
def info(message):
print(_format_multiline("i", message))
def success(message):
print(_format_multiline("", message))
def start(message):
print(_format_multiline("+", message))
def warning(message):
print(_format_multiline("!", message))
def error(message):
print(_format_multiline("", message), file=sys.stderr)
def debug(message):
print(_format_multiline("d", message))
def custom(tag, message):
print(_format_multiline(tag, message))

View File

@@ -143,9 +143,8 @@ options:
<li><strong>Purpose</strong>: Handles parsing of command-line arguments.</li>
<li><strong>Requirements</strong>:</li>
<li>Must contain only one method: <code>__init__</code>.</li>
<li>The <code>__init__</code> method must initialize at least two attributes:<ul>
<li>The <code>__init__</code> method must initialize at least one attribute:<ul>
<li><code>self.parser</code>: An instance of <code>argparse.ArgumentParser</code>.</li>
<li><code>self.description</code>: A string containing the description of the parser.</li>
</ul>
</li>
</ul>
@@ -270,6 +269,89 @@ connapp.ai.some_method.register_pre_hook(pre_processing_hook)</p>
<li><code>if __name__ == "__main__":</code></li>
<li>This block allows the plugin to be run as a standalone script for testing or independent use.</li>
</ul>
<h3 id="command-completion-support">Command Completion Support</h3>
<p>Plugins can provide intelligent <strong>tab completion</strong> by defining a function called <code>_connpy_completion</code> in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.</p>
<h4 id="function-signature">Function Signature</h4>
<pre><code>def _connpy_completion(wordsnumber, words, info=None):
...
</code></pre>
<h4 id="parameters">Parameters</h4>
<table>
<thead>
<tr>
<th>Parameter</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>wordsnumber</code></td>
<td>Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., <code>connpy &lt;plugin&gt; ...</code>).</td>
</tr>
<tr>
<td><code>words</code></td>
<td>A list of tokens (words) already typed. <code>words[0]</code> is always the name of the plugin, followed by any subcommands or arguments.</td>
</tr>
<tr>
<td><code>info</code></td>
<td>A dictionary of structured context data provided by Connpy to help with suggestions.</td>
</tr>
</tbody>
</table>
<h4 id="contents-of-info">Contents of <code>info</code></h4>
<p>The <code>info</code> dictionary contains helpful context to generate completions:</p>
<pre><code>info = {
&quot;config&quot;: config_dict, # The full loaded configuration
&quot;nodes&quot;: node_list, # List of all known node names
&quot;folders&quot;: folder_list, # List of all defined folder names
&quot;profiles&quot;: profile_list, # List of all profile names
&quot;plugins&quot;: plugin_list # List of all plugin names
}
</code></pre>
<p>You can use this data to generate suggestions based on the current input.</p>
<h4 id="return-value">Return Value</h4>
<p>The function must return a list of suggestion strings to be presented to the user.</p>
<h4 id="example">Example</h4>
<pre><code>def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
return [&quot;--help&quot;, &quot;--verbose&quot;, &quot;start&quot;, &quot;stop&quot;]
elif wordsnumber == 4 and words[2] == &quot;start&quot;:
return info[&quot;nodes&quot;] # Suggest node names
return []
</code></pre>
<blockquote>
<p>In this example, if the user types <code><a title="connpy" href="#connpy">connpy</a> myplugin start </code> and presses Tab, it will suggest node names.</p>
</blockquote>
<h3 id="handling-unknown-arguments">Handling Unknown Arguments</h3>
<p>Plugins can choose to accept and process unknown arguments that are <strong>not explicitly defined</strong> in the parser. To enable this behavior, the plugin must define the following hidden argument in its <code>Parser</code> class:</p>
<pre><code>self.parser.add_argument(
&quot;--unknown-args&quot;,
action=&quot;store_true&quot;,
default=True,
help=argparse.SUPPRESS
)
</code></pre>
<h4 id="behavior">Behavior:</h4>
<ul>
<li>When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.</li>
<li>These unknown arguments will be passed to the plugin as <code>args.unknown_args</code> inside the <code>Entrypoint</code>.</li>
<li>If the user does not pass any unknown arguments, <code>args.unknown_args</code> will contain the default value (<code>True</code>, unless overridden).</li>
</ul>
<h4 id="example_1">Example:</h4>
<p>If a plugin accepts unknown tcpdump flags like this:</p>
<pre><code>connpy myplugin -nn -s0
</code></pre>
<p>And defines the hidden <code>--unknown-args</code> flag as shown above, then:</p>
<ul>
<li><code>args.unknown_args</code> inside <code>Entrypoint.__init__()</code> will be: <code>['-nn', '-s0']</code></li>
</ul>
<blockquote>
<p>This allows the plugin to receive and process arguments intended for external tools (e.g., <code>tcpdump</code>) without argparse raising an error.</p>
</blockquote>
<h4 id="note">Note:</h4>
<p>If a plugin does <strong>not</strong> define <code>--unknown-args</code>, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.</p>
<h3 id="script-verification">Script Verification</h3>
<ul>
<li>The <code>verify_script</code> method in <code>plugins.py</code> is used to check the plugin script's compliance with these standards.</li>
@@ -481,8 +563,7 @@ print(result)
### Verifications:
- The presence of only allowed top-level elements.
- The existence of two specific classes: &#39;Parser&#39; and &#39;Entrypoint&#39;. and/or specific class: Preload.
- &#39;Parser&#39; class must only have an &#39;__init__&#39; method and must assign &#39;self.parser&#39;
and &#39;self.description&#39;.
- &#39;Parser&#39; class must only have an &#39;__init__&#39; method and must assign &#39;self.parser&#39;.
- &#39;Entrypoint&#39; class must have an &#39;__init__&#39; method accepting specific arguments.
If any of these checks fail, the function returns an error message indicating
@@ -528,11 +609,12 @@ print(result)
if not all(isinstance(method, ast.FunctionDef) and method.name == &#39;__init__&#39; for method in node.body):
return &#34;Parser class should only have __init__ method&#34;
# Check if &#39;self.parser&#39; and &#39;self.description&#39; are assigned in __init__ method
# Check if &#39;self.parser&#39; is assigned in __init__ method
init_method = node.body[0]
assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == &#39;self&#39;]
if &#39;parser&#39; not in assigned_attrs or &#39;description&#39; not in assigned_attrs:
return &#34;Parser class should set self.parser and self.description&#34; # &#39;self.parser&#39; or &#39;self.description&#39; not assigned in __init__
if &#39;parser&#39; not in assigned_attrs:
return &#34;Parser class should set self.parser&#34;
elif node.name == &#39;Entrypoint&#39;:
has_entrypoint = True
@@ -575,13 +657,14 @@ print(result)
filepath = os.path.join(directory, filename)
check_file = self.verify_script(filepath)
if check_file:
print(f&#34;Failed to load plugin: {filename}. Reason: {check_file}&#34;)
printer.error(f&#34;Failed to load plugin: {filename}. Reason: {check_file}&#34;)
continue
else:
self.plugins[root_filename] = self._import_from_path(filepath)
if hasattr(self.plugins[root_filename], &#34;Parser&#34;):
self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
plugin = self.plugin_parsers[root_filename]
subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, usage=plugin.parser.usage, description=plugin.parser.description, epilog=plugin.parser.epilog, formatter_class=plugin.parser.formatter_class)
if hasattr(self.plugins[root_filename], &#34;Preload&#34;):
self.preloads[root_filename] = self.plugins[root_filename]</code></pre>
</details>
@@ -615,8 +698,7 @@ print(result)
### Verifications:
- The presence of only allowed top-level elements.
- The existence of two specific classes: &#39;Parser&#39; and &#39;Entrypoint&#39;. and/or specific class: Preload.
- &#39;Parser&#39; class must only have an &#39;__init__&#39; method and must assign &#39;self.parser&#39;
and &#39;self.description&#39;.
- &#39;Parser&#39; class must only have an &#39;__init__&#39; method and must assign &#39;self.parser&#39;.
- &#39;Entrypoint&#39; class must have an &#39;__init__&#39; method accepting specific arguments.
If any of these checks fail, the function returns an error message indicating
@@ -662,11 +744,12 @@ print(result)
if not all(isinstance(method, ast.FunctionDef) and method.name == &#39;__init__&#39; for method in node.body):
return &#34;Parser class should only have __init__ method&#34;
# Check if &#39;self.parser&#39; and &#39;self.description&#39; are assigned in __init__ method
# Check if &#39;self.parser&#39; is assigned in __init__ method
init_method = node.body[0]
assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == &#39;self&#39;]
if &#39;parser&#39; not in assigned_attrs or &#39;description&#39; not in assigned_attrs:
return &#34;Parser class should set self.parser and self.description&#34; # &#39;self.parser&#39; or &#39;self.description&#39; not assigned in __init__
if &#39;parser&#39; not in assigned_attrs:
return &#34;Parser class should set self.parser&#34;
elif node.name == &#39;Entrypoint&#39;:
has_entrypoint = True
@@ -706,8 +789,7 @@ and that it includes mandatory classes with specific attributes and methods.</p>
<h3 id="verifications">Verifications:</h3>
<pre><code>- The presence of only allowed top-level elements.
- The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'
and 'self.description'.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
</code></pre>
<p>If any of these checks fail, the function returns an error message indicating
@@ -2110,7 +2192,7 @@ class node:
- result(bool): True if expected value is found after running
the commands using test method.
- status (int): 0 if the method run or test run succesfully.
- status (int): 0 if the method run or test run successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
@@ -2336,7 +2418,7 @@ class node:
if connect == True:
size = re.search(&#39;columns=([0-9]+).*lines=([0-9]+)&#39;,str(os.get_terminal_size()))
self.child.setwinsize(int(size.group(2)),int(size.group(1)))
print(&#34;Connected to &#34; + self.unique + &#34; at &#34; + self.host + (&#34;:&#34; if self.port != &#39;&#39; else &#39;&#39;) + self.port + &#34; via: &#34; + self.protocol)
printer.success(&#34;Connected to &#34; + self.unique + &#34; at &#34; + self.host + (&#34;:&#34; if self.port != &#39;&#39; else &#39;&#39;) + self.port + &#34; via: &#34; + self.protocol)
if &#39;logfile&#39; in dir(self):
# Initialize self.mylog
if not &#39;mylog&#39; in dir(self):
@@ -2361,7 +2443,7 @@ class node:
f.write(self._logclean(self.mylog.getvalue().decode(), True))
else:
print(connect)
printer.error(connect)
exit(1)
@MethodHook
@@ -2667,7 +2749,7 @@ class node:
if isinstance(self.tags, dict) and self.tags.get(&#34;console&#34;):
child.sendline()
if debug:
print(cmd)
printer.debug(f&#34;Command:\n{cmd}&#34;)
self.mylog = io.BytesIO()
child.logfile_read = self.mylog
@@ -2727,6 +2809,8 @@ class node:
sleep(1)
child.readline(0)
self.child = child
from pexpect import fdpexpect
self.raw_child = fdpexpect.fdspawn(self.child.child_fd)
return True</code></pre>
</details>
<div class="desc"><p>This class generates a node object. Containts all the information and methods to connect and interact with a device using ssh or telnet.</p>
@@ -2737,7 +2821,7 @@ class node:
- result(bool): True if expected value is found after running
the commands using test method.
- status (int): 0 if the method run or test run succesfully.
- status (int): 0 if the method run or test run successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
</code></pre>
@@ -2796,7 +2880,7 @@ def interact(self, debug = False):
if connect == True:
size = re.search(&#39;columns=([0-9]+).*lines=([0-9]+)&#39;,str(os.get_terminal_size()))
self.child.setwinsize(int(size.group(2)),int(size.group(1)))
print(&#34;Connected to &#34; + self.unique + &#34; at &#34; + self.host + (&#34;:&#34; if self.port != &#39;&#39; else &#39;&#39;) + self.port + &#34; via: &#34; + self.protocol)
printer.success(&#34;Connected to &#34; + self.unique + &#34; at &#34; + self.host + (&#34;:&#34; if self.port != &#39;&#39; else &#39;&#39;) + self.port + &#34; via: &#34; + self.protocol)
if &#39;logfile&#39; in dir(self):
# Initialize self.mylog
if not &#39;mylog&#39; in dir(self):
@@ -2821,7 +2905,7 @@ def interact(self, debug = False):
f.write(self._logclean(self.mylog.getvalue().decode(), True))
else:
print(connect)
printer.error(connect)
exit(1)</code></pre>
</details>
<div class="desc"><p>Allow user to interact with the node directly, mostly used by connection manager.</p>
@@ -3143,7 +3227,7 @@ class nodes:
Created after running method test.
- status (dict): Dictionary formed by nodes unique as keys, value:
0 if method run or test ended succesfully.
0 if method run or test ended successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
@@ -3365,7 +3449,7 @@ class nodes:
Created after running method test.
- status (dict): Dictionary formed by nodes unique as keys, value:
0 if method run or test ended succesfully.
0 if method run or test ended successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
@@ -3673,6 +3757,20 @@ def test(self, commands, expected, vars = None,*, prompt = None, parallel = 10,
</ul>
</li>
<li><a href="#executable-block">Executable Block</a></li>
<li><a href="#command-completion-support">Command Completion Support</a><ul>
<li><a href="#function-signature">Function Signature</a></li>
<li><a href="#parameters">Parameters</a></li>
<li><a href="#contents-of-info">Contents of info</a></li>
<li><a href="#return-value">Return Value</a></li>
<li><a href="#example">Example</a></li>
</ul>
</li>
<li><a href="#handling-unknown-arguments">Handling Unknown Arguments</a><ul>
<li><a href="#behavior">Behavior:</a></li>
<li><a href="#example_1">Example:</a></li>
<li><a href="#note">Note:</a></li>
</ul>
</li>
<li><a href="#script-verification">Script Verification</a></li>
<li><a href="#example-script">Example Script</a></li>
</ul>

View File

@@ -3,7 +3,7 @@ Flask_Cors>=4.0.1
google_api_python_client>=2.125.0
google_auth_oauthlib>=1.2.0
inquirer>=3.3.0
openai>=0.27.8
openai>=1.98.0
pexpect>=4.8.0
protobuf>=5.27.2
pycryptodome>=3.18.0