From d8f7d4db87f84d3960cbccfe12d07f2779ce239d Mon Sep 17 00:00:00 2001 From: Fede Luzzi Date: Fri, 3 Apr 2026 18:47:03 -0300 Subject: [PATCH] feat: migrate config to YAML, add dual-caching and 0ms fzf wrapper - Migrated configuration backend from JSON to YAML for better readability. - Added automatic dual-caching (.config.cache.json) to preserve fast load times with YAML. - Implemented a new 0ms latency fzf wrapper for bash and zsh (--fzf-wrapper). - Updated sync plugin to support the new YAML config format and clear caches on extraction. - Refactored 'completion.py' to gracefully handle fallback config formats. - Added new test modules (test_capture, test_context, test_sync) covering core plugins. - Updated existing unit tests to handle YAML config creation and parsing. - Bumped version to 5.0b3 and regenerated HTML documentation. --- .gitignore | 3 + connpy/_version.py | 2 +- connpy/completion.py | 18 +- connpy/configfile.py | 113 ++++-- connpy/connapp.py | 64 +++- connpy/core_plugins/sync.py | 21 +- connpy/tests/test_capture.py | 51 +++ connpy/tests/test_configfile.py | 23 +- connpy/tests/test_context.py | 109 ++++++ connpy/tests/test_sync.py | 108 ++++++ docs/connpy/index.html | 111 ++++-- docs/connpy/tests/index.html | 15 + docs/connpy/tests/test_capture.html | 235 ++++++++++++ docs/connpy/tests/test_configfile.html | 46 +-- docs/connpy/tests/test_context.html | 475 +++++++++++++++++++++++++ docs/connpy/tests/test_sync.html | 396 +++++++++++++++++++++ 16 files changed, 1681 insertions(+), 109 deletions(-) create mode 100644 connpy/tests/test_capture.py create mode 100644 connpy/tests/test_context.py create mode 100644 connpy/tests/test_sync.py create mode 100644 docs/connpy/tests/test_capture.html create mode 100644 docs/connpy/tests/test_context.html create mode 100644 docs/connpy/tests/test_sync.html diff --git a/.gitignore b/.gitignore index 21e1cd4..a2d4457 100644 --- a/.gitignore +++ b/.gitignore @@ -142,3 +142,6 @@ GEMINI.md node_modules/ package-lock.json package.json + +# Development docs +connpy_roadmap.md diff --git a/connpy/_version.py b/connpy/_version.py index f27f650..c25905d 100644 --- a/connpy/_version.py +++ b/connpy/_version.py @@ -1,2 +1,2 @@ -__version__ = "5.0b2" +__version__ = "5.0b3" diff --git a/connpy/completion.py b/connpy/completion.py index 448fcd5..e46427c 100755 --- a/connpy/completion.py +++ b/connpy/completion.py @@ -97,9 +97,21 @@ def main(): configdir = f.read().strip() except (FileNotFoundError, IOError): configdir = defaultdir - defaultfile = configdir + '/config.json' - jsonconf = open(defaultfile) - config = json.load(jsonconf) + cachefile = configdir + '/.config.cache.json' + try: + with open(cachefile, "r") as jsonconf: + config = json.load(jsonconf) + except FileNotFoundError: + try: + import yaml + with open(configdir + '/config.yaml', "r") as yamlconf: + config = yaml.safe_load(yamlconf) + except Exception: + try: + with open(configdir + '/config.json', "r") as jsonconf: + config = json.load(jsonconf) + except Exception: + exit() nodes = _getallnodes(config) folders = _getallfolders(config) profiles = list(config["profiles"].keys()) diff --git a/connpy/configfile.py b/connpy/configfile.py index 5a4a0e0..716aca0 100755 --- a/connpy/configfile.py +++ b/connpy/configfile.py @@ -3,6 +3,8 @@ import json import os import re +import yaml +import shutil from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_OAEP from pathlib import Path @@ -65,16 +67,40 @@ class configfile: with open(pathfile, "w") as f: f.write(str(defaultdir)) configdir = defaultdir - defaultfile = configdir + '/config.json' + defaultfile = configdir + '/config.yaml' + self.cachefile = configdir + '/.config.cache.json' + self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt' defaultkey = configdir + '/.osk' if conf == None: self.file = defaultfile + + # Backwards compatibility: Migrate from JSON to YAML + legacy_json = configdir + '/config.json' + legacy_noext = configdir + '/config' + legacy_file = None + if os.path.exists(legacy_json): legacy_file = legacy_json + elif os.path.exists(legacy_noext): legacy_file = legacy_noext + + if not os.path.exists(self.file) and legacy_file: + try: + with open(legacy_file, 'r') as f: + old_data = json.load(f) + with open(self.file, 'w') as f: + yaml.dump(old_data, f, default_flow_style=False, sort_keys=False) + with open(self.cachefile, 'w') as f: + json.dump(old_data, f) + shutil.move(legacy_file, legacy_file + ".backup") + printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!") + except Exception as e: + printer.warning(f"Failed to migrate legacy config: {e}") else: self.file = conf + if key == None: self.key = defaultkey else: self.key = key + if os.path.exists(self.file): config = self._loadconfig(self.file) else: @@ -91,23 +117,38 @@ class configfile: def _loadconfig(self, conf): - #Loads config file - jsonconf = open(conf) - jsondata = json.load(jsonconf) - jsonconf.close() - return jsondata + #Loads config file using dual cache + cache_exists = os.path.exists(self.cachefile) + yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0 + cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0 + + if not cache_exists or yaml_time > cache_time: + with open(conf, 'r') as f: + data = yaml.safe_load(f) + try: + with open(self.cachefile, 'w') as f: + json.dump(data, f) + except Exception: + pass + return data + else: + with open(self.cachefile, 'r') as f: + return json.load(f) def _createconfig(self, conf): #Create config file defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}} if not os.path.exists(conf): with open(conf, "w") as f: - json.dump(defaultconfig, f, indent = 4) - f.close() + yaml.dump(defaultconfig, f, default_flow_style=False, sort_keys=False) os.chmod(conf, 0o600) - jsonconf = open(conf) - jsondata = json.load(jsonconf) - jsonconf.close() + try: + with open(self.cachefile, 'w') as f: + json.dump(defaultconfig, f) + except Exception: + pass + with open(conf, 'r') as f: + jsondata = yaml.safe_load(f) return jsondata @MethodHook @@ -119,13 +160,23 @@ class configfile: newconfig["profiles"] = self.profiles try: with open(conf, "w") as f: - json.dump(newconfig, f, indent = 4) - f.close() + yaml.dump(newconfig, f, default_flow_style=False, sort_keys=False) + with open(self.cachefile, "w") as f: + json.dump(newconfig, f) + self._generate_nodes_cache() except (IOError, OSError) as e: printer.error(f"Failed to save config: {e}") return 1 return 0 + def _generate_nodes_cache(self): + try: + nodes = self._getallnodes() + with open(self.fzf_cachefile, "w") as f: + f.write("\n".join(nodes)) + except Exception: + pass + def _createkey(self, keyfile): #Create key file key = RSA.generate(2048) @@ -344,15 +395,15 @@ class configfile: def _getallnodes(self, filter = None): #get all nodes on configfile nodes = [] - layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"] - folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"] + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] nodes.extend(layer1) for f in folders: - layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"] + layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"] nodes.extend(layer2) - subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"] for s in subfolders: - layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"] + layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"] nodes.extend(layer3) if filter: if isinstance(filter, str): @@ -367,15 +418,15 @@ class configfile: def _getallnodesfull(self, filter = None, extract = True): #get all nodes on configfile with all their attributes. nodes = {} - layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"} - folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"} + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] nodes.update(layer1) for f in folders: - layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"} + layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"} nodes.update(layer2) - subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"] for s in subfolders: - layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"} + layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"} nodes.update(layer3) if filter: if isinstance(filter, str): @@ -406,27 +457,27 @@ class configfile: @MethodHook def _getallfolders(self): #get all folders on configfile - folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] subfolders = [] for f in folders: - s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"] + s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"] subfolders.extend(s) folders.extend(subfolders) return folders @MethodHook def _profileused(self, profile): - #Check if profile is used before deleting it + #Return all the nodes that uses this profile. nodes = [] - layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] - folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))] + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] nodes.extend(layer1) for f in folders: - layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] + layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))] nodes.extend(layer2) - subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"] for s in subfolders: - layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] + layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))] nodes.extend(layer3) return nodes diff --git a/connpy/connapp.py b/connpy/connapp.py index de97bc8..8a340c4 100755 --- a/connpy/connapp.py +++ b/connpy/connapp.py @@ -166,6 +166,7 @@ class connapp: configcrud.add_argument("--fzf", dest="fzf", nargs=1, action=self._store_type, help="Use fzf for lists", choices=["true","false"]) configcrud.add_argument("--keepalive", dest="idletime", nargs=1, action=self._store_type, help="Set keepalive time in seconds, 0 to disable", type=int, metavar="INT") configcrud.add_argument("--completion", dest="completion", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get terminal completion configuration for conn") + configcrud.add_argument("--fzf-wrapper", dest="fzf_wrapper", nargs=1, choices=["bash","zsh"], action=self._store_type, help="Get 0ms latency fzf bash/zsh wrapper") configcrud.add_argument("--configfolder", dest="configfolder", nargs=1, action=self._store_type, help="Set the default location for config file", metavar="FOLDER") configcrud.add_argument("--engineer-model", dest="engineer_model", nargs=1, action=self._store_type, help="Set engineer model", metavar="MODEL") configcrud.add_argument("--engineer-api-key", dest="engineer_api_key", nargs=1, action=self._store_type, help="Set engineer api_key", metavar="API_KEY") @@ -186,6 +187,10 @@ class connapp: printer.warning(e) for preload in self.plugins.preloads.values(): preload.Preload(self) + + if not os.path.exists(self.config.fzf_cachefile): + self.config._generate_nodes_cache() + #Generate helps nodeparser.usage = self._help("usage", subparsers) nodeparser.epilog = self._help("end", subparsers) @@ -482,7 +487,7 @@ class connapp: def _func_others(self, args): #Function called when using other commands - actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config} + actions = {"ls": self._ls, "move": self._mvcp, "cp": self._mvcp, "bulk": self._bulk, "completion": self._completion, "fzf_wrapper": self._fzf_wrapper, "case": self._case, "fzf": self._fzf, "idletime": self._idletime, "configfolder": self._configfolder, "engineer_model": self._ai_config, "engineer_api_key": self._ai_config, "architect_model": self._ai_config, "architect_api_key": self._ai_config} return actions.get(args.command)(args) def _ai_config(self, args): @@ -622,6 +627,12 @@ class connapp: elif args.data[0] == "zsh": print(self._help("zshcompletion")) + def _fzf_wrapper(self, args): + if args.data[0] == "bash": + print(self._help("fzf_wrapper_bash")) + elif args.data[0] == "zsh": + print(self._help("fzf_wrapper_zsh")) + def _case(self, args): if args.data[0] == "true": args.data[0] = True @@ -1520,10 +1531,10 @@ _conn() mapfile -t strings < <(connpy-completion-helper "bash" "${#COMP_WORDS[@]}" "${COMP_WORDS[@]}") local IFS=$'\t\n' local home_dir=$(eval echo ~) - local last_word=${COMP_WORDS[-1]/\~/$home_dir} + local last_word=${COMP_WORDS[-1]/\\~/$home_dir} COMPREPLY=($(compgen -W "$(printf '%s' "${strings[@]}")" -- "$last_word")) if [ "$last_word" != "${COMP_WORDS[-1]}" ]; then - COMPREPLY=(${COMPREPLY[@]/$home_dir/\~}) + COMPREPLY=(${COMPREPLY[@]/$home_dir/\\~}) fi } @@ -1538,12 +1549,12 @@ autoload -U compinit && compinit _conn() { local home_dir=$(eval echo ~) - last_word=${words[-1]/\~/$home_dir} + last_word=${words[-1]/\\~/$home_dir} strings=($(connpy-completion-helper "zsh" ${#words} $words[1,-2] $last_word)) for string in "${strings[@]}"; do #Replace the expanded home directory with ~ if [ "$last_word" != "$words[-1]" ]; then - string=${string/$home_dir/\~} + string=${string/$home_dir/\\~} fi if [[ "${string}" =~ .*/$ ]]; then # If the string ends with a '/', do not append a space @@ -1558,10 +1569,51 @@ compdef _conn conn compdef _conn connpy #Here ends zsh completion for conn ''' + if type == "fzf_wrapper_bash": + return '''\n#Here starts bash 0ms fzf wrapper for connpy +connpy() { + if [ $# -eq 0 ]; then + local selected + if [ -f ~/.config/conn/.fzf_nodes_cache.txt ]; then + selected=$(cat ~/.config/conn/.fzf_nodes_cache.txt | fzf-tmux -d 25% --reverse) + else + command connpy + return + fi + if [ -n "$selected" ]; then + command connpy "$selected" + fi + else + command connpy "$@" + fi +} +alias c="connpy" +#Here ends bash 0ms fzf wrapper\n''' + + if type == "fzf_wrapper_zsh": + return '''\n#Here starts zsh 0ms fzf wrapper for connpy +connpy() { + if [ $# -eq 0 ]; then + local selected + if [ -f ~/.config/conn/.fzf_nodes_cache.txt ]; then + selected=$(cat ~/.config/conn/.fzf_nodes_cache.txt | fzf-tmux -d 25% --reverse) + else + command connpy + return + fi + if [ -n "$selected" ]; then + command connpy "$selected" + fi + else + command connpy "$@" + fi +} +alias c="connpy" +#Here ends zsh 0ms fzf wrapper\n''' if type == "run": return "node[@subfolder][@folder] commmand to run\nRun the specific command on the node and print output\n/path/to/file.yaml\nUse a yaml file to run an automation script" if type == "generate": - return '''--- + return r'''--- tasks: - name: "Config" diff --git a/connpy/core_plugins/sync.py b/connpy/core_plugins/sync.py index 92eb040..712bca2 100755 --- a/connpy/core_plugins/sync.py +++ b/connpy/core_plugins/sync.py @@ -213,7 +213,7 @@ class sync: def compress_specific_files(self, zip_path): with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: - zipf.write(self.file, "config.json") + zipf.write(self.file, os.path.basename(self.file)) zipf.write(self.key, ".osk") def compress_and_upload(self): @@ -251,8 +251,23 @@ class sync: try: with zipfile.ZipFile(zip_path, 'r') as zipf: # Extract the specific file to the specified destination - zipf.extract("config.json", os.path.dirname(self.file)) - zipf.extract(".osk", os.path.dirname(self.key)) + names = zipf.namelist() + if "config.yaml" in names: + zipf.extract("config.yaml", os.path.dirname(self.file)) + elif "config.json" in names: + zipf.extract("config.json", os.path.dirname(self.file)) + + if ".osk" in names: + zipf.extract(".osk", os.path.dirname(self.key)) + + # Delete caches to force auto-regeneration on next run + try: + if os.path.exists(self.connapp.config.cachefile): + os.remove(self.connapp.config.cachefile) + if os.path.exists(self.connapp.config.fzf_cachefile): + os.remove(self.connapp.config.fzf_cachefile) + except Exception: + pass return 0 except Exception as e: printer.error(f"An error occurred: {e}") diff --git a/connpy/tests/test_capture.py b/connpy/tests/test_capture.py new file mode 100644 index 0000000..b848c56 --- /dev/null +++ b/connpy/tests/test_capture.py @@ -0,0 +1,51 @@ +"""Tests for connpy.core_plugins.capture""" +import pytest +from unittest.mock import MagicMock, patch +from connpy.core_plugins.capture import RemoteCapture + +@pytest.fixture +def mock_connapp(): + app = MagicMock() + app.nodes_list = ["test_node"] + app.config.getitem.return_value = {"host": "127.0.0.1", "protocol": "ssh"} + mock_node = MagicMock() + mock_node.protocol = "ssh" + mock_node.unique = "test_node" + app.node.return_value = mock_node + app.config.config = {"wireshark_path": "/fake/ws"} + return app + +class TestRemoteCapture: + def test_init_node_not_found(self, mock_connapp): + # Attempt to capture a node not in nodes_list + mock_connapp.nodes_list = ["other_node"] + with pytest.raises(SystemExit) as exc: + RemoteCapture(mock_connapp, "test_node", "eth0") + assert exc.value.code == 2 + + def test_init_success(self, mock_connapp): + rc = RemoteCapture(mock_connapp, "test_node", "eth0") + assert rc.node_name == "test_node" + assert rc.interface == "eth0" + assert rc.wireshark_path == "/fake/ws" + + @patch("connpy.core_plugins.capture.socket") + def test_is_port_in_use(self, mock_socket, mock_connapp): + rc = RemoteCapture(mock_connapp, "test_node", "eth0") + mock_sock_instance = MagicMock() + mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance + + mock_sock_instance.connect_ex.return_value = 0 + assert rc._is_port_in_use(8080) is True + + mock_sock_instance.connect_ex.return_value = 1 + assert rc._is_port_in_use(8080) is False + + @patch.object(RemoteCapture, "_is_port_in_use") + def test_find_free_port(self, mock_is_in_use, mock_connapp): + rc = RemoteCapture(mock_connapp, "test_node", "eth0") + # First 2 ports in use, 3rd is free + mock_is_in_use.side_effect = [True, True, False] + port = rc._find_free_port(20000, 30000) + assert 20000 <= port <= 30000 + assert mock_is_in_use.call_count == 3 diff --git a/connpy/tests/test_configfile.py b/connpy/tests/test_configfile.py index 6420bd7..39c508b 100644 --- a/connpy/tests/test_configfile.py +++ b/connpy/tests/test_configfile.py @@ -3,14 +3,15 @@ import json import os import re import pytest +import yaml from copy import deepcopy class TestConfigfileInit: def test_creates_default_config(self, tmp_config_dir): - """Creates config.json with defaults when it doesn't exist.""" - config_file = tmp_config_dir / "config.json" - config_file.unlink() # Remove existing + """Creates config.yaml with defaults when it doesn't exist.""" + config_file = tmp_config_dir / "config.yaml" + config_file.unlink(missing_ok=True) # Remove existing key_file = tmp_config_dir / ".osk" from connpy.configfile import configfile @@ -27,7 +28,7 @@ class TestConfigfileInit: key_file.unlink() # Remove existing from connpy.configfile import configfile - conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file)) + conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file)) assert key_file.exists() assert conf.privatekey is not None @@ -41,8 +42,8 @@ class TestConfigfileInit: def test_config_file_permissions(self, tmp_config_dir): """Config is created with 0o600 permissions.""" - config_file = tmp_config_dir / "config.json" - config_file.unlink() + config_file = tmp_config_dir / "config.yaml" + config_file.unlink(missing_ok=True) from connpy.configfile import configfile configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk")) @@ -62,7 +63,7 @@ class TestConfigfileInit: (dot_folder / ".folder").write_text(str(config_dir)) (dot_folder / "plugins").mkdir(exist_ok=True) - conf_path = str(config_dir / "my_config.json") + conf_path = str(config_dir / "my_config.yaml") key_path = str(config_dir / "my_key") from connpy.configfile import configfile @@ -248,7 +249,7 @@ class TestGetItem: def test_getitem_with_profile_extraction(self, tmp_config_dir): """extract=True resolves @profile references.""" - config_file = tmp_config_dir / "config.json" + config_file = tmp_config_dir / "config.yaml" data = { "config": {"case": False, "idletime": 30, "fzf": False}, "connections": { @@ -268,7 +269,7 @@ class TestGetItem: "options": "", "logs": "", "tags": "", "jumphost": ""} } } - config_file.write_text(json.dumps(data, indent=4)) + config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False)) from connpy.configfile import configfile conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk")) @@ -326,7 +327,7 @@ class TestGetAll: def test_profileused(self, tmp_config_dir): """Detects nodes using a specific profile.""" - config_file = tmp_config_dir / "config.json" + config_file = tmp_config_dir / "config.yaml" data = { "config": {"case": False, "idletime": 30, "fzf": False}, "connections": { @@ -352,7 +353,7 @@ class TestGetAll: "options": "", "logs": "", "tags": "", "jumphost": ""} } } - config_file.write_text(json.dumps(data, indent=4)) + config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False)) from connpy.configfile import configfile conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk")) diff --git a/connpy/tests/test_context.py b/connpy/tests/test_context.py new file mode 100644 index 0000000..f38709e --- /dev/null +++ b/connpy/tests/test_context.py @@ -0,0 +1,109 @@ +"""Tests for connpy.core_plugins.context""" +import pytest +from unittest.mock import MagicMock, patch +from connpy.core_plugins.context import context_manager, Preload, Entrypoint + +@pytest.fixture +def mock_connapp(): + connapp = MagicMock() + connapp.config.config = { + "contexts": {"all": [".*"]}, + "current_context": "all" + } + return connapp + +class TestContextManager: + def test_init(self, mock_connapp): + cm = context_manager(mock_connapp) + assert cm.contexts == {"all": [".*"]} + assert cm.current_context == "all" + assert len(cm.regex) == 1 + + def test_add_context_success(self, mock_connapp): + cm = context_manager(mock_connapp) + cm.add_context("prod", ["^prod_.*"]) + assert "prod" in cm.contexts + mock_connapp._change_settings.assert_called_with("contexts", cm.contexts) + + def test_add_context_invalid_name(self, mock_connapp): + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.add_context("prod-env", ["Regex"]) + assert exc.value.code == 1 + + def test_add_context_already_exists(self, mock_connapp): + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.add_context("all", ["Regex"]) + assert exc.value.code == 2 + + def test_modify_context_success(self, mock_connapp): + cm = context_manager(mock_connapp) + cm.add_context("prod", ["old"]) + cm.modify_context("prod", ["new"]) + assert cm.contexts["prod"] == ["new"] + + def test_modify_context_all(self, mock_connapp): + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.modify_context("all", ["new"]) + assert exc.value.code == 3 + + def test_modify_context_not_exists(self, mock_connapp): + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.modify_context("fake", ["new"]) + assert exc.value.code == 4 + + def test_delete_context_success(self, mock_connapp): + cm = context_manager(mock_connapp) + cm.add_context("prod", ["old"]) + cm.delete_context("prod") + assert "prod" not in cm.contexts + + def test_delete_context_all(self, mock_connapp): + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.delete_context("all") + assert exc.value.code == 3 + + def test_delete_context_current(self, mock_connapp): + mock_connapp.config.config["current_context"] = "prod" + mock_connapp.config.config["contexts"]["prod"] = [".*"] + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.delete_context("prod") + assert exc.value.code == 5 + + def test_set_context_success(self, mock_connapp): + cm = context_manager(mock_connapp) + cm.contexts["prod"] = [".*"] + cm.set_context("prod") + mock_connapp._change_settings.assert_called_with("current_context", "prod") + + def test_set_context_already_set(self, mock_connapp): + cm = context_manager(mock_connapp) + with pytest.raises(SystemExit) as exc: + cm.set_context("all") + assert exc.value.code == 0 + + def test_match_regexp(self, mock_connapp): + mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"] + cm = context_manager(mock_connapp) + assert cm.match_any_regex("prod_node", cm.regex) is True + assert cm.match_any_regex("test_node", cm.regex) is True + assert cm.match_any_regex("dev_node", cm.regex) is False + + def test_modify_node_list(self, mock_connapp): + mock_connapp.config.config["contexts"]["all"] = ["^prod"] + cm = context_manager(mock_connapp) + nodes = ["prod_1", "dev_1", "prod_2"] + result = cm.modify_node_list(result=nodes) + assert result == ["prod_1", "prod_2"] + + def test_modify_node_dict(self, mock_connapp): + mock_connapp.config.config["contexts"]["all"] = ["^prod"] + cm = context_manager(mock_connapp) + nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}} + result = cm.modify_node_dict(result=nodes) + assert set(result.keys()) == {"prod_1", "prod_2"} diff --git a/connpy/tests/test_sync.py b/connpy/tests/test_sync.py new file mode 100644 index 0000000..95bc712 --- /dev/null +++ b/connpy/tests/test_sync.py @@ -0,0 +1,108 @@ +"""Tests for connpy.core_plugins.sync""" +import pytest +from unittest.mock import MagicMock, patch, mock_open +from connpy.core_plugins.sync import sync + +@pytest.fixture +def mock_connapp(): + app = MagicMock() + app.config.defaultdir = "/fake/dir" + app.config.file = "/fake/dir/config.yaml" + app.config.key = "/fake/dir/.osk" + app.config.config = {"sync": True} + return app + +class TestSyncPlugin: + def test_init(self, mock_connapp): + s = sync(mock_connapp) + assert s.sync is True + assert s.file == "/fake/dir/config.yaml" + assert s.token_file == "/fake/dir/gtoken.json" + + @patch("connpy.core_plugins.sync.os.path.exists") + @patch("connpy.core_plugins.sync.Credentials") + def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp): + mock_exists.return_value = True + mock_cred_instance = MagicMock() + mock_cred_instance.valid = True + MockCreds.from_authorized_user_file.return_value = mock_cred_instance + + s = sync(mock_connapp) + creds = s.get_credentials() + assert creds == mock_cred_instance + + @patch("connpy.core_plugins.sync.os.path.exists") + def test_get_credentials_not_found(self, mock_exists, mock_connapp): + mock_exists.return_value = False + s = sync(mock_connapp) + assert s.get_credentials() == 0 + + @patch("connpy.core_plugins.sync.zipfile.ZipFile") + @patch("connpy.core_plugins.sync.os.path.basename") + def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp): + mock_basename.return_value = "config.yaml" + s = sync(mock_connapp) + zip_mock = MagicMock() + MockZipFile.return_value.__enter__.return_value = zip_mock + + s.compress_specific_files("/fake/zip.zip") + zip_mock.write.assert_any_call(s.file, "config.yaml") + zip_mock.write.assert_any_call(s.key, ".osk") + + @patch("connpy.core_plugins.sync.zipfile.ZipFile") + @patch("connpy.core_plugins.sync.os.path.dirname") + def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp): + mock_dirname.return_value = "/fake/dir" + s = sync(mock_connapp) + zip_mock = MagicMock() + zip_mock.namelist.return_value = ["config.yaml", ".osk"] + MockZipFile.return_value.__enter__.return_value = zip_mock + + assert s.decompress_zip("/fake/zip.zip") == 0 + zip_mock.extract.assert_any_call("config.yaml", "/fake/dir") + zip_mock.extract.assert_any_call(".osk", "/fake/dir") + + @patch("connpy.core_plugins.sync.zipfile.ZipFile") + @patch("connpy.core_plugins.sync.os.path.dirname") + def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp): + mock_dirname.return_value = "/fake/dir" + s = sync(mock_connapp) + zip_mock = MagicMock() + zip_mock.namelist.return_value = ["config.json", ".osk"] + MockZipFile.return_value.__enter__.return_value = zip_mock + + assert s.decompress_zip("/fake/old_zip.zip") == 0 + zip_mock.extract.assert_any_call("config.json", "/fake/dir") + + @patch.object(sync, "get_credentials") + @patch("connpy.core_plugins.sync.build") + def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp): + mock_get_credentials.return_value = MagicMock() + mock_service = MagicMock() + mock_build.return_value = mock_service + + mock_service.files().list().execute.return_value = { + "files": [ + {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}} + ] + } + + s = sync(mock_connapp) + files = s.get_appdata_files() + assert len(files) == 1 + assert files[0]["id"] == "1" + assert files[0]["timestamp"] == "1000" + + @patch.object(sync, "get_credentials") + @patch("connpy.core_plugins.sync.build") + @patch("connpy.core_plugins.sync.MediaFileUpload") + @patch("connpy.core_plugins.sync.os.path.basename") + def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp): + mock_get_credentials.return_value = MagicMock() + mock_basename.return_value = "backup.zip" + mock_service = MagicMock() + mock_build.return_value = mock_service + + s = sync(mock_connapp) + assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0 + mock_service.files().create.assert_called_once() diff --git a/docs/connpy/index.html b/docs/connpy/index.html index c3dd081..0b0daf6 100644 --- a/docs/connpy/index.html +++ b/docs/connpy/index.html @@ -2259,16 +2259,40 @@ class configfile: with open(pathfile, "w") as f: f.write(str(defaultdir)) configdir = defaultdir - defaultfile = configdir + '/config.json' + defaultfile = configdir + '/config.yaml' + self.cachefile = configdir + '/.config.cache.json' + self.fzf_cachefile = configdir + '/.fzf_nodes_cache.txt' defaultkey = configdir + '/.osk' if conf == None: self.file = defaultfile + + # Backwards compatibility: Migrate from JSON to YAML + legacy_json = configdir + '/config.json' + legacy_noext = configdir + '/config' + legacy_file = None + if os.path.exists(legacy_json): legacy_file = legacy_json + elif os.path.exists(legacy_noext): legacy_file = legacy_noext + + if not os.path.exists(self.file) and legacy_file: + try: + with open(legacy_file, 'r') as f: + old_data = json.load(f) + with open(self.file, 'w') as f: + yaml.dump(old_data, f, default_flow_style=False, sort_keys=False) + with open(self.cachefile, 'w') as f: + json.dump(old_data, f) + shutil.move(legacy_file, legacy_file + ".backup") + printer.success(f"Migrated legacy config ({len(old_data.get('connections',{}))} folders/nodes) into YAML and Cache successfully!") + except Exception as e: + printer.warning(f"Failed to migrate legacy config: {e}") else: self.file = conf + if key == None: self.key = defaultkey else: self.key = key + if os.path.exists(self.file): config = self._loadconfig(self.file) else: @@ -2285,23 +2309,38 @@ class configfile: def _loadconfig(self, conf): - #Loads config file - jsonconf = open(conf) - jsondata = json.load(jsonconf) - jsonconf.close() - return jsondata + #Loads config file using dual cache + cache_exists = os.path.exists(self.cachefile) + yaml_time = os.path.getmtime(conf) if os.path.exists(conf) else 0 + cache_time = os.path.getmtime(self.cachefile) if cache_exists else 0 + + if not cache_exists or yaml_time > cache_time: + with open(conf, 'r') as f: + data = yaml.safe_load(f) + try: + with open(self.cachefile, 'w') as f: + json.dump(data, f) + except Exception: + pass + return data + else: + with open(self.cachefile, 'r') as f: + return json.load(f) def _createconfig(self, conf): #Create config file defaultconfig = {'config': {'case': False, 'idletime': 30, 'fzf': False}, 'connections': {}, 'profiles': { "default": { "host":"", "protocol":"ssh", "port":"", "user":"", "password":"", "options":"", "logs":"", "tags": "", "jumphost":""}}} if not os.path.exists(conf): with open(conf, "w") as f: - json.dump(defaultconfig, f, indent = 4) - f.close() + yaml.dump(defaultconfig, f, default_flow_style=False, sort_keys=False) os.chmod(conf, 0o600) - jsonconf = open(conf) - jsondata = json.load(jsonconf) - jsonconf.close() + try: + with open(self.cachefile, 'w') as f: + json.dump(defaultconfig, f) + except Exception: + pass + with open(conf, 'r') as f: + jsondata = yaml.safe_load(f) return jsondata @MethodHook @@ -2313,13 +2352,23 @@ class configfile: newconfig["profiles"] = self.profiles try: with open(conf, "w") as f: - json.dump(newconfig, f, indent = 4) - f.close() + yaml.dump(newconfig, f, default_flow_style=False, sort_keys=False) + with open(self.cachefile, "w") as f: + json.dump(newconfig, f) + self._generate_nodes_cache() except (IOError, OSError) as e: printer.error(f"Failed to save config: {e}") return 1 return 0 + def _generate_nodes_cache(self): + try: + nodes = self._getallnodes() + with open(self.fzf_cachefile, "w") as f: + f.write("\n".join(nodes)) + except Exception: + pass + def _createkey(self, keyfile): #Create key file key = RSA.generate(2048) @@ -2538,15 +2587,15 @@ class configfile: def _getallnodes(self, filter = None): #get all nodes on configfile nodes = [] - layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"] - folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"] + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] nodes.extend(layer1) for f in folders: - layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"] + layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"] nodes.extend(layer2) - subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"] for s in subfolders: - layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"] + layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"] nodes.extend(layer3) if filter: if isinstance(filter, str): @@ -2561,15 +2610,15 @@ class configfile: def _getallnodesfull(self, filter = None, extract = True): #get all nodes on configfile with all their attributes. nodes = {} - layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection"} - folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + layer1 = {k:v for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection"} + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] nodes.update(layer1) for f in folders: - layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection"} + layer2 = {k + "@" + f:v for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection"} nodes.update(layer2) - subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"] for s in subfolders: - layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection"} + layer3 = {k + "@" + s + "@" + f:v for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection"} nodes.update(layer3) if filter: if isinstance(filter, str): @@ -2600,27 +2649,27 @@ class configfile: @MethodHook def _getallfolders(self): #get all folders on configfile - folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + folders = ["@" + k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] subfolders = [] for f in folders: - s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v["type"] == "subfolder"] + s = ["@" + k + f for k,v in self.connections[f[1:]].items() if isinstance(v, dict) and v.get("type") == "subfolder"] subfolders.extend(s) folders.extend(subfolders) return folders @MethodHook def _profileused(self, profile): - #Check if profile is used before deleting it + #Return all the nodes that uses this profile. nodes = [] - layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] - folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v["type"] == "folder"] + layer1 = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))] + folders = [k for k,v in self.connections.items() if isinstance(v, dict) and v.get("type") == "folder"] nodes.extend(layer1) for f in folders: - layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] + layer2 = [k + "@" + f for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))] nodes.extend(layer2) - subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v["type"] == "subfolder"] + subfolders = [k for k,v in self.connections[f].items() if isinstance(v, dict) and v.get("type") == "subfolder"] for s in subfolders: - layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v["type"] == "connection" and ("@" + profile in v.values() or ( isinstance(v["password"],list) and "@" + profile in v["password"]))] + layer3 = [k + "@" + s + "@" + f for k,v in self.connections[f][s].items() if isinstance(v, dict) and v.get("type") == "connection" and ("@" + profile in v.values() or ( isinstance(v.get("password"),list) and "@" + profile in v.get("password")))] nodes.extend(layer3) return nodes diff --git a/docs/connpy/tests/index.html b/docs/connpy/tests/index.html index afbe9c3..5b37649 100644 --- a/docs/connpy/tests/index.html +++ b/docs/connpy/tests/index.html @@ -52,6 +52,10 @@ el.replaceWith(d);

Tests for connpy.api module — Flask routes.

+
connpy.tests.test_capture
+
+

Tests for connpy.core_plugins.capture

+
connpy.tests.test_completion

Tests for connpy.completion module.

@@ -60,6 +64,10 @@ el.replaceWith(d);

Tests for connpy.configfile module.

+
connpy.tests.test_context
+
+

Tests for connpy.core_plugins.context

+
connpy.tests.test_core

Tests for connpy.core module — node and nodes classes.

@@ -76,6 +84,10 @@ el.replaceWith(d);

Tests for connpy.printer module.

+
connpy.tests.test_sync
+
+

Tests for connpy.core_plugins.sync

+
@@ -100,12 +112,15 @@ el.replaceWith(d);
  • connpy.tests.conftest
  • connpy.tests.test_ai
  • connpy.tests.test_api
  • +
  • connpy.tests.test_capture
  • connpy.tests.test_completion
  • connpy.tests.test_configfile
  • +
  • connpy.tests.test_context
  • connpy.tests.test_core
  • connpy.tests.test_hooks
  • connpy.tests.test_plugins
  • connpy.tests.test_printer
  • +
  • connpy.tests.test_sync
  • diff --git a/docs/connpy/tests/test_capture.html b/docs/connpy/tests/test_capture.html new file mode 100644 index 0000000..db68f63 --- /dev/null +++ b/docs/connpy/tests/test_capture.html @@ -0,0 +1,235 @@ + + + + + + +connpy.tests.test_capture API documentation + + + + + + + + + + + +
    +
    +
    +

    Module connpy.tests.test_capture

    +
    +
    +

    Tests for connpy.core_plugins.capture

    +
    +
    +
    +
    +
    +
    +

    Functions

    +
    +
    +def mock_connapp() +
    +
    +
    + +Expand source code + +
    @pytest.fixture
    +def mock_connapp():
    +    app = MagicMock()
    +    app.nodes_list = ["test_node"]
    +    app.config.getitem.return_value = {"host": "127.0.0.1", "protocol": "ssh"}
    +    mock_node = MagicMock()
    +    mock_node.protocol = "ssh"
    +    mock_node.unique = "test_node"
    +    app.node.return_value = mock_node
    +    app.config.config = {"wireshark_path": "/fake/ws"}
    +    return app
    +
    +
    +
    +
    +
    +
    +

    Classes

    +
    +
    +class TestRemoteCapture +
    +
    +
    + +Expand source code + +
    class TestRemoteCapture:
    +    def test_init_node_not_found(self, mock_connapp):
    +        # Attempt to capture a node not in nodes_list
    +        mock_connapp.nodes_list = ["other_node"]
    +        with pytest.raises(SystemExit) as exc:
    +            RemoteCapture(mock_connapp, "test_node", "eth0")
    +        assert exc.value.code == 2
    +
    +    def test_init_success(self, mock_connapp):
    +        rc = RemoteCapture(mock_connapp, "test_node", "eth0")
    +        assert rc.node_name == "test_node"
    +        assert rc.interface == "eth0"
    +        assert rc.wireshark_path == "/fake/ws"
    +
    +    @patch("connpy.core_plugins.capture.socket")
    +    def test_is_port_in_use(self, mock_socket, mock_connapp):
    +        rc = RemoteCapture(mock_connapp, "test_node", "eth0")
    +        mock_sock_instance = MagicMock()
    +        mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance
    +        
    +        mock_sock_instance.connect_ex.return_value = 0
    +        assert rc._is_port_in_use(8080) is True
    +        
    +        mock_sock_instance.connect_ex.return_value = 1
    +        assert rc._is_port_in_use(8080) is False
    +
    +    @patch.object(RemoteCapture, "_is_port_in_use")
    +    def test_find_free_port(self, mock_is_in_use, mock_connapp):
    +        rc = RemoteCapture(mock_connapp, "test_node", "eth0")
    +        # First 2 ports in use, 3rd is free
    +        mock_is_in_use.side_effect = [True, True, False]
    +        port = rc._find_free_port(20000, 30000)
    +        assert 20000 <= port <= 30000
    +        assert mock_is_in_use.call_count == 3
    +
    +
    +

    Methods

    +
    +
    +def test_find_free_port(self, mock_is_in_use, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch.object(RemoteCapture, "_is_port_in_use")
    +def test_find_free_port(self, mock_is_in_use, mock_connapp):
    +    rc = RemoteCapture(mock_connapp, "test_node", "eth0")
    +    # First 2 ports in use, 3rd is free
    +    mock_is_in_use.side_effect = [True, True, False]
    +    port = rc._find_free_port(20000, 30000)
    +    assert 20000 <= port <= 30000
    +    assert mock_is_in_use.call_count == 3
    +
    +
    +
    +
    +def test_init_node_not_found(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_init_node_not_found(self, mock_connapp):
    +    # Attempt to capture a node not in nodes_list
    +    mock_connapp.nodes_list = ["other_node"]
    +    with pytest.raises(SystemExit) as exc:
    +        RemoteCapture(mock_connapp, "test_node", "eth0")
    +    assert exc.value.code == 2
    +
    +
    +
    +
    +def test_init_success(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_init_success(self, mock_connapp):
    +    rc = RemoteCapture(mock_connapp, "test_node", "eth0")
    +    assert rc.node_name == "test_node"
    +    assert rc.interface == "eth0"
    +    assert rc.wireshark_path == "/fake/ws"
    +
    +
    +
    +
    +def test_is_port_in_use(self, mock_socket, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch("connpy.core_plugins.capture.socket")
    +def test_is_port_in_use(self, mock_socket, mock_connapp):
    +    rc = RemoteCapture(mock_connapp, "test_node", "eth0")
    +    mock_sock_instance = MagicMock()
    +    mock_socket.socket.return_value.__enter__.return_value = mock_sock_instance
    +    
    +    mock_sock_instance.connect_ex.return_value = 0
    +    assert rc._is_port_in_use(8080) is True
    +    
    +    mock_sock_instance.connect_ex.return_value = 1
    +    assert rc._is_port_in_use(8080) is False
    +
    +
    +
    +
    +
    +
    +
    +
    + +
    + + + diff --git a/docs/connpy/tests/test_configfile.html b/docs/connpy/tests/test_configfile.html index 164ba3c..eea4bc0 100644 --- a/docs/connpy/tests/test_configfile.html +++ b/docs/connpy/tests/test_configfile.html @@ -383,9 +383,9 @@ el.replaceWith(d);
    class TestConfigfileInit:
         def test_creates_default_config(self, tmp_config_dir):
    -        """Creates config.json with defaults when it doesn't exist."""
    -        config_file = tmp_config_dir / "config.json"
    -        config_file.unlink()  # Remove existing
    +        """Creates config.yaml with defaults when it doesn't exist."""
    +        config_file = tmp_config_dir / "config.yaml"
    +        config_file.unlink(missing_ok=True)  # Remove existing
             key_file = tmp_config_dir / ".osk"
     
             from connpy.configfile import configfile
    @@ -402,7 +402,7 @@ el.replaceWith(d);
             key_file.unlink()  # Remove existing
     
             from connpy.configfile import configfile
    -        conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file))
    +        conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file))
     
             assert key_file.exists()
             assert conf.privatekey is not None
    @@ -416,8 +416,8 @@ el.replaceWith(d);
     
         def test_config_file_permissions(self, tmp_config_dir):
             """Config is created with 0o600 permissions."""
    -        config_file = tmp_config_dir / "config.json"
    -        config_file.unlink()
    +        config_file = tmp_config_dir / "config.yaml"
    +        config_file.unlink(missing_ok=True)
     
             from connpy.configfile import configfile
             configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
    @@ -437,7 +437,7 @@ el.replaceWith(d);
             (dot_folder / ".folder").write_text(str(config_dir))
             (dot_folder / "plugins").mkdir(exist_ok=True)
     
    -        conf_path = str(config_dir / "my_config.json")
    +        conf_path = str(config_dir / "my_config.yaml")
             key_path = str(config_dir / "my_key")
     
             from connpy.configfile import configfile
    @@ -459,8 +459,8 @@ el.replaceWith(d);
     
     
    def test_config_file_permissions(self, tmp_config_dir):
         """Config is created with 0o600 permissions."""
    -    config_file = tmp_config_dir / "config.json"
    -    config_file.unlink()
    +    config_file = tmp_config_dir / "config.yaml"
    +    config_file.unlink(missing_ok=True)
     
         from connpy.configfile import configfile
         configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
    @@ -479,9 +479,9 @@ el.replaceWith(d);
     Expand source code
     
     
    def test_creates_default_config(self, tmp_config_dir):
    -    """Creates config.json with defaults when it doesn't exist."""
    -    config_file = tmp_config_dir / "config.json"
    -    config_file.unlink()  # Remove existing
    +    """Creates config.yaml with defaults when it doesn't exist."""
    +    config_file = tmp_config_dir / "config.yaml"
    +    config_file.unlink(missing_ok=True)  # Remove existing
         key_file = tmp_config_dir / ".osk"
     
         from connpy.configfile import configfile
    @@ -492,7 +492,7 @@ el.replaceWith(d);
         assert conf.config["idletime"] == 30
         assert "default" in conf.profiles
    -

    Creates config.json with defaults when it doesn't exist.

    +

    Creates config.yaml with defaults when it doesn't exist.

    def test_creates_rsa_key(self, tmp_config_dir) @@ -508,7 +508,7 @@ el.replaceWith(d); key_file.unlink() # Remove existing from connpy.configfile import configfile - conf = configfile(conf=str(tmp_config_dir / "config.json"), key=str(key_file)) + conf = configfile(conf=str(tmp_config_dir / "config.yaml"), key=str(key_file)) assert key_file.exists() assert conf.privatekey is not None @@ -536,7 +536,7 @@ el.replaceWith(d); (dot_folder / ".folder").write_text(str(config_dir)) (dot_folder / "plugins").mkdir(exist_ok=True) - conf_path = str(config_dir / "my_config.json") + conf_path = str(config_dir / "my_config.yaml") key_path = str(config_dir / "my_key") from connpy.configfile import configfile @@ -846,7 +846,7 @@ el.replaceWith(d); def test_profileused(self, tmp_config_dir): """Detects nodes using a specific profile.""" - config_file = tmp_config_dir / "config.json" + config_file = tmp_config_dir / "config.yaml" data = { "config": {"case": False, "idletime": 30, "fzf": False}, "connections": { @@ -872,7 +872,7 @@ el.replaceWith(d); "options": "", "logs": "", "tags": "", "jumphost": ""} } } - config_file.write_text(json.dumps(data, indent=4)) + config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False)) from connpy.configfile import configfile conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk")) @@ -1014,7 +1014,7 @@ el.replaceWith(d);
    def test_profileused(self, tmp_config_dir):
         """Detects nodes using a specific profile."""
    -    config_file = tmp_config_dir / "config.json"
    +    config_file = tmp_config_dir / "config.yaml"
         data = {
             "config": {"case": False, "idletime": 30, "fzf": False},
             "connections": {
    @@ -1040,7 +1040,7 @@ el.replaceWith(d);
                              "options": "", "logs": "", "tags": "", "jumphost": ""}
             }
         }
    -    config_file.write_text(json.dumps(data, indent=4))
    +    config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
         from connpy.configfile import configfile
         conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
     
    @@ -1111,7 +1111,7 @@ el.replaceWith(d);
     
         def test_getitem_with_profile_extraction(self, tmp_config_dir):
             """extract=True resolves @profile references."""
    -        config_file = tmp_config_dir / "config.json"
    +        config_file = tmp_config_dir / "config.yaml"
             data = {
                 "config": {"case": False, "idletime": 30, "fzf": False},
                 "connections": {
    @@ -1131,7 +1131,7 @@ el.replaceWith(d);
                                    "options": "", "logs": "", "tags": "", "jumphost": ""}
                 }
             }
    -        config_file.write_text(json.dumps(data, indent=4))
    +        config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
     
             from connpy.configfile import configfile
             conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
    @@ -1235,7 +1235,7 @@ el.replaceWith(d);
     
     
    def test_getitem_with_profile_extraction(self, tmp_config_dir):
         """extract=True resolves @profile references."""
    -    config_file = tmp_config_dir / "config.json"
    +    config_file = tmp_config_dir / "config.yaml"
         data = {
             "config": {"case": False, "idletime": 30, "fzf": False},
             "connections": {
    @@ -1255,7 +1255,7 @@ el.replaceWith(d);
                                "options": "", "logs": "", "tags": "", "jumphost": ""}
             }
         }
    -    config_file.write_text(json.dumps(data, indent=4))
    +    config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
     
         from connpy.configfile import configfile
         conf = configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
    diff --git a/docs/connpy/tests/test_context.html b/docs/connpy/tests/test_context.html
    new file mode 100644
    index 0000000..bc4ca50
    --- /dev/null
    +++ b/docs/connpy/tests/test_context.html
    @@ -0,0 +1,475 @@
    +
    +
    +
    +
    +
    +
    +connpy.tests.test_context API documentation
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +

    Module connpy.tests.test_context

    +
    +
    +

    Tests for connpy.core_plugins.context

    +
    +
    +
    +
    +
    +
    +

    Functions

    +
    +
    +def mock_connapp() +
    +
    +
    + +Expand source code + +
    @pytest.fixture
    +def mock_connapp():
    +    connapp = MagicMock()
    +    connapp.config.config = {
    +        "contexts": {"all": [".*"]},
    +        "current_context": "all"
    +    }
    +    return connapp
    +
    +
    +
    +
    +
    +
    +

    Classes

    +
    +
    +class TestContextManager +
    +
    +
    + +Expand source code + +
    class TestContextManager:
    +    def test_init(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        assert cm.contexts == {"all": [".*"]}
    +        assert cm.current_context == "all"
    +        assert len(cm.regex) == 1
    +
    +    def test_add_context_success(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        cm.add_context("prod", ["^prod_.*"])
    +        assert "prod" in cm.contexts
    +        mock_connapp._change_settings.assert_called_with("contexts", cm.contexts)
    +
    +    def test_add_context_invalid_name(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.add_context("prod-env", ["Regex"])
    +        assert exc.value.code == 1
    +
    +    def test_add_context_already_exists(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.add_context("all", ["Regex"])
    +        assert exc.value.code == 2
    +
    +    def test_modify_context_success(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        cm.add_context("prod", ["old"])
    +        cm.modify_context("prod", ["new"])
    +        assert cm.contexts["prod"] == ["new"]
    +
    +    def test_modify_context_all(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.modify_context("all", ["new"])
    +        assert exc.value.code == 3
    +
    +    def test_modify_context_not_exists(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.modify_context("fake", ["new"])
    +        assert exc.value.code == 4
    +
    +    def test_delete_context_success(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        cm.add_context("prod", ["old"])
    +        cm.delete_context("prod")
    +        assert "prod" not in cm.contexts
    +
    +    def test_delete_context_all(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.delete_context("all")
    +        assert exc.value.code == 3
    +
    +    def test_delete_context_current(self, mock_connapp):
    +        mock_connapp.config.config["current_context"] = "prod"
    +        mock_connapp.config.config["contexts"]["prod"] = [".*"]
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.delete_context("prod")
    +        assert exc.value.code == 5
    +
    +    def test_set_context_success(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        cm.contexts["prod"] = [".*"]
    +        cm.set_context("prod")
    +        mock_connapp._change_settings.assert_called_with("current_context", "prod")
    +
    +    def test_set_context_already_set(self, mock_connapp):
    +        cm = context_manager(mock_connapp)
    +        with pytest.raises(SystemExit) as exc:
    +            cm.set_context("all")
    +        assert exc.value.code == 0
    +
    +    def test_match_regexp(self, mock_connapp):
    +        mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"]
    +        cm = context_manager(mock_connapp)
    +        assert cm.match_any_regex("prod_node", cm.regex) is True
    +        assert cm.match_any_regex("test_node", cm.regex) is True
    +        assert cm.match_any_regex("dev_node", cm.regex) is False
    +
    +    def test_modify_node_list(self, mock_connapp):
    +        mock_connapp.config.config["contexts"]["all"] = ["^prod"]
    +        cm = context_manager(mock_connapp)
    +        nodes = ["prod_1", "dev_1", "prod_2"]
    +        result = cm.modify_node_list(result=nodes)
    +        assert result == ["prod_1", "prod_2"]
    +
    +    def test_modify_node_dict(self, mock_connapp):
    +        mock_connapp.config.config["contexts"]["all"] = ["^prod"]
    +        cm = context_manager(mock_connapp)
    +        nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}}
    +        result = cm.modify_node_dict(result=nodes)
    +        assert set(result.keys()) == {"prod_1", "prod_2"}
    +
    +
    +

    Methods

    +
    +
    +def test_add_context_already_exists(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_add_context_already_exists(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.add_context("all", ["Regex"])
    +    assert exc.value.code == 2
    +
    +
    +
    +
    +def test_add_context_invalid_name(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_add_context_invalid_name(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.add_context("prod-env", ["Regex"])
    +    assert exc.value.code == 1
    +
    +
    +
    +
    +def test_add_context_success(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_add_context_success(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    cm.add_context("prod", ["^prod_.*"])
    +    assert "prod" in cm.contexts
    +    mock_connapp._change_settings.assert_called_with("contexts", cm.contexts)
    +
    +
    +
    +
    +def test_delete_context_all(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_delete_context_all(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.delete_context("all")
    +    assert exc.value.code == 3
    +
    +
    +
    +
    +def test_delete_context_current(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_delete_context_current(self, mock_connapp):
    +    mock_connapp.config.config["current_context"] = "prod"
    +    mock_connapp.config.config["contexts"]["prod"] = [".*"]
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.delete_context("prod")
    +    assert exc.value.code == 5
    +
    +
    +
    +
    +def test_delete_context_success(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_delete_context_success(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    cm.add_context("prod", ["old"])
    +    cm.delete_context("prod")
    +    assert "prod" not in cm.contexts
    +
    +
    +
    +
    +def test_init(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_init(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    assert cm.contexts == {"all": [".*"]}
    +    assert cm.current_context == "all"
    +    assert len(cm.regex) == 1
    +
    +
    +
    +
    +def test_match_regexp(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_match_regexp(self, mock_connapp):
    +    mock_connapp.config.config["contexts"]["all"] = ["^prod", "^test"]
    +    cm = context_manager(mock_connapp)
    +    assert cm.match_any_regex("prod_node", cm.regex) is True
    +    assert cm.match_any_regex("test_node", cm.regex) is True
    +    assert cm.match_any_regex("dev_node", cm.regex) is False
    +
    +
    +
    +
    +def test_modify_context_all(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_modify_context_all(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.modify_context("all", ["new"])
    +    assert exc.value.code == 3
    +
    +
    +
    +
    +def test_modify_context_not_exists(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_modify_context_not_exists(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.modify_context("fake", ["new"])
    +    assert exc.value.code == 4
    +
    +
    +
    +
    +def test_modify_context_success(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_modify_context_success(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    cm.add_context("prod", ["old"])
    +    cm.modify_context("prod", ["new"])
    +    assert cm.contexts["prod"] == ["new"]
    +
    +
    +
    +
    +def test_modify_node_dict(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_modify_node_dict(self, mock_connapp):
    +    mock_connapp.config.config["contexts"]["all"] = ["^prod"]
    +    cm = context_manager(mock_connapp)
    +    nodes = {"prod_1": {}, "dev_1": {}, "prod_2": {}}
    +    result = cm.modify_node_dict(result=nodes)
    +    assert set(result.keys()) == {"prod_1", "prod_2"}
    +
    +
    +
    +
    +def test_modify_node_list(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_modify_node_list(self, mock_connapp):
    +    mock_connapp.config.config["contexts"]["all"] = ["^prod"]
    +    cm = context_manager(mock_connapp)
    +    nodes = ["prod_1", "dev_1", "prod_2"]
    +    result = cm.modify_node_list(result=nodes)
    +    assert result == ["prod_1", "prod_2"]
    +
    +
    +
    +
    +def test_set_context_already_set(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_set_context_already_set(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    with pytest.raises(SystemExit) as exc:
    +        cm.set_context("all")
    +    assert exc.value.code == 0
    +
    +
    +
    +
    +def test_set_context_success(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_set_context_success(self, mock_connapp):
    +    cm = context_manager(mock_connapp)
    +    cm.contexts["prod"] = [".*"]
    +    cm.set_context("prod")
    +    mock_connapp._change_settings.assert_called_with("current_context", "prod")
    +
    +
    +
    +
    +
    +
    +
    +
    + +
    + + + diff --git a/docs/connpy/tests/test_sync.html b/docs/connpy/tests/test_sync.html new file mode 100644 index 0000000..d1e0b08 --- /dev/null +++ b/docs/connpy/tests/test_sync.html @@ -0,0 +1,396 @@ + + + + + + +connpy.tests.test_sync API documentation + + + + + + + + + + + +
    +
    +
    +

    Module connpy.tests.test_sync

    +
    +
    +

    Tests for connpy.core_plugins.sync

    +
    +
    +
    +
    +
    +
    +

    Functions

    +
    +
    +def mock_connapp() +
    +
    +
    + +Expand source code + +
    @pytest.fixture
    +def mock_connapp():
    +    app = MagicMock()
    +    app.config.defaultdir = "/fake/dir"
    +    app.config.file = "/fake/dir/config.yaml"
    +    app.config.key = "/fake/dir/.osk"
    +    app.config.config = {"sync": True}
    +    return app
    +
    +
    +
    +
    +
    +
    +

    Classes

    +
    +
    +class TestSyncPlugin +
    +
    +
    + +Expand source code + +
    class TestSyncPlugin:
    +    def test_init(self, mock_connapp):
    +        s = sync(mock_connapp)
    +        assert s.sync is True
    +        assert s.file == "/fake/dir/config.yaml"
    +        assert s.token_file == "/fake/dir/gtoken.json"
    +
    +    @patch("connpy.core_plugins.sync.os.path.exists")
    +    @patch("connpy.core_plugins.sync.Credentials")
    +    def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp):
    +        mock_exists.return_value = True
    +        mock_cred_instance = MagicMock()
    +        mock_cred_instance.valid = True
    +        MockCreds.from_authorized_user_file.return_value = mock_cred_instance
    +        
    +        s = sync(mock_connapp)
    +        creds = s.get_credentials()
    +        assert creds == mock_cred_instance
    +
    +    @patch("connpy.core_plugins.sync.os.path.exists")
    +    def test_get_credentials_not_found(self, mock_exists, mock_connapp):
    +        mock_exists.return_value = False
    +        s = sync(mock_connapp)
    +        assert s.get_credentials() == 0
    +
    +    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    +    @patch("connpy.core_plugins.sync.os.path.basename")
    +    def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp):
    +        mock_basename.return_value = "config.yaml"
    +        s = sync(mock_connapp)
    +        zip_mock = MagicMock()
    +        MockZipFile.return_value.__enter__.return_value = zip_mock
    +        
    +        s.compress_specific_files("/fake/zip.zip")
    +        zip_mock.write.assert_any_call(s.file, "config.yaml")
    +        zip_mock.write.assert_any_call(s.key, ".osk")
    +
    +    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    +    @patch("connpy.core_plugins.sync.os.path.dirname")
    +    def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp):
    +        mock_dirname.return_value = "/fake/dir"
    +        s = sync(mock_connapp)
    +        zip_mock = MagicMock()
    +        zip_mock.namelist.return_value = ["config.yaml", ".osk"]
    +        MockZipFile.return_value.__enter__.return_value = zip_mock
    +        
    +        assert s.decompress_zip("/fake/zip.zip") == 0
    +        zip_mock.extract.assert_any_call("config.yaml", "/fake/dir")
    +        zip_mock.extract.assert_any_call(".osk", "/fake/dir")
    +
    +    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    +    @patch("connpy.core_plugins.sync.os.path.dirname")
    +    def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp):
    +        mock_dirname.return_value = "/fake/dir"
    +        s = sync(mock_connapp)
    +        zip_mock = MagicMock()
    +        zip_mock.namelist.return_value = ["config.json", ".osk"]
    +        MockZipFile.return_value.__enter__.return_value = zip_mock
    +        
    +        assert s.decompress_zip("/fake/old_zip.zip") == 0
    +        zip_mock.extract.assert_any_call("config.json", "/fake/dir")
    +
    +    @patch.object(sync, "get_credentials")
    +    @patch("connpy.core_plugins.sync.build")
    +    def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp):
    +        mock_get_credentials.return_value = MagicMock()
    +        mock_service = MagicMock()
    +        mock_build.return_value = mock_service
    +        
    +        mock_service.files().list().execute.return_value = {
    +            "files": [
    +                {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}}
    +            ]
    +        }
    +        
    +        s = sync(mock_connapp)
    +        files = s.get_appdata_files()
    +        assert len(files) == 1
    +        assert files[0]["id"] == "1"
    +        assert files[0]["timestamp"] == "1000"
    +
    +    @patch.object(sync, "get_credentials")
    +    @patch("connpy.core_plugins.sync.build")
    +    @patch("connpy.core_plugins.sync.MediaFileUpload")
    +    @patch("connpy.core_plugins.sync.os.path.basename")
    +    def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp):
    +        mock_get_credentials.return_value = MagicMock()
    +        mock_basename.return_value = "backup.zip"
    +        mock_service = MagicMock()
    +        mock_build.return_value = mock_service
    +        
    +        s = sync(mock_connapp)
    +        assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0
    +        mock_service.files().create.assert_called_once()
    +
    +
    +

    Methods

    +
    +
    +def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch.object(sync, "get_credentials")
    +@patch("connpy.core_plugins.sync.build")
    +@patch("connpy.core_plugins.sync.MediaFileUpload")
    +@patch("connpy.core_plugins.sync.os.path.basename")
    +def test_backup_file_to_drive(self, mock_basename, mock_media, mock_build, mock_get_credentials, mock_connapp):
    +    mock_get_credentials.return_value = MagicMock()
    +    mock_basename.return_value = "backup.zip"
    +    mock_service = MagicMock()
    +    mock_build.return_value = mock_service
    +    
    +    s = sync(mock_connapp)
    +    assert s.backup_file_to_drive("/fake/backup.zip", 1234567890000) == 0
    +    mock_service.files().create.assert_called_once()
    +
    +
    +
    +
    +def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    +@patch("connpy.core_plugins.sync.os.path.basename")
    +def test_compress_specific_files(self, mock_basename, MockZipFile, mock_connapp):
    +    mock_basename.return_value = "config.yaml"
    +    s = sync(mock_connapp)
    +    zip_mock = MagicMock()
    +    MockZipFile.return_value.__enter__.return_value = zip_mock
    +    
    +    s.compress_specific_files("/fake/zip.zip")
    +    zip_mock.write.assert_any_call(s.file, "config.yaml")
    +    zip_mock.write.assert_any_call(s.key, ".osk")
    +
    +
    +
    +
    +def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    +@patch("connpy.core_plugins.sync.os.path.dirname")
    +def test_decompress_zip_json_fallback(self, mock_dirname, MockZipFile, mock_connapp):
    +    mock_dirname.return_value = "/fake/dir"
    +    s = sync(mock_connapp)
    +    zip_mock = MagicMock()
    +    zip_mock.namelist.return_value = ["config.json", ".osk"]
    +    MockZipFile.return_value.__enter__.return_value = zip_mock
    +    
    +    assert s.decompress_zip("/fake/old_zip.zip") == 0
    +    zip_mock.extract.assert_any_call("config.json", "/fake/dir")
    +
    +
    +
    +
    +def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch("connpy.core_plugins.sync.zipfile.ZipFile")
    +@patch("connpy.core_plugins.sync.os.path.dirname")
    +def test_decompress_zip_yaml(self, mock_dirname, MockZipFile, mock_connapp):
    +    mock_dirname.return_value = "/fake/dir"
    +    s = sync(mock_connapp)
    +    zip_mock = MagicMock()
    +    zip_mock.namelist.return_value = ["config.yaml", ".osk"]
    +    MockZipFile.return_value.__enter__.return_value = zip_mock
    +    
    +    assert s.decompress_zip("/fake/zip.zip") == 0
    +    zip_mock.extract.assert_any_call("config.yaml", "/fake/dir")
    +    zip_mock.extract.assert_any_call(".osk", "/fake/dir")
    +
    +
    +
    +
    +def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch.object(sync, "get_credentials")
    +@patch("connpy.core_plugins.sync.build")
    +def test_get_appdata_files(self, mock_build, mock_get_credentials, mock_connapp):
    +    mock_get_credentials.return_value = MagicMock()
    +    mock_service = MagicMock()
    +    mock_build.return_value = mock_service
    +    
    +    mock_service.files().list().execute.return_value = {
    +        "files": [
    +            {"id": "1", "name": "backup1.zip", "appProperties": {"timestamp": "1000", "date": "2024"}}
    +        ]
    +    }
    +    
    +    s = sync(mock_connapp)
    +    files = s.get_appdata_files()
    +    assert len(files) == 1
    +    assert files[0]["id"] == "1"
    +    assert files[0]["timestamp"] == "1000"
    +
    +
    +
    +
    +def test_get_credentials_not_found(self, mock_exists, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch("connpy.core_plugins.sync.os.path.exists")
    +def test_get_credentials_not_found(self, mock_exists, mock_connapp):
    +    mock_exists.return_value = False
    +    s = sync(mock_connapp)
    +    assert s.get_credentials() == 0
    +
    +
    +
    +
    +def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp) +
    +
    +
    + +Expand source code + +
    @patch("connpy.core_plugins.sync.os.path.exists")
    +@patch("connpy.core_plugins.sync.Credentials")
    +def test_get_credentials_success(self, MockCreds, mock_exists, mock_connapp):
    +    mock_exists.return_value = True
    +    mock_cred_instance = MagicMock()
    +    mock_cred_instance.valid = True
    +    MockCreds.from_authorized_user_file.return_value = mock_cred_instance
    +    
    +    s = sync(mock_connapp)
    +    creds = s.get_credentials()
    +    assert creds == mock_cred_instance
    +
    +
    +
    +
    +def test_init(self, mock_connapp) +
    +
    +
    + +Expand source code + +
    def test_init(self, mock_connapp):
    +    s = sync(mock_connapp)
    +    assert s.sync is True
    +    assert s.file == "/fake/dir/config.yaml"
    +    assert s.token_file == "/fake/dir/gtoken.json"
    +
    +
    +
    +
    +
    +
    +
    +
    + +
    + + +