Skip to content

Commit

Permalink
ddl: Support FLASHBACK DATABASE (#8424)
Browse files Browse the repository at this point in the history
close #8450
  • Loading branch information
JaySon-Huang authored Dec 7, 2023
1 parent 713bca0 commit 3085414
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 97 deletions.
24 changes: 22 additions & 2 deletions dbms/src/Databases/DatabaseTiFlash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Storages/IManageableStorage.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/TMTStorages.h>
#include <TiDB/Schema/TiDB.h>
#include <common/logger_useful.h>

namespace DB
Expand Down Expand Up @@ -610,7 +611,7 @@ void DatabaseTiFlash::shutdown()
tables.clear();
}

void DatabaseTiFlash::alterTombstone(const Context & context, Timestamp tombstone_)
void DatabaseTiFlash::alterTombstone(const Context & context, Timestamp tombstone_, const TiDB::DBInfoPtr & new_db_info)
{
const auto database_metadata_path = getDatabaseMetadataPath(metadata_path);
const auto database_metadata_tmp_path = database_metadata_path + ".tmp";
Expand All @@ -622,7 +623,18 @@ void DatabaseTiFlash::alterTombstone(const Context & context, Timestamp tombston

{
// Alter the attach statement in metadata.
auto dbinfo_literal = std::make_shared<ASTLiteral>(Field(db_info == nullptr ? "" : (db_info->serialize())));
std::shared_ptr<ASTLiteral> dbinfo_literal = [&]() {
String seri_info;
if (new_db_info != nullptr)
{
seri_info = new_db_info->serialize();
}
else if (db_info != nullptr)
{
seri_info = db_info->serialize();
}
return std::make_shared<ASTLiteral>(Field(seri_info));
}();
Field format_version_field(static_cast<UInt64>(DatabaseTiFlash::CURRENT_VERSION));
auto version_literal = std::make_shared<ASTLiteral>(format_version_field);
auto tombstone_literal = std::make_shared<ASTLiteral>(Field(tombstone_));
Expand Down Expand Up @@ -651,6 +663,9 @@ void DatabaseTiFlash::alterTombstone(const Context & context, Timestamp tombston
}
else
{
// update the seri dbinfo
args.children[0] = dbinfo_literal;
args.children[1] = version_literal;
// udpate the tombstone mark
args.children[2] = tombstone_literal;
}
Expand Down Expand Up @@ -704,6 +719,11 @@ void DatabaseTiFlash::alterTombstone(const Context & context, Timestamp tombston

// After all done, set the tombstone
tombstone = tombstone_;
// Overwrite db_info if not null
if (new_db_info)
{
db_info = new_db_info;
}
}

void DatabaseTiFlash::drop(const Context & context)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseTiFlash.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class DatabaseTiFlash : public DatabaseWithOwnTablesBase

bool isTombstone() const override { return tombstone != 0; }
Timestamp getTombstone() const override { return tombstone; }
void alterTombstone(const Context & context, Timestamp tombstone_) override;
void alterTombstone(const Context & context, Timestamp tombstone_, const TiDB::DBInfoPtr & new_db_info) override;

void drop(const Context & context) override;

Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
#include <functional>
#include <memory>

namespace TiDB
{
struct DBInfo;
using DBInfoPtr = std::shared_ptr<DBInfo>;
} // namespace TiDB

namespace DB
{
Expand Down Expand Up @@ -144,7 +149,11 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>

virtual bool isTombstone() const { return false; }
virtual Timestamp getTombstone() const { return 0; }
virtual void alterTombstone(const Context & /*context*/, Timestamp /*tombstone_*/) {}
virtual void alterTombstone(
const Context & /*context*/,
Timestamp /*tombstone_*/,
const TiDB::DBInfoPtr & /*new_db_info*/)
{}

/// Delete metadata, the deletion of which differs from the recursive deletion of the directory, if any.
virtual void drop(const Context & context) = 0;
Expand Down
35 changes: 32 additions & 3 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
#include <Storages/IStorage.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/TMTStorages.h>
#include <Storages/KVStore/Types.h>
#include <Storages/MutableSupport.h>
#include <Storages/registerStorages.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDB.h>
#include <common/logger_useful.h>

#include <optional>
Expand Down Expand Up @@ -942,6 +944,7 @@ try
)",
};

size_t case_no = 0;
for (const auto & statement : statements)
{
{
Expand All @@ -968,22 +971,48 @@ try
LOG_DEBUG(log, "After create [meta={}]", meta);

DB::Timestamp tso = 1000;
db->alterTombstone(*ctx, tso);
db->alterTombstone(*ctx, tso, nullptr);
EXPECT_TRUE(db->isTombstone());
EXPECT_EQ(db->getTombstone(), tso);
if (case_no != 0)
{
auto db_tiflash = std::dynamic_pointer_cast<DatabaseTiFlash>(db);
ASSERT_NE(db_tiflash, nullptr);
auto db_info = db_tiflash->getDatabaseInfo();
ASSERT_EQ(db_info.name, "test_db"); // not changed
}

// Try restore from disk
db = detachThenAttach(*ctx, db_name, std::move(db), log);
EXPECT_TRUE(db->isTombstone());
EXPECT_EQ(db->getTombstone(), tso);

// Recover
db->alterTombstone(*ctx, 0);
// Recover, usually recover with a new database name
auto new_db_info = std::make_shared<TiDB::DBInfo>(
R"json({"charset":"utf8mb4","collate":"utf8mb4_bin","db_name":{"L":"test_new_db","O":"test_db"},"id":1010,"state":5})json",
NullspaceID);
db->alterTombstone(*ctx, 0, new_db_info);
EXPECT_FALSE(db->isTombstone());
if (case_no != 0)
{
auto db_tiflash = std::dynamic_pointer_cast<DatabaseTiFlash>(db);
ASSERT_NE(db_tiflash, nullptr);
auto db_info = db_tiflash->getDatabaseInfo();
ASSERT_EQ(db_info.name, "test_new_db"); // changed by the `new_db_info`
}

// Try restore from disk
db = detachThenAttach(*ctx, db_name, std::move(db), log);
EXPECT_FALSE(db->isTombstone());
if (case_no != 0)
{
auto db_tiflash = std::dynamic_pointer_cast<DatabaseTiFlash>(db);
ASSERT_NE(db_tiflash, nullptr);
auto db_info = db_tiflash->getDatabaseInfo();
ASSERT_EQ(db_info.name, "test_new_db"); // changed by the `new_db_info`
}

case_no += 1;
}
}
CATCH
Expand Down
101 changes: 76 additions & 25 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <TiDB/Schema/SchemaBuilder.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDB.h>
#include <common/defines.h>
#include <common/logger_useful.h>
#include <fmt/format.h>

Expand Down Expand Up @@ -259,12 +260,17 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
{
case SchemaActionType::CreateSchema:
{
applyCreateSchema(diff.schema_id);
applyCreateDatabase(diff.schema_id);
break;
}
case SchemaActionType::DropSchema:
{
applyDropSchema(diff.schema_id);
applyDropDatabase(diff.schema_id);
break;
}
case SchemaActionType::ActionRecoverSchema:
{
applyRecoverDatabase(diff.schema_id);
break;
}
case SchemaActionType::CreateTables:
Expand Down Expand Up @@ -409,7 +415,7 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashReplica(DatabaseID databa
return;
}

// Recover the table if tombstoned
// Recover the table if tombstone
if (storage->isTombstone())
{
applyRecoverLogicalTable(db_info, table_info);
Expand Down Expand Up @@ -568,15 +574,15 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiffOnLogicalTable(
{
LOG_INFO(
log,
"No partition changes, paritions_size={} {} with database_id={}, table_id={}",
"No partition changes, partitions_size={} {} with database_id={}, table_id={}",
new_part_id_set.size(),
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
return;
}

// Copy the local table info and update fileds on the copy
// Copy the local table info and update fields on the copy
auto updated_table_info = local_table_info;
updated_table_info.is_partition_table = true;
updated_table_info.belonging_table_id = table_info->belonging_table_id;
Expand Down Expand Up @@ -869,19 +875,19 @@ String createDatabaseStmt(Context & context, const DBInfo & db_info, const Schem
}

template <typename Getter, typename NameMapper>
bool SchemaBuilder<Getter, NameMapper>::applyCreateSchema(DatabaseID schema_id)
bool SchemaBuilder<Getter, NameMapper>::applyCreateDatabase(DatabaseID database_id)
{
auto db_info = getter.getDatabase(schema_id);
auto db_info = getter.getDatabase(database_id);
if (unlikely(db_info == nullptr))
{
return false;
}
applyCreateSchema(db_info);
applyCreateDatabaseByInfo(db_info);
return true;
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr & db_info)
void SchemaBuilder<Getter, NameMapper>::applyCreateDatabaseByInfo(const TiDB::DBInfoPtr & db_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_db).Increment();
LOG_INFO(log, "Create database {} begin, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
Expand All @@ -901,29 +907,72 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(DatabaseID schema_id)
void SchemaBuilder<Getter, NameMapper>::applyRecoverDatabase(DatabaseID database_id)
{
TiDB::DBInfoPtr db_info = databases.getDBInfo(schema_id);
auto db_info = getter.getDatabase(database_id);
if (unlikely(db_info == nullptr))
{
LOG_INFO(log, "Try to drop database but not found, may has been dropped, database_id={}", schema_id);
LOG_INFO(
log,
"Recover database is ignored because database is not exist in TiKV,"
" database_id={}",
database_id);
return;
}
LOG_INFO(log, "Recover database begin, database_id={}", database_id);
auto db_name = name_mapper.mapDatabaseName(database_id, keyspace_id);
auto db = context.tryGetDatabase(db_name);
if (unlikely(!db))
{
LOG_ERROR(
log,
"Recover database is ignored because instance is not exists, may have been physically dropped, "
"database_id={}",
db_name,
database_id);
return;
}

{
//TODO: it seems may need a lot time, maybe we can do it in a background thread
auto table_ids = table_id_map.findTablesByDatabaseID(schema_id);
auto table_ids = table_id_map.findTablesByDatabaseID(database_id);
for (auto table_id : table_ids)
applyDropTable(schema_id, table_id);
{
auto table_info = getter.getTableInfo(database_id, table_id);
applyRecoverLogicalTable(db_info, table_info);
}
}

applyDropSchema(name_mapper.mapDatabaseName(*db_info));
// Usually `FLASHBACK DATABASE ... TO ...` will rename the database
db->alterTombstone(context, 0, db_info);
databases.addDatabaseInfo(db_info); // add back database info cache
LOG_INFO(log, "Recover database end, database_id={}", database_id);
}

databases.eraseDBInfo(schema_id);
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropDatabase(DatabaseID database_id)
{
TiDB::DBInfoPtr db_info = databases.getDBInfo(database_id);
if (unlikely(db_info == nullptr))
{
LOG_INFO(log, "Try to drop database but not found, may has been dropped, database_id={}", database_id);
return;
}

{
//TODO: it seems may need a lot time, maybe we can do it in a background thread
auto table_ids = table_id_map.findTablesByDatabaseID(database_id);
for (auto table_id : table_ids)
applyDropTable(database_id, table_id);
}

applyDropDatabaseByName(name_mapper.mapDatabaseName(database_id, keyspace_id));

databases.eraseDBInfo(database_id);
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
void SchemaBuilder<Getter, NameMapper>::applyDropDatabaseByName(const String & db_name)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_db).Increment();
LOG_INFO(log, "Tombstone database begin, db_name={}", db_name);
Expand All @@ -945,9 +994,9 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
// In such way our database (and its belonging tables) will be GC-ed later than TiDB, which is safe and correct.
auto & tmt_context = context.getTMTContext();
auto tombstone = tmt_context.getPDClient()->getTS();
db->alterTombstone(context, tombstone);
db->alterTombstone(context, tombstone, /*new_db_info*/ nullptr); // keep the old db_info

LOG_INFO(log, "Tombstone database end, db_name={}", db_name);
LOG_INFO(log, "Tombstone database end, db_name={} tombstone={}", db_name, tombstone);
}

std::tuple<NamesAndTypes, Strings> parseColumnsFromTableInfo(const TiDB::TableInfo & table_info)
Expand Down Expand Up @@ -1115,6 +1164,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
name_mapper.debugTableName(storage->getTableInfo()),
table_id);

const UInt64 tombstone_ts = tmt_context.getPDClient()->getTS();
// TODO:try to optimize alterCommands
AlterCommands commands;
{
Expand All @@ -1125,17 +1175,18 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
// 1. Use current timestamp, which is after TiDB's drop time, to be the tombstone of this table;
// 2. Use the same GC safe point as TiDB.
// In such way our table will be GC-ed later than TiDB, which is safe and correct.
command.tombstone = tmt_context.getPDClient()->getTS();
command.tombstone = tombstone_ts;
commands.emplace_back(std::move(command));
}
auto alter_lock = storage->lockForAlter(getThreadNameAndID());
storage->updateTombstone(alter_lock, commands, db_name, storage->getTableInfo(), name_mapper, context);
LOG_INFO(
log,
"Tombstone table {}.{} end, table_id={}",
"Tombstone table {}.{} end, table_id={} tombstone={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
table_id,
tombstone_ts);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -1204,7 +1255,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
{
break;
}
applyCreateSchema(db_info);
applyCreateDatabaseByInfo(db_info);
{
std::unique_lock<std::mutex> created_db_set_lock(created_db_set_mutex);
created_db_set.emplace(name_mapper.mapDatabaseName(*db_info));
Expand Down Expand Up @@ -1302,7 +1353,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
}
if (created_db_set.count(it->first) == 0 && !isReservedDatabase(context, it->first))
{
applyDropSchema(it->first);
applyDropDatabaseByName(it->first);
LOG_INFO(log, "Database {} dropped during sync all schemas", it->first);
}
}
Expand Down Expand Up @@ -1474,7 +1525,7 @@ void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
{
continue;
}
applyDropSchema(db.first);
applyDropDatabaseByName(db.first);
LOG_INFO(log, "Database {} dropped during drop all schemas", db.first);
}

Expand Down
Loading

0 comments on commit 3085414

Please sign in to comment.