diff --git a/.gitignore b/.gitignore
index 21e1cd4..a2d4457 100644
--- a/.gitignore
+++ b/.gitignore
@@ -142,3 +142,6 @@ GEMINI.md
node_modules/
package-lock.json
package.json
+
+# Development docs
+connpy_roadmap.md
diff --git a/connpy/_version.py b/connpy/_version.py
index f27f650..c25905d 100644
--- a/connpy/_version.py
+++ b/connpy/_version.py
@@ -1,2 +1,2 @@
-__version__ = "5.0b2"
+__version__ = "5.0b3"
diff --git a/connpy/completion.py b/connpy/completion.py
index 448fcd5..e46427c 100755
--- a/connpy/completion.py
+++ b/connpy/completion.py
@@ -97,9 +97,21 @@ def main():
configdir = f.read().strip()
except (FileNotFoundError, IOError):
configdir = defaultdir
- defaultfile = configdir + '/config.json'
- jsonconf = open(defaultfile)
- config = json.load(jsonconf)
+ cachefile = configdir + '/.config.cache.json'
+ try:
+ with open(cachefile, "r") as jsonconf:
+ config = json.load(jsonconf)
+ except FileNotFoundError:
+ try:
+ import yaml
+ with open(configdir + '/config.yaml', "r") as yamlconf:
+ config = yaml.safe_load(yamlconf)
+ except Exception:
+ try:
+ with open(configdir + '/config.json', "r") as jsonconf:
+ config = json.load(jsonconf)
+ except Exception:
+ exit()
nodes = _getallnodes(config)
folders = _getallfolders(config)
profiles = list(config["profiles"].keys())
diff --git a/connpy/configfile.py b/connpy/configfile.py
index 5a4a0e0..716aca0 100755
--- a/connpy/configfile.py
+++ b/connpy/configfile.py
@@ -3,6 +3,8 @@
import json
import os
import re
+import yaml
+import shutil
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from pathlib import Path
@@ -65,16 +67,40 @@ class configfile:
with open(pathfile, "w") as f:
f.write(str(defaultdir))
configdir = defaultdir
- defaultfile = configdir + '/config.json'
+ defaultfile = configdir + '/config.yaml'
+ self.cachefile = configdir + '/.config.cache.json'
+ self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt'
defaultkey = configdir + '/.osk'
if conf == None:
self.file = defaultfile
+
+ # Backwards compatibility: Migrate from JSON to YAML
+ legacy_json = configdir + '/config.json'
+ legacy_noext = configdir + '/config'
+ legacy_file = None
+ if os.path.exists(legacy_json): legacy_file = legacy_json
+ elif os.path.exists(legacy_noext): legacy_file = legacy_noext
+
+ if not os.path.exists(self.file) and legacy_file:
+ try:
+ with open(legacy_file, 'r') as f:
+ old_data = json.load(f)
+ with open(self.file, 'w') as f:
+ yaml.dump(old_data, f, default_flow_style=False, sort_keys=False)
+ with open(self.cachefile, 'w') as f:
+ json.dump(old_data, f)
+ shutil.move(legacy_file, legacy_file + ".backup")
+ printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!")
+ except Exception as e:
+ printer.warning(f"Failed to migrate legacy config: {e}")
else:
self.file = conf
+
if key == None:
self.key = defaultkey
else:
self.key = key
+
if os.path.exists(self.file):
config = self._loadconfig(self.file)
else:
@@ -91,23 +117,38 @@ class configfile:
def _loadconfig(self, conf):
- #Loads config file
- jsonconf = open(conf)
- jsondata = json.load(jsonconf)
- jsonconf.close()
- return jsondata
+ #Loads config file using dual cache
+ cache_exists = os.path.exists(self.cachefile)
+ yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0
+ cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0
+
+ if not cache_exists or yaml_time > cache_time:
+ with open(conf, 'r') as f:
+ data = yaml.safe_load(f)
+ try:
+ with open(self.cachefile, 'w') as f:
+ json.dump(data, f)
+ except Exception:
+ pass
+ return data
+ else:
+ with open(self.cachefile, 'r') as f:
+ return json.load(f)
def _createconfig(self, conf):
#Create config file
defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}}
if not os.path.exists(conf):
with open(conf, "w") as f:
- json.dump(defaultconfig, f, indent = 4)
- f.close()
+ yaml.dump(defaultconfig, f, default_flow_style=False, sort_keys=False)
os.chmod(conf, 0o600)
- jsonconf = open(conf)
- jsondata = json.load(jsonconf)
- jsonconf.close()
+ try:
+ with open(self.cachefile, 'w') as f:
+ json.dump(defaultconfig, f)
+ except Exception:
+ pass
+ with open(conf, 'r') as f:
+ jsondata = yaml.safe_load(f)
return jsondata
@MethodHook
@@ -119,13 +160,23 @@ class configfile:
newconfig["profiles"] = self.profiles
try:
with open(conf, "w") as f:
- json.dump(newconfig, f, indent = 4)
- f.close()
+ yaml.dump(newconfig, f, default_flow_style=False, sort_keys=False)
+ with open(self.cachefile, "w") as f:
+ json.dump(newconfig, f)
+ self._generate_nodes_cache()
except (IOError, OSError) as e:
printer.error(f"Failed to save config: {e}")
return 1
return 0
+ def _generate_nodes_cache(self):
+ try:
+ nodes = self._getallnodes()
+ with open(self.fzf_cachefile, "w") as f:
+ f.write("\n".join(nodes))
+ except Exception:
+ pass
+
def _createkey(self, keyfile):
#Create key file
key = RSA.generate(2048)
@@ -344,15 +395,15 @@ class configfile:
def _getallnodes(self, filter = None):
#get all nodes on configfile
nodes = []
- layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"]
- folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"]
+ folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.extend(layer1)
for f in folders:
- layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"]
+ layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"]
nodes.extend(layer2)
- subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
- layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"]
+ layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"]
nodes.extend(layer3)
if filter:
if isinstance(filter, str):
@@ -367,15 +418,15 @@ class configfile:
def _getallnodesfull(self, filter = None, extract = True):
#get all nodes on configfile with all their attributes.
nodes = {}
- layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"}
- folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"}
+ folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.update(layer1)
for f in folders:
- layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"}
+ layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"}
nodes.update(layer2)
- subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
- layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"}
+ layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"}
nodes.update(layer3)
if filter:
if isinstance(filter, str):
@@ -406,27 +457,27 @@ class configfile:
@MethodHook
def _getallfolders(self):
#get all folders on configfile
- folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
subfolders = []
for f in folders:
- s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
subfolders.extend(s)
folders.extend(subfolders)
return folders
@MethodHook
def _profileused(self, profile):
- #Check if profile is used before deleting it
+ #Return all the nodes that uses this profile.
nodes = []
- layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
- folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
+ folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.extend(layer1)
for f in folders:
- layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
+ layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
nodes.extend(layer2)
- subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
- layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
+ layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
nodes.extend(layer3)
return nodes
diff --git a/connpy/connapp.py b/connpy/connapp.py
index de97bc8..8a340c4 100755
--- a/connpy/connapp.py
+++ b/connpy/connapp.py
@@ -166,6 +166,7 @@ class connapp:
configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"])
configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
+ configcrud.add_argument("--fzf-wrapper", dest="fzf_wrapper", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get 0ms latency fzf bash/zsh wrapper")
configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
configcrud.add_argument("--engineer-model", dest="engineer_model", nargs=1, action=self._store_type, help="Set engineer model", metavar="MODEL")
configcrud.add_argument("--engineer-api-key", dest="engineer_api_key", nargs=1, action=self._store_type, help="Set engineer api_key", metavar="API_KEY")
@@ -186,6 +187,10 @@ class connapp:
printer.warning(e)
for preload in self.plugins.preloads.values():
preload.Preload(self)
+
+ if not os.path.exists(self.config.fzf_cachefile):
+ self.config._generate_nodes_cache()
+
#Generate helps
nodeparser.usage = self._help("usage", subparsers)
nodeparser.epilog = self._help("end", subparsers)
@@ -482,7 +487,7 @@ class connapp:
def _func_others(self, args):
#Function called when using other commands
- actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config}
+ actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "fzf_wrapper": self._fzf_wrapper, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config}
return actions.get(args.command)(args)
def _ai_config(self, args):
@@ -622,6 +627,12 @@ class connapp:
elif args.data[0] == "zsh":
print(self._help("zshcompletion"))
+ def _fzf_wrapper(self, args):
+ if args.data[0] == "bash":
+ print(self._help("fzf_wrapper_bash"))
+ elif args.data[0] == "zsh":
+ print(self._help("fzf_wrapper_zsh"))
+
def _case(self, args):
if args.data[0] == "true":
args.data[0] = True
@@ -1520,10 +1531,10 @@ _conn()
mapfile -t strings < <(connpy-completion-helper "bash" "${#COMP_WORDS[@]}" "${COMP_WORDS[@]}")
local IFS=$'\t\n'
local home_dir=$(eval echo ~)
- local last_word=${COMP_WORDS[-1]/\~/$home_dir}
+ local last_word=${COMP_WORDS[-1]/\\~/$home_dir}
COMPREPLY=($(compgen -W "$(printf '%s' "${strings[@]}")" -- "$last_word"))
if [ "$last_word" != "${COMP_WORDS[-1]}" ]; then
- COMPREPLY=(${COMPREPLY[@]/$home_dir/\~})
+ COMPREPLY=(${COMPREPLY[@]/$home_dir/\\~})
fi
}
@@ -1538,12 +1549,12 @@ autoload -U compinit && compinit
_conn()
{
local home_dir=$(eval echo ~)
- last_word=${words[-1]/\~/$home_dir}
+ last_word=${words[-1]/\\~/$home_dir}
strings=($(connpy-completion-helper "zsh" ${#words} $words[1,-2] $last_word))
for string in "${strings[@]}"; do
#Replace the expanded home directory with ~
if [ "$last_word" != "$words[-1]" ]; then
- string=${string/$home_dir/\~}
+ string=${string/$home_dir/\\~}
fi
if [[ "${string}" =~ .*/$ ]]; then
# If the string ends with a '/', do not append a space
@@ -1558,10 +1569,51 @@ compdef _conn conn
compdef _conn connpy
#Here ends zsh completion for conn
'''
+ if type == "fzf_wrapper_bash":
+ return '''\n#Here starts bash 0ms fzf wrapper for connpy
+connpy() {
+ if [ $# -eq 0 ]; then
+ local selected
+ if [ -f ~/.config/conn/.fzf_nodes_cache.txt ]; then
+ selected=$(cat ~/.config/conn/.fzf_nodes_cache.txt | fzf-tmux -d 25% --reverse)
+ else
+ command connpy
+ return
+ fi
+ if [ -n "$selected" ]; then
+ command connpy "$selected"
+ fi
+ else
+ command connpy "$@"
+ fi
+}
+alias c="connpy"
+#Here ends bash 0ms fzf wrapper\n'''
+
+ if type == "fzf_wrapper_zsh":
+ return '''\n#Here starts zsh 0ms fzf wrapper for connpy
+connpy() {
+ if [ $# -eq 0 ]; then
+ local selected
+ if [ -f ~/.config/conn/.fzf_nodes_cache.txt ]; then
+ selected=$(cat ~/.config/conn/.fzf_nodes_cache.txt | fzf-tmux -d 25% --reverse)
+ else
+ command connpy
+ return
+ fi
+ if [ -n "$selected" ]; then
+ command connpy "$selected"
+ fi
+ else
+ command connpy "$@"
+ fi
+}
+alias c="connpy"
+#Here ends zsh 0ms fzf wrapper\n'''
if type == "run":
return "node[@subfolder][@folder] commmand to run\nRun the specific command on the node and print output\n/path/to/file.yaml\nUse a yaml file to run an automation script"
if type == "generate":
- return '''---
+ return r'''---
tasks:
- name: "Config"
diff --git a/connpy/core_plugins/sync.py b/connpy/core_plugins/sync.py
index 92eb040..712bca2 100755
--- a/connpy/core_plugins/sync.py
+++ b/connpy/core_plugins/sync.py
@@ -213,7 +213,7 @@ class sync:
def compress_specific_files(self, zip_path):
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- zipf.write(self.file, "config.json")
+ zipf.write(self.file, os.path.basename(self.file))
zipf.write(self.key, ".osk")
def compress_and_upload(self):
@@ -251,8 +251,23 @@ class sync:
try:
with zipfile.ZipFile(zip_path, 'r') as zipf:
# Extract the specific file to the specified destination
- zipf.extract("config.json", os.path.dirname(self.file))
- zipf.extract(".osk", os.path.dirname(self.key))
+ names = zipf.namelist()
+ if "config.yaml" in names:
+ zipf.extract("config.yaml", os.path.dirname(self.file))
+ elif "config.json" in names:
+ zipf.extract("config.json", os.path.dirname(self.file))
+
+ if ".osk" in names:
+ zipf.extract(".osk", os.path.dirname(self.key))
+
+ # Delete caches to force auto-regeneration on next run
+ try:
+ if os.path.exists(self.connapp.config.cachefile):
+ os.remove(self.connapp.config.cachefile)
+ if os.path.exists(self.connapp.config.fzf_cachefile):
+ os.remove(self.connapp.config.fzf_cachefile)
+ except Exception:
+ pass
return 0
except Exception as e:
printer.error(f"An error occurred: {e}")
diff --git a/connpy/tests/test_capture.py b/connpy/tests/test_capture.py
new file mode 100644
index 0000000..b848c56
--- /dev/null
+++ b/connpy/tests/test_capture.py
@@ -0,0 +1,51 @@
+"""Tests for connpy.core_plugins.capture"""
+import pytest
+from unittest.mock import MagicMock, patch
+from connpy.core_plugins.capture import RemoteCapture
+
+@pytest.fixture
+def mock_connapp():
+ app = MagicMock()
+ app.nodes_list = ["test_node"]
+ app.config.getitem.return_value = {"host": "127.0.0.1", "protocol": "ssh"}
+ mock_node = MagicMock()
+ mock_node.protocol = "ssh"
+ mock_node.unique = "test_node"
+ app.node.return_value = mock_node
+ app.config.config = {"wireshark_path": "/fake/ws"}
+ return app
+
+class TestRemoteCapture:
+ def test_init_node_not_found(self, mock_connapp):
+ # Attempt to capture a node not in nodes_list
+ mock_connapp.nodes_list = ["other_node"]
+ with pytest.raises(SystemExit) as exc:
+ RemoteCapture(mock_connapp, "test_node", "eth0")
+ assert exc.value.code == 2
+
+ def test_init_success(self, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ assert rc.node_name == "test_node"
+ assert rc.interface == "eth0"
+ assert rc.wireshark_path == "/fake/ws"
+
+ @patch("connpy.core_plugins.capture.socket")
+ def test_is_port_in_use(self, mock_socket, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ mock_sock_instance = MagicMock()
+ mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance
+
+ mock_sock_instance.connect_ex.return_value = 0
+ assert rc._is_port_in_use(8080) is True
+
+ mock_sock_instance.connect_ex.return_value = 1
+ assert rc._is_port_in_use(8080) is False
+
+ @patch.object(RemoteCapture, "_is_port_in_use")
+ def test_find_free_port(self, mock_is_in_use, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ # First 2 ports in use, 3rd is free
+ mock_is_in_use.side_effect = [True, True, False]
+ port = rc._find_free_port(20000, 30000)
+ assert 20000 <= port <= 30000
+ assert mock_is_in_use.call_count == 3
diff --git a/connpy/tests/test_configfile.py b/connpy/tests/test_configfile.py
index 6420bd7..39c508b 100644
--- a/connpy/tests/test_configfile.py
+++ b/connpy/tests/test_configfile.py
@@ -3,14 +3,15 @@ import json
import os
import re
import pytest
+import yaml
from copy import deepcopy
class TestConfigfileInit:
def test_creates_default_config(self, tmp_config_dir):
- """Creates config.json with defaults when it doesn't exist."""
- config_file = tmp_config_dir / "config.json"
- config_file.unlink() # Remove existing
+ """Creates config.yaml with defaults when it doesn't exist."""
+ config_file = tmp_config_dir / "config.yaml"
+ config_file.unlink(missing_ok=True) # Remove existing
key_file = tmp_config_dir / ".osk"
from connpy.configfile import configfile
@@ -27,7 +28,7 @@ class TestConfigfileInit:
key_file.unlink() # Remove existing
from connpy.configfile import configfile
- conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file))
+ conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file))
assert key_file.exists()
assert conf.privatekey is not None
@@ -41,8 +42,8 @@ class TestConfigfileInit:
def test_config_file_permissions(self, tmp_config_dir):
"""Config is created with 0o600 permissions."""
- config_file = tmp_config_dir / "config.json"
- config_file.unlink()
+ config_file = tmp_config_dir / "config.yaml"
+ config_file.unlink(missing_ok=True)
from connpy.configfile import configfile
configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -62,7 +63,7 @@ class TestConfigfileInit:
(dot_folder / ".folder").write_text(str(config_dir))
(dot_folder / "plugins").mkdir(exist_ok=True)
- conf_path = str(config_dir / "my_config.json")
+ conf_path = str(config_dir / "my_config.yaml")
key_path = str(config_dir / "my_key")
from connpy.configfile import configfile
@@ -248,7 +249,7 @@ class TestGetItem:
def test_getitem_with_profile_extraction(self, tmp_config_dir):
"""extract=True resolves @profile references."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -268,7 +269,7 @@ class TestGetItem:
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -326,7 +327,7 @@ class TestGetAll:
def test_profileused(self, tmp_config_dir):
"""Detects nodes using a specific profile."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -352,7 +353,7 @@ class TestGetAll:
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
diff --git a/connpy/tests/test_context.py b/connpy/tests/test_context.py
new file mode 100644
index 0000000..f38709e
--- /dev/null
+++ b/connpy/tests/test_context.py
@@ -0,0 +1,109 @@
+"""Tests for connpy.core_plugins.context"""
+import pytest
+from unittest.mock import MagicMock, patch
+from connpy.core_plugins.context import context_manager, Preload, Entrypoint
+
+@pytest.fixture
+def mock_connapp():
+ connapp = MagicMock()
+ connapp.config.config = {
+ "contexts": {"all": [".*"]},
+ "current_context": "all"
+ }
+ return connapp
+
+class TestContextManager:
+ def test_init(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ assert cm.contexts == {"all": [".*"]}
+ assert cm.current_context == "all"
+ assert len(cm.regex) == 1
+
+ def test_add_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["^prod_.*"])
+ assert "prod" in cm.contexts
+ mock_connapp._change_settings.assert_called_with("contexts", cm.contexts)
+
+ def test_add_context_invalid_name(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.add_context("prod-env", ["Regex"])
+ assert exc.value.code == 1
+
+ def test_add_context_already_exists(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.add_context("all", ["Regex"])
+ assert exc.value.code == 2
+
+ def test_modify_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["old"])
+ cm.modify_context("prod", ["new"])
+ assert cm.contexts["prod"] == ["new"]
+
+ def test_modify_context_all(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.modify_context("all", ["new"])
+ assert exc.value.code == 3
+
+ def test_modify_context_not_exists(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.modify_context("fake", ["new"])
+ assert exc.value.code == 4
+
+ def test_delete_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["old"])
+ cm.delete_context("prod")
+ assert "prod" not in cm.contexts
+
+ def test_delete_context_all(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.delete_context("all")
+ assert exc.value.code == 3
+
+ def test_delete_context_current(self, mock_connapp):
+ mock_connapp.config.config["current_context"] = "prod"
+ mock_connapp.config.config["contexts"]["prod"] = [".*"]
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.delete_context("prod")
+ assert exc.value.code == 5
+
+ def test_set_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.contexts["prod"] = [".*"]
+ cm.set_context("prod")
+ mock_connapp._change_settings.assert_called_with("current_context", "prod")
+
+ def test_set_context_already_set(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.set_context("all")
+ assert exc.value.code == 0
+
+ def test_match_regexp(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"]
+ cm = context_manager(mock_connapp)
+ assert cm.match_any_regex("prod_node", cm.regex) is True
+ assert cm.match_any_regex("test_node", cm.regex) is True
+ assert cm.match_any_regex("dev_node", cm.regex) is False
+
+ def test_modify_node_list(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod"]
+ cm = context_manager(mock_connapp)
+ nodes = ["prod_1", "dev_1", "prod_2"]
+ result = cm.modify_node_list(result=nodes)
+ assert result == ["prod_1", "prod_2"]
+
+ def test_modify_node_dict(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod"]
+ cm = context_manager(mock_connapp)
+ nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}}
+ result = cm.modify_node_dict(result=nodes)
+ assert set(result.keys()) == {"prod_1", "prod_2"}
diff --git a/connpy/tests/test_sync.py b/connpy/tests/test_sync.py
new file mode 100644
index 0000000..95bc712
--- /dev/null
+++ b/connpy/tests/test_sync.py
@@ -0,0 +1,108 @@
+"""Tests for connpy.core_plugins.sync"""
+import pytest
+from unittest.mock import MagicMock, patch, mock_open
+from connpy.core_plugins.sync import sync
+
+@pytest.fixture
+def mock_connapp():
+ app = MagicMock()
+ app.config.defaultdir = "/fake/dir"
+ app.config.file = "/fake/dir/config.yaml"
+ app.config.key = "/fake/dir/.osk"
+ app.config.config = {"sync": True}
+ return app
+
+class TestSyncPlugin:
+ def test_init(self, mock_connapp):
+ s = sync(mock_connapp)
+ assert s.sync is True
+ assert s.file == "/fake/dir/config.yaml"
+ assert s.token_file == "/fake/dir/gtoken.json"
+
+ @patch("connpy.core_plugins.sync.os.path.exists")
+ @patch("connpy.core_plugins.sync.Credentials")
+ def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp):
+ mock_exists.return_value = True
+ mock_cred_instance = MagicMock()
+ mock_cred_instance.valid = True
+ MockCreds.from_authorized_user_file.return_value = mock_cred_instance
+
+ s = sync(mock_connapp)
+ creds = s.get_credentials()
+ assert creds == mock_cred_instance
+
+ @patch("connpy.core_plugins.sync.os.path.exists")
+ def test_get_credentials_not_found(self, mock_exists, mock_connapp):
+ mock_exists.return_value = False
+ s = sync(mock_connapp)
+ assert s.get_credentials() == 0
+
+ @patch("connpy.core_plugins.sync.zipfile.ZipFile")
+ @patch("connpy.core_plugins.sync.os.path.basename")
+ def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp):
+ mock_basename.return_value = "config.yaml"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ s.compress_specific_files("/fake/zip.zip")
+ zip_mock.write.assert_any_call(s.file, "config.yaml")
+ zip_mock.write.assert_any_call(s.key, ".osk")
+
+ @patch("connpy.core_plugins.sync.zipfile.ZipFile")
+ @patch("connpy.core_plugins.sync.os.path.dirname")
+ def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp):
+ mock_dirname.return_value = "/fake/dir"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ zip_mock.namelist.return_value = ["config.yaml", ".osk"]
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ assert s.decompress_zip("/fake/zip.zip") == 0
+ zip_mock.extract.assert_any_call("config.yaml", "/fake/dir")
+ zip_mock.extract.assert_any_call(".osk", "/fake/dir")
+
+ @patch("connpy.core_plugins.sync.zipfile.ZipFile")
+ @patch("connpy.core_plugins.sync.os.path.dirname")
+ def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp):
+ mock_dirname.return_value = "/fake/dir"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ zip_mock.namelist.return_value = ["config.json", ".osk"]
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ assert s.decompress_zip("/fake/old_zip.zip") == 0
+ zip_mock.extract.assert_any_call("config.json", "/fake/dir")
+
+ @patch.object(sync, "get_credentials")
+ @patch("connpy.core_plugins.sync.build")
+ def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp):
+ mock_get_credentials.return_value = MagicMock()
+ mock_service = MagicMock()
+ mock_build.return_value = mock_service
+
+ mock_service.files().list().execute.return_value = {
+ "files": [
+ {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}}
+ ]
+ }
+
+ s = sync(mock_connapp)
+ files = s.get_appdata_files()
+ assert len(files) == 1
+ assert files[0]["id"] == "1"
+ assert files[0]["timestamp"] == "1000"
+
+ @patch.object(sync, "get_credentials")
+ @patch("connpy.core_plugins.sync.build")
+ @patch("connpy.core_plugins.sync.MediaFileUpload")
+ @patch("connpy.core_plugins.sync.os.path.basename")
+ def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp):
+ mock_get_credentials.return_value = MagicMock()
+ mock_basename.return_value = "backup.zip"
+ mock_service = MagicMock()
+ mock_build.return_value = mock_service
+
+ s = sync(mock_connapp)
+ assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0
+ mock_service.files().create.assert_called_once()
diff --git a/docs/connpy/index.html b/docs/connpy/index.html
index c3dd081..0b0daf6 100644
--- a/docs/connpy/index.html
+++ b/docs/connpy/index.html
@@ -2259,16 +2259,40 @@ class configfile:
with open(pathfile, "w") as f:
f.write(str(defaultdir))
configdir = defaultdir
- defaultfile = configdir + '/config.json'
+ defaultfile = configdir + '/config.yaml'
+ self.cachefile = configdir + '/.config.cache.json'
+ self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt'
defaultkey = configdir + '/.osk'
if conf == None:
self.file = defaultfile
+
+ # Backwards compatibility: Migrate from JSON to YAML
+ legacy_json = configdir + '/config.json'
+ legacy_noext = configdir + '/config'
+ legacy_file = None
+ if os.path.exists(legacy_json): legacy_file = legacy_json
+ elif os.path.exists(legacy_noext): legacy_file = legacy_noext
+
+ if not os.path.exists(self.file) and legacy_file:
+ try:
+ with open(legacy_file, 'r') as f:
+ old_data = json.load(f)
+ with open(self.file, 'w') as f:
+ yaml.dump(old_data, f, default_flow_style=False, sort_keys=False)
+ with open(self.cachefile, 'w') as f:
+ json.dump(old_data, f)
+ shutil.move(legacy_file, legacy_file + ".backup")
+ printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!")
+ except Exception as e:
+ printer.warning(f"Failed to migrate legacy config: {e}")
else:
self.file = conf
+
if key == None:
self.key = defaultkey
else:
self.key = key
+
if os.path.exists(self.file):
config = self._loadconfig(self.file)
else:
@@ -2285,23 +2309,38 @@ class configfile:
def _loadconfig(self, conf):
- #Loads config file
- jsonconf = open(conf)
- jsondata = json.load(jsonconf)
- jsonconf.close()
- return jsondata
+ #Loads config file using dual cache
+ cache_exists = os.path.exists(self.cachefile)
+ yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0
+ cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0
+
+ if not cache_exists or yaml_time > cache_time:
+ with open(conf, 'r') as f:
+ data = yaml.safe_load(f)
+ try:
+ with open(self.cachefile, 'w') as f:
+ json.dump(data, f)
+ except Exception:
+ pass
+ return data
+ else:
+ with open(self.cachefile, 'r') as f:
+ return json.load(f)
def _createconfig(self, conf):
#Create config file
defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}}
if not os.path.exists(conf):
with open(conf, "w") as f:
- json.dump(defaultconfig, f, indent = 4)
- f.close()
+ yaml.dump(defaultconfig, f, default_flow_style=False, sort_keys=False)
os.chmod(conf, 0o600)
- jsonconf = open(conf)
- jsondata = json.load(jsonconf)
- jsonconf.close()
+ try:
+ with open(self.cachefile, 'w') as f:
+ json.dump(defaultconfig, f)
+ except Exception:
+ pass
+ with open(conf, 'r') as f:
+ jsondata = yaml.safe_load(f)
return jsondata
@MethodHook
@@ -2313,13 +2352,23 @@ class configfile:
newconfig["profiles"] = self.profiles
try:
with open(conf, "w") as f:
- json.dump(newconfig, f, indent = 4)
- f.close()
+ yaml.dump(newconfig, f, default_flow_style=False, sort_keys=False)
+ with open(self.cachefile, "w") as f:
+ json.dump(newconfig, f)
+ self._generate_nodes_cache()
except (IOError, OSError) as e:
printer.error(f"Failed to save config: {e}")
return 1
return 0
+ def _generate_nodes_cache(self):
+ try:
+ nodes = self._getallnodes()
+ with open(self.fzf_cachefile, "w") as f:
+ f.write("\n".join(nodes))
+ except Exception:
+ pass
+
def _createkey(self, keyfile):
#Create key file
key = RSA.generate(2048)
@@ -2538,15 +2587,15 @@ class configfile:
def _getallnodes(self, filter = None):
#get all nodes on configfile
nodes = []
- layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"]
- folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"]
+ folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.extend(layer1)
for f in folders:
- layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"]
+ layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"]
nodes.extend(layer2)
- subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
- layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"]
+ layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"]
nodes.extend(layer3)
if filter:
if isinstance(filter, str):
@@ -2561,15 +2610,15 @@ class configfile:
def _getallnodesfull(self, filter = None, extract = True):
#get all nodes on configfile with all their attributes.
nodes = {}
- layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"}
- folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"}
+ folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.update(layer1)
for f in folders:
- layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"}
+ layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"}
nodes.update(layer2)
- subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
- layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"}
+ layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"}
nodes.update(layer3)
if filter:
if isinstance(filter, str):
@@ -2600,27 +2649,27 @@ class configfile:
@MethodHook
def _getallfolders(self):
#get all folders on configfile
- folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
subfolders = []
for f in folders:
- s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
subfolders.extend(s)
folders.extend(subfolders)
return folders
@MethodHook
def _profileused(self, profile):
- #Check if profile is used before deleting it
+ #Return all the nodes that uses this profile.
nodes = []
- layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
- folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
+ layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
+ folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.extend(layer1)
for f in folders:
- layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
+ layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
nodes.extend(layer2)
- subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
+ subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
- layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
+ layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
nodes.extend(layer3)
return nodes
diff --git a/docs/connpy/tests/index.html b/docs/connpy/tests/index.html
index afbe9c3..5b37649 100644
--- a/docs/connpy/tests/index.html
+++ b/docs/connpy/tests/index.html
@@ -52,6 +52,10 @@ el.replaceWith(d);
Tests for connpy.api module — Flask routes.
+connpy.tests.test_capture
+
+Tests for connpy.core_plugins.capture
+
connpy.tests.test_completion
Tests for connpy.completion module.
@@ -60,6 +64,10 @@ el.replaceWith(d);
Tests for connpy.configfile module.
+connpy.tests.test_context
+
+Tests for connpy.core_plugins.context
+
connpy.tests.test_core
Tests for connpy.core module — node and nodes classes.
@@ -76,6 +84,10 @@ el.replaceWith(d);
Tests for connpy.printer module.
+connpy.tests.test_sync
+
+Tests for connpy.core_plugins.sync
+
@@ -100,12 +112,15 @@ el.replaceWith(d);
connpy.tests.conftest
connpy.tests.test_ai
connpy.tests.test_api
+connpy.tests.test_capture
connpy.tests.test_completion
connpy.tests.test_configfile
+connpy.tests.test_context
connpy.tests.test_core
connpy.tests.test_hooks
connpy.tests.test_plugins
connpy.tests.test_printer
+connpy.tests.test_sync
diff --git a/docs/connpy/tests/test_capture.html b/docs/connpy/tests/test_capture.html
new file mode 100644
index 0000000..db68f63
--- /dev/null
+++ b/docs/connpy/tests/test_capture.html
@@ -0,0 +1,235 @@
+
+
+
+
+
+
+connpy.tests.test_capture API documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+Module connpy.tests.test_capture
+
+
+Tests for connpy.core_plugins.capture
+
+
+
+
+
+
+
+def mock_connapp()
+
+-
+
+
+Expand source code
+
+@pytest.fixture
+def mock_connapp():
+ app = MagicMock()
+ app.nodes_list = ["test_node"]
+ app.config.getitem.return_value = {"host": "127.0.0.1", "protocol": "ssh"}
+ mock_node = MagicMock()
+ mock_node.protocol = "ssh"
+ mock_node.unique = "test_node"
+ app.node.return_value = mock_node
+ app.config.config = {"wireshark_path": "/fake/ws"}
+ return app
+
+
+
+
+
+
+
+
+
+class TestRemoteCapture
+
+-
+
+
+Expand source code
+
+class TestRemoteCapture:
+ def test_init_node_not_found(self, mock_connapp):
+ # Attempt to capture a node not in nodes_list
+ mock_connapp.nodes_list = ["other_node"]
+ with pytest.raises(SystemExit) as exc:
+ RemoteCapture(mock_connapp, "test_node", "eth0")
+ assert exc.value.code == 2
+
+ def test_init_success(self, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ assert rc.node_name == "test_node"
+ assert rc.interface == "eth0"
+ assert rc.wireshark_path == "/fake/ws"
+
+ @patch("connpy.core_plugins.capture.socket")
+ def test_is_port_in_use(self, mock_socket, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ mock_sock_instance = MagicMock()
+ mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance
+
+ mock_sock_instance.connect_ex.return_value = 0
+ assert rc._is_port_in_use(8080) is True
+
+ mock_sock_instance.connect_ex.return_value = 1
+ assert rc._is_port_in_use(8080) is False
+
+ @patch.object(RemoteCapture, "_is_port_in_use")
+ def test_find_free_port(self, mock_is_in_use, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ # First 2 ports in use, 3rd is free
+ mock_is_in_use.side_effect = [True, True, False]
+ port = rc._find_free_port(20000, 30000)
+ assert 20000 <= port <= 30000
+ assert mock_is_in_use.call_count == 3
+
+
+Methods
+
+
+def test_find_free_port(self, mock_is_in_use, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch.object(RemoteCapture, "_is_port_in_use")
+def test_find_free_port(self, mock_is_in_use, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ # First 2 ports in use, 3rd is free
+ mock_is_in_use.side_effect = [True, True, False]
+ port = rc._find_free_port(20000, 30000)
+ assert 20000 <= port <= 30000
+ assert mock_is_in_use.call_count == 3
+
+
+
+
+def test_init_node_not_found(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_init_node_not_found(self, mock_connapp):
+ # Attempt to capture a node not in nodes_list
+ mock_connapp.nodes_list = ["other_node"]
+ with pytest.raises(SystemExit) as exc:
+ RemoteCapture(mock_connapp, "test_node", "eth0")
+ assert exc.value.code == 2
+
+
+
+
+def test_init_success(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_init_success(self, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ assert rc.node_name == "test_node"
+ assert rc.interface == "eth0"
+ assert rc.wireshark_path == "/fake/ws"
+
+
+
+
+def test_is_port_in_use(self, mock_socket, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch("connpy.core_plugins.capture.socket")
+def test_is_port_in_use(self, mock_socket, mock_connapp):
+ rc = RemoteCapture(mock_connapp, "test_node", "eth0")
+ mock_sock_instance = MagicMock()
+ mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance
+
+ mock_sock_instance.connect_ex.return_value = 0
+ assert rc._is_port_in_use(8080) is True
+
+ mock_sock_instance.connect_ex.return_value = 1
+ assert rc._is_port_in_use(8080) is False
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/connpy/tests/test_configfile.html b/docs/connpy/tests/test_configfile.html
index 164ba3c..eea4bc0 100644
--- a/docs/connpy/tests/test_configfile.html
+++ b/docs/connpy/tests/test_configfile.html
@@ -383,9 +383,9 @@ el.replaceWith(d);
class TestConfigfileInit:
def test_creates_default_config(self, tmp_config_dir):
- """Creates config.json with defaults when it doesn't exist."""
- config_file = tmp_config_dir / "config.json"
- config_file.unlink() # Remove existing
+ """Creates config.yaml with defaults when it doesn't exist."""
+ config_file = tmp_config_dir / "config.yaml"
+ config_file.unlink(missing_ok=True) # Remove existing
key_file = tmp_config_dir / ".osk"
from connpy.configfile import configfile
@@ -402,7 +402,7 @@ el.replaceWith(d);
key_file.unlink() # Remove existing
from connpy.configfile import configfile
- conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file))
+ conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file))
assert key_file.exists()
assert conf.privatekey is not None
@@ -416,8 +416,8 @@ el.replaceWith(d);
def test_config_file_permissions(self, tmp_config_dir):
"""Config is created with 0o600 permissions."""
- config_file = tmp_config_dir / "config.json"
- config_file.unlink()
+ config_file = tmp_config_dir / "config.yaml"
+ config_file.unlink(missing_ok=True)
from connpy.configfile import configfile
configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -437,7 +437,7 @@ el.replaceWith(d);
(dot_folder / ".folder").write_text(str(config_dir))
(dot_folder / "plugins").mkdir(exist_ok=True)
- conf_path = str(config_dir / "my_config.json")
+ conf_path = str(config_dir / "my_config.yaml")
key_path = str(config_dir / "my_key")
from connpy.configfile import configfile
@@ -459,8 +459,8 @@ el.replaceWith(d);
def test_config_file_permissions(self, tmp_config_dir):
"""Config is created with 0o600 permissions."""
- config_file = tmp_config_dir / "config.json"
- config_file.unlink()
+ config_file = tmp_config_dir / "config.yaml"
+ config_file.unlink(missing_ok=True)
from connpy.configfile import configfile
configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -479,9 +479,9 @@ el.replaceWith(d);
Expand source code
def test_creates_default_config(self, tmp_config_dir):
- """Creates config.json with defaults when it doesn't exist."""
- config_file = tmp_config_dir / "config.json"
- config_file.unlink() # Remove existing
+ """Creates config.yaml with defaults when it doesn't exist."""
+ config_file = tmp_config_dir / "config.yaml"
+ config_file.unlink(missing_ok=True) # Remove existing
key_file = tmp_config_dir / ".osk"
from connpy.configfile import configfile
@@ -492,7 +492,7 @@ el.replaceWith(d);
assert conf.config["idletime"] == 30
assert "default" in conf.profiles
-Creates config.json with defaults when it doesn't exist.
+Creates config.yaml with defaults when it doesn't exist.
def test_creates_rsa_key(self, tmp_config_dir)
@@ -508,7 +508,7 @@ el.replaceWith(d);
key_file.unlink() # Remove existing
from connpy.configfile import configfile
- conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file))
+ conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file))
assert key_file.exists()
assert conf.privatekey is not None
@@ -536,7 +536,7 @@ el.replaceWith(d);
(dot_folder / ".folder").write_text(str(config_dir))
(dot_folder / "plugins").mkdir(exist_ok=True)
- conf_path = str(config_dir / "my_config.json")
+ conf_path = str(config_dir / "my_config.yaml")
key_path = str(config_dir / "my_key")
from connpy.configfile import configfile
@@ -846,7 +846,7 @@ el.replaceWith(d);
def test_profileused(self, tmp_config_dir):
"""Detects nodes using a specific profile."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -872,7 +872,7 @@ el.replaceWith(d);
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -1014,7 +1014,7 @@ el.replaceWith(d);
def test_profileused(self, tmp_config_dir):
"""Detects nodes using a specific profile."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -1040,7 +1040,7 @@ el.replaceWith(d);
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -1111,7 +1111,7 @@ el.replaceWith(d);
def test_getitem_with_profile_extraction(self, tmp_config_dir):
"""extract=True resolves @profile references."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -1131,7 +1131,7 @@ el.replaceWith(d);
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -1235,7 +1235,7 @@ el.replaceWith(d);
def test_getitem_with_profile_extraction(self, tmp_config_dir):
"""extract=True resolves @profile references."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -1255,7 +1255,7 @@ el.replaceWith(d);
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
diff --git a/docs/connpy/tests/test_context.html b/docs/connpy/tests/test_context.html
new file mode 100644
index 0000000..bc4ca50
--- /dev/null
+++ b/docs/connpy/tests/test_context.html
@@ -0,0 +1,475 @@
+
+
+
+
+
+
+connpy.tests.test_context API documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+Module connpy.tests.test_context
+
+
+Tests for connpy.core_plugins.context
+
+
+
+
+
+
+
+def mock_connapp()
+
+-
+
+
+Expand source code
+
+@pytest.fixture
+def mock_connapp():
+ connapp = MagicMock()
+ connapp.config.config = {
+ "contexts": {"all": [".*"]},
+ "current_context": "all"
+ }
+ return connapp
+
+
+
+
+
+
+
+
+
+class TestContextManager
+
+-
+
+
+Expand source code
+
+class TestContextManager:
+ def test_init(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ assert cm.contexts == {"all": [".*"]}
+ assert cm.current_context == "all"
+ assert len(cm.regex) == 1
+
+ def test_add_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["^prod_.*"])
+ assert "prod" in cm.contexts
+ mock_connapp._change_settings.assert_called_with("contexts", cm.contexts)
+
+ def test_add_context_invalid_name(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.add_context("prod-env", ["Regex"])
+ assert exc.value.code == 1
+
+ def test_add_context_already_exists(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.add_context("all", ["Regex"])
+ assert exc.value.code == 2
+
+ def test_modify_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["old"])
+ cm.modify_context("prod", ["new"])
+ assert cm.contexts["prod"] == ["new"]
+
+ def test_modify_context_all(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.modify_context("all", ["new"])
+ assert exc.value.code == 3
+
+ def test_modify_context_not_exists(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.modify_context("fake", ["new"])
+ assert exc.value.code == 4
+
+ def test_delete_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["old"])
+ cm.delete_context("prod")
+ assert "prod" not in cm.contexts
+
+ def test_delete_context_all(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.delete_context("all")
+ assert exc.value.code == 3
+
+ def test_delete_context_current(self, mock_connapp):
+ mock_connapp.config.config["current_context"] = "prod"
+ mock_connapp.config.config["contexts"]["prod"] = [".*"]
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.delete_context("prod")
+ assert exc.value.code == 5
+
+ def test_set_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.contexts["prod"] = [".*"]
+ cm.set_context("prod")
+ mock_connapp._change_settings.assert_called_with("current_context", "prod")
+
+ def test_set_context_already_set(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.set_context("all")
+ assert exc.value.code == 0
+
+ def test_match_regexp(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"]
+ cm = context_manager(mock_connapp)
+ assert cm.match_any_regex("prod_node", cm.regex) is True
+ assert cm.match_any_regex("test_node", cm.regex) is True
+ assert cm.match_any_regex("dev_node", cm.regex) is False
+
+ def test_modify_node_list(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod"]
+ cm = context_manager(mock_connapp)
+ nodes = ["prod_1", "dev_1", "prod_2"]
+ result = cm.modify_node_list(result=nodes)
+ assert result == ["prod_1", "prod_2"]
+
+ def test_modify_node_dict(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod"]
+ cm = context_manager(mock_connapp)
+ nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}}
+ result = cm.modify_node_dict(result=nodes)
+ assert set(result.keys()) == {"prod_1", "prod_2"}
+
+
+Methods
+
+
+def test_add_context_already_exists(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_add_context_already_exists(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.add_context("all", ["Regex"])
+ assert exc.value.code == 2
+
+
+
+
+def test_add_context_invalid_name(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_add_context_invalid_name(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.add_context("prod-env", ["Regex"])
+ assert exc.value.code == 1
+
+
+
+
+def test_add_context_success(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_add_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["^prod_.*"])
+ assert "prod" in cm.contexts
+ mock_connapp._change_settings.assert_called_with("contexts", cm.contexts)
+
+
+
+
+def test_delete_context_all(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_delete_context_all(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.delete_context("all")
+ assert exc.value.code == 3
+
+
+
+
+def test_delete_context_current(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_delete_context_current(self, mock_connapp):
+ mock_connapp.config.config["current_context"] = "prod"
+ mock_connapp.config.config["contexts"]["prod"] = [".*"]
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.delete_context("prod")
+ assert exc.value.code == 5
+
+
+
+
+def test_delete_context_success(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_delete_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["old"])
+ cm.delete_context("prod")
+ assert "prod" not in cm.contexts
+
+
+
+
+def test_init(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_init(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ assert cm.contexts == {"all": [".*"]}
+ assert cm.current_context == "all"
+ assert len(cm.regex) == 1
+
+
+
+
+def test_match_regexp(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_match_regexp(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"]
+ cm = context_manager(mock_connapp)
+ assert cm.match_any_regex("prod_node", cm.regex) is True
+ assert cm.match_any_regex("test_node", cm.regex) is True
+ assert cm.match_any_regex("dev_node", cm.regex) is False
+
+
+
+
+def test_modify_context_all(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_modify_context_all(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.modify_context("all", ["new"])
+ assert exc.value.code == 3
+
+
+
+
+def test_modify_context_not_exists(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_modify_context_not_exists(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.modify_context("fake", ["new"])
+ assert exc.value.code == 4
+
+
+
+
+def test_modify_context_success(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_modify_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.add_context("prod", ["old"])
+ cm.modify_context("prod", ["new"])
+ assert cm.contexts["prod"] == ["new"]
+
+
+
+
+def test_modify_node_dict(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_modify_node_dict(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod"]
+ cm = context_manager(mock_connapp)
+ nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}}
+ result = cm.modify_node_dict(result=nodes)
+ assert set(result.keys()) == {"prod_1", "prod_2"}
+
+
+
+
+def test_modify_node_list(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_modify_node_list(self, mock_connapp):
+ mock_connapp.config.config["contexts"]["all"] = ["^prod"]
+ cm = context_manager(mock_connapp)
+ nodes = ["prod_1", "dev_1", "prod_2"]
+ result = cm.modify_node_list(result=nodes)
+ assert result == ["prod_1", "prod_2"]
+
+
+
+
+def test_set_context_already_set(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_set_context_already_set(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ with pytest.raises(SystemExit) as exc:
+ cm.set_context("all")
+ assert exc.value.code == 0
+
+
+
+
+def test_set_context_success(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_set_context_success(self, mock_connapp):
+ cm = context_manager(mock_connapp)
+ cm.contexts["prod"] = [".*"]
+ cm.set_context("prod")
+ mock_connapp._change_settings.assert_called_with("current_context", "prod")
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/connpy/tests/test_sync.html b/docs/connpy/tests/test_sync.html
new file mode 100644
index 0000000..d1e0b08
--- /dev/null
+++ b/docs/connpy/tests/test_sync.html
@@ -0,0 +1,396 @@
+
+
+
+
+
+
+connpy.tests.test_sync API documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+Module connpy.tests.test_sync
+
+
+Tests for connpy.core_plugins.sync
+
+
+
+
+
+
+
+def mock_connapp()
+
+-
+
+
+Expand source code
+
+@pytest.fixture
+def mock_connapp():
+ app = MagicMock()
+ app.config.defaultdir = "/fake/dir"
+ app.config.file = "/fake/dir/config.yaml"
+ app.config.key = "/fake/dir/.osk"
+ app.config.config = {"sync": True}
+ return app
+
+
+
+
+
+
+
+
+
+class TestSyncPlugin
+
+-
+
+
+Expand source code
+
+class TestSyncPlugin:
+ def test_init(self, mock_connapp):
+ s = sync(mock_connapp)
+ assert s.sync is True
+ assert s.file == "/fake/dir/config.yaml"
+ assert s.token_file == "/fake/dir/gtoken.json"
+
+ @patch("connpy.core_plugins.sync.os.path.exists")
+ @patch("connpy.core_plugins.sync.Credentials")
+ def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp):
+ mock_exists.return_value = True
+ mock_cred_instance = MagicMock()
+ mock_cred_instance.valid = True
+ MockCreds.from_authorized_user_file.return_value = mock_cred_instance
+
+ s = sync(mock_connapp)
+ creds = s.get_credentials()
+ assert creds == mock_cred_instance
+
+ @patch("connpy.core_plugins.sync.os.path.exists")
+ def test_get_credentials_not_found(self, mock_exists, mock_connapp):
+ mock_exists.return_value = False
+ s = sync(mock_connapp)
+ assert s.get_credentials() == 0
+
+ @patch("connpy.core_plugins.sync.zipfile.ZipFile")
+ @patch("connpy.core_plugins.sync.os.path.basename")
+ def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp):
+ mock_basename.return_value = "config.yaml"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ s.compress_specific_files("/fake/zip.zip")
+ zip_mock.write.assert_any_call(s.file, "config.yaml")
+ zip_mock.write.assert_any_call(s.key, ".osk")
+
+ @patch("connpy.core_plugins.sync.zipfile.ZipFile")
+ @patch("connpy.core_plugins.sync.os.path.dirname")
+ def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp):
+ mock_dirname.return_value = "/fake/dir"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ zip_mock.namelist.return_value = ["config.yaml", ".osk"]
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ assert s.decompress_zip("/fake/zip.zip") == 0
+ zip_mock.extract.assert_any_call("config.yaml", "/fake/dir")
+ zip_mock.extract.assert_any_call(".osk", "/fake/dir")
+
+ @patch("connpy.core_plugins.sync.zipfile.ZipFile")
+ @patch("connpy.core_plugins.sync.os.path.dirname")
+ def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp):
+ mock_dirname.return_value = "/fake/dir"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ zip_mock.namelist.return_value = ["config.json", ".osk"]
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ assert s.decompress_zip("/fake/old_zip.zip") == 0
+ zip_mock.extract.assert_any_call("config.json", "/fake/dir")
+
+ @patch.object(sync, "get_credentials")
+ @patch("connpy.core_plugins.sync.build")
+ def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp):
+ mock_get_credentials.return_value = MagicMock()
+ mock_service = MagicMock()
+ mock_build.return_value = mock_service
+
+ mock_service.files().list().execute.return_value = {
+ "files": [
+ {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}}
+ ]
+ }
+
+ s = sync(mock_connapp)
+ files = s.get_appdata_files()
+ assert len(files) == 1
+ assert files[0]["id"] == "1"
+ assert files[0]["timestamp"] == "1000"
+
+ @patch.object(sync, "get_credentials")
+ @patch("connpy.core_plugins.sync.build")
+ @patch("connpy.core_plugins.sync.MediaFileUpload")
+ @patch("connpy.core_plugins.sync.os.path.basename")
+ def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp):
+ mock_get_credentials.return_value = MagicMock()
+ mock_basename.return_value = "backup.zip"
+ mock_service = MagicMock()
+ mock_build.return_value = mock_service
+
+ s = sync(mock_connapp)
+ assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0
+ mock_service.files().create.assert_called_once()
+
+
+Methods
+
+
+def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch.object(sync, "get_credentials")
+@patch("connpy.core_plugins.sync.build")
+@patch("connpy.core_plugins.sync.MediaFileUpload")
+@patch("connpy.core_plugins.sync.os.path.basename")
+def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp):
+ mock_get_credentials.return_value = MagicMock()
+ mock_basename.return_value = "backup.zip"
+ mock_service = MagicMock()
+ mock_build.return_value = mock_service
+
+ s = sync(mock_connapp)
+ assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0
+ mock_service.files().create.assert_called_once()
+
+
+
+
+def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch("connpy.core_plugins.sync.zipfile.ZipFile")
+@patch("connpy.core_plugins.sync.os.path.basename")
+def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp):
+ mock_basename.return_value = "config.yaml"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ s.compress_specific_files("/fake/zip.zip")
+ zip_mock.write.assert_any_call(s.file, "config.yaml")
+ zip_mock.write.assert_any_call(s.key, ".osk")
+
+
+
+
+def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch("connpy.core_plugins.sync.zipfile.ZipFile")
+@patch("connpy.core_plugins.sync.os.path.dirname")
+def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp):
+ mock_dirname.return_value = "/fake/dir"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ zip_mock.namelist.return_value = ["config.json", ".osk"]
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ assert s.decompress_zip("/fake/old_zip.zip") == 0
+ zip_mock.extract.assert_any_call("config.json", "/fake/dir")
+
+
+
+
+def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch("connpy.core_plugins.sync.zipfile.ZipFile")
+@patch("connpy.core_plugins.sync.os.path.dirname")
+def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp):
+ mock_dirname.return_value = "/fake/dir"
+ s = sync(mock_connapp)
+ zip_mock = MagicMock()
+ zip_mock.namelist.return_value = ["config.yaml", ".osk"]
+ MockZipFile.return_value.__enter__.return_value = zip_mock
+
+ assert s.decompress_zip("/fake/zip.zip") == 0
+ zip_mock.extract.assert_any_call("config.yaml", "/fake/dir")
+ zip_mock.extract.assert_any_call(".osk", "/fake/dir")
+
+
+
+
+def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch.object(sync, "get_credentials")
+@patch("connpy.core_plugins.sync.build")
+def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp):
+ mock_get_credentials.return_value = MagicMock()
+ mock_service = MagicMock()
+ mock_build.return_value = mock_service
+
+ mock_service.files().list().execute.return_value = {
+ "files": [
+ {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}}
+ ]
+ }
+
+ s = sync(mock_connapp)
+ files = s.get_appdata_files()
+ assert len(files) == 1
+ assert files[0]["id"] == "1"
+ assert files[0]["timestamp"] == "1000"
+
+
+
+
+def test_get_credentials_not_found(self, mock_exists, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch("connpy.core_plugins.sync.os.path.exists")
+def test_get_credentials_not_found(self, mock_exists, mock_connapp):
+ mock_exists.return_value = False
+ s = sync(mock_connapp)
+ assert s.get_credentials() == 0
+
+
+
+
+def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp)
+
+-
+
+
+Expand source code
+
+@patch("connpy.core_plugins.sync.os.path.exists")
+@patch("connpy.core_plugins.sync.Credentials")
+def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp):
+ mock_exists.return_value = True
+ mock_cred_instance = MagicMock()
+ mock_cred_instance.valid = True
+ MockCreds.from_authorized_user_file.return_value = mock_cred_instance
+
+ s = sync(mock_connapp)
+ creds = s.get_credentials()
+ assert creds == mock_cred_instance
+
+
+
+
+def test_init(self, mock_connapp)
+
+-
+
+
+Expand source code
+
+def test_init(self, mock_connapp):
+ s = sync(mock_connapp)
+ assert s.sync is True
+ assert s.file == "/fake/dir/config.yaml"
+ assert s.token_file == "/fake/dir/gtoken.json"
+
+
+
+
+
+
+
+
+
+
+
+
+