|
16 | 16 |
|
17 | 17 | import configparser
|
18 | 18 | import os
|
| 19 | +from typing import Optional |
19 | 20 |
|
20 | 21 | from oauthlib.oauth2 import LegacyApplicationClient
|
21 | 22 | from requests_oauthlib import OAuth2Session
|
|
27 | 28 |
|
28 | 29 | class Authenticator(object):
|
29 | 30 |
|
30 |
| - def __init__(self, configuration_file_location=None): |
31 |
| - self.keycloak_settings = KeycloakServerSettings(configuration_file_location) |
| 31 | + def __init__(self, configuration_file_location: Optional[str] = None): |
| 32 | + self.settings = KeycloakServerSettings(configuration_file_location) |
32 | 33 | self._load_settings(configuration_file_location)
|
33 | 34 |
|
34 |
| - def get_token_and_user_info_password_flow(self, username, password, gateway_id): |
35 |
| - client_id = self.keycloak_settings.CLIENT_ID |
36 |
| - client_secret = self.keycloak_settings.CLIENT_SECRET |
37 |
| - token_url = self.keycloak_settings.TOKEN_URL |
38 |
| - userinfo_url = self.keycloak_settings.USER_INFO_URL |
39 |
| - verify_ssl = self.keycloak_settings.VERIFY_SSL |
40 |
| - oauth2_session = OAuth2Session(client=LegacyApplicationClient( |
41 |
| - client_id=client_id)) |
42 |
| - token = oauth2_session.fetch_token(token_url=token_url, |
43 |
| - username=username, |
44 |
| - password=password, |
45 |
| - client_id=client_id, |
46 |
| - client_secret=client_secret, |
47 |
| - verify=verify_ssl) |
48 |
| - |
| 35 | + def get_token_and_user_info_password_flow(self, username: str, password: str, gateway_id: str): |
| 36 | + client_id = self.settings.CLIENT_ID |
| 37 | + client_secret = self.settings.CLIENT_SECRET |
| 38 | + token_url = self.settings.TOKEN_URL |
| 39 | + # userinfo_url = self.keycloak_settings.USER_INFO_URL |
| 40 | + verify_ssl = self.settings.VERIFY_SSL |
| 41 | + oauth2_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id)) |
| 42 | + token = oauth2_session.fetch_token( |
| 43 | + token_url=token_url, |
| 44 | + username=username, |
| 45 | + password=password, |
| 46 | + client_id=client_id, |
| 47 | + client_secret=client_secret, |
| 48 | + verify=verify_ssl, |
| 49 | + ) |
49 | 50 | claimsMap = {
|
50 | 51 | "userName": username,
|
51 | 52 | "gatewayID": gateway_id
|
52 | 53 | }
|
53 | 54 | return AuthzToken(accessToken=token['access_token'], claimsMap=claimsMap)
|
54 | 55 |
|
55 |
| - def get_airavata_authz_token(self, username, token, gateway_id): |
| 56 | + def get_airavata_authz_token(self, username: str, token: str, gateway_id: str): |
56 | 57 | claimsMap = {
|
57 | 58 | "userName": username,
|
58 | 59 | "gatewayID": gateway_id
|
59 | 60 | }
|
60 | 61 | return AuthzToken(accessToken=token, claimsMap=claimsMap)
|
61 | 62 |
|
62 |
| - def get_authorize_url(self, username, password, gateway_id): |
63 |
| - client_id = self.keycloak_settings.CLIENT_ID |
64 |
| - client_secret = self.keycloak_settings.CLIENT_SECRET |
65 |
| - token_url = self.keycloak_settings.TOKEN_URL |
66 |
| - userinfo_url = self.keycloak_settings.USER_INFO_URL |
67 |
| - verify_ssl = self.keycloak_settings.VERIFY_SSL |
68 |
| - oauth2_session = OAuth2Session(client=LegacyApplicationClient( |
69 |
| - client_id=client_id)) |
70 |
| - token = oauth2_session.fetch_token(token_url=token_url, |
71 |
| - username=username, |
72 |
| - password=password, |
73 |
| - client_id=client_id, |
74 |
| - client_secret=client_secret, |
75 |
| - verify=verify_ssl) |
76 |
| - |
| 63 | + def get_authorize_url(self, username: str, password: str, gateway_id: str): |
| 64 | + client_id = self.settings.CLIENT_ID |
| 65 | + client_secret = self.settings.CLIENT_SECRET |
| 66 | + token_url = self.settings.TOKEN_URL |
| 67 | + # userinfo_url = self.keycloak_settings.USER_INFO_URL |
| 68 | + verify_ssl = self.settings.VERIFY_SSL |
| 69 | + oauth2_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id)) |
| 70 | + token = oauth2_session.fetch_token( |
| 71 | + token_url=token_url, |
| 72 | + username=username, |
| 73 | + password=password, |
| 74 | + client_id=client_id, |
| 75 | + client_secret=client_secret, |
| 76 | + verify=verify_ssl, |
| 77 | + ) |
77 | 78 | claimsMap = {
|
78 | 79 | "userName": username,
|
79 | 80 | "gatewayID": gateway_id
|
80 | 81 | }
|
81 | 82 | return AuthzToken(accessToken=token['access_token'], claimsMap=claimsMap)
|
82 | 83 |
|
83 | 84 | def authenticate_with_auth_code(self):
|
84 |
| - print("Click on Login URI ", self.keycloak_settings.LOGIN_DESKTOP_URI) |
85 |
| - return self.keycloak_settings.LOGIN_DESKTOP_URI |
| 85 | + print("Click on Login URI ", self.settings.LOGIN_DESKTOP_URI) |
| 86 | + return self.settings.LOGIN_DESKTOP_URI |
86 | 87 |
|
87 |
| - def _load_settings(self, configuration_file_location): |
| 88 | + def _load_settings(self, configuration_file_location: Optional[str]): |
88 | 89 | if configuration_file_location is not None:
|
89 | 90 | config = configparser.ConfigParser()
|
90 | 91 | config.read(configuration_file_location)
|
91 | 92 | # self.keycloak_settings.KEYCLOAK_CA_CERTIFICATE = config.get("KeycloakServer", 'CERTIFICATE_FILE_PATH')
|
92 |
| - self.keycloak_settings.CLIENT_ID = config.get('KeycloakServer', 'CLIENT_ID') |
93 |
| - self.keycloak_settings.CLIENT_SECRET = config.get('KeycloakServer', 'CLIENT_SECRET') |
94 |
| - self.keycloak_settings.TOKEN_URL = config.get('KeycloakServer', 'TOKEN_URL') |
95 |
| - self.keycloak_settings.USER_INFO_URL = config.get('KeycloakServer', 'USER_INFO_URL') |
96 |
| - self.keycloak_settings.VERIFY_SSL = config.getboolean('KeycloakServer', 'VERIFY_SSL') |
| 93 | + self.settings.CLIENT_ID = config.get('KeycloakServer', 'CLIENT_ID') |
| 94 | + self.settings.CLIENT_SECRET = config.get('KeycloakServer', 'CLIENT_SECRET') |
| 95 | + self.settings.TOKEN_URL = config.get('KeycloakServer', 'TOKEN_URL') |
| 96 | + self.settings.USER_INFO_URL = config.get('KeycloakServer', 'USER_INFO_URL') |
| 97 | + self.settings.VERIFY_SSL = config.getboolean('KeycloakServer', 'VERIFY_SSL') |
0 commit comments