Package conn
Connection manager
conn is a connection manager that lets you store nodes and connect to them quickly, without retyping passwords.
Features
- You can generate profiles and reference them from nodes using @profilename,
  so you don't need to edit multiple nodes when changing a password or other
  information.
- Nodes can be stored in @folder or @subfolder@folder to organize your
  devices. They can then be referenced using node@subfolder@folder or node@folder.
- Much more!
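To make the naming scheme concrete, here is a minimal Python sketch of how a unique name such as node@subfolder@folder could be split into its parts. The helper name split_unique is hypothetical and used only for illustration; it is not part of the conn API.

```python
def split_unique(unique):
    """Split 'node[@subfolder][@folder]' into its parts (illustrative helper, not conn API)."""
    parts = unique.split("@")
    # At most node, subfolder and folder are allowed, and folder names cannot be empty.
    if len(parts) > 3 or "" in parts[1:]:
        raise ValueError("invalid unique name: " + unique)
    result = {}
    if parts[0]:
        result["id"] = parts[0]
    if len(parts) == 2:
        result["folder"] = parts[1]
    elif len(parts) == 3:
        # The folder is always the last component; the subfolder comes before it.
        result["subfolder"] = parts[1]
        result["folder"] = parts[2]
    return result


print(split_unique("server@datacenter@office"))
print(split_unique("pc@office"))
print(split_unique("@office"))
```

A name that starts with @ has no node id, so it refers to a folder or subfolder rather than a single device.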
Usage
usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder]
       conn {profile,move,mv,copy,cp,list,ls,bulk,config} ...
positional arguments:
  node|folder    node[@subfolder][@folder]
                 Connect to specific node or show all matching nodes
                 [@subfolder][@folder]
                 Show all available connections globally or in specified path
Options:
  -h, --help     show this help message and exit
  --add          Add new node[@subfolder][@folder] or [@subfolder]@folder
  --del, --rm    Delete node[@subfolder][@folder] or [@subfolder]@folder
  --mod, --edit  Modify node[@subfolder][@folder]
  --show         Show node[@subfolder][@folder]
  --debug, -d    Display all connection steps
Commands:
  profile        Manage profiles
  move (mv)      Move node
  copy (cp)      Copy node
  list (ls)      List profiles, nodes or folders
  bulk           Add nodes in bulk
  config         Manage app config
Manage profiles
usage: conn profile [-h] (--add | --del | --mod | --show) profile
positional arguments:
  profile        Name of profile to manage
options:
  -h, --help     show this help message and exit
  --add          Add new profile
  --del, --rm    Delete profile
  --mod, --edit  Modify profile
  --show         Show profile
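As a rough illustration of the @profilename idea, the sketch below replaces node fields that point at a profile with the value stored in that profile. The dictionaries and the resolve_profiles helper are hypothetical, and conn's own resolution logic may differ.

```python
def resolve_profiles(node, profiles):
    """Return a copy of node where '@name' values are taken from the named profile."""
    resolved = {}
    for field, value in node.items():
        if isinstance(value, str) and value.startswith("@"):
            # '@office-user' in the 'user' field means: use profile 'office-user', field 'user'.
            resolved[field] = profiles[value[1:]][field]
        else:
            resolved[field] = value
    return resolved


# Hypothetical sample data: one profile shared by any number of nodes.
profiles = {"office-user": {"user": "admin", "password": "secret"}}
node = {"host": "10.0.0.1", "user": "@office-user", "password": "@office-user"}

print(resolve_profiles(node, profiles))
```

Changing the password in the profile then changes it for every node that references it, which is the point of the feature.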
Examples
   conn profile --add office-user
   conn --add @office
   conn --add @datacenter@office
   conn --add server@datacenter@office
   conn --add pc@office
   conn --show server@datacenter@office
   conn pc@office
   conn server
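The last command works because the node argument does not have to be a full unique name. Going by the connapp source on this page, a plain argument is matched as a prefix of the stored names, while an argument starting with @ is matched as a substring. A minimal sketch, with a hypothetical match_nodes helper and a made-up node list:

```python
def match_nodes(query, nodes):
    """Return the stored unique names that a command-line argument would match."""
    if query.startswith("@"):
        # Folder-style query: keep every node whose name contains it.
        return [n for n in nodes if query in n]
    # Node-style query: keep every node whose name starts with it.
    return [n for n in nodes if n.startswith(query)]


nodes = ["server@datacenter@office", "pc@office", "server2@lab"]

print(match_nodes("server", nodes))
print(match_nodes("@office", nodes))
print(match_nodes("pc@office", nodes))
```

When more than one name matches, the application asks which node to connect to.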
Automation module
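The automation module lets you use conn from your own Python scripts, either as a standalone module or together with the manager's stored configuration.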
Standalone module
import conn
router = conn.node("unique name","ip/hostname", user="username", password="pass")
router.run(["term len 0","show run"])
print(router.output)
hasip = router.test("show ip int brief","1.1.1.1")
if hasip:
    print("Router has ip 1.1.1.1")
else:
    print("Router does not have ip 1.1.1.1")
Using manager configuration
import conn
conf = conn.configfile()
device = conf.getitem("server@office")
server = conn.node("unique name", **device, config=conf)
result = server.run(["cd /", "ls -la"])
print(result)
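The ** unpacking works because getitem returns a plain dictionary of connection fields. The sketch below uses a stand-in function instead of conn.node so that it runs without a device. The field names follow the default profile in the configfile source (host, protocol, port, user, password, options, logs); fake_node and the sample values are hypothetical.

```python
def fake_node(unique, host, options="", logs="", password="", port="",
              protocol="", user="", config=""):
    """Stand-in for conn.node that only records what it was given."""
    return {"unique": unique, "host": host, "protocol": protocol or "ssh", "user": user}


# Shape of a dictionary as it might come back from conf.getitem("server@office").
device = {
    "host": "10.0.0.5",
    "options": "",
    "logs": "",
    "password": "@office-user",
    "port": "",
    "protocol": "ssh",
    "user": "admin",
}

server = fake_node("server@office", **device)
print(server)
```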
Running parallel tasks
import conn
conf = conn.configfile()
#You can get the nodes of a folder from the config and filter them by name
nodes = conf.getitem("@office", ["router1", "router2", "router3"])
#You can also get each node individually:
nodes = {}
nodes["router1"] = conf.getitem("router1@office")
nodes["router2"] = conf.getitem("router2@office")
nodes["router10"] = conf.getitem("router10@datacenter")
#Also, you can create the nodes manually:
nodes = {}
nodes["router1"] = {"host": "1.1.1.1", "user": "username", "password": "pass1"}
nodes["router2"] = {"host": "1.1.1.2", "user": "username", "password": "pass2"}
nodes["router3"] = {"host": "1.1.1.2", "user": "username", "password": "pass3"}
#Finally you run some tasks on the nodes
mynodes = conn.nodes(nodes, config = conf)
result = mynodes.test(["show ip int br"], "1.1.1.2")
for i in result:
    print("---" + i + "---")
    print(result[i])
    print()
# Or for one specific node
mynodes.router1.run(["term len 0", "show run"], folder = "/home/user/logs")
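For intuition, running one check on several nodes and collecting a result per node name can be sketched with the standard library alone. This is not how conn.nodes is implemented; check_ip and run_on_all are hypothetical stand-ins that inspect canned output instead of connecting to a device.

```python
from concurrent.futures import ThreadPoolExecutor

# Canned command output per node, standing in for real device sessions.
fake_output = {
    "router1": "Gi0/0 1.1.1.1 up",
    "router2": "Gi0/0 1.1.1.2 up",
}


def check_ip(name, expected):
    """Return True when the expected string appears in the node's output."""
    return expected in fake_output[name]


def run_on_all(names, expected):
    """Run check_ip for every node in a thread pool and key the results by node name."""
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(check_ip, name, expected) for name in names}
        return {name: future.result() for name, future in futures.items()}


result = run_on_all(["router1", "router2"], "1.1.1.2")
for name in result:
    print("---" + name + "---")
    print(result[name])
```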
Source code:
#!/usr/bin/env python3
from .core import node,nodes
from .configfile import configfile
from .connapp import connapp
from pkg_resources import get_distribution
__all__ = ["node", "nodes", "configfile", "connapp"]
__version__ = "2.0.10"
__author__ = "Federico Luzzi"
__pdoc__ = {
    'core': False,
}
Classes
- class configfile (conf=None, key=None)

This class generates a configfile object. It contains a dictionary storing the config, nodes and profiles, and is normally used by the connection manager.
Attributes:
- file (str): Path/file to config file.
- key (str): Path/file to RSA key file.
- config (dict): Dictionary containing information of connection manager configuration.
- connections (dict): Dictionary containing all the nodes added to connection manager.
- profiles (dict): Dictionary containing all the profiles added to connection manager.
- privatekey (obj): Object containing the private key used to decrypt passwords.
- publickey (obj): Object containing the public key used to encrypt passwords.
Optional Parameters:
- conf (str): Path/file to config file. If left empty, the default path is ~/.config/conn/config.json
- key (str): Path/file to RSA key file. If left empty, the default path is ~/.config/conn/.osk
Source code:
# Imports required by the code shown here
import json
import os
from pathlib import Path
from Crypto.PublicKey import RSA

class configfile:
    ''' This class generates a configfile object. It contains a dictionary storing the config, nodes and profiles, and is normally used by the connection manager.

    ### Attributes:

        - file (str): Path/file to config file.
        - key (str): Path/file to RSA key file.
        - config (dict): Dictionary containing information of connection manager configuration.
        - connections (dict): Dictionary containing all the nodes added to connection manager.
        - profiles (dict): Dictionary containing all the profiles added to connection manager.
        - privatekey (obj): Object containing the private key used to decrypt passwords.
        - publickey (obj): Object containing the public key used to encrypt passwords.
    '''

    def __init__(self, conf = None, key = None):
        '''
        ### Optional Parameters:

            - conf (str): Path/file to config file. If left empty, the default path is ~/.config/conn/config.json
            - key (str): Path/file to RSA key file. If left empty, the default path is ~/.config/conn/.osk
        '''
        home = os.path.expanduser("~")
        defaultdir = home + '/.config/conn'
        defaultfile = defaultdir + '/config.json'
        defaultkey = defaultdir + '/.osk'
        Path(defaultdir).mkdir(parents=True, exist_ok=True)
        if conf == None:
            self.file = defaultfile
        else:
            self.file = conf
        if key == None:
            self.key = defaultkey
        else:
            self.key = key
        if os.path.exists(self.file):
            config = self._loadconfig(self.file)
        else:
            config = self._createconfig(self.file)
        self.config = config["config"]
        self.connections = config["connections"]
        self.profiles = config["profiles"]
        if not os.path.exists(self.key):
            self._createkey(self.key)
        with open(self.key) as f:
            self.privatekey = RSA.import_key(f.read())
        self.publickey = self.privatekey.publickey()

    def _loadconfig(self, conf):
        #Loads config file
        with open(conf) as jsonconf:
            return json.load(jsonconf)

    def _createconfig(self, conf):
        #Create config file
        defaultconfig = {'config': {'case': False, 'idletime': 30}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"" }}}
        if not os.path.exists(conf):
            with open(conf, "w") as f:
                json.dump(defaultconfig, f, indent = 4)
            os.chmod(conf, 0o600)
        with open(conf) as jsonconf:
            return json.load(jsonconf)

    def _saveconfig(self, conf):
        #Save config file
        newconfig = {"config":{}, "connections": {}, "profiles": {}}
        newconfig["config"] = self.config
        newconfig["connections"] = self.connections
        newconfig["profiles"] = self.profiles
        with open(conf, "w") as f:
            json.dump(newconfig, f, indent = 4)

    def _createkey(self, keyfile):
        #Create key file
        key = RSA.generate(2048)
        with open(keyfile,'wb') as f:
            f.write(key.export_key('PEM'))
        os.chmod(keyfile, 0o600)

    def _explode_unique(self, unique):
        #Divide unique name into folder, subfolder and id
        uniques = unique.split("@")
        if not unique.startswith("@"):
            result = {"id": uniques[0]}
        else:
            result = {}
        if len(uniques) == 2:
            result["folder"] = uniques[1]
            if result["folder"] == "":
                return False
        elif len(uniques) == 3:
            result["folder"] = uniques[2]
            result["subfolder"] = uniques[1]
            if result["folder"] == "" or result["subfolder"] == "":
                return False
        elif len(uniques) > 3:
            return False
        return result

    def getitem(self, unique, keys = None):
        '''
        Get a node or a group of nodes from the configfile, which can be passed to the node/nodes classes.

        ### Parameters:

            - unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder

        ### Optional Parameters:

            - keys (list): In case you pass a folder as unique, you can filter nodes inside the folder passing a list.

        ### Returns:

            dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
        '''
        uniques = self._explode_unique(unique)
        if unique.startswith("@"):
            if uniques.keys() >= {"folder", "subfolder"}:
                folder = self.connections[uniques["folder"]][uniques["subfolder"]]
            else:
                folder = self.connections[uniques["folder"]]
            newfolder = folder.copy()
            newfolder.pop("type")
            for node in newfolder.keys():
                if "type" in newfolder[node].keys():
                    newfolder[node].pop("type")
            if keys == None:
                return newfolder
            else:
                f_newfolder = dict((k, newfolder[k]) for k in keys)
                return f_newfolder
        else:
            if uniques.keys() >= {"folder", "subfolder"}:
                node = self.connections[uniques["folder"]][uniques["subfolder"]][uniques["id"]]
            elif "folder" in uniques.keys():
                node = self.connections[uniques["folder"]][uniques["id"]]
            else:
                node = self.connections[uniques["id"]]
            newnode = node.copy()
            newnode.pop("type")
            return newnode

    def _connections_add(self,*, id, host, folder='', subfolder='', options='', logs='', password='', port='', protocol='', user='', type = "connection" ):
        #Add connection to config
        if folder == '':
            self.connections[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
        elif folder != '' and subfolder == '':
            self.connections[folder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}
        elif folder != '' and subfolder != '':
            self.connections[folder][subfolder][id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user, "type": type}

    def _connections_del(self,*, id, folder='', subfolder=''):
        #Delete connection from config
        if folder == '':
            del self.connections[id]
        elif folder != '' and subfolder == '':
            del self.connections[folder][id]
        elif folder != '' and subfolder != '':
            del self.connections[folder][subfolder][id]

    def _folder_add(self,*, folder, subfolder = ''):
        #Add folder to config
        if subfolder == '':
            if folder not in self.connections:
                self.connections[folder] = {"type": "folder"}
        else:
            if subfolder not in self.connections[folder]:
                self.connections[folder][subfolder] = {"type": "subfolder"}

    def _folder_del(self,*, folder, subfolder=''):
        #Delete folder from config
        if subfolder == '':
            del self.connections[folder]
        else:
            del self.connections[folder][subfolder]

    def _profiles_add(self,*, id, host = '', options='', logs='', password='', port='', protocol='', user='' ):
        #Add profile to config
        self.profiles[id] = {"host": host, "options": options, "logs": logs, "password": password, "port": port, "protocol": protocol, "user": user}

    def _profiles_del(self,*, id ):
        #Delete profile from config
        del self.profiles[id]

Methods
- def getitem(self, unique, keys=None)

Get a node or a group of nodes from the configfile, which can be passed to the node/nodes classes.
Parameters:
- unique (str): Unique name of the node or folder in config using connection manager style: node[@subfolder][@folder] or [@subfolder]@folder
Optional Parameters:
- keys (list): In case you pass a folder as unique, you can filter nodes inside the folder passing a list.
Returns:
dict: Dictionary containing information of node or multiple dictionaries of multiple nodes.
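A self-contained sketch of the folder lookup described above, run against an in-memory dictionary shaped like the connections section of the config. The sample data and the get_folder helper are hypothetical. This version builds new dictionaries rather than popping the "type" key from the stored ones, so the original data is left untouched.

```python
# Hypothetical data in the shape the config stores: folders and nodes carry a "type" marker.
connections = {
    "office": {
        "type": "folder",
        "router1": {"host": "1.1.1.1", "user": "admin", "type": "connection"},
        "router2": {"host": "1.1.1.2", "user": "admin", "type": "connection"},
        "pc": {"host": "1.1.1.9", "user": "me", "type": "connection"},
    }
}


def get_folder(connections, folder, keys=None):
    """Return the nodes of a folder without their 'type' markers, optionally filtered by name."""
    nodes = {
        name: {k: v for k, v in data.items() if k != "type"}
        for name, data in connections[folder].items()
        if name != "type"
    }
    if keys is None:
        return nodes
    return {name: nodes[name] for name in keys}


print(get_folder(connections, "office", ["router1", "router2"]))
```

The returned dictionary maps node names to their connection fields, which is the shape the parallel example passes to conn.nodes.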
 
- class connapp (config)

This class starts the connection manager app. It is normally used by the connection manager, but you can use it in a script to run the connection manager your way and use a different configfile and key.
Parameters:
- config (obj): Object generated with the configfile class; it contains the nodes configuration and the methods to manage the config file.
Source code:
# Imports required by the code shown here
import argparse
import re
import sys
import inquirer
from .core import node

class connapp:
    ''' This class starts the connection manager app. It is normally used by the connection manager, but you can use it in a script to run the connection manager your way and use a different configfile and key.
    '''

    def __init__(self, config):
        '''
        ### Parameters:

            - config (obj): Object generated with the configfile class; it contains the nodes configuration and the methods to manage the config file.
        '''
        self.node = node
        self.config = config
        self.nodes = self._getallnodes()
        self.folders = self._getallfolders()
        self.profiles = list(self.config.profiles.keys())
        self.case = self.config.config["case"]
        #DEFAULTPARSER
        defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
        subparsers = defaultparser.add_subparsers(title="Commands")
        #NODEPARSER
        nodeparser = subparsers.add_parser("node",usage=self._help("usage"), help=self._help("node"),epilog=self._help("end"), formatter_class=argparse.RawTextHelpFormatter)
        nodecrud = nodeparser.add_mutually_exclusive_group()
        nodeparser.add_argument("node", metavar="node|folder", nargs='?', default=None, action=self._store_type, type=self._type_node, help=self._help("node"))
        nodecrud.add_argument("--add", dest="action", action="store_const", help="Add new node[@subfolder][@folder] or [@subfolder]@folder", const="add", default="connect")
        nodecrud.add_argument("--del", "--rm", dest="action", action="store_const", help="Delete node[@subfolder][@folder] or [@subfolder]@folder", const="del", default="connect")
        nodecrud.add_argument("--mod", "--edit", dest="action", action="store_const", help="Modify node[@subfolder][@folder]", const="mod", default="connect")
        nodecrud.add_argument("--show", dest="action", action="store_const", help="Show node[@subfolder][@folder]", const="show", default="connect")
        nodecrud.add_argument("--debug", "-d", dest="action", action="store_const", help="Display all connection steps", const="debug", default="connect")
        nodeparser.set_defaults(func=self._func_node)
        #PROFILEPARSER
        profileparser = subparsers.add_parser("profile", help="Manage profiles")
        profileparser.add_argument("profile", nargs=1, action=self._store_type, type=self._type_profile, help="Name of profile to manage")
        profilecrud = profileparser.add_mutually_exclusive_group(required=True)
        profilecrud.add_argument("--add", dest="action", action="store_const", help="Add new profile", const="add")
        profilecrud.add_argument("--del", "--rm", dest="action", action="store_const", help="Delete profile", const="del")
        profilecrud.add_argument("--mod", "--edit", dest="action", action="store_const", help="Modify profile", const="mod")
        profilecrud.add_argument("--show", dest="action", action="store_const", help="Show profile", const="show")
        profileparser.set_defaults(func=self._func_profile)
        #MOVEPARSER
        moveparser = subparsers.add_parser("move", aliases=["mv"], help="Move node")
        moveparser.add_argument("move", nargs=2, action=self._store_type, help="Move node[@subfolder][@folder] dest_node[@subfolder][@folder]", default="move", type=self._type_node)
        moveparser.set_defaults(func=self._func_others)
        #COPYPARSER
        copyparser = subparsers.add_parser("copy", aliases=["cp"], help="Copy node")
        copyparser.add_argument("cp", nargs=2, action=self._store_type, help="Copy node[@subfolder][@folder] new_node[@subfolder][@folder]", default="cp", type=self._type_node)
        copyparser.set_defaults(func=self._func_others)
        #LISTPARSER
        lsparser = subparsers.add_parser("list", aliases=["ls"], help="List profiles, nodes or folders")
        lsparser.add_argument("ls", action=self._store_type, choices=["profiles","nodes","folders"], help="List profiles, nodes or folders", default=False)
        lsparser.set_defaults(func=self._func_others)
        #BULKPARSER
        bulkparser = subparsers.add_parser("bulk", help="Add nodes in bulk")
        bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
        bulkparser.set_defaults(func=self._func_others)
        #CONFIGPARSER
        configparser = subparsers.add_parser("config", help="Manage app config")
        configparser.add_argument("--allow-uppercase", dest="case", nargs=1, action=self._store_type, help="Allow case sensitive names", choices=["true","false"])
        configparser.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
        configparser.add_argument("--completion", dest="completion", nargs=0, action=self._store_type, help="Get bash completion configuration for conn")
        configparser.set_defaults(func=self._func_others)
        #Set default subparser and tune arguments
        commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "config"]
        profilecmds = ["--add", "--del", "--rm", "--mod", "--edit", "--show"]
        if len(sys.argv) >= 3 and sys.argv[2] == "profile" and sys.argv[1] in profilecmds:
            sys.argv[2] = sys.argv[1]
            sys.argv[1] = "profile"
        if len(sys.argv) < 2 or sys.argv[1] not in commands:
            sys.argv.insert(1,"node")
        args = defaultparser.parse_args()
        args.func(args)

    class _store_type(argparse.Action):
        #Custom store type for cli app.
        def __call__(self, parser, args, values, option_string=None):
            setattr(args, "data", values)
            delattr(args,self.dest)
            setattr(args, "command", self.dest)

    def _func_node(self, args):
        #Function called when connecting or managing nodes.
        if not self.case and args.data != None:
            args.data = args.data.lower()
        if args.action == "connect" or args.action == "debug":
            if args.data == None:
                matches = self.nodes
                if len(matches) == 0:
                    print("There are no nodes created")
                    print("try: conn --help")
                    exit(9)
            else:
                if args.data.startswith("@"):
                    matches = list(filter(lambda k: args.data in k, self.nodes))
                else:
                    matches = list(filter(lambda k: k.startswith(args.data), self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            elif len(matches) > 1:
                matches[0] = self._choose(matches,"node", "connect")
            if matches[0] == None:
                exit(7)
            node = self.config.getitem(matches[0])
            node = self.node(matches[0],**node, config = self.config)
            if args.action == "debug":
                node.interact(debug = True)
            else:
                node.interact()
        elif args.action == "del":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            elif args.data.startswith("@"):
                matches = list(filter(lambda k: k == args.data, self.folders))
            else:
                matches = list(filter(lambda k: k == args.data, self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
            confirm = inquirer.prompt(question)
            if confirm["delete"]:
                uniques = self.config._explode_unique(matches[0])
                if args.data.startswith("@"):
                    self.config._folder_del(**uniques)
                else:
                    self.config._connections_del(**uniques)
                self.config._saveconfig(self.config.file)
                print("{} deleted successfully".format(matches[0]))
        elif args.action == "add":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            elif args.data.startswith("@"):
                type = "folder"
                matches = list(filter(lambda k: k == args.data, self.folders))
                reversematches = list(filter(lambda k: "@" + k == args.data, self.nodes))
            else:
                type = "node"
                matches = list(filter(lambda k: k == args.data, self.nodes))
                reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
            if len(matches) > 0:
                print("{} already exists".format(matches[0]))
                exit(4)
            if len(reversematches) > 0:
                print("{} already exists".format(reversematches[0]))
                exit(4)
            else:
                if type == "folder":
                    uniques = self.config._explode_unique(args.data)
                    if uniques == False:
                        print("Invalid folder {}".format(args.data))
                        exit(5)
                    if "subfolder" in uniques.keys():
                        parent = "@" + uniques["folder"]
                        if parent not in self.folders:
                            print("Folder {} not found".format(uniques["folder"]))
                            exit(2)
                    self.config._folder_add(**uniques)
                    self.config._saveconfig(self.config.file)
                    print("{} added successfully".format(args.data))
                if type == "node":
                    nodefolder = args.data.partition("@")
                    nodefolder = "@" + nodefolder[2]
                    if nodefolder not in self.folders and nodefolder != "@":
                        print(nodefolder + " not found")
                        exit(2)
                    uniques = self.config._explode_unique(args.data)
                    if uniques == False:
                        print("Invalid node {}".format(args.data))
                        exit(5)
                    print("You can use the configured setting in a profile using @profilename.")
                    print("You can also leave empty any value except hostname/IP.")
                    print("You can pass 1 or more passwords using comma separated @profiles")
                    print("You can use these variables in the logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}")
                    newnode = self._questions_nodes(args.data, uniques)
                    if newnode == False:
                        exit(7)
                    self.config._connections_add(**newnode)
                    self.config._saveconfig(self.config.file)
                    print("{} added successfully".format(args.data))
        elif args.action == "show":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            matches = list(filter(lambda k: k == args.data, self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            node = self.config.getitem(matches[0])
            for k, v in node.items():
                if isinstance(v, str):
                    print(k + ": " + v)
                else:
                    print(k + ":")
                    for i in v:
                        print(" - " + i)
        elif args.action == "mod":
            if args.data == None:
                print("Missing argument node")
                exit(3)
            matches = list(filter(lambda k: k == args.data, self.nodes))
            if len(matches) == 0:
                print("{} not found".format(args.data))
                exit(2)
            node = self.config.getitem(matches[0])
            edits = self._questions_edit()
            if edits == None:
                exit(7)
            uniques = self.config._explode_unique(args.data)
            updatenode = self._questions_nodes(args.data, uniques, edit=edits)
            if not updatenode:
                exit(7)
            uniques.update(node)
            if sorted(updatenode.items()) == sorted(uniques.items()):
                print("Nothing to do here")
                return
            else:
                self.config._connections_add(**updatenode)
                self.config._saveconfig(self.config.file)
                print("{} edited successfully".format(args.data))

    def _func_profile(self, args):
        #Function called when managing profiles
        if not self.case:
            args.data[0] = args.data[0].lower()
        if args.action == "del":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) == 0:
                print("{} not found".format(args.data[0]))
                exit(2)
            if matches[0] == "default":
                print("Can't delete default profile")
                exit(6)
            usedprofile = self._profileused(matches[0])
            if len(usedprofile) > 0:
                print("Profile {} used in the following nodes:".format(matches[0]))
                print(", ".join(usedprofile))
                exit(8)
            question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
            confirm = inquirer.prompt(question)
            if confirm["delete"]:
                self.config._profiles_del(id = matches[0])
                self.config._saveconfig(self.config.file)
                print("{} deleted successfully".format(matches[0]))
        elif args.action == "show":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) == 0:
                print("{} not found".format(args.data[0]))
                exit(2)
            profile = self.config.profiles[matches[0]]
            for k, v in profile.items():
                if isinstance(v, str):
                    print(k + ": " + v)
                else:
                    print(k + ":")
                    for i in v:
                        print(" - " + i)
        elif args.action == "add":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) > 0:
                print("Profile {} already exists".format(matches[0]))
                exit(4)
            newprofile = self._questions_profiles(args.data[0])
            if newprofile == False:
                exit(7)
            self.config._profiles_add(**newprofile)
            self.config._saveconfig(self.config.file)
            print("{} added successfully".format(args.data[0]))
        elif args.action == "mod":
            matches = list(filter(lambda k: k == args.data[0], self.profiles))
            if len(matches) == 0:
                print("{} not found".format(args.data[0]))
                exit(2)
            profile = self.config.profiles[matches[0]]
            oldprofile = {"id": matches[0]}
            oldprofile.update(profile)
            edits = self._questions_edit()
            if edits == None:
                exit(7)
            updateprofile = self._questions_profiles(matches[0], edit=edits)
            if not updateprofile:
                exit(7)
            if sorted(updateprofile.items()) == sorted(oldprofile.items()):
                print("Nothing to do here")
                return
            else:
                self.config._profiles_add(**updateprofile)
                self.config._saveconfig(self.config.file)
                print("{} edited successfully".format(args.data[0]))

    def _func_others(self, args):
        #Function called when using other commands
        if args.command == "ls":
            print(*getattr(self, args.data), sep="\n")
        elif args.command == "move" or args.command == "cp":
            if not self.case:
                args.data[0] = args.data[0].lower()
                args.data[1] = args.data[1].lower()
            source = list(filter(lambda k: k == args.data[0], self.nodes))
            dest = list(filter(lambda k: k == args.data[1], self.nodes))
            if len(source) != 1:
                print("{} not found".format(args.data[0]))
                exit(2)
            if len(dest) > 0:
                print("Node {} already exists".format(args.data[1]))
                exit(4)
            nodefolder = args.data[1].partition("@")
            nodefolder = "@" + nodefolder[2]
            if nodefolder not in self.folders and nodefolder != "@":
                print("{} not found".format(nodefolder))
                exit(2)
            olduniques = self.config._explode_unique(args.data[0])
            newuniques = self.config._explode_unique(args.data[1])
            if newuniques == False:
                print("Invalid node {}".format(args.data[1]))
                exit(5)
            node = self.config.getitem(source[0])
            newnode = {**newuniques, **node}
            self.config._connections_add(**newnode)
            if args.command == "move":
                self.config._connections_del(**olduniques)
            self.config._saveconfig(self.config.file)
            if args.command == "move":
                print("{} moved successfully to {}".format(args.data[0],args.data[1]))
            if args.command == "cp":
                print("{} copied successfully to {}".format(args.data[0],args.data[1]))
        elif args.command == "bulk":
            newnodes = self._questions_bulk()
            if newnodes == False:
                exit(7)
            if not self.case:
                newnodes["location"] = newnodes["location"].lower()
                newnodes["ids"] = newnodes["ids"].lower()
            ids = newnodes["ids"].split(",")
            hosts = newnodes["host"].split(",")
            count = 0
            for n in ids:
                unique = n + newnodes["location"]
                matches = list(filter(lambda k: k == unique, self.nodes))
                reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
                if len(matches) > 0:
                    print("Node {} already exists, ignoring it".format(unique))
                    continue
                if len(reversematches) > 0:
                    print("Folder with name {} already exists, ignoring it".format(unique))
                    continue
                newnode = {"id": n}
                if newnodes["location"] != "":
                    location = self.config._explode_unique(newnodes["location"])
                    newnode.update(location)
                if len(hosts) > 1:
                    index = ids.index(n)
                    newnode["host"] = hosts[index]
                else:
                    newnode["host"] = hosts[0]
                newnode["protocol"] = newnodes["protocol"]
                newnode["port"] = newnodes["port"]
                newnode["options"] = newnodes["options"]
                newnode["logs"] = newnodes["logs"]
                newnode["user"] = newnodes["user"]
                newnode["password"] = newnodes["password"]
                count +=1
                self.config._connections_add(**newnode)
                self.nodes = self._getallnodes()
            if count > 0:
                self.config._saveconfig(self.config.file)
                print("Successfully added {} nodes".format(count))
            else:
                print("0 nodes added")
        else:
            if args.command == "completion":
                print(self._help("completion"))
            else:
                if args.command == "case":
                    if args.data[0] == "true":
                        args.data[0] = True
                    elif args.data[0] == "false":
                        args.data[0] = False
                if args.command == "idletime":
                    if args.data[0] < 0:
                        args.data[0] = 0
                self.config.config[args.command] = args.data[0]
                self.config._saveconfig(self.config.file)
                print("Config saved")

    def _choose(self, list, name, action):
        #Generates an inquirer list to pick
        questions = [inquirer.List(name, message="Pick {} to {}:".format(name,action), choices=list, carousel=True)]
        answer = inquirer.prompt(questions)
        if answer == None:
            return
        else:
            return answer[name]

    def _host_validation(self, answers, current, regex = "^.+$"):
        #Validate hostname in inquirer when managing nodes
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
        return True

    def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$)"):
        #Validate protocol in inquirer when managing profiles
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet or leave empty")
        return True

    def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$|^@.+$)"):
        #Validate protocol in inquirer when managing nodes
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, leave empty or @profile")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
        return True

    def _profile_port_validation(self, answers, current, regex = "(^[0-9]*$)"):
        #Validate port in inquirer when managing profiles
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
        try:
            port = int(current)
        except:
            port = 0
        if current != "" and not 1 <= int(port) <= 65535:
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535 or leave empty")
        return True

    def _port_validation(self, answers, current, regex = "(^[0-9]*$|^@.+$)"):
        #Validate port in inquirer when managing nodes
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
        try:
            port = int(current)
        except:
            port = 0
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
        elif current != "" and not 1 <= int(port) <= 65535:
            raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
        return True

    def _pass_validation(self, answers, current, regex = "(^@.+$)"):
        #Validate password in inquirer
        profiles = current.split(",")
        for i in profiles:
            if not re.match(regex, i) or i[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(i))
        return True

    def _default_validation(self, answers, current):
        #Default validation type used in multiple questions in inquirer
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
        return True

    def _bulk_node_validation(self, answers, current, regex = "^[0-9a-zA-Z_.,$#-]+$"):
        #Validation of nodes when running bulk command
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise inquirer.errors.ValidationError("", reason="Profile {} doesn't exist".format(current))
        return True

    def _bulk_folder_validation(self, answers, current):
        #Validation of folders when running bulk command
        if not self.case:
            current = current.lower()
        matches = list(filter(lambda k: k == current, self.folders))
        if current != "" and len(matches) == 0:
            raise inquirer.errors.ValidationError("", reason="Location {} doesn't exist".format(current))
        return True

    def _bulk_host_validation(self, answers, current, regex = "^.+$"):
        #Validate hostname when running bulk command
        if not re.match(regex, current):
            raise inquirer.errors.ValidationError("", reason="Host cannot be empty")
        if current.startswith("@"):
            if current[1:] not in self.profiles:
                raise
inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current)) hosts = current.split(",") nodes = answers["ids"].split(",") if len(hosts) > 1 and len(hosts) != len(nodes): raise inquirer.errors.ValidationError("", reason="Hosts list should be the same length of nodes list") return True def _questions_edit(self): #Inquirer questions when editing nodes or profiles questions = [] questions.append(inquirer.Confirm("host", message="Edit Hostname/IP?")) questions.append(inquirer.Confirm("protocol", message="Edit Protocol?")) questions.append(inquirer.Confirm("port", message="Edit Port?")) questions.append(inquirer.Confirm("options", message="Edit Options?")) questions.append(inquirer.Confirm("logs", message="Edit logging path/file?")) questions.append(inquirer.Confirm("user", message="Edit User?")) questions.append(inquirer.Confirm("password", message="Edit password?")) answers = inquirer.prompt(questions) return answers def _questions_nodes(self, unique, uniques = None, edit = None): #Questions when adding or editing nodes try: defaults = self.config.getitem(unique) except: defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } node = {} if edit == None: edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or IP", validate=self._host_validation, default=defaults["host"])) else: node["host"] = defaults["host"] if edit["protocol"]: questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation, default=defaults["protocol"])) else: node["protocol"] = defaults["protocol"] if edit["port"]: questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation, default=defaults["port"])) else: node["port"] = defaults["port"] if edit["options"]: questions.append(inquirer.Text("options", message="Pass 
extra options to protocol", validate=self._default_validation, default=defaults["options"])) else: node["options"] = defaults["options"] if edit["logs"]: questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation, default=defaults["logs"])) else: node["logs"] = defaults["logs"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation, default=defaults["user"])) else: node["user"] = defaults["user"] if edit["password"]: questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"])) else: node["password"] = defaults["password"] answer = inquirer.prompt(questions) if answer == None: return False if "password" in answer.keys(): if answer["password"] == "Local Password": passq = [inquirer.Password("password", message="Set Password")] passa = inquirer.prompt(passq) if passa == None: return False answer["password"] = self.encrypt(passa["password"]) elif answer["password"] == "Profiles": passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))] passa = inquirer.prompt(passq) if passa == None: return False answer["password"] = passa["password"].split(",") elif answer["password"] == "No Password": answer["password"] = "" result = {**uniques, **answer, **node} result["type"] = "connection" return result def _questions_profiles(self, unique, edit = None): #Questions when adding or editing profiles try: defaults = self.config.profiles[unique] except: defaults = { "host":"", "protocol":"", "port":"", "user":"", "options":"", "logs":"" } profile = {} if edit == None: edit = { "host":True, "protocol":True, "port":True, "user":True, "password": True,"options":True, "logs":True } questions = [] if edit["host"]: questions.append(inquirer.Text("host", message="Add Hostname or 
IP", default=defaults["host"])) else: profile["host"] = defaults["host"] if edit["protocol"]: questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._profile_protocol_validation, default=defaults["protocol"])) else: profile["protocol"] = defaults["protocol"] if edit["port"]: questions.append(inquirer.Text("port", message="Select Port Number", validate=self._profile_port_validation, default=defaults["port"])) else: profile["port"] = defaults["port"] if edit["options"]: questions.append(inquirer.Text("options", message="Pass extra options to protocol", default=defaults["options"])) else: profile["options"] = defaults["options"] if edit["logs"]: questions.append(inquirer.Text("logs", message="Pick logging path/file ", default=defaults["logs"])) else: profile["logs"] = defaults["logs"] if edit["user"]: questions.append(inquirer.Text("user", message="Pick username", default=defaults["user"])) else: profile["user"] = defaults["user"] if edit["password"]: questions.append(inquirer.Password("password", message="Set Password")) else: profile["password"] = defaults["password"] answer = inquirer.prompt(questions) if answer == None: return False if "password" in answer.keys(): if answer["password"] != "": answer["password"] = self.encrypt(answer["password"]) result = {**answer, **profile} result["id"] = unique return result def _questions_bulk(self): #Questions when using bulk command questions = [] questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", validate=self._bulk_node_validation)) questions.append(inquirer.Text("location", message="Add a @folder, @subfolder@folder or leave empty", validate=self._bulk_folder_validation)) questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", validate=self._bulk_host_validation)) questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation)) questions.append(inquirer.Text("port", 
message="Select Port Number", validate=self._port_validation)) questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation)) questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation)) questions.append(inquirer.Text("user", message="Pick username", validate=self._default_validation)) questions.append(inquirer.List("password", message="Password: Use a local password, no password or a list of profiles to reference?", choices=["Local Password", "Profiles", "No Password"])) answer = inquirer.prompt(questions) if answer == None: return False if "password" in answer.keys(): if answer["password"] == "Local Password": passq = [inquirer.Password("password", message="Set Password")] passa = inquirer.prompt(passq) answer["password"] = self.encrypt(passa["password"]) elif answer["password"] == "Profiles": passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))] passa = inquirer.prompt(passq) answer["password"] = passa["password"].split(",") elif answer["password"] == "No Password": answer["password"] = "" answer["type"] = "connection" return answer def _type_node(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$@#-]+$")): if not pat.match(arg_value): raise argparse.ArgumentTypeError return arg_value def _type_profile(self, arg_value, pat=re.compile(r"^[0-9a-zA-Z_.$#-]+$")): if not pat.match(arg_value): raise argparse.ArgumentTypeError return arg_value def _help(self, type): #Store text for help and other commands if type == "node": return "node[@subfolder][@folder]\nConnect to specific node or show all matching nodes\n[@subfolder][@folder]\nShow all available connections globaly or in specified path" if type == "usage": return "conn [-h] [--add | --del | --mod | --show | --debug] [node|folder]\n conn {profile,move,mv,copy,cp,list,ls,bulk,config} ..." 
        if type == "end":
            return "Commands:\n  profile        Manage profiles\n  move (mv)      Move node\n  copy (cp)      Copy node\n  list (ls)      List profiles, nodes or folders\n  bulk           Add nodes in bulk\n  config         Manage app config"
        if type == "completion":
            return '''
#Here starts bash completion for conn
#You need jq installed in order to use this
_conn()
{
    DATADIR=$HOME/.config/conn
    mapfile -t connections < <(jq -r ' .["connections"] | paths as $path | select(getpath($path) == "connection") | $path | [map(select(. != "type"))[-1,-2,-3]] | map(select(. !=null)) | join("@")' $DATADIR/config.json)
    mapfile -t folders < <(jq -r ' .["connections"] | paths as $path | select(getpath($path) == "folder" or getpath($path) == "subfolder") | $path | [map(select(. != "type"))[-1,-2]] | map(select(. !=null)) | join("@")' $DATADIR/config.json)
    mapfile -t profiles < <(jq -r '.["profiles"] | keys[]' $DATADIR/config.json)
    if [ "${#COMP_WORDS[@]}" = "2" ]; then
        strings="--add --del --rm --edit --mod mv --show ls cp profile bulk config --help"
        strings="$strings ${connections[@]} ${folders[@]/#/@}"
        COMPREPLY=($(compgen -W "$strings" -- "${COMP_WORDS[1]}"))
    fi
    if [ "${#COMP_WORDS[@]}" = "3" ]; then
        strings=""
        if [ "${COMP_WORDS[1]}" = "profile" ]; then strings="--add --rm --del --edit --mod --show --help"; fi
        if [ "${COMP_WORDS[1]}" = "config" ]; then strings="--allow-uppercase --keepalive --completion --help"; fi
        if [[ "${COMP_WORDS[1]}" =~ ^(--mod|--edit|--show|--add|--rm|--del)$ ]]; then strings="profile"; fi
        if [[ "${COMP_WORDS[1]}" =~ ^(list|ls)$ ]]; then strings="profiles nodes folders"; fi
        if [[ "${COMP_WORDS[1]}" =~ ^(bulk|mv|move|cp|copy)$ ]]; then strings="--help"; fi
        if [[ "${COMP_WORDS[1]}" =~ ^(--rm|--del)$ ]]; then strings="$strings ${folders[@]/#/@}"; fi
        if [[ "${COMP_WORDS[1]}" =~ ^(--rm|--del|--mod|--edit|mv|move|cp|copy|--show)$ ]]; then
            strings="$strings ${connections[@]}"
        fi
        COMPREPLY=($(compgen -W "$strings" -- "${COMP_WORDS[2]}"))
    fi
    if [ "${#COMP_WORDS[@]}" = "4" ]; then
        strings=""
        if [ "${COMP_WORDS[1]}" = "profile" ]; then
            if [[ "${COMP_WORDS[2]}" =~ ^(--rm|--del|--mod|--edit|--show)$ ]] ; then
                strings="$strings ${profiles[@]}"
            fi
        fi
        if [ "${COMP_WORDS[2]}" = "profile" ]; then
            if [[ "${COMP_WORDS[1]}" =~ ^(--rm|--remove|--del|--mod|--edit|--show)$ ]] ; then
                strings="$strings ${profiles[@]}"
            fi
        fi
        COMPREPLY=($(compgen -W "$strings" -- "${COMP_WORDS[3]}"))
    fi
}
complete -o nosort -F _conn conn
'''

    def _getallnodes(self):
        # Get all nodes on configfile
        nodes = []
        layer1 = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "connection"]
        folders = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
        nodes.extend(layer1)
        for f in folders:
            layer2 = [k + "@" + f for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"]
            nodes.extend(layer2)
            subfolders = [k for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
            for s in subfolders:
                layer3 = [k + "@" + s + "@" + f for k,v in self.config.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"]
                nodes.extend(layer3)
        return nodes

    def _getallfolders(self):
        # Get all folders on configfile
        folders = ["@" + k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
        subfolders = []
        for f in folders:
            s = ["@" + k + f for k,v in self.config.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"]
            subfolders.extend(s)
        folders.extend(subfolders)
        return folders

    def _profileused(self, profile):
        # Check if profile is used before deleting it
        nodes = []
        layer1 = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
        folders = [k for k,v in self.config.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
        nodes.extend(layer1)
        for f in folders:
            layer2 = [k + "@" + f for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
            nodes.extend(layer2)
            subfolders = [k for k,v in self.config.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
            for s in subfolders:
                layer3 = [k + "@" + s + "@" + f for k,v in self.config.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
                nodes.extend(layer3)
        return nodes

    def encrypt(self, password, keyfile=None):
        '''
        Encrypts password using RSA keyfile

        ### Parameters:
            - password (str): Plaintext password to encrypt.

        ### Optional Parameters:
            - keyfile (str): Path/file to keyfile. Default is config keyfile.

        ### Returns:
            str: Encrypted password.
        '''
        if keyfile is None:
            keyfile = self.config.key
        # Read the key and close the file handle right away
        with open(keyfile) as f:
            key = RSA.import_key(f.read())
        publickey = key.publickey()
        encryptor = PKCS1_OAEP.new(publickey)
        password = encryptor.encrypt(password.encode("utf-8"))
        # Stored as the repr of the ciphertext bytes, e.g. "b'...'"
        return str(password)

Methods
- def encrypt(self, password, keyfile=None)
Encrypts password using RSA keyfile.
Parameters:
- password (str): Plaintext password to encrypt.
Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.
Returns:
str: Encrypted password.

def encrypt(self, password, keyfile=None):
    '''
    Encrypts password using RSA keyfile

    ### Parameters:
        - password (str): Plaintext password to encrypt.

    ### Optional Parameters:
        - keyfile (str): Path/file to keyfile. Default is config keyfile.

    ### Returns:
        str: Encrypted password.
    '''
    if keyfile is None:
        keyfile = self.config.key
    # Read the key and close the file handle right away
    with open(keyfile) as f:
        key = RSA.import_key(f.read())
    publickey = key.publickey()
    encryptor = PKCS1_OAEP.new(publickey)
    password = encryptor.encrypt(password.encode("utf-8"))
    # Stored as the repr of the ciphertext bytes, e.g. "b'...'"
    return str(password)
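Because encrypt returns str(password), the stored value is the Python repr of the ciphertext bytes. The decrypting side (__passtx in class node) checks for that b'...' shape and calls ast.literal_eval to get the bytes back. The following is a minimal sketch of that storage round trip using only the standard library. The helper names to_stored, looks_encrypted and from_stored are hypothetical, and the sample bytes are a placeholder, not real RSA output.

```python
import ast
import re

# Same shape check that __passtx applies before trying to decrypt a value.
ENCRYPTED_SHAPE = re.compile(r'^b["\'].+["\']$')

def to_stored(ciphertext: bytes) -> str:
    """Serialize ciphertext the way encrypt() does: as the repr of a bytes object."""
    return str(ciphertext)

def looks_encrypted(value: str) -> bool:
    """True when the stored value has the b'...' shape of a serialized bytes object."""
    return ENCRYPTED_SHAPE.match(value) is not None

def from_stored(value: str) -> bytes:
    """Recover the raw bytes from the stored repr without using eval()."""
    return ast.literal_eval(value)

# Placeholder bytes standing in for real PKCS1_OAEP ciphertext (assumption).
fake_ciphertext = b"\x8f\x01secret\xff"
stored = to_stored(fake_ciphertext)
recovered = from_stored(stored)
```

One consequence of this design is that a plaintext password which happens to look like b'...' would be treated as encrypted, so such values are best avoided.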
 
- class node (unique, host, options='', logs='', password='', port='', protocol='', user='', config='')
This class generates a node object. Contains all the information and methods to connect and interact with a device using ssh or telnet.
Attributes:
- output (str): Output of the commands you ran with run or test method.
- result (bool): True if expected value is found after running the commands using test method.
Parameters:
- unique (str): Unique name to assign to the node.
- host (str): IP address or hostname of the node.
Optional Parameters:
- options (str): Additional options to pass to ssh/telnet for the connection.
- logs (str): Path/file for storing the logs. You can use ${unique}, ${host}, ${port}, ${user}, ${protocol} as variables.
- password (str): Encrypted or plaintext password.
- port (str): Port to connect to node, default 22 for ssh and 23 for telnet.
- protocol (str): Select ssh or telnet. Default is ssh.
- user (str): Username of the node.
- config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager.

class node:
    '''
    This class generates a node object. Contains all the information and methods to connect and interact with a device using ssh or telnet.

    ### Attributes:
        - output (str): Output of the commands you ran with run or test method.
        - result (bool): True if expected value is found after running the commands using test method.
    '''

    def __init__(self, unique, host, options='', logs='', password='', port='', protocol='', user='', config=''):
        '''
        ### Parameters:
            - unique (str): Unique name to assign to the node.
            - host (str): IP address or hostname of the node.

        ### Optional Parameters:
            - options (str): Additional options to pass to ssh/telnet for the connection.
            - logs (str): Path/file for storing the logs. You can use ${unique}, ${host}, ${port}, ${user}, ${protocol} as variables.
            - password (str): Encrypted or plaintext password.
            - port (str): Port to connect to node, default 22 for ssh and 23 for telnet.
            - protocol (str): Select ssh or telnet. Default is ssh.
            - user (str): Username of the node.
            - config (obj): Pass the object created with class configfile with key for decryption and extra configuration if you are using connection manager.
        '''
        if config == '':
            self.idletime = 0
            self.key = None
        else:
            self.idletime = config.config["idletime"]
            self.key = config.key
        self.unique = unique
        attr = {"host": host, "logs": logs, "options":options, "port": port, "protocol": protocol, "user": user}
        for key in attr:
            # Values starting with "@" reference a profile from the config
            profile = re.search("^@(.*)", attr[key])
            if profile and config != '':
                setattr(self,key,config.profiles[profile.group(1)][key])
            elif attr[key] == '' and key == "protocol":
                # Fall back to the default profile, then to ssh
                try:
                    setattr(self,key,config.profiles["default"][key])
                except Exception:
                    setattr(self,key,"ssh")
            else:
                setattr(self,key,attr[key])
        if isinstance(password,list):
            # A list of @profile references: collect each profile's password
            self.password = []
            for s in password:
                profile = re.search("^@(.*)", s)
                if profile and config != '':
                    self.password.append(config.profiles[profile.group(1)]["password"])
        else:
            self.password = [password]

    def __passtx(self, passwords, *, keyfile=None):
        # decrypts passwords, used by other methods.
        dpass = []
        decryptor = None
        if keyfile is None:
            keyfile = self.key
        if keyfile is not None:
            with open(keyfile) as f:
                key = RSA.import_key(f.read())
            decryptor = PKCS1_OAEP.new(key)
        for passwd in passwords:
            # Only values shaped like b'...' are treated as encrypted
            if not re.match('^b[\"\'].+[\"\']$', passwd):
                dpass.append(passwd)
            else:
                try:
                    decrypted = decryptor.decrypt(ast.literal_eval(passwd)).decode("utf-8")
                    dpass.append(decrypted)
                except Exception:
                    raise ValueError("Missing or corrupted key")
        return dpass

    def _logfile(self, logfile = None):
        # translate logs variables and generate logs path.
        if logfile is None:
            logfile = self.logs
        logfile = logfile.replace("${unique}", self.unique)
        logfile = logfile.replace("${host}", self.host)
        logfile = logfile.replace("${port}", self.port)
        logfile = logfile.replace("${user}", self.user)
        logfile = logfile.replace("${protocol}", self.protocol)
        now = datetime.datetime.now()
        # ${date 'FORMAT'} is expanded with strftime
        dateconf = re.search(r'\$\{date \'(.*)\'}', logfile)
        if dateconf:
            logfile = re.sub(r'\$\{date (.*)}',now.strftime(dateconf.group(1)), logfile)
        return logfile

    def _logclean(self, logfile, var = False):
        # Remove special ascii characters and other stuff from logfile.
        # With var=True, logfile is the text itself instead of a path.
        if var == False:
            with open(logfile, "r") as f:
                t = f.read()
        else:
            t = logfile
        t = t.replace("\n","",1).replace("\a","")
        t = t.replace('\n\n', '\n')
        t = re.sub(r'.\[K', '', t)
        # '.\b' is deliberately not a raw string: \b is the backspace character,
        # so each pass drops one character together with the backspace that erased it.
        while True:
            tb = re.sub('.\b', '', t, count=1)
            if len(t) == len(tb):
                break
            t = tb
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        t = ansi_escape.sub('', t)
        if var == False:
            with open(logfile, "w") as d:
                d.write(t)
            return
        else:
            return t

    def interact(self, debug = False):
        '''
        Allow user to interact with the node directly, mostly used by connection manager.

        ### Optional Parameters:
            - debug (bool): If True, display all the connecting information before interact. Default False.
        '''
        connect = self._connect(debug = debug)
        if connect == True:
            # Match the remote pty size to the local terminal
            size = os.get_terminal_size()
            self.child.setwinsize(size.lines, size.columns)
            print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
            if hasattr(self, 'logfile'):
                self.child.logfile_read = open(self.logfile, "wb")
            elif debug:
                self.child.logfile_read = None
            if hasattr(self, 'missingtext'):
                print(self.child.after.decode(), end='')
            self.child.interact()
            if hasattr(self, 'logfile') and not debug:
                self._logclean(self.logfile)
        else:
            print(connect)
            exit(1)

    def run(self, commands,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False):
        '''
        Run a command or list of commands on the node and return the output.

        ### Parameters:
            - commands (str/list): Commands to run on the node. Should be str or a list of str.

        ### Optional Named Parameters:
            - folder (str): Path where output log should be stored, leave empty to disable logging.
            - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.
            - stdout (bool): Set True to send the command output to stdout. Default False.

        ### Returns:
            str: Output of the commands you ran on the node.
        '''
        connect = self._connect()
        if connect == True:
            expects = [prompt, pexpect.EOF]
            output = ''
            if isinstance(commands, list):
                for c in commands:
                    # Wait for the prompt (or EOF) before sending each command
                    result = self.child.expect(expects)
                    self.child.sendline(c)
                    if result == 0:
                        output = output + self.child.before.decode() + self.child.after.decode()
                    if result == 1:
                        output = output + self.child.before.decode()
            else:
                result = self.child.expect(expects)
                self.child.sendline(commands)
                if result == 0:
                    output = output + self.child.before.decode() + self.child.after.decode()
                if result == 1:
                    output = output + self.child.before.decode()
            # Collect the output of the last command
            result = self.child.expect(expects)
            if result == 0:
                output = output + self.child.before.decode() + self.child.after.decode()
            if result == 1:
                output = output + self.child.before.decode()
            self.child.close()
            output = output.lstrip()
            if stdout == True:
                print(output)
            if folder != '':
                with open(folder + "/" + self.unique, "w") as f:
                    f.write(output)
                self._logclean(folder + "/" + self.unique)
            self.output = output
            return output
        else:
            self.output = connect
            return connect

    def test(self, commands, expected, *, prompt = r'>$|#$|\$$|>.$|#.$|\$.$'):
        '''
        Run a command or list of commands on the node, then check if expected value appears on the output after the last command.

        ### Parameters:
            - commands (str/list): Commands to run on the node. Should be str or list of str.
            - expected (str): Expected text to appear after running all the commands on the node.

        ### Optional Named Parameters:
            - prompt (str): Prompt to be expected after a command is finished running. Usually linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs some special symbol.

        ### Returns:
            bool: True if expected value is found after running the commands, False if the prompt is found first.
        '''
        connect = self._connect()
        if connect == True:
            expects = [prompt, pexpect.EOF]
            output = ''
            if isinstance(commands, list):
                for c in commands:
                    result = self.child.expect(expects)
                    self.child.sendline(c)
                    if result == 0:
                        output = output + self.child.before.decode() + self.child.after.decode()
                    if result == 1:
                        output = output + self.child.before.decode()
            else:
                # Same handling as the list branch: on EOF there is no matched text to append
                result = self.child.expect(expects)
                self.child.sendline(commands)
                if result == 0:
                    output = output + self.child.before.decode() + self.child.after.decode()
                if result == 1:
                    output = output + self.child.before.decode()
            # Whichever comes first decides the result: expected text, prompt or EOF
            expects = [expected, prompt, pexpect.EOF]
            results = self.child.expect(expects)
            if results == 0:
                self.child.close()
                self.result = True
                output = output + self.child.before.decode() + self.child.after.decode()
                output = output.lstrip()
                self.output = output
                return True
            if results in [1, 2]:
                self.child.close()
                self.result = False
                if results == 1:
                    output = output + self.child.before.decode() + self.child.after.decode()
                elif results == 2:
                    output = output + self.child.before.decode()
                output = output.lstrip()
                self.output = output
                return False
        else:
            self.result = None
            self.output = connect
            return connect

    def _connect(self, debug = False):
        # Method to connect to the node: it parses all the information, creates the ssh/telnet command and logs in to the node.
        if self.protocol == "ssh":
            cmd = "ssh"
            if self.idletime > 0:
                cmd = cmd + " -o ServerAliveInterval=" + str(self.idletime)
            if self.user == '':
                cmd = cmd + " -t {}".format(self.host)
            else:
                cmd = cmd + " -t {}".format("@".join([self.user,self.host]))
            if self.port != '':
                cmd = cmd + " -p " + self.port
            if self.options != '':
                cmd = cmd + " " + self.options
            if self.logs != '':
                self.logfile = self._logfile()
            if self.password[0] != '':
                passwords = self.__passtx(self.password)
            else:
                passwords = []
            expects = ['yes/no', 'refused', 'supported', 'cipher', 'sage', 'timeout', 'unavailable', 'closed', '[p|P]assword:|[u|U]sername:', r'>$|#$|\$$|>.$|#.$|\$.$', 'suspend', pexpect.EOF, "No route to host", "resolve hostname", "no matching host key"]
        elif self.protocol == "telnet":
            cmd = "telnet " + self.host
            if self.port != '':
                cmd = cmd + " " + self.port
            if self.options != '':
                cmd = cmd + " " + self.options
            if self.logs != '':
                self.logfile = self._logfile()
            if self.password[0] != '':
                passwords = self.__passtx(self.password)
            else:
                passwords = []
            expects = ['[u|U]sername:', 'refused', 'supported', 'cipher', 'sage', 'timeout', 'unavailable', 'closed', '[p|P]assword:', r'>$|#$|\$$|>.$|#.$|\$.$', 'suspend', pexpect.EOF, "No route to host", "resolve hostname", "no matching host key"]
        else:
            raise ValueError("Invalid protocol: " + self.protocol)
        child = pexpect.spawn(cmd)
        if debug:
            child.logfile_read = sys.stdout.buffer
        # Try each password in turn; with no passwords, go through the login flow once
        if len(passwords) > 0:
            loops = len(passwords)
        else:
            loops = 1
        endloop = False
        for i in range(0, loops):
            while True:
                results = child.expect(expects)
                if results == 0:
                    # ssh: accept the host key; telnet: answer the username prompt
                    if self.protocol == "ssh":
                        child.sendline('yes')
                    elif self.protocol == "telnet":
                        if self.user != '':
                            child.sendline(self.user)
                        else:
                            self.missingtext = True
                            break
                if results in [1, 2, 3, 4, 5, 6, 7, 12, 13, 14]:
                    # Any of the error patterns: give up and report its index
                    child.close()
                    return "Connection failed code:" + str(results)
                if results == 8:
                    # Password (or ssh username) prompt
                    if len(passwords) > 0:
                        child.sendline(passwords[i])
                    else:
                        self.missingtext = True
                    break
                if results in [9, 11]:
                    # Shell prompt or EOF: login finished
                    endloop = True
                    child.sendline()
                    break
                if results == 10:
                    child.sendline("\r")
                    sleep(2)
            if endloop:
                break
        child.readline(0)
        self.child = child
        return True

Methods
- def interact(self, debug=False)
Allow user to interact with the node directly, mostly used by connection manager.
Optional Parameters:
- debug (bool): If True, display all the connecting information before interact. Default False.

def interact(self, debug = False):
    '''
    Allow user to interact with the node directly, mostly used by connection manager.

    ### Optional Parameters:
        - debug (bool): If True, display all the connecting information before interact. Default False.
    '''
    connect = self._connect(debug = debug)
    if connect == True:
        # Match the remote pty size to the local terminal
        size = os.get_terminal_size()
        self.child.setwinsize(size.lines, size.columns)
        print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
        if hasattr(self, 'logfile'):
            self.child.logfile_read = open(self.logfile, "wb")
        elif debug:
            self.child.logfile_read = None
        if hasattr(self, 'missingtext'):
            print(self.child.after.decode(), end='')
        self.child.interact()
        if hasattr(self, 'logfile') and not debug:
            self._logclean(self.logfile)
    else:
        print(connect)
        exit(1)
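When a log file is configured and debug is off, interact ends by passing the recorded session through _logclean, which strips terminal control sequences so the log reads as plain text. The sketch below shows the core of that cleanup as a standalone function. The name clean_session_log is hypothetical, and it covers only a subset of what _logclean does (bell characters, backspace edits and ANSI escape sequences).

```python
import re

# Matches single-character ESC sequences and CSI sequences such as colors.
ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

def clean_session_log(text: str) -> str:
    """Return text with bells, backspace edits and ANSI escapes removed."""
    text = text.replace("\a", "")
    # '.\b' is intentionally not a raw string: \b is the backspace character.
    # Each pass removes one character plus the backspace that erased it,
    # repeating until nothing changes so chained backspaces are handled.
    while True:
        stripped = re.sub('.\b', '', text, count=1)
        if len(stripped) == len(text):
            break
        text = stripped
    return ANSI_ESCAPE.sub('', text)

colored = clean_session_log("\x1b[31mred\x1b[0m")
typo_fixed = clean_session_log("lsx\b -l")
double_backspace = clean_session_log("ab\b\bc")
```

Removing one backspace pair per pass matters: a single global substitution would leave the second backspace in "ab\b\bc" without a character to erase.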
- def run(self, commands, *, folder='', prompt='>$|#$|\\$$|>.$|#.$|\\$.$', stdout=False)
Run a command or list of commands on the node and return the output.
Parameters:
- commands (str/list): Commands to run on the node. Should be a str or a list of str.
Optional Named Parameters:
- folder (str): Path where the output log should be stored. Leave empty to disable logging.
- prompt (str): Prompt to be expected after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs a special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
Returns:
str: Output of the commands you ran on the node.
Source code
    def run(self, commands,*, folder = '', prompt = r'>$|#$|\$$|>.$|#.$|\$.$', stdout = False):
        connect = self._connect()
        if connect == True:
            expects = [prompt, pexpect.EOF]
            output = ''
            if isinstance(commands, list):
                for c in commands:
                    # Wait for the prompt (or EOF) before sending each command.
                    result = self.child.expect(expects)
                    self.child.sendline(c)
                    if result == 0:
                        output = output + self.child.before.decode() + self.child.after.decode()
                    if result == 1:
                        output = output + self.child.before.decode()
            else:
                result = self.child.expect(expects)
                self.child.sendline(commands)
                if result == 0:
                    output = output + self.child.before.decode() + self.child.after.decode()
                if result == 1:
                    output = output + self.child.before.decode()
            # Collect the output of the last command.
            result = self.child.expect(expects)
            if result == 0:
                output = output + self.child.before.decode() + self.child.after.decode()
            if result == 1:
                output = output + self.child.before.decode()
            self.child.close()
            output = output.lstrip()
            if stdout == True:
                print(output)
            if folder != '':
                # The with block closes the file before the log is cleaned.
                with open(folder + "/" + self.unique, "w") as f:
                    f.write(output)
                self._logclean(folder + "/" + self.unique)
            self.output = output
            return output
        else:
            # Connection failed: return whatever _connect reported.
            self.output = connect
            return connect
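The default prompt value is a regular expression with six alternatives: ">", "#" or "$" at the very end of the buffer, or one of those followed by exactly one more character. The sketch below tests that pattern against a few sample strings with Python's re module instead of a live pexpect session. ends_with_prompt and the sample strings are hypothetical, and re.DOTALL is passed on the assumption that pexpect compiles string patterns that way.

```python
import re

# Default value of the prompt parameter, copied from the signature of run().
DEFAULT_PROMPT = r'>$|#$|\$$|>.$|#.$|\$.$'

def ends_with_prompt(buffer, prompt=DEFAULT_PROMPT):
    """Return True if the text read so far ends in something the pattern accepts."""
    return re.search(prompt, buffer, re.DOTALL) is not None

samples = ["router>", "switch# ", "user@host:~$ ", "Building configuration..."]
for text in samples:
    print(repr(text), ends_with_prompt(text))

# A device with a different prompt character needs its own pattern.
print(ends_with_prompt("(config)% "), ends_with_prompt("(config)% ", prompt=r'%.$'))
```

If a device ends its prompt with a character other than these three, the default never matches and a custom pattern has to be passed as prompt.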
- def test(self, commands, expected, *, prompt='>$|#$|\\$$|>.$|#.$|\\$.$')
- 
Run a command or list of commands on the node, then check whether the expected value appears in the output after the last command.
Parameters:
- commands (str/list): Commands to run on the node. Should be a str or a list of str.
- expected (str): Expected text to appear after running all the commands on the node.
Optional Named Parameters:
- prompt (str): Prompt to be expected after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs a special symbol.
Returns:
bool: True if the expected value is found after running the commands, False if the prompt is found first.
Source code
    def test(self, commands, expected, *, prompt = r'>$|#$|\$$|>.$|#.$|\$.$'):
        connect = self._connect()
        if connect == True:
            expects = [prompt, pexpect.EOF]
            output = ''
            if isinstance(commands, list):
                for c in commands:
                    # Wait for the prompt (or EOF) before sending each command.
                    result = self.child.expect(expects)
                    self.child.sendline(c)
                    if result == 0:
                        output = output + self.child.before.decode() + self.child.after.decode()
                    if result == 1:
                        output = output + self.child.before.decode()
            else:
                self.child.expect(expects)
                self.child.sendline(commands)
                output = output + self.child.before.decode() + self.child.after.decode()
            # After the last command, look for the expected text before the next prompt.
            expects = [expected, prompt, pexpect.EOF]
            results = self.child.expect(expects)
            if results == 0:
                self.child.close()
                self.result = True
                output = output + self.child.before.decode() + self.child.after.decode()
                output = output.lstrip()
                self.output = output
                return True
            if results in [1, 2]:
                self.child.close()
                self.result = False
                if results == 1:
                    output = output + self.child.before.decode() + self.child.after.decode()
                elif results == 2:
                    output = output + self.child.before.decode()
                output = output.lstrip()
                self.output = output
                return False
        else:
            # Connection failed: no test result, output holds what _connect reported.
            self.result = None
            self.output = connect
            return connect
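After the last command, test() hands pexpect the list [expected, prompt, EOF] and returns True only when the first entry wins. The sketch below imitates that decision on a complete, already-captured string: the pattern whose match starts earliest is selected, and ties go to the pattern listed first. This is a simplified model that assumes the whole output is available at once, whereas pexpect reads the stream incrementally. first_match_index and check are hypothetical names.

```python
import re

DEFAULT_PROMPT = r'>$|#$|\$$|>.$|#.$|\$.$'

def first_match_index(buffer, patterns):
    """Index of the pattern whose match starts earliest in buffer, or None."""
    best_index, best_start = None, None
    for index, pattern in enumerate(patterns):
        match = re.search(pattern, buffer, re.DOTALL)
        if match is None:
            continue
        # Strict comparison: on a tie the earlier pattern in the list is kept.
        if best_start is None or match.start() < best_start:
            best_index, best_start = index, match.start()
    return best_index

def check(buffer, expected, prompt=DEFAULT_PROMPT):
    """True if expected is seen before the prompt, mirroring the True/False result of test()."""
    return first_match_index(buffer, [expected, prompt]) == 0

print(check("64 bytes from 10.0.0.1\nrouter#", "64 bytes"))
print(check("Request timed out.\nrouter#", "64 bytes"))
# expected is treated as a regular expression, so literal text is safest when escaped.
print(check("uptime is 1 day\nrouter#", re.escape("1 day")))
```

Because expected is passed to expect() as a pattern, characters such as "." or "(" in the expected text carry their regular-expression meaning unless escaped.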
 
- class nodes (nodes: dict, config='')
- 
This class generates a nodes object. It contains a list of node class objects and methods to run multiple tasks on nodes simultaneously.
Attributes:
- nodelist (list): List of node class objects passed to the init function.
- output (dict): Dictionary with each node's unique name as key and the output of the commands run on that node as value. Created after running the run or test methods.
- result (dict): Dictionary with each node's unique name as key. The value is True if the expected value is found after running the commands, False if the prompt is found first. Created after running the test method.
- <unique> (obj): For each item in nodelist, an attribute named after the node's unique name is created.
Parameters:
- nodes (dict): Dictionary of node information.
    Keys: Unique name for each node.
    Mandatory Subkeys: host(str).
    Optional Subkeys: options(str), logs(str), password(str), port(str), protocol(str), user(str).
    For reference on subkeys, check the node class.
Optional Parameters:
- config (obj): Pass the object created with class configfile, with the key for decryption and extra configuration, if you are using the connection manager.
Source code
    class nodes:
        def __init__(self, nodes: dict, config = ''):
            self.nodelist = []
            self.config = config
            for n in nodes:
                this = node(n, **nodes[n], config = config)
                self.nodelist.append(this)
                # Expose each node as an attribute named after its unique name.
                setattr(self,n,this)

        def _splitlist(self, lst, n):
            # Split a list into lists of n members.
            for i in range(0, len(lst), n):
                yield lst[i:i + n]

        def run(self, commands,*, folder = None, prompt = None, stdout = None, parallel = 10):
            # Only forward the optional arguments that were actually given.
            args = {}
            args["commands"] = commands
            if folder != None:
                args["folder"] = folder
            if prompt != None:
                args["prompt"] = prompt
            if stdout != None:
                args["stdout"] = stdout
            output = {}
            tasks = []
            for n in self.nodelist:
                tasks.append(threading.Thread(target=n.run, kwargs=args))
            taskslist = list(self._splitlist(tasks, parallel))
            # Start one group of threads and wait for it before starting the next.
            for t in taskslist:
                for i in t:
                    i.start()
                for i in t:
                    i.join()
            for i in self.nodelist:
                output[i.unique] = i.output
            self.output = output
            return output

        def test(self, commands, expected, *, prompt = None, parallel = 10):
            args = {}
            args["commands"] = commands
            args["expected"] = expected
            if prompt != None:
                args["prompt"] = prompt
            output = {}
            result = {}
            tasks = []
            for n in self.nodelist:
                tasks.append(threading.Thread(target=n.test, kwargs=args))
            taskslist = list(self._splitlist(tasks, parallel))
            for t in taskslist:
                for i in t:
                    i.start()
                for i in t:
                    i.join()
            for i in self.nodelist:
                result[i.unique] = i.result
                output[i.unique] = i.output
            self.output = output
            self.result = result
            return result
Methods
- def run(self, commands, *, folder=None, prompt=None, stdout=None, parallel=10)
- 
Run a command or list of commands on all the nodes in nodelist.
Parameters:
- commands (str/list): Commands to run on the nodes. Should be a str or a list of str.
Optional Named Parameters:
- folder (str): Path where the output log should be stored. Leave empty to disable logging.
- prompt (str): Prompt to be expected after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs a special symbol.
- stdout (bool): Set True to send the command output to stdout. Default False.
- parallel (int): Number of nodes to run the commands on simultaneously. Default is 10. If there are more nodes than this value, they are split into groups of at most this many members.
Returns:
dict: Dictionary with each node's unique name as key and the output of the commands run on that node as value.
Source code
    def run(self, commands,*, folder = None, prompt = None, stdout = None, parallel = 10):
        # Only forward the optional arguments that were actually given.
        args = {}
        args["commands"] = commands
        if folder != None:
            args["folder"] = folder
        if prompt != None:
            args["prompt"] = prompt
        if stdout != None:
            args["stdout"] = stdout
        output = {}
        tasks = []
        for n in self.nodelist:
            tasks.append(threading.Thread(target=n.run, kwargs=args))
        taskslist = list(self._splitlist(tasks, parallel))
        # Start one group of threads and wait for it before starting the next.
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            output[i.unique] = i.output
        self.output = output
        return output
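The parallel parameter works by cutting the list of threads into groups and fully joining each group before starting the next. The sketch below shows that batching pattern on its own. FakeNode, split_list and run_in_batches are hypothetical stand-ins, so nothing here opens a real connection.

```python
import threading

class FakeNode:
    """Hypothetical stand-in for a node: run() stores its result in .output."""
    def __init__(self, unique):
        self.unique = unique
        self.output = None

    def run(self, commands):
        self.output = f"{self.unique}: ran {commands}"

def split_list(items, n):
    """Yield consecutive slices of items with at most n members each."""
    for i in range(0, len(items), n):
        yield items[i:i + n]

def run_in_batches(nodelist, commands, parallel=10):
    """Run commands on every node, at most `parallel` threads at a time."""
    tasks = [threading.Thread(target=n.run, kwargs={"commands": commands})
             for n in nodelist]
    for batch in split_list(tasks, parallel):
        for task in batch:
            task.start()
        # Wait for the whole batch before the next one begins.
        for task in batch:
            task.join()
    return {n.unique: n.output for n in nodelist}

fleet = [FakeNode(f"node{i}") for i in range(5)]
outputs = run_in_batches(fleet, "show version", parallel=2)
print(outputs)
```

One consequence of this design is that a single slow node delays the start of the following group, since every thread in a group is joined before the next group starts.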
- def test(self, commands, expected, *, prompt=None, parallel=10)
- 
Run a command or list of commands on all the nodes in nodelist, then check whether the expected value appears in the output after the last command.
Parameters:
- commands (str/list): Commands to run on the nodes. Should be a str or a list of str.
- expected (str): Expected text to appear after running all the commands on the node.
Optional Named Parameters:
- prompt (str): Prompt to be expected after a command finishes running. Usually Linux uses ">" or EOF while routers use ">" or "#". The default value should work for most nodes. Change it if your connection needs a special symbol.
- parallel (int): Number of nodes to run the commands on simultaneously. Default is 10. If there are more nodes than this value, they are split into groups of at most this many members.
Returns:
dict: Dictionary with each node's unique name as key. The value is True if the expected value is found after running the commands, False if the prompt is found first.
Source code
    def test(self, commands, expected, *, prompt = None, parallel = 10):
        # Only forward the optional arguments that were actually given.
        args = {}
        args["commands"] = commands
        args["expected"] = expected
        if prompt != None:
            args["prompt"] = prompt
        output = {}
        result = {}
        tasks = []
        for n in self.nodelist:
            tasks.append(threading.Thread(target=n.test, kwargs=args))
        taskslist = list(self._splitlist(tasks, parallel))
        # Start one group of threads and wait for it before starting the next.
        for t in taskslist:
            for i in t:
                i.start()
            for i in t:
                i.join()
        for i in self.nodelist:
            result[i.unique] = i.result
            output[i.unique] = i.output
        self.output = output
        self.result = result
        return result
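The dictionary returned by test() can hold more than True and False: in the node.test source in this section, a failed connection sets result to None. The sketch below sorts such a dictionary into three groups so that unreachable nodes are not mistaken for failed checks. summarize and the sample data are hypothetical.

```python
def summarize(result):
    """Split a {unique: result} dict into passed, failed and not-tested node names."""
    passed = sorted(name for name, value in result.items() if value is True)
    failed = sorted(name for name, value in result.items() if value is False)
    # Anything that is neither True nor False (for example None) was never tested.
    untested = sorted(name for name, value in result.items()
                      if value is not True and value is not False)
    return passed, failed, untested

# Hypothetical result, shaped like the return value of nodes.test().
sample = {"router1": True, "router2": False, "router3": None}
passed, failed, untested = summarize(sample)
print("passed:", passed)
print("failed:", failed)
print("not tested:", untested)
```

For the nodes in the last group, the matching entry in the output dictionary holds what the connection attempt reported.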