Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data_frame): add possibility to specify timestamp column #440

Merged
merged 5 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.30.0 [unreleased]

### Features
1. [#440](https://github.com/influxdata/influxdb-client-python/pull/440): Add possibility to specify timestamp column and its timezone [DataFrame]

## 1.29.1 [2022-05-23]

### Bug Fixes
Expand Down
47 changes: 36 additions & 11 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
:param chunk_size: The size of chunk for serializing into chunks.
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
"""
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
""" # noqa: E501
# This function is hard to understand but for good reason:
# the approach used here is considerably more efficient
# than the alternatives.
Expand Down Expand Up @@ -92,19 +96,32 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
if data_frame_measurement_name is None:
raise TypeError('"data_frame_measurement_name" is a Required Argument')

timestamp_column = kwargs.get('data_frame_timestamp_column', None)
timestamp_timezone = kwargs.get('data_frame_timestamp_timezone', None)
data_frame = data_frame.copy(deep=False)
if isinstance(data_frame.index, pd.PeriodIndex):
data_frame.index = data_frame.index.to_timestamp()
data_frame_timestamp = data_frame.index if timestamp_column is None else data_frame[timestamp_column]
if isinstance(data_frame_timestamp, pd.PeriodIndex):
data_frame_timestamp = data_frame_timestamp.to_timestamp()
else:
# TODO: this is almost certainly not what you want
# when the index is the default RangeIndex.
# Instead, it would probably be better to leave
# out the timestamp unless a time column is explicitly
# enabled.
data_frame.index = pd.to_datetime(data_frame.index, unit=precision)
data_frame_timestamp = pd.to_datetime(data_frame_timestamp, unit=precision)

if timestamp_timezone:
if isinstance(data_frame_timestamp, pd.DatetimeIndex):
data_frame_timestamp = data_frame_timestamp.tz_localize(timestamp_timezone)
else:
data_frame_timestamp = data_frame_timestamp.dt.tz_localize(timestamp_timezone)

if data_frame.index.tzinfo is None:
data_frame.index = data_frame.index.tz_localize('UTC')
if hasattr(data_frame_timestamp, 'tzinfo') and data_frame_timestamp.tzinfo is None:
data_frame_timestamp = data_frame_timestamp.tz_localize('UTC')
if timestamp_column is None:
data_frame.index = data_frame_timestamp
else:
data_frame[timestamp_column] = data_frame_timestamp

data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
data_frame_tag_columns = set(data_frame_tag_columns or [])
Expand Down Expand Up @@ -141,6 +158,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# null_columns has a bool value for each column holding
# whether that column contains any null (NaN or None) values.
null_columns = data_frame.isnull().any()
timestamp_index = 0

# Iterate through the columns building up the expression for each column.
for index, (key, value) in columns:
Expand All @@ -164,6 +182,9 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
tags.append(key_value)
continue
elif timestamp_column is not None and key in timestamp_column:
timestamp_index = field_index
continue

# This column is a field column.
# Note: no comma separator is needed for the first field.
Expand Down Expand Up @@ -195,13 +216,13 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION

tags = ''.join(tags)
fields = ''.join(fields)
timestamp = '{p[0].value}'
timestamp = '{p[%s].value}' % timestamp_index
if precision == WritePrecision.US:
timestamp = '{int(p[0].value / 1e3)}'
timestamp = '{int(p[%s].value / 1e3)}' % timestamp_index
elif precision == WritePrecision.MS:
timestamp = '{int(p[0].value / 1e6)}'
timestamp = '{int(p[%s].value / 1e6)}' % timestamp_index
elif precision == WritePrecision.S:
timestamp = '{int(p[0].value / 1e9)}'
timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index

f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
'measurement_name': measurement_name,
Expand Down Expand Up @@ -268,5 +289,9 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W
:param precision: The precision for the unix timestamps within the body line-protocol.
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
"""
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
""" # noqa: E501
return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize()
8 changes: 6 additions & 2 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ def write(self, bucket: str, org: str = None,
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
:key data_frame_tag_columns: list of DataFrame columns which are tags,
rest columns will be fields - ``DataFrame``
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
:key record_measurement_key: key of record with specified measurement -
``dictionary``, ``NamedTuple``, ``dataclass``
:key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
Expand Down Expand Up @@ -322,8 +326,8 @@ def write(self, bucket: str, org: str = None,
write_api.write("my-bucket", "my-org", point)

DataFrame:
The index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
is used as a ``timestamp`` for written data. The index should be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
If the ``data_frame_timestamp_column`` is not specified the index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
is used as a ``timestamp`` for written data. The index can be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
or its must be transformable to ``datetime`` by
`pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_.

Expand Down
8 changes: 6 additions & 2 deletions influxdb_client/client/write_api_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ async def write(self, bucket: str, org: str = None,
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
:key data_frame_tag_columns: list of DataFrame columns which are tags,
rest columns will be fields - ``DataFrame``
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
:key record_measurement_key: key of record with specified measurement -
``dictionary``, ``NamedTuple``, ``dataclass``
:key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
Expand Down Expand Up @@ -87,8 +91,8 @@ async def write(self, bucket: str, org: str = None,
await write_api.write("my-bucket", "my-org", point)

DataFrame:
The index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
is used as a ``timestamp`` for written data. The index should be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
If the ``data_frame_timestamp_column`` is not specified the index of `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
is used as a ``timestamp`` for written data. The index can be `PeriodIndex <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.PeriodIndex.html#pandas.PeriodIndex>`_
or its must be transformable to ``datetime`` by
`pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_.

Expand Down
95 changes: 95 additions & 0 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,101 @@ def test_without_tags_and_fields_with_nan(self):
self.assertEqual("test a=1.0 1609459260000000000", points[1])
self.assertEqual("test a=2.0,b=1.0 1609459320000000000", points[2])

def test_use_timestamp_from_specified_column(self):
from influxdb_client.extras import pd
data_frame = pd.DataFrame(data={
'column_time': ['2020-04-05', '2020-05-05'],
'value1': [10, 20],
'value2': [30, 40],
}, index=['A', 'B'])

points = data_frame_to_list_of_points(data_frame=data_frame,
data_frame_measurement_name="test",
data_frame_timestamp_column="column_time",
point_settings=PointSettings())

self.assertEqual(2, len(points))
self.assertEqual('test value1=10i,value2=30i 1586044800000000000', points[0])
self.assertEqual('test value1=20i,value2=40i 1588636800000000000', points[1])

def test_str_format_for_timestamp(self):
from influxdb_client.extras import pd

time_formats = [
('2018-10-26', 'test value1=10i,value2=20i 1540512000000000000'),
('2018-10-26 10:00', 'test value1=10i,value2=20i 1540548000000000000'),
('2018-10-26 10:00:00-05:00', 'test value1=10i,value2=20i 1540566000000000000'),
('2018-10-26T11:00:00+00:00', 'test value1=10i,value2=20i 1540551600000000000'),
('2018-10-26 12:00:00+00:00', 'test value1=10i,value2=20i 1540555200000000000'),
('2018-10-26T16:00:00-01:00', 'test value1=10i,value2=20i 1540573200000000000'),
]

for time_format in time_formats:
data_frame = pd.DataFrame(data={
'column_time': [time_format[0]],
'value1': [10],
'value2': [20],
}, index=['A'])
points = data_frame_to_list_of_points(data_frame=data_frame,
data_frame_measurement_name="test",
data_frame_timestamp_column="column_time",
point_settings=PointSettings())

self.assertEqual(1, len(points))
self.assertEqual(time_format[1], points[0])

def test_specify_timezone(self):
from influxdb_client.extras import pd
data_frame = pd.DataFrame(data={
'column_time': ['2020-05-24 10:00', '2020-05-24 01:00'],
'value1': [10, 20],
'value2': [30, 40],
}, index=['A', 'B'])

points = data_frame_to_list_of_points(data_frame=data_frame,
data_frame_measurement_name="test",
data_frame_timestamp_column="column_time",
data_frame_timestamp_timezone="Europe/Berlin",
point_settings=PointSettings())

self.assertEqual(2, len(points))
self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0])
self.assertEqual('test value1=20i,value2=40i 1590274800000000000', points[1])

def test_specify_timezone_date_time_index(self):
from influxdb_client.extras import pd
data_frame = pd.DataFrame(data={
'value1': [10, 20],
'value2': [30, 40],
}, index=[pd.Timestamp('2020-05-24 10:00'), pd.Timestamp('2020-05-24 01:00')])

points = data_frame_to_list_of_points(data_frame=data_frame,
data_frame_measurement_name="test",
data_frame_timestamp_timezone="Europe/Berlin",
point_settings=PointSettings())

self.assertEqual(2, len(points))
self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0])
self.assertEqual('test value1=20i,value2=40i 1590274800000000000', points[1])

def test_specify_timezone_period_time_index(self):
from influxdb_client.extras import pd
data_frame = pd.DataFrame(data={
'value1': [10, 20],
'value2': [30, 40],
}, index=pd.period_range(start='2020-05-24 10:00', freq='H', periods=2))

print(data_frame.to_string())

points = data_frame_to_list_of_points(data_frame=data_frame,
data_frame_measurement_name="test",
data_frame_timestamp_timezone="Europe/Berlin",
point_settings=PointSettings())

self.assertEqual(2, len(points))
self.assertEqual('test value1=10i,value2=30i 1590307200000000000', points[0])
self.assertEqual('test value1=20i,value2=40i 1590310800000000000', points[1])


class DataSerializerChunksTest(unittest.TestCase):
def test_chunks(self):
Expand Down