Introduce configurable blacklist #2827

Merged · 1 commit · Jul 24, 2023
14 changes: 12 additions & 2 deletions cmd/livepeer/starter/starter.go
@@ -127,6 +127,7 @@ type LivepeerConfig struct {
AuthWebhookURL *string
OrchWebhookURL *string
DetectionWebhookURL *string
OrchBlacklist *string
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
@@ -1012,13 +1013,15 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

bcast := core.NewBroadcaster(n)

orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist)

// When the node is on-chain mode always cache the on-chain orchestrators and poll for updates
// Right now we rely on the DBOrchestratorPoolCache constructor to do this. Consider separating the logic
// caching/polling from the logic for fetching orchestrators during discovery
if *cfg.Network != "offchain" {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher)
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist)
if err != nil {
glog.Fatalf("Could not create orchestrator pool with DB cache: %v", err)
}
@@ -1035,7 +1038,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Info("Using orchestrator webhook URL ", whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl)
} else if len(orchURLs) > 0 {
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted)
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist)
}

if n.OrchestratorPool == nil {
@@ -1256,6 +1259,13 @@ func parseOrchAddrs(addrs string) []*url.URL {
return res
}

func parseOrchBlacklist(b *string) []string {
if b == nil {
return []string{}
}
return strings.Split(*b, ",")
}

func validateURL(u string) (*url.URL, error) {
if u == "" {
return nil, nil
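The new parseOrchBlacklist helper is worth a closer look. Below is a minimal standalone sketch of its behavior (not part of the PR; the addresses are placeholders). A nil flag yields an empty list, but note that strings.Split on an empty string returns a one-element slice containing "", so an empty but non-nil flag value does not produce an empty blacklist.

```go
package main

import (
	"fmt"
	"strings"
)

// parseOrchBlacklist as introduced above: nil means "flag not set",
// anything else is split on commas with no further validation.
func parseOrchBlacklist(b *string) []string {
	if b == nil {
		return []string{}
	}
	return strings.Split(*b, ",")
}

func main() {
	two := "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa,0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
	empty := ""

	fmt.Println(len(parseOrchBlacklist(nil)))    // 0
	fmt.Println(len(parseOrchBlacklist(&two)))   // 2
	fmt.Println(len(parseOrchBlacklist(&empty))) // 1: a single "" entry, not an empty list
}
```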
6 changes: 4 additions & 2 deletions discovery/db_discovery.go
@@ -36,9 +36,10 @@ type DBOrchestratorPoolCache struct {
ticketParamsValidator ticketParamsValidator
rm common.RoundsManager
bcast common.Broadcaster
orchBlacklist []string
}

func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager) (*DBOrchestratorPoolCache, error) {
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string) (*DBOrchestratorPoolCache, error) {
if node.Eth == nil {
return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
}
@@ -49,6 +50,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
ticketParamsValidator: node.Sender,
rm: rm,
bcast: core.NewBroadcaster(node),
orchBlacklist: orchBlacklist,
}

if err := dbo.cacheTranscoderPool(); err != nil {
@@ -140,7 +142,7 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(ctx context.Context, numOrc
return true
}

orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted)
orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist)
orchInfos, err := orchPool.GetOrchestrators(ctx, numOrchestrators, suspender, caps, scorePred)
if err != nil || len(orchInfos) <= 0 {
return nil, err
29 changes: 21 additions & 8 deletions discovery/discovery.go
@@ -3,10 +3,12 @@ package discovery
import (
"container/heap"
"context"
"encoding/hex"
"errors"
"math"
"math/rand"
"net/url"
"strings"
"time"

"github.com/livepeer/go-livepeer/clog"
@@ -25,12 +27,13 @@ var maxGetOrchestratorCutoffTimeout = 6 * time.Second
var serverGetOrchInfo = server.GetOrchestratorInfo

type orchestratorPool struct {
infos []common.OrchestratorLocalInfo
pred func(info *net.OrchestratorInfo) bool
bcast common.Broadcaster
infos []common.OrchestratorLocalInfo
pred func(info *net.OrchestratorInfo) bool
bcast common.Broadcaster
orchBlacklist []string
}

func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float32) *orchestratorPool {
func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float32, orchBlacklist []string) *orchestratorPool {
if len(uris) <= 0 {
// Should we return here?
glog.Error("Orchestrator pool does not have any URIs")
@@ -39,13 +42,13 @@ func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float3
for _, uri := range uris {
infos = append(infos, common.OrchestratorLocalInfo{URL: uri, Score: score})
}
return &orchestratorPool{infos: infos, bcast: bcast}
return &orchestratorPool{infos: infos, bcast: bcast, orchBlacklist: orchBlacklist}
}

func NewOrchestratorPoolWithPred(bcast common.Broadcaster, addresses []*url.URL,
pred func(*net.OrchestratorInfo) bool, score float32) *orchestratorPool {
pred func(*net.OrchestratorInfo) bool, score float32, orchBlacklist []string) *orchestratorPool {

pool := NewOrchestratorPool(bcast, addresses, score)
pool := NewOrchestratorPool(bcast, addresses, score, orchBlacklist)
pool.pred = pred
return pool
}
@@ -78,6 +81,15 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
// the assumption that all orchestrators support capability discovery.
legacyCapsOnly := caps.LegacyOnly()

isBlacklisted := func(info *net.OrchestratorInfo) bool {
for _, blacklisted := range o.orchBlacklist {
if strings.TrimPrefix(blacklisted, "0x") == hex.EncodeToString(info.Address) {
return true
}
}
return false
}

isCompatible := func(info *net.OrchestratorInfo) bool {
if o.pred != nil && !o.pred(info) {
return false
@@ -95,7 +107,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
}
getOrchInfo := func(ctx context.Context, od common.OrchestratorDescriptor, infoCh chan common.OrchestratorDescriptor, errCh chan error) {
info, err := serverGetOrchInfo(ctx, o.bcast, od.LocalInfo.URL)
if err == nil && isCompatible(info) {
if err == nil && !isBlacklisted(info) && isCompatible(info) {
od.RemoteInfo = info
infoCh <- od
return
@@ -156,6 +168,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
}
cancel()

// consider suspended orchestrators if we have an insufficient number of non-suspended ones
if len(ods) < numOrchestrators {
diff := numOrchestrators - len(ods)
for i := 0; i < diff && suspendedInfos.Len() > 0; i++ {
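The matching rule in the new isBlacklisted closure deserves a note: the orchestrator's on-chain address bytes are hex-encoded, which always yields lowercase with no "0x" prefix, and each blacklist entry is compared after stripping an optional "0x" prefix. A standalone sketch of the consequences (not part of the PR; the address is a placeholder): mixed-case entries such as EIP-55 checksummed addresses will not match, and an empty entry matches a response whose Address field is empty.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// Same comparison as the isBlacklisted closure above, lifted out so it
// can be exercised directly against raw address bytes.
func isBlacklisted(blacklist []string, addr []byte) bool {
	for _, entry := range blacklist {
		if strings.TrimPrefix(entry, "0x") == hex.EncodeToString(addr) {
			return true
		}
	}
	return false
}

func main() {
	addr, _ := hex.DecodeString("40b28ee755260ae2735950fe1bd0a64326ce58b0")

	fmt.Println(isBlacklisted([]string{"0x40b28ee755260ae2735950fe1bd0a64326ce58b0"}, addr)) // true: "0x" prefix is stripped
	fmt.Println(isBlacklisted([]string{"40b28ee755260ae2735950fe1bd0a64326ce58b0"}, addr))   // true: bare hex also matches
	fmt.Println(isBlacklisted([]string{"0x40B28ee755260ae2735950Fe1BD0a64326ce58b0"}, addr)) // false: comparison is case-sensitive
	fmt.Println(isBlacklisted([]string{""}, nil))                                            // true: empty entry matches an empty Address
}
```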
61 changes: 33 additions & 28 deletions discovery/discovery_test.go
@@ -2,6 +2,7 @@ package discovery

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"math"
@@ -42,7 +43,7 @@ func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
assert.Nil(pool)
assert.EqualError(err, "could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
}
@@ -72,7 +73,7 @@ func TestDeadLock(t *testing.T) {
uris := stringsToURIs(addresses)
assert := assert.New(t)
wg.Add(len(uris))
pool := NewOrchestratorPool(nil, uris, common.Score_Trusted)
pool := NewOrchestratorPool(nil, uris, common.Score_Trusted, []string{})
infos, err := pool.GetOrchestrators(context.TODO(), 1, newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0))
assert.Nil(err, "Should not be error")
assert.Len(infos, 1, "Should return one orchestrator")
@@ -119,7 +120,7 @@ func TestDeadLock_NewOrchestratorPoolWithPred(t *testing.T) {
}

wg.Add(len(uris))
pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted)
pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted, []string{})
infos, err := pool.GetOrchestrators(context.TODO(), 1, newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0))

assert.Nil(err, "Should not be error")
@@ -131,12 +132,12 @@ func TestPoolSize(t *testing.T) {
addresses := stringsToURIs([]string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"})

assert := assert.New(t)
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{})
assert.Equal(3, pool.Size())

// will result in len(uris) <= 0 -> log Error
errorLogsBefore := glog.Stats.Error.Lines()
pool = NewOrchestratorPool(nil, nil, common.Score_Trusted)
pool = NewOrchestratorPool(nil, nil, common.Score_Trusted, []string{})
errorLogsAfter := glog.Stats.Error.Lines()
assert.Equal(0, pool.Size())
assert.NotZero(t, errorLogsAfter-errorLogsBefore)
@@ -162,7 +163,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) {
goleak.VerifyNone(t, common.IgnoreRoutines()...)
}()

emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)
require.NotNil(emptyPool)
assert.Equal(0, emptyPool.Size())
@@ -173,7 +174,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) {
dbh.UpdateOrch(ethOrchToDBOrch(o))
}

nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)
require.NotNil(nonEmptyPool)
assert.Equal(len(addresses), nonEmptyPool.Size())
@@ -217,7 +218,7 @@ func TestNewDBOrchestorPoolCache_NoEthAddress(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := NewDBOrchestratorPoolCache(ctx, node, rm)
pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{})
require.Nil(err)

// Check that serverGetOrchInfo returns early and the orchestrator isn't updated
@@ -271,7 +272,7 @@ func TestNewDBOrchestratorPoolCache_InvalidPrices(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := NewDBOrchestratorPoolCache(ctx, node, rm)
pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{})
require.Nil(err)

// priceInfo.PixelsPerUnit = 0
@@ -342,7 +343,7 @@ func TestNewDBOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t

sender.On("ValidateTicketParams", mock.Anything).Return(nil).Times(3)

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)
assert.Equal(pool.Size(), 3)
orchs, err := pool.GetOrchestrators(context.TODO(), pool.Size(), newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0))
@@ -412,7 +413,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)
// bad URLs are inserted in the database but are not included in the working set, as there is no returnable query for getting their priceInfo
// And if URL is updated it won't be picked up until next cache update
@@ -445,7 +446,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs_Empty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)
assert.Equal(0, pool.Size())
infos := pool.GetInfos()
@@ -530,7 +531,7 @@ func TestNewDBOrchestorPoolCache_PollOrchestratorInfo(t *testing.T) {
origCacheRefreshInterval := cacheRefreshInterval
cacheRefreshInterval = 200 * time.Millisecond
defer func() { cacheRefreshInterval = origCacheRefreshInterval }()
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)

// Ensure orchestrators exist in DB
@@ -568,7 +569,7 @@ func TestNewOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t *
assert := assert.New(t)

// creating NewOrchestratorPool with orch addresses
offchainOrch := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
offchainOrch := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{})

for i, info := range offchainOrch.infos {
assert.Equal(info.URL.String(), addresses[i].String())
@@ -594,7 +595,7 @@ func TestNewOrchestratorPoolWithPred_TestPredicate(t *testing.T) {
}
uris := stringsToURIs(addresses)

pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted)
pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted, []string{})

oInfo := &net.OrchestratorInfo{
PriceInfo: &net.PriceInfo{
@@ -684,7 +685,7 @@ func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsEmptyList(t *testing.T)

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)

// ensuring orchs exist in DB
@@ -773,7 +774,7 @@ func TestCachedPool_GetOrchestrators_MaxBroadcastPriceNotSet(t *testing.T) {

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)

// ensuring orchs exist in DB
@@ -879,7 +880,7 @@ func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing.

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)

// ensuring orchs exist in DB
@@ -964,7 +965,7 @@ func TestCachedPool_GetOrchestrators_TicketParamsValidation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{})
require.NoError(err)

// Test 25 out of 50 orchs pass ticket params validation
@@ -1058,7 +1059,7 @@ func TestCachedPool_GetOrchestrators_OnlyActiveOrchestrators(t *testing.T) {

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)})
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{})
require.NoError(err)

// ensuring orchs exist in DB
@@ -1276,7 +1277,7 @@ func TestOrchestratorPool_GetOrchestrators(t *testing.T) {
}, err
}

pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{})

// Check that we receive everything
wg.Add(len(addresses))
@@ -1341,7 +1342,7 @@ func TestOrchestratorPool_GetOrchestrators_SuspendedOrchs(t *testing.T) {
}, err
}

pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{})

// suspend https://127.0.0.1:8938
sus := newStubSuspender()
@@ -1410,7 +1411,7 @@ func TestOrchestratorPool_ShuffleGetOrchestrators(t *testing.T) {
return &net.OrchestratorInfo{Transcoder: server.String()}, nil
}

pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{})

// Check that randomization happens: check for elements in a different order
// Could fail sometimes due to scheduling; the order of execution is undefined
@@ -1477,7 +1478,7 @@ func TestOrchestratorPool_GetOrchestratorTimeout(t *testing.T) {
getOrchestratorsTimeoutLoop = 1 * time.Millisecond
defer func() { getOrchestratorsTimeoutLoop = oldTimeout }()

pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{})

timedOut := func(start, end time.Time) bool {
return end.Sub(start).Milliseconds() >= getOrchestratorsTimeoutLoop.Milliseconds()
@@ -1565,10 +1566,14 @@ func TestOrchestratorPool_Capabilities(t *testing.T) {
i3 := &net.OrchestratorInfo{Capabilities: &net.Capabilities{Bitstring: []uint64{1}}}
// should succeed: compatible caps
i4 := &net.OrchestratorInfo{Capabilities: &net.Capabilities{Bitstring: capCompatString}}

responses := []*net.OrchestratorInfo{i1, i2, i3, i4}
addresses := stringsToURIs([]string{"a://b", "a://b", "a://b", "a://b"})
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted)
// should be blacklisted
address, err := hex.DecodeString("40B28ee755260ae2735950Fe1BD0a64326ce58b0")
assert.NoError(err)
i5 := &net.OrchestratorInfo{Capabilities: &net.Capabilities{Bitstring: capCompatString}, Address: address}

responses := []*net.OrchestratorInfo{i1, i2, i3, i4, i5}
addresses := stringsToURIs([]string{"a://b", "a://b", "a://b", "a://b", "a://b"})
pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{hex.EncodeToString(address)})

// some sanity checks
assert.Len(addresses, len(responses))
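Note how the updated capabilities test builds its blacklist entry: it decodes the mixed-case literal and re-encodes it with hex.EncodeToString, which guarantees the lowercase form the filter compares against. Operators supplying entries by hand could normalize the same way. A sketch, where normalizeEntry is a hypothetical helper rather than anything in this PR:

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// normalizeEntry (hypothetical) converts a blacklist entry into the
// lowercase, unprefixed hex form that the pool's filter expects.
// hex.DecodeString accepts both upper- and lowercase digits, so
// checksummed (EIP-55 style) input decodes fine.
func normalizeEntry(entry string) (string, error) {
	raw, err := hex.DecodeString(strings.TrimPrefix(entry, "0x"))
	if err != nil {
		return "", err
	}
	return hex.EncodeToString(raw), nil
}

func main() {
	got, err := normalizeEntry("0x40B28ee755260ae2735950Fe1BD0a64326ce58b0")
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // 40b28ee755260ae2735950fe1bd0a64326ce58b0
}
```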