Skip to content

Commit

Permalink
fix(server): handle running script load inside multi (#4074)
Browse files Browse the repository at this point in the history

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Nov 10, 2024
1 parent 75c961e commit 2d49a28
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 78 deletions.
27 changes: 27 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/compact_object.h"
#include "core/interpreter.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
Expand Down Expand Up @@ -453,4 +455,29 @@ void ThreadLocalMutex::unlock() {
}
}

BorrowedInterpreter::BorrowedInterpreter(Transaction* tx, ConnectionState* state) {
// Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our
// preborrowed interpreter (which can't be shared on multiple threads).
CHECK(!state->squashing_info);

if (auto borrowed = state->exec_info.preborrowed_interpreter; borrowed) {
// Ensure a preborrowed interpreter is only set for an already running MULTI transaction.
CHECK_EQ(state->exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);

interpreter_ = borrowed;
} else {
// A scheduled transaction occupies a place in the transaction queue and holds locks,
// preventing other transactions from progressing. Blocking below can deadlock!
CHECK(!tx->IsScheduled());

interpreter_ = ServerState::tlocal()->BorrowInterpreter();
owned_ = true;
}
}

BorrowedInterpreter::~BorrowedInterpreter() {
if (owned_)
ServerState::tlocal()->ReturnInterpreter(interpreter_);
}

} // namespace dfly
25 changes: 25 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
class CommandId;
class Transaction;
class EngineShard;
class ConnectionState;
class Interpreter;

struct LockTagOptions {
bool enabled = false;
Expand Down Expand Up @@ -353,6 +355,29 @@ template <typename Mutex> class ABSL_SCOPED_LOCKABLE SharedLock {
bool is_locked_;
};

// Ensures availability of an interpreter for EVAL-like commands and it's automatic release.
// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired.
struct BorrowedInterpreter {
BorrowedInterpreter(Transaction* tx, ConnectionState* state);

~BorrowedInterpreter();

// Give up ownership of the interpreter, it must be returned manually.
Interpreter* Release() && {
DCHECK(owned_);
owned_ = false;
return interpreter_;
}

operator Interpreter*() {
return interpreter_;
}

private:
Interpreter* interpreter_ = nullptr;
bool owned_ = false;
};

extern size_t serialization_max_chunk_size;

} // namespace dfly
7 changes: 7 additions & 0 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ size_t StoredCmd::NumArgs() const {
return sizes_.size();
}

std::string StoredCmd::FirstArg() const {
if (sizes_.size() == 0) {
return {};
}
return buffer_.substr(0, sizes_[0]);
}

facade::ReplyMode StoredCmd::ReplyMode() const {
return reply_mode_;
}
Expand Down
2 changes: 2 additions & 0 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class StoredCmd {
Fill(absl::MakeSpan(*dest));
}

std::string FirstArg() const;

const CommandId* Cid() const;

facade::ReplyMode ReplyMode() const;
Expand Down
90 changes: 24 additions & 66 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -684,40 +684,41 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send,
send->Invoke(std::move(resp));
}

enum class ExecEvalState {
enum class ExecScriptUse {
NONE = 0,
ALL = 1,
SOME = 2,
SCRIPT_LOAD = 1,
SCRIPT_RUN = 2,
};

ExecEvalState DetermineEvalPresense(const std::vector<StoredCmd>& body) {
unsigned eval_cnt = 0;
ExecScriptUse DetermineScriptPresense(const std::vector<StoredCmd>& body) {
bool script_load = false;
for (const auto& scmd : body) {
if (CO::IsEvalKind(scmd.Cid()->name())) {
eval_cnt++;
return ExecScriptUse::SCRIPT_RUN;
}
}

if (eval_cnt == 0)
return ExecEvalState::NONE;
if ((scmd.Cid()->name() == "SCRIPT") && (absl::AsciiStrToUpper(scmd.FirstArg()) == "LOAD")) {
script_load = true;
}
}

if (eval_cnt == body.size())
return ExecEvalState::ALL;
if (script_load)
return ExecScriptUse::SCRIPT_LOAD;

return ExecEvalState::SOME;
return ExecScriptUse::NONE;
}

// Returns the multi mode for that transaction. Returns NOT_DETERMINED if no scheduling
// is required.
Transaction::MultiMode DeduceExecMode(ExecEvalState state,
Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
const ConnectionState::ExecInfo& exec_info,
const ScriptMgr& script_mgr) {
// Check if script most LIKELY has global eval transactions
bool contains_global = false;
Transaction::MultiMode multi_mode =
static_cast<Transaction::MultiMode>(absl::GetFlag(FLAGS_multi_exec_mode));

if (state != ExecEvalState::NONE) {
if (state == ExecScriptUse::SCRIPT_RUN) {
contains_global = script_mgr.AreGlobalByDefault();
}

Expand Down Expand Up @@ -765,50 +766,6 @@ string CreateExecDescriptor(const std::vector<StoredCmd>& stored_cmds, unsigned
return result;
}

// Ensures availability of an interpreter for EVAL-like commands and it's automatic release.
// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired.
struct BorrowedInterpreter {
BorrowedInterpreter(Transaction* tx, ConnectionContext* cntx) {
// Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our
// preborrowed interpreter (which can't be shared on multiple threads).
CHECK(!cntx->conn_state.squashing_info);

if (auto borrowed = cntx->conn_state.exec_info.preborrowed_interpreter; borrowed) {
// Ensure a preborrowed interpreter is only set for an already running MULTI transaction.
CHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);

interpreter_ = borrowed;
} else {
// A scheduled transaction occupies a place in the transaction queue and holds locks,
// preventing other transactions from progressing. Blocking below can deadlock!
CHECK(!tx->IsScheduled());

interpreter_ = ServerState::tlocal()->BorrowInterpreter();
owned_ = true;
}
}

~BorrowedInterpreter() {
if (owned_)
ServerState::tlocal()->ReturnInterpreter(interpreter_);
}

// Give up ownership of the interpreter, it must be returned manually.
Interpreter* Release() && {
DCHECK(owned_);
owned_ = false;
return interpreter_;
}

operator Interpreter*() {
return interpreter_;
}

private:
Interpreter* interpreter_ = nullptr;
bool owned_ = false;
};

string ConnectionLogContext(const facade::Connection* conn) {
if (conn == nullptr) {
return "(null-conn)";
Expand Down Expand Up @@ -1873,7 +1830,7 @@ void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
return rb->SendNull();
}

BorrowedInterpreter interpreter{tx, cntx};
BorrowedInterpreter interpreter{tx, &cntx->conn_state};
auto res = server_family_.script_mgr()->Insert(body, interpreter);
if (!res)
return builder->SendError(res.error().Format(), facade::kScriptErrType);
Expand All @@ -1887,7 +1844,7 @@ void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
ConnectionContext* cntx) {
string sha = absl::AsciiStrToLower(ArgS(args, 0));

BorrowedInterpreter interpreter{cntx->transaction, cntx};
BorrowedInterpreter interpreter{cntx->transaction, &cntx->conn_state};
CallSHA(args, sha, interpreter, builder, cntx);
}

Expand Down Expand Up @@ -2254,12 +2211,13 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,

cntx->last_command_debug.exec_body_len = exec_info.body.size();

// The transaction can contain scripts, determine their presence ahead to customize logic below.
ExecEvalState state = DetermineEvalPresense(exec_info.body);
// The transaction can contain script load script execution, determine their presence ahead to
// customize logic below.
ExecScriptUse state = DetermineScriptPresense(exec_info.body);

// We borrow a single interpreter for all the EVALs inside. Returned by MultiCleanup
if (state != ExecEvalState::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, cntx).Release();
// We borrow a single interpreter for all the EVALs/Script load inside. Returned by MultiCleanup
if (state != ExecScriptUse::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, &cntx->conn_state).Release();
}

// Determine according multi mode, not only only flag, but based on presence of global commands
Expand Down Expand Up @@ -2293,7 +2251,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ServerState::tlocal()->exec_freq_count[descr]++;
}

if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE &&
if (absl::GetFlag(FLAGS_multi_exec_squash) && state != ExecScriptUse::SCRIPT_RUN &&
!cntx->conn_state.tracking_info_.IsTrackingOn()) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this);
} else {
Expand Down
23 changes: 23 additions & 0 deletions src/server/multi_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,29 @@ TEST_F(MultiEvalTest, MultiAndEval) {
Run({"eval", "return 'OK';", "0"});
auto resp = Run({"exec"});
EXPECT_EQ(resp, "OK");

// We had a bug running script load inside multi
Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"exec"});

Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"get", "x"});
Run({"exec"});

Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"mset", "x1", "y1", "x2", "y2"});
Run({"exec"});

Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"eval", "return redis.call('set', 'x', 'y')", "1", "x"});
Run({"get", "x"});
Run({"exec"});

Run({"get", "x"});
}

TEST_F(MultiTest, MultiTypes) {
Expand Down
12 changes: 6 additions & 6 deletions src/server/script_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ ScriptMgr::ScriptKey::ScriptKey(string_view sha) : array{} {
memcpy(data(), sha.data(), size());
}

void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string subcmd = absl::AsciiStrToUpper(ArgS(args, 0));

if (subcmd == "HELP") {
Expand Down Expand Up @@ -110,7 +111,7 @@ void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder)
return LatencyCmd(tx, builder);

if (subcmd == "LOAD" && args.size() == 2)
return LoadCmd(args, tx, builder);
return LoadCmd(args, tx, builder, cntx);

if (subcmd == "FLAGS" && args.size() > 2)
return ConfigCmd(args, tx, builder);
Expand Down Expand Up @@ -144,7 +145,8 @@ void ScriptMgr::FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return builder->SendOk();
}

void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view body = ArgS(args, 1);
auto rb = static_cast<RedisReplyBuilder*>(builder);
if (body.empty()) {
Expand All @@ -153,9 +155,7 @@ void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return rb->SendBulkString(sha);
}

ServerState* ss = ServerState::tlocal();
auto interpreter = ss->BorrowInterpreter();
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
BorrowedInterpreter interpreter{tx, &cntx->conn_state};

auto res = Insert(body, interpreter);
if (!res)
Expand Down
5 changes: 3 additions & 2 deletions src/server/script_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ScriptMgr {

ScriptMgr();

void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);

// Insert script and return sha. Get possible error from compilation or parsing script flags.
io::Result<std::string, GenericError> Insert(std::string_view body, Interpreter* interpreter);
Expand All @@ -69,7 +69,8 @@ class ScriptMgr {
private:
void ExistsCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) const;
void FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void ConfigCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void ListCmd(Transaction* tx, SinkReplyBuilder* builder) const;
void LatencyCmd(Transaction* tx, SinkReplyBuilder* builder) const;
Expand Down
2 changes: 1 addition & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3008,7 +3008,7 @@ void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil

void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
script_mgr_->Run(std::move(args), tx, builder);
script_mgr_->Run(std::move(args), tx, builder, cntx);
}

void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
Expand Down
8 changes: 5 additions & 3 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,12 @@ OpStatus Transaction::InitByArgs(Namespace* ns, DbIndex index, CmdArgList args)
}

if ((cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) > 0) {
if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)
if (((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)) {
EnableAllShards();
else
} else {
EnableShard(0);
}

return OpStatus::OK;
}

Expand Down Expand Up @@ -976,7 +978,7 @@ string Transaction::DEBUG_PrintFailState(ShardId sid) const {
void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
shard_data_.resize(1);
shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1);
shard_data_.front().local_mask |= ACTIVE;
}

Expand Down

0 comments on commit 2d49a28

Please sign in to comment.