Skip to content

Commit

Permalink
feat(GCS+gRPC): treat some kAlreadyExists errors as transient (#9564)
Browse files Browse the repository at this point in the history
The service is (we think incorrectly) returning `kAlreadyExists` for
some uploads. We need a workaround to unblock some testing. The GCS
team is working on a fix in parallel.
  • Loading branch information
coryan authored Jul 26, 2022
1 parent 70d7f2d commit 29ff9d3
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 28 deletions.
34 changes: 23 additions & 11 deletions google/cloud/storage/internal/retry_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,25 @@ Status ValidateCommittedSize(UploadChunkRequest const& request,
return {};
}

// For resumable uploads over gRPC we need to treat some non-retryable errors
// as retryable.
bool UploadChunkOnFailure(RetryPolicy& retry_policy, int& count,
Status const& status) {
// TODO(#9273) - use ErrorInfo when it becomes available
if (status.code() == StatusCode::kAborted &&
absl::StartsWith(status.message(), "Concurrent requests received.")) {
return retry_policy.OnFailure(Status(
StatusCode::kUnavailable, "TODO(#9273) - workaround service problems"));
}
// TODO(#9563) - kAlreadyExist is sometimes spurious
if (status.code() == StatusCode::kAlreadyExists &&
status.message() == "Requested entity already exists" && ++count == 1) {
return retry_policy.OnFailure(Status(
StatusCode::kUnavailable, "TODO(#9563) - workaround service problems"));
}
return retry_policy.OnFailure(status);
}

} // namespace

std::shared_ptr<RetryClient> RetryClient::Create(
Expand Down Expand Up @@ -516,24 +535,17 @@ StatusOr<QueryResumableUploadResponse> RetryClient::UploadChunk(
auto const expected_committed_size =
request.offset() + request.payload_size();

int count_workaround_9563 = 0;

while (!retry_policy->IsExhausted()) {
auto result = (*operation)(committed_size);
if (!result) {
// On a failure we preserve the error, then query if retry policy allows
// retrying. If so, we backoff, and switch to calling
// QueryResumableUpload().
last_status = std::move(result).status();
// For resumable uploads over gRPC some kAborted errors are retryable.
// TODO(#9273) - use ErrorInfo when it becomes available
auto constexpr kConcurrentMessagePrefix = "Concurrent requests received.";
auto const is_concurrent_write =
last_status.code() == StatusCode::kAborted &&
absl::StartsWith(last_status.message(), kConcurrentMessagePrefix);
auto const is_retryable =
is_concurrent_write
? retry_policy->OnFailure(Status(StatusCode::kUnavailable, ""))
: retry_policy->OnFailure(last_status);
if (!is_retryable) {
if (!UploadChunkOnFailure(*retry_policy, count_workaround_9563,
last_status)) {
return return_error(std::move(last_status), *retry_policy, __func__);
}

Expand Down
79 changes: 62 additions & 17 deletions google/cloud/storage/internal/retry_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,22 +189,10 @@ TEST(RetryClientTest, UploadChunkHandleTransient) {
.WillOnce(
Return(QueryResumableUploadResponse{2 * quantum, absl::nullopt}));

// Repeat the failure with kAborted. This error code is only retryable for
// resumable uploads.
EXPECT_CALL(*mock, UploadChunk)
.WillOnce(Return(
Status(StatusCode::kAborted, "Concurrent requests received.")));
EXPECT_CALL(*mock, QueryResumableUpload)
.WillOnce(
Return(QueryResumableUploadResponse{2 * quantum, absl::nullopt}));
EXPECT_CALL(*mock, UploadChunk)
.WillOnce(
Return(QueryResumableUploadResponse{3 * quantum, absl::nullopt}));

// Even simpler scenario where only the UploadChunk() calls succeeds.
EXPECT_CALL(*mock, UploadChunk)
.WillOnce(
Return(QueryResumableUploadResponse{4 * quantum, absl::nullopt}));
Return(QueryResumableUploadResponse{3 * quantum, absl::nullopt}));

auto response = client->UploadChunk(
UploadChunkRequest("test-only-session-id", 0, {{payload}}));
Expand All @@ -220,11 +208,68 @@ TEST(RetryClientTest, UploadChunkHandleTransient) {
UploadChunkRequest("test-only-session-id", 2 * quantum, {{payload}}));
ASSERT_STATUS_OK(response);
EXPECT_EQ(3 * quantum, response->committed_size.value_or(0));
}

response = client->UploadChunk(
UploadChunkRequest("test-only-session-id", 3 * quantum, {{payload}}));
ASSERT_STATUS_OK(response);
EXPECT_EQ(4 * quantum, response->committed_size.value_or(0));
// TODO(#9293) - fix this test to use ErrorInfo
Status TransientAbortError() {
return Status(StatusCode::kAborted, "Concurrent requests received.");
}

/// @test Verify that transient failures are handled as expected.
TEST(RetryClientTest, UploadChunkAbortedMaybeIsTransient) {
auto mock = std::make_shared<testing::MockClient>();
auto client = RetryClient::Create(std::shared_ptr<internal::RawClient>(mock));
google::cloud::internal::OptionsSpan const span(
BasicTestPolicies().set<IdempotencyPolicyOption>(
StrictIdempotencyPolicy().clone()));

auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
std::string const payload(quantum, '0');

// Verify that the workaround for "transients" (as defined in #9563) results
// in calls to QueryResumableUpload().
EXPECT_CALL(*mock, UploadChunk)
.Times(4)
.WillRepeatedly(Return(TransientAbortError()));
EXPECT_CALL(*mock, QueryResumableUpload)
.Times(AtLeast(2))
.WillRepeatedly(Return(QueryResumableUploadResponse{0, absl::nullopt}));

auto response = client->UploadChunk(
UploadChunkRequest("test-only-session-id", 0, {{payload}}));
EXPECT_THAT(response, StatusIs(StatusCode::kAborted,
HasSubstr("Concurrent requests received.")));
}

// TODO(#9563) - remove this test once it is not needed
Status Error9563() {
return Status(StatusCode::kAlreadyExists, "Requested entity already exists");
}

/// @test Verify that transient failures are handled as expected.
TEST(RetryClientTest, UploadChunkWorkaround9563) {
auto mock = std::make_shared<testing::MockClient>();
auto client = RetryClient::Create(std::shared_ptr<internal::RawClient>(mock));
google::cloud::internal::OptionsSpan const span(
BasicTestPolicies().set<IdempotencyPolicyOption>(
StrictIdempotencyPolicy().clone()));

auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
std::string const payload(quantum, '0');

// Verify that the workaround for "transients" (as defined in #9563) results
// in calls to QueryResumableUpload().
::testing::InSequence sequence;
EXPECT_CALL(*mock, UploadChunk).WillOnce(Return(Error9563()));
EXPECT_CALL(*mock, QueryResumableUpload)
.WillOnce(Return(QueryResumableUploadResponse{0, absl::nullopt}));
// The second error should be a permanent failure
EXPECT_CALL(*mock, UploadChunk).WillOnce(Return(Error9563()));

auto response = client->UploadChunk(
UploadChunkRequest("test-only-session-id", 0, {{payload}}));
EXPECT_THAT(response, StatusIs(StatusCode::kAlreadyExists,
HasSubstr("Requested entity already exists")));
}

/// @test Verify that we can restore a session and continue writing.
Expand Down

0 comments on commit 29ff9d3

Please sign in to comment.