Commit 498eb5e

Guy Boertje authored
Add better support for file rotation (logstash-plugins#192)
* Add better support for file rotation
* Move common_restat for watched and active to one iteration to better handle when a file is rotated that has never been active.
  Change the way @previous_inode_size is cached, it needs to be set only when an inode change is detected.
  Change discover associate behaviour if the matched watched_file is open.
  Add spec for when a file is rotated that has never been active.
  Alter specs because we can rely on ordering with "file_sort_by"
* Ensure single file `path` option is ok, reverts earlier change in this PR
* Improve and expand our use of FFI on Windows
* jnr and ffi interop, yay
* put self back in
* use fieldId label
* handle rotations better, especially in the case where the rotated files are discoverable - in this case we were not moving the state and the sincedb record correctly.
  Next - abstract the *nix and Windows stat calls and structure into two classes with the same API.
* fixes for rebase from master
* Abstract Stat part 1
* Abstract Stat part 2
* Finally have (all) the kinks worked out
* Try to fix travis failures.
* Try to fix travis failures 2
* Try fix travis 3
* Try fix travis 4
* Try fix travis 5
* Try fix travis 6
* Remove io based stat reliance. travis jruby 1.7.27 should pass now.
  Move all tail processing state iteration into its own method
  Clean up rspec sequencing usage.
  Remove all Mutex use in favour of AtomicBoolean, AtomicArray, AtomicHash
  Add wait for completely_stopped before removing sincedb in file_tail spec
* Some windows fixes
* more windows fixes
* windows changes 2
* rename rspec run tag in ci/build.sh
* move one trace logging line
* add first run discovery methods
* fix regression on files seen after initial run, travis 2 use docker.
* add execute permissions
* fix path ordering travis failures
* fix jar loading so it works for tests and when installed in LS
* reorder the jar require statements

Fixes logstash-plugins#198
1 parent c3c7847 commit 498eb5e
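
The commit message centers on rotation: a path keeps its name while the file behind it changes. On *nix, one common way to notice this is to compare inode numbers from successive stat calls. The following is a minimal sketch of that idea, assuming a POSIX filesystem. `InodeSnapshot` and `rotated?` are hypothetical names used for illustration only; they are not the plugin's `Stat::Generic` or `Stat::WindowsPath` API, which this page does not show.

```ruby
require "tmpdir"

# Hypothetical helper, not part of the plugin: records the identity and size
# of whatever file currently lives at a path.
InodeSnapshot = Struct.new(:inode, :size) do
  def self.take(path)
    st = File.stat(path)
    new(st.ino, st.size)
  end
end

# A path is treated as rotated when its inode differs from the earlier snapshot.
def rotated?(previous, current)
  previous.inode != current.inode
end

Dir.mktmpdir do |dir|
  log = File.join(dir, "app.log")
  File.write(log, "line 1\n")
  before = InodeSnapshot.take(log)

  # Simulate rename-style rotation: the old file moves aside, a new one appears.
  File.rename(log, "#{log}.1")
  File.write(log, "")
  after = InodeSnapshot.take(log)

  puts "rotated: #{rotated?(before, after)}"
end
```

Because the renamed file still exists when the new one is created, the two cannot share an inode, so the comparison is reliable in this scenario. On Windows `File::Stat#ino` is not a dependable identifier, which is presumably why the commit introduces a separate Windows stat class.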

54 files changed: +2417 -946 lines changed

.travis.yml

+12 -14
@@ -1,18 +1,16 @@
 ---
-sudo: false
-language: ruby
-cache: bundler
+sudo: required
+services: docker
+addons:
+  apt:
+    packages:
+      - docker-ce
 matrix:
   include:
-    - rvm: jruby-9.1.13.0
-      env: LOGSTASH_BRANCH=master
-    - rvm: jruby-9.1.13.0
-      env: LOGSTASH_BRANCH=6.x
-    - rvm: jruby-9.1.13.0
-      env: LOGSTASH_BRANCH=6.0
-    - rvm: jruby-1.7.27
-      env: LOGSTASH_BRANCH=5.6
+    - env: ELASTIC_STACK_VERSION=5.6.10
+    - env: ELASTIC_STACK_VERSION=6.3.0
+    - env: ELASTIC_STACK_VERSION=6.4.0-SNAPSHOT
+    - env: ELASTIC_STACK_VERSION=7.0.0-alpha1-SNAPSHOT
   fast_finish: true
-install: true
-script: ci/build.sh
-jdk: oraclejdk8
+install: ci/unit/docker-setup.sh
+script: ci/unit/docker-run.sh

JAR_VERSION

+1 -1
@@ -1 +1 @@
-1.0.0
+1.0.1

README.md

-3
@@ -2,9 +2,6 @@
 Travis Build
 [![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-input-file.svg)](https://travis-ci.org/logstash-plugins/logstash-input-file)
 
-Jenkins Build
-[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-input-file.svg)](https://travis-ci.org/logstash-plugins/logstash-input-file)
-
 This is a plugin for [Logstash](https://github.com/elastic/logstash).
 
 It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.

ci/build.sh

-21
This file was deleted.

ci/setup.sh

-26
This file was deleted.

ci/unit/Dockerfile

+11
@@ -0,0 +1,11 @@
+ARG ELASTIC_STACK_VERSION
+FROM docker.elastic.co/logstash/logstash:$ELASTIC_STACK_VERSION
+WORKDIR /usr/share/logstash/logstash-core
+RUN cp versions-gem-copy.yml ../logstash-core-plugin-api/versions-gem-copy.yml
+COPY --chown=logstash:logstash . /usr/share/plugins/this
+WORKDIR /usr/share/plugins/this
+ENV PATH=/usr/share/logstash/vendor/jruby/bin:${PATH}
+ENV LOGSTASH_SOURCE 1
+RUN jruby -S gem install bundler
+RUN jruby -S bundle install --jobs=3 --retry=3
+RUN jruby -S bundle exec rake vendor

ci/unit/docker-compose.yml

+17
@@ -0,0 +1,17 @@
+version: '3'
+
+# run tests: docker-compose -f ci/unit/docker-compose.yml up --build --force-recreate
+# only set up: docker-compose -f ci/unit/docker-compose.yml up --build --no-start --force-recreate
+# start manually: docker-compose -f ci/unit/docker-compose.yml run logstash
+services:
+  logstash:
+    build:
+      context: ../../
+      dockerfile: ci/unit/Dockerfile
+      args:
+        - ELASTIC_STACK_VERSION=$ELASTIC_STACK_VERSION
+    command: /usr/share/plugins/this/ci/unit/run.sh
+    environment:
+      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
+      OSS: "true"
+    tty: true

ci/unit/docker-run.sh

+7
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+# This is intended to be run the plugin's root directory. `ci/unit/docker-test.sh`
+# Ensure you have Docker installed locally and set the ELASTIC_STACK_VERSION environment variable.
+set -e
+
+docker-compose -f ci/unit/docker-compose.yml run logstash

ci/unit/docker-setup.sh

+31
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+# This is intended to be run the plugin's root directory. `ci/unit/docker-test.sh`
+# Ensure you have Docker installed locally and set the ELASTIC_STACK_VERSION environment variable.
+set -e
+
+if [ "$ELASTIC_STACK_VERSION" ]; then
+    echo "Testing against version: $ELASTIC_STACK_VERSION"
+
+    if [[ "$ELASTIC_STACK_VERSION" = *"-SNAPSHOT" ]]; then
+        cd /tmp
+        wget https://snapshots.elastic.co/docker/logstash-"$ELASTIC_STACK_VERSION".tar.gz
+        tar xfvz logstash-"$ELASTIC_STACK_VERSION".tar.gz repositories
+        echo "Loading docker image: "
+        cat repositories
+        docker load < logstash-"$ELASTIC_STACK_VERSION".tar.gz
+        rm logstash-"$ELASTIC_STACK_VERSION".tar.gz
+        cd -
+    fi
+
+    if [ -f Gemfile.lock ]; then
+        rm Gemfile.lock
+    fi
+
+    docker-compose -f ci/unit/docker-compose.yml down
+    docker-compose -f ci/unit/docker-compose.yml up --no-start --build --force-recreate logstash
+else
+    echo "Please set the ELASTIC_STACK_VERSION environment variable"
+    echo "For example: export ELASTIC_STACK_VERSION=6.2.4"
+    exit 1
+fi

ci/unit/run.sh

+6
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+# This is intended to be run inside the docker container as the command of the docker-compose.
+set -ex
+
+bundle exec rspec -fd --pattern spec/**/*_spec.rb,spec/**/*_specs.rb

lib/filewatch/bootstrap.rb

+9 -21
@@ -1,7 +1,5 @@
 # encoding: utf-8
-require "rbconfig"
 require "pathname"
-# require "logstash/environment"
 
 ## Common setup
 # all the required constants and files
@@ -13,36 +11,26 @@ module FileWatch
   # this is used in the read loop e.g.
   # @opts[:file_chunk_count].times do
   # where file_chunk_count defaults to this constant
-  FIXNUM_MAX = (2**(0.size * 8 - 2) - 1)
+  MAX_ITERATIONS = (2**(0.size * 8 - 2) - 2) / 32768
 
   require_relative "helper"
 
-  module WindowsInode
-    def prepare_inode(path, stat)
-      fileId = Winhelper.GetWindowsUniqueFileIdentifier(path)
-      [fileId, 0, 0] # dev_* doesn't make sense on Windows
-    end
-  end
-
-  module UnixInode
-    def prepare_inode(path, stat)
-      [stat.ino.to_s, stat.dev_major, stat.dev_minor]
-    end
-  end
-
-  jar_version = Pathname.new(__FILE__).dirname.join("../../JAR_VERSION").realpath.read.strip
-
+  gem_root_dir = Pathname.new(__FILE__).dirname.join("../../").realpath
+  jar_version = gem_root_dir.join("JAR_VERSION").read.strip
+  fullpath = gem_root_dir.join("lib/jars/filewatch-#{jar_version}.jar").expand_path.to_path
   require "java"
-  require_relative "../../lib/jars/filewatch-#{jar_version}.jar"
+  require fullpath
   require "jruby_file_watch"
 
   if LogStash::Environment.windows?
     require_relative "winhelper"
+    require_relative "stat/windows_path"
+    PathStatClass = Stat::WindowsPath
     FileOpener = FileExt
-    InodeMixin = WindowsInode
   else
+    require_relative "stat/generic"
+    PathStatClass = Stat::Generic
     FileOpener = ::File
-    InodeMixin = UnixInode
   end
 
   # Structs can be used as hash keys because they compare by value
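
The change from `FIXNUM_MAX` to `MAX_ITERATIONS` is easier to follow with the numbers worked out. The sketch below evaluates both expressions from the diff on the running interpreter. Treating 32768 as a per-iteration chunk size in bytes is an assumption made here for illustration (the diff shows only the divisor, not the value of `FILE_READ_SIZE`), and the local variable names are not the plugin's.

```ruby
# 0.size is the byte width of a native Integer (8 on 64-bit MRI and JRuby).
bits = 0.size * 8

# Old default from the diff: the largest Fixnum on this platform.
fixnum_max = 2**(bits - 2) - 1

# New default from the diff.
max_iterations = (2**(bits - 2) - 2) / 32768

# Assumption for illustration only: each iteration reads 32768 bytes.
chunk_size = 32768

puts "FIXNUM_MAX:     #{fixnum_max}"
puts "MAX_ITERATIONS: #{max_iterations}"
puts "bytes covered:  #{max_iterations * chunk_size}"
```

Under that assumption, the iteration count multiplied by the chunk size stays within the old Fixnum bound, which may be the motivation for the division; the commit itself does not say.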

lib/filewatch/discoverer.rb

+35 -28
@@ -10,8 +10,8 @@ class Discoverer
     include LogStash::Util::Loggable
 
     def initialize(watched_files_collection, sincedb_collection, settings)
-      @watching = []
-      @exclude = []
+      @watching = Concurrent::Array.new
+      @exclude = Concurrent::Array.new
       @watched_files_collection = watched_files_collection
       @sincedb_collection = sincedb_collection
       @settings = settings
@@ -21,13 +21,13 @@ def initialize(watched_files_collection, sincedb_collection, settings)
     def add_path(path)
       return if @watching.member?(path)
       @watching << path
-      discover_files(path)
+      discover_files_new_path(path)
       self
     end
 
     def discover
       @watching.each do |path|
-        discover_files(path)
+        discover_files_ongoing(path)
       end
     end
 
@@ -37,7 +37,7 @@ def can_exclude?(watched_file, new_discovery)
       @exclude.each do |pattern|
         if watched_file.pathname.fnmatch?(pattern)
           if new_discovery
-            logger.debug("Discoverer can_exclude?: #{watched_file.path}: skipping " +
+            logger.trace("Discoverer can_exclude?: #{watched_file.path}: skipping " +
               "because it matches exclude #{pattern}")
           end
           watched_file.unwatch
@@ -47,45 +47,52 @@ def can_exclude?(watched_file, new_discovery)
       false
     end
 
-    def discover_files(path)
-      globbed = Dir.glob(path)
-      globbed = [path] if globbed.empty?
-      logger.debug("Discoverer found files, count: #{globbed.size}")
-      globbed.each do |file|
-        logger.debug("Discoverer found file, path: #{file}")
+    def discover_files_new_path(path)
+      discover_any_files(path, false)
+    end
+
+    def discover_files_ongoing(path)
+      discover_any_files(path, true)
+    end
+
+    def discover_any_files(path, ongoing)
+      fileset = Dir.glob(path).select{|f| File.file?(f) && !File.symlink?(f)}
+      logger.trace("discover_files", "count" => fileset.size)
+      fileset.each do |file|
         pathname = Pathname.new(file)
-        next unless pathname.file?
-        next if pathname.symlink?
         new_discovery = false
         watched_file = @watched_files_collection.watched_file_by_path(file)
         if watched_file.nil?
-          logger.debug("Discoverer discover_files: #{path}: new: #{file} (exclude is #{@exclude.inspect})")
           new_discovery = true
-          watched_file = WatchedFile.new(pathname, pathname.stat, @settings)
+          watched_file = WatchedFile.new(pathname, PathStatClass.new(pathname), @settings)
         end
         # if it already unwatched or its excluded then we can skip
         next if watched_file.unwatched? || can_exclude?(watched_file, new_discovery)
 
+        logger.trace("discover_files handling:", "new discovery"=> new_discovery, "watched_file details" => watched_file.details)
+
         if new_discovery
-          if watched_file.file_ignorable?
-            logger.debug("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
-            # on discovery ignorable watched_files are put into the ignored state and that
-            # updates the size from the internal stat
-            # so the existing contents are not read.
-            # because, normally, a newly discovered file will
-            # have a watched_file size of zero
-            # they are still added to the collection so we know they are there for the next periodic discovery
-            watched_file.ignore
-          end
-          # now add the discovered file to the watched_files collection and adjust the sincedb collections
-          @watched_files_collection.add(watched_file)
+          watched_file.initial_completed if ongoing
           # initially when the sincedb collection is filled with records from the persistence file
           # each value is not associated with a watched file
           # a sincedb_value can be:
           # unassociated
           # associated with this watched_file
           # associated with a different watched_file
-          @sincedb_collection.associate(watched_file)
+          if @sincedb_collection.associate(watched_file)
+            if watched_file.file_ignorable?
+              logger.trace("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
+              # on discovery ignorable watched_files are put into the ignored state and that
+              # updates the size from the internal stat
+              # so the existing contents are not read.
+              # because, normally, a newly discovered file will
+              # have a watched_file size of zero
+              # they are still added to the collection so we know they are there for the next periodic discovery
+              watched_file.ignore_as_unread
+            end
+            # now add the discovered file to the watched_files collection and adjust the sincedb collections
+            @watched_files_collection.add(watched_file)
+          end
         end
         # at this point the watched file is created, is in the db but not yet opened or being processed
       end
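
The new `discover_any_files` filters the glob result up front instead of skipping entries inside the loop. The standalone sketch below reproduces just that filter so its effect can be seen on a throwaway directory. `regular_files` is a hypothetical name chosen here; the `select` expression is the one from the diff.

```ruby
require "tmpdir"

# Same select as in the diff: keep regular files, drop directories and symlinks.
# File.file? follows symlinks, so the explicit symlink check is needed.
def regular_files(glob_pattern)
  Dir.glob(glob_pattern).select { |f| File.file?(f) && !File.symlink?(f) }
end

Dir.mktmpdir do |dir|
  File.write(File.join(dir, "a.log"), "x")
  Dir.mkdir(File.join(dir, "sub.log"))
  File.symlink(File.join(dir, "a.log"), File.join(dir, "link.log"))

  names = regular_files(File.join(dir, "*.log")).map { |f| File.basename(f) }
  puts names.inspect
end
```

One behavioural difference visible in the diff: the removed code fell back to `[path]` when the glob matched nothing, whereas the filtered form simply yields an empty set for a pattern with no matches.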

lib/filewatch/observing_base.rb

+2 -1
@@ -44,7 +44,8 @@ def initialize(opts={})
         :exclude => [],
         :start_new_files_at => :end,
         :delimiter => "\n",
-        :file_chunk_count => FIXNUM_MAX,
+        :file_chunk_count => MAX_ITERATIONS,
+        :file_chunk_size => FILE_READ_SIZE,
         :file_sort_by => "last_modified",
         :file_sort_direction => "asc",
       }.merge(opts)
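
The hunk above relies on `Hash#merge` to let caller options override defaults, and it adds `:file_chunk_size` as a new default. A minimal sketch of that pattern follows. The constant values, the `DEFAULTS` hash, and `build_options` are placeholders assumed for illustration; the real constants are defined in `lib/filewatch/bootstrap.rb` and only a subset of the keys is shown.

```ruby
# Placeholder values, assumed for illustration only.
MAX_ITERATIONS = 1_000
FILE_READ_SIZE = 32_768

DEFAULTS = {
  :delimiter => "\n",
  :file_chunk_count => MAX_ITERATIONS,
  :file_chunk_size => FILE_READ_SIZE,
  :file_sort_by => "last_modified",
  :file_sort_direction => "asc",
}.freeze

# Caller-supplied keys win; anything omitted falls back to the default.
# merge returns a new hash, so DEFAULTS itself is never modified.
def build_options(opts = {})
  DEFAULTS.merge(opts)
end

options = build_options(:file_chunk_size => 4096)
puts options[:file_chunk_size]
puts options[:file_chunk_count]
```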
