Skip to content

Commit c5cb3a4

Browse files
committed
Add workflow skeleton and request from dashboard
- Create and register basic deletion workflow. - Trigger workflow from API endpoint. - Add button and API call to dashboard.
1 parent c7a73e7 commit c5cb3a4

File tree

8 files changed

+199
-29
lines changed

8 files changed

+199
-29
lines changed

cmd/enduro/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,10 @@ func main() {
453453
temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName},
454454
)
455455

456+
w.RegisterWorkflowWithOptions(
457+
storage_workflows.NewStorageDeleteWorkflow(storagesvc).Execute,
458+
temporalsdk_workflow.RegisterOptions{Name: storage.StorageDeleteWorkflowName},
459+
)
456460
w.RegisterWorkflowWithOptions(
457461
storage_workflows.NewStorageUploadWorkflow().Execute,
458462
temporalsdk_workflow.RegisterOptions{Name: storage.StorageUploadWorkflowName},

dashboard/src/components/AipLocationCard.vue

+11
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ watch(aipStore.ui.download, () => download());
8080
</template>
8181
<template v-else>Move</template>
8282
</button>
83+
<button
84+
v-if="
85+
aipStore.isStored &&
86+
authStore.checkAttributes(['storage:aips:deletion:request'])
87+
"
88+
type="button"
89+
class="btn btn-primary btn-sm"
90+
@click="aipStore.requestDeletion"
91+
>
92+
Delete
93+
</button>
8394
</div>
8495
</div>
8596
</div>

dashboard/src/stores/aip.ts

+21
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,27 @@ export const useAipStore = defineStore("aip", {
173173
}
174174
this.locationChanging = !resp.done;
175175
},
176+
async requestDeletion() {
177+
if (!this.current) return;
178+
// TODO:
179+
// - Add dialog and get reason from it.
180+
// - Improve error reporting.
181+
try {
182+
await client.storage.storageRequestAipDeletion({
183+
uuid: this.current.uuid,
184+
requestAipDeletionRequestBody: {
185+
reason: "Requesting deletion",
186+
},
187+
});
188+
} catch (error) {
189+
return error;
190+
}
191+
this.$patch((state) => {
192+
// TODO: Use proper status.
193+
if (state.current)
194+
state.current.status = api.EnduroStorageAipStatusEnum.InReview;
195+
});
196+
},
176197
nextPage() {
177198
if (this.hasNextPage) {
178199
this.fetchAips(this.pager.current + 1);

internal/storage/deletion_request.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,41 @@ package storage
22

33
import (
44
"context"
5+
"errors"
6+
7+
"github.com/google/uuid"
58

69
goastorage "github.com/artefactual-sdps/enduro/internal/api/gen/storage"
710
)
811

9-
func (svc *serviceImpl) RequestAipDeletion(ctx context.Context, payload *goastorage.RequestAipDeletionPayload) error {
12+
func (s *serviceImpl) RequestAipDeletion(ctx context.Context, payload *goastorage.RequestAipDeletionPayload) error {
13+
aipID, err := uuid.Parse(payload.UUID)
14+
if err != nil {
15+
return goastorage.MakeNotValid(errors.New("invalid UUID"))
16+
}
17+
if payload.Reason == "" {
18+
return goastorage.MakeNotValid(errors.New("invalid reason"))
19+
}
20+
21+
s.logger.Info("HERE WE GO!!", "UUID", payload.UUID, "Reason", payload.Reason)
22+
23+
// TODO:
24+
// - Check AIP existence and status, same as in workflow.
25+
// - Get user details from context claim and include them in the request.
26+
27+
_, err = InitStorageDeleteWorkflow(ctx, s.tc, &StorageDeleteWorkflowRequest{
28+
AIPID: aipID,
29+
Reason: payload.Reason,
30+
TaskQueue: s.config.TaskQueue,
31+
})
32+
if err != nil {
33+
s.logger.Error(err, "error initializing delete workflow")
34+
return goastorage.MakeNotAvailable(errors.New("cannot perform operation"))
35+
}
36+
1037
return nil
1138
}
1239

13-
func (svc *serviceImpl) ReviewAipDeletion(ctx context.Context, payload *goastorage.ReviewAipDeletionPayload) error {
40+
func (s *serviceImpl) ReviewAipDeletion(ctx context.Context, payload *goastorage.ReviewAipDeletionPayload) error {
1441
return nil
1542
}

internal/storage/workflow.go

+26
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,21 @@ import (
1313
const (
1414
CopyToPermanentLocationActivityName = "copy-to-permanent-location-activity"
1515
DeleteFromLocationActivityName = "delete-from-location-activity"
16+
StorageDeleteWorkflowName = "storage-delete-workflow"
1617
StorageUploadWorkflowName = "storage-upload-workflow"
1718
StorageMoveWorkflowName = "storage-move-workflow"
1819
UploadDoneSignalName = "upload-done-signal"
1920
)
2021

22+
type StorageDeleteWorkflowRequest struct {
23+
AIPID uuid.UUID
24+
Reason string
25+
UserEmail string
26+
UserSub string
27+
UserISS string
28+
TaskQueue string
29+
}
30+
2131
type StorageUploadWorkflowRequest struct {
2232
AIPID uuid.UUID
2333
TaskQueue string
@@ -36,6 +46,22 @@ type CopyToPermanentLocationActivityParams struct {
3646

3747
type UploadDoneSignal struct{}
3848

49+
func InitStorageDeleteWorkflow(
50+
ctx context.Context,
51+
tc temporalsdk_client.Client,
52+
req *StorageDeleteWorkflowRequest,
53+
) (temporalsdk_client.WorkflowRun, error) {
54+
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
55+
defer cancel()
56+
57+
opts := temporalsdk_client.StartWorkflowOptions{
58+
ID: fmt.Sprintf("%s-%s", StorageDeleteWorkflowName, req.AIPID),
59+
TaskQueue: req.TaskQueue,
60+
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
61+
}
62+
return tc.ExecuteWorkflow(ctx, opts, StorageDeleteWorkflowName, req)
63+
}
64+
3965
func InitStorageUploadWorkflow(
4066
ctx context.Context,
4167
tc temporalsdk_client.Client,

internal/storage/workflows/delete.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package workflows
2+
3+
import (
4+
"errors"
5+
6+
temporalsdk_workflow "go.temporal.io/sdk/workflow"
7+
8+
"github.com/artefactual-sdps/enduro/internal/storage"
9+
"github.com/artefactual-sdps/enduro/internal/storage/enums"
10+
)
11+
12+
type StorageDeleteWorkflow struct {
13+
storagesvc storage.Service
14+
}
15+
16+
func NewStorageDeleteWorkflow(storagesvc storage.Service) *StorageDeleteWorkflow {
17+
return &StorageDeleteWorkflow{storagesvc: storagesvc}
18+
}
19+
20+
func (w *StorageDeleteWorkflow) Execute(
21+
ctx temporalsdk_workflow.Context,
22+
req storage.StorageDeleteWorkflowRequest,
23+
) (e error) {
24+
// TODO: Check AIP existence and status, fail workflow.
25+
26+
// Set AIP status to processing.
27+
// TODO: Update AIP status enum and use proper status.
28+
if err := updateAIPStatus(ctx, w.storagesvc, req.AIPID, enums.AIPStatusInReview); err != nil {
29+
return err
30+
}
31+
32+
// Create persistence workflow.
33+
workflowDBID, err := createWorkflow(ctx, w.storagesvc, req.AIPID, enums.WorkflowTypeDeleteAip)
34+
if err != nil {
35+
return err
36+
}
37+
38+
// Complete persistence workflow.
39+
defer func() {
40+
// TODO: Consider rejected/cancelled case.
41+
status := enums.WorkflowStatusDone
42+
if e != nil {
43+
status = enums.WorkflowStatusError
44+
}
45+
46+
err := completeWorkflow(ctx, w.storagesvc, workflowDBID, status)
47+
if err != nil {
48+
e = errors.Join(e, err)
49+
}
50+
}()
51+
52+
// Create review task.
53+
reviewTaskID, err := createTask(
54+
ctx,
55+
w.storagesvc,
56+
workflowDBID,
57+
"Review AIP deletion request",
58+
"Awaiting user decision",
59+
)
60+
if err != nil {
61+
return err
62+
}
63+
64+
// TODO:
65+
// - Create DeletionRequest.
66+
// - Add signal channel, etc.
67+
// - Update DeletionRequest.
68+
69+
// Complete review task.
70+
taskNote := "AIP deletion request approved"
71+
if false {
72+
taskNote = "AIP deletion request rejected"
73+
}
74+
taskErr := completeTask(ctx, w.storagesvc, reviewTaskID, enums.TaskStatusDone, taskNote)
75+
if err = errors.Join(err, taskErr); err != nil {
76+
return err
77+
}
78+
79+
// TODO: Delete the AIP from AMSS or MinIO location source.
80+
81+
// Set AIP status to deleted.
82+
// TODO: Update AIP status enum and use proper status.
83+
if err := updateAIPStatus(ctx, w.storagesvc, req.AIPID, enums.AIPStatusRejected); err != nil {
84+
return err
85+
}
86+
87+
return nil
88+
}

internal/storage/workflows/move.go

+2-27
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/google/uuid"
98
temporalsdk_temporal "go.temporal.io/sdk/temporal"
109
temporalsdk_workflow "go.temporal.io/sdk/workflow"
1110

@@ -29,7 +28,7 @@ func (w *StorageMoveWorkflow) Execute(
2928
) (e error) {
3029
// Set AIP status to moving.
3130
{
32-
if err := w.updateAIPStatus(ctx, enums.AIPStatusMoving, req.AIPID); err != nil {
31+
if err := updateAIPStatus(ctx, w.storagesvc, req.AIPID, enums.AIPStatusMoving); err != nil {
3332
return err
3433
}
3534
}
@@ -145,34 +144,10 @@ func (w *StorageMoveWorkflow) Execute(
145144

146145
// Set AIP status to stored.
147146
{
148-
if err := w.updateAIPStatus(ctx, enums.AIPStatusStored, req.AIPID); err != nil {
147+
if err := updateAIPStatus(ctx, w.storagesvc, req.AIPID, enums.AIPStatusStored); err != nil {
149148
return err
150149
}
151150
}
152151

153152
return nil
154153
}
155-
156-
func (w *StorageMoveWorkflow) updateAIPStatus(
157-
ctx temporalsdk_workflow.Context,
158-
st enums.AIPStatus,
159-
aipID uuid.UUID,
160-
) error {
161-
activityOpts := temporalsdk_workflow.WithLocalActivityOptions(ctx, temporalsdk_workflow.LocalActivityOptions{
162-
ScheduleToCloseTimeout: 5 * time.Second,
163-
RetryPolicy: &temporalsdk_temporal.RetryPolicy{
164-
InitialInterval: time.Second,
165-
BackoffCoefficient: 2,
166-
MaximumInterval: time.Minute,
167-
MaximumAttempts: 3,
168-
},
169-
})
170-
171-
params := &storage.UpdateAIPStatusLocalActivityParams{
172-
AIPID: aipID,
173-
Status: st,
174-
}
175-
176-
return temporalsdk_workflow.ExecuteLocalActivity(activityOpts, storage.UpdateAIPStatusLocalActivity, w.storagesvc, params).
177-
Get(activityOpts, nil)
178-
}

internal/storage/workflows/workflows.go

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,24 @@ func localActivityOptions(ctx temporalsdk_workflow.Context) temporalsdk_workflow
2323
})
2424
}
2525

26+
func updateAIPStatus(
27+
ctx temporalsdk_workflow.Context,
28+
storagesvc storage.Service,
29+
aipID uuid.UUID,
30+
s enums.AIPStatus,
31+
) error {
32+
activityOpts := localActivityOptions(ctx)
33+
return temporalsdk_workflow.ExecuteLocalActivity(
34+
activityOpts,
35+
storage.UpdateAIPStatusLocalActivity,
36+
storagesvc,
37+
&storage.UpdateAIPStatusLocalActivityParams{
38+
AIPID: aipID,
39+
Status: s,
40+
},
41+
).Get(activityOpts, nil)
42+
}
43+
2644
func createWorkflow(
2745
ctx temporalsdk_workflow.Context,
2846
storagesvc storage.Service,

0 commit comments

Comments
 (0)