Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace datetime.datetime.utcnow by airflow.utils.timezone.utcnow in core #35448

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airflow/secrets/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import multiprocessing

from airflow.configuration import conf
from airflow.utils import timezone


class SecretCache:
Expand All @@ -36,10 +37,10 @@ class NotPresentException(Exception):
class _CacheValue:
def __init__(self, value: str | None) -> None:
self.value = value
self.date = datetime.datetime.utcnow()
self.date = timezone.utcnow()

def is_expired(self, ttl: datetime.timedelta) -> bool:
return datetime.datetime.utcnow() - self.date > ttl
return timezone.utcnow() - self.date > ttl

_VARIABLE_PREFIX = "__v_"
_CONNECTION_PREFIX = "__c_"
Expand Down
4 changes: 2 additions & 2 deletions airflow/task/task_runner/cgroup_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"""Task runner for cgroup to run Airflow task."""
from __future__ import annotations

import datetime
import os
import uuid
from typing import TYPE_CHECKING
Expand All @@ -27,6 +26,7 @@
from cgroupspy import trees

from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils import timezone
from airflow.utils.operator_resources import Resources
from airflow.utils.platform import getuser
from airflow.utils.process_utils import reap_process_group
Expand Down Expand Up @@ -137,7 +137,7 @@ def start(self):
return

# Create a unique cgroup name
cgroup_name = f"airflow/{datetime.datetime.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
cgroup_name = f"airflow/{timezone.utcnow():%Y-%m-%d}/{uuid.uuid4()}"

self.mem_cgroup_name = f"memory/{cgroup_name}"
self.cpu_cgroup_name = f"cpu/{cgroup_name}"
Expand Down
6 changes: 3 additions & 3 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils.timezone import convert_to_utc
from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow

if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
Expand Down Expand Up @@ -146,7 +146,7 @@ def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
If the next schedule should start *right now*, we want the data interval
that start now, not the one that ends now.
"""
current_time = DateTime.utcnow()
current_time = coerce_datetime(utcnow())
last_start = self._get_prev(current_time)
next_start = self._get_next(last_start)
if next_start == current_time: # Current time is on interval boundary.
Expand Down Expand Up @@ -257,7 +257,7 @@ def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:

This is slightly different from the cron version at terminal values.
"""
round_current_time = self._round(DateTime.utcnow())
round_current_time = self._round(coerce_datetime(utcnow()))
new_start = self._get_prev(round_current_time)
if earliest is None:
return new_start
Expand Down
10 changes: 6 additions & 4 deletions airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import operator
from typing import TYPE_CHECKING, Any, Collection

from pendulum import DateTime

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone

if TYPE_CHECKING:
from pendulum import DateTime
from sqlalchemy import Session

from airflow.models.dataset import DatasetEvent
Expand Down Expand Up @@ -134,10 +134,12 @@ def next_dagrun_info(
return None
if last_automated_data_interval is not None: # has already run once
start = last_automated_data_interval.end
end = DateTime.utcnow()
end = timezone.coerce_datetime(timezone.utcnow())
else: # first run
start = restriction.earliest
end = max(restriction.earliest, DateTime.utcnow()) # won't run any earlier than start_date
end = max(
restriction.earliest, timezone.coerce_datetime(timezone.utcnow())
) # won't run any earlier than start_date

if restriction.latest is not None and end > restriction.latest:
return None
Expand Down
6 changes: 3 additions & 3 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import datetime
from typing import TYPE_CHECKING, Any

from pendulum import DateTime

from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
from pendulum import DateTime
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.timetables.base import TimeRestriction
Expand Down Expand Up @@ -98,7 +98,7 @@ def next_dagrun_info(
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(DateTime.utcnow())]
start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
Expand Down
7 changes: 3 additions & 4 deletions airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import traceback
import warnings
from argparse import Namespace
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, TypeVar, cast

Expand All @@ -36,7 +35,7 @@

from airflow import settings
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.utils import cli_action_loggers
from airflow.utils import cli_action_loggers, timezone
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.platform import getuser, is_terminal_support_colors
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -116,7 +115,7 @@ def wrapper(*args, **kwargs):
metrics["error"] = e
raise
finally:
metrics["end_datetime"] = datetime.utcnow()
metrics["end_datetime"] = timezone.utcnow()
cli_action_loggers.on_post_execution(**metrics)

return cast(T, wrapper)
Expand Down Expand Up @@ -155,7 +154,7 @@ def _build_metrics(func_name, namespace):

metrics = {
"sub_command": func_name,
"start_datetime": datetime.utcnow(),
"start_datetime": timezone.utcnow(),
"full_command": f"{full_command}",
"user": getuser(),
}
Expand Down
4 changes: 1 addition & 3 deletions airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,12 @@ def _dump_table_to_file(*, target_table, file_path, export_format, session):


def _do_delete(*, query, orm_model, skip_archive, session):
from datetime import datetime

import re2

print("Performing Delete...")
# using bulk delete
# create a new table and copy the rows there
timestamp_str = re2.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
timestamp_str = re2.sub(r"[^\d]", "", timezone.utcnow().isoformat())[:14]
target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
print(f"Moving data to table {target_table_name}")
bind = session.get_bind()
Expand Down
10 changes: 6 additions & 4 deletions airflow/utils/jwt_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timedelta
from datetime import timedelta
from typing import Any

import jwt

from airflow.utils import timezone


class JWTSigner:
"""
Expand Down Expand Up @@ -56,9 +58,9 @@ def generate_signed_token(self, extra_payload: dict[str, Any]) -> str:
"""
jwt_dict = {
"aud": self._audience,
"iat": datetime.utcnow(),
"nbf": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
"iat": timezone.utcnow(),
"nbf": timezone.utcnow(),
"exp": timezone.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
}
jwt_dict.update(extra_payload)
token = jwt.encode(
Expand Down
5 changes: 2 additions & 3 deletions airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pathlib import Path

from airflow import settings
from airflow.utils import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
Expand Down Expand Up @@ -102,9 +103,7 @@ def _render_filename(self, filename):
return self.filename_template.format(filename=ctx["filename"])

def _get_log_directory(self):
now = datetime.utcnow()

return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d"))
return os.path.join(self.base_log_folder, timezone.utcnow().strftime("%Y-%m-%d"))

def _symlink_latest_log_directory(self):
"""
Expand Down
8 changes: 1 addition & 7 deletions airflow/utils/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ def is_naive(value):

def utcnow() -> dt.datetime:
"""Get the current date and time in UTC."""
# pendulum utcnow() is not used as that sets a TimezoneInfo object
# instead of a Timezone. This is not picklable and also creates issues
# when using replace()
result = dt.datetime.utcnow()
result = result.replace(tzinfo=utc)

return result
return dt.datetime.now(tz=utc)


def utc_epoch() -> dt.datetime:
Expand Down
8 changes: 4 additions & 4 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2975,7 +2975,7 @@ def calendar(self, dag_id: str, session: Session = NEW_SESSION):
for (date, count) in dates.items()
)

now = DateTime.utcnow()
now = timezone.utcnow()
data = {
"dag_states": data_dag_states,
"start_date": (dag.start_date or now).date().isoformat(),
Expand Down Expand Up @@ -3561,7 +3561,7 @@ def historical_metrics_data(self):
select(DagRun.run_type, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
)
.group_by(DagRun.run_type)
).all()
Expand All @@ -3570,7 +3570,7 @@ def historical_metrics_data(self):
select(DagRun.state, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
)
.group_by(DagRun.state)
).all()
Expand All @@ -3581,7 +3581,7 @@ def historical_metrics_data(self):
.join(TaskInstance.dag_run)
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
)
.group_by(TaskInstance.state)
).all()
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import sys
import warnings
from contextlib import ExitStack, suppress
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -584,7 +584,7 @@ def test_something(frozen_sleep, monkeypatch):

def fake_sleep(seconds):
nonlocal traveller
utcnow = datetime.utcnow()
utcnow = datetime.now(tz=timezone.utc)
if traveller is not None:
traveller.stop()
traveller = time_machine.travel(utcnow + timedelta(seconds=seconds))
Expand Down
6 changes: 3 additions & 3 deletions tests/dags/test_impersonation_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
from __future__ import annotations

import warnings
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils import timezone

DEFAULT_DATE = datetime(2016, 1, 1)
DEFAULT_DATE = timezone.datetime(2016, 1, 1)

default_args = {"owner": "airflow", "start_date": DEFAULT_DATE, "run_as_user": "airflow_test_user"}

dag = DAG(dag_id="impersonation_subdag", default_args=default_args)


def print_today():
print(f"Today is {datetime.utcnow()}")
print(f"Today is {timezone.utcnow()}")


subdag = DAG("impersonation_subdag.test_subdag_operation", default_args=default_args)
Expand Down
7 changes: 4 additions & 3 deletions tests/dags/test_scheduler_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timedelta
from datetime import timedelta

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone

DEFAULT_DATE = datetime(2016, 1, 1)
DEFAULT_DATE = timezone.datetime(2016, 1, 1)

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(dag_id="test_start_date_scheduling", start_date=datetime.utcnow() + timedelta(days=1))
dag1 = DAG(dag_id="test_start_date_scheduling", start_date=timezone.utcnow() + timedelta(days=1))
dag1_task1 = EmptyOperator(task_id="dummy", dag=dag1, owner="airflow")

dag2 = DAG(dag_id="test_task_start_date_scheduling", start_date=DEFAULT_DATE)
Expand Down
2 changes: 1 addition & 1 deletion tests/decorators/test_external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def f(_):
return None

with dag_maker():
ret = f(datetime.datetime.utcnow())
ret = f(datetime.datetime.now(tz=datetime.timezone.utc))

ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

Expand Down
2 changes: 1 addition & 1 deletion tests/decorators/test_python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def f(_):
return None

with dag_maker():
ret = f(datetime.datetime.utcnow())
ret = f(datetime.datetime.now(tz=datetime.timezone.utc))

ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

Expand Down
4 changes: 2 additions & 2 deletions tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def test_trigger_from_dead_triggerer(session, create_task_instance):
session.add(trigger_orm)
ti_orm = create_task_instance(
task_id="ti_orm",
execution_date=datetime.datetime.utcnow(),
execution_date=timezone.utcnow(),
run_id="orm_run_id",
)
ti_orm.trigger_id = trigger_orm.id
Expand All @@ -458,7 +458,7 @@ def test_trigger_from_expired_triggerer(session, create_task_instance):
session.add(trigger_orm)
ti_orm = create_task_instance(
task_id="ti_orm",
execution_date=datetime.datetime.utcnow(),
execution_date=timezone.utcnow(),
run_id="orm_run_id",
)
ti_orm.trigger_id = trigger_orm.id
Expand Down
4 changes: 2 additions & 2 deletions tests/models/test_skipmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def teardown_method(self):
@patch("airflow.utils.timezone.utcnow")
def test_skip(self, mock_now, dag_maker):
session = settings.Session()
now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
now = datetime.datetime.now(tz=datetime.timezone.utc)
mock_now.return_value = now
with dag_maker("dag"):
tasks = [EmptyOperator(task_id="task")]
Expand All @@ -77,7 +77,7 @@ def test_skip(self, mock_now, dag_maker):
@patch("airflow.utils.timezone.utcnow")
def test_skip_none_dagrun(self, mock_now, dag_maker):
session = settings.Session()
now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
now = datetime.datetime.now(tz=pendulum.timezone("UTC"))
mock_now.return_value = now
with dag_maker(
"dag",
Expand Down
Loading