Skip to content

Commit

Permalink
Extract common parts of ApplySnapshot (#8110)
Browse files Browse the repository at this point in the history
ref #8081
  • Loading branch information
CalvinNeo authored Sep 19, 2023
1 parent 537cccf commit 158296d
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 164 deletions.
90 changes: 90 additions & 0 deletions dbms/src/Storages/DeltaMerge/BoundedSSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Poco/File.h>
#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/PKSquashingBlockInputStream.h>
#include <Storages/DeltaMerge/SSTFilesToBlockInputStream.h>
#include <Storages/KVStore/Decode/PartitionStreams.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/StorageDeltaMerge.h>
#include <common/logger_useful.h>

namespace DB
{
namespace DM
{
/// Wraps the raw SST-decoding stream `child` into an MVCC-compacting pipeline:
///   child -> PKSquashingBlockInputStream (with extra sort) -> DMVersionFilterBlockInputStream (compact mode)
/// `pk_column_id_` and `schema_snap` supply the primary-key column id, the
/// common-handle flag and the column definitions used by the downstream stages.
BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
    SSTFilesToBlockInputStreamPtr child,
    const ColId pk_column_id_,
    const DecodingStorageSchemaSnapshotConstPtr & schema_snap)
    : pk_column_id(pk_column_id_)
    , _raw_child(std::move(child))
{
    using SortingPKSquashingStream = PKSquashingBlockInputStream</*need_extra_sort=*/true>;
    using CompactVersionFilterStream = DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>;

    const bool common_handle = schema_snap->is_common_handle;

    // Initialize `mvcc_compact_stream`.
    // Stage 1: refine the boundary of blocks. Rows decoded from SSTFiles are sorted by
    // primary key asc, timestamp desc
    // (https://github.com/tikv/tikv/blob/v5.0.1/components/txn_types/src/types.rs#L103-L108),
    // whereas DMVersionFilter requires primary key asc, timestamp asc. Hence the extra sort
    // is enabled in PKSquashing.
    auto pk_squashed = std::make_shared<SortingPKSquashingStream>(_raw_child, pk_column_id, common_handle);

    // Stage 2: compact MVCC versions using the GC safepoint carried by the raw child's options.
    mvcc_compact_stream = std::make_unique<CompactVersionFilterStream>(
        pk_squashed,
        *(schema_snap->column_defines),
        _raw_child->opts.gc_safepoint,
        common_handle);
}

void BoundedSSTFilesToBlockInputStream::readPrefix()
{
mvcc_compact_stream->readPrefix();
}

void BoundedSSTFilesToBlockInputStream::readSuffix()
{
mvcc_compact_stream->readSuffix();
}

/// Returns the next block produced by the MVCC compaction stream.
Block BoundedSSTFilesToBlockInputStream::read()
{
    Block compacted_block = mvcc_compact_stream->read();
    return compacted_block;
}

/// Returns a copy of the `process_keys` counters held by the wrapped raw SST stream.
SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getProcessKeys() const
{
    const auto & raw_stream = *_raw_child;
    return raw_stream.process_keys;
}

/// Returns the `region` pointer held by the wrapped raw SST stream.
RegionPtr BoundedSSTFilesToBlockInputStream::getRegion() const
{
    const auto & raw_stream = *_raw_child;
    return raw_stream.region;
}

std::tuple<size_t, size_t, size_t, UInt64> //
BoundedSSTFilesToBlockInputStream::getMvccStatistics() const
{
return std::make_tuple(
mvcc_compact_stream->getEffectiveNumRows(),
mvcc_compact_stream->getNotCleanRows(),
mvcc_compact_stream->getDeletedRows(),
mvcc_compact_stream->getGCHintVersion());
}

} // namespace DM
} // namespace DB
90 changes: 10 additions & 80 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
#include <Interpreters/Context.h>
#include <Poco/File.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/PKSquashingBlockInputStream.h>
#include <Storages/DeltaMerge/SSTFilesToBlockInputStream.h>
#include <Storages/KVStore/Decode/PartitionStreams.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
Expand All @@ -39,27 +36,21 @@ extern const int ILLFORMAT_RAFT_ROW;
namespace DM
{
SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
const std::string & log_prefix_,
RegionPtr region_,
UInt64 snapshot_index_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
DecodingStorageSchemaSnapshotConstPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_)
SSTFilesToBlockInputStreamOpts && opts_)
: region(std::move(region_))
, snapshot_index(snapshot_index_)
, snaps(snaps_)
, proxy_helper(proxy_helper_)
, schema_snap(std::move(schema_snap_))
, tmt(tmt_)
, gc_safepoint(gc_safepoint_)
, expected_size(expected_size_)
, log(Logger::get(log_prefix_))
, force_decode(force_decode_)
{}
, opts(std::move(opts_))
{
log = Logger::get(opts.log_prefix);
}

SSTFilesToBlockInputStream::~SSTFilesToBlockInputStream() = default;

Expand Down Expand Up @@ -170,14 +161,14 @@ Block SSTFilesToBlockInputStream::read()
region->insert(ColumnFamilyType::Write, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len));
++process_keys.write_cf;
process_keys.write_cf_bytes += (key.len + value.len);
if (process_keys.write_cf % expected_size == 0)
if (process_keys.write_cf % opts.expected_size == 0)
{
loaded_write_cf_key.assign(key.data, key.len);
}
} // Notice: `key`, `value` are string-view-like object, should never use after `next` called
write_cf_reader->next();

if (process_keys.write_cf % expected_size == 0)
if (process_keys.write_cf % opts.expected_size == 0)
{
// If we should form a new block.
const DecodedTiKVKey rowkey = RecordKVFormat::decodeTiKVKey(TiKVKey(std::move(loaded_write_cf_key)));
Expand Down Expand Up @@ -297,7 +288,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
// Update the end offset.
// If there are no more key-value, the outer while loop will be break.
// Else continue to read next batch from current CF.
process_keys_offset_end += expected_size;
process_keys_offset_end += opts.expected_size;
}
}

Expand All @@ -310,7 +301,7 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
{
// Read block from `region`. If the schema has been updated, it will
// throw an exception with code `ErrorCodes::REGION_DATA_SCHEMA_UPDATED`
return GenRegionBlockDataWithSchema(region, schema_snap, gc_safepoint, force_decode, tmt);
return GenRegionBlockDataWithSchema(region, opts.schema_snap, opts.gc_safepoint, opts.force_decode, tmt);
}
catch (DB::Exception & e)
{
Expand Down Expand Up @@ -340,66 +331,5 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
throw;
}
}

/// Methods for BoundedSSTFilesToBlockInputStream

// Builds the MVCC compaction pipeline on top of the raw SST stream `child`:
// `child` feeds a PKSquashingBlockInputStream (with extra sort), which feeds a
// DMVersionFilterBlockInputStream in compact mode stored in `mvcc_compact_stream`.
BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const DecodingStorageSchemaSnapshotConstPtr & schema_snap)
: pk_column_id(pk_column_id_)
, _raw_child(std::move(child))
{
const bool is_common_handle = schema_snap->is_common_handle;
// Initialize `mvcc_compact_stream`.
// First refine the boundary of blocks. Note that the rows decoded from SSTFiles are sorted by primary key asc, timestamp desc
// (https://github.com/tikv/tikv/blob/v5.0.1/components/txn_types/src/types.rs#L103-L108).
// DMVersionFilter requires rows sorted by primary key asc, timestamp asc, so we need an extra sort in PKSquashing.
auto stream = std::make_shared<PKSquashingBlockInputStream</*need_extra_sort=*/true>>(
_raw_child,
pk_column_id,
is_common_handle);
// The version filter reads the GC safepoint stored on the raw child stream.
mvcc_compact_stream = std::make_unique<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
stream,
*(schema_snap->column_defines),
_raw_child->gc_safepoint,
is_common_handle);
}

// Forwards the prefix phase to `mvcc_compact_stream`.
void BoundedSSTFilesToBlockInputStream::readPrefix()
{
mvcc_compact_stream->readPrefix();
}

// Forwards the suffix phase to `mvcc_compact_stream`.
void BoundedSSTFilesToBlockInputStream::readSuffix()
{
mvcc_compact_stream->readSuffix();
}

// Returns the next block produced by `mvcc_compact_stream`.
Block BoundedSSTFilesToBlockInputStream::read()
{
return mvcc_compact_stream->read();
}

// Returns a copy of the `process_keys` counters of the wrapped raw SST stream.
SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getProcessKeys() const
{
return _raw_child->process_keys;
}

// Returns the `region` pointer of the wrapped raw SST stream.
RegionPtr BoundedSSTFilesToBlockInputStream::getRegion() const
{
return _raw_child->region;
}

// Returns statistics read from `mvcc_compact_stream`, in order:
// effective row count, not-clean row count, deleted row count, GC hint version.
std::tuple<size_t, size_t, size_t, UInt64> //
BoundedSSTFilesToBlockInputStream::getMvccStatistics() const
{
return std::make_tuple(
mvcc_compact_stream->getEffectiveNumRows(),
mvcc_compact_stream->getNotCleanRows(),
mvcc_compact_stream->getDeletedRows(),
mvcc_compact_stream->getGCHintVersion());
}

} // namespace DM
} // namespace DB
} // namespace DB
24 changes: 14 additions & 10 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,33 @@ using SSTFilesToBlockInputStreamPtr = std::shared_ptr<SSTFilesToBlockInputStream
class BoundedSSTFilesToBlockInputStream;
using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr<BoundedSSTFilesToBlockInputStream>;

// Options bundle consumed by SSTFilesToBlockInputStream's constructor.
// The stream keeps the whole bundle as its `opts` member.
struct SSTFilesToBlockInputStreamOpts
{
// Name handed to `Logger::get` when the stream creates its logger.
std::string log_prefix;
// Schema snapshot; its `column_defines` build the stream header and it is
// passed along when decoding region data into blocks.
DecodingStorageSchemaSnapshotConstPtr schema_snap;
// GC safepoint forwarded to region-data decoding and to the MVCC version filter.
Timestamp gc_safepoint;
// Whether to abort when meeting an error in decoding.
bool force_decode;
// The expected size of emitted `Block`.
// NOTE: used as a modulus against the processed write-CF key count, so it must be non-zero.
size_t expected_size;
};

// Read blocks from TiKV's SSTFiles
class SSTFilesToBlockInputStream final : public IBlockInputStream
{
public:
SSTFilesToBlockInputStream( //
const std::string & log_prefix_,
RegionPtr region_,
UInt64 snapshot_index_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
DecodingStorageSchemaSnapshotConstPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
SSTFilesToBlockInputStreamOpts && opts_);
~SSTFilesToBlockInputStream() override;

String getName() const override { return "SSTFilesToBlockInputStream"; }

Block getHeader() const override { return toEmptyBlock(*(schema_snap->column_defines)); }
Block getHeader() const override { return toEmptyBlock(*(opts.schema_snap->column_defines)); }

void readPrefix() override;
void readSuffix() override;
Expand Down Expand Up @@ -101,10 +108,8 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
UInt64 snapshot_index;
const SSTViewVec & snaps;
const TiFlashRaftProxyHelper * proxy_helper{nullptr};
DecodingStorageSchemaSnapshotConstPtr schema_snap;
TMTContext & tmt;
const Timestamp gc_safepoint;
size_t expected_size;
const SSTFilesToBlockInputStreamOpts opts;
LoggerPtr log;

using SSTReaderPtr = std::unique_ptr<SSTReader>;
Expand All @@ -117,7 +122,6 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream

friend class BoundedSSTFilesToBlockInputStream;

const bool force_decode;
bool is_decode_cancelled = false;

ProcessKeys process_keys;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <Storages/KVStore/FFI/ProxyFFICommon.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerAsyncTasksImpl.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/Page/Config.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/PageDirectory.h>
Expand All @@ -49,7 +50,7 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count)
static constexpr int region_per_sec = 2;
thread_count = ffi_handle_sec * region_per_sec;
}
tasks_trace = std::make_shared<AsyncTasks>(thread_count);
tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count);
}

ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context)
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#pragma once

#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>

namespace DB
{
struct AsyncTasks;
using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>;

class FastAddPeerContext
{
Expand All @@ -32,7 +34,7 @@ class FastAddPeerContext
UInt64 required_seq);

public:
std::shared_ptr<AsyncTasks> tasks_trace;
std::shared_ptr<FAPAsyncTasks> tasks_trace;

private:
class CheckpointCacheElement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
#include <Encryption/PosixRandomAccessFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerAsyncTasksImpl.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/Universal/RaftDataReader.h>
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ namespace DB
class UniversalPageStorage;
using UniversalPageStoragePtr = std::shared_ptr<UniversalPageStorage>;

struct AsyncTasks;

// A mapping from segment end key to segment id,
// The main usage:
// auto lock = lock();
Expand Down
Loading

0 comments on commit 158296d

Please sign in to comment.