From dd50a7c804541bde489f4b2dd7df9891f3dd7b32 Mon Sep 17 00:00:00 2001 From: Luke Sikina Date: Mon, 30 Dec 2024 12:47:18 -0500 Subject: [PATCH] [ALS-7200] Upload large files via multipart --- .../dataupload/upload/DataUploadService.java | 113 +++++++++++++--- .../upload/DataUploadServiceTest.java | 127 ++++++++++++++++-- 2 files changed, 212 insertions(+), 28 deletions(-) diff --git a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java index a5cadab..2b019fc 100644 --- a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java +++ b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java @@ -16,14 +16,18 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.ServerSideEncryption; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.*; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.function.BiConsumer; @@ -32,6 +36,7 @@ public class DataUploadService { private static final Logger LOG = LoggerFactory.getLogger(DataUploadService.class); + private static final int SIXTEEN_MB = 16 * 1024 * 1024; @Autowired private Semaphore uploadLock; @@ -109,7 +114,7 @@ protected void uploadData(Query query, DataType dataType, String site) { uploadLock.release(); } - private static void deleteFile(Path data) { + private void deleteFile(Path data) { try { Files.delete(data); } catch (IOException e) { @@ -118,20 +123,98 @@ private static void deleteFile(Path data) { } private boolean uploadFileFromPath(Path p, SiteAWSInfo site, String dir) { + Optional maybeClient = s3ClientBuilder.buildClientForSite(site.siteName()); + if (maybeClient.isEmpty()) { + LOG.info("There is no client for site {}", site); + return false; + } + S3Client s3 = maybeClient.get(); + LOG.info("Starting multipart upload for file {} to site {} in dir {}", p, site, dir); + + CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder() + .bucket(site.bucket()) + .serverSideEncryption(ServerSideEncryption.AWS_KMS) + .ssekmsKeyId(site.kmsKeyID()) + .key(Path.of(dir, home + "_" + p.getFileName().toString()).toString()) + .build(); + String uploadId; + try { + uploadId = s3.createMultipartUpload(createRequest).uploadId(); + } catch (AwsServiceException e) { + LOG.error("Error creating multipart: ", e); + return false; + } + LOG.info("Created initial multipart request and notified S3"); + + LOG.info("Starting upload process..."); + List completedParts = uploadAllParts(p, site, dir, uploadId, s3); + if (completedParts.isEmpty()) { + return false; + } + LOG.info("Upload complete! Uploaded {} parts", completedParts.size()); + + LOG.info("Notifying S3 of completed upload..."); + CompletedMultipartUpload completedUpload = CompletedMultipartUpload.builder() + .parts(completedParts) + .build(); + + CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder() + .bucket(site.bucket()) + .key(Path.of(dir, home + "_" + p.getFileName().toString()).toString()) + .uploadId(uploadId) + .multipartUpload(completedUpload) + .build(); + try { - RequestBody body = RequestBody.fromFile(p.toFile()); - PutObjectRequest request = PutObjectRequest.builder() - .bucket(site.bucket()) - .serverSideEncryption(ServerSideEncryption.AWS_KMS) - .ssekmsKeyId(site.kmsKeyID()) - .key(Path.of(dir, home + "_" + p.getFileName().toString()).toString()) - .build(); - return s3ClientBuilder.buildClientForSite(site.siteName()) - .map(client -> client.putObject(request, body)) - .isPresent(); + s3.completeMultipartUpload(completeRequest); } catch (AwsServiceException | SdkClientException e) { - LOG.info("Error uploading file from {} to bucket {}", p, site.bucket(), e); + LOG.error("Error finishing multipart: ", e); return false; } + LOG.info("Done uploading {} to {}", p.getFileName(), site.siteName()); + return true; + } + + private List uploadAllParts(Path p, SiteAWSInfo site, String dir, String uploadId, S3Client s3) { + List completedParts = new ArrayList<>(); + int part = 1; + ByteBuffer buffer = ByteBuffer.allocate(SIXTEEN_MB); + + try (RandomAccessFile file = new RandomAccessFile(p.toString(), "r")) { + long fileSize = file.length(); + long position = 0; + + while (position < fileSize) { + file.seek(position); + int bytesRead = file.getChannel().read(buffer); + + LOG.info("Uploading file {} part {}", p.getFileName(), part); + buffer.flip(); + UploadPartRequest uploadPartRequest = UploadPartRequest.builder() + .bucket(site.bucket()) + .key(Path.of(dir, home + "_" + p.getFileName().toString()).toString()) + .uploadId(uploadId) + .partNumber(part) + .contentLength((long) bytesRead) + .build(); + + + UploadPartResponse response = s3.uploadPart(uploadPartRequest, RequestBody.fromByteBuffer(buffer)); + + completedParts.add(CompletedPart.builder() + .partNumber(part) + .eTag(response.eTag()) + .build()); + + buffer.clear(); + position += bytesRead; + part++; + } + } catch (IOException | AwsServiceException | SdkClientException e) { + LOG.error("Failed to upload file {}, part {}: ", p.getFileName(), part, e); + return List.of(); + } + LOG.info("Uploaded all parts, finishing"); + return completedParts; } } diff --git a/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java b/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java index a4144c1..2fb8f2d 100644 --- a/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java +++ b/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java @@ -18,8 +18,7 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.*; import java.io.IOException; import java.nio.file.Files; @@ -102,15 +101,101 @@ void shouldNotUploadDataIfAWSUpset(@TempDir Path tempDir) throws IOException, In Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString()); Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true); Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client)); - Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class))) + Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class))) .thenThrow(AwsServiceException.builder().build()); subject.uploadData(q, DataType.Phenotypic, "bch"); - Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying); - Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading); - Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)); - Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Error); + Mockito.verify(statusService, Mockito.times(1)). + setPhenotypicStatus(q, UploadStatus.Querying); + Mockito.verify(statusService, Mockito.times(1)). + setPhenotypicStatus(q, UploadStatus.Uploading); + Mockito.verify(s3Client, Mockito.times(1)) + .createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)); + Mockito.verify(statusService, Mockito.times(1)). + setPhenotypicStatus(q, UploadStatus.Error); + Mockito.verify(uploadLock, Mockito.times(1)).acquire(); + Mockito.verify(uploadLock, Mockito.times(1)).release(); + } + + @Test + void shouldNotUploadDataIfAWSUploadFails(@TempDir Path tempDir) throws IOException, InterruptedException { + Query q = new Query(); + q.setPicSureId("my-id"); + q.setId("my-id"); + + Files.createDirectory(Path.of(tempDir.toString(), q.getPicSureId())); + Files.writeString(Path.of(tempDir.toString(), q.getPicSureId(), DataType.Phenotypic.fileName), ":)"); + ReflectionTestUtils.setField(subject, "roleARNs", roleARNs); + CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class); + Mockito.when(createResp.uploadId()).thenReturn("frank"); + + Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString()); + Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true); + Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client)); + Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class))) + .thenReturn(createResp); + Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class))) + .thenThrow(AwsServiceException.builder().build()); + + subject.uploadData(q, DataType.Phenotypic, "bch"); + + Mockito.verify(statusService, Mockito.times(1)). + setPhenotypicStatus(q, UploadStatus.Querying); + Mockito.verify(statusService, Mockito.times(1)). + setPhenotypicStatus(q, UploadStatus.Uploading); + Mockito.verify(s3Client, Mockito.times(1)) + .createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)); + Mockito.verify(s3Client, Mockito.times(1)) + .uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)); + Mockito.verify(statusService, Mockito.times(1)). + setPhenotypicStatus(q, UploadStatus.Error); + Mockito.verify(uploadLock, Mockito.times(1)).acquire(); + Mockito.verify(uploadLock, Mockito.times(1)).release(); + } + + @Test + void shouldNotUploadDataWhenCompleteFails(@TempDir Path tempDir) throws IOException, InterruptedException { + Query q = new Query(); + q.setPicSureId("my-id"); + q.setId("my-id"); + + Path fileToUpload = Path.of(tempDir.toString(), q.getPicSureId(), DataType.Phenotypic.fileName); + Files.createDirectory(Path.of(tempDir.toString(), q.getPicSureId())); + Files.writeString(fileToUpload, ":)"); + ReflectionTestUtils.setField(subject, "roleARNs", roleARNs); + + CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class); + Mockito.when(createResp.uploadId()).thenReturn("frank"); + UploadPartResponse uploadResp = Mockito.mock(UploadPartResponse.class); + Mockito.when(uploadResp.eTag()).thenReturn("gus"); + + Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString()); + Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true); + Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client)); + Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class))) + .thenReturn(uploadResp); + Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class))) + .thenReturn(createResp); + Mockito.when(s3Client.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class))) + .thenThrow(AwsServiceException.builder().build()); + + + subject.uploadData(q, DataType.Phenotypic, "bch"); + + Mockito.verify(statusService, Mockito.times(1)) + .setPhenotypicStatus(q, UploadStatus.Querying); + Mockito.verify(statusService, Mockito.times(1)) + .setPhenotypicStatus(q, UploadStatus.Uploading); + Mockito.verify(s3Client, Mockito.times(1)) + .createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)); + Mockito.verify(s3Client, Mockito.times(1)) + .completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class)); + Mockito.verify(s3Client, Mockito.times(1)) + .uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)); + Mockito.verify(statusService, Mockito.times(1)) + .setPhenotypicStatus(q, UploadStatus.Error); + Assertions.assertFalse(Files.exists(fileToUpload)); Mockito.verify(uploadLock, Mockito.times(1)).acquire(); Mockito.verify(uploadLock, Mockito.times(1)).release(); } @@ -126,18 +211,34 @@ void shouldUploadData(@TempDir Path tempDir) throws IOException, InterruptedExce Files.writeString(fileToUpload, ":)"); ReflectionTestUtils.setField(subject, "roleARNs", roleARNs); + CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class); + Mockito.when(createResp.uploadId()).thenReturn("frank"); + UploadPartResponse uploadResp = Mockito.mock(UploadPartResponse.class); + Mockito.when(uploadResp.eTag()).thenReturn("gus"); + Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString()); Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true); Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client)); - Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class))) - .thenReturn(Mockito.mock(PutObjectResponse.class)); + Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class))) + .thenReturn(uploadResp); + Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class))) + .thenReturn(createResp); + subject.uploadData(q, DataType.Phenotypic, "bch"); - Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying); - Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading); - Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)); - Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploaded); + Mockito.verify(statusService, Mockito.times(1)) + .setPhenotypicStatus(q, UploadStatus.Querying); + Mockito.verify(statusService, Mockito.times(1)) + .setPhenotypicStatus(q, UploadStatus.Uploading); + Mockito.verify(s3Client, Mockito.times(1)) + .createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)); + Mockito.verify(s3Client, Mockito.times(1)) + .completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class)); + Mockito.verify(s3Client, Mockito.times(1)) + .uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)); + Mockito.verify(statusService, Mockito.times(1)) + .setPhenotypicStatus(q, UploadStatus.Uploaded); Assertions.assertFalse(Files.exists(fileToUpload)); Mockito.verify(uploadLock, Mockito.times(1)).acquire(); Mockito.verify(uploadLock, Mockito.times(1)).release();