diff --git a/CHANGELOG.md b/CHANGELOG.md index d545beed..d7466fa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ ### Deprecated 1. [#264](https://github.com/influxdata/influxdb-client-python/pull/264): Deprecated `org_id` options BucketsApi.create_bucket in favor of `org` parameter +### Bug Fixes +1. [#270](https://github.com/influxdata/influxdb-client-python/pull/270): Supports `write_precision` when writing a Pandas DataFrame + ## 1.18.0 [2021-06-04] ### Breaking Changes diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py index db5bd3be..cf7bb898 100644 --- a/influxdb_client/client/write/dataframe_serializer.py +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -7,7 +7,8 @@ import re import math -from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT +from influxdb_client import WritePrecision +from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, DEFAULT_WRITE_PRECISION def _itertuples(data_frame): @@ -23,8 +24,16 @@ def _any_not_nan(p, indexes): return any(map(lambda x: _not_nan(p[x]), indexes)) -def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): - """Serialize DataFrame into LineProtocols.""" +def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs): + """ + Serialize DataFrame into LineProtocols. + + :param data_frame: Pandas DataFrame to serialize + :param point_settings: Default Tags + :param precision: The precision for the unix timestamps within the body line-protocol. + :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame + :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields + """ # This function is hard to understand but for good reason: # the approach used here is considerably more efficient # than the alternatives. 
@@ -179,6 +188,12 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): tags = ''.join(tags) fields = ''.join(fields) timestamp = '{p[0].value}' + if precision == WritePrecision.US: + timestamp = '{int(p[0].value / 1e3)}' + elif precision == WritePrecision.MS: + timestamp = '{int(p[0].value / 1e6)}' + elif precision == WritePrecision.S: + timestamp = '{int(p[0].value / 1e9)}' f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', { 'measurement_name': measurement_name, diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index e1730f3a..027b353e 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -312,7 +312,7 @@ def _serialize(self, record, write_precision, payload, **kwargs): self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload, **kwargs) elif 'DataFrame' in type(record).__name__: - _data = data_frame_to_list_of_points(record, self._point_settings, **kwargs) + _data = data_frame_to_list_of_points(record, self._point_settings, write_precision, **kwargs) self._serialize(_data, write_precision, payload, **kwargs) elif isinstance(record, Iterable): @@ -338,7 +338,8 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif 'DataFrame' in type(data).__name__: - self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs), + self._write_batching(bucket, org, + data_frame_to_list_of_points(data, self._point_settings, precision, **kwargs), precision, **kwargs) elif isinstance(data, Iterable): diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 4b4317f2..52c93378 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -2,7 +2,7 @@ import unittest from datetime import timedelta -from influxdb_client import InfluxDBClient, WriteOptions, WriteApi +from influxdb_client import InfluxDBClient, 
WriteOptions, WriteApi, WritePrecision from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings from tests.base_test import BaseTest @@ -56,8 +56,8 @@ class DataSerializerTest(unittest.TestCase): def test_convert_data_frame(self): from influxdb_client.extras import pd, np - num_rows=1500000 - col_data={ + num_rows = 1500000 + col_data = { 'time': np.arange(0, num_rows, 1, dtype=int), 'col1': np.random.choice(['test_a', 'test_b', 'test_c'], size=(num_rows,)), } @@ -69,8 +69,8 @@ def test_convert_data_frame(self): start = time.time() data_frame_to_list_of_points(data_frame, PointSettings(), - data_frame_measurement_name='h2o_feet', - data_frame_tag_columns=['location']) + data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) print("Time elapsed: ", (time.time() - start)) @@ -291,7 +291,7 @@ def test_with_default_tags(self): self.assertEqual("h2o,t1=a2,t2=every,t3=c2 value=2i 1586052000000000000", points[1]) # Check that the data frame hasn't been changed (an earlier version did change it) - self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}') + self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}') # Check that the default tags won't override actual column data. # This is arguably incorrect behavior, but it's how it works currently. 
@@ -304,7 +304,7 @@ def test_with_default_tags(self): self.assertEqual("h2o,t1=a1,t3=c1 value=1i 1586048400000000000", points[0]) self.assertEqual("h2o,t1=a2,t3=c2 value=2i 1586052000000000000", points[1]) - self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}') + self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}') def test_with_period_index(self): from influxdb_client.extras import pd @@ -333,3 +333,22 @@ def test_write_num_py_floats(self): self.assertEqual(1, len(points)) self.assertEqual("h2o level=15.5 1586044800000000000", points[0], msg=f'Current type: {np_float_type}') + def test_write_precision(self): + from influxdb_client.extras import pd + now = pd.Timestamp('2020-04-05 00:00+00:00') + precisions = [ + (WritePrecision.NS, 1586044800000000000), + (WritePrecision.US, 1586044800000000), + (WritePrecision.MS, 1586044800000), + (WritePrecision.S, 1586044800), + (None, 1586044800000000000) + ] + + for precision in precisions: + data_frame = pd.DataFrame([15], index=[now], columns=['level']) + points = data_frame_to_list_of_points(data_frame=data_frame, + data_frame_measurement_name='h2o', + point_settings=PointSettings(), + precision=precision[0]) + self.assertEqual(1, len(points)) + self.assertEqual(f"h2o level=15i {precision[1]}", points[0])