Skip to content

Commit a014f39

Browse files
committed
[FAB-10655]Problematic client conn stops eventhub evts
If a client connection hangs, the eventhub’s flow of events completely stops until the problematic client connection decides to return (likely after a grpc timeout). This CR adds a timeout to the Send calls on the GRPC stream to ensure the eventhub is not at the mercy of the client connection releasing the connection before it can continue sending events. It also cleans up some code and removes a large chunk of dead code, including the ability to send chaincode events, which has not been used by Fabric at least beginning with v1.0 if not earlier. Change-Id: Idd86c1cec7d307becbd1a47a15c19f65eb0c64f1 Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
1 parent fcb7b26 commit a014f39

11 files changed

+431
-601
lines changed

events/consumer/consumer.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package consumer
@@ -161,7 +151,7 @@ func (ec *EventsClient) UnregisterAsync(ies []*ehpb.Interest) error {
161151
emsg := &ehpb.Event{Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}}, Creator: creator}
162152

163153
if err = ec.send(emsg); err != nil {
164-
err = fmt.Errorf("error on unregister send %s\n", err)
154+
err = fmt.Errorf("error on unregister send: %s", err)
165155
}
166156

167157
return err
@@ -185,6 +175,7 @@ func (ec *EventsClient) Recv() (*ehpb.Event, error) {
185175
}
186176
return in, nil
187177
}
178+
188179
func (ec *EventsClient) processEvents() error {
189180
defer ec.stream.CloseSend()
190181
for {

events/consumer/consumer_test.go

+41-41
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import (
2121
coreutil "github.com/hyperledger/fabric/core/testutil"
2222
"github.com/hyperledger/fabric/events/producer"
2323
"github.com/hyperledger/fabric/msp/mgmt/testtools"
24-
ehpb "github.com/hyperledger/fabric/protos/peer"
25-
"github.com/spf13/viper"
24+
pb "github.com/hyperledger/fabric/protos/peer"
2625
"github.com/stretchr/testify/assert"
2726
"google.golang.org/grpc"
2827
)
@@ -43,41 +42,39 @@ type BadAdapter struct {
4342
}
4443

4544
var peerAddress = "0.0.0.0:7303"
46-
var ies = []*ehpb.Interest{{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event1"}}}}
45+
var ies = []*pb.Interest{{EventType: pb.EventType_BLOCK}}
4746
var testCert = &x509.Certificate{
4847
Raw: []byte("test"),
4948
}
5049

5150
var adapter *MockAdapter
52-
var obcEHClient *EventsClient
51+
var ehClient *EventsClient
5352

54-
func (a *ZeroAdapter) GetInterestedEvents() ([]*ehpb.Interest, error) {
55-
return []*ehpb.Interest{}, nil
53+
func (a *ZeroAdapter) GetInterestedEvents() ([]*pb.Interest, error) {
54+
return []*pb.Interest{}, nil
5655
}
57-
func (a *ZeroAdapter) Recv(msg *ehpb.Event) (bool, error) {
56+
func (a *ZeroAdapter) Recv(msg *pb.Event) (bool, error) {
5857
panic("not implemented")
5958
}
6059
func (a *ZeroAdapter) Disconnected(err error) {
6160
panic("not implemented")
6261
}
6362

64-
func (a *BadAdapter) GetInterestedEvents() ([]*ehpb.Interest, error) {
65-
return []*ehpb.Interest{}, fmt.Errorf("Error")
63+
func (a *BadAdapter) GetInterestedEvents() ([]*pb.Interest, error) {
64+
return []*pb.Interest{}, fmt.Errorf("Error")
6665
}
67-
func (a *BadAdapter) Recv(msg *ehpb.Event) (bool, error) {
66+
func (a *BadAdapter) Recv(msg *pb.Event) (bool, error) {
6867
panic("not implemented")
6968
}
7069
func (a *BadAdapter) Disconnected(err error) {
7170
panic("not implemented")
7271
}
7372

74-
func (a *MockAdapter) GetInterestedEvents() ([]*ehpb.Interest, error) {
75-
return []*ehpb.Interest{
76-
{EventType: ehpb.EventType_BLOCK},
77-
}, nil
73+
func (a *MockAdapter) GetInterestedEvents() ([]*pb.Interest, error) {
74+
return ies, nil
7875
}
7976

80-
func (a *MockAdapter) Recv(msg *ehpb.Event) (bool, error) {
77+
func (a *MockAdapter) Recv(msg *pb.Event) (bool, error) {
8178
return true, nil
8279
}
8380

@@ -159,25 +156,28 @@ func TestUnregisterAsync(t *testing.T) {
159156
done := make(chan struct{})
160157
adapter := &MockAdapter{notify: done}
161158

162-
obcEHClient, _ = NewEventsClient(peerAddress, 5, adapter)
159+
ehClient, _ = NewEventsClient(peerAddress, 5, adapter)
163160

164-
if err = obcEHClient.Start(); err != nil {
165-
obcEHClient.Stop()
161+
if err = ehClient.Start(); err != nil {
162+
ehClient.Stop()
166163
t.Fail()
167164
}
168165

169-
regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp(), TlsCert: testCert}
170-
obcEHClient.RegisterAsync(regConfig)
171-
err = obcEHClient.UnregisterAsync(ies)
166+
regConfig := &RegistrationConfig{
167+
InterestedEvents: ies,
168+
Timestamp: util.CreateUtcTimestamp(),
169+
TlsCert: testCert,
170+
}
171+
ehClient.RegisterAsync(regConfig)
172+
err = ehClient.UnregisterAsync(ies)
172173
assert.NoError(t, err)
173174

174-
obcEHClient.Stop()
175-
175+
ehClient.Stop()
176176
}
177177

178178
func TestStart(t *testing.T) {
179179
var err error
180-
var regTimeout = 5 * time.Second
180+
var regTimeout = 1 * time.Second
181181
done := make(chan struct{})
182182

183183
var cases = []struct {
@@ -215,33 +215,32 @@ func TestStart(t *testing.T) {
215215
for _, test := range cases {
216216
t.Run(test.name, func(t *testing.T) {
217217
t.Logf("Running test: %s", test.name)
218-
obcEHClient, _ = NewEventsClient(test.address, regTimeout, test.adapter)
219-
err = obcEHClient.Start()
218+
ehClient, _ = NewEventsClient(test.address, regTimeout, test.adapter)
219+
err = ehClient.Start()
220220
if test.expected {
221221
assert.NoError(t, err)
222222
} else {
223223
assert.Error(t, err)
224224
}
225-
obcEHClient.Stop()
225+
ehClient.Stop()
226226
})
227227
}
228228
}
229229

230230
func TestStop(t *testing.T) {
231231
var err error
232-
var regTimeout = 5 * time.Second
232+
var regTimeout = 1 * time.Second
233233
done := make(chan struct{})
234234
adapter := &MockAdapter{notify: done}
235235

236-
obcEHClient, _ = NewEventsClient(peerAddress, regTimeout, adapter)
236+
ehClient, _ = NewEventsClient(peerAddress, regTimeout, adapter)
237237

238-
if err = obcEHClient.Start(); err != nil {
238+
if err = ehClient.Start(); err != nil {
239239
t.Fail()
240-
t.Logf("Error client start %s", err)
240+
t.Logf("Error starting client: %s", err)
241241
}
242-
err = obcEHClient.Stop()
242+
err = ehClient.Stop()
243243
assert.NoError(t, err)
244-
245244
}
246245

247246
func TestMain(m *testing.M) {
@@ -263,24 +262,25 @@ func TestMain(m *testing.M) {
263262
}
264263

265264
extract := func(msg proto.Message) []byte {
266-
evt, isEvent := msg.(*ehpb.Event)
265+
evt, isEvent := msg.(*pb.Event)
267266
if !isEvent || evt == nil {
268267
return nil
269268
}
270269
return evt.TlsCertHash
271270
}
272271

272+
timeout := 10 * time.Millisecond
273273
ehConfig := &producer.EventsServerConfig{
274-
BufferSize: uint(viper.GetInt("peer.events.buffersize")),
275-
Timeout: viper.GetDuration("peer.events.timeout"),
276-
TimeWindow: viper.GetDuration("peer.events.timewindow"),
277-
BindingInspector: comm.NewBindingInspector(false, extract)}
274+
BufferSize: 100,
275+
Timeout: timeout,
276+
SendTimeout: timeout,
277+
TimeWindow: time.Minute,
278+
BindingInspector: comm.NewBindingInspector(false, extract),
279+
}
278280
ehServer := producer.NewEventsServer(ehConfig)
279-
280-
ehpb.RegisterEventsServer(grpcServer, ehServer)
281+
pb.RegisterEventsServer(grpcServer, ehServer)
281282

282283
go grpcServer.Serve(lis)
283284

284-
time.Sleep(2 * time.Second)
285285
os.Exit(m.Run())
286286
}

events/producer/eventhelper.go

+2-12
Original file line numberDiff line numberDiff line change
@@ -138,22 +138,12 @@ func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event
138138
return CreateBlockEvent(blockForEvent), CreateFilteredBlockEvent(filteredBlockForEvent), channelID, nil
139139
}
140140

141-
//CreateBlockEvent creates a Event from a Block
141+
// CreateBlockEvent creates a Event from a Block
142142
func CreateBlockEvent(te *common.Block) *pb.Event {
143143
return &pb.Event{Event: &pb.Event_Block{Block: te}}
144144
}
145145

146-
//CreateFilteredBlockEvent creates a Event from a FilteredBlock
146+
// CreateFilteredBlockEvent creates a Event from a FilteredBlock
147147
func CreateFilteredBlockEvent(te *pb.FilteredBlock) *pb.Event {
148148
return &pb.Event{Event: &pb.Event_FilteredBlock{FilteredBlock: te}}
149149
}
150-
151-
//CreateChaincodeEvent creates a Event from a ChaincodeEvent
152-
func CreateChaincodeEvent(te *pb.ChaincodeEvent) *pb.Event {
153-
return &pb.Event{Event: &pb.Event_ChaincodeEvent{ChaincodeEvent: te}}
154-
}
155-
156-
//CreateRejectionEvent creates an Event from TxResults
157-
func CreateRejectionEvent(tx *pb.Transaction, errorMsg string) *pb.Event {
158-
return &pb.Event{Event: &pb.Event_Rejection{Rejection: &pb.Rejection{Tx: tx, ErrorMsg: errorMsg}}}
159-
}

0 commit comments

Comments
 (0)