Skip to content

Commit 06593de

Browse files
authored
feat: batching write - exponential random backoff (#83)
1 parent b9ef186 commit 06593de

File tree

7 files changed

+224
-110
lines changed

7 files changed

+224
-110
lines changed

README.md

+6-5
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,14 @@ The writes are processed in batches which are configurable by `WriteOptions`:
159159

160160
| Property | Description | Default Value |
161161
| --- | --- | --- |
162-
| batchSize | the number of data point to collect in batch | 1000 |
163-
| flush_interval | the number of milliseconds before the batch is written | 1000 |
164-
| retry_interval | the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. | 5000 |
162+
| batch_size | the number of data points to collect in a batch | 1_000 |
163+
| flush_interval | the number of milliseconds before the batch is written | 1_000 |
164+
| retry_interval | the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. | 5_000 |
165165
| jitter_interval | the number of milliseconds to increase the batch flush interval by a random amount | 0 |
166166
| max_retries | the number of max retries when write fails | 5 |
167-
| max_retry_delay | maximum delay when retrying write in milliseconds | 180000 |
168-
| exponential_base | the base for the exponential retry delay, the next delay is computed as `retry_interval * exponential_base^(attempts - 1) + random(jitter_interval)` | 5 |
167+
| max_retry_delay | maximum delay when retrying write in milliseconds | 125_000 |
168+
| max_retry_time | maximum total retry timeout in milliseconds | 180_000 |
169+
| exponential_base | the base for the exponential retry delay. The next delay is computed using random exponential backoff as a random value within the interval from ``retry_interval * exponential_base^(attempts-1)`` to ``retry_interval * exponential_base^(attempts)``, capped at ``max_retry_delay``. Example for ``retry_interval=5000, exponential_base=2, max_retry_delay=125000, max_retries=5``: retry delays are randomly distributed values within the ranges ``[5000-10000, 10000-20000, 20000-40000, 40000-80000, 80000-125000]`` | 2 |
169170
| batch_abort_on_exception | the batching worker will be aborted after failed retry strategy | false |
170171
```ruby
171172
write_options = InfluxDB2::WriteOptions.new(write_type: InfluxDB2::WriteType::BATCHING,

lib/influxdb-client.rb

+1
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@
2828
require 'influxdb2/client/health_api'
2929
require 'influxdb2/client/point'
3030
require 'influxdb2/client/flux_table'
31+
require 'influxdb2/client/write_retry'

lib/influxdb2/client/worker.rb

+14-22
Original file line numberDiff line numberDiff line change
@@ -96,36 +96,28 @@ def _check_background_queue(size: false)
9696

9797
def _write(data)
9898
data.each do |key, points|
99-
_write_raw(key, points, 1, @write_options.retry_interval)
99+
_write_raw(key, points)
100100
end
101101
end
102102

103-
def _write_raw(key, points, attempts, retry_interval)
103+
def _write_raw(key, points)
104+
write_retry = InfluxDB2::WriteRetry.new(
105+
api_client: @api_client,
106+
max_retries: @write_options.max_retries,
107+
exponential_base: @write_options.exponential_base,
108+
retry_interval: @write_options.retry_interval,
109+
max_retry_delay: @write_options.max_retry_delay,
110+
max_retry_time: @write_options.max_retry_time
111+
)
112+
104113
if @write_options.jitter_interval > 0
105114
jitter_delay = (@write_options.jitter_interval.to_f / 1_000) * rand
106115
sleep jitter_delay
107116
end
108-
@api_client.write_raw(points.join("\n"), precision: key.precision, bucket: key.bucket, org: key.org)
109-
rescue InfluxError => e
110-
raise e if attempts > @write_options.max_retries
111-
raise e if (e.code.nil? || e.code.to_i < 429) && !_connection_error(e.original)
112-
113-
timeout = if e.retry_after.empty?
114-
[retry_interval.to_f, @write_options.max_retry_delay.to_f].min / 1_000
115-
else
116-
e.retry_after.to_f
117-
end
118-
119-
message = 'The retriable error occurred during writing of data. '\
120-
"Reason: '#{e.message}'. Retry in: #{timeout}s."
121-
122-
@api_client.log(:warn, message)
123-
sleep timeout
124-
_write_raw(key, points, attempts + 1, retry_interval * @write_options.exponential_base)
125-
end
126117

127-
def _connection_error(error)
128-
InfluxError::HTTP_ERRORS.any? { |c| error.instance_of? c }
118+
write_retry.retry do
119+
@api_client.write_raw(points.join("\n"), precision: key.precision, bucket: key.bucket, org: key.org)
120+
end
129121
end
130122
end
131123
end

lib/influxdb2/client/write_api.rb

+13-8
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,23 @@ class WriteOptions
3636
# @param [Integer] jitter_interval: the number of milliseconds to increase the batch flush interval
3737
# @param [Integer] max_retries: max number of retries when write fails
3838
# @param [Integer] max_retry_delay: maximum delay when retrying write in milliseconds
39-
# by a random amount
40-
# @param [Integer] exponential_base: base for the exponential retry delay, the next delay is computed as
41-
# "exponential_base^(attempts-1) + random(jitter_interval)"
39+
# @param [Integer] max_retry_time: maximum total retry timeout in milliseconds
40+
# @param [Integer] exponential_base: the next delay is computed using random exponential backoff as a random value
41+
# within the interval retry_interval*exponential_base^(attempts-1) and retry_interval*exponential_base^(attempts).
42+
# Example for retry_interval=5000, exponential_base=2, max_retry_delay=125000, total=5
43+
# Retry delays are random distributed values within the ranges of
44+
# [5000-10000, 10000-20000, 20000-40000, 40000-80000, 80000-125000]
4245
# @param [Boolean] batch_abort_on_exception: batching worker will be aborted after failed retry strategy
4346
def initialize(write_type: WriteType::SYNCHRONOUS, batch_size: 1_000, flush_interval: 1_000, retry_interval: 5_000,
44-
jitter_interval: 0, max_retries: 5, max_retry_delay: 180_000, exponential_base: 5,
45-
batch_abort_on_exception: false)
47+
jitter_interval: 0, max_retries: 5, max_retry_delay: 125_000, max_retry_time: 180_000,
48+
exponential_base: 2, batch_abort_on_exception: false)
4649
_check_not_negative('batch_size', batch_size)
4750
_check_not_negative('flush_interval', flush_interval)
4851
_check_not_negative('retry_interval', retry_interval)
4952
_check_positive('jitter_interval', jitter_interval)
50-
_check_positive('max_retries', jitter_interval)
51-
_check_positive('max_retry_delay', jitter_interval)
53+
_check_positive('max_retries', max_retries)
54+
_check_positive('max_retry_delay', max_retry_delay)
55+
_check_positive('max_retry_time', max_retry_time)
5256
_check_positive('exponential_base', exponential_base)
5357
@write_type = write_type
5458
@batch_size = batch_size
@@ -57,12 +61,13 @@ def initialize(write_type: WriteType::SYNCHRONOUS, batch_size: 1_000, flush_inte
5761
@jitter_interval = jitter_interval
5862
@max_retries = max_retries
5963
@max_retry_delay = max_retry_delay
64+
@max_retry_time = max_retry_time
6065
@exponential_base = exponential_base
6166
@batch_abort_on_exception = batch_abort_on_exception
6267
end
6368

6469
attr_reader :write_type, :batch_size, :flush_interval, :retry_interval, :jitter_interval,
65-
:max_retries, :max_retry_delay, :exponential_base, :batch_abort_on_exception
70+
:max_retries, :max_retry_delay, :max_retry_time, :exponential_base, :batch_abort_on_exception
6671

6772
def _check_not_negative(key, value)
6873
raise ArgumentError, "The '#{key}' should be positive or zero, but is: #{value}" if value <= 0

lib/influxdb2/client/write_retry.rb

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# The MIT License
2+
#
3+
# Permission is hereby granted, free of charge, to any person obtaining a copy
4+
# of this software and associated documentation files (the "Software"), to deal
5+
# in the Software without restriction, including without limitation the rights
6+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
# copies of the Software, and to permit persons to whom the Software is
8+
# furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in
11+
# all copies or substantial portions of the Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
# THE SOFTWARE.
20+
21+
module InfluxDB2
  # Exponential random write retry.
  #
  # Runs a block and, when it raises a retriable InfluxError, sleeps for a
  # randomized, exponentially growing delay before running the block again.
  class WriteRetry
    # Prefix of every error raised for invalid usage of this class.
    # This is a constant on purpose: an instance variable assigned in the
    # class body belongs to the class object and reads as nil from instance
    # methods, which silently dropped the prefix from the messages.
    ERROR_MSG_PREFIX = 'Error with options to with_retries:'.freeze

    # @param [Hash] options the retry options.
    # @option options [Object] :api_client client used for logging; it must respond to +log(level, message)+.
    # @option options [Integer] :max_retries (5) maximum number of retries after the first failed attempt.
    # @option options [Integer] :retry_interval (5000) number of milliseconds to retry unsuccessful write.
    # @option options [Integer] :max_retry_delay (125_000) maximum delay when retrying write in milliseconds.
    # @option options [Integer] :max_retry_time (180_000) maximum total retry timeout in milliseconds.
    # @option options [Integer] :exponential_base (2) base for the exponential retry delay.
    # @option options [Integer] :jitter_interval (0) upper bound of the random milliseconds added to a
    #   server supplied "Retry-After" delay.
    # @raise [RuntimeError] when :max_retries is not greater than 0 or :retry_interval
    #   is greater than :max_retry_delay.
    def initialize(options = {})
      @api_client = options[:api_client]
      @max_retries = options[:max_retries] || 5
      raise "#{ERROR_MSG_PREFIX} :max_retries must be greater than 0." unless @max_retries > 0

      @retry_interval = options[:retry_interval] || 5_000
      @max_retry_delay = options[:max_retry_delay] || 125_000
      @max_retry_time = options[:max_retry_time] || 180_000
      @exponential_base = options[:exponential_base] || 2
      @jitter_interval = options[:jitter_interval] || 0
      return unless @retry_interval > @max_retry_delay

      raise "#{ERROR_MSG_PREFIX} :retry_interval cannot be greater than :max_retry_delay."
    end

    # Computes the delay before the next attempt as a random value between
    # retry_interval * exponential_base^(attempts - 1) and
    # retry_interval * exponential_base^attempts, capped at max_retry_delay.
    #
    # @param [Integer] attempts number of attempts made so far (1 after the first failure).
    # @return [Float] delay in milliseconds, never greater than max_retry_delay.
    def get_backoff_time(attempts)
      range_start = @retry_interval
      range_stop = @retry_interval * @exponential_base

      (attempts - 1).times do
        # Stop growing *before* moving the lower bound, otherwise the lower
        # bound itself could end up above max_retry_delay.
        break if range_stop > @max_retry_delay

        range_start = range_stop
        range_stop *= @exponential_base
      end

      range_stop = [range_stop, @max_retry_delay].min
      range_start + (range_stop - range_start) * rand
    end

    # Runs the supplied code block with an exponential backoff retry strategy.
    #
    # The block is retried only for InfluxError with a code of 429 or higher,
    # or for an error whose +original+ is one of InfluxError::HTTP_ERRORS.
    # A non-empty +retry_after+ on the error (seconds) takes precedence over
    # the computed backoff.
    #
    # @yield [attempts] the 1-based number of the current attempt.
    # @return [Object] the value returned by the block.
    # @raise [RuntimeError] when no block is given.
    # @raise [InfluxError] when the error is not retriable, or the retry count
    #   or the total retry time is exhausted.
    def retry
      raise "#{ERROR_MSG_PREFIX} must be passed a block" unless block_given?

      attempts = 0
      # Monotonic clock: the elapsed time must not jump when the wall clock is adjusted.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      begin
        attempts += 1
        yield attempts
      rescue InfluxError => e
        if attempts > @max_retries
          _log(:error, 'Maximum retry attempts reached.')
          raise e
        end

        elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at) * 1000
        if elapsed_ms > @max_retry_time
          _log(:error, "Maximum retry time #{@max_retry_time} ms exceeded")
          raise e
        end

        raise e if (e.code.nil? || e.code.to_i < 429) && !_connection_error(e.original)

        timeout = if e.retry_after.nil? || e.retry_after.empty?
                    get_backoff_time(attempts)
                  else
                    # The server specifies seconds; everything here is in milliseconds.
                    (e.retry_after.to_f * 1000) + @jitter_interval * rand
                  end

        message = 'The retriable error occurred during writing of data. '\
                  "Reason: '#{e.message}'. Retry in: #{timeout.to_f / 1000}s."

        _log(:warn, message)
        sleep timeout / 1000.0
        retry
      end
    end

    # @param [Object] error the low-level error wrapped by an InfluxError.
    # @return [Boolean] true when the error is an instance of one of InfluxError::HTTP_ERRORS.
    def _connection_error(error)
      InfluxError::HTTP_ERRORS.any? { |c| error.instance_of? c }
    end

    private

    # Logs through the API client when one was supplied, so that a missing
    # client never masks the original write error with a NoMethodError.
    def _log(level, message)
      @api_client.log(level, message) if @api_client
    end
  end
end

0 commit comments

Comments
 (0)