Skip to content

Commit d7cc028

Browse files
Luke Sikina (Luke-Sikina)
Luke Sikina
authored and committed
[ALS-8610] Add cumulus upload logic
1 parent 9d534ef commit d7cc028

File tree

9 files changed

+255
-11
lines changed

9 files changed

+255
-11
lines changed

uploader/pom.xml

+7-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>org.springframework.boot</groupId>
77
<artifactId>spring-boot-starter-parent</artifactId>
8-
<version>3.1.4</version>
8+
<version>3.4.2</version>
99
<relativePath/> <!-- lookup parent from repository -->
1010
</parent>
1111
<groupId>edu.harvard.dbmi.avillach</groupId>
@@ -15,7 +15,7 @@
1515
<description>Data Upload Client</description>
1616
<properties>
1717
<java.version>21</java.version>
18-
<aws.version>2.20.153</aws.version>
18+
<aws.version>2.30.24</aws.version>
1919
</properties>
2020
<repositories>
2121
<repository>
@@ -47,6 +47,11 @@
4747
</exclusion>
4848
</exclusions>
4949
</dependency>
50+
<dependency>
51+
<groupId>software.amazon.awssdk</groupId>
52+
<artifactId>lambda</artifactId>
53+
<version>${aws.version}</version>
54+
</dependency>
5055
<dependency>
5156
<groupId>software.amazon.awssdk</groupId>
5257
<artifactId>sts</artifactId>

uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/hpds/HPDSClient.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import edu.harvard.dbmi.avillach.dataupload.hpds.hpdsartifactsdonotchange.Query;
6+
import edu.harvard.dbmi.avillach.dataupload.hpds.hpdsartifactsdonotchange.ResultType;
67
import edu.harvard.dbmi.avillach.domain.GeneralQueryRequest;
78
import edu.harvard.dbmi.avillach.domain.QueryRequest;
89
import org.apache.http.HttpResponse;
@@ -45,6 +46,11 @@ public boolean writeGenomicData(Query query) {
4546
return writeData(query, "genomic");
4647
}
4748

49+
/**
 * Asks HPDS to write the patient data for the given query.
 *
 * <p>Note that this mutates the passed-in query: its expected result type is
 * overwritten with {@link ResultType#PATIENTS} before the request is made.
 *
 * @param query the query to write patient data for; modified in place
 * @return the result of {@code writeData(query, "patients")}
 */
public boolean writePatientData(Query query) {
50+
query.setExpectedResultType(ResultType.PATIENTS);
51+
return writeData(query, "patients");
52+
}
53+
4854
public boolean initializeQuery(Query query) {
4955
QueryRequest req = new GeneralQueryRequest();
5056
req.setQuery(query);
@@ -105,8 +111,4 @@ private String createBody(Object query) {
105111
return null;
106112
}
107113
}
108-
109-
public boolean writePatientData(Query query) {
110-
return writeData(query, "patients");
111-
}
112114
}

uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/hpds/hpdsartifactsdonotchange/ResultType.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,10 @@ public enum ResultType {
9494
* is suitable to time series analysis and/or loading into another
9595
* instance of HPDS.
9696
*/
97-
DATAFRAME_TIMESERIES
97+
DATAFRAME_TIMESERIES,
98+
99+
/**
100+
* Patients associated with this query
101+
*/
102+
PATIENTS
98103
}

uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/site/SiteConfiguration.java

+11
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import org.springframework.context.ConfigurableApplicationContext;
88
import org.springframework.context.annotation.Bean;
99
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.util.StringUtils;
1011

12+
import java.util.ArrayList;
1113
import java.util.List;
1214
import java.util.function.Predicate;
1315
import java.util.stream.Stream;
@@ -26,6 +28,9 @@ public class SiteConfiguration {
2628
@Value("${institution.short-display}")
2729
private String display;
2830

31+
@Value("${cumulus.bucket:}")
32+
private String cumulus;
33+
2934
@Autowired
3035
private ConfigurableApplicationContext context;
3136

@@ -41,6 +46,12 @@ public SiteListing getSiteInfo() {
4146
// we want the home inst first. Makes frontend display a bit nicer
4247
List<String> sites = Stream.concat(Stream.of(home), otherSites.stream()).toList();
4348

49+
if (StringUtils.hasLength(cumulus)) {
50+
LOG.info("Adding cumulus to sites");
51+
sites = new ArrayList<>(sites);
52+
sites.addLast("cumulus");
53+
}
54+
4455
return new SiteListing(sites, home, display);
4556
}
4657
}

uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadController.java

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import edu.harvard.dbmi.avillach.dataupload.status.DataUploadStatuses;
66
import edu.harvard.dbmi.avillach.dataupload.status.UploadStatus;
77
import edu.harvard.dbmi.avillach.dataupload.status.StatusService;
8+
import edu.harvard.dbmi.avillach.dataupload.upload.lambda.CumulusUploadService;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,13 +35,21 @@ public class DataUploadController {
3435
@Autowired
3536
private StatusService statusService;
3637

38+
@Autowired
39+
private CumulusUploadService cumulusUploadService;
40+
3741
@Value("${aws.s3.institution:}")
3842
private List<String> institutions;
3943

4044
@PostMapping("/upload/{site}")
4145
public ResponseEntity<DataUploadStatuses> startUpload(
4246
@RequestBody Query query, @PathVariable String site, @RequestParam(value = "dataType") DataType dataType
4347
) {
48+
if ("cumulus".equals(site)) {
49+
LOG.info("Detected cumulus upload. Switching to cumulus upload service.");
50+
boolean success = cumulusUploadService.asyncUpload(query);
51+
return success ? ResponseEntity.ok(null) : ResponseEntity.internalServerError().body(null);
52+
}
4453
site = site.toLowerCase();
4554
query.setExpectedResultType(ResultType.DATAFRAME_TIMESERIES);
4655
if (!institutions.contains(site)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package edu.harvard.dbmi.avillach.dataupload.upload.lambda;
2+
3+
import edu.harvard.dbmi.avillach.dataupload.hpds.HPDSClient;
4+
import edu.harvard.dbmi.avillach.dataupload.hpds.hpdsartifactsdonotchange.Query;
5+
import edu.harvard.dbmi.avillach.dataupload.status.StatusService;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.stereotype.Service;
10+
11+
import java.io.IOException;
12+
import java.io.OutputStream;
13+
import java.io.RandomAccessFile;
14+
import java.net.HttpURLConnection;
15+
import java.net.URL;
16+
import java.nio.ByteBuffer;
17+
import java.nio.channels.FileChannel;
18+
import java.nio.file.Files;
19+
import java.nio.file.Path;
20+
import java.util.Optional;
21+
22+
@Service
23+
public class CumulusUploadService {
24+
private static final Logger log = LoggerFactory.getLogger(CumulusUploadService.class);
25+
private final StatusService statusService;
26+
private final POSTUrlFetcher urlFetcher;
27+
private final HPDSClient hpdsClient;
28+
private final Path sharingRoot;
29+
30+
31+
@Autowired
32+
public CumulusUploadService(StatusService statusService, POSTUrlFetcher urlFetcher, HPDSClient hpdsClient, Path sharingRoot) {
33+
this.statusService = statusService;
34+
this.urlFetcher = urlFetcher;
35+
this.hpdsClient = hpdsClient;
36+
this.sharingRoot = sharingRoot;
37+
}
38+
39+
public boolean asyncUpload(Query query) {
40+
log.info("Async upload called");
41+
Thread.ofVirtual().start(() -> upload(query));
42+
return true;
43+
}
44+
private void upload(Query query) {
45+
log.info("Fetching upload URL");
46+
Optional<String> uploadURL = urlFetcher.getPreSignedUploadURL(query.getPicSureId(), "patients.txt");
47+
if (uploadURL.isEmpty()) {
48+
log.error("Could not get upload URL. Exiting");
49+
return;
50+
}
51+
52+
boolean written = hpdsClient.writePatientData(query);
53+
if (!written) {
54+
log.warn("HPDS did not write data. Exiting");
55+
return;
56+
}
57+
log.info("HPDS reported successfully writing {} data for {} to file.", "patients.txt", query.getPicSureId());
58+
59+
Path data = Path.of(sharingRoot.toString(), query.getPicSureId(), "patients.txt");
60+
if (!Files.exists(data)) {
61+
log.info("HPDS lied; file {} DNE. Status set to error", data);
62+
return;
63+
}
64+
65+
log.info("File location verified. Uploading for {} to AWS", query.getPicSureId());
66+
try {
67+
boolean uploaded = uploadFileToPresignedUrl(uploadURL.get(), data);
68+
} catch (IOException e) {
69+
log.error("Error uploading data: ", e);
70+
}
71+
log.info("Done uploading patients for query {}", query.getPicSureId());
72+
73+
}
74+
75+
private boolean uploadFileToPresignedUrl(String presignedUrlString, Path filePath) throws IOException {
76+
77+
URL presignedUrl = new URL(presignedUrlString);
78+
HttpURLConnection connection = (HttpURLConnection) presignedUrl.openConnection();
79+
connection.setDoOutput(true);
80+
connection.setRequestMethod("PUT");
81+
OutputStream out = connection.getOutputStream();
82+
83+
try (RandomAccessFile file = new RandomAccessFile(filePath.toString(), "r");
84+
FileChannel inChannel = file.getChannel()) {
85+
ByteBuffer buffer = ByteBuffer.allocate(8192); //Buffer size is 8k
86+
87+
while (inChannel.read(buffer) > 0) {
88+
buffer.flip();
89+
for (int i = 0; i < buffer.limit(); i++) {
90+
out.write(buffer.get());
91+
}
92+
buffer.clear();
93+
}
94+
} catch (IOException e) {
95+
log.error(e.getMessage(), e);
96+
}
97+
98+
out.close();
99+
connection.getResponseCode();
100+
log.info("HTTP response code is " + connection.getResponseCode());
101+
return HttpURLConnection.HTTP_OK == connection.getResponseCode();
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package edu.harvard.dbmi.avillach.dataupload.upload.lambda;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import edu.harvard.dbmi.avillach.dataupload.aws.AWSCredentialsService;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.stereotype.Component;
12+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
13+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
14+
import software.amazon.awssdk.regions.Region;
15+
import software.amazon.awssdk.services.lambda.LambdaClient;
16+
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
17+
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
18+
import software.amazon.awssdk.core.SdkBytes;
19+
20+
import java.util.Optional;
21+
import java.util.UUID;
22+
23+
@Component
24+
public class POSTUrlFetcher {
25+
private static final Logger log = LoggerFactory.getLogger(POSTUrlFetcher.class);
26+
27+
private final AWSCredentialsService credentialsService;
28+
private final Region region;
29+
private final String labdaARN;
30+
private final String bucketName;
31+
32+
@Autowired
33+
public POSTUrlFetcher(
34+
AWSCredentialsService credentialsService,
35+
@Value("${aws.region}") String region,
36+
@Value("${cumulus.lambda:}") String labdaARN,
37+
@Value("${cumulus.bucket:}") String bucketName
38+
) {
39+
this.credentialsService = credentialsService;
40+
this.region = Region.of(region);
41+
this.labdaARN = labdaARN;
42+
this.bucketName = bucketName;
43+
}
44+
45+
public Optional<String> getPreSignedUploadURL(String uploadUUID, String fileName) {
46+
AwsCredentials awsCredentials = credentialsService.constructCredentials();
47+
StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(awsCredentials);
48+
49+
50+
try (
51+
LambdaClient lambdaClient = LambdaClient.builder()
52+
.region(region)
53+
.credentialsProvider(credentialsProvider)
54+
.build()
55+
) {
56+
57+
String payload = new Payload(uploadUUID, fileName, bucketName).asJson();
58+
log.info("Created upload request payload of {}", payload);
59+
60+
InvokeRequest invokeRequest = InvokeRequest.builder()
61+
.functionName(labdaARN)
62+
.payload(SdkBytes.fromUtf8String(payload))
63+
.build();
64+
65+
log.info("Invoking lambda");
66+
InvokeResponse invokeResponse = lambdaClient.invoke(invokeRequest);
67+
log.info("Returning lambda response");
68+
return parseResponse(invokeResponse.payload().asUtf8String());
69+
}
70+
}
71+
72+
private Optional<String> parseResponse(String raw) {
73+
ObjectMapper objectMapper = new ObjectMapper();
74+
try {
75+
JsonNode jsonNode = objectMapper.readTree(raw);
76+
return Optional.ofNullable(jsonNode.get("presigned_url").asText());
77+
} catch (JsonProcessingException e) {
78+
log.error("Error parsing json: ", e);
79+
return Optional.empty();
80+
}
81+
}
82+
83+
private record Payload(String object_key, String bucket_name) {
84+
Payload(String directory, String fileName, String bucketName) {
85+
this(directory + "/" + fileName, bucketName);
86+
}
87+
88+
public String asJson() {
89+
try {
90+
return new ObjectMapper().writeValueAsString(this);
91+
} catch (JsonProcessingException e) {
92+
log.error("Could not make payload: ", e);
93+
return "";
94+
}
95+
}
96+
}
97+
}

uploader/src/main/resources/application.properties

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,7 @@ institution.short-display=${HOME_INSTITUTION_DISPLAY}
2222
institution.long-display=${HOME_INSTITUTION_LONG_DISPLAY}
2323
server.port=${PORT:80}
2424

25-
spring.profiles.active=prod
25+
spring.profiles.active=prod
26+
27+
# These names must match the @Value placeholders the uploader reads
# (${cumulus.lambda:} and ${cumulus.bucket:}). Under any other name the values
# are ignored and the cumulus upload stays disabled.
cumulus.lambda=arn:aws:lambda:us-east-1:991175998352:function:cumulus_s3URLGen
cumulus.bucket=991175998352-cumulus-dev

uploader/src/test/resources/application.properties

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1+
aws.s3.role_arns=${AWS_DATA_UPLOAD_ROLE:}
2+
aws.s3.external_ids=${AWS_SHARED_SECRET:}
3+
aws.s3.buckets=${AWS_S3_BUCKET_NAME:}
4+
aws.s3.access_key_id=${AWS_ACCESS_KEY_ID:}
5+
aws.s3.access_key_secret=${AWS_SECRET_ACCESS_KEY:}
6+
aws.s3.session_token=${AWS_SESSION_TOKEN:}
7+
aws.s3.institution=bch
8+
aws.s3.lambda_name=arn:aws:lambda:us-east-1:991175998352:function:cumulus_s3URLGen
9+
aws.region=us-east-1
10+
aws.kms.key_ids=${AWS_KEY_ID}
11+
aws.authentication.method=${AUTH_METHOD:noauth}
12+
113
spring.datasource.url=jdbc:mysql://${DATA_UPLOAD_DB_HOST:data-upload}:3306/${DATA_UPLOAD_DB_DATABASE:}
214
spring.datasource.username=${DATA_UPLOAD_DB_USER:}
315
spring.datasource.password=${DATA_UPLOAD_DB_PASS:}
416
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
517

618
production=false
719

8-
aws.s3.institution=bch
9-
10-
aws.region=us-east-1
1120
institution.name=bch
1221
institution.short-display=BCH
1322
institution.long-display=Boston Children's Hospital

0 commit comments

Comments
 (0)