diff --git a/README.md b/README.md
index c39f41c..7c55ab2 100644
--- a/README.md
+++ b/README.md
@@ -154,9 +154,8 @@ options:
- **Purpose**: Handles parsing of command-line arguments.
- **Requirements**:
- Must contain only one method: `__init__`.
- - The `__init__` method must initialize at least two attributes:
+ - The `__init__` method must initialize at least one attribute:
- `self.parser`: An instance of `argparse.ArgumentParser`.
- - `self.description`: A string containing the description of the parser.
2. **Class `Entrypoint`**:
- **Purpose**: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
- **Requirements**:
@@ -253,6 +252,97 @@ There are 2 methods that allows you to define custom logic to be executed before
- `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
+### Command Completion Support
+
+Plugins can provide intelligent **tab completion** by defining a function called `_connpy_completion` in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.
+
+#### Function Signature
+
+```
+def _connpy_completion(wordsnumber, words, info=None):
+ ...
+```
+
+#### Parameters
+
+| Parameter | Description |
+|----------------|-------------|
+| `wordsnumber` | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., `connpy ...`). |
+| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin, followed by any subcommands or arguments. |
+| `info` | A dictionary of structured context data provided by Connpy to help with suggestions. |
+
+#### Contents of `info`
+
+The `info` dictionary contains helpful context to generate completions:
+
+```
+info = {
+ "config": config_dict, # The full loaded configuration
+ "nodes": node_list, # List of all known node names
+ "folders": folder_list, # List of all defined folder names
+ "profiles": profile_list, # List of all profile names
+ "plugins": plugin_list # List of all plugin names
+}
+```
+
+You can use this data to generate suggestions based on the current input.
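+
+For instance, a completion function could combine several of these lists. A minimal sketch (it simply offers every folder and node as the plugin's first argument):
+
+```
+def _connpy_completion(wordsnumber, words, info=None):
+    if wordsnumber == 3:
+        # Offer folders and nodes as candidates for the first argument
+        return info["folders"] + info["nodes"]
+    return []
+```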
+
+#### Return Value
+
+The function must return a list of suggestion strings to be presented to the user.
+
+#### Example
+
+```
+def _connpy_completion(wordsnumber, words, info=None):
+ if wordsnumber == 3:
+ return ["--help", "--verbose", "start", "stop"]
+
+ elif wordsnumber == 4 and words[2] == "start":
+ return info["nodes"] # Suggest node names
+
+ return []
+```
+
+> In this example, if the user types `connpy myplugin start ` and presses Tab, it will suggest node names.
+
+### Handling Unknown Arguments
+
+Plugins can choose to accept and process unknown arguments that are **not explicitly defined** in the parser. To enable this behavior, the plugin must define the following hidden argument in its `Parser` class:
+
+```
+self.parser.add_argument(
+ "--unknown-args",
+ action="store_true",
+ default=True,
+ help=argparse.SUPPRESS
+)
+```
+
+#### Behavior:
+
+- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
+- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
+- If the user does not pass any extra arguments, `args.unknown_args` will simply be an empty list.
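+
+Internally, Connpy relies on `argparse.parse_known_args()` for this; roughly (simplified from `connapp.py`):
+
+```
+args, unknown_args = defaultparser.parse_known_args(argv)
+if hasattr(args, "unknown_args"):   # the plugin opted in with the hidden flag
+    args.unknown_args = unknown_args
+else:
+    args = defaultparser.parse_args(argv)
+```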
+
+#### Example:
+
+If a plugin accepts unknown tcpdump flags like this:
+
+```
+connpy myplugin -nn -s0
+```
+
+And defines the hidden `--unknown-args` flag as shown above, then:
+
+- `args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
+
+> This allows the plugin to receive and process arguments intended for external tools (e.g., `tcpdump`) without argparse raising an error.
+
+#### Note:
+
+If a plugin does **not** define `--unknown-args`, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
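+
+For illustration, a minimal `Entrypoint` that forwards the extra flags to an external command might look like the sketch below (the local `tcpdump` invocation is only an example; a real plugin such as `capture` runs it on the remote node):
+
+```
+import subprocess
+
+class Entrypoint:
+    def __init__(self, args, parser, connapp):
+        # Unknown flags captured by Connpy, e.g. ['-nn', '-s0']
+        extra = args.unknown_args if isinstance(args.unknown_args, list) else []
+        # Hand them over to the external tool untouched
+        subprocess.run(["tcpdump", *extra])
+```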
+
### Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.
diff --git a/connpy/__init__.py b/connpy/__init__.py
index e11e917..2ce91e7 100644
--- a/connpy/__init__.py
+++ b/connpy/__init__.py
@@ -112,9 +112,8 @@ options:
- **Purpose**: Handles parsing of command-line arguments.
- **Requirements**:
- Must contain only one method: `__init__`.
- - The `__init__` method must initialize at least two attributes:
+ - The `__init__` method must initialize at least one attribute:
- `self.parser`: An instance of `argparse.ArgumentParser`.
- - `self.description`: A string containing the description of the parser.
2. **Class `Entrypoint`**:
- **Purpose**: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
- **Requirements**:
@@ -210,6 +209,97 @@ There are 2 methods that allows you to define custom logic to be executed before
- `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
+### Command Completion Support
+
+Plugins can provide intelligent **tab completion** by defining a function called `_connpy_completion` in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.
+
+#### Function Signature
+
+```
+def _connpy_completion(wordsnumber, words, info=None):
+ ...
+```
+
+#### Parameters
+
+| Parameter | Description |
+|----------------|-------------|
+| `wordsnumber` | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., `connpy ...`). |
+| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin, followed by any subcommands or arguments. |
+| `info` | A dictionary of structured context data provided by Connpy to help with suggestions. |
+
+#### Contents of `info`
+
+The `info` dictionary contains helpful context to generate completions:
+
+```
+info = {
+ "config": config_dict, # The full loaded configuration
+ "nodes": node_list, # List of all known node names
+ "folders": folder_list, # List of all defined folder names
+ "profiles": profile_list, # List of all profile names
+ "plugins": plugin_list # List of all plugin names
+}
+```
+
+You can use this data to generate suggestions based on the current input.
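+
+For instance, a completion function could combine several of these lists. A minimal sketch (it simply offers every folder and node as the plugin's first argument):
+
+```
+def _connpy_completion(wordsnumber, words, info=None):
+    if wordsnumber == 3:
+        # Offer folders and nodes as candidates for the first argument
+        return info["folders"] + info["nodes"]
+    return []
+```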
+
+#### Return Value
+
+The function must return a list of suggestion strings to be presented to the user.
+
+#### Example
+
+```
+def _connpy_completion(wordsnumber, words, info=None):
+ if wordsnumber == 3:
+ return ["--help", "--verbose", "start", "stop"]
+
+ elif wordsnumber == 4 and words[2] == "start":
+ return info["nodes"] # Suggest node names
+
+ return []
+```
+
+> In this example, if the user types `connpy myplugin start ` and presses Tab, it will suggest node names.
+
+### Handling Unknown Arguments
+
+Plugins can choose to accept and process unknown arguments that are **not explicitly defined** in the parser. To enable this behavior, the plugin must define the following hidden argument in its `Parser` class:
+
+```
+self.parser.add_argument(
+ "--unknown-args",
+ action="store_true",
+ default=True,
+ help=argparse.SUPPRESS
+)
+```
+
+#### Behavior:
+
+- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
+- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
+- If the user does not pass any extra arguments, `args.unknown_args` will simply be an empty list.
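+
+Internally, Connpy relies on `argparse.parse_known_args()` for this; roughly (simplified from `connapp.py`):
+
+```
+args, unknown_args = defaultparser.parse_known_args(argv)
+if hasattr(args, "unknown_args"):   # the plugin opted in with the hidden flag
+    args.unknown_args = unknown_args
+else:
+    args = defaultparser.parse_args(argv)
+```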
+
+#### Example:
+
+If a plugin accepts unknown tcpdump flags like this:
+
+```
+connpy myplugin -nn -s0
+```
+
+And defines the hidden `--unknown-args` flag as shown above, then:
+
+- `args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
+
+> This allows the plugin to receive and process arguments intended for external tools (e.g., `tcpdump`) without argparse raising an error.
+
+#### Note:
+
+If a plugin does **not** define `--unknown-args`, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
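+
+For illustration, a minimal `Entrypoint` that forwards the extra flags to an external command might look like the sketch below (the local `tcpdump` invocation is only an example; a real plugin such as `capture` runs it on the remote node):
+
+```
+import subprocess
+
+class Entrypoint:
+    def __init__(self, args, parser, connapp):
+        # Unknown flags captured by Connpy, e.g. ['-nn', '-s0']
+        extra = args.unknown_args if isinstance(args.unknown_args, list) else []
+        # Hand them over to the external tool untouched
+        subprocess.run(["tcpdump", *extra])
+```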
+
### Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.
@@ -422,8 +512,9 @@ from .ai import ai
from .plugins import Plugins
from ._version import __version__
from pkg_resources import get_distribution
+from . import printer
-__all__ = ["node", "nodes", "configfile", "connapp", "ai", "Plugins"]
+__all__ = ["node", "nodes", "configfile", "connapp", "ai", "Plugins", "printer"]
__author__ = "Federico Luzzi"
__pdoc__ = {
'core': False,
@@ -438,5 +529,6 @@ __pdoc__ = {
'node.deferred_class_hooks': False,
'nodes.deferred_class_hooks': False,
'connapp': False,
- 'connapp.encrypt': True
+ 'connapp.encrypt': True,
+ 'printer': False
}
diff --git a/connpy/_version.py b/connpy/_version.py
index 8fdd68e..6cab941 100644
--- a/connpy/_version.py
+++ b/connpy/_version.py
@@ -1,2 +1,2 @@
-__version__ = "4.1.4"
+__version__ = "4.2b1"
diff --git a/connpy/api.py b/connpy/api.py
index 99f10c8..8260465 100755
--- a/connpy/api.py
+++ b/connpy/api.py
@@ -1,6 +1,6 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
-from connpy import configfile, node, nodes, hooks
+from connpy import configfile, node, nodes, hooks, printer
from connpy.ai import ai as myai
from waitress import serve
import os
@@ -143,7 +143,7 @@ def stop_api():
port = int(f.readline().strip())
PID_FILE=PID_FILE2
except:
- print("Connpy api server is not running.")
+ printer.warning("Connpy API server is not running.")
return
# Send a SIGTERM signal to the process
try:
@@ -152,7 +152,7 @@ def stop_api():
pass
# Delete the PID file
os.remove(PID_FILE)
- print(f"Server with process ID {pid} stopped.")
+ printer.info(f"Server with process ID {pid} stopped.")
return port
@hooks.MethodHook
@@ -168,7 +168,7 @@ def start_server(port=8048):
@hooks.MethodHook
def start_api(port=8048):
if os.path.exists(PID_FILE1) or os.path.exists(PID_FILE2):
- print("Connpy server is already running.")
+ printer.warning("Connpy server is already running.")
return
pid = os.fork()
if pid == 0:
@@ -182,7 +182,7 @@ def start_api(port=8048):
with open(PID_FILE2, "w") as f:
f.write(str(pid) + "\n" + str(port))
except:
- print("Cound't create PID file")
- return
- print(f'Server is running with process ID {pid} in port {port}')
+ printer.error("Couldn't create PID file.")
+ exit(1)
+ printer.start(f"Server is running with process ID {pid} on port {port}")
diff --git a/connpy/connapp.py b/connpy/connapp.py
index 99f884d..9128225 100755
--- a/connpy/connapp.py
+++ b/connpy/connapp.py
@@ -8,6 +8,7 @@ import sys
import inquirer
from .core import node,nodes
from ._version import __version__
+from . import printer
from .api import start_api,stop_api,debug_api,app
from .ai import ai
from .plugins import Plugins
@@ -17,8 +18,13 @@ class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True
import ast
-from rich import print as mdprint
from rich.markdown import Markdown
+from rich.console import Console, Group
+from rich.panel import Panel
+from rich.text import Text
+from rich.rule import Rule
+from rich.style import Style
+mdprint = Console().print
try:
from pyfzf.pyfzf import FzfPrompt
except:
@@ -70,7 +76,7 @@ class connapp:
'''
#DEFAULTPARSER
- defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
+ defaultparser = argparse.ArgumentParser(prog = "connpy", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
#NODEPARSER
nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
@@ -190,7 +196,11 @@ class connapp:
argv[0] = "profile"
if len(argv) < 1 or argv[0] not in self.commands:
argv.insert(0,"node")
- args = defaultparser.parse_args(argv)
+ args, unknown_args = defaultparser.parse_known_args(argv)
+ if hasattr(args, "unknown_args"):
+ args.unknown_args = unknown_args
+ else:
+ args = defaultparser.parse_args(argv)
if args.subcommand in self.plugins.plugins:
self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
else:
@@ -211,14 +221,14 @@ class connapp:
return actions.get(args.action)(args)
def _version(self, args):
- print(__version__)
+ printer.info(f"Connpy {__version__}")
def _connect(self, args):
if args.data == None:
matches = self.nodes_list
if len(matches) == 0:
- print("There are no nodes created")
- print("try: conn --help")
+ printer.warning("There are no nodes created")
+ printer.info("try: connpy --help")
exit(9)
else:
if args.data.startswith("@"):
@@ -226,7 +236,7 @@ class connapp:
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
- print("{} not found".format(args.data))
+ printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
@@ -243,16 +253,16 @@ class connapp:
def _del(self, args):
if args.data == None:
- print("Missing argument node")
+ printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
matches = list(filter(lambda k: k == args.data, self.folders))
else:
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
- print("{} not found".format(args.data))
+ printer.error("{} not found".format(args.data))
exit(2)
- print("Removing: {}".format(matches))
+ printer.info("Removing: {}".format(matches))
question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -267,14 +277,14 @@ class connapp:
self.config._connections_del(**nodeuniques)
self.config._saveconfig(self.config.file)
if len(matches) == 1:
- print("{} deleted succesfully".format(matches[0]))
+ printer.success("{} deleted successfully".format(matches[0]))
else:
- print(f"{len(matches)} nodes deleted succesfully")
+ printer.success(f"{len(matches)} nodes deleted successfully")
def _add(self, args):
args.data = self._type_node(args.data)
if args.data == None:
- print("Missing argument node")
+ printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
type = "folder"
@@ -285,34 +295,34 @@ class connapp:
matches = list(filter(lambda k: k == args.data, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
if len(matches) > 0:
- print("{} already exist".format(matches[0]))
+            printer.error("{} already exists".format(matches[0]))
exit(4)
if len(reversematches) > 0:
- print("{} already exist".format(reversematches[0]))
+            printer.error("{} already exists".format(reversematches[0]))
exit(4)
else:
if type == "folder":
uniques = self.config._explode_unique(args.data)
if uniques == False:
- print("Invalid folder {}".format(args.data))
+ printer.error("Invalid folder {}".format(args.data))
exit(5)
if "subfolder" in uniques.keys():
parent = "@" + uniques["folder"]
if parent not in self.folders:
- print("Folder {} not found".format(uniques["folder"]))
+ printer.error("Folder {} not found".format(uniques["folder"]))
exit(2)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
- print("{} added succesfully".format(args.data))
+ printer.success("{} added successfully".format(args.data))
if type == "node":
nodefolder = args.data.partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
- print(nodefolder + " not found")
+ printer.error(nodefolder + " not found")
exit(2)
uniques = self.config._explode_unique(args.data)
if uniques == False:
- print("Invalid node {}".format(args.data))
+ printer.error("Invalid node {}".format(args.data))
exit(5)
self._print_instructions()
newnode = self._questions_nodes(args.data, uniques)
@@ -320,44 +330,43 @@ class connapp:
exit(7)
self.config._connections_add(**newnode)
self.config._saveconfig(self.config.file)
- print("{} added succesfully".format(args.data))
+ printer.success("{} added successfully".format(args.data))
def _show(self, args):
if args.data == None:
- print("Missing argument node")
+ printer.error("Missing argument node")
exit(3)
- matches = list(filter(lambda k: k == args.data, self.nodes_list))
+ if args.data.startswith("@"):
+ matches = list(filter(lambda k: args.data in k, self.nodes_list))
+ else:
+ matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
- print("{} not found".format(args.data))
+ printer.error("{} not found".format(args.data))
exit(2)
+ elif len(matches) > 1:
+            matches[0] = self._choose(matches,"node", "show")
+ if matches[0] == None:
+ exit(7)
node = self.config.getitem(matches[0])
- for k, v in node.items():
- if isinstance(v, str):
- print(k + ": " + v)
- elif isinstance(v, list):
- print(k + ":")
- for i in v:
- print(" - " + i)
- elif isinstance(v, dict):
- print(k + ":")
- for i,d in v.items():
- print(" - " + i + ": " + str(d))
+ yaml_output = yaml.dump(node, sort_keys=False, default_flow_style=False)
+ printer.custom(matches[0],"")
+ print(yaml_output)
def _mod(self, args):
if args.data == None:
- print("Missing argument node")
+ printer.error("Missing argument node")
exit(3)
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
- print("No connection found with filter: {}".format(args.data))
+ printer.error("No connection found with filter: {}".format(args.data))
exit(2)
elif len(matches) == 1:
- uniques = self.config._explode_unique(args.data)
+ uniques = self.config._explode_unique(matches[0])
unique = matches[0]
else:
uniques = {"id": None, "folder": None}
unique = None
- print("Editing: {}".format(matches))
+ printer.info("Editing: {}".format(matches))
node = {}
for i in matches:
node[i] = self.config.getitem(i)
@@ -371,12 +380,12 @@ class connapp:
uniques.update(node[matches[0]])
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
- print("Nothing to do here")
+ printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatenode)
self.config._saveconfig(self.config.file)
- print("{} edited succesfully".format(args.data))
+ printer.success("{} edited successfully".format(args.data))
else:
for k in node:
updatednode = self.config._explode_unique(k)
@@ -388,12 +397,12 @@ class connapp:
editcount += 1
updatednode[key] = updatenode[key]
if not editcount:
- print("Nothing to do here")
+ printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatednode)
self.config._saveconfig(self.config.file)
- print("{} edited succesfully".format(matches))
+ printer.success("{} edited successfully".format(matches))
return
@@ -407,57 +416,48 @@ class connapp:
def _profile_del(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
- print("{} not found".format(args.data[0]))
+ printer.error("{} not found".format(args.data[0]))
exit(2)
if matches[0] == "default":
- print("Can't delete default profile")
+ printer.error("Can't delete default profile")
exit(6)
usedprofile = self.config._profileused(matches[0])
if len(usedprofile) > 0:
- print("Profile {} used in the following nodes:".format(matches[0]))
- print(", ".join(usedprofile))
+            printer.error(f"Profile {matches[0]} is used in the following nodes:\n{', '.join(usedprofile)}")
exit(8)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
confirm = inquirer.prompt(question)
if confirm["delete"]:
self.config._profiles_del(id = matches[0])
self.config._saveconfig(self.config.file)
- print("{} deleted succesfully".format(matches[0]))
+ printer.success("{} deleted successfully".format(matches[0]))
def _profile_show(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
- print("{} not found".format(args.data[0]))
+ printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
- for k, v in profile.items():
- if isinstance(v, str):
- print(k + ": " + v)
- elif isinstance(v, list):
- print(k + ":")
- for i in v:
- print(" - " + i)
- elif isinstance(v, dict):
- print(k + ":")
- for i,d in v.items():
- print(" - " + i + ": " + str(d))
+ yaml_output = yaml.dump(profile, sort_keys=False, default_flow_style=False)
+ printer.custom(matches[0],"")
+ print(yaml_output)
def _profile_add(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) > 0:
- print("Profile {} Already exist".format(matches[0]))
+            printer.error("Profile {} already exists".format(matches[0]))
exit(4)
newprofile = self._questions_profiles(args.data[0])
if newprofile == False:
exit(7)
self.config._profiles_add(**newprofile)
self.config._saveconfig(self.config.file)
- print("{} added succesfully".format(args.data[0]))
+ printer.success("{} added successfully".format(args.data[0]))
def _profile_mod(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
- print("{} not found".format(args.data[0]))
+ printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
oldprofile = {"id": matches[0]}
@@ -469,12 +469,12 @@ class connapp:
if not updateprofile:
exit(7)
if sorted(updateprofile.items()) == sorted(oldprofile.items()):
- print("Nothing to do here")
+ printer.info("Nothing to do here")
return
else:
self.config._profiles_add(**updateprofile)
self.config._saveconfig(self.config.file)
- print("{} edited succesfully".format(args.data[0]))
+ printer.success("{} edited successfully".format(args.data[0]))
def _func_others(self, args):
#Function called when using other commands
@@ -509,7 +509,9 @@ class connapp:
formated[upper_key] = upper_value
newitems.append(args.format[0].format(**formated))
items = newitems
- print(*items, sep="\n")
+ yaml_output = yaml.dump(items, sort_keys=False, default_flow_style=False)
+ printer.custom(args.data,"")
+ print(yaml_output)
def _mvcp(self, args):
if not self.case:
@@ -518,20 +520,20 @@ class connapp:
source = list(filter(lambda k: k == args.data[0], self.nodes_list))
dest = list(filter(lambda k: k == args.data[1], self.nodes_list))
if len(source) != 1:
- print("{} not found".format(args.data[0]))
+ printer.error("{} not found".format(args.data[0]))
exit(2)
if len(dest) > 0:
- print("Node {} Already exist".format(args.data[1]))
+            printer.error("Node {} already exists".format(args.data[1]))
exit(4)
nodefolder = args.data[1].partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
- print("{} not found".format(nodefolder))
+ printer.error("{} not found".format(nodefolder))
exit(2)
olduniques = self.config._explode_unique(args.data[0])
newuniques = self.config._explode_unique(args.data[1])
if newuniques == False:
- print("Invalid node {}".format(args.data[1]))
+ printer.error("Invalid node {}".format(args.data[1]))
exit(5)
node = self.config.getitem(source[0])
newnode = {**newuniques, **node}
@@ -540,7 +542,7 @@ class connapp:
self.config._connections_del(**olduniques)
self.config._saveconfig(self.config.file)
action = "moved" if args.command == "move" else "copied"
- print("{} {} succesfully to {}".format(args.data[0],action, args.data[1]))
+ printer.success("{} {} successfully to {}".format(args.data[0],action, args.data[1]))
def _bulk(self, args):
if args.file and os.path.isfile(args.file[0]):
@@ -549,7 +551,9 @@ class connapp:
# Expecting exactly 2 lines
if len(lines) < 2:
- raise ValueError("The file must contain at least two lines: one for nodes, one for hosts.")
+ printer.error("The file must contain at least two lines: one for nodes, one for hosts.")
+ exit(11)
+
nodes = lines[0].strip()
hosts = lines[1].strip()
@@ -569,10 +573,10 @@ class connapp:
matches = list(filter(lambda k: k == unique, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
if len(matches) > 0:
- print("Node {} already exist, ignoring it".format(unique))
+                printer.info("Node {} already exists, ignoring it".format(unique))
continue
if len(reversematches) > 0:
- print("Folder with name {} already exist, ignoring it".format(unique))
+                printer.info("Folder with name {} already exists, ignoring it".format(unique))
continue
newnode = {"id": n}
if newnodes["location"] != "":
@@ -596,9 +600,9 @@ class connapp:
self.nodes_list = self.config._getallnodes()
if count > 0:
self.config._saveconfig(self.config.file)
- print("Succesfully added {} nodes".format(count))
+ printer.success("Successfully added {} nodes".format(count))
else:
- print("0 nodes added")
+ printer.info("0 nodes added")
def _completion(self, args):
if args.data[0] == "bash":
@@ -633,7 +637,7 @@ class connapp:
folder = os.path.abspath(args.data[0]).rstrip('/')
with open(pathfile, "w") as f:
f.write(str(folder))
- print("Config saved")
+ printer.success("Config saved")
def _openai(self, args):
if "openai" in self.config.config:
@@ -647,37 +651,37 @@ class connapp:
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
- print("Config saved")
+ printer.success("Config saved")
def _func_plugin(self, args):
if args.add:
if not os.path.exists(args.add[1]):
- print("File {} dosn't exists.".format(args.add[1]))
+                printer.error("File {} doesn't exist.".format(args.add[1]))
exit(14)
if args.add[0].isalpha() and args.add[0].islower() and len(args.add[0]) <= 15:
disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py.bkp")
if args.add[0] in self.commands or os.path.exists(disabled_dest_file):
- print("Plugin name can't be the same as other commands.")
+ printer.error("Plugin name can't be the same as other commands.")
exit(15)
else:
check_bad_script = self.plugins.verify_script(args.add[1])
if check_bad_script:
- print(check_bad_script)
+ printer.error(check_bad_script)
exit(16)
else:
try:
dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py")
shutil.copy2(args.add[1], dest_file)
- print(f"Plugin {args.add[0]} added succesfully.")
+ printer.success(f"Plugin {args.add[0]} added successfully.")
except Exception as e:
- print(f"Failed importing plugin file. {e}")
+ printer.error(f"Failed importing plugin file. {e}")
exit(17)
else:
- print("Plugin name should be lowercase letters up to 15 characters.")
+ printer.error("Plugin name should be lowercase letters up to 15 characters.")
exit(15)
elif args.update:
if not os.path.exists(args.update[1]):
- print("File {} dosn't exists.".format(args.update[1]))
+                printer.error("File {} doesn't exist.".format(args.update[1]))
exit(14)
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
@@ -686,7 +690,7 @@ class connapp:
if plugin_exist or disabled_plugin_exist:
check_bad_script = self.plugins.verify_script(args.update[1])
if check_bad_script:
- print(check_bad_script)
+ printer.error(check_bad_script)
exit(16)
else:
try:
@@ -696,13 +700,13 @@ class connapp:
shutil.copy2(args.update[1], disabled_dest_file)
else:
shutil.copy2(args.update[1], dest_file)
- print(f"Plugin {args.update[0]} updated succesfully.")
+ printer.success(f"Plugin {args.update[0]} updated successfully.")
except Exception as e:
- print(f"Failed updating plugin file. {e}")
+ printer.error(f"Failed updating plugin file. {e}")
exit(17)
else:
- print("Plugin {} dosn't exist.".format(args.update[0]))
+                printer.error("Plugin {} doesn't exist.".format(args.update[0]))
exit(14)
elif args.delete:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py")
@@ -710,7 +714,7 @@ class connapp:
plugin_exist = os.path.exists(plugin_file)
disabled_plugin_exist = os.path.exists(disabled_plugin_file)
if not plugin_exist and not disabled_plugin_exist:
- print("Plugin {} dosn't exist.".format(args.delete[0]))
+                printer.error("Plugin {} doesn't exist.".format(args.delete[0]))
exit(14)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {} plugin?".format(args.delete[0]))]
confirm = inquirer.prompt(question)
@@ -722,33 +726,33 @@ class connapp:
os.remove(plugin_file)
elif disabled_plugin_exist:
os.remove(disabled_plugin_file)
- print(f"plugin {args.delete[0]} deleted succesfully.")
+ printer.success(f"plugin {args.delete[0]} deleted successfully.")
except Exception as e:
- print(f"Failed deleting plugin file. {e}")
+ printer.error(f"Failed deleting plugin file. {e}")
exit(17)
elif args.disable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py.bkp")
if not os.path.exists(plugin_file) or os.path.exists(disabled_plugin_file):
- print("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
+                printer.error("Plugin {} doesn't exist or is disabled.".format(args.disable[0]))
exit(14)
try:
os.rename(plugin_file, disabled_plugin_file)
- print(f"plugin {args.disable[0]} disabled succesfully.")
+ printer.success(f"plugin {args.disable[0]} disabled successfully.")
except Exception as e:
- print(f"Failed disabling plugin file. {e}")
+ printer.error(f"Failed disabling plugin file. {e}")
exit(17)
elif args.enable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py.bkp")
if os.path.exists(plugin_file) or not os.path.exists(disabled_plugin_file):
- print("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
+                printer.error("Plugin {} doesn't exist or is enabled.".format(args.enable[0]))
exit(14)
try:
os.rename(disabled_plugin_file, plugin_file)
- print(f"plugin {args.enable[0]} enabled succesfully.")
+ printer.success(f"plugin {args.enable[0]} enabled successfully.")
except Exception as e:
- print(f"Failed enabling plugin file. {e}")
+ printer.error(f"Failed enabling plugin file. {e}")
exit(17)
elif args.list:
enabled_files = []
@@ -768,18 +772,19 @@ class connapp:
if disabled_files:
plugins["Disabled"] = disabled_files
if plugins:
+ printer.custom("plugins","")
print(yaml.dump(plugins, sort_keys=False))
else:
- print("There are no plugins added.")
+ printer.warning("There are no plugins added.")
def _func_import(self, args):
if not os.path.exists(args.data[0]):
- print("File {} dosn't exist".format(args.data[0]))
+ printer.error("File {} dosn't exist".format(args.data[0]))
exit(14)
- print("This could overwrite your current configuration!")
+ printer.warning("This could overwrite your current configuration!")
question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -789,7 +794,7 @@ class connapp:
with open(args.data[0]) as file:
imported = yaml.load(file, Loader=yaml.FullLoader)
except:
- print("failed reading file {}".format(args.data[0]))
+ printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for k,v in imported.items():
uniques = self.config._explode_unique(k)
@@ -808,12 +813,12 @@ class connapp:
uniques.update(v)
self.config._connections_add(**uniques)
self.config._saveconfig(self.config.file)
- print("File {} imported succesfully".format(args.data[0]))
+ printer.success("File {} imported successfully".format(args.data[0]))
return
def _func_export(self, args):
if os.path.exists(args.data[0]):
- print("File {} already exists".format(args.data[0]))
+ printer.error("File {} already exists".format(args.data[0]))
exit(14)
if len(args.data[1:]) == 0:
foldercons = self.config._getallnodesfull(extract = False)
@@ -821,13 +826,13 @@ class connapp:
for folder in args.data[1:]:
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0 and folder != "@":
- print("{} folder not found".format(folder))
+ printer.error("{} folder not found".format(folder))
exit(2)
foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
with open(args.data[0], "w") as file:
yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
file.close()
- print("File {} generated succesfully".format(args.data[0]))
+ printer.success("File {} generated successfully".format(args.data[0]))
exit()
return
@@ -977,13 +982,13 @@ class connapp:
def _yaml_generate(self, args):
if os.path.exists(args.data[0]):
- print("File {} already exists".format(args.data[0]))
+ printer.error("File {} already exists".format(args.data[0]))
exit(14)
else:
with open(args.data[0], "w") as file:
file.write(self._help("generate"))
file.close()
- print("File {} generated succesfully".format(args.data[0]))
+ printer.success("File {} generated successfully".format(args.data[0]))
exit()
def _yaml_run(self, args):
@@ -991,7 +996,7 @@ class connapp:
with open(args.data[0]) as file:
scripts = yaml.load(file, Loader=yaml.FullLoader)
except:
- print("failed reading file {}".format(args.data[0]))
+ printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for script in scripts["tasks"]:
self._cli_run(script)
@@ -1007,11 +1012,11 @@ class connapp:
if action == "test":
args["expected"] = script["expected"]
except KeyError as e:
- print("'{}' is mandatory".format(e.args[0]))
+ printer.error("'{}' is mandatory".format(e.args[0]))
exit(11)
nodes = self.config._getallnodes(nodelist)
if len(nodes) == 0:
- print("{} don't match any node".format(nodelist))
+            printer.error("{} doesn't match any node".format(nodelist))
exit(2)
nodes = self.nodes(self.config.getitems(nodes), config = self.config)
stdout = False
@@ -1037,32 +1042,47 @@ class connapp:
columns = int(p.group(1))
except:
columns = 80
+
+
+ PANEL_WIDTH = columns
+
if action == "run":
nodes.run(**args)
- print(script["name"].upper() + "-" * (columns - len(script["name"])))
- for i in nodes.status.keys():
- print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
- if stdout:
- for line in nodes.output[i].splitlines():
- print(" " + line)
+ header = f"{script['name'].upper()}"
elif action == "test":
nodes.test(**args)
- print(script["name"].upper() + "-" * (columns - len(script["name"])))
- for i in nodes.status.keys():
- print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
- if nodes.status[i] == 0:
- max_length = max(len(s) for s in nodes.result[i].keys())
- for k,v in nodes.result[i].items():
- print(" TEST for '{}'".format(k) + " "*(max_length - len(k) + 1) + "--> " + str(v).upper())
- if stdout:
- if nodes.status[i] == 0:
- print(" " + "-" * (max_length + 21))
- for line in nodes.output[i].splitlines():
- print(" " + line)
+ header = f"{script['name'].upper()}"
else:
- print("Wrong action '{}'".format(action))
+ printer.error(f"Wrong action '{action}'")
exit(13)
+ mdprint(Rule(header, style="white"))
+
+ for node in nodes.status:
+ status_str = "[✓] PASS(0)" if nodes.status[node] == 0 else f"[x] FAIL({nodes.status[node]})"
+ title_line = f"{node} — {status_str}"
+
+ test_output = Text()
+ if action == "test" and nodes.status[node] == 0:
+ results = nodes.result[node]
+ test_output.append("TEST RESULTS:\n")
+ max_key_len = max(len(k) for k in results.keys())
+ for k, v in results.items():
+ status = "[✓]" if str(v).upper() == "TRUE" else "[x]"
+ test_output.append(f" {k.ljust(max_key_len)} {status}\n")
+
+ output = nodes.output[node].strip()
+ code_block = Text()
+ if stdout and output:
+ code_block = Text(output + "\n")
+
+ if action == "test" and nodes.status[node] == 0:
+ highlight_words = [k for k, v in nodes.result[node].items() if str(v).upper() == "TRUE"]
+ code_block.highlight_words(highlight_words, style=Style(color="green", bold=True, underline=True))
+
+ panel_content = Group(test_output, Text(""), code_block)
+ mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style="white"))
+
def _choose(self, list, name, action):
#Generates an inquirer list to pick
if FzfPrompt and self.fzf:
@@ -1429,7 +1449,7 @@ class connapp:
if subparser.description != None:
commands.append(subcommand)
commands = ",".join(commands)
- usage_help = f"conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n conn {{{commands}}} ..."
+ usage_help = f"connpy [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n connpy {{{commands}}} ..."
return usage_help
if type == "end":
help_dict = {}
@@ -1602,5 +1622,4 @@ Here are some important instructions and tips for configuring your new node:
Please follow these instructions carefully to ensure proper configuration of your new node.
"""
- # print(instructions)
mdprint(Markdown(instructions))
diff --git a/connpy/core.py b/connpy/core.py
index 1c9bc9c..fa15b2e 100755
--- a/connpy/core.py
+++ b/connpy/core.py
@@ -13,6 +13,7 @@ import threading
from pathlib import Path
from copy import deepcopy
from .hooks import ClassHook, MethodHook
+from . import printer
import io
#functions and classes
@@ -28,7 +29,7 @@ class node:
- result(bool): True if expected value is found after running
the commands using test method.
- - status (int): 0 if the method run or test run succesfully.
+ - status (int): 0 if the method run or test run successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
@@ -254,7 +255,7 @@ class node:
if connect == True:
size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
self.child.setwinsize(int(size.group(2)),int(size.group(1)))
- print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
+ printer.success("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
if 'logfile' in dir(self):
# Initialize self.mylog
if not 'mylog' in dir(self):
@@ -279,7 +280,7 @@ class node:
f.write(self._logclean(self.mylog.getvalue().decode(), True))
else:
- print(connect)
+ printer.error(connect)
exit(1)
@MethodHook
@@ -585,7 +586,7 @@ class node:
if isinstance(self.tags, dict) and self.tags.get("console"):
child.sendline()
if debug:
- print(cmd)
+ printer.debug(f"Command:\n{cmd}")
self.mylog = io.BytesIO()
child.logfile_read = self.mylog
@@ -645,6 +646,8 @@ class node:
sleep(1)
child.readline(0)
self.child = child
+ from pexpect import fdpexpect
+ self.raw_child = fdpexpect.fdspawn(self.child.child_fd)
return True
@ClassHook
@@ -666,7 +669,7 @@ class nodes:
Created after running method test.
- status (dict): Dictionary formed by nodes unique as keys, value:
- 0 if method run or test ended succesfully.
+ 0 if method run or test ended successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
diff --git a/connpy/core_plugins/capture.py b/connpy/core_plugins/capture.py
new file mode 100644
index 0000000..5202063
--- /dev/null
+++ b/connpy/core_plugins/capture.py
@@ -0,0 +1,387 @@
+import argparse
+import sys
+import subprocess
+import random
+import socket
+import time
+import threading
+from pexpect import TIMEOUT
+from connpy import printer
+
+class RemoteCapture:
+ def __init__(self, connapp, node_name, interface, namespace=None, use_wireshark=False, tcpdump_filter=None, tcpdump_args=None):
+ self.connapp = connapp
+ self.node_name = node_name
+ self.interface = interface
+ self.namespace = namespace
+ self.use_wireshark = use_wireshark
+ self.tcpdump_filter = tcpdump_filter or []
+ self.tcpdump_args = tcpdump_args if isinstance(tcpdump_args, list) else []
+
+ if node_name.startswith("@"): # fuzzy match
+ matches = [k for k in connapp.nodes_list if node_name in k]
+ else:
+ matches = [k for k in connapp.nodes_list if k.startswith(node_name)]
+
+ if not matches:
+ printer.error(f"Node '{node_name}' not found.")
+ sys.exit(2)
+ elif len(matches) > 1:
+ matches[0] = connapp._choose(matches, "node", "capture")
+
+ if matches[0] is None:
+ sys.exit(7)
+
+ node_data = connapp.config.getitem(matches[0])
+ self.node = connapp.node(matches[0], **node_data, config=connapp.config)
+
+ if self.node.protocol != "ssh":
+ printer.error(f"Node '{self.node.unique}' must be an SSH connection.")
+ sys.exit(2)
+
+ self.wireshark_path = connapp.config.config.get("wireshark_path")
+
+ def _start_local_listener(self, port, ws_proc=None):
+ self.listener_active = True
+ self.listener_conn = None
+ self.listener_connected = threading.Event()
+
+ def listen():
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(("localhost", port))
+ s.listen(1)
+ printer.start(f"Listening on localhost:{port}")
+
+ conn, addr = s.accept()
+ self.listener_conn = conn
+ printer.start(f"Connection from {addr}")
+ self.listener_connected.set()
+
+ try:
+ while self.listener_active:
+ data = conn.recv(4096)
+ if not data:
+ break
+
+ if self.use_wireshark and ws_proc:
+ try:
+ ws_proc.stdin.write(data)
+ ws_proc.stdin.flush()
+ except BrokenPipeError:
+ printer.info("Wireshark closed the pipe.")
+ break
+ else:
+ sys.stdout.buffer.write(data)
+ sys.stdout.buffer.flush()
+ except Exception as e:
+ if isinstance(e, BrokenPipeError):
+ printer.info("Listener closed due to broken pipe.")
+ else:
+ printer.error(f"Listener error: {e}")
+ finally:
+ conn.close()
+ self.listener_conn = None
+
+ self.listener_thread = threading.Thread(target=listen)
+ self.listener_thread.daemon = True
+ self.listener_thread.start()
+
+ def _is_port_in_use(self, port):
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ return s.connect_ex(('localhost', port)) == 0
+
+ def _find_free_port(self, start=20000, end=30000):
+ for _ in range(10):
+ port = random.randint(start, end)
+ if not self._is_port_in_use(port):
+ return port
+ raise RuntimeError("No free port found for SSH tunnel.")
+
+ def _monitor_wireshark(self, ws_proc):
+ try:
+ while True:
+ try:
+ ws_proc.wait(timeout=1)
+ self.listener_active = False
+ if self.listener_conn:
+ printer.info("Wireshark exited, stopping listener.")
+ try:
+ self.listener_conn.shutdown(socket.SHUT_RDWR)
+ self.listener_conn.close()
+ except Exception:
+ pass
+ break
+ except subprocess.TimeoutExpired:
+ if not self.listener_active:
+ break
+ time.sleep(0.2)
+ except Exception as e:
+ printer.warning(f"Error in monitor_wireshark: {e}")
+
+ def _detect_sudo_requirement(self):
+ base_cmd = f"tcpdump -i {self.interface} -w - -U -c 1"
+ if self.namespace:
+ base_cmd = f"ip netns exec {self.namespace} {base_cmd}"
+
+ cmds = [base_cmd, f"sudo {base_cmd}"]
+
+ printer.info(f"Verifying sudo requirement")
+ for cmd in cmds:
+ try:
+ self.node.child.sendline(cmd)
+ start_time = time.time()
+ while time.time() - start_time < 5:
+ try:
+ index = self.node.child.expect([
+ r'listening on',
+ r'permission denied',
+ r'cannot',
+ r'No such file or directory',
+ ], timeout=1)
+
+ if index == 0:
+ self.node.child.send("\x03")
+ return "sudo" in cmd
+ else:
+ break
+ except Exception:
+ continue
+
+ self.node.child.send("\x03")
+ time.sleep(0.5)
+ try:
+ self.node.child.read_nonblocking(size=1024, timeout=0.5)
+ except Exception:
+ pass
+
+ except Exception as e:
+ printer.warning(f"Error during sudo detection: {e}")
+ continue
+
+ printer.error(f"Failed to run tcpdump on remote node '{self.node.unique}'")
+ sys.exit(4)
+
+ def _monitor_capture_output(self):
+ try:
+ index = self.node.child.expect([
+ r'Broken pipe',
+ r'packet[s]? captured'
+ ], timeout=None)
+ if index == 0:
+ printer.error("Tcpdump failed: Broken pipe.")
+ else:
+ printer.success("Tcpdump finished capturing packets.")
+
+ self.listener_active = False
+ except:
+ pass
+
+ def _sendline_until_connected(self, cmd, retries=5, interval=2):
+ for attempt in range(1, retries + 1):
+ printer.info(f"Attempt {attempt}/{retries} to connect listener...")
+ self.node.child.sendline(cmd)
+
+ try:
+ index = self.node.child.expect([
+ r'listening on',
+ TIMEOUT,
+ r'permission',
+ r'not permitted',
+ r'invalid',
+ r'unrecognized',
+ r'Unable',
+ r'No such',
+ r'illegal',
+ r'syntax error'
+ ], timeout=5)
+
+ if index == 0:
+
+ self.monitor_end = threading.Thread(target=self._monitor_capture_output)
+ self.monitor_end.daemon = True
+ self.monitor_end.start()
+
+ if self.listener_connected.wait(timeout=interval):
+ printer.success("Listener successfully received a connection.")
+ return True
+ else:
+ printer.warning("No connection yet. Retrying...")
+
+ elif index == 1:
+ error = f"tcpdump did not respond within the expected time.\n" \
+ f"Command used:\n{cmd}\n" \
+ f"→ Please verify the command syntax."
+ return f"{error}"
+ else:
+ before_last_line = self.node.child.before.decode().splitlines()[-1]
+                    error = f"Tcpdump error detected:\n" \
+ f"{before_last_line}{self.node.child.after.decode()}{self.node.child.readline().decode()}".rstrip()
+ return f"{error}"
+
+ except Exception as e:
+ printer.warning(f"Unexpected error during tcpdump startup: {e}")
+ return False
+
+ return False
+
+ def _build_tcpdump_command(self):
+ base = f"tcpdump -i {self.interface}"
+ if self.use_wireshark:
+ base += " -w - -U"
+ else:
+ base += " -l"
+
+ if self.namespace:
+ base = f"ip netns exec {self.namespace} {base}"
+
+ if self.requires_sudo:
+ base = f"sudo {base}"
+
+ if self.tcpdump_args:
+ base += " " + " ".join(self.tcpdump_args)
+
+ if self.tcpdump_filter:
+ base += " " + " ".join(self.tcpdump_filter)
+
+ base += f" | nc localhost {self.local_port}"
+ return base
+
+ def run(self):
+ if self.use_wireshark:
+ if not self.wireshark_path:
+ printer.error("Wireshark path not set in config.\nUse '--set-wireshark-path /full/path/to/wireshark' to configure it.")
+ sys.exit(1)
+
+ self.local_port = self._find_free_port()
+ self.node.options += f" -o ExitOnForwardFailure=yes -R {self.local_port}:localhost:{self.local_port}"
+
+ connection = self.node._connect()
+ if connection is not True:
+ printer.error(f"Could not connect to {self.node.unique}\n{connection}")
+ sys.exit(1)
+
+ self.requires_sudo = self._detect_sudo_requirement()
+ tcpdump_cmd = self._build_tcpdump_command()
+
+ ws_proc = None
+ monitor_thread = None
+
+ if self.use_wireshark:
+
+ printer.info(f"Live capture from {self.node.unique}:{self.interface}, launching Wireshark...")
+ try:
+ ws_proc = subprocess.Popen(
+ [self.wireshark_path, "-k", "-i", "-"],
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE
+ )
+ except Exception as e:
+ printer.error(f"Failed to launch Wireshark: {e}\nMake sure the path is correct and Wireshark is installed.")
+ exit(1)
+
+ monitor_thread = threading.Thread(target=self._monitor_wireshark, args=(ws_proc,))
+ monitor_thread.daemon = True
+ monitor_thread.start()
+ else:
+ printer.info(f"Live text capture from {self.node.unique}:{self.interface}")
+ printer.info("Press Ctrl+C to stop.\n")
+
+ try:
+ self._start_local_listener(self.local_port, ws_proc=ws_proc)
+ time.sleep(1) # small delay before retry attempts
+
+ result = self._sendline_until_connected(tcpdump_cmd, retries=5, interval=2)
+ if result is not True:
+ if isinstance(result, str):
+ printer.error(f"{result}")
+ else:
+ printer.error("Listener connection failed after all retries.")
+ self.listener_active = False
+ return
+
+ while self.listener_active:
+ time.sleep(0.5)
+
+ except KeyboardInterrupt:
+ print("")
+ printer.warning("Capture interrupted by user.")
+ self.listener_active = False
+ finally:
+ if self.listener_conn:
+ try:
+ self.listener_conn.shutdown(socket.SHUT_RDWR)
+ self.listener_conn.close()
+ except:
+ pass
+ if hasattr(self.node, "child"):
+ self.node.child.close(force=True)
+ if self.listener_thread.is_alive():
+ self.listener_thread.join()
+ if monitor_thread and monitor_thread.is_alive():
+ monitor_thread.join()
+
+
+class Parser:
+ def __init__(self):
+ self.parser = argparse.ArgumentParser(description="Capture packets remotely using a saved SSH node", epilog="All unknown arguments will be passed to tcpdump.")
+
+ self.parser.add_argument("node", nargs='?', help="Name of the saved node (must use SSH)")
+ self.parser.add_argument("interface", nargs='?', help="Network interface to capture on")
+ self.parser.add_argument("--ns", "--namespace", dest="namespace", help="Optional network namespace")
+ self.parser.add_argument("-w","--wireshark", action="store_true", help="Open live capture in Wireshark")
+ self.parser.add_argument("--set-wireshark-path", metavar="PATH", help="Set the default path to Wireshark binary")
+ self.parser.add_argument(
+ "-f", "--filter",
+ dest="tcpdump_filter",
+ metavar="ARG",
+ nargs="*",
+ default=["not", "port", "22"],
+ help="tcpdump filter expression (e.g., -f port 443 and udp). Default: not port 22"
+ )
+ self.parser.add_argument(
+ "--unknown-args",
+ action="store_true",
+ default=True,
+ help=argparse.SUPPRESS
+ )
+
+class Entrypoint:
+ def __init__(self, args, parser, connapp):
+ if "--" in args.unknown_args:
+ args.unknown_args.remove("--")
+ if args.set_wireshark_path:
+ connapp._change_settings("wireshark_path", args.set_wireshark_path)
+ return
+
+ if not args.node or not args.interface:
+ parser.error("node and interface are required unless --set-wireshark-path is used")
+
+ capture = RemoteCapture(
+ connapp=connapp,
+ node_name=args.node,
+ interface=args.interface,
+ namespace=args.namespace,
+ use_wireshark=args.wireshark,
+ tcpdump_filter=args.tcpdump_filter,
+ tcpdump_args=args.unknown_args
+ )
+ capture.run()
+
+def _connpy_completion(wordsnumber, words, info = None):
+ if wordsnumber == 3:
+ result = ["--help", "--set-wireshark-path"]
+ result.extend(info["nodes"])
+ elif wordsnumber == 5 and words[1] in info["nodes"]:
+ result = ['--wireshark', '--namespace', '--filter', '--help']
+ elif wordsnumber == 6 and words[3] in ["-w", "--wireshark"]:
+ result = ['--namespace', '--filter', '--help']
+ elif wordsnumber == 7 and words[3] in ["-n", "--namespace"]:
+ result = ['--wireshark', '--filter', '--help']
+    elif wordsnumber == 8:
+        if any(w in words for w in ["-w", "--wireshark"]) and any(w in words for w in ["-n", "--namespace"]):
+            result = ['--filter', '--help']
+        else:
+            result = []
+    else:
+        result = []
+
+ return result
diff --git a/connpy/core_plugins/context.py b/connpy/core_plugins/context.py
index 84dcc74..b868b28 100644
--- a/connpy/core_plugins/context.py
+++ b/connpy/core_plugins/context.py
@@ -1,6 +1,7 @@
import argparse
import yaml
import re
+from connpy import printer
class context_manager:
@@ -14,10 +15,10 @@ class context_manager:
def add_context(self, context, regex):
if not context.isalnum():
- print("Context name has to be alphanumeric.")
+ printer.error("Context name has to be alphanumeric.")
exit(1)
elif context in self.contexts:
- print(f"Context {context} already exists.")
+ printer.error(f"Context {context} already exists.")
exit(2)
else:
self.contexts[context] = regex
@@ -25,10 +26,10 @@ class context_manager:
def modify_context(self, context, regex):
if context == "all":
- print("Can't modify default context: all")
+ printer.error("Can't modify default context: all")
exit(3)
elif context not in self.contexts:
- print(f"Context {context} doesn't exist.")
+ printer.error(f"Context {context} doesn't exist.")
exit(4)
else:
self.contexts[context] = regex
@@ -36,13 +37,13 @@ class context_manager:
def delete_context(self, context):
if context == "all":
- print("Can't delete default context: all")
+ printer.error("Can't delete default context: all")
exit(3)
elif context not in self.contexts:
- print(f"Context {context} doesn't exist.")
+ printer.error(f"Context {context} doesn't exist.")
exit(4)
if context == self.current_context:
- print(f"Can't delete current context: {self.current_context}")
+ printer.error(f"Can't delete current context: {self.current_context}")
exit(5)
else:
self.contexts.pop(context)
@@ -51,26 +52,27 @@ class context_manager:
def list_contexts(self):
for key in self.contexts.keys():
if key == self.current_context:
- print(f"{key} * (active)")
+ printer.success(f"{key} (active)")
else:
- print(key)
+ printer.custom(" ",key)
def set_context(self, context):
if context not in self.contexts:
- print(f"Context {context} doesn't exist.")
+ printer.error(f"Context {context} doesn't exist.")
exit(4)
elif context == self.current_context:
- print(f"Context {context} already set")
+ printer.info(f"Context {context} already set")
exit(0)
else:
self.connapp._change_settings("current_context", context)
def show_context(self, context):
if context not in self.contexts:
- print(f"Context {context} doesn't exist.")
+ printer.error(f"Context {context} doesn't exist.")
exit(4)
else:
yaml_output = yaml.dump(self.contexts[context], sort_keys=False, default_flow_style=False)
+ printer.custom(context,"")
print(yaml_output)
@@ -113,18 +115,17 @@ class Preload:
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Manage contexts with regex matching", formatter_class=argparse.RawTextHelpFormatter)
- self.description = "Manage contexts with regex matching"
# Define the context name as a positional argument
self.parser.add_argument("context_name", help="Name of the context", nargs='?')
group = self.parser.add_mutually_exclusive_group(required=True)
- group.add_argument("-a", "--add", nargs='+', help='Add a new context with regex values. Usage: context -a name "regex1" "regex2"')
- group.add_argument("-r", "--rm", "--del", action='store_true', help="Delete a context. Usage: context -d name")
- group.add_argument("--ls", action='store_true', help="List all contexts. Usage: context --list")
- group.add_argument("--set", action='store_true', help="Set the used context. Usage: context --set name")
- group.add_argument("-s", "--show", action='store_true', help="Show the defined regex of a context. Usage: context --show name")
- group.add_argument("-e", "--edit", "--mod", nargs='+', help='Modify an existing context. Usage: context --mod name "regex1" "regex2"')
+ group.add_argument("-a", "--add", nargs='+', help='Add a new context with regex values.\nUsage: context -a name "regex1" "regex2"')
+ group.add_argument("-r", "--rm", "--del", action='store_true', help="Delete a context.\nUsage: context -d name")
+ group.add_argument("--ls", action='store_true', help="List all contexts.\nUsage: context --ls")
+ group.add_argument("--set", action='store_true', help="Set the used context.\nUsage: context --set name")
+ group.add_argument("-s", "--show", action='store_true', help="Show the defined regex of a context.\nUsage: context --show name")
+ group.add_argument("-e", "--edit", "--mod", nargs='+', help='Modify an existing context.\nUsage: context --mod name "regex1" "regex2"')
class Entrypoint:
def __init__(self, args, parser, connapp):
diff --git a/connpy/core_plugins/sync.py b/connpy/core_plugins/sync.py
index b4c3ab3..ccfcf5b 100755
--- a/connpy/core_plugins/sync.py
+++ b/connpy/core_plugins/sync.py
@@ -7,6 +7,7 @@ import tempfile
import io
import yaml
import threading
+from connpy import printer
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
@@ -50,33 +51,33 @@ class sync:
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
- print("Logged in successfully.")
+ printer.success("Logged in successfully.")
except RefreshError as e:
# If refresh fails, delete the invalid token file and start a new login flow
if os.path.exists(self.token_file):
os.remove(self.token_file)
- print("Existing token was invalid and has been removed. Please log in again.")
+ printer.warning("Existing token was invalid and has been removed. Please log in again.")
flow = InstalledAppFlow.from_client_secrets_file(
self.google_client, self.scopes)
creds = flow.run_local_server(port=0, access_type='offline')
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
- print("Logged in successfully after re-authentication.")
+ printer.success("Logged in successfully after re-authentication.")
def logout(self):
if os.path.exists(self.token_file):
os.remove(self.token_file)
- print("Logged out successfully.")
+ printer.success("Logged out successfully.")
else:
- print("No credentials file found. Already logged out.")
+ printer.info("No credentials file found. Already logged out.")
def get_credentials(self):
# Load credentials from token.json
if os.path.exists(self.token_file):
creds = Credentials.from_authorized_user_file(self.token_file, self.scopes)
else:
- print("Credentials file not found.")
+ printer.error("Credentials file not found.")
return 0
# If there are no valid credentials available, ask the user to log in again
@@ -85,10 +86,10 @@ class sync:
try:
creds.refresh(Request())
except RefreshError:
- print("Could not refresh access token. Please log in again.")
+ printer.warning("Could not refresh access token. Please log in again.")
return 0
else:
- print("Credentials are missing or invalid. Please log in.")
+ printer.warning("Credentials are missing or invalid. Please log in.")
return 0
return creds
@@ -114,8 +115,8 @@ class sync:
return False
def status(self):
- print(f"Login: {self.check_login_status()}")
- print(f"Sync: {self.sync}")
+ printer.info(f"Login: {self.check_login_status()}")
+ printer.info(f"Sync: {self.sync}")
def get_appdata_files(self):
@@ -151,17 +152,18 @@ class sync:
return files_info
except HttpError as error:
- print(f"An error occurred: {error}")
+ printer.error(f"An error occurred: {error}")
return 0
def dump_appdata_files_yaml(self):
files_info = self.get_appdata_files()
if not files_info:
- print("Failed to retrieve files or no files found.")
+ printer.error("Failed to retrieve files or no files found.")
return
# Pretty print as YAML
yaml_output = yaml.dump(files_info, sort_keys=False, default_flow_style=False)
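+ # Emit a bare "[backups]" header before the YAML listing (custom() with an empty message prints only the tag)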
+ printer.custom("backups","")
print(yaml_output)
@@ -233,16 +235,16 @@ class sync:
oldest_file = min(app_data_files, key=lambda x: x['timestamp'])
delete_old = self.delete_file_by_id(oldest_file['id'])
if delete_old:
- print(delete_old)
+ printer.error(delete_old)
return 1
# Upload the new file
upload_new = self.backup_file_to_drive(zip_path, timestamp)
if upload_new:
- print(upload_new)
+ printer.error(upload_new)
return 1
- print("Backup to google uploaded successfully.")
+ printer.success("Backup to google uploaded successfully.")
return 0
def decompress_zip(self, zip_path):
@@ -253,7 +255,7 @@ class sync:
zipf.extract(".osk", os.path.dirname(self.key))
return 0
except Exception as e:
- print(f"An error occurred: {e}")
+ printer.error(f"An error occurred: {e}")
return 1
def download_file_by_id(self, file_id, destination_path):
@@ -282,14 +284,14 @@ class sync:
# Get the files in the app data folder
app_data_files = self.get_appdata_files()
if not app_data_files:
- print("No files found in app data folder.")
+ printer.error("No files found in app data folder.")
return 1
# Check if a specific file_id was provided and if it exists in the list
if file_id:
selected_file = next((f for f in app_data_files if f['id'] == file_id), None)
if not selected_file:
- print(f"No file found with ID: {file_id}")
+ printer.error(f"No file found with ID: {file_id}")
return 1
else:
# Find the latest file based on timestamp
@@ -302,10 +304,10 @@ class sync:
# Unzip the downloaded file to the destination folder
if self.decompress_zip(temp_download_path):
- print("Failed to decompress the file.")
+ printer.error("Failed to decompress the file.")
return 1
- print(f"Backup from Google Drive restored successfully: {selected_file['name']}")
+ printer.success(f"Backup from Google Drive restored successfully: {selected_file['name']}")
return 0
def config_listener_post(self, args, kwargs):
@@ -314,7 +316,7 @@ class sync:
if not kwargs["result"]:
self.compress_and_upload()
else:
- print("Sync cannot be performed. Please check your login status.")
+ printer.warning("Sync cannot be performed. Please check your login status.")
return kwargs["result"]
def config_listener_pre(self, *args, **kwargs):
@@ -337,7 +339,6 @@ class Preload:
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Sync config with Google")
- self.description = "Sync config with Google"
subparsers = self.parser.add_subparsers(title="Commands", dest='command',metavar="")
login_parser = subparsers.add_parser("login", help="Login to Google to enable synchronization")
logout_parser = subparsers.add_parser("logout", help="Logout from Google")
diff --git a/connpy/hooks.py b/connpy/hooks.py
index 4bf5e6a..b2befd1 100755
--- a/connpy/hooks.py
+++ b/connpy/hooks.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
#Imports
from functools import wraps, partial, update_wrapper
+from . import printer
#functions and classes
@@ -19,7 +20,7 @@ class MethodHook:
try:
args, kwargs = hook(*args, **kwargs)
except Exception as e:
- print(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
+ printer.error(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
try:
result = self.func(*args, **kwargs)
@@ -30,7 +31,7 @@ class MethodHook:
try:
result = hook(*args, **kwargs, result=result) # Pass result to hooks
except Exception as e:
- print(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
+ printer.error(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
return result
diff --git a/connpy/plugins.py b/connpy/plugins.py
index 3beff46..18d7bbf 100755
--- a/connpy/plugins.py
+++ b/connpy/plugins.py
@@ -4,6 +4,7 @@ import importlib.util
import sys
import argparse
import os
+from connpy import printer
class Plugins:
def __init__(self):
@@ -30,8 +31,7 @@ class Plugins:
### Verifications:
- The presence of only allowed top-level elements.
- The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload.
- - 'Parser' class must only have an '__init__' method and must assign 'self.parser'
- and 'self.description'.
+ - 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
If any of these checks fail, the function returns an error message indicating
@@ -77,11 +77,12 @@ class Plugins:
if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
return "Parser class should only have __init__ method"
- # Check if 'self.parser' and 'self.description' are assigned in __init__ method
+ # Check if 'self.parser' is assigned in __init__ method
init_method = node.body[0]
assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self']
- if 'parser' not in assigned_attrs or 'description' not in assigned_attrs:
- return "Parser class should set self.parser and self.description" # 'self.parser' or 'self.description' not assigned in __init__
+ if 'parser' not in assigned_attrs:
+ return "Parser class should set self.parser"
+
elif node.name == 'Entrypoint':
has_entrypoint = True
@@ -124,13 +125,14 @@ class Plugins:
filepath = os.path.join(directory, filename)
check_file = self.verify_script(filepath)
if check_file:
- print(f"Failed to load plugin: {filename}. Reason: {check_file}")
+ printer.error(f"Failed to load plugin: {filename}. Reason: {check_file}")
continue
else:
self.plugins[root_filename] = self._import_from_path(filepath)
if hasattr(self.plugins[root_filename], "Parser"):
self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
- subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
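+ # Reuse the plugin parser's own usage, description, epilog and formatter_class
+ # so the registered subcommand's help matches the standalone plugin parser.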
+ plugin = self.plugin_parsers[root_filename]
+ subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, usage=plugin.parser.usage, description=plugin.parser.description, epilog=plugin.parser.epilog, formatter_class=plugin.parser.formatter_class)
if hasattr(self.plugins[root_filename], "Preload"):
self.preloads[root_filename] = self.plugins[root_filename]
diff --git a/connpy/printer.py b/connpy/printer.py
new file mode 100644
index 0000000..68a6a8c
--- /dev/null
+++ b/connpy/printer.py
@@ -0,0 +1,33 @@
+import sys
+
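+# Console output helpers used across connpy in place of bare print().
+# Each helper prefixes the message with a short status tag ([i], [✓], [+], [!], [✗], [d]),
+# error() writes to stderr, and _format_multiline keeps continuation lines aligned
+# under the tag, e.g.:
+#   printer.warning("first line\nsecond line") prints:
+#   [!] first line
+#       second line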
+def _format_multiline(tag, message):
+ lines = message.splitlines()
+ if not lines:
+ return f"[{tag}]"
+ formatted = [f"[{tag}] {lines[0]}"]
+ indent = " " * (len(tag) + 3)
+ for line in lines[1:]:
+ formatted.append(f"{indent}{line}")
+ return "\n".join(formatted)
+
+def info(message):
+ print(_format_multiline("i", message))
+
+def success(message):
+ print(_format_multiline("✓", message))
+
+def start(message):
+ print(_format_multiline("+", message))
+
+def warning(message):
+ print(_format_multiline("!", message))
+
+def error(message):
+ print(_format_multiline("✗", message), file=sys.stderr)
+
+def debug(message):
+ print(_format_multiline("d", message))
+
+def custom(tag, message):
+ print(_format_multiline(tag, message))
+
diff --git a/docs/connpy/index.html b/docs/connpy/index.html
index ece5d40..2ead57c 100644
--- a/docs/connpy/index.html
+++ b/docs/connpy/index.html
[docs/connpy/index.html hunks — regenerated pdoc output mirroring the changes above: the plugin Parser documentation now requires only `self.parser` (the `self.description` requirement is removed), the "Command Completion Support" and "Handling Unknown Arguments" sections are added along with their table-of-contents entries, the embedded verify_script/load_plugins docstrings and source listings match the plugins.py changes, print() calls in the embedded node listings become printer.success/error/debug, the embedded node connection code additionally exposes self.raw_child via pexpect's fdpexpect.fdspawn(self.child.child_fd), and the "succesfully" typos are corrected to "successfully".]
diff --git a/requirements.txt b/requirements.txt
index abaa405..ed00f25 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,7 @@ Flask_Cors>=4.0.1
google_api_python_client>=2.125.0
google_auth_oauthlib>=1.2.0
inquirer>=3.3.0
-openai>=0.27.8
+openai>=1.98.0
pexpect>=4.8.0
protobuf>=5.27.2
pycryptodome>=3.18.0