diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index aaa9db3f4fea..75797dcd14f1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -139,6 +139,11 @@ public static boolean isSkipUpsert(Map queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT)); } + public static long getUpsertViewFreshnessMs(Map queryOptions) { + String freshnessMsString = queryOptions.get(QueryOptionKey.UPSERT_VIEW_FRESHNESS_MS); + return freshnessMsString != null ? Long.parseLong(freshnessMsString) : -1; + } + public static boolean isScanStarTreeNodes(Map queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SCAN_STAR_TREE_NODES)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index cbb870037c45..f49297175859 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -302,7 +302,7 @@ public List getSegmentContexts(List selectedSegmen List segmentContexts = new ArrayList<>(selectedSegments.size()); selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s))); if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) { - _tableUpsertMetadataManager.setSegmentContexts(segmentContexts); + _tableUpsertMetadataManager.setSegmentContexts(segmentContexts, queryOptions); } return segmentContexts; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index a6ba19ee5acd..6a873e3cc439 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -55,6 +55,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.utils.SegmentUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; @@ -71,6 +72,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.BooleanUtils; import org.apache.pinot.spi.utils.CommonConstants; @@ -122,6 +124,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps private final Lock _preloadLock = new ReentrantLock(); private volatile boolean _isPreloading; + // There are two consistency modes: + // If using SYNC mode, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and + // the query threads take the RLock when getting bitmaps for all its selected segments. + // If using SNAPSHOT mode, the query threads don't need to take lock when getting bitmaps for all its selected + // segments, as the query threads access a copy of bitmaps that are kept updated by upsert thread periodically. But + // the query thread can specify a freshness threshold query option to refresh the bitmap copies if not fresh enough. + // By default, the mode is NONE to disable the support for data consistency. + private final UpsertConfig.ConsistencyMode _consistencyMode; + private final long _upsertViewRefreshIntervalMs; + private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock(); + private final Set _updatedSegmentsSinceLastRefresh = ConcurrentHashMap.newKeySet(); + private volatile long _lastUpsertViewRefreshTimeMs = 0; + private volatile Map _segmentQueryableDocIdsMap; + protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; @@ -137,6 +153,9 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti _metadataTTL = context.getMetadataTTL(); _deletedKeysTTL = context.getDeletedKeysTTL(); _tableIndexDir = context.getTableIndexDir(); + UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode(); + _consistencyMode = cmode != null ? cmode : UpsertConfig.ConsistencyMode.NONE; + _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs(); _serverMetrics = ServerMetrics.get(); _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); if (_metadataTTL > 0) { @@ -1019,27 +1038,88 @@ public synchronized void close() _logger.info("Closed the metadata manager"); } - protected void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds, + /** + * The same R/WLock is used by the two consistency modes, but they are independent: + * - For sync mode, upsert threads take WLock to make updates on two segments' bitmaps atomically, and query threads + * take RLock when to access the segment bitmaps. + * - For snapshot mode, upsert threads take RLock to make updates on segments' bitmaps so that they can be + * synchronized with threads taking the snapshot of bitmaps, which take the WLock. + */ + protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) { - removeDocId(oldSegment, oldDocId); - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) { + _upsertViewLock.writeLock().lock(); + } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _upsertViewLock.readLock().lock(); + } + try { + doRemoveDocId(oldSegment, oldDocId); + doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + } finally { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) { + _upsertViewLock.writeLock().unlock(); + } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _updatedSegmentsSinceLastRefresh.add(newSegment); + _updatedSegmentsSinceLastRefresh.add(oldSegment); + _upsertViewLock.readLock().unlock(); + // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W lock ensures that only one thread + // can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until + // refreshing is done. + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); + } + } } - protected void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds, + /** + * There is no need to take the R/WLock to update single bitmap, as all methods to update the bitmap is synchronized. + * But for upsertViewBatchRefresh to work correctly, we need to block updates on bitmaps while doing batch refresh, + * which takes WLock. So wrap bitmap update logic inside RLock to allow concurrent updates but to be blocked when + * there is thread doing batch refresh, i.e. to take copies of all bitmaps. + */ + protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) { - validDocIds.replace(oldDocId, newDocId); - if (queryableDocIds != null) { - if (recordInfo.isDeleteRecord()) { - queryableDocIds.remove(oldDocId); - } else { - queryableDocIds.replace(oldDocId, newDocId); + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _upsertViewLock.readLock().lock(); + } + try { + validDocIds.replace(oldDocId, newDocId); + if (queryableDocIds != null) { + if (recordInfo.isDeleteRecord()) { + queryableDocIds.remove(oldDocId); + } else { + queryableDocIds.replace(oldDocId, newDocId); + } + } + } finally { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _updatedSegmentsSinceLastRefresh.add(segment); + _upsertViewLock.readLock().unlock(); + // Batch refresh takes WLock. Do it outside RLock for clarity. + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } } - protected void addDocId(ThreadSafeMutableRoaringBitmap validDocIds, + protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _upsertViewLock.readLock().lock(); + } + try { + doAddDocId(validDocIds, queryableDocIds, docId, recordInfo); + } finally { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _updatedSegmentsSinceLastRefresh.add(segment); + _upsertViewLock.readLock().unlock(); + // Batch refresh takes WLock. Do it outside RLock for clarity. + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); + } + } + } + + private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds, + int docId, RecordInfo recordInfo) { validDocIds.add(docId); if (queryableDocIds != null && !recordInfo.isDeleteRecord()) { queryableDocIds.add(docId); @@ -1047,6 +1127,22 @@ protected void addDocId(ThreadSafeMutableRoaringBitmap validDocIds, } protected void removeDocId(IndexSegment segment, int docId) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _upsertViewLock.readLock().lock(); + } + try { + doRemoveDocId(segment, docId); + } finally { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _updatedSegmentsSinceLastRefresh.add(segment); + _upsertViewLock.readLock().unlock(); + // Batch refresh takes WLock. Do it outside RLock for clarity. + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); + } + } + } + + private void doRemoveDocId(IndexSegment segment, int docId) { Objects.requireNonNull(segment.getValidDocIds()).remove(docId); ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds(); if (currentQueryableDocIds != null) { @@ -1058,11 +1154,37 @@ protected void removeDocId(IndexSegment segment, int docId) { * Use the segmentContexts to collect the contexts for selected segments. Reuse the segmentContext object if * present, to avoid overwriting the contexts specified at the others places. */ - public void setSegmentContexts(List segmentContexts) { + public void setSegmentContexts(List segmentContexts, Map queryOptions) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE) { + setSegmentContexts(segmentContexts); + return; + } + if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) { + _upsertViewLock.readLock().lock(); + try { + setSegmentContexts(segmentContexts); + return; + } finally { + _upsertViewLock.readLock().unlock(); + } + } + // If batch refresh is enabled, the copy of bitmaps is kept updated and ready to use for a consistent view. + // The locking between query threads and upsert threads can be avoided when using batch refresh. + // Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps by every single query. + // If query has specified a need for certain freshness, check the view and refresh it as needed. + // When refreshing the copy of map, we need to take the WLock so only one thread is refreshing view. + long upsertViewFreshnessMs = + Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions), _upsertViewRefreshIntervalMs); + if (upsertViewFreshnessMs < 0) { + upsertViewFreshnessMs = _upsertViewRefreshIntervalMs; + } + doBatchRefreshUpsertView(upsertViewFreshnessMs); + Map currentUpsertView = _segmentQueryableDocIdsMap; for (SegmentContext segmentContext : segmentContexts) { IndexSegment segment = segmentContext.getIndexSegment(); - if (_trackedSegments.contains(segment)) { - segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment)); + MutableRoaringBitmap segmentView = currentUpsertView.get(segment); + if (segmentView != null) { + segmentContext.setQueryableDocIdsSnapshot(segmentView); } } } @@ -1081,6 +1203,51 @@ private static MutableRoaringBitmap getQueryableDocIdsSnapshotFromSegment(IndexS return queryableDocIdsSnapshot; } + private void setSegmentContexts(List segmentContexts) { + for (SegmentContext segmentContext : segmentContexts) { + IndexSegment segment = segmentContext.getIndexSegment(); + if (_trackedSegments.contains(segment)) { + segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment)); + } + } + } + + private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) { + if (upsertViewFreshnessMs < 0) { + return true; + } + return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > System.currentTimeMillis(); + } + + private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) { + // Always refresh if the current view is still empty. + if (skipUpsertViewRefresh(upsertViewFreshnessMs) && _segmentQueryableDocIdsMap != null) { + return; + } + _upsertViewLock.writeLock().lock(); + try { + // Check again with lock, and always refresh if the current view is still empty. + Map current = _segmentQueryableDocIdsMap; + if (skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) { + return; + } + Map updated = new HashMap<>(); + for (IndexSegment segment : _trackedSegments) { + if (current == null || _updatedSegmentsSinceLastRefresh.contains(segment)) { + updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment)); + } else { + updated.put(segment, current.get(segment)); + } + } + // Swap in the new consistent set of bitmaps. + _segmentQueryableDocIdsMap = updated; + _updatedSegmentsSinceLastRefresh.clear(); + _lastUpsertViewRefreshTimeMs = System.currentTimeMillis(); + } finally { + _upsertViewLock.writeLock().unlock(); + } + } + protected void doClose() throws IOException { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 2fbe4a28e512..ea5aba5d2124 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -68,19 +68,23 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD boolean enablePreload = upsertConfig.isEnablePreload(); double metadataTTL = upsertConfig.getMetadataTTL(); double deletedKeysTTL = upsertConfig.getDeletedKeysTTL(); + UpsertConfig.ConsistencyMode consistencyMode = upsertConfig.getConsistencyMode(); + long upsertViewRefreshIntervalMs = upsertConfig.getUpsertViewRefreshIntervalMs(); File tableIndexDir = tableDataManager.getTableDataDir(); _context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema) .setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns) .setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction) .setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload) - .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableIndexDir) + .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(consistencyMode) + .setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir) .setTableDataManager(tableDataManager).build(); LOGGER.info( "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {}," + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}," - + " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType, - primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot, - enablePreload, metadataTTL, deletedKeysTTL, tableIndexDir); + + " deleted Keys TTL: {}, consistency mode: {}, upsert view refresh interval: {}ms, table index dir: {}", + getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn, + hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload, metadataTTL, deletedKeysTTL, + consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir); initCustomVariables(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index c65f101aca79..5980cbbacefe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -93,7 +93,7 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab // iterator will return records with incremental doc ids. if (currentSegment == segment) { if (comparisonResult >= 0) { - replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { return currentRecordLocation; @@ -108,7 +108,7 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab // snapshot for the old segment, which can be updated and used to track the docs not replaced yet. if (currentSegment == oldSegment) { if (comparisonResult >= 0) { - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); if (validDocIdsForOldSegment != null) { validDocIdsForOldSegment.remove(currentDocId); } @@ -124,7 +124,7 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab if (currentSegmentName.equals(segmentName)) { numKeysInWrongSegment.getAndIncrement(); if (comparisonResult >= 0) { - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { return currentRecordLocation; @@ -139,14 +139,14 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab && LLCSegmentName.isLLCSegment(currentSegmentName) && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber( currentSegmentName))) { - replaceDocId(validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { return currentRecordLocation; } } else { // New primary key - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } }); @@ -166,7 +166,7 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM RecordInfo recordInfo = recordInfoIterator.next(); int newDocId = recordInfo.getDocId(); Comparable newComparisonValue = recordInfo.getComparisonValue(); - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); _primaryKeyToRecordLocationMap.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), new RecordLocation(segment, newDocId, newComparisonValue)); } @@ -275,9 +275,9 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { IndexSegment currentSegment = currentRecordLocation.getSegment(); int currentDocId = currentRecordLocation.getDocId(); if (segment == currentSegment) { - replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); } else { - replaceDocId(validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); } return new RecordLocation(segment, newDocId, newComparisonValue); } else { @@ -288,7 +288,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } } else { // New primary key - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } }); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 4e3e1684028c..4c2f995df8ca 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -58,9 +58,10 @@ public Map getPartitionToPrimaryKeyCount() { } @Override - public void setSegmentContexts(List segmentContexts) { + public void setSegmentContexts(List segmentContexts, Map queryOptions) { _partitionMetadataManagerMap.forEach( - (partitionID, upsertMetadataManager) -> upsertMetadataManager.setSegmentContexts(segmentContexts)); + (partitionID, upsertMetadataManager) -> upsertMetadataManager.setSegmentContexts(segmentContexts, + queryOptions)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index 921c31dfa31a..db21f2030e22 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -53,6 +53,6 @@ public interface TableUpsertMetadataManager extends Closeable { */ Map getPartitionToPrimaryKeyCount(); - default void setSegmentContexts(List segmentContexts) { + default void setSegmentContexts(List segmentContexts, Map queryOptions) { } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java index a45662d389ec..32cf34343b2a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java @@ -26,6 +26,7 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; @@ -41,13 +42,16 @@ public class UpsertContext { private final boolean _enablePreload; private final double _metadataTTL; private final double _deletedKeysTTL; + private final UpsertConfig.ConsistencyMode _consistencyMode; + private final long _upsertViewRefreshIntervalMs; private final File _tableIndexDir; private final TableDataManager _tableDataManager; private UpsertContext(TableConfig tableConfig, Schema schema, List primaryKeyColumns, List comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload, - double metadataTTL, double deletedKeysTTL, File tableIndexDir, @Nullable TableDataManager tableDataManager) { + double metadataTTL, double deletedKeysTTL, UpsertConfig.ConsistencyMode consistencyMode, + long upsertViewRefreshIntervalMs, File tableIndexDir, @Nullable TableDataManager tableDataManager) { _tableConfig = tableConfig; _schema = schema; _primaryKeyColumns = primaryKeyColumns; @@ -59,6 +63,8 @@ private UpsertContext(TableConfig tableConfig, Schema schema, List prima _enablePreload = enablePreload; _metadataTTL = metadataTTL; _deletedKeysTTL = deletedKeysTTL; + _consistencyMode = consistencyMode; + _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; _tableIndexDir = tableIndexDir; _tableDataManager = tableDataManager; } @@ -107,6 +113,14 @@ public double getDeletedKeysTTL() { return _deletedKeysTTL; } + public UpsertConfig.ConsistencyMode getConsistencyMode() { + return _consistencyMode; + } + + public long getUpsertViewRefreshIntervalMs() { + return _upsertViewRefreshIntervalMs; + } + public File getTableIndexDir() { return _tableIndexDir; } @@ -127,6 +141,8 @@ public static class Builder { private boolean _enablePreload; private double _metadataTTL; private double _deletedKeysTTL; + private UpsertConfig.ConsistencyMode _consistencyMode; + private long _upsertViewRefreshIntervalMs; private File _tableIndexDir; private TableDataManager _tableDataManager; @@ -185,6 +201,16 @@ public Builder setDeletedKeysTTL(double deletedKeysTTL) { return this; } + public Builder setConsistencyMode(UpsertConfig.ConsistencyMode consistencyMode) { + _consistencyMode = consistencyMode; + return this; + } + + public Builder setUpsertViewRefreshIntervalMs(long upsertViewRefreshIntervalMs) { + _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; + return this; + } + public Builder setTableIndexDir(File tableIndexDir) { _tableIndexDir = tableIndexDir; return this; @@ -204,7 +230,7 @@ public UpsertContext build() { Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set"); return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL, - _tableIndexDir, _tableDataManager); + _consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir, _tableDataManager); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java index 3cb16b52b637..4a0d127fa70f 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java @@ -30,8 +30,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; @@ -46,6 +48,7 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; @@ -65,9 +68,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class BasePartitionUpsertMetadataManagerTest { @@ -242,6 +243,209 @@ public void testTakeSnapshotInOrder() assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3); } + @Test + public void testConsistencyModeSync() + throws Exception { + UpsertContext upsertContext = mock(UpsertContext.class); + when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SYNC); + DummyPartitionUpsertMetadataManager upsertMetadataManager = + new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext); + + CountDownLatch latch = new CountDownLatch(1); + Map segmentQueryableDocIdsMap = new HashMap<>(); + IndexSegment seg01 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds01 = createThreadSafeMutableRoaringBitmap(10); + AtomicBoolean called = new AtomicBoolean(false); + when(seg01.getValidDocIds()).then(invocationOnMock -> { + called.set(true); + latch.await(); + return validDocIds01; + }); + upsertMetadataManager.trackSegment(seg01); + segmentQueryableDocIdsMap.put(seg01, validDocIds01); + + IndexSegment seg02 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds02 = createThreadSafeMutableRoaringBitmap(11); + when(seg02.getValidDocIds()).thenReturn(validDocIds02); + upsertMetadataManager.trackSegment(seg02); + segmentQueryableDocIdsMap.put(seg02, validDocIds02); + + IndexSegment seg03 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds03 = createThreadSafeMutableRoaringBitmap(12); + when(seg03.getValidDocIds()).thenReturn(validDocIds03); + upsertMetadataManager.trackSegment(seg03); + segmentQueryableDocIdsMap.put(seg03, validDocIds03); + + List segmentContexts = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + // This thread does replaceDocId and takes WLock first. + executor.submit(() -> { + RecordInfo recordInfo = new RecordInfo(null, 5, null, false); + upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 0, 12, recordInfo); + }); + // This thread gets segment contexts, but it's blocked until the first thread finishes replaceDocId. + Future future = executor.submit(() -> { + long startMs = System.currentTimeMillis(); + // Check called flag to let the first thread do replaceDocId, thus get WLock, first. + while (!called.get()) { + Thread.sleep(10); + } + segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new SegmentContext(k))); + upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>()); + return System.currentTimeMillis() - startMs; + }); + // The first thread can only finish after the latch is counted down after 2s. + // So the 2nd thread getting segment contexts will be blocked for 2s+. + Thread.sleep(2000); + latch.countDown(); + long duration = future.get(); + assertTrue(duration >= 2000, duration + " was less than expected"); + } finally { + executor.shutdownNow(); + } + + for (SegmentContext sc : segmentContexts) { + ThreadSafeMutableRoaringBitmap validDocIds = segmentQueryableDocIdsMap.get(sc.getIndexSegment()); + assertNotNull(validDocIds); + // SegmentContext holds a clone of the original queryableDocIds held by the segment object. + assertNotSame(sc.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + assertEquals(sc.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + // docId=0 in seg01 got invalidated. + if (sc.getIndexSegment() == seg01) { + assertFalse(sc.getQueryableDocIdsSnapshot().contains(0)); + } + // docId=12 in seg03 was newly added. + if (sc.getIndexSegment() == seg03) { + assertTrue(sc.getQueryableDocIdsSnapshot().contains(12)); + } + } + } + + @Test + public void testConsistencyModeSnapshot() + throws Exception { + UpsertContext upsertContext = mock(UpsertContext.class); + when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SNAPSHOT); + when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L); + DummyPartitionUpsertMetadataManager upsertMetadataManager = + new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext); + + CountDownLatch latch = new CountDownLatch(1); + Map segmentQueryableDocIdsMap = new HashMap<>(); + IndexSegment seg01 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds01 = createThreadSafeMutableRoaringBitmap(10); + AtomicBoolean called = new AtomicBoolean(false); + when(seg01.getValidDocIds()).then(invocationOnMock -> { + called.set(true); + latch.await(); + return validDocIds01; + }); + upsertMetadataManager.trackSegment(seg01); + segmentQueryableDocIdsMap.put(seg01, validDocIds01); + + IndexSegment seg02 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds02 = createThreadSafeMutableRoaringBitmap(11); + when(seg02.getValidDocIds()).thenReturn(validDocIds02); + upsertMetadataManager.trackSegment(seg02); + segmentQueryableDocIdsMap.put(seg02, validDocIds02); + + IndexSegment seg03 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds03 = createThreadSafeMutableRoaringBitmap(12); + when(seg03.getValidDocIds()).thenReturn(validDocIds03); + upsertMetadataManager.trackSegment(seg03); + segmentQueryableDocIdsMap.put(seg03, validDocIds03); + + List segmentContexts = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + // This thread does replaceDocId and takes WLock first, and it'll refresh the upsert view + executor.submit(() -> { + RecordInfo recordInfo = new RecordInfo(null, 5, null, false); + upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 0, 12, recordInfo); + }); + // This thread gets segment contexts, but it's blocked until the first thread finishes replaceDocId. + Future future = executor.submit(() -> { + long startMs = System.currentTimeMillis(); + // Check called flag to let the first thread do replaceDocId, thus get WLock, first. + while (!called.get()) { + Thread.sleep(10); + } + segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new SegmentContext(k))); + // This thread reuses the upsert view refreshed by the first thread above. + upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>()); + return System.currentTimeMillis() - startMs; + }); + // The first thread can only finish after the latch is counted down after 2s. + // So the 2nd thread getting segment contexts will be blocked for 2s+. + Thread.sleep(2000); + latch.countDown(); + long duration = future.get(); + assertTrue(duration >= 2000, duration + " was less than expected"); + } finally { + executor.shutdownNow(); + } + + // Get the upsert view again, and the existing bitmap objects should be set in segment contexts. + // The segmentContexts initialized above holds the same bitmaps objects as from the upsert view. + List reuseSegmentContexts = new ArrayList<>(); + upsertMetadataManager.setSegmentContexts(reuseSegmentContexts, new HashMap<>()); + for (SegmentContext reuseSC : reuseSegmentContexts) { + for (SegmentContext sc : segmentContexts) { + if (reuseSC.getIndexSegment() == sc.getIndexSegment()) { + assertSame(reuseSC.getQueryableDocIdsSnapshot(), sc.getQueryableDocIdsSnapshot()); + } + } + ThreadSafeMutableRoaringBitmap validDocIds = segmentQueryableDocIdsMap.get(reuseSC.getIndexSegment()); + assertNotNull(validDocIds); + // The upsert view holds a clone of the original queryableDocIds held by the segment object. + assertNotSame(reuseSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + assertEquals(reuseSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + // docId=0 in seg01 got invalidated. + if (reuseSC.getIndexSegment() == seg01) { + assertFalse(reuseSC.getQueryableDocIdsSnapshot().contains(0)); + } + // docId=12 in seg03 was newly added. + if (reuseSC.getIndexSegment() == seg03) { + assertTrue(reuseSC.getQueryableDocIdsSnapshot().contains(12)); + } + } + + // Force refresh the upsert view when getting it, so different bitmap objects should be set in segment contexts. + List refreshSegmentContexts = new ArrayList<>(); + Map queryOptions = new HashMap<>(); + queryOptions.put("upsertViewFreshnessMs", "0"); + upsertMetadataManager.setSegmentContexts(refreshSegmentContexts, queryOptions); + for (SegmentContext refreshSC : refreshSegmentContexts) { + for (SegmentContext sc : segmentContexts) { + if (refreshSC.getIndexSegment() == sc.getIndexSegment()) { + assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), sc.getQueryableDocIdsSnapshot()); + } + } + ThreadSafeMutableRoaringBitmap validDocIds = segmentQueryableDocIdsMap.get(refreshSC.getIndexSegment()); + assertNotNull(validDocIds); + // The upsert view holds a clone of the original queryableDocIds held by the segment object. + assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + assertEquals(refreshSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + // docId=0 in seg01 got invalidated. + if (refreshSC.getIndexSegment() == seg01) { + assertFalse(refreshSC.getQueryableDocIdsSnapshot().contains(0)); + } + // docId=12 in seg03 was newly added. + if (refreshSC.getIndexSegment() == seg03) { + assertTrue(refreshSC.getQueryableDocIdsSnapshot().contains(12)); + } + } + } + + private static ThreadSafeMutableRoaringBitmap createThreadSafeMutableRoaringBitmap(int docCnt) { + ThreadSafeMutableRoaringBitmap bitmap = new ThreadSafeMutableRoaringBitmap(); + for (int i = 0; i < docCnt; i++) { + bitmap.add(i); + } + return bitmap; + } + @Test public void testResolveComparisonTies() { // Build a record info list for testing diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 848c99199963..fcab1ab85808 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -39,6 +39,10 @@ public enum Strategy { APPEND, IGNORE, INCREMENT, MAX, MIN, OVERWRITE, UNION } + public enum ConsistencyMode { + NONE, SYNC, SNAPSHOT + } + @JsonPropertyDescription("Upsert mode.") private Mode _mode; @@ -75,6 +79,12 @@ public enum Strategy { @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery") private boolean _enablePreload; + @JsonPropertyDescription("Configure the way to provide consistent view for upsert table") + private ConsistencyMode _consistencyMode = ConsistencyMode.NONE; + + @JsonPropertyDescription("Refresh interval when using the snapshot consistency mode") + private long _upsertViewRefreshIntervalMs = 3000; + @JsonPropertyDescription("Custom class for upsert metadata manager") private String _metadataManagerClass; @@ -147,6 +157,14 @@ public boolean isEnablePreload() { return _enablePreload; } + public ConsistencyMode getConsistencyMode() { + return _consistencyMode; + } + + public long getUpsertViewRefreshIntervalMs() { + return _upsertViewRefreshIntervalMs; + } + public boolean isDropOutOfOrderRecord() { return _dropOutOfOrderRecord; } @@ -237,6 +255,14 @@ public void setEnablePreload(boolean enablePreload) { _enablePreload = enablePreload; } + public void setConsistencyMode(ConsistencyMode consistencyMode) { + _consistencyMode = consistencyMode; + } + + public void setUpsertViewRefreshIntervalMs(long upsertViewRefreshIntervalMs) { + _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; + } + public void setDropOutOfOrderRecord(boolean dropOutOfOrderRecord) { _dropOutOfOrderRecord = dropOutOfOrderRecord; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c02423d4b619..c61101d336ff 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -352,6 +352,7 @@ public static class Request { public static class QueryOptionKey { public static final String TIMEOUT_MS = "timeoutMs"; public static final String SKIP_UPSERT = "skipUpsert"; + public static final String UPSERT_VIEW_FRESHNESS_MS = "upsertViewFreshnessMs"; public static final String USE_STAR_TREE = "useStarTree"; public static final String SCAN_STAR_TREE_NODES = "scanStarTreeNodes"; public static final String ROUTING_OPTIONS = "routingOptions";