Skip to content

Commit

Permalink
Throw correct error on updating worker queue config for kubernetes ex…
Browse files Browse the repository at this point in the history
…ecutor (#1304)

* Throw correct error on updating worker queue config for kubernetes executor

* Fixing from file tests
  • Loading branch information
kushalmalani authored Jul 10, 2023
1 parent f4e6650 commit 8ee6486
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 29 deletions.
30 changes: 20 additions & 10 deletions cloud/deployment/fromfile/fromfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2801,10 +2801,12 @@ func TestGetCreateOrUpdateInput(t *testing.T) {
deploymentFromFile.Deployment.Configuration.SchedulerAU = 4
deploymentFromFile.Deployment.Configuration.SchedulerCount = 2
deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor
minCount := -1
qList = []inspect.Workerq{
{
Name: "default",
WorkerType: "test-worker-1",
MinWorkerCount: &minCount,
MaxWorkerCount: 10,
},
}
Expand Down Expand Up @@ -2838,10 +2840,12 @@ func TestGetCreateOrUpdateInput(t *testing.T) {
deploymentFromFile.Deployment.Configuration.SchedulerAU = 4
deploymentFromFile.Deployment.Configuration.SchedulerCount = 2
deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor
minCount := -1
qList = []inspect.Workerq{
{
Name: "default",
WorkerType: "test-worker-1",
MinWorkerCount: &minCount,
WorkerConcurrency: 10,
},
}
Expand Down Expand Up @@ -2948,18 +2952,21 @@ func TestGetCreateOrUpdateInput(t *testing.T) {
deploymentFromFile.Deployment.Configuration.SchedulerAU = 4
deploymentFromFile.Deployment.Configuration.SchedulerCount = 2
deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor
minCount := -1
qList = []inspect.Workerq{
{
Name: "default",
WorkerType: "test-worker-1",
Name: "default",
WorkerType: "test-worker-1",
MinWorkerCount: &minCount,
},
}
deploymentFromFile.Deployment.WorkerQs = qList
expectedQList = []astro.WorkerQueue{
{
Name: "default",
IsDefault: true,
NodePoolID: "test-pool-id",
Name: "default",
IsDefault: true,
NodePoolID: "test-pool-id",
MinWorkerCount: minCount,
},
}
existingPools = []astrocore.NodePool{
Expand Down Expand Up @@ -3233,18 +3240,21 @@ func TestGetCreateOrUpdateInput(t *testing.T) {
deploymentFromFile.Deployment.Configuration.SchedulerAU = 4
deploymentFromFile.Deployment.Configuration.SchedulerCount = 2
deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor
minCount := -1
qList = []inspect.Workerq{
{
Name: "default",
WorkerType: "test-worker-1",
Name: "default",
WorkerType: "test-worker-1",
MinWorkerCount: &minCount,
},
}
deploymentFromFile.Deployment.WorkerQs = qList
expectedQList = []astro.WorkerQueue{
{
Name: "default",
IsDefault: true,
NodePoolID: "test-pool-id",
Name: "default",
IsDefault: true,
NodePoolID: "test-pool-id",
MinWorkerCount: minCount,
},
}
existingPools = []astrocore.NodePool{
Expand Down
2 changes: 1 addition & 1 deletion cloud/deployment/workerqueue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func IsKubernetesWorkerQueueInputValid(requestedWorkerQueue *astro.WorkerQueue)
errorMessage = "pod ram in the request. It will be calculated based on the requested worker type"
return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage)
}
if requestedWorkerQueue.MinWorkerCount != 0 {
if requestedWorkerQueue.MinWorkerCount != -1 {
errorMessage = "minimum worker count in the request. It can only be used with CeleryExecutor"
return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage)
}
Expand Down
57 changes: 39 additions & 18 deletions cloud/deployment/workerqueue/workerqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func TestCreate(t *testing.T) {
out := new(bytes.Buffer)
mockClient := new(astro_mocks.Client)
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Once()
err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "default", createAction, "test-instance-type-1", 0, 0, 0, false, mockClient, mockCoreClient, out)
err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "default", createAction, "test-instance-type-1", -1, 0, 0, false, mockClient, mockCoreClient, out)
assert.ErrorIs(t, err, errCannotUpdateExistingQueue)
mockClient.AssertExpectations(t)
})
Expand Down Expand Up @@ -1091,7 +1091,7 @@ func TestUpdate(t *testing.T) {
}, nil).Once()
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Twice()
mockClient.On("UpdateDeployment", &updateKEDeploymentInput).Return(keDeployment[0], nil).Once()
err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type", 0, 0, 0, true, mockClient, mockCoreClient, out)
err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type", -1, 0, 0, true, mockClient, mockCoreClient, out)
assert.NoError(t, err)
assert.Contains(t, out.String(), expectedOutMessage)
mockClient.AssertExpectations(t)
Expand Down Expand Up @@ -1120,7 +1120,7 @@ func TestUpdate(t *testing.T) {
defer func() { updateKEDeploymentInput.WorkerQueues[0].NodePoolID = origNodePoolID }()
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Twice()
mockClient.On("UpdateDeployment", &updateKEDeploymentInput).Return(keDeployment[0], nil).Once()
err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type-1", 0, 0, 0, true, mockClient, mockCoreClient, out)
err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type-1", -1, 0, 0, true, mockClient, mockCoreClient, out)
assert.NoError(t, err)
assert.Contains(t, out.String(), expectedOutMessage)
mockClient.AssertExpectations(t)
Expand All @@ -1129,7 +1129,7 @@ func TestUpdate(t *testing.T) {
out := new(bytes.Buffer)
mockClient := new(astro_mocks.Client)
mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Once()
err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "test-KE-q", updateAction, "test-instance-type-1", 0, 0, 0, false, mockClient, mockCoreClient, out)
err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "test-KE-q", updateAction, "test-instance-type-1", -1, 0, 0, false, mockClient, mockCoreClient, out)
assert.ErrorIs(t, err, ErrNotSupported)
assert.ErrorContains(t, err, "KubernetesExecutor does not support a non default worker queue in the request. Rename the queue to default")
mockClient.AssertExpectations(t)
Expand Down Expand Up @@ -1618,8 +1618,11 @@ func TestIsHostedCeleryWorkerQueueInputValid(t *testing.T) {
func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
testUtil.InitTestConfig(testUtil.CloudPlatform)
requestedWorkerQueue := &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}

t.Run("returns nil when queue input is valid", func(t *testing.T) {
Expand All @@ -1630,8 +1633,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
requestedWorkerQueue.Name = "test-queue"
defer func() {
requestedWorkerQueue = &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}
}()
err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)
Expand All @@ -1642,8 +1648,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
requestedWorkerQueue.PodCPU = "1.0"
defer func() {
requestedWorkerQueue = &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}
}()
err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)
Expand All @@ -1654,8 +1663,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
requestedWorkerQueue.PodRAM = "1.0"
defer func() {
requestedWorkerQueue = &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}
}()
err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)
Expand All @@ -1666,8 +1678,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
requestedWorkerQueue.MinWorkerCount = 8
defer func() {
requestedWorkerQueue = &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}
}()
err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)
Expand All @@ -1678,8 +1693,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
requestedWorkerQueue.MaxWorkerCount = 25
defer func() {
requestedWorkerQueue = &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}
}()
err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)
Expand All @@ -1690,8 +1708,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) {
requestedWorkerQueue.WorkerConcurrency = 350
defer func() {
requestedWorkerQueue = &astro.WorkerQueue{
Name: "default",
NodePoolID: "test-pool-id",
Name: "default",
NodePoolID: "test-pool-id",
MinWorkerCount: -1,
MaxWorkerCount: 0,
WorkerConcurrency: 0,
}
}()
err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)
Expand Down

0 comments on commit 8ee6486

Please sign in to comment.