diff --git a/docs/security.rst b/docs/security.rst index 6736b157c..2c637ba80 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -343,13 +343,35 @@ Specify a list of OAUTH_PROVIDERS in **config.py** that you want to allow for yo "authorize_url": "https://login.microsoftonline.com/AZURE_TENANT_ID/oauth2/authorize", }, }, + { + "name": "authentik", + "token_key": "access_token", + "icon": "fa-fingerprint", + "remote_app": { + "api_base_url": "https://authentik.mydomain.com", + "client_kwargs": { + "scope": "email profile", + "verify_signature": True, + }, + "access_token_url": ( + "https://authentik.mydomain.com/application/o/token/" + ), + "authorize_url": ( + "https://authentik.mydomain.com/application/o/authorize/" + ), + "request_token_url": None, + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + 'jwks_uri': 'https://authentik.mydomain.com/application/o/APPLICATION_NAME/jwks/', + }, + }, ] This needs a small explanation, you basically have five special keys: :name: the name of the provider: you can choose whatever you want, but FAB has builtin logic in `BaseSecurityManager.get_oauth_user_info()` for: - 'azure', 'github', 'google', 'keycloak', 'keycloak_before_17', 'linkedin', 'okta', 'openshift', 'twitter' + 'authentik', 'azure', 'github', 'google', 'keycloak', 'keycloak_before_17', 'linkedin', 'okta', 'openshift', 'twitter' :icon: the font-awesome icon for this provider diff --git a/flask_appbuilder/exceptions.py b/flask_appbuilder/exceptions.py index 92189f6b2..9a34d174e 100644 --- a/flask_appbuilder/exceptions.py +++ b/flask_appbuilder/exceptions.py @@ -58,3 +58,11 @@ class OAuthProviderUnknown(FABException): """ ... + + +class InvalidLoginAttempt(FABException): + """ + When the credentials entered could not be verified + """ + + ... diff --git a/flask_appbuilder/security/manager.py b/flask_appbuilder/security/manager.py index 80a91b8cf..869ef42ca 100644 --- a/flask_appbuilder/security/manager.py +++ b/flask_appbuilder/security/manager.py @@ -4,7 +4,7 @@ from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from flask import Flask, g, session, url_for -from flask_appbuilder.exceptions import OAuthProviderUnknown +from flask_appbuilder.exceptions import InvalidLoginAttempt, OAuthProviderUnknown from flask_babel import lazy_gettext as _ from flask_jwt_extended import current_user as current_user_jwt from flask_jwt_extended import JWTManager @@ -680,6 +680,18 @@ def get_oauth_user_info( "last_name": data.get("family_name", ""), "email": data.get("email", ""), } + # for Authentik + if provider == "authentik": + id_token = resp["id_token"] + me = self._get_authentik_token_info(id_token) + log.debug("User info from authentik: %s", me) + return { + "email": me["preferred_username"], + "first_name": me.get("given_name", ""), + "username": me["nickname"], + "role_keys": me.get("groups", []), + } + raise OAuthProviderUnknown() def _get_microsoft_jwks(self) -> List[Dict[str, Any]]: @@ -701,6 +713,48 @@ def _decode_and_validate_azure_jwt(self, id_token: str) -> Dict[str, str]: return jwt.decode(id_token, options={"verify_signature": False}) + def _get_authentik_jwks(self, jwks_url) -> dict: + import requests + + resp = requests.get(jwks_url) + if resp.status_code == 200: + return resp.json() + return False + + def _validate_jwt(self, id_token, jwks): + from authlib.jose import JsonWebKey, jwt as authlib_jwt + + keyset = JsonWebKey.import_key_set(jwks) + claims = authlib_jwt.decode(id_token, keyset) + claims.validate() + log.info("JWT token is validated") + return claims + + def _get_authentik_token_info(self, id_token): + me = jwt.decode(id_token, options={"verify_signature": False}) + + verify_signature = self.oauth_remotes["authentik"].client_kwargs.get( + "verify_signature", True + ) + if verify_signature: + # Validate the token using authentik certificate + jwks_uri = self.oauth_remotes["authentik"].server_metadata.get("jwks_uri") + if jwks_uri: + jwks = self._get_authentik_jwks(jwks_uri) + if jwks: + return self._validate_jwt(id_token, jwks) + else: + log.error( + "jwks_uri not specified in OAuth Providers, " + "could not verify token signature" + ) + else: + # Return the token info without validating + log.warning("JWT token is not validated!") + return me + + raise InvalidLoginAttempt("OAuth signature verify failed") + def register_views(self): if not self.appbuilder.app.config.get("FAB_ADD_SECURITY_VIEWS", True): return diff --git a/tests/security/test_auth_oauth.py b/tests/security/test_auth_oauth.py index 799ce46a6..a10e17d3c 100644 --- a/tests/security/test_auth_oauth.py +++ b/tests/security/test_auth_oauth.py @@ -3,13 +3,17 @@ import unittest from unittest.mock import MagicMock +from authlib.jose.errors import BadSignatureError from flask import Flask -from flask_appbuilder import AppBuilder, SQLA +from flask_appbuilder import AppBuilder +from flask_appbuilder import SQLA from flask_appbuilder.const import AUTH_OAUTH +from flask_appbuilder.exceptions import InvalidLoginAttempt from flask_appbuilder.exceptions import OAuthProviderUnknown import jinja2 import jwt -from tests.const import USERNAME_ADMIN, USERNAME_READONLY +from tests.const import USERNAME_ADMIN +from tests.const import USERNAME_READONLY from tests.fixtures.users import create_default_users logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s") @@ -695,3 +699,355 @@ def test_oauth_user_info_auth0(self): "username": "auth0_test-sub", }, ) + + +class OAuthAuthentikTestCase(unittest.TestCase): + def setUp(self): + # start Flask + self.app = Flask(__name__) + self.app.jinja_env.undefined = jinja2.StrictUndefined + self.app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get( + "SQLALCHEMY_DATABASE_URI", "sqlite:///" + ) + self.app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + self.app.config["AUTH_TYPE"] = AUTH_OAUTH + self.app.config["OAUTH_PROVIDERS"] = [ + { + "name": "authentik", + "token_key": "access_token", + "icon": "fa-fingerprint", + "remote_app": { + "api_base_url": "https://authentik.mydomain.com", + "client_kwargs": { + "scope": "email profile", + "verify_signature": False, + }, + "access_token_url": ( + "https://authentik.mydomain.com" "/application/o/token/" + ), + "authorize_url": ( + "https://authentik.mydomain.com/" "application/o/authorize/" + ), + "request_token_url": None, + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + }, + }, + ] + + # start Database + self.db = SQLA(self.app) + + def tearDown(self): + # Remove test user + user_alice = self.appbuilder.sm.find_user("alice") + if user_alice: + self.db.session.delete(user_alice) + self.db.session.commit() + + # stop Flask + self.app = None + + # stop Flask-AppBuilder + self.appbuilder = None + + # stop Database + self.db.session.remove() + self.db = None + + # ---------------- + # Unit Tests + # ---------------- + def test_oauth_user_info_authentik(self): + self.appbuilder = AppBuilder(self.app, self.db.session) + claims = { + "iss": "https://authentik.mydomain.com/application/o/flask-appbuilder-test/", + "sub": "2ac1102e7cf5a4b1cb2dd5adbe4761c551691ecd88991f78d0195d4d3d0cfcfa", + "aud": "CLIENT_ID", + "exp": 1703257941, + "iat": 1700665941, + "auth_time": 7282182129, # 100 years from now ;) + "acr": "goauthentik.io/providers/oauth2/default", + "at_hash": "cAydO2DJMi_ZL6opx3eUdw", + "email": "alice@example.com", + "email_verified": True, + "name": "Alice", + "given_name": "Alice Doe", + "preferred_username": "alice@example.com", + "nickname": "alice", + "groups": ["GROUP_1", "GROUP_2"], + } + + # Create an unsigned JWT + unsigned_jwt = jwt.encode(claims, key=None, algorithm="none") + user_info = self.appbuilder.sm.get_oauth_user_info( + "authentik", {"access_token": "", "id_token": unsigned_jwt} + ) + self.assertEqual( + user_info, + { + "email": "alice@example.com", + "first_name": "Alice Doe", + "role_keys": ["GROUP_1", "GROUP_2"], + "username": "alice", + }, + ) + + def test_oauth_user_info_authentik_with_jwt_validation(self): + self.app.config["OAUTH_PROVIDERS"] = [ + { + "name": "authentik", + "token_key": "access_token", + "icon": "fa-fingerprint", + "remote_app": { + "api_base_url": "https://authentik.mydomain.com", + "client_kwargs": { + "scope": "email profile", + "verify_signature": True, + }, + "access_token_url": ( + "https://authentik.mydomain.com" "/application/o/token/" + ), + "authorize_url": ( + "https://authentik.mydomain.com/" "application/o/authorize/" + ), + "request_token_url": None, + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + "jwks_uri": ( + "https://authentik.mydomain.com/" + "application/o/APPLICATION_NAME/jwks/" + ), + }, + }, + ] + + self.appbuilder = AppBuilder(self.app, self.db.session) + claims = { + "iss": "https://authentik.mydomain.com/application/o/flask-appbuilder-test/", + "sub": "2ac1102e7cf5a4b1cb2dd5adbe4761c551691ecd88991f78d0195d4d3d0cfcfa", + "aud": "CLIENT_ID", + "exp": 1703257941, + "iat": 1700665941, + "auth_time": 7282182129, # 100 years from now ;) + "acr": "goauthentik.io/providers/oauth2/default", + "at_hash": "cAydO2DJMi_ZL6opx3eUdw", + "email": "alice@example.com", + "email_verified": True, + "name": "Alice", + "given_name": "Alice Doe", + "preferred_username": "alice@example.com", + "nickname": "alice", + "groups": ["GROUP_1", "GROUP_2"], + } + from unittest.mock import MagicMock + + private_key = """-----BEGIN PRIVATE KEY----- +MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBALeDojEka93XZ/J8 +bDGgn2MIHykafgCx2D6wTZgmmhzpRH7/k7J/WSsqG6eSFg38mGJukPCa4dcG8dCL +meajEf2g4IoaYiE55yXs0ou/tixBJI8wRY+NfCluxgIcHdKhZISVO6CkR5r7diN/ +SLHPsFnDd0UiMJ5c48UsJwk8T5T7AgMBAAECgYEAqalrVB+mEi1KDud1Z9RmRzqF +BI1XnPDPSfXZZyeZJ82J5BgJxubx23RMqPnopfm4MJikK64lyZTED9hg6tgskk1X +J9pc7iyU4PQf+tx4tvElyOL4OSqGss/tQHtHz76hNOR1kxeCcJsJG+WS8P0/Kmj1 +0IoYKLFlb5AHr6KqDGECQQDZ0qKIzxdmZj3gSsNldc4oOQOKJgd1QSDGCOqR9p7f +oj7nuOPRVgnztqXALhNhpZXYJq8dWmpGYFi+EC1piRUDAkEA162gPgGzUJAIyhUM +sA6Uy9v64nqBnlygVpofhdvyznSf/KUsmWQZv7gpMMXnIGAQP+rqM1gJvuRtodml +hUeSqQJAHJH4J6GiHBhE/WpQ/rnY9IWl5TTfvY1xUwhQXBzQ8dxCC/rARvDWFVVb +oD1q5V/mq5dHWL5HOjvg5+0PR8xnKQJAMOdBik3AZugB1jBnrBPiUUcT3/5/HXVL +NdfEhgmVSJLRI+wf7LfxzrLnRBPbkE+334ZYjEPOEeahpS1AhrPv4QJAHpap1I+v +8m+N5G/MppasppHLJmXhnFeQsnBX7XcdYiCqHikuBlIzoQ0Cj5xbkfgMMCVORO64 +r9+EFRsxA5GNYA== +-----END PRIVATE KEY-----""" + # Create a signed JWT + signed_jwt = jwt.encode( + claims, key=private_key, algorithm="RS256", headers={"kid": "1"} + ) + self.appbuilder.sm._get_authentik_jwks = MagicMock( + return_value={ + "keys": [ + { + "alg": "RS256", + "e": "AQAB", + "kid": "1", + "kty": "RSA", + "n": "t4OiMSRr3ddn8nxsMaCfYwgfKRp-ALHYPrBNmCaaHOlEfv-" + "Tsn9ZKyobp5IWDfyYYm6Q8Jrh1wbx0IuZ5qMR_aDgihpiITnnJezSi7-" + "2LEEkjzBFj418KW7GAhwd0qFkhJU7oKRHmvt2I39Isc-wWcN3RSIwnlz" + "jxSwnCTxPlPs", + "use": "sig", + "x5c": [ + "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC3g6IxJGvd12fyfGwxoJ9" + "jCB8pGn4Asdg+sE2YJpoc6UR+/5Oyf1krKhunkhYN/JhibpDwmuHXBvHQi5n" + "moxH9oOCKGmIhOecl7NKLv7YsQSSPMEWPjXwpbsYCHB3SoWSElTugpEea+3Y" + "jf0ixz7BZw3dFIjCeXOPFLCcJPE+U+wIDAQAB" + ], + } + ] + } + ) + user_info = self.appbuilder.sm.get_oauth_user_info( + "authentik", {"access_token": "", "id_token": signed_jwt} + ) + self.assertEqual( + user_info, + { + "email": "alice@example.com", + "first_name": "Alice Doe", + "role_keys": ["GROUP_1", "GROUP_2"], + "username": "alice", + }, + ) + + def test_oauth_user_info_authentik_with_jwt_validation_wrong_signature(self): + """ + Tests if the get_user_info raises an exception + if the token is signed by a different JKWS + """ + self.app.config["OAUTH_PROVIDERS"] = [ + { + "name": "authentik", + "token_key": "access_token", + "icon": "fa-fingerprint", + "remote_app": { + "api_base_url": "https://authentik.mydomain.com", + "client_kwargs": { + "scope": "email profile", + "verify_signature": True, + }, + "access_token_url": ( + "https://authentik.mydomain.com" "/application/o/token/" + ), + "authorize_url": ( + "https://authentik.mydomain.com/" "application/o/authorize/" + ), + "request_token_url": None, + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + "jwks_uri": ( + "https://authentik.mydomain.com/" + "application/o/APPLICATION_NAME/jwks/" + ), + }, + }, + ] + + self.appbuilder = AppBuilder(self.app, self.db.session) + claims = { + "iss": "https://authentik.mydomain.com/application/o/flask-appbuilder-test/", + "sub": "2ac1102e7cf5a4b1cb2dd5adbe4761c551691ecd88991f78d0195d4d3d0cfcfa", + "aud": "CLIENT_ID", + "exp": 1703257941, + "iat": 1700665941, + "auth_time": 7282182129, # 100 years from now ;) + "acr": "goauthentik.io/providers/oauth2/default", + "at_hash": "cAydO2DJMi_ZL6opx3eUdw", + "email": "alice@example.com", + "email_verified": True, + "name": "Alice", + "given_name": "Alice Doe", + "preferred_username": "alice@example.com", + "nickname": "alice", + "groups": ["GROUP_1", "GROUP_2"], + } + from unittest.mock import MagicMock + + private_key = """-----BEGIN PRIVATE KEY----- +MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEAqPfgaTEWEP3S9w0t +gsicURfo+nLW09/0KfOPinhYZ4ouzU+3xC4pSlEp8Ut9FgL0AgqNslNaK34Kq+NZ +jO9DAQIDAQABAkAgkuLEHLaqkWhLgNKagSajeobLS3rPT0Agm0f7k55FXVt743hw +Ngkp98bMNrzy9AQ1mJGbQZGrpr4c8ZAx3aRNAiEAoxK/MgGeeLui385KJ7ZOYktj +hLBNAB69fKwTZFsUNh0CIQEJQRpFCcydunv2bENcN/oBTRw39E8GNv2pIcNxZkcb +NQIgbYSzn3Py6AasNj6nEtCfB+i1p3F35TK/87DlPSrmAgkCIQDJLhFoj1gbwRbH +/bDRPrtlRUDDx44wHoEhSDRdy77eiQIgE6z/k6I+ChN1LLttwX0galITxmAYrOBh +BVl433tgTTQ= +-----END PRIVATE KEY-----""" + # Create a signed JWT + wrong_signed_jwt = jwt.encode( + claims, key=private_key, algorithm="RS256", headers={"kid": "1"} + ) + self.appbuilder.sm._get_authentik_jwks = MagicMock( + return_value={ + "keys": [ + { + "alg": "RS256", + "e": "AQAB", + "kid": "1", + "kty": "RSA", + "n": "t4OiMSRr3ddn8nxsMaCfYwgfKRp-ALHYPrBNmCaaHOlEfv-" + "Tsn9ZKyobp5IWDfyYYm6Q8Jrh1wbx0IuZ5qMR_aDgihpiITnnJezSi7-" + "2LEEkjzBFj418KW7GAhwd0qFkhJU7oKRHmvt2I39Isc-wWcN3RSIwnlz" + "jxSwnCTxPlPs", + "use": "sig", + "x5c": [ + "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC3g6IxJGvd12fyfGwxoJ9" + "jCB8pGn4Asdg+sE2YJpoc6UR+/5Oyf1krKhunkhYN/JhibpDwmuHXBvHQi5n" + "moxH9oOCKGmIhOecl7NKLv7YsQSSPMEWPjXwpbsYCHB3SoWSElTugpEea+3Y" + "jf0ixz7BZw3dFIjCeXOPFLCcJPE+U+wIDAQAB" + ], + } + ] + } + ) + with self.assertRaises(BadSignatureError): + self.appbuilder.sm.get_oauth_user_info( + "authentik", {"access_token": "", "id_token": wrong_signed_jwt} + ) + + def test_oauth_user_info_authentik_with_jwt_validation_without_signature(self): + """ + Tests if an unsigned token raises an error if verify_signature is set to True + """ + self.app.config["OAUTH_PROVIDERS"] = [ + { + "name": "authentik", + "token_key": "access_token", + "icon": "fa-fingerprint", + "remote_app": { + "api_base_url": "https://authentik.mydomain.com", + "client_kwargs": { + "scope": "email profile", + "verify_signature": True, + }, + "access_token_url": ( + "https://authentik.mydomain.com" "/application/o/token/" + ), + "authorize_url": ( + "https://authentik.mydomain.com/" "application/o/authorize/" + ), + "request_token_url": None, + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + "jwks_uri": ( + "https://authentik.mydomain.com/" + "application/o/APPLICATION_NAME/jwks/" + ), + }, + }, + ] + + self.appbuilder = AppBuilder(self.app, self.db.session) + claims = { + "iss": "https://authentik.mydomain.com/application/o/flask-appbuilder-test/", + "sub": "2ac1102e7cf5a4b1cb2dd5adbe4761c551691ecd88991f78d0195d4d3d0cfcfa", + "aud": "CLIENT_ID", + "exp": 1703257941, + "iat": 1700665941, + "auth_time": 7282182129, # 100 years from now ;) + "acr": "goauthentik.io/providers/oauth2/default", + "at_hash": "cAydO2DJMi_ZL6opx3eUdw", + "email": "alice@example.com", + "email_verified": True, + "name": "Alice", + "given_name": "Alice Doe", + "preferred_username": "alice@example.com", + "nickname": "alice", + "groups": ["GROUP_1", "GROUP_2"], + } + from unittest.mock import MagicMock + + unsigned_jwt = jwt.encode(claims, key=None, algorithm="none") + self.appbuilder.sm._get_authentik_jwks = MagicMock(return_value={}) + with self.assertRaises(InvalidLoginAttempt): + self.appbuilder.sm.get_oauth_user_info( + "authentik", {"access_token": "", "id_token": unsigned_jwt} + )