Enhance parallel #4369
Conversation
…execution process for those previous blocks that may have failed to execute
2, the sync process will use a mutex
2, add a mutex when committing the DAG
Walkthrough

This pull request introduces a series of modifications across multiple components of the blockchain system, focusing on block synchronization, DAG (Directed Acyclic Graph) management, and error handling. The changes primarily involve refactoring block connectivity checks, enhancing synchronization logic, and improving error management in block-related operations. Key modifications include updates to block existence verification, synchronization request handling, and simplification of certain block processing methods.
Actionable comments posted: 9
🧹 Nitpick comments (8)
node/src/node.rs (1)

10-10: Unnecessary import of anyhow::Result

The import use anyhow::Result; may be unnecessary if Result is not used from the anyhow crate in this file. Unused imports clutter the namespace and can cause confusion.

If anyhow::Result is not specifically required, consider removing the import:

-use anyhow::Result;
flexidag/src/blockdag.rs (2)

104-195: Refactor to eliminate code duplication in has_block_connected

The method has_block_connected contains repetitive patterns for error handling and logging when fetching data from storage. This duplication makes maintenance more challenging and increases the risk of inconsistencies.

Refactor the repetitive error handling into a helper function. Note that a generic helper cannot return Ok(false) for an arbitrary T; returning Option<T> lets the caller map a missing or unreadable entry to false:

fn get_from_store<T, F>(&self, fetch_fn: F, id: HashValue, data_name: &str) -> Option<T>
where
    F: FnOnce(HashValue) -> Result<T, StoreError>,
{
    match fetch_fn(id) {
        Ok(data) => Some(data),
        Err(e) => {
            warn!(
                "Failed to get {} by hash {:?}: {:?}, the block should be re-executed",
                data_name, id, e
            );
            None
        }
    }
}

Then use this helper in has_block_connected:

let Some(ghostdata) = self.get_from_store(
    |id| self.storage.ghost_dag_store.get_data(id),
    block_header.id(),
    "ghostdata",
) else {
    return Ok(false);
};
let Some(dag_header) = self.get_from_store(
    |id| self.storage.header_store.get_header(id),
    block_header.id(),
    "header",
) else {
    return Ok(false);
};
// Similarly for parents and other data fetches
633-638: Sensitive information logging

The info logs include detailed information about the DAG state and ghost data, which may expose sensitive internal data.

Consider reducing the verbosity of the logs or adjusting the log level to debug to prevent potential information leakage:

-info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?} and its red block count: {:?}", previous_pruning_point, previous_ghostdata.to_compact(), previous_ghostdata.mergeset_reds.len());
+debug!("Calculating mergeset and tips for pruning point: {:?}", previous_pruning_point);

sync/src/sync.rs (1)
189-211: Simplify sync stage management in try_to_start_sync

The try_to_start_sync method introduces complex logic for managing sync stages, which may hurt readability and maintainability.

Consider simplifying the sync stage transitions or leveraging a state machine pattern to make the code more understandable and less error-prone; a sketch of such a state machine follows.
sync/api/src/lib.rs (1)

97-106: Fix typo in struct name: "Tagret" should be "Target".

The struct name contains a typo that should be corrected for readability and maintainability.

Apply this diff to fix the typo:

-pub struct SyncSpecificTagretRequest {
+pub struct SyncSpecificTargetRequest {

sync/src/block_connector/block_connector_service.rs (1)
134-144: Consider extracting magic numbers into named constants.

The implementation is correct but could be improved by:
- Extracting 2 into a named constant to explain its significance
- Adding documentation to explain why 2 * G_BASE_MAX_UNCLES_PER_BLOCK was chosen as the threshold

Apply this diff to improve readability:

+    /// The multiplier used to determine the maximum gap for considering a block as "near".
+    /// This value is set to 2 because [add rationale here].
+    const NEAR_BLOCK_GAP_MULTIPLIER: u64 = 2;
+
     fn is_near_block(&self, block_header: &BlockHeader) -> bool {
         let current_number = self.chain_service.get_main().status().head().number();
         if current_number <= block_header.number() {
             return false;
         }
         let gap = current_number.saturating_sub(block_header.number());
-        if gap <= G_BASE_MAX_UNCLES_PER_BLOCK.saturating_mul(2) {
+        if gap <= G_BASE_MAX_UNCLES_PER_BLOCK.saturating_mul(NEAR_BLOCK_GAP_MULTIPLIER) {
             return true;
         }
         false
     }
sync/src/block_connector/write_block_chain.rs (1)

383-431: Consider removing commented-out code.

The large block of commented-out code should be removed if it is no longer needed. If it must be preserved for historical reasons, consider moving it to documentation or a separate design document.

-    // let start = new_head_block.header().id();
-    // let lastest = self.main.status().head.clone();
-    // ...
-    // }
sync/src/verified_rpc_client.rs (1)

738-824: Enhance error handling in block retrieval.

The new get_block_diligently method improves reliability, but there are areas for improvement:
- Consider adding timeout handling for peer requests
- Add metrics for tracking successful/failed retrievals per peer
- Consider implementing exponential backoff for retries (see the sketch after the example below)

Example implementation with timeout:

 let blocks = match self
-    .get_blocks_inner(peer_info.peer_id(), waiting_list.clone())
+    .get_blocks_inner_with_timeout(peer_info.peer_id(), waiting_list.clone(), Duration::from_secs(10))
     .await
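To make the backoff suggestion concrete, here is a minimal, self-contained sketch of retrying an async operation with exponential backoff, assuming a tokio runtime. fetch_with_backoff and its parameters are hypothetical; the real get_blocks_inner call would be passed in as the closure, and each attempt could additionally be wrapped in tokio::time::timeout.

use std::time::Duration;

// Hypothetical helper: retry an async operation with exponential backoff.
// Assumes a tokio runtime; names and limits are illustrative only.
async fn fetch_with_backoff<F, Fut, T, E>(mut op: F, max_retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(200);
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay = delay.saturating_mul(2); // double the wait each round
                attempt += 1;
            }
        }
    }
}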
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (15)

- block-relayer/src/block_relayer.rs (0 hunks)
- chain/src/chain.rs (3 hunks)
- flexidag/src/blockdag.rs (9 hunks)
- flexidag/src/reachability/inquirer.rs (1 hunks)
- flexidag/tests/tests.rs (1 hunks)
- network-p2p/src/service.rs (1 hunks)
- network/api/src/peer_provider.rs (1 hunks)
- node/src/node.rs (4 hunks)
- sync/api/src/lib.rs (3 hunks)
- sync/src/block_connector/block_connector_service.rs (4 hunks)
- sync/src/block_connector/write_block_chain.rs (3 hunks)
- sync/src/parallel/executor.rs (1 hunks)
- sync/src/sync.rs (5 hunks)
- sync/src/tasks/block_sync_task.rs (0 hunks)
- sync/src/verified_rpc_client.rs (2 hunks)
💤 Files with no reviewable changes (2)
- block-relayer/src/block_relayer.rs
- sync/src/tasks/block_sync_task.rs
✅ Files skipped from review due to trivial changes (2)
- network/api/src/peer_provider.rs
- network-p2p/src/service.rs
🔇 Additional comments (7)
sync/api/src/lib.rs (1)

21-51: LGTM! Well-structured block sorting implementation.

The implementation correctly orders blocks by number first and then by header ID, which is a logical ordering for blockchain operations. The trait implementations are consistent and follow Rust's ordering conventions. A simplified sketch of the ordering appears below.
sync/src/parallel/executor.rs (1)

72-72: Verify the impact of switching to has_block_connected.

The change from has_dag_block to has_block_connected represents a shift in how block connectivity is verified. This could affect the behavior of parent block validation.

Run the following script to understand the differences between these methods:

✅ Verification successful

✓ The change to has_block_connected is valid and safe

The switch from has_dag_block to has_block_connected enforces stronger guarantees by ensuring blocks are not just present but properly connected with valid ghostdata, which is essential for DAG-based parent validation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all usages of both methods to understand the impact.
echo "Finding usages of has_dag_block:"
rg -A 5 'has_dag_block'
echo "Finding usages of has_block_connected:"
rg -A 5 'has_block_connected'

Length of output: 11361
flexidag/src/reachability/inquirer.rs (1)

77-100: LGTM! Enhanced error handling with better error differentiation.

The changes improve error handling by:
- Collecting DataInconsistency errors for batch processing
- Immediately returning on HashesNotOrdered errors
- Propagating other unexpected errors immediately

This approach provides more granular control over different error scenarios; a condensed sketch of the pattern follows this list.
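A condensed sketch of that error-differentiation pattern, with stand-in types (the real code uses the crate's own reachability error type and iterates over hashes): tolerate-and-collect one error class, fail fast on the others.

// Illustrative error type; the real code uses the crate's own errors.
#[derive(Debug)]
enum ReachabilityError {
    DataInconsistency(String),
    HashesNotOrdered,
    Other(String),
}

fn process_all(items: &[u64]) -> Result<Vec<String>, ReachabilityError> {
    let mut inconsistencies = Vec::new();
    for item in items {
        match process_one(*item) {
            Ok(()) => {}
            // Recoverable: collect it and keep going so the whole batch
            // can be repaired in one pass afterwards.
            Err(ReachabilityError::DataInconsistency(msg)) => inconsistencies.push(msg),
            // Ordering violations invalidate everything after them: fail fast.
            Err(e @ ReachabilityError::HashesNotOrdered) => return Err(e),
            // Anything unexpected propagates immediately.
            Err(e) => return Err(e),
        }
    }
    Ok(inconsistencies)
}

fn process_one(_item: u64) -> Result<(), ReachabilityError> {
    Ok(()) // placeholder body for the sketch
}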
sync/src/block_connector/block_connector_service.rs (1)

331-339: Verify the sync strategy selection logic.

The implementation correctly differentiates between light sync and burden sync based on block proximity. However, ensure that this change doesn't affect synchronization performance or correctness.

Run the following script to understand the sync strategy usage:

✅ Verification successful

Sync strategy selection logic is correctly implemented

The differentiation between light sync for near blocks and burden sync for far blocks is consistent with the codebase-wide usage patterns. The implementation properly optimizes synchronization based on block proximity.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all usages of both sync strategies to understand the impact.
echo "Finding light sync usages:"
rg -A 5 'SyncSpecificT[a-z]*Request'
echo "Finding burden sync usages:"
rg -A 5 'CheckSyncEvent'

Length of output: 8063
sync/src/block_connector/write_block_chain.rs (1)

660-660: Verify the impact of the block existence check change.

The method call has been changed from self.main.dag().has_dag_block(*parent_hash) to self.main.has_dag_block(*parent_hash). While this change aligns with the new block connectivity checks, we should verify that all parent hash checks are handled consistently.

✅ Verification successful

Block existence check change is consistent with codebase patterns

The modification aligns with the codebase's block existence check patterns. All instances use the direct has_dag_block method, and no inconsistencies were found.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining direct calls to dag().has_dag_block
# to ensure consistency in block existence checks
rg "dag\(\)\.has_dag_block" --type rust

Length of output: 41

Script:

#!/bin/bash
# Search for all occurrences of has_dag_block
echo "=== Searching for has_dag_block usage ==="
rg "has_dag_block" --type rust -A 2

# Search for dag() method calls to find similar patterns
echo -e "\n=== Searching for dag() method calls ==="
rg "\.dag\(\)" --type rust -A 2

# Try to find the method definition
echo -e "\n=== Looking for has_dag_block definition ==="
ast-grep --pattern 'fn has_dag_block($$$) { $$$ }'

Length of output: 12214
flexidag/tests/tests.rs (1)

152-152: LGTM! Test updated to use new block connectivity check.

The test has been correctly updated to use has_block_connected instead of has_dag_block, maintaining consistency with the new block connectivity verification pattern.

chain/src/chain.rs (1)

1372-1372: LGTM! Block connectivity check updated.

The implementation has been correctly updated to use has_block_connected for consistent block connectivity verification.
node/src/node.rs (Outdated)
let sync_service = ctx.service_ref::<SyncService>()?.clone();
let fut = async move {
    info!("Prepare to re execute block {}", block_hash);
    let block = match storage.get_block(block_hash)? {
        Some(block) => Some(block),
        None => {
            info!("Get block from peer to peer network");
            //get block from peer to peer network.
            let peer_set = network.peer_set().await?;
            if peer_set.is_empty() {
                info!("Peers is empty.");
                None
            } else {
                let peer_selector =
                    PeerSelector::new(peer_set, PeerStrategy::Best, None);
                peer_selector.retain_rpc_peers();
                let rpc_client = VerifiedRpcClient::new(peer_selector, network);
                let mut blocks = rpc_client.get_blocks(vec![block_hash]).await?;
                blocks.pop().flatten().map(|(block, _peer)| block)
            }
        }
    };
    let block = block.ok_or_else(|| {
        format_err!(
            "Can not find block by {} from local and peer to peer network.",
            block_hash
        )
    sync_service.notify(SyncSpecificTagretRequest {
        block: None,
        block_id: block_hash,
        peer_id: None,
Typographical error in usage of SyncSpecificTagretRequest

The struct SyncSpecificTagretRequest is used with a typographical error. The correct name should be SyncSpecificTargetRequest. Please ensure that the corrected struct name is used consistently throughout the codebase.
Apply this diff to correct the struct name and ensure consistency:
-let sync_service = ctx.service_ref::<SyncService>()?.clone();
-let fut = async move {
- info!("Prepare to re execute block {}", block_hash);
- sync_service.notify(SyncSpecificTagretRequest {
+let sync_service = ctx.service_ref::<SyncService>()?.clone();
+let fut = async move {
+ info!("Prepare to re-execute block {}", block_hash);
+ sync_service.notify(SyncSpecificTargetRequest {
block: None,
block_id: block_hash,
peer_id: None,
})?;
Ok(())
};
node/src/node.rs (Outdated)
sync_service.notify(SyncSpecificTagretRequest {
    block: None,
    block_id: block_hash,
    peer_id: None,
})?;
🛠️ Refactor suggestion
Handle errors from the sync_service.notify method

The sync_service.notify method may return an error that is currently propagated using the ? operator within an asynchronous block. However, since the future is executed with ctx.exec(fut), the error may not be properly handled or logged.
Consider handling the error explicitly to ensure any issues are logged:
let fut = async move {
info!("Prepare to re-execute block {}", block_hash);
- sync_service.notify(SyncSpecificTargetRequest {
+ if let Err(e) = sync_service.notify(SyncSpecificTargetRequest {
block: None,
block_id: block_hash,
peer_id: None,
- })?;
+ }) {
+ error!("Failed to notify SyncService: {:?}", e);
+ }
Ok(())
};
let lock_guard = self.commit_lock.lock();
Risk of deadlock due to mutex locking order

In both the commit_trusted_block and commit methods, the commit_lock mutex is acquired before other locks (e.g., reachability_store), but in other parts of the code, locks may be acquired in a different order. This inconsistency can lead to deadlocks.

Ensure that all locks are always acquired in the same order throughout the codebase. Review the locking strategy and consider combining locks or redesigning to avoid locking altogether if possible. A minimal illustration of fixed lock ordering follows below.
Also applies to: 477-479
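The sketch below shows the fixed-order discipline being asked for, with hypothetical stand-in types (the real code holds commit_lock and the reachability store); only the acquisition order is the point.

use std::sync::Mutex;

// Hypothetical stand-in types: the point is the single, documented
// acquisition order, which removes the circular-wait condition that
// makes deadlock possible.
struct Dag {
    commit_lock: Mutex<()>,        // lock order 1: always acquired first
    reachability: Mutex<Vec<u64>>, // lock order 2: always acquired second
}

impl Dag {
    fn commit(&self, value: u64) {
        let _commit = self.commit_lock.lock().unwrap(); // order 1
        let mut reach = self.reachability.lock().unwrap(); // order 2
        reach.push(value);
        // Both guards drop here, in reverse order of acquisition.
    }
}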
@@ -54,6 +55,7 @@
     pub storage: FlexiDagStorage,
     ghostdag_manager: DbGhostdagManager,
     pruning_point_manager: PruningPointManager,
+    commit_lock: Arc<Mutex<FlexiDagStorage>>,
🛠️ Refactor suggestion

Potential performance bottleneck due to the commit_lock mutex

The addition of commit_lock: Arc<Mutex<FlexiDagStorage>> introduces a mutex that locks the entire FlexiDagStorage during commit operations. This could lead to performance issues due to contention in concurrent environments.

Consider implementing more fine-grained locking mechanisms or using lock-free data structures to reduce contention (one possible shape is sketched after the diff):
-pub struct BlockDAG {
- pub storage: FlexiDagStorage,
- ghostdag_manager: DbGhostdagManager,
- pruning_point_manager: PruningPointManager,
- commit_lock: Arc<Mutex<FlexiDagStorage>>,
+pub struct BlockDAG {
+ pub storage: Arc<FlexiDagStorage>,
+ ghostdag_manager: DbGhostdagManager,
+ pruning_point_manager: PruningPointManager,
+ // Remove commit_lock and use internal synchronization mechanisms
}
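One possible shape of such fine-grained locking, sketched with standard-library RwLocks and stand-in field types (the real FlexiDagStorage sub-stores have their own types; this is illustrative, not the crate's actual layout):

use std::sync::RwLock;

// Illustrative only: one RwLock per sub-store instead of one Mutex over
// the whole storage, so readers never block each other and writers only
// exclude the store they actually touch.
struct FlexiDagStorageSketch {
    header_store: RwLock<Vec<u8>>,
    ghost_dag_store: RwLock<Vec<u8>>,
}

impl FlexiDagStorageSketch {
    fn header_len(&self) -> usize {
        // Concurrent reads of headers proceed in parallel.
        self.header_store.read().unwrap().len()
    }

    fn append_ghost_data(&self, byte: u8) {
        // A ghost-data write no longer serializes against header reads.
        self.ghost_dag_store.write().unwrap().push(byte);
    }
}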
sync/src/sync.rs (Outdated)
    if next_round.is_empty() {
        break;
    }
    current_round = next_round
        .iter()
        .flat_map(|block| block.header().parents_hash())
        .collect::<HashSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();
    blocks_to_be_executed.extend(next_round);
    next_round = vec![];
    info!(
        "[sync specific] Fetch parents blocks, current_round: {:?}",
        current_round
    );
}
let mut waiting_for_execution_heap = blocks_to_be_executed
    .into_iter()
    .map(|block| SyncBlockSort { block })
    .collect::<BTreeSet<_>>();

let mut failed_blocks: HashSet<Block> = HashSet::new();
info!("[sync specific] Start to execute blocks");
while let Some(SyncBlockSort { block }) =
    waiting_for_execution_heap.iter().next().cloned()
{
    if chain.has_dag_block(block.id())? {
        waiting_for_execution_heap.remove(&SyncBlockSort {
            block: block.clone(),
        });
        continue;
    }
    if !chain.check_parents_ready(block.header()) {
        failed_blocks.insert(block.clone());
        waiting_for_execution_heap.remove(&SyncBlockSort {
            block: block.clone(),
        });
        continue;
    }
    match chain.verify_with_verifier::<DagVerifier>(block.clone()) {
        Ok(verified_executed_block) => match chain.execute(verified_executed_block) {
            Ok(_) => {
                waiting_for_execution_heap.extend(failed_blocks.iter().map(|block| {
                    SyncBlockSort {
                        block: block.clone(),
                    }
                }));
                waiting_for_execution_heap.remove(&SyncBlockSort {
                    block: block.clone(),
                });
                failed_blocks.clear();
                continue;
            }
            Err(e) => {
                warn!(
                    "[sync specific] Execute block failed, block id: {:?}, error: {:?}",
                    block.id(),
                    e
                );
                waiting_for_execution_heap.remove(&SyncBlockSort {
                    block: block.clone(),
                });
                failed_blocks.insert(block.clone());
                continue;
            }
        },
        Err(_) => {
            return Err(format_err!(
                "Verify block failed, block id: {:?}",
                block.id()
            ))
        }
    }
}

if chain.has_dag_block(msg.block_id)? {
    chain.connect(ExecutedBlock {
        block: specific_block,
        block_info: storage.get_block_info(msg.block_id)?.ok_or_else(|| {
            format_err!("failed to get the block info for id: {:?}", msg.block_id)
        })?,
    })?;
    info!("[sync specific] Sync specific block done");
} else {
    return Err(format_err!(
        "Sync specific block failed, block id: {:?}",
        specific_block.id()
    ));
}
info!("[sync specific] Sync specific block done");
Ok(())
};

ctx.spawn(fut.then(|result| async move {
    if let Err(e) = result {
        error!("[sync specific] Sync specific block failed, error: {:?}", e);
    }
}));
Ok(())
}
🛠️ Refactor suggestion

Complexity in the check_and_start_light_sync method

The check_and_start_light_sync method contains deeply nested asynchronous logic with multiple conditional branches, which makes it difficult to follow and maintain.

Refactor the method to break the logic into smaller, reusable functions. This will improve readability and make error handling more straightforward.
pub fn check_parents_ready(&self, header: &BlockHeader) -> bool {
    header.parents_hash().into_iter().all(|parent| {
        self.has_dag_block(parent).unwrap_or_else(|e| {
            warn!("check_parents_ready error: {:?}", e);
            false
        })
    })
}
🛠️ Refactor suggestion

Add an error recovery mechanism in the parent readiness check.

The check_parents_ready method silently continues on errors by returning false. Consider either:
- Propagating the error up for better error handling
- Adding metrics to track these failures
pub fn check_parents_ready(&self, header: &BlockHeader) -> bool {
header.parents_hash().into_iter().all(|parent| {
self.has_dag_block(parent).unwrap_or_else(|e| {
+ if let Some(metrics) = &self.metrics {
+ metrics.parent_check_errors.inc();
+ }
warn!("check_parents_ready error: {:?}", e);
false
})
})
}
Actionable comments posted: 2
🧹 Nitpick comments (2)
sync/src/block_connector/block_connector_service.rs (1)

129-144: Improve method documentation

While the inline comments explain the return values, the method would benefit from more comprehensive documentation explaining:
- The purpose of the proximity check
- Why the threshold is set to 2 * G_BASE_MAX_UNCLES_PER_BLOCK
- The impact on sync strategy selection

Add this documentation:

/// Determines if a block is "near" the current chain head for optimizing sync strategy.
///
/// A block is considered "near" if:
/// 1. Its number is less than the current chain head's number
/// 2. The gap between the block and chain head is ≤ 2 * G_BASE_MAX_UNCLES_PER_BLOCK
///
/// This helps choose between:
/// - Light sync: For blocks that are near (returns true)
/// - Full sync: For blocks that are far (returns false)
///
/// The threshold of 2 * G_BASE_MAX_UNCLES_PER_BLOCK is chosen to cover potential uncle blocks
/// while avoiding unnecessary full syncs for small gaps.

sync/src/sync.rs (1)
189-211: Add metrics for sync stage transitions

The method handles sync stage transitions well but would benefit from metrics to track stage changes and failures.

Add metrics in the match arms:

if let Some(sync_stage_total) = self.metrics.as_ref().map(|m| &m.sync_stage_total) {
    sync_stage_total.with_label_values(&[match &previous_stage {
        SyncStage::NotStart => "not_start",
        SyncStage::Checking => "checking",
        SyncStage::Synchronizing(_) => "synchronizing",
        SyncStage::Canceling => "canceling",
        SyncStage::Done => "done",
    }]).inc();
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (5)

- chain/src/chain.rs (2 hunks)
- node/src/node.rs (4 hunks)
- sync/api/src/lib.rs (3 hunks)
- sync/src/block_connector/block_connector_service.rs (4 hunks)
- sync/src/sync.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- chain/src/chain.rs
🔇 Additional comments (4)
sync/api/src/lib.rs (1)

21-51: LGTM! Well-structured block sorting implementation.

The implementation correctly handles block ordering based on block number and header ID, which is essential for maintaining a consistent order in block collections.
node/src/node.rs (1)

144-150: 🛠️ Refactor suggestion / ⚠️ Potential issue

Fix typo and enhance error handling

- The struct name contains a typo: "Targret" should be "Target".
- The error from sync_service.notify should be logged before being propagated.

Apply these changes:

-    sync_service.notify(SyncSpecificTargretRequest {
+    if let Err(e) = sync_service.notify(SyncSpecificTargetRequest {
         block: None,
         block_id: block_hash,
         peer_id: None,
-    })?;
+    }) {
+        error!("Failed to notify sync service: {}", e);
+        return Err(e.into());
+    }

Likely invalid or redundant comment.
sync/src/block_connector/block_connector_service.rs (1)

331-339: LGTM! Well-structured sync strategy selection.

The code effectively uses the block proximity check to choose between light sync and full sync, which can optimize the synchronization process.
sync/src/sync.rs (1)
846-853
:⚠️ Potential issueFix typo in event handler
The event handler uses the struct with the "Targret" typo.
Apply this diff:
-impl EventHandler<Self, SyncSpecificTargretRequest> for SyncService { - fn handle_event(&mut self, msg: SyncSpecificTargretRequest, ctx: &mut ServiceContext<Self>) { +impl EventHandler<Self, SyncSpecificTargetRequest> for SyncService { + fn handle_event(&mut self, msg: SyncSpecificTargetRequest, ctx: &mut ServiceContext<Self>) {Likely invalid or redundant comment.
#[derive(Debug, Clone)]
pub struct SyncSpecificTargretRequest {
    pub block: Option<Block>,
    pub block_id: HashValue,
    pub peer_id: Option<PeerId>,
}

impl ServiceRequest for SyncSpecificTargretRequest {
    type Response = Result<()>;
}
Fix typo in struct name SyncSpecificTargretRequest
The struct name contains a typo: "Targret" should be "Target".
Apply this diff to fix the typo:
-pub struct SyncSpecificTargretRequest {
+pub struct SyncSpecificTargetRequest {
pub block: Option<Block>,
pub block_id: HashValue,
pub peer_id: Option<PeerId>,
}
-impl ServiceRequest for SyncSpecificTargretRequest {
+impl ServiceRequest for SyncSpecificTargetRequest {
type Response = Result<()>;
}
🛠️ Refactor suggestion
Refactor complex synchronization logic
The check_and_start_light_sync method is too complex, with deep nesting and multiple responsibilities. Consider breaking it down into smaller, focused methods:
- Block retrieval logic (local/remote)
- Block execution logic
- Parent block verification
Example refactoring:
fn check_and_start_light_sync(...) -> Result<()> {
let network = self.prepare_network_client(ctx, &msg)?;
let chain = self.prepare_blockchain(ctx)?;
let specific_block = self.retrieve_block(&msg, &network)?;
let blocks = self.collect_parent_blocks(&specific_block, &network)?;
self.execute_blocks(blocks, &chain)?;
self.verify_final_state(&msg, specific_block, &chain)?;
Ok(())
}
Pull request type
Please check the type of change your PR introduces:
What is the current behavior?
Issue Number: N/A
What is the new behavior?
Other information
Summary by CodeRabbit
Based on the comprehensive summary of changes across multiple files, here are the updated release notes:
New Features
Improvements
Performance
Logging