connpy/connpy/connapp.py
2023-12-01 13:40:49 -03:00

1368 lines
62 KiB
Python
Executable File

#!/usr/bin/env python3
#Imports
import os
import re
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
import ast
import argparse
import sys
import inquirer
from .core import node,nodes
from ._version import __version__
from .api import start_api,stop_api,debug_api
from .ai import ai
import yaml
class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True
import ast
from rich import print as mdprint
from rich.markdown import Markdown
try:
from pyfzf.pyfzf import FzfPrompt
except:
FzfPrompt = None
home = os.path.expanduser("~")
defaultdir = home + '/.config/conn'
#functions and classes
class connapp:
''' This class starts the connection manager app. It's normally used by connection manager but you can use it on a script to run the connection manager your way and use a different configfile and key.
'''
def __init__(self, config):
'''
### Parameters:
- config (obj): Object generated with configfile class, it contains
the nodes configuration and the methods to manage
the config file.
'''
self.node = node
self.connnodes = nodes
self.config = config
self.nodes = self.config._getallnodes()
self.folders = self.config._getallfolders()
self.profiles = list(self.config.profiles.keys())
self.case = self.config.config["case"]
try:
self.fzf = self.config.config["fzf"]
except:
self.fzf = False
def start(self,argv = sys.argv[1:]):
'''
### Parameters:
- argv (list): List of arguments to pass to the app.
Default: sys.argv[1:]
'''
#DEFAULTPARSER
defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
subparsers = defaultparser.add_subparsers(title="Commands")
#NODEPARSER
nodeparser = subparsers.add_parser("node",usage=self._help("usage"), help=self._help("node"),epilog=self._help("end"), formatter_class=argparse.RawTextHelpFormatter)
nodecrud = nodeparser.add_mutually_exclusive_group()
nodeparser.add_argument("node", metavar="node|folder", nargs='?', default=None, action=self._store_type, help=self._help("node"))
nodecrud.add_argument("-v","--version", dest="action", action="store_const", help="Show version", const="version", default="connect")
nodecrud.add_argument("-a","--add", dest="action", action="store_const", help="Add new node[@subfolder][@folder] or [@subfolder]@folder", const="add", default="connect")
nodecrud.add_argument("-r","--del", "--rm", dest="action", action="store_const", help="Delete node[@subfolder][@folder] or [@subfolder]@folder", const="del", default="connect")
nodecrud.add_argument("-e","--mod", "--edit", dest="action", action="store_const", help="Modify node[@subfolder][@folder]", const="mod", default="connect")
nodecrud.add_argument("-s","--show", dest="action", action="store_const", help="Show node[@subfolder][@folder]", const="show", default="connect")
nodecrud.add_argument("-d","--debug", dest="debug", action="store_true", help="Display all conections steps")
nodeparser.add_argument("-t","--sftp", dest="sftp", action="store_true", help="Connects using sftp instead of ssh")
nodeparser.set_defaults(func=self._func_node)
#PROFILEPARSER
profileparser = subparsers.add_parser("profile", help="Manage profiles")
profileparser.add_argument("profile", nargs=1, action=self._store_type, type=self._type_profile, help="Name of profile to manage")
profilecrud = profileparser.add_mutually_exclusive_group(required=True)
profilecrud.add_argument("-a", "--add", dest="action", action="store_const", help="Add new profile", const="add")
profilecrud.add_argument("-r", "--del", "--rm", dest="action", action="store_const", help="Delete profile", const="del")
profilecrud.add_argument("-e", "--mod", "--edit", dest="action", action="store_const", help="Modify profile", const="mod")
profilecrud.add_argument("-s", "--show", dest="action", action="store_const", help="Show profile", const="show")
profileparser.set_defaults(func=self._func_profile)
#MOVEPARSER
moveparser = subparsers.add_parser("move", aliases=["mv"], help="Move node")
moveparser.add_argument("move", nargs=2, action=self._store_type, help="Move node[@subfolder][@folder] dest_node[@subfolder][@folder]", default="move", type=self._type_node)
moveparser.set_defaults(func=self._func_others)
#COPYPARSER
copyparser = subparsers.add_parser("copy", aliases=["cp"], help="Copy node")
copyparser.add_argument("cp", nargs=2, action=self._store_type, help="Copy node[@subfolder][@folder] new_node[@subfolder][@folder]", default="cp", type=self._type_node)
copyparser.set_defaults(func=self._func_others)
#LISTPARSER
lsparser = subparsers.add_parser("list", aliases=["ls"], help="List profiles, nodes or folders")
lsparser.add_argument("ls", action=self._store_type, choices=["profiles","nodes","folders"], help="List profiles, nodes or folders", default=False)
lsparser.add_argument("--filter", nargs=1, help="Filter results")
lsparser.add_argument("--format", nargs=1, help="Format of the output of nodes using {name}, {NAME}, {location}, {LOCATION}, {host} and {HOST}")
lsparser.set_defaults(func=self._func_others)
#BULKPARSER
bulkparser = subparsers.add_parser("bulk", help="Add nodes in bulk")
bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
bulkparser.set_defaults(func=self._func_others)
# EXPORTPARSER
exportparser = subparsers.add_parser("export", help="Export connection folder to Yaml file")
exportparser.add_argument("export", nargs="+", action=self._store_type, help="Export /path/to/file.yml [@subfolder1][@folder1] [@subfolderN][@folderN]")
exportparser.set_defaults(func=self._func_export)
# IMPORTPARSER
importparser = subparsers.add_parser("import", help="Import connection folder to config from Yaml file")
importparser.add_argument("file", nargs=1, action=self._store_type, help="Import /path/to/file.yml")
importparser.set_defaults(func=self._func_import)
# AIPARSER
aiparser = subparsers.add_parser("ai", help="Make request to an AI")
aiparser.add_argument("ask", nargs='*', help="Ask connpy AI something")
aiparser.add_argument("--model", nargs=1, help="Set the OPENAI model id")
aiparser.add_argument("--org", nargs=1, help="Set the OPENAI organization id")
aiparser.add_argument("--api_key", nargs=1, help="Set the OPENAI API key")
aiparser.set_defaults(func=self._func_ai)
#RUNPARSER
runparser = subparsers.add_parser("run", help="Run scripts or commands on nodes", formatter_class=argparse.RawTextHelpFormatter)
runparser.add_argument("run", nargs='+', action=self._store_type, help=self._help("run"), default="run")
runparser.add_argument("-g","--generate", dest="action", action="store_const", help="Generate yaml file template", const="generate", default="run")
runparser.set_defaults(func=self._func_run)
#APIPARSER
apiparser = subparsers.add_parser("api", help="Start and stop connpy api")
apicrud = apiparser.add_mutually_exclusive_group(required=True)
apicrud.add_argument("-s","--start", dest="start", nargs="?", action=self._store_type, help="Start conppy api", type=int, default=8048, metavar="PORT")
apicrud.add_argument("-r","--restart", dest="restart", nargs=0, action=self._store_type, help="Restart conppy api")
apicrud.add_argument("-x","--stop", dest="stop", nargs=0, action=self._store_type, help="Stop conppy api")
apicrud.add_argument("-d", "--debug", dest="debug", nargs="?", action=self._store_type, help="Run connpy server on debug mode", type=int, default=8048, metavar="PORT")
apiparser.set_defaults(func=self._func_api)
#CONFIGPARSER
configparser = subparsers.add_parser("config", help="Manage app config")
configcrud = configparser.add_mutually_exclusive_group(required=True)
configcrud.add_argument("--allow-uppercase", dest="case", nargs=1, action=self._store_type, help="Allow case sensitive names", choices=["true","false"])
configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"])
configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
configcrud.add_argument("--openai-org", dest="organization", nargs=1, action=self._store_type, help="Set openai organization", metavar="ORGANIZATION")
configcrud.add_argument("--openai-api-key", dest="api_key", nargs=1, action=self._store_type, help="Set openai api_key", metavar="API_KEY")
configcrud.add_argument("--openai-model", dest="model", nargs=1, action=self._store_type, help="Set openai model", metavar="MODEL")
configparser.set_defaults(func=self._func_others)
#Manage sys arguments
commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "run", "config", "api", "ai", "export", "import"]
profilecmds = ["--add", "-a", "--del", "--rm", "-r", "--mod", "--edit", "-e", "--show", "-s"]
if len(argv) >= 2 and argv[1] == "profile" and argv[0] in profilecmds:
argv[1] = argv[0]
argv[0] = "profile"
if len(argv) < 1 or argv[0] not in commands:
argv.insert(0,"node")
args = defaultparser.parse_args(argv)
return args.func(args)
class _store_type(argparse.Action):
#Custom store type for cli app.
def __call__(self, parser, args, values, option_string=None):
setattr(args, "data", values)
delattr(args,self.dest)
setattr(args, "command", self.dest)
def _func_node(self, args):
#Function called when connecting or managing nodes.
if not self.case and args.data != None:
args.data = args.data.lower()
actions = {"version": self._version, "connect": self._connect, "add": self._add, "del": self._del, "mod": self._mod, "show": self._show}
return actions.get(args.action)(args)
def _version(self, args):
print(__version__)
def _connect(self, args):
if args.data == None:
matches = self.nodes
if len(matches) == 0:
print("There are no nodes created")
print("try: conn --help")
exit(9)
else:
if args.data.startswith("@"):
matches = list(filter(lambda k: args.data in k, self.nodes))
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes))
if len(matches) == 0:
print("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
if matches[0] == None:
exit(7)
node = self.config.getitem(matches[0])
node = self.node(matches[0],**node, config = self.config)
if args.sftp:
node.protocol = "sftp"
if args.debug:
node.interact(debug = True)
else:
node.interact()
def _del(self, args):
if args.data == None:
print("Missing argument node")
exit(3)
elif args.data.startswith("@"):
matches = list(filter(lambda k: k == args.data, self.folders))
else:
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("{} not found".format(args.data))
exit(2)
print("Removing: {}".format(matches))
question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["delete"]:
if args.data.startswith("@"):
uniques = self.config._explode_unique(matches[0])
self.config._folder_del(**uniques)
else:
for node in matches:
nodeuniques = self.config._explode_unique(node)
self.config._connections_del(**nodeuniques)
self.config._saveconfig(self.config.file)
if len(matches) == 1:
print("{} deleted succesfully".format(matches[0]))
else:
print(f"{len(matches)} nodes deleted succesfully")
def _add(self, args):
args.data = self._type_node(args.data)
if args.data == None:
print("Missing argument node")
exit(3)
elif args.data.startswith("@"):
type = "folder"
matches = list(filter(lambda k: k == args.data, self.folders))
reversematches = list(filter(lambda k: "@" + k == args.data, self.nodes))
else:
type = "node"
matches = list(filter(lambda k: k == args.data, self.nodes))
reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
if len(matches) > 0:
print("{} already exist".format(matches[0]))
exit(4)
if len(reversematches) > 0:
print("{} already exist".format(reversematches[0]))
exit(4)
else:
if type == "folder":
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid folder {}".format(args.data))
exit(5)
if "subfolder" in uniques.keys():
parent = "@" + uniques["folder"]
if parent not in self.folders:
print("Folder {} not found".format(uniques["folder"]))
exit(2)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
if type == "node":
nodefolder = args.data.partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print(nodefolder + " not found")
exit(2)
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid node {}".format(args.data))
exit(5)
print("You can use the configured setting in a profile using @profilename.")
print("You can also leave empty any value except hostname/IP.")
print("You can pass 1 or more passwords using comma separated @profiles")
print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}")
print("Some useful tags to set for automation are 'os', 'screen_length_command', and 'prompt'.")
newnode = self._questions_nodes(args.data, uniques)
if newnode == False:
exit(7)
self.config._connections_add(**newnode)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
def _show(self, args):
if args.data == None:
print("Missing argument node")
exit(3)
matches = list(filter(lambda k: k == args.data, self.nodes))
if len(matches) == 0:
print("{} not found".format(args.data))
exit(2)
node = self.config.getitem(matches[0])
for k, v in node.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + d)
def _mod(self, args):
if args.data == None:
print("Missing argument node")
exit(3)
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("No connection found with filter: {}".format(args.data))
exit(2)
elif len(matches) == 1:
uniques = self.config._explode_unique(args.data)
unique = matches[0]
else:
uniques = {"id": None, "folder": None}
unique = None
print("Editing: {}".format(matches))
node = {}
for i in matches:
node[i] = self.config.getitem(i)
edits = self._questions_edit()
if edits == None:
exit(7)
updatenode = self._questions_nodes(unique, uniques, edit=edits)
if not updatenode:
exit(7)
if len(matches) == 1:
uniques.update(node[matches[0]])
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
print("Nothing to do here")
return
else:
self.config._connections_add(**updatenode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data))
else:
for k in node:
updatednode = self.config._explode_unique(k)
updatednode["type"] = "connection"
updatednode.update(node[k])
editcount = 0
for key, should_edit in edits.items():
if should_edit:
editcount += 1
updatednode[key] = updatenode[key]
if not editcount:
print("Nothing to do here")
return
else:
self.config._connections_add(**updatednode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(matches))
return
def _func_profile(self, args):
#Function called when managing profiles
if not self.case:
args.data[0] = args.data[0].lower()
actions = {"add": self._profile_add, "del": self._profile_del, "mod": self._profile_mod, "show": self._profile_show}
return actions.get(args.action)(args)
def _profile_del(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
exit(2)
if matches[0] == "default":
print("Can't delete default profile")
exit(6)
usedprofile = self.config._profileused(matches[0])
if len(usedprofile) > 0:
print("Profile {} used in the following nodes:".format(matches[0]))
print(", ".join(usedprofile))
exit(8)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
confirm = inquirer.prompt(question)
if confirm["delete"]:
self.config._profiles_del(id = matches[0])
self.config._saveconfig(self.config.file)
print("{} deleted succesfully".format(matches[0]))
def _profile_show(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
for k, v in profile.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + d)
def _profile_add(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) > 0:
print("Profile {} Already exist".format(matches[0]))
exit(4)
newprofile = self._questions_profiles(args.data[0])
if newprofile == False:
exit(7)
self.config._profiles_add(**newprofile)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data[0]))
def _profile_mod(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
oldprofile = {"id": matches[0]}
oldprofile.update(profile)
edits = self._questions_edit()
if edits == None:
exit(7)
updateprofile = self._questions_profiles(matches[0], edit=edits)
if not updateprofile:
exit(7)
if sorted(updateprofile.items()) == sorted(oldprofile.items()):
print("Nothing to do here")
return
else:
self.config._profiles_add(**updateprofile)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data[0]))
def _func_others(self, args):
#Function called when using other commands
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "organization": self._openai, "api_key": self._openai, "model": self._openai}
return actions.get(args.command)(args)
def _ls(self, args):
items = getattr(self, args.data)
if args.filter:
items = [ item for item in items if re.search(args.filter[0], item)]
if args.format and args.data == "nodes":
newitems = []
for i in items:
formated = {}
info = self.config.getitem(i)
if "@" in i:
name_part, location_part = i.split("@", 1)
formated["location"] = "@" + location_part
else:
name_part = i
formated["location"] = ""
formated["name"] = name_part
formated["host"] = info["host"]
items_copy = list(formated.items())
for key, value in items_copy:
upper_key = key.upper()
upper_value = value.upper()
formated[upper_key] = upper_value
newitems.append(args.format[0].format(**formated))
items = newitems
print(*items, sep="\n")
def _mvcp(self, args):
if not self.case:
args.data[0] = args.data[0].lower()
args.data[1] = args.data[1].lower()
source = list(filter(lambda k: k == args.data[0], self.nodes))
dest = list(filter(lambda k: k == args.data[1], self.nodes))
if len(source) != 1:
print("{} not found".format(args.data[0]))
exit(2)
if len(dest) > 0:
print("Node {} Already exist".format(args.data[1]))
exit(4)
nodefolder = args.data[1].partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print("{} not found".format(nodefolder))
exit(2)
olduniques = self.config._explode_unique(args.data[0])
newuniques = self.config._explode_unique(args.data[1])
if newuniques == False:
print("Invalid node {}".format(args.data[1]))
exit(5)
node = self.config.getitem(source[0])
newnode = {**newuniques, **node}
self.config._connections_add(**newnode)
if args.command == "move":
self.config._connections_del(**olduniques)
self.config._saveconfig(self.config.file)
action = "moved" if args.command == "move" else "copied"
print("{} {} succesfully to {}".format(args.data[0],action, args.data[1]))
def _bulk(self, args):
newnodes = self._questions_bulk()
if newnodes == False:
exit(7)
if not self.case:
newnodes["location"] = newnodes["location"].lower()
newnodes["ids"] = newnodes["ids"].lower()
ids = newnodes["ids"].split(",")
hosts = newnodes["host"].split(",")
count = 0
for n in ids:
unique = n + newnodes["location"]
matches = list(filter(lambda k: k == unique, self.nodes))
reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
if len(matches) > 0:
print("Node {} already exist, ignoring it".format(unique))
continue
if len(reversematches) > 0:
print("Folder with name {} already exist, ignoring it".format(unique))
continue
newnode = {"id": n}
if newnodes["location"] != "":
location = self.config._explode_unique(newnodes["location"])
newnode.update(location)
if len(hosts) > 1:
index = ids.index(n)
newnode["host"] = hosts[index]
else:
newnode["host"] = hosts[0]
newnode["protocol"] = newnodes["protocol"]
newnode["port"] = newnodes["port"]
newnode["options"] = newnodes["options"]
newnode["logs"] = newnodes["logs"]
newnode["tags"] = newnodes["tags"]
newnode["user"] = newnodes["user"]
newnode["password"] = newnodes["password"]
count +=1
self.config._connections_add(**newnode)
self.nodes = self.config._getallnodes()
if count > 0:
self.config._saveconfig(self.config.file)
print("Succesfully added {} nodes".format(count))
else:
print("0 nodes added")
def _completion(self, args):
if args.data[0] == "bash":
print(self._help("bashcompletion"))
elif args.data[0] == "zsh":
print(self._help("zshcompletion"))
def _case(self, args):
if args.data[0] == "true":
args.data[0] = True
elif args.data[0] == "false":
args.data[0] = False
self._change_settings(args.command, args.data[0])
def _fzf(self, args):
if args.data[0] == "true":
args.data[0] = True
elif args.data[0] == "false":
args.data[0] = False
self._change_settings(args.command, args.data[0])
def _idletime(self, args):
if args.data[0] < 0:
args.data[0] = 0
self._change_settings(args.command, args.data[0])
def _configfolder(self, args):
if not os.path.isdir(args.data[0]):
raise argparse.ArgumentTypeError(f"readable_dir:{args.data[0]} is not a valid path")
else:
pathfile = defaultdir + "/.folder"
folder = os.path.abspath(args.data[0]).rstrip('/')
with open(pathfile, "w") as f:
f.write(str(folder))
print("Config saved")
def _openai(self, args):
if "openai" in self.config.config:
openaikeys = self.config.config["openai"]
else:
openaikeys = {}
openaikeys[args.command] = args.data[0]
self._change_settings("openai", openaikeys)
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
print("Config saved")
def _func_import(self, args):
if not os.path.exists(args.data[0]):
print("File {} dosn't exists".format(args.data[0]))
exit(14)
print("This could overwrite your current configuration!")
question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["import"]:
try:
with open(args.data[0]) as file:
imported = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
exit(10)
for k,v in imported.items():
uniques = self.config._explode_unique(k)
if "folder" in uniques:
folder = f"@{uniques['folder']}"
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0:
uniquefolder = self.config._explode_unique(folder)
self.config._folder_add(**uniquefolder)
if "subfolder" in uniques:
subfolder = f"@{uniques['subfolder']}@{uniques['folder']}"
matches = list(filter(lambda k: k == subfolder, self.folders))
if len(matches) == 0:
uniquesubfolder = self.config._explode_unique(subfolder)
self.config._folder_add(**uniquesubfolder)
uniques.update(v)
self.config._connections_add(**uniques)
self.config._saveconfig(self.config.file)
print("File {} imported succesfully".format(args.data[0]))
return
def _func_export(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
exit(14)
if len(args.data[1:]) == 0:
foldercons = self.config._getallnodesfull(extract = False)
else:
for folder in args.data[1:]:
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0 and folder != "@":
print("{} folder not found".format(folder))
exit(2)
foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
with open(args.data[0], "w") as file:
yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
file.close()
print("File {} generated succesfully".format(args.data[0]))
exit()
return
def _func_run(self, args):
if len(args.data) > 1:
args.action = "noderun"
actions = {"noderun": self._node_run, "generate": self._yaml_generate, "run": self._yaml_run}
return actions.get(args.action)(args)
def _func_ai(self, args):
arguments = {}
if args.model:
arguments["model"] = args.model[0]
if args.org:
arguments["org"] = args.org[0]
if args.api_key:
arguments["api_key"] = args.api_key[0]
self.myai = ai(self.config, **arguments)
if args.ask:
input = " ".join(args.ask)
request = self.myai.ask(input, dryrun = True)
if not request["app_related"]:
mdprint(Markdown(request["response"]))
print("\r")
else:
if request["action"] == "list_nodes":
if request["filter"]:
nodes = self.config._getallnodes(request["filter"])
else:
nodes = self.config._getallnodes()
list = "\n".join(nodes)
print(list)
else:
yaml_data = yaml.dump(request["task"])
confirmation = f"I'm going to run the following task:\n```{yaml_data}```"
mdprint(Markdown(confirmation))
question = [inquirer.Confirm("task", message="Are you sure you want to continue?")]
print("\r")
confirm = inquirer.prompt(question)
if confirm == None:
exit(7)
if confirm["task"]:
script = {}
script["name"] = "RESULT"
script["output"] = "stdout"
script["nodes"] = request["nodes"]
script["action"] = request["action"]
if "expected" in request:
script["expected"] = request["expected"]
script.update(request["args"])
self._cli_run(script)
else:
history = None
mdprint(Markdown("**Chatbot**: Hi! How can I help you today?\n\n---"))
while True:
questions = [
inquirer.Text('message', message="User", validate=self._ai_validation),
]
answers = inquirer.prompt(questions)
if answers == None:
exit(7)
response, history = self._process_input(answers["message"], history)
mdprint(Markdown(f"""**Chatbot**:\n{response}\n\n---"""))
return
def _ai_validation(self, answers, current, regex = "^.+$"):
#Validate ai user chat.
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Can't send empty messages")
return True
def _process_input(self, input, history):
response = self.myai.ask(input , chat_history = history, dryrun = True)
if not response["app_related"]:
try:
if not history:
history = []
history.extend(response["chat_history"])
except:
if not history:
history = None
return response["response"], history
else:
history = None
if response["action"] == "list_nodes":
if response["filter"]:
nodes = self.config._getallnodes(response["filter"])
else:
nodes = self.config._getallnodes()
list = "\n".join(nodes)
response = f"```{list}\n```"
else:
yaml_data = yaml.dump(response["task"])
confirmresponse = f"I'm going to run the following task:\n```{yaml_data}```\nPlease confirm"
while True:
mdprint(Markdown(f"""**Chatbot**:\n{confirmresponse}"""))
questions = [
inquirer.Text('message', message="User", validate=self._ai_validation),
]
answers = inquirer.prompt(questions)
if answers == None:
exit(7)
confirmation = self.myai.confirm(answers["message"])
if isinstance(confirmation, bool):
if not confirmation:
response = "Request cancelled"
else:
nodes = self.connnodes(self.config.getitems(response["nodes"]), config = self.config)
if response["action"] == "run":
output = nodes.run(**response["args"])
response = ""
elif response["action"] == "test":
result = nodes.test(**response["args"])
yaml_result = yaml.dump(result,default_flow_style=False, indent=4)
output = nodes.output
response = f"This is the result for your test:\n```\n{yaml_result}\n```"
for k,v in output.items():
response += f"\n***{k}***:\n```\n{v}\n```\n"
break
return response, history
def _func_api(self, args):
if args.command == "stop" or args.command == "restart":
args.data = stop_api()
if args.command == "start" or args.command == "restart":
if args.data:
start_api(args.data)
else:
start_api()
if args.command == "debug":
if args.data:
debug_api(args.data)
else:
debug_api()
return
def _node_run(self, args):
command = " ".join(args.data[1:])
script = {}
script["name"] = "Output"
script["action"] = "run"
script["nodes"] = args.data[0]
script["commands"] = [command]
script["output"] = "stdout"
self._cli_run(script)
def _yaml_generate(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
exit(14)
else:
with open(args.data[0], "w") as file:
file.write(self._help("generate"))
file.close()
print("File {} generated succesfully".format(args.data[0]))
exit()
def _yaml_run(self, args):
try:
with open(args.data[0]) as file:
scripts = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
exit(10)
for script in scripts["tasks"]:
self._cli_run(script)
def _cli_run(self, script):
args = {}
try:
action = script["action"]
nodelist = script["nodes"]
args["commands"] = script["commands"]
output = script["output"]
if action == "test":
args["expected"] = script["expected"]
except KeyError as e:
print("'{}' is mandatory".format(e.args[0]))
exit(11)
nodes = self.config._getallnodes(nodelist)
if len(nodes) == 0:
print("{} don't match any node".format(nodelist))
exit(2)
nodes = self.connnodes(self.config.getitems(nodes), config = self.config)
stdout = False
if output is None:
pass
elif output == "stdout":
stdout = True
elif isinstance(output, str) and action == "run":
args["folder"] = output
if "variables" in script:
args["vars"] = script["variables"]
if "vars" in script:
args["vars"] = script["vars"]
try:
options = script["options"]
thisoptions = {k: v for k, v in options.items() if k in ["prompt", "parallel", "timeout"]}
args.update(thisoptions)
except:
options = None
try:
size = str(os.get_terminal_size())
p = re.search(r'.*columns=([0-9]+)', size)
columns = int(p.group(1))
except:
columns = 80
if action == "run":
nodes.run(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if stdout:
for line in nodes.output[i].splitlines():
print(" " + line)
elif action == "test":
nodes.test(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if nodes.status[i] == 0:
max_length = max(len(s) for s in nodes.result[i].keys())
for k,v in nodes.result[i].items():
print(" TEST for '{}'".format(k) + " "*(max_length - len(k) + 1) + "--> " + str(v).upper())
if stdout:
if nodes.status[i] == 0:
print(" " + "-" * (max_length + 21))
for line in nodes.output[i].splitlines():
print(" " + line)
else:
print("Wrong action '{}'".format(action))
exit(13)
def _choose(self, list, name, action):
#Generates an inquirer list to pick
if FzfPrompt and self.fzf:
fzf = FzfPrompt(executable_path="fzf-tmux")
if not self.case:
fzf = FzfPrompt(executable_path="fzf-tmux -i")
answer = fzf.prompt(list, fzf_options="-d 25%")
if len(answer) == 0:
return
else:
return answer[0]
else:
questions = [inquirer.List(name, message="Pick {} to {}:".format(name,action), choices=list, carousel=True)]
answer = inquirer.prompt(questions)
if answer == None:
return
else:
return answer[name]
def _host_validation(self, answers, current, regex = "^.+$"):
#Validate hostname in inquirer when managing nodes
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$)"):
#Validate protocol in inquirer when managing profiles
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet or leave empty")
return True
def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$|^@.+$)"):
#Validate protocol in inquirer when managing nodes
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, leave empty or @profile")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
def _profile_port_validation(self, answers, current, regex = "(^[0-9]*$)"):
#Validate port in inquirer when managing profiles
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile o leave empty")
try:
port = int(current)
except:
port = 0
if current != "" and not 1 <= int(port) <= 65535:
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
return True
def _port_validation(self, answers, current, regex = "(^[0-9]*$|^@.+$)"):
#Validate port in inquirer when managing nodes
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
try:
port = int(current)
except:
port = 0
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
elif current != "" and not 1 <= int(port) <= 65535:
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile o leave empty")
return True
def _pass_validation(self, answers, current, regex = "(^@.+$)"):
#Validate password in inquirer
profiles = current.split(",")
for i in profiles:
if not re.match(regex, i) or i[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(i))
return True
def _tags_validation(self, answers, current):
#Validation for Tags in inquirer when managing nodes
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
elif current != "":
isdict = False
try:
isdict = ast.literal_eval(current)
except:
pass
if not isinstance (isdict, dict):
raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current))
return True
def _profile_tags_validation(self, answers, current):
#Validation for Tags in inquirer when managing profiles
if current != "":
isdict = False
try:
isdict = ast.literal_eval(current)
except:
pass
if not isinstance (isdict, dict):
raise inquirer.errors.ValidationError("", reason="Tags should be a python dictionary.".format(current))
return True
def _default_validation(self, answers, current):
#Default validation type used in multiples questions in inquirer
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
def _bulk_node_validation(self, answers, current, regex = "^[0-9a-zA-Z_.,$#-]+$"):
#Validation of nodes when running bulk command
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
def _bulk_folder_validation(self, answers, current):
#Validation of folders when running bulk command
if not self.case:
current = current.lower()
matches = list(filter(lambda k: k == current, self.folders))
if current != "" and len(matches) == 0:
raise inquirer.errors.ValidationError("", reason="Location {} don't exist".format(current))
return True
def _bulk_host_validation(self, answers, current, regex = "^.+$"):
#Validate hostname when running bulk command
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
hosts = current.split(",")
nodes = answers["ids"].split(",")
if len(hosts) > 1 and len(hosts) != len(nodes):
raise inquirer.errors.ValidationError("", reason="Hosts list should be the same length of nodes list")
return True
def _questions_edit(self):
#Inquirer questions when editing nodes or profiles
questions = []
questions.append(inquirer.Confirm("host", message="Edit Hostname/IP?"))
questions.append(inquirer.Confirm("protocol", message="Edit Protocol?"))
questions.append(inquirer.Confirm("port", message="Edit Port?"))
questions.append(inquirer.Confirm("options", message="Edit Options?"))
questions.append(inquirer.Confirm("logs", message="Edit logging path/file?"))
questions.append(inquirer.Confirm("tags", message="Edit tags?"))
questions.append(inquirer.Confirm("user", message="Edit User?"))
questions.append(inquirer.Confirm("password", message="Edit password?"))
answers = inquirer.prompt(questions)
return answers
def _questions_nodes(self, unique, uniques = None, edit = None):
#Questions when adding or editing nodes
try:
defaults = self.config.getitem(unique)
if "tags" not in defaults:
defaults["tags"] = ""
except:
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" , "tags":"", "password":""}
node = {}
if edit == None:
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True }
questions = []
if edit["host"]:
questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"]))
else:
node["host"] = defaults["host"]
if edit["protocol"]:
questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation, default=defaults["protocol"]))
else:
node["protocol"] = defaults["protocol"]
if edit["port"]:
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation, default=defaults["port"]))
else:
node["port"] = defaults["port"]
if edit["options"]:
questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation, default=defaults["options"]))
else:
node["options"] = defaults["options"]
if edit["logs"]:
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"].replace("{","{{").replace("}","}}")))
else:
node["logs"] = defaults["logs"]
if edit["tags"]:
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
else:
node["tags"] = defaults["tags"]
if edit["user"]:
questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"]))
else:
node["user"] = defaults["user"]
if edit["password"]:
questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
else:
node["password"] = defaults["password"]
answer = inquirer.prompt(questions)
if answer == None:
return False
if "password" in answer.keys():
if answer["password"] == "Local Password":
passq = [inquirer.Password("password", message="Set Password")]
passa = inquirer.prompt(passq)
if passa == None:
return False
answer["password"] = self.encrypt(passa["password"])
elif answer["password"] == "Profiles":
passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
passa = inquirer.prompt(passq)
if passa == None:
return False
answer["password"] = passa["password"].split(",")
elif answer["password"] == "No Password":
answer["password"] = ""
if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
result = {**uniques, **answer, **node}
result["type"] = "connection"
return result
def _questions_profiles(self, unique, edit = None):
#Questions when adding or editing profiles
try:
defaults = self.config.profiles[unique]
if "tags" not in defaults:
defaults["tags"] = ""
except:
defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"", "tags": "" }
profile = {}
if edit == None:
edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True, "tags":True }
questions = []
if edit["host"]:
questions.append(inquirer.Text("host", message="Add Hostname or IP", default=defaults["host"]))
else:
profile["host"] = defaults["host"]
if edit["protocol"]:
questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._profile_protocol_validation, default=defaults["protocol"]))
else:
profile["protocol"] = defaults["protocol"]
if edit["port"]:
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._profile_port_validation, default=defaults["port"]))
else:
profile["port"] = defaults["port"]
if edit["options"]:
questions.append(inquirer.Text("options", message="Pass extra options to protocol", default=defaults["options"]))
else:
profile["options"] = defaults["options"]
if edit["logs"]:
questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"].replace("{","{{").replace("}","}}")))
else:
profile["logs"] = defaults["logs"]
if edit["tags"]:
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._profile_tags_validation, default=str(defaults["tags"]).replace("{","{{").replace("}","}}")))
else:
profile["tags"] = defaults["tags"]
if edit["user"]:
questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"]))
else:
profile["user"] = defaults["user"]
if edit["password"]:
questions.append(inquirer.Password("password", message="Set Password"))
else:
profile["password"] = defaults["password"]
answer = inquirer.prompt(questions)
if answer == None:
return False
if "password" in answer.keys():
if answer["password"] != "":
answer["password"] = self.encrypt(answer["password"])
if "tags" in answer.keys() and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
result = {**answer, **profile}
result["id"] = unique
return result
def _questions_bulk(self):
#Questions when using bulk command
questions = []
questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", validate=self._bulk_node_validation))
questions.append(inquirer.Text("location", message="Add a @folder, @subfolder@folder or leave empty", validate=self._bulk_folder_validation))
questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", validate=self._bulk_host_validation))
questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation))
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation))
questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation))
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation))
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation))
questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation))
questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"]))
answer = inquirer.prompt(questions)
if answer == None:
return False
if "password" in answer.keys():
if answer["password"] == "Local Password":
passq = [inquirer.Password("password", message="Set Password")]
passa = inquirer.prompt(passq)
answer["password"] = self.encrypt(passa["password"])
elif answer["password"] == "Profiles":
passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
passa = inquirer.prompt(passq)
answer["password"] = passa["password"].split(",")
elif answer["password"] == "No Password":
answer["password"] = ""
answer["type"] = "connection"
if "tags" in answer.keys() and not answer["tags"].startswith("@") and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
return answer
def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")):
if not pat.match(arg_value):
raise ValueError(f"Argument error: {arg_value}")
return arg_value
def _type_profile(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$#-]+$")):
if not pat.match(arg_value):
raise ValueError
return arg_value
def _help(self, type):
#Store text for help and other commands
if type == "node":
return "node[@subfolder][@folder]\nConnect to specific node or show all matching nodes\n[@subfolder][@folder]\nShow all available connections globally or in specified path"
if type == "usage":
return "conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n conn {profile,move,copy,list,bulk,export,import,run,config,api,ai} ..."
if type == "end":
return "Commands:\n profile Manage profiles\n move (mv) Move node\n copy (cp) Copy node\n list (ls) List profiles, nodes or folders\n bulk Add nodes in bulk\n export Export connection folder to Yaml file\n import Import connection folder to config from Yaml file\n run Run scripts or commands on nodes\n config Manage app config\n api Start and stop connpy api\n ai Make request to an AI"
if type == "bashcompletion":
return '''
#Here starts bash completion for conn
_conn()
{
mapfile -t strings < <(connpy-completion-helper "bash" "${#COMP_WORDS[@]}" "${COMP_WORDS[@]}")
local IFS=$'\t\n'
local home_dir=$(eval echo ~)
local last_word=${COMP_WORDS[-1]/\~/$home_dir}
COMPREPLY=($(compgen -W "$(printf '%s' "${strings[@]}")" -- "$last_word"))
if [ "$last_word" != "${COMP_WORDS[-1]}" ]; then
COMPREPLY=(${COMPREPLY[@]/$home_dir/\~})
fi
}
complete -o nospace -o nosort -F _conn conn
complete -o nospace -o nosort -F _conn connpy
#Here ends bash completion for conn
'''
if type == "zshcompletion":
return '''
#Here starts zsh completion for conn
autoload -U compinit && compinit
_conn()
{
local home_dir=$(eval echo ~)
last_word=${words[-1]/\~/$home_dir}
strings=($(connpy-completion-helper "zsh" ${#words} $words[1,-2] $last_word))
for string in "${strings[@]}"; do
#Replace the expanded home directory with ~
if [ "$last_word" != "$words[-1]" ]; then
string=${string/$home_dir/\~}
fi
if [[ "${string}" =~ .*/$ ]]; then
# If the string ends with a '/', do not append a space
compadd -Q -S '' -- "$string"
else
# If the string does not end with a '/', append a space
compadd -Q -S ' ' -- "$string"
fi
done
}
compdef _conn conn
compdef _conn connpy
#Here ends zsh completion for conn
'''
if type == "run":
return "node[@subfolder][@folder] commmand to run\nRun the specific command on the node and print output\n/path/to/file.yaml\nUse a yaml file to run an automation script"
if type == "generate":
return '''---
tasks:
- name: "Config"
action: 'run' #Action can be test or run. Mandatory
nodes: #List of nodes to work on. Mandatory
- 'router1@office' #You can add specific nodes
- '@aws' #entire folders or subfolders
- '@office': #or filter inside a folder or subfolder
- 'router2'
- 'router7'
commands: #List of commands to send, use {name} to pass variables
- 'term len 0'
- 'conf t'
- 'interface {if}'
- 'ip address 10.100.100.{id} 255.255.255.255'
- '{commit}'
- 'end'
variables: #Variables to use on commands and expected. Optional
__global__: #Global variables to use on all nodes, fallback if missing in the node.
commit: ''
if: 'loopback100'
router1@office:
id: 1
router2@office:
id: 2
commit: 'commit'
router3@office:
id: 3
vrouter1@aws:
id: 4
vrouterN@aws:
id: 5
output: /home/user/logs #Type of output, if null you only get Connection and test result. Choices are: null,stdout,/path/to/folder. Folder path only works on 'run' action.
options:
prompt: r'>$|#$|\$$|>.$|#.$|\$.$' #Optional prompt to check on your devices, default should work on most devices.
parallel: 10 #Optional number of nodes to run commands on parallel. Default 10.
timeout: 20 #Optional time to wait in seconds for prompt, expected or EOF. Default 20.
- name: "TestConfig"
action: 'test'
nodes:
- 'router1@office'
- '@aws'
- '@office':
- 'router2'
- 'router7'
commands:
- 'ping 10.100.100.{id}'
expected: '!' #Expected text to find when running test action. Mandatory for 'test'
variables:
router1@office:
id: 1
router2@office:
id: 2
commit: 'commit'
router3@office:
id: 3
vrouter1@aws:
id: 4
vrouterN@aws:
id: 5
output: null
...'''
def encrypt(self, password, keyfile=None):
'''
Encrypts password using RSA keyfile
### Parameters:
- password (str): Plaintext password to encrypt.
### Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.
### Returns:
str: Encrypted password.
'''
if keyfile is None:
keyfile = self.config.key
with open(keyfile) as f:
key = RSA.import_key(f.read())
f.close()
publickey = key.publickey()
encryptor = PKCS1_OAEP.new(publickey)
password = encryptor.encrypt(password.encode("utf-8"))
return str(password)