Skip to content

Commit 755e968

Browse files
authored
Save own mempool txs to disk (#675)
1 parent 6502534 commit 755e968

16 files changed

+335
-107
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,4 @@ testdata2
6464
*.db
6565
config.json
6666
/config2.json
67+
own-mempool-txs

api/base_api.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (api *BaseApi) sendTx(ctx context.Context, from common.Address, to *common.
9797
func (api *BaseApi) sendInternalTx(ctx context.Context, tx *types.Transaction) (common.Hash, error) {
9898
log.Info("Sending new tx", "ip", ctx.Value("remote"), "type", tx.Type, "hash", tx.Hash().Hex(), "nonce", tx.AccountNonce, "epoch", tx.Epoch)
9999

100-
if err := api.txpool.Add(tx); err != nil {
100+
if err := api.txpool.AddInternalTx(tx); err != nil {
101101
return common.Hash{}, err
102102
}
103103

blockchain/blockchain_mocks.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func NewTestBlockchainWithConfig(withIdentity bool, conf *config.ConsensusConf,
6565

6666
chain.InitializeChain()
6767
appState.Initialize(chain.Head.Height())
68-
txPool.Initialize(chain.Head, secStore.GetAddress())
68+
txPool.Initialize(chain.Head, secStore.GetAddress(), false)
6969

7070
return &TestBlockchain{db, chain}, appState, txPool, key
7171
}
@@ -122,7 +122,7 @@ func NewCustomTestBlockchainWithConfig(blocksCount int, emptyBlocksCount int, ke
122122

123123
result := &TestBlockchain{db, chain}
124124
result.GenerateBlocks(blocksCount).GenerateEmptyBlocks(emptyBlocksCount)
125-
txPool.Initialize(chain.Head, secStore.GetAddress())
125+
txPool.Initialize(chain.Head, secStore.GetAddress(), false)
126126
return result, appState
127127
}
128128

blockchain/blockchain_test.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
553553

554554
// add pending request to switch online
555555
tx, _ := chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
556-
err := chain.txpool.Add(tx)
556+
err := chain.txpool.AddInternalTx(tx)
557557
require.NoError(err)
558558

559559
// apply pending status switch
@@ -563,7 +563,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
563563

564564
// fail to switch online again
565565
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
566-
err = chain.txpool.Add(tx)
566+
err = chain.txpool.AddInternalTx(tx)
567567
require.Error(err, "should not validate tx if switch is already pending")
568568

569569
// switch status to online
@@ -576,13 +576,13 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
576576
// fail to switch online again
577577
chain.GenerateBlocks(5)
578578
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
579-
err = chain.txpool.Add(tx)
579+
err = chain.txpool.AddInternalTx(tx)
580580
require.Error(err, "should not validate tx if identity already has online status")
581581

582582
// add pending request to switch offline
583583
chain.GenerateBlocks(4)
584584
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(false)))
585-
err = chain.txpool.Add(tx)
585+
err = chain.txpool.AddInternalTx(tx)
586586
require.NoError(err)
587587

588588
// switch status to offline
@@ -594,15 +594,15 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
594594

595595
// add pending request to switch offline
596596
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
597-
err = chain.txpool.Add(tx)
597+
err = chain.txpool.AddInternalTx(tx)
598598
require.NoError(err)
599599
chain.GenerateBlocks(1)
600600

601601
require.Equal(1, len(state.State.StatusSwitchAddresses()))
602602

603603
// remove pending request to switch online
604604
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(false)))
605-
err = chain.txpool.Add(tx)
605+
err = chain.txpool.AddInternalTx(tx)
606606
require.NoError(err)
607607
chain.GenerateBlocks(1)
608608

@@ -618,7 +618,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
618618
require.Equal(uint64(100), chain.Head.Height())
619619

620620
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
621-
err = chain.txpool.Add(tx)
621+
err = chain.txpool.AddInternalTx(tx)
622622
require.Nil(err)
623623

624624
// switch status to online
@@ -632,7 +632,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
632632
chain.CommitState()
633633

634634
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
635-
err = chain.txpool.Add(tx)
635+
err = chain.txpool.AddInternalTx(tx)
636636
require.Nil(err)
637637
chain.GenerateBlocks(1)
638638

@@ -692,7 +692,7 @@ func Test_ApplySubmitCeremonyTxs(t *testing.T) {
692692

693693
signed, err := types.SignTx(tx, key)
694694
require.NoError(t, err)
695-
err = chain.txpool.Add(signed)
695+
err = chain.txpool.AddInternalTx(signed)
696696
require.NoError(t, err)
697697

698698
chain.GenerateBlocks(3)
@@ -706,7 +706,7 @@ func Test_ApplySubmitCeremonyTxs(t *testing.T) {
706706
Payload: []byte{0x1},
707707
}
708708
signed, _ = types.SignTx(tx, key)
709-
err = chain.txpool.Add(signed)
709+
err = chain.txpool.AddInternalTx(signed)
710710
require.NoError(t, err)
711711

712712
chain.GenerateBlocks(1)
@@ -720,7 +720,7 @@ func Test_ApplySubmitCeremonyTxs(t *testing.T) {
720720
}
721721
signed, _ = types.SignTx(tx, key)
722722

723-
err = chain.txpool.Add(signed)
723+
err = chain.txpool.AddInternalTx(signed)
724724
require.True(t, err == validation.DuplicatedTx)
725725
}
726726

@@ -735,14 +735,14 @@ func Test_Blockchain_GodAddressInvitesLimit(t *testing.T) {
735735
keyReceiver, _ := crypto.GenerateKey()
736736
receiver := crypto.PubkeyToAddress(keyReceiver.PublicKey)
737737
tx, _ := chain.secStore.SignTx(BuildTx(state, addr, &receiver, types.InviteTx, decimal.Zero, decimal.New(2, 0), decimal.Zero, 0, 0, nil))
738-
require.NoError(chain.txpool.Add(tx))
738+
require.NoError(chain.txpool.AddInternalTx(tx))
739739
chain.GenerateBlocks(1)
740740
}
741741

742742
keyReceiver, _ := crypto.GenerateKey()
743743
receiver := crypto.PubkeyToAddress(keyReceiver.PublicKey)
744744
tx, _ := chain.secStore.SignTx(BuildTx(state, addr, &receiver, types.InviteTx, decimal.Zero, decimal.New(2, 0), decimal.Zero, 0, 0, nil))
745-
require.Equal(validation.InsufficientInvites, chain.txpool.Add(tx), "we should not issue invite if we exceed limit")
745+
require.Equal(validation.InsufficientInvites, chain.txpool.AddInternalTx(tx), "we should not issue invite if we exceed limit")
746746
}
747747

748748
func Test_setNewIdentitiesAttributes(t *testing.T) {
@@ -990,7 +990,7 @@ func Test_Delegation(t *testing.T) {
990990
Payload: payload,
991991
}
992992
signedTx, _ := types.SignTx(tx, key)
993-
return txpool.Add(signedTx)
993+
return txpool.AddInternalTx(signedTx)
994994
}
995995

996996
require.NoError(t, addTx(keys[0], types.DelegateTx, 1, 0, &pool1, nil))

core/ceremony/ceremony.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ func (vc *ValidationCeremony) sendTx(txType uint16, payload []byte) (common.Hash
813813
vc.epochDb.WriteOwnTx(txType, txBytes)
814814
}
815815

816-
err := vc.mempool.Add(signedTx)
816+
err := vc.mempool.AddInternalTx(signedTx)
817817

818818
if err != nil {
819819
if !vc.epochDb.HasSuccessfulOwnTx(signedTx.Hash()) {

core/flip/flipper.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,12 @@ func (fp *Flipper) addNewFlip(flip *types.Flip, local bool) error {
174174
if local {
175175
log.Info("Sending new flip tx", "hash", flip.Tx.Hash().Hex(), "nonce", flip.Tx.AccountNonce, "epoch", flip.Tx.Epoch)
176176
}
177-
178-
if err := fp.txpool.Add(flip.Tx); err != nil && err != mempool.DuplicateTxError {
177+
if local {
178+
err = fp.txpool.AddInternalTx(flip.Tx)
179+
} else {
180+
err = fp.txpool.AddExternalTxs(flip.Tx)
181+
}
182+
if err != nil && err != mempool.DuplicateTxError {
179183
return err
180184
}
181185
return err

core/mempool/async_txpool.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package mempool
22

33
import "github.com/idena-network/idena-go/blockchain/types"
44

5-
const batchSize = 10
5+
const batchSize = 1000
66

77
type AsyncTxPool struct {
88
txPool *TxPool
@@ -22,10 +22,15 @@ func NewAsyncTxPool(txPool *TxPool) *AsyncTxPool {
2222
return pool
2323
}
2424

25-
func (pool *AsyncTxPool) Add(tx *types.Transaction) error {
26-
select {
27-
case pool.queue <- tx:
28-
default:
25+
func (pool *AsyncTxPool) AddInternalTx(tx *types.Transaction) error {
26+
panic("not implemented")
27+
}
28+
func (pool *AsyncTxPool) AddExternalTxs(txs ...*types.Transaction) error {
29+
for _, tx := range txs {
30+
select {
31+
case pool.queue <- tx:
32+
default:
33+
}
2934
}
3035
return nil
3136
}
@@ -49,6 +54,6 @@ func (pool *AsyncTxPool) loop() {
4954
break batchLoop
5055
}
5156
}
52-
pool.txPool.AddTxs(batch)
57+
pool.txPool.AddExternalTxs(batch...)
5358
}
5459
}

core/mempool/tx_keeper.go

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package mempool
2+
3+
import (
4+
"encoding/json"
5+
"github.com/idena-network/idena-go/blockchain/types"
6+
"github.com/idena-network/idena-go/common"
7+
"github.com/idena-network/idena-go/common/hexutil"
8+
"github.com/idena-network/idena-go/crypto"
9+
"github.com/idena-network/idena-go/log"
10+
"io/ioutil"
11+
"os"
12+
"path/filepath"
13+
"sync"
14+
)
15+
16+
const (
17+
Folder = "own-mempool-txs"
18+
)
19+
20+
type txKeeper struct {
21+
txs map[common.Hash]hexutil.Bytes
22+
datadir string
23+
mutex sync.Mutex
24+
}
25+
26+
func NewTxKeeper(datadir string) *txKeeper {
27+
return &txKeeper{datadir: datadir, txs: make(map[common.Hash]hexutil.Bytes)}
28+
}
29+
30+
func (k *txKeeper) persist() error {
31+
file, err := k.openFile()
32+
if err != nil {
33+
return err
34+
}
35+
defer file.Close()
36+
var list []hexutil.Bytes
37+
for _, d := range k.txs {
38+
list = append(list, d)
39+
}
40+
data, err := json.Marshal(list)
41+
if err != nil {
42+
return err
43+
}
44+
if err := file.Truncate(0); err != nil {
45+
return err
46+
}
47+
if _, err := file.Write(data); err != nil {
48+
return err
49+
}
50+
return nil
51+
}
52+
func (k *txKeeper) Load() {
53+
file, err := k.openFile()
54+
defer file.Close()
55+
if err != nil {
56+
return
57+
}
58+
data, err := ioutil.ReadAll(file)
59+
if err != nil {
60+
return
61+
}
62+
list := []hexutil.Bytes{}
63+
if err := json.Unmarshal(data, &list); err != nil {
64+
log.Warn("cannot parse txs.json", "err", err)
65+
}
66+
k.mutex.Lock()
67+
for _, hex := range list {
68+
k.txs[crypto.Hash(hex)] = hex
69+
}
70+
k.mutex.Unlock()
71+
}
72+
73+
func (k *txKeeper) openFile() (file *os.File, err error) {
74+
newpath := filepath.Join(k.datadir, Folder)
75+
if err := os.MkdirAll(newpath, os.ModePerm); err != nil {
76+
return nil, err
77+
}
78+
filePath := filepath.Join(newpath, "txs.json")
79+
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
80+
if err != nil {
81+
return nil, err
82+
}
83+
return f, nil
84+
}
85+
86+
func (k *txKeeper) AddTx(tx *types.Transaction) {
87+
k.mutex.Lock()
88+
defer k.mutex.Unlock()
89+
if _, ok := k.txs[tx.Hash()]; ok {
90+
return
91+
}
92+
data, _ := tx.ToBytes()
93+
k.txs[tx.Hash()] = data
94+
if err := k.persist(); err != nil {
95+
log.Warn("error while save mempool tx", "err", err)
96+
}
97+
}
98+
99+
func (k *txKeeper) RemoveTx(hash common.Hash) {
100+
k.mutex.Lock()
101+
defer k.mutex.Unlock()
102+
if _, ok := k.txs[hash]; ok {
103+
delete(k.txs, hash)
104+
if err := k.persist(); err != nil {
105+
log.Warn("error while remove mempool tx", "err", err)
106+
}
107+
}
108+
}
109+
110+
func (k *txKeeper) List() []*types.Transaction {
111+
k.mutex.Lock()
112+
defer k.mutex.Unlock()
113+
var result []*types.Transaction
114+
for _, hex := range k.txs {
115+
tx := new(types.Transaction)
116+
if err := tx.FromBytes(hex); err == nil {
117+
result = append(result, tx)
118+
}
119+
}
120+
return result
121+
}

0 commit comments

Comments
 (0)