-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathPulsarContainer.java
122 lines (97 loc) · 3.96 KB
/
PulsarContainer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package org.testcontainers.containers;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.utility.DockerImageName;
/**
* Testcontainers implementation for Apache Pulsar.
* <p>
* Supported image: {@code apachepulsar/pulsar}
* <p>
* Exposed ports:
* <ul>
* <li>Pulsar: 6650</li>
* <li>HTTP: 8080</li>
* </ul>
*/
public class PulsarContainer extends GenericContainer<PulsarContainer> {
public static final int BROKER_PORT = 6650;
public static final int BROKER_HTTP_PORT = 8080;
/**
* @deprecated The metrics endpoint is no longer being used for the WaitStrategy.
*/
@Deprecated
public static final String METRICS_ENDPOINT = "/metrics";
private static final String ADMIN_CLUSTERS_ENDPOINT = "/admin/v2/clusters";
/**
* See <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java">SystemTopicNames</a>.
*/
private static final String TRANSACTION_TOPIC_ENDPOINT =
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");
@Deprecated
private static final String DEFAULT_TAG = "3.0.0";
private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy();
private boolean functionsWorkerEnabled = false;
private boolean transactionsEnabled = false;
/**
* @deprecated use {@link #PulsarContainer(DockerImageName)} instead
*/
@Deprecated
public PulsarContainer() {
this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG));
}
/**
* @deprecated use {@link #PulsarContainer(DockerImageName)} instead
*/
@Deprecated
public PulsarContainer(String pulsarVersion) {
this(DEFAULT_IMAGE_NAME.withTag(pulsarVersion));
}
public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
setWaitStrategy(waitAllStrategy);
}
@Override
protected void configure() {
super.configure();
setupCommandAndEnv();
}
public PulsarContainer withFunctionsWorker() {
functionsWorkerEnabled = true;
return this;
}
public PulsarContainer withTransactions() {
transactionsEnabled = true;
return this;
}
public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}
public String getHttpServiceUrl() {
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
}
protected void setupCommandAndEnv() {
String standaloneBaseCommand =
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";
if (!functionsWorkerEnabled) {
standaloneBaseCommand += " --no-functions-worker -nss";
}
withCommand("/bin/bash", "-c", standaloneBaseCommand);
final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
final String response = String.format("[\"%s\"]", clusterName);
waitAllStrategy.withStrategy(
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
);
if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitAllStrategy.withStrategy(
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
);
}
if (functionsWorkerEnabled) {
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
}
}
}