Ethereum Source Code Analysis: the fetcher Module and Block Propagation
刘艳琴
Published 2022-12-21 01:53:48
The code analyzed here is Ethereum Release 1.8; other versions may differ.
Overall Flow and Propagation Strategy
This section takes a macro view: what does a node do to propagate a block it has produced, what does a remote node do when it receives one, and, given that every node is connected to many peers, what is the propagation strategy?
In summary: the block is propagated to remote peers; each peer verifies it, appends it to its local chain, and propagates the news of the new block further. The details follow.
First, the overall flow. After a block is produced, the miner module publishes a NewMinedBlockEvent. The goroutine subscribed to this event broadcasts the new block to all of the node's peers. A peer that receives the message hands it to its own fetcher module, which performs basic validation. If the block checks out and turns out to be exactly the next block the local chain needs, it is handed to blockChain for full validation, which executes all of the block's transactions; if everything is correct, the block is appended to the local chain and written to the database. That is the flow chart below, Figure 1.
The overall flow chart shows a fork, because a node propagates a new block according to a strategy:
If the node is connected to N peers, it broadcasts the complete block to only sqrt(N) of them, and broadcasts a message containing only the block hash to all peers.
The effect of the strategy is shown in Figure 2, where the red node propagates the block to the yellow nodes:

A node that receives only the block hash must fetch the complete block from the peer that sent it the announcement. Once it has the block, it follows the flow of Figure 1: the block joins the fetcher queue and is eventually inserted into the local chain, after which the node broadcasts the block's hash to those of its peers that do not yet know about the block. The strategy of a non-producing node is shown in Figure 3, where the yellow nodes propagate the block hash to the cyan nodes:

So Ethereum spreads a newly produced block the way a stone thrown into water spreads ripples, layer by layer.
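The √N split can be sketched in a few lines of Go. `pickPropagationSubset` and the peer names are illustrative, not geth identifiers, but the slicing expression mirrors the one used in `BroadcastBlock`.

```go
package main

import (
	"fmt"
	"math"
)

// pickPropagationSubset returns the first sqrt(N) peers, which receive the
// complete block; every peer still receives the hash-only announcement.
// Sketch of the strategy, not the actual geth code.
func pickPropagationSubset(peers []string) []string {
	return peers[:int(math.Sqrt(float64(len(peers))))]
}

func main() {
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"}
	fmt.Println(pickPropagationSubset(peers)) // 3 of the 9 peers get the full block
}
```

With 25 peers, 5 would get the full block; the remaining 20 learn of it by hash and pull it on demand, which is what keeps the bandwidth cost of propagation sublinear in the peer count.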
What the Fetcher Module Does
The fetcher module collects the block information that other peers announce to it: 1) complete blocks, and 2) block-hash announcements. Based on these notifications it obtains the complete block and then hands it to the eth module to be inserted into the blockchain.
If the notification carries a complete block, it can be handed to eth for insertion directly; if it carries only a block hash, the complete block must first be fetched from a peer, and only then handed to eth for insertion.
Source Code Walkthrough
This section covers the details of block propagation and processing, again explaining each flow with a diagram first and then with code.
How the Producing Node Propagates a New Block
After a node produces a block, the broadcast flow is shown in Figure 4:
An event is published; the event handler picks the peers that will receive the complete block and adds the block to their queues; the handler also adds the block hash to every peer's announcement queue; each peer's broadcast loop then walks its pending-block queue and announcement queue, wraps the data in messages, and sends them via the P2P interface.

Now the details in code.
worker.wait() publishes the NewMinedBlockEvent. ProtocolManager.minedBroadcastLoop() is the event handler; it calls pm.BroadcastBlock() twice.
```go
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
	// automatically stops if unsubscribe
	for obj := range pm.minedBlockSub.Chan() {
		switch ev := obj.Data.(type) {
		case core.NewMinedBlockEvent:
			pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
			pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
		}
	}
}
```
When pm.BroadcastBlock() is called with propagate set to true, it broadcasts the complete block to a subset of peers via peer.AsyncSendNewBlock(); otherwise it broadcasts the block hash to all peers via peer.AsyncSendNewBlockHash(). Both functions simply put the data into a queue, so their code is not shown here.
```go
// BroadcastBlock will either propagate a block to a subset of it's peers, or
// will only announce it's availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)

	// If propagation is requested, send to a subset of the peer
	// In this case, broadcast the complete block to a subset of peers
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		// Compute the new total difficulty
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.AsyncSendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in out own chain, announce it
	// Broadcast the block hash to all peers
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.AsyncSendNewBlockHash(block)
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}
```
peer.broadcast() is the broadcast loop of each peer connection. It sends only three kinds of messages: transactions, complete blocks, and block hashes. In other words, a node only actively pushes these three data types; all other data synchronization happens via request-response.
```go
// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
func (p *peer) broadcast() {
	for {
		select {
		// Broadcast transactions
		case txs := <-p.queuedTxs:
			if err := p.SendTransactions(txs); err != nil {
				return
			}
		// Broadcast a complete block (queued by AsyncSendNewBlock)
		case prop := <-p.queuedProps:
			if err := p.SendNewBlock(prop.block, prop.td); err != nil {
				return
			}
		// Broadcast a block hash announcement (queued by AsyncSendNewBlockHash)
		case block := <-p.queuedAnns:
			if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
				return
			}
		case <-p.term:
			return
		}
	}
}
```
How a Peer Handles a New Block
This section covers how a remote node handles the two block-sync messages. The handling of NewBlockMsg is clear and concise. The handling of NewBlockHashesMsg takes a couple of detours: as the overall flow chart (Figure 1) shows, the node first has to fetch the complete block from the peer that announced it, after which the flow is the same as for NewBlockMsg.
This part involves many modules and the diagram can feel overwhelming, but if you hold on to the main thread above, the code is still quite clear. Figure 5 shows the overall flow.
The entry point for message processing is ProtocolManager.handleMsg. The NewBlockMsg flow is the area marked in blue. The red area is a separate goroutine, the fetcher processing blocks from its queue: if a block popped from the queue is the one the chain currently needs, it is validated and blockchain.InsertChain() is called to insert it into the chain and finally write it to the database, which is the yellow part. The green part is the NewBlockHashesMsg flow; its code path is fairly involved, so I simplified it in order to describe everything in one diagram.
Study this diagram until the overall flow is clear, then look at the details of each step below.
Handling NewBlockMsg
This section covers what a node does when it receives a complete block:
First it RLP-decodes the message, then marks the sending peer as already knowing this block, so that when this node later broadcasts the block's hash, it will not send it back to that peer.
The block is put into the fetcher's queue via fetcher.Enqueue.
The peer's head position is updated; then, if the local chain is behind the peer's chain, the node synchronizes from that peer.
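The first step relies on per-peer bookkeeping. A minimal sketch of the idea behind p.MarkBlock follows; geth actually uses a bounded set type to cap memory, so the plain map, the `maxKnownBlocks` constant, and the arbitrary-eviction policy here are all simplifications.

```go
package main

import "fmt"

// Each peer connection remembers which block hashes the remote side already
// knows, so the node never echoes a block back to whoever sent it.
// maxKnownBlocks bounds the set; evicting an arbitrary entry is a
// simplification of geth's behavior.
const maxKnownBlocks = 1024

type peer struct {
	knownBlocks map[string]struct{}
}

func (p *peer) MarkBlock(hash string) {
	if len(p.knownBlocks) >= maxKnownBlocks {
		for h := range p.knownBlocks { // evict an arbitrary entry
			delete(p.knownBlocks, h)
			break
		}
	}
	p.knownBlocks[hash] = struct{}{}
}

func (p *peer) KnowsBlock(hash string) bool {
	_, ok := p.knownBlocks[hash]
	return ok
}

func main() {
	p := &peer{knownBlocks: make(map[string]struct{})}
	p.MarkBlock("0xabc")
	fmt.Println(p.KnowsBlock("0xabc")) // true
	fmt.Println(p.KnowsBlock("0xdef")) // false
}
```

PeersWithoutBlock in BroadcastBlock is the dual of this: it selects exactly the peers for which KnowsBlock would be false.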
Only the NewBlockMsg-related part of handleMsg() is shown.
```go
case msg.Code == NewBlockMsg:
	// Retrieve and decode the propagated block
	// A new block arrived: decode it and record receipt metadata
	var request newBlockData
	if err := msg.Decode(&request); err != nil {
		return errResp(ErrDecode, "%v: %v", msg, err)
	}
	request.Block.ReceivedAt = msg.ReceivedAt
	request.Block.ReceivedFrom = p

	// Mark the peer as owning the block and schedule it for import
	// Mark the peer as already knowing this block
	p.MarkBlock(request.Block.Hash())
	// Q: why enqueue it when we already have the complete block?
	// A: it goes into the fetcher's priority queue, from which the fetcher
	//    picks the block matching the currently needed height
	pm.fetcher.Enqueue(p.id, request.Block)

	// Assuming the block is importable by the peer, but possibly not yet done so,
	// calculate the head hash and TD that the peer truly must have.
	// Head and total difficulty up to the parent block
	var (
		trueHead = request.Block.ParentHash()
		trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
	)
	// Update the peers total difficulty if better than the previous
	// If the received block's TD exceeds the peer's previous TD and our local
	// TD, synchronize with this peer.
	// Q: only the block's hash is used here; why not use the block directly,
	//    and why send the full data at all if it is not used?
	// A: the block was already added to the priority queue; when the fetcher's
	//    loop sees that the next needed height is in the queue, it uses that
	//    block directly instead of requesting it from the peer, and after
	//    validation hands it to the blockchain via InsertChain.
	if _, td := p.Head(); trueTD.Cmp(td) > 0 {
		p.SetHead(trueHead, trueTD)

		// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
		// a singe block (as the true TD is below the propagated block), however this
		// scenario should easily be covered by the fetcher.
		currentBlock := pm.blockchain.CurrentBlock()
		if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
			go pm.synchronise(p)
		}
	}
	//------------------------ end of handleMsg
```
```go
// Enqueue tries to fill gaps the the fetcher's future import queue.
// Sends to the inject channel; the caller runs in the handleMsg goroutine, and
// the channel hands the block over to the fetcher's own goroutine
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
	op := &inject{
		origin: peer,
		block:  block,
	}
	select {
	case f.inject <- op:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen (runs in the fetcher goroutine)
func (f *Fetcher) enqueue(peer string, block *types.Block) {
	hash := block.Hash()

	// Ensure the peer isn't DOSing us
	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
		propBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any past or too distant blocks
	// Height check: drop blocks too far in the past or future
	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
		propBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Schedule the block for future importing
	// The block first enters the priority queue; there is still much to do before it joins the chain
	if _, ok := f.queued[hash]; !ok {
		op := &inject{
			origin: peer,
			block:  block,
		}
		f.queues[peer] = count
		f.queued[hash] = op
		f.queue.Push(op, -float32(block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
	}
}
```
fetcher Queue Processing
This section looks at how the fetcher processes a block after it enters the queue. Why not validate the block and insert it into the local chain right away?
Because Ethereum has the uncle mechanism, a node may receive blocks that are slightly old. And because of network delays a node may be a few blocks behind, so it may also receive blocks from the "future". Neither kind can be inserted into the local chain directly.
Blocks go into a priority queue from which lower heights pop first. fetcher.loop runs as its own goroutine, continuously handling the fetcher's work and events. It first cleans up fetches that are in flight but have timed out, then processes the blocks in the priority queue: if a block's height is exactly the next one the chain needs, it calls f.insert(), which validates the block and calls BlockChain.InsertChain(); after a successful insert, the hash of the new block is broadcast.
```go
// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *Fetcher) loop() {
	// Iterate the block fetching until a quit is requested
	fetchTimer := time.NewTimer(0)
	completeTimer := time.NewTimer(0)

	for {
		// Clean up any expired block fetches
		// Drop fetches that are in flight but have timed out
		for hash, announce := range f.fetching {
			if time.Since(announce.time) > fetchTimeout {
				f.forgetHash(hash)
			}
		}
		// Import any queued blocks that could potentially fit
		// Import suitable blocks from the queue
		height := f.chainHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem().(*inject)
			hash := op.block.Hash()
			if f.queueChangeHook != nil {
				f.queueChangeHook(hash, false)
			}
			// If too high up the chain or phase, continue later
			// Not the next block the chain needs: push it back and stop the loop
			number := op.block.NumberU64()
			if number > height+1 {
				f.queue.Push(op, -float32(number))
				if f.queueChangeHook != nil {
					f.queueChangeHook(hash, true)
				}
				break
			}
			// Otherwise if fresh and still unknown, try and import
			// The height is the one we want, and the chain does not have the block yet
			if number+maxUncleDist < height || f.getBlock(hash) != nil {
				f.forgetBlock(hash)
				continue
			}
			f.insert(op.origin, op.block)
		}
		// ... the remaining cases of the loop are covered below ...
	}
}
```
```go
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()
		// ... check the parent exists, verify the header (f.verifyHeader,
		// including PoW), insert via f.insertChain, then broadcast the block
		// hash to peers that do not yet know it ...
	}()
}
```
Handling NewBlockHashesMsg
This section covers the handling of NewBlockHashesMsg. The message handling itself is simple; the more involved part, fetching the complete block from a peer, comes in the next section.
The flow:
The message is RLP-decoded, and the peer is marked as knowing the blocks. The hashes that do not exist in the local blockchain are picked out, and these unknown hashes are handed to the fetcher. fetcher.Notify records the notification and pushes it into the notify channel for the fetcher goroutine. fetcher.loop() then processes the notification: it checks that the peer is not mounting a DOS attack, checks the block height, and checks whether the block is already fetching or completing (completing means the header has been downloaded and the body is being downloaded); if neither, the notification is added to announced and the 0s timer is triggered for processing.
announced is covered in the next section.
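The DOS check mentioned above is a per-peer allowance on outstanding announcements. A sketch of the bookkeeping, assuming geth 1.8's hashLimit of 256 (the `guard` type is illustrative; geth keeps the count inline in f.announces):

```go
package main

import "fmt"

// Each peer may have at most hashLimit outstanding block announcements;
// anything beyond that is dropped. hashLimit is 256 in geth 1.8.
const hashLimit = 256

type guard struct{ announces map[string]int }

// allow reports whether one more announcement from peer fits the allowance,
// and records it if so — mirroring count := f.announces[origin] + 1.
func (g *guard) allow(peer string) bool {
	if g.announces[peer]+1 > hashLimit {
		return false
	}
	g.announces[peer]++
	return true
}

func main() {
	g := &guard{announces: map[string]int{"p1": hashLimit}}
	fmt.Println(g.allow("p1")) // false: p1 exhausted its allowance
	fmt.Println(g.allow("p2")) // true
}
```

In the real fetcher the counter is decremented again in forgetHash once an announcement is resolved, so the limit applies to *outstanding* announcements, not a lifetime total.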
```go
// handleMsg(), excerpt
case msg.Code == NewBlockHashesMsg:
	var announces newBlockHashesData
	if err := msg.Decode(&announces); err != nil {
		return errResp(ErrDecode, "%v: %v", msg, err)
	}
	// Mark the hashes as present at the remote node
	for _, block := range announces {
		p.MarkBlock(block.Hash)
	}
	// Schedule all the unknown hashes for retrieval
	// Pick out the hashes the local chain does not have and let the fetcher download them
	unknown := make(newBlockHashesData, 0, len(announces))
	for _, block := range announces {
		if !pm.blockchain.HasBlock(block.Hash, block.Number) {
			unknown = append(unknown, block)
		}
	}
	for _, block := range unknown {
		pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
	}
```
```go
// Notify announces the fetcher of the potential availability of a new block in
// the network.
// Tell the fetcher a new block exists; there is no block body yet, only the
// hash, height, and related information
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	block := &announce{
		hash:        hash,
		number:      number,
		time:        time,
		origin:      peer,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case f.notify <- block:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// fetcher.loop(), the notify case
case notification := <-f.notify:
	// A block was announced, make sure the peer isn't DOSing us
	propAnnounceInMeter.Mark(1)

	count := f.announces[notification.origin] + 1
	if count > hashLimit {
		log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
		propAnnounceDOSMeter.Mark(1)
		break
	}
	// If we have a valid block number, check that it's potentially useful
	// Height check
	if notification.number > 0 {
		if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
			log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
			propAnnounceDropMeter.Mark(1)
			break
		}
	}
	// All is well, schedule the announce if block's not yet downloading
	// Ignore the block if it is already fetching or completing
	if _, ok := f.fetching[notification.hash]; ok {
		break
	}
	if _, ok := f.completing[notification.hash]; ok {
		break
	}
	// Update how many blocks this peer has announced to us
	f.announces[notification.origin] = count
	// Add the notification to announced for scheduling
	f.announced[notification.hash] = append(f.announced[notification.hash], notification)
	if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
		f.announceChangeHook(notification.hash, true)
	}
	if len(f.announced) == 1 {
		// A notification entered announced: reset the 0s timer; another branch
		// of the loop will process these notifications
		f.rescheduleFetch(fetchTimer)
	}
```
How the fetcher Obtains the Complete Block
This section covers how the fetcher obtains the complete block, which is its most important function and involves at least 80% of its code. It deserves a large section of its own.
The Heart of the Fetcher
The Fetcher's main job is to obtain a complete block and hand it to InsertChain at the right moment for validation and insertion into the local chain. Again, start from the macro view; it really matters here, because the code itself is nowhere near as clear.
Macro View
First, how two nodes interact to obtain a complete block, shown as a sequence diagram in Figure 6; the flow is clear enough to need no prose.
Next, the fetcher's internal state transitions while obtaining a block. It uses states to record which stage each wanted block is in, see Figure 7. A brief explanation:
When NewBlockHashesMsg arrives, the notification is recorded in announced and enters the announced state, meaning this node has accepted the message. announced is processed by the fetcher goroutine: after validation, it requests the block's header from the peer that announced it, and the block enters the fetching state. Once the header arrives, if it shows the block has no transactions and no uncles, the block moves to the completing state; a complete block is assembled from the header alone and added to the queued priority queue. If the header shows the block does have transactions or uncles, it moves to the fetched state, a request for the transactions and uncles is sent, and it moves to completing. When the transactions and uncles arrive, the complete block is assembled from the header, transactions, and uncles, and added to queued.
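The transitions just described can be captured in a small table. The state names mirror the fetcher's maps (announced, fetching, fetched, completing, queued), but geth tracks state implicitly by which map currently holds the hash, so the explicit table below is only an illustration:

```go
package main

import "fmt"

// Legal transitions of a block announcement inside the fetcher, as described
// above. A header-only block skips fetched and jumps straight to completing.
var next = map[string][]string{
	"announced":  {"fetching"},
	"fetching":   {"fetched", "completing"},
	"fetched":    {"completing"},
	"completing": {"queued"},
}

func canMove(from, to string) bool {
	for _, s := range next[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMove("announced", "fetching")) // true
	fmt.Println(canMove("announced", "queued"))   // false: must fetch first
}
```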
Micro View
Now the code-level flow for obtaining a complete block. It is long; whenever it gets confusing, go back to the macro diagrams above.
First, the Fetcher definition. It holds communication channels and state management; read the annotated fields, which include all of the states mentioned above.
```go
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
// Accumulates block announcements, then schedules retrieval of those blocks
type Fetcher struct {
	// Various event channels
	notify chan *announce // channel receiving block-hash announcements
	inject chan *inject   // channel receiving complete blocks

	blockFilter  chan chan []*types.Block
	headerFilter chan chan *headerFilterTask // channel of channels for filtering headers
	bodyFilter   chan chan *bodyFilterTask   // channel of channels for filtering bodies

	done chan common.Hash
	quit chan struct{}

	// Announce states
	// How many header announcements each peer has given us
	announces map[string]int // Per peer announce counts to prevent memory exhaustion
	// List of blocks that have been announced
	announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
	// Requests whose header is currently being fetched
	fetching map[common.Hash]*announce // Announced blocks, currently fetching
	// Requests whose header has arrived but whose body is still needed
	fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
	// Requests whose header is in hand and whose body is being completed
	completing map[common.Hash]*announce // Blocks with headers, currently body-completing

	// Block cache
	// queue: a priority queue keyed by block height
	// queues: how many blocks each peer has enqueued
	// queued: marks a block as already enqueued
	queue  *prque.Prque            // Queue containing the import operations (block number sorted)
	queues map[string]int          // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)

	// Callbacks
	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
	verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work (header validation, including PoW)
	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
	chainHeight    chainHeightFn      // Retrieves the current chain's height
	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
	dropPeer       peerDropFn         // Drops a peer for misbehaving

	// Testing hooks
	announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
	queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
}
```
The handling of NewBlockHashesMsg was covered in an earlier section; flip back if needed. Here we pick up from the announced state. In loop(), when fetchTimer fires, there are notifications to process: the loop selects the due entries from announced and builds header requests. Since many peers may have announced the same block hash, one of those peers is chosen at random, and the requests are sent in a separate goroutine per peer.
```go
// fetcher.loop()
case <-fetchTimer.C:
	// At least one block's timer ran out, check for needing retrieval
	request := make(map[string][]common.Hash)

	for hash, announces := range f.announced {
		if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
			// Pick a random peer to retrieve from, reset all others
			// Many peers may have announced this block's hash; pick one at random
			announce := announces[rand.Intn(len(announces))]
			f.forgetHash(hash)

			// If the block still didn't arrive, queue for fetching
			// The block is not local yet: create a request for it
			if f.getBlock(hash) == nil {
				request[announce.origin] = append(request[announce.origin], hash)
				f.fetching[hash] = announce
			}
		}
	}
	// Send out all block header requests
	// Send all the requests: one goroutine per peer, carrying every hash wanted from that peer
	for peer, hashes := range request {
		log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

		// Create a closure of the fetch and schedule in on a new thread
		fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
		go func() {
			if f.fetchingHook != nil {
				f.fetchingHook(hashes)
			}
			for _, hash := range hashes {
				headerFetchMeter.Mark(1)
				fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
			}
		}()
	}
	// Schedule the next fetch if blocks are still pending
	f.rescheduleFetch(fetchTimer)
```
From the Notify call you can see that the function actually behind fetchHeader is RequestOneHeader(), which uses the GetBlockHeadersMsg message. That message can request multiple headers, but the fetcher requests only one.

```go
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
```
```go
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
	p.Log().Debug("Fetching single header", "hash", hash)
	return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
```
GetBlockHeadersMsg is handled as follows. Because it can ask for multiple headers, the handler is somewhat involved; fortunately the fetcher only asks for one header, which the collection loop below covers, and the logic for walking to subsequent headers is skipped here. In the end, SendBlockHeaders() sends the headers back to the requesting node in a BlockHeadersMsg.
```go
// handleMsg()
// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
	// Decode the complex header query
	var query getBlockHeadersData
	if err := msg.Decode(&query); err != nil {
		return errResp(ErrDecode, "%v: %v", msg, err)
	}
	hashMode := query.Origin.Hash != (common.Hash{})

	// Gather headers until the fetch or network limits is reached
	// Collect headers until a limit is hit
	var (
		bytes   common.StorageSize
		headers []*types.Header
		unknown bool
	)
	// the origin is known && fewer than the queried amount && size below 2MB && below the max fetchable count
	for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
		// ... look up each header, append it to headers, and advance the query ...
	}
	return p.SendBlockHeaders(headers)
```
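Although the fetcher only ever sets Amount to 1, the Origin/Amount/Skip/Reverse fields of getBlockHeadersData describe a general walk over block numbers. A simplified, numbers-only sketch of that walk (the `query`/`walk` names here are illustrative, not geth's):

```go
package main

import "fmt"

// query mirrors the shape of getBlockHeadersData: start at origin, return
// amount headers, skipping skip blocks between each, walking backwards if
// reverse is set.
type query struct {
	origin       uint64
	amount, skip uint64
	reverse      bool
}

// walk returns the block numbers such a query selects.
func walk(q query) []uint64 {
	nums := []uint64{}
	n := q.origin
	for i := uint64(0); i < q.amount; i++ {
		nums = append(nums, n)
		step := q.skip + 1
		if q.reverse {
			if n < step { // would walk past the genesis block
				break
			}
			n -= step
		} else {
			n += step
		}
	}
	return nums
}

func main() {
	fmt.Println(walk(query{origin: 100, amount: 3, skip: 1})) // [100 102 104]
	fmt.Println(walk(query{origin: 100, amount: 1}))          // [100]: the fetcher's case
}
```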
The handling of `BlockHeadersMsg` is interesting, because `GetBlockHeadersMsg` is not exclusive to the fetcher; the downloader sends it too. So when a response arrives, the handler must work out whether the fetcher or the downloader asked for it. The logic: the fetcher filters the received headers first, and whatever the fetcher does not claim belongs to the downloader; during the call to `fetcher.FilterHeaders`, the fetcher takes the headers it wants.
```go
// handleMsg()
case msg.Code == BlockHeadersMsg:
	// A batch of headers arrived to one of our previous requests
	var headers []*types.Header
	if err := msg.Decode(&headers); err != nil {
		return errResp(ErrDecode, "msg %v: %v", msg, err)
	}
	// If no headers were received, but we're expending a DAO fork check, maybe it's that
	// Check whether this is the DAO hard-fork check
	if len(headers) == 0 && p.forkDrop != nil {
		// Possibly an empty reply to the fork header checks, sanity check TDs
		verifyDAO := true

		// If we already have a DAO header, we can check the peer's TD against it. If
		// the peer's ahead of this, it too must have a reply to the DAO check
		if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
			if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
				verifyDAO = false
			}
		}
		// If we're seemingly on the same chain, disable the drop timer
		if verifyDAO {
			p.Log().Debug("Seems to be on the same side of the DAO fork")
			p.forkDrop.Stop()
			p.forkDrop = nil
			return nil
		}
	}
	// Filter out any explicitly requested headers, deliver the rest to the downloader
	// Filter out the headers the fetcher requested; the remainder goes to the downloader
	filter := len(headers) == 1
	if filter {
		// If it's a potential DAO fork check, validate against the rules
		// Hard-fork check
		if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
			// Disable the fork drop timer
			p.forkDrop.Stop()
			p.forkDrop = nil

			// Validate the header and either drop the peer or continue
			if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
				p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
				return err
			}
			p.Log().Debug("Verified to be on the same side of the DAO fork")
			return nil
		}
		// Irrelevant of the fork checks, send the header to the fetcher just in case
		// Let the fetcher filter the headers
		headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
	}
	// The remaining headers go to the downloader
	if len(headers) > 0 || !filter {
		err := pm.downloader.DeliverHeaders(p.id, headers)
		if err != nil {
			log.Debug("Failed to deliver headers", "err", err)
		}
	}
```
`FilterHeaders()` is a clever little function; it looks puzzling at first but is genuinely elegant. It must hand all the headers over to the fetcher goroutine and also receive back the result of the fetcher's processing. `fetcher.headerFilter` is a channel of channels, and `filter` is a channel carrying a header-filter task. `FilterHeaders` first sends `filter` through `headerFilter`, so the fetcher goroutine is now waiting on the other end of `filter`; then it sends the `headerFilterTask` into `filter`, which the fetcher reads, processes, and answers by writing the result back into `filter`, where `FilterHeaders`, running in the `handleMsg()` goroutine, picks it up.
Each peer's messages are handled in their own goroutine, but the fetcher has only one event-processing goroutine. Without a per-request `filter` channel, how would the fetcher know who sent it the headers, and how would it return the leftovers?
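The request/response dance through a channel of channels can be reduced to a few lines. This sketch filters even numbers instead of headers; the three steps in main correspond to the three selects in FilterHeaders:

```go
package main

import "fmt"

// worker plays the role of the fetcher goroutine: it receives a private reply
// channel, reads the task from it, and writes the result back on it.
func worker(requests chan chan []int) {
	for ch := range requests {
		task := <-ch // receive the task on the private channel
		kept := []int{}
		for _, v := range task {
			if v%2 == 0 { // the "filter": keep even numbers
				kept = append(kept, v)
			}
		}
		ch <- kept // write the result back on the same channel
	}
}

func main() {
	requests := make(chan chan []int)
	go worker(requests)

	filter := make(chan []int)
	requests <- filter          // step 1: hand the private channel to the worker
	filter <- []int{1, 2, 3, 4} // step 2: send the task
	fmt.Println(<-filter)       // step 3: read back the result: [2 4]
}
```

Because every caller brings its own `filter` channel, one worker goroutine can serve any number of concurrent callers and each reply finds its way back to the right one, with no IDs or locks.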
```go
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
// Pick out the headers the fetcher requested
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

	// Send the filter channel to the fetcher
	// The task channel
	filter := make(chan *headerFilterTask)
	select {
	// Hand the task channel to the fetcher goroutine
	case f.headerFilter <- filter:
	case <-f.quit:
		return nil
	}
	// Request the filtering of the header list
	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}
	// Retrieve the headers remaining after filtering
	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}
```
Next is the handling on the f.headerFilter side. It is about 90 lines of code and does the following:
1. Take `filter` out of `f.headerFilter`, then take the filtering task `task` out of `filter`.
2. Split the headers into 3 classes: `unknown`, which are not the fetcher's and are returned to the caller, i.e. `handleMsg()`; `incomplete`, headers whose bodies still need fetching; and `complete`, blocks that consist of the header alone. Every header is sorted into one of these classes; the criteria are in the comments in the code. Keep the state-transition diagram from the macro section in mind.
3. Return the `unknown` headers to `handleMsg()`.
4. Move the `incomplete` headers to the `fetched` state, then trigger the timer so the complete blocks get processed.
5. Add the `complete` blocks to `queued`.
```go
// fetcher.loop()
case filter := <-f.headerFilter:
	// Headers arrived from a remote peer: take out the task
	var task *headerFilterTask
	select {
	case task = <-filter:
	case <-f.quit:
		return
	}
	// Split the batch of headers into unknown ones (to return to the caller),
	// known incomplete ones (requiring body retrievals) and completed blocks.
	// unknown: not requested by the fetcher; complete: blocks with no
	// transactions or uncles, for which the header is enough; incomplete:
	// blocks whose uncles and transactions still need fetching
	unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
	// Walk all received headers
	for _, header := range task.headers {
		hash := header.Hash()

		// Filter fetcher-requested headers from other synchronisation algorithms
		// The hash is being fetched, from the peer we asked, and is not yet
		// fetched, completing, or queued
		if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
			// If the delivered header does not match the promised number, drop the announcer
			// Height mismatch: the peer is misbehaving, drop it
			if header.Number.Uint64() != announce.number {
				log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
				f.dropPeer(announce.origin)
				f.forgetHash(hash)
				continue
			}
			// Only keep if not imported by other means
			// The local chain does not have this block yet
			if f.getBlock(hash) == nil {
				announce.header = header
				announce.time = task.time

				// If the block is empty (header only), short circuit into the final import queue
				// The block has no transactions and no uncles: add it to complete
				if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
					log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

					block := types.NewBlockWithHeader(header)
					block.ReceivedAt = task.time
					complete = append(complete, block)
					f.completing[hash] = announce
					continue
				}
				// Otherwise add to the list of blocks needing completion
				incomplete = append(incomplete, announce)
			} else {
				log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
				f.forgetHash(hash)
			}
		} else {
			// Fetcher doesn't know about it, add to the return list
			// A header we never requested
			unknown = append(unknown, header)
		}
	}
	// Hand the unknown headers back through filter
	headerFilterOutMeter.Mark(int64(len(unknown)))
	select {
	case filter <- &headerFilterTask{headers: unknown, time: task.time}:
	case <-f.quit:
		return
	}
	// ... then move incomplete into fetched (rescheduling completeTimer) and
	// enqueue the complete blocks ...
```
Following the state diagram, the remaining transition is `fetched` to `completing`. The flow above has already armed the `completeTimer`; when it fires, the handling mirrors the header requests, so it is not repeated in detail. The request message this time is `GetBlockBodiesMsg`, and the function actually called is `RequestBodies`.
```go
// fetcher.loop()
case <-completeTimer.C:
	// At least one header's timer ran out, retrieve everything
	request := make(map[string][]common.Hash)

	// Walk every announce waiting for its body
	for hash, announces := range f.fetched {
		// Pick a random peer to retrieve from, reset all others
		// Choose a random peer, since many peers may have announced this block
		announce := announces[rand.Intn(len(announces))]
		f.forgetHash(hash)

		// If the block still didn't arrive, queue for completion
		// The block is not local: move it to completing and create a request
		if f.getBlock(hash) == nil {
			request[announce.origin] = append(request[announce.origin], hash)
			f.completing[hash] = announce
		}
	}
	// Send out all block body requests
	// Send every body request, again one goroutine per peer
	for peer, hashes := range request {
		log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

		// Create a closure of the fetch and schedule in on a new thread
		if f.completingHook != nil {
			f.completingHook(hashes)
		}
		bodyFetchMeter.Mark(int64(len(hashes)))
		go f.completing[hashes[0]].fetchBodies(hashes)
	}
	// Schedule the next fetch if blocks are still pending
	f.rescheduleComplete(completeTimer)
```
`handleMsg()` deals with this message crisply: it fetches the RLP-encoded bodies directly and sends the response.
```go
// handleMsg()
case msg.Code == GetBlockBodiesMsg:
	// Decode the retrieval message
	msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
	if _, err := msgStream.List(); err != nil {
		return err
	}
	// Gather blocks until the fetch or network limits is reached
	var (
		hash   common.Hash
		bytes  int
		bodies []rlp.RawValue
	)
	// Walk all the requested hashes
	for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
		// Retrieve the hash of the next block
		if err := msgStream.Decode(&hash); err == rlp.EOL {
			break
		} else if err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Retrieve the requested block body, stopping if enough was found
		if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
			bodies = append(bodies, data)
			bytes += len(data)
		}
	}
	return p.SendBlockBodiesRLP(bodies)
```
The response message `BlockBodiesMsg` is handled on the same principle as the header response: the fetcher filters it first, and what remains belongs to the downloader. Note that the response carries only the transaction lists and the uncle lists.
```go
// handleMsg()
case msg.Code == BlockBodiesMsg:
	// A batch of block bodies arrived to one of our previous requests
	var request blockBodiesData
	if err := msg.Decode(&request); err != nil {
		return errResp(ErrDecode, "msg %v: %v", msg, err)
	}
	// Deliver them all to the downloader for queuing
	transactions := make([][]*types.Transaction, len(request))
	uncles := make([][]*types.Header, len(request))

	for i, body := range request {
		transactions[i] = body.Transactions
		uncles[i] = body.Uncles
	}
	// Filter out any explicitly requested bodies, deliver the rest to the downloader
	// Let the fetcher take the bodies it requested; the rest go to the downloader
	filter := len(transactions) > 0 || len(uncles) > 0
	if filter {
		transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
	}
	// The remaining bodies go to the downloader
	if len(transactions) > 0 || len(uncles) > 0 || !filter {
		err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
		if err != nil {
			log.Debug("Failed to deliver bodies", "err", err)
		}
	}
```
The filter function works on the same principle as the header one.
```go
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
// Pick out the bodies the fetcher requested and return the rest; the process
// mirrors the header filtering
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
	log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

	// Send the filter channel to the fetcher
	filter := make(chan *bodyFilterTask)
	select {
	case f.bodyFilter <- filter:
	case <-f.quit:
		return nil, nil
	}
	// Request the filtering of the body list
	select {
	case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
	case <-f.quit:
		return nil, nil
	}
	// Retrieve the bodies remaining after filtering
	select {
	case task := <-filter:
		return task.transactions, task.uncles
	case <-f.quit:
		return nil, nil
	}
}
```
Now a look at the actual body filtering, which differs from the header handling. The key points:
1. The blocks the fetcher wants are extracted into `blocks`; the ones it does not want stay in `task`.
2. How it decides a body was requested by the fetcher: if the hashes computed from the transaction list and the uncle list match those in a pending header, and the message came from the peer that was asked, the body is the fetcher's.
3. The blocks in `blocks` are added to `queued`, and we are done.
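Point 2 is the interesting one: a body is identified purely by recomputing the header's commitments. geth uses types.DeriveSha over the transactions and types.CalcUncleHash over the uncles; the sketch below substitutes plain SHA-256 over strings to show only the matching rule.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// header holds the two commitments a real block header carries for its body:
// a hash of the transaction list and a hash of the uncle list.
type header struct{ txHash, uncleHash [32]byte }

// hashList stands in for DeriveSha/CalcUncleHash: any deterministic digest of
// a list works for the illustration.
func hashList(items []string) [32]byte {
	h := sha256.New()
	for _, it := range items {
		h.Write([]byte(it))
	}
	var out [32]byte
	copy(out[:], h.Sum(nil))
	return out
}

// matches reports whether a delivered body reproduces the header's commitments.
func matches(h header, txs, uncles []string) bool {
	return hashList(txs) == h.txHash && hashList(uncles) == h.uncleHash
}

func main() {
	txs, uncles := []string{"tx1", "tx2"}, []string{"u1"}
	h := header{txHash: hashList(txs), uncleHash: hashList(uncles)}
	fmt.Println(matches(h, txs, uncles))             // true
	fmt.Println(matches(h, []string{"tx3"}, uncles)) // false
}
```

This is why the response needs no request IDs: the body's own content says which pending header it belongs to.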
```go
// fetcher.loop()
case filter := <-f.bodyFilter:
	// Block bodies arrived: take out the ones the fetcher requested, return the rest
	var task *bodyFilterTask
	select {
	case task = <-filter:
	case <-f.quit:
		return
	}
	blocks := []*types.Block{}
	// For every delivered body (a transaction list plus an uncle list), compute
	// the hashes and check whether it completes a block the fetcher is waiting for
	for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
		// ... match against f.completing via types.DeriveSha(task.transactions[i])
		// and types.CalcUncleHash(task.uncles[i]); matched bodies are assembled
		// with their headers into blocks, everything else stays in task ...
	}
	// ... write the leftover task back into filter, then enqueue blocks ...
```
That concludes the flow by which the fetcher obtains a complete block, and with it roughly 80% of the fetcher module's code. Two more functions are worth a look:
1. `forgetHash(hash common.Hash)`: clears all recorded state for the given hash.
2. `forgetBlock(hash common.Hash)`: removes a block from the queue.
Finally, go back to the beginning and revisit the fetcher module and the new-block propagation flow; it should all click into place now.