Module connpy.services.plugin_service

Classes

class PluginService (config=None)
Expand source code
class PluginService(BaseService):
    """Business logic for enabling, disabling, and listing plugins."""

    def list_plugins(self):
        """List all core and user-defined plugins with their status and hash."""
        import os
        import hashlib
        
        # Check for user plugins directory
        plugin_dir = os.path.join(self.config.defaultdir, "plugins")
        # Check for core plugins directory
        core_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
        
        all_plugin_info = {}

        def get_hash(path):
            try:
                with open(path, "rb") as f:
                    return hashlib.md5(f.read()).hexdigest()
            except Exception:
                return ""

        # User plugins
        if os.path.exists(plugin_dir):
            for f in os.listdir(plugin_dir):
                if f.endswith(".py"):
                    name = f[:-3]
                    path = os.path.join(plugin_dir, f)
                    all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
                elif f.endswith(".py.bkp"):
                    name = f[:-7]
                    all_plugin_info[name] = {"enabled": False}

        return all_plugin_info

    def add_plugin(self, name, source_file, update=False):
        """Add or update a plugin from a local file."""
        import os
        import shutil
        from connpy.plugins import Plugins

        if not name.isalpha() or not name.islower() or len(name) > 15:
            raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

        p_manager = Plugins()
        # Check for bad script
        error = p_manager.verify_script(source_file)
        if error:
            raise InvalidConfigurationError(f"Invalid plugin script: {error}")

        self._save_plugin_file(name, source_file, update, is_path=True)

    def add_plugin_from_bytes(self, name, content, update=False):
        """Add or update a plugin from bytes (gRPC)."""
        import tempfile
        import os
        
        if not name.isalpha() or not name.islower() or len(name) > 15:
            raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

        # Write to temp file to verify script
        with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
            tmp.write(content)
            tmp_path = tmp.name

        try:
            from connpy.plugins import Plugins
            p_manager = Plugins()
            error = p_manager.verify_script(tmp_path)
            if error:
                raise InvalidConfigurationError(f"Invalid plugin script: {error}")
            
            self._save_plugin_file(name, tmp_path, update, is_path=True)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def _save_plugin_file(self, name, source, update=False, is_path=True):
        import os
        import shutil
        
        plugin_dir = os.path.join(self.config.defaultdir, "plugins")
        os.makedirs(plugin_dir, exist_ok=True)
        
        target_file = os.path.join(plugin_dir, f"{name}.py")
        backup_file = f"{target_file}.bkp"

        if not update and (os.path.exists(target_file) or os.path.exists(backup_file)):
            raise InvalidConfigurationError(f"Plugin '{name}' already exists.")

        try:
            if is_path:
                shutil.copy2(source, target_file)
            else:
                with open(target_file, "wb") as f:
                    f.write(source)
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to save plugin file: {e}")

    def delete_plugin(self, name):
        """Remove a plugin file permanently."""
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        disabled_file = f"{plugin_file}.bkp"

        deleted = False
        for f in [plugin_file, disabled_file]:
            if os.path.exists(f):
                try:
                    os.remove(f)
                    deleted = True
                except OSError as e:
                    raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}")
        
        if not deleted:
            raise InvalidConfigurationError(f"Plugin '{name}' not found.")

    def enable_plugin(self, name):
        """Activate a plugin by renaming its backup file."""
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        disabled_file = f"{plugin_file}.bkp"
        
        if os.path.exists(plugin_file):
            return False # Already enabled
            
        if not os.path.exists(disabled_file):
            raise InvalidConfigurationError(f"Plugin '{name}' not found.")
            
        try:
            os.rename(disabled_file, plugin_file)
            return True
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")

    def disable_plugin(self, name):
        """Deactivate a plugin by renaming it to a backup file."""
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        disabled_file = f"{plugin_file}.bkp"
        
        if os.path.exists(disabled_file):
            return False # Already disabled
            
        if not os.path.exists(plugin_file):
            raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.")
            
        try:
            os.rename(plugin_file, disabled_file)
            return True
        except OSError as e:
            raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")

    def get_plugin_source(self, name):
        import os
        from ..services.exceptions import InvalidConfigurationError
        
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
        
        if os.path.exists(plugin_file):
            target = plugin_file
        elif os.path.exists(core_path):
            target = core_path
        else:
            raise InvalidConfigurationError(f"Plugin '{name}' not found")
        
        with open(target, "r") as f:
            return f.read()

    def invoke_plugin(self, name, args_dict):
        import sys, io
        from argparse import Namespace
        from ..services.exceptions import InvalidConfigurationError
        from connpy.plugins import Plugins
        class MockApp:
            def __init__(self, config):
                from ..core import node, nodes
                from ..ai import ai
                from ..services.provider import ServiceProvider
                
                self.config = config
                self.node = node
                self.nodes = nodes
                self.ai = ai
                
                self.services = ServiceProvider(config, mode="local")
                try:
                    self.nodes_list = self.services.nodes.list_nodes()
                    self.folders = self.services.nodes.list_folders()
                    self.profiles = self.services.profiles.list_profiles()
                except Exception:
                    self.nodes_list = {}
                    self.folders = {}
                    self.profiles = {}
        
        args = Namespace(**args_dict)
        
        p_manager = Plugins()
        import os
        plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
        core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
        
        if os.path.exists(plugin_file):
            target = plugin_file
        elif os.path.exists(core_path):
            target = core_path
        else:
            raise InvalidConfigurationError(f"Plugin '{name}' not found")
            
        module = p_manager._import_from_path(target)
        parser = module.Parser().parser if hasattr(module, "Parser") else None
        
        if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
            args.func = getattr(module, args_dict["__func_name__"])
        
        app = MockApp(self.config)
        
        from .. import printer
        from rich.console import Console
        
        buf = io.StringIO()
        old_console = printer.console
        old_err_console = printer.err_console
        
        printer.console = Console(file=buf, theme=printer.connpy_theme, force_terminal=True)
        printer.err_console = Console(file=buf, theme=printer.connpy_theme, force_terminal=True)
        
        old_stdout = sys.stdout
        sys.stdout = buf
        
        try:
            if hasattr(module, "Entrypoint"):
                module.Entrypoint(args, parser, app)
        except Exception as e:
            import traceback
            printer.err_console.print(traceback.format_exc())
        finally:
            sys.stdout = old_stdout
            printer.console = old_console
            printer.err_console = old_err_console
            
        for line in buf.getvalue().splitlines(keepends=True):
            yield line

Business logic for enabling, disabling, and listing plugins.

Initialize the service.

Args

config
An instance of configfile (or None to instantiate a new one/use global context).

Ancestors

Methods

def add_plugin(self, name, source_file, update=False)
Expand source code
def add_plugin(self, name, source_file, update=False):
    """Add or update a plugin from a local file."""
    import os
    import shutil
    from connpy.plugins import Plugins

    if not name.isalpha() or not name.islower() or len(name) > 15:
        raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

    p_manager = Plugins()
    # Check for bad script
    error = p_manager.verify_script(source_file)
    if error:
        raise InvalidConfigurationError(f"Invalid plugin script: {error}")

    self._save_plugin_file(name, source_file, update, is_path=True)

Add or update a plugin from a local file.

def add_plugin_from_bytes(self, name, content, update=False)
Expand source code
def add_plugin_from_bytes(self, name, content, update=False):
    """Add or update a plugin from bytes (gRPC)."""
    import tempfile
    import os
    
    if not name.isalpha() or not name.islower() or len(name) > 15:
        raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.")

    # Write to temp file to verify script
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
        tmp.write(content)
        tmp_path = tmp.name

    try:
        from connpy.plugins import Plugins
        p_manager = Plugins()
        error = p_manager.verify_script(tmp_path)
        if error:
            raise InvalidConfigurationError(f"Invalid plugin script: {error}")
        
        self._save_plugin_file(name, tmp_path, update, is_path=True)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

Add or update a plugin from bytes (gRPC).

def delete_plugin(self, name)
Expand source code
def delete_plugin(self, name):
    """Remove a plugin file permanently."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"

    deleted = False
    for f in [plugin_file, disabled_file]:
        if os.path.exists(f):
            try:
                os.remove(f)
                deleted = True
            except OSError as e:
                raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}")
    
    if not deleted:
        raise InvalidConfigurationError(f"Plugin '{name}' not found.")

Remove a plugin file permanently.

def disable_plugin(self, name)
Expand source code
def disable_plugin(self, name):
    """Deactivate a plugin by renaming it to a backup file."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"
    
    if os.path.exists(disabled_file):
        return False # Already disabled
        
    if not os.path.exists(plugin_file):
        raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.")
        
    try:
        os.rename(plugin_file, disabled_file)
        return True
    except OSError as e:
        raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")

Deactivate a plugin by renaming it to a backup file.

def enable_plugin(self, name)
Expand source code
def enable_plugin(self, name):
    """Activate a plugin by renaming its backup file."""
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    disabled_file = f"{plugin_file}.bkp"
    
    if os.path.exists(plugin_file):
        return False # Already enabled
        
    if not os.path.exists(disabled_file):
        raise InvalidConfigurationError(f"Plugin '{name}' not found.")
        
    try:
        os.rename(disabled_file, plugin_file)
        return True
    except OSError as e:
        raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")

Activate a plugin by renaming its backup file.

def get_plugin_source(self, name)
Expand source code
def get_plugin_source(self, name):
    import os
    from ..services.exceptions import InvalidConfigurationError
    
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
    
    if os.path.exists(plugin_file):
        target = plugin_file
    elif os.path.exists(core_path):
        target = core_path
    else:
        raise InvalidConfigurationError(f"Plugin '{name}' not found")
    
    with open(target, "r") as f:
        return f.read()
def invoke_plugin(self, name, args_dict)
Expand source code
def invoke_plugin(self, name, args_dict):
    import sys, io
    from argparse import Namespace
    from ..services.exceptions import InvalidConfigurationError
    from connpy.plugins import Plugins
    class MockApp:
        def __init__(self, config):
            from ..core import node, nodes
            from ..ai import ai
            from ..services.provider import ServiceProvider
            
            self.config = config
            self.node = node
            self.nodes = nodes
            self.ai = ai
            
            self.services = ServiceProvider(config, mode="local")
            try:
                self.nodes_list = self.services.nodes.list_nodes()
                self.folders = self.services.nodes.list_folders()
                self.profiles = self.services.profiles.list_profiles()
            except Exception:
                self.nodes_list = {}
                self.folders = {}
                self.profiles = {}
    
    args = Namespace(**args_dict)
    
    p_manager = Plugins()
    import os
    plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
    core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
    
    if os.path.exists(plugin_file):
        target = plugin_file
    elif os.path.exists(core_path):
        target = core_path
    else:
        raise InvalidConfigurationError(f"Plugin '{name}' not found")
        
    module = p_manager._import_from_path(target)
    parser = module.Parser().parser if hasattr(module, "Parser") else None
    
    if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
        args.func = getattr(module, args_dict["__func_name__"])
    
    app = MockApp(self.config)
    
    from .. import printer
    from rich.console import Console
    
    buf = io.StringIO()
    old_console = printer.console
    old_err_console = printer.err_console
    
    printer.console = Console(file=buf, theme=printer.connpy_theme, force_terminal=True)
    printer.err_console = Console(file=buf, theme=printer.connpy_theme, force_terminal=True)
    
    old_stdout = sys.stdout
    sys.stdout = buf
    
    try:
        if hasattr(module, "Entrypoint"):
            module.Entrypoint(args, parser, app)
    except Exception as e:
        import traceback
        printer.err_console.print(traceback.format_exc())
    finally:
        sys.stdout = old_stdout
        printer.console = old_console
        printer.err_console = old_err_console
        
    for line in buf.getvalue().splitlines(keepends=True):
        yield line
def list_plugins(self)
Expand source code
def list_plugins(self):
    """List all core and user-defined plugins with their status and hash."""
    import os
    import hashlib
    
    # Check for user plugins directory
    plugin_dir = os.path.join(self.config.defaultdir, "plugins")
    # Check for core plugins directory
    core_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
    
    all_plugin_info = {}

    def get_hash(path):
        try:
            with open(path, "rb") as f:
                return hashlib.md5(f.read()).hexdigest()
        except Exception:
            return ""

    # User plugins
    if os.path.exists(plugin_dir):
        for f in os.listdir(plugin_dir):
            if f.endswith(".py"):
                name = f[:-3]
                path = os.path.join(plugin_dir, f)
                all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
            elif f.endswith(".py.bkp"):
                name = f[:-7]
                all_plugin_info[name] = {"enabled": False}

    return all_plugin_info

List all core and user-defined plugins with their status and hash.

Inherited members