diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 039c210043e..8bdff2493b5 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -24,6 +24,8 @@ import ( "github.com/rs/cors" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -198,6 +200,21 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp o(serverOpts) } + // Declare an HTTP/1 server which: + // Handles only HTTP/1 traffic over TLS or non-TLS + h1Server := &http.Server{ + Addr: hss.Endpoint, + } + + // Declare an HTTP/2 server which will handle any H2 protocol traffic + h2Server := &http2.Server{ + NewWriteScheduler: func() http2.WriteScheduler { return http2.NewPriorityWriteScheduler(nil) }, + } + + // Configure the HTTP/1 server to serve TLS ALPN=h2 traffic onto the H2 server + // The handler specified on the HTTP/1 server will be via H2 transport + http2.ConfigureServer(h1Server, h2Server) + handler = middleware.HTTPContentDecompressor( handler, middleware.WithErrorHandler(serverOpts.errorHandler), @@ -225,7 +242,17 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp }), ) - return &http.Server{ - Handler: handler, + // Enable the H2C handler if and only if there are no TLS settings in + // accordance with rfc7540 section-3.4 + if hss.TLSSetting == nil { + // Configure the H1 server to intercept HTTP/2 with prior + // knowledge and handle that with the H2 server. + // This allows H2 upgrade via the HTTP/1 server path + handler = h2c.NewHandler(handler, h2Server) } + + // Set the handler on the HTTP1 server + h1Server.Handler = handler + + return h1Server } diff --git a/go.mod b/go.mod index 9b0bd53224f..75da8c4f16c 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( go.opentelemetry.io/otel/trace v1.0.0-RC2 go.uber.org/atomic v1.9.0 go.uber.org/zap v1.18.1 + golang.org/x/net v0.0.0-20210614182718-04defd469f4e golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 golang.org/x/text v0.3.6 google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08 diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 8b0f3cdd09e..278916864a4 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -109,7 +109,7 @@ func HTTPContentDecompressor(h http.Handler, opts ...DecompressorOption) http.Ha func (d *decompressor) wrap(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - newBody, err := newBodyReader(r) + newBody, err := NewBodyDecompressor(r.Body, r.Header.Get("Content-Encoding")) if err != nil { d.errorHandler(w, r, err.Error(), http.StatusBadRequest) return @@ -128,16 +128,19 @@ func (d *decompressor) wrap(h http.Handler) http.Handler { }) } -func newBodyReader(r *http.Request) (io.ReadCloser, error) { - switch r.Header.Get("Content-Encoding") { +// NewBodyDecompressor takes a content encoding string and an io.ReadCloser and +// returns a new reader that, when read, contains the decompressed content. +// It supports gzip and deflate/zlib compression. +func NewBodyDecompressor(body io.ReadCloser, contentEncoding string) (io.ReadCloser, error) { + switch contentEncoding { case "gzip": - gr, err := gzip.NewReader(r.Body) + gr, err := gzip.NewReader(body) if err != nil { return nil, err } return gr, nil case "deflate", "zlib": - zr, err := zlib.NewReader(r.Body) + zr, err := zlib.NewReader(body) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/config.go b/receiver/otlpreceiver/config.go index 9cc17411046..e703275fd0b 100644 --- a/receiver/otlpreceiver/config.go +++ b/receiver/otlpreceiver/config.go @@ -28,6 +28,7 @@ const ( protoGRPC = "grpc" protoHTTP = "http" protocolsFieldName = "protocols" + experimentalField = "experimental_server" ) // Protocols is the configuration for the supported protocols. @@ -41,6 +42,10 @@ type Config struct { config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct // Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON). Protocols `mapstructure:"protocols"` + + // ExperimentalServerEnabled enables the HTTP server to serve all the protocols. + // Currently: gRPC and HTTP (Proto and JSON). + ExperimentalServerEnabled bool `mapstructure:"experimental_server_enabled"` } var _ config.Receiver = (*Config)(nil) @@ -80,5 +85,10 @@ func (cfg *Config) Unmarshal(componentParser *configparser.Parser) error { cfg.HTTP = nil } + // If the experimental field is loaded, zero out the config for the gRPC protocol + if cfg.ExperimentalServerEnabled { + cfg.GRPC = nil + } + return nil } diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 02f59e8c297..87130fe2d67 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, len(cfg.Receivers), 10) + assert.Equal(t, len(cfg.Receivers), 11) assert.Equal(t, cfg.Receivers[config.NewID(typeStr)], factory.CreateDefaultConfig()) @@ -190,6 +190,20 @@ func TestLoadConfig(t *testing.T) { }, }, }) + + assert.Equal(t, cfg.Receivers[config.NewIDWithName(typeStr, "experimental")], + &Config{ + ReceiverSettings: config.NewReceiverSettings(config.NewIDWithName(typeStr, "experimental")), + Protocols: Protocols{ + GRPC: nil, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:4317", + CorsOrigins: []string{"https://*.test.com", "https://test.com"}, + CorsHeaders: []string{"ExampleHeader"}, + }, + }, + ExperimentalServerEnabled: true, + }) } func TestFailedLoadConfig(t *testing.T) { diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index a43f4381c00..ec7b9bb705a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -37,6 +37,7 @@ import ( ) const ( + grpcContentType = "application/grpc" pbContentType = "application/x-protobuf" jsonContentType = "application/json" ) @@ -202,6 +203,11 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error { return componenterror.ErrNilNextConsumer } r.traceReceiver = trace.New(r.cfg.ID(), tc) + if r.cfg.ExperimentalServerEnabled { + r.httpMux.HandleFunc("/opentelemetry.proto.collector.trace.v1.TraceService/Export", func(resp http.ResponseWriter, req *http.Request) { + handleTraces(resp, req, grpcContentType, r.traceReceiver, tracesPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", grpcContentType) + } if r.httpMux != nil { r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) { handleTraces(resp, req, pbContentType, r.traceReceiver, tracesPbUnmarshaler) @@ -229,6 +235,11 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error { return componenterror.ErrNilNextConsumer } r.metricsReceiver = metrics.New(r.cfg.ID(), mc) + if r.cfg.ExperimentalServerEnabled { + r.httpMux.HandleFunc("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", func(resp http.ResponseWriter, req *http.Request) { + handleMetrics(resp, req, grpcContentType, r.metricsReceiver, metricsPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", grpcContentType) + } if r.httpMux != nil { r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) { handleMetrics(resp, req, pbContentType, r.metricsReceiver, metricsPbUnmarshaler) @@ -248,6 +259,11 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error { return componenterror.ErrNilNextConsumer } r.logReceiver = logs.New(r.cfg.ID(), lc) + if r.cfg.ExperimentalServerEnabled { + r.httpMux.HandleFunc("/opentelemetry.proto.collector.logs.v1.LogsService/Export", func(w http.ResponseWriter, req *http.Request) { + handleLogs(w, req, grpcContentType, r.logReceiver, logsPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", grpcContentType) + } if r.httpMux != nil { r.httpMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, req *http.Request) { handleLogs(w, req, pbContentType, r.logReceiver, logsPbUnmarshaler) diff --git a/receiver/otlpreceiver/otlpgrpc.go b/receiver/otlpreceiver/otlpgrpc.go new file mode 100644 index 00000000000..e082c24da71 --- /dev/null +++ b/receiver/otlpreceiver/otlpgrpc.go @@ -0,0 +1,60 @@ +package otlpreceiver + +import ( + "encoding/binary" + "errors" + "io" + "net/http" + "strconv" + + "google.golang.org/grpc/codes" +) + +const grpcStatus = "grpc-status" +const grpcMessage = "grpc-message" + +var errTooLong = errors.New("response payload is too long to encode into gRPC") + +type grpcResponseWriter struct { + innerRw http.ResponseWriter + headerWritten bool +} + +func (rw grpcResponseWriter) Header() http.Header { + return rw.innerRw.Header() +} + +func (rw grpcResponseWriter) Write(b []byte) (int, error) { + if !rw.headerWritten { + rw.WriteHeader(int(codes.OK)) + } + lengthPrefix := make([]byte, 5) + payloadLen := uint32(len(b)) + if int(payloadLen) != len(b) { + return 0, errTooLong + } + binary.BigEndian.PutUint32(lengthPrefix[1:], payloadLen) + _, err := rw.innerRw.Write(lengthPrefix) + if err != nil { + return 0, err + } + return rw.innerRw.Write(b) +} + +func (rw *grpcResponseWriter) WriteHeader(statusCode int) { + if !rw.headerWritten { + rw.headerWritten = true + rw.Header()["Date"] = nil + rw.Header()["Trailer"] = []string{grpcStatus, grpcMessage} + rw.Header().Set("Content-Type", grpcContentType) + rw.innerRw.WriteHeader(200) + } + rw.Header().Set(grpcStatus, strconv.FormatInt(int64(statusCode), 10)) +} + +func unwrapProtoFromGrpc(reader io.Reader) (compressed bool, errout error) { + b := make([]byte, 5) + _, err := io.ReadFull(reader, b) + // compressed := b[0] == 1 + return b[0] == 1, err +} diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index 16ad9b25d26..d5104e6f2e7 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -16,15 +16,18 @@ package otlpreceiver import ( "bytes" + "errors" "io/ioutil" "net/http" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/internal/middleware" "go.opentelemetry.io/collector/model/pdata" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" @@ -32,6 +35,33 @@ import ( ) var jsonMarshaler = &jsonpb.Marshaler{} +var errUnsupportedProtocol = errors.New("gRPC requires HTTP/2") +var emptyMessage = &types.Empty{} + +func newGrpcResponseWriter(resp http.ResponseWriter, req *http.Request) (http.ResponseWriter, error) { + grpcRw := &grpcResponseWriter{innerRw: resp} + + if req.ProtoMajor != 2 { + writeError(resp, grpcContentType, errUnsupportedProtocol, http.StatusBadRequest) + return nil, errUnsupportedProtocol + } + + grpcRw.WriteHeader(int(codes.Internal)) + compressed, err := unwrapProtoFromGrpc(req.Body) + if err != nil { + writeError(grpcRw, grpcContentType, err, http.StatusBadRequest) + return nil, err + } + if compressed { + newBody, err := middleware.NewBodyDecompressor(req.Body, req.Header.Get("grpc-encoding")) + if err != nil { + writeError(grpcRw, grpcContentType, err, http.StatusBadRequest) + return nil, err + } + req.Body = newBody + } + return grpcRw, nil +} func handleTraces( resp http.ResponseWriter, @@ -39,6 +69,15 @@ func handleTraces( contentType string, tracesReceiver *trace.Receiver, tracesUnmarshaler pdata.TracesUnmarshaler) { + + if contentType == grpcContentType { + var err error + resp, err = newGrpcResponseWriter(resp, req) + if err != nil { + return + } + } + body, ok := readAndCloseBody(resp, req, contentType) if !ok { return @@ -57,7 +96,7 @@ func handleTraces( } // TODO: Pass response from grpc handler when otlpgrpc returns concrete type. - writeResponse(resp, contentType, http.StatusOK, &types.Empty{}) + writeResponse(resp, contentType, http.StatusOK, emptyMessage) } func handleMetrics( @@ -66,6 +105,14 @@ func handleMetrics( contentType string, metricsReceiver *metrics.Receiver, metricsUnmarshaler pdata.MetricsUnmarshaler) { + if contentType == grpcContentType { + var err error + resp, err = newGrpcResponseWriter(resp, req) + if err != nil { + return + } + } + body, ok := readAndCloseBody(resp, req, contentType) if !ok { return @@ -84,7 +131,7 @@ func handleMetrics( } // TODO: Pass response from grpc handler when otlpgrpc returns concrete type. - writeResponse(resp, contentType, http.StatusOK, &types.Empty{}) + writeResponse(resp, contentType, http.StatusOK, emptyMessage) } func handleLogs( @@ -93,6 +140,14 @@ func handleLogs( contentType string, logsReceiver *logs.Receiver, logsUnmarshaler pdata.LogsUnmarshaler) { + if contentType == grpcContentType { + var err error + resp, err = newGrpcResponseWriter(resp, req) + if err != nil { + return + } + } + body, ok := readAndCloseBody(resp, req, contentType) if !ok { return @@ -111,7 +166,7 @@ func handleLogs( } // TODO: Pass response from grpc handler when otlpgrpc returns concrete type. - writeResponse(resp, contentType, http.StatusOK, &types.Empty{}) + writeResponse(resp, contentType, http.StatusOK, emptyMessage) } func readAndCloseBody(resp http.ResponseWriter, req *http.Request, contentType string) ([]byte, bool) { @@ -159,11 +214,23 @@ func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusC // Pre-computed status with code=Internal to be used in case of a marshaling error. var fallbackMsg = []byte(`{"code": 13, "message": "failed to marshal error message"}`) +const fallbackGrpcMessage = "failed to marshal error message" + const fallbackContentType = "application/json" func writeResponse(w http.ResponseWriter, contentType string, statusCode int, rsp proto.Message) { var err error var msg []byte + if contentType == grpcContentType { + if s, ok := rsp.(*spb.Status); ok { + statusCode = int(s.Code) + w.Header().Set(grpcMessage, s.Message) + rsp = emptyMessage + } else { + statusCode = int(codes.OK) + } + } + if contentType == "application/json" { buf := new(bytes.Buffer) err = jsonMarshaler.Marshal(buf, rsp) @@ -173,6 +240,10 @@ func writeResponse(w http.ResponseWriter, contentType string, statusCode int, rs } if err != nil { + if contentType == grpcContentType { + w.Header().Set(grpcMessage, fallbackGrpcMessage) + return + } msg = fallbackMsg contentType = fallbackContentType statusCode = http.StatusInternalServerError diff --git a/receiver/otlpreceiver/testdata/config.yaml b/receiver/otlpreceiver/testdata/config.yaml index 7e9a8ab94e0..92d38a97504 100644 --- a/receiver/otlpreceiver/testdata/config.yaml +++ b/receiver/otlpreceiver/testdata/config.yaml @@ -76,8 +76,8 @@ receivers: protocols: http: cors_allowed_origins: - - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. - - https://test.com # Fully qualified domain name. Allows https://test.com only. + - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. + - https://test.com # Fully qualified domain name. Allows https://test.com only. # The following entry demonstrates how to use CORS Header configuration. otlp/corsheader: protocols: @@ -87,6 +87,17 @@ receivers: - https://test.com # Fully qualified domain name. Allows https://test.com only. cors_allowed_headers: - ExampleHeader + # The following demonstrates the experimental server which can serve both gRPC and HTTP from the same endpoint + otlp/experimental: + experimental_server_enabled: true + protocols: + http: + endpoint: 0.0.0.0:4317 + cors_allowed_origins: + - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. + - https://test.com # Fully qualified domain name. Allows https://test.com only. + cors_allowed_headers: + - ExampleHeader processors: nop: