Module connpy.services.user_service

Classes

class UserService (config_dir)
Expand source code
class UserService:
    def __init__(self, config_dir):
        self.config_dir = os.path.abspath(config_dir)
        self.users_dir = os.path.join(self.config_dir, "users")
        self.registry_file = os.path.join(self.users_dir, "registry.yaml")
        
        # Ensure users directory exists
        os.makedirs(self.users_dir, exist_ok=True)

    def _load_registry(self) -> dict:
        """Loads registry from file. If it doesn't exist, initializes it with a new JWT secret."""
        if not os.path.exists(self.registry_file):
            registry = {
                "jwt_secret": secrets.token_hex(32),
                "users": {}
            }
            self._save_registry(registry)
            return registry
        
        try:
            with open(self.registry_file, "r") as f:
                registry = yaml.safe_load(f) or {}
        except Exception:
            registry = {}
            
        if not isinstance(registry, dict):
            registry = {}
            
        if "jwt_secret" not in registry:
            registry["jwt_secret"] = secrets.token_hex(32)
            
        if "users" not in registry or not isinstance(registry["users"], dict):
            registry["users"] = {}
            
        return registry

    def _save_registry(self, data: dict):
        """Safely saves registry structure to registry.yaml."""
        tmp_file = self.registry_file + ".tmp"
        try:
            with open(tmp_file, "w") as f:
                yaml.dump(data, f, default_flow_style=False, sort_keys=False)
            os.replace(tmp_file, self.registry_file)
            os.chmod(self.registry_file, 0o600)
        except Exception as e:
            if os.path.exists(tmp_file):
                try:
                    os.remove(tmp_file)
                except OSError:
                    pass
            raise e

    def create_user(self, username, password, config_path=None) -> dict:
        """Creates a new user with bcrypt-hashed credentials.
        
        Mode A: config_path=None (fresh user) -> Generates config.yaml and .osk key.
        Mode B: config_path set -> Reuses existing directory after validating its structure.
        """
        if not username or not isinstance(username, str):
            raise ValueError("Username cannot be empty")
            
        if not re.match(r"^[a-zA-Z0-9_-]+$", username):
            raise ValueError("Username must contain only alphanumeric characters, dashes, or underscores")
            
        if not password or not isinstance(password, str):
            raise ValueError("Password cannot be empty")
            
        registry = self._load_registry()
        if username in registry["users"]:
            raise ValueError(f"User '{username}' already exists")
            
        # Resolve path and initialize configuration
        if config_path is None:
            user_dir = os.path.join(self.users_dir, username)
            os.makedirs(user_dir, exist_ok=True)
            
            # Create subdirs for plugins and sessions
            os.makedirs(os.path.join(user_dir, "plugins"), exist_ok=True)
            os.makedirs(os.path.join(user_dir, "ai_sessions"), exist_ok=True)
            
            # Create default config.yaml & .osk key via configfile
            conf_file = os.path.join(user_dir, "config.yaml")
            configfile(conf=conf_file)
            
            stored_config_path = None
        else:
            abs_config_path = os.path.abspath(config_path)
            os.makedirs(abs_config_path, exist_ok=True)
            
            # Create subdirs for plugins and sessions in the custom path
            os.makedirs(os.path.join(abs_config_path, "plugins"), exist_ok=True)
            os.makedirs(os.path.join(abs_config_path, "ai_sessions"), exist_ok=True)
            
            # Create default config.yaml & .osk key via configfile if config.yaml is not present
            conf_file = os.path.join(abs_config_path, "config.yaml")
            if not os.path.exists(conf_file):
                configfile(conf=conf_file)
                
            stored_config_path = abs_config_path

        # Hash password securely
        password_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
        
        user_entry = {
            "password_hash": password_hash,
            "config_path": stored_config_path,
            "created": datetime.datetime.now(datetime.timezone.utc).isoformat()
        }
        
        registry["users"][username] = user_entry
        self._save_registry(registry)
        
        return {
            "username": username,
            "config_path": stored_config_path,
            "created": user_entry["created"]
        }

    def delete_user(self, username):
        """Removes user from the registry and cleans up config directory if server-managed."""
        registry = self._load_registry()
        if username not in registry["users"]:
            raise ValueError(f"User '{username}' not found")
            
        user_data = registry["users"][username]
        config_path = user_data.get("config_path")
        
        if config_path is None:
            user_dir = os.path.join(self.users_dir, username)
            if os.path.exists(user_dir):
                shutil.rmtree(user_dir, ignore_errors=True)
                
        del registry["users"][username]
        self._save_registry(registry)

    def list_users(self) -> list[dict]:
        """Lists all registered users with metadata."""
        registry = self._load_registry()
        return [
            {
                "username": name,
                "config_path": data.get("config_path"),
                "created": data.get("created")
            }
            for name, data in registry.get("users", {}).items()
        ]

    def get_user(self, username) -> dict:
        """Retrieves raw metadata for a specific user."""
        registry = self._load_registry()
        if username not in registry["users"]:
            raise ValueError(f"User '{username}' not found")
            
        data = registry["users"][username]
        return {
            "username": username,
            "config_path": data.get("config_path"),
            "created": data.get("created"),
            "password_hash": data.get("password_hash")
        }

    def change_password(self, username, old_password, new_password):
        """Verifies old password and updates registry with new hashed password."""
        if not new_password or not isinstance(new_password, str):
            raise ValueError("New password cannot be empty")
            
        registry = self._load_registry()
        if username not in registry["users"]:
            raise ValueError(f"User '{username}' not found")
            
        user_data = registry["users"][username]
        if not bcrypt.checkpw(old_password.encode("utf-8"), user_data["password_hash"].encode("utf-8")):
            raise ValueError("Invalid credentials")
            
        # Update hash
        user_data["password_hash"] = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
        self._save_registry(registry)

    def admin_change_password(self, username, new_password):
        """Administrative password override (does not require old password)."""
        if not new_password or not isinstance(new_password, str):
            raise ValueError("New password cannot be empty")
            
        registry = self._load_registry()
        if username not in registry["users"]:
            raise ValueError(f"User '{username}' not found")
            
        user_data = registry["users"][username]
        user_data["password_hash"] = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
        self._save_registry(registry)

    def authenticate(self, username, password) -> bool:
        """Verifies if the credentials are valid using bcrypt."""
        registry = self._load_registry()
        if username not in registry["users"]:
            return False
            
        user_data = registry["users"][username]
        return bcrypt.checkpw(password.encode("utf-8"), user_data["password_hash"].encode("utf-8"))

    def generate_jwt(self, username) -> str:
        """Generates a secure JSON Web Token for the user expiring in 8 hours."""
        registry = self._load_registry()
        if username not in registry["users"]:
            raise ValueError(f"User '{username}' not found")
            
        expiration = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=8)
        payload = {
            "sub": username,
            "exp": expiration
        }
        
        token = jwt.encode(payload, registry["jwt_secret"], algorithm="HS256")
        if isinstance(token, bytes):
            token = token.decode("utf-8")
            
        return token

    def verify_jwt(self, token) -> str | None:
        """Decodes JWT and returns username if token is valid and unexpired."""
        registry = self._load_registry()
        try:
            payload = jwt.decode(token, registry["jwt_secret"], algorithms=["HS256"])
            return payload.get("sub")
        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, KeyError):
            return None

Methods

def admin_change_password(self, username, new_password)
Expand source code
def admin_change_password(self, username, new_password):
    """Administrative password override (does not require old password)."""
    if not new_password or not isinstance(new_password, str):
        raise ValueError("New password cannot be empty")
        
    registry = self._load_registry()
    if username not in registry["users"]:
        raise ValueError(f"User '{username}' not found")
        
    user_data = registry["users"][username]
    user_data["password_hash"] = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
    self._save_registry(registry)

Administrative password override (does not require old password).

def authenticate(self, username, password) ‑> bool
Expand source code
def authenticate(self, username, password) -> bool:
    """Verifies if the credentials are valid using bcrypt."""
    registry = self._load_registry()
    if username not in registry["users"]:
        return False
        
    user_data = registry["users"][username]
    return bcrypt.checkpw(password.encode("utf-8"), user_data["password_hash"].encode("utf-8"))

Verifies if the credentials are valid using bcrypt.

def change_password(self, username, old_password, new_password)
Expand source code
def change_password(self, username, old_password, new_password):
    """Verifies old password and updates registry with new hashed password."""
    if not new_password or not isinstance(new_password, str):
        raise ValueError("New password cannot be empty")
        
    registry = self._load_registry()
    if username not in registry["users"]:
        raise ValueError(f"User '{username}' not found")
        
    user_data = registry["users"][username]
    if not bcrypt.checkpw(old_password.encode("utf-8"), user_data["password_hash"].encode("utf-8")):
        raise ValueError("Invalid credentials")
        
    # Update hash
    user_data["password_hash"] = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
    self._save_registry(registry)

Verifies old password and updates registry with new hashed password.

def create_user(self, username, password, config_path=None) ‑> dict
Expand source code
def create_user(self, username, password, config_path=None) -> dict:
    """Creates a new user with bcrypt-hashed credentials.
    
    Mode A: config_path=None (fresh user) -> Generates config.yaml and .osk key.
    Mode B: config_path set -> Reuses existing directory after validating its structure.
    """
    if not username or not isinstance(username, str):
        raise ValueError("Username cannot be empty")
        
    if not re.match(r"^[a-zA-Z0-9_-]+$", username):
        raise ValueError("Username must contain only alphanumeric characters, dashes, or underscores")
        
    if not password or not isinstance(password, str):
        raise ValueError("Password cannot be empty")
        
    registry = self._load_registry()
    if username in registry["users"]:
        raise ValueError(f"User '{username}' already exists")
        
    # Resolve path and initialize configuration
    if config_path is None:
        user_dir = os.path.join(self.users_dir, username)
        os.makedirs(user_dir, exist_ok=True)
        
        # Create subdirs for plugins and sessions
        os.makedirs(os.path.join(user_dir, "plugins"), exist_ok=True)
        os.makedirs(os.path.join(user_dir, "ai_sessions"), exist_ok=True)
        
        # Create default config.yaml & .osk key via configfile
        conf_file = os.path.join(user_dir, "config.yaml")
        configfile(conf=conf_file)
        
        stored_config_path = None
    else:
        abs_config_path = os.path.abspath(config_path)
        os.makedirs(abs_config_path, exist_ok=True)
        
        # Create subdirs for plugins and sessions in the custom path
        os.makedirs(os.path.join(abs_config_path, "plugins"), exist_ok=True)
        os.makedirs(os.path.join(abs_config_path, "ai_sessions"), exist_ok=True)
        
        # Create default config.yaml & .osk key via configfile if config.yaml is not present
        conf_file = os.path.join(abs_config_path, "config.yaml")
        if not os.path.exists(conf_file):
            configfile(conf=conf_file)
            
        stored_config_path = abs_config_path

    # Hash password securely
    password_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
    
    user_entry = {
        "password_hash": password_hash,
        "config_path": stored_config_path,
        "created": datetime.datetime.now(datetime.timezone.utc).isoformat()
    }
    
    registry["users"][username] = user_entry
    self._save_registry(registry)
    
    return {
        "username": username,
        "config_path": stored_config_path,
        "created": user_entry["created"]
    }

Creates a new user with bcrypt-hashed credentials.

Mode A: config_path=None (fresh user) -> Generates config.yaml and .osk key. Mode B: config_path set -> Reuses existing directory after validating its structure.

def delete_user(self, username)
Expand source code
def delete_user(self, username):
    """Removes user from the registry and cleans up config directory if server-managed."""
    registry = self._load_registry()
    if username not in registry["users"]:
        raise ValueError(f"User '{username}' not found")
        
    user_data = registry["users"][username]
    config_path = user_data.get("config_path")
    
    if config_path is None:
        user_dir = os.path.join(self.users_dir, username)
        if os.path.exists(user_dir):
            shutil.rmtree(user_dir, ignore_errors=True)
            
    del registry["users"][username]
    self._save_registry(registry)

Removes user from the registry and cleans up config directory if server-managed.

def generate_jwt(self, username) ‑> str
Expand source code
def generate_jwt(self, username) -> str:
    """Generates a secure JSON Web Token for the user expiring in 8 hours."""
    registry = self._load_registry()
    if username not in registry["users"]:
        raise ValueError(f"User '{username}' not found")
        
    expiration = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=8)
    payload = {
        "sub": username,
        "exp": expiration
    }
    
    token = jwt.encode(payload, registry["jwt_secret"], algorithm="HS256")
    if isinstance(token, bytes):
        token = token.decode("utf-8")
        
    return token

Generates a secure JSON Web Token for the user expiring in 8 hours.

def get_user(self, username) ‑> dict
Expand source code
def get_user(self, username) -> dict:
    """Retrieves raw metadata for a specific user."""
    registry = self._load_registry()
    if username not in registry["users"]:
        raise ValueError(f"User '{username}' not found")
        
    data = registry["users"][username]
    return {
        "username": username,
        "config_path": data.get("config_path"),
        "created": data.get("created"),
        "password_hash": data.get("password_hash")
    }

Retrieves raw metadata for a specific user.

def list_users(self) ‑> list[dict]
Expand source code
def list_users(self) -> list[dict]:
    """Lists all registered users with metadata."""
    registry = self._load_registry()
    return [
        {
            "username": name,
            "config_path": data.get("config_path"),
            "created": data.get("created")
        }
        for name, data in registry.get("users", {}).items()
    ]

Lists all registered users with metadata.

def verify_jwt(self, token) ‑> str | None
Expand source code
def verify_jwt(self, token) -> str | None:
    """Decodes JWT and returns username if token is valid and unexpired."""
    registry = self._load_registry()
    try:
        payload = jwt.decode(token, registry["jwt_secret"], algorithms=["HS256"])
        return payload.get("sub")
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, KeyError):
        return None

Decodes JWT and returns username if token is valid and unexpired.