@@ -29,6 +29,7 @@ use ethers::{
29
29
use std:: collections:: HashSet ;
30
30
use std:: sync:: atomic:: { AtomicBool , AtomicU64 , Ordering } ;
31
31
use std:: sync:: Arc ;
32
+ use tokio:: time:: { sleep, Duration } ;
32
33
use tracing:: { info, warn} ;
33
34
34
35
#[ derive( Debug ) ]
@@ -43,7 +44,6 @@ pub struct EventProcessorConfig {
43
44
44
45
pub struct EventProcessor {
45
46
config : EventProcessorConfig ,
46
- provider : Arc < Provider < Ws > > ,
47
47
running : Arc < AtomicBool > ,
48
48
db_store : DBStoreV2 ,
49
49
block_number : Arc < AtomicU64 > ,
@@ -56,13 +56,8 @@ unsafe impl Send for EventProcessor {}
56
56
impl EventProcessor {
57
57
pub async fn new ( config : EventProcessorConfig , db_store : DBStoreV2 ) -> Result < Self > {
58
58
info ! ( "new event processor with config {:?}" , config) ;
59
- let provider = Provider :: < Ws > :: connect ( & config. evm_node_url )
60
- . await
61
- . map_err ( |e| DB3Error :: StoreEventError ( format ! ( "{e}" ) ) ) ?;
62
- let provider_arc = Arc :: new ( provider) ;
63
59
Ok ( Self {
64
60
config,
65
- provider : provider_arc,
66
61
running : Arc :: new ( AtomicBool :: new ( false ) ) ,
67
62
db_store,
68
63
block_number : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
@@ -91,8 +86,6 @@ impl EventProcessor {
91
86
}
92
87
93
88
pub async fn start ( & self ) -> Result < ( ) > {
94
- let abi: Abi = serde_json:: from_str ( self . config . abi . as_str ( ) )
95
- . map_err ( |e| DB3Error :: StoreEventError ( format ! ( "{e}" ) ) ) ?;
96
89
self . running
97
90
. store ( true , std:: sync:: atomic:: Ordering :: Relaxed ) ;
98
91
let address = self
@@ -102,88 +95,111 @@ impl EventProcessor {
102
95
. map_err ( |e| DB3Error :: StoreEventError ( format ! ( "{e}" ) ) ) ?;
103
96
let db_addr = DB3Address :: from_hex ( self . config . db_addr . as_str ( ) )
104
97
. map_err ( |e| DB3Error :: StoreEventError ( format ! ( "{e}" ) ) ) ?;
105
- let filter = match self . config . start_block == 0 {
106
- true => Filter :: new ( ) . address ( address) ,
107
- false => {
108
- info ! (
109
- "start process contract from block {} with address {}" ,
110
- self . config. start_block, self . config. contract_addr
111
- ) ;
112
- Filter :: new ( )
113
- . from_block ( self . config . start_block )
114
- . address ( address)
115
- }
116
- } ;
117
- let mut stream = self
118
- . provider
119
- . subscribe_logs ( & filter)
120
- . await
98
+ let abi: Abi = serde_json:: from_str ( self . config . abi . as_str ( ) )
121
99
. map_err ( |e| DB3Error :: StoreEventError ( format ! ( "{e}" ) ) ) ?;
122
- info ! (
123
- "event processor for contract {}" ,
124
- self . config. contract_addr. as_str( )
125
- ) ;
126
- while let Some ( log) = stream. next ( ) . await {
100
+ let local_evm_node_url = self . config . evm_node_url . to_string ( ) ;
101
+ loop {
127
102
if !self . running . load ( Ordering :: Relaxed ) {
128
103
info ! (
129
104
"stop event processor for contract {}" ,
130
105
self . config. contract_addr. as_str( )
131
106
) ;
132
107
break ;
133
108
}
134
- if let Some ( number) = log. block_number {
135
- if number. as_u64 ( ) % 10 == 0 {
109
+ let provider =
110
+ Provider :: < Ws > :: connect_with_reconnects ( local_evm_node_url. as_str ( ) , 100 )
111
+ . await
112
+ . map_err ( |e| DB3Error :: StoreEventError ( format ! ( "{e}" ) ) ) ?;
113
+ let provider_arc = Arc :: new ( provider) ;
114
+ let filter = match self . config . start_block == 0 {
115
+ true => Filter :: new ( ) . address ( address) ,
116
+ false => {
136
117
info ! (
137
- "contract {} sync status block {} event number {}" ,
138
- self . config. contract_addr. as_str( ) ,
139
- self . block_number. load( Ordering :: Relaxed ) ,
140
- self . event_number. load( Ordering :: Relaxed )
118
+ "start process contract from block {} with address {}" ,
119
+ self . config. start_block, self . config. contract_addr
141
120
) ;
121
+ Filter :: new ( )
122
+ . from_block ( self . config . start_block )
123
+ . address ( address)
142
124
}
143
- self . block_number . store ( number. as_u64 ( ) , Ordering :: Relaxed )
144
- }
145
- for e in abi. events ( ) {
146
- // verify
147
- let event_signature = log
148
- . topics
149
- . get ( 0 )
150
- . ok_or ( DB3Error :: StoreEventError ( format ! ( "" ) ) ) ?;
125
+ } ;
151
126
152
- if event_signature != & e. signature ( ) {
153
- continue ;
154
- }
155
- if !self . config . target_events . contains ( e. name . as_str ( ) ) {
156
- continue ;
157
- }
158
- let raw_log = RawLog {
159
- topics : log. topics . clone ( ) ,
160
- data : log. data . to_vec ( ) ,
161
- } ;
162
- if let Ok ( log_entry) = e. parse_log ( raw_log) {
163
- let json_value = Self :: log_to_doc ( & log_entry) ;
164
- match serde_json:: to_string ( & json_value) {
165
- Ok ( value) => {
166
- let values = vec ! [ value. to_string( ) ] ;
167
- if let Err ( e) = self . db_store . add_docs (
168
- & db_addr,
169
- & DB3Address :: ZERO ,
170
- e. name . as_str ( ) ,
171
- & values,
172
- None ,
173
- ) {
174
- warn ! ( "fail to write json doc {} for {e}" , value. as_str( ) ) ;
175
- } else {
176
- self . event_number . fetch_add ( 1 , Ordering :: Relaxed ) ;
127
+ if let Ok ( mut stream) = provider_arc. clone ( ) . subscribe_logs ( & filter) . await {
128
+ loop {
129
+ if let Some ( log) = stream. next ( ) . await {
130
+ if !self . running . load ( Ordering :: Relaxed ) {
131
+ info ! (
132
+ "stop event processor for contract {}" ,
133
+ self . config. contract_addr. as_str( )
134
+ ) ;
135
+ break ;
136
+ }
137
+ if let Some ( number) = log. block_number {
138
+ if number. as_u64 ( ) % 10 == 0 {
139
+ info ! (
140
+ "contract {} sync status block {} event number {}" ,
141
+ self . config. contract_addr. as_str( ) ,
142
+ self . block_number. load( Ordering :: Relaxed ) ,
143
+ self . event_number. load( Ordering :: Relaxed )
144
+ ) ;
177
145
}
146
+ self . block_number . store ( number. as_u64 ( ) , Ordering :: Relaxed )
178
147
}
179
- Err ( e) => {
180
- warn ! ( "fail to convert to json for {e} " ) ;
148
+ for e in abi. events ( ) {
149
+ // verify
150
+ let event_signature = log
151
+ . topics
152
+ . get ( 0 )
153
+ . ok_or ( DB3Error :: StoreEventError ( format ! ( "" ) ) ) ?;
154
+ if event_signature != & e. signature ( ) {
155
+ continue ;
156
+ }
157
+ if !self . config . target_events . contains ( e. name . as_str ( ) ) {
158
+ continue ;
159
+ }
160
+ let raw_log = RawLog {
161
+ topics : log. topics . clone ( ) ,
162
+ data : log. data . to_vec ( ) ,
163
+ } ;
164
+ if let Ok ( log_entry) = e. parse_log ( raw_log) {
165
+ let json_value = Self :: log_to_doc ( & log_entry) ;
166
+ match serde_json:: to_string ( & json_value) {
167
+ Ok ( value) => {
168
+ let values = vec ! [ value. to_string( ) ] ;
169
+ if let Err ( e) = self . db_store . add_docs (
170
+ & db_addr,
171
+ & DB3Address :: ZERO ,
172
+ e. name . as_str ( ) ,
173
+ & values,
174
+ None ,
175
+ ) {
176
+ warn ! (
177
+ "fail to write json doc {} for {e}" ,
178
+ value. as_str( )
179
+ ) ;
180
+ } else {
181
+ self . event_number . fetch_add ( 1 , Ordering :: Relaxed ) ;
182
+ }
183
+ }
184
+ Err ( e) => {
185
+ warn ! ( "fail to convert to json for {e} " ) ;
186
+ }
187
+ }
188
+ break ;
189
+ }
181
190
}
191
+ } else {
192
+ warn ! ( "empty log from stream, sleep 5 seconds and reconnect to it" ) ;
193
+ sleep ( Duration :: from_millis ( 5 * 1000 ) ) . await ;
194
+ break ;
182
195
}
183
- break ;
184
196
}
197
+ } else {
198
+ warn ! ( "fail to subscribe the log, sleep 5 seconds and reconnect to it" ) ;
199
+ sleep ( Duration :: from_millis ( 5 * 1000 ) ) . await ;
185
200
}
186
201
}
202
+
187
203
Ok ( ( ) )
188
204
}
189
205
0 commit comments