@@ -24,7 +24,9 @@ use crate::interpreter::VMTrace;
24
24
use crate :: lotus_json:: { lotus_json_with_self, HasLotusJson } ;
25
25
use crate :: message:: { ChainMessage , Message as _, SignedMessage } ;
26
26
use crate :: rpc:: error:: ServerError ;
27
- use crate :: rpc:: eth:: filter:: { event:: EventFilter , SkipEvent } ;
27
+ use crate :: rpc:: eth:: filter:: {
28
+ event:: EventFilter , mempool:: MempoolFilter , tipset:: TipSetFilter , SkipEvent ,
29
+ } ;
28
30
use crate :: rpc:: eth:: types:: { EthBlockTrace , EthTrace } ;
29
31
use crate :: rpc:: types:: { ApiTipsetKey , EventEntry , MessageLookup } ;
30
32
use crate :: rpc:: EthEventHandler ;
@@ -50,13 +52,15 @@ use crate::utils::encoding::from_slice_with_fallback;
50
52
use crate :: utils:: multihash:: prelude:: * ;
51
53
use anyhow:: { anyhow, bail, Context , Error , Result } ;
52
54
use cid:: Cid ;
55
+ use filter:: { ParsedFilter , ParsedFilterTipsets } ;
53
56
use fvm_ipld_blockstore:: Blockstore ;
54
57
use fvm_ipld_encoding:: { RawBytes , CBOR , DAG_CBOR , IPLD_RAW } ;
55
58
use ipld_core:: ipld:: Ipld ;
56
59
use itertools:: Itertools ;
57
60
use num:: { BigInt , Zero as _} ;
58
61
use schemars:: JsonSchema ;
59
62
use serde:: { Deserialize , Serialize } ;
63
+ use std:: ops:: RangeInclusive ;
60
64
use std:: str:: FromStr ;
61
65
use std:: sync:: Arc ;
62
66
use utils:: { decode_payload, lookup_eth_address} ;
@@ -2730,6 +2734,36 @@ where
2730
2734
. collect ( )
2731
2735
}
2732
2736
2737
+ fn eth_filter_logs_from_tipsets ( events : & [ CollectedEvent ] ) -> anyhow:: Result < Vec < EthHash > > {
2738
+ events
2739
+ . iter ( )
2740
+ . map ( |event| event. tipset_key . cid ( ) . map ( Into :: into) )
2741
+ . collect ( )
2742
+ }
2743
+
2744
+ fn eth_filter_logs_from_messages < DB : Blockstore > (
2745
+ ctx : & Ctx < DB > ,
2746
+ events : & [ CollectedEvent ] ,
2747
+ ) -> anyhow:: Result < Vec < EthHash > > {
2748
+ events
2749
+ . iter ( )
2750
+ . filter_map ( |event| {
2751
+ match eth_tx_hash_from_message_cid (
2752
+ ctx. store ( ) ,
2753
+ & event. msg_cid ,
2754
+ ctx. state_manager . chain_config ( ) . eth_chain_id ,
2755
+ ) {
2756
+ Ok ( Some ( hash) ) => Some ( Ok ( hash) ) ,
2757
+ Ok ( None ) => {
2758
+ tracing:: warn!( "Ignoring event" ) ;
2759
+ None
2760
+ }
2761
+ Err ( err) => Some ( Err ( err) ) ,
2762
+ }
2763
+ } )
2764
+ . collect ( )
2765
+ }
2766
+
2733
2767
fn eth_filter_logs_from_events < DB : Blockstore > (
2734
2768
ctx : & Ctx < DB > ,
2735
2769
events : & [ CollectedEvent ] ,
@@ -2775,6 +2809,19 @@ fn eth_filter_result_from_events<DB: Blockstore>(
2775
2809
) ?) )
2776
2810
}
2777
2811
2812
+ fn eth_filter_result_from_tipsets ( events : & [ CollectedEvent ] ) -> anyhow:: Result < EthFilterResult > {
2813
+ Ok ( EthFilterResult :: Txs ( eth_filter_logs_from_tipsets ( events) ?) )
2814
+ }
2815
+
2816
+ fn eth_filter_result_from_messages < DB : Blockstore > (
2817
+ ctx : & Ctx < DB > ,
2818
+ events : & [ CollectedEvent ] ,
2819
+ ) -> anyhow:: Result < EthFilterResult > {
2820
+ Ok ( EthFilterResult :: Txs ( eth_filter_logs_from_messages (
2821
+ ctx, events,
2822
+ ) ?) )
2823
+ }
2824
+
2778
2825
pub enum EthGetLogs { }
2779
2826
impl RpcMethod < 1 > for EthGetLogs {
2780
2827
const NAME : & ' static str = "Filecoin.EthGetLogs" ;
@@ -2847,6 +2894,120 @@ impl RpcMethod<1> for EthGetFilterLogs {
2847
2894
}
2848
2895
}
2849
2896
2897
+ pub enum EthGetFilterChanges { }
2898
+ impl RpcMethod < 1 > for EthGetFilterChanges {
2899
+ const NAME : & ' static str = "Filecoin.EthGetFilterChanges" ;
2900
+ const NAME_ALIAS : Option < & ' static str > = Some ( "eth_getFilterChanges" ) ;
2901
+ const N_REQUIRED_PARAMS : usize = 1 ;
2902
+ const PARAM_NAMES : [ & ' static str ; 1 ] = [ "filterId" ] ;
2903
+ const API_PATHS : ApiPaths = ApiPaths :: V1 ;
2904
+ const PERMISSION : Permission = Permission :: Write ;
2905
+ const DESCRIPTION : Option < & ' static str > =
2906
+ Some ( "Returns event logs which occured since the last poll" ) ;
2907
+
2908
+ type Params = ( FilterID , ) ;
2909
+ type Ok = EthFilterResult ;
2910
+ async fn handle (
2911
+ ctx : Ctx < impl Blockstore + Send + Sync + ' static > ,
2912
+ ( filter_id, ) : Self :: Params ,
2913
+ ) -> Result < Self :: Ok , ServerError > {
2914
+ let eth_event_handler = ctx. eth_event_handler . clone ( ) ;
2915
+ if let Some ( store) = & eth_event_handler. filter_store {
2916
+ let filter = store. get ( & filter_id) ?;
2917
+ if let Some ( event_filter) = filter. as_any ( ) . downcast_ref :: < EventFilter > ( ) {
2918
+ let events = ctx
2919
+ . eth_event_handler
2920
+ . get_events_for_parsed_filter (
2921
+ & ctx,
2922
+ & event_filter. into ( ) ,
2923
+ SkipEvent :: OnUnresolvedAddress ,
2924
+ )
2925
+ . await ?;
2926
+ let recent_events: Vec < CollectedEvent > = events
2927
+ . clone ( )
2928
+ . into_iter ( )
2929
+ . filter ( |event| !event_filter. collected . contains ( event) )
2930
+ . collect ( ) ;
2931
+ let filter = Arc :: new ( EventFilter {
2932
+ id : event_filter. id . clone ( ) ,
2933
+ tipsets : event_filter. tipsets . clone ( ) ,
2934
+ addresses : event_filter. addresses . clone ( ) ,
2935
+ keys_with_codec : event_filter. keys_with_codec . clone ( ) ,
2936
+ max_results : event_filter. max_results ,
2937
+ collected : events. clone ( ) ,
2938
+ } ) ;
2939
+ store. update ( filter) ;
2940
+ return Ok ( eth_filter_result_from_events ( & ctx, & recent_events) ?) ;
2941
+ }
2942
+ if let Some ( tipset_filter) = filter. as_any ( ) . downcast_ref :: < TipSetFilter > ( ) {
2943
+ let events = ctx
2944
+ . eth_event_handler
2945
+ . get_events_for_parsed_filter (
2946
+ & ctx,
2947
+ & ParsedFilter :: new_with_tipset ( ParsedFilterTipsets :: Range (
2948
+ // heaviest tipset doesn't have events because its messages haven't been executed yet
2949
+ RangeInclusive :: new (
2950
+ tipset_filter
2951
+ . collected
2952
+ . unwrap_or ( ctx. chain_store ( ) . heaviest_tipset ( ) . epoch ( ) - 1 ) ,
2953
+ // Use -1 to indicate that the range extends until the latest available tipset.
2954
+ -1 ,
2955
+ ) ,
2956
+ ) ) ,
2957
+ SkipEvent :: OnUnresolvedAddress ,
2958
+ )
2959
+ . await ?;
2960
+ let new_collected = events
2961
+ . iter ( )
2962
+ . max_by_key ( |event| event. height )
2963
+ . map ( |e| e. height ) ;
2964
+ if let Some ( height) = new_collected {
2965
+ let filter = Arc :: new ( TipSetFilter {
2966
+ id : tipset_filter. id . clone ( ) ,
2967
+ max_results : tipset_filter. max_results ,
2968
+ collected : Some ( height) ,
2969
+ } ) ;
2970
+ store. update ( filter) ;
2971
+ }
2972
+ return Ok ( eth_filter_result_from_tipsets ( & events) ?) ;
2973
+ }
2974
+ if let Some ( mempool_filter) = filter. as_any ( ) . downcast_ref :: < MempoolFilter > ( ) {
2975
+ let events = ctx
2976
+ . eth_event_handler
2977
+ . get_events_for_parsed_filter (
2978
+ & ctx,
2979
+ & ParsedFilter :: new_with_tipset ( ParsedFilterTipsets :: Range (
2980
+ // heaviest tipset doesn't have events because its messages haven't been executed yet
2981
+ RangeInclusive :: new (
2982
+ mempool_filter
2983
+ . collected
2984
+ . unwrap_or ( ctx. chain_store ( ) . heaviest_tipset ( ) . epoch ( ) - 1 ) ,
2985
+ // Use -1 to indicate that the range extends until the latest available tipset.
2986
+ -1 ,
2987
+ ) ,
2988
+ ) ) ,
2989
+ SkipEvent :: OnUnresolvedAddress ,
2990
+ )
2991
+ . await ?;
2992
+ let new_collected = events
2993
+ . iter ( )
2994
+ . max_by_key ( |event| event. height )
2995
+ . map ( |e| e. height ) ;
2996
+ if let Some ( height) = new_collected {
2997
+ let filter = Arc :: new ( MempoolFilter {
2998
+ id : mempool_filter. id . clone ( ) ,
2999
+ max_results : mempool_filter. max_results ,
3000
+ collected : Some ( height) ,
3001
+ } ) ;
3002
+ store. update ( filter) ;
3003
+ }
3004
+ return Ok ( eth_filter_result_from_messages ( & ctx, & events) ?) ;
3005
+ }
3006
+ }
3007
+ Err ( anyhow:: anyhow!( "method not supported" ) . into ( ) )
3008
+ }
3009
+ }
3010
+
2850
3011
pub enum EthTraceBlock { }
2851
3012
impl RpcMethod < 1 > for EthTraceBlock {
2852
3013
const NAME : & ' static str = "Filecoin.EthTraceBlock" ;
0 commit comments