Skip to content

Commit 9b048b0

Browse files
committed
Merge pull request #408 from lytics/master
Adding TCP keepalives support for the broker's connection
2 parents 237f79b + 449c713 commit 9b048b0

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Improvements:
1010
([#396](https://github.com/Shopify/sarama/pull/396)).
1111
- The consumer produces much more useful logging output when leadership
1212
changes ([#385](https://github.com/Shopify/sarama/pull/385)).
13+
- Added support for tcp keepalives ([#407](https://github.com/Shopify/sarama/issues/407)).
1314

1415
Bug Fixes:
1516
- The OffsetCommitRequest message now correctly implements all three possible

broker.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ func (b *Broker) Open(conf *Config) error {
6868
go withRecover(func() {
6969
defer b.lock.Unlock()
7070

71-
b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
71+
dialer := net.Dialer{
72+
Timeout: conf.Net.DialTimeout,
73+
KeepAlive: conf.Net.KeepAlive,
74+
}
75+
76+
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
7277
if b.connErr != nil {
7378
b.conn = nil
7479
atomic.StoreInt32(&b.opened, 0)

config.go

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ type Config struct {
1212
DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
1313
ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
1414
WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
15+
16+
// KeepAlive specifies the keep-alive period for an active network connection.
17+
// If zero, keep-alives are disabled. (default is 0: disabled).
18+
KeepAlive time.Duration
1519
}
1620

1721
// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
@@ -186,6 +190,8 @@ func (c *Config) Validate() error {
186190
return ConfigurationError("Invalid Net.ReadTimeout, must be > 0")
187191
case c.Net.WriteTimeout <= 0:
188192
return ConfigurationError("Invalid Net.WriteTimeout, must be > 0")
193+
case c.Net.KeepAlive < 0:
194+
return ConfigurationError("Invalid Net.KeepAlive, must be >= 0")
189195
}
190196

191197
// validate the Metadata values

0 commit comments

Comments
 (0)