Skip to content

Commit 7e098d6

Browse files
gedl and wermajew authored
Fix/md5 management (#108)
* remove unused imports * print md5 when local matches remote * use S3's etag to store the md5 instead of a user-defined meta entry. This is because we can only know the md5 *after* the upload has completed, and we must set meta at the time of the upload, so we were using the original file's md5 instead of the transferred file's md5 * improve logging messages * reject file if md5 doesn't match (and noCheckMd5 option is not enabled) * remove unused variable and import * add some downloader tests * name file properly * remove unnecessary code * inline variable only used once * reduce jacoco coverage target for the downloader module * fix: add assertions for 'latest' obj * add tests for the case where files already exist assert on contents of the "latest" file * Revert "fix: add assertions for 'latest' obj" This reverts commit cb14328. * add tests for the case where files already exist (#109) assert on contents of the "latest" file * tests for LocalFileAcceptor --------- Co-authored-by: Weronika Majewska <wmajewska@endeavorstreaming.com>
1 parent b2e6e1b commit 7e098d6

File tree

12 files changed

+599
-39
lines changed

12 files changed

+599
-39
lines changed

dice-where-downloader-lib/pom.xml

+78
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,55 @@
1515
<slf4j.version>1.7.30</slf4j.version>
1616
<jackson.version>2.14.2</jackson.version>
1717
<javax-bind.version>2.3.1</javax-bind.version>
18+
<test-containers.version>1.19.8</test-containers.version>
19+
<wiremock.version>3.0.1</wiremock.version>
1820
</properties>
1921

22+
<build>
23+
<plugins>
24+
<plugin>
25+
<groupId>org.jacoco</groupId>
26+
<artifactId>jacoco-maven-plugin</artifactId>
27+
<version>${jacoco.version}</version>
28+
<executions>
29+
<execution>
30+
<id>pre-unit-tests</id>
31+
<goals>
32+
<goal>prepare-agent</goal>
33+
</goals>
34+
</execution>
35+
<execution>
36+
<id>post-unit-tests</id>
37+
<phase>test</phase>
38+
<goals>
39+
<goal>report</goal>
40+
</goals>
41+
</execution>
42+
<execution>
43+
<id>default-check</id>
44+
<goals>
45+
<goal>check</goal>
46+
</goals>
47+
<configuration>
48+
<rules>
49+
<rule implementation="org.jacoco.maven.RuleConfiguration">
50+
<element>BUNDLE</element>
51+
<limits>
52+
<limit implementation="org.jacoco.report.check.Limit">
53+
<counter>COMPLEXITY</counter>
54+
<value>COVEREDRATIO</value>
55+
<minimum>0.1</minimum>
56+
</limit>
57+
</limits>
58+
</rule>
59+
</rules>
60+
</configuration>
61+
</execution>
62+
</executions>
63+
</plugin>
64+
</plugins>
65+
</build>
66+
2067
<dependencies>
2168
<dependency>
2269
<groupId>software.amazon.awssdk</groupId>
@@ -60,6 +107,37 @@
60107
<version>${javax-bind.version}</version>
61108
</dependency>
62109

110+
<dependency>
111+
<groupId>org.testcontainers</groupId>
112+
<artifactId>testcontainers</artifactId>
113+
<version>${test-containers.version}</version>
114+
<scope>test</scope>
115+
</dependency>
116+
<dependency>
117+
<groupId>org.testcontainers</groupId>
118+
<artifactId>junit-jupiter</artifactId>
119+
<version>${test-containers.version}</version>
120+
<scope>test</scope>
121+
</dependency>
122+
<dependency>
123+
<groupId>org.testcontainers</groupId>
124+
<artifactId>localstack</artifactId>
125+
<version>${test-containers.version}</version>
126+
<scope>test</scope>
127+
</dependency>
128+
<dependency>
129+
<groupId>org.testcontainers</groupId>
130+
<artifactId>nginx</artifactId>
131+
<version>${test-containers.version}</version>
132+
<scope>test</scope>
133+
</dependency>
134+
<dependency>
135+
<groupId>org.wiremock</groupId>
136+
<artifactId>wiremock</artifactId>
137+
<version>${wiremock.version}</version>
138+
<scope>test</scope>
139+
</dependency>
140+
63141
</dependencies>
64142

65143
</project>

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/Download.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import technology.dice.dicewhere.downloader.source.FileSource;
1111

1212
public abstract class Download {
13+
1314
private static final Logger LOG = LoggerFactory.getLogger(Download.class);
1415

1516
protected final boolean noCheckMd5;
@@ -38,25 +39,29 @@ protected DownloadExecutionResult process(FileAcceptor<?> acceptor, FileSource f
3839
result = processFileDoesNotExist(acceptor, fileSource, pathWritable);
3940
}
4041
LOG.info("A new file was" + (result.isNewFileDownloaded() ? "" : " not") + " downloaded");
42+
// Choose the "un" prefix on its own, then append "successful" for both outcomes. The ternary must be fully parenthesised: '+' binds tighter than '?:', so leaving the suffix inside the else-branch made a failed download log just "Download is un".
LOG.info("Download is " + (result.isSuccessful() ? "" : "un") + "successful");
4143
return result;
4244
}
4345

4446
private DownloadExecutionResult processFileDoesNotExist(
4547
FileAcceptor<?> acceptor, FileSource fileSource, boolean pathWritable) {
4648

4749
if (pathWritable) {
48-
final MD5Checksum md5Checksum = fileSource.produce(acceptor);
49-
LOG.info("File successfully transferred");
50+
final MD5Checksum md5Checksum = fileSource.produce(acceptor, noCheckMd5);
51+
LOG.info("File transferred");
5052
if (!noCheckMd5) {
5153
boolean checksumMatches = md5Checksum.matches(fileSource.fileInfo().getMd5Checksum());
5254
if (!checksumMatches) {
53-
LOG.warn(
55+
LOG.error(
5456
"Local and remote files' MD5 do not match: "
5557
+ md5Checksum.stringFormat()
5658
+ " Vs. "
5759
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
5860
} else {
59-
LOG.info("MD5 matches that of the remote file");
61+
LOG.info("MD5 matches that of the remote file: "
62+
+ md5Checksum.stringFormat()
63+
+ " Vs. "
64+
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
6065
}
6166
return new DownloadExecutionResult(
6267
true, checksumMatches, md5Checksum, acceptor.getUri(), checksumMatches);
@@ -87,7 +92,10 @@ private DownloadExecutionResult processFileExists(
8792
+ " Vs. "
8893
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
8994
} else {
90-
LOG.info("MD5 matches that of the remote file");
95+
LOG.info("MD5 matches that of the remote file: "
96+
+ existingMd5.map(md5 -> md5.stringFormat()).orElse("?")
97+
+ " Vs. "
98+
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
9199
}
92100
return new DownloadExecutionResult(
93101
false,
@@ -106,7 +114,8 @@ private DownloadExecutionResult processFileExists(
106114

107115
protected abstract DownloadExecutionResult execute();
108116

109-
protected void checkNecessaryEnvironmentVariables() {}
117+
protected void checkNecessaryEnvironmentVariables() {
118+
}
110119

111120
public boolean isVerbose() {
112121
return verbose;

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/actions/ipinfo/IpInfoBaseDownload.java

-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package technology.dice.dicewhere.downloader.actions.ipinfo;
22

3-
import java.net.URI;
43
import technology.dice.dicewhere.downloader.Download;
5-
import technology.dice.dicewhere.downloader.PathUtils;
64

75
public abstract class IpInfoBaseDownload extends Download {
86

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/destination/FileAcceptor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import technology.dice.dicewhere.downloader.stream.StreamConsumer;
88

99
public interface FileAcceptor<T> {
10-
StreamConsumer<T> getStreamConsumer(MD5Checksum originalFileMd5, Instant originalFileTimestamp);
10+
11+
StreamConsumer<T> getStreamConsumer(MD5Checksum originalFileMd5, Instant originalFileTimestamp,
12+
boolean noMd5Check);
1113

1214
boolean destinationExists();
1315

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/destination/local/LocalFileAcceptor.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import technology.dice.dicewhere.downloader.stream.StreamWithMD5Decorator;
2020

2121
public class LocalFileAcceptor implements FileAcceptor<Void> {
22+
2223
private static final Logger LOG = LoggerFactory.getLogger(LocalFileAcceptor.class);
2324
public static final int BUFFER = 8192;
2425

@@ -30,7 +31,7 @@ public LocalFileAcceptor(Path destination) {
3031

3132
@Override
3233
public StreamConsumer<Void> getStreamConsumer(
33-
MD5Checksum originalFileMd5, Instant originalFileTimestamp) {
34+
MD5Checksum originalFileMd5, Instant originalFileTimestamp, boolean noMd5Check) {
3435
return (stream, size) -> {
3536
try {
3637
Files.createDirectories(destination);
@@ -39,6 +40,10 @@ public StreamConsumer<Void> getStreamConsumer(
3940
LOG.debug("Destination directory already exists");
4041
}
4142
Files.copy(stream, destination, StandardCopyOption.REPLACE_EXISTING);
43+
if ((!noMd5Check) && (!originalFileMd5.matches(stream.md5()))) {
44+
LOG.error("MD5 mismatch. Deleting destination file");
45+
Files.delete(destination);
46+
}
4247
return null;
4348
};
4449
}
@@ -68,11 +73,12 @@ public Optional<MD5Checksum> existingFileMd5() {
6873
BufferedInputStream bis = new BufferedInputStream(is);
6974
StreamWithMD5Decorator md5Is = StreamWithMD5Decorator.of(bis)) {
7075
byte[] buffer = new byte[BUFFER];
71-
while ((md5Is.read(buffer)) != -1) {}
76+
while ((md5Is.read(buffer)) != -1) {
77+
}
7278
return Optional.of(md5Is.md5());
7379
} catch (IOException | NoSuchAlgorithmException e) {
7480
throw new RuntimeException(
75-
"Could not obtain md5 of the file existing at the target: " + destination.toString(),
81+
"Could not obtain md5 of the file existing at the target: " + destination,
7682
e);
7783
}
7884
}

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/destination/s3/S3FileAcceptor.java

+25-17
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
import java.util.HashMap;
99
import java.util.Map;
1010
import java.util.Optional;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1113
import software.amazon.awssdk.core.sync.RequestBody;
1214
import software.amazon.awssdk.services.s3.S3Client;
15+
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
1316
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
1417
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
1518
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
@@ -22,8 +25,8 @@
2225

2326
public class S3FileAcceptor implements FileAcceptor<Void> {
2427

28+
private static final Logger LOG = LoggerFactory.getLogger(S3FileAcceptor.class);
2529
private static final String LATEST_KEY = "latest";
26-
public static final String MD5_METADATA_KEY = "md5";
2730
public static final String TIMESTAMP_METADATA_KEY = "ts";
2831
private final S3Client client;
2932
private final String bucket;
@@ -42,11 +45,10 @@ public S3FileAcceptor(
4245

4346
@Override
4447
public StreamConsumer<Void> getStreamConsumer(
45-
MD5Checksum originalFileMd5, Instant originalFileTimestamp) {
48+
MD5Checksum originalFileMd5, Instant originalFileTimestamp, boolean noMd5Check) {
4649
return (StreamConsumer)
4750
(stream, size) -> {
4851
Map<String, String> objectMetadata = new HashMap<>();
49-
objectMetadata.put(MD5_METADATA_KEY, originalFileMd5.stringFormat());
5052
objectMetadata.put(
5153
TIMESTAMP_METADATA_KEY, String.valueOf(originalFileTimestamp.toEpochMilli()));
5254
PutObjectRequest putObjectRequest =
@@ -58,22 +60,29 @@ public StreamConsumer<Void> getStreamConsumer(
5860
.storageClass(StorageClass.INTELLIGENT_TIERING)
5961
.build();
6062
client.putObject(putObjectRequest, RequestBody.fromInputStream(stream, size));
61-
6263
Latest latest = new Latest(clock.instant(), key);
6364
String latestContent = mapper.writeValueAsString(latest);
6465

65-
PutObjectRequest putLatest =
66-
PutObjectRequest.builder()
67-
.key(Paths.get(key).getParent().toString() + "/" + LATEST_KEY)
68-
.bucket(bucket)
69-
.contentLength((long) latestContent.length())
70-
.storageClass(StorageClass.INTELLIGENT_TIERING)
71-
.build();
72-
client.putObject(
73-
putLatest,
74-
RequestBody.fromInputStream(
75-
new StringInputStream(latestContent), latestContent.length()));
66+
if ((!noMd5Check) && (!originalFileMd5.matches(stream.md5()))) {
67+
LOG.error("MD5 mismatch. Deleting destination file");
68+
client.deleteObject(DeleteObjectRequest.builder()
69+
.bucket(bucket)
70+
.key(key)
71+
.build());
72+
} else {
7673

74+
PutObjectRequest putLatest =
75+
PutObjectRequest.builder()
76+
.key(Paths.get(key).getParent().toString() + "/" + LATEST_KEY)
77+
.bucket(bucket)
78+
.contentLength((long) latestContent.length())
79+
.storageClass(StorageClass.INTELLIGENT_TIERING)
80+
.build();
81+
client.putObject(
82+
putLatest,
83+
RequestBody.fromInputStream(
84+
new StringInputStream(latestContent), latestContent.length()));
85+
}
7786
return null;
7887
};
7988
}
@@ -103,8 +112,7 @@ public Optional<MD5Checksum> existingFileMd5() {
103112

104113
try {
105114
final HeadObjectResponse headObjectResponse = client.headObject(headObjectRequest);
106-
final Map<String, String> metadata = headObjectResponse.metadata();
107-
return Optional.ofNullable(metadata.get(MD5_METADATA_KEY)).map(m -> MD5Checksum.of(m));
115+
return Optional.ofNullable(headObjectResponse.eTag()).map(m -> MD5Checksum.of(m));
108116
} catch (NoSuchKeyException e) {
109117
return Optional.empty();
110118
}

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/source/BaseUrlSource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import technology.dice.dicewhere.downloader.stream.StreamWithMD5Decorator;
1212

1313
public abstract class BaseUrlSource implements FileSource {
14+
1415
protected FileInfo fileInfo;
1516
protected final URL dataFileLocation;
1617

@@ -19,14 +20,14 @@ protected BaseUrlSource(URL dataFileLocation) {
1920
}
2021

2122
@Override
22-
public MD5Checksum produce(FileAcceptor acceptor) {
23+
public MD5Checksum produce(FileAcceptor acceptor, boolean noMd5Check) {
2324
try {
2425
HttpURLConnection httpConnection = (HttpURLConnection) this.dataFileLocation.openConnection();
2526
httpConnection.setRequestMethod("GET");
2627

2728
try (StreamWithMD5Decorator is = StreamWithMD5Decorator.of(httpConnection.getInputStream())) {
2829
acceptor
29-
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp())
30+
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp(), noMd5Check)
3031
.consume(is, fileInfo.getSize());
3132
return is.md5();
3233
}

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/source/FileSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import technology.dice.dicewhere.downloader.destination.FileAcceptor;
66

77
public interface FileSource {
8+
89
FileInfo fileInfo();
910

10-
MD5Checksum produce(FileAcceptor consumer);
11+
MD5Checksum produce(FileAcceptor consumer, boolean noMd5Check);
1112
}

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/source/s3/S3Source.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import technology.dice.dicewhere.downloader.stream.StreamWithMD5Decorator;
2323

2424
public class S3Source implements FileSource {
25+
2526
private static Logger LOG = LoggerFactory.getLogger(S3Source.class);
26-
public static final String MD5_METADATA_KEY = "md5";
2727
public static final String TIMESTAMP_METADATA_KEY = "ts";
2828
private final S3Client client;
2929
private final String bucket;
@@ -43,11 +43,11 @@ public FileInfo fileInfo() {
4343
HeadObjectRequest.builder().key(key).bucket(bucket).build();
4444

4545
final HeadObjectResponse headObjectResponse = client.headObject(headObjectRequest);
46-
final Map<String, String> metadata = headObjectResponse.metadata();
47-
if (!metadata.containsKey(MD5_METADATA_KEY)) {
46+
if (headObjectResponse.eTag() == null) {
4847
throw new DownloaderException(
4948
"Remote file does not have md5 information. Please delete the file and re-upload");
5049
}
50+
final Map<String, String> metadata = headObjectResponse.metadata();
5151
if (!metadata.containsKey(TIMESTAMP_METADATA_KEY)) {
5252
LOG.warn("Timestamp not available at source. Using now as timestamp.");
5353
}
@@ -59,20 +59,20 @@ public FileInfo fileInfo() {
5959
Optional.ofNullable(metadata.get(TIMESTAMP_METADATA_KEY))
6060
.map(m -> Instant.ofEpochMilli(Long.valueOf(m)))
6161
.orElse(Instant.now()),
62-
MD5Checksum.of(metadata.get(MD5_METADATA_KEY)),
62+
MD5Checksum.of(headObjectResponse.eTag()),
6363
size);
6464
}
6565

6666
return this.fileInfo;
6767
}
6868

6969
@Override
70-
public MD5Checksum produce(FileAcceptor consumer) {
70+
public MD5Checksum produce(FileAcceptor consumer, boolean noMd5Check) {
7171
GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucket).key(key).build();
7272
try (final ResponseInputStream<GetObjectResponse> object = client.getObject(getObjectRequest);
7373
StreamWithMD5Decorator is = StreamWithMD5Decorator.of(object)) {
7474
consumer
75-
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp())
75+
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp(), noMd5Check)
7676
.consume(is, fileInfo.getSize());
7777
return is.md5();
7878
} catch (IOException | NoSuchAlgorithmException e) {

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66
@FunctionalInterface
77
public interface StreamConsumer<T> {
8-
T consume(InputStream stream, long size) throws IOException;
8+
T consume(StreamWithMD5Decorator stream, long size) throws IOException;
99
}

0 commit comments

Comments
 (0)