Skip to content

Commit 7bbb175

Browse files
committed
Add an optional fast checker to (*partitionConsumer).responseFeeder
Add a ticker in (*partitionConsumer).responseFeeder to frequently check for messages before starting the timeout timer. Using the fast checker significantly reduces the number of timer function calls when timeouts are infrequent.
1 parent 3fee590 commit 7bbb175

File tree

4 files changed

+148
-9
lines changed

4 files changed

+148
-9
lines changed

config.go

+20
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,23 @@ type Config struct {
197197
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
198198
MaxProcessingTime time.Duration
199199

200+
// The time interval between ticks of the fast checker. A value of 0
201+
// turns off the fast checker.
202+
// If this is set to a non-zero value, then there will be periodic
203+
// checks to see if messages have been written to the Messages channel.
204+
// If a message has not been written to the Messages channel since the
205+
// last tick of the fast checker, then the timer will be set.
206+
// Using the fast checker should typically result in many fewer calls to
207+
// Timer functions resulting in a significant performance improvement if
208+
// many messages are being sent and timeouts are infrequent.
209+
// The disadvantage of using the fast checker is that timeouts will be
210+
// less accurate. That is, the effective timeout could be between
211+
// `MaxProcessingTime` and `MaxProcessingTime + FastCheckerInterval`.
212+
// For example, if `MaxProcessingTime` is 100ms and
213+
// `FastCheckerInterval` is 10ms, then a delay of 108ms between two
214+
// messages being sent may not be recognized as a timeout.
215+
FastCheckerInterval time.Duration
216+
200217
// Return specifies what channels will be populated. If they are set to true,
201218
// you must read from them to prevent deadlock.
202219
Return struct {
@@ -277,6 +294,7 @@ func NewConfig() *Config {
277294
c.Consumer.Retry.Backoff = 2 * time.Second
278295
c.Consumer.MaxWaitTime = 250 * time.Millisecond
279296
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
297+
c.Consumer.FastCheckerInterval = 0
280298
c.Consumer.Return.Errors = false
281299
c.Consumer.Offsets.CommitInterval = 1 * time.Second
282300
c.Consumer.Offsets.Initial = OffsetNewest
@@ -402,6 +420,8 @@ func (c *Config) Validate() error {
402420
return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
403421
case c.Consumer.MaxProcessingTime <= 0:
404422
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
423+
case c.Consumer.FastCheckerInterval < 0:
424+
return ConfigurationError("Consumer.FastCheckerInterval must be >= 0")
405425
case c.Consumer.Retry.Backoff < 0:
406426
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
407427
case c.Consumer.Offsets.CommitInterval <= 0:

consumer.go

+41-8
Original file line numberDiff line numberDiff line change
@@ -440,25 +440,58 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
440440

441441
func (child *partitionConsumer) responseFeeder() {
442442
var msgs []*ConsumerMessage
443-
expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
444-
expireTimedOut := false
443+
msgSent := false
444+
// Initialize timer without a pending send on its channel
445+
expiryTimer := time.NewTimer(0)
446+
<-expiryTimer.C
447+
expiryTimerSet := false
448+
449+
var fastCheckerChan <-chan (time.Time)
450+
if child.conf.Consumer.FastCheckerInterval > 0 {
451+
fastChecker := time.NewTicker(child.conf.Consumer.FastCheckerInterval)
452+
defer fastChecker.Stop()
453+
fastCheckerChan = fastChecker.C
454+
}
445455

446456
feederLoop:
447457
for response := range child.feeder {
448458
msgs, child.responseResult = child.parseResponse(response)
449459

450460
for i, msg := range msgs {
451-
if !expiryTimer.Stop() && !expireTimedOut {
452-
// expiryTimer was expired; clear out the waiting msg
453-
<-expiryTimer.C
461+
if child.conf.Consumer.FastCheckerInterval <= 0 {
462+
expiryTimerSet = true
463+
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
454464
}
455-
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
456-
expireTimedOut = false
457465

466+
messageSelect:
458467
select {
459468
case child.messages <- msg:
469+
msgSent = true
470+
if expiryTimerSet {
471+
// The timer was set and a message was sent, stop the
472+
// timer and resume using the fast checker
473+
if !expiryTimer.Stop() {
474+
<-expiryTimer.C
475+
}
476+
expiryTimerSet = false
477+
}
478+
// Periodically check if messages have been sent
479+
case <-fastCheckerChan:
480+
if msgSent {
481+
msgSent = false
482+
} else if !expiryTimerSet {
483+
// No messages have been sent since the last tick,
484+
// start the timer
485+
expiryTimerSet = true
486+
// If the fast checker is being used, then at least
487+
// the time between two fast checker ticks has already
488+
// passed since the last message was sent.
489+
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime - child.conf.Consumer.FastCheckerInterval)
490+
}
491+
// message has not been sent, return to select statement
492+
goto messageSelect
460493
case <-expiryTimer.C:
461-
expireTimedOut = true
494+
expiryTimerSet = false
462495
child.responseResult = errTimedOut
463496
child.broker.acks.Done()
464497
for _, msg = range msgs[i:] {

consumer_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,92 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
803803
broker0.Close()
804804
}
805805

806+
func TestConsumerFastCheckerOff(t *testing.T) {
807+
// Given
808+
broker0 := NewMockBroker(t, 0)
809+
fetchResponse1 := &FetchResponse{}
810+
for i := 1; i <= 8; i++ {
811+
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
812+
}
813+
broker0.SetHandlerByMap(map[string]MockResponse{
814+
"MetadataRequest": NewMockMetadataResponse(t).
815+
SetBroker(broker0.Addr(), broker0.BrokerID()).
816+
SetLeader("my_topic", 0, broker0.BrokerID()),
817+
"OffsetRequest": NewMockOffsetResponse(t).
818+
SetOffset("my_topic", 0, OffsetNewest, 1234).
819+
SetOffset("my_topic", 0, OffsetOldest, 1),
820+
"FetchRequest": NewMockSequence(fetchResponse1),
821+
})
822+
823+
config := NewConfig()
824+
config.ChannelBufferSize = 0
825+
config.Consumer.FastCheckerInterval = 0
826+
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
827+
master, err := NewConsumer([]string{broker0.Addr()}, config)
828+
if err != nil {
829+
t.Fatal(err)
830+
}
831+
832+
// When
833+
consumer, err := master.ConsumePartition("my_topic", 0, 1)
834+
if err != nil {
835+
t.Fatal(err)
836+
}
837+
838+
// Then: messages with offsets 1 through 8 are read
839+
for i := 1; i <= 8; i++ {
840+
assertMessageOffset(t, <-consumer.Messages(), int64(i))
841+
time.Sleep(2 * time.Millisecond)
842+
}
843+
844+
safeClose(t, consumer)
845+
safeClose(t, master)
846+
broker0.Close()
847+
}
848+
849+
func TestConsumerFastCheckerOn(t *testing.T) {
850+
// Given
851+
broker0 := NewMockBroker(t, 0)
852+
fetchResponse1 := &FetchResponse{}
853+
for i := 1; i <= 8; i++ {
854+
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
855+
}
856+
broker0.SetHandlerByMap(map[string]MockResponse{
857+
"MetadataRequest": NewMockMetadataResponse(t).
858+
SetBroker(broker0.Addr(), broker0.BrokerID()).
859+
SetLeader("my_topic", 0, broker0.BrokerID()),
860+
"OffsetRequest": NewMockOffsetResponse(t).
861+
SetOffset("my_topic", 0, OffsetNewest, 1234).
862+
SetOffset("my_topic", 0, OffsetOldest, 1),
863+
"FetchRequest": NewMockSequence(fetchResponse1),
864+
})
865+
866+
config := NewConfig()
867+
config.ChannelBufferSize = 0
868+
config.Consumer.FastCheckerInterval = 1 * time.Millisecond
869+
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
870+
master, err := NewConsumer([]string{broker0.Addr()}, config)
871+
if err != nil {
872+
t.Fatal(err)
873+
}
874+
875+
// When
876+
consumer, err := master.ConsumePartition("my_topic", 0, 1)
877+
if err != nil {
878+
t.Fatal(err)
879+
}
880+
881+
// Then: messages with offsets 1 through 8 are read
882+
for i := 1; i <= 8; i++ {
883+
assertMessageOffset(t, <-consumer.Messages(), int64(i))
884+
time.Sleep(2 * time.Millisecond)
885+
}
886+
887+
safeClose(t, consumer)
888+
safeClose(t, master)
889+
broker0.Close()
890+
}
891+
806892
func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
807893
if msg.Offset != expectedOffset {
808894
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

message_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestMessageEncoding(t *testing.T) {
9191

9292
message.Value = []byte{}
9393
message.Codec = CompressionGZIP
94-
if runtime.Version() == "go1.8" {
94+
if runtime.Version() == "go1.8" || runtime.Version() == "go1.8.1" {
9595
testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
9696
} else {
9797
testEncodable(t, "empty gzip", &message, emptyGzipMessage)

0 commit comments

Comments
 (0)