Skip to content

Commit 44663a0

Browse files
Luke SikinaLuke-Sikina
Luke Sikina
authored and committed
[ALS-7200] Upload large files via multipart
1 parent c729973 commit 44663a0

File tree

2 files changed

+212
-28
lines changed

2 files changed

+212
-28
lines changed

uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java

+98-15
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
import software.amazon.awssdk.awscore.exception.AwsServiceException;
1717
import software.amazon.awssdk.core.exception.SdkClientException;
1818
import software.amazon.awssdk.core.sync.RequestBody;
19-
import software.amazon.awssdk.services.s3.S3ClientBuilder;
20-
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
21-
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
19+
import software.amazon.awssdk.services.s3.S3Client;
20+
import software.amazon.awssdk.services.s3.model.*;
2221

2322
import java.io.IOException;
23+
import java.io.RandomAccessFile;
24+
import java.nio.ByteBuffer;
2425
import java.nio.file.Files;
2526
import java.nio.file.Path;
27+
import java.util.ArrayList;
28+
import java.util.List;
2629
import java.util.Map;
30+
import java.util.Optional;
2731
import java.util.concurrent.Semaphore;
2832
import java.util.function.BiConsumer;
2933

@@ -32,6 +36,7 @@
3236
public class DataUploadService {
3337

3438
private static final Logger LOG = LoggerFactory.getLogger(DataUploadService.class);
39+
private static final int SIXTEEN_MB = 16 * 1024 * 1024;
3540

3641
@Autowired
3742
private Semaphore uploadLock;
@@ -109,7 +114,7 @@ protected void uploadData(Query query, DataType dataType, String site) {
109114
uploadLock.release();
110115
}
111116

112-
private static void deleteFile(Path data) {
117+
private void deleteFile(Path data) {
113118
try {
114119
Files.delete(data);
115120
} catch (IOException e) {
@@ -118,20 +123,98 @@ private static void deleteFile(Path data) {
118123
}
119124

120125
private boolean uploadFileFromPath(Path p, SiteAWSInfo site, String dir) {
126+
Optional<S3Client> maybeClient = s3ClientBuilder.buildClientForSite(site.siteName());
127+
if (maybeClient.isEmpty()) {
128+
LOG.info("There is no client for site {}", site);
129+
return false;
130+
}
131+
S3Client s3 = maybeClient.get();
132+
LOG.info("Starting multipart upload for file {} to site {} in dir {}", p, site, dir);
133+
134+
CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
135+
.bucket(site.bucket())
136+
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
137+
.ssekmsKeyId(site.kmsKeyID())
138+
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
139+
.build();
140+
String uploadId;
141+
try {
142+
uploadId = s3.createMultipartUpload(createRequest).uploadId();
143+
} catch (AwsServiceException e) {
144+
LOG.error("Error creating multipart: ", e);
145+
return false;
146+
}
147+
LOG.info("Created initial multipart request and notified S3");
148+
149+
LOG.info("Starting upload process...");
150+
List<CompletedPart> completedParts = uploadAllParts(p, site, dir, uploadId, s3);
151+
if (completedParts.isEmpty()) {
152+
return false;
153+
}
154+
LOG.info("Upload complete! Uploaded {} parts", completedParts.size());
155+
156+
LOG.info("Notifying S3 of completed upload...");
157+
CompletedMultipartUpload completedUpload = CompletedMultipartUpload.builder()
158+
.parts(completedParts)
159+
.build();
160+
161+
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
162+
.bucket(site.bucket())
163+
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
164+
.uploadId(uploadId)
165+
.multipartUpload(completedUpload)
166+
.build();
167+
121168
try {
122-
RequestBody body = RequestBody.fromFile(p.toFile());
123-
PutObjectRequest request = PutObjectRequest.builder()
124-
.bucket(site.bucket())
125-
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
126-
.ssekmsKeyId(site.kmsKeyID())
127-
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
128-
.build();
129-
return s3ClientBuilder.buildClientForSite(site.siteName())
130-
.map(client -> client.putObject(request, body))
131-
.isPresent();
169+
s3.completeMultipartUpload(completeRequest);
132170
} catch (AwsServiceException | SdkClientException e) {
133-
LOG.info("Error uploading file from {} to bucket {}", p, site.bucket(), e);
171+
LOG.error("Error finishing multipart: ", e);
134172
return false;
135173
}
174+
LOG.info("Done uploading {} to {}", p.getFileName(), site.siteName());
175+
return true;
176+
}
177+
178+
private List<CompletedPart> uploadAllParts(Path p, SiteAWSInfo site, String dir, String uploadId, S3Client s3) {
179+
List<CompletedPart> completedParts = new ArrayList<>();
180+
int part = 1;
181+
ByteBuffer buffer = ByteBuffer.allocate(SIXTEEN_MB);
182+
183+
try (RandomAccessFile file = new RandomAccessFile(p.toString(), "r")) {
184+
long fileSize = file.length();
185+
long position = 0;
186+
187+
while (position < fileSize) {
188+
file.seek(position);
189+
int bytesRead = file.getChannel().read(buffer);
190+
191+
LOG.info("Uploading file {} part {}", p.getFileName(), part);
192+
buffer.flip();
193+
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
194+
.bucket(site.bucket())
195+
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
196+
.uploadId(uploadId)
197+
.partNumber(part)
198+
.contentLength((long) bytesRead)
199+
.build();
200+
201+
202+
UploadPartResponse response = s3.uploadPart(uploadPartRequest, RequestBody.fromByteBuffer(buffer));
203+
204+
completedParts.add(CompletedPart.builder()
205+
.partNumber(part)
206+
.eTag(response.eTag())
207+
.build());
208+
209+
buffer.clear();
210+
position += bytesRead;
211+
part++;
212+
}
213+
} catch (IOException | AwsServiceException | SdkClientException e) {
214+
LOG.error("Failed to upload file {}, part {}: ", p.getFileName(), part, e);
215+
return List.of();
216+
}
217+
LOG.info("Uploaded all parts, finishing");
218+
return completedParts;
136219
}
137220
}

uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java

+114-13
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import software.amazon.awssdk.awscore.exception.AwsServiceException;
1919
import software.amazon.awssdk.core.sync.RequestBody;
2020
import software.amazon.awssdk.services.s3.S3Client;
21-
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
22-
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
21+
import software.amazon.awssdk.services.s3.model.*;
2322

2423
import java.io.IOException;
2524
import java.nio.file.Files;
@@ -102,15 +101,101 @@ void shouldNotUploadDataIfAWSUpset(@TempDir Path tempDir) throws IOException, In
102101
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
103102
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
104103
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
105-
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
104+
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
106105
.thenThrow(AwsServiceException.builder().build());
107106

108107
subject.uploadData(q, DataType.Phenotypic, "bch");
109108

110-
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying);
111-
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading);
112-
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class));
113-
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Error);
109+
Mockito.verify(statusService, Mockito.times(1)).
110+
setPhenotypicStatus(q, UploadStatus.Querying);
111+
Mockito.verify(statusService, Mockito.times(1)).
112+
setPhenotypicStatus(q, UploadStatus.Uploading);
113+
Mockito.verify(s3Client, Mockito.times(1))
114+
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
115+
Mockito.verify(statusService, Mockito.times(1)).
116+
setPhenotypicStatus(q, UploadStatus.Error);
117+
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
118+
Mockito.verify(uploadLock, Mockito.times(1)).release();
119+
}
120+
121+
@Test
122+
void shouldNotUploadDataIfAWSUploadFails(@TempDir Path tempDir) throws IOException, InterruptedException {
123+
Query q = new Query();
124+
q.setPicSureId("my-id");
125+
q.setId("my-id");
126+
127+
Files.createDirectory(Path.of(tempDir.toString(), q.getPicSureId()));
128+
Files.writeString(Path.of(tempDir.toString(), q.getPicSureId(), DataType.Phenotypic.fileName), ":)");
129+
ReflectionTestUtils.setField(subject, "roleARNs", roleARNs);
130+
CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class);
131+
Mockito.when(createResp.uploadId()).thenReturn("frank");
132+
133+
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
134+
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
135+
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
136+
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
137+
.thenReturn(createResp);
138+
Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
139+
.thenThrow(AwsServiceException.builder().build());
140+
141+
subject.uploadData(q, DataType.Phenotypic, "bch");
142+
143+
Mockito.verify(statusService, Mockito.times(1)).
144+
setPhenotypicStatus(q, UploadStatus.Querying);
145+
Mockito.verify(statusService, Mockito.times(1)).
146+
setPhenotypicStatus(q, UploadStatus.Uploading);
147+
Mockito.verify(s3Client, Mockito.times(1))
148+
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
149+
Mockito.verify(s3Client, Mockito.times(1))
150+
.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class));
151+
Mockito.verify(statusService, Mockito.times(1)).
152+
setPhenotypicStatus(q, UploadStatus.Error);
153+
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
154+
Mockito.verify(uploadLock, Mockito.times(1)).release();
155+
}
156+
157+
@Test
158+
void shouldNotUploadDataWhenCompleteFails(@TempDir Path tempDir) throws IOException, InterruptedException {
159+
Query q = new Query();
160+
q.setPicSureId("my-id");
161+
q.setId("my-id");
162+
163+
Path fileToUpload = Path.of(tempDir.toString(), q.getPicSureId(), DataType.Phenotypic.fileName);
164+
Files.createDirectory(Path.of(tempDir.toString(), q.getPicSureId()));
165+
Files.writeString(fileToUpload, ":)");
166+
ReflectionTestUtils.setField(subject, "roleARNs", roleARNs);
167+
168+
CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class);
169+
Mockito.when(createResp.uploadId()).thenReturn("frank");
170+
UploadPartResponse uploadResp = Mockito.mock(UploadPartResponse.class);
171+
Mockito.when(uploadResp.eTag()).thenReturn("gus");
172+
173+
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
174+
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
175+
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
176+
Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
177+
.thenReturn(uploadResp);
178+
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
179+
.thenReturn(createResp);
180+
Mockito.when(s3Client.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class)))
181+
.thenThrow(AwsServiceException.builder().build());
182+
183+
184+
subject.uploadData(q, DataType.Phenotypic, "bch");
185+
186+
Mockito.verify(statusService, Mockito.times(1))
187+
.setPhenotypicStatus(q, UploadStatus.Querying);
188+
Mockito.verify(statusService, Mockito.times(1))
189+
.setPhenotypicStatus(q, UploadStatus.Uploading);
190+
Mockito.verify(s3Client, Mockito.times(1))
191+
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
192+
Mockito.verify(s3Client, Mockito.times(1))
193+
.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class));
194+
Mockito.verify(s3Client, Mockito.times(1))
195+
.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class));
196+
Mockito.verify(statusService, Mockito.times(1))
197+
.setPhenotypicStatus(q, UploadStatus.Error);
198+
Assertions.assertFalse(Files.exists(fileToUpload));
114199
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
115200
Mockito.verify(uploadLock, Mockito.times(1)).release();
116201
}
@@ -126,18 +211,34 @@ void shouldUploadData(@TempDir Path tempDir) throws IOException, InterruptedExce
126211
Files.writeString(fileToUpload, ":)");
127212
ReflectionTestUtils.setField(subject, "roleARNs", roleARNs);
128213

214+
CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class);
215+
Mockito.when(createResp.uploadId()).thenReturn("frank");
216+
UploadPartResponse uploadResp = Mockito.mock(UploadPartResponse.class);
217+
Mockito.when(uploadResp.eTag()).thenReturn("gus");
218+
129219
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
130220
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
131221
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
132-
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
133-
.thenReturn(Mockito.mock(PutObjectResponse.class));
222+
Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
223+
.thenReturn(uploadResp);
224+
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
225+
.thenReturn(createResp);
226+
134227

135228
subject.uploadData(q, DataType.Phenotypic, "bch");
136229

137-
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying);
138-
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading);
139-
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class));
140-
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploaded);
230+
Mockito.verify(statusService, Mockito.times(1))
231+
.setPhenotypicStatus(q, UploadStatus.Querying);
232+
Mockito.verify(statusService, Mockito.times(1))
233+
.setPhenotypicStatus(q, UploadStatus.Uploading);
234+
Mockito.verify(s3Client, Mockito.times(1))
235+
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
236+
Mockito.verify(s3Client, Mockito.times(1))
237+
.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class));
238+
Mockito.verify(s3Client, Mockito.times(1))
239+
.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class));
240+
Mockito.verify(statusService, Mockito.times(1))
241+
.setPhenotypicStatus(q, UploadStatus.Uploaded);
141242
Assertions.assertFalse(Files.exists(fileToUpload));
142243
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
143244
Mockito.verify(uploadLock, Mockito.times(1)).release();

0 commit comments

Comments
 (0)