@@ -31,15 +31,18 @@ import (
31
31
32
32
"github.com/google/uuid"
33
33
"github.com/stretchr/testify/assert"
34
+ "github.com/stretchr/testify/mock"
34
35
"github.com/stretchr/testify/require"
35
36
goakt "github.com/tochemey/goakt/v3/actor"
36
37
"github.com/tochemey/goakt/v3/log"
37
38
"google.golang.org/protobuf/proto"
39
+ "google.golang.org/protobuf/types/known/anypb"
38
40
"google.golang.org/protobuf/types/known/emptypb"
39
41
40
42
"github.com/tochemey/ego/v3/egopb"
41
43
"github.com/tochemey/ego/v3/eventstream"
42
44
"github.com/tochemey/ego/v3/internal/lib"
45
+ mocks "github.com/tochemey/ego/v3/mocks/persistence"
43
46
testpb "github.com/tochemey/ego/v3/test/data/pb/v3"
44
47
"github.com/tochemey/ego/v3/testkit"
45
48
)
@@ -649,6 +652,176 @@ func TestEventSourcedActor(t *testing.T) {
649
652
650
653
lib .Pause (time .Second )
651
654
655
+ // close the stream
656
+ eventStream .Close ()
657
+ // stop the actor system
658
+ err = actorSystem .Stop (ctx )
659
+ assert .NoError (t , err )
660
+ })
661
+ t .Run ("With events store not defined" , func (t * testing.T ) {
662
+ ctx := context .TODO ()
663
+
664
+ // create an instance of events stream
665
+ eventStream := eventstream .New ()
666
+
667
+ // create an actor system
668
+ actorSystem , err := goakt .NewActorSystem ("TestActorSystem" ,
669
+ goakt .WithPassivationDisabled (),
670
+ goakt .WithLogger (log .DiscardLogger ),
671
+ goakt .WithActorInitMaxRetries (3 ))
672
+ require .NoError (t , err )
673
+ assert .NotNil (t , actorSystem )
674
+
675
+ // start the actor system
676
+ err = actorSystem .Start (ctx )
677
+ require .NoError (t , err )
678
+
679
+ lib .Pause (time .Second )
680
+
681
+ // create a persistence id
682
+ persistenceID := uuid .NewString ()
683
+ // create the persistence behavior
684
+ behavior := NewAccountEventSourcedBehavior (persistenceID )
685
+
686
+ // create the persistence actor using the behavior previously created
687
+ actor := newEventSourcedActor (behavior , nil , eventStream )
688
+ // spawn the actor
689
+ pid , err := actorSystem .Spawn (ctx , behavior .ID (), actor )
690
+ require .Error (t , err )
691
+ require .Nil (t , pid )
692
+
693
+ // close the stream
694
+ eventStream .Close ()
695
+ // stop the actor system
696
+ err = actorSystem .Stop (ctx )
697
+ assert .NoError (t , err )
698
+ })
699
+ t .Run ("With events store ping failed" , func (t * testing.T ) {
700
+ ctx := context .TODO ()
701
+
702
+ // create an instance of events stream
703
+ eventStream := eventstream .New ()
704
+
705
+ // create an actor system
706
+ actorSystem , err := goakt .NewActorSystem ("TestActorSystem" ,
707
+ goakt .WithPassivationDisabled (),
708
+ goakt .WithLogger (log .DiscardLogger ),
709
+ goakt .WithActorInitMaxRetries (3 ))
710
+ require .NoError (t , err )
711
+ assert .NotNil (t , actorSystem )
712
+
713
+ // start the actor system
714
+ err = actorSystem .Start (ctx )
715
+ require .NoError (t , err )
716
+
717
+ lib .Pause (time .Second )
718
+
719
+ // create a persistence id
720
+ persistenceID := uuid .NewString ()
721
+ // create the persistence behavior
722
+ behavior := NewAccountEventSourcedBehavior (persistenceID )
723
+
724
+ eventStore := new (mocks.EventsStore )
725
+ eventStore .EXPECT ().Ping (mock .Anything ).Return (assert .AnError )
726
+
727
+ // create the persistence actor using the behavior previously created
728
+ actor := newEventSourcedActor (behavior , eventStore , eventStream )
729
+ // spawn the actor
730
+ pid , err := actorSystem .Spawn (ctx , behavior .ID (), actor )
731
+ require .Error (t , err )
732
+ require .Nil (t , pid )
733
+
734
+ // close the stream
735
+ eventStream .Close ()
736
+ // stop the actor system
737
+ err = actorSystem .Stop (ctx )
738
+ assert .NoError (t , err )
739
+ })
740
+ t .Run ("With events store GetLatestEvent failed" , func (t * testing.T ) {
741
+ ctx := context .TODO ()
742
+
743
+ // create an instance of events stream
744
+ eventStream := eventstream .New ()
745
+
746
+ // create an actor system
747
+ actorSystem , err := goakt .NewActorSystem ("TestActorSystem" ,
748
+ goakt .WithPassivationDisabled (),
749
+ goakt .WithLogger (log .DiscardLogger ),
750
+ goakt .WithActorInitMaxRetries (3 ))
751
+ require .NoError (t , err )
752
+ assert .NotNil (t , actorSystem )
753
+
754
+ // start the actor system
755
+ err = actorSystem .Start (ctx )
756
+ require .NoError (t , err )
757
+
758
+ lib .Pause (time .Second )
759
+
760
+ // create a persistence id
761
+ persistenceID := uuid .NewString ()
762
+ // create the persistence behavior
763
+ behavior := NewAccountEventSourcedBehavior (persistenceID )
764
+
765
+ eventStore := new (mocks.EventsStore )
766
+ eventStore .EXPECT ().Ping (mock .Anything ).Return (nil )
767
+ eventStore .EXPECT ().GetLatestEvent (mock .Anything , persistenceID ).Return (nil , assert .AnError )
768
+
769
+ // create the persistence actor using the behavior previously created
770
+ actor := newEventSourcedActor (behavior , eventStore , eventStream )
771
+ // spawn the actor
772
+ pid , err := actorSystem .Spawn (ctx , behavior .ID (), actor )
773
+ require .Error (t , err )
774
+ require .Nil (t , pid )
775
+
776
+ // close the stream
777
+ eventStream .Close ()
778
+ // stop the actor system
779
+ err = actorSystem .Stop (ctx )
780
+ assert .NoError (t , err )
781
+ })
782
+ t .Run ("With initial parsing failure" , func (t * testing.T ) {
783
+ ctx := context .TODO ()
784
+
785
+ // create an instance of events stream
786
+ eventStream := eventstream .New ()
787
+
788
+ // create an actor system
789
+ actorSystem , err := goakt .NewActorSystem ("TestActorSystem" ,
790
+ goakt .WithPassivationDisabled (),
791
+ goakt .WithLogger (log .DiscardLogger ),
792
+ goakt .WithActorInitMaxRetries (3 ))
793
+ require .NoError (t , err )
794
+ assert .NotNil (t , actorSystem )
795
+
796
+ // start the actor system
797
+ err = actorSystem .Start (ctx )
798
+ require .NoError (t , err )
799
+
800
+ lib .Pause (time .Second )
801
+
802
+ // create a persistence id
803
+ persistenceID := uuid .NewString ()
804
+ // create the persistence behavior
805
+ behavior := NewAccountEventSourcedBehavior (persistenceID )
806
+
807
+ latestEvent := & egopb.Event {
808
+ ResultingState : & anypb.Any {
809
+ TypeUrl : "invalid-type-url" ,
810
+ Value : []byte ("invalid-value" ),
811
+ },
812
+ }
813
+
814
+ eventStore := new (mocks.EventsStore )
815
+ eventStore .EXPECT ().Ping (mock .Anything ).Return (nil )
816
+ eventStore .EXPECT ().GetLatestEvent (mock .Anything , persistenceID ).Return (latestEvent , nil )
817
+
818
+ // create the persistence actor using the behavior previously created
819
+ actor := newEventSourcedActor (behavior , eventStore , eventStream )
820
+ // spawn the actor
821
+ pid , err := actorSystem .Spawn (ctx , behavior .ID (), actor )
822
+ require .Error (t , err )
823
+ require .Nil (t , pid )
824
+
652
825
// close the stream
653
826
eventStream .Close ()
654
827
// stop the actor system
0 commit comments