Skip to content

Commit 01302a1

Browse files
authored
AIP-72: Add "Get Variable" endpoint for Execution API (#43832)
This commit introduces a new endpoint, `/execution/variable/{variable_key}`, in the Execution API to retrieve Variables details. Same as the Connections PR, it uses a placeholder `check_connection_access` function to validate task permissions for each request.
1 parent 6c30fc5 commit 01302a1

File tree

8 files changed

+189
-22
lines changed

8 files changed

+189
-22
lines changed

airflow/api_fastapi/execution_api/datamodels.py

+9
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,15 @@ class ConnectionResponse(BaseModel):
134134
extra: str | None
135135

136136

137+
class VariableResponse(BaseModel):
138+
"""Variable schema for responses with fields that are needed for Runtime."""
139+
140+
model_config = ConfigDict(from_attributes=True)
141+
142+
key: str
143+
val: str | None = Field(alias="value")
144+
145+
137146
# TODO: This is a placeholder for Task Identity Token schema.
138147
class TIToken(BaseModel):
139148
"""Task Identity Token."""

airflow/api_fastapi/execution_api/routes/__init__.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
from __future__ import annotations
1818

1919
from airflow.api_fastapi.common.router import AirflowRouter
20-
from airflow.api_fastapi.execution_api.routes.connections import connection_router
21-
from airflow.api_fastapi.execution_api.routes.health import health_router
22-
from airflow.api_fastapi.execution_api.routes.task_instance import ti_router
20+
from airflow.api_fastapi.execution_api.routes import connections, health, task_instance, variables
2321

2422
execution_api_router = AirflowRouter()
25-
execution_api_router.include_router(connection_router)
26-
execution_api_router.include_router(health_router)
27-
execution_api_router.include_router(ti_router)
23+
execution_api_router.include_router(connections.router, prefix="/connections", tags=["Connections"])
24+
execution_api_router.include_router(health.router, tags=["Health"])
25+
execution_api_router.include_router(task_instance.router, prefix="/task_instance", tags=["Task Instance"])
26+
execution_api_router.include_router(variables.router, prefix="/variables", tags=["Variables"])

airflow/api_fastapi/execution_api/routes/connections.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
from airflow.models.connection import Connection
2929

3030
# TODO: Add dependency on JWT token
31-
connection_router = AirflowRouter(
32-
prefix="/connection",
33-
tags=["Connection"],
31+
router = AirflowRouter(
3432
responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not found"}},
3533
)
3634

@@ -42,7 +40,7 @@ def get_task_token() -> datamodels.TIToken:
4240
return datamodels.TIToken(ti_key="test_key")
4341

4442

45-
@connection_router.get(
43+
@router.get(
4644
"/{connection_id}",
4745
responses={
4846
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},

airflow/api_fastapi/execution_api/routes/health.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
from airflow.api_fastapi.common.router import AirflowRouter
2121

22-
health_router = AirflowRouter(tags=["Health"])
22+
router = AirflowRouter()
2323

2424

25-
@health_router.get("/health")
25+
@router.get("/health")
2626
def health() -> dict:
2727
return {"status": "healthy"}

airflow/api_fastapi/execution_api/routes/task_instance.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,13 @@
3535
from airflow.utils.state import State
3636

3737
# TODO: Add dependency on JWT token
38-
ti_router = AirflowRouter(
39-
prefix="/task_instance",
40-
tags=["Task Instance"],
41-
)
38+
router = AirflowRouter()
4239

4340

4441
log = logging.getLogger(__name__)
4542

4643

47-
@ti_router.patch(
44+
@router.patch(
4845
"/{task_instance_id}/state",
4946
status_code=status.HTTP_204_NO_CONTENT,
5047
# TODO: Add description to the operation
@@ -133,7 +130,7 @@ def ti_update_state(
133130
)
134131

135132

136-
@ti_router.put(
133+
@router.put(
137134
"/{task_instance_id}/heartbeat",
138135
status_code=status.HTTP_204_NO_CONTENT,
139136
responses={
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import logging
21+
22+
from fastapi import Depends, HTTPException, status
23+
from typing_extensions import Annotated
24+
25+
from airflow.api_fastapi.common.router import AirflowRouter
26+
from airflow.api_fastapi.execution_api import datamodels
27+
from airflow.models.variable import Variable
28+
29+
# TODO: Add dependency on JWT token
30+
router = AirflowRouter(
31+
responses={status.HTTP_404_NOT_FOUND: {"description": "Variable not found"}},
32+
)
33+
34+
log = logging.getLogger(__name__)
35+
36+
37+
def get_task_token() -> datamodels.TIToken:
38+
"""TODO: Placeholder for task identity authentication. This should be replaced with actual JWT decoding and validation."""
39+
return datamodels.TIToken(ti_key="test_key")
40+
41+
42+
@router.get(
43+
"/{variable_key}",
44+
responses={
45+
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
46+
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access to the variable"},
47+
},
48+
)
49+
def get_variable(
50+
variable_key: str,
51+
token: Annotated[datamodels.TIToken, Depends(get_task_token)],
52+
) -> datamodels.VariableResponse:
53+
"""Get an Airflow Variable."""
54+
if not has_variable_access(variable_key, token):
55+
raise HTTPException(
56+
status_code=status.HTTP_403_FORBIDDEN,
57+
detail={
58+
"reason": "access_denied",
59+
"message": f"Task does not have access to variable {variable_key}",
60+
},
61+
)
62+
63+
try:
64+
variable_value = Variable.get(variable_key)
65+
except KeyError:
66+
raise HTTPException(
67+
status.HTTP_404_NOT_FOUND,
68+
detail={
69+
"reason": "not_found",
70+
"message": f"Variable with key '{variable_key}' not found",
71+
},
72+
)
73+
74+
return datamodels.VariableResponse(key=variable_key, value=variable_value)
75+
76+
77+
def has_variable_access(variable_key: str, token: datamodels.TIToken) -> bool:
78+
"""Check if the task has access to the variable."""
79+
# TODO: Placeholder for actual implementation
80+
81+
ti_key = token.ti_key
82+
log.debug(
83+
"Checking access for task instance with key '%s' to variable '%s'",
84+
ti_key,
85+
variable_key,
86+
)
87+
return True

tests/api_fastapi/execution_api/routes/test_connection.py tests/api_fastapi/execution_api/routes/test_connections.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def test_connection_get_from_db(self, client, session):
4343
session.add(connection)
4444
session.commit()
4545

46-
response = client.get("/execution/connection/test_conn")
46+
response = client.get("/execution/connections/test_conn")
4747

4848
assert response.status_code == 200
4949
assert response.json() == {
@@ -66,7 +66,7 @@ def test_connection_get_from_db(self, client, session):
6666
{"AIRFLOW_CONN_TEST_CONN2": '{"uri": "http://root:admin@localhost:8080/https?headers=header"}'},
6767
)
6868
def test_connection_get_from_env_var(self, client, session):
69-
response = client.get("/execution/connection/test_conn2")
69+
response = client.get("/execution/connections/test_conn2")
7070

7171
assert response.status_code == 200
7272
assert response.json() == {
@@ -81,7 +81,7 @@ def test_connection_get_from_env_var(self, client, session):
8181
}
8282

8383
def test_connection_get_not_found(self, client):
84-
response = client.get("/execution/connection/non_existent_test_conn")
84+
response = client.get("/execution/connections/non_existent_test_conn")
8585

8686
assert response.status_code == 404
8787
assert response.json() == {
@@ -95,7 +95,7 @@ def test_connection_get_access_denied(self, client):
9595
with mock.patch(
9696
"airflow.api_fastapi.execution_api.routes.connections.has_connection_access", return_value=False
9797
):
98-
response = client.get("/execution/connection/test_conn")
98+
response = client.get("/execution/connections/test_conn")
9999

100100
# Assert response status code and detail for access denied
101101
assert response.status_code == 403
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from unittest import mock
21+
22+
import pytest
23+
24+
from airflow.models.variable import Variable
25+
26+
pytestmark = pytest.mark.db_test
27+
28+
29+
class TestGetVariable:
30+
def test_variable_get_from_db(self, client, session):
31+
Variable.set(key="var1", value="value", session=session)
32+
session.commit()
33+
34+
response = client.get("/execution/variables/var1")
35+
36+
assert response.status_code == 200
37+
assert response.json() == {"key": "var1", "value": "value"}
38+
39+
# Remove connection
40+
Variable.delete(key="var1", session=session)
41+
session.commit()
42+
43+
@mock.patch.dict(
44+
"os.environ",
45+
{"AIRFLOW_VAR_KEY1": "VALUE"},
46+
)
47+
def test_variable_get_from_env_var(self, client, session):
48+
response = client.get("/execution/variables/key1")
49+
50+
assert response.status_code == 200
51+
assert response.json() == {"key": "key1", "value": "VALUE"}
52+
53+
def test_variable_get_not_found(self, client):
54+
response = client.get("/execution/variables/non_existent_var")
55+
56+
assert response.status_code == 404
57+
assert response.json() == {
58+
"detail": {
59+
"message": "Variable with key 'non_existent_var' not found",
60+
"reason": "not_found",
61+
}
62+
}
63+
64+
def test_variable_get_access_denied(self, client):
65+
with mock.patch(
66+
"airflow.api_fastapi.execution_api.routes.variables.has_variable_access", return_value=False
67+
):
68+
response = client.get("/execution/variables/key1")
69+
70+
# Assert response status code and detail for access denied
71+
assert response.status_code == 403
72+
assert response.json() == {
73+
"detail": {
74+
"reason": "access_denied",
75+
"message": "Task does not have access to variable key1",
76+
}
77+
}

0 commit comments

Comments
 (0)