Skip to content

Commit 4966218

Browse files
committedJan 7, 2018
[FAB-7639] Block expired x509 identities in events
This change set blocks event registration in expired x509 identities, and also makes the events producer not send events to x509 clients which their identity has expired during the session. Change-Id: I1b32c21ced5a7b531a427fb14ee331092a8eae3f Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent d783a0d commit 4966218

File tree

4 files changed

+140
-4
lines changed

4 files changed

+140
-4
lines changed
 

‎events/producer/events.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sync"
2222
"time"
2323

24+
"github.com/hyperledger/fabric/common/util"
2425
pb "github.com/hyperledger/fabric/protos/peer"
2526
)
2627

@@ -217,7 +218,6 @@ func (ep *eventProcessor) start() {
217218
for {
218219
//wait for event
219220
e := <-ep.eventChannel
220-
221221
var hl handlerList
222222
eType := getMessageType(e)
223223
ep.Lock()
@@ -229,7 +229,15 @@ func (ep *eventProcessor) start() {
229229
//lock the handler map lock
230230
ep.Unlock()
231231

232+
now := time.Now()
232233
hl.foreach(e, func(h *handler) {
234+
if hasSessionExpired(now, h.sessionEndTime) {
235+
addr := util.ExtractRemoteAddress(h.ChatStream.Context())
236+
logger.Warning("Client's", addr, " identity has expired")
237+
// We have to call Stop asynchronously because hl.foreach() holds a lock on hl
238+
go h.Stop()
239+
return
240+
}
233241
if e.Event != nil {
234242
h.SendMessage(e)
235243
}
@@ -238,6 +246,10 @@ func (ep *eventProcessor) start() {
238246
}
239247
}
240248

249+
func hasSessionExpired(now, sessionEndTime time.Time) bool {
250+
return !sessionEndTime.IsZero() && now.After(sessionEndTime)
251+
}
252+
241253
//initialize and start
242254
func initializeEvents(config *EventsServerConfig) {
243255
if gEventProcessor != nil {

‎events/producer/handler.go

+8
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ import (
2424

2525
"github.com/golang/protobuf/proto"
2626

27+
"github.com/hyperledger/fabric/common/crypto"
2728
"github.com/hyperledger/fabric/msp/mgmt"
2829
pb "github.com/hyperledger/fabric/protos/peer"
2930
)
3031

3132
type handler struct {
3233
ChatStream pb.Events_ChatServer
3334
interestedEvents map[string]*pb.Interest
35+
sessionEndTime time.Time
3436
}
3537

3638
func newEventHandler(stream pb.Events_ChatServer) *handler {
@@ -162,6 +164,12 @@ func (d *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, er
162164
return nil, fmt.Errorf("error unmarshaling the event bytes in the SignedEvent: %s", err)
163165
}
164166

167+
expirationTime := crypto.ExpiresAt(evt.Creator)
168+
if !expirationTime.IsZero() && time.Now().After(expirationTime) {
169+
return nil, fmt.Errorf("identity expired")
170+
}
171+
d.sessionEndTime = expirationTime
172+
165173
if evt.GetTimestamp() != nil {
166174
evtTime := time.Unix(evt.GetTimestamp().Seconds, int64(evt.GetTimestamp().Nanos)).UTC()
167175
peerTime := time.Now()

‎events/producer/producer_test.go

+105-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ import (
3535
"google.golang.org/grpc/credentials"
3636
"google.golang.org/grpc/grpclog"
3737

38+
"io/ioutil"
39+
"path/filepath"
40+
3841
"github.com/golang/protobuf/proto"
3942
"github.com/golang/protobuf/ptypes/timestamp"
4043
"github.com/hyperledger/fabric/common/ledger/testutil"
@@ -47,6 +50,8 @@ import (
4750
"github.com/hyperledger/fabric/msp"
4851
"github.com/hyperledger/fabric/msp/mgmt"
4952
"github.com/hyperledger/fabric/msp/mgmt/testtools"
53+
"github.com/hyperledger/fabric/protos/common"
54+
msp2 "github.com/hyperledger/fabric/protos/msp"
5055
pb "github.com/hyperledger/fabric/protos/peer"
5156
"github.com/hyperledger/fabric/protos/utils"
5257
"github.com/spf13/viper"
@@ -110,7 +115,7 @@ func (a *Adapter) Disconnected(err error) {
110115
}
111116
}
112117

113-
func createRegisterEvent(timestamp *timestamp.Timestamp, cert *x509.Certificate) (*pb.Event, error) {
118+
func createRegisterEvent(timestamp *timestamp.Timestamp, tlsCert *x509.Certificate) (*pb.Event, error) {
114119
events := make([]*pb.Interest, 2)
115120
events[0] = &pb.Interest{
116121
EventType: pb.EventType_BLOCK,
@@ -129,8 +134,8 @@ func createRegisterEvent(timestamp *timestamp.Timestamp, cert *x509.Certificate)
129134
Creator: signerSerialized,
130135
Timestamp: timestamp,
131136
}
132-
if cert != nil {
133-
evt.TlsCertHash = util.ComputeSHA256(cert.Raw)
137+
if tlsCert != nil {
138+
evt.TlsCertHash = util.ComputeSHA256(tlsCert.Raw)
134139
}
135140
return evt, nil
136141
}
@@ -157,11 +162,24 @@ func corrupt(bytes []byte) {
157162
bytes[r.Int31n(int32(len(bytes)))]--
158163
}
159164

165+
func createExpiredIdentity(t *testing.T) []byte {
166+
certBytes, err := ioutil.ReadFile(filepath.Join("testdata", "expiredCert.pem"))
167+
assert.NoError(t, err)
168+
sId := &msp2.SerializedIdentity{
169+
IdBytes: certBytes,
170+
}
171+
serializedIdentity, err := proto.Marshal(sId)
172+
assert.NoError(t, err)
173+
return serializedIdentity
174+
}
175+
160176
func TestSignedEvent(t *testing.T) {
161177
recvChan := make(chan *streamEvent)
162178
sendChan := make(chan *pb.Event)
163179
stream := &mockEventStream{recvChan: recvChan, sendChan: sendChan}
164180
mockHandler := &handler{ChatStream: stream}
181+
backupSerializedIdentity := signerSerialized
182+
signerSerialized = createExpiredIdentity(t)
165183
// get a test event
166184
evt, err := createRegisterEvent(nil, nil)
167185
if err != nil {
@@ -176,6 +194,29 @@ func TestSignedEvent(t *testing.T) {
176194
return
177195
}
178196

197+
// validate it. Expected to fail because the identity expired
198+
_, err = mockHandler.validateEventMessage(sEvt)
199+
assert.Equal(t, err.Error(), "identity expired")
200+
if err == nil {
201+
t.Fatalf("validateEventMessage succeeded but should have failed")
202+
return
203+
}
204+
205+
// Restore the original legit serialized identity
206+
signerSerialized = backupSerializedIdentity
207+
evt, err = createRegisterEvent(nil, nil)
208+
if err != nil {
209+
t.Fatalf("createEvent failed, err %s", err)
210+
return
211+
}
212+
213+
// sign it
214+
sEvt, err = utils.GetSignedEvent(evt, signer)
215+
if err != nil {
216+
t.Fatalf("GetSignedEvent failed, err %s", err)
217+
return
218+
}
219+
179220
// validate it. Expected to succeed
180221
_, err = mockHandler.validateEventMessage(sEvt)
181222
if err != nil {
@@ -440,6 +481,67 @@ func TestRegister_MutualTLS(t *testing.T) {
440481
}
441482
}
442483

484+
func TestRegister_ExpiredIdentity(t *testing.T) {
485+
m := newMockEventhub()
486+
defer close(m.recvChan)
487+
488+
go ehServer.Chat(m)
489+
490+
publishBlock := func() {
491+
gEventProcessor.eventChannel <- &pb.Event{
492+
Event: &pb.Event_Block{
493+
Block: &common.Block{
494+
Header: &common.BlockHeader{
495+
Number: 100,
496+
},
497+
},
498+
},
499+
}
500+
}
501+
502+
expireSessions := func() {
503+
gEventProcessor.RLock()
504+
handlerList := gEventProcessor.eventConsumers[pb.EventType_BLOCK].(*genericHandlerList)
505+
handlerList.RLock()
506+
for k := range handlerList.handlers {
507+
// Artificially move the session end time a minute into the past
508+
k.sessionEndTime = time.Now().Add(-1 * time.Minute)
509+
}
510+
handlerList.RUnlock()
511+
gEventProcessor.RUnlock()
512+
}
513+
514+
sEvt, err := createSignedRegisterEvent(util.CreateUtcTimestamp(), nil)
515+
assert.NoError(t, err)
516+
m.recvChan <- &streamEvent{event: sEvt}
517+
518+
// Wait for register Ack
519+
select {
520+
case <-m.sendChan:
521+
case <-time.After(time.Millisecond * 500):
522+
assert.Fail(t, "Didn't receive back a register ack on time")
523+
}
524+
525+
// Publish a block and make sure we receive it
526+
publishBlock()
527+
select {
528+
case resp := <-m.sendChan:
529+
assert.Equal(t, uint64(100), resp.GetBlock().Header.Number)
530+
case <-time.After(time.Millisecond * 500):
531+
assert.Fail(t, "Didn't receive the block on time, but should have")
532+
}
533+
534+
// Expire the sessions, and publish a block again
535+
expireSessions()
536+
publishBlock()
537+
// Make sure we don't receive it
538+
select {
539+
case resp := <-m.sendChan:
540+
t.Fatalf("Received a block (%v) but wasn't supposed to", resp.GetBlock())
541+
case <-time.After(time.Millisecond * 500):
542+
}
543+
}
544+
443545
func resetEventProcessor(useMutualTLS bool) {
444546
extract := func(msg proto.Message) []byte {
445547
evt, isEvent := msg.(*pb.Event)
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIICGTCCAcCgAwIBAgIRAMjlb3tiiXUty7eFbtauwM0wCgYIKoZIzj0EAwIwczEL
3+
MAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExFjAUBgNVBAcTDVNhbiBG
4+
cmFuY2lzY28xGTAXBgNVBAoTEG9yZzEuZXhhbXBsZS5jb20xHDAaBgNVBAMTE2Nh
5+
Lm9yZzEuZXhhbXBsZS5jb20wHhcNMTgwMTA3MTIxMDM2WhcNMTgwMTA3MTIwNTM2
6+
WjBbMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UEBxMN
7+
U2FuIEZyYW5jaXNjbzEfMB0GA1UEAwwWVXNlcjFAb3JnMS5leGFtcGxlLmNvbTBZ
8+
MBMGByqGSM49AgEGCCqGSM49AwEHA0IABGzOvcOtZ+WvatGfkU/iQPfbU9AlvD+O
9+
bKuyUD35v71WvQf1sasUPKp0gBt4SaGLe0bOnvFTGKvTr0TgVu6T85ejTTBLMA4G
10+
A1UdDwEB/wQEAwIHgDAMBgNVHRMBAf8EAjAAMCsGA1UdIwQkMCKAIAvawoCOaQSY
11+
mSfQpUm8eezH91vKkfQM2Ui0RVffql3bMAoGCCqGSM49BAMCA0cAMEQCIEbzjljz
12+
s0QcGPZZGhPkbaSK1MurBzfmE4izQ6roccLLAiBGZo5i59k+gNTsVCsS4s2HBF6U
13+
rj/t6mYKdOVxhPXFbw==
14+
-----END CERTIFICATE-----

0 commit comments

Comments
 (0)
Please sign in to comment.