Skip to content

Commit b787ad4

Browse files
authored
Add CLPV2_ZSTD and CLPV2_LZ4 raw forward index compression codecs. (apache#14661)
1 parent 9259a85 commit b787ad4

File tree

7 files changed

+47
-15
lines changed

7 files changed

+47
-15
lines changed

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,10 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
129129
private final ChunkCompressionType _chunkCompressionType;
130130

131131
/**
132-
* Initializes a forward index creator for the given column using the provided base directory and column statistics.
133-
* This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other immutable forward index
134-
* constructors, this one handles the entire process of converting a mutable forward index into an immutable one.
132+
* Initializes a forward index creator for the given column using the provided base directory, column statistics and
133+
* chunk compressor type. This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other
134+
* immutable forward index constructors, this one handles the entire process of converting a mutable forward index
135+
* into an immutable one.
135136
*
136137
* <p>The {@code columnStatistics} object passed into this constructor should contain a reference to the mutable
137138
* forward index ({@link CLPMutableForwardIndexV2}). The data from the mutable index is efficiently copied over
@@ -142,12 +143,26 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
142143
* @param baseIndexDir The base directory where the forward index files will be stored.
143144
* @param columnStatistics The column statistics containing the CLP forward index information, including a reference
144145
* to the mutable forward index.
146+
* @param chunkCompressionType The chunk compressor type used to compress internal data columns
145147
* @throws IOException If there is an error during initialization or while accessing the file system.
146148
*/
147-
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
149+
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics,
150+
ChunkCompressionType chunkCompressionType)
148151
throws IOException {
149152
this(baseIndexDir, ((CLPStatsProvider) columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(),
150-
ChunkCompressionType.ZSTANDARD);
153+
chunkCompressionType);
154+
}
155+
156+
/**
157+
* Same as the constructor above, except that the chunk compression type defaults to ZStandard.
158+
* @param baseIndexDir The base directory where the forward index files will be stored.
159+
* @param columnStatistics The column statistics containing the CLP forward index information, including a reference
160+
* to the mutable forward index.
161+
* @throws IOException If there is an error during initialization or while accessing the file system.
162+
*/
163+
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
164+
throws IOException {
165+
this(baseIndexDir, columnStatistics, ChunkCompressionType.ZSTANDARD);
151166
}
152167

153168
/**

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java

+8
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,19 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
7373
// Dictionary disabled columns
7474
DataType storedType = fieldSpec.getDataType().getStoredType();
7575
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
76+
// CLP (V1) uses a hard-coded chunk compressor, which is set to `PassThrough`
7677
return new CLPForwardIndexCreatorV1(indexDir, columnName, numTotalDocs, context.getColumnStatistics());
7778
}
7879
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2) {
80+
// Use the default chunk compression codec for CLPV2, currently configured to use ZStandard
7981
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics());
8082
}
83+
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_ZSTD) {
84+
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.ZSTANDARD);
85+
}
86+
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_LZ4) {
87+
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.LZ4);
88+
}
8189
ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType();
8290
if (chunkCompressionType == null) {
8391
chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ public MutableIndex createMutableIndex(MutableIndexContext context, ForwardIndex
256256
// CLP (V1) always has clp encoding enabled, whereas V2 is dynamic
257257
clpMutableForwardIndex.forceClpEncoding();
258258
return clpMutableForwardIndex;
259-
} else if (config.getCompressionCodec() == CompressionCodec.CLPV2) {
259+
} else if (config.getCompressionCodec() == CompressionCodec.CLPV2
260+
|| config.getCompressionCodec() == CompressionCodec.CLPV2_ZSTD
261+
|| config.getCompressionCodec() == CompressionCodec.CLPV2_LZ4) {
260262
CLPMutableForwardIndexV2 clpMutableForwardIndex =
261263
new CLPMutableForwardIndexV2(column, context.getMemoryManager());
262264
return clpMutableForwardIndex;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1207,10 +1207,12 @@ private static void validateFieldConfigList(TableConfig tableConfig, @Nullable S
12071207
switch (encodingType) {
12081208
case RAW:
12091209
Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex()
1210-
|| compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2,
1210+
|| compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2
1211+
|| compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4,
12111212
"Compression codec: %s is not applicable to raw index",
12121213
compressionCodec);
1213-
if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2)
1214+
if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2
1215+
|| compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4)
12141216
&& schema != null) {
12151217
Preconditions.checkArgument(
12161218
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == DataType.STRING,

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ public void testCLPWriter()
114114
Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19);
115115
}
116116

117-
private long createStringRawForwardIndex(ChunkCompressionType compressionType, int maxLength)
117+
private long createStringRawForwardIndex(ChunkCompressionType chunkCompressionType, int maxLength)
118118
throws IOException {
119119
// Create a raw string immutable forward index
120120
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
121121
SingleValueVarByteRawIndexCreator index =
122-
new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, COLUMN_NAME, _logMessages.size(),
122+
new SingleValueVarByteRawIndexCreator(TEMP_DIR, chunkCompressionType, COLUMN_NAME, _logMessages.size(),
123123
FieldSpec.DataType.STRING, maxLength);
124124
for (String logMessage : _logMessages) {
125125
index.putString(logMessage);
@@ -132,9 +132,9 @@ private long createStringRawForwardIndex(ChunkCompressionType compressionType, i
132132
}
133133

134134
private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
135-
ChunkCompressionType compressionType)
135+
ChunkCompressionType chunkCompressionType)
136136
throws IOException {
137-
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, compressionType);
137+
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, chunkCompressionType);
138138

139139
// Read from immutable forward index and validate the content
140140
File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
@@ -149,12 +149,12 @@ private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2
149149
}
150150

151151
private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
152-
ChunkCompressionType compressionType)
152+
ChunkCompressionType chunkCompressionType)
153153
throws IOException {
154154
// Create a CLP immutable forward index from mutable forward index
155155
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
156156
CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
157-
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, compressionType);
157+
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, chunkCompressionType);
158158
for (int i = 0; i < _logMessages.size(); i++) {
159159
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
160160
}

pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
116116
case PASS_THROUGH:
117117
case CLP:
118118
case CLPV2:
119+
case CLPV2_ZSTD:
120+
case CLPV2_LZ4:
119121
_chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
120122
_dictIdCompressionType = null;
121123
break;

pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ public enum CompressionCodec {
144144
// CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a special
145145
// handling for log lines (see {@link CLPForwardIndexCreatorV1} and {@link CLPForwardIndexCreatorV2})
146146
CLP(false, false),
147-
CLPV2(false, false);
147+
CLPV2(false, false),
148+
CLPV2_ZSTD(false, false),
149+
CLPV2_LZ4(false, false);
150+
148151
//@formatter:on
149152

150153
private final boolean _applicableToRawIndex;

0 commit comments

Comments
 (0)