multiuser plugins + fixes

This commit is contained in:
2026-05-28 15:23:39 -03:00
parent f5e09a55ab
commit 1b9751bd23
3 changed files with 327 additions and 42 deletions
+5 -1
View File
@@ -162,7 +162,11 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
if "tags" in params: if "tags" in params:
n.tags = params["tags"] n.tags = params["tags"]
else: else:
node_data = user_config.getitem(unique_id, extract=False) try:
node_data = user_config.getitem(unique_id, extract=False)
except (KeyError, TypeError):
node_data = None
if not node_data: if not node_data:
context.abort(grpc.StatusCode.NOT_FOUND, f"Node {unique_id} not found") context.abort(grpc.StatusCode.NOT_FOUND, f"Node {unique_id} not found")
resolved_data = profile_service.resolve_node_data(node_data) resolved_data = profile_service.resolve_node_data(node_data)
+124 -41
View File
@@ -7,16 +7,47 @@ from .exceptions import InvalidConfigurationError, NodeNotFoundError
class PluginService(BaseService): class PluginService(BaseService):
"""Business logic for enabling, disabling, and listing plugins.""" """Business logic for enabling, disabling, and listing plugins."""
def _get_plugin_path(self, name, include_disabled=True):
"""Resolves the physical path of a plugin by name. Priority: user, shared/global, core."""
import os
# 1. User directory
user_dir = os.path.join(self.config.defaultdir, "plugins")
if os.path.exists(user_dir):
p_file = os.path.join(user_dir, f"{name}.py")
if os.path.exists(p_file):
return p_file, "user", True
if include_disabled:
bkp_file = os.path.join(user_dir, f"{name}.py.bkp")
if os.path.exists(bkp_file):
return bkp_file, "user", False
# 2. Shared/Global directory
if hasattr(self.config, "_shared_config") and self.config._shared_config:
shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins")
if os.path.exists(shared_dir):
p_file = os.path.join(shared_dir, f"{name}.py")
if os.path.exists(p_file):
return p_file, "shared", True
if include_disabled:
bkp_file = os.path.join(shared_dir, f"{name}.py.bkp")
if os.path.exists(bkp_file):
return bkp_file, "shared", False
# 3. Core plugins
core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
p_file = os.path.join(core_dir, f"{name}.py")
if os.path.exists(p_file):
return p_file, "core", True
return None, None, False
def list_plugins(self): def list_plugins(self):
"""List all core and user-defined plugins with their status and hash.""" """List all core and user-defined plugins with their status and hash."""
import os import os
import hashlib import hashlib
# Check for user plugins directory
plugin_dir = os.path.join(self.config.defaultdir, "plugins")
# Check for core plugins directory
core_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
all_plugin_info = {} all_plugin_info = {}
def get_hash(path): def get_hash(path):
@@ -26,12 +57,35 @@ class PluginService(BaseService):
except Exception: except Exception:
return "" return ""
# User plugins # 1. Scan core plugins (lowest priority)
if os.path.exists(plugin_dir): core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins")
for f in os.listdir(plugin_dir): if os.path.exists(core_dir):
for f in os.listdir(core_dir):
if f.endswith(".py"): if f.endswith(".py"):
name = f[:-3] name = f[:-3]
path = os.path.join(plugin_dir, f) path = os.path.join(core_dir, f)
all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
# 2. Scan shared plugins (medium priority)
if hasattr(self.config, "_shared_config") and self.config._shared_config:
shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins")
if os.path.exists(shared_dir):
for f in os.listdir(shared_dir):
if f.endswith(".py"):
name = f[:-3]
path = os.path.join(shared_dir, f)
all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
elif f.endswith(".py.bkp"):
name = f[:-7]
all_plugin_info[name] = {"enabled": False}
# 3. Scan user plugins (highest priority)
user_dir = os.path.join(self.config.defaultdir, "plugins")
if os.path.exists(user_dir):
for f in os.listdir(user_dir):
if f.endswith(".py"):
name = f[:-3]
path = os.path.join(user_dir, f)
all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)}
elif f.endswith(".py.bkp"): elif f.endswith(".py.bkp"):
name = f[:-7] name = f[:-7]
@@ -39,6 +93,7 @@ class PluginService(BaseService):
return all_plugin_info return all_plugin_info
def add_plugin(self, name, source_file, update=False): def add_plugin(self, name, source_file, update=False):
"""Add or update a plugin from a local file.""" """Add or update a plugin from a local file."""
import os import os
@@ -119,6 +174,10 @@ class PluginService(BaseService):
raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}") raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}")
if not deleted: if not deleted:
# If not deleted from user directory, check if it's in shared or core
path, origin, enabled = self._get_plugin_path(name, include_disabled=True)
if origin in ["shared", "core"]:
raise InvalidConfigurationError("Global and core plugins are read-only and cannot be deleted by users.")
raise InvalidConfigurationError(f"Plugin '{name}' not found.") raise InvalidConfigurationError(f"Plugin '{name}' not found.")
def enable_plugin(self, name): def enable_plugin(self, name):
@@ -127,17 +186,38 @@ class PluginService(BaseService):
plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
disabled_file = f"{plugin_file}.bkp" disabled_file = f"{plugin_file}.bkp"
if os.path.exists(disabled_file):
# Check if it is a shadow bkp file (0 bytes shadowing shared/core)
is_shadow = False
if os.path.getsize(disabled_file) == 0:
# Resolve without the local bkp file to verify if shared/core has it
path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
if origin in ["shared", "core"]:
is_shadow = True
if is_shadow:
# Remove shadow file to restore inheritance
try:
os.remove(disabled_file)
return True
except OSError as e:
raise InvalidConfigurationError(f"Failed to remove shadow file '{disabled_file}': {e}")
else:
try:
os.rename(disabled_file, plugin_file)
return True
except OSError as e:
raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")
if os.path.exists(plugin_file): if os.path.exists(plugin_file):
return False # Already enabled return False # Already enabled
if not os.path.exists(disabled_file): # If it doesn't exist locally, check if it's already an active shared/core plugin
raise InvalidConfigurationError(f"Plugin '{name}' not found.") path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
if origin in ["shared", "core"]:
return False # Already active/enabled through inheritance
try: raise InvalidConfigurationError(f"Plugin '{name}' not found.")
os.rename(disabled_file, plugin_file)
return True
except OSError as e:
raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}")
def disable_plugin(self, name): def disable_plugin(self, name):
"""Deactivate a plugin by renaming it to a backup file.""" """Deactivate a plugin by renaming it to a backup file."""
@@ -145,33 +225,41 @@ class PluginService(BaseService):
plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
disabled_file = f"{plugin_file}.bkp" disabled_file = f"{plugin_file}.bkp"
if os.path.exists(plugin_file):
# Regular user-level plugin exists. Rename to bkp
try:
os.rename(plugin_file, disabled_file)
return True
except OSError as e:
raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}")
if os.path.exists(disabled_file): if os.path.exists(disabled_file):
return False # Already disabled return False # Already disabled
if not os.path.exists(plugin_file): # Check if it exists in shared or core
raise InvalidConfigurationError(f"Plugin '{name}' not found or is a core plugin.") path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
if origin in ["shared", "core"]:
try: # Shadow disable it by creating an empty .py.bkp in user plugins dir
os.rename(plugin_file, disabled_file) plugin_dir = os.path.dirname(plugin_file)
return True os.makedirs(plugin_dir, exist_ok=True)
except OSError as e: try:
raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}") with open(disabled_file, "w") as f:
f.write("")
return True
except OSError as e:
raise InvalidConfigurationError(f"Failed to create shadow disable file: {e}")
raise InvalidConfigurationError(f"Plugin '{name}' not found or is already disabled.")
def get_plugin_source(self, name): def get_plugin_source(self, name):
import os import os
from ..services.exceptions import InvalidConfigurationError from ..services.exceptions import InvalidConfigurationError
plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py" if not path:
if os.path.exists(plugin_file):
target = plugin_file
elif os.path.exists(core_path):
target = core_path
else:
raise InvalidConfigurationError(f"Plugin '{name}' not found") raise InvalidConfigurationError(f"Plugin '{name}' not found")
with open(target, "r") as f: with open(path, "r") as f:
return f.read() return f.read()
def invoke_plugin(self, name, args_dict): def invoke_plugin(self, name, args_dict):
@@ -211,17 +299,12 @@ class PluginService(BaseService):
p_manager = Plugins() p_manager = Plugins()
import os import os
plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py")
core_path = os.path.dirname(os.path.realpath(__file__)) + f"/../core_plugins/{name}.py"
if os.path.exists(plugin_file): path, origin, enabled = self._get_plugin_path(name, include_disabled=False)
target = plugin_file if not path:
elif os.path.exists(core_path):
target = core_path
else:
raise InvalidConfigurationError(f"Plugin '{name}' not found") raise InvalidConfigurationError(f"Plugin '{name}' not found")
module = p_manager._import_from_path(target) module = p_manager._import_from_path(path)
parser = module.Parser().parser if hasattr(module, "Parser") else None parser = module.Parser().parser if hasattr(module, "Parser") else None
if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]): if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]):
+198
View File
@@ -0,0 +1,198 @@
import os
import shutil
import pytest
from connpy.configfile import configfile
from connpy.services.plugin_service import PluginService
from connpy.services.exceptions import InvalidConfigurationError
@pytest.fixture
def temp_plugins_env(tmp_path):
"""Creates a temporary isolated environment for core, shared, and user plugins."""
base_dir = tmp_path / "plugins_test_env"
base_dir.mkdir()
# Paths for shared config and user config folders
shared_dir = base_dir / "shared"
user_dir = base_dir / "user"
shared_dir.mkdir()
user_dir.mkdir()
# Create plugins subdirectories
(shared_dir / "plugins").mkdir()
(user_dir / "plugins").mkdir()
# Mock core_plugins path by creating a sibling folder
core_dir = base_dir / "core_plugins"
core_dir.mkdir()
# Config file paths
shared_path = os.path.join(shared_dir, "config.yaml")
user_path = os.path.join(user_dir, "config.yaml")
# Write empty config templates
import yaml
empty_conf = {"config": {}, "connections": {}, "profiles": {}}
with open(shared_path, "w") as f:
yaml.safe_dump(empty_conf, f)
with open(user_path, "w") as f:
yaml.safe_dump(empty_conf, f)
return {
"shared_dir": shared_dir,
"user_dir": user_dir,
"core_dir": core_dir,
"shared_path": shared_path,
"user_path": user_path
}
def test_plugin_resolution_priority_merge(temp_plugins_env, monkeypatch):
"""Test that list_plugins correctly merges core, shared, and user plugins with overrides."""
env = temp_plugins_env
# 1. Create a core plugin: 'coreplug'
core_file = env["core_dir"] / "coreplug.py"
with open(core_file, "w") as f:
f.write("# core plugin content")
# 2. Create a shared plugin: 'sharedplug'
shared_file = env["shared_dir"] / "plugins" / "sharedplug.py"
with open(shared_file, "w") as f:
f.write("# shared plugin content")
# 3. Create a user plugin: 'userplug'
user_file = env["user_dir"] / "plugins" / "userplug.py"
with open(user_file, "w") as f:
f.write("# user plugin content")
# 4. Create an override plugin: 'overrideplug' in all three directories
with open(env["core_dir"] / "overrideplug.py", "w") as f:
f.write("# core override version")
with open(env["shared_dir"] / "plugins" / "overrideplug.py", "w") as f:
f.write("# shared override version")
with open(env["user_dir"] / "plugins" / "overrideplug.py", "w") as f:
f.write("# user override version")
# Initialize configs
shared_cfg = configfile(conf=env["shared_path"])
user_cfg = configfile(conf=env["user_path"], shared_config=shared_cfg)
# Initialize service
plugin_svc = PluginService(user_cfg)
# Monkeypatch the core plugins folder path inside list_plugins
# in order to use our mock core folder instead of the real one.
# Note: real path is computed via __file__, so we'll mock the internal core path
monkeypatch.setattr(
"os.path.realpath",
lambda path: os.path.join(str(env["core_dir"]), "dummy")
)
plugins_list = plugin_svc.list_plugins()
# Verify all plugins are registered
assert "coreplug" in plugins_list
assert "sharedplug" in plugins_list
assert "userplug" in plugins_list
assert "overrideplug" in plugins_list
# Verify status is Active (enabled=True)
assert plugins_list["coreplug"]["enabled"] is True
assert plugins_list["sharedplug"]["enabled"] is True
assert plugins_list["userplug"]["enabled"] is True
assert plugins_list["overrideplug"]["enabled"] is True
# Verify hashes differ matching user overrides
import hashlib
user_override_hash = hashlib.md5(b"# user override version").hexdigest()
assert plugins_list["overrideplug"]["hash"] == user_override_hash
def test_get_plugin_source_override(temp_plugins_env, monkeypatch):
"""Test that get_plugin_source resolves the highest priority plugin version."""
env = temp_plugins_env
# Create override in shared and user
with open(env["shared_dir"] / "plugins" / "myplug.py", "w") as f:
f.write("shared content")
with open(env["user_dir"] / "plugins" / "myplug.py", "w") as f:
f.write("user override")
shared_cfg = configfile(conf=env["shared_path"])
user_cfg = configfile(conf=env["user_path"], shared_config=shared_cfg)
plugin_svc = PluginService(user_cfg)
# Fetch source
source = plugin_svc.get_plugin_source("myplug")
assert source == "user override"
def test_delete_plugin_restrictions(temp_plugins_env):
"""Test that deleting shared plugins is rejected, but deleting user overrides works."""
env = temp_plugins_env
# Create shared plugin
with open(env["shared_dir"] / "plugins" / "globalplug.py", "w") as f:
f.write("global content")
# Create user plugin override
with open(env["user_dir"] / "plugins" / "globalplug.py", "w") as f:
f.write("user content")
shared_cfg = configfile(conf=env["shared_path"])
user_cfg = configfile(conf=env["user_path"], shared_config=shared_cfg)
plugin_svc = PluginService(user_cfg)
# 1. Delete plugin (should delete the user override first)
plugin_svc.delete_plugin("globalplug")
# Verify user override is gone, but shared plugin remains
assert not os.path.exists(env["user_dir"] / "plugins" / "globalplug.py")
assert os.path.exists(env["shared_dir"] / "plugins" / "globalplug.py")
# 2. Try to delete again (now only exists in shared/global folder)
with pytest.raises(InvalidConfigurationError) as exc:
plugin_svc.delete_plugin("globalplug")
assert "Global and core plugins are read-only" in str(exc.value)
# Verify shared plugin is still present
assert os.path.exists(env["shared_dir"] / "plugins" / "globalplug.py")
def test_shadow_disable_and_enable_mechanisms(temp_plugins_env):
"""Test that disabling a shared plugin creates a shadow backup file and enabling it removes it."""
env = temp_plugins_env
# Create a shared plugin
with open(env["shared_dir"] / "plugins" / "sharedplug.py", "w") as f:
f.write("shared content")
shared_cfg = configfile(conf=env["shared_path"])
user_cfg = configfile(conf=env["user_path"], shared_config=shared_cfg)
plugin_svc = PluginService(user_cfg)
# Ensure it's active initially
list_initial = plugin_svc.list_plugins()
assert list_initial["sharedplug"]["enabled"] is True
# 1. Disable the shared plugin (should shadow-disable it in user dir)
res = plugin_svc.disable_plugin("sharedplug")
assert res is True
# Verify shadow bkp file exists in user plugins and has 0 bytes
shadow_bkp = env["user_dir"] / "plugins" / "sharedplug.py.bkp"
assert os.path.exists(shadow_bkp)
assert os.path.getsize(shadow_bkp) == 0
# Verify list_plugins lists it as disabled
list_disabled = plugin_svc.list_plugins()
assert list_disabled["sharedplug"]["enabled"] is False
# 2. Re-enable the shadow-disabled plugin (should delete the user shadow file)
res_enable = plugin_svc.enable_plugin("sharedplug")
assert res_enable is True
# Verify shadow file is deleted
assert not os.path.exists(shadow_bkp)
# Verify list_plugins lists it as active again
list_active = plugin_svc.list_plugins()
assert list_active["sharedplug"]["enabled"] is True