Skip to content

Commit e0e46cc

Browse files
authored
fix: supports write_precision for write Pandas DataFrame (#270)
1 parent a2dbc0b commit e0e46cc

File tree

4 files changed

+50
-12
lines changed

4 files changed

+50
-12
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
### Deprecated
77
1. [#264](https://github.com/influxdata/influxdb-client-python/pull/264): Deprecated `org_id` options BucketsApi.create_bucket in favor of `org` parameter
88

9+
### Bug Fixes
10+
1. [#270](https://github.com/influxdata/influxdb-client-python/pull/270): Supports `write_precision` for write Pandas DataFrame
11+
912
## 1.18.0 [2021-06-04]
1013

1114
### Breaking Changes

influxdb_client/client/write/dataframe_serializer.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import re
88
import math
99

10-
from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT
10+
from influxdb_client import WritePrecision
11+
from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, DEFAULT_WRITE_PRECISION
1112

1213

1314
def _itertuples(data_frame):
@@ -23,8 +24,16 @@ def _any_not_nan(p, indexes):
2324
return any(map(lambda x: _not_nan(p[x]), indexes))
2425

2526

26-
def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
27-
"""Serialize DataFrame into LineProtocols."""
27+
def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs):
28+
"""
29+
Serialize DataFrame into LineProtocols.
30+
31+
:param data_frame: Pandas DataFrame to serialize
32+
:param point_settings: Default Tags
33+
:param precision: The precision for the unix timestamps within the body line-protocol.
34+
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
35+
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
36+
"""
2837
# This function is hard to understand but for good reason:
2938
# the approach used here is considerably more efficient
3039
# than the alternatives.
@@ -179,6 +188,12 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
179188
tags = ''.join(tags)
180189
fields = ''.join(fields)
181190
timestamp = '{p[0].value}'
191+
if precision == WritePrecision.US:
192+
timestamp = '{int(p[0].value / 1e3)}'
193+
elif precision == WritePrecision.MS:
194+
timestamp = '{int(p[0].value / 1e6)}'
195+
elif precision == WritePrecision.S:
196+
timestamp = '{int(p[0].value / 1e9)}'
182197

183198
f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
184199
'measurement_name': measurement_name,

influxdb_client/client/write_api.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ def _serialize(self, record, write_precision, payload, **kwargs):
312312
self._serialize(Point.from_dict(record, write_precision=write_precision),
313313
write_precision, payload, **kwargs)
314314
elif 'DataFrame' in type(record).__name__:
315-
_data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
315+
_data = data_frame_to_list_of_points(record, self._point_settings, write_precision, **kwargs)
316316
self._serialize(_data, write_precision, payload, **kwargs)
317317

318318
elif isinstance(record, Iterable):
@@ -338,7 +338,8 @@ def _write_batching(self, bucket, org, data,
338338
precision, **kwargs)
339339

340340
elif 'DataFrame' in type(data).__name__:
341-
self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
341+
self._write_batching(bucket, org,
342+
data_frame_to_list_of_points(data, self._point_settings, precision, **kwargs),
342343
precision, **kwargs)
343344

344345
elif isinstance(data, Iterable):

tests/test_WriteApiDataFrame.py

+26-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import unittest
33
from datetime import timedelta
44

5-
from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
5+
from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, WritePrecision
66
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
77
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
88
from tests.base_test import BaseTest
@@ -56,8 +56,8 @@ class DataSerializerTest(unittest.TestCase):
5656
def test_convert_data_frame(self):
5757
from influxdb_client.extras import pd, np
5858

59-
num_rows=1500000
60-
col_data={
59+
num_rows = 1500000
60+
col_data = {
6161
'time': np.arange(0, num_rows, 1, dtype=int),
6262
'col1': np.random.choice(['test_a', 'test_b', 'test_c'], size=(num_rows,)),
6363
}
@@ -69,8 +69,8 @@ def test_convert_data_frame(self):
6969

7070
start = time.time()
7171
data_frame_to_list_of_points(data_frame, PointSettings(),
72-
data_frame_measurement_name='h2o_feet',
73-
data_frame_tag_columns=['location'])
72+
data_frame_measurement_name='h2o_feet',
73+
data_frame_tag_columns=['location'])
7474

7575
print("Time elapsed: ", (time.time() - start))
7676

@@ -291,7 +291,7 @@ def test_with_default_tags(self):
291291
self.assertEqual("h2o,t1=a2,t2=every,t3=c2 value=2i 1586052000000000000", points[1])
292292

293293
# Check that the data frame hasn't been changed (an earlier version did change it)
294-
self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
294+
self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
295295

296296
# Check that the default tags won't override actual column data.
297297
# This is arguably incorrect behavior, but it's how it works currently.
@@ -304,7 +304,7 @@ def test_with_default_tags(self):
304304
self.assertEqual("h2o,t1=a1,t3=c1 value=1i 1586048400000000000", points[0])
305305
self.assertEqual("h2o,t1=a2,t3=c2 value=2i 1586052000000000000", points[1])
306306

307-
self.assertEqual(True, (data_frame == original_data).all(axis = None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
307+
self.assertEqual(True, (data_frame == original_data).all(axis=None), f'data changed; old:\n{original_data}\nnew:\n{data_frame}')
308308

309309
def test_with_period_index(self):
310310
from influxdb_client.extras import pd
@@ -333,3 +333,22 @@ def test_write_num_py_floats(self):
333333
self.assertEqual(1, len(points))
334334
self.assertEqual("h2o level=15.5 1586044800000000000", points[0], msg=f'Current type: {np_float_type}')
335335

336+
def test_write_precision(self):
337+
from influxdb_client.extras import pd
338+
now = pd.Timestamp('2020-04-05 00:00+00:00')
339+
precisions = [
340+
(WritePrecision.NS, 1586044800000000000),
341+
(WritePrecision.US, 1586044800000000),
342+
(WritePrecision.MS, 1586044800000),
343+
(WritePrecision.S, 1586044800),
344+
(None, 1586044800000000000)
345+
]
346+
347+
for precision in precisions:
348+
data_frame = pd.DataFrame([15], index=[now], columns=['level'])
349+
points = data_frame_to_list_of_points(data_frame=data_frame,
350+
data_frame_measurement_name='h2o',
351+
point_settings=PointSettings(),
352+
precision=precision[0])
353+
self.assertEqual(1, len(points))
354+
self.assertEqual(f"h2o level=15i {precision[1]}", points[0])

0 commit comments

Comments
 (0)