Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROTOTYPE] Combine gRPC and HTTP servers #3765

Closed
31 changes: 29 additions & 2 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/rs/cors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -198,6 +200,21 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp
o(serverOpts)
}

// Declare an HTTP/1 server which:
// Handles only HTTP/1 traffic over TLS or non-TLS
h1Server := &http.Server{
Addr: hss.Endpoint,
}

// Declare an HTTP/2 server which will handle any H2 protocol traffic
h2Server := &http2.Server{
NewWriteScheduler: func() http2.WriteScheduler { return http2.NewPriorityWriteScheduler(nil) },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment to explain why we need priority scheduler and not the default one.

}

// Configure the HTTP/1 server to serve TLS ALPN=h2 traffic onto the H2 server
// The handler specified on the HTTP/1 server will be via H2 transport
http2.ConfigureServer(h1Server, h2Server)

handler = middleware.HTTPContentDecompressor(
handler,
middleware.WithErrorHandler(serverOpts.errorHandler),
Expand Down Expand Up @@ -225,7 +242,17 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp
}),
)

return &http.Server{
Handler: handler,
// Enable the H2C handler if and only if there are no TLS settings in
// accordance with rfc7540 section-3.4
if hss.TLSSetting == nil {
// Configure the H1 server to intercept HTTP/2 with prior
// knowledge and handle that with the H2 server.
// This allows H2 upgrade via the HTTP/1 server path
handler = h2c.NewHandler(handler, h2Server)
}

// Set the handler on the HTTP1 server
h1Server.Handler = handler

return h1Server
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.18.1
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1
golang.org/x/text v0.3.6
google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08
Expand Down
13 changes: 8 additions & 5 deletions internal/middleware/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func HTTPContentDecompressor(h http.Handler, opts ...DecompressorOption) http.Ha

func (d *decompressor) wrap(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
newBody, err := newBodyReader(r)
newBody, err := NewBodyDecompressor(r.Body, r.Header.Get("Content-Encoding"))
if err != nil {
d.errorHandler(w, r, err.Error(), http.StatusBadRequest)
return
Expand All @@ -128,16 +128,19 @@ func (d *decompressor) wrap(h http.Handler) http.Handler {
})
}

func newBodyReader(r *http.Request) (io.ReadCloser, error) {
switch r.Header.Get("Content-Encoding") {
// NewBodyDecompressor takes a content encoding string and an io.ReadCloser and
// returns a new reader that, when read, contains the decompressed content.
// It supports gzip and deflate/zlib compression.
func NewBodyDecompressor(body io.ReadCloser, contentEncoding string) (io.ReadCloser, error) {
switch contentEncoding {
case "gzip":
gr, err := gzip.NewReader(r.Body)
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
case "deflate", "zlib":
zr, err := zlib.NewReader(r.Body)
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions receiver/otlpreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
protoGRPC = "grpc"
protoHTTP = "http"
protocolsFieldName = "protocols"
experimentalField = "experimental_server"
)

// Protocols is the configuration for the supported protocols.
Expand All @@ -41,6 +42,10 @@ type Config struct {
config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
// Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON).
Protocols `mapstructure:"protocols"`

// ExperimentalServerEnabled enables the HTTP server to serve all the protocols.
// Currently: gRPC and HTTP (Proto and JSON).
ExperimentalServerEnabled bool `mapstructure:"experimental_server_enabled"`
}

var _ config.Receiver = (*Config)(nil)
Expand Down Expand Up @@ -80,5 +85,10 @@ func (cfg *Config) Unmarshal(componentParser *configparser.Parser) error {
cfg.HTTP = nil
}

// If the experimental field is loaded, zero out the config for the gRPC protocol
if cfg.ExperimentalServerEnabled {
cfg.GRPC = nil
}

return nil
}
16 changes: 15 additions & 1 deletion receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 10)
assert.Equal(t, len(cfg.Receivers), 11)

assert.Equal(t, cfg.Receivers[config.NewID(typeStr)], factory.CreateDefaultConfig())

Expand Down Expand Up @@ -190,6 +190,20 @@ func TestLoadConfig(t *testing.T) {
},
},
})

assert.Equal(t, cfg.Receivers[config.NewIDWithName(typeStr, "experimental")],
&Config{
ReceiverSettings: config.NewReceiverSettings(config.NewIDWithName(typeStr, "experimental")),
Protocols: Protocols{
GRPC: nil,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:4317",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
CorsHeaders: []string{"ExampleHeader"},
},
},
ExperimentalServerEnabled: true,
})
}

func TestFailedLoadConfig(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
)

const (
grpcContentType = "application/grpc"
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
)
Expand Down Expand Up @@ -202,6 +203,11 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
return componenterror.ErrNilNextConsumer
}
r.traceReceiver = trace.New(r.cfg.ID(), tc)
if r.cfg.ExperimentalServerEnabled {
r.httpMux.HandleFunc("/opentelemetry.proto.collector.trace.v1.TraceService/Export", func(resp http.ResponseWriter, req *http.Request) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit fragile. If we rename a message or service in the proto this will no longer work. Can we have a test that ensures the path for this handler matches the proto definition correctly?
Or maybe event better, there is a way to get this string from generated protobuf *.pb.go files? (I don't see it).

handleTraces(resp, req, grpcContentType, r.traceReceiver, tracesPbUnmarshaler)
}).Methods(http.MethodPost).Headers("Content-Type", grpcContentType)
}
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) {
handleTraces(resp, req, pbContentType, r.traceReceiver, tracesPbUnmarshaler)
Expand Down Expand Up @@ -229,6 +235,11 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
return componenterror.ErrNilNextConsumer
}
r.metricsReceiver = metrics.New(r.cfg.ID(), mc)
if r.cfg.ExperimentalServerEnabled {
r.httpMux.HandleFunc("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", func(resp http.ResponseWriter, req *http.Request) {
handleMetrics(resp, req, grpcContentType, r.metricsReceiver, metricsPbUnmarshaler)
}).Methods(http.MethodPost).Headers("Content-Type", grpcContentType)
}
Comment on lines +238 to +242
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be inside if r.httpMux != nil { check? Not sure when can r.httpMux be nil though. Perhaps it can't anymore since we always create http server when ExperimentalServerEnabled?

if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) {
handleMetrics(resp, req, pbContentType, r.metricsReceiver, metricsPbUnmarshaler)
Expand All @@ -248,6 +259,11 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
return componenterror.ErrNilNextConsumer
}
r.logReceiver = logs.New(r.cfg.ID(), lc)
if r.cfg.ExperimentalServerEnabled {
r.httpMux.HandleFunc("/opentelemetry.proto.collector.logs.v1.LogsService/Export", func(w http.ResponseWriter, req *http.Request) {
handleLogs(w, req, grpcContentType, r.logReceiver, logsPbUnmarshaler)
}).Methods(http.MethodPost).Headers("Content-Type", grpcContentType)
}
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, req *http.Request) {
handleLogs(w, req, pbContentType, r.logReceiver, logsPbUnmarshaler)
Expand Down
60 changes: 60 additions & 0 deletions receiver/otlpreceiver/otlpgrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package otlpreceiver

import (
"encoding/binary"
"errors"
"io"
"net/http"
"strconv"

"google.golang.org/grpc/codes"
)

const grpcStatus = "grpc-status"
const grpcMessage = "grpc-message"

var errTooLong = errors.New("response payload is too long to encode into gRPC")

type grpcResponseWriter struct {
innerRw http.ResponseWriter
headerWritten bool
}

func (rw grpcResponseWriter) Header() http.Header {
return rw.innerRw.Header()
}

func (rw grpcResponseWriter) Write(b []byte) (int, error) {
if !rw.headerWritten {
rw.WriteHeader(int(codes.OK))
}
lengthPrefix := make([]byte, 5)
payloadLen := uint32(len(b))
if int(payloadLen) != len(b) {
return 0, errTooLong
}
binary.BigEndian.PutUint32(lengthPrefix[1:], payloadLen)
_, err := rw.innerRw.Write(lengthPrefix)
if err != nil {
return 0, err
}
return rw.innerRw.Write(b)
}

func (rw *grpcResponseWriter) WriteHeader(statusCode int) {
if !rw.headerWritten {
rw.headerWritten = true
rw.Header()["Date"] = nil
rw.Header()["Trailer"] = []string{grpcStatus, grpcMessage}
rw.Header().Set("Content-Type", grpcContentType)
rw.innerRw.WriteHeader(200)
}
rw.Header().Set(grpcStatus, strconv.FormatInt(int64(statusCode), 10))
}

func unwrapProtoFromGrpc(reader io.Reader) (compressed bool, errout error) {
b := make([]byte, 5)
_, err := io.ReadFull(reader, b)
// compressed := b[0] == 1
return b[0] == 1, err
}
Loading