Skip to content

Commit 74bc89b

Browse files
committed
separated sync and delete logic for Patroni resources
1 parent 0482af4 commit 74bc89b

File tree

4 files changed

+142
-80
lines changed

4 files changed

+142
-80
lines changed

e2e/tests/k8s_api.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,7 @@ def count_services_with_label(self, labels, namespace='default'):
188188
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)
189189

190190
def count_endpoints_with_label(self, labels, namespace='default'):
191-
eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items
192-
for ep in eps:
193-
print("found endpoint: {}".format(ep.metadata.name))
194-
return len(eps)
191+
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)
195192

196193
def count_secrets_with_label(self, labels, namespace='default'):
197194
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)

pkg/cluster/cluster.go

+22-34
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
appsv1 "k8s.io/api/apps/v1"
3131
batchv1 "k8s.io/api/batch/v1"
3232
v1 "k8s.io/api/core/v1"
33-
apipolicyv1 "k8s.io/api/policy/v1"
3433
policyv1 "k8s.io/api/policy/v1"
3534
rbacv1 "k8s.io/api/rbac/v1"
3635
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -62,7 +61,8 @@ type Config struct {
6261
type kubeResources struct {
6362
Services map[PostgresRole]*v1.Service
6463
Endpoints map[PostgresRole]*v1.Endpoints
65-
ConfigMaps map[string]*v1.ConfigMap
64+
PatroniEndpoints map[string]*v1.Endpoints
65+
PatroniConfigMaps map[string]*v1.ConfigMap
6666
Secrets map[types.UID]*v1.Secret
6767
Statefulset *appsv1.StatefulSet
6868
PodDisruptionBudget *policyv1.PodDisruptionBudget
@@ -135,11 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
135135
systemUsers: make(map[string]spec.PgUser),
136136
podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
137137
kubeResources: kubeResources{
138-
Secrets: make(map[types.UID]*v1.Secret),
139-
Services: make(map[PostgresRole]*v1.Service),
140-
Endpoints: make(map[PostgresRole]*v1.Endpoints),
141-
ConfigMaps: make(map[string]*v1.ConfigMap),
142-
Streams: make(map[string]*zalandov1.FabricEventStream)},
138+
Secrets: make(map[types.UID]*v1.Secret),
139+
Services: make(map[PostgresRole]*v1.Service),
140+
Endpoints: make(map[PostgresRole]*v1.Endpoints),
141+
PatroniEndpoints: make(map[string]*v1.Endpoints),
142+
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
143+
Streams: make(map[string]*zalandov1.FabricEventStream)},
143144
userSyncStrategy: users.DefaultUserSyncStrategy{
144145
PasswordEncryption: passwordEncryption,
145146
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -363,17 +364,8 @@ func (c *Cluster) Create() (err error) {
363364
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
364365

365366
// sync resources created by Patroni
366-
if c.patroniKubernetesUseConfigMaps() {
367-
if err = c.syncConfigMaps(); err != nil {
368-
c.logger.Warnf("Patroni configmaps not yet synced: %v", err)
369-
}
370-
} else {
371-
if err = c.syncEndpoint(Patroni); err != nil {
372-
err = fmt.Errorf("%s endpoint not yet synced: %v", Patroni, err)
373-
}
374-
}
375-
if err = c.syncService(Patroni); err != nil {
376-
err = fmt.Errorf("%s servic not yet synced: %v", Patroni, err)
367+
if err = c.syncPatroniResources(); err != nil {
368+
c.logger.Warnf("Patroni resources not yet synced: %v", err)
377369
}
378370

379371
// create database objects unless we are running without pods or disabled
@@ -866,7 +858,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
866858
return true, ""
867859
}
868860

869-
func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
861+
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
870862
//TODO: improve comparison
871863
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
872864
return false, "new PDB spec does not match the current one"
@@ -1201,32 +1193,28 @@ func (c *Cluster) Delete() error {
12011193
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
12021194
}
12031195

1204-
for _, role := range []PostgresRole{Master, Replica, Patroni} {
1205-
if err := c.deleteService(role); err != nil {
1206-
anyErrors = true
1207-
c.logger.Warningf("could not delete %s service: %v", role, err)
1208-
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
1209-
}
1210-
1196+
for _, role := range []PostgresRole{Master, Replica} {
12111197
if !c.patroniKubernetesUseConfigMaps() {
12121198
if err := c.deleteEndpoint(role); err != nil {
12131199
anyErrors = true
12141200
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
12151201
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err)
12161202
}
12171203
}
1218-
}
12191204

1220-
if c.patroniKubernetesUseConfigMaps() {
1221-
for _, suffix := range []string{"leader", "config", "sync", "failover"} {
1222-
if err := c.deletePatroniConfigMap(suffix); err != nil {
1223-
anyErrors = true
1224-
c.logger.Warningf("could not delete %s config map: %v", suffix, err)
1225-
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s config map: %v", suffix, err)
1226-
}
1205+
if err := c.deleteService(role); err != nil {
1206+
anyErrors = true
1207+
c.logger.Warningf("could not delete %s service: %v", role, err)
1208+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
12271209
}
12281210
}
12291211

1212+
if err := c.deletePatroniResources(); err != nil {
1213+
anyErrors = true
1214+
c.logger.Warningf("could not delete all Patroni resources: %v", err)
1215+
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
1216+
}
1217+
12301218
// Delete connection pooler objects anyway, even if it's not mentioned in the
12311219
// manifest, just to not keep orphaned components in case if something went
12321220
// wrong

pkg/cluster/resources.go

+71-17
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,24 @@ func (c *Cluster) listResources() error {
4343
c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
4444
}
4545

46+
for role, service := range c.Services {
47+
c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
48+
}
49+
50+
for role, endpoint := range c.Endpoints {
51+
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
52+
}
53+
4654
if c.patroniKubernetesUseConfigMaps() {
47-
for suffix, configmap := range c.ConfigMaps {
48-
c.logger.Infof("found %s config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
55+
for suffix, configmap := range c.PatroniConfigMaps {
56+
c.logger.Infof("found %s Patroni config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
4957
}
5058
} else {
51-
for role, endpoint := range c.Endpoints {
52-
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
59+
for suffix, endpoint := range c.PatroniEndpoints {
60+
c.logger.Infof("found %s Patroni endpoint: %q (uid: %q)", suffix, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
5361
}
5462
}
5563

56-
for role, service := range c.Services {
57-
c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
58-
}
59-
6064
pods, err := c.listPods()
6165
if err != nil {
6266
return fmt.Errorf("could not get the list of pods: %v", err)
@@ -510,23 +514,73 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
510514
return nil
511515
}
512516

517+
func (c *Cluster) deletePatroniResources() error {
518+
c.setProcessName("deleting Patroni resources")
519+
errors := make([]string, 0)
520+
521+
if err := c.deleteService(Patroni); err != nil {
522+
errors = append(errors, fmt.Sprintf("%v", err))
523+
}
524+
525+
for _, suffix := range patroniObjectSuffixes {
526+
if c.patroniKubernetesUseConfigMaps() {
527+
if err := c.deletePatroniConfigMap(suffix); err != nil {
528+
errors = append(errors, fmt.Sprintf("%v", err))
529+
}
530+
} else {
531+
if err := c.deletePatroniEndpoint(suffix); err != nil {
532+
errors = append(errors, fmt.Sprintf("%v", err))
533+
}
534+
}
535+
}
536+
537+
if len(errors) > 0 {
538+
return fmt.Errorf("%v", strings.Join(errors, `', '`))
539+
}
540+
541+
return nil
542+
}
543+
513544
func (c *Cluster) deletePatroniConfigMap(suffix string) error {
514-
c.setProcessName("deleting config map")
515-
c.logger.Debugln("deleting config map")
516-
if c.ConfigMaps[suffix] == nil {
517-
c.logger.Debugf("there is no %s config map in the cluster", suffix)
545+
c.setProcessName("deleting Patroni config map")
546+
c.logger.Debugln("deleting Patroni config map")
547+
cm := c.PatroniConfigMaps[suffix]
548+
if cm == nil {
549+
c.logger.Debugf("there is no %s Patroni config map in the cluster", suffix)
550+
return nil
551+
}
552+
553+
if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, c.deleteOptions); err != nil {
554+
if !k8sutil.ResourceNotFound(err) {
555+
return fmt.Errorf("could not delete %s Patroni config map %q: %v", suffix, cm.Name, err)
556+
}
557+
c.logger.Debugf("%s Patroni config map has already been deleted", suffix)
558+
}
559+
560+
c.logger.Infof("%s Patroni config map %q has been deleted", suffix, util.NameFromMeta(cm.ObjectMeta))
561+
delete(c.PatroniConfigMaps, suffix)
562+
563+
return nil
564+
}
565+
566+
func (c *Cluster) deletePatroniEndpoint(suffix string) error {
567+
c.setProcessName("deleting Patroni endpoint")
568+
c.logger.Debugln("deleting Patroni endpoint")
569+
ep := c.PatroniEndpoints[suffix]
570+
if ep == nil {
571+
c.logger.Debugf("there is no %s Patroni endpoint in the cluster", suffix)
518572
return nil
519573
}
520574

521-
if err := c.KubeClient.ConfigMaps(c.ConfigMaps[suffix].Namespace).Delete(context.TODO(), c.ConfigMaps[suffix].Name, c.deleteOptions); err != nil {
575+
if err := c.KubeClient.Endpoints(ep.Namespace).Delete(context.TODO(), ep.Name, c.deleteOptions); err != nil {
522576
if !k8sutil.ResourceNotFound(err) {
523-
return fmt.Errorf("could not delete %s config map %q: %v", suffix, c.ConfigMaps[suffix].Name, err)
577+
return fmt.Errorf("could not delete %s Patroni endpoint %q: %v", suffix, ep.Name, err)
524578
}
525-
c.logger.Debugf("%s config map has already been deleted", suffix)
579+
c.logger.Debugf("%s Patroni endpoint has already been deleted", suffix)
526580
}
527581

528-
c.logger.Infof("%s config map %q has been deleted", suffix, util.NameFromMeta(c.ConfigMaps[suffix].ObjectMeta))
529-
delete(c.ConfigMaps, suffix)
582+
c.logger.Infof("%s Patroni endpoint %q has been deleted", suffix, util.NameFromMeta(ep.ObjectMeta))
583+
delete(c.PatroniEndpoints, suffix)
530584

531585
return nil
532586
}

pkg/cluster/sync.go

+48-25
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
8080
return err
8181
}
8282

83-
if c.patroniKubernetesUseConfigMaps() {
84-
if err = c.syncConfigMaps(); err != nil {
85-
c.logger.Errorf("could not sync config maps: %v", err)
86-
}
83+
if err = c.syncPatroniResources(); err != nil {
84+
c.logger.Errorf("could not sync Patroni resources: %v", err)
8785
}
8886

8987
// sync volume may already transition volumes to gp3, if iops/throughput or type is specified
@@ -179,40 +177,74 @@ func (c *Cluster) syncFinalizer() error {
179177
return nil
180178
}
181179

182-
func (c *Cluster) syncConfigMaps() error {
183-
for _, suffix := range []string{"leader", "config", "sync", "failover"} {
184-
if err := c.syncConfigMap(suffix); err != nil {
185-
return fmt.Errorf("could not sync %s config map: %v", suffix, err)
180+
func (c *Cluster) syncPatroniResources() error {
181+
errors := make([]string, 0)
182+
183+
if err := c.syncService(Patroni); err != nil {
184+
errors = append(errors, fmt.Sprintf("could not sync %s service: %v", Patroni, err))
185+
}
186+
187+
for _, suffix := range patroniObjectSuffixes {
188+
if c.patroniKubernetesUseConfigMaps() {
189+
if err := c.syncPatroniConfigMap(suffix); err != nil {
190+
errors = append(errors, fmt.Sprintf("could not sync %s Patroni config map: %v", suffix, err))
191+
}
192+
} else {
193+
if err := c.syncPatroniEndpoint(suffix); err != nil {
194+
errors = append(errors, fmt.Sprintf("could not sync %s Patroni endpoint: %v", suffix, err))
195+
}
186196
}
187197
}
188198

199+
if len(errors) > 0 {
200+
return fmt.Errorf("%v", strings.Join(errors, `', '`))
201+
}
202+
189203
return nil
190204
}
191205

192-
func (c *Cluster) syncConfigMap(suffix string) error {
206+
func (c *Cluster) syncPatroniConfigMap(suffix string) error {
193207
var (
194208
cm *v1.ConfigMap
195209
err error
196210
)
197211
name := fmt.Sprintf("%s-%s", c.Name, suffix)
198-
c.logger.Debugf("syncing %s config map", name)
199-
c.setProcessName("syncing %s config map", name)
212+
c.logger.Debugf("syncing %s Patroni config map", name)
213+
c.setProcessName("syncing %s Patroni config map", name)
200214

201215
if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil {
202-
c.ConfigMaps[suffix] = cm
216+
c.PatroniConfigMaps[suffix] = cm
203217
return nil
204218
}
205219
if !k8sutil.ResourceNotFound(err) {
206-
return fmt.Errorf("could not get %s config map: %v", suffix, err)
220+
return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err)
221+
}
222+
223+
return nil
224+
}
225+
226+
func (c *Cluster) syncPatroniEndpoint(suffix string) error {
227+
var (
228+
ep *v1.Endpoints
229+
err error
230+
)
231+
name := fmt.Sprintf("%s-%s", c.Name, suffix)
232+
c.logger.Debugf("syncing %s Patroni endpoint", name)
233+
c.setProcessName("syncing %s Patroni endpoint", name)
234+
235+
if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil {
236+
c.PatroniEndpoints[suffix] = ep
237+
return nil
238+
}
239+
if !k8sutil.ResourceNotFound(err) {
240+
return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err)
207241
}
208-
// no existing config map, Patroni will handle it
209-
c.ConfigMaps[suffix] = nil
210242

211243
return nil
212244
}
213245

214246
func (c *Cluster) syncServices() error {
215-
for _, role := range []PostgresRole{Master, Replica, Patroni} {
247+
for _, role := range []PostgresRole{Master, Replica} {
216248
c.logger.Debugf("syncing %s service", role)
217249

218250
if !c.patroniKubernetesUseConfigMaps() {
@@ -284,11 +316,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
284316
c.setProcessName("syncing %s endpoint", role)
285317

286318
if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil {
287-
c.Endpoints[role] = ep
288-
// do not touch config endpoint managed by Patroni
289-
if role == Patroni {
290-
return nil
291-
}
292319
desiredEp := c.generateEndpoint(role, ep.Subsets)
293320
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed {
294321
patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
@@ -306,10 +333,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
306333
if !k8sutil.ResourceNotFound(err) {
307334
return fmt.Errorf("could not get %s endpoint: %v", role, err)
308335
}
309-
// if config endpoint does not exist Patroni will create it
310-
if role == Patroni {
311-
return nil
312-
}
313336
// no existing endpoint, create new one
314337
c.Endpoints[role] = nil
315338
c.logger.Infof("could not find the cluster's %s endpoint", role)

0 commit comments

Comments
 (0)