From c4c236edcad4e79cf7126672f65158d5c83c7a7b Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Wed, 10 Apr 2024 13:54:05 -0700 Subject: [PATCH 1/6] add locking logic to get consistent table view for upsert tables --- .../utils/config/QueryOptionsUtils.java | 5 + .../realtime/RealtimeTableDataManager.java | 2 +- .../BasePartitionUpsertMetadataManager.java | 177 ++++++++++++++++-- .../BaseTableUpsertMetadataManager.java | 13 +- ...rentMapPartitionUpsertMetadataManager.java | 18 +- ...ncurrentMapTableUpsertMetadataManager.java | 5 +- .../upsert/TableUpsertMetadataManager.java | 2 +- .../segment/local/upsert/UpsertContext.java | 42 ++++- .../pinot/spi/config/table/UpsertConfig.java | 33 ++++ .../pinot/spi/utils/CommonConstants.java | 1 + 10 files changed, 266 insertions(+), 32 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index aaa9db3f4fea..75797dcd14f1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -139,6 +139,11 @@ public static boolean isSkipUpsert(Map queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT)); } + public static long getUpsertViewFreshnessMs(Map queryOptions) { + String freshnessMsString = queryOptions.get(QueryOptionKey.UPSERT_VIEW_FRESHNESS_MS); + return freshnessMsString != null ? 
Long.parseLong(freshnessMsString) : -1; + } + public static boolean isScanStarTreeNodes(Map queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SCAN_STAR_TREE_NODES)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index cbb870037c45..f49297175859 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -302,7 +302,7 @@ public List getSegmentContexts(List selectedSegmen List segmentContexts = new ArrayList<>(selectedSegments.size()); selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s))); if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) { - _tableUpsertMetadataManager.setSegmentContexts(segmentContexts); + _tableUpsertMetadataManager.setSegmentContexts(segmentContexts, queryOptions); } return segmentContexts; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index a6ba19ee5acd..acd20b0a9828 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -55,6 +55,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.utils.SegmentUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.segment.local.data.manager.TableDataManager; 
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; @@ -122,6 +123,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps private final Lock _preloadLock = new ReentrantLock(); private volatile boolean _isPreloading; + // Below are configs to enable consistent upsert view. + // If enabling upsert view, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and + // the query threads take the RLock when getting bitmaps for all its selected segments. + // If enabling block refresh, the query threads don't need to take lock when getting bitmaps for all its selected + // segments, as the query threads access a copy of bitmaps that are kept updated by upsert thread periodically. But + // the query thread can specify a freshness threshold query option to refresh the bitmap copies if not fresh enough. + private final boolean _enableUpsertView; + private final boolean _enableUpsertViewBatchRefresh; + private final long _upsertViewRefreshIntervalMs; + private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock(); + private final Set _updatedSegmentsSinceLastRefresh = ConcurrentHashMap.newKeySet(); + private volatile long _lastUpsertViewRefreshTimeMs = 0; + private volatile Map _segmentQueryableDocIdsMap = new HashMap<>(); + protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; @@ -137,6 +152,9 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti _metadataTTL = context.getMetadataTTL(); _deletedKeysTTL = context.getDeletedKeysTTL(); _tableIndexDir = context.getTableIndexDir(); + _enableUpsertView = context.isUpsertViewEnabled(); + _enableUpsertViewBatchRefresh = _enableUpsertView && context.isUpsertViewBatchRefreshEnabled(); + _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs(); _serverMetrics = 
ServerMetrics.get(); _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); if (_metadataTTL > 0) { @@ -1019,27 +1037,79 @@ public synchronized void close() _logger.info("Closed the metadata manager"); } - protected void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds, + /** + * Use WLock to make updates on two segments' bitmaps atomically. + */ + protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) { - removeDocId(oldSegment, oldDocId); - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + if (_enableUpsertView) { + _upsertViewLock.writeLock().lock(); + } + try { + doRemoveDocId(oldSegment, oldDocId); + doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + if (_enableUpsertViewBatchRefresh) { + _updatedSegmentsSinceLastRefresh.add(newSegment); + _updatedSegmentsSinceLastRefresh.add(oldSegment); + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); + } + } finally { + if (_enableUpsertView) { + _upsertViewLock.writeLock().unlock(); + } + } } - protected void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds, + /** + * There is no need to take the R/WLock to update single bitmap, as all methods to update the bitmap is synchronized. + * But for upsertViewBatchRefresh to work correctly, we need to block updates on bitmaps while doing batch refresh, + * which takes WLock. So wrap bitmap update logic inside RLock to allow concurrent updates but to be blocked when + * there is thread doing batch refresh, i.e. to take copies of all bitmaps. 
+ */ + protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) { - validDocIds.replace(oldDocId, newDocId); - if (queryableDocIds != null) { - if (recordInfo.isDeleteRecord()) { - queryableDocIds.remove(oldDocId); - } else { - queryableDocIds.replace(oldDocId, newDocId); + if (_enableUpsertViewBatchRefresh) { + _upsertViewLock.readLock().lock(); + } + try { + validDocIds.replace(oldDocId, newDocId); + if (queryableDocIds != null) { + if (recordInfo.isDeleteRecord()) { + queryableDocIds.remove(oldDocId); + } else { + queryableDocIds.replace(oldDocId, newDocId); + } + } + } finally { + if (_enableUpsertViewBatchRefresh) { + _updatedSegmentsSinceLastRefresh.add(segment); + _upsertViewLock.readLock().unlock(); + // Do batch refresh outside the RLock block. + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } } - protected void addDocId(ThreadSafeMutableRoaringBitmap validDocIds, + protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) { + if (_enableUpsertViewBatchRefresh) { + _upsertViewLock.readLock().lock(); + } + try { + doAddDocId(validDocIds, queryableDocIds, docId, recordInfo); + } finally { + if (_enableUpsertViewBatchRefresh) { + _updatedSegmentsSinceLastRefresh.add(segment); + _upsertViewLock.readLock().unlock(); + // Do batch refresh outside the RLock block. 
+ doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); + } + } + } + + private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds, + int docId, RecordInfo recordInfo) { validDocIds.add(docId); if (queryableDocIds != null && !recordInfo.isDeleteRecord()) { queryableDocIds.add(docId); @@ -1047,6 +1117,22 @@ protected void addDocId(ThreadSafeMutableRoaringBitmap validDocIds, } protected void removeDocId(IndexSegment segment, int docId) { + if (_enableUpsertViewBatchRefresh) { + _upsertViewLock.readLock().lock(); + } + try { + doRemoveDocId(segment, docId); + } finally { + if (_enableUpsertViewBatchRefresh) { + _updatedSegmentsSinceLastRefresh.add(segment); + _upsertViewLock.readLock().unlock(); + // Do batch refresh outside the RLock block. + doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); + } + } + } + + private void doRemoveDocId(IndexSegment segment, int docId) { Objects.requireNonNull(segment.getValidDocIds()).remove(docId); ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds(); if (currentQueryableDocIds != null) { @@ -1058,11 +1144,32 @@ protected void removeDocId(IndexSegment segment, int docId) { * Use the segmentContexts to collect the contexts for selected segments. Reuse the segmentContext object if * present, to avoid overwriting the contexts specified at the others places. */ - public void setSegmentContexts(List segmentContexts) { + public void setSegmentContexts(List segmentContexts, Map queryOptions) { + if (!_enableUpsertView) { + setSegmentContexts(segmentContexts); + return; + } + // If not using batch refresh, get the bitmaps from segments directly but with RLock for consistency. 
+ if (!_enableUpsertViewBatchRefresh) { + _upsertViewLock.readLock().lock(); + try { + setSegmentContexts(segmentContexts); + return; + } finally { + _upsertViewLock.readLock().unlock(); + } + } + // If batch refresh is enabled, the copy of bitmaps is kept updated and ready to use for a consistent view. + // The locking between query threads and upsert threads can be avoided when using batch refresh. + // Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps by every single query. + // If query has specified a need for certain freshness, check the view and refresh it as needed. + // When refreshing the copy of map, we need to take the WLock so only one thread is refreshing view. + long upsertViewFreshnessMs = QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions); + doBatchRefreshUpsertView(upsertViewFreshnessMs); for (SegmentContext segmentContext : segmentContexts) { IndexSegment segment = segmentContext.getIndexSegment(); - if (_trackedSegments.contains(segment)) { - segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment)); + if (_segmentQueryableDocIdsMap.containsKey(segment)) { + segmentContext.setQueryableDocIdsSnapshot(_segmentQueryableDocIdsMap.get(segment)); } } } @@ -1081,6 +1188,48 @@ private static MutableRoaringBitmap getQueryableDocIdsSnapshotFromSegment(IndexS return queryableDocIdsSnapshot; } + private void setSegmentContexts(List segmentContexts) { + for (SegmentContext segmentContext : segmentContexts) { + IndexSegment segment = segmentContext.getIndexSegment(); + if (_trackedSegments.contains(segment)) { + segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment)); + } + } + } + + private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) { + long nowMs = System.currentTimeMillis(); + if (upsertViewFreshnessMs >= 0) { + return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > nowMs; + } + return _lastUpsertViewRefreshTimeMs + 
_upsertViewRefreshIntervalMs > nowMs; + } + + private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) { + if (skipUpsertViewRefresh(upsertViewFreshnessMs)) { + return; + } + _upsertViewLock.writeLock().lock(); + try { + if (skipUpsertViewRefresh(upsertViewFreshnessMs)) { + return; + } + Map updated = new HashMap<>(); + for (IndexSegment segment : _trackedSegments) { + if (!_updatedSegmentsSinceLastRefresh.contains(segment)) { + continue; + } + updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment)); + } + // Swap in the new consistent set of bitmaps. + _segmentQueryableDocIdsMap = updated; + _updatedSegmentsSinceLastRefresh.clear(); + _lastUpsertViewRefreshTimeMs = System.currentTimeMillis(); + } finally { + _upsertViewLock.writeLock().unlock(); + } + } + protected void doClose() throws IOException { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 2fbe4a28e512..00c24477c69e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -68,19 +68,26 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD boolean enablePreload = upsertConfig.isEnablePreload(); double metadataTTL = upsertConfig.getMetadataTTL(); double deletedKeysTTL = upsertConfig.getDeletedKeysTTL(); + boolean enableUpsertView = upsertConfig.isEnableUpsertView(); + boolean enableUpsertViewBatchRefresh = upsertConfig.isEnableUpsertViewBatchRefresh(); + long upsertViewRefreshIntervalMs = upsertConfig.getUpsertViewRefreshIntervalMs(); File tableIndexDir = tableDataManager.getTableDataDir(); _context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema) 
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns) .setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction) .setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload) - .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableIndexDir) + .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setEnableUpsertView(enableUpsertView) + .setEnableUpsertViewBatchRefresh(enableUpsertViewBatchRefresh) + .setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir) .setTableDataManager(tableDataManager).build(); LOGGER.info( "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {}," + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}," - + " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType, + + " deleted Keys TTL: {}, enable upsertView: {}, enable upsertView batchRefresh: {}, " + + "upsertView refresh interval: {} ms, table index dir: {}", getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot, - enablePreload, metadataTTL, deletedKeysTTL, tableIndexDir); + enablePreload, metadataTTL, deletedKeysTTL, enableUpsertView, enableUpsertViewBatchRefresh, + upsertViewRefreshIntervalMs, tableIndexDir); initCustomVariables(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index c65f101aca79..5980cbbacefe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -93,7 +93,7 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab // iterator will return records with incremental doc ids. if (currentSegment == segment) { if (comparisonResult >= 0) { - replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { return currentRecordLocation; @@ -108,7 +108,7 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab // snapshot for the old segment, which can be updated and used to track the docs not replaced yet. if (currentSegment == oldSegment) { if (comparisonResult >= 0) { - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); if (validDocIdsForOldSegment != null) { validDocIdsForOldSegment.remove(currentDocId); } @@ -124,7 +124,7 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab if (currentSegmentName.equals(segmentName)) { numKeysInWrongSegment.getAndIncrement(); if (comparisonResult >= 0) { - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { return currentRecordLocation; @@ -139,14 +139,14 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab && LLCSegmentName.isLLCSegment(currentSegmentName) && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber( currentSegmentName))) { - replaceDocId(validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { return currentRecordLocation; } } else { // New primary key - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } }); @@ -166,7 +166,7 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM RecordInfo recordInfo = recordInfoIterator.next(); int newDocId = recordInfo.getDocId(); Comparable newComparisonValue = recordInfo.getComparisonValue(); - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); _primaryKeyToRecordLocationMap.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), new RecordLocation(segment, newDocId, newComparisonValue)); } @@ -275,9 +275,9 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { IndexSegment currentSegment = currentRecordLocation.getSegment(); int currentDocId = currentRecordLocation.getDocId(); if (segment == currentSegment) { - replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); } else { - replaceDocId(validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); + replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); } return new RecordLocation(segment, newDocId, newComparisonValue); } else { @@ -288,7 +288,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } } else { // New primary key - addDocId(validDocIds, queryableDocIds, newDocId, recordInfo); + addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, 
newComparisonValue); } }); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 4e3e1684028c..4c2f995df8ca 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -58,9 +58,10 @@ public Map getPartitionToPrimaryKeyCount() { } @Override - public void setSegmentContexts(List segmentContexts) { + public void setSegmentContexts(List segmentContexts, Map queryOptions) { _partitionMetadataManagerMap.forEach( - (partitionID, upsertMetadataManager) -> upsertMetadataManager.setSegmentContexts(segmentContexts)); + (partitionID, upsertMetadataManager) -> upsertMetadataManager.setSegmentContexts(segmentContexts, + queryOptions)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index 921c31dfa31a..db21f2030e22 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -53,6 +53,6 @@ public interface TableUpsertMetadataManager extends Closeable { */ Map getPartitionToPrimaryKeyCount(); - default void setSegmentContexts(List segmentContexts) { + default void setSegmentContexts(List segmentContexts, Map queryOptions) { } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java index 
a45662d389ec..46bf16666aae 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java @@ -41,13 +41,17 @@ public class UpsertContext { private final boolean _enablePreload; private final double _metadataTTL; private final double _deletedKeysTTL; + private final boolean _enableUpsertView; + private final boolean _enableUpsertViewBatchRefresh; + private final long _upsertViewRefreshIntervalMs; private final File _tableIndexDir; private final TableDataManager _tableDataManager; private UpsertContext(TableConfig tableConfig, Schema schema, List primaryKeyColumns, List comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload, - double metadataTTL, double deletedKeysTTL, File tableIndexDir, @Nullable TableDataManager tableDataManager) { + double metadataTTL, double deletedKeysTTL, boolean enableUpsertView, boolean enableUpsertViewBatchRefresh, + long upsertViewRefreshIntervalMs, File tableIndexDir, @Nullable TableDataManager tableDataManager) { _tableConfig = tableConfig; _schema = schema; _primaryKeyColumns = primaryKeyColumns; @@ -59,6 +63,9 @@ private UpsertContext(TableConfig tableConfig, Schema schema, List prima _enablePreload = enablePreload; _metadataTTL = metadataTTL; _deletedKeysTTL = deletedKeysTTL; + _enableUpsertView = enableUpsertView; + _enableUpsertViewBatchRefresh = enableUpsertViewBatchRefresh; + _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; _tableIndexDir = tableIndexDir; _tableDataManager = tableDataManager; } @@ -107,6 +114,18 @@ public double getDeletedKeysTTL() { return _deletedKeysTTL; } + public boolean isUpsertViewEnabled() { + return _enableUpsertView; + } + + public boolean isUpsertViewBatchRefreshEnabled() { + return _enableUpsertViewBatchRefresh; + } + + public long 
getUpsertViewRefreshIntervalMs() { + return _upsertViewRefreshIntervalMs; + } + public File getTableIndexDir() { return _tableIndexDir; } @@ -127,6 +146,9 @@ public static class Builder { private boolean _enablePreload; private double _metadataTTL; private double _deletedKeysTTL; + private boolean _enableUpsertView; + private boolean _enableUpsertViewBatchRefresh; + private long _upsertViewRefreshIntervalMs; private File _tableIndexDir; private TableDataManager _tableDataManager; @@ -185,6 +207,21 @@ public Builder setDeletedKeysTTL(double deletedKeysTTL) { return this; } + public Builder setEnableUpsertView(boolean enableUpsertView) { + _enableUpsertView = enableUpsertView; + return this; + } + + public Builder setEnableUpsertViewBatchRefresh(boolean enableUpsertViewBatchRefresh) { + _enableUpsertViewBatchRefresh = enableUpsertViewBatchRefresh; + return this; + } + + public Builder setUpsertViewRefreshIntervalMs(long upsertViewRefreshIntervalMs) { + _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; + return this; + } + public Builder setTableIndexDir(File tableIndexDir) { _tableIndexDir = tableIndexDir; return this; @@ -204,7 +241,8 @@ public UpsertContext build() { Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set"); return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL, - _tableIndexDir, _tableDataManager); + _enableUpsertView, _enableUpsertViewBatchRefresh, _upsertViewRefreshIntervalMs, _tableIndexDir, + _tableDataManager); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 848c99199963..656164f11008 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -75,6 +75,15 @@ public enum Strategy { @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery") private boolean _enablePreload; + @JsonPropertyDescription("Whether to enable consistent view for upsert table") + private boolean _enableUpsertView; + + @JsonPropertyDescription("Whether to enable batch refresh mode to keep consistent view for upsert table") + private boolean _enableUpsertViewBatchRefresh; + + @JsonPropertyDescription("Refresh interval if using batch refresh mode to keep consistent view for upsert table") + private long _upsertViewRefreshIntervalMs; + @JsonPropertyDescription("Custom class for upsert metadata manager") private String _metadataManagerClass; @@ -147,6 +156,18 @@ public boolean isEnablePreload() { return _enablePreload; } + public boolean isEnableUpsertView() { + return _enableUpsertView; + } + + public boolean isEnableUpsertViewBatchRefresh() { + return _enableUpsertViewBatchRefresh; + } + + public long getUpsertViewRefreshIntervalMs() { + return _upsertViewRefreshIntervalMs; + } + public boolean isDropOutOfOrderRecord() { return _dropOutOfOrderRecord; } @@ -237,6 +258,18 @@ public void setEnablePreload(boolean enablePreload) { _enablePreload = enablePreload; } + public void setEnableUpsertView(boolean enableUpsertView) { + _enableUpsertView = enableUpsertView; + } + + public void setEnableUpsertViewBatchRefresh(boolean enableUpsertViewBatchRefresh) { + _enableUpsertViewBatchRefresh = enableUpsertViewBatchRefresh; + } + + public void setUpsertViewRefreshIntervalMs(long upsertViewRefreshIntervalMs) { + _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; + } + public void setDropOutOfOrderRecord(boolean dropOutOfOrderRecord) { _dropOutOfOrderRecord = dropOutOfOrderRecord; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c02423d4b619..c61101d336ff 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -352,6 +352,7 @@ public static class Request { public static class QueryOptionKey { public static final String TIMEOUT_MS = "timeoutMs"; public static final String SKIP_UPSERT = "skipUpsert"; + public static final String UPSERT_VIEW_FRESHNESS_MS = "upsertViewFreshnessMs"; public static final String USE_STAR_TREE = "useStarTree"; public static final String SCAN_STAR_TREE_NODES = "scanStarTreeNodes"; public static final String ROUTING_OPTIONS = "routingOptions"; From 7482b72d7ef523be7af1ca03bfefc032c2f27540 Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Fri, 10 May 2024 16:58:22 -0700 Subject: [PATCH 2/6] add tests --- .../BasePartitionUpsertMetadataManager.java | 22 +- .../BaseTableUpsertMetadataManager.java | 10 +- ...asePartitionUpsertMetadataManagerTest.java | 211 +++++++++++++++++- .../pinot/spi/config/table/UpsertConfig.java | 2 +- 4 files changed, 228 insertions(+), 17 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index acd20b0a9828..44c2a6969a7c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -135,7 +135,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock(); private final Set _updatedSegmentsSinceLastRefresh = ConcurrentHashMap.newKeySet(); private volatile long 
_lastUpsertViewRefreshTimeMs = 0; - private volatile Map _segmentQueryableDocIdsMap = new HashMap<>(); + private volatile Map _segmentQueryableDocIdsMap; protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) { _tableNameWithType = tableNameWithType; @@ -1102,7 +1102,9 @@ protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap val if (_enableUpsertViewBatchRefresh) { _updatedSegmentsSinceLastRefresh.add(segment); _upsertViewLock.readLock().unlock(); - // Do batch refresh outside the RLock block. + // Batch refresh takes WLock, so do it outside RLock for clarity. The R/W lock ensures that only one thread + // can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until + // refreshing is done. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } @@ -1126,7 +1128,7 @@ protected void removeDocId(IndexSegment segment, int docId) { if (_enableUpsertViewBatchRefresh) { _updatedSegmentsSinceLastRefresh.add(segment); _upsertViewLock.readLock().unlock(); - // Do batch refresh outside the RLock block. + // Batch refresh takes WLock, so do it outside RLock for clarity. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } @@ -1206,20 +1208,24 @@ private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) { } private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) { - if (skipUpsertViewRefresh(upsertViewFreshnessMs)) { + // Always refresh if the current view is still empty. + if (skipUpsertViewRefresh(upsertViewFreshnessMs) && _segmentQueryableDocIdsMap != null) { return; } _upsertViewLock.writeLock().lock(); try { - if (skipUpsertViewRefresh(upsertViewFreshnessMs)) { + // Check again with lock, and always refresh if the current view is still empty. 
+ Map current = _segmentQueryableDocIdsMap; + if (skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) { return; } Map updated = new HashMap<>(); for (IndexSegment segment : _trackedSegments) { - if (!_updatedSegmentsSinceLastRefresh.contains(segment)) { - continue; + if (current == null || _updatedSegmentsSinceLastRefresh.contains(segment)) { + updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment)); + } else { + updated.put(segment, current.get(segment)); } - updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment)); } // Swap in the new consistent set of bitmaps. _segmentQueryableDocIdsMap = updated; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 00c24477c69e..9909917ef039 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -83,11 +83,11 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD LOGGER.info( "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {}," + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}," - + " deleted Keys TTL: {}, enable upsertView: {}, enable upsertView batchRefresh: {}, " - + "upsertView refresh interval: {} ms, table index dir: {}", getClass().getSimpleName(), _tableNameWithType, - primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot, - enablePreload, metadataTTL, deletedKeysTTL, enableUpsertView, enableUpsertViewBatchRefresh, - upsertViewRefreshIntervalMs, tableIndexDir); + + " deleted Keys TTL: {}, enable upsertView: {}, enable upsertView 
batchRefresh: {}," + + " upsertView refresh interval: {} ms, table index dir: {}", getClass().getSimpleName(), + _tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, + upsertConfig.getMode(), enableSnapshot, enablePreload, metadataTTL, deletedKeysTTL, enableUpsertView, + enableUpsertViewBatchRefresh, upsertViewRefreshIntervalMs, tableIndexDir); initCustomVariables(); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java index 3cb16b52b637..a75515f5bdfe 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java @@ -30,8 +30,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; @@ -46,6 +48,7 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; @@ -65,9 +68,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static 
org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class BasePartitionUpsertMetadataManagerTest { @@ -242,6 +243,210 @@ public void testTakeSnapshotInOrder() assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3); } + @Test + public void testEnableUpsertView() + throws Exception { + UpsertContext upsertContext = mock(UpsertContext.class); + when(upsertContext.isUpsertViewEnabled()).thenReturn(true); + DummyPartitionUpsertMetadataManager upsertMetadataManager = + new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext); + + CountDownLatch latch = new CountDownLatch(1); + Map segmentQueryableDocIdsMap = new HashMap<>(); + IndexSegment seg01 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds01 = createThreadSafeMutableRoaringBitmap(10); + AtomicBoolean called = new AtomicBoolean(false); + when(seg01.getValidDocIds()).then(invocationOnMock -> { + called.set(true); + latch.await(); + return validDocIds01; + }); + upsertMetadataManager.trackSegment(seg01); + segmentQueryableDocIdsMap.put(seg01, validDocIds01); + + IndexSegment seg02 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds02 = createThreadSafeMutableRoaringBitmap(11); + when(seg02.getValidDocIds()).thenReturn(validDocIds02); + upsertMetadataManager.trackSegment(seg02); + segmentQueryableDocIdsMap.put(seg02, validDocIds02); + + IndexSegment seg03 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds03 = createThreadSafeMutableRoaringBitmap(12); + when(seg03.getValidDocIds()).thenReturn(validDocIds03); + upsertMetadataManager.trackSegment(seg03); + segmentQueryableDocIdsMap.put(seg03, validDocIds03); + + List segmentContexts = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + // This thread does replaceDocId and takes WLock first. 
+ executor.submit(() -> { + RecordInfo recordInfo = new RecordInfo(null, 5, null, false); + upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 0, 12, recordInfo); + }); + // This thread gets segment contexts, but it's blocked until the first thread finishes replaceDocId. + Future future = executor.submit(() -> { + long startMs = System.currentTimeMillis(); + // Check called flag to let the first thread do replaceDocId, thus get WLock, first. + while (!called.get()) { + Thread.sleep(10); + } + segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new SegmentContext(k))); + upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>()); + return System.currentTimeMillis() - startMs; + }); + // The first thread can only finish after the latch is counted down after 2s. + // So the 2nd thread getting segment contexts will be blocked for 2s+. + Thread.sleep(2000); + latch.countDown(); + long duration = future.get(); + assertTrue(duration >= 2000, duration + " was less than expected"); + } finally { + executor.shutdownNow(); + } + + for (SegmentContext sc : segmentContexts) { + ThreadSafeMutableRoaringBitmap validDocIds = segmentQueryableDocIdsMap.get(sc.getIndexSegment()); + assertNotNull(validDocIds); + // SegmentContext holds a clone of the original queryableDocIds held by the segment object. + assertNotSame(sc.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + assertEquals(sc.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + // docId=0 in seg01 got invalidated. + if (sc.getIndexSegment() == seg01) { + assertFalse(sc.getQueryableDocIdsSnapshot().contains(0)); + } + // docId=12 in seg03 was newly added. 
+ if (sc.getIndexSegment() == seg03) { + assertTrue(sc.getQueryableDocIdsSnapshot().contains(12)); + } + } + } + + @Test + public void testEnableUpsertViewBatchRefresh() + throws Exception { + UpsertContext upsertContext = mock(UpsertContext.class); + when(upsertContext.isUpsertViewEnabled()).thenReturn(true); + when(upsertContext.isUpsertViewBatchRefreshEnabled()).thenReturn(true); + when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L); + DummyPartitionUpsertMetadataManager upsertMetadataManager = + new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext); + + CountDownLatch latch = new CountDownLatch(1); + Map segmentQueryableDocIdsMap = new HashMap<>(); + IndexSegment seg01 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds01 = createThreadSafeMutableRoaringBitmap(10); + AtomicBoolean called = new AtomicBoolean(false); + when(seg01.getValidDocIds()).then(invocationOnMock -> { + called.set(true); + latch.await(); + return validDocIds01; + }); + upsertMetadataManager.trackSegment(seg01); + segmentQueryableDocIdsMap.put(seg01, validDocIds01); + + IndexSegment seg02 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds02 = createThreadSafeMutableRoaringBitmap(11); + when(seg02.getValidDocIds()).thenReturn(validDocIds02); + upsertMetadataManager.trackSegment(seg02); + segmentQueryableDocIdsMap.put(seg02, validDocIds02); + + IndexSegment seg03 = mock(IndexSegment.class); + ThreadSafeMutableRoaringBitmap validDocIds03 = createThreadSafeMutableRoaringBitmap(12); + when(seg03.getValidDocIds()).thenReturn(validDocIds03); + upsertMetadataManager.trackSegment(seg03); + segmentQueryableDocIdsMap.put(seg03, validDocIds03); + + List segmentContexts = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + // This thread does replaceDocId and takes WLock first, and it'll refresh the upsert view + executor.submit(() -> { + RecordInfo recordInfo = new RecordInfo(null, 5, 
null, false); + upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 0, 12, recordInfo); + }); + // This thread gets segment contexts, but it's blocked until the first thread finishes replaceDocId. + Future future = executor.submit(() -> { + long startMs = System.currentTimeMillis(); + // Check called flag to let the first thread do replaceDocId, thus get WLock, first. + while (!called.get()) { + Thread.sleep(10); + } + segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new SegmentContext(k))); + // This thread reuses the upsert view refreshed by the first thread above. + upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>()); + return System.currentTimeMillis() - startMs; + }); + // The first thread can only finish after the latch is counted down after 2s. + // So the 2nd thread getting segment contexts will be blocked for 2s+. + Thread.sleep(2000); + latch.countDown(); + long duration = future.get(); + assertTrue(duration >= 2000, duration + " was less than expected"); + } finally { + executor.shutdownNow(); + } + + // Get the upsert view again, and the existing bitmap objects should be set in segment contexts. + // The segmentContexts initialized above holds the same bitmaps objects as from the upsert view. + List reuseSegmentContexts = new ArrayList<>(); + upsertMetadataManager.setSegmentContexts(reuseSegmentContexts, new HashMap<>()); + for (SegmentContext reuseSC : reuseSegmentContexts) { + for (SegmentContext sc : segmentContexts) { + if (reuseSC.getIndexSegment() == sc.getIndexSegment()) { + assertSame(reuseSC.getQueryableDocIdsSnapshot(), sc.getQueryableDocIdsSnapshot()); + } + } + ThreadSafeMutableRoaringBitmap validDocIds = segmentQueryableDocIdsMap.get(reuseSC.getIndexSegment()); + assertNotNull(validDocIds); + // The upsert view holds a clone of the original queryableDocIds held by the segment object. 
+ assertNotSame(reuseSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + assertEquals(reuseSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + // docId=0 in seg01 got invalidated. + if (reuseSC.getIndexSegment() == seg01) { + assertFalse(reuseSC.getQueryableDocIdsSnapshot().contains(0)); + } + // docId=12 in seg03 was newly added. + if (reuseSC.getIndexSegment() == seg03) { + assertTrue(reuseSC.getQueryableDocIdsSnapshot().contains(12)); + } + } + + // Force refresh the upsert view when getting it, so different bitmap objects should be set in segment contexts. + List refreshSegmentContexts = new ArrayList<>(); + Map queryOptions = new HashMap<>(); + queryOptions.put("upsertViewFreshnessMs", "0"); + upsertMetadataManager.setSegmentContexts(refreshSegmentContexts, queryOptions); + for (SegmentContext refreshSC : refreshSegmentContexts) { + for (SegmentContext sc : segmentContexts) { + if (refreshSC.getIndexSegment() == sc.getIndexSegment()) { + assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), sc.getQueryableDocIdsSnapshot()); + } + } + ThreadSafeMutableRoaringBitmap validDocIds = segmentQueryableDocIdsMap.get(refreshSC.getIndexSegment()); + assertNotNull(validDocIds); + // The upsert view holds a clone of the original queryableDocIds held by the segment object. + assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + assertEquals(refreshSC.getQueryableDocIdsSnapshot(), validDocIds.getMutableRoaringBitmap()); + // docId=0 in seg01 got invalidated. + if (refreshSC.getIndexSegment() == seg01) { + assertFalse(refreshSC.getQueryableDocIdsSnapshot().contains(0)); + } + // docId=12 in seg03 was newly added. 
+ if (refreshSC.getIndexSegment() == seg03) { + assertTrue(refreshSC.getQueryableDocIdsSnapshot().contains(12)); + } + } + } + + private static ThreadSafeMutableRoaringBitmap createThreadSafeMutableRoaringBitmap(int docCnt) { + ThreadSafeMutableRoaringBitmap bitmap = new ThreadSafeMutableRoaringBitmap(); + for (int i = 0; i < docCnt; i++) { + bitmap.add(i); + } + return bitmap; + } + @Test public void testResolveComparisonTies() { // Build a record info list for testing diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 656164f11008..fb073fbcd11d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -82,7 +82,7 @@ public enum Strategy { private boolean _enableUpsertViewBatchRefresh; @JsonPropertyDescription("Refresh interval if using batch refresh mode to keep consistent view for upsert table") - private long _upsertViewRefreshIntervalMs; + private long _upsertViewRefreshIntervalMs = 3000; @JsonPropertyDescription("Custom class for upsert metadata manager") private String _metadataManagerClass; From 8a6c1ea6c3653e97800bcf1ab93d7b2016520bb0 Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Wed, 15 May 2024 14:23:34 -0700 Subject: [PATCH 3/6] add upsert config ConsistencyMode as the feature flag --- .../BasePartitionUpsertMetadataManager.java | 56 ++++++++++--------- .../BaseTableUpsertMetadataManager.java | 15 ++--- .../segment/local/upsert/UpsertContext.java | 32 ++++------- ...asePartitionUpsertMetadataManagerTest.java | 9 ++- .../pinot/spi/config/table/UpsertConfig.java | 29 ++++------ 5 files changed, 60 insertions(+), 81 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 44c2a6969a7c..6564be3dff99 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -72,6 +72,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.BooleanUtils; import org.apache.pinot.spi.utils.CommonConstants; @@ -123,14 +124,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps private final Lock _preloadLock = new ReentrantLock(); private volatile boolean _isPreloading; - // Below are configs to enable consistent upsert view. - // If enabling upsert view, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and + // There are two consistency modes: + // If using LOCK mode, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and // the query threads take the RLock when getting bitmaps for all its selected segments. - // If enabling block refresh, the query threads don't need to take lock when getting bitmaps for all its selected + // If using SNAPSHOT mode, the query threads don't need to take lock when getting bitmaps for all its selected // segments, as the query threads access a copy of bitmaps that are kept updated by upsert thread periodically. But // the query thread can specify a freshness threshold query option to refresh the bitmap copies if not fresh enough. 
- private final boolean _enableUpsertView; - private final boolean _enableUpsertViewBatchRefresh; + // By default, the mode is NONE to disable the support for data consistency. + private final UpsertConfig.ConsistencyMode _consistencyMode; private final long _upsertViewRefreshIntervalMs; private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock(); private final Set _updatedSegmentsSinceLastRefresh = ConcurrentHashMap.newKeySet(); @@ -152,8 +153,8 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti _metadataTTL = context.getMetadataTTL(); _deletedKeysTTL = context.getDeletedKeysTTL(); _tableIndexDir = context.getTableIndexDir(); - _enableUpsertView = context.isUpsertViewEnabled(); - _enableUpsertViewBatchRefresh = _enableUpsertView && context.isUpsertViewBatchRefreshEnabled(); + UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode(); + _consistencyMode = cmode != null ? cmode : UpsertConfig.ConsistencyMode.NONE; _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs(); _serverMetrics = ServerMetrics.get(); _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); @@ -1043,19 +1044,21 @@ public synchronized void close() protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) { - if (_enableUpsertView) { + // For SNAPSHOT consistency mode, we can use RLock here. But for simplicity and considering there is only one + // thread doing upsert most of the time, we just use WLock, as required by LOCK mode. 
+ if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) { _upsertViewLock.writeLock().lock(); } try { doRemoveDocId(oldSegment, oldDocId); doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo); - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _updatedSegmentsSinceLastRefresh.add(newSegment); _updatedSegmentsSinceLastRefresh.add(oldSegment); doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } finally { - if (_enableUpsertView) { + if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) { _upsertViewLock.writeLock().unlock(); } } @@ -1069,7 +1072,7 @@ protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBit */ protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) { - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _upsertViewLock.readLock().lock(); } try { @@ -1082,10 +1085,12 @@ protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap } } } finally { - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _updatedSegmentsSinceLastRefresh.add(segment); _upsertViewLock.readLock().unlock(); - // Do batch refresh outside the RLock block. + // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W lock ensures that only one thread + // can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until + // refreshing is done. 
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } @@ -1093,18 +1098,16 @@ protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) { - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _upsertViewLock.readLock().lock(); } try { doAddDocId(validDocIds, queryableDocIds, docId, recordInfo); } finally { - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _updatedSegmentsSinceLastRefresh.add(segment); _upsertViewLock.readLock().unlock(); - // Batch refresh takes WLock, so do it outside RLock for clarity. The R/W lock ensures that only one thread - // can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until - // refreshing is done. + // Batch refresh takes WLock. Do it outside RLock for clarity. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } @@ -1119,16 +1122,16 @@ private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMu } protected void removeDocId(IndexSegment segment, int docId) { - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _upsertViewLock.readLock().lock(); } try { doRemoveDocId(segment, docId); } finally { - if (_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _updatedSegmentsSinceLastRefresh.add(segment); _upsertViewLock.readLock().unlock(); - // Batch refresh takes WLock, so do it outside RLock for clarity. + // Batch refresh takes WLock. Do it outside RLock for clarity. 
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } @@ -1147,12 +1150,11 @@ private void doRemoveDocId(IndexSegment segment, int docId) { * present, to avoid overwriting the contexts specified at the others places. */ public void setSegmentContexts(List segmentContexts, Map queryOptions) { - if (!_enableUpsertView) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE) { setSegmentContexts(segmentContexts); return; } - // If not using batch refresh, get the bitmaps from segments directly but with RLock for consistency. - if (!_enableUpsertViewBatchRefresh) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.LOCK) { _upsertViewLock.readLock().lock(); try { setSegmentContexts(segmentContexts); @@ -1201,10 +1203,10 @@ private void setSegmentContexts(List segmentContexts) { private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) { long nowMs = System.currentTimeMillis(); - if (upsertViewFreshnessMs >= 0) { - return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > nowMs; + if (upsertViewFreshnessMs < 0) { + return true; } - return _lastUpsertViewRefreshTimeMs + _upsertViewRefreshIntervalMs > nowMs; + return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > nowMs; } private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 9909917ef039..ea5aba5d2124 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -68,26 +68,23 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD boolean enablePreload = upsertConfig.isEnablePreload(); double metadataTTL = upsertConfig.getMetadataTTL(); 
double deletedKeysTTL = upsertConfig.getDeletedKeysTTL(); - boolean enableUpsertView = upsertConfig.isEnableUpsertView(); - boolean enableUpsertViewBatchRefresh = upsertConfig.isEnableUpsertViewBatchRefresh(); + UpsertConfig.ConsistencyMode consistencyMode = upsertConfig.getConsistencyMode(); long upsertViewRefreshIntervalMs = upsertConfig.getUpsertViewRefreshIntervalMs(); File tableIndexDir = tableDataManager.getTableDataDir(); _context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema) .setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns) .setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction) .setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload) - .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setEnableUpsertView(enableUpsertView) - .setEnableUpsertViewBatchRefresh(enableUpsertViewBatchRefresh) + .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(consistencyMode) .setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir) .setTableDataManager(tableDataManager).build(); LOGGER.info( "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {}," + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}," - + " deleted Keys TTL: {}, enable upsertView: {}, enable upsertView batchRefresh: {}," - + " upsertView refresh interval: {} ms, table index dir: {}", getClass().getSimpleName(), - _tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, - upsertConfig.getMode(), enableSnapshot, enablePreload, metadataTTL, deletedKeysTTL, enableUpsertView, - enableUpsertViewBatchRefresh, upsertViewRefreshIntervalMs, tableIndexDir); + + " deleted Keys TTL: {}, consistency mode: {}, upsert view refresh interval: {}ms, table index dir: {}", + 
getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn, + hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload, metadataTTL, deletedKeysTTL, + consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir); initCustomVariables(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java index 46bf16666aae..32cf34343b2a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java @@ -26,6 +26,7 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; @@ -41,8 +42,7 @@ public class UpsertContext { private final boolean _enablePreload; private final double _metadataTTL; private final double _deletedKeysTTL; - private final boolean _enableUpsertView; - private final boolean _enableUpsertViewBatchRefresh; + private final UpsertConfig.ConsistencyMode _consistencyMode; private final long _upsertViewRefreshIntervalMs; private final File _tableIndexDir; private final TableDataManager _tableDataManager; @@ -50,7 +50,7 @@ public class UpsertContext { private UpsertContext(TableConfig tableConfig, Schema schema, List primaryKeyColumns, List comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload, - double metadataTTL, double deletedKeysTTL, boolean enableUpsertView, boolean enableUpsertViewBatchRefresh, + double metadataTTL, double deletedKeysTTL, UpsertConfig.ConsistencyMode consistencyMode, long 
upsertViewRefreshIntervalMs, File tableIndexDir, @Nullable TableDataManager tableDataManager) { _tableConfig = tableConfig; _schema = schema; @@ -63,8 +63,7 @@ private UpsertContext(TableConfig tableConfig, Schema schema, List prima _enablePreload = enablePreload; _metadataTTL = metadataTTL; _deletedKeysTTL = deletedKeysTTL; - _enableUpsertView = enableUpsertView; - _enableUpsertViewBatchRefresh = enableUpsertViewBatchRefresh; + _consistencyMode = consistencyMode; _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs; _tableIndexDir = tableIndexDir; _tableDataManager = tableDataManager; @@ -114,12 +113,8 @@ public double getDeletedKeysTTL() { return _deletedKeysTTL; } - public boolean isUpsertViewEnabled() { - return _enableUpsertView; - } - - public boolean isUpsertViewBatchRefreshEnabled() { - return _enableUpsertViewBatchRefresh; + public UpsertConfig.ConsistencyMode getConsistencyMode() { + return _consistencyMode; } public long getUpsertViewRefreshIntervalMs() { @@ -146,8 +141,7 @@ public static class Builder { private boolean _enablePreload; private double _metadataTTL; private double _deletedKeysTTL; - private boolean _enableUpsertView; - private boolean _enableUpsertViewBatchRefresh; + private UpsertConfig.ConsistencyMode _consistencyMode; private long _upsertViewRefreshIntervalMs; private File _tableIndexDir; private TableDataManager _tableDataManager; @@ -207,13 +201,8 @@ public Builder setDeletedKeysTTL(double deletedKeysTTL) { return this; } - public Builder setEnableUpsertView(boolean enableUpsertView) { - _enableUpsertView = enableUpsertView; - return this; - } - - public Builder setEnableUpsertViewBatchRefresh(boolean enableUpsertViewBatchRefresh) { - _enableUpsertViewBatchRefresh = enableUpsertViewBatchRefresh; + public Builder setConsistencyMode(UpsertConfig.ConsistencyMode consistencyMode) { + _consistencyMode = consistencyMode; return this; } @@ -241,8 +230,7 @@ public UpsertContext build() { Preconditions.checkState(_tableIndexDir != null, 
"Table index directory must be set"); return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL, - _enableUpsertView, _enableUpsertViewBatchRefresh, _upsertViewRefreshIntervalMs, _tableIndexDir, - _tableDataManager); + _consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir, _tableDataManager); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java index a75515f5bdfe..f5cbc79a4f8c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java @@ -244,10 +244,10 @@ public void testTakeSnapshotInOrder() } @Test - public void testEnableUpsertView() + public void testConsistencyModeLock() throws Exception { UpsertContext upsertContext = mock(UpsertContext.class); - when(upsertContext.isUpsertViewEnabled()).thenReturn(true); + when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.LOCK); DummyPartitionUpsertMetadataManager upsertMetadataManager = new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext); @@ -323,11 +323,10 @@ public void testEnableUpsertView() } @Test - public void testEnableUpsertViewBatchRefresh() + public void testConsistencyModeSnapshot() throws Exception { UpsertContext upsertContext = mock(UpsertContext.class); - when(upsertContext.isUpsertViewEnabled()).thenReturn(true); - when(upsertContext.isUpsertViewBatchRefreshEnabled()).thenReturn(true); + when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SNAPSHOT); 
when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L); DummyPartitionUpsertMetadataManager upsertMetadataManager = new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index fb073fbcd11d..fc9e789e4cf7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -39,6 +39,10 @@ public enum Strategy { APPEND, IGNORE, INCREMENT, MAX, MIN, OVERWRITE, UNION } + public enum ConsistencyMode { + LOCK, SNAPSHOT, NONE + } + @JsonPropertyDescription("Upsert mode.") private Mode _mode; @@ -75,13 +79,10 @@ public enum Strategy { @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery") private boolean _enablePreload; - @JsonPropertyDescription("Whether to enable consistent view for upsert table") - private boolean _enableUpsertView; + @JsonPropertyDescription("Configure the way to provide consistent view for upsert table") + private ConsistencyMode _consistencyMode = ConsistencyMode.NONE; - @JsonPropertyDescription("Whether to enable batch refresh mode to keep consistent view for upsert table") - private boolean _enableUpsertViewBatchRefresh; - - @JsonPropertyDescription("Refresh interval if using batch refresh mode to keep consistent view for upsert table") + @JsonPropertyDescription("Refresh interval when using the snapshot consistency mode") private long _upsertViewRefreshIntervalMs = 3000; @JsonPropertyDescription("Custom class for upsert metadata manager") @@ -156,12 +157,8 @@ public boolean isEnablePreload() { return _enablePreload; } - public boolean isEnableUpsertView() { - return _enableUpsertView; - } - - public boolean isEnableUpsertViewBatchRefresh() { - return _enableUpsertViewBatchRefresh; + public ConsistencyMode 
getConsistencyMode() { + return _consistencyMode; } public long getUpsertViewRefreshIntervalMs() { @@ -258,12 +255,8 @@ public void setEnablePreload(boolean enablePreload) { _enablePreload = enablePreload; } - public void setEnableUpsertView(boolean enableUpsertView) { - _enableUpsertView = enableUpsertView; - } - - public void setEnableUpsertViewBatchRefresh(boolean enableUpsertViewBatchRefresh) { - _enableUpsertViewBatchRefresh = enableUpsertViewBatchRefresh; + public void setConsistencyMode(ConsistencyMode consistencyMode) { + _consistencyMode = consistencyMode; } public void setUpsertViewRefreshIntervalMs(long upsertViewRefreshIntervalMs) { From 772d775821f82f4ae6af43caab84adef10ec4c34 Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Wed, 15 May 2024 14:27:43 -0700 Subject: [PATCH 4/6] rename --- .../local/upsert/BasePartitionUpsertMetadataManager.java | 6 +++--- .../upsert/BasePartitionUpsertMetadataManagerTest.java | 4 ++-- .../org/apache/pinot/spi/config/table/UpsertConfig.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 6564be3dff99..458f171fdd2c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -125,7 +125,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps private volatile boolean _isPreloading; // There are two consistency modes: - // If using LOCK mode, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and + // If using SYNC mode, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and // the query threads take the 
RLock when getting bitmaps for all its selected segments. // If using SNAPSHOT mode, the query threads don't need to take lock when getting bitmaps for all its selected // segments, as the query threads access a copy of bitmaps that are kept updated by upsert thread periodically. But @@ -1045,7 +1045,7 @@ protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBit @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) { // For SNAPSHOT consistency mode, we can use RLock here. But for simplicity and considering there is only one - // thread doing upsert most of the time, we just use WLock, as required by LOCK mode. + // thread doing upsert most of the time, we just use WLock, as required by SYNC mode. if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) { _upsertViewLock.writeLock().lock(); } @@ -1154,7 +1154,7 @@ public void setSegmentContexts(List segmentContexts, Map Date: Mon, 20 May 2024 15:44:44 -0700 Subject: [PATCH 5/6] cr --- .../BasePartitionUpsertMetadataManager.java | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 458f171fdd2c..c6eefa3d3b0a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -1039,28 +1039,35 @@ public synchronized void close() } /** - * Use WLock to make updates on two segments' bitmaps atomically. 
+ * The same R/WLock is used by the two consistency modes, but they are independent: + * - For sync mode, upsert threads take WLock to make updates on two segments' bitmaps atomically, and query threads + * take RLock to access the segment bitmaps. + * - For snapshot mode, upsert threads take RLock to make updates on segments' bitmaps so that they can be + * synchronized with threads taking the snapshot of bitmaps, which take the WLock. */ protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) { - // For SNAPSHOT consistency mode, we can use RLock here. But for simplicity and considering there is only one - // thread doing upsert most of the time, we just use WLock, as required by SYNC mode. - if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) { _upsertViewLock.writeLock().lock(); + } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + _upsertViewLock.readLock().lock(); } try { doRemoveDocId(oldSegment, oldDocId); doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo); - if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { + } finally { + if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) { + _upsertViewLock.writeLock().unlock(); + } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _updatedSegmentsSinceLastRefresh.add(newSegment); _updatedSegmentsSinceLastRefresh.add(oldSegment); + _upsertViewLock.readLock().unlock(); + // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W lock ensures that only one thread + // can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until + // refreshing is done. 
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } - } finally { - if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) { - _upsertViewLock.writeLock().unlock(); - } } } @@ -1088,9 +1095,7 @@ protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) { _updatedSegmentsSinceLastRefresh.add(segment); _upsertViewLock.readLock().unlock(); - // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W lock ensures that only one thread - // can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until - // refreshing is done. + // Batch refresh takes WLock. Do it outside RLock for clarity. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } } @@ -1168,12 +1173,18 @@ public void setSegmentContexts(List segmentContexts, Map currentUpsertView = _segmentQueryableDocIdsMap; for (SegmentContext segmentContext : segmentContexts) { IndexSegment segment = segmentContext.getIndexSegment(); - if (_segmentQueryableDocIdsMap.containsKey(segment)) { - segmentContext.setQueryableDocIdsSnapshot(_segmentQueryableDocIdsMap.get(segment)); + MutableRoaringBitmap segmentView = currentUpsertView.get(segment); + if (segmentView != null) { + segmentContext.setQueryableDocIdsSnapshot(segmentView); } } } @@ -1202,11 +1213,10 @@ private void setSegmentContexts(List segmentContexts) { } private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) { - long nowMs = System.currentTimeMillis(); if (upsertViewFreshnessMs < 0) { return true; } - return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > nowMs; + return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > System.currentTimeMillis(); } private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) { From 97e79e7638f758cfab70548f2a32a792e45efaa0 Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Tue, 21 May 2024 16:18:02 -0700 Subject: [PATCH 6/6] minor refine --- 
.../local/upsert/BasePartitionUpsertMetadataManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index c6eefa3d3b0a..6a873e3cc439 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -1175,7 +1175,7 @@ public void setSegmentContexts(List segmentContexts, Map