Skip to content

Commit 2f07c6b

Browse files
authored
Replace datetime.datetime.utcnow by airflow.utils.timezone.utcnow in core (#35448)
1 parent b14dbf2 commit 2f07c6b

24 files changed

+75
-78
lines changed

airflow/secrets/cache.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import multiprocessing
2222

2323
from airflow.configuration import conf
24+
from airflow.utils import timezone
2425

2526

2627
class SecretCache:
@@ -36,10 +37,10 @@ class NotPresentException(Exception):
3637
class _CacheValue:
3738
def __init__(self, value: str | None) -> None:
3839
self.value = value
39-
self.date = datetime.datetime.utcnow()
40+
self.date = timezone.utcnow()
4041

4142
def is_expired(self, ttl: datetime.timedelta) -> bool:
42-
return datetime.datetime.utcnow() - self.date > ttl
43+
return timezone.utcnow() - self.date > ttl
4344

4445
_VARIABLE_PREFIX = "__v_"
4546
_CONNECTION_PREFIX = "__c_"

airflow/task/task_runner/cgroup_task_runner.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
"""Task runner for cgroup to run Airflow task."""
1919
from __future__ import annotations
2020

21-
import datetime
2221
import os
2322
import uuid
2423
from typing import TYPE_CHECKING
@@ -27,6 +26,7 @@
2726
from cgroupspy import trees
2827

2928
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
29+
from airflow.utils import timezone
3030
from airflow.utils.operator_resources import Resources
3131
from airflow.utils.platform import getuser
3232
from airflow.utils.process_utils import reap_process_group
@@ -137,7 +137,7 @@ def start(self):
137137
return
138138

139139
# Create a unique cgroup name
140-
cgroup_name = f"airflow/{datetime.datetime.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
140+
cgroup_name = f"airflow/{timezone.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
141141

142142
self.mem_cgroup_name = f"memory/{cgroup_name}"
143143
self.cpu_cgroup_name = f"cpu/{cgroup_name}"

airflow/timetables/interval.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from airflow.exceptions import AirflowTimetableInvalid
2626
from airflow.timetables._cron import CronMixin
2727
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
28-
from airflow.utils.timezone import convert_to_utc
28+
from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
2929

3030
if TYPE_CHECKING:
3131
from airflow.timetables.base import TimeRestriction
@@ -146,7 +146,7 @@ def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
146146
If the next schedule should start *right now*, we want the data interval
147147
that start now, not the one that ends now.
148148
"""
149-
current_time = DateTime.utcnow()
149+
current_time = coerce_datetime(utcnow())
150150
last_start = self._get_prev(current_time)
151151
next_start = self._get_next(last_start)
152152
if next_start == current_time: # Current time is on interval boundary.
@@ -257,7 +257,7 @@ def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
257257
258258
This is slightly different from the cron version at terminal values.
259259
"""
260-
round_current_time = self._round(DateTime.utcnow())
260+
round_current_time = self._round(coerce_datetime(utcnow()))
261261
new_start = self._get_prev(round_current_time)
262262
if earliest is None:
263263
return new_start

airflow/timetables/simple.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import operator
2020
from typing import TYPE_CHECKING, Any, Collection
2121

22-
from pendulum import DateTime
23-
2422
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
23+
from airflow.utils import timezone
2524

2625
if TYPE_CHECKING:
26+
from pendulum import DateTime
2727
from sqlalchemy import Session
2828

2929
from airflow.models.dataset import DatasetEvent
@@ -134,10 +134,12 @@ def next_dagrun_info(
134134
return None
135135
if last_automated_data_interval is not None: # has already run once
136136
start = last_automated_data_interval.end
137-
end = DateTime.utcnow()
137+
end = timezone.coerce_datetime(timezone.utcnow())
138138
else: # first run
139139
start = restriction.earliest
140-
end = max(restriction.earliest, DateTime.utcnow()) # won't run any earlier than start_date
140+
end = max(
141+
restriction.earliest, timezone.coerce_datetime(timezone.utcnow())
142+
) # won't run any earlier than start_date
141143

142144
if restriction.latest is not None and end > restriction.latest:
143145
return None

airflow/timetables/trigger.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
import datetime
2020
from typing import TYPE_CHECKING, Any
2121

22-
from pendulum import DateTime
23-
2422
from airflow.timetables._cron import CronMixin
2523
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
24+
from airflow.utils import timezone
2625

2726
if TYPE_CHECKING:
2827
from dateutil.relativedelta import relativedelta
28+
from pendulum import DateTime
2929
from pendulum.tz.timezone import FixedTimezone, Timezone
3030

3131
from airflow.timetables.base import TimeRestriction
@@ -98,7 +98,7 @@ def next_dagrun_info(
9898
else:
9999
next_start_time = self._align_to_next(restriction.earliest)
100100
else:
101-
start_time_candidates = [self._align_to_prev(DateTime.utcnow())]
101+
start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
102102
if last_automated_data_interval is not None:
103103
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
104104
if restriction.earliest is not None:

airflow/utils/cli.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import traceback
2828
import warnings
2929
from argparse import Namespace
30-
from datetime import datetime
3130
from pathlib import Path
3231
from typing import TYPE_CHECKING, Callable, TypeVar, cast
3332

@@ -36,7 +35,7 @@
3635

3736
from airflow import settings
3837
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
39-
from airflow.utils import cli_action_loggers
38+
from airflow.utils import cli_action_loggers, timezone
4039
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
4140
from airflow.utils.platform import getuser, is_terminal_support_colors
4241
from airflow.utils.session import NEW_SESSION, provide_session
@@ -116,7 +115,7 @@ def wrapper(*args, **kwargs):
116115
metrics["error"] = e
117116
raise
118117
finally:
119-
metrics["end_datetime"] = datetime.utcnow()
118+
metrics["end_datetime"] = timezone.utcnow()
120119
cli_action_loggers.on_post_execution(**metrics)
121120

122121
return cast(T, wrapper)
@@ -155,7 +154,7 @@ def _build_metrics(func_name, namespace):
155154

156155
metrics = {
157156
"sub_command": func_name,
158-
"start_datetime": datetime.utcnow(),
157+
"start_datetime": timezone.utcnow(),
159158
"full_command": f"{full_command}",
160159
"user": getuser(),
161160
}

airflow/utils/db_cleanup.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,12 @@ def _dump_table_to_file(*, target_table, file_path, export_format, session):
152152

153153

154154
def _do_delete(*, query, orm_model, skip_archive, session):
155-
from datetime import datetime
156-
157155
import re2
158156

159157
print("Performing Delete...")
160158
# using bulk delete
161159
# create a new table and copy the rows there
162-
timestamp_str = re2.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
160+
timestamp_str = re2.sub(r"[^\d]", "", timezone.utcnow().isoformat())[:14]
163161
target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
164162
print(f"Moving data to table {target_table_name}")
165163
bind = session.get_bind()

airflow/utils/jwt_signer.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from datetime import datetime, timedelta
19+
from datetime import timedelta
2020
from typing import Any
2121

2222
import jwt
2323

24+
from airflow.utils import timezone
25+
2426

2527
class JWTSigner:
2628
"""
@@ -56,9 +58,9 @@ def generate_signed_token(self, extra_payload: dict[str, Any]) -> str:
5658
"""
5759
jwt_dict = {
5860
"aud": self._audience,
59-
"iat": datetime.utcnow(),
60-
"nbf": datetime.utcnow(),
61-
"exp": datetime.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
61+
"iat": timezone.utcnow(),
62+
"nbf": timezone.utcnow(),
63+
"exp": timezone.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
6264
}
6365
jwt_dict.update(extra_payload)
6466
token = jwt.encode(

airflow/utils/log/file_processor_handler.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from pathlib import Path
2424

2525
from airflow import settings
26+
from airflow.utils import timezone
2627
from airflow.utils.helpers import parse_template_string
2728
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
2829
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
@@ -102,9 +103,7 @@ def _render_filename(self, filename):
102103
return self.filename_template.format(filename=ctx["filename"])
103104

104105
def _get_log_directory(self):
105-
now = datetime.utcnow()
106-
107-
return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d"))
106+
return os.path.join(self.base_log_folder, timezone.utcnow().strftime("%Y-%m-%d"))
108107

109108
def _symlink_latest_log_directory(self):
110109
"""

airflow/utils/timezone.py

+1-7
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,7 @@ def is_naive(value):
6060

6161
def utcnow() -> dt.datetime:
6262
"""Get the current date and time in UTC."""
63-
# pendulum utcnow() is not used as that sets a TimezoneInfo object
64-
# instead of a Timezone. This is not picklable and also creates issues
65-
# when using replace()
66-
result = dt.datetime.utcnow()
67-
result = result.replace(tzinfo=utc)
68-
69-
return result
63+
return dt.datetime.now(tz=utc)
7064

7165

7266
def utc_epoch() -> dt.datetime:

airflow/www/views.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -2975,7 +2975,7 @@ def calendar(self, dag_id: str, session: Session = NEW_SESSION):
29752975
for (date, count) in dates.items()
29762976
)
29772977

2978-
now = DateTime.utcnow()
2978+
now = timezone.utcnow()
29792979
data = {
29802980
"dag_states": data_dag_states,
29812981
"start_date": (dag.start_date or now).date().isoformat(),
@@ -3561,7 +3561,7 @@ def historical_metrics_data(self):
35613561
select(DagRun.run_type, func.count(DagRun.run_id))
35623562
.where(
35633563
DagRun.start_date >= start_date,
3564-
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
3564+
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
35653565
)
35663566
.group_by(DagRun.run_type)
35673567
).all()
@@ -3570,7 +3570,7 @@ def historical_metrics_data(self):
35703570
select(DagRun.state, func.count(DagRun.run_id))
35713571
.where(
35723572
DagRun.start_date >= start_date,
3573-
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
3573+
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
35743574
)
35753575
.group_by(DagRun.state)
35763576
).all()
@@ -3581,7 +3581,7 @@ def historical_metrics_data(self):
35813581
.join(TaskInstance.dag_run)
35823582
.where(
35833583
DagRun.start_date >= start_date,
3584-
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
3584+
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
35853585
)
35863586
.group_by(TaskInstance.state)
35873587
).all()

tests/conftest.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import sys
2424
import warnings
2525
from contextlib import ExitStack, suppress
26-
from datetime import datetime, timedelta
26+
from datetime import datetime, timedelta, timezone
2727
from pathlib import Path
2828
from typing import TYPE_CHECKING
2929

@@ -584,7 +584,7 @@ def test_something(frozen_sleep, monkeypatch):
584584

585585
def fake_sleep(seconds):
586586
nonlocal traveller
587-
utcnow = datetime.utcnow()
587+
utcnow = datetime.now(tz=timezone.utc)
588588
if traveller is not None:
589589
traveller.stop()
590590
traveller = time_machine.travel(utcnow + timedelta(seconds=seconds))

tests/dags/test_impersonation_subdag.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@
1818
from __future__ import annotations
1919

2020
import warnings
21-
from datetime import datetime
2221

2322
from airflow.models.dag import DAG
2423
from airflow.operators.bash import BashOperator
2524
from airflow.operators.python import PythonOperator
2625
from airflow.operators.subdag import SubDagOperator
26+
from airflow.utils import timezone
2727

28-
DEFAULT_DATE = datetime(2016, 1, 1)
28+
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
2929

3030
default_args = {"owner": "airflow", "start_date": DEFAULT_DATE, "run_as_user": "airflow_test_user"}
3131

3232
dag = DAG(dag_id="impersonation_subdag", default_args=default_args)
3333

3434

3535
def print_today():
36-
print(f"Today is {datetime.utcnow()}")
36+
print(f"Today is {timezone.utcnow()}")
3737

3838

3939
subdag = DAG("impersonation_subdag.test_subdag_operation", default_args=default_args)

tests/dags/test_scheduler_dags.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20-
from datetime import datetime, timedelta
20+
from datetime import timedelta
2121

2222
from airflow.models.dag import DAG
2323
from airflow.operators.empty import EmptyOperator
24+
from airflow.utils import timezone
2425

25-
DEFAULT_DATE = datetime(2016, 1, 1)
26+
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
2627

2728
# DAG tests backfill with pooled tasks
2829
# Previously backfill would queue the task but never run it
29-
dag1 = DAG(dag_id="test_start_date_scheduling", start_date=datetime.utcnow() + timedelta(days=1))
30+
dag1 = DAG(dag_id="test_start_date_scheduling", start_date=timezone.utcnow() + timedelta(days=1))
3031
dag1_task1 = EmptyOperator(task_id="dummy", dag=dag1, owner="airflow")
3132

3233
dag2 = DAG(dag_id="test_task_start_date_scheduling", start_date=DEFAULT_DATE)

tests/decorators/test_external_python.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def f(_):
139139
return None
140140

141141
with dag_maker():
142-
ret = f(datetime.datetime.utcnow())
142+
ret = f(datetime.datetime.now(tz=datetime.timezone.utc))
143143

144144
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
145145

tests/decorators/test_python_virtualenv.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def f(_):
201201
return None
202202

203203
with dag_maker():
204-
ret = f(datetime.datetime.utcnow())
204+
ret = f(datetime.datetime.now(tz=datetime.timezone.utc))
205205

206206
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
207207

tests/jobs/test_triggerer_job.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ def test_trigger_from_dead_triggerer(session, create_task_instance):
431431
session.add(trigger_orm)
432432
ti_orm = create_task_instance(
433433
task_id="ti_orm",
434-
execution_date=datetime.datetime.utcnow(),
434+
execution_date=timezone.utcnow(),
435435
run_id="orm_run_id",
436436
)
437437
ti_orm.trigger_id = trigger_orm.id
@@ -458,7 +458,7 @@ def test_trigger_from_expired_triggerer(session, create_task_instance):
458458
session.add(trigger_orm)
459459
ti_orm = create_task_instance(
460460
task_id="ti_orm",
461-
execution_date=datetime.datetime.utcnow(),
461+
execution_date=timezone.utcnow(),
462462
run_id="orm_run_id",
463463
)
464464
ti_orm.trigger_id = trigger_orm.id

tests/models/test_skipmixin.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def teardown_method(self):
5555
@patch("airflow.utils.timezone.utcnow")
5656
def test_skip(self, mock_now, dag_maker):
5757
session = settings.Session()
58-
now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
58+
now = datetime.datetime.now(tz=datetime.timezone.utc)
5959
mock_now.return_value = now
6060
with dag_maker("dag"):
6161
tasks = [EmptyOperator(task_id="task")]
@@ -77,7 +77,7 @@ def test_skip(self, mock_now, dag_maker):
7777
@patch("airflow.utils.timezone.utcnow")
7878
def test_skip_none_dagrun(self, mock_now, dag_maker):
7979
session = settings.Session()
80-
now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
80+
now = datetime.datetime.now(tz=pendulum.timezone("UTC"))
8181
mock_now.return_value = now
8282
with dag_maker(
8383
"dag",

0 commit comments

Comments
 (0)