fix edit bug, tune run/test automation, prepare for testing

This commit is contained in:
2022-05-19 16:11:41 -03:00
parent 6b661f301a
commit 3b26827c70
5 changed files with 238 additions and 78 deletions
+2 -1
View File
@@ -4,7 +4,8 @@ from connpy import *
def main():
conf = configfile()
connapp(conf)
app = connapp(conf)
app.start()
if __name__ == '__main__':
sys.exit(main())
+1 -1
View File
@@ -1,2 +1,2 @@
__version__ = "2.1.0"
__version__ = "2.1.1"
+25 -8
View File
@@ -44,6 +44,17 @@ class connapp:
self.fzf = self.config.config["fzf"]
except:
self.fzf = False
def start(self,argv = sys.argv[1:]):
'''
### Parameters:
- argv (list): List of arguments to pass to the app.
Default: sys.argv[1:]
'''
#DEFAULTPARSER
defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
subparsers = defaultparser.add_subparsers(title="Commands")
@@ -99,13 +110,13 @@ class connapp:
#Manage sys arguments
commands = ["node", "profile", "mv", "move","copy", "cp", "bulk", "ls", "list", "run", "config"]
profilecmds = ["--add", "-a", "--del", "--rm", "-r", "--mod", "--edit", "-e", "--show", "-s"]
if len(sys.argv) >= 3 and sys.argv[2] == "profile" and sys.argv[1] in profilecmds:
sys.argv[2] = sys.argv[1]
sys.argv[1] = "profile"
if len(sys.argv) < 2 or sys.argv[1] not in commands:
sys.argv.insert(1,"node")
args = defaultparser.parse_args()
args.func(args)
if len(argv) >= 2 and argv[1] == "profile" and argv[0] in profilecmds:
argv[1] = argv[0]
argv[0] = "profile"
if len(argv) < 1 or argv[0] not in commands:
argv.insert(0,"node")
args = defaultparser.parse_args(argv)
return args.func(args)
class _store_type(argparse.Action):
#Custom store type for cli app.
@@ -252,6 +263,7 @@ class connapp:
if not updatenode:
exit(7)
uniques.update(node)
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
print("Nothing to do here")
return
@@ -546,9 +558,14 @@ class connapp:
try:
myexpected = args["expected"].format(**args["vars"][i])
except:
myexpected = args["expected"]
try:
myexpected = args["expected"].format(**args["vars"]["__global__"])
except:
myexpected = args["expected"]
print(" TEST for '{}' --> ".format(myexpected) + str(nodes.result[i]).upper())
if stdout:
if nodes.status[i] == 0:
print(" " + "-" * (len(myexpected) + 16 + len(str(nodes.result[i]))))
for line in nodes.output[i].splitlines():
print(" " + line)
else:
+34 -20
View File
@@ -10,6 +10,7 @@ from time import sleep
import datetime
import sys
import threading
from pathlib import Path
from copy import deepcopy
#functions and classes
@@ -142,6 +143,7 @@ class node:
t = tb
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/ ]*[@-~])')
t = ansi_escape.sub('', t)
t = t.lstrip(" \n\r")
if var == False:
d = open(logfile, "w")
d.write(t)
@@ -218,9 +220,11 @@ class node:
'''
connect = self._connect(timeout = timeout)
now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
if connect == True:
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
output = ''
status = ''
if not isinstance(commands, list):
commands = [commands]
for c in commands:
@@ -233,33 +237,41 @@ class node:
if result == 1:
output = output + self.child.before.decode()
if result == 2:
self.output = output + self.child.before.decode()
self.status = 2
return self.output
result = self.child.expect(expects, timeout = timeout)
if result == 0:
output = output + self.child.before.decode() + self.child.after.decode()
if result == 1:
output = output + self.child.before.decode()
if result == 2:
self.output = output + self.child.before.decode()
self.status = 2
return self.output
output = output + self.child.before.decode()
status = 2
break
if not status == 2:
result = self.child.expect(expects, timeout = timeout)
if result == 0:
output = output + self.child.before.decode() + self.child.after.decode()
if result == 1:
output = output + self.child.before.decode()
if result == 2:
output = output + self.child.before.decode()
status = 2
self.child.close()
output = output.lstrip()
output = self._logclean(output, True)
if stdout == True:
print(output)
if folder != '':
with open(folder + "/" + self.unique, "w") as f:
with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
f.write(output)
f.close()
self._logclean(folder + "/" + self.unique)
self.output = output
self.status = 0
if status == 2:
self.status = 2
else:
self.status = 0
return output
else:
self.output = connect
self.status = 1
if stdout == True:
print(connect)
if folder != '':
with open(folder + "/" + self.unique + "_" + now + ".txt", "w") as f:
f.write(connect)
f.close()
return connect
def test(self, commands, expected, vars = None,*, prompt = r'>$|#$|\$$|>.$|#.$|\$.$', timeout = 10):
@@ -317,8 +329,9 @@ class node:
if result == 1:
output = output + self.child.before.decode()
if result == 2:
output = output + self.child.before.decode()
self.result = None
self.output = output + self.child.before.decode()
self.output = self._logclean(output, True)
self.status = 2
return self.output
if vars is not None:
@@ -329,7 +342,7 @@ class node:
if results == 0:
self.result = True
output = output + self.child.before.decode() + self.child.after.decode()
output = output.lstrip()
output = self._logclean(output, True)
self.output = output
self.status = 0
return True
@@ -339,14 +352,14 @@ class node:
output = output + self.child.before.decode() + self.child.after.decode()
elif results == 2:
output = output + self.child.before.decode()
output = output.lstrip()
output = self._logclean(output, True)
self.output = output
self.status = 0
return False
if results == 3:
self.result = None
output = output + self.child.before.decode()
output = output.lstrip()
output = self._logclean(output, True)
self.output = output
self.status = 2
return output
@@ -544,6 +557,7 @@ class nodes:
args["commands"] = commands
if folder != None:
args["folder"] = folder
Path(folder).mkdir(parents=True, exist_ok=True)
if prompt != None:
args["prompt"] = prompt
if stdout != None: