Skip to content

Commit 3abb54a

Browse files
committed
Add a SetSchema transformation for imposing and inferring schemas from schemaless data
1 parent fc93fb4 commit 3abb54a

File tree

3 files changed

+474
-0
lines changed

3 files changed

+474
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.connect.transforms;
18+
19+
import org.apache.kafka.common.config.ConfigDef;
20+
import org.apache.kafka.connect.connector.ConnectRecord;
21+
import org.apache.kafka.connect.data.Schema;
22+
import org.apache.kafka.connect.data.SchemaAndValue;
23+
import org.apache.kafka.connect.data.SchemaBuilder;
24+
import org.apache.kafka.connect.data.Struct;
25+
import org.apache.kafka.connect.errors.DataException;
26+
import org.apache.kafka.connect.transforms.util.SimpleConfig;
27+
28+
import java.nio.ByteBuffer;
29+
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.Collection;
32+
import java.util.HashMap;
33+
import java.util.Iterator;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
import static org.apache.kafka.connect.transforms.util.Requirements.requireNoSchema;
38+
39+
public abstract class SetSchema<R extends ConnectRecord<R>> implements Transformation<R> {
40+
41+
public static final String OVERVIEW_DOC =
42+
"Apply a schema to schemaless data, either using a predefined schema against which the data is validated " +
43+
"or by inferring a schema based on the data. " +
44+
"Schemas can be applied to the key (<code>" + Key.class.getName() + "</code>)"
45+
+ " or value (<code>" + Value.class.getName() + "</code>). " +
46+
"When using inferred schemas, combine with the SetSchemaMetadata transformation if you also need " +
47+
"to set the name or version. Inferred schemas are always optional and use optional schemas for " +
48+
"Struct fields, array elements, and map elements.";
49+
50+
public static final ConfigDef CONFIG_DEF = new ConfigDef();
51+
52+
@Override
53+
public void configure(Map<String, ?> configs) {
54+
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
55+
}
56+
57+
@Override
58+
public R apply(R record) {
59+
final Schema originalSchema = operatingSchema(record);
60+
requireNoSchema(originalSchema, "setting or inferring the schema");
61+
final SchemaAndValue updated = inferSchema(operatingValue(record));
62+
return newRecord(record, updated.schema(), updated.value());
63+
}
64+
65+
@Override
66+
public ConfigDef config() {
67+
return CONFIG_DEF;
68+
}
69+
70+
@Override
71+
public void close() {
72+
}
73+
74+
protected abstract Schema operatingSchema(R record);
75+
76+
protected abstract Object operatingValue(R record);
77+
78+
protected abstract R newRecord(R record, Schema updatedSchema, Object updated);
79+
80+
/**
81+
* Set the schema name, version or both on the record's key schema.
82+
*/
83+
public static class Key<R extends ConnectRecord<R>> extends SetSchema<R> {
84+
@Override
85+
protected Schema operatingSchema(R record) {
86+
return record.keySchema();
87+
}
88+
89+
@Override
90+
protected Object operatingValue(R record) {
91+
return record.key();
92+
}
93+
94+
@Override
95+
protected R newRecord(R record, Schema updatedSchema, Object updated) {
96+
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updated, record.valueSchema(), record.value(), record.timestamp());
97+
}
98+
}
99+
100+
/**
101+
* Set the schema name, version or both on the record's value schema.
102+
*/
103+
public static class Value<R extends ConnectRecord<R>> extends SetSchema<R> {
104+
@Override
105+
protected Schema operatingSchema(R record) {
106+
return record.valueSchema();
107+
}
108+
109+
@Override
110+
protected Object operatingValue(R record) {
111+
return record.value();
112+
}
113+
114+
@Override
115+
protected R newRecord(R record, Schema updatedSchema, Object updated) {
116+
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updated, record.timestamp());
117+
}
118+
}
119+
120+
private SchemaAndValue inferSchema(Object value) {
121+
if (value == null) {
122+
return null;
123+
}
124+
125+
if (value instanceof Byte) {
126+
return new SchemaAndValue(Schema.OPTIONAL_INT8_SCHEMA, value);
127+
} else if (value instanceof Short) {
128+
return new SchemaAndValue(Schema.OPTIONAL_INT16_SCHEMA, value);
129+
} else if (value instanceof Integer) {
130+
return new SchemaAndValue(Schema.OPTIONAL_INT32_SCHEMA, value);
131+
} else if (value instanceof Long) {
132+
return new SchemaAndValue(Schema.OPTIONAL_INT64_SCHEMA, value);
133+
} else if (value instanceof Float) {
134+
return new SchemaAndValue(Schema.OPTIONAL_FLOAT32_SCHEMA, value);
135+
} else if (value instanceof Double) {
136+
return new SchemaAndValue(Schema.OPTIONAL_FLOAT64_SCHEMA, value);
137+
} else if (value instanceof Boolean) {
138+
return new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, value);
139+
} else if (value instanceof String) {
140+
return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, value);
141+
} else if (value instanceof byte[]) {
142+
return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
143+
} else if (value instanceof ByteBuffer) {
144+
return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
145+
} else if (value instanceof Collection) {
146+
Collection<Object> arrayValue = (Collection<Object>) value;
147+
if (arrayValue.isEmpty()) {
148+
throw new DataException("Cannot infer schema from an empty array/list type");
149+
}
150+
final List<Object> convertedValues = new ArrayList();
151+
Schema elementSchema = null;
152+
for (Object element : arrayValue) {
153+
SchemaAndValue inferred = inferSchema(element);
154+
if (element == null) {
155+
// This is ok, just skip over any conversion/checks and just add the null value to the list
156+
convertedValues.add(null);
157+
continue;
158+
} else if (elementSchema == null) {
159+
elementSchema = inferred.schema();
160+
} else if (!elementSchema.equals(inferred.schema())) {
161+
throw new DataException("All inferred schemas in an array/list must be equal: " + elementSchema + " != " + inferred.schema());
162+
}
163+
convertedValues.add(inferred.value());
164+
}
165+
if (elementSchema == null) {
166+
throw new DataException("Cannot infer schema of array type if no elements have non-null values");
167+
}
168+
return new SchemaAndValue(SchemaBuilder.array(elementSchema).optional().build(), convertedValues);
169+
} else if (value instanceof Map) {
170+
// Schemaless maps convert to Structs with schemas
171+
Map<Object, Object> mapValue = (Map<Object, Object>) value;
172+
if (mapValue.isEmpty()) {
173+
throw new DataException("Cannot infer schema from an empty map type");
174+
}
175+
176+
final Object[] keys = mapValue.keySet().toArray();
177+
try {
178+
Arrays.sort(keys);
179+
} catch (ClassCastException e) {
180+
throw new DataException("Cannot infer schema with mismatching keys", e);
181+
}
182+
183+
// Pass 1 to build the schema
184+
SchemaBuilder builder = SchemaBuilder.struct().optional();
185+
// Temporary holder for converted values
186+
Map<String, Object> convertedFields = new HashMap();
187+
for (Object key : Arrays.asList(keys)) {
188+
if (key == null) {
189+
throw new DataException("Map keys may not be null, they must be valid strings");
190+
}
191+
SchemaAndValue keyConverted = inferSchema(key);
192+
if (!Schema.OPTIONAL_STRING_SCHEMA.equals(keyConverted.schema())) {
193+
throw new DataException("Inferred schemas must have string keys, found " + keyConverted);
194+
}
195+
196+
Object fieldValue = mapValue.get(key);
197+
if (fieldValue == null) {
198+
throw new DataException("Map values may not be null");
199+
}
200+
SchemaAndValue valueConverted = inferSchema(fieldValue);
201+
builder.field((String) keyConverted.value(), valueConverted.schema());
202+
convertedFields.put((String) key, valueConverted.value());
203+
}
204+
205+
Schema schema = builder.build();
206+
207+
// Pass 2 to build the value
208+
Struct result = new Struct(schema);
209+
for (Map.Entry<String, Object> entry : convertedFields.entrySet()) {
210+
result.put(entry.getKey(), entry.getValue());
211+
}
212+
213+
return new SchemaAndValue(schema, result);
214+
}
215+
216+
throw new DataException("Unexpected data type for SetSchema: " + value.getClass().getName());
217+
}
218+
}

connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ public static void requireSchema(Schema schema, String purpose) {
3232
}
3333
}
3434

35+
public static void requireNoSchema(Schema schema, String purpose) {
36+
if (schema != null) {
37+
throw new DataException("Schema should not exist for [" + purpose + "]");
38+
}
39+
}
40+
3541
@SuppressWarnings("unchecked")
3642
public static Map<String, Object> requireMap(Object value, String purpose) {
3743
if (!(value instanceof Map)) {

0 commit comments

Comments
 (0)