
Commit 360e199

Added direct and indirect write methods to DSv1 (#692)
Set OLD_INDIRECT as the default write method in the GA connector
Parent: 3f4593f
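For orientation, here is a hedged sketch of how a Spark job could pick one of the write methods this commit wires into DSv1. The "writeMethod" option name and the DIRECT/INDIRECT/OLD_INDIRECT values come from the SparkBigQueryConfig diff below; the app name, dataset and table are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WriteMethodExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("write-method-example").getOrCreate();
    Dataset<Row> df = spark.range(10).toDF("id");
    df.write()
        .format("bigquery")
        // One of DIRECT, INDIRECT or OLD_INDIRECT (see the WriteMethod enum below).
        .option("writeMethod", "DIRECT")
        .mode(SaveMode.Append)
        .save("my_dataset.my_table"); // placeholder table reference
  }
}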

File tree

25 files changed: +1037 −545 lines


spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/InjectorBuilder.java

+7 −1
@@ -29,6 +29,7 @@ public class InjectorBuilder {
   private SparkSession spark = SparkSession.active();
   private Optional<StructType> schema = Optional.empty();
   private Map<String, String> options = ImmutableMap.<String, String>of();
+  private Map<String, String> customDefaults = ImmutableMap.<String, String>of();
   private boolean tableIsMandatory = true;
   private DataSourceVersion dataSourceVersion = DataSourceVersion.V2;
 
@@ -57,10 +58,15 @@ public InjectorBuilder withDataSourceVersion(DataSourceVersion dataSourceVersion
     return this;
   }
 
+  public InjectorBuilder withCustomDefaults(Map<String, String> customDefaults) {
+    this.customDefaults = customDefaults;
+    return this;
+  }
+
   public Injector build() {
     return Guice.createInjector(
         new BigQueryClientModule(),
         new SparkBigQueryConnectorModule(
-            spark, options, schema, dataSourceVersion, tableIsMandatory));
+            spark, options, customDefaults, schema, dataSourceVersion, tableIsMandatory));
   }
 }
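A minimal sketch of how a caller (for example, the GA connector's DSv1 entry point) might route OLD_INDIRECT through the new builder method. Only withCustomDefaults and build come from this diff; the no-arg construction is assumed from the field initializers above, and the defaults only take effect when no writeMethod option is set (see the SparkBigQueryConfig diff below).

import com.google.cloud.spark.bigquery.InjectorBuilder;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;

public class CustomDefaultsSketch {
  // Sketch: build an injector whose config falls back to OLD_INDIRECT when neither
  // the per-write options nor the global options carry a writeMethod entry.
  static SparkBigQueryConfig buildConfigWithOldIndirectDefault() {
    Injector injector =
        new InjectorBuilder()
            .withCustomDefaults(
                ImmutableMap.of(SparkBigQueryConfig.WRITE_METHOD_PARAM, "OLD_INDIRECT"))
            .build();
    return injector.getInstance(SparkBigQueryConfig.class);
  }
}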

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/InjectorFactory.java

+8 −1
@@ -16,6 +16,7 @@
 package com.google.cloud.spark.bigquery;
 
 import com.google.cloud.bigquery.connector.common.BigQueryClientModule;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import java.util.Map;
@@ -37,9 +38,15 @@ public static Injector createInjector(
       StructType schema,
       Map<String, String> options,
       boolean tableIsMandatory) {
+    Map<String, String> customDefaults = ImmutableMap.of();
     return Guice.createInjector(
         new BigQueryClientModule(),
         new SparkBigQueryConnectorModule(
-            spark, options, Optional.ofNullable(schema), DataSourceVersion.V2, tableIsMandatory));
+            spark,
+            options,
+            customDefaults,
+            Optional.ofNullable(schema),
+            DataSourceVersion.V2,
+            tableIsMandatory));
   }
 }

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java

+14 −5
@@ -78,7 +78,8 @@ public class SparkBigQueryConfig
 
   public enum WriteMethod {
     DIRECT,
-    INDIRECT;
+    INDIRECT,
+    OLD_INDIRECT;
 
     public static WriteMethod from(@Nullable String writeMethod) {
       try {
@@ -96,6 +97,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
   public static final String VALIDATE_SPARK_AVRO_PARAM = "validateSparkAvroInternalParam";
   public static final String ENABLE_LIST_INFERENCE = "enableListInference";
   public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat";
+  public static final String WRITE_METHOD_PARAM = "writeMethod";
   @VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW;
 
   @VisibleForTesting
@@ -186,6 +188,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
   // the catalog ones
   public static SparkBigQueryConfig from(
       Map<String, String> options,
+      ImmutableMap<String, String> customDefaults,
       DataSourceVersion dataSourceVersion,
       SparkSession spark,
       Optional<StructType> schema,
@@ -196,6 +199,7 @@ public static SparkBigQueryConfig from(
         ImmutableMap.copyOf(optionsMap),
         ImmutableMap.copyOf(mapAsJavaMap(spark.conf().getAll())),
         spark.sparkContext().hadoopConfiguration(),
+        customDefaults,
         spark.sparkContext().defaultParallelism(),
         spark.sqlContext().conf(),
         spark.version(),
@@ -208,6 +212,7 @@ public static SparkBigQueryConfig from(
       Map<String, String> optionsInput,
       ImmutableMap<String, String> originalGlobalOptions,
       Configuration hadoopConfiguration,
+      ImmutableMap<String, String> customDefaults,
       int defaultParallelism,
       SQLConf sqlConf,
       String sparkVersion,
@@ -375,10 +380,14 @@ public static SparkBigQueryConfig from(
             .transform(String::toUpperCase)
             .or(DEFAULT_ARROW_COMPRESSION_CODEC.toString());
 
+    WriteMethod writeMethodDefault =
+        Optional.ofNullable(customDefaults.get(WRITE_METHOD_PARAM))
+            .map(WriteMethod::from)
+            .orElse(DEFAULT_WRITE_METHOD);
     config.writeMethod =
-        getAnyOption(globalOptions, options, "writeMethod")
+        getAnyOption(globalOptions, options, WRITE_METHOD_PARAM)
             .transform(WriteMethod::from)
-            .or(DEFAULT_WRITE_METHOD);
+            .or(writeMethodDefault);
 
     try {
       config.arrowCompressionCodec = CompressionCodec.valueOf(arrowCompressionCodecParam);
@@ -844,8 +853,8 @@ static boolean isSpark24OrAbove(String sparkVersion) {
   }
 
   // could not load the spark-avro data source
-  private static IllegalStateException missingAvroException(
-      String sparkVersion, Exception cause) {
+  @VisibleForTesting
+  static IllegalStateException missingAvroException(String sparkVersion, Exception cause) {
     String avroPackage;
     if (isSpark24OrAbove(sparkVersion)) {
       String scalaVersion = scala.util.Properties.versionNumberString();
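The lookup order the writeMethod hunk implements is: explicit option (per-write or global) first, then the caller-supplied custom default, then the hard-coded DEFAULT_WRITE_METHOD. A self-contained illustration of that precedence, using plain java.util types rather than the connector's getAnyOption helper:

import java.util.Map;
import java.util.Optional;

public class PrecedenceSketch {
  // Illustration only: mirrors getAnyOption(...).or(writeMethodDefault) from the diff above.
  static String resolveWriteMethod(
      Map<String, String> options, Map<String, String> customDefaults, String hardDefault) {
    String defaultValue =
        Optional.ofNullable(customDefaults.get("writeMethod")).orElse(hardDefault);
    return Optional.ofNullable(options.get("writeMethod")).orElse(defaultValue);
  }

  public static void main(String[] args) {
    Map<String, String> customDefaults = Map.of("writeMethod", "OLD_INDIRECT");
    // No explicit option: the custom default wins over the hard-coded one.
    System.out.println(resolveWriteMethod(Map.of(), customDefaults, "INDIRECT"));
    // Explicit option: it wins over both defaults.
    System.out.println(
        resolveWriteMethod(Map.of("writeMethod", "DIRECT"), customDefaults, "INDIRECT"));
  }
}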

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConnectorModule.java

+11 −1
@@ -17,6 +17,7 @@
 
 import com.google.cloud.bigquery.connector.common.BigQueryConfig;
 import com.google.cloud.bigquery.connector.common.UserAgentProvider;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
@@ -30,18 +31,21 @@ public class SparkBigQueryConnectorModule implements Module {
 
   private final SparkSession spark;
   private final Map<String, String> options;
+  private final Map<String, String> customDefaults;
   private final Optional<StructType> schema;
   private final DataSourceVersion dataSourceVersion;
   private final boolean tableIsMandatory;
 
   public SparkBigQueryConnectorModule(
       SparkSession spark,
       Map<String, String> options,
+      Map<String, String> customDefaults,
       Optional<StructType> schema,
       DataSourceVersion dataSourceVersion,
       boolean tableIsMandatory) {
     this.spark = spark;
     this.options = options;
+    this.customDefaults = customDefaults;
     this.schema = schema;
     this.dataSourceVersion = dataSourceVersion;
     this.tableIsMandatory = tableIsMandatory;
@@ -67,7 +71,13 @@ public DataSourceVersion provideDataSourceVersion() {
   @Singleton
   @Provides
   public SparkBigQueryConfig provideSparkBigQueryConfig() {
-    return SparkBigQueryConfig.from(options, dataSourceVersion, spark, schema, tableIsMandatory);
+    return SparkBigQueryConfig.from(
+        options,
+        ImmutableMap.copyOf(customDefaults),
+        dataSourceVersion,
+        spark,
+        schema,
+        tableIsMandatory);
   }
 
   @Singleton

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/util/HdfsUtils.java

+51 −0
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2022 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.spark.bigquery.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import org.apache.hadoop.fs.RemoteIterator;
+
+public class HdfsUtils {
+
+  /** Converts an HDFS RemoteIterator to a java.util.Iterator. */
+  public static <T> Iterator<T> toJavaUtilIterator(final RemoteIterator<T> remoteIterator) {
+    return new Iterator<T>() {
+      @Override
+      public boolean hasNext() {
+        try {
+          return remoteIterator.hasNext();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+
+      @Override
+      public T next() {
+        try {
+          return remoteIterator.next();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    };
+  }
+
+  public static <T> Iterable<T> toJavaUtilIterable(final RemoteIterator<T> remoteIterator) {
+    return () -> toJavaUtilIterator(remoteIterator);
+  }
+}
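A short usage sketch for the new helper: Hadoop's FileSystem.listFiles returns a RemoteIterator, which cannot drive an enhanced for-loop directly, and the adapter above bridges that gap. The wrapper class and method here are illustrative, not part of the commit.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import com.google.cloud.spark.bigquery.util.HdfsUtils;

class HdfsUtilsExample {
  // Prints every file directly under dir; IOExceptions raised during iteration surface
  // as UncheckedIOException, per the adapter above.
  static void printFiles(FileSystem fs, Path dir) throws IOException {
    for (LocatedFileStatus status : HdfsUtils.toJavaUtilIterable(fs.listFiles(dir, false))) {
      System.out.println(status.getPath());
    }
  }
}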

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java

+75 −0
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2022 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.spark.bigquery.write;
+
+import com.google.cloud.bigquery.connector.common.BigQueryClient;
+import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
+import com.google.cloud.spark.bigquery.write.context.DataSourceWriterContext;
+import com.google.cloud.spark.bigquery.write.context.WriterCommitMessageContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+public class BigQueryDataSourceWriterInsertableRelation extends BigQueryInsertableRelationBase {
+
+  private final DataSourceWriterContext ctx;
+
+  public BigQueryDataSourceWriterInsertableRelation(
+      BigQueryClient bigQueryClient,
+      SQLContext sqlContext,
+      SparkBigQueryConfig config,
+      DataSourceWriterContext ctx) {
+    super(bigQueryClient, sqlContext, config);
+    this.ctx = ctx;
+  }
+
+  @Override
+  public void insert(Dataset<Row> data, boolean overwrite) {
+    logger.debug("Inserting data={}, overwrite={}", data, overwrite);
+
+    // Here we mimic the DataSource v2 API behaviour in order to reuse the shared code. The
+    // partition handler iterates on each partition separately, invoking the DataWriter
+    // interface. The result of the iteration is a WriterCommitMessageContext, which is used
+    // to perform the global commit, or to abort if needed.
+    try {
+      DataSourceWriterContextPartitionHandler partitionHandler =
+          new DataSourceWriterContextPartitionHandler(
+              ctx.createWriterContextFactory(), System.currentTimeMillis());
+
+      JavaRDD<Row> rowsRDD = data.toJavaRDD();
+      int numPartitions = rowsRDD.getNumPartitions();
+      JavaRDD<WriterCommitMessageContext> writerCommitMessagesRDD =
+          rowsRDD.mapPartitionsWithIndex(partitionHandler, false);
+      WriterCommitMessageContext[] writerCommitMessages =
+          writerCommitMessagesRDD.collect().toArray(new WriterCommitMessageContext[0]);
+      if (writerCommitMessages.length == numPartitions) {
+        ctx.commit(writerCommitMessages);
+      } else {
+        // missing commit messages, so abort
+        logger.warn(
+            "It seems that {} out of {} partitions have failed, aborting",
+            numPartitions - writerCommitMessages.length,
+            numPartitions);
+        ctx.abort(writerCommitMessages);
+      }
+    } catch (Exception e) {
+      logger.warn("Unexpected issue trying to save " + data, e);
+      ctx.abort(new WriterCommitMessageContext[] {});
+    }
+  }
+}
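DataSourceWriterContextPartitionHandler itself is not in this excerpt, but JavaRDD.mapPartitionsWithIndex constrains its shape: it must be a Function2 that consumes one partition's rows and yields a commit message for each partition it writes successfully, which is what makes the commit-message count comparable with the partition count above. A hedged, self-contained sketch of that contract, with placeholder types and logic in place of the real writer wiring:

import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Row;

// Placeholder stand-in for WriterCommitMessageContext; illustration only.
class SketchCommitMessage implements Serializable {
  final int partitionId;
  final long rowCount;

  SketchCommitMessage(int partitionId, long rowCount) {
    this.partitionId = partitionId;
    this.rowCount = rowCount;
  }
}

class PartitionHandlerSketch
    implements Function2<Integer, Iterator<Row>, Iterator<SketchCommitMessage>> {
  @Override
  public Iterator<SketchCommitMessage> call(Integer partitionId, Iterator<Row> rows) {
    long count = 0;
    while (rows.hasNext()) {
      rows.next(); // a real handler would hand each row to a DataWriter here
      count++;
    }
    // Emit exactly one commit message for this successfully processed partition.
    return Collections.singletonList(new SketchCommitMessage(partitionId, count)).iterator();
  }
}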

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDeprecatedIndirectInsertableRelation.java

+45 −0
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2022 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.spark.bigquery.write;
+
+import com.google.cloud.bigquery.connector.common.BigQueryClient;
+import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+
+/**
+ * The original indirect insertable relation, using Spark's write. Intermediate formats are
+ * Parquet, ORC or Avro. Deprecated in favor of BigQueryDataSourceWriterInsertableRelation.
+ */
+public class BigQueryDeprecatedIndirectInsertableRelation extends BigQueryInsertableRelationBase {
+
+  public BigQueryDeprecatedIndirectInsertableRelation(
+      BigQueryClient bigQueryClient, SQLContext sqlContext, SparkBigQueryConfig config) {
+    super(bigQueryClient, sqlContext, config);
+  }
+
+  @Override
+  public void insert(Dataset<Row> data, boolean overwrite) {
+    logger.debug("Inserting data={}, overwrite={}", data, overwrite);
+    // the helper also supports the v2 API
+    SaveMode saveMode = overwrite ? SaveMode.Overwrite : SaveMode.Append;
+    BigQueryWriteHelper helper =
+        new BigQueryWriteHelper(bigQueryClient, sqlContext, saveMode, config, data, exists());
+    helper.writeDataFrameToBigQuery();
+  }
+}
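To exercise this deprecated path after the change, a job would opt in explicitly (or run through the GA connector, which now defaults to OLD_INDIRECT). A hedged sketch follows: intermediateFormat is the option defined in SparkBigQueryConfig above, temporaryGcsBucket is assumed to be the connector's existing staging-bucket option, and all values are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class OldIndirectWriteSketch {
  static void writeViaOldIndirect(Dataset<Row> df) {
    df.write()
        .format("bigquery")
        .option("writeMethod", "OLD_INDIRECT")
        // Parquet, ORC or Avro, per the class Javadoc above.
        .option("intermediateFormat", "parquet")
        // assumed existing staging option; value is a placeholder
        .option("temporaryGcsBucket", "some-bucket")
        .mode(SaveMode.Append)
        .save("my_dataset.my_table"); // placeholder table reference
  }
}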
