Skip to content

Commit

Permalink
chore: bpop prints
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed May 25, 2024
1 parent 8a0007d commit c9e1bfc
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
15 changes: 11 additions & 4 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {

OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
BlockingResultCb func, unsigned limit_ms,
bool* block_flag, bool* pause_flag) {
bool* block_flag, bool* pause_flag,
std::string* info) {
string result_key;

// Fast path. If we have only a single shard, we can run opportunistically with a single hop.
Expand All @@ -289,10 +290,13 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
OpResult<ShardFFResult> result;
if (trans->GetUniqueShardCnt() == 1 && absl::GetFlag(FLAGS_singlehop_blocking)) {
auto res = FindFirstNonEmptySingleShard(trans, req_obj_type, func);
if (res.ok())
if (res.ok()) {
if (info)
*info = "FF1S/";
return res;
else
} else {
result = res.status();
}
} else {
result = FindFirstNonEmpty(trans, req_obj_type);
}
Expand All @@ -307,6 +311,8 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
return OpStatus::OK;
};
trans->Execute(std::move(cb), true);
if (info)
*info = "FFMS/";
return result_key;
}

Expand Down Expand Up @@ -351,7 +357,8 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
return OpStatus::OK;
};
trans->Execute(std::move(cb), true);

if (info)
*info = "BLOCK/";
return result_key;
}

Expand Down
3 changes: 2 additions & 1 deletion src/server/container_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ using BlockingResultCb =
// immediately with the first key listed in the tx arguments.
OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
BlockingResultCb cb, unsigned limit_ms,
bool* block_flag, bool* pause_flag);
bool* block_flag, bool* pause_flag,
std::string* info = nullptr);

}; // namespace container_utils

Expand Down
24 changes: 19 additions & 5 deletions src/server/zset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1281,11 +1281,19 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo
DVLOG(2) << "popping from " << key << " " << t->DebugId();

PrimeValue& pv = it->second;
CHECK_GT(pv.Size(), 0u) << key << " " << pv.GetRobjWrapper()->encoding();

IntervalVisitor iv{Action::POP, range_spec.params, &pv};
std::visit(iv, range_spec.interval);

it_res->post_updater.Run();

auto res = iv.PopResult();

// We don't store empty keys
CHECK(!res.empty()) << key << " failed to pop from type " << pv.GetRobjWrapper()->encoding()
<< " now size is " << pv.Size();

auto zlen = pv.Size();
if (zlen == 0) {
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
Expand All @@ -1298,7 +1306,7 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo
RecordJournal(op_args, command, ArgSlice{key}, 1);
}

return iv.PopResult();
return res;
}

void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
Expand All @@ -1316,19 +1324,25 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {

Transaction* transaction = cntx->transaction;

std::string dinfo;
std::string callback_ran_key = "/NONE/";
OpResult<ScoredArray> popped_array;
auto cb = [is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) {
auto cb = [is_max, &popped_array, &callback_ran_key](Transaction* t, EngineShard* shard,
std::string_view key) {
callback_ran_key = key;
popped_array = OpBZPop(t, shard, key, is_max);
};

OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked,
&cntx->paused);
transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked, &cntx->paused,
&dinfo);

auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (popped_key) {
DVLOG(1) << "BZPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
CHECK(popped_array->size() == 1);
CHECK(popped_array.ok()) << dinfo;
CHECK_EQ(popped_array->size(), 1u)
<< popped_key << " ran " << callback_ran_key << " info " << dinfo;
rb->StartArray(3);
rb->SendBulkString(*popped_key);
rb->SendBulkString(popped_array->front().first);
Expand Down

0 comments on commit c9e1bfc

Please sign in to comment.