Skip to content

Commit 6844f60

Browse files
authored
feat: implement exponential random retry strategy (#225)
1 parent 5cbc212 commit 6844f60

8 files changed

+201
-55
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Features
44
1. [#203](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters
5+
1. [#225](https://github.com/influxdata/influxdb-client-python/pull/225): Exponential random backoff retry strategy
56

67
### Bug Fixes
78
1. [#222](https://github.com/influxdata/influxdb-client-python/pull/222): Pass configured timeout to HTTP client

README.rst

+8-5
Original file line numberDiff line numberDiff line change
@@ -256,17 +256,20 @@ The batching is configurable by ``write_options``\ :
256256
- the number of milliseconds to increase the batch flush interval by a random amount
257257
- ``0``
258258
* - **retry_interval**
259-
- the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
259+
- the number of milliseconds to retry first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
260260
- ``5000``
261+
* - **max_retry_time**
262+
- maximum total retry timeout in milliseconds.
263+
- ``180_000``
261264
* - **max_retries**
262265
- the number of max retries when write fails
263-
- ``3``
266+
- ``5``
264267
* - **max_retry_delay**
265268
- the maximum delay between each retry attempt in milliseconds
266-
- ``180_000``
269+
- ``125_000``
267270
* - **exponential_base**
268-
- the base for the exponential retry delay, the next delay is computed as ``retry_interval * exponential_base^(attempts-1) + random(jitter_interval)``
269-
- ``5``
271+
- the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retry_interval * exponential_base^(attempts-1)`` and ``retry_interval * exponential_base^(attempts)``. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]``
272+
- ``2``
270273

271274

272275
.. code-block:: python

examples/import_data_set_sync_batching.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def csv_to_generator(csv_file_path):
3030
"""
3131
Define Retry strategy - 3 attempts => 2, 4, 8
3232
"""
33-
retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
33+
retries = WritesRetry(total=3, retry_interval=1, exponential_base=2)
3434
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client:
3535

3636
"""

influxdb_client/client/write/retry.py

+50-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
"""Implementation for Retry strategy during HTTP requests."""
22

33
import logging
4+
from datetime import datetime, timedelta
45
from itertools import takewhile
56
from random import random
67

78
from urllib3 import Retry
9+
from urllib3.exceptions import MaxRetryError, ResponseError
810

911
from influxdb_client.client.exceptions import InfluxDBError
1012

@@ -16,27 +18,49 @@ class WritesRetry(Retry):
1618
Writes retry configuration.
1719
1820
:param int jitter_interval: random milliseconds when retrying writes
19-
:param int max_retry_delay: maximum delay when retrying write
20-
:param int exponential_base: base for the exponential retry delay, the next delay is computed as
21-
`backoff_factor * exponential_base^(attempts-1) + random(jitter_interval)`
21+
:param num max_retry_delay: maximum delay when retrying write in seconds
22+
:param int max_retry_time: maximum total retry timeout in seconds, attempt after this timout throws MaxRetryError
23+
:param int total: maximum number of retries
24+
:param num retry_interval: initial first retry delay range in seconds
25+
:param int exponential_base: base for the exponential retry delay,
26+
27+
The next delay is computed as random value between range
28+
`retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)
29+
30+
Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5
31+
retry delays are random distributed values within the ranges of
32+
[5-10, 10-20, 20-40, 40-80, 80-125]
33+
2234
"""
2335

24-
def __init__(self, jitter_interval=0, max_retry_delay=180, exponential_base=5, **kw):
36+
def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5,
37+
retry_interval=5, **kw):
2538
"""Initialize defaults."""
2639
super().__init__(**kw)
2740
self.jitter_interval = jitter_interval
41+
self.total = total
42+
self.retry_interval = retry_interval
2843
self.max_retry_delay = max_retry_delay
44+
self.max_retry_time = max_retry_time
2945
self.exponential_base = exponential_base
46+
self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time)
3047

3148
def new(self, **kw):
3249
"""Initialize defaults."""
3350
if 'jitter_interval' not in kw:
3451
kw['jitter_interval'] = self.jitter_interval
52+
if 'retry_interval' not in kw:
53+
kw['retry_interval'] = self.retry_interval
3554
if 'max_retry_delay' not in kw:
3655
kw['max_retry_delay'] = self.max_retry_delay
56+
if 'max_retry_time' not in kw:
57+
kw['max_retry_time'] = self.max_retry_time
3758
if 'exponential_base' not in kw:
3859
kw['exponential_base'] = self.exponential_base
39-
return super().new(**kw)
60+
61+
new = super().new(**kw)
62+
new.retry_timeout = self.retry_timeout
63+
return new
4064

4165
def is_retry(self, method, status_code, has_retry_after=False):
4266
"""is_retry doesn't require retry_after header. If there is not Retry-After we will use backoff."""
@@ -58,8 +82,21 @@ def get_backoff_time(self):
5882
if consecutive_errors_len < 0:
5983
return 0
6084

61-
backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) + self._jitter_delay()
62-
return min(self.max_retry_delay, backoff_value)
85+
range_start = self.retry_interval
86+
range_stop = self.retry_interval * self.exponential_base
87+
88+
i = 1
89+
while i <= consecutive_errors_len:
90+
i += 1
91+
range_start = range_stop
92+
range_stop = range_stop * self.exponential_base
93+
if range_stop > self.max_retry_delay:
94+
break
95+
96+
if range_stop > self.max_retry_delay:
97+
range_stop = self.max_retry_delay
98+
99+
return range_start + (range_stop - range_start) * self._random()
63100

64101
def get_retry_after(self, response):
65102
"""Get the value of Retry-After header and append random jitter delay."""
@@ -70,6 +107,9 @@ def get_retry_after(self, response):
70107

71108
def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None):
72109
"""Return a new Retry object with incremented retry counters."""
110+
if self.retry_timeout < datetime.now():
111+
raise MaxRetryError(_pool, url, error or ResponseError("max_retry_time exceeded"))
112+
73113
new_retry = super().increment(method, url, response, error, _pool, _stacktrace)
74114

75115
if response is not None:
@@ -89,3 +129,6 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None
89129

90130
def _jitter_delay(self):
91131
return self.jitter_interval * random()
132+
133+
def _random(self):
134+
return random()

influxdb_client/client/write_api.py

+11-13
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ def __init__(self, write_type: WriteType = WriteType.batching,
3838
batch_size=1_000, flush_interval=1_000,
3939
jitter_interval=0,
4040
retry_interval=5_000,
41-
max_retries=3,
42-
max_retry_delay=180_000,
43-
exponential_base=5,
41+
max_retries=5,
42+
max_retry_delay=125_000,
43+
max_retry_time=180_000,
44+
exponential_base=2,
4445
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
4546
"""
4647
Create write api configuration.
@@ -51,10 +52,10 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5152
:param jitter_interval: this is primarily to avoid large write spikes for users running a large number of
5253
client instances ie, a jitter of 5s and flush duration 10s means flushes will happen every 10-15s.
5354
:param retry_interval: the time to wait before retry unsuccessful write
54-
:param max_retries: the number of max retries when write fails
55+
:param max_retries: the number of max retries when write fails, 0 means retry is disabled
5556
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
56-
:param exponential_base: base for the exponential retry delay, the next delay is computed as
57-
`retry_interval * exponential_base^(attempts-1) + random(jitter_interval)`
57+
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
58+
:param exponential_base: base for the exponential retry delay
5859
:param write_scheduler:
5960
"""
6061
self.write_type = write_type
@@ -64,16 +65,18 @@ def __init__(self, write_type: WriteType = WriteType.batching,
6465
self.retry_interval = retry_interval
6566
self.max_retries = max_retries
6667
self.max_retry_delay = max_retry_delay
68+
self.max_retry_time = max_retry_time
6769
self.exponential_base = exponential_base
6870
self.write_scheduler = write_scheduler
6971

7072
def to_retry_strategy(self):
7173
"""Create a Retry strategy from write options."""
7274
return WritesRetry(
7375
total=self.max_retries,
74-
backoff_factor=self.retry_interval / 1_000,
76+
retry_interval=self.retry_interval / 1_000,
7577
jitter_interval=self.jitter_interval / 1_000,
7678
max_retry_delay=self.max_retry_delay / 1_000,
79+
max_retry_time=self.max_retry_time / 1_000,
7780
exponential_base=self.exponential_base,
7881
method_whitelist=["POST"])
7982

@@ -363,12 +366,7 @@ def _http(self, batch_item: _BatchItem):
363366

364367
logger.debug("Write time series data into InfluxDB: %s", batch_item)
365368

366-
retry = WritesRetry(
367-
total=self._write_options.max_retries,
368-
backoff_factor=self._write_options.retry_interval / 1_000,
369-
jitter_interval=self._write_options.jitter_interval / 1_000,
370-
max_retry_delay=self._write_options.max_retry_delay / 1_000,
371-
method_whitelist=["POST"])
369+
retry = self._write_options.to_retry_strategy()
372370

373371
self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data,
374372
batch_item.key.precision, urlopen_kw={'retries': retry})

tests/test_WriteApiBatching.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def test_retry_interval(self):
198198
time.sleep(1)
199199
self.assertEqual(1, len(httpretty.httpretty.latest_requests), msg="first request immediately")
200200

201-
time.sleep(1.5)
201+
time.sleep(3)
202202
self.assertEqual(2, len(httpretty.httpretty.latest_requests), msg="second request after delay_interval")
203203

204204
time.sleep(3)
@@ -238,6 +238,38 @@ def test_retry_interval_max_retries(self):
238238

239239
self.assertEqual(6, len(httpretty.httpretty.latest_requests))
240240

241+
def test_retry_disabled_max_retries(self):
242+
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429,
243+
adding_headers={'Retry-After': '1'})
244+
245+
self._write_client.close()
246+
self._write_client = WriteApi(influxdb_client=self.influxdb_client,
247+
write_options=WriteOptions(max_retries=0,batch_size=2, flush_interval=1_000))
248+
249+
self._write_client.write("my-bucket", "my-org",
250+
["h2o_feet,location=coyote_creek level\\ water_level=1 1",
251+
"h2o_feet,location=coyote_creek level\\ water_level=2 2"])
252+
253+
time.sleep(2)
254+
255+
self.assertEqual(1, len(httpretty.httpretty.latest_requests))
256+
257+
def test_retry_disabled_max_retry_time(self):
258+
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429,
259+
adding_headers={'Retry-After': '1'})
260+
261+
self._write_client.close()
262+
self._write_client = WriteApi(influxdb_client=self.influxdb_client,
263+
write_options=WriteOptions(max_retry_time=0,batch_size=2, flush_interval=1_000))
264+
265+
self._write_client.write("my-bucket", "my-org",
266+
["h2o_feet,location=coyote_creek level\\ water_level=1 1",
267+
"h2o_feet,location=coyote_creek level\\ water_level=2 2"])
268+
269+
time.sleep(5)
270+
271+
self.assertEqual(1, len(httpretty.httpretty.latest_requests))
272+
241273
def test_recover_from_error(self):
242274
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
243275
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400)

tests/test_WriteOptions.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ class TestWriteOptions(unittest.TestCase):
77
def test_default(self):
88
retry = WriteOptions().to_retry_strategy()
99

10-
self.assertEqual(retry.total, 3)
11-
self.assertEqual(retry.backoff_factor, 5)
12-
self.assertEqual(retry.jitter_interval, 0)
13-
self.assertEqual(retry.max_retry_delay, 180)
14-
self.assertEqual(retry.exponential_base, 5)
10+
self.assertEqual(retry.total, 5)
11+
self.assertEqual(retry.retry_interval, 5)
12+
self.assertEqual(retry.max_retry_time, 180)
13+
self.assertEqual(retry.max_retry_delay, 125)
14+
self.assertEqual(retry.exponential_base, 2)
1515
self.assertEqual(retry.method_whitelist, ["POST"])
1616

1717
def test_custom(self):
@@ -21,8 +21,7 @@ def test_custom(self):
2121
.to_retry_strategy()
2222

2323
self.assertEqual(retry.total, 5)
24-
self.assertEqual(retry.backoff_factor, 0.5)
25-
self.assertEqual(retry.jitter_interval, 2)
24+
self.assertEqual(retry.retry_interval, 0.5)
2625
self.assertEqual(retry.max_retry_delay, 7.5)
2726
self.assertEqual(retry.exponential_base, 2)
2827
self.assertEqual(retry.method_whitelist, ["POST"])

0 commit comments

Comments
 (0)