我想在用户模型中添加一些自定义属性,这些属性存在于已登录 Keycloak 用户的 userinfo 中。在 superset_config.py 中我添加了以下代码:
from flask_appbuilder.security.sqla.models import User
from sqlalchemy import Column, Integer
class CustomUser(User):
    """User model extended with one extra integer column, ``custom_attribute``."""

    # Table name this model is mapped to.
    __tablename__ = "ab_user"

    # Additional integer attribute stored alongside the inherited user fields.
    custom_attribute = Column(Integer)
这是我的 CustomSsoSecurityManager 类:
class CustomSsoSecurityManager(SupersetSecurityManager):
    """Security manager that builds the user-info dict from a Keycloak response."""

    authoauthview = CustomSsoAuthOAuthView

    def oauth_user_info(self, provider, res=None):
        """Return a user-info dict for the ``keycloak`` provider.

        Returns ``None`` for any other provider, when the provider answers
        with a non-200 status, or when the request or JSON decoding fails.
        """
        logging.debug("Oauth2 provider: {0}.".format(provider))

        # Only Keycloak is handled here; everything else yields None.
        if provider != 'keycloak':
            return None

        try:
            # Assuming self.appbuilder.sm.oauth_remotes[provider] is correctly set up
            response = self.appbuilder.sm.oauth_remotes[provider].get('userinfo')

            if response.status_code != 200:
                logging.debug(f"OAuth provider returned an error: {response.status_code} - {response.text}")
                return None

            payload = response.json()
            # Map the provider's claims onto the keys used for the user record.
            user_info = {
                "username": payload.get("preferred_username", ""),
                "first_name": payload.get("given_name", ""),
                "last_name": payload.get("family_name", ""),
                "email": payload.get("email", ""),
                "is_active": True,
                "custom_attribute": payload.get("customAttribute", 0),
            }
            print("User Information:", user_info)
            return user_info
        except requests.exceptions.RequestException as e:
            logging.debug(f"Request to OAuth provider failed: {e}")
        except json.JSONDecodeError as e:
            logging.debug(f"Error decoding JSON response: {e}")
        except Exception as e:
            logging.debug(f"An error occurred: {e}")

        # Reached only after one of the handlers above has logged a failure.
        return None
创建 Superset 数据库表后,我可以在表结构中看到 custom_attribute 列。问题是,当我第一次使用带有 customAttribute 值的 Keycloak 用户登录时,Superset 数据库中并没有保存该值。我该怎么做才能确保保存该值?
另外,我还想问:是否可以编写一个自定义 Jinja 宏,把这个 customAttribute 作为参数传入 SQL Lab?
首先你需要阅读这篇关于扩展用户模型属性的官方文章。
然后,为了映射 Keycloak 用户信息中的属性,你需要重写 SecurityManager 的一些方法。
以下是适用于 Superset 3.1.0 的代码示例:
superset/custom/models.py
from flask_appbuilder.security.sqla.models import User
from sqlalchemy import Column, Integer, ForeignKey, String, Integer, Sequence, Table
from sqlalchemy.orm import relationship, backref
from flask_appbuilder import Model
class CustomUser(User):
    """User model extended with one extra integer column, ``tenant_id``."""

    # Table name this model is mapped to.
    __tablename__ = 'ab_user'

    # Tenant identifier stored alongside the inherited user fields.
    tenant_id = Column(Integer)
superset/custom/security.py
import logging
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from superset.security.manager import SupersetSecurityManager
from superset.custom.models import CustomUser
# Module-level logger named after this module.
log = logging.getLogger(__name__)
class CustomSecurityManager(SupersetSecurityManager):
    """Security manager that copies a ``tenant_id`` claim onto the user record.

    ``get_oauth_user_info`` adds ``tenant_id`` to the user-info dict for the
    Keycloak providers, and ``auth_user_oauth`` assigns it to the user object
    during login.
    """

    # Use the extended model so the user object has a ``tenant_id`` attribute.
    user_model = CustomUser
    # userdbmodelview = CustomUserDBModelView

    def get_oauth_user_info(self, provider: str, resp: Dict[str, Any]) -> Dict[str, Any]:
        """Return the user-info dict for ``provider``.

        For ``keycloak`` and ``keycloak_before_17`` the userinfo endpoint is
        queried and its claims are mapped to user fields plus ``tenant_id``.
        Every other provider is delegated to the parent implementation.

        :param provider: name of the OAuth provider being used
        :param resp: OAuth response, forwarded to the parent implementation
        :returns: dict of user fields
        """
        # for Keycloak
        if provider in ["keycloak", "keycloak_before_17"]:
            me = self.appbuilder.sm.oauth_remotes[provider].get(
                "openid-connect/userinfo"
            )
            # Propagate HTTP errors instead of parsing an error body.
            me.raise_for_status()
            data = me.json()
            log.debug("User info from Keycloak: %s", data)
            return {
                "username": data.get("preferred_username", ""),
                "first_name": data.get("given_name", ""),
                "last_name": data.get("family_name", ""),
                "email": data.get("email", ""),
                "tenant_id": data.get("tenant_id"),
            }
        else:
            return super().get_oauth_user_info(provider, resp)

    def _oauth_calculate_user_roles(self, userinfo) -> List[str]:
        """Compute the roles to give a user from ``userinfo``.

        Combines the roles resolved from ``userinfo["role_keys"]`` (when a
        role mapping is configured) with the registration role (when user
        self-registration is enabled).

        :param userinfo: dict with user information
        :returns: list of the collected roles, without duplicates
        """
        user_role_objects = set()

        # apply AUTH_ROLES_MAPPING
        if len(self.auth_roles_mapping) > 0:
            user_role_keys = userinfo.get("role_keys", [])
            user_role_objects.update(self.get_roles_from_keys(user_role_keys))

        # apply AUTH_USER_REGISTRATION_ROLE
        if self.auth_user_registration:
            registration_role_name = self.auth_user_registration_role

            # if AUTH_USER_REGISTRATION_ROLE_JMESPATH is set,
            # use it for the registration role
            if self.auth_user_registration_role_jmespath:
                import jmespath

                registration_role_name = jmespath.search(
                    self.auth_user_registration_role_jmespath, userinfo
                )

            # lookup registration role in flask db
            fab_role = self.find_role(registration_role_name)
            if fab_role:
                user_role_objects.add(fab_role)
            else:
                log.warning(
                    "Can't find AUTH_USER_REGISTRATION role: %s", registration_role_name
                )

        return list(user_role_objects)

    def auth_user_oauth(self, userinfo):
        """
        Method for authenticating user with OAuth.

        :userinfo: dict with user information
                   (keys are the same as User model columns)
        :returns: the authenticated user, or ``None`` when login is refused
                  or registration fails
        """
        # extract the username from `userinfo`
        if "username" in userinfo:
            username = userinfo["username"]
        elif "email" in userinfo:
            username = userinfo["email"]
        else:
            log.error("OAUTH userinfo does not have username or email %s", userinfo)
            return None

        # If username is empty, go away
        if (username is None) or username == "":
            return None

        # Search the DB for this user
        user = self.find_user(username=username)

        # If user is not active, go away
        if user and (not user.is_active):
            return None

        # If user is not registered, and not self-registration, go away
        if (not user) and (not self.auth_user_registration):
            return None

        # Sync the user's roles
        if user and self.auth_roles_sync_at_login:
            user.roles = self._oauth_calculate_user_roles(userinfo)
            log.debug("Calculated new roles for user='%s' as: %s", username, user.roles)

        # If the user is new, register them
        if (not user) and self.auth_user_registration:
            user = self.add_user(
                username=username,
                first_name=userinfo.get("first_name", ""),
                last_name=userinfo.get("last_name", ""),
                email=userinfo.get("email", "") or f"{username}@email.notfound",
                role=self._oauth_calculate_user_roles(userinfo),
            )
            log.debug("New user registered: %s", user)

            # If user registration failed, go away
            if not user:
                log.error("Error creating a new OAuth user %s", username)
                return None

        # Set tenant_id. The key is only present when the provider-specific
        # mapping supplied it, so both the log line and the assignment are
        # guarded; reading it unconditionally would raise KeyError.
        if "tenant_id" in userinfo:
            log.debug("tenant_id %s", userinfo["tenant_id"])
            user.tenant_id = userinfo["tenant_id"]

        # LOGIN SUCCESS (only if user is now registered)
        if user:
            # NOTE(review): presumably this call also persists the user,
            # including tenant_id. Verify against the Flask-AppBuilder
            # version in use.
            self.update_user_auth_stat(user)
            return user
        else:
            return None
superset_config.py
from superset.custom.security import CustomSecurityManager
# Point Superset at the custom security manager class imported above.
CUSTOM_SECURITY_MANAGER=CustomSecurityManager