Skip to content

Commit 1b77e82

Browse files
committed
Merge pull request #2524 from fluent/buf_file-suffix
buf_file: Add path_suffix parameter. ref #2236 Signed-off-by: Masahiro Nakagawa <repeatedly@gmail.com>
1 parent 3b29e04 commit 1b77e82

File tree

2 files changed

+35
-17
lines changed

2 files changed

+35
-17
lines changed

lib/fluent/plugin/buf_file.rb

+7-5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class FileBuffer < Fluent::Plugin::Buffer
3434

3535
desc 'The path where buffer chunks are stored.'
3636
config_param :path, :string, default: nil
37+
desc 'The suffix of buffer chunks'
38+
config_param :path_suffix, :string, default: '.log'
3739

3840
config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE
3941
config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE
@@ -78,23 +80,23 @@ def configure(conf)
7880

7981
if specified_directory_exists || unexisting_path_for_directory # directory
8082
if using_plugin_root_dir || !multi_workers_configured
81-
@path = File.join(@path, 'buffer.*.log')
83+
@path = File.join(@path, "buffer.*#{@path_suffix}")
8284
else
83-
@path = File.join(@path, "worker#{fluentd_worker_id}", 'buffer.*.log')
85+
@path = File.join(@path, "worker#{fluentd_worker_id}", "buffer.*#{@path_suffix}")
8486
if fluentd_worker_id == 0
8587
# worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration)
86-
@additional_resume_path = File.join(File.expand_path("../../", @path), 'buffer.*.log')
88+
@additional_resume_path = File.join(File.expand_path("../../", @path), "buffer.*#{@path_suffix}")
8789
end
8890
end
8991
@multi_workers_available = true
9092
else # specified path is file path
9193
if File.basename(@path).include?('.*.')
9294
# valid file path
9395
elsif File.basename(@path).end_with?('.*')
94-
@path = @path + '.log'
96+
@path = @path + @path_suffix
9597
else
9698
# existing file will be ignored
97-
@path = @path + '.*.log'
99+
@path = @path + ".*#{@path_suffix}"
98100
end
99101
@multi_workers_available = false
100102
end

test/plugin/test_buf_file.rb

+28-12
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,30 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
5757
assert_equal File.join(@dir, 'buffer.*.file'), p.path
5858
end
5959

60-
test 'existing directory will be used with additional default file name' do
60+
data('default' => [nil, 'log'],
61+
'conf' => ['.buf', 'buf'])
62+
test 'existing directory will be used with additional default file name' do |params|
63+
conf, suffix = params
6164
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
6265
p = Fluent::Plugin::FileBuffer.new
6366
p.owner = d
64-
p.configure(config_element('buffer', '', {'path' => @dir}))
65-
assert_equal File.join(@dir, 'buffer.*.log'), p.path
67+
c = {'path' => @dir}
68+
c['path_suffix'] = conf if conf
69+
p.configure(config_element('buffer', '', c))
70+
assert_equal File.join(@dir, "buffer.*.#{suffix}"), p.path
6671
end
6772

68-
test 'unexisting path without * handled as directory' do
73+
data('default' => [nil, 'log'],
74+
'conf' => ['.buf', 'buf'])
75+
test 'unexisting path without * handled as directory' do |params|
76+
conf, suffix = params
6977
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
7078
p = Fluent::Plugin::FileBuffer.new
7179
p.owner = d
72-
p.configure(config_element('buffer', '', {'path' => File.join(@dir, 'buffer')}))
73-
assert_equal File.join(@dir, 'buffer', 'buffer.*.log'), p.path
80+
c = {'path' => File.join(@dir, 'buffer')}
81+
c['path_suffix'] = conf if conf
82+
p.configure(config_element('buffer', '', c))
83+
assert_equal File.join(@dir, 'buffer', "buffer.*.#{suffix}"), p.path
7484
end
7585
end
7686

@@ -312,10 +322,6 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
312322
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
313323
@p = Fluent::Plugin::FileBuffer.new
314324
@p.owner = @d
315-
Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do
316-
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}))
317-
@p.configure(config_element('buffer', ''))
318-
end
319325
end
320326

321327
teardown do
@@ -329,8 +335,18 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
329335
end
330336
end
331337

332-
test '#start creates directory for buffer chunks' do
333-
expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', 'buffer.*.log')
338+
data('default' => [nil, 'log'],
339+
'conf' => ['.buf', 'buf'])
340+
test '#start creates directory for buffer chunks' do |params|
341+
conf, suffix = params
342+
c = {}
343+
c['path_suffix'] = conf if conf
344+
Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do
345+
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}))
346+
@p.configure(config_element('buffer', '', c))
347+
end
348+
349+
expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', "buffer.*.#{suffix}")
334350
expected_buffer_dir = File.dirname(expected_buffer_path)
335351
assert_equal expected_buffer_path, @p.path
336352
assert_false Dir.exist?(expected_buffer_dir)

0 commit comments

Comments
 (0)