Skip to content

Commit c7ee34e

Browse files
authored
fix sync streams and add diffs for annotations and owner references (#2728)
* extend and improve hasSlotsInSync unit test * fix sync streams and add diffs for annotations and owner references * incl. current annotations as desired where we do not fully control them * added one more unit test and fixed sub test names * pass maintenance windows to function and update unit test
1 parent aad03f7 commit c7ee34e

File tree

6 files changed

+268
-96
lines changed

6 files changed

+268
-96
lines changed

pkg/cluster/majorversionupgrade.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (c *Cluster) majorVersionUpgrade() error {
7373
return nil
7474
}
7575

76-
if !c.isInMainternanceWindow() {
76+
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
7777
c.logger.Infof("skipping major version upgrade, not in maintenance window")
7878
return nil
7979
}

pkg/cluster/streams.go

+46-10
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
128128
createPublications[slotName] = tableList
129129
} else if currentTables != tableList {
130130
alterPublications[slotName] = tableList
131+
} else {
132+
(*slotsToSync)[slotName] = slotAndPublication.Slot
131133
}
132134
}
133135

@@ -142,30 +144,34 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
142144
return nil
143145
}
144146

145-
var errorMessage error = nil
147+
errors := make([]string, 0)
146148
for publicationName, tables := range createPublications {
147149
if err = c.executeCreatePublication(publicationName, tables); err != nil {
148-
errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
150+
errors = append(errors, fmt.Sprintf("creation of publication %q failed: %v", publicationName, err))
149151
continue
150152
}
151153
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
152154
}
153155
for publicationName, tables := range alterPublications {
154156
if err = c.executeAlterPublication(publicationName, tables); err != nil {
155-
errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
157+
errors = append(errors, fmt.Sprintf("update of publication %q failed: %v", publicationName, err))
156158
continue
157159
}
158160
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
159161
}
160162
for _, publicationName := range deletePublications {
161163
if err = c.executeDropPublication(publicationName); err != nil {
162-
errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
164+
errors = append(errors, fmt.Sprintf("deletion of publication %q failed: %v", publicationName, err))
163165
continue
164166
}
165167
(*slotsToSync)[publicationName] = nil
166168
}
167169

168-
return errorMessage
170+
if len(errors) > 0 {
171+
return fmt.Errorf("%v", strings.Join(errors, `', '`))
172+
}
173+
174+
return nil
169175
}
170176

171177
func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
@@ -370,7 +376,7 @@ func (c *Cluster) syncStreams() error {
370376
for dbName, databaseSlotsList := range databaseSlots {
371377
err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync)
372378
if err != nil {
373-
c.logger.Warningf("could not sync publications in database %q: %v", dbName, err)
379+
c.logger.Warningf("could not sync all publications in database %q: %v", dbName, err)
374380
continue
375381
}
376382
}
@@ -398,7 +404,7 @@ func (c *Cluster) syncStreams() error {
398404
c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
399405
}
400406
} else {
401-
c.logger.Warningf("database replication slots for streams with applicationId %s not in sync, skipping event stream sync", appId)
407+
c.logger.Warningf("database replication slots %#v for streams with applicationId %s not in sync, skipping event stream sync", slotsToSync, appId)
402408
}
403409
}
404410

@@ -415,8 +421,9 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.
415421
for dbName, slots := range databaseSlots {
416422
for slotName := range slots {
417423
if slotName == getSlotName(dbName, appId) {
418-
if _, exists := slotsToSync[slotName]; !exists {
424+
if slot, exists := slotsToSync[slotName]; !exists || slot == nil {
419425
allSlotsInSync = false
426+
continue
420427
}
421428
}
422429
}
@@ -432,7 +439,17 @@ func (c *Cluster) syncStream(appId string) error {
432439
if appId == stream.Spec.ApplicationId {
433440
streamExists = true
434441
desiredStreams := c.generateFabricEventStream(appId)
435-
if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
442+
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
443+
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
444+
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
445+
c.setProcessName("updating event streams with applicationId %s", appId)
446+
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), stream, metav1.UpdateOptions{})
447+
if err != nil {
448+
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
449+
}
450+
c.Streams[appId] = stream
451+
}
452+
if match, reason := c.compareStreams(stream, desiredStreams); !match {
436453
c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
437454
desiredStreams.ObjectMeta = stream.ObjectMeta
438455
updatedStream, err := c.updateStreams(desiredStreams)
@@ -459,7 +476,26 @@ func (c *Cluster) syncStream(appId string) error {
459476
return nil
460477
}
461478

462-
func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
479+
func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) {
480+
reasons := make([]string, 0)
481+
match = true
482+
483+
// stream operator can add extra annotations so incl. current annotations in desired annotations
484+
desiredAnnotations := c.annotationsSet(curEventStreams.Annotations)
485+
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed {
486+
match = false
487+
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
488+
}
489+
490+
if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
491+
match = false
492+
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
493+
}
494+
495+
return match, strings.Join(reasons, ", ")
496+
}
497+
498+
func sameEventStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
463499
if len(newEventStreams) != len(curEventStreams) {
464500
return false, "number of defined streams is different"
465501
}

0 commit comments

Comments
 (0)