[Managed Iceberg] unbounded source #33504

Open
wants to merge 41 commits into
base: master
bb87511
initial
ahmedabu98 Jan 6, 2025
853de4d
let CombinedScanTask do splitting (based on Parquet row groups)
ahmedabu98 Jan 7, 2025
69fd988
perf improv
ahmedabu98 Jan 7, 2025
da2f33f
create one read task descriptor per snapshot range
ahmedabu98 Jan 8, 2025
73c8992
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Jan 22, 2025
81ca709
some improvements
ahmedabu98 Jan 25, 2025
e319d76
use GiB for streaming, Redistribute for batch; update docs
ahmedabu98 Jan 30, 2025
c25cd75
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Jan 30, 2025
af1ec85
use static value
ahmedabu98 Feb 3, 2025
f5d3268
add some test
ahmedabu98 Feb 3, 2025
df40239
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 3, 2025
43ab88f
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 4, 2025
20db0ee
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 7, 2025
622625f
add a java doc; don't use static block to create coder
ahmedabu98 Feb 10, 2025
4c25d3f
spotless
ahmedabu98 Feb 10, 2025
8666166
add options: from/to timestamp, starting strategy, and streaming toggle
ahmedabu98 Feb 13, 2025
297c309
trigger integration tests
ahmedabu98 Feb 13, 2025
5e3a2cc
small test fix
ahmedabu98 Feb 13, 2025
8b131fd
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 14, 2025
887eff1
scan every snapshot individually; use snapshot commit timestamp to ma…
ahmedabu98 Feb 25, 2025
6cfc2d8
new schematransform for cdc streaming; add watermark configs
ahmedabu98 Mar 3, 2025
fbad86e
cleanup
ahmedabu98 Mar 3, 2025
50f9497
add guava import
ahmedabu98 Mar 3, 2025
4f1f40b
remove iceberg_cdc_read from xlang auto-wrapper gen
ahmedabu98 Mar 3, 2025
633365c
fix javadoc
ahmedabu98 Mar 3, 2025
37485f1
cleanup
ahmedabu98 Mar 3, 2025
4ede0e8
spotless
ahmedabu98 Mar 4, 2025
db9fd63
use CDC schema for batch and streaming; re-introduce boolean 'streami…
ahmedabu98 Mar 4, 2025
79ab16a
add to CHANGES.md and discussion docs
ahmedabu98 Mar 4, 2025
06a4cee
spotless
ahmedabu98 Mar 4, 2025
132034f
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Mar 4, 2025
795c87c
address review comments about java docs
ahmedabu98 Mar 5, 2025
c6461c9
remove raw guava dep
ahmedabu98 Mar 5, 2025
7dbf3e1
add another test for read utils
ahmedabu98 Mar 5, 2025
5263a13
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Mar 6, 2025
40fe4ab
use cached schemas
ahmedabu98 Mar 7, 2025
db3c570
watermark based on snapshot timestamp; remove infinite allowed skew; …
ahmedabu98 Mar 10, 2025
7078f20
remove check
ahmedabu98 Mar 10, 2025
fce87dc
remove watermark support; add a counter and some helpful logging
ahmedabu98 Mar 11, 2025
adbbfaf
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Mar 11, 2025
30f3ced
change metric name
ahmedabu98 Mar 12, 2025
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -65,6 +65,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105))
+* [Iceberg] Added new CDC source for batch and streaming, available as `Managed.ICEBERG_CDC` ([#33504](https://github.com/apache/beam/pull/33504))

## New Features / Improvements

8 changes: 6 additions & 2 deletions contributor-docs/discussion-docs/2025.md
@@ -15,5 +15,9 @@ limitations under the License.
# List Of Documents Submitted To dev@beam.apache.org In 2025
| No. | Author | Subject | Date (UTC) |
|---|---|---|---|
-| 1 | Danny McCormick | [Beam Python Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 17:50:00 |
-| 2 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 17:50:00 |
+| 1 | Kenneth Knowles | [Apache Beam Release Acceptance Criteria - Google Sheets](https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw) | 2025-01-13 10:54:22 |
+| 2 | Danny McCormick | [Apache Beam Vendored Dependencies Release Guide](https://s.apache.org/beam-release-vendored-artifacts) | 2025-01-13 15:00:51 |
+| 3 | Danny McCormick | [Beam Python & ML Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 15:33:36 |
+| 4 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 11:56:59 |
+| 5 | Shunping Huang | [Improve Logging Dependencies in Beam Java SDK](https://docs.google.com/document/d/1IkbiM4m8D-aB3NYI1aErFZHt6M7BQ-8eCULh284Davs) | 2025-02-04 15:13:14 |
+| 6 | Ahmed Abualsaud | [Iceberg Incremental Source design](https://s.apache.org/beam-iceberg-incremental-source) | 2025-03-03 14:52:42 |
@@ -74,6 +74,8 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:bigquery_write:v1"];
+    ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
}
}

3 changes: 2 additions & 1 deletion sdks/java/io/iceberg/build.gradle
@@ -50,6 +50,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
+  implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
implementation "org.apache.orc:orc-core:$orc_version"
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
@@ -88,7 +89,7 @@
testImplementation library.java.google_api_services_bigquery

testRuntimeOnly library.java.slf4j_jdk14
-  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+  testImplementation project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
hadoopVersions.each {kv ->
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value"
@@ -82,7 +82,7 @@ public String apply(FileWriteResult input) {
.apply(
"Append metadata updates to tables",
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.getCoder()));
}

private static class AppendFilesToTablesDoFn
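The `SnapshotInfo.CODER` to `SnapshotInfo.getCoder()` change pairs with commit 622625f ("don't use static block to create coder"): building the coder inside a static initializer turns any construction failure into an opaque class-loading error, while a lazy accessor surfaces it at the call site. A minimal sketch of that lazy-accessor pattern, using hypothetical stand-in types (`RowCoder`, `SnapshotRecord` are illustrative, not Beam classes):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a schema coder that is expensive (or risky) to build.
class RowCoder {
    final String schema;
    RowCoder(String schema) { this.schema = schema; }
}

class SnapshotRecord {
    // Lazily-built singleton coder. Building it in a static initializer would
    // turn a construction failure into a NoClassDefFoundError for this class.
    private static final AtomicReference<RowCoder> CODER = new AtomicReference<>();

    static RowCoder getCoder() {
        RowCoder coder = CODER.get();
        if (coder == null) {
            // Race-tolerant lazy init: two threads may both build, one result wins.
            CODER.compareAndSet(null, new RowCoder("snapshot-schema"));
            coder = CODER.get();
        }
        return coder;
    }
}
```

Repeated calls return the same instance, so the accessor is as cheap as the old static field after the first use.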
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.ScanTaskParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Scans the given snapshot and creates multiple {@link ReadTask}s. Each task represents a portion
* of a data file that was appended within the snapshot range.
*/
class CreateReadTasksDoFn
extends DoFn<KV<String, List<SnapshotInfo>>, KV<ReadTaskDescriptor, ReadTask>> {
private static final Logger LOG = LoggerFactory.getLogger(CreateReadTasksDoFn.class);
private static final Counter totalScanTasks =
Metrics.counter(CreateReadTasksDoFn.class, "totalScanTasks");
// TODO(ahmedabu98): should we expose a metric that tracks the latest observed snapshot sequence
// number?

private final IcebergScanConfig scanConfig;

CreateReadTasksDoFn(IcebergScanConfig scanConfig) {
this.scanConfig = scanConfig;
}

@ProcessElement
public void process(
@Element KV<String, List<SnapshotInfo>> element,
OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
throws IOException, ExecutionException {
Table table =
TableCache.getRefreshed(element.getKey(), scanConfig.getCatalogConfig().catalog());
List<SnapshotInfo> snapshots = element.getValue();

// scan snapshots individually and assign commit timestamp to files
for (SnapshotInfo snapshot : snapshots) {
@Nullable Long fromSnapshot = snapshot.getParentId();
long toSnapshot = snapshot.getSnapshotId();

      if (!DataOperations.APPEND.equals(snapshot.getOperation())) {
        LOG.info(
            "Skipping non-append snapshot of operation '{}'. Sequence number: {}, id: {}",
            snapshot.getOperation(),
            snapshot.getSequenceNumber(),
            snapshot.getSnapshotId());
        continue;
      }

LOG.info("Planning to scan snapshot {}", toSnapshot);
IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot);
if (fromSnapshot != null) {
scan = scan.fromSnapshotExclusive(fromSnapshot);
}

createAndOutputReadTasks(scan, snapshot, out);
}
}

private void createAndOutputReadTasks(
IncrementalAppendScan scan,
SnapshotInfo snapshot,
OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
throws IOException {
int numTasks = 0;
try (CloseableIterable<CombinedScanTask> combinedScanTasks = scan.planTasks()) {
for (CombinedScanTask combinedScanTask : combinedScanTasks) {
// A single DataFile can be broken up into multiple FileScanTasks
for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
ReadTask task =
ReadTask.builder()
.setFileScanTaskJson(ScanTaskParser.toJson(fileScanTask))
.setByteSize(fileScanTask.file().fileSizeInBytes())
.setOperation(snapshot.getOperation())
.setSnapshotTimestampMillis(snapshot.getTimestampMillis())
.build();
ReadTaskDescriptor descriptor =
ReadTaskDescriptor.builder()
.setTableIdentifierString(checkStateNotNull(snapshot.getTableIdentifierString()))
.build();

out.outputWithTimestamp(
KV.of(descriptor, task), Instant.ofEpochMilli(snapshot.getTimestampMillis()));
totalScanTasks.inc();
numTasks++;
}
}
}
LOG.info("Snapshot {} produced {} read tasks.", snapshot.getSnapshotId(), numTasks);
}
}
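`CreateReadTasksDoFn` above scans each snapshot as the range `(parentId, snapshotId]`, so every appended file is attributed to exactly one snapshot and stamped with that snapshot's commit timestamp. That range derivation can be sketched without any Iceberg dependency (the `Snap` type and the `-1` sentinel for "no parent" are illustrative choices, not Iceberg API):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for SnapshotInfo: id, optional parent id, operation kind.
class Snap {
    final long id;
    final Long parentId;   // null for the table's first snapshot
    final String operation;
    Snap(long id, Long parentId, String operation) {
        this.id = id; this.parentId = parentId; this.operation = operation;
    }
}

class SnapshotRanges {
    // One (fromExclusive, toInclusive] scan range per append snapshot, mirroring
    // fromSnapshotExclusive(parent) / toSnapshot(id) in the DoFn. Non-append
    // snapshots (replace, delete, ...) are skipped, as in the DoFn's guard.
    static List<long[]> appendRanges(List<Snap> snapshots) {
        List<long[]> ranges = new ArrayList<>();
        for (Snap s : snapshots) {
            if (!"append".equals(s.operation)) {
                continue;
            }
            long from = s.parentId == null ? -1L : s.parentId; // -1: unbounded start
            ranges.add(new long[] {from, s.id});
        }
        return ranges;
    }
}
```

Because each range ends at exactly one snapshot, downstream `outputWithTimestamp` can use that snapshot's commit time without ambiguity about which commit produced a file.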
@@ -51,26 +51,25 @@ public static Builder builder() {
}

public org.apache.iceberg.catalog.Catalog catalog() {
-    if (cachedCatalog != null) {
-      return cachedCatalog;
+    if (cachedCatalog == null) {
+      String catalogName = getCatalogName();
+      if (catalogName == null) {
+        catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
+      }
+      Map<String, String> catalogProps = getCatalogProperties();
+      if (catalogProps == null) {
+        catalogProps = Maps.newHashMap();
+      }
+      Map<String, String> confProps = getConfigProperties();
+      if (confProps == null) {
+        confProps = Maps.newHashMap();
+      }
+      Configuration config = new Configuration();
+      for (Map.Entry<String, String> prop : confProps.entrySet()) {
+        config.set(prop.getKey(), prop.getValue());
+      }
+      cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
     }
-    String catalogName = getCatalogName();
-    if (catalogName == null) {
-      catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
-    }
-    Map<String, String> catalogProps = getCatalogProperties();
-    if (catalogProps == null) {
-      catalogProps = Maps.newHashMap();
-    }
-    Map<String, String> confProps = getConfigProperties();
-    if (confProps == null) {
-      confProps = Maps.newHashMap();
-    }
-    Configuration config = new Configuration();
-    for (Map.Entry<String, String> prop : confProps.entrySet()) {
-      config.set(prop.getKey(), prop.getValue());
-    }
-    cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
     return cachedCatalog;
}

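The refactored `catalog()` above inverts the early-return guard: the expensive build runs only when `cachedCatalog` is null, and the cached instance is returned on every call. A stripped-down sketch of the same build-once memoization (`CatalogClient` and `CatalogConfigSketch` are hypothetical names; the real code builds an Iceberg `Catalog` from Hadoop `Configuration`):

```java
// Hypothetical stand-in for a heavyweight, build-once client such as an Iceberg Catalog.
class CatalogClient {
    final String name;
    CatalogClient(String name) { this.name = name; }
}

class CatalogConfigSketch {
    private final String configuredName; // may be null, like getCatalogName()
    private CatalogClient cachedCatalog; // built at most once per config instance

    CatalogConfigSketch(String configuredName) { this.configuredName = configuredName; }

    CatalogClient catalog() {
        if (cachedCatalog == null) {
            // Fall back to a generated default when no name is configured,
            // mirroring the "apache-beam-<version>" default in the diff.
            String name = configuredName != null ? configuredName : "default-catalog";
            cachedCatalog = new CatalogClient(name);
        }
        return cachedCatalog;
    }
}
```

Callers can then hold the config object and call `catalog()` freely; only the first call pays the construction cost.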