From 8ee648612de667605b3ab18e53c13afadf6779d6 Mon Sep 17 00:00:00 2001 From: kushalmalani Date: Mon, 10 Jul 2023 12:08:07 -0700 Subject: [PATCH] Throw correct error on updating worker queue config for kubernetes executor (#1304) * Throw correct error on updating worker queue config for kubernetes executor * Fixing from file tests --- cloud/deployment/fromfile/fromfile_test.go | 30 ++++++---- cloud/deployment/workerqueue/workerqueue.go | 2 +- .../workerqueue/workerqueue_test.go | 57 +++++++++++++------ 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/cloud/deployment/fromfile/fromfile_test.go b/cloud/deployment/fromfile/fromfile_test.go index 3ab2b9c19..8fb9e73e4 100644 --- a/cloud/deployment/fromfile/fromfile_test.go +++ b/cloud/deployment/fromfile/fromfile_test.go @@ -2801,10 +2801,12 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + minCount := -1 qList = []inspect.Workerq{ { Name: "default", WorkerType: "test-worker-1", + MinWorkerCount: &minCount, MaxWorkerCount: 10, }, } @@ -2838,10 +2840,12 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + minCount := -1 qList = []inspect.Workerq{ { Name: "default", WorkerType: "test-worker-1", + MinWorkerCount: &minCount, WorkerConcurrency: 10, }, } @@ -2948,18 +2952,21 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + minCount := -1 qList = []inspect.Workerq{ { - Name: "default", - WorkerType: "test-worker-1", + Name: "default", + WorkerType: "test-worker-1", + MinWorkerCount: &minCount, }, } deploymentFromFile.Deployment.WorkerQs = qList expectedQList = []astro.WorkerQueue{ { - Name: "default", - IsDefault: true, - NodePoolID: "test-pool-id", + Name: "default", + IsDefault: true, + NodePoolID: "test-pool-id", + MinWorkerCount: minCount, }, } existingPools = []astrocore.NodePool{ @@ -3233,18 +3240,21 @@ func TestGetCreateOrUpdateInput(t *testing.T) { deploymentFromFile.Deployment.Configuration.SchedulerAU = 4 deploymentFromFile.Deployment.Configuration.SchedulerCount = 2 deploymentFromFile.Deployment.Configuration.Executor = deployment.KubeExecutor + minCount := -1 qList = []inspect.Workerq{ { - Name: "default", - WorkerType: "test-worker-1", + Name: "default", + WorkerType: "test-worker-1", + MinWorkerCount: &minCount, }, } deploymentFromFile.Deployment.WorkerQs = qList expectedQList = []astro.WorkerQueue{ { - Name: "default", - IsDefault: true, - NodePoolID: "test-pool-id", + Name: "default", + IsDefault: true, + NodePoolID: "test-pool-id", + MinWorkerCount: minCount, }, } existingPools = []astrocore.NodePool{ diff --git a/cloud/deployment/workerqueue/workerqueue.go b/cloud/deployment/workerqueue/workerqueue.go index 983dc1e35..7fdbd9615 100644 --- a/cloud/deployment/workerqueue/workerqueue.go +++ b/cloud/deployment/workerqueue/workerqueue.go @@ -284,7 +284,7 @@ func IsKubernetesWorkerQueueInputValid(requestedWorkerQueue *astro.WorkerQueue) errorMessage = "pod ram in the request. It will be calculated based on the requested worker type" return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage) } - if requestedWorkerQueue.MinWorkerCount != 0 { + if requestedWorkerQueue.MinWorkerCount != -1 { errorMessage = "minimum worker count in the request. It can only be used with CeleryExecutor" return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage) } diff --git a/cloud/deployment/workerqueue/workerqueue_test.go b/cloud/deployment/workerqueue/workerqueue_test.go index 049692304..9f1a6ee6f 100644 --- a/cloud/deployment/workerqueue/workerqueue_test.go +++ b/cloud/deployment/workerqueue/workerqueue_test.go @@ -481,7 +481,7 @@ func TestCreate(t *testing.T) { out := new(bytes.Buffer) mockClient := new(astro_mocks.Client) mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Once() - err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "default", createAction, "test-instance-type-1", 0, 0, 0, false, mockClient, mockCoreClient, out) + err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "default", createAction, "test-instance-type-1", -1, 0, 0, false, mockClient, mockCoreClient, out) assert.ErrorIs(t, err, errCannotUpdateExistingQueue) mockClient.AssertExpectations(t) }) @@ -1091,7 +1091,7 @@ func TestUpdate(t *testing.T) { }, nil).Once() mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Twice() mockClient.On("UpdateDeployment", &updateKEDeploymentInput).Return(keDeployment[0], nil).Once() - err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type", 0, 0, 0, true, mockClient, mockCoreClient, out) + err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type", -1, 0, 0, true, mockClient, mockCoreClient, out) assert.NoError(t, err) assert.Contains(t, out.String(), expectedOutMessage) mockClient.AssertExpectations(t) @@ -1120,7 +1120,7 @@ func TestUpdate(t *testing.T) { defer func() { updateKEDeploymentInput.WorkerQueues[0].NodePoolID = origNodePoolID }() mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Twice() mockClient.On("UpdateDeployment", &updateKEDeploymentInput).Return(keDeployment[0], nil).Once() - err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type-1", 0, 0, 0, true, mockClient, mockCoreClient, out) + err := CreateOrUpdate("test-ws-id", "", "test-deployment-label", "default", updateAction, "test-instance-type-1", -1, 0, 0, true, mockClient, mockCoreClient, out) assert.NoError(t, err) assert.Contains(t, out.String(), expectedOutMessage) mockClient.AssertExpectations(t) @@ -1129,7 +1129,7 @@ func TestUpdate(t *testing.T) { out := new(bytes.Buffer) mockClient := new(astro_mocks.Client) mockClient.On("ListDeployments", mock.Anything, mock.Anything).Return(keDeployment, nil).Once() - err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "test-KE-q", updateAction, "test-instance-type-1", 0, 0, 0, false, mockClient, mockCoreClient, out) + err := CreateOrUpdate("test-ws-id", "test-deployment-id", "", "test-KE-q", updateAction, "test-instance-type-1", -1, 0, 0, false, mockClient, mockCoreClient, out) assert.ErrorIs(t, err, ErrNotSupported) assert.ErrorContains(t, err, "KubernetesExecutor does not support a non default worker queue in the request. Rename the queue to default") mockClient.AssertExpectations(t) @@ -1618,8 +1618,11 @@ func TestIsHostedCeleryWorkerQueueInputValid(t *testing.T) { func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { testUtil.InitTestConfig(testUtil.CloudPlatform) requestedWorkerQueue := &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } t.Run("returns nil when queue input is valid", func(t *testing.T) { @@ -1630,8 +1633,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { requestedWorkerQueue.Name = "test-queue" defer func() { requestedWorkerQueue = &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } }() err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue) @@ -1642,8 +1648,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { requestedWorkerQueue.PodCPU = "1.0" defer func() { requestedWorkerQueue = &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } }() err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue) @@ -1654,8 +1663,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { requestedWorkerQueue.PodRAM = "1.0" defer func() { requestedWorkerQueue = &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } }() err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue) @@ -1666,8 +1678,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { requestedWorkerQueue.MinWorkerCount = 8 defer func() { requestedWorkerQueue = &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } }() err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue) @@ -1678,8 +1693,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { requestedWorkerQueue.MaxWorkerCount = 25 defer func() { requestedWorkerQueue = &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } }() err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue) @@ -1690,8 +1708,11 @@ func TestIsKubernetesWorkerQueueInputValid(t *testing.T) { requestedWorkerQueue.WorkerConcurrency = 350 defer func() { requestedWorkerQueue = &astro.WorkerQueue{ - Name: "default", - NodePoolID: "test-pool-id", + Name: "default", + NodePoolID: "test-pool-id", + MinWorkerCount: -1, + MaxWorkerCount: 0, + WorkerConcurrency: 0, } }() err := IsKubernetesWorkerQueueInputValid(requestedWorkerQueue)