Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[redis_proxy] Add support for UNWATCH #37620

Merged
merged 11 commits into from
Dec 17, 2024
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ new_features:
change: |
added new ``xds.virtual_host_name`` and ``xds.virtual_host_metadata`` attributes support. See
:ref:`attributes <arch_overview_attributes>` for looking up xDS configuration information.
- area: redis
change: |
Added support for UNWATCH command.

deprecated:
- area: rbac
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ For details on each command's usage see the official
SREM, Set
SSCAN, Set
WATCH, String
UNWATCH, String
ZADD, Sorted Set
ZCARD, Sorted Set
ZCOUNT, Sorted Set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct SupportedCommands {
"lpush", "lpushx", "lrange", "lrem", "lset", "ltrim", "persist", "pexpire", "pexpireat",
"pfadd", "pfcount", "psetex", "pttl", "publish", "restore", "rpop", "rpush", "rpushx",
"sadd", "scard", "set", "setbit", "setex", "setnx", "setrange", "sismember", "smembers",
"spop", "srandmember", "srem", "sscan", "strlen", "ttl", "type", "watch", "xack", "xadd",
"spop", "srandmember", "srem", "sscan", "strlen", "ttl", "type", "xack", "xadd",
"xautoclaim", "xclaim", "xdel", "xlen", "xpending", "xrange", "xrevrange", "xtrim", "zadd",
"zcard", "zcount", "zincrby", "zlexcount", "zpopmin", "zpopmax", "zrange", "zrangebylex",
"zrangebyscore", "zrank", "zrem", "zremrangebylex", "zremrangebyrank", "zremrangebyscore",
Expand All @@ -55,7 +55,8 @@ struct SupportedCommands {
* @return commands which handle Redis transactions.
*/
static const absl::flat_hash_set<std::string>& transactionCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "multi", "exec", "discard");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "multi", "exec", "discard", "watch",
"unwatch");
}

/**
Expand All @@ -64,7 +65,7 @@ struct SupportedCommands {
static const std::string& auth() { CONSTRUCT_ON_FIRST_USE(std::string, "auth"); }

/**
* @return auth command
* @return echo command
*/
static const std::string& echo() { CONSTRUCT_ON_FIRST_USE(std::string, "echo"); }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,19 @@ SplitRequestPtr TransactionRequest::create(Router& router,
return nullptr;
}
transaction.start();
// Respond to MULTI locally.
localResponse(callbacks, "OK");
return nullptr;

// If we already have a key set (from a previous WATCH command), we will send the actual MULTI
// upstream. Otherwise, we will respond locally with "OK".
if (transaction.key_.empty()) {
localResponse(callbacks, "OK");
return nullptr;
} else {
RouteSharedPtr route = router.upstreamPool(transaction.key_, stream_info);
if (route) {
// We reserve a client for the main connection and for each mirror connection.
transaction.clients_.resize(1 + route->mirrorPolicies().size());
}
}
} else if (command_name == "exec" || command_name == "discard") {
// Handle the case where we don't have an open transaction.
if (transaction.active_ == false) {
Expand All @@ -510,13 +519,31 @@ SplitRequestPtr TransactionRequest::create(Router& router,
transaction.should_close_ = true;
}

// If we do a WATCH command without having started a transaction, we send it upstream and save the
// key, so we can support UNWATCH. We have to also set the connection details.
if (command_name == "watch" && !transaction.active_) {
transaction.key_ = incoming_request->asArray()[1].asString();
}

// When we receive the first command with a key we will set this key as our transaction
// key, and then send a MULTI command to the node that handles that key.
// The response for the MULTI command will be discarded since we pass the null_pool_callbacks
// to the handler.

RouteSharedPtr route;
if (transaction.key_.empty()) {
// If we do an UNWATCH and we don't have a key until now, this is a no-op.
if (command_name == "unwatch") {
if (transaction.active_) {
// No-op during transaction -> QUEUED
localResponse(callbacks, "QUEUED");
} else {
// No-op outside transaction -> OK
localResponse(callbacks, "OK");
}

return nullptr;
}

transaction.key_ = incoming_request->asArray()[1].asString();
route = router.upstreamPool(transaction.key_, stream_info);
Common::Redis::RespValueSharedPtr multi_request =
Expand All @@ -542,12 +569,22 @@ SplitRequestPtr TransactionRequest::create(Router& router,
base_request, *request_ptr, callbacks.transaction());
}

// If we send an UNWATCH outside of a transaction, we clear the transaction key.
if (command_name == "unwatch" && !transaction.active_) {
transaction.key_.clear();
}

if (!request_ptr->handle_) {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
return nullptr;
}

// If we sent a MULTI command upstream, connection was established.
if (command_name == "multi") {
transaction.connection_established_ = true;
}

return request_ptr;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,9 @@ void RedisProxyIntegrationTest::roundtripToUpstreamStep(
IntegrationTcpClientPtr& redis_client, FakeRawConnectionPtr& fake_upstream_connection,
const std::string& auth_username, const std::string& auth_password) {
redis_client->clearData();
if (fake_upstream_connection.get() != nullptr) {
fake_upstream_connection->clearData();
}
ASSERT_TRUE(redis_client->write(request));

expectUpstreamRequestResponse(upstream, request, response, fake_upstream_connection,
Expand All @@ -713,6 +716,7 @@ void RedisProxyIntegrationTest::expectUpstreamRequestResponse(
expect_auth_command = (!auth_password.empty());
EXPECT_TRUE(upstream->waitForRawConnection(fake_upstream_connection));
}

if (expect_auth_command) {
std::string auth_command = (auth_username.empty())
? makeBulkStringArray({"auth", auth_password})
Expand Down Expand Up @@ -1466,6 +1470,80 @@ TEST_P(RedisProxyIntegrationTest, ExecuteEmptyTransaction) {
redis_client->close();
}

TEST_P(RedisProxyIntegrationTest, UnwatchNoTransactionNoOp) {
initialize();
simpleProxyResponse(makeBulkStringArray({"unwatch"}), "+OK\r\n");
}

TEST_P(RedisProxyIntegrationTest, UnwatchWithTransactionNoOp) {
initialize();
IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy"));

proxyResponseStep(makeBulkStringArray({"multi"}), "+OK\r\n", redis_client);
proxyResponseStep(makeBulkStringArray({"unwatch"}), "+QUEUED\r\n", redis_client);

redis_client->close();
}

TEST_P(RedisProxyIntegrationTest, WatchUnwatchNoTransaction) {
initialize();
IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy"));

FakeUpstreamPtr& upstream = fake_upstreams_[0];
FakeRawConnectionPtr fake_upstream_conn;

roundtripToUpstreamStep(upstream, makeBulkStringArray({"watch", "foo"}), "+OK\r\n", redis_client,
fake_upstream_conn, "", "");

roundtripToUpstreamStep(upstream, makeBulkStringArray({"unwatch"}), "+OK\r\n", redis_client,
fake_upstream_conn, "", "");

EXPECT_TRUE(fake_upstream_conn->close());
redis_client->close();
}

TEST_P(RedisProxyIntegrationTest, WatchUnwatchUnrelatedTransaction) {
initialize();
IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy"));

FakeUpstreamPtr& upstream = fake_upstreams_[0];
FakeRawConnectionPtr fake_upstream_conn;

roundtripToUpstreamStep(upstream, makeBulkStringArray({"watch", "foo"}), "+OK\r\n", redis_client,
fake_upstream_conn, "", "");

roundtripToUpstreamStep(upstream, makeBulkStringArray({"unwatch"}), "+OK\r\n", redis_client,
fake_upstream_conn, "", "");

proxyResponseStep(makeBulkStringArray({"multi"}), "+OK\r\n", redis_client);
proxyResponseStep(makeBulkStringArray({"discard"}), "+OK\r\n", redis_client);

EXPECT_TRUE(fake_upstream_conn->close());
redis_client->close();
}

TEST_P(RedisProxyIntegrationTest, WatchUnwatchInTransaction) {
initialize();
IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy"));

FakeUpstreamPtr& upstream = fake_upstreams_[0];
FakeRawConnectionPtr fake_upstream_conn;

roundtripToUpstreamStep(upstream, makeBulkStringArray({"watch", "foo"}), "+OK\r\n", redis_client,
fake_upstream_conn, "", "");

// MULTI will create a new connection, for the transaction.
FakeRawConnectionPtr fake_upstream_conn2;
roundtripToUpstreamStep(upstream, makeBulkStringArray({"multi"}), "+OK\r\n", redis_client,
fake_upstream_conn2, "", "");

roundtripToUpstreamStep(upstream, makeBulkStringArray({"unwatch"}), "+QUEUED\r\n", redis_client,
fake_upstream_conn2, "", "");

EXPECT_TRUE(fake_upstream_conn->close());
EXPECT_TRUE(fake_upstream_conn2->close());
redis_client->close();
}
// This test discards an empty transaction. The proxy responds
// with an OK.

Expand Down