Ethereum Source Code Analysis: the fetcher Module and Block Propagation
刘艳琴
Published 2022-12-21 01:53:48
The code discussed here is Ethereum Release 1.8; other versions may differ.
Overall Process and Propagation Strategy
This section takes a macro view: after a node produces a block, what does it do to propagate it to remote nodes? What does a remote node do once it receives the block? Each node is connected to many peers, so what is the propagation strategy?
The overall flow and strategy can be summarized as: propagate the block to remote peers; each peer verifies the block and, if it checks out, appends it to its local chain and continues propagating the new-block announcement. The details follow.
First, the overall process. After a block is mined, the miner module publishes a NewMinedBlockEvent. The goroutine subscribed to this event broadcasts the new-block message to all of the node's peers. A peer receiving the message hands it to its own fetcher module. The fetcher performs basic validation and, if the block looks good and is exactly the next block the local chain needs, passes it to blockchain for full validation. That step executes all of the block's transactions and, if everything checks out, appends the block to the local chain and writes it to the database. This is the flow shown in Figure 1.
In the overall flow chart you can see a fork, because a node propagates a new block according to a strategy:
If a node is connected to N peers, it broadcasts the complete block to only sqrt(N) of them, and broadcasts a hash-only announcement to all of them.
Figure 2 shows the effect of this strategy: the red node propagates the block to the yellow nodes:
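The sqrt(N) rule above can be sketched in a few lines of Go. `splitPeers` and the string peer IDs are illustrative placeholders, not geth's real types:

```go
package main

import (
	"fmt"
	"math"
)

// splitPeers mirrors the propagation rule described above: of N connected
// peers, the first sqrt(N) receive the complete block, while every peer
// receives the hash-only announcement. Peer selection order is simplified.
func splitPeers(peers []string) (fullBlock, hashOnly []string) {
	transfer := int(math.Sqrt(float64(len(peers))))
	return peers[:transfer], peers
}

func main() {
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"}
	full, ann := splitPeers(peers)
	fmt.Println(len(full), len(ann)) // 3 peers get the block, all 9 get the hash
}
```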
A node that receives only the block hash must fetch the complete block from the peer that sent the announcement. Once it has the block, it follows the flow of Figure 1: the block enters the fetcher queue and is eventually inserted into the local chain, after which the node broadcasts the block's hash to the peers connected to it that don't yet know about the block. Figure 3 shows the strategy of a non-mining node: the yellow node propagates the block hash to the cyan nodes:
So Ethereum spreads a newly produced block like a stone dropped into water: the block ripples outward, layer by layer.
What the fetcher Module Does
The fetcher module's job is to collect the block information other peers announce to it: 1) complete blocks, and 2) block-hash announcements. Based on these messages it obtains the complete block and then hands it to the eth module to insert into the blockchain.
If it received a complete block, it can hand the block to eth directly; if it only has the block hash, it must first fetch the complete block from a peer and then pass it to eth for insertion.
Source Code Walkthrough
This section covers the details of block propagation and processing; as before, each flow is first explained with a diagram, then with the code.
How the Mining Node Propagates a New Block
After a node produces a block, the broadcast flow can be depicted as Figure 4:
1. An event is published.
2. The event handler selects the peers that should receive the complete block and appends the block to their queues.
3. The event handler appends the block hash to every peer's announcement queue.
4. Each peer's broadcast loop walks its pending block queue and announcement queue, packs the data into messages, and sends them via the P2P interface.
Now the details in the code.
worker.wait() publishes the NewMinedBlockEvent. ProtocolManager.minedBroadcastLoop() is the event handler; it calls pm.BroadcastBlock() twice.
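The steps above can be modeled as a tiny sketch: the event handler enqueues into each peer's buffered channel, and one goroutine per peer drains its queue and "sends" the message. The `peer` struct and message strings here are illustrative, not geth's real types:

```go
package main

import "fmt"

// peer models just the per-connection queue from the steps above.
type peer struct {
	id           string
	queuedBlocks chan string // pending full-block broadcasts for this peer
}

// broadcastLoop drains one queued block and reports the message it would send.
func (p *peer) broadcastLoop(sent chan<- string) {
	block := <-p.queuedBlocks
	sent <- fmt.Sprintf("%s <- NewBlockMsg(%s)", p.id, block)
}

func main() {
	peers := []*peer{
		{"peer1", make(chan string, 1)},
		{"peer2", make(chan string, 1)},
	}
	sent := make(chan string, len(peers))
	for _, p := range peers { // one broadcast goroutine per peer
		go p.broadcastLoop(sent)
	}
	for _, p := range peers { // the event handler enqueues to every peer
		p.queuedBlocks <- "block #100"
	}
	for range peers {
		fmt.Println(<-sent)
	}
}
```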
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
	// automatically stops if unsubscribe
	for obj := range pm.minedBlockSub.Chan() {
		switch ev := obj.Data.(type) {
		case core.NewMinedBlockEvent:
			pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
			pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
		}
	}
}
When pm.BroadcastBlock() is called with propagate set to true, it broadcasts the complete block to a subset of peers via peer.AsyncSendNewBlock(); otherwise it announces the block hash to all peers via peer.AsyncSendNewBlockHash(). Both functions simply enqueue the data, so their code is omitted here.
// BroadcastBlock will either propagate a block to a subset of its peers, or
// will only announce its availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)

	// If propagation is requested, send to a subset of the peers
	// In this case, broadcast the complete block to a subset of peers
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		// Compute the new total difficulty
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.AsyncSendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in our own chain, announce it
	// Announce the block hash to all peers
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.AsyncSendNewBlockHash(block)
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}
peer.broadcast() is the broadcast loop of each peer connection. It sends only three kinds of messages: transactions, complete blocks, and block hashes. This shows that a node only proactively pushes these three types of data; all other data synchronization happens via request-response.
// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
func (p *peer) broadcast() {
	for {
		select {
		// Broadcast transactions
		case txs := <-p.queuedTxs:
			if err := p.SendTransactions(txs); err != nil {
				return
			}
		// Broadcast a complete block
		case prop := <-p.queuedProps:
			if err := p.SendNewBlock(prop.block, prop.td); err != nil {
				return
			}
		// Broadcast a block hash announcement
		case block := <-p.queuedAnns:
			if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
				return
			}
		case <-p.term:
			return
		}
	}
}
How a Peer Handles a New Block
This section covers how a remote node handles the two block-sync messages. The NewBlockMsg path is clear and concise. The NewBlockHashesMsg path takes a couple of detours: as the overall flow in Figure 1 shows, the node first has to fetch the complete block from the peer that sent the announcement, after which the flow converges with the NewBlockMsg path.
This part touches many modules and the full picture can look bewildering, but if you hold on to the main thread above, the code reads quite clearly. Figure 5 shows the overall flow.
Message processing starts at ProtocolManager.handleMsg. The NewBlockMsg path is the area marked in blue. The red area is a separate goroutine: the fetcher's loop that processes queued blocks; if a block popped from the queue is the one the chain currently needs, it is validated and blockchain.InsertChain() inserts it into the chain and finally writes it to the database, which is the yellow part. Lastly, the green part is the NewBlockHashesMsg path; the code there is fairly involved, so to keep the diagram readable I have simplified it.
Study this diagram until the overall flow is clear; next we look at the details of each step.
Handling NewBlockMsg
This section covers how a node processes a complete block. The flow is:
1. RLP-decode the message, then mark the sending peer as already knowing this block, so that when this node later broadcasts the block's hash, it skips that peer.
2. Put the block into the fetcher's queue by calling fetcher.Enqueue.
3. Update the peer's head. If the local chain is behind the peer's chain, synchronize from that peer.
Here is just the NewBlockMsg part of handleMsg().
case msg.Code == NewBlockMsg:
	// Retrieve and decode the propagated block
	// A new block arrived: decode it and record receipt metadata
	var request newBlockData
	if err := msg.Decode(&request); err != nil {
		return errResp(ErrDecode, "%v: %v", msg, err)
	}
	request.Block.ReceivedAt = msg.ReceivedAt
	request.Block.ReceivedFrom = p

	// Mark the peer as owning the block and schedule it for import
	p.MarkBlock(request.Block.Hash())
	// Why enqueue it, when we already have the complete block?
	// Answer: it goes into the fetcher's priority queue, from which the fetcher
	// picks whichever block matches the height the chain currently needs
	pm.fetcher.Enqueue(p.id, request.Block)

	// Assuming the block is importable by the peer, but possibly not yet done so,
	// calculate the head hash and TD that the peer truly must have.
	// Head and total difficulty up to the parent block
	var (
		trueHead = request.Block.ParentHash()
		trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
	)
	// Update the peers total difficulty if better than the previous
	// If the received block's TD beats what we recorded for the peer, and also
	// our own, synchronize with this peer.
	// Question: only the block's hash is used below; why send the whole block
	// at all, rather than save bandwidth?
	// Answer: the block was added to the priority queue above. When the
	// fetcher's loop sees that the next needed height is already in the queue,
	// it uses the queued block directly instead of requesting it again; after
	// validation it is handed to the blockchain's InsertChain.
	if _, td := p.Head(); trueTD.Cmp(td) > 0 {
		p.SetHead(trueHead, trueTD)

		// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
		// a single block (as the true TD is below the propagated block), however this
		// scenario should easily be covered by the fetcher.
		currentBlock := pm.blockchain.CurrentBlock()
		if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
			go pm.synchronise(p)
		}
	}
	//------------------------ above: handleMsg
// Enqueue tries to fill gaps in the fetcher's future import queue.
// Sends to the inject channel: the caller runs in handleMsg, and the fetcher
// goroutine picks the operation up from the channel
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
	op := &inject{
		origin: peer,
		block:  block,
	}
	select {
	case f.inject <- op:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen (run by the fetcher goroutine when it reads the inject channel)
func (f *Fetcher) enqueue(peer string, block *types.Block) {
	hash := block.Hash()

	// Ensure the peer isn't DOSing us
	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
		propBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any past or too distant blocks
	// Height check: drop blocks that are too old or too far in the future
	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
		propBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Schedule the block for future importing
	// The block enters the priority queue first; much remains before it joins the chain
	if _, ok := f.queued[hash]; !ok {
		op := &inject{
			origin: peer,
			block:  block,
		}
		f.queues[peer] = count
		f.queued[hash] = op
		f.queue.Push(op, -float32(block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
	}
}
Processing the fetcher Queue
In this section we look at how the fetcher processes a block after it enters the queue. Why not just validate the block and insert it into the local chain directly?
Because Ethereum has the uncle mechanism, a node may receive slightly older blocks. A node may also have fallen a few blocks behind due to network issues, so it may receive blocks from the "future". Neither kind can be inserted into the local chain directly.
The queue blocks enter is a priority queue from which lower-height blocks are popped first. fetcher.loop runs in its own goroutine, continuously servicing the fetcher's tasks and events. It first cleans up fetching requests that have timed out, then processes the blocks in the priority queue: if a block's height is exactly the next one the chain needs, it calls f.insert(), which validates the block, calls BlockChain.InsertChain(), and on successful insertion broadcasts the new block's hash.
// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *Fetcher) loop() {
	// Iterate the block fetching until a quit is requested
	fetchTimer := time.NewTimer(0)
	completeTimer := time.NewTimer(0)

	for {
		// Clean up any expired block fetches
		for hash, announce := range f.fetching {
			if time.Since(announce.time) > fetchTimeout {
				f.forgetHash(hash)
			}
		}
		// Import any queued blocks that could potentially fit
		height := f.chainHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem().(*inject)
			hash := op.block.Hash()
			if f.queueChangeHook != nil {
				f.queueChangeHook(hash, false)
			}
			// If too high up the chain or phase, continue later
			// Not the next block the chain needs: push it back and stop the loop
			number := op.block.NumberU64()
			if number > height+1 {
				f.queue.Push(op, -float32(number))
				if f.queueChangeHook != nil {
					f.queueChangeHook(hash, true)
				}
				break
			}
			// Otherwise if fresh and still unknown, try and import
			// Too old, or the chain already has it: forget the block
			if number+maxUncleDist < height || f.getBlock(hash) != nil {
				f.forgetBlock(hash)
				continue
			}
			// Exactly the height we want, and not on the chain yet: import it
			f.insert(op.origin, op.block)
		}
		// Wait for an outside event to occur
		select {
		case <-f.quit:
			// Fetcher terminating, abort all operations
			return
		// ... the notify / inject / timer / filter cases of this select are
		// shown in the following sections
		}
	}
}

// insert spawns a new goroutine to run a block insertion into the chain
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()

		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			return
		}
		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// All ok, quickly propagate to our peers
			go f.broadcastBlock(block, true)
		case consensus.ErrFutureBlock:
			// Weird future block, don't fail, but neither propagate
		default:
			// Something went very wrong, drop the peer
			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		// If import succeeded, announce the block hash
		go f.broadcastBlock(block, false)

		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(block)
		}
	}()
}
Handling NewBlockHashesMsg
This section covers NewBlockHashesMsg. The message handling itself is simple; the slightly tricky part is fetching the complete block from a peer, which the next section covers.
The flow is:
1. RLP-decode the message, then mark the peer as knowing these blocks.
2. Pick out the block hashes the local chain doesn't have, and hand these unknown hashes to the fetcher.
3. fetcher.Notify records the announcement and pushes it into the notify channel for the fetcher goroutine.
4. fetcher.loop() processes the notify messages: it checks that the announcement isn't a DoS attack, checks the block height, and checks whether the block is already fetching or completing (header downloaded, body in flight). If neither, the announcement goes into announced and the 0s timer is triggered for processing.
announced is covered in the next section.
// handleMsg(), NewBlockHashesMsg part
case msg.Code == NewBlockHashesMsg:
	var announces newBlockHashesData
	if err := msg.Decode(&announces); err != nil {
		return errResp(ErrDecode, "%v: %v", msg, err)
	}
	// Mark the hashes as present at the remote node
	for _, block := range announces {
		p.MarkBlock(block.Hash)
	}
	// Schedule all the unknown hashes for retrieval
	// Pick out the hashes the local chain doesn't have; the fetcher will download them
	unknown := make(newBlockHashesData, 0, len(announces))
	for _, block := range announces {
		if !pm.blockchain.HasBlock(block.Hash, block.Number) {
			unknown = append(unknown, block)
		}
	}
	for _, block := range unknown {
		pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
	}
// Notify announces the fetcher of the potential availability of a new block in
// the network.
// Tells the fetcher (itself) that a new block exists; only the hash, height,
// etc. are known, not the block body
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	block := &announce{
		hash:        hash,
		number:      number,
		time:        time,
		origin:      peer,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case f.notify <- block:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// fetcher.loop(): handling of the notify channel
case notification := <-f.notify:
	// A block was announced, make sure the peer isn't DOSing us
	count := f.announces[notification.origin] + 1
	if count > hashLimit {
		log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
		propAnnounceDOSMeter.Mark(1)
		break
	}
	// If we have a valid block number, check that it's potentially useful
	// Height check
	if notification.number > 0 {
		if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
			log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
			propAnnounceDropMeter.Mark(1)
			break
		}
	}
	// All is well, schedule the announce if block's not yet downloading
	// Ignore blocks that are already being fetched or completed
	if _, ok := f.fetching[notification.hash]; ok {
		break
	}
	if _, ok := f.completing[notification.hash]; ok {
		break
	}
	// Update how many blocks this peer has announced to us
	f.announces[notification.origin] = count
	// Record the announcement in announced, for scheduling
	f.announced[notification.hash] = append(f.announced[notification.hash], notification)
	if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
		f.announceChangeHook(notification.hash, true)
	}
	if len(f.announced) == 1 {
		// Something entered announced: reset the 0s timer; another branch of
		// loop processes these notifications
		f.rescheduleFetch(fetchTimer)
	}
How the fetcher Obtains the Complete Block
This section covers how the fetcher obtains a complete block. It is the fetcher's most important function and touches at least 80% of its code, so it gets its own large section.
The Bulk of the Fetcher
The fetcher's main job is to obtain the complete block, then hand it to InsertChain at the right moment for validation and insertion into the local chain. Start with the macro view again; it matters, because the code itself is not this clear.
Macro view
First, how two nodes interact to obtain a complete block, shown as a sequence diagram in Figure 6; the flow is clear enough to need no extra text.
Next, the fetcher's internal state transitions while obtaining a block; it uses states to track what stage each wanted block is in, see Figure 7. Briefly:
1. On receiving NewBlockHashesMsg, the announcement is recorded in announced and enters the announced state, meaning the node has accepted the message.
2. The fetcher goroutine processes announced: after validation, it sends a request for the block header to the peer that announced it, and the hash enters the fetching state.
3. If the arriving header shows the block has no transactions and no uncles, the state moves to completing, the header alone is assembled into a complete block, and the block joins the queued priority queue.
4. If the arriving header shows the block does have transactions or uncles, the state moves to fetched; a request for the transactions and uncles is sent, and the state moves to completing.
5. When the transactions and uncles arrive, the header, transactions, and uncles are assembled into a complete block, which joins the queued queue.
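The transitions above can be condensed into a toy state tracker. The real fetcher keeps one map per state (announced, fetching, fetched, completing, queued); this single-map version with an `advance` helper is only a sketch of the progression, not geth's API:

```go
package main

import "fmt"

// stage enumerates the per-hash states described above.
type stage int

const (
	announced  stage = iota // hash received via NewBlockHashesMsg
	fetching                // header request sent
	fetched                 // header received, body still needed
	completing              // body request sent
	queued                  // block assembled, waiting for import
)

// advance moves a hash one step along the happy path; a block with an empty
// body skips fetched and goes straight from fetching to completing.
func advance(states map[string]stage, hash string, emptyBody bool) {
	s := states[hash]
	if s == fetching && emptyBody {
		states[hash] = completing
		return
	}
	states[hash] = s + 1
}

func main() {
	states := map[string]stage{"0xabc": announced}
	for states["0xabc"] != queued {
		advance(states, "0xabc", false)
	}
	fmt.Println("final stage reached:", states["0xabc"] == queued)
}
```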
Micro view
Now the code-level flow of obtaining a complete block. There is a lot of it; whenever it gets confusing, go back to the macro diagrams above.
First, the Fetcher definition, which holds the communication channels and the state tracking. Read the commented fields; every state mentioned above is here.
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type Fetcher struct {
	// Various event channels
	// Channel receiving block-hash announcements
	notify chan *announce
	// Channel receiving complete blocks
	inject chan *inject

	blockFilter chan chan []*types.Block
	// Channel of channels carrying header-filter tasks
	headerFilter chan chan *headerFilterTask
	// Channel of channels carrying body-filter tasks
	bodyFilter chan chan *bodyFilterTask

	done chan common.Hash
	quit chan struct{}

	// Announce states
	// How many header announcements each peer has sent us
	announces map[string]int // Per peer announce counts to prevent memory exhaustion
	// Blocks that have been announced, waiting to be fetched
	announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
	// Requests currently fetching a block header
	fetching map[common.Hash]*announce // Announced blocks, currently fetching
	// Headers fetched, bodies still needed; used to fetch the bodies
	fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
	// Headers obtained, bodies currently being completed
	completing map[common.Hash]*announce // Blocks with headers, currently body-completing

	// Block cache
	// queue: priority queue keyed by height
	// queues: how many blocks each peer has contributed
	// queued: marks a block as already enqueued
	queue  *prque.Prque            // Queue containing the import operations (block number sorted)
	queues map[string]int          // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)

	// Callbacks
	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
	verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work (header validation, PoW included)
	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
	chainHeight    chainHeightFn      // Retrieves the current chain's height
	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
	dropPeer       peerDropFn         // Drops a peer for misbehaving

	// Testing hooks
	announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
	queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
}
The handling of NewBlockHashesMsg was covered in an earlier section; scroll back if needed. Here we pick up from the announced state. In loop(), when fetchTimer fires, there are announcements to process: loop selects the ones that are due and creates header requests for them. Since many peers may have announced the same block hash, one of those peers is chosen at random, and a separate goroutine is created per peer to send its requests.
// fetcher.loop()
case <-fetchTimer.C:
	// At least one block's timer ran out, check for needing retrieval
	request := make(map[string][]common.Hash)

	for hash, announces := range f.announced {
		if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
			// Pick a random peer to retrieve from, reset all others
			// Many peers may have announced this hash; pick one at random
			announce := announces[rand.Intn(len(announces))]
			f.forgetHash(hash)

			// If the block still didn't arrive, queue for fetching
			// We don't have this block locally: create a fetch request
			if f.getBlock(hash) == nil {
				request[announce.origin] = append(request[announce.origin], hash)
				f.fetching[hash] = announce
			}
		}
	}
	// Send out all block header requests
	// One goroutine is created per peer, which then sends every request
	// destined for that peer
	for peer, hashes := range request {
		log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

		// Create a closure of the fetch and schedule in on a new thread
		fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
		go func() {
			if f.fetchingHook != nil {
				f.fetchingHook(hashes)
			}
			for _, hash := range hashes {
				headerFetchMeter.Mark(1)
				fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
			}
		}()
	}
	// Schedule the next fetch if blocks are still pending
	f.rescheduleFetch(fetchTimer)
From the call in Notify, you can see that the actual fetchHeader function is RequestOneHeader(), which uses the GetBlockHeadersMsg message. That message can request multiple headers, but the fetcher only ever requests one.
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
	p.Log().Debug("Fetching single header", "hash", hash)
	return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
The GetBlockHeadersMsg handler follows. Because it serves multi-header requests, it is somewhat involved; luckily the fetcher only ever requests a single header, and the logic for advancing to the next header is skipped here. In the end it calls SendBlockHeaders() to send the headers back to the requesting node as a BlockHeadersMsg.
// handleMsg()
// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
	// Decode the complex header query
	var query getBlockHeadersData
	if err := msg.Decode(&query); err != nil {
		return errResp(ErrDecode, "%v: %v", msg, err)
	}
	hashMode := query.Origin.Hash != (common.Hash{})

	// Gather headers until the fetch or network limits is reached
	var (
		bytes   common.StorageSize
		headers []*types.Header
		unknown bool
	)
	// header known && fewer than requested && under 2MB && under the max fetchable count
	for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
		// Retrieve the next header satisfying the query
		var origin *types.Header
		if hashMode {
			origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
		} else {
			origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
		}
		if origin == nil {
			break
		}
		headers = append(headers, origin)
		// ... advance query.Origin to the next wanted header (skipped here;
		// the fetcher only ever asks for one)
	}
	return p.SendBlockHeaders(headers)
The handling of `BlockHeadersMsg` is interesting, because `GetBlockHeadersMsg` is not exclusive to the fetcher; the downloader sends it too. So the response handler must distinguish fetcher requests from downloader requests. The logic: the fetcher filters the received headers first; whatever the fetcher doesn't claim belongs to the downloader. During the call to `fetcher.FilterHeaders`, the fetcher takes the headers it wants.
// handleMsg()
case msg.Code == BlockHeadersMsg:
	// A batch of headers arrived to one of our previous requests
	var headers []*types.Header
	if err := msg.Decode(&headers); err != nil {
		return errResp(ErrDecode, "msg %v: %v", msg, err)
	}
	// If no headers were received, but we're expecting a DAO fork check, maybe it's that
	// Check whether this is the DAO hard-fork check
	if len(headers) == 0 && p.forkDrop != nil {
		// Possibly an empty reply to the fork header checks, sanity check TDs
		verifyDAO := true

		// If we already have a DAO header, we can check the peer's TD against it. If
		// the peer's ahead of this, it too must have a reply to the DAO check
		if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
			if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
				verifyDAO = false
			}
		}
		// If we're seemingly on the same chain, disable the drop timer
		if verifyDAO {
			p.Log().Debug("Seems to be on the same side of the DAO fork")
			p.forkDrop.Stop()
			p.forkDrop = nil
			return nil
		}
	}
	// Filter out any explicitly requested headers, deliver the rest to the downloader
	// Strip out the headers the fetcher requested, then hand the rest to the downloader
	filter := len(headers) == 1
	if filter {
		// If it's a potential DAO fork check, validate against the rules
		if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
			// Disable the fork drop timer
			p.forkDrop.Stop()
			p.forkDrop = nil

			// Validate the header and either drop the peer or continue
			if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
				p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
				return err
			}
			p.Log().Debug("Verified to be on the same side of the DAO fork")
			return nil
		}
		// Irrelevant of the fork checks, send the header to the fetcher just in case
		// Let the fetcher filter the headers
		headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
	}
	// The remaining headers go to the downloader
	if len(headers) > 0 || !filter {
		err := pm.downloader.DeliverHeaders(p.id, headers)
		if err != nil {
			log.Debug("Failed to deliver headers", "err", err)
		}
	}
`FilterHeaders()` is a clever function; it looks puzzling at first but is really quite elegant. It must pass all the headers to the fetcher goroutine and also get back the result of the fetcher's processing. `fetcher.headerFilter` is a channel of channels, and `filter` is a channel carrying a header-filter task. `FilterHeaders` first sends `filter` through `headerFilter`, so the fetcher goroutine is now waiting on the other end; it then sends the `headerFilterTask` into `filter`, where the fetcher reads it, processes it, and writes the result back into `filter`, which is picked up by `FilterHeaders`, running in the `handleMsg()` goroutine.
Each peer's messages are handled by its own ProtocolManager goroutine, but the fetcher has only one event-processing goroutine. Without a per-call `filter` channel, how would the fetcher know who sent it the headers, and how would it return the filtered result?
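The channel-in-channel handshake can be stripped down to a few lines: one worker goroutine serves many callers, and each caller gets its answer back on the private channel it registered. The int "headers" and the even/odd filtering rule here are purely illustrative:

```go
package main

import "fmt"

// worker plays the fetcher goroutine's role: it receives a private reply
// channel, reads the task from it, keeps the items it wants (evens here),
// and sends the leftovers back on the same private channel.
func worker(headerFilter chan chan []int) {
	for filter := range headerFilter {
		task := <-filter // the task arrives on the caller's private channel
		var rest []int
		for _, h := range task {
			if h%2 != 0 { // odd "headers" are not ours; return them
				rest = append(rest, h)
			}
		}
		filter <- rest // hand the unclaimed items back to the caller
	}
}

func main() {
	headerFilter := make(chan chan []int)
	go worker(headerFilter)

	filter := make(chan []int)
	headerFilter <- filter         // step 1: register the private channel
	filter <- []int{1, 2, 3, 4, 5} // step 2: submit the task
	fmt.Println(<-filter)          // step 3: read back what the worker left
}
```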
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
// Picks out the headers the fetcher asked for
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

	// Send the filter channel to the fetcher
	// The task channel
	filter := make(chan *headerFilterTask)
	select {
	// Hand the task channel itself to the fetcher goroutine
	case f.headerFilter <- filter:
	case <-f.quit:
		return nil
	}
	// Request the filtering of the header list
	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}
	// Retrieve the headers remaining after filtering
	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}
Next, the handling on the `f.headerFilter` side. This code runs about 90 lines and does the following:
1. Take `filter` out of `f.headerFilter`, then take the filter task `task` out of `filter`.
2. Split the headers into 3 classes: `unknown`, which are returned to the caller, i.e. `handleMsg()`; `incomplete`, headers whose bodies still need fetching; and `complete`, blocks that consist of a header alone. Iterate over all headers and sort each into its class; the judging conditions are in the code comments. Keep the macro state-transition diagram in mind.
3. Return the `unknown` headers to `handleMsg()`.
4. Move the `incomplete` headers to the `fetched` state, then trigger the timer so their bodies get completed.
5. Add the `complete` blocks to `queued`.
// fetcher.loop()
case filter := <-f.headerFilter:
	// Headers arrived from a remote peer; extract those explicitly requested
	// by the fetcher, and return the rest
	var task *headerFilterTask
	select {
	case task = <-filter:
	case <-f.quit:
		return
	}
	// Split the batch of headers into unknown ones (to return to the caller),
	// known incomplete ones (requiring body retrievals) and completed blocks.
	// unknown: not requested by the fetcher; complete: blocks with no
	// transactions or uncles, the header alone is enough; incomplete: blocks
	// that still need their uncles and transactions fetched
	unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
	// Iterate over all the received headers
	for _, header := range task.headers {
		hash := header.Hash()

		// Filter fetcher-requested headers from other synchronisation algorithms
		// The hash is being fetched, from the matching peer, and is not yet
		// fetched, completing, or queued
		if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
			// If the delivered header does not match the promised number, drop the announcer
			// Height check failed: the peer is disrupting things, drop it
			if header.Number.Uint64() != announce.number {
				log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
				f.dropPeer(announce.origin)
				f.forgetHash(hash)
				continue
			}
			// Only keep if not imported by other means
			// The local chain doesn't have this block yet
			if f.getBlock(hash) == nil {
				announce.header = header
				announce.time = task.time

				// If the block is empty (header only), short circuit into the final import queue
				// No transactions and no uncles: add it to complete
				if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
					log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

					block := types.NewBlockWithHeader(header)
					block.ReceivedAt = task.time
					complete = append(complete, block)
					f.completing[hash] = announce
					continue
				}
				// Otherwise add to the list of blocks needing completion
				incomplete = append(incomplete, announce)
			} else {
				log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
				f.forgetHash(hash)
			}
		} else {
			// Fetcher doesn't know about it, add to the return list
			// A header we never requested
			unknown = append(unknown, header)
		}
	}
	// Hand the unknown headers back through the filter
	select {
	case filter <- &headerFilterTask{headers: unknown, time: task.time}:
	case <-f.quit:
		return
	}
	// Schedule the retrieved headers for body completion
	for _, announce := range incomplete {
		hash := announce.header.Hash()
		if _, ok := f.completing[hash]; ok {
			continue
		}
		f.fetched[hash] = append(f.fetched[hash], announce)
		if len(f.fetched) == 1 {
			f.rescheduleComplete(completeTimer)
		}
	}
	// Schedule the header-only blocks for import
	for _, block := range complete {
		if announce := f.completing[block.Hash()]; announce != nil {
			f.enqueue(announce.origin, block)
		}
	}
Following the state diagram, what remains is the transition from `fetched` to `completing`. The flow above already armed the `completeTimer`; when it fires, the handling is similar to requesting headers, so I won't repeat the details. The request message this time is `GetBlockBodiesMsg`, and the actual function called is `RequestBodies`.
// fetcher.loop()
case <-completeTimer.C:
	// At least one header's timer ran out, retrieve everything
	request := make(map[string][]common.Hash)

	// Iterate over every announcement still waiting for its body
	for hash, announces := range f.fetched {
		// Pick a random peer to retrieve from, reset all others
		// Many peers may have announced this block; pick one at random
		announce := announces[rand.Intn(len(announces))]
		f.forgetHash(hash)

		// If the block still didn't arrive, queue for completion
		// Not in the local chain: move it to completing and create a request
		if f.getBlock(hash) == nil {
			request[announce.origin] = append(request[announce.origin], hash)
			f.completing[hash] = announce
		}
	}
	// Send out all block body requests
	// Send all the requests to fetch the bodies, again one goroutine per peer
	for peer, hashes := range request {
		log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

		// Create a closure of the fetch and schedule in on a new thread
		if f.completingHook != nil {
			f.completingHook(hashes)
		}
		bodyFetchMeter.Mark(int64(len(hashes)))
		go f.completing[hashes[0]].fetchBodies(hashes)
	}
	// Schedule the next fetch if blocks are still pending
	f.rescheduleComplete(completeTimer)
`handleMsg()` handles this message cleanly as well: it fetches the RLP-encoded bodies directly and sends the response message.
// handleMsg()
case msg.Code == GetBlockBodiesMsg:
	// Decode the retrieval message
	msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
	if _, err := msgStream.List(); err != nil {
		return err
	}
	// Gather blocks until the fetch or network limits is reached
	var (
		hash   common.Hash
		bytes  int
		bodies []rlp.RawValue
	)
	// Iterate over all the requested hashes
	for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
		// Retrieve the hash of the next block
		if err := msgStream.Decode(&hash); err == rlp.EOL {
			break
		} else if err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Retrieve the requested block body, stopping if enough was found
		if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
			bodies = append(bodies, data)
			bytes += len(data)
		}
	}
	return p.SendBlockBodiesRLP(bodies)
The response message `BlockBodiesMsg` is handled on the same principle as headers: the fetcher filters first, and the rest goes to the downloader. Note that the response contains only the transaction lists and the uncle lists.
// handleMsg()
case msg.Code == BlockBodiesMsg:
	// A batch of block bodies arrived to one of our previous requests
	var request blockBodiesData
	if err := msg.Decode(&request); err != nil {
		return errResp(ErrDecode, "msg %v: %v", msg, err)
	}
	// Deliver them all to the downloader for queuing
	transactions := make([][]*types.Transaction, len(request))
	uncles := make([][]*types.Header, len(request))

	for i, body := range request {
		transactions[i] = body.Transactions
		uncles[i] = body.Uncles
	}
	// Filter out any explicitly requested bodies, deliver the rest to the downloader
	// Let the fetcher take the bodies it requested; the rest go to the downloader
	filter := len(transactions) > 0 || len(uncles) > 0
	if filter {
		transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
	}
	// The remaining bodies go to the downloader
	if len(transactions) > 0 || len(uncles) > 0 || !filter {
		err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
		if err != nil {
			log.Debug("Failed to deliver bodies", "err", err)
		}
	}
The filter function works on the same principle as the header case.
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
// Picks out the bodies the fetcher asked for and returns the rest;
// the flow mirrors the header case
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
	log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

	// Send the filter channel to the fetcher
	filter := make(chan *bodyFilterTask)
	select {
	case f.bodyFilter <- filter:
	case <-f.quit:
		return nil, nil
	}
	// Request the filtering of the body list
	select {
	case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
	case <-f.quit:
		return nil, nil
	}
	// Retrieve the bodies remaining after filtering
	select {
	case task := <-filter:
		return task.transactions, task.uncles
	case <-f.quit:
		return nil, nil
	}
}
The actual body filtering deserves a look of its own, because it differs from the header case. The key points:
1. The blocks the fetcher wants are extracted into `blocks`; the ones it doesn't want stay in `task`.
2. How to tell whether a body was requested by the fetcher: if the hashes computed from the transaction list and the uncle list match those in a header being completed, and the message came from the peer we asked, then it's ours.
3. The blocks in `blocks` are added to `queued`, and we're done.
// fetcher.loop()
case filter := <-f.bodyFilter:
	// Block bodies arrived: split out the fetcher-requested ones
	var task *bodyFilterTask
	select {
	case task = <-filter:
	case <-f.quit:
		return
	}
	blocks := []*types.Block{}
	// For each body, recompute the tx/uncle hashes; if they match a header in
	// completing and the body came from the peer we asked, it is ours
	for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
		matched := false
		for hash, announce := range f.completing {
			if f.queued[hash] == nil &&
				types.DeriveSha(types.Transactions(task.transactions[i])) == announce.header.TxHash &&
				types.CalcUncleHash(task.uncles[i]) == announce.header.UncleHash &&
				announce.origin == task.peer {
				// Reassemble the complete block from header + body
				matched = true
				if f.getBlock(hash) == nil {
					block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
					block.ReceivedAt = task.time
					blocks = append(blocks, block)
				} else {
					f.forgetHash(hash)
				}
			}
		}
		if matched {
			// Take the matched body out of the task; the rest goes back to the caller
			task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
			task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
			i--
		}
	}
	// Return the unmatched bodies, then enqueue the assembled blocks
	select {
	case filter <- task:
	case <-f.quit:
		return
	}
	for _, block := range blocks {
		if announce := f.completing[block.Hash()]; announce != nil {
			f.enqueue(announce.origin, block)
		}
	}
That completes the fetcher's flow for obtaining complete blocks, and with it some 80% of the fetcher module's code. Two more functions are worth a look:
1. `forgetHash(hash common.Hash)`: clears all state recorded for the given hash.
2. `forgetBlock(hash common.Hash)`: removes a block from the queue.
Finally, go back to the beginning and revisit the fetcher module and the new-block propagation flow; hopefully it all clicks now.