Package conn
Connection manager
conn is a connection manager that lets you store nodes and connect to them quickly, without retyping passwords.
Features
- You can create profiles and reference them from nodes using @profilename,
so you don't need to edit multiple nodes when a password or other
information changes.
- Nodes can be stored in @folder or @subfolder@folder to organize your
devices. They can then be referenced using node@subfolder@folder or node@folder.
- Much more!
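The naming scheme above can be illustrated with a short sketch. It is modeled on the `_explode_unique` helper that appears in the `configfile` source listing on this page; the standalone function name `explode_unique` is hypothetical and is not part of conn's public API.

```python
def explode_unique(unique):
    """Split 'node[@subfolder][@folder]' or '[@subfolder]@folder' into parts.

    Returns a dict with 'id', 'subfolder' and 'folder' keys where present,
    or False when the name is malformed (illustrative helper, not conn's API).
    """
    parts = unique.split("@")
    # A leading "@" means the name refers to a folder, so there is no node id.
    result = {} if unique.startswith("@") else {"id": parts[0]}
    if len(parts) == 2:
        result["folder"] = parts[1]
        if result["folder"] == "":
            return False
    elif len(parts) == 3:
        result["subfolder"] = parts[1]
        result["folder"] = parts[2]
        if result["folder"] == "" or result["subfolder"] == "":
            return False
    elif len(parts) > 3:
        return False
    return result


print(explode_unique("server@datacenter@office"))
print(explode_unique("@office"))
```

Note that the folder is always the last component, so `server@datacenter@office` is the node `server` in subfolder `datacenter` of folder `office`.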
Usage
usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder]
       conn {profile,move,mv,copy,cp,list,ls,bulk,config} ...

positional arguments:
  node|folder    node[@subfolder][@folder]
                 Connect to specific node or show all matching nodes
                 [@subfolder][@folder]
                 Show all available connections globally or in specified path
Options:
  -h, --help     show this help message and exit
  --add          Add new node[@subfolder][@folder] or [@subfolder]@folder
  --del, --rm    Delete node[@subfolder][@folder] or [@subfolder]@folder
  --mod, --edit  Modify node[@subfolder][@folder]
  --show         Show node[@subfolder][@folder]
  --debug, -d    Display all connection steps
Commands:
  profile        Manage profiles
  move (mv)      Move node
  copy (cp)      Copy node
  list (ls)      List profiles, nodes or folders
  bulk           Add nodes in bulk
  config         Manage app config
Manage profiles
usage: conn profile [-h] (--add | --del | --mod | --show) profile

positional arguments:
  profile        Name of profile to manage

options:
  -h, --help     show this help message and exit
  --add          Add new profile
  --del, --rm    Delete profile
  --mod, --edit  Modify profile
  --show         Show profile
Examples
conn profile --add office-user
conn --add @office
conn --add @datacenter@office
conn --add server@datacenter@office
conn --add pc@office
conn --show server@datacenter@office
conn pc@office
conn server
Automation module
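The automation module lets you use conn from your own Python scripts, either standalone or with the connection manager's configuration.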
Standalone module
import conn
router = conn.node("unique name","ip/hostname", user="username", password="pass")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
if hasip:
    print("Router has ip 1.1.1.1")
else:
    print("Router doesn't have ip 1.1.1.1")
Using manager configuration
import conn
conf = conn.configfile()
device = conf.getitem("server@office")
server = conn.node("unique name", **device, config=conf)
result = server.run(["cd /", "ls -la"])
print(result)
Running parallel tasks
import conn
conf = conn.configfile()
#You can get the nodes from a folder in the config, optionally filtering them by name
nodes = conf.getitem("@office", ["router1", "router2", "router3"])
#You can also get each node individually:
nodes = {}
nodes["router1"] = conf.getitem("router1@office")
nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.3", "user": "username", "password": "pass3"}
#Finally you run some tasks on the nodes
mynodes = conn.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
for i in result:
    print("---" + i + "---")
    print(result[i])
    print()
# Or for one specific node
mynodes.router1.run(["term len 0", "show run"], folder = "/home/user/logs")
#!/usr/bin/env python3
from .core import node,nodes
from .configfile import configfile
from .connapp import connapp
from pkg_resources import get_distribution
__all__ = ["node", "nodes", "configfile", "connapp"]
__version__ = "2.0.10"
__author__ = "Federico Luzzi"
__pdoc__ = {
'core': False,
}
Classes
class configfile (conf=None, key=None)
This class generates a configfile object. It contains a dictionary storing the config, nodes and profiles, and is normally used by the connection manager.
Attributes:
- file (str): Path/file to config file.
- key (str): Path/file to RSA key file.
- config (dict): Dictionary containing information of connection manager configuration.
- connections (dict): Dictionary containing all the nodes added to connection manager.
- profiles (dict): Dictionary containing all the profiles added to connection manager.
- privatekey (obj): Object containing the private key to decrypt passwords.
- publickey (obj): Object containing the public key to encrypt passwords.
Optional Parameters:
- conf (str): Path/file to config file. If left empty default path is ~/.config/conn/config.json
- key (str): Path/file to RSA key file. If left empty default path is ~/.config/conn/.osk
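The fallback to default paths can be sketched as follows. This is a minimal illustration based on the `__init__` logic in the source listing; `resolve_paths` is a hypothetical helper name, and unlike the real constructor it does not create the directory, the config file or the key.

```python
import os


def resolve_paths(conf=None, key=None):
    """Return (config_path, key_path), using conn's documented defaults when unset."""
    # Default directory documented above: ~/.config/conn
    default_dir = os.path.join(os.path.expanduser("~"), ".config", "conn")
    config_path = conf if conf is not None else os.path.join(default_dir, "config.json")
    key_path = key if key is not None else os.path.join(default_dir, ".osk")
    return config_path, key_path


print(resolve_paths())
print(resolve_paths(conf="/tmp/lab/config.json", key="/tmp/lab/.osk"))
```

With the real class, the equivalent call would be `conn.configfile(conf="/tmp/lab/config.json", key="/tmp/lab/.osk")`; the paths here are placeholders.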
class configfile:
    '''
    This class generates a configfile object. It contains a dictionary storing the config, nodes and profiles, and is normally used by the connection manager.

    ### Attributes:

        - file (str): Path/file to config file.
        - key (str): Path/file to RSA key file.
        - config (dict): Dictionary containing information of connection manager configuration.
        - connections (dict): Dictionary containing all the nodes added to connection manager.
        - profiles (dict): Dictionary containing all the profiles added to connection manager.
        - privatekey (obj): Object containing the private key to decrypt passwords.
        - publickey (obj): Object containing the public key to encrypt passwords.
    '''

    def __init__(self, conf = None, key = None):
        '''
        ### Optional Parameters:

            - conf (str): Path/file to config file. If left empty default path is ~/.config/conn/config.json
            - key (str): Path/file to RSA key file. If left empty default path is ~/.config/conn/.osk
        '''
        home = os.path.expanduser("~")
        defaultdir = home + '/.config/conn'
        defaultfile = defaultdir + '/config.json'
        defaultkey = defaultdir + '/.osk'
        Path(defaultdir).mkdir(parents=True, exist_ok=True)
        if conf == None:
            self.file = defaultfile
        else:
            self.file = conf
        if key == None:
            self.key = defaultkey
        else:
            self.key = key
        if os.path.exists(self.file):
            config = self._loadconfig(self.file)
        else:
            config = self._createconfig(self.file)
        self.config = config["config"]
        self.connections = config["connections"]
        self.profiles = config["profiles"]
        if not os.path.exists(self.key):
            self._createkey(self.key)
        self.privatekey = RSA.import_key(open(self.key).read())
        self.publickey = self.privatekey.publickey()

    def _loadconfig(self, conf):
        #Loads config file
        jsonconf = open(conf)
        return json.load(jsonconf)

    def _createconfig(self, conf):
        #Create config file
        defaultconfig = {'config': {'case': False, 'idletime': 30}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"" }}}
        if not os.path.exists(conf):
            with open(conf, "w") as f:
                json.dump(defaultconfig, f, indent = 4)
                f.close()
                os.chmod(conf, 0o600)
        jsonconf = open(conf)
        return json.load(jsonconf)

    def _saveconfig(self, conf):
        #Save config file
        newconfig = {"config":{}, "connections": {}, "profiles": {}}
        newconfig["config"] = self.config
        newconfig["connections"] = self.connections
        newconfig["profiles"] = self.profiles
        with open(conf, "w") as f:
            json.dump(newconfig, f, indent = 4)
            f.close()

    def _createkey(self, keyfile):
        #Create key file
        key = RSA.generate(2048)
        with open(keyfile,'wb') as f:
            f.write(key.export_key('PEM'))
            f.close()
            os.chmod(keyfile, 0o600)

    def _explode_unique(self, unique):
        #Divide unique name into folder, subfolder and id
        uniques = unique.split("@")
        if not unique.startswith("@"):
            result = {"id": uniques[0]}
        else:
            result = {}
        if len(uniques) == 2:
            result["folder"] = uniques[1]
            if result["folder"] == "":
                return False
        elif len(uniques) == 3:
            result["folder"] = uniques[2]
            result["subfolder"] = uniques[1]
            if result["folder"] == "" or result["subfolder"] == "":
                return False
        elif len(uniques) > 3:
            return False
        return result

    def getitem(self, unique, keys = None):
        '''
        Get a node or a group of nodes from the configfile, which can be passed to the node/nodes classes.

        ### Parameters:

            - unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder

        ### Optional Parameters:

            - keys (list): If unique is a folder, you can filter the nodes inside it by passing a list of node names.

        ### Returns:

            dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
        '''
        uniques = self._explode_unique(unique)
        if unique.startswith("@"):
            if uniques.keys() >= {"folder", "subfolder"}:
                folder = self.connections[uniques["folder"]][uniques["subfolder"]]
            else:
                folder = self.connections[uniques["folder"]]
            newfolder = folder.copy()
            newfolder.pop("type")
            for node in newfolder.keys():
                if "type" in newfolder[node].keys():
                    newfolder[node].pop("type")
            if keys == None:
                return newfolder
            else:
                f_newfolder = dict((k, newfolder[k]) for k in keys)
                return f_newfolder
        else:
            if uniques.keys() >= {"folder", "subfolder"}:
                node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
            elif "folder" in uniques.keys():
                node = self.connections[uniques["folder"]][uniques["id"]]
            else:
                node = self.connections[uniques["id"]]
            newnode = node.copy()
            newnode.pop("type")
            return newnode

    def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', type = "connection" ):
        #Add connection from config
        if folder == '':
            self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
        elif folder != '' and subfolder == '':
            self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
        elif folder != '' and subfolder != '':
            self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}

    def _connections_del(self,*, id, folder='', subfolder=''):
        #Delete connection from config
        if folder == '':
            del self.connections[id]
        elif folder != '' and subfolder == '':
            del self.connections[folder][id]
        elif folder != '' and subfolder != '':
            del self.connections[folder][subfolder][id]

    def _folder_add(self,*, folder, subfolder = ''):
        #Add Folder from config
        if subfolder == '':
            if folder not in self.connections:
                self.connections[folder] = {"type": "folder"}
        else:
            if subfolder not in self.connections[folder]:
                self.connections[folder][subfolder] = {"type": "subfolder"}

    def _folder_del(self,*, folder, subfolder=''):
        #Delete folder from config
        if subfolder == '':
            del self.connections[folder]
        else:
            del self.connections[folder][subfolder]

    def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='' ):
        #Add profile from config
        self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user}

    def _profiles_del(self,*, id ):
        #Delete profile from config
        del self.profiles[id]
Methods
def getitem(self, unique, keys=None)
Get a node or a group of nodes from the configfile, which can be passed to the node/nodes classes.
Parameters:
- unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder
Optional Parameters:
- keys (list): If unique is a folder, you can filter the nodes inside it by passing a list of node names.
Returns:
dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
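To make the return shapes concrete, here is a simplified, self-contained sketch of the lookup against a hand-written `connections` dictionary. The function `get_item`, the sample hosts and the user names are all hypothetical; the nested layout and the stripping of the internal `type` key follow the source listing on this page. Unlike that listing, this sketch builds new dictionaries instead of popping keys from copies.

```python
# Hypothetical sample data laid out like configfile.connections.
connections = {
    "office": {
        "type": "folder",
        "router1": {"host": "1.1.1.1", "user": "admin", "type": "connection"},
        "router2": {"host": "1.1.1.2", "user": "admin", "type": "connection"},
        "datacenter": {
            "type": "subfolder",
            "server": {"host": "10.0.0.5", "user": "root", "type": "connection"},
        },
    },
}


def get_item(connections, unique, keys=None):
    """Look up a node, or every entry of a folder, by its unique name."""
    parts = unique.split("@")
    is_folder = unique.startswith("@")
    # The folder comes last in the name, so walk the components right to left.
    path = parts[1:] if is_folder else parts
    entry = connections
    for name in reversed(path):
        entry = entry[name]
    if not is_folder:
        return {k: v for k, v in entry.items() if k != "type"}
    nodes = {
        name: {k: v for k, v in data.items() if k != "type"}
        for name, data in entry.items()
        if name != "type"
    }
    if keys is None:
        return nodes
    return {name: nodes[name] for name in keys}


print(get_item(connections, "router1@office"))
print(get_item(connections, "@office", ["router1", "router2"]))
```

A single node yields one flat dictionary that can be unpacked into `conn.node(...)`, while a folder yields a dictionary of such dictionaries keyed by node name, which is the shape `conn.nodes(...)` takes in the examples above.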
def getitem(self, unique, keys = None):
    '''
    Get a node or a group of nodes from the configfile, which can be passed to the node/nodes classes.

    ### Parameters:

        - unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder

    ### Optional Parameters:

        - keys (list): If unique is a folder, you can filter the nodes inside it by passing a list of node names.

    ### Returns:

        dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
    '''
    uniques = self._explode_unique(unique)
    if unique.startswith("@"):
        if uniques.keys() >= {"folder", "subfolder"}:
            folder = self.connections[uniques["folder"]][uniques["subfolder"]]
        else:
            folder = self.connections[uniques["folder"]]
        newfolder = folder.copy()
        newfolder.pop("type")
        for node in newfolder.keys():
            if "type" in newfolder[node].keys():
                newfolder[node].pop("type")
        if keys == None:
            return newfolder
        else:
            f_newfolder = dict((k, newfolder[k]) for k in keys)
            return f_newfolder
    else:
        if uniques.keys() >= {"folder", "subfolder"}:
            node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
        elif "folder" in uniques.keys():
            node = self.connections[uniques["folder"]][uniques["id"]]
        else:
            node = self.connections[uniques["id"]]
        newnode = node.copy()
        newnode.pop("type")
        return newnode
class connapp (config)
This class starts the connection manager app. It is normally used by the connection manager itself, but you can use it in a script to run the connection manager your way, with a different configfile and key.
Parameters:
- config (obj): Object generated with configfile class, it contains the nodes configuration and the methods to manage the config file.
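Because the class parses `sys.argv` as soon as it is constructed, it may help to see how the command line is normalized before parsing. The sketch below mirrors the argument handling at the end of `__init__` in the source listing; `normalize_argv` is a hypothetical standalone name, and it returns a new list rather than editing `sys.argv` in place.

```python
def normalize_argv(argv):
    """Return argv with the implicit 'node' subcommand made explicit."""
    commands = ["node", "profile", "mv", "move", "copy", "cp", "bulk", "ls", "list", "config"]
    profilecmds = ["--add", "--del", "--rm", "--mod", "--edit", "--show"]
    argv = list(argv)  # work on a copy
    # "conn --add profile" is rewritten to "conn profile --add".
    if len(argv) >= 3 and argv[2] == "profile" and argv[1] in profilecmds:
        argv[1], argv[2] = "profile", argv[1]
    # Anything that is not a known command is treated as a node operation.
    if len(argv) < 2 or argv[1] not in commands:
        argv.insert(1, "node")
    return argv


print(normalize_argv(["conn", "pc@office"]))
print(normalize_argv(["conn", "--add", "profile"]))
```

In a script, the app itself would be started with something like `conn.connapp(conn.configfile(conf=..., key=...))`, using the constructor signatures documented on this page.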
class connapp:
    '''
    This class starts the connection manager app. It is normally used by the connection manager itself, but you can use it in a script to run the connection manager your way, with a different configfile and key.
    '''

    def __init__(self, config):
        '''
        ### Parameters:

            - config (obj): Object generated with configfile class, it contains the nodes configuration and the methods to manage the config file.
        '''
        self.node = node
        self.config = config
        self.nodes = self._getallnodes()
        self.folders = self._getallfolders()
        self.profiles = list(self.config.profiles.keys())
        self.case = self.config.config["case"]
        #DEFAULTPARSER
        defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
        subparsers = defaultparser.add_subparsers(title="Commands")
        #NODEPARSER
        nodeparser = subparsers.add_parser("node",usage=self._help("usage"), help=self._help("node"),epilog=self._help("end"), formatter_class=argparse.RawTextHelpFormatter)
        nodecrud = nodeparser.add_mutually_exclusive_group()
        nodeparser.add_argument("node", metavar="node|folder", nargs='?', default=None, action=self._store_type, type=self._type_node, help=self._help("node"))
        nodecrud.add_argument("--add", dest="action", action="store_const", help="Add new node[@subfolder][@folder] or [@subfolder]@folder", const="add", default="connect")
        nodecrud.add_argument("--del", "--rm", dest="action", action="store_const", help="Delete node[@subfolder][@folder] or [@subfolder]@folder", const="del", default="connect")
        nodecrud.add_argument("--mod", "--edit", dest="action", action="store_const", help="Modify node[@subfolder][@folder]", const="mod", default="connect")
        nodecrud.add_argument("--show", dest="action", action="store_const", help="Show node[@subfolder][@folder]", const="show", default="connect")
        nodecrud.add_argument("--debug", "-d", dest="action", action="store_const", help="Display all conections steps", const="debug", default="connect")
        nodeparser.set_defaults(func=self._func_node)
        #PROFILEPARSER
        profileparser = subparsers.add_parser("profile", help="Manage profiles")
        profileparser.add_argument("profile", nargs=1, action=self._store_type, type=self._type_profile, help="Name of profile to manage")
        profilecrud = profileparser.add_mutually_exclusive_group(required=True)
        profilecrud.add_argument("--add", dest="action", action="store_const", help="Add new profile", const="add")
        profilecrud.add_argument("--del", "--rm", dest="action", action="store_const", help="Delete profile", const="del")
        profilecrud.add_argument("--mod", "--edit", dest="action", action="store_const", help="Modify profile", const="mod")
        profilecrud.add_argument("--show", dest="action", action="store_const", help="Show profile", const="show")
        profileparser.set_defaults(func=self._func_profile)
        #MOVEPARSER
        moveparser = subparsers.add_parser("move", aliases=["mv"], help="Move node")
        moveparser.add_argument("move", nargs=2, action=self._store_type, help="Move node[@subfolder][@folder] dest_node[@subfolder][@folder]", default="move", type=self._type_node)
        moveparser.set_defaults(func=self._func_others)
        #COPYPARSER
        copyparser = subparsers.add_parser("copy", aliases=["cp"], help="Copy node")
        copyparser.add_argument("cp", nargs=2, action=self._store_type, help="Copy node[@subfolder][@folder] new_node[@subfolder][@folder]", default="cp", type=self._type_node)
        copyparser.set_defaults(func=self._func_others)
        #LISTPARSER
        lsparser = subparsers.add_parser("list", aliases=["ls"], help="List profiles, nodes or folders")
        lsparser.add_argument("ls", action=self._store_type, choices=["profiles","nodes","folders"], help="List profiles, nodes or folders", default=False)
        lsparser.set_defaults(func=self._func_others)
        #BULKPARSER
        bulkparser = subparsers.add_parser("bulk", help="Add nodes in bulk")
        bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
        bulkparser.set_defaults(func=self._func_others)
        #CONFIGPARSER
        configparser = subparsers.add_parser("config", help="Manage app config")
        configparser.add_argument("--allow-uppercase", dest="case", nargs=1, action=self._store_type, help="Allow case sensitive names", choices=["true","false"])
        configparser.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
        configparser.add_argument("--completion", dest="completion", nargs=0, action=self._store_type, help="Get bash completion configuration for conn")
        configparser.set_defaults(func=self._func_others)
        #Set default subparser and tune arguments
        commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "config"]
        profilecmds = ["--add", "--del", "--rm", "--mod", "--edit", "--show"]
        if len(sys.argv) >= 3 and sys.argv[2] == "profile" and sys.argv[1] in profilecmds:
            sys.argv[2] = sys.argv[1]
            sys.argv[1] = "profile"
        if len(sys.argv) < 2 or sys.argv[1] not in commands:
            sys.argv.insert(1,"node")
        args = defaultparser.parse_args()
        args.func(args)

    class _store_type(argparse.Action):
        #Custom store type for cli app.
        def __call__(self, parser, args, values, option_string=None):
            setattr(args, "data", values)
            delattr(args,self.dest)
            setattr(args, "command", self.dest)

    def _func_node(self, args):
        #Function called when connecting or managing nodes.
        if not self.case and args.data != None:
            args.data = args.data.lower()
        if args.action == "connect" or args.action == "debug":
            if args.data == None:
                matches = self.nodes
                if len(matches) == 0:
                    print("There are no nodes created")
                    print("try: conn --help")
                    exit(9)
            else:
                if args.data.startswith("@"):
                    matches = list(filter(lambda k: args.data in k, self.nodes))
                else:
                    matches = list(filter(lambda k: k.startswith(args.data), self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            elif len(matches) > 1:
                matches[0] = self._choose(matches,"node", "connect")
            if matches[0] == None:
                exit(7)
            node = self.config.getitem(matches[0])
            node = self.node(matches[0],**node, config = self.config)
            if args.action == "debug":
                node.interact(debug = True)
            else:
                node.interact()
        elif args.action == "del":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            elif args.data.startswith("@"):
                matches = list(filter(lambda k: k == args.data, self.folders))
            else:
                matches = list(filter(lambda k: k == args.data, self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
            confirm = inquirer.prompt(question)
            if confirm["delete"]:
                uniques = self.config._explode_unique(matches[0])
                if args.data.startswith("@"):
                    self.config._folder_del(**uniques)
                else:
                    self.config._connections_del(**uniques)
                self.config._saveconfig(self.config.file)
                print("{} deleted succesfully".format(matches[0]))
        elif args.action == "add":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            elif args.data.startswith("@"):
                type = "folder"
                matches = list(filter(lambda k: k == args.data, self.folders))
                reversematches = list(filter(lambda k: "@" + k == args.data, self.nodes))
            else:
                type = "node"
                matches = list(filter(lambda k: k == args.data, self.nodes))
                reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
            if len(matches) > 0:
                print("{} already exist".format(matches[0]))
                exit(4)
            if len(reversematches) > 0:
                print("{} already exist".format(reversematches[0]))
                exit(4)
            else:
                if type == "folder":
                    uniques = self.config._explode_unique(args.data)
                    if uniques == False:
                        print("Invalid folder {}".format(args.data))
                        exit(5)
                    if "subfolder" in uniques.keys():
                        parent = "@" + uniques["folder"]
                        if parent not in self.folders:
                            print("Folder {} not found".format(uniques["folder"]))
                            exit(2)
                    self.config._folder_add(**uniques)
                    self.config._saveconfig(self.config.file)
                    print("{} added succesfully".format(args.data))
                if type == "node":
                    nodefolder = args.data.partition("@")
                    nodefolder = "@" + nodefolder[2]
                    if nodefolder not in self.folders and nodefolder != "@":
                        print(nodefolder + " not found")
                        exit(2)
                    uniques = self.config._explode_unique(args.data)
                    if uniques == False:
                        print("Invalid node {}".format(args.data))
                        exit(5)
                    print("You can use the configured setting in a profile using @profilename.")
                    print("You can also leave empty any value except hostname/IP.")
                    print("You can pass 1 or more passwords using comma separated @profiles")
                    print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}")
                    newnode = self._questions_nodes(args.data, uniques)
                    if newnode == False:
                        exit(7)
                    self.config._connections_add(**newnode)
                    self.config._saveconfig(self.config.file)
                    print("{} added succesfully".format(args.data))
        elif args.action == "show":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            matches = list(filter(lambda k: k == args.data, self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            node = self.config.getitem(matches[0])
            for k, v in node.items():
                if isinstance(v, str):
                    print(k + ": " + v)
                else:
                    print(k + ":")
                    for i in v:
                        print(" - " + i)
        elif args.action == "mod":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            matches = list(filter(lambda k: k == args.data, self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            node = self.config.getitem(matches[0])
            edits = self._questions_edit()
            if edits == None:
                exit(7)
            uniques = self.config._explode_unique(args.data)
            updatenode = self._questions_nodes(args.data, uniques, edit=edits)
            if not updatenode:
                exit(7)
            uniques.update(node)
            if sorted(updatenode.items()) == sorted(uniques.items()):
                print("Nothing to do here")
                return
            else:
                self.config._connections_add(**updatenode)
                self.config._saveconfig(self.config.file)
                print("{} edited succesfully".format(args.data))

    def _func_profile(self, args):
        #Function called when managing profiles
        if not self.case:
            args.data[0] = args.data[0].lower()
        if args.action == "del":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) == 0:
                print("{} not found".format(args.data[0]))
                exit(2)
            if matches[0] == "default":
                print("Can't delete default profile")
                exit(6)
            usedprofile = self._profileused(matches[0])
            if len(usedprofile) > 0:
                print("Profile {} used in the following nodes:".format(matches[0]))
                print(", ".join(usedprofile))
                exit(8)
            question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
            confirm = inquirer.prompt(question)
            if confirm["delete"]:
                self.config._profiles_del(id = matches[0])
                self.config._saveconfig(self.config.file)
                print("{} deleted succesfully".format(matches[0]))
        elif args.action == "show":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) == 0:
                print("{} not found".format(args.data[0]))
                exit(2)
            profile = self.config.profiles[matches[0]]
            for k, v in profile.items():
                if isinstance(v, str):
                    print(k + ": " + v)
                else:
                    print(k + ":")
                    for i in v:
                        print(" - " + i)
        elif args.action == "add":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) > 0:
                print("Profile {} Already exist".format(matches[0]))
                exit(4)
            newprofile = self._questions_profiles(args.data[0])
            if newprofile == False:
                exit(7)
            self.config._profiles_add(**newprofile)
            self.config._saveconfig(self.config.file)
            print("{} added succesfully".format(args.data[0]))
        elif args.action == "mod":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) == 0:
                print("{} not found".format(args.data[0]))
                exit(2)
            profile = self.config.profiles[matches[0]]
            oldprofile = {"id": matches[0]}
            oldprofile.update(profile)
            edits = self._questions_edit()
            if edits == None:
                exit(7)
            updateprofile = self._questions_profiles(matches[0], edit=edits)
            if not updateprofile:
                exit(7)
            if sorted(updateprofile.items()) == sorted(oldprofile.items()):
                print("Nothing to do here")
                return
            else:
                self.config._profiles_add(**updateprofile)
                self.config._saveconfig(self.config.file)
                print("{} edited succesfully".format(args.data[0]))

    def _func_others(self, args):
        #Function called when using other commands
        if args.command == "ls":
            print(*getattr(self, args.data), sep="\n")
        elif args.command == "move" or args.command == "cp":
            if not self.case:
                args.data[0] = args.data[0].lower()
                args.data[1] = args.data[1].lower()
            source = list(filter(lambda k: k == args.data[0], self.nodes))
            dest = list(filter(lambda k: k == args.data[1], self.nodes))
            if len(source) != 1:
                print("{} not found".format(args.data[0]))
                exit(2)
            if len(dest) > 0:
                print("Node {} Already exist".format(args.data[1]))
                exit(4)
            nodefolder = args.data[1].partition("@")
            nodefolder = "@" + nodefolder[2]
            if nodefolder not in self.folders and nodefolder != "@":
                print("{} not found".format(nodefolder))
                exit(2)
            olduniques = self.config._explode_unique(args.data[0])
            newuniques = self.config._explode_unique(args.data[1])
            if newuniques == False:
                print("Invalid node {}".format(args.data[1]))
                exit(5)
            node = self.config.getitem(source[0])
            newnode = {**newuniques, **node}
            self.config._connections_add(**newnode)
            if args.command == "move":
                self.config._connections_del(**olduniques)
            self.config._saveconfig(self.config.file)
            if args.command == "move":
                print("{} moved succesfully to {}".format(args.data[0],args.data[1]))
            if args.command == "cp":
                print("{} copied succesfully to {}".format(args.data[0],args.data[1]))
        elif args.command == "bulk":
            newnodes = self._questions_bulk()
            if newnodes == False:
                exit(7)
            if not self.case:
                newnodes["location"] = newnodes["location"].lower()
                newnodes["ids"] = newnodes["ids"].lower()
            ids = newnodes["ids"].split(",")
            hosts = newnodes["host"].split(",")
            count = 0
            for n in ids:
                unique = n + newnodes["location"]
                matches = list(filter(lambda k: k == unique, self.nodes))
                reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
                if len(matches) > 0:
                    print("Node {} already exist, ignoring it".format(unique))
                    continue
                if len(reversematches) > 0:
                    print("Folder with name {} already exist, ignoring it".format(unique))
                    continue
                newnode = {"id": n}
                if newnodes["location"] != "":
                    location = self.config._explode_unique(newnodes["location"])
                    newnode.update(location)
                if len(hosts) > 1:
                    index = ids.index(n)
                    newnode["host"] = hosts[index]
                else:
                    newnode["host"] = hosts[0]
                newnode["protocol"] = newnodes["protocol"]
                newnode["port"] = newnodes["port"]
                newnode["options"] = newnodes["options"]
                newnode["logs"] = newnodes["logs"]
                newnode["user"] = newnodes["user"]
                newnode["password"] = newnodes["password"]
                count +=1
                self.config._connections_add(**newnode)
                self.nodes = self._getallnodes()
            if count > 0:
                self.config._saveconfig(self.config.file)
                print("Succesfully added {} nodes".format(count))
            else:
                print("0 nodes added")
        else:
            if args.command == "completion":
                print(self._help("completion"))
            else:
                if args.command == "case":
                    if args.data[0] == "true":
                        args.data[0] = True
                    elif args.data[0] == "false":
                        args.data[0] = False
                if args.command == "idletime":
                    if args.data[0] < 0:
                        args.data[0] = 0
                self.config.config[args.command] = args.data[0]
                self.config._saveconfig(self.config.file)
                print("Config saved")

    def _choose(self, list, name, action):
        #Generates an inquirer list to pick
        questions = [inquirer.List(name, message="Pick {} to {}:".format(name,action), choices=list, carousel=True)]
        answer = inquirer.prompt(questions)
        if answer == None:
            return
        else:
            return answer[name]

    def _host_validation(self, answers, current, regex = "^.+$"):
        #Validate hostname in inquirer when managing nodes
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
        return True

    def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$)"):
        #Validate protocol in inquirer when managing profiles
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet or leave empty")
        return True

    def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$|^@.+$)"):
        #Validate protocol in inquirer when managing nodes
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, leave empty or @profile")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
        return True

    def _profile_port_validation(self, answers, current, regex = "(^[0-9]*$)"):
        #Validate port in inquirer when managing profiles
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile o leave empty")
        try:
            port = int(current)
        except:
            port = 0
        if current != "" and not 1 <= int(port) <= 65535:
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
        return True

    def _port_validation(self, answers, current, regex = "(^[0-9]*$|^@.+$)"):
        #Validate port in inquirer when managing nodes
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
        try:
            port = int(current)
        except:
            port = 0
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
        elif current != "" and not 1 <= int(port) <= 65535:
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile o leave empty")
        return True

    def _pass_validation(self, answers, current, regex = "(^@.+$)"):
        #Validate password in inquirer
        profiles = current.split(",")
        for i in profiles:
            if not re.match(regex, i) or i[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(i))
        return True

    def _default_validation(self, answers, current):
        #Default validation type used in multiples questions in inquirer
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
        return True

    def _bulk_node_validation(self, answers, current, regex = "^[0-9a-zA-Z_.,$#-]+$"):
        #Validation of nodes when running bulk command
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
        return True

    def _bulk_folder_validation(self, answers, current):
        #Validation of folders when running bulk command
        if not self.case:
            current = current.lower()
        matches = list(filter(lambda k: k == current, self.folders))
        if current != "" and len(matches) == 0:
            raise inquirer.errors.ValidationError("", reason="Location {} don't exist".format(current))
        return True

    def _bulk_host_validation(self, answers, current, regex = "^.+$"):
        #Validate hostname when running bulk command
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
        hosts = current.split(",")
        nodes = answers["ids"].split(",")
        if len(hosts) > 1 and len(hosts) != len(nodes):
            raise inquirer.errors.ValidationError("", reason="Hosts list should be the same length of nodes list")
        return True

    def _questions_edit(self):
        #Inquirer questions when editing nodes or profiles
        questions = []
        questions.append(inquirer.Confirm("host", message="Edit Hostname/IP?"))
        questions.append(inquirer.Confirm("protocol", message="Edit Protocol?"))
        questions.append(inquirer.Confirm("port", message="Edit Port?"))
        questions.append(inquirer.Confirm("options", message="Edit Options?"))
        questions.append(inquirer.Confirm("logs", message="Edit logging path/file?"))
        questions.append(inquirer.Confirm("user", message="Edit User?"))
        questions.append(inquirer.Confirm("password", message="Edit password?"))
        answers = inquirer.prompt(questions)
        return answers

    def _questions_nodes(self, unique, uniques = None, edit = None):
        #Questions when adding or editing nodes
        try:
            defaults = self.config.getitem(unique)
        except:
            defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" }
        node = {}
        if edit == None:
            edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True }
        questions = []
        if edit["host"]:
            questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"]))
        else:
            node["host"] = defaults["host"]
        if edit["protocol"]:
            questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation, default=defaults["protocol"]))
        else:
            node["protocol"] = defaults["protocol"]
        if edit["port"]:
            questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation, default=defaults["port"]))
        else:
            node["port"] = defaults["port"]
        if edit["options"]:
            questions.append(inquirer.Text("options", message="Pass
extra options to protocol", validate=self._default_validation, default=defaults["options"])) else: node["options"] = defaults["options"] if edit["logs"]: questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"])) else: node["logs"] = defaults["logs"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"])) else: node["user"] = defaults["user"] if edit["password"]: questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"])) else: node["password"] = defaults["password"] answer = inquirer.prompt(questions) if answer == None: return False if "password" in answer.keys(): if answer["password"] == "Local Password": passq = [inquirer.Password("password", message="Set Password")] passa = inquirer.prompt(passq) if passa == None: return False answer["password"] = self.encrypt(passa["password"]) elif answer["password"] == "Profiles": passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))] passa = inquirer.prompt(passq) if passa == None: return False answer["password"] = passa["password"].split(",") elif answer["password"] == "No Password": answer["password"] = "" result = {**uniques, **answer, **node} result["type"] = "connection" return result def _questions_profiles(self, unique, edit = None): #Questions when adding or editing profiles try: defaults = self.config.profiles[unique] except: defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } profile = {} if edit == None: edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or 
IP", default=defaults["host"])) else: profile["host"] = defaults["host"] if edit["protocol"]: questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._profile_protocol_validation, default=defaults["protocol"])) else: profile["protocol"] = defaults["protocol"] if edit["port"]: questions.append(inquirer.Text("port", message="Select Port Number", validate=self._profile_port_validation, default=defaults["port"])) else: profile["port"] = defaults["port"] if edit["options"]: questions.append(inquirer.Text("options", message="Pass extra options to protocol", default=defaults["options"])) else: profile["options"] = defaults["options"] if edit["logs"]: questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"])) else: profile["logs"] = defaults["logs"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"])) else: profile["user"] = defaults["user"] if edit["password"]: questions.append(inquirer.Password("password", message="Set Password")) else: profile["password"] = defaults["password"] answer = inquirer.prompt(questions) if answer == None: return False if "password" in answer.keys(): if answer["password"] != "": answer["password"] = self.encrypt(answer["password"]) result = {**answer, **profile} result["id"] = unique return result def _questions_bulk(self): #Questions when using bulk command questions = [] questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", validate=self._bulk_node_validation)) questions.append(inquirer.Text("location", message="Add a @folder, @subfolder@folder or leave empty", validate=self._bulk_folder_validation)) questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", validate=self._bulk_host_validation)) questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation)) questions.append(inquirer.Text("port", 
message="Select Port Number", validate=self._port_validation)) questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation)) questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation)) questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation)) questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"])) answer = inquirer.prompt(questions) if answer == None: return False if "password" in answer.keys(): if answer["password"] == "Local Password": passq = [inquirer.Password("password", message="Set Password")] passa = inquirer.prompt(passq) answer["password"] = self.encrypt(passa["password"]) elif answer["password"] == "Profiles": passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))] passa = inquirer.prompt(passq) answer["password"] = passa["password"].split(",") elif answer["password"] == "No Password": answer["password"] = "" answer["type"] = "connection" return answer def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")): if not pat.match(arg_value): raise argparse.ArgumentTypeError return arg_value def _type_profile(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$#-]+$")): if not pat.match(arg_value): raise argparse.ArgumentTypeError return arg_value def _help(self, type): #Store text for help and other commands if type == "node": return "node[@subfolder][@folder]\nConnect to specific node or show all matching nodes\n[@subfolder][@folder]\nShow all available connections globaly or in specified path" if type == "usage": return "conn [-h] [--add | --del | --mod | --show | --debug] [node|folder]\n conn {profile,move,mv,copy,cp,list,ls,bulk,config} ..." 
if type == "end": return "Commands:\n profile Manage profiles\n move (mv) Move node\n copy (cp) Copy node\n list (ls) List profiles, nodes or folders\n bulk Add nodes in bulk\n config Manage app config" if type == "completion": return ''' #Here starts bash completion for conn #You need jq installed in order to use this _conn() { DATADIR=$HOME/.config/conn mapfile -t connections < <(jq -r ' .["connections"] | paths as $path | select(getpath($path) == "connection") | $path | [map(select(. != "type"))[-1,-2,-3]] | map(select(. !=null)) | join("@")' $DATADIR/config.json) mapfile -t folders < <(jq -r ' .["connections"] | paths as $path | select(getpath($path) == "folder" or getpath($path) == "subfolder") | $path | [map(select(. != "type"))[-1,-2]] | map(select(. !=null)) | join("@")' $DATADIR/config.json) mapfile -t profiles < <(jq -r '.["profiles"] | keys[]' $DATADIR/config.json) if [ "${#COMP_WORDS[@]}" = "2" ]; then strings="--add --del --rm --edit --mod mv --show ls cp profile bulk config --help" strings="$strings ${connections[@]} ${folders[@]/#/@}" COMPREPLY=($(compgen -W "$strings" -- "${COMP_WORDS[1]}")) fi if [ "${#COMP_WORDS[@]}" = "3" ]; then strings="" if [ "${COMP_WORDS[1]}" = "profile" ]; then strings="--add --rm --del --edit --mod --show --help"; fi if [ "${COMP_WORDS[1]}" = "config" ]; then strings="--allow-uppercase --keepalive --completion --help"; fi if [[ "${COMP_WORDS[1]}" =~ ^--mod|--edit|--show|--add|--rm|--del$ ]]; then strings="profile"; fi if [[ "${COMP_WORDS[1]}" =~ ^list|ls$ ]]; then strings="profiles nodes folders"; fi if [[ "${COMP_WORDS[1]}" =~ ^bulk|mv|move|cp|copy$$ ]]; then strings="--help"; fi if [[ "${COMP_WORDS[1]}" =~ ^--rm|--del$ ]]; then strings="$strings ${folders[@]/#/@}"; fi if [[ "${COMP_WORDS[1]}" =~ ^--rm|--del|--mod|--edit|mv|move|cp|copy|--show$ ]]; then strings="$strings ${connections[@]}" fi COMPREPLY=($(compgen -W "$strings" -- "${COMP_WORDS[2]}")) fi if [ "${#COMP_WORDS[@]}" = "4" ]; then strings="" if [ 
"${COMP_WORDS[1]}" = "profile" ]; then if [[ "${COMP_WORDS[2]}" =~ ^--rm|--del|--mod|--edit|--show$ ]] ; then strings="$strings ${profiles[@]}" fi fi if [ "${COMP_WORDS[2]}" = "profile" ]; then if [[ "${COMP_WORDS[1]}" =~ ^--rm|--remove|--del|--mod|--edit|--show$ ]] ; then strings="$strings ${profiles[@]}" fi fi COMPREPLY=($(compgen -W "$strings" -- "${COMP_WORDS[3]}")) fi } complete -o nosort -F _conn conn ''' def _getallnodes(self): #get all nodes on configfile nodes = [] layer1 = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "connection"] folders = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "folder"] nodes.extend(layer1) for f in folders: layer2 = [k + "@" + f for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"] nodes.extend(layer2) subfolders = [k for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] for s in subfolders: layer3 = [k + "@" + s + "@" + f for k,v in self.config.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"] nodes.extend(layer3) return nodes def _getallfolders(self): #get all folders on configfile folders = ["@" + k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "folder"] subfolders = [] for f in folders: s = ["@" + k + f for k,v in self.config.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"] subfolders.extend(s) folders.extend(subfolders) return folders def _profileused(self, profile): #Check if profile is used before deleting it nodes = [] layer1 = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] folders = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "folder"] nodes.extend(layer1) 
for f in folders: layer2 = [k + "@" + f for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] nodes.extend(layer2) subfolders = [k for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] for s in subfolders: layer3 = [k + "@" + s + "@" + f for k,v in self.config.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] nodes.extend(layer3) return nodes def encrypt(self, password, keyfile=None): ''' Encrypts password using RSA keyfile ### Parameters: - password (str): Plaintext password to encrypt. ### Optional Parameters: - keyfile (str): Path/file to keyfile. Default is config keyfile. ### Returns: str: Encrypted password. ''' if keyfile is None: keyfile = self.config.key key = RSA.import_key(open(keyfile).read()) publickey = key.publickey() encryptor = PKCS1_OAEP.new(publickey) password = encryptor.encrypt(password.encode("utf-8")) return str(password)
Methods
def encrypt(self, password, keyfile=None)
Encrypts password using RSA keyfile
Parameters:
- password (str): Plaintext password to encrypt.
Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.
Returns:
str: Encrypted password.
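The string returned here is the printable form of the ciphertext bytes (the source ends with `return str(password)`), so a stored password looks like `b'...'`. The node class later recognises that form with a regular expression and converts it back to bytes with `ast.literal_eval` before decrypting. A minimal sketch of that round trip, using placeholder bytes instead of real RSA output; the two helper names are hypothetical and not part of conn:

```python
import ast
import re

# Same pattern the node class uses to recognise a stored, encrypted password.
ENCRYPTED_RE = re.compile('^b[\"\'].+[\"\']$')

def to_stored_form(ciphertext: bytes) -> str:
    # Hypothetical helper: mirrors the final `return str(password)` in encrypt().
    return str(ciphertext)

def from_stored_form(stored: str) -> bytes:
    # Hypothetical helper: turns the stored text back into bytes before decryption.
    if not ENCRYPTED_RE.match(stored):
        raise ValueError("not an encrypted value")
    return ast.literal_eval(stored)

fake_ciphertext = b"\x8f\x01secret\xff"  # placeholder, not real RSA output
stored = to_stored_form(fake_ciphertext)
print(stored)
print(from_stored_form(stored) == fake_ciphertext)
```

A value that does not match the pattern is treated by the node class as a plaintext password and used as-is.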
Expand source code
def encrypt(self, password, keyfile=None):
    '''
    Encrypts password using RSA keyfile

    ### Parameters:

        - password (str): Plaintext password to encrypt.

    ### Optional Parameters:

        - keyfile (str): Path/file to keyfile. Default is config keyfile.

    ### Returns:

        str: Encrypted password.
    '''
    if keyfile is None:
        keyfile = self.config.key
    key = RSA.import_key(open(keyfile).read())
    publickey = key.publickey()
    encryptor = PKCS1_OAEP.new(publickey)
    password = encryptor.encrypt(password.encode("utf-8"))
    return str(password)
class node (unique, host, options='', logs='', password='', port='', protocol='', user='', config='')
This class generates a node object. It contains all the information and methods needed to connect to and interact with a device using ssh or telnet.
Attributes:
- output (str): Output of the commands you ran with the run or test method.
- result (bool): True if the expected value is found after running the commands with the test method.
Parameters:
- unique (str): Unique name to assign to the node.
- host (str): IP address or hostname of the node.
Optional Parameters:
- options (str): Additional options to pass to ssh/telnet for the connection.
- logs (str): Path/file for storing the logs. You can use ${unique}, ${host}, ${port}, ${user}, ${protocol} as variables.
- password (str): Encrypted or plaintext password.
- port (str): Port to connect to the node. Default is 22 for ssh and 23 for telnet.
- protocol (str): Select ssh or telnet. Default is ssh.
- user (str): Username for the node.
- config (obj): Pass the object created with class configfile, with the key for decryption and extra configuration, if you are using the connection manager.
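The variables accepted in `logs` are replaced with the node's own values when the log path is built. The sketch below shows that substitution in isolation; `expand_log_path` is a hypothetical helper written for illustration, not a conn function. It also handles a `${date 'FORMAT'}` token, which the private `_logfile` method in the source expands with `strftime`.

```python
import datetime
import re

def expand_log_path(template, *, unique, host, port="", user="", protocol="ssh", now=None):
    # Hypothetical helper illustrating the substitution; not part of conn's API.
    values = {"unique": unique, "host": host, "port": port,
              "user": user, "protocol": protocol}
    for name, value in values.items():
        template = template.replace("${" + name + "}", value)
    # Optional ${date 'FORMAT'} token, expanded with strftime.
    now = now or datetime.datetime.now()
    return re.sub(r"\$\{date '(.*?)'\}",
                  lambda m: now.strftime(m.group(1)), template)

print(expand_log_path("/var/log/conn/${unique}_${host}_${date '%Y%m%d'}.log",
                      unique="router1", host="10.0.0.1",
                      now=datetime.datetime(2024, 1, 31)))
# prints /var/log/conn/router1_10.0.0.1_20240131.log
```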
Expand source code
class node: ''' This class generates a node object. Containts all the information and methods to connect and interact with a device using ssh or telnet. ### Attributes: - output (str): Output of the commands you ran with run or test method. - result(bool): True if expected value is found after running the commands using test method. ''' def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config=''): ''' ### Parameters: - unique (str): Unique name to assign to the node. - host (str): IP address or hostname of the node. ### Optional Parameters: - options (str): Additional options to pass the ssh/telnet for connection. - logs (str): Path/file for storing the logs. You can use ${unique},${host}, ${port}, ${user}, ${protocol} as variables. - password (str): Encrypted or plaintext password. - port (str): Port to connect to node, default 22 for ssh and 23 for telnet. - protocol (str): Select ssh or telnet. Default is ssh. - user (str): Username to of the node. - config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager. 
''' if config == '': self.idletime = 0 self.key = None else: self.idletime = config.config["idletime"] self.key = config.key self.unique = unique attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user} for key in attr: profile = re.search("^@(.*)", attr[key]) if profile and config != '': setattr(self,key,config.profiles[profile.group(1)][key]) elif attr[key] == '' and key == "protocol": try: setattr(self,key,config.profiles["default"][key]) except: setattr(self,key,"ssh") else: setattr(self,key,attr[key]) if isinstance(password,list): self.password = [] for i, s in enumerate(password): profile = re.search("^@(.*)", password[i]) if profile and config != '': self.password.append(config.profiles[profile.group(1)]["password"]) else: self.password = [password] def __passtx(self, passwords, *, keyfile=None): # decrypts passwords, used by other methdos. dpass = [] if keyfile is None: keyfile = self.key if keyfile is not None: key = RSA.import_key(open(keyfile).read()) decryptor = PKCS1_OAEP.new(key) for passwd in passwords: if not re.match('^b[\"\'].+[\"\']$', passwd): dpass.append(passwd) else: try: decrypted = decryptor.decrypt(ast.literal_eval(passwd)).decode("utf-8") dpass.append(decrypted) except: raise ValueError("Missing or corrupted key") return dpass def _logfile(self, logfile = None): # translate logs variables and generate logs path. if logfile == None: logfile = self.logs logfile = logfile.replace("${unique}", self.unique) logfile = logfile.replace("${host}", self.host) logfile = logfile.replace("${port}", self.port) logfile = logfile.replace("${user}", self.user) logfile = logfile.replace("${protocol}", self.protocol) now = datetime.datetime.now() dateconf = re.search(r'\$\{date \'(.*)\'}', logfile) if dateconf: logfile = re.sub(r'\$\{date (.*)}',now.strftime(dateconf.group(1)), logfile) return logfile def _logclean(self, logfile, var = False): #Remove special ascii characters and other stuff from logfile. 
if var == False: t = open(logfile, "r").read() else: t = logfile t = t.replace("\n","",1).replace("\a","") t = t.replace('\n\n', '\n') t = re.sub(r'.\[K', '', t) while True: tb = re.sub('.\b', '', t, count=1) if len(t) == len(tb): break t = tb ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/ ]*[@-~])') t = ansi_escape.sub('', t) if var == False: d = open(logfile, "w") d.write(t) d.close() return else: return t def interact(self, debug = False): ''' Allow user to interact with the node directly, mostly used by connection manager. ### Optional Parameters: - debug (bool): If True, display all the connecting information before interact. Default False. ''' connect = self._connect(debug = debug) if connect == True: size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size())) self.child.setwinsize(int(size.group(2)),int(size.group(1))) print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol) if 'logfile' in dir(self): self.child.logfile_read = open(self.logfile, "wb") elif debug: self.child.logfile_read = None if 'missingtext' in dir(self): print(self.child.after.decode(), end='') self.child.interact() if "logfile" in dir(self) and not debug: self._logclean(self.logfile) else: print(connect) exit(1) def run(self, commands,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False): ''' Run a command or list of commands on the node and return the output. ### Parameters: - commands (str/list): Commands to run on the node. Should be str or a list of str. ### Optional Named Parameters: - folder (str): Path where output log should be stored, leave empty to disable logging. - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol. - stdout (bool):Set True to send the command output to stdout. 
default False. ### Returns: str: Output of the commands you ran on the node. ''' connect = self._connect() if connect == True: expects = [prompt, pexpect.EOF] output = '' if isinstance(commands, list): for c in commands: result = self.child.expect(expects) self.child.sendline(c) if result == 0: output = output + self.child.before.decode() + self.child.after.decode() if result == 1: output = output + self.child.before.decode() else: result = self.child.expect(expects) self.child.sendline(commands) if result == 0: output = output + self.child.before.decode() + self.child.after.decode() if result == 1: output = output + self.child.before.decode() result = self.child.expect(expects) if result == 0: output = output + self.child.before.decode() + self.child.after.decode() if result == 1: output = output + self.child.before.decode() self.child.close() output = output.lstrip() if stdout == True: print(output) if folder != '': with open(folder + "/" + self.unique, "w") as f: f.write(output) f.close() self._logclean(folder + "/" + self.unique) self.output = output return output else: self.output = connect return connect def test(self, commands, expected, *, prompt = r'>$|#$|\$$|>.$|#.$|\$.$'): ''' Run a command or list of commands on the node, then check if expected value appears on the output after the last command. ### Parameters: - commands (str/list): Commands to run on the node. Should be str or list of str. - expected (str) : Expected text to appear after running all the commands on the node. ### Optional Named Parameters: - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol. ### Returns: bool: true if expected value is found after running the commands false if prompt is found before. 
''' connect = self._connect() if connect == True: expects = [prompt, pexpect.EOF] output = '' if isinstance(commands, list): for c in commands: result = self.child.expect(expects) self.child.sendline(c) if result == 0: output = output + self.child.before.decode() + self.child.after.decode() if result == 1: output = output + self.child.before.decode() else: self.child.expect(expects) self.child.sendline(commands) output = output + self.child.before.decode() + self.child.after.decode() expects = [expected, prompt, pexpect.EOF] results = self.child.expect(expects) if results == 0: self.child.close() self.result = True output = output + self.child.before.decode() + self.child.after.decode() output = output.lstrip() self.output = output return True if results in [1, 2]: self.child.close() self.result = False if results == 1: output = output + self.child.before.decode() + self.child.after.decode() elif results == 2: output = output + self.child.before.decode() output = output.lstrip() self.output = output return False else: self.result = None self.output = connect return connect def _connect(self, debug = False): # Method to connect to the node, it parse all the information, create the ssh/telnet command and login to the node. 
if self.protocol == "ssh": cmd = "ssh" if self.idletime > 0: cmd = cmd + " -o ServerAliveInterval=" + str(self.idletime) if self.user == '': cmd = cmd + " -t {}".format(self.host) else: cmd = cmd + " -t {}".format("@".join([self.user,self.host])) if self.port != '': cmd = cmd + " -p " + self.port if self.options != '': cmd = cmd + " " + self.options if self.logs != '': self.logfile = self._logfile() if self.password[0] != '': passwords = self.__passtx(self.password) else: passwords = [] expects = ['yes/no', 'refused', 'supported', 'cipher', 'sage', 'timeout', 'unavailable', 'closed', '[p|P]assword:|[u|U]sername:', r'>$|#$|\$$|>.$|#.$|\$.$', 'suspend', pexpect.EOF, "No route to host", "resolve hostname", "no matching host key"] elif self.protocol == "telnet": cmd = "telnet " + self.host if self.port != '': cmd = cmd + " " + self.port if self.options != '': cmd = cmd + " " + self.options if self.logs != '': self.logfile = self._logfile() if self.password[0] != '': passwords = self.__passtx(self.password) else: passwords = [] expects = ['[u|U]sername:', 'refused', 'supported', 'cipher', 'sage', 'timeout', 'unavailable', 'closed', '[p|P]assword:', r'>$|#$|\$$|>.$|#.$|\$.$', 'suspend', pexpect.EOF, "No route to host", "resolve hostname", "no matching host key"] else: raise ValueError("Invalid protocol: " + self.protocol) child = pexpect.spawn(cmd) if debug: child.logfile_read = sys.stdout.buffer if len(passwords) > 0: loops = len(passwords) else: loops = 1 endloop = False for i in range(0, loops): while True: results = child.expect(expects) if results == 0: if self.protocol == "ssh": child.sendline('yes') elif self.protocol == "telnet": if self.user != '': child.sendline(self.user) else: self.missingtext = True break if results in [1, 2, 3, 4, 5, 6, 7, 12, 13, 14]: child.close() return "Connection failed code:" + str(results) if results == 8: if len(passwords) > 0: child.sendline(passwords[i]) else: self.missingtext = True break if results in [9, 11]: endloop = True 
child.sendline() break if results == 10: child.sendline("\r") sleep(2) if endloop: break child.readline(0) self.child = child return True
Methods
def interact(self, debug=False)
Allows the user to interact with the node directly. Mostly used by the connection manager.
Optional Parameters:
- debug (bool): If True, display all the connection information before interacting. Default False.
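Before handing control to the user, the source resizes the remote session to match the local terminal: it reads the columns and lines from `os.get_terminal_size()` and passes them to the child as (lines, columns). A small sketch of reading those two values through the named-tuple fields rather than by parsing the string form; `terminal_rows_cols` is a hypothetical helper, and `shutil.get_terminal_size` is used here only because it accepts a fallback when no terminal is attached:

```python
import shutil

def terminal_rows_cols(fallback=(80, 24)):
    # shutil.get_terminal_size returns a named tuple with .columns and .lines;
    # the fallback is used when no terminal is attached (for example in CI).
    size = shutil.get_terminal_size(fallback=fallback)
    return size.lines, size.columns

rows, cols = terminal_rows_cols()
print(rows, cols)
```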
Expand source code
def interact(self, debug = False):
    '''
    Allow user to interact with the node directly, mostly used by connection manager.

    ### Optional Parameters:

        - debug (bool): If True, display all the connecting information before interact. Default False.
    '''
    connect = self._connect(debug = debug)
    if connect == True:
        size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
        self.child.setwinsize(int(size.group(2)),int(size.group(1)))
        print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        if 'logfile' in dir(self):
            self.child.logfile_read = open(self.logfile, "wb")
        elif debug:
            self.child.logfile_read = None
        if 'missingtext' in dir(self):
            print(self.child.after.decode(), end='')
        self.child.interact()
        if "logfile" in dir(self) and not debug:
            self._logclean(self.logfile)
    else:
        print(connect)
        exit(1)
def run(self, commands, *, folder='', prompt='>$|#$|\\$$|>.$|#.$|\\$.$', stdout=False)
Run a command or list of commands on the node and return the output.
Parameters:
- commands (str/list): Commands to run on the node. Should be str or a list of str.
Optional Named Parameters:
- folder (str): Path where the output log should be stored. Leave empty to disable logging.
- prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
Returns:
str: Output of the commands you ran on the node.
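To judge whether the default `prompt` suits a device, it can help to try the pattern against the last characters the device prints. The sketch below uses plain `re.search`, which only approximates how the pattern is applied to a live session, and `looks_like_prompt` is a hypothetical helper:

```python
import re

# Default value of the prompt parameter, copied from the signature above.
DEFAULT_PROMPT = r'>$|#$|\$$|>.$|#.$|\$.$'

def looks_like_prompt(buffer_tail, prompt=DEFAULT_PROMPT):
    # True when the text ends with something the pattern accepts as a prompt.
    return re.search(prompt, buffer_tail) is not None

for tail in ["router>", "router#", "user@host:~$ ", "Password: ", "host% "]:
    print(repr(tail), looks_like_prompt(tail))

# A shell whose prompt ends in "%" needs its own pattern (assumed example).
print(looks_like_prompt("host% ", prompt=r'%\s?$'))
```

The first three samples match the default pattern; `"Password: "` and `"host% "` do not, which is the situation where passing a custom `prompt` is needed.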
Expand source code
def run(self, commands,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False):
    '''
    Run a command or list of commands on the node and return the output.

    ### Parameters:

        - commands (str/list): Commands to run on the node. Should be str or a list of str.

    ### Optional Named Parameters:

        - folder (str): Path where the output log should be stored. Leave empty to disable logging.
        - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
        - stdout (bool): Set True to send the command output to stdout. Default False.

    ### Returns:

        str: Output of the commands you ran on the node.
    '''
    connect = self._connect()
    if connect == True:
        expects = [prompt, pexpect.EOF]
        output = ''
        if isinstance(commands, list):
            for c in commands:
                result = self.child.expect(expects)
                self.child.sendline(c)
                if result == 0:
                    output = output + self.child.before.decode() + self.child.after.decode()
                if result == 1:
                    output = output + self.child.before.decode()
        else:
            result = self.child.expect(expects)
            self.child.sendline(commands)
            if result == 0:
                output = output + self.child.before.decode() + self.child.after.decode()
            if result == 1:
                output = output + self.child.before.decode()
        # Collect whatever the last command printed before the next prompt or EOF.
        result = self.child.expect(expects)
        if result == 0:
            output = output + self.child.before.decode() + self.child.after.decode()
        if result == 1:
            output = output + self.child.before.decode()
        self.child.close()
        output = output.lstrip()
        if stdout == True:
            print(output)
        if folder != '':
            # The with block closes the file, so no explicit close() is needed.
            with open(folder + "/" + self.unique, "w") as f:
                f.write(output)
            self._logclean(folder + "/" + self.unique)
        self.output = output
        return output
    else:
        # On failure, _connect returns an error string instead of True.
        self.output = connect
        return connect
def test(self, commands, expected, *, prompt='>$|#$|\\$$|>.$|#.$|\\$.$')
Run a command or list of commands on the node, then check whether the expected value appears in the output after the last command.
Parameters:
- commands (str/list): Commands to run on the node. Should be str or list of str.
- expected (str): Expected text to appear after running all the commands on the node.
Optional Named Parameters:
- prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
Returns:
bool: True if the expected value is found after running the commands, False if the prompt is found first. If the connection fails, the connection error message (str) is returned instead.
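One detail from the source is worth handling explicitly in calling code: when the connection itself fails, test returns the error text from the connection step (for example `"Connection failed code:1"`) instead of a bool, and a non-empty string is truthy. A sketch of interpreting the three outcomes safely; `describe_test_result` is a hypothetical helper:

```python
def describe_test_result(value):
    # Compare with `is` so the error string is not mistaken for success.
    if value is True:
        return "expected text found"
    if value is False:
        return "prompt returned first"
    return "connection problem: " + str(value)

for outcome in (True, False, "Connection failed code:1"):
    print(describe_test_result(outcome))
```

Writing `if router.test(...):` alone would treat the failure string as success, which is why the identity checks are used.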
Expand source code
def test(self, commands, expected, *, prompt = r'>$|#$|\$$|>.$|#.$|\$.$'):
    '''
    Run a command or list of commands on the node, then check if expected value appears on the output after the last command.

    ### Parameters:

        - commands (str/list): Commands to run on the node. Should be str or list of str.
        - expected (str) : Expected text to appear after running all the commands on the node.

    ### Optional Named Parameters:

        - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection need some special symbol.

    ### Returns:

        bool: true if expected value is found after running the commands false if prompt is found before.
    '''
    connect = self._connect()
    if connect == True:
        expects = [prompt, pexpect.EOF]
        output = ''
        if isinstance(commands, list):
            for c in commands:
                result = self.child.expect(expects)
                self.child.sendline(c)
                if result == 0:
                    output = output + self.child.before.decode() + self.child.after.decode()
                if result == 1:
                    output = output + self.child.before.decode()
        else:
            self.child.expect(expects)
            self.child.sendline(commands)
            output = output + self.child.before.decode() + self.child.after.decode()
        expects = [expected, prompt, pexpect.EOF]
        results = self.child.expect(expects)
        if results == 0:
            self.child.close()
            self.result = True
            output = output + self.child.before.decode() + self.child.after.decode()
            output = output.lstrip()
            self.output = output
            return True
        if results in [1, 2]:
            self.child.close()
            self.result = False
            if results == 1:
                output = output + self.child.before.decode() + self.child.after.decode()
            elif results == 2:
                output = output + self.child.before.decode()
            output = output.lstrip()
            self.output = output
            return False
    else:
        self.result = None
        self.output = connect
        return connect
class nodes (nodes: dict, config='')
This class generates a nodes object, which contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.
Attributes:
- nodelist (list): List of node class objects passed to the init function.
- output (dict): Dictionary with each node's unique name as key and the output of the commands run on that node as value. Created after running the run or test methods.
- result (dict): Dictionary with each node's unique name as key; the value is True if the expected value is found after running the commands, False if the prompt is found first. Created after running the test method.
- <unique> (obj): For each item in nodelist, an attribute named after the node's unique name is generated.
Parameters:
- nodes (dict): Dictionary formed by node information:
  Keys: Unique name for each node.
  Mandatory subkeys: host (str).
  Optional subkeys: options (str), logs (str), password (str), port (str), protocol (str), user (str).
  For reference on subkeys, check the node class.
Optional Parameters:
- config (obj): Object created with the configfile class, carrying the key for decryption and extra configuration. Pass it if you are using the connection manager.
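The dictionary shape described above can be checked before it is handed to the class. This is a minimal sketch, not part of the package: `check_nodes`, `ALLOWED_SUBKEYS` and the sample inventory are hypothetical names, and the addresses come from the documentation range 192.0.2.0/24. The allowed subkeys are taken from the parameter description.

```python
# Subkeys listed in the parameter description above.
ALLOWED_SUBKEYS = {"host", "options", "logs", "password", "port", "protocol", "user"}

def check_nodes(nodes):
    """Hypothetical helper: return a list of problems found in a nodes dict."""
    problems = []
    for unique, info in nodes.items():
        if "host" not in info:
            problems.append(f"{unique}: missing mandatory subkey 'host'")
        for key in sorted(set(info) - ALLOWED_SUBKEYS):
            problems.append(f"{unique}: unknown subkey '{key}'")
    return problems

# Made-up inventory for illustration only.
inventory = {
    "router1": {"host": "192.0.2.1", "user": "admin", "protocol": "ssh"},
    "switch1": {"host": "192.0.2.2", "port": "2222"},
    "broken": {"user": "admin"},
}
problems = check_nodes(inventory)
```

Because each subkey is forwarded as a keyword argument when the node objects are built, catching a misspelled or missing key early is likely to give a clearer message than the error raised at construction time.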
class nodes:
    '''
    This class generates a nodes object, which contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.

    ### Attributes:

        - nodelist (list): List of node class objects passed to the init function.
        - output (dict): Dictionary with each node's unique name as key and the output of the commands run on that node as value. Created after running the run or test methods.
        - result (dict): Dictionary with each node's unique name as key; the value is True if the expected value is found after running the commands, False if the prompt is found first. Created after running the test method.
        - <unique> (obj): For each item in nodelist, an attribute named after the node's unique name is generated.
    '''

    def __init__(self, nodes: dict, config = ''):
        '''
        ### Parameters:

            - nodes (dict): Dictionary formed by node information:
                Keys: Unique name for each node.
                Mandatory subkeys: host (str).
                Optional subkeys: options (str), logs (str), password (str), port (str), protocol (str), user (str).
                For reference on subkeys, check the node class.

        ### Optional Parameters:

            - config (obj): Object created with the configfile class, carrying the key for decryption and extra configuration. Pass it if you are using the connection manager.
        '''
        self.nodelist = []
        self.config = config
        for n in nodes:
            this = node(n, **nodes[n], config = config)
            self.nodelist.append(this)
            setattr(self,n,this)

    def _splitlist(self, lst, n):
        #split a list in lists of n members.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def run(self, commands,*, folder = None, prompt = None, stdout = None, parallel = 10):
        '''
        Run a command or list of commands on all the nodes in nodelist.

        ### Parameters:

            commands (str/list): Commands to run on the node. Should be str or list of str.

        ### Optional Named Parameters:

            folder (str): Path where the output log should be stored. Leave empty to disable logging.
            prompt (str): Prompt expected after a command finishes running. Linux usually uses ">" or EOF, while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs a special symbol.
            stdout (bool): Set to True to send the command output to stdout. Default is False.
            parallel (int): Number of nodes on which to run the commands simultaneously. Default is 10. If there are more nodes than this value, the nodes are split into groups of at most this many members.

        ### Returns:

            dict: Dictionary with each node's unique name as key and the output of the commands run on that node as value.
        '''
        args = {}
        args["commands"] = commands
        if folder != None:
            args["folder"] = folder
        if prompt != None:
            args["prompt"] = prompt
        if stdout != None:
            args["stdout"] = stdout
        output = {}
        tasks = []
        for n in self.nodelist:
            tasks.append(threading.Thread(target=n.run, kwargs=args))
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            output[i.unique] = i.output
        self.output = output
        return output

    def test(self, commands, expected, *, prompt = None, parallel = 10):
        '''
        Run a command or list of commands on all the nodes in nodelist, then check if the expected value appears in the output after the last command.

        ### Parameters:

            commands (str/list): Commands to run on the node. Should be str or list of str.
            expected (str): Expected text to appear after running all the commands on the node.

        ### Optional Named Parameters:

            prompt (str): Prompt expected after a command finishes running. Linux usually uses ">" or EOF, while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs a special symbol.

        ### Returns:

            dict: Dictionary with each node's unique name as key; the value is True if the expected value is found after running the commands, False if the prompt is found first.
        '''
        args = {}
        args["commands"] = commands
        args["expected"] = expected
        if prompt != None:
            args["prompt"] = prompt
        output = {}
        result = {}
        tasks = []
        for n in self.nodelist:
            tasks.append(threading.Thread(target=n.test, kwargs=args))
        taskslist = list(self._splitlist(tasks, parallel))
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            result[i.unique] = i.result
            output[i.unique] = i.output
        self.output = output
        self.result = result
        return result
Methods
def run(self, commands, *, folder=None, prompt=None, stdout=None, parallel=10)
Run a command or list of commands on all the nodes in nodelist.
Parameters:
commands (str/list): Commands to run on the node. Should be str or list of str.
Optional Named Parameters:
folder (str): Path where the output log should be stored. Leave empty to disable logging.
prompt (str): Prompt expected after a command finishes running. Linux usually uses ">" or EOF, while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs a special symbol.
stdout (bool): Set to True to send the command output to stdout. Default is False.
parallel (int): Number of nodes on which to run the commands simultaneously. Default is 10. If there are more nodes than this value, the nodes are split into groups of at most this many members.
Returns:
dict: Dictionary with each node's unique name as key and the output of the commands run on that node as value.
def run(self, commands,*, folder = None, prompt = None, stdout = None, parallel = 10):
    '''
    Run a command or list of commands on all the nodes in nodelist.

    ### Parameters:

        commands (str/list): Commands to run on the node. Should be str or list of str.

    ### Optional Named Parameters:

        folder (str): Path where the output log should be stored. Leave empty to disable logging.
        prompt (str): Prompt expected after a command finishes running. Linux usually uses ">" or EOF, while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs a special symbol.
        stdout (bool): Set to True to send the command output to stdout. Default is False.
        parallel (int): Number of nodes on which to run the commands simultaneously. Default is 10. If there are more nodes than this value, the nodes are split into groups of at most this many members.

    ### Returns:

        dict: Dictionary with each node's unique name as key and the output of the commands run on that node as value.
    '''
    args = {}
    args["commands"] = commands
    if folder != None:
        args["folder"] = folder
    if prompt != None:
        args["prompt"] = prompt
    if stdout != None:
        args["stdout"] = stdout
    output = {}
    tasks = []
    for n in self.nodelist:
        tasks.append(threading.Thread(target=n.run, kwargs=args))
    taskslist = list(self._splitlist(tasks, parallel))
    for t in taskslist:
        for i in t:
            i.start()
        for i in t:
            i.join()
    for i in self.nodelist:
        output[i.unique] = i.output
    self.output = output
    return output
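The grouping controlled by `parallel` can be tried without any network devices. The sketch below reproduces the same pattern (chunk the thread list, start every thread in a chunk, join them all, then move to the next chunk) with stand-in workers; `split_list`, `run_in_batches` and `make_worker` are hypothetical names used only for illustration.

```python
import threading
import time

def split_list(lst, n):
    """Yield consecutive chunks of at most n items (same idea as _splitlist)."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def run_in_batches(workers, parallel=10):
    """Start threads batch by batch, waiting for each batch before the next."""
    tasks = [threading.Thread(target=w) for w in workers]
    for batch in split_list(tasks, parallel):
        for t in batch:
            t.start()
        for t in batch:
            t.join()

finished = []            # names in completion order
lock = threading.Lock()  # protects the shared list

def make_worker(name):
    def worker():
        time.sleep(0.01)  # stand-in for a slow session with a node
        with lock:
            finished.append(name)
    return worker

names = [f"node{i}" for i in range(5)]
batches = list(split_list(names, 2))
run_in_batches([make_worker(n) for n in names], parallel=2)
```

With five workers and `parallel=2` the groups hold two, two and one members. Each group waits for its slowest member before the next one starts, so one unresponsive node delays the groups that follow it.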
def test(self, commands, expected, *, prompt=None, parallel=10)
Run a command or list of commands on all the nodes in nodelist, then check if the expected value appears in the output after the last command.
Parameters:
commands (str/list): Commands to run on the node. Should be str or list of str.
expected (str): Expected text to appear after running all the commands on the node.
Optional Named Parameters:
prompt (str): Prompt expected after a command finishes running. Linux usually uses ">" or EOF, while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs a special symbol.
Returns:
dict: Dictionary with each node's unique name as key; the value is True if the expected value is found after running the commands, False if the prompt is found first.
def test(self, commands, expected, *, prompt = None, parallel = 10):
    '''
    Run a command or list of commands on all the nodes in nodelist, then check if the expected value appears in the output after the last command.

    ### Parameters:

        commands (str/list): Commands to run on the node. Should be str or list of str.
        expected (str): Expected text to appear after running all the commands on the node.

    ### Optional Named Parameters:

        prompt (str): Prompt expected after a command finishes running. Linux usually uses ">" or EOF, while routers use ">" or "#". The default value should work for most nodes; change it if your connection needs a special symbol.

    ### Returns:

        dict: Dictionary with each node's unique name as key; the value is True if the expected value is found after running the commands, False if the prompt is found first.
    '''
    args = {}
    args["commands"] = commands
    args["expected"] = expected
    if prompt != None:
        args["prompt"] = prompt
    output = {}
    result = {}
    tasks = []
    for n in self.nodelist:
        tasks.append(threading.Thread(target=n.test, kwargs=args))
    taskslist = list(self._splitlist(tasks, parallel))
    for t in taskslist:
        for i in t:
            i.start()
        for i in t:
            i.join()
    for i in self.nodelist:
        result[i.unique] = i.result
        output[i.unique] = i.output
    self.output = output
    self.result = result
    return result
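The dictionary returned by test lends itself to a short summary step. In the node test source shown earlier, result is set to None when the connection does not succeed, so the sketch below treats anything other than True or False as a connection problem. `summarize` and the sample dictionary are hypothetical and only illustrate the shape of the data.

```python
def summarize(result):
    """Hypothetical helper: group node names by test outcome."""
    summary = {"passed": [], "failed": [], "no_connection": []}
    for unique, value in result.items():
        if value is True:
            summary["passed"].append(unique)
        elif value is False:
            summary["failed"].append(unique)
        else:
            # Neither True nor False: the node never got to run the commands.
            summary["no_connection"].append(unique)
    return summary

# Made-up result dictionary, shaped like the return value described above.
example_result = {"router1": True, "router2": False, "router3": None}
summary = summarize(example_result)
```

For nodes listed under no_connection, the matching entry in the output attribute holds whatever the connection attempt returned, which is the natural place to look for the cause.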