1
1
/**
2
2
* Copyright 2012 Netflix, Inc.
3
- *
3
+ *
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
6
6
* You may obtain a copy of the License at
7
- *
8
- * http://www.apache.org/licenses/LICENSE-2.0
9
- *
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
10
* Unless required by applicable law or agreed to in writing, software
11
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23
23
import java .util .concurrent .ConcurrentHashMap ;
24
24
import java .util .concurrent .ConcurrentLinkedQueue ;
25
25
import java .util .concurrent .atomic .AtomicInteger ;
26
+ import java .util .concurrent .atomic .AtomicReference ;
26
27
27
28
import javax .annotation .concurrent .NotThreadSafe ;
28
29
@@ -55,8 +56,6 @@ public class HystrixTimer {
55
56
56
57
private HystrixTimer () {
57
58
// private to prevent public instantiation
58
- // start up tick thread
59
- tickThread .start ();
60
59
}
61
60
62
61
/**
@@ -73,11 +72,18 @@ public static HystrixTimer getInstance() {
73
72
* </p>
74
73
*/
75
74
public static void reset () {
75
+ // interrupt the thread to shut it down
76
+ TickThread t = INSTANCE .tickThread .get ();
77
+ if (t != null ) {
78
+ t .interrupt ();
79
+ }
80
+ // clear the queues
76
81
INSTANCE .listenersPerInterval .clear ();
77
82
INSTANCE .intervals .clear ();
78
83
}
79
84
80
- private TickThread tickThread = new TickThread ();
85
+ // use AtomicReference so we can use CAS to ensure once-and-only-once initialization after reset
86
+ private AtomicReference <TickThread > tickThread = new AtomicReference <TickThread >();
81
87
private ConcurrentHashMap <Integer , ConcurrentLinkedQueue <Reference <TimerListener >>> listenersPerInterval = new ConcurrentHashMap <Integer , ConcurrentLinkedQueue <Reference <TimerListener >>>();
82
88
private ConcurrentLinkedQueue <TimerInterval > intervals = new ConcurrentLinkedQueue <TimerInterval >();
83
89
@@ -103,6 +109,8 @@ public static void reset() {
103
109
* @return reference to the TimerListener that allows cleanup via the <code>clear()</code> method
104
110
*/
105
111
public Reference <TimerListener > addTimerListener (TimerListener listener ) {
112
+ startThreadIfNeeded ();
113
+ // add the listener
106
114
if (!listenersPerInterval .containsKey (listener .getIntervalTimeInMilliseconds ())) {
107
115
listenersPerInterval .putIfAbsent (listener .getIntervalTimeInMilliseconds (), new ConcurrentLinkedQueue <Reference <TimerListener >>());
108
116
intervals .add (new TimerInterval (listener .getIntervalTimeInMilliseconds ()));
@@ -112,6 +120,21 @@ public Reference<TimerListener> addTimerListener(TimerListener listener) {
112
120
return reference ;
113
121
}
114
122
123
+ /**
124
+ * Since we allow resetting the timer (shutting down the thread) we need to lazily re-start it if it starts being used again.
125
+ * <p>
126
+ * This does the lazy initialization and start of the thread in a thread-safe manner while having little cost the rest of the time.
127
+ */
128
+ protected void startThreadIfNeeded () {
129
+ // create and start thread if one doesn't exist
130
+ if (tickThread .get () == null ) {
131
+ if (tickThread .compareAndSet (null , new TickThread ())) {
132
+ // start the thread that we 'won' setting
133
+ tickThread .get ().start ();
134
+ }
135
+ }
136
+ }
137
+
115
138
private class TickThread extends Thread {
116
139
117
140
TickThread () {
@@ -124,7 +147,7 @@ private class TickThread extends Thread {
124
147
public void run () {
125
148
long diffBetweenNowAndInterval = 0 ;
126
149
long timeToNextInterval = 0 ;
127
- while (true ) {
150
+ while (true && ! isInterrupted () ) {
128
151
try {
129
152
for (TimerInterval interval : intervals ) {
130
153
// if enough time has elapsed for this interval then tick the listeners
@@ -171,10 +194,17 @@ public void run() {
171
194
Thread .sleep (timeToNextInterval );
172
195
// reset to 0
173
196
timeToNextInterval = 0 ;
197
+ } catch (InterruptedException e ) {
198
+ logger .error ("Thread interrupted so will shut down." , e );
199
+ // mark status so the while loop will exit
200
+ Thread .currentThread ().interrupt ();
174
201
} catch (Exception e ) {
175
202
logger .error ("Error in TickThread run loop." , e );
176
203
}
177
204
}
205
+ logger .info ("HystrixTimer thread was interrupted so is shutting down." );
206
+ // clear out the state of this thread so it can be restarted (if reset hasn't already occurred)
207
+ tickThread .compareAndSet (this , null );
178
208
}
179
209
}
180
210
@@ -351,6 +381,52 @@ public void testSingleCommandRemoveListener() {
351
381
assertEquals (0 , l2 .tickCount .get ());
352
382
}
353
383
384
+ @ Test
385
+ public void testThreadInterrupt () {
386
+ HystrixTimer timer = HystrixTimer .getInstance ();
387
+ TestListener l1 = new TestListener (50 , "A" );
388
+ timer .addTimerListener (l1 );
389
+
390
+ TickThread t = timer .tickThread .get ();
391
+ // send interrupt
392
+ t .interrupt ();
393
+ // assert that the thread is shutdown
394
+ assertTrue (t .isInterrupted ());
395
+ try {
396
+ t .join (2000 );
397
+ } catch (InterruptedException e ) {
398
+ throw new RuntimeException ("thread should have died but didn't" , e );
399
+ }
400
+ assertFalse (t .isAlive ());
401
+
402
+ // assert the thread is now null after shutdown
403
+ TickThread t2 = timer .tickThread .get ();
404
+ assertNull (t2 );
405
+
406
+ // assert that it starts itself back up when we interact with it again
407
+ TestListener l2 = new TestListener (50 , "A" );
408
+ timer .addTimerListener (l2 );
409
+ TickThread t3 = timer .tickThread .get ();
410
+ assertNotNull (t3 );
411
+ assertNotSame (t , t2 );
412
+
413
+ // assert that reset shuts the thread down same as manual interrupt
414
+ TickThread t4 = INSTANCE .tickThread .get ();
415
+ assertNotNull (t4 );
416
+ // perform reset which should shut it down
417
+ HystrixTimer .reset ();
418
+
419
+ // assert that the thread is shutdown
420
+ try {
421
+ t4 .join (2000 );
422
+ } catch (InterruptedException e ) {
423
+ throw new RuntimeException ("thread should have died but didn't" , e );
424
+ }
425
+ assertFalse (t4 .isAlive ());
426
+
427
+ assertNull (timer .tickThread .get ());
428
+ }
429
+
354
430
private static class TestListener implements TimerListener {
355
431
356
432
private final int interval ;
0 commit comments