From af85051eb799d0dcf7a9713af3cffc9bfa9a6e6d Mon Sep 17 00:00:00 2001
From: Fede Luzzi Optional Parameters:
- conf (str): Path/file to config file. If left empty default
- path is ~/.config/conn/config.json
+ path is ~/.config/conn/config.yaml
- key (str): Path/file to RSA key file. If left empty default
path is ~/.config/conn/.osk
diff --git a/docs/connpy/tests/conftest.html b/docs/connpy/tests/conftest.html
index f9181aa..560bf76 100644
--- a/docs/connpy/tests/conftest.html
+++ b/docs/connpy/tests/conftest.html
@@ -58,7 +58,7 @@ No test touches ~/.config/conn/
@pytest.fixture
def ai_config(tmp_config_dir):
"""Create a configfile with AI keys configured for AI tests."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {
"case": False, "idletime": 30, "fzf": False,
@@ -72,7 +72,7 @@ def ai_config(tmp_config_dir):
"connections": SAMPLE_CONNECTIONS,
"profiles": SAMPLE_PROFILES
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
return configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -90,7 +90,7 @@ def ai_config(tmp_config_dir):
def config(tmp_config_dir):
"""Create a configfile instance pointing to tmp directory."""
from connpy.configfile import configfile
- conf_path = str(tmp_config_dir / "config.json")
+ conf_path = str(tmp_config_dir / "config.yaml")
key_path = str(tmp_config_dir / ".osk")
return configfile(conf=conf_path, key=key_path)
@@ -182,13 +182,13 @@ def mock_pexpect():
@pytest.fixture
def populated_config(tmp_config_dir):
"""Create a configfile with sample nodes/profiles pre-loaded."""
- config_file = tmp_config_dir / "config.json"
+ config_file = tmp_config_dir / "config.yaml"
data = {
"config": {"case": False, "idletime": 30, "fzf": False},
"connections": SAMPLE_CONNECTIONS,
"profiles": SAMPLE_PROFILES
}
- config_file.write_text(json.dumps(data, indent=4))
+ config_file.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
from connpy.configfile import configfile
return configfile(conf=str(config_file), key=str(tmp_config_dir / ".osk"))
@@ -210,9 +210,9 @@ def tmp_config_dir(tmp_path):
plugins_dir = config_dir / "plugins"
plugins_dir.mkdir()
- # Write config.json
- config_file = config_dir / "config.json"
- config_file.write_text(json.dumps(DEFAULT_CONFIG, indent=4))
+ # Write config.yaml
+ config_file = config_dir / "config.yaml"
+ config_file.write_text(yaml.dump(DEFAULT_CONFIG, default_flow_style=False, sort_keys=False))
os.chmod(str(config_file), 0o600)
# Write .folder (points to itself)
diff --git a/docs/connpy/tests/test_configfile.html b/docs/connpy/tests/test_configfile.html
index eea4bc0..7134fb4 100644
--- a/docs/connpy/tests/test_configfile.html
+++ b/docs/connpy/tests/test_configfile.html
@@ -47,6 +47,116 @@ el.replaceWith(d);
+class TestAtomicSave
+class TestAtomicSave:
+ def test_save_creates_no_leftover_tmp(self, config):
+ """After successful save, no .tmp file remains."""
+ config._connections_add(
+ id="test123", host="1.2.3.4", protocol="ssh",
+ port="", user="", password="", options="",
+ logs="", tags="", jumphost=""
+ )
+ result = config._saveconfig(config.file)
+ assert result == 0
+ assert not os.path.exists(config.file + '.tmp')
+
+ def test_save_preserves_original_on_error(self, config):
+ """If save fails, original config file is not corrupted."""
+ import unittest.mock as mock
+
+ config._connections_add(
+ id="original_node", host="10.0.0.1", protocol="ssh",
+ port="", user="", password="", options="",
+ logs="", tags="", jumphost=""
+ )
+ config._saveconfig(config.file)
+
+ # Now add another node and make yaml.dump fail
+ config._connections_add(
+ id="new_node", host="10.0.0.2", protocol="ssh",
+ port="", user="", password="", options="",
+ logs="", tags="", jumphost=""
+ )
+
+ with mock.patch('connpy.configfile.yaml.dump', side_effect=IOError("disk full")):
+ result = config._saveconfig(config.file)
+ assert result == 1
+
+ # Original file should still be valid with original_node
+ from connpy.configfile import configfile
+ reloaded = configfile(conf=config.file, key=config.key)
+ assert "original_node" in reloaded.connections
+
+def test_save_creates_no_leftover_tmp(self, config)
+def test_save_creates_no_leftover_tmp(self, config):
+ """After successful save, no .tmp file remains."""
+ config._connections_add(
+ id="test123", host="1.2.3.4", protocol="ssh",
+ port="", user="", password="", options="",
+ logs="", tags="", jumphost=""
+ )
+ result = config._saveconfig(config.file)
+ assert result == 0
+ assert not os.path.exists(config.file + '.tmp')
+After successful save, no .tmp file remains.
+def test_save_preserves_original_on_error(self, config)
+def test_save_preserves_original_on_error(self, config):
+ """If save fails, original config file is not corrupted."""
+ import unittest.mock as mock
+
+ config._connections_add(
+ id="original_node", host="10.0.0.1", protocol="ssh",
+ port="", user="", password="", options="",
+ logs="", tags="", jumphost=""
+ )
+ config._saveconfig(config.file)
+
+ # Now add another node and make yaml.dump fail
+ config._connections_add(
+ id="new_node", host="10.0.0.2", protocol="ssh",
+ port="", user="", password="", options="",
+ logs="", tags="", jumphost=""
+ )
+
+ with mock.patch('connpy.configfile.yaml.dump', side_effect=IOError("disk full")):
+ result = config._saveconfig(config.file)
+ assert result == 1
+
+ # Original file should still be valid with original_node
+ from connpy.configfile import configfile
+ reloaded = configfile(conf=config.file, key=config.key)
+ assert "original_node" in reloaded.connections
+If save fails, original config file is not corrupted.
class TestCRUDNodes
+class TestCorruptionRecovery
+class TestCorruptionRecovery:
+ def test_corrupt_yaml_recovers_from_cache(self, tmp_config_dir):
+ """If YAML is corrupt but cache is valid, recovers from cache."""
+ config_file = tmp_config_dir / "config.yaml"
+ key_file = tmp_config_dir / ".osk"
+
+ # Write valid config with router1
+ valid_data = {
+ "config": {"case": False, "idletime": 30, "fzf": False},
+ "connections": {"router1": {"host": "10.0.0.1", "type": "connection", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}},
+ "profiles": {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}
+ }
+ config_file.write_text(yaml.dump(valid_data, default_flow_style=False, sort_keys=False))
+
+ from connpy.configfile import configfile
+ conf = configfile(conf=str(config_file), key=str(key_file))
+ # Save to populate cache at the real self.cachefile path
+ conf._saveconfig(conf.file)
+ cachefile_path = conf.cachefile
+ assert os.path.exists(cachefile_path)
+
+ # Now corrupt the YAML
+ config_file.write_text("")
+ import time; time.sleep(0.05) # Ensure YAML is newer than cache
+
+ # Reload - should recover from cache
+ conf2 = configfile(conf=str(config_file), key=str(key_file))
+ assert "router1" in conf2.connections
+ assert conf2.connections["router1"]["host"] == "10.0.0.1"
+
+ def test_corrupt_cache_uses_yaml(self, tmp_config_dir):
+ """If cache is corrupt but YAML is valid, uses YAML."""
+ config_file = tmp_config_dir / "config.yaml"
+ key_file = tmp_config_dir / ".osk"
+
+ valid_data = {
+ "config": {"case": False, "idletime": 30, "fzf": False},
+ "connections": {},
+ "profiles": {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}
+ }
+ config_file.write_text(yaml.dump(valid_data, default_flow_style=False, sort_keys=False))
+
+ from connpy.configfile import configfile
+ conf = configfile(conf=str(config_file), key=str(key_file))
+ cachefile_path = conf.cachefile
+
+ # Now corrupt the cache (valid JSON but invalid config structure)
+ from pathlib import Path
+ Path(cachefile_path).write_text(json.dumps({"garbage": True}))
+ # Make cache newer than YAML to force cache path
+ import time; time.sleep(0.05)
+ os.utime(cachefile_path, None)
+
+ conf2 = configfile(conf=str(config_file), key=str(key_file))
+ assert conf2.config["case"] == False
+ assert "default" in conf2.profiles
+
+ def test_both_corrupt_creates_default(self, tmp_config_dir):
+ """If both YAML and cache are corrupt, creates fresh config."""
+ config_file = tmp_config_dir / "config.yaml"
+ key_file = tmp_config_dir / ".osk"
+
+ from connpy.configfile import configfile
+ conf = configfile(conf=str(config_file), key=str(key_file))
+ cachefile_path = conf.cachefile
+
+ # Corrupt YAML
+ config_file.write_text("")
+ # Corrupt cache
+ from pathlib import Path
+ Path(cachefile_path).write_text(json.dumps({"garbage": True}))
+ import time; time.sleep(0.05)
+ os.utime(str(config_file), None)
+
+ conf2 = configfile(conf=str(config_file), key=str(key_file))
+
+ # Should get defaults, not crash
+ assert conf2.config is not None
+ assert "default" in conf2.profiles
+ assert isinstance(conf2.connections, dict)
+
+def test_both_corrupt_creates_default(self, tmp_config_dir)
+def test_both_corrupt_creates_default(self, tmp_config_dir):
+ """If both YAML and cache are corrupt, creates fresh config."""
+ config_file = tmp_config_dir / "config.yaml"
+ key_file = tmp_config_dir / ".osk"
+
+ from connpy.configfile import configfile
+ conf = configfile(conf=str(config_file), key=str(key_file))
+ cachefile_path = conf.cachefile
+
+ # Corrupt YAML
+ config_file.write_text("")
+ # Corrupt cache
+ from pathlib import Path
+ Path(cachefile_path).write_text(json.dumps({"garbage": True}))
+ import time; time.sleep(0.05)
+ os.utime(str(config_file), None)
+
+ conf2 = configfile(conf=str(config_file), key=str(key_file))
+
+ # Should get defaults, not crash
+ assert conf2.config is not None
+ assert "default" in conf2.profiles
+ assert isinstance(conf2.connections, dict)
+If both YAML and cache are corrupt, creates fresh config.
+def test_corrupt_cache_uses_yaml(self, tmp_config_dir)
+def test_corrupt_cache_uses_yaml(self, tmp_config_dir):
+ """If cache is corrupt but YAML is valid, uses YAML."""
+ config_file = tmp_config_dir / "config.yaml"
+ key_file = tmp_config_dir / ".osk"
+
+ valid_data = {
+ "config": {"case": False, "idletime": 30, "fzf": False},
+ "connections": {},
+ "profiles": {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}
+ }
+ config_file.write_text(yaml.dump(valid_data, default_flow_style=False, sort_keys=False))
+
+ from connpy.configfile import configfile
+ conf = configfile(conf=str(config_file), key=str(key_file))
+ cachefile_path = conf.cachefile
+
+ # Now corrupt the cache (valid JSON but invalid config structure)
+ from pathlib import Path
+ Path(cachefile_path).write_text(json.dumps({"garbage": True}))
+ # Make cache newer than YAML to force cache path
+ import time; time.sleep(0.05)
+ os.utime(cachefile_path, None)
+
+ conf2 = configfile(conf=str(config_file), key=str(key_file))
+ assert conf2.config["case"] == False
+ assert "default" in conf2.profiles
+If cache is corrupt but YAML is valid, uses YAML.
+def test_corrupt_yaml_recovers_from_cache(self, tmp_config_dir)
+def test_corrupt_yaml_recovers_from_cache(self, tmp_config_dir):
+ """If YAML is corrupt but cache is valid, recovers from cache."""
+ config_file = tmp_config_dir / "config.yaml"
+ key_file = tmp_config_dir / ".osk"
+
+ # Write valid config with router1
+ valid_data = {
+ "config": {"case": False, "idletime": 30, "fzf": False},
+ "connections": {"router1": {"host": "10.0.0.1", "type": "connection", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}},
+ "profiles": {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}
+ }
+ config_file.write_text(yaml.dump(valid_data, default_flow_style=False, sort_keys=False))
+
+ from connpy.configfile import configfile
+ conf = configfile(conf=str(config_file), key=str(key_file))
+ # Save to populate cache at the real self.cachefile path
+ conf._saveconfig(conf.file)
+ cachefile_path = conf.cachefile
+ assert os.path.exists(cachefile_path)
+
+ # Now corrupt the YAML
+ config_file.write_text("")
+ import time; time.sleep(0.05) # Ensure YAML is newer than cache
+
+ # Reload - should recover from cache
+ conf2 = configfile(conf=str(config_file), key=str(key_file))
+ assert "router1" in conf2.connections
+ assert conf2.connections["router1"]["host"] == "10.0.0.1"
+If YAML is corrupt but cache is valid, recovers from cache.
class TestEncryption
+class TestMigrationSafety
+class TestMigrationSafety:
+ def test_migration_validates_legacy_data(self, tmp_path):
+ """Migration skips invalid legacy JSON files."""
+ from unittest.mock import patch
+ config_dir = tmp_path / ".config" / "conn"
+ config_dir.mkdir(parents=True)
+ (config_dir / "plugins").mkdir()
+
+ # Write .folder
+ (config_dir / ".folder").write_text(str(config_dir))
+
+ # Generate RSA key
+ from Crypto.PublicKey import RSA
+ key = RSA.generate(2048)
+ key_file = config_dir / ".osk"
+ key_file.write_bytes(key.export_key("PEM"))
+ os.chmod(str(key_file), 0o600)
+
+ # Write invalid JSON config (missing required keys)
+ legacy_file = config_dir / "config.json"
+ legacy_file.write_text(json.dumps({"garbage": True}))
+
+ with patch("os.path.expanduser", return_value=str(tmp_path)):
+ from connpy.configfile import configfile
+ conf = configfile(key=str(key_file))
+
+ # Legacy file should NOT have been moved to .backup
+ assert legacy_file.exists()
+ assert not (config_dir / "config.json.backup").exists()
+
+ def test_migration_verifies_written_yaml(self, tmp_path):
+ """Migration succeeds when legacy JSON is valid."""
+ from unittest.mock import patch
+ config_dir = tmp_path / ".config" / "conn"
+ config_dir.mkdir(parents=True)
+ (config_dir / "plugins").mkdir()
+
+ # Write .folder
+ (config_dir / ".folder").write_text(str(config_dir))
+
+ # Generate RSA key
+ from Crypto.PublicKey import RSA
+ key = RSA.generate(2048)
+ key_file = config_dir / ".osk"
+ key_file.write_bytes(key.export_key("PEM"))
+ os.chmod(str(key_file), 0o600)
+
+ valid_data = {
+ "config": {"case": False, "idletime": 30, "fzf": False},
+ "connections": {"r1": {"host": "1.2.3.4", "type": "connection", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}},
+ "profiles": {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}
+ }
+ legacy_file = config_dir / "config.json"
+ legacy_file.write_text(json.dumps(valid_data))
+
+ with patch("os.path.expanduser", return_value=str(tmp_path)):
+ from connpy.configfile import configfile
+ conf = configfile(key=str(key_file))
+
+ # Migration should have succeeded: YAML exists, JSON backed up
+ yaml_file = config_dir / "config.yaml"
+ assert yaml_file.exists()
+ assert (config_dir / "config.json.backup").exists()
+ assert not legacy_file.exists()
+ assert "r1" in conf.connections
+
+def test_migration_validates_legacy_data(self, tmp_path)
+def test_migration_validates_legacy_data(self, tmp_path):
+ """Migration skips invalid legacy JSON files."""
+ from unittest.mock import patch
+ config_dir = tmp_path / ".config" / "conn"
+ config_dir.mkdir(parents=True)
+ (config_dir / "plugins").mkdir()
+
+ # Write .folder
+ (config_dir / ".folder").write_text(str(config_dir))
+
+ # Generate RSA key
+ from Crypto.PublicKey import RSA
+ key = RSA.generate(2048)
+ key_file = config_dir / ".osk"
+ key_file.write_bytes(key.export_key("PEM"))
+ os.chmod(str(key_file), 0o600)
+
+ # Write invalid JSON config (missing required keys)
+ legacy_file = config_dir / "config.json"
+ legacy_file.write_text(json.dumps({"garbage": True}))
+
+ with patch("os.path.expanduser", return_value=str(tmp_path)):
+ from connpy.configfile import configfile
+ conf = configfile(key=str(key_file))
+
+ # Legacy file should NOT have been moved to .backup
+ assert legacy_file.exists()
+ assert not (config_dir / "config.json.backup").exists()
+Migration skips invalid legacy JSON files.
+def test_migration_verifies_written_yaml(self, tmp_path)
+def test_migration_verifies_written_yaml(self, tmp_path):
+ """Migration succeeds when legacy JSON is valid."""
+ from unittest.mock import patch
+ config_dir = tmp_path / ".config" / "conn"
+ config_dir.mkdir(parents=True)
+ (config_dir / "plugins").mkdir()
+
+ # Write .folder
+ (config_dir / ".folder").write_text(str(config_dir))
+
+ # Generate RSA key
+ from Crypto.PublicKey import RSA
+ key = RSA.generate(2048)
+ key_file = config_dir / ".osk"
+ key_file.write_bytes(key.export_key("PEM"))
+ os.chmod(str(key_file), 0o600)
+
+ valid_data = {
+ "config": {"case": False, "idletime": 30, "fzf": False},
+ "connections": {"r1": {"host": "1.2.3.4", "type": "connection", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}},
+ "profiles": {"default": {"host": "", "protocol": "ssh", "port": "", "user": "", "password": "", "options": "", "logs": "", "tags": "", "jumphost": ""}}
+ }
+ legacy_file = config_dir / "config.json"
+ legacy_file.write_text(json.dumps(valid_data))
+
+ with patch("os.path.expanduser", return_value=str(tmp_path)):
+ from connpy.configfile import configfile
+ conf = configfile(key=str(key_file))
+
+ # Migration should have succeeded: YAML exists, JSON backed up
+ yaml_file = config_dir / "config.yaml"
+ assert yaml_file.exists()
+ assert (config_dir / "config.json.backup").exists()
+ assert not legacy_file.exists()
+ assert "r1" in conf.connections
+Migration succeeds when legacy JSON is valid.
+class TestValidateConfig
+class TestValidateConfig:
+ def test_valid_config(self, config):
+ data = {"config": {}, "connections": {}, "profiles": {}}
+ assert config._validate_config(data) == True
+
+ def test_none_data(self, config):
+ assert config._validate_config(None) == False
+
+ def test_string_data(self, config):
+ assert config._validate_config("not a dict") == False
+
+ def test_missing_key(self, config):
+ assert config._validate_config({"config": {}, "connections": {}}) == False
+
+ def test_empty_dict(self, config):
+ assert config._validate_config({}) == False
+
+def test_empty_dict(self, config)
+def test_empty_dict(self, config):
+ assert config._validate_config({}) == False
+
+def test_missing_key(self, config)
+def test_missing_key(self, config):
+ assert config._validate_config({"config": {}, "connections": {}}) == False
+
+def test_none_data(self, config)
+def test_none_data(self, config):
+ assert config._validate_config(None) == False
+
+def test_string_data(self, config)
+def test_string_data(self, config):
+ assert config._validate_config("not a dict") == False
+
+def test_valid_config(self, config)
+def test_valid_config(self, config):
+ data = {"config": {}, "connections": {}, "profiles": {}}
+ assert config._validate_config(data) == True
+TestAtomicSaveTestCRUDNodestest_add_folderTestCorruptionRecoveryTestEncryptiontest_encrypt_decrypt_roundtriptest_getitems_multipleTestMigrationSafetyTestValidateConfig