diff --git a/README.md b/README.md index 9eb46d8..719ccb3 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ pip install connpy ### Standalone module ``` import connpy -router = connpy.node("unique name","ip/hostname", user="username", password="password") +router = connpy.node("uniqueName","ip/host", user="username", password="password") router.run(["term len 0","show run"]) print(router.output) hasip = router.test("show ip int brief","1.1.1.1") @@ -46,9 +46,9 @@ nodes["router2"] = conf.getitem("router2@office") nodes["router10"] = conf.getitem("router10@datacenter") #Also, you can create the nodes manually: nodes = {} -nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "password1"} -nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "password2"} -nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "password3"} +nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "password1"} +nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "password2"} +nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "password3"} #Finally you run some tasks on the nodes mynodes = connpy.nodes(nodes, config = conf) result = mynodes.test(["show ip int br"], "1.1.1.2") @@ -82,7 +82,17 @@ routers.test("ping {ip}", expected, variables) for key in routers.result: print(key, ' ---> ', ("pass" if routers.result[key] else "fail")) ``` - +### Using AI +``` +import connpy +conf = connpy.configfile() +organization = 'openai-org' +api_key = "openai-key" +myia = ai(conf, organization, api_key) +input = "go to router 1 and get me the full configuration" +result = myia.ask(input, dryrun = False) +print(result) +``` ## Connection manager ### Features - You can generate profiles and reference them from nodes using @profilename so you dont @@ -182,7 +192,31 @@ With the Connpy API you can run commands on devices using http requests --- -### 2. Run Commands +### 2. Get Nodes + +**Endpoint**: `/get_nodes` + +**Method**: `POST` + +**Description**: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword. + +#### Request Body: + +```json +{ + "filter": "" +} +``` + +* `filter` (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes. + +#### Response: + +- A JSON array containing the filtered nodes. + +--- + +### 3. Run Commands **Endpoint**: `/run_commands` @@ -211,3 +245,29 @@ With the Connpy API you can run commands on devices using http requests #### Response: - A JSON object with the results of the executed commands on the nodes. + +### 4. Ask AI + +**Endpoint**: `/ask_ai` + +**Method**: `POST` + +**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request. + +#### Request Body: + +```json +{ + "input": "", + "dryrun": true or false +} +``` + +* `input` (required): The user input requesting the AI to perform an action on some devices or get the devices list. +* `dryrun` (optional): If set to true, it will return the parameters to run the request but it won't run it. default is false. + +#### Response: + +- A JSON array containing the action to run and the parameters and the result of the action. + + diff --git a/connpy/__init__.py b/connpy/__init__.py index 49be761..266b1c0 100644 --- a/connpy/__init__.py +++ b/connpy/__init__.py @@ -98,7 +98,31 @@ With the Connpy API you can run commands on devices using http requests --- -### 2. Run Commands +### 2. Get Nodes + +**Endpoint**: `/get_nodes` + +**Method**: `POST` + +**Description**: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword. + +#### Request Body: + +```json +{ + "filter": "" +} +``` + +* `filter` (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes. + +#### Response: + +- A JSON array containing the filtered nodes. + +--- + +### 3. Run Commands **Endpoint**: `/run_commands` @@ -127,13 +151,37 @@ With the Connpy API you can run commands on devices using http requests #### Response: - A JSON object with the results of the executed commands on the nodes. -## Automation module -the automation module +### 4. Ask AI + +**Endpoint**: `/ask_ai` + +**Method**: `POST` + +**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request. + +#### Request Body: + +```json +{ + "input": "", + "dryrun": true or false +} +``` + +* `input` (required): The user input requesting the AI to perform an action on some devices or get the devices list. +* `dryrun` (optional): If set to true, it will return the parameters to run the request but it won't run it. default is false. + +#### Response: + +- A JSON array containing the action to run and the parameters and the result of the action. + +## Automation module +The automation module ### Standalone module ``` import connpy -router = connpy.node("unique name","ip/hostname", user="user", password="pass") +router = connpy.node("uniqueName","ip/host", user="user", password="pass") router.run(["term len 0","show run"]) print(router.output) hasip = router.test("show ip int brief","1.1.1.1") @@ -165,9 +213,9 @@ nodes["router2"] = conf.getitem("router2@office") nodes["router10"] = conf.getitem("router10@datacenter") #Also, you can create the nodes manually: nodes = {} -nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "pass1"} -nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "pass2"} -nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "pass3"} +nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"} +nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"} +nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "pass3"} #Finally you run some tasks on the nodes mynodes = connpy.nodes(nodes, config = conf) result = mynodes.test(["show ip int br"], "1.1.1.2") @@ -201,14 +249,27 @@ routers.test("ping {ip}", expected, variables) for key in routers.result: print(key, ' ---> ', ("pass" if routers.result[key] else "fail")) ``` +### Using AI +``` +import connpy +conf = connpy.configfile() +organization = 'openai-org' +api_key = "openai-key" +myia = ai(conf, organization, api_key) +input = "go to router 1 and get me the full configuration" +result = myia.ask(input, dryrun = False) +print(result) +``` ''' from .core import node,nodes from .configfile import configfile from .connapp import connapp +from .api import * +from .ai import ai from ._version import __version__ from pkg_resources import get_distribution -__all__ = ["node", "nodes", "configfile", "connapp"] +__all__ = ["node", "nodes", "configfile", "connapp", "ai"] __author__ = "Federico Luzzi" __pdoc__ = { 'core': False, diff --git a/connpy/_version.py b/connpy/_version.py index ca2ef48..b2498df 100644 --- a/connpy/_version.py +++ b/connpy/_version.py @@ -1,2 +1,2 @@ -__version__ = "3.0.7" +__version__ = "3.2.0" diff --git a/connpy/ai.py b/connpy/ai.py new file mode 100755 index 0000000..0600264 --- /dev/null +++ b/connpy/ai.py @@ -0,0 +1,317 @@ +import openai +import requests +import json +import re +import ast +from textwrap import dedent +from .core import nodes + +class ai: + ''' This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application. + + ### Attributes: + + - model (str): Model of GPT api to use. Default is gpt-3.5-turbo. + + - temp (float): Value between 0 and 1 that control the randomness + of generated text, with higher values increasing + creativity. Default is 0.7. + + ''' + + def __init__(self, config, org = None, api_key = None, model = "gpt-3.5-turbo", temp = 0.7): + ''' + + ### Parameters: + + - config (obj): Pass the object created with class configfile with + key for decryption and extra configuration if you + are using connection manager. + + ### Optional Parameters: + + - org (str): A unique token identifying the user organization + to interact with the API. + + - api_key (str): A unique authentication token required to access + and interact with the API. + + - model (str): Model of GPT api to use. Default is gpt-3.5-turbo. + + - temp (float): Value between 0 and 1 that control the randomness + of generated text, with higher values increasing + creativity. Default is 0.7. + + + ''' + self.config = config + if org: + openai.organization = org + else: + try: + openai.organization = self.config.config["openai"]["organization"] + except: + raise ValueError("Missing openai organization") + if api_key: + openai.api_key = api_key + else: + try: + openai.api_key = self.config.config["openai"]["api_key"] + except: + raise ValueError("Missing openai api_key") + self.__prompt = {} + self.__prompt["original_system"] = """ + When provided with user input for an SSH connection management application, analyze the input and extract the following information: + + - app_related: True if the input is related to the application's purpose and the request is understood; False if the input is not related, not understood, or if mandatory information like filter is missing. + - type: Given a user input, identify the type of request they want to make. The input will represent one of two options: + 1. "command" - The user wants to get information from devices by running commands. + 2. "list_nodes" - The user wants to get a list of nodes, devices, servers, or routers. + Response wich command or list_nodes type depending on the request. + - filter: One or more regex patterns indicating the device or group of devices the command should be run on, returned as a Python list (e.g., ['hostname', 'hostname@folder', '@subfolder@folder']). The filter can have different formats, such as: + - hostname + - hostname@folder + - hostname@subfolder@folder + - partofhostname + - @folder + - @subfolder@folder + - regex_pattern + The filter should be extracted from the user input exactly as it was provided. + Always preserve the exact filter pattern provided by the user, with no modifications. Do not process any regex, the application can do that. + If no filter is specified, set it to None. + - Expected: A value representing an expected output to search for when running the command. For the user this is a optional value. Set it to 'None' if no value was captured. + expected value should ALWAYS come from the user input explicitly. + - response: An optional field to be filled when app_related is False or when providing an explanation related to the app. This is where you can engage in small talk, answer questions not related to the app, or provide explanations about the extracted information. + + Always respond in the following format: + + app_related: {{app_related}} + Type: {{command}} + Filter: {{filter}} + Expected: {{expected}} + Response: {{response}} + """ + self.__prompt["original_user"] = "Get the IP addresses of loopback0 for all routers from w2az1 and e1.*(prod|dev) and check if they have the ip 192.168.1.1" + self.__prompt["original_assistant"] = "app_related: True\nType: Command\nFilter: ['w2az1', 'e1.*(prod|dev)']\nExpected: 192.168.1.1" + self.__prompt["command_system"] = """ + For each device listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server). Always format your response as a Python list (e.g., ['command1', 'command2']). + + Note that the application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. + + If the commands needed are not for the specific OS type, just send an empty list (e.g., []). + + It is crucial to always include the device name provided in your response, even when there is only one device. + + your response has to be always like this: + node1: ["command1", "command2"] + node2: ["command1", "command2", "command3"] + node1@folder: ["command1"] + Node4@subfolder@folder: [] + """ + self.__prompt["command_user"]= """ + input: show me the full configuration for all this devices: + + Devices: + router1: cisco ios + """ + self.__prompt["command_assistant"]= """ + router1: ['show running-config'] + """ + self.model = model + self.temp = temp + + def _clean_original_response(self, raw_response): + #Parse response for first request to openAI GPT. + info_dict = {} + info_dict["app_related"] = False + current_key = "response" + for line in raw_response.split("\n"): + if line.strip() == "": + line = "\n" + possible_keys = ["app_related", "type", "filter", "expected", "response"] + if ':' in line and (key := line.split(':', 1)[0].strip().lower()) in possible_keys: + key, value = line.split(":", 1) + key = key.strip().lower() + value = value.strip() + # Convert "true" or "false" (case-insensitive) to Python boolean + if value.lower() == "true": + value = True + elif value.lower() == "false": + value = False + elif value.lower() == "none": + value = None + if key == "filter": + value = ast.literal_eval(value) + #store in dictionary + info_dict[key] = value + current_key = key + else: + if current_key == "response": + if "response" in info_dict: + info_dict[current_key] += "\n" + line + else: + info_dict[current_key] = line + + return info_dict + + def _clean_command_response(self, raw_response): + #Parse response for command request to openAI GPT. + info_dict = {} + info_dict["commands"] = [] + info_dict["variables"] = {} + info_dict["variables"]["__global__"] = {} + for line in raw_response.split("\n"): + if ":" in line: + key, value = line.split(":", 1) + key = key.strip() + newvalue = {} + try: + value = ast.literal_eval(value.strip()) + for i,e in enumerate(value, start=1): + newvalue[f"command{i}"] = e + if f"{{command{i}}}" not in info_dict["commands"]: + info_dict["commands"].append(f"{{command{i}}}") + info_dict["variables"]["__global__"][f"command{i}"] = "" + info_dict["variables"][key] = newvalue + except: + pass + return info_dict + + + def _get_commands(self, user_input, nodes): + #Send the request for commands for each device to openAI GPT. + output_list = [] + for key, value in nodes.items(): + tags = value.get('tags', {}) + try: + os_value = tags.get('os', '') + except: + os_value = "" + output_list.append(f"{key}: {os_value}") + output_str = "\n".join(output_list) + command_input = f"input: {user_input}\n\nDevices:\n{output_str}" + message = [] + message.append({"role": "system", "content": dedent(self.__prompt["command_system"])}) + message.append({"role": "user", "content": dedent(self.__prompt["command_user"])}) + message.append({"role": "assistant", "content": dedent(self.__prompt["command_assistant"])}) + message.append({"role": "user", "content": command_input}) + response = openai.ChatCompletion.create( + model=self.model, + messages=message, + temperature=self.temp + ) + output = {} + output["dict_response"] = response + output["raw_response"] = response["choices"][0]["message"]["content"] + output["response"] = self._clean_command_response(output["raw_response"]) + return output + + def _get_filter(self, user_input): + #Send the request to identify the filter and other attributes from the user input to GPT. + message = [] + message.append({"role": "system", "content": dedent(self.__prompt["original_system"])}) + message.append({"role": "user", "content": dedent(self.__prompt["original_user"])}) + message.append({"role": "assistant", "content": dedent(self.__prompt["original_assistant"])}) + message.append({"role": "user", "content": user_input}) + + response = openai.ChatCompletion.create( + model=self.model, + messages=message, + temperature=self.temp, + top_p=1 + ) + + output = {} + output["dict_response"] = response + output["raw_response"] = response["choices"][0]["message"]["content"] + clear_response = self._clean_original_response(output["raw_response"]) + output["response"] = self._clean_original_response(output["raw_response"]) + return output + + def ask(self, user_input, dryrun = False): + ''' + Send the user input to openAI GPT and parse the response to run an action in the application. + + ### Parameters: + + - user_input (str): Request to send to openAI that will be parsed + and returned to execute on the application. + AI understands the following tasks: + - Run a command on a group of devices. + - List a group of devices. + - Test a command on a group of devices + and verify if the output contain an + expected value. + + ### Optional Parameters: + + - dryrun (bool): Set to true to get the arguments to use to run + in the app. Default is false and it will run + the actions directly. + + ### Returns: + + dict: Dictionary formed with the following keys: + - input: User input received + - app_related: True if GPT detected the request to be related + to the application. + - dryrun: True/False + - response: If the request is not related to the app. this + key will contain chatGPT answer. + - action: The action detected by the AI to run in the app. + - filter: If it was detected by the AI, the filter used + to get the list of nodes to work on. + - nodes: If it's not a dryrun, the list of nodes matched by + the filter. + - args: A dictionary of arguments required to run command(s) + on the nodes. + - result: A dictionary with the output of the commands or + the test. + + + ''' + output = {} + original = self._get_filter(user_input) + output["input"] = user_input + output["app_related"] = original["response"]["app_related"] + output["dryrun"] = dryrun + if not output["app_related"]: + output["response"] = original["response"]["response"] + else: + type = original["response"]["type"].lower() + if "filter" in original["response"]: + output["filter"] = original["response"]["filter"] + if not self.config.config["case"]: + if isinstance(output["filter"], list): + output["filter"] = [item.lower() for item in output["filter"]] + else: + output["filter"] = output["filter"].lower() + if not dryrun or type == "command": + thisnodes = self.config._getallnodesfull(output["filter"]) + output["nodes"] = list(thisnodes.keys()) + if not type == "command": + output["action"] = type + else: + commands = self._get_commands(user_input, thisnodes) + output["args"] = {} + output["args"]["commands"] = commands["response"]["commands"] + output["args"]["vars"] = commands["response"]["variables"] + if original["response"]["expected"]: + output["args"]["expected"] = original["response"]["expected"] + output["action"] = "test" + else: + output["action"] = "run" + if not dryrun: + mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config) + if output["action"] == "test": + output["result"] = mynodes.test(**output["args"]) + elif output["action"] == "run": + output["result"] = mynodes.run(**output["args"]) + return output + + + + + + + diff --git a/connpy/api.py b/connpy/api.py old mode 100644 new mode 100755 index 2b5cd01..a4c3357 --- a/connpy/api.py +++ b/connpy/api.py @@ -1,5 +1,6 @@ from flask import Flask, request, jsonify from connpy import configfile, node, nodes +from connpy.ai import ai as myai from waitress import serve import os import signal @@ -22,18 +23,50 @@ def root(): @app.route("/list_nodes", methods=["POST"]) def list_nodes(): conf = app.custom_config - output = conf._getallnodes() case = conf.config["case"] try: data = request.get_json() filter = data["filter"] if not case: - filter = filter.lower() - output = [item for item in output if filter in item] + if isinstance(filter, list): + filter = [item.lower() for item in filter] + else: + filter = filter.lower() + output = conf._getallnodes(filter) except: - pass + output = conf._getallnodes() return jsonify(output) +@app.route("/get_nodes", methods=["POST"]) +def get_nodes(): + conf = app.custom_config + case = conf.config["case"] + try: + data = request.get_json() + filter = data["filter"] + if not case: + if isinstance(filter, list): + filter = [item.lower() for item in filter] + else: + filter = filter.lower() + output = conf._getallnodesfull(filter) + except: + output = conf._getallnodesfull() + return jsonify(output) + +@app.route("/ask_ai", methods=["POST"]) +def ask_ai(): + conf = app.custom_config + data = request.get_json() + input = data["input"] + if "dryrun" in data: + dryrun = data["dryrun"] + else: + dryrun = False + ai = myai(conf) + return ai.ask(input, dryrun) + + @app.route("/run_commands", methods=["POST"]) def run_commands(): conf = app.custom_config @@ -51,25 +84,7 @@ def run_commands(): error = "'{}' is mandatory".format(e.args[0]) return({"DataError": error}) if isinstance(nodelist, list): - for i in nodelist: - if isinstance(i, dict): - name = list(i.keys())[0] - mylist = i[name] - if not case: - name = name.lower() - mylist = [item.lower() for item in mylist] - this = conf.getitem(name, mylist) - mynodes.update(this) - elif i.startswith("@"): - if not case: - i = i.lower() - this = conf.getitem(i) - mynodes.update(this) - else: - if not case: - i = i.lower() - this = conf.getitem(i) - mynodes[i] = this + mynodes = conf.getitems(nodelist) else: if not case: nodelist = nodelist.lower() diff --git a/connpy/configfile.py b/connpy/configfile.py index 8a3c519..0fe42a7 100755 --- a/connpy/configfile.py +++ b/connpy/configfile.py @@ -92,7 +92,7 @@ class configfile: def _createconfig(self, conf): #Create config file - defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"" }}} + defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "" }}} if not os.path.exists(conf): with open(conf, "w") as f: json.dump(defaultconfig, f, indent = 4) @@ -159,8 +159,8 @@ class configfile: ### Returns: - dict: Dictionary containing information of node or multiple dictionaries - of multiple nodes. + dict: Dictionary containing information of node or multiple + dictionaries of multiple nodes. ''' uniques = self._explode_unique(unique) @@ -197,14 +197,53 @@ class configfile: newnode.pop("type") return newnode - def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', type = "connection" ): + def getitems(self, uniques): + ''' + Get a group of nodes from configfile which can be passed to node/nodes class + + ### Parameters: + + - uniques (str/list): Regex string name that will match hostnames + from the connection manager. It can be a + list of strings. + + ### Returns: + + dict: Dictionary containing information of node or multiple + dictionaries of multiple nodes. + + ''' + nodes = {} + for i in uniques: + if isinstance(i, dict): + name = list(i.keys())[0] + mylist = i[name] + if not self.config["case"]: + name = name.lower() + mylist = [item.lower() for item in mylist] + this = self.getitem(name, mylist) + nodes.update(this) + elif i.startswith("@"): + if not self.config["case"]: + i = i.lower() + this = self.getitem(i) + nodes.update(this) + else: + if not self.config["case"]: + i = i.lower() + this = self.getitem(i) + nodes[i] = this + return nodes + + + def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', type = "connection" ): #Add connection from config if folder == '': - self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type} + self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags,"type": type} elif folder != '' and subfolder == '': - self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type} + self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "type": type} elif folder != '' and subfolder != '': - self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type} + self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "type": type} def _connections_del(self,*, id, folder='', subfolder=''): @@ -233,16 +272,16 @@ class configfile: del self.connections[folder][subfolder] - def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='' ): + def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='', tags='' ): #Add profile from config - self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user} + self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags} def _profiles_del(self,*, id ): #Delete profile from config del self.profiles[id] - def _getallnodes(self): + def _getallnodes(self, filter = None): #get all nodes on configfile nodes = [] layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"] @@ -255,8 +294,51 @@ class configfile: for s in subfolders: layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"] nodes.extend(layer3) + if filter: + if isinstance(filter, str): + nodes = [item for item in nodes if re.search(filter, item)] + elif isinstance(filter, list): + nodes = [item for item in nodes if any(re.search(pattern, item) for pattern in filter)] + else: + raise ValueError("filter must be a string or a list of strings") return nodes + def _getallnodesfull(self, filter = None): + #get all nodes on configfile with all their attributes. + nodes = {} + layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"} + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + nodes.update(layer1) + for f in folders: + layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"} + nodes.update(layer2) + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + for s in subfolders: + layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"} + nodes.update(layer3) + if filter: + if isinstance(filter, str): + nodes = {k: v for k, v in nodes.items() if re.search(filter, k)} + elif isinstance(filter, list): + nodes = {k: v for k, v in nodes.items() if any(re.search(pattern, k) for pattern in filter)} + else: + raise ValueError("filter must be a string or a list of strings") + for node, keys in nodes.items(): + for key, value in keys.items(): + profile = re.search("^@(.*)", str(value)) + if profile: + try: + nodes[node][key] = self.profiles[profile.group(1)][key] + except: + nodes[node][key] = "" + elif value == '' and key == "protocol": + try: + nodes[node][key] = config.profiles["default"][key] + except: + nodes[node][key] = "ssh" + return nodes + + def _getallfolders(self): #get all folders on configfile folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] diff --git a/connpy/connapp.py b/connpy/connapp.py index caf9be7..40197cd 100755 --- a/connpy/connapp.py +++ b/connpy/connapp.py @@ -10,8 +10,9 @@ import sys import inquirer from .core import node,nodes from ._version import __version__ -from .api import * +from .api import start_api,stop_api,debug_api import yaml +import ast try: from pyfzf.pyfzf import FzfPrompt except: @@ -119,6 +120,7 @@ class connapp: configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT") configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn") configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER") + configcrud.add_argument("--openai", dest="openai", nargs=2, action=self._store_type, help="Set openai organization and api_key", metavar=("ORGANIZATION", "API_KEY")) configparser.set_defaults(func=self._func_others) #Manage sys arguments commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "run", "config", "api"] @@ -244,6 +246,7 @@ class connapp: print("You can also leave empty any value except hostname/IP.") print("You can pass 1 or more passwords using comma separated @profiles") print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}") + print("Some useful tags to set for automation are 'os', 'screen_length_command', and 'prompt'.") newnode = self._questions_nodes(args.data, uniques) if newnode == False: exit(7) @@ -372,7 +375,7 @@ class connapp: def _func_others(self, args): #Function called when using other commands - actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder} + actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "openai": self._openai} return actions.get(args.command)(args) def _ls(self, args): @@ -442,6 +445,7 @@ class connapp: newnode["port"] = newnodes["port"] newnode["options"] = newnodes["options"] newnode["logs"] = newnodes["logs"] + newnode["tags"] = newnodes["tags"] newnode["user"] = newnodes["user"] newnode["password"] = newnodes["password"] count +=1 @@ -487,6 +491,13 @@ class connapp: with open(pathfile, "w") as f: f.write(str(folder)) print("Config saved") + + def _openai(self, args): + openaikeys = {} + openaikeys["organization"] = args.data[0] + openaikeys["api_key"] = args.data[1] + self._change_settings(args.command, openaikeys) + def _change_settings(self, name, value): self.config.config[name] = value @@ -516,7 +527,6 @@ class connapp: def _node_run(self, args): command = " ".join(args.data[1:]) - command = command.split("-") matches = list(filter(lambda k: k == args.data[0], self.nodes)) if len(matches) == 0: print("{} not found".format(args.data[0])) @@ -545,7 +555,6 @@ class connapp: print("failed reading file {}".format(args.data[0])) exit(10) for script in scripts["tasks"]: - nodes = {} args = {} try: action = script["action"] @@ -557,18 +566,7 @@ class connapp: except KeyError as e: print("'{}' is mandatory".format(e.args[0])) exit(11) - for i in nodelist: - if isinstance(i, dict): - name = list(i.keys())[0] - this = self.config.getitem(name, i[name]) - nodes.update(this) - elif i.startswith("@"): - this = self.config.getitem(i) - nodes.update(this) - else: - this = self.config.getitem(i) - nodes[i] = this - nodes = self.connnodes(nodes, config = self.config) + nodes = self.connnodes(self.config.getitems(nodelist), config = self.config) stdout = False if output is None: pass @@ -698,6 +696,33 @@ class connapp: raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(i)) return True + def _tags_validation(self, answers, current): + #Validation for Tags in inquirer when managing nodes + if current.startswith("@"): + if current[1:] not in self.profiles: + raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current)) + elif current != "": + isdict = False + try: + isdict = ast.literal_eval(current) + except: + pass + if not isinstance (isdict, dict): + raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current)) + return True + + def _profile_tags_validation(self, answers, current): + #Validation for Tags in inquirer when managing profiles + if current != "": + isdict = False + try: + isdict = ast.literal_eval(current) + except: + pass + if not isinstance (isdict, dict): + raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current)) + return True + def _default_validation(self, answers, current): #Default validation type used in multiples questions in inquirer if current.startswith("@"): @@ -744,6 +769,7 @@ class connapp: questions.append(inquirer.Confirm("port", message="Edit Port?")) questions.append(inquirer.Confirm("options", message="Edit Options?")) questions.append(inquirer.Confirm("logs", message="Edit logging path/file?")) + questions.append(inquirer.Confirm("tags", message="Edit tags?")) questions.append(inquirer.Confirm("user", message="Edit User?")) questions.append(inquirer.Confirm("password", message="Edit password?")) answers = inquirer.prompt(questions) @@ -753,12 +779,13 @@ class connapp: #Questions when adding or editing nodes try: defaults = self.config.getitem(unique) + if "tags" not in defaults: + defaults["tags"] = "" except: - defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } + defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" , "tags":""} node = {} - if edit == None: - edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } + edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"])) @@ -780,6 +807,10 @@ class connapp: questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"].replace("{","{{").replace("}","}}"))) else: node["logs"] = defaults["logs"] + if edit["tags"]: + questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}"))) + else: + node["tags"] = defaults["tags"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"])) else: @@ -806,6 +837,8 @@ class connapp: answer["password"] = passa["password"].split(",") elif answer["password"] == "No Password": answer["password"] = "" + if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]: + answer["tags"] = ast.literal_eval(answer["tags"]) result = {**uniques, **answer, **node} result["type"] = "connection" return result @@ -814,11 +847,13 @@ class connapp: #Questions when adding or editing profiles try: defaults = self.config.profiles[unique] + if "tags" not in defaults: + defaults["tags"] = "" except: - defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } + defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"", "tags": "" } profile = {} if edit == None: - edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } + edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or IP", default=defaults["host"])) @@ -840,6 +875,10 @@ class connapp: questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"].replace("{","{{").replace("}","}}"))) else: profile["logs"] = defaults["logs"] + if edit["tags"]: + questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._profile_tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}"))) + else: + profile["tags"] = defaults["tags"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"])) else: @@ -854,6 +893,8 @@ class connapp: if "password" in answer.keys(): if answer["password"] != "": answer["password"] = self.encrypt(answer["password"]) + if "tags" in answer.keys() and answer["tags"]: + answer["tags"] = ast.literal_eval(answer["tags"]) result = {**answer, **profile} result["id"] = unique return result @@ -868,6 +909,7 @@ class connapp: questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation)) questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation)) questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation)) + questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation)) questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation)) questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"])) answer = inquirer.prompt(questions) @@ -885,6 +927,8 @@ class connapp: elif answer["password"] == "No Password": answer["password"] = "" answer["type"] = "connection" + if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]: + answer["tags"] = ast.literal_eval(answer["tags"]) return answer def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")): diff --git a/connpy/core.py b/connpy/core.py index 3da4bc7..a595ee8 100755 --- a/connpy/core.py +++ b/connpy/core.py @@ -33,7 +33,7 @@ class node: ''' - def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config=''): + def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags=''): ''' ### Parameters: @@ -63,6 +63,9 @@ class node: - config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager. + + - tags (dict) : Tags useful for automation and personal porpuse + like "os", "prompt" and "screenleght_command" ''' if config == '': self.idletime = 0 @@ -71,11 +74,14 @@ class node: self.idletime = config.config["idletime"] self.key = config.key self.unique = unique - attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user} + attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user, "tags": tags} for key in attr: - profile = re.search("^@(.*)", attr[key]) + profile = re.search("^@(.*)", str(attr[key])) if profile and config != '': - setattr(self,key,config.profiles[profile.group(1)][key]) + try: + setattr(self,key,config.profiles[profile.group(1)][key]) + except: + setattr(self,key,"") elif attr[key] == '' and key == "protocol": try: setattr(self,key,config.profiles["default"][key]) @@ -251,11 +257,15 @@ class node: connect = self._connect(timeout = timeout) now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') if connect == True: + if "prompt" in self.tags: + prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' status = '' if not isinstance(commands, list): commands = [commands] + if "screen_length_command" in self.tags: + commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: @@ -333,10 +343,14 @@ class node: ''' connect = self._connect(timeout = timeout) if connect == True: + if "prompt" in self.tags: + prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' if not isinstance(commands, list): commands = [commands] + if "screen_length_command" in self.tags: + commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: @@ -345,25 +359,26 @@ class node: result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: - result = 3 break - if not result == 3: - if vars is not None: - expected = expected.format(**vars) - expects = [expected, prompt, pexpect.EOF, pexpect.TIMEOUT] + if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) self.output = output - if result == 0: - self.result = True - self.status = 0 - return True - if result in [1, 2]: - self.result = False + if result in [0, 1]: + lastcommand = commands[-1] + if vars is not None: + expected = expected.format(**vars) + lastcommand = lastcommand.format(**vars) + last_command_index = output.rfind(lastcommand) + cleaned_output = output[last_command_index + len(lastcommand):].strip() + if expected in cleaned_output: + self.result = True + else: + self.result = False self.status = 0 return False - if result == 3: + if result == 2: self.result = None self.status = 2 return output diff --git a/docs/connpy/index.html b/docs/connpy/index.html index dc6ac86..6af6391 100644 --- a/docs/connpy/index.html +++ b/docs/connpy/index.html @@ -105,11 +105,28 @@ options:
  • A JSON array containing the filtered list of nodes.

  • -

    2. Run Commands

    +

    2. Get Nodes

    +

    Endpoint: /get_nodes

    +

    Method: POST

    +

    Description: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword.

    +

    Request Body:

    +
    {
    +  "filter": "<keyword>"
    +}
    +
    +
      +
    • filter (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes.
    • +
    +

    Response:

    +
      +
    • A JSON array containing the filtered nodes.
    • +
    +
    +

    3. Run Commands

    Endpoint: /run_commands

    Method: POST

    Description: This route runs commands on selected nodes based on the provided action, nodes, and commands. It also supports executing tests by providing expected results.

    -

    Request Body:

    +

    Request Body:

    {
       "action": "<action>",
       "nodes": "<nodes>",
    @@ -126,15 +143,33 @@ options:
     
  • options (optional): Array to pass options to the run command, options are: prompt, parallel, timeout
  • -

    Response:

    +

    Response:

    • A JSON object with the results of the executed commands on the nodes.
    +

    4. Ask AI

    +

    Endpoint: /ask_ai

    +

    Method: POST

    +

    Description: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request.

    +

    Request Body:

    +
    {
    +  "input": "<user input request>",
    +  "dryrun": true or false
    +}
    +
    +
      +
    • input (required): The user input requesting the AI to perform an action on some devices or get the devices list.
    • +
    • dryrun (optional): If set to true, it will return the parameters to run the request but it won't run it. default is false.
    • +
    +

    Response:

    +
      +
    • A JSON array containing the action to run and the parameters and the result of the action.
    • +

    Automation module

    -

    the automation module

    +

    The automation module

    Standalone module

    import connpy
    -router = connpy.node("unique name","ip/hostname", user="user", password="pass")
    +router = connpy.node("uniqueName","ip/host", user="user", password="pass")
     router.run(["term len 0","show run"])
     print(router.output)
     hasip = router.test("show ip int brief","1.1.1.1")
    @@ -163,9 +198,9 @@ nodes["router2"] = conf.getitem("router2@office")
     nodes["router10"] = conf.getitem("router10@datacenter")
     #Also, you can create the nodes manually:
     nodes = {}
    -nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "pass1"}
    -nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "pass2"}
    -nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "pass3"}
    +nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"}
    +nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"}
    +nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "pass3"}
     #Finally you run some tasks on the nodes
     mynodes = connpy.nodes(nodes, config = conf)
     result = mynodes.test(["show ip int br"], "1.1.1.2")
    @@ -198,6 +233,16 @@ routers.test("ping {ip}", expected, variables)
     for key in routers.result:
         print(key, ' ---> ', ("pass" if routers.result[key] else "fail"))
     
    +

    Using AI

    +
    import connpy
    +conf = connpy.configfile()
    +organization = 'openai-org'
    +api_key = "openai-key"
    +myia = ai(conf, organization, api_key)
    +input = "go to router 1 and get me the full configuration"
    +result = myia.ask(input, dryrun = False)
    +print(result)
    +
    Expand source code @@ -302,7 +347,31 @@ With the Connpy API you can run commands on devices using http requests --- -### 2. Run Commands +### 2. Get Nodes + +**Endpoint**: `/get_nodes` + +**Method**: `POST` + +**Description**: This route returns a dictionary of nodes with all their attributes. It can also filter the nodes based on a given keyword. + +#### Request Body: + +```json +{ + "filter": "<keyword>" +} +``` + +* `filter` (optional): A keyword to filter the nodes. It returns only the nodes that contain the keyword. If not provided, the route will return the entire list of nodes. + +#### Response: + +- A JSON array containing the filtered nodes. + +--- + +### 3. Run Commands **Endpoint**: `/run_commands` @@ -331,13 +400,37 @@ With the Connpy API you can run commands on devices using http requests #### Response: - A JSON object with the results of the executed commands on the nodes. -## Automation module -the automation module +### 4. Ask AI + +**Endpoint**: `/ask_ai` + +**Method**: `POST` + +**Description**: This route sends to chatgpt IA a request that will parse it into an understandable output for the application and then run the request. + +#### Request Body: + +```json +{ + "input": "<user input request>", + "dryrun": true or false +} +``` + +* `input` (required): The user input requesting the AI to perform an action on some devices or get the devices list. +* `dryrun` (optional): If set to true, it will return the parameters to run the request but it won't run it. default is false. + +#### Response: + +- A JSON array containing the action to run and the parameters and the result of the action. + +## Automation module +The automation module ### Standalone module ``` import connpy -router = connpy.node("unique name","ip/hostname", user="user", password="pass") +router = connpy.node("uniqueName","ip/host", user="user", password="pass") router.run(["term len 0","show run"]) print(router.output) hasip = router.test("show ip int brief","1.1.1.1") @@ -369,9 +462,9 @@ nodes["router2"] = conf.getitem("router2@office") nodes["router10"] = conf.getitem("router10@datacenter") #Also, you can create the nodes manually: nodes = {} -nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "pass1"} -nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "pass2"} -nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "pass3"} +nodes["router1"] = {"host": "1.1.1.1", "user": "user", "password": "pass1"} +nodes["router2"] = {"host": "1.1.1.2", "user": "user", "password": "pass2"} +nodes["router3"] = {"host": "1.1.1.2", "user": "user", "password": "pass3"} #Finally you run some tasks on the nodes mynodes = connpy.nodes(nodes, config = conf) result = mynodes.test(["show ip int br"], "1.1.1.2") @@ -405,14 +498,27 @@ routers.test("ping {ip}", expected, variables) for key in routers.result: print(key, ' ---> ', ("pass" if routers.result[key] else "fail")) ``` +### Using AI +``` +import connpy +conf = connpy.configfile() +organization = 'openai-org' +api_key = "openai-key" +myia = ai(conf, organization, api_key) +input = "go to router 1 and get me the full configuration" +result = myia.ask(input, dryrun = False) +print(result) +``` ''' from .core import node,nodes from .configfile import configfile from .connapp import connapp +from .api import * +from .ai import ai from ._version import __version__ from pkg_resources import get_distribution -__all__ = ["node", "nodes", "configfile", "connapp"] +__all__ = ["node", "nodes", "configfile", "connapp", "ai"] __author__ = "Federico Luzzi" __pdoc__ = { 'core': False, @@ -430,6 +536,472 @@ __pdoc__ = {

    Classes

    +
    +class ai +(config, org=None, api_key=None, model='gpt-3.5-turbo', temp=0.7) +
    +
    +

    This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application.

    +

    Attributes:

    +
    - model        (str): Model of GPT api to use. Default is gpt-3.5-turbo.
    +
    +- temp       (float): Value between 0 and 1 that control the randomness 
    +                      of generated text, with higher values increasing 
    +                      creativity. Default is 0.7.
    +
    +

    Parameters:

    +
    - config (obj): Pass the object created with class configfile with 
    +                key for decryption and extra configuration if you 
    +                are using connection manager.
    +
    +

    Optional Parameters:

    +
    - org     (str): A unique token identifying the user organization
    +                 to interact with the API.
    +
    +- api_key (str): A unique authentication token required to access 
    +                 and interact with the API.
    +
    +- model   (str): Model of GPT api to use. Default is gpt-3.5-turbo.
    +
    +- temp  (float): Value between 0 and 1 that control the randomness 
    +                 of generated text, with higher values increasing 
    +                 creativity. Default is 0.7.
    +
    +
    + +Expand source code + +
    class ai:
    +    ''' This class generates a ai object. Containts all the information and methods to make requests to openAI chatGPT to run actions on the application.
    +
    +    ### Attributes:  
    +
    +        - model        (str): Model of GPT api to use. Default is gpt-3.5-turbo.
    +
    +        - temp       (float): Value between 0 and 1 that control the randomness 
    +                              of generated text, with higher values increasing 
    +                              creativity. Default is 0.7.
    +
    +        '''
    +
    +    def __init__(self, config, org = None, api_key = None, model = "gpt-3.5-turbo", temp = 0.7):
    +        ''' 
    +            
    +        ### Parameters:  
    +
    +            - config (obj): Pass the object created with class configfile with 
    +                            key for decryption and extra configuration if you 
    +                            are using connection manager.  
    +
    +        ### Optional Parameters:  
    +
    +            - org     (str): A unique token identifying the user organization
    +                             to interact with the API.
    +
    +            - api_key (str): A unique authentication token required to access 
    +                             and interact with the API.
    +
    +            - model   (str): Model of GPT api to use. Default is gpt-3.5-turbo. 
    +
    +            - temp  (float): Value between 0 and 1 that control the randomness 
    +                             of generated text, with higher values increasing 
    +                             creativity. Default is 0.7.
    +   
    +
    +        '''
    +        self.config = config
    +        if org:
    +            openai.organization = org
    +        else:
    +            try: 
    +                openai.organization = self.config.config["openai"]["organization"]
    +            except:
    +                raise ValueError("Missing openai organization")
    +        if api_key:
    +            openai.api_key = api_key
    +        else:
    +            try: 
    +                openai.api_key = self.config.config["openai"]["api_key"]
    +            except:
    +                raise ValueError("Missing openai api_key")
    +        self.__prompt = {}
    +        self.__prompt["original_system"] = """
    +            When provided with user input for an SSH connection management application, analyze the input and extract the following information:
    +
    +            - app_related: True if the input is related to the application's purpose and the request is understood; False if the input is not related, not understood, or if mandatory information like filter is missing.
    +            - type: Given a user input, identify the type of request they want to make. The input will represent one of two options: 
    +            1. "command" - The user wants to get information from devices by running commands.
    +            2. "list_nodes" - The user wants to get a list of nodes, devices, servers, or routers.
    +            Response wich command or list_nodes type depending on the request.
    +            - filter: One or more regex patterns indicating the device or group of devices the command should be run on, returned as a Python list (e.g., ['hostname', 'hostname@folder', '@subfolder@folder']). The filter can have different formats, such as:
    +                - hostname
    +                - hostname@folder
    +                - hostname@subfolder@folder
    +                - partofhostname
    +                - @folder
    +                - @subfolder@folder
    +                - regex_pattern
    +                The filter should be extracted from the user input exactly as it was provided.
    +                Always preserve the exact filter pattern provided by the user, with no modifications. Do not process any regex, the application can do that.
    +                If no filter is specified, set it to None.
    +            - Expected: A value representing an expected output to search for when running the command. For the user this is a optional value. Set it to 'None' if no value was captured.
    +              expected value should ALWAYS come from the user input explicitly.
    +            - response: An optional field to be filled when app_related is False or when providing an explanation related to the app. This is where you can engage in small talk, answer questions not related to the app, or provide explanations about the extracted information.
    +                
    +            Always respond in the following format:
    +                
    +                app_related: {{app_related}}
    +                Type: {{command}}
    +                Filter: {{filter}}
    +                Expected: {{expected}}
    +                Response: {{response}}
    +    """ 
    +        self.__prompt["original_user"] = "Get the IP addresses of loopback0 for all routers from w2az1 and e1.*(prod|dev) and check if they have the ip 192.168.1.1"
    +        self.__prompt["original_assistant"] = "app_related: True\nType: Command\nFilter: ['w2az1', 'e1.*(prod|dev)']\nExpected: 192.168.1.1"
    +        self.__prompt["command_system"] = """
    +    For each device listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server). Always format your response as a Python list (e.g., ['command1', 'command2']). 
    +
    +    Note that the application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. 
    +
    +    If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
    +
    +    It is crucial to always include the device name provided in your response, even when there is only one device.
    +
    +    your response has to be always like this:
    +        node1: ["command1", "command2"]
    +        node2: ["command1", "command2", "command3"]
    +        node1@folder: ["command1"]
    +        Node4@subfolder@folder: []
    +    """
    +        self.__prompt["command_user"]= """
    +    input: show me the full configuration for all this devices:
    +
    +    Devices:
    +    router1: cisco ios
    +    """
    +        self.__prompt["command_assistant"]= """
    +    router1: ['show running-config']
    +    """
    +        self.model = model
    +        self.temp = temp
    +
    +    def _clean_original_response(self, raw_response):
    +        #Parse response for first request to openAI GPT.
    +        info_dict = {}
    +        info_dict["app_related"] = False
    +        current_key = "response"
    +        for line in raw_response.split("\n"):
    +            if line.strip() == "":
    +                line = "\n"
    +            possible_keys = ["app_related", "type", "filter", "expected", "response"]
    +            if ':' in line and (key := line.split(':', 1)[0].strip().lower()) in possible_keys:
    +                key, value = line.split(":", 1)
    +                key = key.strip().lower()
    +                value = value.strip()
    +                # Convert "true" or "false" (case-insensitive) to Python boolean
    +                if value.lower() == "true":
    +                    value = True
    +                elif value.lower() == "false":
    +                    value = False
    +                elif value.lower() == "none":
    +                    value = None
    +                if key == "filter":
    +                    value = ast.literal_eval(value)
    +                #store in dictionary
    +                info_dict[key] = value
    +                current_key = key
    +            else:
    +                if current_key == "response":
    +                    if "response" in info_dict:
    +                        info_dict[current_key] += "\n" + line
    +                    else:
    +                        info_dict[current_key] = line
    +
    +        return info_dict
    +
    +    def _clean_command_response(self, raw_response):
    +        #Parse response for command request to openAI GPT.
    +        info_dict = {}
    +        info_dict["commands"] = []
    +        info_dict["variables"] = {}
    +        info_dict["variables"]["__global__"] = {}
    +        for line in raw_response.split("\n"):
    +            if ":" in line:
    +                key, value = line.split(":", 1)
    +                key = key.strip()
    +                newvalue = {}
    +                try:
    +                    value = ast.literal_eval(value.strip())
    +                    for i,e in enumerate(value, start=1):
    +                        newvalue[f"command{i}"] = e
    +                        if f"{{command{i}}}" not in info_dict["commands"]:
    +                            info_dict["commands"].append(f"{{command{i}}}")
    +                            info_dict["variables"]["__global__"][f"command{i}"] = ""
    +                    info_dict["variables"][key] = newvalue
    +                except:
    +                    pass
    +        return info_dict
    +
    +
    +    def _get_commands(self, user_input, nodes):
    +        #Send the request for commands for each device to openAI GPT.
    +        output_list = []
    +        for key, value in nodes.items():
    +            tags = value.get('tags', {})
    +            try:
    +                os_value = tags.get('os', '')
    +            except:
    +                os_value = ""
    +            output_list.append(f"{key}: {os_value}")
    +        output_str = "\n".join(output_list)
    +        command_input = f"input: {user_input}\n\nDevices:\n{output_str}"
    +        message = []
    +        message.append({"role": "system", "content": dedent(self.__prompt["command_system"])})
    +        message.append({"role": "user", "content": dedent(self.__prompt["command_user"])})
    +        message.append({"role": "assistant", "content": dedent(self.__prompt["command_assistant"])})
    +        message.append({"role": "user", "content": command_input})
    +        response = openai.ChatCompletion.create(
    +            model=self.model,
    +            messages=message,
    +            temperature=self.temp
    +            )
    +        output = {}
    +        output["dict_response"] = response
    +        output["raw_response"] = response["choices"][0]["message"]["content"] 
    +        output["response"] = self._clean_command_response(output["raw_response"])
    +        return output
    +
    +    def _get_filter(self, user_input):
    +        #Send the request to identify the filter and other attributes from the user input to GPT.
    +        message = []
    +        message.append({"role": "system", "content": dedent(self.__prompt["original_system"])})
    +        message.append({"role": "user", "content": dedent(self.__prompt["original_user"])})
    +        message.append({"role": "assistant", "content": dedent(self.__prompt["original_assistant"])})
    +        message.append({"role": "user", "content": user_input})
    +
    +        response = openai.ChatCompletion.create(
    +            model=self.model,
    +            messages=message,
    +            temperature=self.temp,
    +            top_p=1
    +            )
    +
    +        output = {}
    +        output["dict_response"] = response
    +        output["raw_response"] = response["choices"][0]["message"]["content"] 
    +        clear_response = self._clean_original_response(output["raw_response"])
    +        output["response"] = self._clean_original_response(output["raw_response"])
    +        return output
    +        
    +    def ask(self, user_input, dryrun = False):
    +        '''
    +        Send the user input to openAI GPT and parse the response to run an action in the application.
    +
    +        ### Parameters:  
    +
    +            - user_input (str): Request to send to openAI that will be parsed
    +                                and returned to execute on the application.
    +                                AI understands the following tasks:
    +                                - Run a command on a group of devices.
    +                                - List a group of devices.
    +                                - Test a command on a group of devices
    +                                  and verify if the output contain an
    +                                  expected value.
    +
    +        ### Optional Parameters:  
    +
    +            - dryrun  (bool): Set to true to get the arguments to use to run
    +                              in the app. Default is false and it will run 
    +                              the actions directly.
    +
    +        ### Returns:  
    +
    +            dict: Dictionary formed with the following keys:
    +                  - input: User input received
    +                  - app_related: True if GPT detected the request to be related
    +                    to the application.
    +                  - dryrun: True/False
    +                  - response: If the request is not related to the app. this
    +                    key will contain chatGPT answer.
    +                  - action: The action detected by the AI to run in the app.
    +                  - filter: If it was detected by the AI, the filter used
    +                    to get the list of nodes to work on.
    +                  - nodes: If it's not a dryrun, the list of nodes matched by
    +                    the filter.
    +                  - args: A dictionary of arguments required to run command(s)
    +                    on the nodes.
    +                  - result: A dictionary with the output of the commands or 
    +                    the test.
    +                    
    +
    +        '''
    +        output = {}
    +        original = self._get_filter(user_input)
    +        output["input"] = user_input
    +        output["app_related"] = original["response"]["app_related"]
    +        output["dryrun"] = dryrun
    +        if not output["app_related"]:
    +            output["response"] = original["response"]["response"]
    +        else:
    +            type = original["response"]["type"].lower()
    +            if "filter" in original["response"]:
    +                output["filter"] = original["response"]["filter"]
    +                if not self.config.config["case"]:
    +                    if isinstance(output["filter"], list):
    +                        output["filter"] = [item.lower() for item in output["filter"]]
    +                    else:
    +                        output["filter"] = output["filter"].lower()
    +                if not dryrun or type == "command":
    +                    thisnodes = self.config._getallnodesfull(output["filter"])
    +                    output["nodes"] = list(thisnodes.keys())
    +            if not type == "command":
    +                output["action"] = type
    +            else:
    +                commands = self._get_commands(user_input, thisnodes)
    +                output["args"] = {}
    +                output["args"]["commands"] = commands["response"]["commands"]
    +                output["args"]["vars"] = commands["response"]["variables"]
    +                if original["response"]["expected"]:
    +                    output["args"]["expected"] = original["response"]["expected"]
    +                    output["action"] = "test"
    +                else:
    +                    output["action"] = "run"
    +                if not dryrun:
    +                    mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config)
    +                    if output["action"] == "test":
    +                        output["result"] = mynodes.test(**output["args"])
    +                    elif output["action"] == "run":
    +                        output["result"] = mynodes.run(**output["args"])
    +        return output
    +
    +

    Methods

    +
    +
    +def ask(self, user_input, dryrun=False) +
    +
    +

    Send the user input to openAI GPT and parse the response to run an action in the application.

    +

    Parameters:

    +
    - user_input (str): Request to send to openAI that will be parsed
    +                    and returned to execute on the application.
    +                    AI understands the following tasks:
    +                    - Run a command on a group of devices.
    +                    - List a group of devices.
    +                    - Test a command on a group of devices
    +                      and verify if the output contain an
    +                      expected value.
    +
    +

    Optional Parameters:

    +
    - dryrun  (bool): Set to true to get the arguments to use to run
    +                  in the app. Default is false and it will run 
    +                  the actions directly.
    +
    +

    Returns:

    +
    dict: Dictionary formed with the following keys:
    +      - input: User input received
    +      - app_related: True if GPT detected the request to be related
    +        to the application.
    +      - dryrun: True/False
    +      - response: If the request is not related to the app. this
    +        key will contain chatGPT answer.
    +      - action: The action detected by the AI to run in the app.
    +      - filter: If it was detected by the AI, the filter used
    +        to get the list of nodes to work on.
    +      - nodes: If it's not a dryrun, the list of nodes matched by
    +        the filter.
    +      - args: A dictionary of arguments required to run command(s)
    +        on the nodes.
    +      - result: A dictionary with the output of the commands or 
    +        the test.
    +
    +
    + +Expand source code + +
    def ask(self, user_input, dryrun = False):
    +    '''
    +    Send the user input to openAI GPT and parse the response to run an action in the application.
    +
    +    ### Parameters:  
    +
    +        - user_input (str): Request to send to openAI that will be parsed
    +                            and returned to execute on the application.
    +                            AI understands the following tasks:
    +                            - Run a command on a group of devices.
    +                            - List a group of devices.
    +                            - Test a command on a group of devices
    +                              and verify if the output contain an
    +                              expected value.
    +
    +    ### Optional Parameters:  
    +
    +        - dryrun  (bool): Set to true to get the arguments to use to run
    +                          in the app. Default is false and it will run 
    +                          the actions directly.
    +
    +    ### Returns:  
    +
    +        dict: Dictionary formed with the following keys:
    +              - input: User input received
    +              - app_related: True if GPT detected the request to be related
    +                to the application.
    +              - dryrun: True/False
    +              - response: If the request is not related to the app. this
    +                key will contain chatGPT answer.
    +              - action: The action detected by the AI to run in the app.
    +              - filter: If it was detected by the AI, the filter used
    +                to get the list of nodes to work on.
    +              - nodes: If it's not a dryrun, the list of nodes matched by
    +                the filter.
    +              - args: A dictionary of arguments required to run command(s)
    +                on the nodes.
    +              - result: A dictionary with the output of the commands or 
    +                the test.
    +                
    +
    +    '''
    +    output = {}
    +    original = self._get_filter(user_input)
    +    output["input"] = user_input
    +    output["app_related"] = original["response"]["app_related"]
    +    output["dryrun"] = dryrun
    +    if not output["app_related"]:
    +        output["response"] = original["response"]["response"]
    +    else:
    +        type = original["response"]["type"].lower()
    +        if "filter" in original["response"]:
    +            output["filter"] = original["response"]["filter"]
    +            if not self.config.config["case"]:
    +                if isinstance(output["filter"], list):
    +                    output["filter"] = [item.lower() for item in output["filter"]]
    +                else:
    +                    output["filter"] = output["filter"].lower()
    +            if not dryrun or type == "command":
    +                thisnodes = self.config._getallnodesfull(output["filter"])
    +                output["nodes"] = list(thisnodes.keys())
    +        if not type == "command":
    +            output["action"] = type
    +        else:
    +            commands = self._get_commands(user_input, thisnodes)
    +            output["args"] = {}
    +            output["args"]["commands"] = commands["response"]["commands"]
    +            output["args"]["vars"] = commands["response"]["variables"]
    +            if original["response"]["expected"]:
    +                output["args"]["expected"] = original["response"]["expected"]
    +                output["action"] = "test"
    +            else:
    +                output["action"] = "run"
    +            if not dryrun:
    +                mynodes = nodes(self.config.getitems(output["nodes"]),config=self.config)
    +                if output["action"] == "test":
    +                    output["result"] = mynodes.test(**output["args"])
    +                elif output["action"] == "run":
    +                    output["result"] = mynodes.run(**output["args"])
    +    return output
    +
    +
    +
    +
    class configfile (conf=None, key=None) @@ -549,7 +1121,7 @@ __pdoc__ = { def _createconfig(self, conf): #Create config file - defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"" }}} + defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "" }}} if not os.path.exists(conf): with open(conf, "w") as f: json.dump(defaultconfig, f, indent = 4) @@ -616,8 +1188,8 @@ __pdoc__ = { ### Returns: - dict: Dictionary containing information of node or multiple dictionaries - of multiple nodes. + dict: Dictionary containing information of node or multiple + dictionaries of multiple nodes. ''' uniques = self._explode_unique(unique) @@ -654,14 +1226,53 @@ __pdoc__ = { newnode.pop("type") return newnode - def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', type = "connection" ): + def getitems(self, uniques): + ''' + Get a group of nodes from configfile which can be passed to node/nodes class + + ### Parameters: + + - uniques (str/list): Regex string name that will match hostnames + from the connection manager. It can be a + list of strings. + + ### Returns: + + dict: Dictionary containing information of node or multiple + dictionaries of multiple nodes. + + ''' + nodes = {} + for i in uniques: + if isinstance(i, dict): + name = list(i.keys())[0] + mylist = i[name] + if not self.config["case"]: + name = name.lower() + mylist = [item.lower() for item in mylist] + this = self.getitem(name, mylist) + nodes.update(this) + elif i.startswith("@"): + if not self.config["case"]: + i = i.lower() + this = self.getitem(i) + nodes.update(this) + else: + if not self.config["case"]: + i = i.lower() + this = self.getitem(i) + nodes[i] = this + return nodes + + + def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', tags='', type = "connection" ): #Add connection from config if folder == '': - self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type} + self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags,"type": type} elif folder != '' and subfolder == '': - self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type} + self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "type": type} elif folder != '' and subfolder != '': - self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type} + self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags, "type": type} def _connections_del(self,*, id, folder='', subfolder=''): @@ -690,16 +1301,16 @@ __pdoc__ = { del self.connections[folder][subfolder] - def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='' ): + def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='', tags='' ): #Add profile from config - self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user} + self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "tags": tags} def _profiles_del(self,*, id ): #Delete profile from config del self.profiles[id] - def _getallnodes(self): + def _getallnodes(self, filter = None): #get all nodes on configfile nodes = [] layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"] @@ -712,8 +1323,51 @@ __pdoc__ = { for s in subfolders: layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"] nodes.extend(layer3) + if filter: + if isinstance(filter, str): + nodes = [item for item in nodes if re.search(filter, item)] + elif isinstance(filter, list): + nodes = [item for item in nodes if any(re.search(pattern, item) for pattern in filter)] + else: + raise ValueError("filter must be a string or a list of strings") return nodes + def _getallnodesfull(self, filter = None): + #get all nodes on configfile with all their attributes. + nodes = {} + layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"} + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + nodes.update(layer1) + for f in folders: + layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"} + nodes.update(layer2) + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + for s in subfolders: + layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"} + nodes.update(layer3) + if filter: + if isinstance(filter, str): + nodes = {k: v for k, v in nodes.items() if re.search(filter, k)} + elif isinstance(filter, list): + nodes = {k: v for k, v in nodes.items() if any(re.search(pattern, k) for pattern in filter)} + else: + raise ValueError("filter must be a string or a list of strings") + for node, keys in nodes.items(): + for key, value in keys.items(): + profile = re.search("^@(.*)", str(value)) + if profile: + try: + nodes[node][key] = self.profiles[profile.group(1)][key] + except: + nodes[node][key] = "" + elif value == '' and key == "protocol": + try: + nodes[node][key] = config.profiles["default"][key] + except: + nodes[node][key] = "ssh" + return nodes + + def _getallfolders(self): #get all folders on configfile folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] @@ -756,8 +1410,8 @@ __pdoc__ = { nodes inside the folder passing a list.

    Returns:

    -
    dict: Dictionary containing information of node or multiple dictionaries
    -      of multiple nodes.
    +
    dict: Dictionary containing information of node or multiple 
    +      dictionaries of multiple nodes.
     
    @@ -780,8 +1434,8 @@ __pdoc__ = { ### Returns: - dict: Dictionary containing information of node or multiple dictionaries - of multiple nodes. + dict: Dictionary containing information of node or multiple + dictionaries of multiple nodes. ''' uniques = self._explode_unique(unique) @@ -819,6 +1473,63 @@ __pdoc__ = { return newnode
    +
    +def getitems(self, uniques) +
    +
    +

    Get a group of nodes from configfile which can be passed to node/nodes class

    +

    Parameters:

    +
    - uniques (str/list): Regex string name that will match hostnames 
    +                      from the connection manager. It can be a 
    +                      list of strings.
    +
    +

    Returns:

    +
    dict: Dictionary containing information of node or multiple 
    +      dictionaries of multiple nodes.
    +
    +
    + +Expand source code + +
    def getitems(self, uniques):
    +    '''
    +    Get a group of nodes from configfile which can be passed to node/nodes class
    +
    +    ### Parameters:  
    +
    +        - uniques (str/list): Regex string name that will match hostnames 
    +                              from the connection manager. It can be a 
    +                              list of strings.
    +
    +    ### Returns:  
    +
    +        dict: Dictionary containing information of node or multiple 
    +              dictionaries of multiple nodes.
    +
    +    '''
    +    nodes = {}
    +    for i in uniques:
    +        if isinstance(i, dict):
    +            name = list(i.keys())[0]
    +            mylist = i[name]
    +            if not self.config["case"]:
    +                name = name.lower()
    +                mylist = [item.lower() for item in mylist]
    +            this = self.getitem(name, mylist)
    +            nodes.update(this)
    +        elif i.startswith("@"):
    +            if not self.config["case"]:
    +                i = i.lower()
    +            this = self.getitem(i)
    +            nodes.update(this)
    +        else:
    +            if not self.config["case"]:
    +                i = i.lower()
    +            this = self.getitem(i)
    +            nodes[i] = this
    +    return nodes
    +
    +
    @@ -932,6 +1643,7 @@ __pdoc__ = { configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT") configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn") configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER") + configcrud.add_argument("--openai", dest="openai", nargs=2, action=self._store_type, help="Set openai organization and api_key", metavar=("ORGANIZATION", "API_KEY")) configparser.set_defaults(func=self._func_others) #Manage sys arguments commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "run", "config", "api"] @@ -1057,6 +1769,7 @@ __pdoc__ = { print("You can also leave empty any value except hostname/IP.") print("You can pass 1 or more passwords using comma separated @profiles") print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}") + print("Some useful tags to set for automation are 'os', 'screen_length_command', and 'prompt'.") newnode = self._questions_nodes(args.data, uniques) if newnode == False: exit(7) @@ -1185,7 +1898,7 @@ __pdoc__ = { def _func_others(self, args): #Function called when using other commands - actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder} + actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "openai": self._openai} return actions.get(args.command)(args) def _ls(self, args): @@ -1255,6 +1968,7 @@ __pdoc__ = { newnode["port"] = newnodes["port"] newnode["options"] = newnodes["options"] newnode["logs"] = newnodes["logs"] + newnode["tags"] = newnodes["tags"] newnode["user"] = newnodes["user"] newnode["password"] = newnodes["password"] count +=1 @@ -1300,6 +2014,13 @@ __pdoc__ = { with open(pathfile, "w") as f: f.write(str(folder)) print("Config saved") + + def _openai(self, args): + openaikeys = {} + openaikeys["organization"] = args.data[0] + openaikeys["api_key"] = args.data[1] + self._change_settings(args.command, openaikeys) + def _change_settings(self, name, value): self.config.config[name] = value @@ -1329,7 +2050,6 @@ __pdoc__ = { def _node_run(self, args): command = " ".join(args.data[1:]) - command = command.split("-") matches = list(filter(lambda k: k == args.data[0], self.nodes)) if len(matches) == 0: print("{} not found".format(args.data[0])) @@ -1358,7 +2078,6 @@ __pdoc__ = { print("failed reading file {}".format(args.data[0])) exit(10) for script in scripts["tasks"]: - nodes = {} args = {} try: action = script["action"] @@ -1370,18 +2089,7 @@ __pdoc__ = { except KeyError as e: print("'{}' is mandatory".format(e.args[0])) exit(11) - for i in nodelist: - if isinstance(i, dict): - name = list(i.keys())[0] - this = self.config.getitem(name, i[name]) - nodes.update(this) - elif i.startswith("@"): - this = self.config.getitem(i) - nodes.update(this) - else: - this = self.config.getitem(i) - nodes[i] = this - nodes = self.connnodes(nodes, config = self.config) + nodes = self.connnodes(self.config.getitems(nodelist), config = self.config) stdout = False if output is None: pass @@ -1511,6 +2219,33 @@ __pdoc__ = { raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(i)) return True + def _tags_validation(self, answers, current): + #Validation for Tags in inquirer when managing nodes + if current.startswith("@"): + if current[1:] not in self.profiles: + raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current)) + elif current != "": + isdict = False + try: + isdict = ast.literal_eval(current) + except: + pass + if not isinstance (isdict, dict): + raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current)) + return True + + def _profile_tags_validation(self, answers, current): + #Validation for Tags in inquirer when managing profiles + if current != "": + isdict = False + try: + isdict = ast.literal_eval(current) + except: + pass + if not isinstance (isdict, dict): + raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current)) + return True + def _default_validation(self, answers, current): #Default validation type used in multiples questions in inquirer if current.startswith("@"): @@ -1557,6 +2292,7 @@ __pdoc__ = { questions.append(inquirer.Confirm("port", message="Edit Port?")) questions.append(inquirer.Confirm("options", message="Edit Options?")) questions.append(inquirer.Confirm("logs", message="Edit logging path/file?")) + questions.append(inquirer.Confirm("tags", message="Edit tags?")) questions.append(inquirer.Confirm("user", message="Edit User?")) questions.append(inquirer.Confirm("password", message="Edit password?")) answers = inquirer.prompt(questions) @@ -1566,12 +2302,13 @@ __pdoc__ = { #Questions when adding or editing nodes try: defaults = self.config.getitem(unique) + if "tags" not in defaults: + defaults["tags"] = "" except: - defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } + defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" , "tags":""} node = {} - if edit == None: - edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } + edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"])) @@ -1593,6 +2330,10 @@ __pdoc__ = { questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"].replace("{","{{").replace("}","}}"))) else: node["logs"] = defaults["logs"] + if edit["tags"]: + questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}"))) + else: + node["tags"] = defaults["tags"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"])) else: @@ -1619,6 +2360,8 @@ __pdoc__ = { answer["password"] = passa["password"].split(",") elif answer["password"] == "No Password": answer["password"] = "" + if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]: + answer["tags"] = ast.literal_eval(answer["tags"]) result = {**uniques, **answer, **node} result["type"] = "connection" return result @@ -1627,11 +2370,13 @@ __pdoc__ = { #Questions when adding or editing profiles try: defaults = self.config.profiles[unique] + if "tags" not in defaults: + defaults["tags"] = "" except: - defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } + defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"", "tags": "" } profile = {} if edit == None: - edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } + edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or IP", default=defaults["host"])) @@ -1653,6 +2398,10 @@ __pdoc__ = { questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"].replace("{","{{").replace("}","}}"))) else: profile["logs"] = defaults["logs"] + if edit["tags"]: + questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._profile_tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}"))) + else: + profile["tags"] = defaults["tags"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"])) else: @@ -1667,6 +2416,8 @@ __pdoc__ = { if "password" in answer.keys(): if answer["password"] != "": answer["password"] = self.encrypt(answer["password"]) + if "tags" in answer.keys() and answer["tags"]: + answer["tags"] = ast.literal_eval(answer["tags"]) result = {**answer, **profile} result["id"] = unique return result @@ -1681,6 +2432,7 @@ __pdoc__ = { questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation)) questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation)) questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation)) + questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation)) questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation)) questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"])) answer = inquirer.prompt(questions) @@ -1698,6 +2450,8 @@ __pdoc__ = { elif answer["password"] == "No Password": answer["password"] = "" answer["type"] = "connection" + if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]: + answer["tags"] = ast.literal_eval(answer["tags"]) return answer def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")): @@ -1895,7 +2649,7 @@ tasks:
    -def start(self, argv=['--html', 'connpy', '-o', 'docs', '-f']) +def start(self, argv=['connpy', '--html', '-o', 'docs/', '--force'])

    Parameters:

    @@ -1975,6 +2729,7 @@ tasks: configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT") configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn") configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER") + configcrud.add_argument("--openai", dest="openai", nargs=2, action=self._store_type, help="Set openai organization and api_key", metavar=("ORGANIZATION", "API_KEY")) configparser.set_defaults(func=self._func_others) #Manage sys arguments commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "run", "config", "api"] @@ -1992,7 +2747,7 @@ tasks:
    class node -(unique, host, options='', logs='', password='', port='', protocol='', user='', config='') +(unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags='')

    This class generates a node object. Containts all the information and methods to connect and interact with a device using ssh or telnet.

    @@ -2032,6 +2787,9 @@ tasks: - config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager. + +- tags (dict) : Tags useful for automation and personal porpuse + like "os", "prompt" and "screenleght_command"
    @@ -2054,7 +2812,7 @@ tasks: ''' - def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config=''): + def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config='', tags=''): ''' ### Parameters: @@ -2084,6 +2842,9 @@ tasks: - config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager. + + - tags (dict) : Tags useful for automation and personal porpuse + like "os", "prompt" and "screenleght_command" ''' if config == '': self.idletime = 0 @@ -2092,11 +2853,14 @@ tasks: self.idletime = config.config["idletime"] self.key = config.key self.unique = unique - attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user} + attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user, "tags": tags} for key in attr: - profile = re.search("^@(.*)", attr[key]) + profile = re.search("^@(.*)", str(attr[key])) if profile and config != '': - setattr(self,key,config.profiles[profile.group(1)][key]) + try: + setattr(self,key,config.profiles[profile.group(1)][key]) + except: + setattr(self,key,"") elif attr[key] == '' and key == "protocol": try: setattr(self,key,config.profiles["default"][key]) @@ -2272,11 +3036,15 @@ tasks: connect = self._connect(timeout = timeout) now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') if connect == True: + if "prompt" in self.tags: + prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' status = '' if not isinstance(commands, list): commands = [commands] + if "screen_length_command" in self.tags: + commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: @@ -2354,10 +3122,14 @@ tasks: ''' connect = self._connect(timeout = timeout) if connect == True: + if "prompt" in self.tags: + prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' if not isinstance(commands, list): commands = [commands] + if "screen_length_command" in self.tags: + commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: @@ -2366,25 +3138,26 @@ tasks: result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: - result = 3 break - if not result == 3: - if vars is not None: - expected = expected.format(**vars) - expects = [expected, prompt, pexpect.EOF, pexpect.TIMEOUT] + if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) self.output = output - if result == 0: - self.result = True - self.status = 0 - return True - if result in [1, 2]: - self.result = False + if result in [0, 1]: + lastcommand = commands[-1] + if vars is not None: + expected = expected.format(**vars) + lastcommand = lastcommand.format(**vars) + last_command_index = output.rfind(lastcommand) + cleaned_output = output[last_command_index + len(lastcommand):].strip() + if expected in cleaned_output: + self.result = True + else: + self.result = False self.status = 0 return False - if result == 3: + if result == 2: self.result = None self.status = 2 return output @@ -2624,11 +3397,15 @@ tasks: connect = self._connect(timeout = timeout) now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') if connect == True: + if "prompt" in self.tags: + prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' status = '' if not isinstance(commands, list): commands = [commands] + if "screen_length_command" in self.tags: + commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: @@ -2747,10 +3524,14 @@ tasks: ''' connect = self._connect(timeout = timeout) if connect == True: + if "prompt" in self.tags: + prompt = self.tags["prompt"] expects = [prompt, pexpect.EOF, pexpect.TIMEOUT] output = '' if not isinstance(commands, list): commands = [commands] + if "screen_length_command" in self.tags: + commands.insert(0, self.tags["screen_length_command"]) self.mylog = io.BytesIO() self.child.logfile_read = self.mylog for c in commands: @@ -2759,25 +3540,26 @@ tasks: result = self.child.expect(expects, timeout = timeout) self.child.sendline(c) if result == 2: - result = 3 break - if not result == 3: - if vars is not None: - expected = expected.format(**vars) - expects = [expected, prompt, pexpect.EOF, pexpect.TIMEOUT] + if not result == 2: result = self.child.expect(expects, timeout = timeout) self.child.close() output = self._logclean(self.mylog.getvalue().decode(), True) self.output = output - if result == 0: - self.result = True - self.status = 0 - return True - if result in [1, 2]: - self.result = False + if result in [0, 1]: + lastcommand = commands[-1] + if vars is not None: + expected = expected.format(**vars) + lastcommand = lastcommand.format(**vars) + last_command_index = output.rfind(lastcommand) + cleaned_output = output[last_command_index + len(lastcommand):].strip() + if expected in cleaned_output: + self.result = True + else: + self.result = False self.status = 0 return False - if result == 3: + if result == 2: self.result = None self.status = 2 return output @@ -3339,11 +4121,21 @@ tasks:
  • Response:
  • -
  • 2. Run Commands
  • Automation module
  • @@ -3359,9 +4152,16 @@ tasks:
  • Classes

    • +

      ai

      + +
    • +
    • configfile

    • diff --git a/requirements.txt b/requirements.txt index 740274c..40f3429 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,10 @@ -Flask==2.0.3 -inquirer==3.1.3 -pexpect==4.8.0 -pycryptodome==3.17 -pyfzf==0.3.1 -PyYAML==6.0 -setuptools==67.6.1 -waitress==2.1.2 +Flask>=2.0.3 +inquirer>=3.1.3 +openai>=0.27.4 +pexpect>=4.8.0 +pycryptodome>=3.17 +pyfzf>=0.3.1 +PyYAML>=6.0 +requests>=2.28.2 +setuptools>=67.6.1 +waitress>=2.1.2 diff --git a/setup.cfg b/setup.cfg index 82d7239..983be1d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,6 +31,8 @@ install_requires = Flask waitress PyYAML + openai + requests [options.extras_require] fuzzysearch = pyfzf