Skip to content

Commit 421ffe2

Browse files
committed
remove field selector and add try except to streams e2e test
1 parent 5ae709e commit 421ffe2

File tree

3 files changed

+117
-115
lines changed

3 files changed

+117
-115
lines changed

e2e/tests/test_e2e.py

+113-108
Original file line numberDiff line numberDiff line change
@@ -2131,131 +2131,136 @@ def test_stream_resources(self):
21312131
verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
21322132
)
21332133
cluster_role.rules.append(fes_cluster_role_rule)
2134-
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)
21352134

2136-
# create a table in one of the database of acid-minimal-cluster
2137-
create_stream_table = """
2138-
CREATE TABLE test_table (id int, payload jsonb);
2139-
"""
2140-
self.query_database(leader.metadata.name, "foo", create_stream_table)
2135+
try:
2136+
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)
21412137

2142-
# update the manifest with the streams section
2143-
patch_streaming_config = {
2144-
"spec": {
2145-
"patroni": {
2146-
"slots": {
2147-
"manual_slot": {
2148-
"type": "physical"
2149-
}
2150-
}
2151-
},
2152-
"streams": [
2153-
{
2154-
"applicationId": "test-app",
2155-
"batchSize": 100,
2156-
"database": "foo",
2157-
"enableRecovery": True,
2158-
"tables": {
2159-
"test_table": {
2160-
"eventType": "test-event",
2161-
"idColumn": "id",
2162-
"payloadColumn": "payload",
2163-
"recoveryEventType": "test-event-dlq"
2138+
# create a table in one of the database of acid-minimal-cluster
2139+
create_stream_table = """
2140+
CREATE TABLE test_table (id int, payload jsonb);
2141+
"""
2142+
self.query_database(leader.metadata.name, "foo", create_stream_table)
2143+
2144+
# update the manifest with the streams section
2145+
patch_streaming_config = {
2146+
"spec": {
2147+
"patroni": {
2148+
"slots": {
2149+
"manual_slot": {
2150+
"type": "physical"
21642151
}
21652152
}
21662153
},
2167-
{
2168-
"applicationId": "test-app2",
2169-
"batchSize": 100,
2170-
"database": "foo",
2171-
"enableRecovery": True,
2172-
"tables": {
2173-
"test_non_exist_table": {
2174-
"eventType": "test-event",
2175-
"idColumn": "id",
2176-
"payloadColumn": "payload",
2177-
"recoveryEventType": "test-event-dlq"
2154+
"streams": [
2155+
{
2156+
"applicationId": "test-app",
2157+
"batchSize": 100,
2158+
"database": "foo",
2159+
"enableRecovery": True,
2160+
"tables": {
2161+
"test_table": {
2162+
"eventType": "test-event",
2163+
"idColumn": "id",
2164+
"payloadColumn": "payload",
2165+
"recoveryEventType": "test-event-dlq"
2166+
}
2167+
}
2168+
},
2169+
{
2170+
"applicationId": "test-app2",
2171+
"batchSize": 100,
2172+
"database": "foo",
2173+
"enableRecovery": True,
2174+
"tables": {
2175+
"test_non_exist_table": {
2176+
"eventType": "test-event",
2177+
"idColumn": "id",
2178+
"payloadColumn": "payload",
2179+
"recoveryEventType": "test-event-dlq"
2180+
}
21782181
}
21792182
}
2180-
}
2181-
]
2183+
]
2184+
}
21822185
}
2183-
}
2184-
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2185-
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
2186-
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
2186+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2187+
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
2188+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
21872189

2188-
# check if publication, slot, and fes resource are created
2189-
get_publication_query = """
2190-
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
2191-
"""
2192-
get_slot_query = """
2193-
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
2194-
"""
2195-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
2196-
"Publication is not created", 10, 5)
2197-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
2198-
"Replication slot is not created", 10, 5)
2199-
print('Operator log: {}'.format(k8s.get_operator_log()))
2200-
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
2190+
# check if publication, slot, and fes resource are created
2191+
get_publication_query = """
2192+
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
2193+
"""
2194+
get_slot_query = """
2195+
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
2196+
"""
2197+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
2198+
"Publication is not created", 10, 5)
2199+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
2200+
"Replication slot is not created", 10, 5)
2201+
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
22012202
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
22022203
"Could not find Fabric Event Stream resource", 10, 5)
22032204

2204-
# check if the non-existing table in the stream section does not create a publication and slot
2205-
get_publication_query_not_exist_table = """
2206-
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
2207-
"""
2208-
get_slot_query_not_exist_table = """
2209-
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
2210-
"""
2211-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
2212-
"Publication is created for non-existing tables", 10, 5)
2213-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
2214-
"Replication slot is created for non-existing tables", 10, 5)
2215-
2216-
# grant create and ownership of test_table to foo_user, reset search path to default
2217-
grant_permission_foo_user = """
2218-
GRANT CREATE ON DATABASE foo TO foo_user;
2219-
ALTER TABLE test_table OWNER TO foo_user;
2220-
ALTER ROLE foo_user RESET search_path;
2221-
"""
2222-
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
2223-
# non-postgres user creates a publication
2224-
create_nonstream_publication = """
2225-
CREATE PUBLICATION mypublication FOR TABLE test_table;
2226-
"""
2227-
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")
2205+
# check if the non-existing table in the stream section does not create a publication and slot
2206+
get_publication_query_not_exist_table = """
2207+
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
2208+
"""
2209+
get_slot_query_not_exist_table = """
2210+
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
2211+
"""
2212+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
2213+
"Publication is created for non-existing tables", 10, 5)
2214+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
2215+
"Replication slot is created for non-existing tables", 10, 5)
2216+
2217+
# grant create and ownership of test_table to foo_user, reset search path to default
2218+
grant_permission_foo_user = """
2219+
GRANT CREATE ON DATABASE foo TO foo_user;
2220+
ALTER TABLE test_table OWNER TO foo_user;
2221+
ALTER ROLE foo_user RESET search_path;
2222+
"""
2223+
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
2224+
# non-postgres user creates a publication
2225+
create_nonstream_publication = """
2226+
CREATE PUBLICATION mypublication FOR TABLE test_table;
2227+
"""
2228+
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")
22282229

2229-
# remove the streams section from the manifest
2230-
patch_streaming_config_removal = {
2231-
"spec": {
2232-
"streams": []
2230+
# remove the streams section from the manifest
2231+
patch_streaming_config_removal = {
2232+
"spec": {
2233+
"streams": []
2234+
}
22332235
}
2234-
}
2235-
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2236-
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
2237-
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
2236+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2237+
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
2238+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
22382239

2239-
# check if publication, slot, and fes resource are removed
2240-
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
2240+
# check if publication, slot, and fes resource are removed
2241+
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
22412242
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0,
22422243
'Could not delete Fabric Event Stream resource', 10, 5)
2243-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
2244-
"Publication is not deleted", 10, 5)
2245-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
2246-
"Replication slot is not deleted", 10, 5)
2247-
2248-
# check the manual_slot and mypublication should not get deleted
2249-
get_manual_slot_query = """
2250-
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
2251-
"""
2252-
get_nonstream_publication_query = """
2253-
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
2254-
"""
2255-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
2256-
"Slot defined in patroni config is deleted", 10, 5)
2257-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
2258-
"Publication defined not in stream section is deleted", 10, 5)
2244+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
2245+
"Publication is not deleted", 10, 5)
2246+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
2247+
"Replication slot is not deleted", 10, 5)
2248+
2249+
# check the manual_slot and mypublication should not get deleted
2250+
get_manual_slot_query = """
2251+
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
2252+
"""
2253+
get_nonstream_publication_query = """
2254+
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
2255+
"""
2256+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
2257+
"Slot defined in patroni config is deleted", 10, 5)
2258+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
2259+
"Publication defined not in stream section is deleted", 10, 5)
2260+
2261+
except timeout_decorator.TimeoutError:
2262+
print('Operator log: {}'.format(k8s.get_operator_log()))
2263+
raise
22592264

22602265
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
22612266
def test_taint_based_eviction(self):

pkg/cluster/streams.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -440,17 +440,17 @@ func (c *Cluster) syncStream(appId string) error {
440440
c.setProcessName("syncing stream with applicationId %s", appId)
441441
c.logger.Debugf("syncing stream with applicationId %s", appId)
442442

443-
listOptions := metav1.ListOptions{
444-
LabelSelector: c.labelsSet(true).String(),
445-
FieldSelector: fmt.Sprintf("spec.applicationId=%s", appId),
446-
}
443+
listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()}
447444
streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
448445
if err != nil {
449446
return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
450447
}
451448

452449
streamExists := false
453450
for _, stream := range streams.Items {
451+
if stream.Spec.ApplicationId != appId {
452+
continue
453+
}
454454
if streamExists {
455455
c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId)
456456
if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {

pkg/cluster/streams_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,6 @@ func TestGenerateFabricEventStream(t *testing.T) {
450450

451451
listOptions := metav1.ListOptions{
452452
LabelSelector: cluster.labelsSet(true).String(),
453-
FieldSelector: fmt.Sprintf("spec.applicationId=%s", appId),
454453
}
455454
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
456455
assert.NoError(t, err)
@@ -524,7 +523,6 @@ func TestSyncStreams(t *testing.T) {
524523
// check that two streams exist
525524
listOptions := metav1.ListOptions{
526525
LabelSelector: cluster.labelsSet(true).String(),
527-
FieldSelector: fmt.Sprintf("spec.applicationId=%s", appId),
528526
}
529527
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
530528
assert.NoError(t, err)
@@ -698,7 +696,6 @@ func TestUpdateStreams(t *testing.T) {
698696
// compare stream returned from API with expected stream
699697
listOptions := metav1.ListOptions{
700698
LabelSelector: cluster.labelsSet(true).String(),
701-
FieldSelector: fmt.Sprintf("spec.applicationId=%s", appId),
702699
}
703700
streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
704701
result := cluster.generateFabricEventStream(appId)

0 commit comments

Comments
 (0)