扩展 Apache Superset 中的用户模型

问题描述 投票:0回答:1

我想向我登录的 keycloak 用户的用户信息中存在的用户模型添加一些自定义属性。 在 superset_config.py 中我添加了这个:

from flask_appbuilder.security.sqla.models import User
from sqlalchemy import Column, Integer

class CustomUser(User):
    __tablename__ = "ab_user"
    custom_attribute = Column(Integer)

这是我的 CustomSsoSecurityManager 类:

class CustomSsoSecurityManager(SupersetSecurityManager):
    authoauthview = CustomSsoAuthOAuthView

def oauth_user_info(self, provider, res=None):
    logging.debug("Oauth2 provider: {0}.".format(provider))
    if provider == 'keycloak':
        try:
            # Assuming self.appbuilder.sm.oauth_remotes[provider] is correctly set up
            response = self.appbuilder.sm.oauth_remotes[provider].get('userinfo')

            if response.status_code == 200:
                data = response.json()
                username = data.get("preferred_username", "")
                first_name = data.get("given_name", "")
                last_name = data.get("family_name", "")
                email = data.get("email", "")
                customAttribute= data.get("customAttribute", 0)

                user_info = {
                    "username": username,
                    "first_name": first_name,
                    "last_name": last_name,
                    "email": email,
                    "is_active": True,
                    "custom_attribute": customAttribute
                }

                print("User Information:", user_info)

                return user_info
            else:
                logging.debug(f"OAuth provider returned an error: {response.status_code} - {response.text}")
                return None
        except requests.exceptions.RequestException as e:
            logging.debug(f"Request to OAuth provider failed: {e}")
            return None
        except json.JSONDecodeError as e:
            logging.debug(f"Error decoding JSON response: {e}")
            return None
        except Exception as e:
            logging.debug(f"An error occurred: {e}")
            return None

创建超集数据库表后,我可以在架构中看到 custom_attribute 列, 问题是当我第一次使用具有 customAttribute 值的 keycloak 用户登录时,超集数据库中没有保存任何内容。 我该怎么做才能确保保存该值?

我还想问是否可以制作一个自定义 Jinja 宏来将此 customAttribute 作为 Sql Lab 中的参数传递

python flask oauth apache-superset
1个回答
0
投票

首先你需要阅读这篇关于扩展用户模型属性的官方文章

然后,为了从 keycloak 用户信息中映射属性,您需要重写 SecurityManager 的一些方法。

以下超集 3.1.0 的代码示例

superset/custom/models.py

from flask_appbuilder.security.sqla.models import User
from sqlalchemy import Column, Integer, ForeignKey, String, Integer, Sequence, Table
from sqlalchemy.orm import relationship, backref
from flask_appbuilder import Model

class CustomUser(User):
    __tablename__ = 'ab_user'
    tenant_id = Column(Integer)

superset/custom/security.py

import logging
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from superset.security.manager import SupersetSecurityManager
from superset.custom.models import CustomUser

log = logging.getLogger(__name__)

class CustomSecurityManager(SupersetSecurityManager):
    user_model = CustomUser
    # userdbmodelview = CustomUserDBModelView

    def get_oauth_user_info(self, provider: str, resp: Dict[str, Any]) -> Dict[str, Any]:
        # for Keycloak
        if provider in ["keycloak", "keycloak_before_17"]:
            me = self.appbuilder.sm.oauth_remotes[provider].get(
                "openid-connect/userinfo"
            )
            me.raise_for_status()
            data = me.json()
            log.debug("User info from Keycloak: %s", data)

            return {
                "username": data.get("preferred_username", ""),
                "first_name": data.get("given_name", ""),
                "last_name": data.get("family_name", ""),
                "email": data.get("email", ""),
                "tenant_id": data.get("tenant_id", None),
            }
        else:
            return super().get_oauth_user_info(provider, resp)


    def _oauth_calculate_user_roles(self, userinfo) -> List[str]:
        user_role_objects = set()

        # apply AUTH_ROLES_MAPPING
        if len(self.auth_roles_mapping) > 0:
            user_role_keys = userinfo.get("role_keys", [])
            user_role_objects.update(self.get_roles_from_keys(user_role_keys))

        # apply AUTH_USER_REGISTRATION_ROLE
        if self.auth_user_registration:
            registration_role_name = self.auth_user_registration_role

            # if AUTH_USER_REGISTRATION_ROLE_JMESPATH is set,
            # use it for the registration role
            if self.auth_user_registration_role_jmespath:
                import jmespath

                registration_role_name = jmespath.search(
                    self.auth_user_registration_role_jmespath, userinfo
                )

            # lookup registration role in flask db
            fab_role = self.find_role(registration_role_name)
            if fab_role:
                user_role_objects.add(fab_role)
            else:
                log.warning(
                    "Can't find AUTH_USER_REGISTRATION role: %s", registration_role_name
                )

        return list(user_role_objects)

    def auth_user_oauth(self, userinfo):
        """
        Method for authenticating user with OAuth.

        :userinfo: dict with user information
                   (keys are the same as User model columns)
        """
        # extract the username from `userinfo`
        if "username" in userinfo:
            username = userinfo["username"]
        elif "email" in userinfo:
            username = userinfo["email"]
        else:
            log.error("OAUTH userinfo does not have username or email %s", userinfo)
            return None

        # If username is empty, go away
        if (username is None) or username == "":
            return None

        # Search the DB for this user
        user = self.find_user(username=username)

        # If user is not active, go away
        if user and (not user.is_active):
            return None

        # If user is not registered, and not self-registration, go away
        if (not user) and (not self.auth_user_registration):
            return None

        # Sync the user's roles
        if user and self.auth_roles_sync_at_login:
            user.roles = self._oauth_calculate_user_roles(userinfo)
            log.debug("Calculated new roles for user='%s' as: %s", username, user.roles)

        # If the user is new, register them
        if (not user) and self.auth_user_registration:
            user = self.add_user(
                username=username,
                first_name=userinfo.get("first_name", ""),
                last_name=userinfo.get("last_name", ""),
                email=userinfo.get("email", "") or f"{username}@email.notfound",
                role=self._oauth_calculate_user_roles(userinfo),
            )
            log.debug("New user registered: %s", user)

            # If user registration failed, go away
            if not user:
                log.error("Error creating a new OAuth user %s", username)
                return None

        log.debug(f"tenant_id {userinfo['tenant_id']}")

        # Set tenant_id
        if "tenant_id" in userinfo:
            user.tenant_id = userinfo["tenant_id"]

        # LOGIN SUCCESS (only if user is now registered)
        if user:
            self.update_user_auth_stat(user)
            return user
        else:
            return None

superset_config.py

from superset.custom.security import CustomSecurityManager
CUSTOM_SECURITY_MANAGER=CustomSecurityManager
© www.soinside.com 2019 - 2024. All rights reserved.