@@ -3,16 +3,24 @@ package integration_tests
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "github.com/datastax/go-cassandra-native-protocol/message"
7
+ "github.com/datastax/go-cassandra-native-protocol/primitive"
8
+ "github.com/datastax/zdm-proxy/integration-tests/client"
6
9
"github.com/datastax/zdm-proxy/integration-tests/setup"
7
10
"github.com/datastax/zdm-proxy/integration-tests/utils"
8
11
"github.com/datastax/zdm-proxy/proxy/pkg/config"
9
12
"github.com/datastax/zdm-proxy/proxy/pkg/health"
10
13
"github.com/datastax/zdm-proxy/proxy/pkg/httpzdmproxy"
11
14
"github.com/datastax/zdm-proxy/proxy/pkg/metrics"
12
15
"github.com/datastax/zdm-proxy/proxy/pkg/runner"
16
+ "github.com/datastax/zdm-proxy/proxy/pkg/zdmproxy"
17
+ "github.com/jpillora/backoff"
18
+ log "github.com/sirupsen/logrus"
13
19
"github.com/stretchr/testify/require"
14
20
"net/http"
21
+ "strings"
15
22
"sync"
23
+ "sync/atomic"
16
24
"testing"
17
25
"time"
18
26
)
@@ -42,6 +50,10 @@ func TestWithHttpHandlers(t *testing.T) {
42
50
t .Run ("testHttpEndpointsWithUnavailableNode" , func (t * testing.T ) {
43
51
testHttpEndpointsWithUnavailableNode (t , metricsHandler , readinessHandler )
44
52
})
53
+
54
+ t .Run ("testMetricsWithUnavailableNode" , func (t * testing.T ) {
55
+ testMetricsWithUnavailableNode (t , metricsHandler )
56
+ })
45
57
}
46
58
47
59
func testHttpEndpointsWithProxyNotInitialized (
@@ -137,6 +149,86 @@ func testHttpEndpointsWithProxyInitialized(
137
149
require .Equal (t , health .UP , report .Status )
138
150
}
139
151
152
+ func testMetricsWithUnavailableNode (
153
+ t * testing.T , metricsHandler * httpzdmproxy.HandlerWithFallback ) {
154
+
155
+ simulacronSetup , err := setup .NewSimulacronTestSetupWithSession (t , false , false )
156
+ require .Nil (t , err )
157
+ defer simulacronSetup .Cleanup ()
158
+
159
+ conf := setup .NewTestConfig (simulacronSetup .Origin .GetInitialContactPoint (), simulacronSetup .Target .GetInitialContactPoint ())
160
+ modifyConfForHealthTests (conf , 2 )
161
+
162
+ waitGroup := & sync.WaitGroup {}
163
+ ctx , cancelFunc := context .WithCancel (context .Background ())
164
+
165
+ defer waitGroup .Wait ()
166
+ defer cancelFunc ()
167
+
168
+ srv := httpzdmproxy .StartHttpServer (fmt .Sprintf ("%s:%d" , conf .MetricsAddress , conf .MetricsPort ), waitGroup )
169
+ defer func (srv * http.Server , ctx context.Context ) {
170
+ err := srv .Shutdown (ctx )
171
+ if err != nil {
172
+ log .Error ("Failed to shutdown metrics server:" , err .Error ())
173
+ }
174
+ }(srv , ctx )
175
+
176
+ b := & backoff.Backoff {
177
+ Factor : 2 ,
178
+ Jitter : false ,
179
+ Min : 100 * time .Millisecond ,
180
+ Max : 500 * time .Millisecond ,
181
+ }
182
+ proxy := atomic.Value {}
183
+ waitGroup .Add (1 )
184
+ go func () {
185
+ defer waitGroup .Done ()
186
+ p , err := zdmproxy .RunWithRetries (conf , ctx , b )
187
+ if err == nil {
188
+ metricsHandler .SetHandler (p .GetMetricHandler ().GetHttpHandler ())
189
+ proxy .Store (& p )
190
+ <- ctx .Done ()
191
+ p .Shutdown ()
192
+ }
193
+ }()
194
+
195
+ httpAddr := fmt .Sprintf ("%s:%d" , conf .MetricsAddress , conf .MetricsPort )
196
+
197
+ // check that metrics endpoint has been initialized
198
+ utils .RequireWithRetries (t , func () (err error , fatal bool ) {
199
+ fatal = false
200
+ err = utils .CheckMetricsEndpointResult (httpAddr , true )
201
+ return
202
+ }, 10 , 100 * time .Millisecond )
203
+
204
+ // stop origin cluster
205
+ err = simulacronSetup .Origin .DisableConnectionListener ()
206
+ require .Nil (t , err , "failed to disable origin connection listener: %v" , err )
207
+ err = simulacronSetup .Origin .DropAllConnections ()
208
+ require .Nil (t , err , "failed to drop origin connections: %v" , err )
209
+
210
+ // send a request
211
+ testClient , err := client .NewTestClient (context .Background (), "127.0.0.1:14002" )
212
+ require .Nil (t , err )
213
+ queryMsg := & message.Query {
214
+ Query : "SELECT * FROM table1" ,
215
+ }
216
+ _ , _ , _ = testClient .SendMessage (context .Background (), primitive .ProtocolVersion4 , queryMsg )
217
+
218
+ utils .RequireWithRetries (t , func () (err error , fatal bool ) {
219
+ // expect connection failure to origin cluster
220
+ statusCode , rspStr , err := utils .GetMetrics (httpAddr )
221
+ require .Nil (t , err )
222
+ require .Equal (t , http .StatusOK , statusCode )
223
+ if ! strings .Contains (rspStr , fmt .Sprintf ("%v 1" , getPrometheusName ("zdm" , metrics .FailedConnectionsOrigin ))) {
224
+ err = fmt .Errorf ("did not observe failed connection attempts" )
225
+ } else {
226
+ err = nil
227
+ }
228
+ return
229
+ }, 10 , 500 * time .Millisecond )
230
+ }
231
+
140
232
func testHttpEndpointsWithUnavailableNode (
141
233
t * testing.T , metricsHandler * httpzdmproxy.HandlerWithFallback , healthHandler * httpzdmproxy.HandlerWithFallback ) {
142
234
0 commit comments