Skip to content

Commit 504559a

Browse files
authored
Bytestreams bugfixes (buildfarm#240)
* Write reset interface addition Write resets required a new interface, close() was not sufficient to correctly handle a write request with a 0 offset with the intention of resetting. * Logging cleanup for ByteStreamService * Call readBlob in ByteStreamService for blobs ByteStreamService now uses the readBlob layer to interpret special unlimited 'limit' of 0. * Memory getOperationStreamWrite implementation Expand the ByteStringStreamSource to support committedSize and a completion future for close. * ByteStreamService queryWriteStatus implementation Present write query interface for progressive client implementations * Prevent write processing of completed blobs An attempt to upload a previously completed blob will be handled immediately by the write listener, and must not attempt to write into its output stream. * Executor write support for stdout/stderr The (renamed) ByteStringWriteReader now waits for a successful write completion in getData(), uses ByteString.Output, and try-with-resources. In the event of an empty operation stream for stdout/err, a null Write is used that completes immediately upon close. The Executor no longer interrupts the readers to preserve remote request safety - they are expected to close safely.
1 parent d9161ae commit 504559a

16 files changed

+386
-207
lines changed

src/main/java/build/buildfarm/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ java_library(
110110
name = "server",
111111
srcs = glob(["server/**/*.java"]),
112112
deps = [
113+
":cas",
113114
":common",
114115
":common-grpc",
115116
":instance",

src/main/java/build/buildfarm/cas/DigestMismatchException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import build.buildfarm.common.DigestUtil;
55
import java.io.IOException;
66

7-
class DigestMismatchException extends IOException {
7+
public class DigestMismatchException extends IOException {
88
private final Digest actual;
99
private final Digest expected;
1010

src/main/java/build/buildfarm/cas/MemoryWriteOutputStream.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class MemoryWriteOutputStream extends OutputStream implements Write {
1717
private final Digest digest;
1818
private final ListenableFuture<ByteString> writtenFuture;
1919
private final ByteString.Output out;
20-
private final HashingOutputStream hashOut;
20+
private HashingOutputStream hashOut;
2121

2222
MemoryWriteOutputStream(ContentAddressableStorage storage, Digest digest, ListenableFuture<ByteString> writtenFuture) {
2323
this.storage = storage;
@@ -95,6 +95,12 @@ public OutputStream getOutput() {
9595
return this;
9696
}
9797

98+
@Override
99+
public void reset() {
100+
out.reset();
101+
hashOut = DigestUtil.forDigest(digest).newHashingOutputStream(out);
102+
}
103+
98104
@Override
99105
public void addListener(Runnable onCompleted, Executor executor) {
100106
writtenFuture.addListener(onCompleted, executor);

src/main/java/build/buildfarm/cas/Writes.java

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ public OutputStream getOutput() {
6565
return nullOutputStream();
6666
}
6767

68+
@Override
69+
public void reset() {
70+
}
71+
6872
@Override
6973
public void addListener(Runnable onCompleted, Executor executor) {
7074
executor.execute(onCompleted);

src/main/java/build/buildfarm/common/Write.java

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public interface Write {
2525

2626
OutputStream getOutput() throws IOException;
2727

28+
void reset();
29+
2830
/** add a callback to be invoked when blob has been completed */
2931
void addListener(Runnable onCompleted, Executor executor);
3032
}

src/main/java/build/buildfarm/common/grpc/StubWriteOutputStream.java

+14
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,17 @@ public class StubWriteOutputStream extends OutputStream implements Write {
3030
private final long expectedSize;
3131
private final boolean autoflush;
3232
private final byte buf[];
33+
private boolean wasReset = false;
3334
private final Supplier<QueryWriteStatusResponse> writeStatus = Suppliers.memoize(
3435
new Supplier() {
3536
@Override
3637
public QueryWriteStatusResponse get() {
38+
if (wasReset) {
39+
return QueryWriteStatusResponse.newBuilder()
40+
.setCommittedSize(0)
41+
.setComplete(false)
42+
.build();
43+
}
3744
return bsBlockingStub.get()
3845
.queryWriteStatus(QueryWriteStatusRequest.newBuilder()
3946
.setResourceName(resourceName)
@@ -194,6 +201,13 @@ public OutputStream getOutput() {
194201
return this;
195202
}
196203

204+
@Override
205+
public void reset() {
206+
wasReset = true;
207+
offset = 0;
208+
writtenBytes = 0;
209+
}
210+
197211
@Override
198212
public void addListener(Runnable onCompleted, Executor executor) {
199213
writeFuture.addListener(onCompleted, executor);

src/main/java/build/buildfarm/instance/memory/ByteStringStreamSource.java

+12
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
package build.buildfarm.instance.memory;
1616

1717
import com.google.protobuf.ByteString;
18+
import com.google.common.util.concurrent.ListenableFuture;
19+
import com.google.common.util.concurrent.SettableFuture;
1820
import java.io.InputStream;
1921
import java.io.OutputStream;
2022

2123
class ByteStringStreamSource {
2224
private final Runnable onClose;
2325
private final OutputStream outputStream;
26+
private final SettableFuture<Void> closedFuture = SettableFuture.create();
2427

2528
private final Object bufferSync;
2629
private ByteString buffer;
@@ -55,6 +58,7 @@ public void close() {
5558
synchronized (bufferSync) {
5659
closed = true;
5760
bufferSync.notifyAll();
61+
closedFuture.set(null);
5862
}
5963
onClose.run();
6064
}
@@ -71,6 +75,14 @@ public OutputStream getOutputStream() {
7175
return outputStream;
7276
}
7377

78+
public long getCommittedSize() {
79+
return buffer.size();
80+
}
81+
82+
public ListenableFuture<Void> getClosedFuture() {
83+
return closedFuture;
84+
}
85+
7486
public InputStream openStream() {
7587
return new InputStream() {
7688
private int offset = 0;

src/main/java/build/buildfarm/instance/memory/MemoryInstance.java

+26-7
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.UUID;
8181
import java.util.concurrent.Callable;
8282
import java.util.concurrent.ConcurrentHashMap;
83+
import java.util.concurrent.Executor;
8384
import java.util.concurrent.ExecutorService;
8485
import java.util.function.Predicate;
8586
import java.util.logging.Logger;
@@ -272,15 +273,33 @@ private ByteStringStreamSource getSource(String name) {
272273

273274
@Override
274275
public Write getOperationStreamWrite(String name) {
275-
throw new UnsupportedOperationException(); // needs source->write conversion
276-
}
276+
return new Write() {
277+
@Override
278+
public long getCommittedSize() {
279+
return getSource(name).getCommittedSize();
280+
}
277281

278-
/*
279-
@Override
280-
public OutputStream getStreamOutput(String name) {
281-
return getSource(name).getOutputStream();
282+
@Override
283+
public boolean isComplete() {
284+
return getSource(name).isClosed();
285+
}
286+
287+
@Override
288+
public OutputStream getOutput() {
289+
return getSource(name).getOutputStream();
290+
}
291+
292+
@Override
293+
public void reset() {
294+
streams.remove(name);
295+
}
296+
297+
@Override
298+
public void addListener(Runnable onCompleted, Executor executor) {
299+
getSource(name).getClosedFuture().addListener(onCompleted, executor);
300+
}
301+
};
282302
}
283-
*/
284303

285304
@Override
286305
public InputStream newOperationStreamInput(String name, long offset) throws IOException {

0 commit comments

Comments
 (0)