18
18
*/
19
19
package org .apache .pinot .core .transport .grpc ;
20
20
21
+ import io .grpc .Attributes ;
22
+ import io .grpc .Grpc ;
21
23
import io .grpc .Server ;
22
24
import io .grpc .ServerBuilder ;
25
+ import io .grpc .ServerTransportFilter ;
23
26
import io .grpc .Status ;
24
27
import io .grpc .netty .shaded .io .grpc .netty .GrpcSslContexts ;
25
28
import io .grpc .netty .shaded .io .grpc .netty .NettyServerBuilder ;
33
36
import java .util .concurrent .ConcurrentHashMap ;
34
37
import java .util .concurrent .ExecutorService ;
35
38
import java .util .concurrent .Executors ;
39
+ import java .util .concurrent .TimeUnit ;
36
40
import nl .altindag .ssl .SSLFactory ;
37
41
import org .apache .pinot .common .config .GrpcConfig ;
38
42
import org .apache .pinot .common .config .TlsConfig ;
39
43
import org .apache .pinot .common .datatable .DataTable ;
40
44
import org .apache .pinot .common .metrics .ServerMeter ;
41
45
import org .apache .pinot .common .metrics .ServerMetrics ;
46
+ import org .apache .pinot .common .metrics .ServerTimer ;
42
47
import org .apache .pinot .common .proto .PinotQueryServerGrpc ;
43
48
import org .apache .pinot .common .proto .Server .ServerRequest ;
44
49
import org .apache .pinot .common .proto .Server .ServerResponse ;
@@ -72,19 +77,41 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa
72
77
private final AccessControl _accessControl ;
73
78
private final ServerQueryLogger _queryLogger = ServerQueryLogger .getInstance ();
74
79
80
+ // Filter to keep track of gRPC connections.
81
+ private class GrpcQueryTransportFilter extends ServerTransportFilter {
82
+ @ Override
83
+ public Attributes transportReady (Attributes transportAttrs ) {
84
+ LOGGER .info ("gRPC transportReady: REMOTE_ADDR {}" ,
85
+ transportAttrs != null ? transportAttrs .get (Grpc .TRANSPORT_ATTR_REMOTE_ADDR ) : "null" );
86
+ _serverMetrics .addMeteredGlobalValue (ServerMeter .GRPC_TRANSPORT_READY , 1 );
87
+ return super .transportReady (transportAttrs );
88
+ }
89
+
90
+ @ Override
91
+ public void transportTerminated (Attributes transportAttrs ) {
92
+ // transportTerminated can be called without transportReady before it, e.g. handshake fails
93
+ // So, don't emit metrics if transportAttrs is null
94
+ if (transportAttrs != null ) {
95
+ LOGGER .info ("gRPC transportTerminated: REMOTE_ADDR {}" , transportAttrs .get (Grpc .TRANSPORT_ATTR_REMOTE_ADDR ));
96
+ _serverMetrics .addMeteredGlobalValue (ServerMeter .GRPC_TRANSPORT_TERMINATED , 1 );
97
+ }
98
+ }
99
+ }
100
+
75
101
public GrpcQueryServer (int port , GrpcConfig config , TlsConfig tlsConfig , QueryExecutor queryExecutor ,
76
102
ServerMetrics serverMetrics , AccessControl accessControl ) {
77
103
_queryExecutor = queryExecutor ;
78
104
_serverMetrics = serverMetrics ;
79
105
if (tlsConfig != null ) {
80
106
try {
81
107
_server = NettyServerBuilder .forPort (port ).sslContext (buildGRpcSslContext (tlsConfig ))
82
- .maxInboundMessageSize (config .getMaxInboundMessageSizeBytes ()).addService (this ).build ();
108
+ .maxInboundMessageSize (config .getMaxInboundMessageSizeBytes ()).addService (this )
109
+ .addTransportFilter (new GrpcQueryTransportFilter ()).build ();
83
110
} catch (Exception e ) {
84
111
throw new RuntimeException ("Failed to start secure grpcQueryServer" , e );
85
112
}
86
113
} else {
87
- _server = ServerBuilder .forPort (port ).addService (this ).build ();
114
+ _server = ServerBuilder .forPort (port ).addService (this ).addTransportFilter ( new GrpcQueryTransportFilter ()). build ();
88
115
}
89
116
_accessControl = accessControl ;
90
117
LOGGER .info ("Initialized GrpcQueryServer on port: {} with numWorkerThreads: {}" , port ,
@@ -136,6 +163,7 @@ public void shutdown() {
136
163
137
164
@ Override
138
165
public void submit (ServerRequest request , StreamObserver <ServerResponse > responseObserver ) {
166
+ long startTime = System .nanoTime ();
139
167
_serverMetrics .addMeteredGlobalValue (ServerMeter .GRPC_QUERIES , 1 );
140
168
_serverMetrics .addMeteredGlobalValue (ServerMeter .GRPC_BYTES_RECEIVED , request .getSerializedSize ());
141
169
@@ -162,13 +190,16 @@ public void submit(ServerRequest request, StreamObserver<ServerResponse> respons
162
190
_serverMetrics .addMeteredGlobalValue (ServerMeter .NO_TABLE_ACCESS , 1 );
163
191
responseObserver .onError (
164
192
Status .NOT_FOUND .withDescription (exceptionMsg ).withCause (unsupportedOperationException ).asException ());
193
+ return ;
165
194
}
166
195
167
196
// Process the query
168
197
InstanceResponseBlock instanceResponse ;
169
198
try {
170
- instanceResponse =
171
- _queryExecutor .execute (queryRequest , _executorService , new GrpcResultsBlockStreamer (responseObserver ));
199
+ LOGGER .info ("Executing gRPC query request {}: {} received from broker: {}" , queryRequest .getRequestId (),
200
+ queryRequest .getQueryContext (), queryRequest .getBrokerId ());
201
+ instanceResponse = _queryExecutor .execute (queryRequest , _executorService ,
202
+ new GrpcResultsBlockStreamer (responseObserver , _serverMetrics ));
172
203
} catch (Exception e ) {
173
204
LOGGER .error ("Caught exception while processing request {}: {} from broker: {}" , queryRequest .getRequestId (),
174
205
queryRequest .getQueryContext (), queryRequest .getBrokerId (), e );
@@ -192,6 +223,8 @@ public void submit(ServerRequest request, StreamObserver<ServerResponse> respons
192
223
responseObserver .onNext (serverResponse );
193
224
_serverMetrics .addMeteredGlobalValue (ServerMeter .GRPC_BYTES_SENT , serverResponse .getSerializedSize ());
194
225
responseObserver .onCompleted ();
226
+ _serverMetrics .addTimedTableValue (queryRequest .getTableNameWithType (), ServerTimer .GRPC_QUERY_EXECUTION_MS ,
227
+ System .nanoTime () - startTime , TimeUnit .NANOSECONDS );
195
228
196
229
// Log the query
197
230
if (_queryLogger != null ) {
0 commit comments