feat: migrate config to YAML, add dual-caching and 0ms fzf wrapper

- Migrated configuration backend from JSON to YAML for better readability.
- Added automatic dual-caching (.config.cache.json) to preserve fast load times with YAML.
- Implemented a new 0ms latency fzf wrapper for bash and zsh (--fzf-wrapper).
- Updated sync plugin to support the new YAML config format and clear caches on extraction.
- Refactored 'completion.py' to gracefully handle fallback config formats.
- Added new test modules (test_capture, test_context, test_sync) covering core plugins.
- Updated existing unit tests to handle YAML config creation and parsing.
- Bumped version to 5.0b3 and regenerated HTML documentation.
This commit is contained in:
2026-04-03 18:47:03 -03:00
parent 5d8c372f23
commit d8f7d4db87
16 changed files with 1681 additions and 109 deletions
+1 -1
View File
@@ -1,2 +1,2 @@
__version__ = "5.0b2"
__version__ = "5.0b3"
+15 -3
View File
@@ -97,9 +97,21 @@ def main():
configdir = f.read().strip()
except (FileNotFoundError, IOError):
configdir = defaultdir
defaultfile = configdir + '/config.json'
jsonconf = open(defaultfile)
config = json.load(jsonconf)
cachefile = configdir + '/.config.cache.json'
try:
with open(cachefile, "r") as jsonconf:
config = json.load(jsonconf)
except FileNotFoundError:
try:
import yaml
with open(configdir + '/config.yaml', "r") as yamlconf:
config = yaml.safe_load(yamlconf)
except Exception:
try:
with open(configdir + '/config.json', "r") as jsonconf:
config = json.load(jsonconf)
except Exception:
exit()
nodes = _getallnodes(config)
folders = _getallfolders(config)
profiles = list(config["profiles"].keys())
+82 -31
View File
@@ -3,6 +3,8 @@
import json
import os
import re
import yaml
import shutil
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from pathlib import Path
@@ -65,16 +67,40 @@ class configfile:
with open(pathfile, "w") as f:
f.write(str(defaultdir))
configdir = defaultdir
defaultfile = configdir + '/config.json'
defaultfile = configdir + '/config.yaml'
self.cachefile = configdir + '/.config.cache.json'
self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt'
defaultkey = configdir + '/.osk'
if conf == None:
self.file = defaultfile
# Backwards compatibility: Migrate from JSON to YAML
legacy_json = configdir + '/config.json'
legacy_noext = configdir + '/config'
legacy_file = None
if os.path.exists(legacy_json): legacy_file = legacy_json
elif os.path.exists(legacy_noext): legacy_file = legacy_noext
if not os.path.exists(self.file) and legacy_file:
try:
with open(legacy_file, 'r') as f:
old_data = json.load(f)
with open(self.file, 'w') as f:
yaml.dump(old_data, f, default_flow_style=False, sort_keys=False)
with open(self.cachefile, 'w') as f:
json.dump(old_data, f)
shutil.move(legacy_file, legacy_file + ".backup")
printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!")
except Exception as e:
printer.warning(f"Failed to migrate legacy config: {e}")
else:
self.file = conf
if key == None:
self.key = defaultkey
else:
self.key = key
if os.path.exists(self.file):
config = self._loadconfig(self.file)
else:
@@ -91,23 +117,38 @@ class configfile:
def _loadconfig(self, conf):
#Loads config file
jsonconf = open(conf)
jsondata = json.load(jsonconf)
jsonconf.close()
return jsondata
#Loads config file using dual cache
cache_exists = os.path.exists(self.cachefile)
yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0
cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0
if not cache_exists or yaml_time > cache_time:
with open(conf, 'r') as f:
data = yaml.safe_load(f)
try:
with open(self.cachefile, 'w') as f:
json.dump(data, f)
except Exception:
pass
return data
else:
with open(self.cachefile, 'r') as f:
return json.load(f)
def _createconfig(self, conf):
    """Create the default YAML config (mode 0600) plus its JSON cache.

    If ``conf`` does not exist it is created with the default structure;
    the JSON cache is then (best-effort) written so the first load is fast.

    Returns:
        The configuration dict read back from the YAML file.
    """
    defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}}
    if not os.path.exists(conf):
        with open(conf, "w") as f:
            yaml.dump(defaultconfig, f, default_flow_style=False, sort_keys=False)
        # Config may hold credentials: restrict to owner read/write only.
        os.chmod(conf, 0o600)
    try:
        with open(self.cachefile, 'w') as f:
            json.dump(defaultconfig, f)
    except Exception:
        # Cache write is best-effort; a missing cache just means a slower load.
        pass
    with open(conf, 'r') as f:
        jsondata = yaml.safe_load(f)
    return jsondata
@MethodHook
@@ -119,13 +160,23 @@ class configfile:
newconfig["profiles"] = self.profiles
try:
with open(conf, "w") as f:
json.dump(newconfig, f, indent = 4)
f.close()
yaml.dump(newconfig, f, default_flow_style=False, sort_keys=False)
with open(self.cachefile, "w") as f:
json.dump(newconfig, f)
self._generate_nodes_cache()
except (IOError, OSError) as e:
printer.error(f"Failed to save config: {e}")
return 1
return 0
def _generate_nodes_cache(self):
try:
nodes = self._getallnodes()
with open(self.fzf_cachefile, "w") as f:
f.write("\n".join(nodes))
except Exception:
pass
def _createkey(self, keyfile):
#Create key file
key = RSA.generate(2048)
@@ -344,15 +395,15 @@ class configfile:
def _getallnodes(self, filter = None):
#get all nodes on configfile
nodes = []
layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"]
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"]
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.extend(layer1)
for f in folders:
layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"]
layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"]
nodes.extend(layer2)
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"]
layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"]
nodes.extend(layer3)
if filter:
if isinstance(filter, str):
@@ -367,15 +418,15 @@ class configfile:
def _getallnodesfull(self, filter = None, extract = True):
#get all nodes on configfile with all their attributes.
nodes = {}
layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"}
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"}
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.update(layer1)
for f in folders:
layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"}
layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"}
nodes.update(layer2)
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"}
layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"}
nodes.update(layer3)
if filter:
if isinstance(filter, str):
@@ -406,27 +457,27 @@ class configfile:
@MethodHook
def _getallfolders(self):
#get all folders on configfile
folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
subfolders = []
for f in folders:
s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"]
s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
subfolders.extend(s)
folders.extend(subfolders)
return folders
@MethodHook
def _profileused(self, profile):
#Check if profile is used before deleting it
#Return all the nodes that uses this profile.
nodes = []
layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"]
layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"]
nodes.extend(layer1)
for f in folders:
layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
nodes.extend(layer2)
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"]
subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"]
for s in subfolders:
layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))]
layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))]
nodes.extend(layer3)
return nodes
+58 -6
View File
@@ -166,6 +166,7 @@ class connapp:
configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"])
configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT")
configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn")
configcrud.add_argument("--fzf-wrapper", dest="fzf_wrapper", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get 0ms latency fzf bash/zsh wrapper")
configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER")
configcrud.add_argument("--engineer-model", dest="engineer_model", nargs=1, action=self._store_type, help="Set engineer model", metavar="MODEL")
configcrud.add_argument("--engineer-api-key", dest="engineer_api_key", nargs=1, action=self._store_type, help="Set engineer api_key", metavar="API_KEY")
@@ -186,6 +187,10 @@ class connapp:
printer.warning(e)
for preload in self.plugins.preloads.values():
preload.Preload(self)
if not os.path.exists(self.config.fzf_cachefile):
self.config._generate_nodes_cache()
#Generate helps
nodeparser.usage = self._help("usage", subparsers)
nodeparser.epilog = self._help("end", subparsers)
@@ -482,7 +487,7 @@ class connapp:
def _func_others(self, args):
#Function called when using other commands
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config}
actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "fzf_wrapper": self._fzf_wrapper, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config}
return actions.get(args.command)(args)
def _ai_config(self, args):
@@ -622,6 +627,12 @@ class connapp:
elif args.data[0] == "zsh":
print(self._help("zshcompletion"))
def _fzf_wrapper(self, args):
if args.data[0] == "bash":
print(self._help("fzf_wrapper_bash"))
elif args.data[0] == "zsh":
print(self._help("fzf_wrapper_zsh"))
def _case(self, args):
if args.data[0] == "true":
args.data[0] = True
@@ -1520,10 +1531,10 @@ _conn()
mapfile -t strings < <(connpy-completion-helper "bash" "${#COMP_WORDS[@]}" "${COMP_WORDS[@]}")
local IFS=$'\t\n'
local home_dir=$(eval echo ~)
local last_word=${COMP_WORDS[-1]/\~/$home_dir}
local last_word=${COMP_WORDS[-1]/\\~/$home_dir}
COMPREPLY=($(compgen -W "$(printf '%s' "${strings[@]}")" -- "$last_word"))
if [ "$last_word" != "${COMP_WORDS[-1]}" ]; then
COMPREPLY=(${COMPREPLY[@]/$home_dir/\~})
COMPREPLY=(${COMPREPLY[@]/$home_dir/\\~})
fi
}
@@ -1538,12 +1549,12 @@ autoload -U compinit && compinit
_conn()
{
local home_dir=$(eval echo ~)
last_word=${words[-1]/\~/$home_dir}
last_word=${words[-1]/\\~/$home_dir}
strings=($(connpy-completion-helper "zsh" ${#words} $words[1,-2] $last_word))
for string in "${strings[@]}"; do
#Replace the expanded home directory with ~
if [ "$last_word" != "$words[-1]" ]; then
string=${string/$home_dir/\~}
string=${string/$home_dir/\\~}
fi
if [[ "${string}" =~ .*/$ ]]; then
# If the string ends with a '/', do not append a space
@@ -1558,10 +1569,51 @@ compdef _conn conn
compdef _conn connpy
#Here ends zsh completion for conn
'''
if type == "fzf_wrapper_bash":
return '''\n#Here starts bash 0ms fzf wrapper for connpy
connpy() {
if [ $# -eq 0 ]; then
local selected
if [ -f ~/.config/conn/.fzf_nodes_cache.txt ]; then
selected=$(cat ~/.config/conn/.fzf_nodes_cache.txt | fzf-tmux -d 25% --reverse)
else
command connpy
return
fi
if [ -n "$selected" ]; then
command connpy "$selected"
fi
else
command connpy "$@"
fi
}
alias c="connpy"
#Here ends bash 0ms fzf wrapper\n'''
if type == "fzf_wrapper_zsh":
return '''\n#Here starts zsh 0ms fzf wrapper for connpy
connpy() {
if [ $# -eq 0 ]; then
local selected
if [ -f ~/.config/conn/.fzf_nodes_cache.txt ]; then
selected=$(cat ~/.config/conn/.fzf_nodes_cache.txt | fzf-tmux -d 25% --reverse)
else
command connpy
return
fi
if [ -n "$selected" ]; then
command connpy "$selected"
fi
else
command connpy "$@"
fi
}
alias c="connpy"
#Here ends zsh 0ms fzf wrapper\n'''
if type == "run":
return "node[@subfolder][@folder] commmand to run\nRun the specific command on the node and print output\n/path/to/file.yaml\nUse a yaml file to run an automation script"
if type == "generate":
return '''---
return r'''---
tasks:
- name: "Config"
+18 -3
View File
@@ -213,7 +213,7 @@ class sync:
def compress_specific_files(self, zip_path):
    """Bundle the config file (stored under its own basename, so both
    config.yaml and legacy config.json round-trip) and the key file
    (as ".osk") into a deflate-compressed zip at ``zip_path``."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(self.file, os.path.basename(self.file))
        zipf.write(self.key, ".osk")
def compress_and_upload(self):
@@ -251,8 +251,23 @@ class sync:
try:
with zipfile.ZipFile(zip_path, 'r') as zipf:
# Extract the specific file to the specified destination
zipf.extract("config.json", os.path.dirname(self.file))
zipf.extract(".osk", os.path.dirname(self.key))
names = zipf.namelist()
if "config.yaml" in names:
zipf.extract("config.yaml", os.path.dirname(self.file))
elif "config.json" in names:
zipf.extract("config.json", os.path.dirname(self.file))
if ".osk" in names:
zipf.extract(".osk", os.path.dirname(self.key))
# Delete caches to force auto-regeneration on next run
try:
if os.path.exists(self.connapp.config.cachefile):
os.remove(self.connapp.config.cachefile)
if os.path.exists(self.connapp.config.fzf_cachefile):
os.remove(self.connapp.config.fzf_cachefile)
except Exception:
pass
return 0
except Exception as e:
printer.error(f"An error occurred: {e}")
+51
View File
@@ -0,0 +1,51 @@
"""Tests for connpy.core_plugins.capture"""
import pytest
from unittest.mock import MagicMock, patch
from connpy.core_plugins.capture import RemoteCapture
@pytest.fixture
def mock_connapp():
    """Mocked connapp exposing a single known node ("test_node") for capture tests."""
    fake_node = MagicMock()
    fake_node.protocol = "ssh"
    fake_node.unique = "test_node"

    fake_app = MagicMock()
    fake_app.nodes_list = ["test_node"]
    fake_app.config.getitem.return_value = {"host": "127.0.0.1", "protocol": "ssh"}
    fake_app.config.config = {"wireshark_path": "/fake/ws"}
    fake_app.node.return_value = fake_node
    return fake_app
class TestRemoteCapture:
    """Unit tests for RemoteCapture using the fully-mocked connapp fixture."""

    def test_init_node_not_found(self, mock_connapp):
        """Constructing a capture for an unknown node exits with code 2."""
        # Attempt to capture a node not in nodes_list
        mock_connapp.nodes_list = ["other_node"]
        with pytest.raises(SystemExit) as exc:
            RemoteCapture(mock_connapp, "test_node", "eth0")
        assert exc.value.code == 2

    def test_init_success(self, mock_connapp):
        """Constructor stores node name, interface and the configured wireshark path."""
        rc = RemoteCapture(mock_connapp, "test_node", "eth0")
        assert rc.node_name == "test_node"
        assert rc.interface == "eth0"
        assert rc.wireshark_path == "/fake/ws"

    @patch("connpy.core_plugins.capture.socket")
    def test_is_port_in_use(self, mock_socket, mock_connapp):
        """_is_port_in_use maps connect_ex()==0 to True, non-zero to False."""
        rc = RemoteCapture(mock_connapp, "test_node", "eth0")
        mock_sock_instance = MagicMock()
        # socket.socket(...) is used as a context manager inside the helper,
        # so the mock must be wired through __enter__.
        mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance
        mock_sock_instance.connect_ex.return_value = 0
        assert rc._is_port_in_use(8080) is True
        mock_sock_instance.connect_ex.return_value = 1
        assert rc._is_port_in_use(8080) is False

    @patch.object(RemoteCapture, "_is_port_in_use")
    def test_find_free_port(self, mock_is_in_use, mock_connapp):
        """_find_free_port keeps probing until a free port is found in range."""
        rc = RemoteCapture(mock_connapp, "test_node", "eth0")
        # First 2 ports in use, 3rd is free
        mock_is_in_use.side_effect = [True, True, False]
        port = rc._find_free_port(20000, 30000)
        assert 20000 <= port <= 30000
        assert mock_is_in_use.call_count == 3
+12 -11
View File
@@ -3,14 +3,15 @@ import json
import os
import re
import pytest
import yaml
from copy import deepcopy
class TestConfigfileInit:
def test_creates_default_config(self, tmp_config_dir):
"""Creates config.json with defaults when it doesn't exist."""
config_file = tmp_config_dir / "config.json"
config_file.unlink() # Remove existing
"""Creates config.yaml with defaults when it doesn't exist."""
config_file = tmp_config_dir / "config.yaml"
config_file.unlink(missing_ok=True) # Remove existing
key_file = tmp_config_dir / ".osk"
from connpy.configfile import configfile
@@ -27,7 +28,7 @@ class TestConfigfileInit:
key_file.unlink() # Remove existing
from connpy.configfile import configfile
conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file))
conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file))
assert key_file.exists()
assert conf.privatekey is not None
@@ -41,8 +42,8 @@ class TestConfigfileInit:
def test_config_file_permissions(self, tmp_config_dir):
"""Config is created with 0o600 permissions."""
config_file = tmp_config_dir / "config.json"
config_file.unlink()
config_file = tmp_config_dir / "config.yaml"
config_file.unlink(missing_ok=True)
from connpy.configfile import configfile
configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -62,7 +63,7 @@ class TestConfigfileInit:
(dot_folder / ".folder").write_text(str(config_dir))
(dot_folder / "plugins").mkdir(exist_ok=True)
conf_path = str(config_dir / "my_config.json")
conf_path = str(config_dir / "my_config.yaml")
key_path = str(config_dir / "my_key")
from connpy.configfile import configfile
@@ -248,7 +249,7 @@ class TestGetItem:
def test_getitem_with_profile_extraction(self, tmp_config_dir):
"""extract=True resolves @profile references."""
config_file = tmp_config_dir / "config.json"
config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -268,7 +269,7 @@ class TestGetItem:
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
config_file.write_text(json.dumps(data, indent=4))
config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -326,7 +327,7 @@ class TestGetAll:
def test_profileused(self, tmp_config_dir):
"""Detects nodes using a specific profile."""
config_file = tmp_config_dir / "config.json"
config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": {
@@ -352,7 +353,7 @@ class TestGetAll:
"options": "", "logs": "", "tags": "", "jumphost": ""}
}
}
config_file.write_text(json.dumps(data, indent=4))
config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
+109
View File
@@ -0,0 +1,109 @@
"""Tests for connpy.core_plugins.context"""
import pytest
from unittest.mock import MagicMock, patch
from connpy.core_plugins.context import context_manager, Preload, Entrypoint
@pytest.fixture
def mock_connapp():
    """Mocked connapp whose config holds a single catch-all context ("all")."""
    fake = MagicMock()
    fake.config.config = {
        "current_context": "all",
        "contexts": {"all": [".*"]},
    }
    return fake
class TestContextManager:
    """Unit tests for context_manager CRUD operations and node filtering.

    Exit-code conventions exercised below: 1 invalid name, 2 already exists,
    3 protected "all" context, 4 unknown context, 5 context currently in use,
    0 context already active.
    """

    def test_init(self, mock_connapp):
        """Constructor mirrors contexts/current_context and precompiles regexes."""
        cm = context_manager(mock_connapp)
        assert cm.contexts == {"all": [".*"]}
        assert cm.current_context == "all"
        assert len(cm.regex) == 1

    def test_add_context_success(self, mock_connapp):
        """Adding a context stores it and persists via _change_settings."""
        cm = context_manager(mock_connapp)
        cm.add_context("prod", ["^prod_.*"])
        assert "prod" in cm.contexts
        mock_connapp._change_settings.assert_called_with("contexts", cm.contexts)

    def test_add_context_invalid_name(self, mock_connapp):
        """Names with disallowed characters (hyphen) exit with code 1."""
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.add_context("prod-env", ["Regex"])
        assert exc.value.code == 1

    def test_add_context_already_exists(self, mock_connapp):
        """Re-adding an existing context exits with code 2."""
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.add_context("all", ["Regex"])
        assert exc.value.code == 2

    def test_modify_context_success(self, mock_connapp):
        """Modifying an existing context replaces its regex list."""
        cm = context_manager(mock_connapp)
        cm.add_context("prod", ["old"])
        cm.modify_context("prod", ["new"])
        assert cm.contexts["prod"] == ["new"]

    def test_modify_context_all(self, mock_connapp):
        """The built-in "all" context cannot be modified (exit code 3)."""
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.modify_context("all", ["new"])
        assert exc.value.code == 3

    def test_modify_context_not_exists(self, mock_connapp):
        """Modifying an unknown context exits with code 4."""
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.modify_context("fake", ["new"])
        assert exc.value.code == 4

    def test_delete_context_success(self, mock_connapp):
        """Deleting a non-active, non-"all" context removes it."""
        cm = context_manager(mock_connapp)
        cm.add_context("prod", ["old"])
        cm.delete_context("prod")
        assert "prod" not in cm.contexts

    def test_delete_context_all(self, mock_connapp):
        """The built-in "all" context cannot be deleted (exit code 3)."""
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.delete_context("all")
        assert exc.value.code == 3

    def test_delete_context_current(self, mock_connapp):
        """Deleting the currently-active context exits with code 5."""
        mock_connapp.config.config["current_context"] = "prod"
        mock_connapp.config.config["contexts"]["prod"] = [".*"]
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.delete_context("prod")
        assert exc.value.code == 5

    def test_set_context_success(self, mock_connapp):
        """Switching context persists the new current_context."""
        cm = context_manager(mock_connapp)
        cm.contexts["prod"] = [".*"]
        cm.set_context("prod")
        mock_connapp._change_settings.assert_called_with("current_context", "prod")

    def test_set_context_already_set(self, mock_connapp):
        """Setting the already-active context exits with code 0 (no-op)."""
        cm = context_manager(mock_connapp)
        with pytest.raises(SystemExit) as exc:
            cm.set_context("all")
        assert exc.value.code == 0

    def test_match_regexp(self, mock_connapp):
        """match_any_regex returns True when any context pattern matches."""
        mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"]
        cm = context_manager(mock_connapp)
        assert cm.match_any_regex("prod_node", cm.regex) is True
        assert cm.match_any_regex("test_node", cm.regex) is True
        assert cm.match_any_regex("dev_node", cm.regex) is False

    def test_modify_node_list(self, mock_connapp):
        """modify_node_list keeps only list entries matching the active context."""
        mock_connapp.config.config["contexts"]["all"] = ["^prod"]
        cm = context_manager(mock_connapp)
        nodes = ["prod_1", "dev_1", "prod_2"]
        result = cm.modify_node_list(result=nodes)
        assert result == ["prod_1", "prod_2"]

    def test_modify_node_dict(self, mock_connapp):
        """modify_node_dict keeps only dict keys matching the active context."""
        mock_connapp.config.config["contexts"]["all"] = ["^prod"]
        cm = context_manager(mock_connapp)
        nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}}
        result = cm.modify_node_dict(result=nodes)
        assert set(result.keys()) == {"prod_1", "prod_2"}
+108
View File
@@ -0,0 +1,108 @@
"""Tests for connpy.core_plugins.sync"""
import pytest
from unittest.mock import MagicMock, patch, mock_open
from connpy.core_plugins.sync import sync
@pytest.fixture
def mock_connapp():
    """Mocked connapp pointing at fake config/key paths with sync enabled."""
    fake = MagicMock()
    fake.config.config = {"sync": True}
    fake.config.defaultdir = "/fake/dir"
    fake.config.file = "/fake/dir/config.yaml"
    fake.config.key = "/fake/dir/.osk"
    return fake
class TestSyncPlugin:
    """Unit tests for the Google-Drive sync plugin with all I/O mocked.

    Note: @patch decorators are applied bottom-up, so the mock arguments
    arrive in reverse decorator order.
    """

    def test_init(self, mock_connapp):
        """Constructor derives sync flag, config path and token path."""
        s = sync(mock_connapp)
        assert s.sync is True
        assert s.file == "/fake/dir/config.yaml"
        assert s.token_file == "/fake/dir/gtoken.json"

    @patch("connpy.core_plugins.sync.os.path.exists")
    @patch("connpy.core_plugins.sync.Credentials")
    def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp):
        """Valid stored credentials are loaded and returned as-is."""
        mock_exists.return_value = True
        mock_cred_instance = MagicMock()
        mock_cred_instance.valid = True
        MockCreds.from_authorized_user_file.return_value = mock_cred_instance
        s = sync(mock_connapp)
        creds = s.get_credentials()
        assert creds == mock_cred_instance

    @patch("connpy.core_plugins.sync.os.path.exists")
    def test_get_credentials_not_found(self, mock_exists, mock_connapp):
        """Missing token file makes get_credentials return 0 (no creds)."""
        mock_exists.return_value = False
        s = sync(mock_connapp)
        assert s.get_credentials() == 0

    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    @patch("connpy.core_plugins.sync.os.path.basename")
    def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp):
        """The config is archived under its own basename, the key as .osk."""
        mock_basename.return_value = "config.yaml"
        s = sync(mock_connapp)
        zip_mock = MagicMock()
        # ZipFile is used as a context manager, so wire through __enter__.
        MockZipFile.return_value.__enter__.return_value = zip_mock
        s.compress_specific_files("/fake/zip.zip")
        zip_mock.write.assert_any_call(s.file, "config.yaml")
        zip_mock.write.assert_any_call(s.key, ".osk")

    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    @patch("connpy.core_plugins.sync.os.path.dirname")
    def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp):
        """Archives containing config.yaml extract the YAML config and key."""
        mock_dirname.return_value = "/fake/dir"
        s = sync(mock_connapp)
        zip_mock = MagicMock()
        zip_mock.namelist.return_value = ["config.yaml", ".osk"]
        MockZipFile.return_value.__enter__.return_value = zip_mock
        assert s.decompress_zip("/fake/zip.zip") == 0
        zip_mock.extract.assert_any_call("config.yaml", "/fake/dir")
        zip_mock.extract.assert_any_call(".osk", "/fake/dir")

    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    @patch("connpy.core_plugins.sync.os.path.dirname")
    def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp):
        """Legacy archives with only config.json still extract successfully."""
        mock_dirname.return_value = "/fake/dir"
        s = sync(mock_connapp)
        zip_mock = MagicMock()
        zip_mock.namelist.return_value = ["config.json", ".osk"]
        MockZipFile.return_value.__enter__.return_value = zip_mock
        assert s.decompress_zip("/fake/old_zip.zip") == 0
        zip_mock.extract.assert_any_call("config.json", "/fake/dir")

    @patch.object(sync, "get_credentials")
    @patch("connpy.core_plugins.sync.build")
    def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp):
        """Drive file listings are flattened into id/name/timestamp records."""
        mock_get_credentials.return_value = MagicMock()
        mock_service = MagicMock()
        mock_build.return_value = mock_service
        mock_service.files().list().execute.return_value = {
            "files": [
                {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}}
            ]
        }
        s = sync(mock_connapp)
        files = s.get_appdata_files()
        assert len(files) == 1
        assert files[0]["id"] == "1"
        assert files[0]["timestamp"] == "1000"

    @patch.object(sync, "get_credentials")
    @patch("connpy.core_plugins.sync.build")
    @patch("connpy.core_plugins.sync.MediaFileUpload")
    @patch("connpy.core_plugins.sync.os.path.basename")
    def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp):
        """A successful upload returns 0 and issues exactly one create call."""
        mock_get_credentials.return_value = MagicMock()
        mock_basename.return_value = "backup.zip"
        mock_service = MagicMock()
        mock_build.return_value = mock_service
        s = sync(mock_connapp)
        assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0
        mock_service.files().create.assert_called_once()