Skip to content

Commit 99501d1

Browse files
yufansongwenym1
andauthoredMar 17, 2023
feat(connector-node): support stream chunk payload in connector node (risingwavelabs#8548)
Co-authored-by: William Wen <william123.wen@gmail.com>
1 parent 8c95702 commit 99501d1

File tree

17 files changed

+428
-21
lines changed

17 files changed

+428
-21
lines changed
 

‎ci/scripts/e2e-iceberg-sink-test.sh

+5
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@ echo "--- Download artifacts"
2525
mkdir -p target/debug
2626
buildkite-agent artifact download risingwave-"$profile" target/debug/
2727
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
28+
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
2829
mv target/debug/risingwave-"$profile" target/debug/risingwave
2930
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
31+
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
32+
33+
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
34+
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
3035

3136
echo "--- Download connector node package"
3237
buildkite-agent artifact download risingwave-connector.tar.gz ./

‎ci/scripts/e2e-sink-test.sh

+5
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@ echo "--- Download artifacts"
2525
mkdir -p target/debug
2626
buildkite-agent artifact download risingwave-"$profile" target/debug/
2727
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
28+
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
2829
mv target/debug/risingwave-"$profile" target/debug/risingwave
2930
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
31+
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
32+
33+
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
34+
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
3035

3136
echo "--- Download connector node package"
3237
buildkite-agent artifact download risingwave-connector.tar.gz ./

‎ci/scripts/e2e-source-test.sh

+4
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ echo "--- Download artifacts"
2828
mkdir -p target/debug
2929
buildkite-agent artifact download risingwave-"$profile" target/debug/
3030
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
31+
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
3132
mv target/debug/risingwave-"$profile" target/debug/risingwave
3233
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
34+
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
35+
36+
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
3337

3438

3539
echo "--- Download connector node package"

‎dashboard/proto/gen/connector_service.ts

+103-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎dashboard/proto/gen/stream_plan.ts

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎docker/Dockerfile

+6-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ RUN rustup self update \
3434

3535
RUN cargo fetch
3636

37-
RUN cargo build -p risingwave_cmd_all --release --features "static-link static-log-level" && \
37+
RUN cargo build -p risingwave_cmd_all -p risingwave_java_binding --release --features "static-link static-log-level" && \
3838
mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
39+
mkdir -p /risingwave/lib && mv /risingwave/target/release/librisingwave_java_binding.so /risingwave/lib && \
3940
cargo clean
4041

4142
RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true && \
@@ -47,10 +48,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certi
4748

4849
FROM image-base as risingwave
4950
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
50-
RUN mkdir -p /risingwave/bin/connector-node
51+
RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib
5152
COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave
5253
COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node
5354
COPY --from=builder /risingwave/ui /risingwave/ui
55+
COPY --from=builder /risingwave/lib/librisingwave_java_binding.so /risingwave/lib/librisingwave_java_binding.so
56+
# Set java.library.path env to /risingwave/lib
57+
ENV RW_JAVA_BINDING_LIB_PATH /risingwave/lib
5458
# Set default playground mode to docker-playground profile
5559
ENV PLAYGROUND_PROFILE docker-playground
5660
# Set default dashboard UI to local path instead of github proxy

‎java/connector-node/assembly/scripts/start-service.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ if [ -z "${port}" ]; then
2424
port=$PORT
2525
fi
2626

27-
java -classpath "${DIR}/libs/*" $MAIN --port ${port}
27+
java -classpath "${DIR}/libs/*" -Djava.library.path="${RW_JAVA_BINDING_LIB_PATH}" $MAIN --port ${port}

‎java/connector-node/risingwave-connector-service/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
<groupId>com.risingwave.java</groupId>
2626
<artifactId>proto</artifactId>
2727
</dependency>
28+
<dependency>
29+
<groupId>com.risingwave.java</groupId>
30+
<artifactId>java-binding</artifactId>
31+
</dependency>
2832
<dependency>
2933
<groupId>com.risingwave.java</groupId>
3034
<artifactId>connector-api</artifactId>

‎java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.risingwave.connector.api.TableSchema;
2020
import com.risingwave.connector.api.sink.*;
21+
import com.risingwave.connector.deserializer.StreamChunkDeserializer;
2122
import com.risingwave.metrics.ConnectorNodeMetrics;
2223
import com.risingwave.metrics.MonitoredRowIterator;
2324
import com.risingwave.proto.ConnectorServiceProto;
@@ -202,6 +203,9 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo
202203
case JSON:
203204
deserializer = new JsonDeserializer(tableSchema);
204205
break;
206+
case STREAM_CHUNK:
207+
deserializer = new StreamChunkDeserializer(tableSchema);
208+
break;
205209
}
206210
ConnectorNodeMetrics.incActiveConnections(sinkConfig.getConnectorType(), "node1");
207211
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.connector.deserializer;
16+
17+
import static io.grpc.Status.INVALID_ARGUMENT;
18+
19+
import com.risingwave.connector.api.TableSchema;
20+
import com.risingwave.connector.api.sink.CloseableIterator;
21+
import com.risingwave.connector.api.sink.Deserializer;
22+
import com.risingwave.connector.api.sink.SinkRow;
23+
import com.risingwave.java.binding.StreamChunkIterator;
24+
import com.risingwave.java.binding.StreamChunkRow;
25+
import com.risingwave.proto.ConnectorServiceProto;
26+
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.StreamChunkPayload;
27+
import com.risingwave.proto.Data;
28+
29+
public class StreamChunkDeserializer implements Deserializer {
30+
interface ValueGetter {
31+
Object get(StreamChunkRow row);
32+
}
33+
34+
private final ValueGetter[] valueGetters;
35+
36+
public StreamChunkDeserializer(TableSchema tableSchema) {
37+
this.valueGetters = buildValueGetter(tableSchema);
38+
}
39+
40+
static ValueGetter[] buildValueGetter(TableSchema tableSchema) {
41+
String[] colNames = tableSchema.getColumnNames();
42+
ValueGetter[] ret = new ValueGetter[colNames.length];
43+
for (int i = 0; i < colNames.length; i++) {
44+
int index = i;
45+
Data.DataType.TypeName typeName = tableSchema.getColumnType(colNames[i]);
46+
switch (typeName) {
47+
case INT16:
48+
ret[i] =
49+
row -> {
50+
if (row.isNull(index)) {
51+
return null;
52+
}
53+
return row.getShort(index);
54+
};
55+
break;
56+
case INT32:
57+
ret[i] =
58+
row -> {
59+
if (row.isNull(index)) {
60+
return null;
61+
}
62+
return row.getInt(index);
63+
};
64+
break;
65+
case INT64:
66+
ret[i] =
67+
row -> {
68+
if (row.isNull(index)) {
69+
return null;
70+
}
71+
return row.getLong(index);
72+
};
73+
break;
74+
case FLOAT:
75+
ret[i] =
76+
row -> {
77+
if (row.isNull(index)) {
78+
return null;
79+
}
80+
return row.getFloat(index);
81+
};
82+
break;
83+
case DOUBLE:
84+
ret[i] =
85+
row -> {
86+
if (row.isNull(index)) {
87+
return null;
88+
}
89+
return row.getDouble(index);
90+
};
91+
break;
92+
case BOOLEAN:
93+
ret[i] =
94+
row -> {
95+
if (row.isNull(index)) {
96+
return null;
97+
}
98+
return row.getBoolean(index);
99+
};
100+
break;
101+
case VARCHAR:
102+
ret[i] =
103+
row -> {
104+
if (row.isNull(index)) {
105+
return null;
106+
}
107+
return row.getString(index);
108+
};
109+
break;
110+
default:
111+
throw io.grpc.Status.INVALID_ARGUMENT
112+
.withDescription("unsupported type " + typeName)
113+
.asRuntimeException();
114+
}
115+
}
116+
return ret;
117+
}
118+
119+
@Override
120+
public CloseableIterator<SinkRow> deserialize(
121+
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) {
122+
if (!writeBatch.hasStreamChunkPayload()) {
123+
throw INVALID_ARGUMENT
124+
.withDescription(
125+
"expected StreamChunkPayload, got " + writeBatch.getPayloadCase())
126+
.asRuntimeException();
127+
}
128+
StreamChunkPayload streamChunkPayload = writeBatch.getStreamChunkPayload();
129+
return new StreamChunkIteratorWrapper(
130+
new StreamChunkIterator(streamChunkPayload.getBinaryData().toByteArray()),
131+
valueGetters);
132+
}
133+
134+
static class StreamChunkRowWrapper implements SinkRow {
135+
136+
private boolean isClosed;
137+
private final StreamChunkRow inner;
138+
private final ValueGetter[] valueGetters;
139+
140+
StreamChunkRowWrapper(StreamChunkRow inner, ValueGetter[] valueGetters) {
141+
this.inner = inner;
142+
this.valueGetters = valueGetters;
143+
this.isClosed = false;
144+
}
145+
146+
@Override
147+
public Object get(int index) {
148+
return valueGetters[index].get(inner);
149+
}
150+
151+
@Override
152+
public Data.Op getOp() {
153+
return inner.getOp();
154+
}
155+
156+
@Override
157+
public int size() {
158+
return valueGetters.length;
159+
}
160+
161+
@Override
162+
public void close() {
163+
if (!isClosed) {
164+
this.isClosed = true;
165+
inner.close();
166+
}
167+
}
168+
}
169+
170+
static class StreamChunkIteratorWrapper implements CloseableIterator<SinkRow> {
171+
private final StreamChunkIterator iter;
172+
private final ValueGetter[] valueGetters;
173+
private StreamChunkRowWrapper row;
174+
175+
public StreamChunkIteratorWrapper(StreamChunkIterator iter, ValueGetter[] valueGetters) {
176+
this.iter = iter;
177+
this.valueGetters = valueGetters;
178+
this.row = null;
179+
}
180+
181+
@Override
182+
public void close() {
183+
iter.close();
184+
try {
185+
if (row != null) {
186+
row.close();
187+
}
188+
} catch (Exception e) {
189+
throw new RuntimeException(e);
190+
}
191+
}
192+
193+
@Override
194+
public boolean hasNext() {
195+
if (this.row != null) {
196+
throw new RuntimeException(
197+
"cannot call hasNext again when there is row not consumed by next");
198+
}
199+
StreamChunkRow row = iter.next();
200+
if (row == null) {
201+
return false;
202+
}
203+
this.row = new StreamChunkRowWrapper(row, valueGetters);
204+
return true;
205+
}
206+
207+
@Override
208+
public SinkRow next() {
209+
// Move the sink row outside
210+
SinkRow ret = this.row;
211+
this.row = null;
212+
return ret;
213+
}
214+
}
215+
}

‎proto/connector_service.proto

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ message SinkConfig {
3232
enum SinkPayloadFormat {
3333
FORMAT_UNSPECIFIED = 0;
3434
JSON = 1;
35+
STREAM_CHUNK = 2;
3536
}
3637

3738
message SinkStreamRequest {
@@ -49,8 +50,13 @@ message SinkStreamRequest {
4950
repeated RowOp row_ops = 1;
5051
}
5152

53+
message StreamChunkPayload {
54+
bytes binary_data = 1;
55+
}
56+
5257
oneof payload {
5358
JsonPayload json_payload = 1;
59+
StreamChunkPayload stream_chunk_payload = 2;
5460
}
5561

5662
uint64 batch_id = 3;

‎src/compute/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ pub struct ComputeNodeOpts {
6565
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
6666
pub connector_rpc_endpoint: Option<String>,
6767

68+
/// Payload format of connector sink rpc
69+
#[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")]
70+
pub connector_rpc_sink_payload_format: Option<String>,
71+
6872
/// One of:
6973
/// 1. `hummock+{object_store}` where `object_store`
7074
/// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`,

‎src/compute/src/server.rs

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use risingwave_connector::source::monitor::SourceMetrics;
3434
use risingwave_hummock_sdk::compact::CompactorRuntimeConfig;
3535
use risingwave_pb::common::WorkerType;
3636
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
37+
use risingwave_pb::connector_service::SinkPayloadFormat;
3738
use risingwave_pb::health::health_server::HealthServer;
3839
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
3940
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
@@ -287,7 +288,20 @@ pub async fn compute_node_serve(
287288

288289
let connector_params = risingwave_connector::ConnectorParams {
289290
connector_rpc_endpoint: opts.connector_rpc_endpoint,
291+
sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() {
292+
None | Some("json") => SinkPayloadFormat::Json,
293+
Some("stream_chunk") => SinkPayloadFormat::StreamChunk,
294+
_ => {
295+
unreachable!(
296+
"invalid sink payload format: {:?}. Should be either json or stream_chunk",
297+
opts.connector_rpc_sink_payload_format
298+
)
299+
}
300+
},
290301
};
302+
303+
info!("connector param: {:?}", connector_params);
304+
291305
// Initialize the streaming environment.
292306
let stream_env = StreamEnvironment::new(
293307
advertise_addr.clone(),

‎src/connector/src/lib.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
use std::time::Duration;
3131

3232
use duration_str::parse_std;
33+
use risingwave_pb::connector_service::SinkPayloadFormat;
3334
use serde::de;
3435

3536
pub mod aws_utils;
@@ -45,12 +46,17 @@ pub mod common;
4546
#[derive(Clone, Debug, Default)]
4647
pub struct ConnectorParams {
4748
pub connector_rpc_endpoint: Option<String>,
49+
pub sink_payload_format: SinkPayloadFormat,
4850
}
4951

5052
impl ConnectorParams {
51-
pub fn new(connector_rpc_endpoint: Option<String>) -> Self {
53+
pub fn new(
54+
connector_rpc_endpoint: Option<String>,
55+
sink_payload_format: SinkPayloadFormat,
56+
) -> Self {
5257
Self {
5358
connector_rpc_endpoint,
59+
sink_payload_format,
5460
}
5561
}
5662
}

‎src/connector/src/sink/remote.rs

+38-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashMap;
1717
use anyhow::anyhow;
1818
use async_trait::async_trait;
1919
use itertools::Itertools;
20+
use prost::Message;
2021
use risingwave_common::array::StreamChunk;
2122
#[cfg(test)]
2223
use risingwave_common::catalog::Field;
@@ -25,12 +26,16 @@ use risingwave_common::catalog::Schema;
2526
use risingwave_common::types::DataType;
2627
use risingwave_common::util::addr::HostAddr;
2728
use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp;
28-
use risingwave_pb::connector_service::sink_stream_request::write_batch::{JsonPayload, Payload};
29+
use risingwave_pb::connector_service::sink_stream_request::write_batch::{
30+
JsonPayload, Payload, StreamChunkPayload,
31+
};
2932
use risingwave_pb::connector_service::sink_stream_request::{
3033
Request as SinkRequest, StartEpoch, SyncBatch, WriteBatch,
3134
};
3235
use risingwave_pb::connector_service::table_schema::Column;
33-
use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest, TableSchema};
36+
use risingwave_pb::connector_service::{
37+
SinkPayloadFormat, SinkResponse, SinkStreamRequest, TableSchema,
38+
};
3439
use risingwave_rpc_client::ConnectorClient;
3540
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
3641
use tokio_stream::StreamExt;
@@ -105,6 +110,7 @@ pub struct RemoteSink<const APPEND_ONLY: bool> {
105110
_client: Option<ConnectorClient>,
106111
request_sender: Option<UnboundedSender<SinkStreamRequest>>,
107112
response_stream: ResponseStreamImpl,
113+
payload_format: SinkPayloadFormat,
108114
}
109115

110116
impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
@@ -141,6 +147,7 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
141147
config.connector_type.clone(),
142148
config.properties.clone(),
143149
table_schema,
150+
connector_params.sink_payload_format,
144151
)
145152
.await
146153
.map_err(SinkError::from)?;
@@ -155,6 +162,7 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
155162
_client: Some(client),
156163
request_sender: Some(request_sender),
157164
response_stream: ResponseStreamImpl::Grpc(response),
165+
payload_format: connector_params.sink_payload_format,
158166
})
159167
}
160168

@@ -240,24 +248,38 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
240248
_client: None,
241249
request_sender: Some(request_sender),
242250
response_stream: ResponseStreamImpl::Receiver(response_receiver),
251+
payload_format: SinkPayloadFormat::Json,
243252
}
244253
}
245254
}
246255

247256
#[async_trait]
248257
impl<const APPEND_ONLY: bool> Sink for RemoteSink<APPEND_ONLY> {
249258
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
250-
let mut row_ops = vec![];
251-
for (op, row_ref) in chunk.rows() {
252-
let map = record_to_json(row_ref, &self.schema.fields)?;
253-
let row_op = RowOp {
254-
op_type: op.to_protobuf() as i32,
255-
line: serde_json::to_string(&map)
256-
.map_err(|e| SinkError::Remote(format!("{:?}", e)))?,
257-
};
258-
259-
row_ops.push(row_op);
260-
}
259+
let payload = match self.payload_format {
260+
SinkPayloadFormat::Json => {
261+
let mut row_ops = vec![];
262+
for (op, row_ref) in chunk.rows() {
263+
let map = record_to_json(row_ref, &self.schema.fields)?;
264+
let row_op = RowOp {
265+
op_type: op.to_protobuf() as i32,
266+
line: serde_json::to_string(&map)
267+
.map_err(|e| SinkError::Remote(format!("{:?}", e)))?,
268+
};
269+
270+
row_ops.push(row_op);
271+
}
272+
Payload::JsonPayload(JsonPayload { row_ops })
273+
}
274+
SinkPayloadFormat::StreamChunk => {
275+
let prost_stream_chunk = chunk.to_protobuf();
276+
let binary_data = Message::encode_to_vec(&prost_stream_chunk);
277+
Payload::StreamChunkPayload(StreamChunkPayload { binary_data })
278+
}
279+
SinkPayloadFormat::FormatUnspecified => {
280+
unreachable!("should specify sink payload format")
281+
}
282+
};
261283

262284
let epoch = self.epoch.ok_or_else(|| {
263285
SinkError::Remote("epoch has not been initialize, call `begin_epoch`".to_string())
@@ -268,7 +290,7 @@ impl<const APPEND_ONLY: bool> Sink for RemoteSink<APPEND_ONLY> {
268290
request: Some(SinkRequest::Write(WriteBatch {
269291
epoch,
270292
batch_id,
271-
payload: Some(Payload::JsonPayload(JsonPayload { row_ops })),
293+
payload: Some(payload),
272294
})),
273295
})
274296
.map_err(|e| SinkError::Remote(e.to_string()))?;
@@ -451,6 +473,7 @@ mod test {
451473
assert_eq!(row_2.line, "{\"id\":3,\"name\":\"Clare\"}");
452474
assert_eq!(row_2.op_type, data::Op::Insert as i32);
453475
}
476+
_ => unreachable!("should be json payload"),
454477
}
455478
}
456479
_ => panic!("test failed: failed to construct write request"),
@@ -512,6 +535,7 @@ mod test {
512535
assert_eq!(row_2.line, "{\"id\":6,\"name\":\"Frank\"}");
513536
assert_eq!(row_2.op_type, data::Op::Insert as i32);
514537
}
538+
_ => unreachable!("should be json payload"),
515539
}
516540
}
517541
_ => panic!("test failed: failed to construct write request"),

‎src/rpc_client/src/connector_client.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,15 @@ impl ConnectorClient {
120120
connector_type: String,
121121
properties: HashMap<String, String>,
122122
table_schema: Option<TableSchema>,
123+
sink_payload_format: SinkPayloadFormat,
123124
) -> Result<(UnboundedSender<SinkStreamRequest>, Streaming<SinkResponse>)> {
124125
let (request_sender, request_receiver) = unbounded_channel::<SinkStreamRequest>();
125126

126127
// Send initial request in case of the blocking receive call from creating streaming request
127128
request_sender
128129
.send(SinkStreamRequest {
129130
request: Some(SinkRequest::Start(StartSink {
130-
format: SinkPayloadFormat::Json as i32,
131+
format: sink_payload_format as i32,
131132
sink_config: Some(SinkConfig {
132133
connector_type,
133134
properties,

‎src/stream/src/task/env.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
2020
use risingwave_common::util::addr::HostAddr;
2121
use risingwave_connector::source::monitor::SourceMetrics;
2222
use risingwave_connector::ConnectorParams;
23+
#[cfg(test)]
24+
use risingwave_pb::connector_service::SinkPayloadFormat;
2325
use risingwave_source::dml_manager::DmlManagerRef;
2426
use risingwave_storage::StateStoreImpl;
2527

@@ -90,7 +92,7 @@ impl StreamEnvironment {
9092
use risingwave_storage::monitor::MonitoredStorageMetrics;
9193
StreamEnvironment {
9294
server_addr: "127.0.0.1:5688".parse().unwrap(),
93-
connector_params: ConnectorParams::new(None),
95+
connector_params: ConnectorParams::new(None, SinkPayloadFormat::Json),
9496
config: Arc::new(StreamingConfig::default()),
9597
worker_id: WorkerNodeId::default(),
9698
state_store: StateStoreImpl::shared_in_memory_store(Arc::new(

0 commit comments

Comments
 (0)
Please sign in to comment.