Module connpy.cli.login_handler

Classes

class LoginHandler (app)
Expand source code
class LoginHandler:
    """Handle the CLI ``login`` and ``logout`` actions.

    The session token obtained from the auth service is kept in a file named
    ``.token`` inside ``app.config.defaultdir``: ``login`` writes it,
    ``logout`` deletes it and ``show_status`` decodes it locally.
    """

    def __init__(self, app):
        """Store the application object whose ``services`` and ``config`` are used."""
        self.app = app

    @staticmethod
    def _write_token(token_path, token):
        """Write *token* to *token_path* without ever exposing it to other users.

        The file is created with mode 0o600 from the start, so there is no
        window in which a freshly created token file carries the (usually
        world-readable) umask-default permissions.
        """
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # The creation mode is ignored when the file already exists, so
            # tighten a pre-existing file before the secret is written to it.
            os.chmod(token_path, 0o600)
            f.write(token)

    def dispatch(self, args):
        """Route ``args.action`` to ``login`` or ``logout``.

        Args:
            args: Parsed CLI arguments; ``action`` is read if present.

        Returns:
            Whatever the selected handler returns. Any other action prints an
            error and terminates the process with exit status 1.
        """
        action = getattr(args, "action", None)
        if action == "login":
            return self.login(args)
        elif action == "logout":
            return self.logout(args)
        else:
            printer.error(f"Unknown action: {action}")
            sys.exit(1)

    def login(self, args):
        """Log in through the auth service and store the returned token.

        When ``args.status`` is truthy only the stored session is reported
        (see ``show_status``) and no login is attempted. Otherwise the
        username comes from ``args.username`` or an interactive prompt, the
        password is always prompted for, and the token found under the
        ``"token"`` key of the login response is written to the token file.

        Args:
            args: Parsed CLI arguments; ``status`` and ``username`` are read
                if present.

        Exits with status 1 on empty credentials, a missing remote host, or a
        connection/login failure, and with status 130 when a prompt is
        cancelled (Ctrl-C / EOF).
        """
        if getattr(args, "status", False):
            return self.show_status()

        if self.app.services.mode != "remote":
            printer.warning("Note: Your current configuration is set to local mode. Logging in will save credentials, but they will only apply when service-mode is set to 'remote'.")

        username = getattr(args, "username", None)
        if not username:
            try:
                username = input("Username: ").strip()
                if not username:
                    printer.error("Username cannot be empty.")
                    sys.exit(1)
            except (KeyboardInterrupt, EOFError):
                printer.warning("\nOperation cancelled.")
                sys.exit(130)

        try:
            password = getpass.getpass("Password: ")
            if not password:
                printer.error("Password cannot be empty.")
                sys.exit(1)
        except (KeyboardInterrupt, EOFError):
            printer.warning("\nOperation cancelled.")
            sys.exit(130)

        # Use the auth service already attached to the service provider when
        # there is one; otherwise build a stub on the fly from the configured
        # remote host so that logging in also works outside remote mode.
        auth_service = getattr(self.app.services, "auth", None)
        if not auth_service:
            import grpc
            from ..grpc_layer.stubs import AuthStub
            remote_host = self.app.services.remote_host or self.app.config.config.get("remote_host")
            if not remote_host:
                printer.error("Remote host is not configured. Run 'connpy config --remote HOST:PORT' first.")
                sys.exit(1)
            try:
                # NOTE(review): insecure_channel does not use TLS, so the
                # password travels unencrypted - confirm this is intended.
                channel = grpc.insecure_channel(remote_host)
                auth_service = AuthStub(channel, remote_host=remote_host)
            except Exception as e:
                printer.error(f"Failed to connect to remote server for login: {e}")
                sys.exit(1)

        try:
            res = auth_service.login(username, password)
            token = res["token"]

            # Persist the token as ".token" inside the configuration directory.
            token_path = os.path.join(self.app.config.defaultdir, ".token")
            self._write_token(token_path, token)

            printer.success(f"Logged in successfully as '{username}'. Session expires in 8 hours.")
        except ConnpyError as e:
            printer.error(f"Login failed: {e}")
            sys.exit(1)
        except Exception as e:
            printer.error(f"Login failed with unexpected error: {e}")
            sys.exit(1)

    def logout(self, args):
        """Delete the stored session token, if any.

        Args:
            args: Parsed CLI arguments (not used).

        Exits with status 1 when the token file exists but cannot be removed.
        """
        token_path = os.path.join(self.app.config.defaultdir, ".token")
        # Attempt the removal directly instead of checking for existence
        # first: a file that vanishes in between is simply "already logged
        # out", not a failure.
        try:
            os.remove(token_path)
        except FileNotFoundError:
            printer.info("No active session found (already logged out).")
        except OSError as e:
            printer.error(f"Failed to clear session: {e}")
            sys.exit(1)
        else:
            printer.success("Logged out successfully. Local session cleared.")

    def show_status(self):
        """Report the state of the locally stored session token.

        The token is expected to consist of three dot-separated segments whose
        middle segment is base64url-encoded JSON. Only that segment is decoded;
        the signature segment is not verified here. The ``sub`` claim is
        shown as the username and ``exp`` is compared, as a UTC epoch
        timestamp, against the current time.

        Nothing is returned and the process is never terminated; every outcome
        (including unexpected errors) is reported through ``printer``.
        """
        import base64
        import json
        import datetime

        token_path = os.path.join(self.app.config.defaultdir, ".token")
        if not os.path.exists(token_path):
            printer.warning("No active session found. You can log in using 'connpy login'.")
            return

        try:
            with open(token_path, "r") as f:
                token = f.read().strip()

            parts = token.split(".")
            if len(parts) != 3:
                printer.error("Invalid local session token format.")
                return

            # Restore the "=" padding that the decoder needs.
            payload_b64 = parts[1]
            payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
            payload_bytes = base64.urlsafe_b64decode(payload_b64)
            payload = json.loads(payload_bytes.decode("utf-8"))

            username = payload.get("sub")
            exp = payload.get("exp")

            # A missing (or falsy) expiry claim is reported as a session
            # without expiration.
            if not exp:
                printer.success(f"Active session as '{username}' (Indefinite expiration).")
                return

            now = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if now > exp:
                printer.error("Session has expired. Please log in again using 'connpy login'.")
                return

            remaining = exp - now
            hours = int(remaining // 3600)
            minutes = int((remaining % 3600) // 60)

            printer.success(f"Logged in as '{username}'")
            printer.info(f"Time remaining: {hours}h {minutes}m")

            exp_dt = datetime.datetime.fromtimestamp(exp, datetime.timezone.utc)
            printer.info(f"Expires at: {exp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')}")
        except Exception as e:
            # Unreadable file, malformed base64/JSON, non-numeric "exp", ...
            printer.error(f"Failed to check local session status: {e}")

Methods

def dispatch(self, args)
Expand source code
def dispatch(self, args):
    """Run the handler matching ``args.action``.

    ``"login"`` is forwarded to ``self.login`` and ``"logout"`` to
    ``self.logout``; the handler's return value is passed through. Any other
    value (including a missing ``action`` attribute) is reported as an error
    and ends the process with exit status 1.
    """
    requested = getattr(args, "action", None)

    # Guard clause: reject everything that is not a known action up front.
    if requested != "login" and requested != "logout":
        printer.error(f"Unknown action: {requested}")
        sys.exit(1)

    handler = self.login if requested == "login" else self.logout
    return handler(args)
def login(self, args)
Expand source code
def login(self, args):
    """Log in through the auth service and store the returned token.

    When ``args.status`` is truthy only the stored session is reported (via
    ``self.show_status``) and no login is attempted. Otherwise the username
    comes from ``args.username`` or an interactive prompt, the password is
    always prompted for, and the token found under the ``"token"`` key of the
    login response is written to ``.token`` inside
    ``self.app.config.defaultdir``.

    The token file is created with mode 0o600 from the start, so the token is
    never on disk with umask-default (usually world-readable) permissions.

    Args:
        args: Parsed CLI arguments; ``status`` and ``username`` are read if
            present.

    Exits with status 1 on empty credentials, a missing remote host, or a
    connection/login failure, and with status 130 when a prompt is cancelled
    (Ctrl-C / EOF).
    """
    if getattr(args, "status", False):
        return self.show_status()

    if self.app.services.mode != "remote":
        printer.warning("Note: Your current configuration is set to local mode. Logging in will save credentials, but they will only apply when service-mode is set to 'remote'.")

    username = getattr(args, "username", None)
    if not username:
        try:
            username = input("Username: ").strip()
            if not username:
                printer.error("Username cannot be empty.")
                sys.exit(1)
        except (KeyboardInterrupt, EOFError):
            printer.warning("\nOperation cancelled.")
            sys.exit(130)

    try:
        password = getpass.getpass("Password: ")
        if not password:
            printer.error("Password cannot be empty.")
            sys.exit(1)
    except (KeyboardInterrupt, EOFError):
        printer.warning("\nOperation cancelled.")
        sys.exit(130)

    # Use the auth service already attached to the service provider when there
    # is one; otherwise build a stub on the fly from the configured remote
    # host so that logging in also works outside remote mode.
    auth_service = getattr(self.app.services, "auth", None)
    if not auth_service:
        import grpc
        from ..grpc_layer.stubs import AuthStub
        remote_host = self.app.services.remote_host or self.app.config.config.get("remote_host")
        if not remote_host:
            printer.error("Remote host is not configured. Run 'connpy config --remote HOST:PORT' first.")
            sys.exit(1)
        try:
            # NOTE(review): insecure_channel does not use TLS, so the password
            # travels unencrypted - confirm this is intended.
            channel = grpc.insecure_channel(remote_host)
            auth_service = AuthStub(channel, remote_host=remote_host)
        except Exception as e:
            printer.error(f"Failed to connect to remote server for login: {e}")
            sys.exit(1)

    try:
        res = auth_service.login(username, password)
        token = res["token"]

        # Persist the token as ".token" inside the configuration directory.
        # os.open applies the restrictive mode at creation time, which closes
        # the window a plain open() followed by chmod() would leave.
        token_path = os.path.join(self.app.config.defaultdir, ".token")
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # The creation mode is ignored when the file already exists, so
            # tighten a pre-existing file before the secret is written to it.
            os.chmod(token_path, 0o600)
            f.write(token)

        printer.success(f"Logged in successfully as '{username}'. Session expires in 8 hours.")
    except ConnpyError as e:
        printer.error(f"Login failed: {e}")
        sys.exit(1)
    except Exception as e:
        printer.error(f"Login failed with unexpected error: {e}")
        sys.exit(1)
def logout(self, args)
Expand source code
def logout(self, args):
    """Delete the stored session token, if any.

    The token file is ``.token`` inside ``self.app.config.defaultdir``.

    Args:
        args: Parsed CLI arguments (not used).

    Exits with status 1 when the token file exists but cannot be removed.
    """
    token_path = os.path.join(self.app.config.defaultdir, ".token")
    # Attempt the removal directly instead of checking for existence first: a
    # file that vanishes in between is simply "already logged out", not a
    # failure.
    try:
        os.remove(token_path)
    except FileNotFoundError:
        printer.info("No active session found (already logged out).")
    except OSError as e:
        printer.error(f"Failed to clear session: {e}")
        sys.exit(1)
    else:
        printer.success("Logged out successfully. Local session cleared.")
def show_status(self)
Expand source code
def show_status(self):
    """Print the state of the session token stored on disk.

    The token is read from ``.token`` inside ``self.app.config.defaultdir``
    and is expected to consist of three dot-separated segments, the middle
    one being base64url-encoded JSON. Only that segment is decoded; the
    signature segment is not verified here. ``sub`` is displayed as the
    username and ``exp`` is compared, as a UTC epoch timestamp, with the
    current time.

    Nothing is returned; every outcome, including unexpected errors, is
    reported through ``printer``.
    """
    import base64
    import json
    import datetime

    session_file = os.path.join(self.app.config.defaultdir, ".token")
    if not os.path.exists(session_file):
        printer.warning("No active session found. You can log in using 'connpy login'.")
        return

    try:
        with open(session_file, "r") as handle:
            raw_token = handle.read().strip()

        segments = raw_token.split(".")
        if len(segments) != 3:
            printer.error("Invalid local session token format.")
            return

        # Re-append the "=" padding the decoder requires.
        encoded_claims = segments[1]
        encoded_claims = encoded_claims + "=" * (-len(encoded_claims) % 4)
        claims = json.loads(base64.urlsafe_b64decode(encoded_claims).decode("utf-8"))

        user = claims.get("sub")
        expires_at = claims.get("exp")

        # A missing (or falsy) expiry claim is shown as a never-expiring session.
        if not expires_at:
            printer.success(f"Active session as '{user}' (Indefinite expiration).")
            return

        utc = datetime.timezone.utc
        current = datetime.datetime.now(utc).timestamp()
        if current > expires_at:
            printer.error("Session has expired. Please log in again using 'connpy login'.")
            return

        whole_hours, leftover_seconds = divmod(expires_at - current, 3600)
        hours = int(whole_hours)
        minutes = int(leftover_seconds // 60)

        printer.success(f"Logged in as '{user}'")
        printer.info(f"Time remaining: {hours}h {minutes}m")

        expiry_moment = datetime.datetime.fromtimestamp(expires_at, utc)
        printer.info(f"Expires at: {expiry_moment.strftime('%Y-%m-%d %H:%M:%S UTC')}")
    except Exception as e:
        # Unreadable file, malformed base64/JSON, non-numeric "exp", ...
        printer.error(f"Failed to check local session status: {e}")