
Commit cf21862

feat: dataframe_serializer supports batching (#293)

1 parent a26fc4c · commit cf21862

File tree

6 files changed: +369 −191 lines

CHANGELOG.md (+1 line)

```diff
@@ -2,6 +2,7 @@
 
 ### Features
 1. [#281](https://github.com/influxdata/influxdb-client-python/pull/281): `FluxTable`, `FluxColumn` and `FluxRecord` objects have helpful reprs
+1. [#293](https://github.com/influxdata/influxdb-client-python/pull/293): `dataframe_serializer` supports batching
 
 ### Bug Fixes
 1. [#283](https://github.com/influxdata/influxdb-client-python/pull/283): Set proxy server in config file
```
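For readers coming from the changelog entry alone: batching means the serialized DataFrame is written in chunks rather than as a single request. Below is a minimal sketch of driving the feature with an explicit batch size, assuming a local InfluxDB 2.x instance; the measurement name, tag column, and credentials are placeholders, not part of this commit.

```python
from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.extras import pd

# Two-row DataFrame indexed by timestamp; 'location' is written as a tag.
df = pd.DataFrame(
    data={'location': ['Prague', 'New York'], 'temperature': [25.3, 18.9]},
    index=pd.to_datetime(['2021-07-01 12:00:00', '2021-07-01 13:00:00']),
)

with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
    # batch_size caps how many serialized rows go into each write request.
    with client.write_api(write_options=WriteOptions(batch_size=1_000)) as write_api:
        write_api.write(bucket='my-bucket', record=df,
                        data_frame_measurement_name='weather',
                        data_frame_tag_columns=['location'])
```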

examples/README.md (+1 line)

```diff
@@ -4,6 +4,7 @@
 - [import_data_set.py](import_data_set.py) - How to import CSV file
 - [import_data_set_multiprocessing.py](import_data_set_multiprocessing.py) - How to import large CSV file by Python Multiprocessing
 - [ingest_dataframe_default_tags.py](ingest_dataframe_default_tags.py) - How to ingest DataFrame with default tags
+- [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame
 - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/)
 - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
```

examples/ingest_large_dataframe.py (new file, +69 lines)

```python
"""
How to ingest large DataFrame by splitting into chunks.
"""
import logging
import random
from datetime import datetime

from influxdb_client import InfluxDBClient
from influxdb_client.extras import pd, np

"""
Enable logging for DataFrame serializer
"""
loggerSerializer = logging.getLogger('influxdb_client.client.write.dataframe_serializer')
loggerSerializer.setLevel(level=logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s | %(message)s'))
loggerSerializer.addHandler(handler)

"""
Configuration
"""
url = 'http://localhost:8086'
token = 'my-token'
org = 'my-org'
bucket = 'my-bucket'

"""
Generate DataFrame
"""
print()
print("=== Generating DataFrame ===")
print()
dataframe_rows_count = 150_000

col_data = {
    'time': np.arange(0, dataframe_rows_count, 1, dtype=int),
    'tag': np.random.choice(['tag_a', 'tag_b', 'test_c'], size=(dataframe_rows_count,)),
}
for n in range(2, 2999):
    col_data[f'col{n}'] = random.randint(1, 10)

data_frame = pd.DataFrame(data=col_data).set_index('time')
print(data_frame)

"""
Ingest DataFrame
"""
print()
print("=== Ingesting DataFrame via batching API ===")
print()
startTime = datetime.now()

with InfluxDBClient(url=url, token=token, org=org) as client:

    """
    Use batching API
    """
    with client.write_api() as write_api:
        write_api.write(bucket=bucket, record=data_frame,
                        data_frame_tag_columns=['tag'],
                        data_frame_measurement_name="measurement_name")
        print()
        print("Waiting to finish ingesting DataFrame...")
        print()

print()
print(f'Import finished in: {datetime.now() - startTime}')
print()
```
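The DEBUG logger configured at the top of this example is there to surface the chunking in the output as batches are flushed. For contrast, here is a rough sketch of the manual splitting a caller might otherwise do, using the synchronous write API; the chunk size, generated frame, and bucket name are illustrative assumptions rather than anything from this commit.

```python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.extras import pd, np

# Build a small DataFrame with a timestamp index and one tag column.
rows = 20_000
frame = pd.DataFrame(
    data={
        'tag': np.random.choice(['tag_a', 'tag_b'], size=rows),
        'value': np.random.rand(rows),
    },
    index=pd.date_range('2021-01-01', periods=rows, freq='S'),
)

chunk_size = 5_000

with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # Slice the frame into row ranges and write each slice as its own request.
    for start in range(0, len(frame), chunk_size):
        chunk = frame.iloc[start:start + chunk_size]
        write_api.write(bucket='my-bucket', record=chunk,
                        data_frame_tag_columns=['tag'],
                        data_frame_measurement_name='measurement_name')
```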
