Skip to content

Commit

Permalink
Add locking logic to get consistent table view for upsert tables (#12976
Browse files Browse the repository at this point in the history
)

* added upsert config ConsistencyMode as the feature flag
  • Loading branch information
klsince authored May 22, 2024
1 parent 652bb6b commit 429bb7a
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public static boolean isSkipUpsert(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT));
}

/**
 * Reads the upsert view freshness threshold from the query options.
 *
 * @param queryOptions query options to look the threshold up in
 * @return the value of the {@code UPSERT_VIEW_FRESHNESS_MS} option parsed as a long, or -1 when the option is absent
 * @throws NumberFormatException if the option is present but is not a parsable long
 */
public static long getUpsertViewFreshnessMs(Map<String, String> queryOptions) {
  String rawFreshnessMs = queryOptions.get(QueryOptionKey.UPSERT_VIEW_FRESHNESS_MS);
  if (rawFreshnessMs == null) {
    return -1;
  }
  return Long.parseLong(rawFreshnessMs);
}

/**
 * Tells whether the {@code SCAN_STAR_TREE_NODES} query option is set to true.
 *
 * @param queryOptions query options to look the flag up in
 * @return result of {@link Boolean#parseBoolean(String)} on the option value, hence false when the option is absent
 */
public static boolean isScanStarTreeNodes(Map<String, String> queryOptions) {
  String scanStarTreeNodes = queryOptions.get(QueryOptionKey.SCAN_STAR_TREE_NODES);
  return Boolean.parseBoolean(scanStarTreeNodes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegmen
List<SegmentContext> segmentContexts = new ArrayList<>(selectedSegments.size());
selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
_tableUpsertMetadataManager.setSegmentContexts(segmentContexts);
_tableUpsertMetadataManager.setSegmentContexts(segmentContexts, queryOptions);
}
return segmentContexts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
Expand All @@ -71,6 +72,7 @@
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -122,6 +124,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
private final Lock _preloadLock = new ReentrantLock();
private volatile boolean _isPreloading;

// There are two consistency modes:
// If using SYNC mode, the upsert threads take the WLock when the upsert involves two segments' bitmaps; and
// the query threads take the RLock when getting bitmaps for all its selected segments.
// If using SNAPSHOT mode, the query threads don't need to take lock when getting bitmaps for all its selected
// segments, as the query threads access a copy of bitmaps that are kept updated by upsert thread periodically. But
// the query thread can specify a freshness threshold query option to refresh the bitmap copies if not fresh enough.
// By default, the mode is NONE to disable the support for data consistency.
private final UpsertConfig.ConsistencyMode _consistencyMode;
private final long _upsertViewRefreshIntervalMs;
private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock();
private final Set<IndexSegment> _updatedSegmentsSinceLastRefresh = ConcurrentHashMap.newKeySet();
private volatile long _lastUpsertViewRefreshTimeMs = 0;
private volatile Map<IndexSegment, MutableRoaringBitmap> _segmentQueryableDocIdsMap;

protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
Expand All @@ -137,6 +153,9 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
_metadataTTL = context.getMetadataTTL();
_deletedKeysTTL = context.getDeletedKeysTTL();
_tableIndexDir = context.getTableIndexDir();
UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
_consistencyMode = cmode != null ? cmode : UpsertConfig.ConsistencyMode.NONE;
_upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
_serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
Expand Down Expand Up @@ -1019,34 +1038,111 @@ public synchronized void close()
_logger.info("Closed the metadata manager");
}

protected void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
/**
* The same R/WLock is used by the two consistency modes, but they are independent:
* - For sync mode, upsert threads take WLock to make updates on two segments' bitmaps atomically, and query threads
* take RLock to access the segment bitmaps.
* - For snapshot mode, upsert threads take RLock to make updates on segments' bitmaps so that they can be
* synchronized with threads taking the snapshot of bitmaps, which take the WLock.
*/
protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId,
RecordInfo recordInfo) {
removeDocId(oldSegment, oldDocId);
addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
_upsertViewLock.writeLock().lock();
} else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_upsertViewLock.readLock().lock();
}
try {
doRemoveDocId(oldSegment, oldDocId);
doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
} finally {
if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
_upsertViewLock.writeLock().unlock();
} else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_updatedSegmentsSinceLastRefresh.add(newSegment);
_updatedSegmentsSinceLastRefresh.add(oldSegment);
_upsertViewLock.readLock().unlock();
// Batch refresh takes WLock. Do it outside RLock for clarity. The R/W lock ensures that only one thread
// can refresh the bitmaps. The other threads that are about to update the bitmaps will be blocked until
// refreshing is done.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
}
}

protected void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
/**
* There is no need to take the R/WLock to update a single bitmap, as all methods that update the bitmap are
* synchronized. But for upsertViewBatchRefresh to work correctly, we need to block updates on bitmaps while doing
* batch refresh, which takes WLock. So wrap the bitmap update logic inside RLock to allow concurrent updates while
* still blocking them when a thread is doing batch refresh, i.e. taking copies of all bitmaps.
*/
protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
validDocIds.replace(oldDocId, newDocId);
if (queryableDocIds != null) {
if (recordInfo.isDeleteRecord()) {
queryableDocIds.remove(oldDocId);
} else {
queryableDocIds.replace(oldDocId, newDocId);
if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_upsertViewLock.readLock().lock();
}
try {
validDocIds.replace(oldDocId, newDocId);
if (queryableDocIds != null) {
if (recordInfo.isDeleteRecord()) {
queryableDocIds.remove(oldDocId);
} else {
queryableDocIds.replace(oldDocId, newDocId);
}
}
} finally {
if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_updatedSegmentsSinceLastRefresh.add(segment);
_upsertViewLock.readLock().unlock();
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
}
}

protected void addDocId(ThreadSafeMutableRoaringBitmap validDocIds,
/**
 * Adds the doc to the given bitmaps via {@code doAddDocId}.
 *
 * In SNAPSHOT consistency mode the update runs under the read lock of the upsert view lock, the segment is recorded
 * as updated since the last refresh, and a batch refresh of the upsert view is attempted once the read lock has been
 * released. In the other modes the bitmaps are updated without any locking.
 */
protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
    @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
  if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
    doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
    return;
  }
  Lock viewReadLock = _upsertViewLock.readLock();
  viewReadLock.lock();
  try {
    doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
  } finally {
    // Mark the segment while still holding the read lock, then release it before the refresh takes the write lock.
    _updatedSegmentsSinceLastRefresh.add(segment);
    viewReadLock.unlock();
    doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
  }
}

/**
 * Marks the doc as valid, and also as queryable unless there is no queryable bitmap or the record is a delete record.
 */
private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds,
    int docId, RecordInfo recordInfo) {
  validDocIds.add(docId);
  if (queryableDocIds == null || recordInfo.isDeleteRecord()) {
    return;
  }
  queryableDocIds.add(docId);
}

/**
 * Removes the doc from the segment's bitmaps via {@code doRemoveDocId}.
 *
 * In SNAPSHOT consistency mode the update runs under the read lock of the upsert view lock, the segment is recorded
 * as updated since the last refresh, and a batch refresh of the upsert view is attempted once the read lock has been
 * released. In the other modes the bitmaps are updated without any locking.
 */
protected void removeDocId(IndexSegment segment, int docId) {
  boolean snapshotMode = _consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT;
  if (!snapshotMode) {
    doRemoveDocId(segment, docId);
    return;
  }
  Lock viewReadLock = _upsertViewLock.readLock();
  viewReadLock.lock();
  try {
    doRemoveDocId(segment, docId);
  } finally {
    // Mark the segment while still holding the read lock, then release it before the refresh takes the write lock.
    _updatedSegmentsSinceLastRefresh.add(segment);
    viewReadLock.unlock();
    doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
  }
}

private void doRemoveDocId(IndexSegment segment, int docId) {
Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds();
if (currentQueryableDocIds != null) {
Expand All @@ -1058,11 +1154,37 @@ protected void removeDocId(IndexSegment segment, int docId) {
* Use the segmentContexts to collect the contexts for selected segments. Reuse the segmentContext object if
* present, to avoid overwriting the contexts specified at the others places.
*/
public void setSegmentContexts(List<SegmentContext> segmentContexts) {
public void setSegmentContexts(List<SegmentContext> segmentContexts, Map<String, String> queryOptions) {
if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
setSegmentContexts(segmentContexts);
return;
}
if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
_upsertViewLock.readLock().lock();
try {
setSegmentContexts(segmentContexts);
return;
} finally {
_upsertViewLock.readLock().unlock();
}
}
// If batch refresh is enabled, the copy of bitmaps is kept updated and ready to use for a consistent view.
// The locking between query threads and upsert threads can be avoided when using batch refresh.
// Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps by every single query.
// If query has specified a need for certain freshness, check the view and refresh it as needed.
// When refreshing the copy of map, we need to take the WLock so only one thread is refreshing view.
long upsertViewFreshnessMs =
Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions), _upsertViewRefreshIntervalMs);
if (upsertViewFreshnessMs < 0) {
upsertViewFreshnessMs = _upsertViewRefreshIntervalMs;
}
doBatchRefreshUpsertView(upsertViewFreshnessMs);
Map<IndexSegment, MutableRoaringBitmap> currentUpsertView = _segmentQueryableDocIdsMap;
for (SegmentContext segmentContext : segmentContexts) {
IndexSegment segment = segmentContext.getIndexSegment();
if (_trackedSegments.contains(segment)) {
segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment));
MutableRoaringBitmap segmentView = currentUpsertView.get(segment);
if (segmentView != null) {
segmentContext.setQueryableDocIdsSnapshot(segmentView);
}
}
}
Expand All @@ -1081,6 +1203,51 @@ private static MutableRoaringBitmap getQueryableDocIdsSnapshotFromSegment(IndexS
return queryableDocIdsSnapshot;
}

/**
 * Attaches a queryable docIds snapshot, taken directly from the segment, to the context of every segment that is
 * tracked by this manager. Contexts of untracked segments are left untouched.
 */
private void setSegmentContexts(List<SegmentContext> segmentContexts) {
  for (SegmentContext context : segmentContexts) {
    IndexSegment indexSegment = context.getIndexSegment();
    if (!_trackedSegments.contains(indexSegment)) {
      continue;
    }
    MutableRoaringBitmap snapshot = getQueryableDocIdsSnapshotFromSegment(indexSegment);
    context.setQueryableDocIdsSnapshot(snapshot);
  }
}

/**
 * Decides whether the current upsert view is still fresh enough, so that refreshing it can be skipped.
 *
 * @param upsertViewFreshnessMs max allowed age of the view in ms; a negative value always skips the refresh
 * @return true if the threshold is negative, or if less than the threshold has elapsed since the last refresh
 */
private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) {
  if (upsertViewFreshnessMs < 0) {
    return true;
  }
  // Compare the elapsed time against the threshold instead of adding the threshold to the last refresh time. The sum
  // silently overflows to a negative value for very large thresholds (e.g. Long.MAX_VALUE), which would make the
  // view look stale all the time and force a refresh on every call.
  long elapsedSinceLastRefreshMs = System.currentTimeMillis() - _lastUpsertViewRefreshTimeMs;
  return elapsedSinceLastRefreshMs < upsertViewFreshnessMs;
}

/**
 * Rebuilds the map of per-segment queryable docIds snapshots when it is missing or not fresh enough.
 *
 * The freshness check is done once without the lock and repeated after taking the write lock of _upsertViewLock.
 * On refresh, a new map is built with an entry for every segment in _trackedSegments and is then assigned to
 * _segmentQueryableDocIdsMap, after which the set of updated segments is cleared and the refresh time is recorded.
 *
 * @param upsertViewFreshnessMs freshness threshold in ms passed to skipUpsertViewRefresh; with a negative value
 *                              that check returns true, so the view is only built when it does not exist yet
 */
private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) {
// Always refresh if the current view is still empty.
if (skipUpsertViewRefresh(upsertViewFreshnessMs) && _segmentQueryableDocIdsMap != null) {
return;
}
_upsertViewLock.writeLock().lock();
try {
// Check again with lock, and always refresh if the current view is still empty.
Map<IndexSegment, MutableRoaringBitmap> current = _segmentQueryableDocIdsMap;
if (skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) {
return;
}
Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
for (IndexSegment segment : _trackedSegments) {
// Take a new snapshot when there is no previous view or the segment was marked as updated; otherwise carry over
// the bitmap object from the previous view.
if (current == null || _updatedSegmentsSinceLastRefresh.contains(segment)) {
updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment));
} else {
// NOTE(review): current.get(segment) is null for a tracked segment that is absent from the previous view and not
// marked as updated, so the new map can hold a null value for it - confirm this is intended.
updated.put(segment, current.get(segment));
}
}
// Swap in the new consistent set of bitmaps.
_segmentQueryableDocIdsMap = updated;
_updatedSegmentsSinceLastRefresh.clear();
_lastUpsertViewRefreshTimeMs = System.currentTimeMillis();
} finally {
_upsertViewLock.writeLock().unlock();
}
}

protected void doClose()
throws IOException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,23 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
boolean enablePreload = upsertConfig.isEnablePreload();
double metadataTTL = upsertConfig.getMetadataTTL();
double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
UpsertConfig.ConsistencyMode consistencyMode = upsertConfig.getConsistencyMode();
long upsertViewRefreshIntervalMs = upsertConfig.getUpsertViewRefreshIntervalMs();
File tableIndexDir = tableDataManager.getTableDataDir();
_context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
.setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableIndexDir)
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(consistencyMode)
.setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir)
.setTableDataManager(tableDataManager).build();
LOGGER.info(
"Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {},"
+ " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {},"
+ " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot,
enablePreload, metadataTTL, deletedKeysTTL, tableIndexDir);
+ " deleted Keys TTL: {}, consistency mode: {}, upsert view refresh interval: {}ms, table index dir: {}",
getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload, metadataTTL, deletedKeysTTL,
consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);

initCustomVariables();
}
Expand Down
Loading

0 comments on commit 429bb7a

Please sign in to comment.