Skip to content

Commit e3f30e5

Browse files
authored
Refactor Message Queue implementation (#64)
* Refactor management service message queue * Refactor treatment service message queue * Update plugin * Bump golangci-lint version * More fixes * Refactor implementation to message queue packages
1 parent 6c2e25f commit e3f30e5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+537
-244
lines changed

.github/workflows/plugins.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ on:
2525

2626
env:
2727
GO_VERSION: 1.18
28-
GO_LINT_VERSION: v1.45.2
28+
GO_LINT_VERSION: v1.51.2
2929

3030
jobs:
3131
lint-go:

.github/workflows/xp.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ on:
2626
env:
2727
ARTIFACT_RETENTION_DAYS: 7
2828
GO_VERSION: 1.18
29-
GO_LINT_VERSION: v1.45.2
29+
GO_LINT_VERSION: v1.51.2
3030

3131
jobs:
3232
lint-python:

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ compile-protos: | $(protoc_dir)
9898
.PHONY: setup
9999
setup:
100100
@echo "> Initializing dependencies ..."
101-
@test -x ${GOPATH}/bin/golangci-lint || go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.40.1
101+
@test -x ${GOPATH}/bin/golangci-lint || go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2
102102

103103
@echo "Setting up dev tools..."
104104
@test -x "$(which pre-commit)" || pip install pre-commit

api/schema.yaml

+15-2
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,26 @@ components:
99
topic_name:
1010
type: string
1111
description: Topic name of the PubSub subscription
12+
MessageQueueKind:
13+
description: Kind of message queue
14+
type: string
15+
enum:
16+
- noop
17+
- pubsub
18+
MessageQueueConfig:
19+
type: object
20+
properties:
21+
kind:
22+
$ref: '#/components/schemas/MessageQueueKind'
23+
pub_sub:
24+
$ref: '#/components/schemas/PubSub'
1225
SegmenterConfig:
1326
type: object
1427
TreatmentServiceConfig:
1528
type: object
1629
properties:
17-
pub_sub:
18-
$ref: '#/components/schemas/PubSub'
30+
message_queue_config:
31+
$ref: '#/components/schemas/MessageQueueConfig'
1932
segmenter_config:
2033
$ref: '#/components/schemas/SegmenterConfig'
2134
SelectedTreatmentData:

clients/testutils/mocks/management/ManagementClientInterface.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

clients/testutils/mocks/treatment/TreatmentClientInterface.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/api/schema/schema.go

+54-35
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/messagequeue/config.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package messagequeue
2+
3+
// MessageQueueKind describes the message queue for transmitting event updates to and fro Treatment Service
4+
type MessageQueueKind = string
5+
6+
const (
7+
// NoopMQ is a No-Op Message Queue
8+
NoopMQ MessageQueueKind = ""
9+
// PubSubMQ is a PubSub Message Queue
10+
PubSubMQ MessageQueueKind = "pubsub"
11+
)
12+
13+
type MessageQueueConfig struct {
14+
// The type of Message Queue for event updates
15+
Kind MessageQueueKind `default:""`
16+
17+
// PubSubConfig captures the config related to publishing and subscribing to a PubSub Message Queue
18+
PubSubConfig *PubSubConfig
19+
}
20+
21+
type PubSubConfig struct {
22+
Project string `json:"project" default:"dev" validate:"required"`
23+
TopicName string `json:"topic_name" default:"xp-update" validate:"required"`
24+
// PubSubTimeoutSeconds is the duration beyond which subscribing to a topic will time out
25+
PubSubTimeoutSeconds int `json:"pub_sub_timeout_seconds" default:"30" validate:"required"`
26+
}

management-service/appcontext/appcontext.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/caraml-dev/xp/management-service/config"
88
mw "github.com/caraml-dev/xp/management-service/middleware"
99
"github.com/caraml-dev/xp/management-service/services"
10+
"github.com/caraml-dev/xp/management-service/services/messagequeue"
1011
)
1112

1213
type AppContext struct {
@@ -29,11 +30,7 @@ func NewAppContext(db *gorm.DB, authorizer *mw.Authorizer, cfg *config.Config) (
2930
}
3031

3132
// Init Services
32-
pubSubConfig := config.PubSubConfig{
33-
Project: cfg.PubSubConfig.Project,
34-
TopicName: cfg.PubSubConfig.TopicName,
35-
}
36-
pubSubPublisherService, err := services.NewPubSubPublisherService(&pubSubConfig)
33+
messageQueueService, err := messagequeue.NewMessageQueueService(*cfg.MessageQueueConfig)
3734
if err != nil {
3835
return nil, err
3936
}
@@ -76,7 +73,7 @@ func NewAppContext(db *gorm.DB, authorizer *mw.Authorizer, cfg *config.Config) (
7673
treatmentSvc,
7774
treatmentHistorySvc,
7875
validationService,
79-
pubSubPublisherService,
76+
messageQueueService,
8077
configurationSvc,
8178
)
8279

management-service/appcontext/appcontext_test.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import (
88
"github.com/stretchr/testify/require"
99
"gorm.io/gorm"
1010

11+
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
1112
"github.com/caraml-dev/xp/management-service/config"
1213
mw "github.com/caraml-dev/xp/management-service/middleware"
1314
"github.com/caraml-dev/xp/management-service/services"
15+
"github.com/caraml-dev/xp/management-service/services/messagequeue"
1416
"github.com/caraml-dev/xp/management-service/services/mocks"
1517
)
1618

@@ -30,9 +32,11 @@ func TestNewAppContext(t *testing.T) {
3032
MLPConfig: &config.MLPConfig{
3133
URL: "http://mlp.example.com/api/merlin/v1",
3234
},
33-
PubSubConfig: &config.PubSubConfig{
34-
Project: "test",
35-
TopicName: "update",
35+
MessageQueueConfig: &common_mq_config.MessageQueueConfig{
36+
PubSubConfig: &common_mq_config.PubSubConfig{
37+
Project: "test",
38+
TopicName: "update",
39+
},
3640
},
3741
ValidationConfig: config.ValidationConfig{
3842
ValidationUrlTimeoutSeconds: 5,
@@ -53,7 +57,7 @@ func TestNewAppContext(t *testing.T) {
5357
validationService, err := services.NewValidationService(cfg.ValidationConfig)
5458
require.NoError(t, err)
5559

56-
pubSubPublisherService, _ := services.NewPubSubPublisherService(cfg.PubSubConfig)
60+
messageQueueService, _ := messagequeue.NewMessageQueueService(*cfg.MessageQueueConfig)
5761

5862
expHistSvc := services.NewExperimentHistoryService(db)
5963
expSvc := services.NewExperimentService(&allServices, db)
@@ -87,10 +91,10 @@ func TestNewAppContext(t *testing.T) {
8791
return segmenterSvc, nil
8892
},
8993
)
90-
// Patch PubSub publisher service
91-
monkey.Patch(services.NewPubSubPublisherService,
92-
func(pubsubConfig *config.PubSubConfig) (services.PubSubPublisherService, error) {
93-
return pubSubPublisherService, nil
94+
// Patch MessageQueue service
95+
monkey.Patch(messagequeue.NewMessageQueueService,
96+
func(messageQueueConfig common_mq_config.MessageQueueConfig) (messagequeue.MessageQueueService, error) {
97+
return messageQueueService, nil
9498
},
9599
)
96100
// Patch New MLP Service to validate the input and return the mock service object
@@ -116,7 +120,7 @@ func TestNewAppContext(t *testing.T) {
116120
TreatmentService: treatmentSvc,
117121
TreatmentHistoryService: treatmentHistSvc,
118122
ValidationService: validationService,
119-
PubSubPublisherService: pubSubPublisherService,
123+
MessageQueueService: messageQueueService,
120124
ConfigurationService: configurationSvc,
121125
}
122126
monkey.Patch(services.NewServices,
@@ -131,7 +135,7 @@ func TestNewAppContext(t *testing.T) {
131135
treatmentService services.TreatmentService,
132136
treatmentHistoryService services.TreatmentHistoryService,
133137
validationService services.ValidationService,
134-
publisherService services.PubSubPublisherService,
138+
messageQueueService messagequeue.MessageQueueService,
135139
configurationService services.ConfigurationService,
136140
) services.Services {
137141
return allServices

management-service/config/config.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/gojek/mlp/api/pkg/instrumentation/sentry"
99

1010
common_config "github.com/caraml-dev/xp/common/config"
11+
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
1112
)
1213

1314
type Config struct {
@@ -18,7 +19,7 @@ type Config struct {
1819
AuthorizationConfig *AuthorizationConfig
1920
DbConfig *DatabaseConfig
2021
MLPConfig *MLPConfig
21-
PubSubConfig *PubSubConfig
22+
MessageQueueConfig *common_mq_config.MessageQueueConfig
2223
SegmenterConfig map[string]interface{}
2324
ValidationConfig ValidationConfig
2425
DeploymentConfig DeploymentConfig
@@ -53,13 +54,6 @@ type MLPConfig struct {
5354
URL string
5455
}
5556

56-
// PubSubConfig captures the config for the Google PubSub client, to publish messages
57-
// about changes in the experimentation data
58-
type PubSubConfig struct {
59-
Project string `default:"dev"`
60-
TopicName string `default:"xp-update"`
61-
}
62-
6357
// ValidationConfig captures the config related to the validation of schemas
6458
type ValidationConfig struct {
6559
ValidationUrlTimeoutSeconds int `default:"5"`

management-service/config/config_test.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
89
"github.com/gojek/mlp/api/pkg/instrumentation/newrelic"
910
"github.com/gojek/mlp/api/pkg/instrumentation/sentry"
1011
"github.com/stretchr/testify/assert"
@@ -38,9 +39,13 @@ func TestDefaultConfigs(t *testing.T) {
3839
MLPConfig: &MLPConfig{
3940
URL: "",
4041
},
41-
PubSubConfig: &PubSubConfig{
42-
Project: "dev",
43-
TopicName: "xp-update",
42+
MessageQueueConfig: &common_mq_config.MessageQueueConfig{
43+
Kind: "",
44+
PubSubConfig: &common_mq_config.PubSubConfig{
45+
Project: "dev",
46+
TopicName: "xp-update",
47+
PubSubTimeoutSeconds: 30,
48+
},
4449
},
4550
ValidationConfig: ValidationConfig{
4651
ValidationUrlTimeoutSeconds: 5,
@@ -109,9 +114,13 @@ func TestLoadConfigFiles(t *testing.T) {
109114
MLPConfig: &MLPConfig{
110115
URL: "test-mlp-url",
111116
},
112-
PubSubConfig: &PubSubConfig{
113-
Project: "test-pubsub-project",
114-
TopicName: "test-pubsub-topic",
117+
MessageQueueConfig: &common_mq_config.MessageQueueConfig{
118+
Kind: "pubsub",
119+
PubSubConfig: &common_mq_config.PubSubConfig{
120+
Project: "test-pubsub-project",
121+
TopicName: "test-pubsub-topic",
122+
PubSubTimeoutSeconds: 30,
123+
},
115124
},
116125
ValidationConfig: ValidationConfig{
117126
ValidationUrlTimeoutSeconds: 5,

management-service/config/example.yaml

+5-3
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ DbConfig:
2929
MLPConfig:
3030
URL: http://localhost:8080/api/v1
3131

32-
PubSubConfig:
33-
Project: dev
34-
TopicName: xp-update
32+
MessageQueueConfig:
33+
# Kind: pubsub
34+
PubSubConfig:
35+
Project: dev
36+
TopicName: xp-update
3537

3638
NewRelicConfig:
3739
Enabled: false

0 commit comments

Comments
 (0)