add new AI feature. Add tags feature, used tags: OS for AI, screen_length_command for automation, prompt for automation. fixes

This commit is contained in:
fluzzi 2023-05-05 13:41:32 -03:00
parent 404d874771
commit 8828471c1b
11 changed files with 1572 additions and 174 deletions

View File

@ -14,7 +14,7 @@ pip install connpy
### Standalone module
```
import connpy
router = connpy.node("unique name","ip/hostname", user="username", password="password")
router = connpy.node("uniqueName","ip/host", user="username", password="password")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
@ -46,9 +46,9 @@ nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "password1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "password2"}
nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "password3"}
nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "password1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "password2"}
nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "password3"}
#Finally you run some tasks on the nodes
mynodes = connpy.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
@ -82,7 +82,17 @@ routers.test("ping {ip}", expected, variables)
for key in routers.result:
print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
```
### Using AI
```
import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)
```
## Connection manager
### Features
- You can generate profiles and reference them from nodes using @profilename so you dont
@ -182,7 +192,31 @@ With the Connpy API you can run commands on devices using http requests
---
### 2. Run Commands
### 2. Get Nodes
**Endpoint**: `/get_nodes`
**Method**: `POST`
**Description**: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword.
#### Request Body:
```json
{
"filter": "<keyword>"
}
```
* `filter` (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.
#### Response:
- A JSON array containing the filtered nodes.
---
### 3. Run Commands
**Endpoint**: `/run_commands`
@ -211,3 +245,29 @@ With the Connpy API you can run commands on devices using http requests
#### Response:
- A JSON object with the results of the executed commands on the nodes.
### 4. Ask AI
**Endpoint**: `/ask_ai`
**Method**: `POST`
**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request.
#### Request Body:
```json
{
"input": "<user input request>",
"dryrun": true or false
}
```
* `input` (required): The user input requesting the AI to perform an action on some devices or get the devices list.
* `dryrun` (optional): If set to true, it will return the parameters to run the request but it won't run it. default is false.
#### Response:
- A JSON array containing the action to run and the parameters and the result of the action.

View File

@ -98,7 +98,31 @@ With the Connpy API you can run commands on devices using http requests
---
### 2. Run Commands
### 2. Get Nodes
**Endpoint**: `/get_nodes`
**Method**: `POST`
**Description**: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword.
#### Request Body:
```json
{
"filter": "<keyword>"
}
```
* `filter` (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.
#### Response:
- A JSON array containing the filtered nodes.
---
### 3. Run Commands
**Endpoint**: `/run_commands`
@ -127,13 +151,37 @@ With the Connpy API you can run commands on devices using http requests
#### Response:
- A JSON object with the results of the executed commands on the nodes.
## Automation module
the automation module
### 4. Ask AI
**Endpoint**: `/ask_ai`
**Method**: `POST`
**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request.
#### Request Body:
```json
{
"input": "<user input request>",
"dryrun": true or false
}
```
* `input` (required): The user input requesting the AI to perform an action on some devices or get the devices list.
* `dryrun` (optional): If set to true, it will return the parameters to run the request but it won't run it. default is false.
#### Response:
- A JSON array containing the action to run and the parameters and the result of the action.
## Automation module
The automation module
### Standalone module
```
import connpy
router = connpy.node("unique name","ip/hostname", user="user", password="pass")
router = connpy.node("uniqueName","ip/host", user="user", password="pass")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
@ -165,9 +213,9 @@ nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "pass3"}
nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "pass3"}
#Finally you run some tasks on the nodes
mynodes = connpy.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
@ -201,14 +249,27 @@ routers.test("ping {ip}", expected, variables)
for key in routers.result:
print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
```
### Using AI
```
import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)
```
'''
from .core import node,nodes
from .configfile import configfile
from .connapp import connapp
from .api import *
from .ai import ai
from ._version import __version__
from pkg_resources import get_distribution
__all__ = ["node", "nodes", "configfile", "connapp"]
__all__ = ["node", "nodes", "configfile", "connapp", "ai"]
__author__ = "Federico Luzzi"
__pdoc__ = {
'core': False,

View File

@ -1,2 +1,2 @@
__version__ = "3.0.7"
__version__ = "3.2.0"

317
connpy/ai.py Executable file
View File

@ -0,0 +1,317 @@
import openai
import requests
import json
import re
import ast
from textwrap import dedent
from .core import nodes
class ai:
''' This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application.
### Attributes:
- model (str): Model of GPT api to use. Default is gpt-3.5-turbo.
- temp (float): Value between 0 and 1 that control the randomness
of generated text, with higher values increasing
creativity. Default is 0.7.
'''
def __init__(self, config, org = None, api_key = None, model = "gpt-3.5-turbo", temp = 0.7):
'''
### Parameters:
- config (obj): Pass the object created with class configfile with
key for decryption and extra configuration if you
are using connection manager.
### Optional Parameters:
- org (str): A unique token identifying the user organization
to interact with the API.
- api_key (str): A unique authentication token required to access
and interact with the API.
- model (str): Model of GPT api to use. Default is gpt-3.5-turbo.
- temp (float): Value between 0 and 1 that control the randomness
of generated text, with higher values increasing
creativity. Default is 0.7.
'''
self.config = config
if org:
openai.organization = org
else:
try:
openai.organization = self.config.config["openai"]["organization"]
except:
raise ValueError("Missing openai organization")
if api_key:
openai.api_key = api_key
else:
try:
openai.api_key = self.config.config["openai"]["api_key"]
except:
raise ValueError("Missing openai api_key")
self.__prompt = {}
self.__prompt["original_system"] = """
When provided with user input for an SSH connection management application, analyze the input and extract the following information:
- app_related: True if the input is related to the application's purpose and the request is understood; False if the input is not related, not understood, or if mandatory information like filter is missing.
- type: Given a user input, identify the type of request they want to make. The input will represent one of two options:
1. "command" - The user wants to get information from devices by running commands.
2. "list_nodes" - The user wants to get a list of nodes, devices, servers, or routers.
Response wich command or list_nodes type depending on the request.
- filter: One or more regex patterns indicating the device or group of devices the command should be run on, returned as a Python list (e.g., ['hostname', 'hostname@folder', '@subfolder@folder']). The filter can have different formats, such as:
- hostname
- hostname@folder
- hostname@subfolder@folder
- partofhostname
- @folder
- @subfolder@folder
- regex_pattern
The filter should be extracted from the user input exactly as it was provided.
Always preserve the exact filter pattern provided by the user, with no modifications. Do not process any regex, the application can do that.
If no filter is specified, set it to None.
- Expected: A value representing an expected output to search for when running the command. For the user this is a optional value. Set it to 'None' if no value was captured.
expected value should ALWAYS come from the user input explicitly.
- response: An optional field to be filled when app_related is False or when providing an explanation related to the app. This is where you can engage in small talk, answer questions not related to the app, or provide explanations about the extracted information.
Always respond in the following format:
app_related: {{app_related}}
Type: {{command}}
Filter: {{filter}}
Expected: {{expected}}
Response: {{response}}
"""
self.__prompt["original_user"] = "Get the IP addresses of loopback0 for all routers from w2az1 and e1.*(prod|dev) and check if they have the ip 192.168.1.1"
self.__prompt["original_assistant"] = "app_related: True\nType: Command\nFilter: ['w2az1', 'e1.*(prod|dev)']\nExpected: 192.168.1.1"
self.__prompt["command_system"] = """
For each device listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server). Always format your response as a Python list (e.g., ['command1', 'command2']).
Note that the application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting.
If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
It is crucial to always include the device name provided in your response, even when there is only one device.
your response has to be always like this:
node1: ["command1", "command2"]
node2: ["command1", "command2", "command3"]
node1@folder: ["command1"]
Node4@subfolder@folder: []
"""
self.__prompt["command_user"]= """
input: show me the full configuration for all this devices:
Devices:
router1: cisco ios
"""
self.__prompt["command_assistant"]= """
router1: ['show running-config']
"""
self.model = model
self.temp = temp
def _clean_original_response(self, raw_response):
#Parse response for first request to openAI GPT.
info_dict = {}
info_dict["app_related"] = False
current_key = "response"
for line in raw_response.split("\n"):
if line.strip() == "":
line = "\n"
possible_keys = ["app_related", "type", "filter", "expected", "response"]
if ':' in line and (key := line.split(':', 1)[0].strip().lower()) in possible_keys:
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
# Convert "true" or "false" (case-insensitive) to Python boolean
if value.lower() == "true":
value = True
elif value.lower() == "false":
value = False
elif value.lower() == "none":
value = None
if key == "filter":
value = ast.literal_eval(value)
#store in dictionary
info_dict[key] = value
current_key = key
else:
if current_key == "response":
if "response" in info_dict:
info_dict[current_key] += "\n" + line
else:
info_dict[current_key] = line
return info_dict
def _clean_command_response(self, raw_response):
#Parse response for command request to openAI GPT.
info_dict = {}
info_dict["commands"] = []
info_dict["variables"] = {}
info_dict["variables"]["__global__"] = {}
for line in raw_response.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
key = key.strip()
newvalue = {}
try:
value = ast.literal_eval(value.strip())
for i,e in enumerate(value, start=1):
newvalue[f"command{i}"] = e
if f"{{command{i}}}" not in info_dict["commands"]:
info_dict["commands"].append(f"{{command{i}}}")
info_dict["variables"]["__global__"][f"command{i}"] = ""
info_dict["variables"][key] = newvalue
except:
pass
return info_dict
def _get_commands(self, user_input, nodes):
#Send the request for commands for each device to openAI GPT.
output_list = []
for key, value in nodes.items():
tags = value.get('tags', {})
try:
os_value = tags.get('os', '')
except:
os_value = ""
output_list.append(f"{key}: {os_value}")
output_str = "\n".join(output_list)
command_input = f"input: {user_input}\n\nDevices:\n{output_str}"
message = []
message.append({"role": "system", "content": dedent(self.__prompt["command_system"])})
message.append({"role": "user", "content": dedent(self.__prompt["command_user"])})
message.append({"role": "assistant", "content": dedent(self.__prompt["command_assistant"])})
message.append({"role": "user", "content": command_input})
response = openai.ChatCompletion.create(
model=self.model,
messages=message,
temperature=self.temp
)
output = {}
output["dict_response"] = response
output["raw_response"] = response["choices"][0]["message"]["content"]
output["response"] = self._clean_command_response(output["raw_response"])
return output
def _get_filter(self, user_input):
#Send the request to identify the filter and other attributes from the user input to GPT.
message = []
message.append({"role": "system", "content": dedent(self.__prompt["original_system"])})
message.append({"role": "user", "content": dedent(self.__prompt["original_user"])})
message.append({"role": "assistant", "content": dedent(self.__prompt["original_assistant"])})
message.append({"role": "user", "content": user_input})
response = openai.ChatCompletion.create(
model=self.model,
messages=message,
temperature=self.temp,
top_p=1
)
output = {}
output["dict_response"] = response
output["raw_response"] = response["choices"][0]["message"]["content"]
clear_response = self._clean_original_response(output["raw_response"])
output["response"] = self._clean_original_response(output["raw_response"])
return output
def ask(self, user_input, dryrun = False):
'''
Send the user input to openAI GPT and parse the response to run an action in the application.
### Parameters:
- user_input (str): Request to send to openAI that will be parsed
and returned to execute on the application.
AI understands the following tasks:
- Run a command on a group of devices.
- List a group of devices.
- Test a command on a group of devices
and verify if the output contain an
expected value.
### Optional Parameters:
- dryrun (bool): Set to true to get the arguments to use to run
in the app. Default is false and it will run
the actions directly.
### Returns:
dict: Dictionary formed with the following keys:
- input: User input received
- app_related: True if GPT detected the request to be related
to the application.
- dryrun: True/False
- response: If the request is not related to the app. this
key will contain chatGPT answer.
- action: The action detected by the AI to run in the app.
- filter: If it was detected by the AI, the filter used
to get the list of nodes to work on.
- nodes: If it's not a dryrun, the list of nodes matched by
the filter.
- args: A dictionary of arguments required to run command(s)
on the nodes.
- result: A dictionary with the output of the commands or
the test.
'''
output = {}
original = self._get_filter(user_input)
output["input"] = user_input
output["app_related"] = original["response"]["app_related"]
output["dryrun"] = dryrun
if not output["app_related"]:
output["response"] = original["response"]["response"]
else:
type = original["response"]["type"].lower()
if "filter" in original["response"]:
output["filter"] = original["response"]["filter"]
if not self.config.config["case"]:
if isinstance(output["filter"], list):
output["filter"] = [item.lower() for item in output["filter"]]
else:
output["filter"] = output["filter"].lower()
if not dryrun or type == "command":
thisnodes = self.config._getallnodesfull(output["filter"])
output["nodes"] = list(thisnodes.keys())
if not type == "command":
output["action"] = type
else:
commands = self._get_commands(user_input, thisnodes)
output["args"] = {}
output["args"]["commands"] = commands["response"]["commands"]
output["args"]["vars"] = commands["response"]["variables"]
if original["response"]["expected"]:
output["args"]["expected"] = original["response"]["expected"]
output["action"] = "test"
else:
output["action"] = "run"
if not dryrun:
mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config)
if output["action"] == "test":
output["result"] = mynodes.test(**output["args"])
elif output["action"] == "run":
output["result"] = mynodes.run(**output["args"])
return output

61
connpy/api.py Normal file → Executable file
View File

@ -1,5 +1,6 @@
from flask import Flask, request, jsonify
from connpy import configfile, node, nodes
from connpy.ai import ai as myai
from waitress import serve
import os
import signal
@ -22,18 +23,50 @@ def root():
@app.route("/list_nodes", methods=["POST"])
def list_nodes():
conf = app.custom_config
output = conf._getallnodes()
case = conf.config["case"]
try:
data = request.get_json()
filter = data["filter"]
if not case:
filter = filter.lower()
output = [item for item in output if filter in item]
if isinstance(filter, list):
filter = [item.lower() for item in filter]
else:
filter = filter.lower()
output = conf._getallnodes(filter)
except:
pass
output = conf._getallnodes()
return jsonify(output)
@app.route("/get_nodes", methods=["POST"])
def get_nodes():
conf = app.custom_config
case = conf.config["case"]
try:
data = request.get_json()
filter = data["filter"]
if not case:
if isinstance(filter, list):
filter = [item.lower() for item in filter]
else:
filter = filter.lower()
output = conf._getallnodesfull(filter)
except:
output = conf._getallnodesfull()
return jsonify(output)
@app.route("/ask_ai", methods=["POST"])
def ask_ai():
conf = app.custom_config
data = request.get_json()
input = data["input"]
if "dryrun" in data:
dryrun = data["dryrun"]
else:
dryrun = False
ai = myai(conf)
return ai.ask(input, dryrun)
@app.route("/run_commands", methods=["POST"])
def run_commands():
conf = app.custom_config
@ -51,25 +84,7 @@ def run_commands():
error = "'{}' is mandatory".format(e.args[0])
return({"DataError": error})
if isinstance(nodelist, list):
for i in nodelist:
if isinstance(i, dict):
name = list(i.keys())[0]
mylist = i[name]
if not case:
name = name.lower()
mylist = [item.lower() for item in mylist]
this = conf.getitem(name, mylist)
mynodes.update(this)
elif i.startswith("@"):
if not case:
i = i.lower()
this = conf.getitem(i)
mynodes.update(this)
else:
if not case:
i = i.lower()
this = conf.getitem(i)
mynodes[i] = this
mynodes = conf.getitems(nodelist)
else:
if not case:
nodelist = nodelist.lower()

View File

@ -92,7 +92,7 @@ class configfile:
def _createconfig(self, conf):
#Create config file
defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"" }}}
defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "" }}}
if not os.path.exists(conf):
with open(conf, "w") as f:
json.dump(defaultconfig, f, indent = 4)
@ -159,8 +159,8 @@ class configfile:
### Returns:
dict: Dictionary containing information of node or multiple dictionaries
of multiple nodes.
dict: Dictionary containing information of node or multiple
dictionaries of multiple nodes.
'''
uniques = self._explode_unique(unique)
@ -197,14 +197,53 @@ class configfile:
newnode.pop("type")
return newnode
def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', type = "connection" ):
def getitems(self, uniques):
'''
Get a group of nodes from configfile which can be passed to node/nodes class
### Parameters:
- uniques (str/list): Regex string name that will match hostnames
from the connection manager. It can be a
list of strings.
### Returns:
dict: Dictionary containing information of node or multiple
dictionaries of multiple nodes.
'''
nodes = {}
for i in uniques:
if isinstance(i, dict):
name = list(i.keys())[0]
mylist = i[name]
if not self.config["case"]:
name = name.lower()
mylist = [item.lower() for item in mylist]
this = self.getitem(name, mylist)
nodes.update(this)
elif i.startswith("@"):
if not self.config["case"]:
i = i.lower()
this = self.getitem(i)
nodes.update(this)
else:
if not self.config["case"]:
i = i.lower()
this = self.getitem(i)
nodes[i] = this
return nodes
def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', type = "connection" ):
#Add connection from config
if folder == '':
self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags,"type": type}
elif folder != '' and subfolder == '':
self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "type": type}
elif folder != '' and subfolder != '':
self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "type": type}
def _connections_del(self,*, id, folder='', subfolder=''):
@ -233,16 +272,16 @@ class configfile:
del self.connections[folder][subfolder]
def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='' ):
def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='', tags='' ):
#Add profile from config
self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user}
self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags}
def _profiles_del(self,*, id ):
#Delete profile from config
del self.profiles[id]
def _getallnodes(self):
def _getallnodes(self, filter = None):
#get all nodes on configfile
nodes = []
layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"]
@ -255,8 +294,51 @@ class configfile:
for s in subfolders:
layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"]
nodes.extend(layer3)
if filter:
if isinstance(filter, str):
nodes = [item for item in nodes if re.search(filter, item)]
elif isinstance(filter, list):
nodes = [item for item in nodes if any(re.search(pattern, item) for pattern in filter)]
else:
raise ValueError("filter must be a string or a list of strings")
return nodes
def _getallnodesfull(self, filter = None):
#get all nodes on configfile with all their attributes.
nodes = {}
layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"}
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
nodes.update(layer1)
for f in folders:
layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"}
nodes.update(layer2)
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
for s in subfolders:
layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"}
nodes.update(layer3)
if filter:
if isinstance(filter, str):
nodes = {k: v for k, v in nodes.items() if re.search(filter, k)}
elif isinstance(filter, list):
nodes = {k: v for k, v in nodes.items() if any(re.search(pattern, k) for pattern in filter)}
else:
raise ValueError("filter must be a string or a list of strings")
for node, keys in nodes.items():
for key, value in keys.items():
profile = re.search("^@(.*)", str(value))
if profile:
try:
nodes[node][key] = self.profiles[profile.group(1)][key]
except:
nodes[node][key] = ""
elif value == '' and key == "protocol":
try:
nodes[node][key] = config.profiles["default"][key]
except:
nodes[node][key] = "ssh"
return nodes
def _getallfolders(self):
#get all folders on configfile
folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]

View File

@ -10,8 +10,9 @@ import sys
import inquirer
from .core import node,nodes
from ._version import __version__
from .api import *
from .api import start_api,stop_api,debug_api
import yaml
import ast
try:
from pyfzf.pyfzf import FzfPrompt
except:
@ -119,6 +120,7 @@ class connapp:
configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
configcrud.add_argument("--openai", dest="openai", nargs=2, action=self._store_type, help="Set openai organization and api_key", metavar=("ORGANIZATION", "API_KEY"))
configparser.set_defaults(func=self._func_others)
#Manage sys arguments
commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "run", "config", "api"]
@ -244,6 +246,7 @@ class connapp:
print("You can also leave empty any value except hostname/IP.")
print("You can pass 1 or more passwords using comma separated @profiles")
print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}")
print("Some useful tags to set for automation are 'os', 'screen_length_command', and 'prompt'.")
newnode = self._questions_nodes(args.data, uniques)
if newnode == False:
exit(7)
@ -372,7 +375,7 @@ class connapp:
def _func_others(self, args):
#Function called when using other commands
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder}
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "openai": self._openai}
return actions.get(args.command)(args)
def _ls(self, args):
@ -442,6 +445,7 @@ class connapp:
newnode["port"] = newnodes["port"]
newnode["options"] = newnodes["options"]
newnode["logs"] = newnodes["logs"]
newnode["tags"] = newnodes["tags"]
newnode["user"] = newnodes["user"]
newnode["password"] = newnodes["password"]
count +=1
@ -488,6 +492,13 @@ class connapp:
f.write(str(folder))
print("Config saved")
def _openai(self, args):
openaikeys = {}
openaikeys["organization"] = args.data[0]
openaikeys["api_key"] = args.data[1]
self._change_settings(args.command, openaikeys)
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
@ -516,7 +527,6 @@ class connapp:
def _node_run(self, args):
command = " ".join(args.data[1:])
command = command.split("-")
matches = list(filter(lambda k: k == args.data[0], self.nodes))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
@ -545,7 +555,6 @@ class connapp:
print("failed reading file {}".format(args.data[0]))
exit(10)
for script in scripts["tasks"]:
nodes = {}
args = {}
try:
action = script["action"]
@ -557,18 +566,7 @@ class connapp:
except KeyError as e:
print("'{}' is mandatory".format(e.args[0]))
exit(11)
for i in nodelist:
if isinstance(i, dict):
name = list(i.keys())[0]
this = self.config.getitem(name, i[name])
nodes.update(this)
elif i.startswith("@"):
this = self.config.getitem(i)
nodes.update(this)
else:
this = self.config.getitem(i)
nodes[i] = this
nodes = self.connnodes(nodes, config = self.config)
nodes = self.connnodes(self.config.getitems(nodelist), config = self.config)
stdout = False
if output is None:
pass
@ -698,6 +696,33 @@ class connapp:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(i))
return True
def _tags_validation(self, answers, current):
#Validation for Tags in inquirer when managing nodes
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
elif current != "":
isdict = False
try:
isdict = ast.literal_eval(current)
except:
pass
if not isinstance (isdict, dict):
raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current))
return True
def _profile_tags_validation(self, answers, current):
#Validation for Tags in inquirer when managing profiles
if current != "":
isdict = False
try:
isdict = ast.literal_eval(current)
except:
pass
if not isinstance (isdict, dict):
raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current))
return True
def _default_validation(self, answers, current):
#Default validation type used in multiples questions in inquirer
if current.startswith("@"):
@ -744,6 +769,7 @@ class connapp:
questions.append(inquirer.Confirm("port", message="Edit Port?"))
questions.append(inquirer.Confirm("options", message="Edit Options?"))
questions.append(inquirer.Confirm("logs", message="Edit logging path/file?"))
questions.append(inquirer.Confirm("tags", message="Edit tags?"))
questions.append(inquirer.Confirm("user", message="Edit User?"))
questions.append(inquirer.Confirm("password", message="Edit password?"))
answers = inquirer.prompt(questions)
@ -753,12 +779,13 @@ class connapp:
#Questions when adding or editing nodes
try:
defaults = self.config.getitem(unique)
if "tags" not in defaults:
defaults["tags"] = ""
except:
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" }
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" , "tags":""}
node = {}
if edit == None:
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True }
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True }
questions = []
if edit["host"]:
questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"]))
@ -780,6 +807,10 @@ class connapp:
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"].replace("{","{{").replace("}","}}")))
else:
node["logs"] = defaults["logs"]
if edit["tags"]:
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
else:
node["tags"] = defaults["tags"]
if edit["user"]:
questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"]))
else:
@ -806,6 +837,8 @@ class connapp:
answer["password"] = passa["password"].split(",")
elif answer["password"] == "No Password":
answer["password"] = ""
if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
result = {**uniques, **answer, **node}
result["type"] = "connection"
return result
@ -814,11 +847,13 @@ class connapp:
#Questions when adding or editing profiles
try:
defaults = self.config.profiles[unique]
if "tags" not in defaults:
defaults["tags"] = ""
except:
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" }
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"", "tags": "" }
profile = {}
if edit == None:
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True }
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True }
questions = []
if edit["host"]:
questions.append(inquirer.Text("host", message="Add Hostname or IP", default=defaults["host"]))
@ -840,6 +875,10 @@ class connapp:
questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"].replace("{","{{").replace("}","}}")))
else:
profile["logs"] = defaults["logs"]
if edit["tags"]:
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._profile_tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
else:
profile["tags"] = defaults["tags"]
if edit["user"]:
questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"]))
else:
@ -854,6 +893,8 @@ class connapp:
if "password" in answer.keys():
if answer["password"] != "":
answer["password"] = self.encrypt(answer["password"])
if "tags" in answer.keys() and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
result = {**answer, **profile}
result["id"] = unique
return result
@ -868,6 +909,7 @@ class connapp:
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation))
questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation))
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation))
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation))
questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation))
questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
answer = inquirer.prompt(questions)
@ -885,6 +927,8 @@ class connapp:
elif answer["password"] == "No Password":
answer["password"] = ""
answer["type"] = "connection"
if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
return answer
def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")):

View File

@ -33,7 +33,7 @@ class node:
'''
def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config=''):
def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags=''):
'''
### Parameters:
@ -63,6 +63,9 @@ class node:
- config (obj): Pass the object created with class configfile with
key for decryption and extra configuration if you
are using connection manager.
- tags (dict) : Tags useful for automation and personal porpuse
like "os", "prompt" and "screenleght_command"
'''
if config == '':
self.idletime = 0
@ -71,11 +74,14 @@ class node:
self.idletime = config.config["idletime"]
self.key = config.key
self.unique = unique
attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user}
attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user, "tags": tags}
for key in attr:
profile = re.search("^@(.*)", attr[key])
profile = re.search("^@(.*)", str(attr[key]))
if profile and config != '':
setattr(self,key,config.profiles[profile.group(1)][key])
try:
setattr(self,key,config.profiles[profile.group(1)][key])
except:
setattr(self,key,"")
elif attr[key] == '' and key == "protocol":
try:
setattr(self,key,config.profiles["default"][key])
@ -251,11 +257,15 @@ class node:
connect = self._connect(timeout = timeout)
now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
if connect == True:
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
output = ''
status = ''
if not isinstance(commands, list):
commands = [commands]
if "screen_length_command" in self.tags:
commands.insert(0, self.tags["screen_length_command"])
self.mylog = io.BytesIO()
self.child.logfile_read = self.mylog
for c in commands:
@ -333,10 +343,14 @@ class node:
'''
connect = self._connect(timeout = timeout)
if connect == True:
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
output = ''
if not isinstance(commands, list):
commands = [commands]
if "screen_length_command" in self.tags:
commands.insert(0, self.tags["screen_length_command"])
self.mylog = io.BytesIO()
self.child.logfile_read = self.mylog
for c in commands:
@ -345,25 +359,26 @@ class node:
result = self.child.expect(expects, timeout = timeout)
self.child.sendline(c)
if result == 2:
result = 3
break
if not result == 3:
if vars is not None:
expected = expected.format(**vars)
expects = [expected, prompt, pexpect.EOF, pexpect.TIMEOUT]
if not result == 2:
result = self.child.expect(expects, timeout = timeout)
self.child.close()
output = self._logclean(self.mylog.getvalue().decode(), True)
self.output = output
if result == 0:
self.result = True
self.status = 0
return True
if result in [1, 2]:
self.result = False
if result in [0, 1]:
lastcommand = commands[-1]
if vars is not None:
expected = expected.format(**vars)
lastcommand = lastcommand.format(**vars)
last_command_index = output.rfind(lastcommand)
cleaned_output = output[last_command_index + len(lastcommand):].strip()
if expected in cleaned_output:
self.result = True
else:
self.result = False
self.status = 0
return False
if result == 3:
if result == 2:
self.result = None
self.status = 2
return output

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,10 @@
Flask==2.0.3
inquirer==3.1.3
pexpect==4.8.0
pycryptodome==3.17
pyfzf==0.3.1
PyYAML==6.0
setuptools==67.6.1
waitress==2.1.2
Flask>=2.0.3
inquirer>=3.1.3
openai>=0.27.4
pexpect>=4.8.0
pycryptodome>=3.17
pyfzf>=0.3.1
PyYAML>=6.0
requests>=2.28.2
setuptools>=67.6.1
waitress>=2.1.2

View File

@ -31,6 +31,8 @@ install_requires =
Flask
waitress
PyYAML
openai
requests
[options.extras_require]
fuzzysearch = pyfzf