Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
just-in-chang committed Apr 3, 2024
1 parent 329ca2d commit 8df8bee
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ impl Into<u64> for TransactionVersion {
}

impl Incrementable<Arc<Transaction>> for TransactionVersion {
fn next(&self, _value_context: &Arc<Transaction>) -> Self {
TransactionVersion(self.0 + 1)
fn next(&self, value_context: &Arc<Transaction>) -> Self {
TransactionVersion(value_context.version + 1)
}
}

Expand Down Expand Up @@ -82,25 +82,23 @@ impl InMemoryCache {
pub async fn get_transactions(&self, starting_version: u64) -> Vec<Transaction> {
let start_time = std::time::Instant::now();
let (versions_to_fetch, in_memory_latest_version) = loop {
if let Some(latest_version) = self.cache.last_key() {
let latest_version = latest_version.into();
if starting_version >= latest_version {
tokio::time::sleep(std::time::Duration::from_millis(
IN_MEMORY_CACHE_LOOKUP_RETRY_INTERVAL_MS,
))
.await;
continue;
}
// This is to avoid fetching too many transactions at once.
let ending_version = std::cmp::min(
latest_version,
starting_version + MAX_REDIS_FETCH_BATCH_SIZE as u64,
);
break (
(starting_version..ending_version).collect::<Vec<u64>>(),
latest_version,
);
let latest_version = self.latest_version();
if starting_version >= latest_version {
tokio::time::sleep(std::time::Duration::from_millis(
IN_MEMORY_CACHE_LOOKUP_RETRY_INTERVAL_MS,
))
.await;
continue;
}
// This is to avoid fetching too many transactions at once.
let ending_version = std::cmp::min(
latest_version,
starting_version + MAX_REDIS_FETCH_BATCH_SIZE as u64,
);
break (
(starting_version..ending_version).collect::<Vec<u64>>(),
latest_version,
);
};

let lock_waiting_time = start_time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -134,7 +132,11 @@ impl InMemoryCache {
}

pub fn latest_version(&self) -> u64 {
self.cache.last_key().expect("Cache is warmed up").into()
if let Some(key) = self.cache.last_key() {
let key: u64 = key.into();
return key + 1;
}
0
}
}

Expand Down Expand Up @@ -194,7 +196,8 @@ fn spawn_update_task<C, Ca>(
.context("Latest version doesn't exist in Redis")
.unwrap();

let in_cache_latest_version = { cache.last_key().expect("Cache is warmed up") }.into();
let last_key: u64 = { cache.last_key().expect("Cache is warmed up") }.into();
let in_cache_latest_version = last_key + 1;
if current_latest_version == in_cache_latest_version {
tokio::time::sleep(std::time::Duration::from_millis(
IN_MEMORY_CACHE_LOOKUP_RETRY_INTERVAL_MS,
Expand Down

0 comments on commit 8df8bee

Please sign in to comment.