Skip to content

Commit

Permalink
core: distinguish notarypool/mempool metrics
Browse files Browse the repository at this point in the history
Move them to the core/network packages, close #2950. The name of
mempool's unsorted transactions metrics has been changed along the
way to match the core's metrics naming convention.

Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
  • Loading branch information
AnnaShaleva committed Apr 13, 2023
1 parent 7bcc62d commit 3a71aaf
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 54 deletions.
2 changes: 1 addition & 1 deletion internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewFakeChainWithCustomCfg(protocolCfg func(c *config.Blockchain)) *FakeChai
protocolCfg(&cfg)
}
return &FakeChain{
Pool: mempool.New(10, 0, false),
Pool: mempool.New(10, 0, false, nil),
PoolTxF: func(*transaction.Transaction) error { return nil },
poolTxWithData: func(*transaction.Transaction, any, *mempool.Pool) error { return nil },
blocks: make(map[util.Uint256]*block.Block),
Expand Down
2 changes: 1 addition & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (s *service) verifyBlock(b block.Block) bool {
}

var fee int64
var pool = mempool.New(len(coreb.Transactions), 0, false)
var pool = mempool.New(len(coreb.Transactions), 0, false, nil)
var mainPool = s.Chain.GetMemPool()
for _, tx := range coreb.Transactions {
var err error
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
store: s,
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize, 0, false),
memPool: mempool.New(cfg.MemPoolSize, 0, false, updateMempoolMetrics),
log: log,
events: make(chan bcEvent),
subCh: make(chan any),
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error {
if !block.MerkleRoot.Equals(merkle) {
return errors.New("invalid block: MerkleRoot mismatch")
}
mp = mempool.New(len(block.Transactions), 0, false)
mp = mempool.New(len(block.Transactions), 0, false, nil)
for _, tx := range block.Transactions {
var err error
// Transactions are verified before adding them
Expand Down Expand Up @@ -2651,7 +2651,7 @@ func (bc *Blockchain) IsTxStillRelevant(t *transaction.Transaction, txpool *memp
// current blockchain state. Note that this verification is completely isolated
// from the main node's mempool.
func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error {
var mp = mempool.New(1, 0, false)
var mp = mempool.New(1, 0, false, nil)
bc.lock.RLock()
defer bc.lock.RUnlock()
return bc.verifyAndPoolTx(t, mp, bc)
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/blockchain_neotest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ func TestBlockchain_VerifyTx(t *testing.T) {
require.True(t, errors.Is(err, core.ErrAlreadyExists))
})
t.Run("MemPoolOOM", func(t *testing.T) {
mp := mempool.New(1, 0, false)
mp := mempool.New(1, 0, false, nil)
tx1 := newTestTx(t, h, testScript)
tx1.NetworkFee += 10000 // Give it more priority.
require.NoError(t, accs[0].SignTx(netmode.UnitTestNet, tx1))
Expand Down Expand Up @@ -1860,7 +1860,7 @@ func TestBlockchain_VerifyTx(t *testing.T) {
return tx
}

mp := mempool.New(10, 1, false)
mp := mempool.New(10, 1, false, nil)
verificationF := func(tx *transaction.Transaction, data any) error {
if data.(int) > 5 {
return errors.New("bad data")
Expand Down
18 changes: 12 additions & 6 deletions pkg/core/mempool/mem_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ type Pool struct {
// oracleResp contains the ids of oracle responses for the tx in the pool.
oracleResp map[uint64]util.Uint256

capacity int
feePerByte int64
payerIndex int
capacity int
feePerByte int64
payerIndex int
updateMetricsCb func(int)

resendThreshold uint32
resendFunc func(*transaction.Transaction, any)
Expand Down Expand Up @@ -286,7 +287,9 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...any) error {
// we already checked balance in checkTxConflicts, so don't need to check again
mp.tryAddSendersFee(pItem.txn, fee, false)

updateMempoolMetrics(len(mp.verifiedTxes))
if mp.updateMetricsCb != nil {
mp.updateMetricsCb(len(mp.verifiedTxes))
}
mp.lock.Unlock()

if mp.subscriptionsOn.Load() {
Expand Down Expand Up @@ -342,7 +345,9 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) {
}
}
}
updateMempoolMetrics(len(mp.verifiedTxes))
if mp.updateMetricsCb != nil {
mp.updateMetricsCb(len(mp.verifiedTxes))
}
}

// RemoveStale filters verified transactions through the given function keeping
Expand Down Expand Up @@ -420,7 +425,7 @@ func (mp *Pool) checkPolicy(tx *transaction.Transaction, policyChanged bool) boo
}

// New returns a new Pool struct.
func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
func New(capacity int, payerIndex int, enableSubscriptions bool, updateMetricsCb func(int)) *Pool {
mp := &Pool{
verifiedMap: make(map[util.Uint256]*transaction.Transaction, capacity),
verifiedTxes: make([]item, 0, capacity),
Expand All @@ -434,6 +439,7 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
events: make(chan mempoolevent.Event),
subCh: make(chan chan<- mempoolevent.Event),
unsubCh: make(chan chan<- mempoolevent.Event),
updateMetricsCb: updateMetricsCb,
}
mp.subscriptionsOn.Store(false)
return mp
Expand Down
18 changes: 9 additions & 9 deletions pkg/core/mempool/mem_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (fs *FeerStub) P2PSigExtensionsEnabled() bool {
}

func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := New(10, 0, false)
mp := New(10, 0, false, nil)
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
tx.Nonce = 0
tx.Signers = []transaction.Signer{{Account: util.Uint160{1, 2, 3}}}
Expand All @@ -66,7 +66,7 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
}

func TestMemPoolRemoveStale(t *testing.T) {
mp := New(5, 0, false)
mp := New(5, 0, false, nil)
txs := make([]*transaction.Transaction, 5)
for i := range txs {
txs[i] = transaction.New([]byte{byte(opcode.PUSH1)}, 0)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestMemPoolAddRemove(t *testing.T) {
func TestOverCapacity(t *testing.T) {
var fs = &FeerStub{balance: 10000000}
const mempoolSize = 10
mp := New(mempoolSize, 0, false)
mp := New(mempoolSize, 0, false, nil)

for i := 0; i < mempoolSize; i++ {
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestOverCapacity(t *testing.T) {
func TestGetVerified(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := New(mempoolSize, 0, false)
mp := New(mempoolSize, 0, false, nil)

txes := make([]*transaction.Transaction, 0, mempoolSize)
for i := 0; i < mempoolSize; i++ {
Expand All @@ -217,7 +217,7 @@ func TestGetVerified(t *testing.T) {
func TestRemoveStale(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := New(mempoolSize, 0, false)
mp := New(mempoolSize, 0, false, nil)

txes1 := make([]*transaction.Transaction, 0, mempoolSize/2)
txes2 := make([]*transaction.Transaction, 0, mempoolSize/2)
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestRemoveStale(t *testing.T) {
}

func TestMemPoolFees(t *testing.T) {
mp := New(10, 0, false)
mp := New(10, 0, false, nil)
fs := &FeerStub{balance: 10000000}
sender0 := util.Uint160{1, 2, 3}
tx0 := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestMempoolItemsOrder(t *testing.T) {
}

func TestMempoolAddRemoveOracleResponse(t *testing.T) {
mp := New(3, 0, false)
mp := New(3, 0, false, nil)
nonce := uint32(0)
fs := &FeerStub{balance: 10000}
newTx := func(netFee int64, id uint64) *transaction.Transaction {
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestMempoolAddRemoveOracleResponse(t *testing.T) {

func TestMempoolAddRemoveConflicts(t *testing.T) {
capacity := 6
mp := New(capacity, 0, false)
mp := New(capacity, 0, false, nil)
var (
fs = &FeerStub{p2pSigExt: true, balance: 100000}
nonce uint32 = 1
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestMempoolAddWithDataGetData(t *testing.T) {
blockHeight: 5,
balance: 100,
}
mp := New(10, 1, false)
mp := New(10, 1, false, nil)
newTx := func(t *testing.T, netFee int64) *transaction.Transaction {
tx := transaction.New([]byte{byte(opcode.RET)}, 0)
tx.Signers = []transaction.Signer{{}, {}}
Expand Down
24 changes: 0 additions & 24 deletions pkg/core/mempool/prometheus.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/core/mempool/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestSubscriptions(t *testing.T) {
t.Run("disabled subscriptions", func(t *testing.T) {
mp := New(5, 0, false)
mp := New(5, 0, false, nil)
require.Panics(t, func() {
mp.RunSubscriptions()
})
Expand All @@ -24,7 +24,7 @@ func TestSubscriptions(t *testing.T) {

t.Run("enabled subscriptions", func(t *testing.T) {
fs := &FeerStub{balance: 100}
mp := New(2, 0, true)
mp := New(2, 0, true, nil)
mp.RunSubscriptions()
subChan1 := make(chan mempoolevent.Event, 3)
subChan2 := make(chan mempoolevent.Event, 3)
Expand Down
14 changes: 14 additions & 0 deletions pkg/core/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,22 @@ var (
Namespace: "neogo",
},
)
// mempoolUnsortedTx prometheus metric.
mempoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Mempool unsorted transactions",
Name: "mempool_unsorted_tx",
Namespace: "neogo",
},
)
)

func init() {
prometheus.MustRegister(
blockHeight,
persistedHeight,
headerHeight,
mempoolUnsortedTx,
)
}

Expand All @@ -51,3 +60,8 @@ func updateHeaderHeightMetric(hHeight uint32) {
func updateBlockHeightMetric(bHeight uint32) {
blockHeight.Set(float64(bHeight))
}

// updateMempoolMetrics updates metric of the number of unsorted txs inside the mempool.
func updateMempoolMetrics(unsortedTxnLen int) {
mempoolUnsortedTx.Set(float64(unsortedTxnLen))
}
16 changes: 16 additions & 0 deletions pkg/network/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ var (
},
)
p2pCmds = make(map[CommandType]prometheus.Histogram)

// notarypoolUnsortedTx prometheus metric.
notarypoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Notary request pool fallback txs",
Name: "notarypool_unsorted_tx",
Namespace: "neogo",
},
)
)

func init() {
Expand All @@ -59,6 +68,7 @@ func init() {
servAndNodeVersion,
poolCount,
blockQueueLength,
notarypoolUnsortedTx,
)
for _, cmd := range []CommandType{CMDVersion, CMDVerack, CMDGetAddr,
CMDAddr, CMDPing, CMDPong, CMDGetHeaders, CMDHeaders, CMDGetBlocks,
Expand Down Expand Up @@ -103,3 +113,9 @@ func addCmdTimeMetric(cmd CommandType, t time.Duration) {
}
p2pCmds[cmd].Observe(t.Seconds())
}

// updateNotarypoolMetrics updates metric of the number of fallback txs inside
// the notary request pool.
func updateNotarypoolMetrics(unsortedTxnLen int) {
notarypoolUnsortedTx.Set(float64(unsortedTxnLen))
}
2 changes: 1 addition & 1 deletion pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain)
s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true)
s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true, updateNotarypoolMetrics)
chain.RegisterPostBlock(func(isRelevant func(*transaction.Transaction, *mempool.Pool, bool) bool, txpool *mempool.Pool, _ *block.Block) {
s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool {
return isRelevant(t, txpool, true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/notary/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func getTestNotary(t *testing.T, bc *core.Blockchain, walletPath, pass string, o
Chain: bc,
Log: zaptest.NewLogger(t),
}
mp := mempool.New(10, 1, true)
mp := mempool.New(10, 1, true, nil)
ntr, err := notary.NewNotary(cfg, netmode.UnitTestNet, mp, onTx)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/services/notary/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getTestNotary(t *testing.T, bc Ledger, walletPath, pass string) (*wallet.Ac
Password: pass,
},
}
mp := mempool.New(10, 1, true)
mp := mempool.New(10, 1, true, nil)
cfg := Config{
MainCfg: mainCfg,
Chain: bc,
Expand Down
6 changes: 3 additions & 3 deletions pkg/services/notary/notary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ func TestWallet(t *testing.T) {
}
t.Run("unexisting wallet", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/does_not_exists.json"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil)
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.Error(t, err)
})

t.Run("bad password", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json"
cfg.MainCfg.UnlockWallet.Password = "invalid"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil)
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.Error(t, err)
})

t.Run("good", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json"
cfg.MainCfg.UnlockWallet.Password = "one"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil)
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.NoError(t, err)
})
}
Expand Down

0 comments on commit 3a71aaf

Please sign in to comment.