Pandas Dataframe writing using tons of RAM with large datasets #291
Comments
@CyberAngler93, thanks for using our client. We will take a look.
@CyberAngler93, the current implementation of our batching API serializes the whole DataFrame into Line Protocol, and those Line Protocol records are then used to create the batches. This is not an optimal way to do it and we will have to improve it. Meanwhile you can use this workaround - split the DataFrame into chunks and write the chunks with the synchronous API:

"""
How to ingest large DataFrame by splitting into chunks.
"""
import math
from datetime import datetime
from influxdb_client import InfluxDBClient
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.extras import pd, np
"""
Configuration
"""
url = 'http://localhost:8086'
token = 'my-token'
org = 'my-org'
bucket = 'my-bucket'
dataframe_rows_count = 150_000
chunk_size = 100
"""
Generate Dataframe
"""
print()
print("=== Generating DataFrame ===")
print()
col_data = {
    'time': np.arange(0, dataframe_rows_count, 1, dtype=int),
    'tag': np.random.choice(['tag_a', 'tag_b', 'test_c'], size=(dataframe_rows_count,)),
}
for n in range(2, 2999):
    col_data[f'col{n}'] = np.random.rand(dataframe_rows_count)
data_frame = pd.DataFrame(data=col_data).set_index('time')
print(data_frame)
"""
Split DataFrame into Chunks
"""
print()
print("=== Splitting into chunks ===")
print()
chunks = []
number_of_chunks = int(math.ceil(len(data_frame) / float(chunk_size)))
for chunk_idx in range(number_of_chunks):
    chunks.append(data_frame[chunk_idx * chunk_size:(chunk_idx + 1) * chunk_size])
"""
Write chunks by Synchronous WriteAPI
"""
startTime = datetime.now()
with InfluxDBClient(url=url, token=token, org=org,
                    retries=WritesRetry(total=3, retry_interval=1, exponential_base=2)) as client:
    """
    Use the synchronous version of the WriteApi so each call blocks until the result of the write is known.
    """
    write_api = client.write_api(write_options=SYNCHRONOUS)
    for idx, chunk in enumerate(chunks):
        print(f"Writing chunk {idx + 1}/{len(chunks)}...")
        write_api.write(bucket=bucket, record=chunk, data_frame_tag_columns=['tag'],
                        data_frame_measurement_name="measurement_name")
print()
print(f'Import finished in: {datetime.now() - startTime}')
print()
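If you prefer not to keep a list of chunk slices around, the chunks can also be produced lazily with a generator. The sketch below is only an illustration (the iter_chunks helper is not part of the client API) and reuses the url, token, org, bucket, data_frame and chunk_size variables from the example above:

def iter_chunks(df, size):
    """Yield consecutive row slices of at most `size` rows."""
    for start in range(0, len(df), size):
        yield df.iloc[start:start + size]

with InfluxDBClient(url=url, token=token, org=org) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    for chunk in iter_chunks(data_frame, chunk_size):
        write_api.write(bucket=bucket, record=chunk, data_frame_tag_columns=['tag'],
                        data_frame_measurement_name="measurement_name")

Either way, the synchronous WriteApi serializes only one chunk into Line Protocol at a time.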
Will give this method a shot, thank you for the quick reply and code example!
Steps to reproduce:
This is part of a Docker deployment of Grafana, InfluxDB, and Nginx.
Nginx is configured as a reverse proxy with a 10-minute request timeout and the maximum request body size disabled.
Expected behavior:
I have always used the influx 1.x client for my pandas-to-InfluxDB processing and this issue does not occur there. When making the change to 2.0, I expect a large data ingest like this to take some time, about 10-15 minutes, but to successfully upload to InfluxDB. I expect the batches to be sent out as they are completed.
Actual behavior:
The process eventually times out while consuming all 16GB RAM and 4GB of swap on the system.
The retriable error occurred during request. Reason: 'HTTPSConnectionPool(host='192.168.1.200', port=443): Read timed out. (read timeout=3.5777276509998046)'
Specifications:
One last thing to note: if I use a subset of my DataFrame, say 10,000 rows, the data writes with no issues.