Skip to content

Commit 6b857d0

Browse files
author
Matt Jacobs
committed
Added parent class to encapsulate shared SSE logic
1 parent 7c33691 commit 6b857d0

File tree

4 files changed

+270
-570
lines changed

4 files changed

+270
-570
lines changed

hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java

+22-155
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,12 @@
2727
import com.netflix.hystrix.config.HystrixConfiguration;
2828
import com.netflix.hystrix.config.HystrixConfigurationStream;
2929
import com.netflix.hystrix.config.HystrixThreadPoolConfiguration;
30-
import org.slf4j.Logger;
31-
import org.slf4j.LoggerFactory;
32-
import rx.Subscriber;
33-
import rx.Subscription;
30+
import rx.Observable;
3431
import rx.functions.Func1;
35-
import rx.schedulers.Schedulers;
3632

37-
import javax.servlet.ServletException;
38-
import javax.servlet.http.HttpServlet;
39-
import javax.servlet.http.HttpServletRequest;
40-
import javax.servlet.http.HttpServletResponse;
4133
import java.io.IOException;
42-
import java.io.PrintWriter;
4334
import java.io.StringWriter;
4435
import java.util.Map;
45-
import java.util.concurrent.atomic.AtomicBoolean;
4636
import java.util.concurrent.atomic.AtomicInteger;
4737

4838
/**
@@ -66,88 +56,54 @@
6656
* </servlet-mapping>
6757
* } </pre>
6858
*/
69-
public class HystrixConfigSseServlet extends HttpServlet {
59+
public class HystrixConfigSseServlet extends HystrixSampleSseServlet<HystrixConfiguration> {
7060

7161
private static final long serialVersionUID = -3599771169762858235L;
7262

73-
private static final Logger logger = LoggerFactory.getLogger(HystrixConfigSseServlet.class);
74-
75-
private static final String DELAY_REQ_PARAM_NAME = "delay";
7663
private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 10000;
7764

78-
private final Func1<Integer, HystrixConfigurationStream> createStream;
7965
private JsonFactory jsonFactory = new JsonFactory();
8066

8167
/* used to track number of connections and throttle */
8268
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
8369
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);
8470

85-
private static volatile boolean isDestroyed = false;
86-
8771
public HystrixConfigSseServlet() {
88-
this.createStream = new Func1<Integer, HystrixConfigurationStream>() {
72+
super(new Func1<Integer, Observable<HystrixConfiguration>>() {
8973
@Override
90-
public HystrixConfigurationStream call(Integer delay) {
91-
return new HystrixConfigurationStream(delay);
74+
public Observable<HystrixConfiguration> call(Integer delay) {
75+
return new HystrixConfigurationStream(delay).observe();
9276
}
93-
};
94-
}
95-
96-
/* package-private */ HystrixConfigSseServlet(Func1<Integer, HystrixConfigurationStream> createStream) {
97-
this.createStream = createStream;
77+
});
9878
}
9979

100-
/**
101-
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
102-
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
103-
* shutdown, perhaps from some other servlet's destroy() method.
104-
*/
105-
public static void shutdown() {
106-
isDestroyed = true;
80+
/* package-private */ HystrixConfigSseServlet(Func1<Integer, Observable<HystrixConfiguration>> createStream) {
81+
super(createStream);
10782
}
10883

10984
@Override
110-
public void init() throws ServletException {
111-
isDestroyed = false;
85+
int getDefaultDelayInMilliseconds() {
86+
return DEFAULT_ONNEXT_DELAY_IN_MS;
11287
}
11388

114-
/**
115-
* Handle incoming GETs
116-
*/
11789
@Override
118-
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
119-
if (isDestroyed) {
120-
response.sendError(503, "Service has been shut down.");
121-
} else {
122-
handleRequest(request, response);
123-
}
90+
int getMaxNumberConcurrentConnectionsAllowed() {
91+
return maxConcurrentConnections.get();
12492
}
12593

126-
/**
127-
* Handle servlet being undeployed by gracefully releasing connections so poller threads stop.
128-
*/
12994
@Override
130-
public void destroy() {
131-
/* set marker so the loops can break out */
132-
isDestroyed = true;
133-
super.destroy();
95+
int getNumberCurrentConnections() {
96+
return concurrentConnections.get();
13497
}
13598

136-
/* package-private */ int getNumberCurrentConnections() {
137-
return concurrentConnections.get();
99+
@Override
100+
protected int incrementAndGetCurrentConcurrentConnections() {
101+
return concurrentConnections.incrementAndGet();
138102
}
139103

140-
/* package-private */
141-
static int getDelayFromHttpRequest(HttpServletRequest req) {
142-
try {
143-
String delay = req.getParameter(DELAY_REQ_PARAM_NAME);
144-
if (delay != null) {
145-
return Math.max(Integer.parseInt(delay), 1);
146-
}
147-
} catch (Throwable ex) {
148-
//silently fail
149-
}
150-
return DEFAULT_ONNEXT_DELAY_IN_MS;
104+
@Override
105+
protected void decrementCurrentConcurrentConnections() {
106+
concurrentConnections.decrementAndGet();
151107
}
152108

153109
private void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException {
@@ -215,7 +171,8 @@ private void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey co
215171
json.writeEndObject();
216172
}
217173

218-
private String convertToString(HystrixConfiguration config) throws IOException {
174+
@Override
175+
protected String convertToString(HystrixConfiguration config) throws IOException {
219176
StringWriter jsonString = new StringWriter();
220177
JsonGenerator json = jsonFactory.createGenerator(jsonString);
221178

@@ -250,95 +207,5 @@ private String convertToString(HystrixConfiguration config) throws IOException {
250207

251208
return jsonString.getBuffer().toString();
252209
}
253-
254-
/**
255-
* - maintain an open connection with the client
256-
* - on initial connection send latest data of each requested event type
257-
* - subsequently send all changes for each requested event type
258-
*
259-
* @param request incoming HTTP Request
260-
* @param response outgoing HTTP Response (as a streaming response)
261-
* @throws javax.servlet.ServletException
262-
* @throws java.io.IOException
263-
*/
264-
private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
265-
final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true);
266-
Subscription configSubscription = null;
267-
268-
/* ensure we aren't allowing more connections than we want */
269-
int numberConnections = concurrentConnections.incrementAndGet();
270-
try {
271-
if (numberConnections > maxConcurrentConnections.get()) {
272-
response.sendError(503, "MaxConcurrentConnections reached: " + maxConcurrentConnections.get());
273-
} else {
274-
int delay = getDelayFromHttpRequest(request);
275-
276-
/* initialize response */
277-
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
278-
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
279-
response.setHeader("Pragma", "no-cache");
280-
281-
final PrintWriter writer = response.getWriter();
282-
283-
HystrixConfigurationStream configurationStream = createStream.call(delay);
284-
285-
//since the config stream is based on Observable.interval, events will get published on an RxComputation thread
286-
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
287-
configSubscription = configurationStream
288-
.observe()
289-
.observeOn(Schedulers.io())
290-
.subscribe(new Subscriber<HystrixConfiguration>() {
291-
@Override
292-
public void onCompleted() {
293-
logger.error("HystrixConfigSseServlet received unexpected OnCompleted from config stream");
294-
moreDataWillBeSent.set(false);
295-
}
296-
297-
@Override
298-
public void onError(Throwable e) {
299-
moreDataWillBeSent.set(false);
300-
}
301-
302-
@Override
303-
public void onNext(HystrixConfiguration hystrixConfiguration) {
304-
if (hystrixConfiguration != null) {
305-
String configAsStr = null;
306-
try {
307-
configAsStr = convertToString(hystrixConfiguration);
308-
} catch (IOException ioe) {
309-
//exception while converting String to JSON
310-
logger.error("Error converting configuration to JSON ", ioe);
311-
}
312-
if (configAsStr != null) {
313-
try {
314-
writer.print("data: " + configAsStr + "\n\n");
315-
// explicitly check for client disconnect - PrintWriter does not throw exceptions
316-
if (writer.checkError()) {
317-
throw new IOException("io error");
318-
}
319-
writer.flush();
320-
} catch (IOException ioe) {
321-
moreDataWillBeSent.set(false);
322-
}
323-
}
324-
}
325-
}
326-
});
327-
328-
while (moreDataWillBeSent.get() && !isDestroyed) {
329-
try {
330-
Thread.sleep(delay);
331-
} catch (InterruptedException e) {
332-
moreDataWillBeSent.set(false);
333-
}
334-
}
335-
}
336-
} finally {
337-
concurrentConnections.decrementAndGet();
338-
if (configSubscription != null && !configSubscription.isUnsubscribed()) {
339-
configSubscription.unsubscribe();
340-
}
341-
}
342-
}
343210
}
344211

0 commit comments

Comments
 (0)