1
1
package org .testcontainers .containers ;
2
2
3
+ import org .apache .pulsar .client .admin .ListTopicsOptions ;
3
4
import org .apache .pulsar .client .admin .PulsarAdmin ;
4
5
import org .apache .pulsar .client .admin .PulsarAdminException ;
5
6
import org .apache .pulsar .client .api .Consumer ;
13
14
import org .testcontainers .utility .DockerImageName ;
14
15
15
16
import java .time .Duration ;
17
+ import java .util .List ;
16
18
import java .util .concurrent .CompletableFuture ;
17
19
import java .util .concurrent .TimeUnit ;
18
20
@@ -23,14 +25,14 @@ public class PulsarContainerTest {
23
25
24
26
public static final String TEST_TOPIC = "test_topic" ;
25
27
26
- private static final DockerImageName PULSAR_IMAGE = DockerImageName .parse ("apachepulsar/pulsar:2.10 .0" );
28
+ private static final DockerImageName PULSAR_IMAGE = DockerImageName .parse ("apachepulsar/pulsar:3.0 .0" );
27
29
28
30
@ Test
29
31
public void testUsage () throws Exception {
30
32
try (
31
33
// do not use PULSAR_IMAGE to make the doc looks easier
32
34
// constructorWithVersion {
33
- PulsarContainer pulsar = new PulsarContainer (DockerImageName .parse ("apachepulsar/pulsar:2.10 .0" ));
35
+ PulsarContainer pulsar = new PulsarContainer (DockerImageName .parse ("apachepulsar/pulsar:3.0 .0" ));
34
36
// }
35
37
) {
36
38
pulsar .start ();
@@ -103,31 +105,26 @@ public void testTransactions() throws Exception {
103
105
pulsar .start ();
104
106
105
107
try (PulsarAdmin pulsarAdmin = PulsarAdmin .builder ().serviceHttpUrl (pulsar .getHttpServiceUrl ()).build ()) {
106
- assertThat (
107
- pulsarAdmin
108
- .topics ()
109
- .getList ("pulsar/system" )
110
- .contains ("persistent://pulsar/system/transaction_coordinator_assign-partition-0" )
111
- )
112
- .isTrue ();
108
+ assertTransactionsTopicCreated (pulsarAdmin );
113
109
}
114
110
testTransactionFunctionality (pulsar .getPulsarBrokerUrl ());
115
111
}
116
112
}
117
113
114
+ private void assertTransactionsTopicCreated (PulsarAdmin pulsarAdmin ) throws PulsarAdminException {
115
+ final List <String > topics = pulsarAdmin
116
+ .topics ()
117
+ .getPartitionedTopicList ("pulsar/system" , ListTopicsOptions .builder ().includeSystemTopic (true ).build ());
118
+ assertThat (topics ).contains ("persistent://pulsar/system/transaction_coordinator_assign" );
119
+ }
120
+
118
121
@ Test
119
122
public void testTransactionsAndFunctionsWorker () throws Exception {
120
123
try (PulsarContainer pulsar = new PulsarContainer (PULSAR_IMAGE ).withTransactions ().withFunctionsWorker ()) {
121
124
pulsar .start ();
122
125
123
126
try (PulsarAdmin pulsarAdmin = PulsarAdmin .builder ().serviceHttpUrl (pulsar .getHttpServiceUrl ()).build ();) {
124
- assertThat (
125
- pulsarAdmin
126
- .topics ()
127
- .getList ("pulsar/system" )
128
- .contains ("persistent://pulsar/system/transaction_coordinator_assign-partition-0" )
129
- )
130
- .isTrue ();
127
+ assertTransactionsTopicCreated (pulsarAdmin );
131
128
assertThat (pulsarAdmin .functions ().getFunctions ("public" , "default" )).hasSize (0 );
132
129
}
133
130
testTransactionFunctionality (pulsar .getPulsarBrokerUrl ());
0 commit comments