Skip to content

Commit a5d1da6

Browse files
committed
[FAB-10654] Enhance eventhub logging with client IP
This CR enhances the logging for the eventhub to include the client IP addresses to help troubleshoot problematic client connections. It also cleans up some existing logging messages and removes some unhelpful log messages. Change-Id: I5521ceab521adcf70735a2988784925aa45a4dde Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
1 parent 3436f45 commit a5d1da6

File tree

3 files changed

+61
-93
lines changed

3 files changed

+61
-93
lines changed

events/producer/eventhelper.go

+3-16
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package producer
@@ -29,9 +19,6 @@ import (
2919
// and creates a block event and a filtered block event. Sending the events
3020
// is the responsibility of the code that calls this function.
3121
func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event, channelID string, err error) {
32-
logger.Debugf("Entry")
33-
defer logger.Debugf("Exit")
34-
3522
blockForEvent := &common.Block{}
3623
filteredBlockForEvent := &pb.FilteredBlock{}
3724
filteredTxArray := []*pb.FilteredTransaction{}
@@ -137,7 +124,7 @@ func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event
137124
}
138125
ebytes, err = utils.GetBytesEnvelope(env)
139126
if err != nil {
140-
return nil, nil, "", fmt.Errorf("cannot marshal transaction %s", err)
127+
return nil, nil, "", fmt.Errorf("cannot marshal transaction: %s", err)
141128
}
142129
}
143130
}

events/producer/events.go

+9-26
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package producer
@@ -21,7 +11,6 @@ import (
2111
"sync"
2212
"time"
2313

24-
"github.com/hyperledger/fabric/common/util"
2514
pb "github.com/hyperledger/fabric/protos/peer"
2615
)
2716

@@ -232,8 +221,7 @@ func (ep *eventProcessor) start() {
232221
now := time.Now()
233222
hl.foreach(e, func(h *handler) {
234223
if hasSessionExpired(now, h.sessionEndTime) {
235-
addr := util.ExtractRemoteAddress(h.ChatStream.Context())
236-
logger.Warning("Client's", addr, " identity has expired")
224+
logger.Warning("Client identity has expired for", h.RemoteAddr)
237225
// We have to call Stop asynchronously because hl.foreach() holds a lock on hl
238226
go h.Stop()
239227
return
@@ -269,7 +257,7 @@ func AddEventType(eventType pb.EventType) error {
269257
logger.Debugf("Registering %s", pb.EventType_name[int32(eventType)])
270258
if _, ok := gEventProcessor.eventConsumers[eventType]; ok {
271259
gEventProcessor.Unlock()
272-
return fmt.Errorf("event type exists %s", pb.EventType_name[int32(eventType)])
260+
return fmt.Errorf("event type %s already exists", pb.EventType_name[int32(eventType)])
273261
}
274262

275263
switch eventType {
@@ -288,7 +276,7 @@ func AddEventType(eventType pb.EventType) error {
288276
}
289277

290278
func registerHandler(ie *pb.Interest, h *handler) error {
291-
logger.Debugf("registering event type: %s", ie.EventType)
279+
logger.Debugf("Registering event type: %s", ie.EventType)
292280
gEventProcessor.Lock()
293281
defer gEventProcessor.Unlock()
294282
if hl, ok := gEventProcessor.eventConsumers[ie.EventType]; !ok {
@@ -301,7 +289,7 @@ func registerHandler(ie *pb.Interest, h *handler) error {
301289
}
302290

303291
func deRegisterHandler(ie *pb.Interest, h *handler) error {
304-
logger.Debugf("deregistering event type: %s", ie.EventType)
292+
logger.Debugf("Deregistering event type: %s", ie.EventType)
305293

306294
gEventProcessor.Lock()
307295
defer gEventProcessor.Unlock()
@@ -318,8 +306,6 @@ func deRegisterHandler(ie *pb.Interest, h *handler) error {
318306

319307
//Send sends the event to interested consumers
320308
func Send(e *pb.Event) error {
321-
logger.Debugf("Entry")
322-
defer logger.Debugf("Exit")
323309
if e.Event == nil {
324310
logger.Error("event not set")
325311
return fmt.Errorf("event not set")
@@ -331,24 +317,21 @@ func Send(e *pb.Event) error {
331317
}
332318

333319
if gEventProcessor.Timeout < 0 {
334-
logger.Debugf("Event processor timeout < 0")
335320
select {
336321
case gEventProcessor.eventChannel <- e:
337322
default:
338-
return fmt.Errorf("could not send the blocking event")
323+
return fmt.Errorf("could not add block event to event processor queue")
339324
}
340325
} else if gEventProcessor.Timeout == 0 {
341-
logger.Debugf("Event processor timeout = 0")
342326
gEventProcessor.eventChannel <- e
343327
} else {
344-
logger.Debugf("Event processor timeout > 0")
345328
select {
346329
case gEventProcessor.eventChannel <- e:
347330
case <-time.After(gEventProcessor.Timeout):
348-
return fmt.Errorf("could not send the blocking event")
331+
return fmt.Errorf("could not add block event to event processor queue")
349332
}
350333
}
351334

352-
logger.Debugf("Event sent successfully")
335+
logger.Debugf("Event added to event processor queue")
353336
return nil
354337
}

events/producer/handler.go

+49-51
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package producer
@@ -25,6 +15,7 @@ import (
2515
"github.com/golang/protobuf/proto"
2616

2717
"github.com/hyperledger/fabric/common/crypto"
18+
"github.com/hyperledger/fabric/common/util"
2819
"github.com/hyperledger/fabric/msp/mgmt"
2920
pb "github.com/hyperledger/fabric/protos/peer"
3021
)
@@ -33,20 +24,24 @@ type handler struct {
3324
ChatStream pb.Events_ChatServer
3425
interestedEvents map[string]*pb.Interest
3526
sessionEndTime time.Time
27+
RemoteAddr string
3628
}
3729

3830
func newEventHandler(stream pb.Events_ChatServer) *handler {
39-
d := &handler{
40-
ChatStream: stream,
31+
h := &handler{
32+
ChatStream: stream,
33+
interestedEvents: make(map[string]*pb.Interest),
34+
RemoteAddr: util.ExtractRemoteAddress(stream.Context()),
4135
}
42-
d.interestedEvents = make(map[string]*pb.Interest)
43-
return d
36+
logger.Debug("event handler created for", h.RemoteAddr)
37+
return h
4438
}
4539

4640
// Stop stops this handler
47-
func (d *handler) Stop() error {
48-
d.deregisterAll()
49-
d.interestedEvents = nil
41+
func (h *handler) Stop() error {
42+
h.deregisterAll()
43+
h.interestedEvents = nil
44+
logger.Debug("handler stopped for", h.RemoteAddr)
5045
return nil
5146
}
5247

@@ -62,83 +57,86 @@ func getInterestKey(interest pb.Interest) string {
6257
case pb.EventType_CHAINCODE:
6358
key = "/" + strconv.Itoa(int(pb.EventType_CHAINCODE)) + "/" + interest.GetChaincodeRegInfo().ChaincodeId + "/" + interest.GetChaincodeRegInfo().EventName
6459
default:
65-
logger.Errorf("unknown interest type %s", interest.EventType)
60+
logger.Errorf("unsupported interest type: %s", interest.EventType)
6661
}
6762

6863
return key
6964
}
7065

71-
func (d *handler) register(iMsg []*pb.Interest) error {
66+
func (h *handler) register(iMsg []*pb.Interest) error {
7267
// Could consider passing interest array to registerHandler
7368
// and only lock once for entire array here
7469
for _, v := range iMsg {
75-
if err := registerHandler(v, d); err != nil {
76-
logger.Errorf("could not register %s: %s", v, err)
70+
if err := registerHandler(v, h); err != nil {
71+
logger.Errorf("could not register %s for %s: %s", v, h.RemoteAddr, err)
7772
continue
7873
}
79-
d.interestedEvents[getInterestKey(*v)] = v
74+
h.interestedEvents[getInterestKey(*v)] = v
8075
}
8176

8277
return nil
8378
}
8479

85-
func (d *handler) deregister(iMsg []*pb.Interest) error {
80+
func (h *handler) deregister(iMsg []*pb.Interest) error {
8681
for _, v := range iMsg {
87-
if err := deRegisterHandler(v, d); err != nil {
88-
logger.Errorf("could not deregister %s", v)
82+
if err := deRegisterHandler(v, h); err != nil {
83+
logger.Errorf("could not deregister %s for %s: %s", v, h.RemoteAddr, err)
8984
continue
9085
}
91-
delete(d.interestedEvents, getInterestKey(*v))
86+
delete(h.interestedEvents, getInterestKey(*v))
9287
}
9388
return nil
9489
}
9590

96-
func (d *handler) deregisterAll() {
97-
for k, v := range d.interestedEvents {
98-
if err := deRegisterHandler(v, d); err != nil {
91+
func (h *handler) deregisterAll() {
92+
for k, v := range h.interestedEvents {
93+
if err := deRegisterHandler(v, h); err != nil {
9994
logger.Errorf("could not deregister %s", v)
10095
continue
10196
}
102-
delete(d.interestedEvents, k)
97+
delete(h.interestedEvents, k)
10398
}
10499
}
105100

106101
// HandleMessage handles the Openchain messages for the Peer.
107-
func (d *handler) HandleMessage(msg *pb.SignedEvent) error {
108-
evt, err := d.validateEventMessage(msg)
102+
func (h *handler) HandleMessage(msg *pb.SignedEvent) error {
103+
evt, err := h.validateEventMessage(msg)
109104
if err != nil {
110-
return fmt.Errorf("event message validation failed: [%s]", err)
105+
return fmt.Errorf("event message validation failed for %s: %s", h.RemoteAddr, err)
111106
}
112107

113108
switch evt.Event.(type) {
114109
case *pb.Event_Register:
115110
eventsObj := evt.GetRegister()
116-
if err := d.register(eventsObj.Events); err != nil {
117-
return fmt.Errorf("could not register events %s", err)
111+
if err := h.register(eventsObj.Events); err != nil {
112+
return fmt.Errorf("could not register events for %s: %s", h.RemoteAddr, err)
118113
}
119114
case *pb.Event_Unregister:
120115
eventsObj := evt.GetUnregister()
121-
if err := d.deregister(eventsObj.Events); err != nil {
122-
return fmt.Errorf("could not unregister events %s", err)
116+
if err := h.deregister(eventsObj.Events); err != nil {
117+
return fmt.Errorf("could not deregister events for %s: %s", h.RemoteAddr, err)
123118
}
124119
case nil:
125120
default:
126-
return fmt.Errorf("invalid type from client %T", evt.Event)
121+
return fmt.Errorf("invalid event type received from %s: %T", h.RemoteAddr, evt.Event)
127122
}
128123
//TODO return supported events.. for now just return the received msg
129-
if err := d.ChatStream.Send(evt); err != nil {
130-
return fmt.Errorf("error sending response to %v: %s", msg, err)
124+
if err := h.ChatStream.Send(evt); err != nil {
125+
return fmt.Errorf("error sending response to %s: %s", h.RemoteAddr, err)
131126
}
132127

133128
return nil
134129
}
135130

136131
// SendMessage sends a message to the remote PEER through the stream
137-
func (d *handler) SendMessage(msg *pb.Event) error {
138-
err := d.ChatStream.Send(msg)
132+
func (h *handler) SendMessage(msg *pb.Event) error {
133+
logger.Debug("sending event to", h.RemoteAddr)
134+
err := h.ChatStream.Send(msg)
139135
if err != nil {
140-
return fmt.Errorf("error Sending message through ChatStream: %s", err)
136+
logger.Debugf("sending event failed for %s: %s", h.RemoteAddr, err)
137+
return fmt.Errorf("error sending message through ChatStream: %s", err)
141138
}
139+
logger.Debug("event sent successfully to", h.RemoteAddr)
142140
return nil
143141
}
144142

@@ -153,7 +151,7 @@ func (d *handler) SendMessage(msg *pb.Event) error {
153151
// However, this is not being done for v1.0 due to complexity concerns and the need to complex a stable,
154152
// minimally viable release. Eventually events will be made channel-specific, at which point this method
155153
// should be revisited
156-
func (d *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
154+
func (h *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
157155
logger.Debugf("validating for signed event %p", signedEvt)
158156

159157
// messages from the client for registering and unregistering must be signed
@@ -168,7 +166,7 @@ func (d *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, er
168166
if !expirationTime.IsZero() && time.Now().After(expirationTime) {
169167
return nil, fmt.Errorf("identity expired")
170168
}
171-
d.sessionEndTime = expirationTime
169+
h.sessionEndTime = expirationTime
172170

173171
if evt.GetTimestamp() != nil {
174172
evtTime := time.Unix(evt.GetTimestamp().Seconds, int64(evt.GetTimestamp().Nanos)).UTC()
@@ -180,7 +178,7 @@ func (d *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, er
180178
}
181179
}
182180

183-
err = gEventProcessor.BindingInspector(d.ChatStream.Context(), evt)
181+
err = gEventProcessor.BindingInspector(h.ChatStream.Context(), evt)
184182
if err != nil {
185183
return nil, err
186184
}
@@ -191,18 +189,18 @@ func (d *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, er
191189
// Load MSPPrincipal for policy
192190
principal, err := principalGetter.Get(mgmt.Members)
193191
if err != nil {
194-
return nil, fmt.Errorf("failed getting local MSP principal [member]: [%s]", err)
192+
return nil, fmt.Errorf("failed getting local MSP principal [member]: %s", err)
195193
}
196194

197195
id, err := localMSP.DeserializeIdentity(evt.Creator)
198196
if err != nil {
199-
return nil, fmt.Errorf("failed deserializing event creator: [%s]", err)
197+
return nil, fmt.Errorf("failed deserializing event creator: %s", err)
200198
}
201199

202200
// Verify that event's creator satisfies the principal
203201
err = id.SatisfiesPrincipal(principal)
204202
if err != nil {
205-
return nil, fmt.Errorf("failed verifying the creator satisfies local MSP's [member] principal: [%s]", err)
203+
return nil, fmt.Errorf("failed verifying the creator satisfies local MSP's [member] principal: %s", err)
206204
}
207205

208206
// Verify the signature

0 commit comments

Comments
 (0)