Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the scraper/discover manager coordination on the Prometheus receiver #2089

Merged
merged 3 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package internal
import (
"context"
"io"
"sync"
"sync/atomic"

"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -47,16 +46,17 @@ type OcaStore interface {

// OpenCensus Store for prometheus
type ocaStore struct {
running int32
logger *zap.Logger
ctx context.Context

running int32 // access atomically
sink consumer.MetricsConsumer
mc *mService
once *sync.Once
ctx context.Context
jobsMap *JobsMap
useStartTimeMetric bool
startTimeMetricRegex string
receiverName string

logger *zap.Logger
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
Expand All @@ -66,7 +66,6 @@ func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap
ctx: ctx,
sink: sink,
logger: logger,
once: &sync.Once{},
jobsMap: jobsMap,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: startTimeMetricRegex,
Expand Down
96 changes: 40 additions & 56 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package prometheusreceiver

import (
"context"
"sync"
"time"

"github.com/prometheus/prometheus/discovery"
Expand All @@ -25,18 +24,16 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/prometheusreceiver/internal"
)

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
startOnce sync.Once
stopOnce sync.Once
cfg *Config
consumer consumer.MetricsConsumer
cancel context.CancelFunc
logger *zap.Logger
cfg *Config
consumer consumer.MetricsConsumer
cancelFunc context.CancelFunc

logger *zap.Logger
}

// New creates a new prometheus.Receiver reference.
Expand All @@ -51,62 +48,49 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric

// Start is the method that starts Prometheus scraping and it
// is controlled by having previously defined a Configuration using perhaps New.
func (pr *pReceiver) Start(_ context.Context, host component.Host) error {
pr.startOnce.Do(func() {
ctx := context.Background()
c, cancel := context.WithCancel(ctx)
pr.cancel = cancel
c = obsreport.ReceiverContext(c, pr.cfg.Name(), "http")
var jobsMap *internal.JobsMap
if !pr.cfg.UseStartTimeMetric {
jobsMap = internal.NewJobsMap(2 * time.Minute)
}
app := internal.NewOcaStore(c, pr.consumer, pr.logger, jobsMap, pr.cfg.UseStartTimeMetric, pr.cfg.StartTimeMetricRegex, pr.cfg.Name())
// need to use a logger with the gokitLog interface
l := internal.NewZapToGokitLogAdapter(pr.logger)
scrapeManager := scrape.NewManager(l, app)
app.SetScrapeManager(scrapeManager)
discoveryManagerScrape := discovery.NewManager(ctx, l)
go func() {
if err := discoveryManagerScrape.Run(); err != nil {
host.ReportFatalError(err)
}
}()
if err := scrapeManager.ApplyConfig(pr.cfg.PrometheusConfig); err != nil {
host.ReportFatalError(err)
return
}
func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
discoveryCtx, cancel := context.WithCancel(context.Background())
r.cancelFunc = cancel

// Run the scrape manager.
syncConfig := make(chan bool)
errsChan := make(chan error, 1)
go func() {
defer close(errsChan)
<-time.After(100 * time.Millisecond)
close(syncConfig)
if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil {
errsChan <- err
}
}()
<-syncConfig
// By this point we've given time to the scrape manager
// to start applying its original configuration.
logger := internal.NewZapToGokitLogAdapter(r.logger)

discoveryCfg := make(map[string]discovery.Configs)
for _, scrapeConfig := range pr.cfg.PrometheusConfig.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
discoveryManager := discovery.NewManager(discoveryCtx, logger)
discoveryCfg := make(map[string]discovery.Configs)
for _, scrapeConfig := range r.cfg.PrometheusConfig.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
}
if err := discoveryManager.ApplyConfig(discoveryCfg); err != nil {
return err
}
go func() {
if err := discoveryManager.Run(); err != nil {
r.logger.Error("Discovery manager failed", zap.Error(err))
host.ReportFatalError(err)
}
}()

var jobsMap *internal.JobsMap
if !r.cfg.UseStartTimeMetric {
jobsMap = internal.NewJobsMap(2 * time.Minute)
}
ocaStore := internal.NewOcaStore(ctx, r.consumer, r.logger, jobsMap, r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.Name())

// Now trigger the discovery notification to the scrape manager.
if err := discoveryManagerScrape.ApplyConfig(discoveryCfg); err != nil {
errsChan <- err
scrapeManager := scrape.NewManager(logger, ocaStore)
ocaStore.SetScrapeManager(scrapeManager)
if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
return err
}
go func() {
if err := scrapeManager.Run(discoveryManager.SyncCh()); err != nil {
r.logger.Error("Scrape manager failed", zap.Error(err))
host.ReportFatalError(err)
}
})
}()
return nil
}

// Shutdown stops and cancels the underlying Prometheus scrapers.
func (pr *pReceiver) Shutdown(context.Context) error {
pr.stopOnce.Do(pr.cancel)
func (r *pReceiver) Shutdown(context.Context) error {
r.cancelFunc()
return nil
}
12 changes: 6 additions & 6 deletions service/internal/resources.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.