From ffccbf71f2c137cbe240abb8ff6a7ae77d9520de Mon Sep 17 00:00:00 2001
From: JBD <jbd@amazon.com>
Date: Thu, 12 Nov 2020 11:20:44 -0800
Subject: [PATCH 1/3] Fix the scraper/discovery manager coordination in the
 Prometheus receiver

The receiver's Start contains unnecessary coordination code: sync.Once
guards, an error channel, and an artificial 100ms delay used to
synchronize the scrape and discovery managers. Rewrite Start to
configure and launch the discovery manager and then the scrape manager
directly, for better maintainability.

Related to #1909.
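
For reference, the rewritten Start roughly follows the shape of the
minimal, self-contained sketch below. The manager and receiver types
here are illustrative stand-ins, not the real API; the OpenCensus
store, the jobs map, and error reporting to the host are elided:

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // fakeManager stands in for Prometheus' discovery and scrape managers;
    // only the call ordering of the rewritten Start is illustrated.
    type fakeManager struct{ name string }

    func (m *fakeManager) ApplyConfig() error { return nil }

    func (m *fakeManager) Run(ctx context.Context) {
    	<-ctx.Done()
    	fmt.Println(m.name, "manager stopped")
    }

    type receiver struct{ cancelFunc context.CancelFunc }

    func (r *receiver) Start(ctx context.Context) error {
    	runCtx, cancel := context.WithCancel(ctx)
    	r.cancelFunc = cancel

    	// 1. Configure the discovery manager, then run it in the background.
    	discovery := &fakeManager{name: "discovery"}
    	if err := discovery.ApplyConfig(); err != nil {
    		return err
    	}
    	go discovery.Run(runCtx)

    	// 2. Configure the scrape manager, then run it in the background.
    	scraper := &fakeManager{name: "scrape"}
    	if err := scraper.ApplyConfig(); err != nil {
    		return err
    	}
    	go scraper.Run(runCtx)

    	return nil
    }

    func (r *receiver) Shutdown(context.Context) error {
    	r.cancelFunc()
    	return nil
    }

    func main() {
    	r := &receiver{}
    	_ = r.Start(context.Background())
    	_ = r.Shutdown(context.Background())
    	time.Sleep(50 * time.Millisecond) // let the loops observe cancellation
    }

The key point is that Shutdown only calls the stored cancel function,
so no sync.Once guards are needed in Start or Shutdown.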
---
 .../prometheusreceiver/internal/ocastore.go   | 12 ++-
 .../prometheusreceiver/metrics_receiver.go    | 96 ++++++++-----------
 service/internal/resources.go                 | 12 +--
 3 files changed, 53 insertions(+), 67 deletions(-)

diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go
index 55e1f53719d..ea539fc1740 100644
--- a/receiver/prometheusreceiver/internal/ocastore.go
+++ b/receiver/prometheusreceiver/internal/ocastore.go
@@ -47,16 +47,19 @@ type OcaStore interface {
 
 // OpenCensus Store for prometheus
 type ocaStore struct {
-	running              int32
-	logger               *zap.Logger
+	ctx context.Context
+
+	running              int32 // access atomically
 	sink                 consumer.MetricsConsumer
 	mc                   *mService
-	once                 *sync.Once
-	ctx                  context.Context
 	jobsMap              *JobsMap
 	useStartTimeMetric   bool
 	startTimeMetricRegex string
 	receiverName         string
+
+	logger *zap.Logger
+
+	once sync.Once
 }
 
 // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
@@ -66,7 +69,6 @@ func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap
 		ctx:                  ctx,
 		sink:                 sink,
 		logger:               logger,
-		once:                 &sync.Once{},
 		jobsMap:              jobsMap,
 		useStartTimeMetric:   useStartTimeMetric,
 		startTimeMetricRegex: startTimeMetricRegex,
diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index 639e3546bd3..f05ab91303e 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -16,7 +16,6 @@ package prometheusreceiver
 
 import (
 	"context"
-	"sync"
 	"time"
 
 	"github.com/prometheus/prometheus/discovery"
@@ -25,18 +24,16 @@ import (
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/obsreport"
 	"go.opentelemetry.io/collector/receiver/prometheusreceiver/internal"
 )
 
 // pReceiver is the type that provides Prometheus scraper/receiver functionality.
 type pReceiver struct {
-	startOnce sync.Once
-	stopOnce  sync.Once
-	cfg       *Config
-	consumer  consumer.MetricsConsumer
-	cancel    context.CancelFunc
-	logger    *zap.Logger
+	cfg        *Config
+	consumer   consumer.MetricsConsumer
+	cancelFunc context.CancelFunc
+
+	logger *zap.Logger
 }
 
 // New creates a new prometheus.Receiver reference.
@@ -51,62 +48,49 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric
 
 // Start is the method that starts Prometheus scraping and it
 // is controlled by having previously defined a Configuration using perhaps New.
-func (pr *pReceiver) Start(_ context.Context, host component.Host) error {
-	pr.startOnce.Do(func() {
-		ctx := context.Background()
-		c, cancel := context.WithCancel(ctx)
-		pr.cancel = cancel
-		c = obsreport.ReceiverContext(c, pr.cfg.Name(), "http")
-		var jobsMap *internal.JobsMap
-		if !pr.cfg.UseStartTimeMetric {
-			jobsMap = internal.NewJobsMap(2 * time.Minute)
-		}
-		app := internal.NewOcaStore(c, pr.consumer, pr.logger, jobsMap, pr.cfg.UseStartTimeMetric, pr.cfg.StartTimeMetricRegex, pr.cfg.Name())
-		// need to use a logger with the gokitLog interface
-		l := internal.NewZapToGokitLogAdapter(pr.logger)
-		scrapeManager := scrape.NewManager(l, app)
-		app.SetScrapeManager(scrapeManager)
-		discoveryManagerScrape := discovery.NewManager(ctx, l)
-		go func() {
-			if err := discoveryManagerScrape.Run(); err != nil {
-				host.ReportFatalError(err)
-			}
-		}()
-		if err := scrapeManager.ApplyConfig(pr.cfg.PrometheusConfig); err != nil {
-			host.ReportFatalError(err)
-			return
-		}
+func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
+	logger := internal.NewZapToGokitLogAdapter(r.logger)
 
-		// Run the scrape manager.
-		syncConfig := make(chan bool)
-		errsChan := make(chan error, 1)
-		go func() {
-			defer close(errsChan)
-			<-time.After(100 * time.Millisecond)
-			close(syncConfig)
-			if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil {
-				errsChan <- err
-			}
-		}()
-		<-syncConfig
-		// By this point we've given time to the scrape manager
-		// to start applying its original configuration.
+	discoveryCtx, cancel := context.WithCancel(ctx)
+	r.cancelFunc = cancel
 
-		discoveryCfg := make(map[string]discovery.Configs)
-		for _, scrapeConfig := range pr.cfg.PrometheusConfig.ScrapeConfigs {
-			discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
+	discoveryManager := discovery.NewManager(discoveryCtx, logger)
+	discoveryCfg := make(map[string]discovery.Configs)
+	for _, scrapeConfig := range r.cfg.PrometheusConfig.ScrapeConfigs {
+		discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
+	}
+	if err := discoveryManager.ApplyConfig(discoveryCfg); err != nil {
+		return err
+	}
+	go func() {
+		if err := discoveryManager.Run(); err != nil {
+			r.logger.Error("Discovery manager failed", zap.Error(err))
+			host.ReportFatalError(err)
 		}
+	}()
+
+	var jobsMap *internal.JobsMap
+	if !r.cfg.UseStartTimeMetric {
+		jobsMap = internal.NewJobsMap(2 * time.Minute)
+	}
+	ocaStore := internal.NewOcaStore(ctx, r.consumer, r.logger, jobsMap, r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.Name())
 
-		// Now trigger the discovery notification to the scrape manager.
-		if err := discoveryManagerScrape.ApplyConfig(discoveryCfg); err != nil {
-			errsChan <- err
+	scrapeManager := scrape.NewManager(logger, ocaStore)
+	ocaStore.SetScrapeManager(scrapeManager)
+	if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
+		return err
+	}
+	go func() {
+		if err := scrapeManager.Run(discoveryManager.SyncCh()); err != nil {
+			r.logger.Error("Scrape manager failed", zap.Error(err))
+			host.ReportFatalError(err)
 		}
-	})
+	}()
 	return nil
 }
 
 // Shutdown stops and cancels the underlying Prometheus scrapers.
-func (pr *pReceiver) Shutdown(context.Context) error {
-	pr.stopOnce.Do(pr.cancel)
+func (r *pReceiver) Shutdown(context.Context) error {
+	r.cancelFunc()
 	return nil
 }
diff --git a/service/internal/resources.go b/service/internal/resources.go
index 3777769f2b3..c5e93cd1c43 100644
--- a/service/internal/resources.go
+++ b/service/internal/resources.go
@@ -227,7 +227,7 @@ var _escData = map[string]*_escFile{
 		name:    "component_header.html",
 		local:   "templates/component_header.html",
 		size:    156,
-		modtime: 1594178791,
+		modtime: 1605208512,
 		compressed: `
 H4sIAAAAAAAC/1SMsQqDMBRFd7/iIq7q5lBiltKt9B8CPklQX6R1e9x/L6ZQ2vXcc65ZE3AZ0V3ztmcV
 PW467TnpQVZmzZp0Kfs96VJQizTjw1uyAgAXB+8C4lPmsT4fydqbdY+wCen64F0fB19iWV/yF/54X0en
@@ -239,7 +239,7 @@ U3kHAAD//zT+SdCcAAAA
 		name:    "extensions_table.html",
 		local:   "templates/extensions_table.html",
 		size:    353,
-		modtime: 1594178791,
+		modtime: 1605208512,
 		compressed: `
 H4sIAAAAAAAC/2SQwU7DMBBE7/2KlemRNJwjxxwQHDnwB248DRbOOnK2tGD531HTQIvqk1fzZjU7Wuw2
 gCb5CmjVNiaHVE2j7Tz3DT0osyIiynltqWlp8xSHMTJYntmN0bOUsgDJcg9ap3jw7HC8n7+z5y0epgU7
@@ -252,7 +252,7 @@ oxX5HeETfMGv9NPTkv4i2e6jT3HPrqE7AEui8yaECbdWkzPYUXWlaHFkg++5VR1YkJTRlt4Tdq06HVfK
 		name:    "footer.html",
 		local:   "templates/footer.html",
 		size:    15,
-		modtime: 1594178791,
+		modtime: 1605208512,
 		compressed: `
 H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA
 `,
@@ -262,7 +262,7 @@ H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA
 		name:    "header.html",
 		local:   "templates/header.html",
 		size:    467,
-		modtime: 1594178791,
+		modtime: 1605208512,
 		compressed: `
 H4sIAAAAAAAC/5TRMU8sIRAH8P4+BY/25eC9szGGxUItLIwW11giO7uMB8wG5rxsLvfdDdnTxNhoBeFP
 fpnM3/y5fbzZPj/dicAp2pVph4guj52ELK0J4Hq7EkIIk4Cd8MGVCtzJPQ/rS3mOGDmCPR7Vtl1OJ6OX
@@ -276,7 +276,7 @@ vuDEoocBiqjF/5RszGuV1uhFsCujl0bMC/Vz62vzZe1hY98DAAD//7qRGmLTAQAA
 		name:    "pipelines_table.html",
 		local:   "templates/pipelines_table.html",
 		size:    1946,
-		modtime: 1594178791,
+		modtime: 1605208512,
 		compressed: `
 H4sIAAAAAAAC/7SVwXLTMBCG7zyFxnRyIjVcU1scSpnhAMN0eAFZ2gRNlZVmJbdujd+dsWyrTp0LtL5k
 rOjX/tlv/8hFEJUB5sOjgTKrLCmgrXdCajzs2MeMv2OMsSLQ8DAsFJPWeCew/MSE0QcsDewDLyr+tTbm
@@ -293,7 +293,7 @@ QeMmXNC4hCvdNKvQgsYtacFoGWFFxSvCNl+lu3HQFXl8JfO/AQAA//9We3KLmgcAAA==
 		name:    "properties_table.html",
 		local:   "templates/properties_table.html",
 		size:    420,
-		modtime: 1594178791,
+		modtime: 1605208512,
 		compressed: `
 H4sIAAAAAAAC/2SRwW7DIBBE7/6KVRr1VMc5u5gfqFT11Ds2U8sqWVuwqRoR/r1yTCpb4YAEO48ZDarV
 MR7ezQkp1apqdaHEtA4U5OLQ7NrRW/gyTKYbuK/puNMFEVGMtB/Y4pfqho6UUr71hnvk0Qvt4XACyyw6

From 6f4cc91e5c3b9f0cb97712238d68e9f0b77ee0f2 Mon Sep 17 00:00:00 2001
From: JBD <jbd@amazon.com>
Date: Thu, 12 Nov 2020 13:39:23 -0800
Subject: [PATCH 2/3] Use the background context for the discovery manager
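
Derive the discovery context from context.Background() rather than from
the ctx passed to Start, so the long-running discovery loop no longer
depends on how long the caller keeps that context alive; it is stopped
only when Shutdown calls r.cancelFunc. That is, in Start:

    discoveryCtx, cancel := context.WithCancel(context.Background())
    r.cancelFunc = cancel // invoked from Shutdown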

---
 receiver/prometheusreceiver/metrics_receiver.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index f05ab91303e..3c9828b154e 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -49,11 +49,11 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric
 // Start is the method that starts Prometheus scraping and it
 // is controlled by having previously defined a Configuration using perhaps New.
 func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
-	logger := internal.NewZapToGokitLogAdapter(r.logger)
-
-	discoveryCtx, cancel := context.WithCancel(ctx)
+	discoveryCtx, cancel := context.WithCancel(context.Background())
 	r.cancelFunc = cancel
 
+	logger := internal.NewZapToGokitLogAdapter(r.logger)
+
 	discoveryManager := discovery.NewManager(discoveryCtx, logger)
 	discoveryCfg := make(map[string]discovery.Configs)
 	for _, scrapeConfig := range r.cfg.PrometheusConfig.ScrapeConfigs {

From 883685b5d1ba62277110ed2b44343a59b9783b6e Mon Sep 17 00:00:00 2001
From: JBD <jbd@amazon.com>
Date: Thu, 12 Nov 2020 15:57:43 -0800
Subject: [PATCH 3/3] Remove dead code

---
 receiver/prometheusreceiver/internal/ocastore.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go
index ea539fc1740..cbdb7e74a9f 100644
--- a/receiver/prometheusreceiver/internal/ocastore.go
+++ b/receiver/prometheusreceiver/internal/ocastore.go
@@ -17,7 +17,6 @@ package internal
 import (
 	"context"
 	"io"
-	"sync"
 	"sync/atomic"
 
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -58,8 +57,6 @@ type ocaStore struct {
 	receiverName         string
 
 	logger *zap.Logger
-
-	once sync.Once
 }
 
 // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable