diff --git a/astro-client/astro.go b/astro-client/astro.go index 2461a5e46..74a8a718d 100644 --- a/astro-client/astro.go +++ b/astro-client/astro.go @@ -21,6 +21,7 @@ type Client interface { DeleteDeployment(input DeleteDeploymentInput) (Deployment, error) GetDeploymentHistory(vars map[string]interface{}) (DeploymentHistory, error) GetDeploymentConfig() (DeploymentConfig, error) + GetDeploymentConfigWithOrganization(organizationID string) (DeploymentConfig, error) ModifyDeploymentVariable(input EnvironmentVariablesInput) ([]EnvironmentVariablesObject, error) InitiateDagDeployment(input InitiateDagDeploymentInput) (InitiateDagDeployment, error) ReportDagDeploymentStatus(input *ReportDagDeploymentStatusInput) (DagDeploymentStatus, error) @@ -125,6 +126,19 @@ func (c *HTTPClient) GetDeploymentConfig() (DeploymentConfig, error) { return resp.Data.GetDeploymentConfig, nil } +func (c *HTTPClient) GetDeploymentConfigWithOrganization(organizationID string) (DeploymentConfig, error) { + req := Request{ + Query: GetDeploymentConfigOptionsWithOrganization, + Variables: map[string]interface{}{"organizationId": organizationID}, + } + + resp, err := req.DoWithPublicClient(c) + if err != nil { + return DeploymentConfig{}, err + } + return resp.Data.GetDeploymentConfig, nil +} + func (c *HTTPClient) ModifyDeploymentVariable(input EnvironmentVariablesInput) ([]EnvironmentVariablesObject, error) { req := Request{ Query: CreateDeploymentVariables, diff --git a/astro-client/astro_test.go b/astro-client/astro_test.go index 3b87b39ae..1cb545159 100644 --- a/astro-client/astro_test.go +++ b/astro-client/astro_test.go @@ -348,6 +348,57 @@ func TestGetDeploymentConfig(t *testing.T) { }) } +func TestGetDeploymentConfigWithOrganization(t *testing.T) { + testUtil.InitTestConfig(testUtil.CloudPlatform) + mockResponse := &Response{ + Data: ResponseData{ + GetDeploymentConfig: DeploymentConfig{ + AstronomerUnit: AstronomerUnit{CPU: 1, Memory: 1024}, + RuntimeReleases: []RuntimeRelease{ + { + Version: "4.2.5", + AirflowVersion: "2.2.5", + Channel: "stable", + ReleaseDate: "2020-06-25", + AirflowDatabaseMigration: true, + }, + }, + }, + }, + } + jsonResponse, err := json.Marshal(mockResponse) + assert.NoError(t, err) + + t.Run("success", func(t *testing.T) { + client := testUtil.NewTestClient(func(req *http.Request) *http.Response { + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBuffer(jsonResponse)), + Header: make(http.Header), + } + }) + astroClient := NewAstroClient(client) + + deploymentConfig, err := astroClient.GetDeploymentConfigWithOrganization("test-org-id") + assert.NoError(t, err) + assert.Equal(t, deploymentConfig, mockResponse.Data.GetDeploymentConfig) + }) + + t.Run("error", func(t *testing.T) { + client := testUtil.NewTestClient(func(req *http.Request) *http.Response { + return &http.Response{ + StatusCode: 500, + Body: io.NopCloser(bytes.NewBufferString("Internal Server Error")), + Header: make(http.Header), + } + }) + astroClient := NewAstroClient(client) + + _, err := astroClient.GetDeploymentConfigWithOrganization("test-org-id") + assert.Contains(t, err.Error(), "Internal Server Error") + }) +} + func TestModifyDeploymentVariable(t *testing.T) { testUtil.InitTestConfig(testUtil.CloudPlatform) mockResponse := &Response{ diff --git a/astro-client/mocks/Client.go b/astro-client/mocks/Client.go index 90680be96..5025eb7ae 100644 --- a/astro-client/mocks/Client.go +++ b/astro-client/mocks/Client.go @@ -163,6 +163,30 @@ func (_m *Client) GetDeploymentConfig() (astro.DeploymentConfig, error) { return r0, r1 } +// GetDeploymentConfigWithOrganization provides a mock function with given fields: organizationID +func (_m *Client) GetDeploymentConfigWithOrganization(organizationID string) (astro.DeploymentConfig, error) { + ret := _m.Called(organizationID) + + var r0 astro.DeploymentConfig + var r1 error + if rf, ok := ret.Get(0).(func(string) (astro.DeploymentConfig, error)); ok { + return rf(organizationID) + } + if rf, ok := ret.Get(0).(func(string) astro.DeploymentConfig); ok { + r0 = rf(organizationID) + } else { + r0 = ret.Get(0).(astro.DeploymentConfig) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(organizationID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetDeploymentHistory provides a mock function with given fields: vars func (_m *Client) GetDeploymentHistory(vars map[string]interface{}) (astro.DeploymentHistory, error) { ret := _m.Called(vars) diff --git a/astro-client/queries.go b/astro-client/queries.go index 93dc538af..b0d4bda0b 100644 --- a/astro-client/queries.go +++ b/astro-client/queries.go @@ -208,6 +208,51 @@ var ( } ` + GetDeploymentConfigOptionsWithOrganization = ` + query deploymentConfigOptions($organizationId: Id!) { + deploymentConfigOptions(organizationId: $organizationId) { + components + astroUnit { + cpu + memory + } + executors + runtimeReleases { + channel + version + } + astroMachines { + concurrentTasks + concurrentTasksMax + cpu + memory + nodePoolType + storageSize + type + } + defaultAstroMachine { + concurrentTasks + concurrentTasksMax + cpu + memory + nodePoolType + storageSize + type + } + defaultSchedulerSize { + cpu + memory + size + } + schedulerSizes { + cpu + memory + size + } + } + } + ` + GetWorkerQueueOptions = ` query workerQueueOptions { workerQueueOptions { diff --git a/cloud/deployment/deployment.go b/cloud/deployment/deployment.go index c9a9c684e..31c2760d8 100644 --- a/cloud/deployment/deployment.go +++ b/cloud/deployment/deployment.go @@ -383,7 +383,12 @@ func validateResources(schedulerAU, schedulerReplicas int, configOption astro.De } func validateRuntimeVersion(runtimeVersion string, client astro.Client) (bool, error) { - runtimeReleases, err := GetRuntimeReleases(client) + c, err := config.GetCurrentContext() + if err != nil { + return false, err + } + + runtimeReleases, err := GetRuntimeReleases(c.Organization, client) if err != nil { return false, err } @@ -394,11 +399,11 @@ func validateRuntimeVersion(runtimeVersion string, client astro.Client) (bool, e return true, nil } -func GetRuntimeReleases(client astro.Client) ([]string, error) { +func GetRuntimeReleases(organizationID string, client astro.Client) ([]string, error) { // get deployment config options runtimeReleases := []string{} - ConfigOptions, err := client.GetDeploymentConfig() + ConfigOptions, err := client.GetDeploymentConfigWithOrganization(organizationID) if err != nil { return runtimeReleases, errors.Wrap(err, astro.AstronomerConnectionErrMsg) } diff --git a/cloud/deployment/deployment_test.go b/cloud/deployment/deployment_test.go index a5f541bfd..3db7778b0 100644 --- a/cloud/deployment/deployment_test.go +++ b/cloud/deployment/deployment_test.go @@ -779,7 +779,27 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(1) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() mockCoreClient.On("ListClustersWithResponse", mock.Anything, mockOrgShortName, clusterListParams).Return(&mockListClustersResponse, nil).Once() mockClient.On("CreateDeployment", &deploymentCreateInput).Return(astro.Deployment{ID: "test-id"}, nil).Once() @@ -811,7 +831,27 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(1) deploymentCreateInput.APIKeyOnlyDeployments = true defer func() { deploymentCreateInput.APIKeyOnlyDeployments = false }() mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() @@ -856,7 +896,30 @@ func TestCreate(t *testing.T) { DefaultSchedulerSize: astro.MachineUnit{ Size: "small", }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + DefaultSchedulerSize: astro.MachineUnit{ + Size: "small", + }, + }, nil).Times(1) getSharedClusterParams := &astrocore.GetSharedClusterParams{ Region: region, CloudProvider: astrocore.GetSharedClusterParamsCloudProvider(astrocore.SharedClusterCloudProviderGcp), @@ -945,7 +1008,30 @@ func TestCreate(t *testing.T) { DefaultSchedulerSize: astro.MachineUnit{ Size: "small", }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + DefaultSchedulerSize: astro.MachineUnit{ + Size: "small", + }, + }, nil).Times(1) getSharedClusterParams := &astrocore.GetSharedClusterParams{ Region: region, CloudProvider: astrocore.GetSharedClusterParamsCloudProvider(astrocore.SharedClusterCloudProviderGcp), @@ -1026,7 +1112,30 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + DefaultSchedulerSize: astro.MachineUnit{ + Size: "small", + }, + }, nil).Times(1) deploymentCreateInput.DeploymentSpec.Executor = "KubeExecutor" defer func() { deploymentCreateInput.DeploymentSpec.Executor = CeleryExecutor }() mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() @@ -1061,7 +1170,30 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(4) + }, nil).Times(2) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + DefaultSchedulerSize: astro.MachineUnit{ + Size: "small", + }, + }, nil).Times(2) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Twice() mockCoreClient.On("ListClustersWithResponse", mock.Anything, mock.Anything, clusterListParams).Return(&mockListClustersResponse, nil).Twice() mockClient.On("CreateDeployment", &deploymentCreateInput).Return(astro.Deployment{ID: deploymentID}, nil).Twice() @@ -1121,7 +1253,30 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + DefaultSchedulerSize: astro.MachineUnit{ + Size: "small", + }, + }, nil).Times(1) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() mockCoreClient.On("ListClustersWithResponse", mock.Anything, mock.Anything, clusterListParams).Return(&mockListClustersResponse, nil).Once() mockClient.On("CreateDeployment", &deploymentCreateInput).Return(astro.Deployment{}, errMock).Once() @@ -1161,7 +1316,30 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + DefaultSchedulerSize: astro.MachineUnit{ + Size: "small", + }, + }, nil).Times(1) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() mockCoreClient.On("ListClustersWithResponse", mock.Anything, mock.Anything, clusterListParams).Return(&astrocore.ListClustersResponse{}, errMock).Once() err := Create("test-name", ws, "test-desc", "invalid-cluster-id", "4.2.5", dagDeploy, CeleryExecutor, "", "", "", "", "", 10, 3, mockClient, mockCoreClient, false, &disableCiCdEnforcement) @@ -1194,6 +1372,26 @@ func TestCreate(t *testing.T) { assert.NoError(t, err) }) t.Run("list workspace failure", func(t *testing.T) { + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(1) mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{ Components: astro.Components{ Scheduler: astro.SchedulerConfig{ @@ -1213,13 +1411,33 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(nil, errMock).Once() err := Create("", ws, "test-desc", csID, "4.2.5", dagDeploy, CeleryExecutor, "", "", "", "", "", 10, 3, mockClient, mockCoreClient, false, &disableCiCdEnforcement) assert.ErrorIs(t, err, errMock) mockClient.AssertExpectations(t) }) t.Run("invalid workspace failure", func(t *testing.T) { + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(1) mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{ Components: astro.Components{ Scheduler: astro.SchedulerConfig{ @@ -1239,7 +1457,7 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() err := Create("", "test-invalid-id", "test-desc", csID, "4.2.5", dagDeploy, CeleryExecutor, "", "", "", "", "", 10, 3, mockClient, mockCoreClient, false, &disableCiCdEnforcement) assert.Error(t, err) @@ -1253,6 +1471,26 @@ func TestCreate(t *testing.T) { ctx.SetContextKey("organization_product", "HOSTED") ctx.SetContextKey("organization", org) ctx.SetContextKey("organization_short_name", org) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(1) mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{ Components: astro.Components{ Scheduler: astro.SchedulerConfig{ @@ -1272,7 +1510,7 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Once() mockClient.On("CreateDeployment", &deploymentCreateInput).Return(astro.Deployment{ID: "test-id"}, nil).Once() mockClient.On("ListDeployments", org, ws).Return([]astro.Deployment{{ID: "test-id"}}, nil).Once() @@ -1298,6 +1536,26 @@ func TestCreate(t *testing.T) { mockCoreClient.AssertExpectations(t) }) t.Run("success with default config", func(t *testing.T) { + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(1) mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{ Components: astro.Components{ Scheduler: astro.SchedulerConfig{ @@ -1317,7 +1575,7 @@ func TestCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(2) + }, nil).Times(1) deploymentCreateInput := astro.CreateDeploymentInput{ WorkspaceID: ws, ClusterID: csID, @@ -1385,7 +1643,7 @@ func TestValidateResources(t *testing.T) { t.Run("invalid runtime version", func(t *testing.T) { mockClient := new(astro_mocks.Client) - mockClient.On("GetDeploymentConfig").Return(astro.DeploymentConfig{RuntimeReleases: []astro.RuntimeRelease{{Version: "4.2.5"}}}, nil).Once() + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{RuntimeReleases: []astro.RuntimeRelease{{Version: "4.2.5"}}}, nil).Once() resp, err := validateRuntimeVersion("4.2.4", mockClient) assert.NoError(t, err) diff --git a/cmd/cloud/deployment_test.go b/cmd/cloud/deployment_test.go index 269285bfa..1d88fc234 100644 --- a/cmd/cloud/deployment_test.go +++ b/cmd/cloud/deployment_test.go @@ -172,7 +172,27 @@ func TestDeploymentCreate(t *testing.T) { Version: "4.2.5", }, }, - }, nil).Times(14) + }, nil).Times(7) + mockClient.On("GetDeploymentConfigWithOrganization", mock.Anything).Return(astro.DeploymentConfig{ + Components: astro.Components{ + Scheduler: astro.SchedulerConfig{ + AU: astro.AuConfig{ + Default: 5, + Limit: 24, + }, + Replicas: astro.ReplicasConfig{ + Default: 1, + Minimum: 1, + Limit: 4, + }, + }, + }, + RuntimeReleases: []astro.RuntimeRelease{ + { + Version: "4.2.5", + }, + }, + }, nil).Times(7) mockCoreClient.On("ListWorkspacesWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&ListWorkspacesResponseOK, nil).Times(5) mockCoreClient.On("ListClustersWithResponse", mock.Anything, mock.Anything, mock.Anything).Return(&mockListClustersResponse, nil).Times(4) mockClient.On("CreateDeployment", &deploymentCreateInput).Return(astro.Deployment{ID: "test-id"}, nil).Twice()