Skip to content

Commit 2bae17d

Browse files
committed
Merge pull request #1061 from mattrjacobs/buffered-request-stream
Buffered request stream
2 parents 56ef8c2 + fc10998 commit 2bae17d

19 files changed

+2237
-33
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.contrib.requests.stream;
17+
18+
import com.fasterxml.jackson.core.JsonFactory;
19+
import com.fasterxml.jackson.core.JsonGenerator;
20+
import com.netflix.config.DynamicIntProperty;
21+
import com.netflix.config.DynamicPropertyFactory;
22+
import com.netflix.hystrix.HystrixEventType;
23+
import com.netflix.hystrix.HystrixInvokableInfo;
24+
import com.netflix.hystrix.metric.HystrixRequestEvents;
25+
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import rx.Subscriber;
29+
import rx.Subscription;
30+
import rx.schedulers.Schedulers;
31+
32+
import javax.servlet.ServletException;
33+
import javax.servlet.http.HttpServlet;
34+
import javax.servlet.http.HttpServletRequest;
35+
import javax.servlet.http.HttpServletResponse;
36+
import java.io.IOException;
37+
import java.io.PrintWriter;
38+
import java.io.StringWriter;
39+
import java.util.ArrayList;
40+
import java.util.Collection;
41+
import java.util.List;
42+
import java.util.concurrent.LinkedBlockingQueue;
43+
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
46+
/**
47+
*/
48+
public class HystrixRequestEventsSseServlet extends HttpServlet {
49+
50+
private static final Logger logger = LoggerFactory.getLogger(HystrixRequestEventsSseServlet.class);
51+
52+
private static volatile boolean isDestroyed = false;
53+
54+
private static final String DELAY_REQ_PARAM_NAME = "delay";
55+
private static final int DEFAULT_DELAY_IN_MILLISECONDS = 10000;
56+
private static final int DEFAULT_QUEUE_DEPTH = 1000;
57+
private static final String PING = "\n: ping\n";
58+
59+
/* used to track number of connections and throttle */
60+
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
61+
private static DynamicIntProperty maxConcurrentConnections =
62+
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.requests.stream.maxConcurrentConnections", 5);
63+
64+
private final LinkedBlockingQueue<HystrixRequestEvents> requestQueue = new LinkedBlockingQueue<HystrixRequestEvents>(DEFAULT_QUEUE_DEPTH);
65+
private final JsonFactory jsonFactory = new JsonFactory();
66+
67+
/**
68+
* Handle incoming GETs
69+
*/
70+
@Override
71+
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
72+
if (isDestroyed) {
73+
response.sendError(503, "Service has been shut down.");
74+
} else {
75+
handleRequest(request, response);
76+
}
77+
}
78+
79+
/* package-private */
80+
int getDelayFromHttpRequest(HttpServletRequest req) {
81+
try {
82+
String delay = req.getParameter(DELAY_REQ_PARAM_NAME);
83+
if (delay != null) {
84+
return Math.max(Integer.parseInt(delay), 1);
85+
}
86+
} catch (Throwable ex) {
87+
//silently fail
88+
}
89+
return DEFAULT_DELAY_IN_MILLISECONDS;
90+
}
91+
92+
/**
93+
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
94+
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
95+
* shutdown, perhaps from some other servlet's destroy() method.
96+
*/
97+
public static void shutdown() {
98+
isDestroyed = true;
99+
}
100+
101+
@Override
102+
public void init() throws ServletException {
103+
isDestroyed = false;
104+
}
105+
106+
/**
107+
* Handle servlet being undeployed by gracefully releasing connections so poller threads stop.
108+
*/
109+
@Override
110+
public void destroy() {
111+
/* set marker so the loops can break out */
112+
isDestroyed = true;
113+
super.destroy();
114+
}
115+
116+
private String convertToString(Collection<HystrixRequestEvents> requests) throws IOException {
117+
StringWriter jsonString = new StringWriter();
118+
JsonGenerator json = jsonFactory.createGenerator(jsonString);
119+
120+
json.writeStartArray();
121+
for (HystrixRequestEvents request : requests) {
122+
convertRequestToJson(json, request);
123+
}
124+
json.writeEndArray();
125+
json.close();
126+
return jsonString.getBuffer().toString();
127+
}
128+
129+
private void convertRequestToJson(JsonGenerator json, HystrixRequestEvents request) throws IOException {
130+
json.writeStartObject();
131+
json.writeStringField("request", request.getRequestContext().toString());
132+
json.writeObjectFieldStart("commands");
133+
for (HystrixInvokableInfo<?> execution: request.getExecutions()) {
134+
convertExecutionToJson(json, execution);
135+
}
136+
json.writeEndObject();
137+
json.writeEndObject();
138+
}
139+
140+
private void convertExecutionToJson(JsonGenerator json, HystrixInvokableInfo<?> execution) throws IOException {
141+
json.writeObjectFieldStart(execution.getCommandKey().name());
142+
json.writeNumberField("latency", execution.getExecutionTimeInMilliseconds());
143+
json.writeArrayFieldStart("events");
144+
for (HystrixEventType eventType: execution.getExecutionEvents()) {
145+
switch (eventType) {
146+
case EMIT:
147+
json.writeStartObject();
148+
json.writeNumberField(eventType.name(), execution.getNumberEmissions());
149+
json.writeEndObject();
150+
break;
151+
case FALLBACK_EMIT:
152+
json.writeStartObject();
153+
json.writeNumberField(eventType.name(), execution.getNumberFallbackEmissions());
154+
json.writeEndObject();
155+
break;
156+
case COLLAPSED:
157+
json.writeStartObject();
158+
json.writeNumberField(eventType.name(), execution.getNumberCollapsed());
159+
json.writeEndObject();
160+
break;
161+
default:
162+
json.writeString(eventType.name());
163+
break;
164+
}
165+
}
166+
json.writeEndArray();
167+
json.writeEndObject();
168+
}
169+
170+
/**
171+
* - maintain an open connection with the client
172+
* - on initial connection send latest data of each requested event type
173+
* - subsequently send all changes for each requested event type
174+
*
175+
* @param request incoming HTTP Request
176+
* @param response outgoing HTTP Response (as a streaming response)
177+
* @throws javax.servlet.ServletException
178+
* @throws java.io.IOException
179+
*/
180+
private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
181+
final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true);
182+
Subscription requestsSubscription = null;
183+
184+
/* ensure we aren't allowing more connections than we want */
185+
int numberConnections = concurrentConnections.incrementAndGet();
186+
try {
187+
int maxNumberConnectionsAllowed = maxConcurrentConnections.get();
188+
if (numberConnections > maxNumberConnectionsAllowed) {
189+
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
190+
} else {
191+
int delay = getDelayFromHttpRequest(request);
192+
193+
/* initialize response */
194+
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
195+
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
196+
response.setHeader("Pragma", "no-cache");
197+
198+
final PrintWriter writer = response.getWriter();
199+
200+
//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
201+
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
202+
requestsSubscription = HystrixRequestEventsStream.getInstance()
203+
.observe()
204+
.observeOn(Schedulers.io())
205+
.subscribe(new Subscriber<HystrixRequestEvents>() {
206+
@Override
207+
public void onCompleted() {
208+
logger.error("HystrixRequestEventsSseServlet received unexpected OnCompleted from request stream");
209+
moreDataWillBeSent.set(false);
210+
}
211+
212+
@Override
213+
public void onError(Throwable e) {
214+
moreDataWillBeSent.set(false);
215+
}
216+
217+
@Override
218+
public void onNext(HystrixRequestEvents requestEvents) {
219+
if (requestEvents != null) {
220+
requestQueue.offer(requestEvents);
221+
}
222+
}
223+
});
224+
225+
while (moreDataWillBeSent.get() && !isDestroyed) {
226+
try {
227+
if (requestQueue.isEmpty()) {
228+
try {
229+
writer.print(PING);
230+
writer.flush();
231+
} catch (Throwable t) {
232+
throw new IOException("Exception while writing ping");
233+
}
234+
235+
if (writer.checkError()) {
236+
throw new IOException("io error");
237+
}
238+
} else {
239+
List<HystrixRequestEvents> l = new ArrayList<HystrixRequestEvents>();
240+
requestQueue.drainTo(l);
241+
String requestEventsAsStr = convertToString(l);
242+
//try {
243+
//} catch (IOException ioe) {
244+
// //exception while converting String to JSON
245+
// logger.error("Error converting configuration to JSON ", ioe);
246+
//}
247+
if (requestEventsAsStr != null) {
248+
try {
249+
writer.print("data: " + requestEventsAsStr + "\n\n");
250+
// explicitly check for client disconnect - PrintWriter does not throw exceptions
251+
if (writer.checkError()) {
252+
throw new IOException("io error");
253+
}
254+
writer.flush();
255+
} catch (IOException ioe) {
256+
moreDataWillBeSent.set(false);
257+
}
258+
}
259+
}
260+
Thread.sleep(delay);
261+
} catch (InterruptedException e) {
262+
moreDataWillBeSent.set(false);
263+
}
264+
}
265+
}
266+
} finally {
267+
concurrentConnections.decrementAndGet();
268+
if (requestsSubscription != null && !requestsSubscription.isUnsubscribed()) {
269+
requestsSubscription.unsubscribe();
270+
}
271+
}
272+
}
273+
}
274+

0 commit comments

Comments
 (0)