Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: split notarypool and mempool metrics #2969

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewFakeChainWithCustomCfg(protocolCfg func(c *config.Blockchain)) *FakeChai
protocolCfg(&cfg)
}
return &FakeChain{
Pool: mempool.New(10, 0, false),
Pool: mempool.New(10, 0, false, nil),
PoolTxF: func(*transaction.Transaction) error { return nil },
poolTxWithData: func(*transaction.Transaction, any, *mempool.Pool) error { return nil },
blocks: make(map[util.Uint256]*block.Block),
Expand Down
2 changes: 1 addition & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (s *service) verifyBlock(b block.Block) bool {
}

var fee int64
var pool = mempool.New(len(coreb.Transactions), 0, false)
var pool = mempool.New(len(coreb.Transactions), 0, false, nil)
var mainPool = s.Chain.GetMemPool()
for _, tx := range coreb.Transactions {
var err error
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
store: s,
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize, 0, false),
memPool: mempool.New(cfg.MemPoolSize, 0, false, updateMempoolMetrics),
log: log,
events: make(chan bcEvent),
subCh: make(chan any),
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error {
if !block.MerkleRoot.Equals(merkle) {
return errors.New("invalid block: MerkleRoot mismatch")
}
mp = mempool.New(len(block.Transactions), 0, false)
mp = mempool.New(len(block.Transactions), 0, false, nil)
for _, tx := range block.Transactions {
var err error
// Transactions are verified before adding them
Expand Down Expand Up @@ -2651,7 +2651,7 @@ func (bc *Blockchain) IsTxStillRelevant(t *transaction.Transaction, txpool *memp
// current blockchain state. Note that this verification is completely isolated
// from the main node's mempool.
func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error {
var mp = mempool.New(1, 0, false)
var mp = mempool.New(1, 0, false, nil)
bc.lock.RLock()
defer bc.lock.RUnlock()
return bc.verifyAndPoolTx(t, mp, bc)
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/blockchain_neotest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ func TestBlockchain_VerifyTx(t *testing.T) {
require.True(t, errors.Is(err, core.ErrAlreadyExists))
})
t.Run("MemPoolOOM", func(t *testing.T) {
mp := mempool.New(1, 0, false)
mp := mempool.New(1, 0, false, nil)
tx1 := newTestTx(t, h, testScript)
tx1.NetworkFee += 10000 // Give it more priority.
require.NoError(t, accs[0].SignTx(netmode.UnitTestNet, tx1))
Expand Down Expand Up @@ -1860,7 +1860,7 @@ func TestBlockchain_VerifyTx(t *testing.T) {
return tx
}

mp := mempool.New(10, 1, false)
mp := mempool.New(10, 1, false, nil)
verificationF := func(tx *transaction.Transaction, data any) error {
if data.(int) > 5 {
return errors.New("bad data")
Expand Down
18 changes: 12 additions & 6 deletions pkg/core/mempool/mem_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ type Pool struct {
// oracleResp contains the ids of oracle responses for the tx in the pool.
oracleResp map[uint64]util.Uint256

capacity int
feePerByte int64
payerIndex int
capacity int
feePerByte int64
payerIndex int
updateMetricsCb func(int)

resendThreshold uint32
resendFunc func(*transaction.Transaction, any)
Expand Down Expand Up @@ -286,7 +287,9 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...any) error {
// we already checked balance in checkTxConflicts, so don't need to check again
mp.tryAddSendersFee(pItem.txn, fee, false)

updateMempoolMetrics(len(mp.verifiedTxes))
if mp.updateMetricsCb != nil {
mp.updateMetricsCb(len(mp.verifiedTxes))
}
mp.lock.Unlock()

if mp.subscriptionsOn.Load() {
Expand Down Expand Up @@ -342,7 +345,9 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) {
}
}
}
updateMempoolMetrics(len(mp.verifiedTxes))
if mp.updateMetricsCb != nil {
mp.updateMetricsCb(len(mp.verifiedTxes))
}
}

// RemoveStale filters verified transactions through the given function keeping
Expand Down Expand Up @@ -420,7 +425,7 @@ func (mp *Pool) checkPolicy(tx *transaction.Transaction, policyChanged bool) boo
}

// New returns a new Pool struct.
func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
func New(capacity int, payerIndex int, enableSubscriptions bool, updateMetricsCb func(int)) *Pool {
mp := &Pool{
verifiedMap: make(map[util.Uint256]*transaction.Transaction, capacity),
verifiedTxes: make([]item, 0, capacity),
Expand All @@ -434,6 +439,7 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
events: make(chan mempoolevent.Event),
subCh: make(chan chan<- mempoolevent.Event),
unsubCh: make(chan chan<- mempoolevent.Event),
updateMetricsCb: updateMetricsCb,
}
mp.subscriptionsOn.Store(false)
return mp
Expand Down
18 changes: 9 additions & 9 deletions pkg/core/mempool/mem_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (fs *FeerStub) P2PSigExtensionsEnabled() bool {
}

func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := New(10, 0, false)
mp := New(10, 0, false, nil)
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
tx.Nonce = 0
tx.Signers = []transaction.Signer{{Account: util.Uint160{1, 2, 3}}}
Expand All @@ -66,7 +66,7 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
}

func TestMemPoolRemoveStale(t *testing.T) {
mp := New(5, 0, false)
mp := New(5, 0, false, nil)
txs := make([]*transaction.Transaction, 5)
for i := range txs {
txs[i] = transaction.New([]byte{byte(opcode.PUSH1)}, 0)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestMemPoolAddRemove(t *testing.T) {
func TestOverCapacity(t *testing.T) {
var fs = &FeerStub{balance: 10000000}
const mempoolSize = 10
mp := New(mempoolSize, 0, false)
mp := New(mempoolSize, 0, false, nil)

for i := 0; i < mempoolSize; i++ {
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestOverCapacity(t *testing.T) {
func TestGetVerified(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := New(mempoolSize, 0, false)
mp := New(mempoolSize, 0, false, nil)

txes := make([]*transaction.Transaction, 0, mempoolSize)
for i := 0; i < mempoolSize; i++ {
Expand All @@ -217,7 +217,7 @@ func TestGetVerified(t *testing.T) {
func TestRemoveStale(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := New(mempoolSize, 0, false)
mp := New(mempoolSize, 0, false, nil)

txes1 := make([]*transaction.Transaction, 0, mempoolSize/2)
txes2 := make([]*transaction.Transaction, 0, mempoolSize/2)
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestRemoveStale(t *testing.T) {
}

func TestMemPoolFees(t *testing.T) {
mp := New(10, 0, false)
mp := New(10, 0, false, nil)
fs := &FeerStub{balance: 10000000}
sender0 := util.Uint160{1, 2, 3}
tx0 := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestMempoolItemsOrder(t *testing.T) {
}

func TestMempoolAddRemoveOracleResponse(t *testing.T) {
mp := New(3, 0, false)
mp := New(3, 0, false, nil)
nonce := uint32(0)
fs := &FeerStub{balance: 10000}
newTx := func(netFee int64, id uint64) *transaction.Transaction {
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestMempoolAddRemoveOracleResponse(t *testing.T) {

func TestMempoolAddRemoveConflicts(t *testing.T) {
capacity := 6
mp := New(capacity, 0, false)
mp := New(capacity, 0, false, nil)
var (
fs = &FeerStub{p2pSigExt: true, balance: 100000}
nonce uint32 = 1
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestMempoolAddWithDataGetData(t *testing.T) {
blockHeight: 5,
balance: 100,
}
mp := New(10, 1, false)
mp := New(10, 1, false, nil)
newTx := func(t *testing.T, netFee int64) *transaction.Transaction {
tx := transaction.New([]byte{byte(opcode.RET)}, 0)
tx.Signers = []transaction.Signer{{}, {}}
Expand Down
24 changes: 0 additions & 24 deletions pkg/core/mempool/prometheus.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/core/mempool/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestSubscriptions(t *testing.T) {
t.Run("disabled subscriptions", func(t *testing.T) {
mp := New(5, 0, false)
mp := New(5, 0, false, nil)
require.Panics(t, func() {
mp.RunSubscriptions()
})
Expand All @@ -24,7 +24,7 @@ func TestSubscriptions(t *testing.T) {

t.Run("enabled subscriptions", func(t *testing.T) {
fs := &FeerStub{balance: 100}
mp := New(2, 0, true)
mp := New(2, 0, true, nil)
mp.RunSubscriptions()
subChan1 := make(chan mempoolevent.Event, 3)
subChan2 := make(chan mempoolevent.Event, 3)
Expand Down
20 changes: 17 additions & 3 deletions pkg/core/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,46 @@ import (

// Metrics for monitoring service.
var (
//blockHeight prometheus metric.
// blockHeight prometheus metric.
blockHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Current index of processed block",
Name: "current_block_height",
Namespace: "neogo",
},
)
//persistedHeight prometheus metric.
// persistedHeight prometheus metric.
persistedHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Current persisted block count",
Name: "current_persisted_height",
Namespace: "neogo",
},
)
//headerHeight prometheus metric.
// headerHeight prometheus metric.
headerHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Current header height",
Name: "current_header_height",
Namespace: "neogo",
},
)
// mempoolUnsortedTx prometheus metric.
mempoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Mempool unsorted transactions",
Name: "mempool_unsorted_tx",
Namespace: "neogo",
},
)
)

func init() {
prometheus.MustRegister(
blockHeight,
persistedHeight,
headerHeight,
mempoolUnsortedTx,
)
}

Expand All @@ -51,3 +60,8 @@ func updateHeaderHeightMetric(hHeight uint32) {
func updateBlockHeightMetric(bHeight uint32) {
blockHeight.Set(float64(bHeight))
}

// updateMempoolMetrics updates metric of the number of unsorted txs inside the mempool.
func updateMempoolMetrics(unsortedTxnLen int) {
mempoolUnsortedTx.Set(float64(unsortedTxnLen))
}
16 changes: 16 additions & 0 deletions pkg/network/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ var (
},
)
p2pCmds = make(map[CommandType]prometheus.Histogram)

// notarypoolUnsortedTx prometheus metric.
notarypoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Notary request pool fallback txs",
Name: "notarypool_unsorted_tx",
Namespace: "neogo",
},
)
)

func init() {
Expand All @@ -59,6 +68,7 @@ func init() {
servAndNodeVersion,
poolCount,
blockQueueLength,
notarypoolUnsortedTx,
)
for _, cmd := range []CommandType{CMDVersion, CMDVerack, CMDGetAddr,
CMDAddr, CMDPing, CMDPong, CMDGetHeaders, CMDHeaders, CMDGetBlocks,
Expand Down Expand Up @@ -103,3 +113,9 @@ func addCmdTimeMetric(cmd CommandType, t time.Duration) {
}
p2pCmds[cmd].Observe(t.Seconds())
}

// updateNotarypoolMetrics updates metric of the number of fallback txs inside
// the notary request pool.
func updateNotarypoolMetrics(unsortedTxnLen int) {
notarypoolUnsortedTx.Set(float64(unsortedTxnLen))
}
2 changes: 1 addition & 1 deletion pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain)
s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true)
s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true, updateNotarypoolMetrics)
chain.RegisterPostBlock(func(isRelevant func(*transaction.Transaction, *mempool.Pool, bool) bool, txpool *mempool.Pool, _ *block.Block) {
s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool {
return isRelevant(t, txpool, true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/notary/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func getTestNotary(t *testing.T, bc *core.Blockchain, walletPath, pass string, o
Chain: bc,
Log: zaptest.NewLogger(t),
}
mp := mempool.New(10, 1, true)
mp := mempool.New(10, 1, true, nil)
ntr, err := notary.NewNotary(cfg, netmode.UnitTestNet, mp, onTx)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/services/notary/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getTestNotary(t *testing.T, bc Ledger, walletPath, pass string) (*wallet.Ac
Password: pass,
},
}
mp := mempool.New(10, 1, true)
mp := mempool.New(10, 1, true, nil)
cfg := Config{
MainCfg: mainCfg,
Chain: bc,
Expand Down
6 changes: 3 additions & 3 deletions pkg/services/notary/notary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ func TestWallet(t *testing.T) {
}
t.Run("unexisting wallet", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/does_not_exists.json"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil)
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.Error(t, err)
})

t.Run("bad password", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json"
cfg.MainCfg.UnlockWallet.Password = "invalid"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil)
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.Error(t, err)
})

t.Run("good", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json"
cfg.MainCfg.UnlockWallet.Password = "one"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil)
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.NoError(t, err)
})
}
Expand Down