diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java index 34cc1e7bf6..d8d7c354cb 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java @@ -15,12 +15,16 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal; import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; +import static java.util.Collections.emptyList; + +import java.util.List; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; public class AsyncapiClientCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi @@ -39,6 +43,8 @@ public BindingConfig adapt( AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options; AsyncapiConfig asyncapiConfig = options.specs.get(0); this.asyncapi = asyncapiConfig.asyncapi; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); //TODO: add composite for all servers AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue()); @@ -52,20 +58,23 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s.%s", qname, "$composite")) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .inject(n -> this.injectCatalog(n, asyncapi)) - .inject(protocol::injectProtocolClientCache) + .inject(n -> protocol.injectProtocolClientCache(n, metricRefs)) .binding() .name(String.format("%s_client0", protocol.scheme)) .type(protocol.scheme) .kind(CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme)) .inject(protocol::injectProtocolClientOptions) .exit(isTlsEnabled ? "tls_client0" : "tcp_client0") .build() - .inject(n -> injectTlsClient(n, options)) + .inject(n -> injectTlsClient(n, options, metricRefs)) .binding() .name("tcp_client0") .type("tcp") .kind(CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .options(options.tcp) .build() .build() @@ -74,7 +83,8 @@ public BindingConfig adapt( private NamespaceConfigBuilder injectTlsClient( NamespaceConfigBuilder namespace, - AsyncapiOptionsConfig options) + AsyncapiOptionsConfig options, + List metricRefs) { if (isTlsEnabled) { @@ -83,6 +93,7 @@ private NamespaceConfigBuilder injectTlsClient( .name("tls_client0") .type("tls") .kind(CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, "tls")) .options(options.tls) .vault(String.format("%s:%s", this.namespace, vault)) .exit("tcp_client0") diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java index aaa35c87e2..9243a80d1d 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java @@ -16,8 +16,10 @@ import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES; import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; +import static java.util.stream.Collectors.toList; import static org.agrona.LangUtil.rethrowUnchecked; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,7 +38,11 @@ import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView; import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfig; import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; public class AsyncapiCompositeBindingAdapter { @@ -149,4 +155,77 @@ protected static String writeSchemaYaml( } return result; } + + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) + { + List metrics = metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(toList()); + + if (!metrics.isEmpty()) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metrics.forEach(telemetry::metric); + telemetry.build(); + } + + return binding; + } + + protected NamespaceConfigBuilder> injectNamespaceMetric( + NamespaceConfigBuilder> namespace, + boolean hasMetrics) + { + if (hasMetrics) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } + + return namespace; + } } diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java index 72fb71a3de..51dad71e5c 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java @@ -17,9 +17,11 @@ import static java.util.Objects.requireNonNull; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage; @@ -28,7 +30,9 @@ import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; public abstract class AsyncapiProtocol { @@ -62,7 +66,8 @@ public abstract BindingConfigBuilder injectProtocolServerRoutes( BindingConfigBuilder binding); public NamespaceConfigBuilder injectProtocolClientCache( - NamespaceConfigBuilder namespace) + NamespaceConfigBuilder namespace, + List metricRefs) { return namespace; } @@ -145,4 +150,18 @@ protected URI findFirstServerUrlWithScheme( } return result; } + + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(Collectors.toList()) + .forEach(m -> telemetry.metric(m)); + telemetry.build(); + return binding; + } } diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java index c85e764503..1309fabaaa 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal; import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.Collections.emptyList; import java.util.Collections; import java.util.List; @@ -33,6 +34,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; public class AsyncapiProxyCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi @@ -58,6 +60,8 @@ public BindingConfig adapt( .collect(Collectors.toList()); this.asyncApis = options.specs.stream().collect(Collectors.toUnmodifiableMap(a -> a.apiLabel, a -> a.asyncapi)); this.qname = binding.qname; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); String sessions = ""; String messages = ""; @@ -83,10 +87,12 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/%s", qname, "mqtt-kafka")) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .binding() .name("mqtt_kafka_proxy0") .type("mqtt-kafka") .kind(PROXY) + .inject(b -> this.injectMetrics(b, metricRefs, "mqtt-kafka")) .options(MqttKafkaOptionsConfig::builder) .topics() .sessions(sessions) diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java index 9fae81ce33..7cfab97f87 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java @@ -15,6 +15,9 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal; import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.Collections.emptyList; + +import java.util.List; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; @@ -25,6 +28,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; public class AsyncapiServerCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi @@ -44,6 +48,8 @@ public BindingConfig adapt( AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options; AsyncapiConfig asyncapiConfig = options.specs.get(0); this.asyncapi = asyncapiConfig.asyncapi; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); //TODO: add composite for all servers AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue()); @@ -57,22 +63,25 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/%s", qname, protocol.scheme)) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .inject(n -> this.injectCatalog(n, asyncapi)) .binding() .name("tcp_server0") .type("tcp") .kind(SERVER) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .options(TcpOptionsConfig::builder) .host("0.0.0.0") .ports(compositePorts) .build() .inject(this::injectPlainTcpRoute) - .inject(this::injectTlsTcpRoute) + .inject(b -> this.injectTlsTcpRoute(b, metricRefs)) .build() .inject(n -> injectTlsServer(n, options)) .binding() .name(String.format("%s_server0", protocol.scheme)) .type(protocol.scheme) + .inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme)) .kind(SERVER) .inject(protocol::injectProtocolServerOptions) .inject(protocol::injectProtocolServerRoutes) @@ -98,11 +107,13 @@ private BindingConfigBuilder injectPlainTcpRoute( } private BindingConfigBuilder injectTlsTcpRoute( - BindingConfigBuilder binding) + BindingConfigBuilder binding, + List metricRefs) { if (isTlsEnabled) { binding + .inject(b -> this.injectMetrics(b, metricRefs, "tls")) .route() .when(TcpConditionConfig::builder) .ports(compositePorts) diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java index ec7e1243b2..fce84a3023 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.asyncapi.internal; +import java.util.List; import java.util.Map; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder; import io.aklivity.zilla.runtime.engine.config.KindConfig; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; @@ -60,13 +62,15 @@ public AyncapiKafkaProtocol( @Override public NamespaceConfigBuilder injectProtocolClientCache( - NamespaceConfigBuilder namespace) + NamespaceConfigBuilder namespace, + List metricRefs) { return namespace .binding() .name("kafka_cache_client0") .type("kafka") .kind(KindConfig.CACHE_CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, "kafka")) .options(KafkaOptionsConfig::builder) .inject(this::injectKafkaTopicOptions) .build() @@ -76,6 +80,7 @@ public NamespaceConfigBuilder injectProtocolClientCache( .name("kafka_cache_server0") .type("kafka") .kind(KindConfig.CACHE_SERVER) + .inject(b -> this.injectMetrics(b, metricRefs, "kafka")) .options(KafkaOptionsConfig::builder) .inject(this::injectKafkaBootstrapOptions) .inject(this::injectKafkaTopicOptions) diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index 0bcd8d57b7..5ba3db36a2 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import java.util.ArrayList; @@ -46,7 +47,10 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi { @@ -66,7 +70,9 @@ public String type() public BindingConfig adapt( BindingConfig binding) { - OpenapiAsyncapiOptionsConfig options = (OpenapiAsyncapiOptionsConfig) binding.options; + final OpenapiAsyncapiOptionsConfig options = (OpenapiAsyncapiOptionsConfig) binding.options; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); List routes = binding.routes.stream() .map(r -> new OpenapiAsyncapiRouteConfig(r, options::resolveOpenapiApiId)) @@ -75,10 +81,12 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/http_kafka", binding.qname)) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .binding() .name("http_kafka0") .type("http-kafka") .kind(PROXY) + .inject(b -> this.injectMetrics(b, metricRefs, "http-kafka")) .inject(b -> this.injectHttpKafkaRoutes(b, binding.qname, options.specs, routes)) .build() .build() @@ -213,6 +221,79 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( return fetch; } + protected NamespaceConfigBuilder> injectNamespaceMetric( + NamespaceConfigBuilder> namespace, + boolean hasMetrics) + { + if (hasMetrics) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } + + return namespace; + } + + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) + { + List metrics = metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(toList()); + + if (!metrics.isEmpty()) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metrics.forEach(telemetry::metric); + telemetry.build(); + } + + return binding; + } + private OpenapiSchemaView resolveSchemaForJsonContentType( Map content, Openapi openApi) diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java new file mode 100644 index 0000000000..f99cb7f5f4 --- /dev/null +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.internal.config; + +import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; +import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.function.UnaryOperator.identity; + +import java.util.EnumMap; +import java.util.Map; +import java.util.function.UnaryOperator; + +import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.KindConfig; + +public final class OpenapiBindingAdapter implements CompositeBindingAdapterSpi +{ + private final UnaryOperator composite; + + @Override + public String type() + { + return OpenapiBinding.NAME; + } + + public OpenapiBindingAdapter() + { + Map> composites = new EnumMap<>(KindConfig.class); + composites.put(SERVER, new OpenapiServerCompositeBindingAdapter()::adapt); + composites.put(CLIENT, new OpenapiClientCompositeBindingAdapter()::adapt); + UnaryOperator composite = binding -> composites + .getOrDefault(binding.kind, identity()).apply(binding); + this.composite = composite; + } + + @Override + public BindingConfig adapt( + BindingConfig binding) + { + return composite.apply(binding); + } +} diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java index 79bc622575..c4171996da 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java @@ -15,9 +15,11 @@ package io.aklivity.zilla.runtime.binding.openapi.internal.config; import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; +import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; import java.net.URI; +import java.util.List; import java.util.Map; import io.aklivity.zilla.runtime.binding.http.config.HttpOptionsConfig; @@ -27,7 +29,6 @@ import io.aklivity.zilla.runtime.binding.http.config.HttpResponseConfigBuilder; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiConfig; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiOptionsConfig; -import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiHeader; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse; @@ -41,38 +42,21 @@ import io.aklivity.zilla.runtime.binding.tls.config.TlsOptionsConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.ModelConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; -import io.aklivity.zilla.runtime.model.core.config.Int32ModelConfig; -import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public final class OpenapiClientCompositeBindingAdapter implements CompositeBindingAdapterSpi +public final class OpenapiClientCompositeBindingAdapter extends OpenapiCompositeBindingAdapter { - private static final String INLINE_CATALOG_NAME = "catalog0"; - - private final Map models = Map.of( - "string", StringModelConfig.builder().build(), - "integer", Int32ModelConfig.builder().build() - ); - - @Override - public String type() - { - return OpenapiBinding.NAME; - } - - public OpenapiClientCompositeBindingAdapter() - { - } - @Override public BindingConfig adapt( BindingConfig binding) { - OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; - OpenapiConfig openapiConfig = options.openapis.get(0); + final OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; + final OpenapiConfig openapiConfig = options.openapis.get(0); + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); final Openapi openApi = openapiConfig.openapi; final int[] httpsPorts = resolvePortsForScheme(openApi, "https"); @@ -81,19 +65,22 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format(binding.qname, "$composite")) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .binding() .name("http_client0") .type("http") .kind(CLIENT) .inject(b -> this.injectHttpClientOptions(b, openApi)) + .inject(b -> this.injectMetrics(b, metricRefs, "http")) .exit(secure ? "tls_client0" : "tcp_client0") .build() - .inject(b -> this.injectTlsClient(b, options.tls, secure)) + .inject(b -> this.injectTlsClient(b, options.tls, secure, metricRefs)) .binding() .name("tcp_client0") .type("tcp") .kind(CLIENT) .options(options.tcp) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .build() .build() .build(); @@ -235,7 +222,8 @@ private HttpResponseConfigBuilder injectResponseHeaders( private NamespaceConfigBuilder injectTlsClient( NamespaceConfigBuilder namespace, TlsOptionsConfig tlsConfig, - boolean secure) + boolean secure, + List metricRefs) { if (secure) { @@ -246,10 +234,10 @@ private NamespaceConfigBuilder injectTlsClient( .kind(CLIENT) .options(tlsConfig) .vault("client") + .inject(b -> injectMetrics(b, metricRefs, "tls")) .exit("tcp_client0") .build(); } return namespace; } - } diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java index 06ad49a40c..a24b0525ae 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java @@ -14,43 +14,109 @@ */ package io.aklivity.zilla.runtime.binding.openapi.internal.config; -import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; -import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; -import static java.util.function.UnaryOperator.identity; -import java.util.EnumMap; +import static java.util.stream.Collectors.toList; + +import java.util.List; import java.util.Map; -import java.util.function.UnaryOperator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; import io.aklivity.zilla.runtime.engine.config.BindingConfig; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; -import io.aklivity.zilla.runtime.engine.config.KindConfig; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; +import io.aklivity.zilla.runtime.engine.config.ModelConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; +import io.aklivity.zilla.runtime.model.core.config.Int32ModelConfig; +import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; -public final class OpenapiCompositeBindingAdapter implements CompositeBindingAdapterSpi +public abstract class OpenapiCompositeBindingAdapter { - private final UnaryOperator composite; + protected static final String INLINE_CATALOG_NAME = "catalog0"; + protected static final String INLINE_CATALOG_TYPE = "inline"; + protected static final String VERSION_LATEST = "latest"; + protected static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - @Override - public String type() - { - return OpenapiBinding.NAME; - } + protected final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); + protected final Map models = Map.of( + "string", StringModelConfig.builder().build(), + "integer", Int32ModelConfig.builder().build() + ); - public OpenapiCompositeBindingAdapter() + protected NamespaceConfigBuilder> injectNamespaceMetric( + NamespaceConfigBuilder> namespace, + boolean hasMetrics) { - Map> composites = new EnumMap<>(KindConfig.class); - composites.put(SERVER, new OpenapiServerCompositeBindingAdapter()::adapt); - composites.put(CLIENT, new OpenapiClientCompositeBindingAdapter()::adapt); - UnaryOperator composite = binding -> composites - .getOrDefault(binding.kind, identity()).apply(binding); - this.composite = composite; + if (hasMetrics) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } + + return namespace; } - @Override - public BindingConfig adapt( - BindingConfig binding) + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) { - return composite.apply(binding); + List metrics = metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(toList()); + + if (!metrics.isEmpty()) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metrics.forEach(telemetry::metric); + telemetry.build(); + } + + return binding; } + + public abstract BindingConfig adapt( + BindingConfig binding); } diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java index bfe18cec7a..ff70d4e4db 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java @@ -16,14 +16,13 @@ import static io.aklivity.zilla.runtime.binding.http.config.HttpPolicyConfig.CROSS_ORIGIN; import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; import static org.agrona.LangUtil.rethrowUnchecked; import java.net.URI; import java.util.List; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import jakarta.json.bind.Jsonb; import jakarta.json.bind.JsonbBuilder; @@ -38,7 +37,6 @@ import io.aklivity.zilla.runtime.binding.http.config.HttpRequestConfigBuilder; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiConfig; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiOptionsConfig; -import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiMediaType; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiOperation; @@ -55,40 +53,23 @@ import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; import io.aklivity.zilla.runtime.engine.config.GuardedConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.ModelConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; -import io.aklivity.zilla.runtime.model.core.config.Int32ModelConfig; -import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public final class OpenapiServerCompositeBindingAdapter implements CompositeBindingAdapterSpi +public final class OpenapiServerCompositeBindingAdapter extends OpenapiCompositeBindingAdapter { - private static final String INLINE_CATALOG_NAME = "catalog0"; - private static final String INLINE_CATALOG_TYPE = "inline"; - private static final String VERSION_LATEST = "latest"; - private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - - private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); - private final Map models = Map.of( - "string", StringModelConfig.builder().build(), - "integer", Int32ModelConfig.builder().build() - ); - - @Override - public String type() - { - return OpenapiBinding.NAME; - } - @Override public BindingConfig adapt( BindingConfig binding) { - OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; - OpenapiConfig openapiConfig = options.openapis.get(0); + final OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; + final OpenapiConfig openapiConfig = options.openapis.get(0); + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); final Openapi openApi = openapiConfig.openapi; final TlsOptionsConfig tlsOption = options.tls != null ? options.tls : null; @@ -107,6 +88,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/http", binding.qname)) + .inject(namespace -> injectNamespaceMetric(namespace, !metricRefs.isEmpty())) .inject(n -> this.injectCatalog(n, openApi)) .binding() .name("tcp_server0") @@ -118,8 +100,9 @@ public BindingConfig adapt( .build() .inject(b -> this.injectPlainTcpRoute(b, httpPorts, secure)) .inject(b -> this.injectTlsTcpRoute(b, httpsPorts, secure)) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .build() - .inject(n -> this.injectTlsServer(n, qvault, tlsOption, secure)) + .inject(n -> this.injectTlsServer(n, qvault, tlsOption, secure, metricRefs)) .binding() .name("http_server0") .type("http") @@ -132,6 +115,7 @@ public BindingConfig adapt( .inject(r -> this.injectHttpServerRequests(r, openApi)) .build() .inject(b -> this.injectHttpServerRoutes(b, openApi, binding.qname, guardName, securitySchemes)) + .inject(b -> this.injectMetrics(b, metricRefs, "http")) .build() .build() .build(); @@ -177,7 +161,7 @@ private NamespaceConfigBuilder injectTlsServer( NamespaceConfigBuilder namespace, String vault, TlsOptionsConfig tls, - boolean secure) + boolean secure, List metricRefs) { if (secure) { @@ -189,6 +173,7 @@ private NamespaceConfigBuilder injectTlsServer( .options(tls) .vault(vault) .exit("http_server0") + .inject(b -> this.injectMetrics(b, metricRefs, "tls")) .build(); } return namespace; diff --git a/incubator/binding-openapi/src/main/moditect/module-info.java b/incubator/binding-openapi/src/main/moditect/module-info.java index 9e8001e5dd..11c6001a1c 100644 --- a/incubator/binding-openapi/src/main/moditect/module-info.java +++ b/incubator/binding-openapi/src/main/moditect/module-info.java @@ -35,7 +35,7 @@ with io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBindingFactorySpi; provides io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi - with io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiCompositeBindingAdapter; + with io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiBindingAdapter; provides io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi with io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiOptionsConfigAdapter; diff --git a/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi b/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi index 499d92471c..392312312e 100644 --- a/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi +++ b/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi @@ -1 +1 @@ -io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiCompositeBindingAdapter +io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiBindingAdapter