Skip to content
This repository was archived by the owner on Sep 9, 2022. It is now read-only.

Tag peers with live hop streams #75

Merged
merged 3 commits into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require (
github.com/ipfs/go-log v0.0.1
github.com/libp2p/go-buffer-pool v0.0.1
github.com/libp2p/go-libp2p-blankhost v0.0.1
github.com/libp2p/go-libp2p-host v0.0.1
github.com/libp2p/go-libp2p-host v0.0.3
github.com/libp2p/go-libp2p-net v0.0.2
github.com/libp2p/go-libp2p-peer v0.0.1
github.com/libp2p/go-libp2p-peerstore v0.0.1
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.2.1-0.20180108230905-e214231b295a h1:U0BbGfKnviqVBJQB4etvm+mKx53KfkumNLBt6YeF/0Q=
github.com/coreos/go-semver v0.2.1-0.20180108230905-e214231b295a/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -76,8 +78,12 @@ github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
github.com/libp2p/go-libp2p-host v0.0.1 h1:dnqusU+DheGcdxrE718kG4XgHNuL2n9eEv8Rg5zy8hQ=
github.com/libp2p/go-libp2p-host v0.0.1/go.mod h1:qWd+H1yuU0m5CwzAkvbSjqKairayEHdR5MMl7Cwa7Go=
github.com/libp2p/go-libp2p-host v0.0.3 h1:BB/1Z+4X0rjKP5lbQTmjEjLbDVbrcmLOlA6QDsN5/j4=
github.com/libp2p/go-libp2p-host v0.0.3/go.mod h1:Y/qPyA6C8j2coYyos1dfRm0I8+nvd4TGrDGt4tA7JR8=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 h1:Q9EkNSLAOF+u90L88qmE9z/fTdjLh8OsJwGw74mkwk4=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4 h1:/LngXETpII5qOD7YjAcQiIxhVtdAk/NQe5t9sC6BR0E=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k=
github.com/libp2p/go-libp2p-loggables v0.0.1 h1:HVww9oAnINIxbt69LJNkxD8lnbfgteXR97Xm4p3l9ps=
Expand Down
17 changes: 16 additions & 1 deletion relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ..

func (r *Relay) addLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, 1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v + 1 })
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v + 1 })
}

func (r *Relay) rmLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, -1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v - 1 })
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v - 1 })

}

func (r *Relay) GetActiveHops() int32 {
Expand Down Expand Up @@ -364,10 +369,13 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {

r.addLiveHop(src.ID, dst.ID)

var wg sync.WaitGroup
wg.Add(2)

// Don't reset streams after finishing or the other side will get an
// error, not an EOF.
go func() {
defer r.rmLiveHop(src.ID, dst.ID)
defer wg.Done()

buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)
Expand All @@ -386,6 +394,8 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
}()

go func() {
defer wg.Done()

buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)

Expand All @@ -401,6 +411,11 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
}
log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty())
}()

go func() {
wg.Wait()
r.rmLiveHop(src.ID, dst.ID)
}()
}

func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
Expand Down