Skip to content

Commit c6cf659

Browse files
authored
fix(bigquery/storage/managedwriter): fix double-close error, add tests (#4502)
Writing tests picked up the error, so hooray. The BufferedStream integration test exposed that while the basic API surface is in preview with no special requirements, advanced features such as the FlushRows rpc used by BufferedStream do require them. This has been whitelisted for test projects, but we'll want to add this to doc.go when I start that PR. Towards #4366
1 parent 9923fd1 commit c6cf659

File tree

2 files changed

+164
-2
lines changed

2 files changed

+164
-2
lines changed

bigquery/storage/managedwriter/integration_test.go

+162-1
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ import (
2626
"cloud.google.com/go/internal/testutil"
2727
"cloud.google.com/go/internal/uid"
2828
"google.golang.org/api/option"
29+
"google.golang.org/protobuf/encoding/protojson"
2930
"google.golang.org/protobuf/proto"
3031
"google.golang.org/protobuf/reflect/protodesc"
3132
"google.golang.org/protobuf/reflect/protoreflect"
3233
"google.golang.org/protobuf/types/descriptorpb"
34+
"google.golang.org/protobuf/types/dynamicpb"
3335
)
3436

3537
var (
@@ -120,7 +122,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect
120122
return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
121123
}
122124

123-
func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
125+
func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
124126
mwClient, bqClient := getTestClients(context.Background(), t)
125127
defer mwClient.Close()
126128
defer bqClient.Close()
@@ -205,3 +207,162 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
205207
wantRows = wantRows * 2
206208
validateRowCount(ctx, t, bqClient, testTable, wantRows)
207209
}
210+
211+
func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
212+
mwClient, bqClient := getTestClients(context.Background(), t)
213+
defer mwClient.Close()
214+
defer bqClient.Close()
215+
216+
dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
217+
if err != nil {
218+
t.Fatalf("failed to init test dataset: %v", err)
219+
}
220+
defer cleanup()
221+
222+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
223+
defer cancel()
224+
225+
// prep a suitable destination table.
226+
testTable := dataset.Table(tableIDs.New())
227+
schema := bigquery.Schema{
228+
{Name: "name", Type: bigquery.StringFieldType, Required: true},
229+
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
230+
}
231+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
232+
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
233+
}
234+
235+
md, descriptorProto := setupDynamicDescriptors(t, schema)
236+
237+
// setup a new stream.
238+
ms, err := mwClient.NewManagedStream(ctx,
239+
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
240+
WithType(DefaultStream),
241+
WithSchemaDescriptor(descriptorProto),
242+
)
243+
if err != nil {
244+
t.Fatalf("NewManagedStream: %v", err)
245+
}
246+
247+
sampleData := [][]byte{
248+
[]byte(`{"name": "one", "value": 1}`),
249+
[]byte(`{"name": "two", "value": 2}`),
250+
[]byte(`{"name": "three", "value": 3}`),
251+
[]byte(`{"name": "four", "value": 4}`),
252+
[]byte(`{"name": "five", "value": 5}`),
253+
}
254+
255+
// prevalidate we have no data in table.
256+
validateRowCount(ctx, t, bqClient, testTable, 0)
257+
258+
// First, append rows individually.
259+
var results []*AppendResult
260+
for k, v := range sampleData {
261+
message := dynamicpb.NewMessage(md)
262+
263+
// First, json->proto message
264+
err = protojson.Unmarshal(v, message)
265+
if err != nil {
266+
t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
267+
}
268+
// Then, proto message -> bytes.
269+
b, err := proto.Marshal(message)
270+
if err != nil {
271+
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
272+
}
273+
results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
274+
if err != nil {
275+
t.Errorf("single-row append %d failed: %v", k, err)
276+
}
277+
}
278+
279+
// wait for the result to indicate ready, then validate.
280+
results[0].Ready()
281+
wantRows := int64(len(sampleData))
282+
validateRowCount(ctx, t, bqClient, testTable, wantRows)
283+
}
284+
285+
func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
286+
mwClient, bqClient := getTestClients(context.Background(), t)
287+
defer mwClient.Close()
288+
defer bqClient.Close()
289+
290+
dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
291+
if err != nil {
292+
t.Fatalf("failed to init test dataset: %v", err)
293+
}
294+
defer cleanup()
295+
296+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
297+
defer cancel()
298+
299+
// prep a suitable destination table.
300+
testTable := dataset.Table(tableIDs.New())
301+
schema := bigquery.Schema{
302+
{Name: "name", Type: bigquery.StringFieldType, Required: true},
303+
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
304+
}
305+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
306+
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
307+
}
308+
// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
309+
// to send as the stream's schema.
310+
m := &testdata.SimpleMessage{}
311+
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
312+
313+
// setup a new stream.
314+
ms, err := mwClient.NewManagedStream(ctx,
315+
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
316+
WithType(BufferedStream),
317+
WithSchemaDescriptor(descriptorProto),
318+
)
319+
if err != nil {
320+
t.Fatalf("NewManagedStream: %v", err)
321+
}
322+
323+
info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID)
324+
if err != nil {
325+
t.Errorf("couldn't get stream info: %v", err)
326+
}
327+
if info.GetType().String() != string(ms.StreamType()) {
328+
t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
329+
}
330+
331+
// prevalidate we have no data in table.
332+
validateRowCount(ctx, t, bqClient, testTable, 0)
333+
334+
testData := []*testdata.SimpleMessage{
335+
{Name: "one", Value: 1},
336+
{Name: "two", Value: 2},
337+
{Name: "three", Value: 3},
338+
{Name: "four", Value: 1},
339+
{Name: "five", Value: 2},
340+
}
341+
342+
// First, send the test rows individually, validate, then advance.
343+
var expectedRows int64
344+
for k, mesg := range testData {
345+
b, err := proto.Marshal(mesg)
346+
if err != nil {
347+
t.Errorf("failed to marshal message %d: %v", k, err)
348+
}
349+
data := [][]byte{b}
350+
results, err := ms.AppendRows(ctx, data, NoStreamOffset)
351+
if err != nil {
352+
t.Errorf("single-row append %d failed: %v", k, err)
353+
}
354+
// wait for ack
355+
offset, err := results[0].GetResult(ctx)
356+
if err != nil {
357+
t.Errorf("got error from pending result %d: %v", k, err)
358+
}
359+
validateRowCount(ctx, t, bqClient, testTable, expectedRows)
360+
// move offset and re-validate.
361+
flushOffset, err := ms.FlushRows(ctx, offset)
362+
if err != nil {
363+
t.Errorf("failed to flush offset to %d: %v", offset, err)
364+
}
365+
expectedRows = flushOffset + 1
366+
validateRowCount(ctx, t, bqClient, testTable, expectedRows)
367+
}
368+
}

bigquery/storage/managedwriter/managed_stream.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,9 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
349349
off := success.GetOffset()
350350
if off != nil {
351351
nextWrite.markDone(off.GetValue(), nil, fc)
352+
} else {
353+
nextWrite.markDone(NoStreamOffset, nil, fc)
352354
}
353-
nextWrite.markDone(NoStreamOffset, nil, fc)
354355
}
355356
}
356357
}

0 commit comments

Comments
 (0)