-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathBlockchain Cache Global.go
142 lines (112 loc) · 5.27 KB
/
Blockchain Cache Global.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
File Name: Blockchain Cache Global.go
Copyright: 2021 Peernet s.r.o.
Author: Peter Kleissner
*/
package core
import (
"github.com/PeernetOfficial/core/blockchain"
"github.com/PeernetOfficial/core/protocol"
"github.com/enfipy/locker"
)
// BlockchainCache is the cache that stores blockchains reported by other peers.
// Its exported limit fields are filled from the backend configuration in initBlockchainCache.
type BlockchainCache struct {
BlockchainDirectory string // The directory for storing blockchains in a key-value store.
MaxBlockSize uint64 // Max block size to accept.
MaxBlockCount uint64 // Max block count to cache per peer.
LimitTotalRecords uint64 // Max count of blocks and header in total to keep across all blockchains. 0 = unlimited. Max Records * Max Block Size = Size Limit.
ReadOnly bool // Whether the cache is read only. Set when LimitTotalRecords is > 0 and the record count of the store has reached it.
Store *blockchain.MultiStore // The underlying multi store, opened via blockchain.InitMultiStore.
peerLock *locker.Locker // Keyed lock; SeenBlockchainVersion locks it with the peer's compressed public key.
backend *Backend // The owning backend; used to reach the search index.
}
// initBlockchainCache sets up the global blockchain cache of the backend.
//
// It does nothing if Config.BlockchainGlobal is empty. Otherwise it opens the multi store in that
// directory and, only if that succeeds, publishes the fully initialized cache in
// backend.GlobalBlockchainCache. If opening the store fails, the error is logged and
// backend.GlobalBlockchainCache is not assigned. This way code that checks the field for nil
// (such as remoteBlockchainUpdate) never sees a cache whose Store and peerLock are nil.
func (backend *Backend) initBlockchainCache() {
	if backend.Config.BlockchainGlobal == "" {
		return
	}

	store, err := blockchain.InitMultiStore(backend.Config.BlockchainGlobal)
	if err != nil {
		backend.LogError("initBlockchainCache", "initializing database '%s': %s", backend.Config.BlockchainGlobal, err.Error())
		return
	}

	cache := &BlockchainCache{
		backend:             backend,
		BlockchainDirectory: backend.Config.BlockchainGlobal,
		MaxBlockSize:        backend.Config.CacheMaxBlockSize,
		MaxBlockCount:       backend.Config.CacheMaxBlockCount,
		LimitTotalRecords:   backend.Config.LimitTotalRecords,
		Store:               store,
		peerLock:            locker.Initialize(),
	}

	// Set the blockchain cache to read-only if the record limit is reached.
	if cache.LimitTotalRecords > 0 && store.Database.Count() >= cache.LimitTotalRecords {
		cache.ReadOnly = true
	}

	store.FilterStatisticUpdate = backend.Filters.GlobalBlockchainCacheStatistic
	store.FilterBlockchainDelete = backend.Filters.GlobalBlockchainCacheDelete

	// Publish the cache only now that every field is usable.
	backend.GlobalBlockchainCache = cache
}
// SeenBlockchainVersion shall be called with information about another peer's blockchain.
// The reported version and height are assessed against the cached header. Depending on the result the
// cached blockchain is left as is, deleted, created from scratch, or extended with the new blocks.
// If the reported version number is newer, all existing blocks are immediately deleted.
// The lock keyed by the peer's compressed public key is held for the duration of the call.
func (cache *BlockchainCache) SeenBlockchainVersion(peer *PeerInfo) {
	lockKey := string(peer.PublicKey.SerializeCompressed())
	cache.peerLock.Lock(lockKey)
	defer cache.peerLock.Unlock(lockKey)

	// fetchBlocks requests a block range from the peer (at most MaxBlockCount blocks), ingests every
	// block reported as available into the store and indexes the decoded records for search.
	fetchBlocks := func(target *blockchain.MultiBlockchainHeader, start, count uint64) {
		if count > cache.MaxBlockCount {
			count = cache.MaxBlockCount
		}

		wanted := []protocol.BlockRange{{Offset: start, Limit: count}}

		peer.BlockDownload(peer.PublicKey, cache.MaxBlockCount, cache.MaxBlockSize, wanted, func(data []byte, targetBlock protocol.BlockRange, blockSize uint64, availability uint8) {
			if availability != protocol.GetBlockStatusAvailable {
				return
			}

			decoded, _ := cache.Store.IngestBlock(target, targetBlock.Offset, data, true)
			if decoded == nil {
				return
			}

			// index it for search
			cache.backend.SearchIndex.IndexNewBlockDecoded(peer.PublicKey, peer.BlockchainVersion, targetBlock.Offset, decoded.RecordsDecoded)
		})
	}

	// Compare the reported version/height with the header that is currently cached.
	header, status, err := cache.Store.AssessBlockchainHeader(peer.PublicKey, peer.BlockchainVersion, peer.BlockchainHeight)
	if err != nil {
		return
	}

	switch status {
	case blockchain.MultiStatusEqual:
		return

	case blockchain.MultiStatusInvalidRemote:
		cache.Store.DeleteBlockchain(header)
		cache.backend.SearchIndex.UnindexBlockchain(peer.PublicKey)

	case blockchain.MultiStatusNewVersion:
		// delete existing data first, then create it new exactly like for an unknown header
		cache.Store.DeleteBlockchain(header)
		cache.backend.SearchIndex.UnindexBlockchain(peer.PublicKey)
		fallthrough

	case blockchain.MultiStatusHeaderNA:
		header, err = cache.Store.NewBlockchainHeader(peer.PublicKey, peer.BlockchainVersion, peer.BlockchainHeight)
		if err != nil {
			return
		}
		fetchBlocks(header, 0, peer.BlockchainHeight)

	case blockchain.MultiStatusNewBlocks:
		// Only the blocks between the cached height and the reported height are requested.
		firstMissing := header.Height
		missing := peer.BlockchainHeight - firstMissing
		header.Height = peer.BlockchainHeight
		fetchBlocks(header, firstMissing, missing)
	}

	if cache.LimitTotalRecords > 0 {
		// Bug: This code is currently never reached if ReadOnly is true, because
		// remoteBlockchainUpdate does not call this function while the cache is read-only.
		cache.ReadOnly = cache.Store.Database.Count() >= cache.LimitTotalRecords
	}
}
// remoteBlockchainUpdate shall be called to indicate a potential update of the remote's blockchain.
// It hands the peer to the global blockchain cache, which uses the blockchain version and height to
// update the data lake as appropriate. Nothing happens if there is no global cache, if the cache is
// read-only, or if the peer reports both version 0 and height 0.
// This function is called in the Go routine of the packet worker and therefore must not stall.
func (peer *PeerInfo) remoteBlockchainUpdate() {
	cache := peer.Backend.GlobalBlockchainCache
	if cache == nil || cache.ReadOnly {
		return
	}

	if peer.BlockchainVersion == 0 && peer.BlockchainHeight == 0 {
		return
	}

	// TODO: This entire function should be instead a non-blocking message via a buffer channel.
	go cache.SeenBlockchainVersion(peer)
}