-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventSourcedRepository.cs
317 lines (259 loc) · 10.4 KB
/
EventSourcedRepository.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
using Ddd.Domain.EventSourcing;
using Ddd.Domain.EventSourcing.Exceptions;
using Ddd.Infrastructure.Stores.EventStore;
namespace Ddd.Infrastructure.Repositories;
/// <summary>
/// Contract for a repository that stores and reloads event sourced aggregates,
/// with explicit control over the surrounding transaction.
/// </summary>
/// <typeparam name="TEventSourced">Type of event sourced aggregate</typeparam>
public interface IEventSourcedRepository<TEventSourced>
    where TEventSourced : EventSourcedAggregate
{
    // ---- Persistence ----

    /// <summary>
    /// Register a new aggregate with the store.
    /// </summary>
    /// <param name="aggregate">Event sourced aggregate</param>
    void Add(TEventSourced aggregate);

    /// <summary>
    /// Persist the pending changes of an aggregate.
    /// </summary>
    /// <param name="aggregate">Event sourced aggregate</param>
    void Save(TEventSourced aggregate);

    // ---- Queries ----

    /// <summary>
    /// Load an aggregate by its id.
    /// </summary>
    /// <param name="id">Stream / Aggregate Id</param>
    /// <returns>The aggregate, or <c>null</c> when none is found</returns>
    TEventSourced? FindById(Guid id);

    /// <summary>
    /// Load an aggregate by its id, as it was at the given date &amp; time.
    /// </summary>
    /// <param name="id">Stream / Aggregate Id</param>
    /// <param name="at">Less or equal date &amp; time</param>
    /// <returns>The aggregate, or <c>null</c> when none is found</returns>
    TEventSourced? FindByIdAndTime(Guid id, DateTime at);

    // ---- Transactions ----

    /// <summary>
    /// Start a new transaction.
    /// </summary>
    void StartTransaction();

    /// <summary>
    /// End the active transaction and commit its changes.
    /// </summary>
    Task EndActiveTransactionAndCommitAsync();

    /// <summary>
    /// Abort the active transaction.
    /// </summary>
    Task AbortActiveTransactionAsync();

    /// <summary>
    /// Run an action on a set of aggregates and save them within a transactional scope.
    /// </summary>
    /// <param name="aggregates">Working entities</param>
    /// <param name="action">Work to perform on the aggregates</param>
    Task DoMultiTransactionalWork(TEventSourced[] aggregates, Action<TEventSourced[]> action);
}
/// <summary>
/// Manage access to the event sourced aggregate store.
/// Aggregates are written as events appended to a per-aggregate stream and are read back by
/// replaying those events on top of the most recent snapshot, when one exists.
/// </summary>
/// <typeparam name="TEventSourced">Type of event sourced aggregate</typeparam>
public class EventSourcedRepository<TEventSourced> : IEventSourcedRepository<TEventSourced>
    where TEventSourced : EventSourcedAggregate
{
    private readonly IEventStore _eventStore;

    /// <summary>
    /// Stored events limit.
    /// A single save must carry fewer than this number of changes, and a snapshot is created
    /// automatically once the stream holds at least this many events beyond the latest snapshot version.
    /// </summary>
    private const int Largest = 128;

    /// <summary>
    /// Create a repository working on the given event store.
    /// </summary>
    /// <param name="eventStore">Event store used for every read and write</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStore"/> is null</exception>
    public EventSourcedRepository(IEventStore eventStore)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
    }

    #region [ Api ]

    /// <summary>
    /// Find a stream given an id.
    /// Rebuild the aggregate to the current status applying all the events and return it
    /// </summary>
    /// <param name="id">Stream / Aggregate Id</param>
    /// <returns>Event Sourced Aggregate, or <c>null</c> when neither events nor a snapshot are found</returns>
    /// <exception cref="ArgumentException"><paramref name="id"/> is <see cref="Guid.Empty"/></exception>
    public TEventSourced? FindById(Guid id)
    {
        if (id.Equals(Guid.Empty))
        {
            throw new ArgumentException("Invalid Id or not initialized");
        }

        return InternalFindById(id);
    }

    /// <summary>
    /// Find a stream given an id and a desired date &amp; time.
    /// Rebuild the aggregate to the status it had at that time and return it.
    /// TODO: It should be possible to build the aggregate using a DateTime field defined by the data type
    /// itself, in addition to the default "created" one. For example in the case of multi-temporal data.
    /// </summary>
    /// <param name="id">Stream / Aggregate Id</param>
    /// <param name="at">Less or equal date &amp; time</param>
    /// <returns>Event Sourced Aggregate, or <c>null</c> when nothing is found for that time</returns>
    /// <exception cref="ArgumentException"><paramref name="id"/> is <see cref="Guid.Empty"/></exception>
    public TEventSourced? FindByIdAndTime(Guid id, DateTime at)
    {
        if (id.Equals(Guid.Empty))
        {
            throw new ArgumentException("Invalid Id or not initialized");
        }

        return InternalFindByIdAndTime(id, at);
    }

    /// <summary>
    /// Create a new stream.
    /// Only the stream is created here; pending changes are written by <see cref="Save"/>.
    /// </summary>
    /// <param name="aggregate">Event sourced aggregate</param>
    /// <exception cref="ArgumentNullException"><paramref name="aggregate"/> is null</exception>
    /// <exception cref="ArgumentException">The aggregate id is <see cref="Guid.Empty"/></exception>
    public void Add(TEventSourced aggregate)
    {
        if (aggregate is null)
        {
            throw new ArgumentNullException(nameof(aggregate));
        }

        if (aggregate.Id.Equals(Guid.Empty))
        {
            throw new ArgumentException("Invalid Id or not initialized");
        }

        var streamName = StreamNameFor(aggregate.Id);
        _eventStore.CreateNewStream(streamName);
    }

    /// <summary>
    /// Add uncommitted changes to an existent stream.
    /// Does nothing when the aggregate has no pending changes.
    /// </summary>
    /// <param name="aggregate">Event sourced aggregate</param>
    /// <exception cref="ArgumentNullException"><paramref name="aggregate"/> is null</exception>
    /// <exception cref="ArgumentException">The aggregate id is <see cref="Guid.Empty"/></exception>
    /// <exception cref="DomainEventsToAddLimitException">
    /// The aggregate carries <c>Largest</c> or more pending changes
    /// </exception>
    public void Save(TEventSourced aggregate)
    {
        if (aggregate is null)
        {
            throw new ArgumentNullException(nameof(aggregate));
        }

        if (aggregate.Id.Equals(Guid.Empty))
        {
            throw new ArgumentException("Invalid Id or not initialized");
        }

        if (!aggregate.Changes.Any())
        {
            return;
        }

        SaveFunc(aggregate);
    }

    /// <summary>
    /// Run an action on the aggregates and save them in a transactional scope.
    /// The transaction is committed when everything succeeds and aborted when anything throws;
    /// the exception is then rethrown to the caller.
    /// </summary>
    /// <param name="aggregates">Working entities</param>
    /// <param name="action">Do your work</param>
    /// <returns>Task completing once the transaction has been committed</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="aggregates"/> or <paramref name="action"/> is null
    /// </exception>
    public async Task DoMultiTransactionalWork(TEventSourced[] aggregates, Action<TEventSourced[]> action)
    {
        // Validate before opening the transaction so that bad arguments never leave one to abort.
        if (aggregates is null)
        {
            throw new ArgumentNullException(nameof(aggregates));
        }

        if (action is null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        StartTransaction();
        try
        {
            action(aggregates);

            foreach (var aggregate in aggregates)
            {
                Save(aggregate);
            }

            await EndActiveTransactionAndCommitAsync();
        }
        catch (Exception)
        {
            await AbortActiveTransactionAsync();
            throw;
        }
    }

    /// <summary>
    /// Start a new transaction
    /// </summary>
    public void StartTransaction()
    {
        _eventStore.StarTransaction();
    }

    /// <summary>
    /// End transaction and commit changes
    /// </summary>
    /// <returns>Task</returns>
    public async Task EndActiveTransactionAndCommitAsync()
    {
        await _eventStore.EndActiveTransactionAndCommitAsync();
    }

    /// <summary>
    /// Abort current transaction
    /// </summary>
    /// <returns>Task</returns>
    public async Task AbortActiveTransactionAsync()
    {
        await _eventStore.AbortActiveTransactionAsync();
    }

    #endregion

    #region [ Private ]

    /// <summary>
    /// Append the pending changes of the aggregate to its stream and create a snapshot when the
    /// stream has grown by at least <c>Largest</c> events since the latest snapshot.
    /// </summary>
    private void SaveFunc(TEventSourced aggregate)
    {
        // Avoid adding a large number of events in one shot. If you need to store more than 'Largest' events,
        // split the operation in two or more 'save' operations to keep the stream size smaller, and allow
        // one or more snapshots of the aggregate to be created automatically, which keeps
        // rebuilding the aggregate cheap.
        if (aggregate.Changes.Count >= Largest)
        {
            throw new DomainEventsToAddLimitException($"If you need to store more then '{Largest}' events, " +
                                                      "split the operation in two or more 'save' operations to " +
                                                      "keep stream size smaller and allow to create automatically " +
                                                      "one or more snapshots");
        }

        var streamName = StreamNameFor(aggregate.Id);
        _eventStore.AppendEventsToStream(streamName, aggregate.Changes, aggregate.Version);

        // Get the stream size
        var streamSize = (int) _eventStore.GetEventsStreamSize(streamName);

        // Get the latest snapshot version
        var snapshot = _eventStore.GetLatestSnapshot(streamName);

        // Evaluate if snapshot creation is needed (after 'Largest' events stored)
        if ((streamSize - (snapshot?.Version ?? 0)) >= Largest)
        {
            MakeASnapshot(aggregate.Id);
        }
    }

    /// <summary>
    /// Create an empty aggregate instance through its public constructor taking a single Guid.
    /// </summary>
    /// <exception cref="InvalidOperationException">The aggregate type has no such constructor</exception>
    private TEventSourced CreateAggregate(Guid id)
    {
        // The parameter count is checked explicitly: constructors with several parameters must be
        // skipped, not make the lookup itself throw.
        var ctor = typeof(TEventSourced)
            .GetConstructors()
            .FirstOrDefault(c =>
            {
                var parameters = c.GetParameters();
                return c.IsPublic &&
                       parameters.Length == 1 &&
                       parameters[0].ParameterType == typeof(Guid);
            });

        return (TEventSourced) ctor?.Invoke(new object[] {id})! ??
               throw new InvalidOperationException(
                   $"Type: {typeof(TEventSourced)} " +
                   $"must have a public constructor with Guid as unique parameter");
    }

    /// <summary>
    /// Apply the given events, in order, to the aggregate and return it.
    /// A null aggregate is returned unchanged.
    /// </summary>
    private TEventSourced? RebuildAggregateFromEvents(TEventSourced? aggregate, IEnumerable<ChangeEvent> events)
    {
        // Rebuild the aggregate status applying all the events on the latest snapshot or
        // on a new aggregate generated from scratch
        foreach (var @event in events)
        {
            aggregate?.Apply(@event);
        }

        return aggregate;
    }

    /// <summary>
    /// Rebuild the aggregate as it was at the given time, starting from the snapshot the store
    /// returns for that time (when there is one) and replaying events up to the version the store
    /// reports for that time.
    /// </summary>
    private TEventSourced? InternalFindByIdAndTime(Guid id, DateTime at)
    {
        var streamName = StreamNameFor(id);

        // Check latest snapshot before the requested time
        var snapshot = _eventStore.GetSnapshot(streamName, at);

        // Create a new aggregate instance
        var aggregate = CreateAggregate(id);

        // If there is a snapshot load from it before
        if (snapshot != null)
        {
            aggregate.LoadFromSnapshot(snapshot);
        }

        // Retrieves the version of the corresponding aggregate that is less than or equal to the specified date
        var toVersion = _eventStore.GetVersionAt(streamName, at);

        // No aggregate version found, return a null object
        if (snapshot == null && toVersion == null)
        {
            return null;
        }

        // Get events from the aggregate version (zero or the version of the snapshot)
        var fromEventNumber = aggregate.Version;

        // To the desired version specified
        var toEventNumber = toVersion ?? 0;

        var storageStream = _eventStore.GetStream(streamName, fromEventNumber, toEventNumber);
        var stream = storageStream.ToList();

        // No aggregate version found, return a null object
        if (!stream.Any() && snapshot == null)
        {
            return null;
        }

        // Rebuild the aggregate from its events
        return RebuildAggregateFromEvents(aggregate, stream);
    }

    /// <summary>
    /// Rebuild the aggregate from the latest snapshot (when there is one) plus the events stored after it.
    /// </summary>
    private TEventSourced? InternalFindById(Guid id)
    {
        var streamName = StreamNameFor(id);

        // Check latest snapshot
        var snapshot = _eventStore.GetLatestSnapshot(streamName);

        // Create a new aggregate instance
        var aggregate = CreateAggregate(id);

        // If there is a snapshot load events from it before
        if (snapshot != null)
        {
            aggregate.LoadFromSnapshot(snapshot);
        }

        // Get events from the aggregate version (zero or the version of the snapshot)
        var fromEventNumber = aggregate.Version;

        // Read a window of 'Largest' event numbers past the starting version.
        // NOTE(review): this reaches the latest event only while the stream stays within 'Largest'
        // events of the latest snapshot, which the snapshot policy in SaveFunc appears to ensure - confirm.
        var toEventNumber = fromEventNumber + Largest;

        var storageStream = _eventStore.GetStream(streamName, fromEventNumber, toEventNumber);
        var stream = storageStream.ToList();

        // No aggregate version found, return a null object
        if (!stream.Any() && snapshot == null)
        {
            return null;
        }

        // Rebuild the aggregate from its events
        return RebuildAggregateFromEvents(aggregate, stream);
    }

    /// <summary>
    /// Build the stream name of an aggregate: the aggregate type name and the id joined by a dash.
    /// </summary>
    private string StreamNameFor(Guid id)
    {
        return $"{typeof(TEventSourced).Name}-{id}";
    }

    /// <summary>
    /// Reload the aggregate and store a snapshot of it.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The aggregate could not be reloaded or did not produce a snapshot
    /// </exception>
    private void MakeASnapshot(Guid id)
    {
        var aggregate = FindById(id);

        // NullReferenceException is reserved for the runtime; an aggregate that cannot be reloaded
        // at this point is an invalid state of the store, so report it as such.
        var snapshot = aggregate?.Snapshot() ??
                       throw new InvalidOperationException(
                           $"Unable to create a snapshot for stream '{StreamNameFor(id)}': " +
                           "the aggregate could not be rebuilt or did not produce a snapshot");

        _eventStore.AddSnapshot(StreamNameFor(aggregate.Id), snapshot);
    }

    #endregion
}