An example ML pipeline built with Apache Beam and the Flink Runner for image classification tasks. This project adapts code from the Google Dataflow ML Starter repository.
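The pipeline follows Beam's standard RunInference pattern for image classification. A minimal sketch of that shape is shown below; the bucket paths and the MobileNetV2 model are illustrative placeholders, not necessarily what this repo ships:

```python
import io

import apache_beam as beam
import torch
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from PIL import Image
from torchvision import models, transforms

_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])

def preprocess_image(path: str) -> torch.Tensor:
    # Decode one image file into the [3, 224, 224] tensor the model expects.
    with FileSystems.open(path.strip()) as f:
        return _transform(Image.open(io.BytesIO(f.read())).convert("RGB"))

# Paths and the model choice are placeholders for illustration only.
model_handler = PytorchModelHandlerTensor(
    state_dict_path="gs://my-bucket/mobilenet_v2.pth",
    model_class=models.mobilenet_v2,
    model_params={"num_classes": 1000},
)

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadImagePaths" >> beam.io.ReadFromText("gs://my-bucket/images.txt")
        | "Preprocess" >> beam.Map(preprocess_image)
        | "Inference" >> RunInference(model_handler)
        | "Format" >> beam.Map(str)
        | "Write" >> beam.io.WriteToText("gs://my-bucket/predictions")
    )
```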
- Python 3
- Linux OS (Flink portable runner tests are not compatible with macOS or Windows)
- Docker (required for Portable Runner with Flink)
- Install dependencies: `make init`
- Run unit tests: `make test`
- Direct Runner (simplest option): `make run-direct`
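Each of these targets ultimately sets the `--runner` pipeline option. A minimal sketch of what the Direct Runner invocation boils down to (the Prism target below would pass `--runner=PrismRunner` instead); the trivial pipeline here is only for illustration:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner executes the whole pipeline in-process, which makes it
# the simplest option for quick local checks.
options = PipelineOptions(["--runner=DirectRunner"])
with beam.Pipeline(options=options) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(print)
```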
- Prism Runner: `make run-prism`
- Flink Runner with LOOPBACK: `make run-flink`
  Note: Uses optimized Flink configurations from `data/flink-conf.yaml`.
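In LOOPBACK mode the Flink workers call back into the submitting Python process, so no SDK worker container is needed. A sketch of the options involved; whether the make target wires in the config file via the `FLINK_CONF_DIR` environment variable is an assumption here, not read from the Makefile:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# FlinkRunner embeds a local Flink job server; LOOPBACK routes worker
# execution back to this process. Assumes FLINK_CONF_DIR points at the
# directory containing flink-conf.yaml.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--environment_type=LOOPBACK",
])
```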
- Portable Runner with Flink: `make run-portable-flink`
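The portable path submits through a Beam job server over gRPC rather than embedding Flink directly. A sketch of the corresponding options, assuming the job server listens on port 8099 as in the port table further below:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# PortableRunner hands the pipeline to a separately running job server,
# which forwards it to Flink. The endpoint is an assumption based on the
# port table in this README.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
])
```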
- Portable Runner with Local Flink Cluster: `make run-portable-flink-local`
  Note: Uses optimized Flink configurations from `data/flink-conf-local.yaml`.
For the local Flink cluster setup:
- Download Apache Flink from the official website
- Set `FLINK_LOCATION` to your Flink installation path

The above command will:
- Copy optimized configurations from `data/flink-conf-local.yaml`
- Start a Flink cluster (logs available in `$FLINK_LOCATION/log`)
- Execute the Beam job
- Stop the cluster automatically
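With a cluster already running, the job can also be pointed directly at Flink's REST endpoint. A sketch, assuming Flink's default `localhost:8081`:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Targets an existing Flink cluster instead of spinning one up per job.
# localhost:8081 is Flink's default REST port and an assumption here.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK",
])
```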
- Portable Runner with Local Flink Cluster and DOCKER
  Using `EXTERNAL` introduces complexities in managing Python package dependencies across different environments compared to `LOOPBACK`. To mitigate this overhead, `DOCKER` is used here to build a local Python worker SDK Docker image containing the necessary packages: `make docker-cpu`
This command builds a PyTorch CPU image with Beam, suitable for testing purposes.
Subsequently, a local Flink cluster can be launched to utilize this Python SDK image for model inference: `make run-portable-flink-worker-local`
Note that:
- Shared Artifact Staging: The directory `/tmp/beam-artifact-staging` must be accessible to both the job server and the Flink cluster for sharing staging artifacts.
- Limitations: The pipeline operating within the Dockerized worker cannot directly access local image lists or write prediction results to the local filesystem. Consequently, testing is limited to scenarios like processing a single image file and printing the output within the worker environment.
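With the `DOCKER` environment, the relevant options look roughly like the sketch below; the image tag is hypothetical, so substitute whatever tag `make docker-cpu` actually produced:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# DOCKER environment: each Flink worker launches the SDK harness from a
# container image instead of calling back into the submitting process.
# The job server is assumed to stage artifacts under
# /tmp/beam-artifact-staging, per the note above.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",
    "--environment_config=beam-ml-cpu:latest",  # hypothetical image tag
])
```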
However, this method is generally discouraged. For testing Beam pipelines, it is recommended to use `LOOPBACK` or local runners. For production deployments, utilize appropriate runners such as DataflowRunner or FlinkRunner with a managed Flink cluster (e.g., on Dataproc).
This guide explains how to set up and use a Dataproc Flink cluster on Google Cloud Platform (GCP).
- A Linux-based environment (recommended)
- GCP project with required permissions
- Configured `.env` file with your GCP Dataproc settings
- Push the previous Docker image (created by `make docker-cpu`) to Artifact Registry (AR): `make push-docker-cpu`
- Create a Flink cluster on Dataproc: `make create-flink-cluster`
- Execute a Beam ML job on the cluster: `make run-portable-flink-cluster`
- Clean up by removing the cluster: `make remove-flink-cluster`
Note: Before starting, ensure you've properly configured the GCP DATAPROC SETTINGS section in your `.env` file with your project-specific information.
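Putting it together, the options for the Dataproc run might look like the sketch below; every concrete value (master endpoint, region, AR image path) is a placeholder for what your `.env` actually provides:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch of targeting a Dataproc-hosted Flink cluster with the image
# pushed to Artifact Registry. All values below are placeholders.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=my-cluster-m:8081",  # placeholder master endpoint
    "--environment_type=DOCKER",
    "--environment_config="
    "us-central1-docker.pkg.dev/my-project/my-repo/beam-ml-cpu:latest",
])
```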
TODO:
- Streaming
- GPU
| Service | Port |
|---|---|
| Artifact Staging Service | 8098 |
| Java Expansion Service | 8097 |
| Job Service | 8099 |
| Worker Pool Service | 5000 |
Check the `.env` file to customize environment settings.
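These ports map onto Beam's portability options. A sketch of how a pipeline would address them, assuming everything runs on localhost:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Wires the services from the table above into pipeline options.
options = PipelineOptions([
    "--job_endpoint=localhost:8099",        # Job Service
    "--artifact_endpoint=localhost:8098",   # Artifact Staging Service
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:5000",  # Worker Pool Service
])
```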
```mermaid
graph TD
    A[SDK] --> B(Job Service);
    B --> C{Execution Engine};
    C --> D[Worker Pool];
    E[Java Expansion Service] --> F(SDK Harness);
    D --> F;
    F --> C;
    style D fill:#ccf,stroke:#333,stroke-width:2px
    style F fill:#ccf,stroke:#333,stroke-width:2px
```
Explanation of the Graph:
- The user writes a pipeline using a Beam SDK, which is submitted to the Job Service.
- The Job Service sends the pipeline to the execution engine (like Dataflow or Flink).
- If the pipeline includes cross-language transforms, a Java Expansion Service will spin up a Java SDK harness for the transforms written in Java (see the sketch after this list).
- The execution engine creates and manages the worker pool.
- The SDK harness is hosted by the worker pool and executes the transforms of the pipeline.
- The execution engine and worker pool communicate during the job execution.
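To make the cross-language step concrete, here is an illustrative use of a Java-backed transform from Python. It is not part of this repo (streaming is still a TODO above); Beam would normally auto-start an expansion service, but the port-8097 Java Expansion Service from the table above is assumed here, and the broker and topic are placeholders:

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# ReadFromKafka is implemented in Java, so the Python SDK asks an
# expansion service to expand it into the pipeline graph.
with beam.Pipeline() as p:
    _ = p | ReadFromKafka(
        consumer_config={"bootstrap.servers": "localhost:9092"},
        topics=["images"],
        expansion_service="localhost:8097",
    )
```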