Skip to content

Commit 897a072

Browse files
committed
feat: batching write - exponential random backoff (code style fix)
1 parent 9044b15 commit 897a072

File tree

4 files changed

+50
-57
lines changed

4 files changed

+50
-57
lines changed

lib/influxdb2/client/worker.rb

+6-7
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,12 @@ def _write(data)
102102

103103
def _write_raw(key, points)
104104
write_retry = InfluxDB2::WriteRetry.new(
105-
:api_client => @api_client,
106-
:max_retries => @write_options.max_retries,
107-
:exponential_base => @write_options.exponential_base,
108-
:retry_interval => @write_options.retry_interval,
109-
:max_retry_delay => @write_options.max_retry_delay,
110-
:max_retry_time => @write_options.max_retry_time,
105+
api_client: @api_client,
106+
max_retries: @write_options.max_retries,
107+
exponential_base: @write_options.exponential_base,
108+
retry_interval: @write_options.retry_interval,
109+
max_retry_delay: @write_options.max_retry_delay,
110+
max_retry_time: @write_options.max_retry_time
111111
)
112112

113113
if @write_options.jitter_interval > 0
@@ -119,6 +119,5 @@ def _write_raw(key, points)
119119
@api_client.write_raw(points.join("\n"), precision: key.precision, bucket: key.bucket, org: key.org)
120120
end
121121
end
122-
123122
end
124123
end

lib/influxdb2/client/write_retry.rb

+16-15
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
# THE SOFTWARE.
2020

2121
module InfluxDB2
22+
# Exponential random write retry.
2223
class WriteRetry
23-
@error_msg_prefix = "Error with options to with_retries:"
24+
@error_msg_prefix = 'Error with options to with_retries:'
2425

2526
# @param [Hash] options the retry options.
2627
# @option options [Integer] :max_retries (5) The maximum number of times to run the block.
@@ -33,14 +34,14 @@ def initialize(options = {})
3334
@api_client = options[:api_client]
3435
@max_retries = options[:max_retries] || 5
3536
raise "#{@error_msg_prefix} :max_retries must be greater than 0." unless @max_retries > 0
37+
3638
@retry_interval = options[:retry_interval] || 5_000
3739
@max_retry_delay = options[:max_retry_delay] || 125_000
3840
@max_retry_time = options[:max_retry_time] || 180_000
3941
@exponential_base = options[:exponential_base] || 2
4042
@jitter_interval = options[:jitter_interval] || 0
41-
if @retry_interval > @max_retry_delay
42-
raise "#{@error_msg_prefix} :retry_interval cannot be greater than :max_retry_delay."
43-
end
43+
raise "#{@error_msg_prefix} :retry_interval cannot be greater than :max_retry_delay." if
44+
@retry_interval > @max_retry_delay
4445
end
4546

4647
def get_backoff_time(attempts)
@@ -51,7 +52,7 @@ def get_backoff_time(attempts)
5152
while i < attempts
5253
i += 1
5354
range_start = range_stop
54-
range_stop = range_stop * @exponential_base
55+
range_stop *= @exponential_base
5556
break if range_stop > @max_retry_delay
5657
end
5758

@@ -60,17 +61,17 @@ def get_backoff_time(attempts)
6061
end
6162

6263
# Runs the supplied code block with a exponential backoff retry strategy.
63-
def retry(&block)
64+
def retry
6465
raise "#{@error_msg_prefix} must be passed a block" unless block_given?
66+
6567
attempts = 0
6668
start_time = Time.now
6769
begin
6870
attempts += 1
69-
return block.call(attempts)
71+
yield attempts
7072
rescue InfluxError => e
71-
7273
if attempts > @max_retries
73-
@api_client.log(:error, "Maximum retry attempts reached.")
74+
@api_client.log(:error, 'Maximum retry attempts reached.')
7475
raise e
7576
end
7677

@@ -81,11 +82,11 @@ def retry(&block)
8182

8283
raise e if (e.code.nil? || e.code.to_i < 429) && !_connection_error(e.original)
8384

84-
if e.retry_after.nil? || e.retry_after.empty?
85-
timeout = get_backoff_time(attempts)
86-
else
87-
timeout = (e.retry_after.to_f * 1000) + @jitter_interval * rand
88-
end
85+
timeout = if e.retry_after.nil? || e.retry_after.empty?
86+
get_backoff_time(attempts)
87+
else
88+
(e.retry_after.to_f * 1000) + @jitter_interval * rand
89+
end
8990

9091
message = 'The retriable error occurred during writing of data. '\
9192
"Reason: '#{e.message}'. Retry in: #{timeout.to_f / 1000}s."
@@ -100,4 +101,4 @@ def _connection_error(error)
100101
InfluxError::HTTP_ERRORS.any? { |c| error.instance_of? c }
101102
end
102103
end
103-
end
104+
end

test/influxdb/write_api_batching_test.rb

+19-26
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,6 @@ def test_connection_error
450450
sleep(11)
451451
assert_requested(:post, 'http://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
452452
times: 4, body: request)
453-
454453
end
455454

456455
def test_write_connection_error
@@ -475,7 +474,6 @@ def test_write_connection_error
475474

476475
assert_requested(:post, 'http://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
477476
times: 3, body: request)
478-
479477
end
480478

481479
def test_abort_on_exception
@@ -564,42 +562,39 @@ def test_retry_contains_message
564562
end
565563

566564
def test_backoff_time_default
567-
568565
retries = InfluxDB2::WriteRetry.new
569566

570567
backoff = retries.get_backoff_time(1)
571-
assert_gte backoff, 5000
572-
assert_lte backoff, 10000
568+
assert_gte backoff, 5_000
569+
assert_lte backoff, 10_000
573570

574571
backoff = retries.get_backoff_time(2)
575-
assert_gte backoff, 10000
576-
assert_lte backoff, 20000
572+
assert_gte backoff, 10_000
573+
assert_lte backoff, 20_000
577574

578575
backoff = retries.get_backoff_time(3)
579-
assert_gte backoff, 20000
580-
assert_lte backoff, 40000
576+
assert_gte backoff, 20_000
577+
assert_lte backoff, 40_000
581578

582579
backoff = retries.get_backoff_time(4)
583-
assert_gte backoff, 40000
584-
assert_lte backoff, 80000
580+
assert_gte backoff, 40_000
581+
assert_lte backoff, 80_000
585582

586583
backoff = retries.get_backoff_time(5)
587-
assert_gte backoff, 80000
588-
assert_lte backoff, 125000
584+
assert_gte backoff, 80_000
585+
assert_lte backoff, 125_000
589586

590587
backoff = retries.get_backoff_time(6)
591-
assert_gte backoff, 80000
592-
assert_lte backoff, 125000
593-
588+
assert_gte backoff, 80_000
589+
assert_lte backoff, 125_000
594590
end
595591

596592
def test_backoff_time_custom
597-
598593
retries = InfluxDB2::WriteRetry.new(
599-
:max_retry_delay => 2000,
600-
:retry_interval => 100,
601-
:exponential_base => 2,
602-
:max_retries => 5,
594+
max_retry_delay: 2_000,
595+
retry_interval: 100,
596+
exponential_base: 2,
597+
max_retries: 5
603598
)
604599

605600
backoff = retries.get_backoff_time(1)
@@ -616,12 +611,10 @@ def test_backoff_time_custom
616611

617612
backoff = retries.get_backoff_time(4)
618613
assert_gte backoff, 800
619-
assert_lte backoff, 1600
614+
assert_lte backoff, 1_600
620615

621616
backoff = retries.get_backoff_time(5)
622-
assert_gte backoff, 1600
623-
assert_lte backoff, 2000
624-
617+
assert_gte backoff, 1_600
618+
assert_lte backoff, 2_000
625619
end
626-
627620
end

test/test_helper.rb

+9-9
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,18 @@ def add(level, &block)
5252
end
5353
end
5454

55-
def assert_gt(a, b)
56-
assert_operator a, :>, b
55+
def assert_gt(val1, val2)
56+
assert_operator val1, :>, val2
5757
end
5858

59-
def assert_gte(a, b)
60-
assert_operator a, :>=, b
59+
def assert_gte(val1, val2)
60+
assert_operator val1, :>=, val2
6161
end
6262

63-
def assert_lt(a, b)
64-
assert_operator a, :<, b
63+
def assert_lt(val1, val2)
64+
assert_operator val1, :<, val2
6565
end
6666

67-
def assert_lte(a, b)
68-
assert_operator a, :<=, b
69-
end
67+
def assert_lte(val1, val2)
68+
assert_operator val1, :<=, val2
69+
end

0 commit comments

Comments
 (0)