Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
163 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。0 _, |' e0 g4 \6 [/ Q" S' ~2 ^( Q
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。, Z6 {5 j4 B- W# a; [4 k7 M
总体过程和传播策略8 y3 V5 h0 Q, [5 ]
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
% {: X* d/ R- Z2 \2 l总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。, q, F5 r. `; L$ S$ x5 @
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。7 C# M% o8 q( C  k
: b+ Z3 R0 _& A1 Y
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:* A& F  Z0 U8 t9 j$ m0 a
假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。" c! E5 ~4 y5 i6 @/ N* m
策略图的效果如图2,红色节点将区块传播给黄色节点:
, S; H/ }3 L! M- l4 g: P) ^! h
' Z) C  [" Q# S1 z/ L( j
+ Z* m8 o5 X, e# N  F收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
/ B/ B' ~7 n- z; u6 c7 R; R3 e  ~5 D. J: g8 i' j% |  ~; i$ e1 `$ ?' S
至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。. ]+ C: {( l( J5 l
Fetcher模块是干啥的2 n. |# b, g. l# B$ w4 M6 u; ~
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。2 A: g  a( c, H
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块6 [. }8 Y) ^& e0 o) R. Q

* V/ y$ U' _6 Y0 S  O5 Z4 D" e源码解读
/ R5 v, q$ |, j) q* F本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。
! z5 ^1 W. B" b5 f, {5 g" C产块节点的传播新区块
" L9 i3 i* f2 G. S  j节点产生区块后,广播的流程可以表示为图4:- D! ~% D9 F# @) ?) _
发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去9 U2 q2 Q. G8 f' M2 ^
6 g8 q6 {/ m! B; }/ ]' ~& w; R

- \* W, A: e. V& ^$ n. J7 \再看下代码上的细节。2 q( r9 D' S/ v' E6 J/ H
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
7 e* m0 F6 P* J  [" O0 X+ i# f8 }5 j6 a9 d
// Mined broadcast loop/ U9 f8 r! X) x
func (pm *ProtocolManager) minedBroadcastLoop() {1 U/ q$ O7 s9 s
        // automatically stops if unsubscribe
: x) c+ o- G% d7 {; F! m1 l& o% b        for obj := range pm.minedBlockSub.Chan() {1 X1 d0 W2 ]8 a/ O4 E% I2 e
                switch ev := obj.Data.(type) {! f2 }% q& J; H8 _& ^
                case core.NewMinedBlockEvent:; c8 o7 ]' F1 Y4 P1 K
                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
$ v( E+ \3 u  B1 O                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest4 D: p* c) r" G" I0 w
                }
, g! P. ?8 @$ f" c% T        }6 B3 {) n# g9 W3 X9 o6 z: Z# e
}
2 W1 B' |! e7 |% ~) \# Jpm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。
5 u; ~' L5 x5 Q- G// BroadcastBlock will either propagate a block to a subset of it's peers, or& m) I& O- Y* a" D( f( i
// will only announce it's availability (depending what's requested).
( L! |% {, \0 v' ]func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
; l, c# O1 X( ^0 _, c8 x- j3 c0 a" J        hash := block.Hash()
* h$ E8 K3 z& K# J        peers := pm.peers.PeersWithoutBlock(hash)
2 a; T0 e' X. z4 d/ a        // If propagation is requested, send to a subset of the peer
$ ]5 n  u+ T5 a- C  J  ]0 M: s        // 这种情况,要把区块广播给部分peer
  J  `, E) m/ ~        if propagate {
- Z5 M; ?# a  O1 l* O; S7 ]                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid): H  u: e1 ~$ ?* Z/ l7 U( z9 o
                // 计算新的总难度& q$ N. m3 f8 E' i+ ], `# h4 Q
                var td *big.Int
" j1 u/ ~" S: y+ _* Y                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {2 A# |2 [3 F# w$ n3 b3 G5 K
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))2 h4 O" J, J' |& J- c
                } else {6 R, ^9 ]+ R% H
                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
1 N. Z+ N3 L& h5 }, X7 F) a; j9 Q: q                        return
. p) K! T; b6 c4 h3 o7 E9 q                }
7 }$ `4 V' `3 k2 e6 ~                // Send the block to a subset of our peers
# e. G3 k, b( U2 ?* f                // 广播区块给部分peer! A3 U! Z" u0 G/ w
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]
$ W# S+ K# R1 k9 C  t& {                for _, peer := range transfer {
% @- ?0 v0 Q3 e2 ^4 M% J* B                        peer.AsyncSendNewBlock(block, td): O  G2 Y! N9 F1 ~
                }
/ v6 B1 W( w0 ?* H/ p                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))) \0 A5 Z6 ?; U1 `
                return
% |# r& \# y, B# R: e" n# U* U        }
3 U3 A8 q6 q( c- z$ M' ^! P, \        // Otherwise if the block is indeed in out own chain, announce it0 [: |, z/ x( K3 f
        // 把区块hash值广播给所有peer
* D- n$ V) W1 I        if pm.blockchain.HasBlock(hash, block.NumberU64()) {2 H( Z$ v) b& Z1 C0 V0 c3 t
                for _, peer := range peers {
) [" ~% U3 }; P( R- A; I                        peer.AsyncSendNewBlockHash(block)3 c7 c  g5 Y& g. `" ?
                }/ L8 J9 g, h; O( u5 A/ n
                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
! d& I/ Q1 {8 N% S6 ?6 s0 k) e        }
' `2 Q( b2 W$ {% u1 ?1 y}$ r5 B* ]! z/ A- K' y4 C
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
5 c2 }1 W3 G% O( v( u// broadcast is a write loop that multiplexes block propagations, announcements+ D- o) b  S: v+ T$ R( N6 e2 ~
// and transaction broadcasts into the remote peer. The goal is to have an async
; C  o" M' ]0 X  C) y4 T& b// writer that does not lock up node internals.% y/ u8 B" v/ F4 [8 e6 y
func (p *peer) broadcast() {
+ e; Z( K2 L# x# u' a  n  \        for {
' L9 j' m! Y/ }. w% m: F" {                select {
: \7 S( V/ S/ C/ _1 x( S3 l+ g                // 广播交易
/ t# X; ~+ O9 y8 b" R7 B                case txs := + X  M* C" ?- ]/ B( |3 p
Peer节点处理新区块/ y" [+ X8 z3 ?  z9 v
本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。: f9 [( e+ T' ]
这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。
0 z* ~, C* G4 C8 k4 t% g/ k) q* Q消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
0 q7 Z7 A) n1 E2 ~8 r. v6 T# W: @% J2 y# g6 A! ^
仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。  j7 l  A) Z' }* u' X0 h
NewBlockMsg的处理
8 V$ C3 M; E- y$ r本节介绍节点收到完整区块的处理,流程如下:7 S5 X) o* y- k8 D! [8 y
首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。
3 Z# u6 A- K$ s7 _3 f  e' x将区块存入到fetcher的队列,调用fetcher.Enqueue。
2 {, |+ @( V1 V. c; P更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
+ H' [8 ^3 {& ^* O; I只看handle.Msg()的NewBlockMsg相关的部分。1 n6 `! s, F. i6 i$ G7 G7 e
case msg.Code == NewBlockMsg:
( ]7 a- Y3 g# q        // Retrieve and decode the propagated block
6 x/ G- H* ]: b        // 收到新区块,解码,赋值接收数据
% q5 E& C- j5 F' U# U. N        var request newBlockData
# Y! G2 t  R# I) O' N        if err := msg.Decode(&request); err != nil {  o6 f- g7 y% T9 Z/ |* h& n$ d
                return errResp(ErrDecode, "%v: %v", msg, err)
9 v6 {: e% p$ P* @2 S        }+ m0 c+ a$ l/ M
        request.Block.ReceivedAt = msg.ReceivedAt' Z+ O5 M  Z, `/ P
        request.Block.ReceivedFrom = p- l# V5 n7 C+ v1 ^+ g" ^  T
        // Mark the peer as owning the block and schedule it for import: S: Z# n0 _) e+ `5 O0 m, {% {
        // 标记peer知道这个区块; I' G, p- d! Y0 F# o4 Q/ N4 J
        p.MarkBlock(request.Block.Hash())' c. B/ m; H: {' g, ?6 Y
        // 为啥要如队列?已经得到完整的区块了4 f  R* t5 A6 n- u
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块6 p8 Z6 q$ F9 E# n" \6 {
        pm.fetcher.Enqueue(p.id, request.Block)3 W& E0 O2 }4 q- U
        // Assuming the block is importable by the peer, but possibly not yet done so,
9 \2 o( a4 Y3 Q* O0 z        // calculate the head hash and TD that the peer truly must have.4 A0 p' }' D0 K# b9 E7 V2 C
        // 截止到parent区块的头和难度9 M" x( g$ y8 ]+ b: G9 Z. @
        var (
6 J% E. i2 X* a! p                trueHead = request.Block.ParentHash()3 v: }- r4 V* i% V9 t
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())9 p1 T8 ~$ `, O1 H. M
        ), `9 e/ k; p% r3 c1 K. I$ F
        // Update the peers total difficulty if better than the previous
  H) Q: E5 Q4 ?        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步6 K( S& H6 N$ D" f% v" E
        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。9 P; n  A- `! S& g7 C
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求+ b3 y; S& W  h3 S+ X" T
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
1 R9 l; `! Y# V; @* |        if _, td := p.Head(); trueTD.Cmp(td) > 0 {$ y8 }4 a8 W5 a& }# o! \
                p.SetHead(trueHead, trueTD)* ^2 S5 G/ w# [& V& U
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
& V" d% m: S' w2 P3 ?                // a singe block (as the true TD is below the propagated block), however this( G5 L# d. _8 j. m3 y! u
                // scenario should easily be covered by the fetcher.
0 X, O# x/ W/ s  {6 E; O" }                currentBlock := pm.blockchain.CurrentBlock()- t' R- H+ R9 ?6 t4 |0 P. {1 X8 Z2 ~$ i
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
: u8 p1 f0 u6 [: A                        go pm.synchronise(p)
& g: M; h) d8 I- b' Y                }
0 ]% h3 Y; B8 N. R5 A        }
( g# c% m" k( @//------------------------ 以上 handleMsg
4 n, D8 v% |+ h3 m5 x% H// Enqueue tries to fill gaps the the fetcher's future import queue.0 [; z: {# F( a$ G; x
// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理) `( G4 v2 v; s$ I2 `8 r+ l
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
4 E* }& h. g; k' m: o% Q        op := &inject{
0 a+ r( m3 U% w$ t                origin: peer,- `8 J" ?( x) ]0 N* ?( I
                block:  block,8 A) J' Z( v2 P
        }3 |" e2 |  U/ _; g
        select {* _1 j( y9 y( m2 D( l1 U
        case f.inject  blockLimit {7 j5 f" U( w2 H. [' z" B, v' Q8 ^+ P% p
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit), K( j4 ~" c% j; O- G
                propBroadcastDOSMeter.Mark(1)7 E6 Q8 H3 }1 p. _" O! ~
                f.forgetHash(hash)
8 k) `! `* F  r' U                return
3 Z6 _) I( d+ \; y( L  H* h8 ?        }/ p2 q: p% s$ j6 U0 R, k) K, F
        // Discard any past or too distant blocks/ [7 p) D1 U" j- Y; k+ z1 i) _
        // 高度检查:未来太远的块丢弃
  N+ g/ a8 R/ L$ h9 g        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {; r5 O4 K& s0 Q" m  z/ l* J4 N
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)2 F0 ^2 n5 G# G/ y
                propBroadcastDropMeter.Mark(1)  J- ^  Q5 m/ \) X: R
                f.forgetHash(hash)' j2 |6 @7 H7 \+ o
                return$ W6 z. v$ K9 _; ^" z* j# F
        }3 J  @. ~9 F9 c+ [* V& [1 x
        // Schedule the block for future importing! q5 \5 A# w& |8 }) f5 A' U6 B
        // 块先加入优先级队列,加入链之前,还有很多要做. H$ {$ T% Y3 A4 n8 n
        if _, ok := f.queued[hash]; !ok {
# c. s- i2 a- {                op := &inject{# m; B9 w8 K6 T* ~, ?9 E7 y
                        origin: peer,
: h1 D  r/ h3 c9 m! g" z" j                        block:  block,
# O" s+ |7 l0 c" n                }# E( S' I6 e& R' J
                f.queues[peer] = count
5 w0 Y# `$ q6 X2 S0 w# s* k9 C5 m                f.queued[hash] = op
3 r* X% `* s& u  e  ^& V. ^+ j                f.queue.Push(op, -float32(block.NumberU64()))
; W+ M: O) r$ y9 s& T3 A                if f.queueChangeHook != nil {  t. z$ z, N/ Z
                        f.queueChangeHook(op.block.Hash(), true)
2 g2 M; P  d+ b2 k+ _* H                }
" j+ i  D' ?  S) E) ^* Q: P% z* X                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
8 V. q9 M/ E  j% D7 X2 D        }) y( \* ~5 S' c% _) S4 t
}2 k" p5 L( c6 d) w* d
fetcher队列处理) u8 W& d0 \# u
本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?! D& y, r$ \3 Z6 n  _3 D! ~
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。
9 i( a* c( j1 ]. T5 ~" T区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash) J& G7 F; ?" a
// Loop is the main fetcher loop, checking and processing various notification7 h' V9 L& Y- V+ J( g. n
// events.
( h, X! C6 N7 c0 l: Ofunc (f *Fetcher) loop() {" `9 @) N3 L, u) Y4 @4 O
        // Iterate the block fetching until a quit is requested
! B( S% h( m* W: n: k+ F        fetchTimer := time.NewTimer(0)7 u0 v- r  J" w
        completeTimer := time.NewTimer(0), \+ m# N4 s8 t" Z1 B
        for {
0 y/ a) G9 z5 }) D8 i* a% {5 T! l                // Clean up any expired block fetches1 q+ c, m1 |1 f" H% h- S
                // 清理过期的区块3 J) z& \* A$ m: x% U& N
                for hash, announce := range f.fetching {( X' a$ o0 l3 \; m+ g( V
                        if time.Since(announce.time) > fetchTimeout {
% {% `1 S: b0 t% P& h& D) e                                f.forgetHash(hash)
- F! p& `! O* y# a                        }
4 }  z; c8 \9 w$ O4 C                }
& u2 d0 }# v7 E# y                // Import any queued blocks that could potentially fit+ h( t! q8 F( B' P1 J
                // 导入队列中合适的块
  t, t* F6 s& Y& V# h& _                height := f.chainHeight()/ B- Z* S% N& w
                for !f.queue.Empty() {
4 @9 o5 a" U2 ?, x2 ]9 G& l& Y                        op := f.queue.PopItem().(*inject)" M0 T1 d5 u6 ^' G
                        hash := op.block.Hash()
/ g3 a) d/ J/ i, G9 v- n                        if f.queueChangeHook != nil {
" s, P. _6 K1 G, O/ W/ A( d                                f.queueChangeHook(hash, false)& L" Z, @2 |4 E2 ~% m
                        }  t4 I4 H9 S. I. c# F
                        // If too high up the chain or phase, continue later
/ _! V6 F8 r9 S8 D3 U/ Y                        // 块不是链需要的下一个块,再入优先级队列,停止循环
" X7 u# f3 O. H                        number := op.block.NumberU64()6 K8 R/ @2 p8 h. K
                        if number > height+1 {
. E  r7 R  F! f* l9 {( a* j0 \                                f.queue.Push(op, -float32(number))8 A6 o' V% G( [% c6 d; c
                                if f.queueChangeHook != nil {6 W  x& t: [4 ?. v  f% x
                                        f.queueChangeHook(hash, true)
8 v' o0 N2 J; _# O% ~0 j1 D                                }$ {& b: E& s( V  s8 q1 a" e
                                break! O4 J: J) K8 C5 U4 w
                        }* D. G9 N9 z- v8 P- @! m/ j
                        // Otherwise if fresh and still unknown, try and import
1 L% f5 R6 w& h                        // 高度正好是我们想要的,并且链上也没有这个块3 k" [1 r; w  }5 ]/ I9 {) j
                        if number+maxUncleDist
1 q$ Q  j9 _6 w* k3 xfunc (f *Fetcher) insert(peer string, block *types.Block) {( {# K. t7 v* r5 o% A8 Y
        hash := block.Hash()
8 a1 U9 W, {1 S  F        // Run the import on a new thread
( R: g# G/ @3 U8 q: [. X        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)  ^7 Z/ X5 Q. L/ _" g# p; I1 \$ E
        go func() {
' d4 q, ~7 m7 T% ^                defer func() { f.done
- l* a5 g9 I; X0 wNewBlockHashesMsg的处理
$ Z; s+ u2 F, y& n+ P3 N本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。
4 C9 z. s+ Q: O& f2 H1 i+ ]+ Q流程如下:
7 m' g* a0 n' Y) C对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。
; [# d& |, `: S: q* F1 I. I关于announced下节再介绍。, b+ u5 E" B. S( w1 p% j

) m5 }* y- z" z) c  c// handleMsg()部分
" d% K. }; g6 g# [! O0 E2 wcase msg.Code == NewBlockHashesMsg:  t5 v2 x4 Z! V
        var announces newBlockHashesData! ?+ ~' c2 n* i; R% i7 F5 T/ W: n( L
        if err := msg.Decode(&announces); err != nil {+ l2 B/ f* w1 C6 E3 z
                return errResp(ErrDecode, "%v: %v", msg, err)
9 w" u5 Y& D# P# k        }4 S5 n5 X9 W+ h% ^
        // Mark the hashes as present at the remote node$ ]5 I+ q) i% w
        for _, block := range announces {
8 X. N" g" J. I6 m$ k                p.MarkBlock(block.Hash)
  U8 k2 n2 y0 L; z4 t7 v; k( r        }
2 \& L! g( M) t        // Schedule all the unknown hashes for retrieval
/ D1 b. c6 _$ |        // 把本地链没有的块hash找出来,交给fetcher去下载' j$ I1 ]5 u$ {+ m9 P, L
        unknown := make(newBlockHashesData, 0, len(announces))
1 z9 V7 h) L$ x# q' m7 R" p        for _, block := range announces {
3 A. ?0 Q' n9 g                if !pm.blockchain.HasBlock(block.Hash, block.Number) {: Y3 s) [) \$ [
                        unknown = append(unknown, block), x1 w, q7 N- W( R5 t6 B1 s
                }, M1 p) ~3 y' [5 P4 K
        }
$ y+ J& k( Q% c2 P2 L        for _, block := range unknown {: w" A, v$ q! O/ _% }1 F2 k
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
; [2 {# J1 q9 S, R0 `        }0 F3 N& B8 L0 _/ o$ E# X
// Notify announces the fetcher of the potential availability of a new block in
4 y, o, g$ w& B/ {// the network.
; A% Y- o( z# Z9 N' U3 ~# m// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息0 M, [- J) H# i6 }0 O( l* l
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,4 p% A% g  S3 ]8 E& s$ [
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
3 \" q, E5 S) M7 d# d" |        block := &announce{: T  c6 [0 K( ~( \, M( Y( f; o7 _
                hash:        hash,# U$ g4 s- g0 e/ Z: }
                number:      number,7 l& w& Y& p9 P1 l
                time:        time,! O' V2 L6 w5 K* S
                origin:      peer,
7 c! w: k* l8 C! s. w- ?                fetchHeader: headerFetcher,
; ?) W5 u4 c1 {" i5 ?0 C0 v                fetchBodies: bodyFetcher,
0 O/ p  W2 Z! [, W4 G: K        }+ P8 M; Z2 j$ A9 z5 E
        select {: ]; d+ m1 z+ k5 ^
        case f.notify  hashLimit {
5 S) [: G, U$ T' q) q; Q                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)0 H0 D1 N" d! c# g6 w3 n9 k
                propAnnounceDOSMeter.Mark(1)" S" l' e" p9 y% L
                break
5 ^5 h* _6 O% H! A        }( v) V- \5 \4 n# n
        // If we have a valid block number, check that it's potentially useful
( d* R- G& S) ]0 P* y        // 高度检查4 a! f! |$ e( Z+ g- Z1 }+ E
        if notification.number > 0 {
' X) n. i$ p( m3 K                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {+ D- k+ S2 j+ S+ A- }. K5 P
                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)4 f9 o1 u9 ?' c' Y0 \2 }4 M
                        propAnnounceDropMeter.Mark(1)
" U) o+ t% ]7 Z6 S                        break
& h8 {' O8 a3 I0 M7 q. z                }
1 L- x3 j+ M; n        }
+ u% n2 @6 _. [* M: A3 r* m7 E        // All is well, schedule the announce if block's not yet downloading) t/ m. ?# s# l: S: `+ k
        // 检查是否已经在下载,已下载则忽略
/ }7 W$ B) P& L- |- C# p- g: u8 m$ P        if _, ok := f.fetching[notification.hash]; ok {* ?4 A. M, N  W7 M5 y: v6 ?
                break
7 I6 h$ P6 v# Z1 p        }
1 H) B/ S# _' ]' q& h0 ~        if _, ok := f.completing[notification.hash]; ok {/ f; Q% N% j5 a2 e) _0 k
                break/ A( X4 X* O+ m3 q
        }2 ~  z/ _! Y7 {8 q: }' b9 @) J
        // 更新peer已经通知给我们的区块数量1 Q% a- T" O2 ^8 ]
        f.announces[notification.origin] = count% q; L# P9 E# ~: W6 ?3 R
        // 把通知信息加入到announced,供调度2 R5 T$ u" O6 v: ~8 p; l
        f.announced[notification.hash] = append(f.announced[notification.hash], notification)  ^6 i" ~$ x1 w! G2 }0 c4 \9 I
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
8 H$ H. r; X2 q! A$ v                f.announceChangeHook(notification.hash, true)
! J/ Q; ?3 `8 @5 {8 W        }( z1 N9 n% i' `8 D* b) p. L$ j
        if len(f.announced) == 1 {
, I5 L' S7 q. g, r, y" a: w5 }                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
4 T" {; y5 [4 {$ J- \8 E8 P                f.rescheduleFetch(fetchTimer)
6 I( Y$ q6 ~0 X: t# A) z4 }        }
2 Z* p8 x: o* h' L; l8 T" R+ pfetcher获取完整区块2 r2 d0 j' i# P$ E- M8 x
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
! a' |2 o6 \4 ~! jFetcher的大头
  `0 }4 p% U( l2 MFetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
" d4 ~% F0 P8 n; b宏观1 u$ @: ^; e/ d7 u/ J
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。' l4 l1 i$ p9 c5 m4 {( T

. B# t  V# x8 |' i8 m% I再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:
5 b5 W1 e6 o8 p2 A  ~( f2 N收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。0 B% L$ p1 C" w; D/ \& `
, N. Q: W7 u( H1 k# Q) v8 i

. z3 D2 [% o! C. J) w5 q8 @微观( q! a1 r7 V% Z3 C' O
接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。% l. z! q# H" Q$ Y2 F
首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。0 V" V; M9 ^9 X! V$ n; z
// Fetcher is responsible for accumulating block announcements from various peers/ ^3 F0 Z7 Y! N* l
// and scheduling them for retrieval.# }/ b, q6 t: ^/ A6 y& N& _
// 积累块通知,然后调度获取这些块$ Q( [3 }; @8 c. H! i
type Fetcher struct {0 k; }$ J4 `+ G$ }/ H2 J8 C
        // Various event channels& R/ p( c9 [/ H+ o
    // 收到区块hash值的通道; z* J! Y, a7 V
        notify chan *announce
1 k, b1 ]# l$ @$ A4 N# g5 K& C    // 收到完整区块的通道
. ]5 o0 L' o2 C: ?: j7 U$ A4 x        inject chan *inject7 i& P1 T/ W* d" ]" D2 b+ D
        blockFilter chan chan []*types.Block3 e' A$ e3 K% P3 @4 q( k
        // 过滤header的通道的通道; W; F' t3 O* n  g
        headerFilter chan chan *headerFilterTask
& ]" j' N7 Y0 Z# f" ~8 H- |8 _8 a3 O        // 过滤body的通道的通道
6 U: f" n* x4 F: E- }5 l        bodyFilter chan chan *bodyFilterTask
" s7 }; b* n" \9 w9 U        done chan common.Hash; u7 ~9 L( ]8 B1 a  T, j
        quit chan struct{}
- I( O' q* I5 X2 ?# K        // Announce states
# P0 H" Y# P8 I0 ]" V# T: U        // Peer已经给了本节点多少区块头通知
8 m* U) \. I: ~7 k        announces map[string]int // Per peer announce counts to prevent memory exhaustion& S1 H" `6 f% i: i: M* k
        // 已经announced的区块列表' t1 g, O1 F8 z0 R4 `8 F! X
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
3 C3 r. M4 D- n" l+ A9 M3 l9 _3 y        // 正在fetching区块头的请求6 G  l2 o9 \" s2 ^3 [
        fetching map[common.Hash]*announce // Announced blocks, currently fetching
+ w/ o& V% x0 y7 v/ G/ |: y        // 已经fetch到区块头,还差body的请求,用来获取body5 |, _+ n* n& k$ ?8 X; k
        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval  h7 O/ e9 y. q
        // 已经得到区块头的) l+ h4 i) w& E
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing
7 y- F$ U# D7 G' W4 W( [& E        // Block cache3 ^0 g! j6 ?/ }" W' N2 g9 _& }
        // queue,优先级队列,高度做优先级" ]; R" q: _" ~( g4 n
        // queues,统计peer通告了多少块
) n- ?- ~* V' `# o8 Q6 {        // queued,代表这个块如队列了,
, x8 I6 J  b0 l; g2 A        queue  *prque.Prque            // Queue containing the import operations (block number sorted)
* ]7 L3 Y( c4 P7 H! M* m% |$ U        queues map[string]int          // Per peer block counts to prevent memory exhaustion- ^. o, c. f$ {( d& m0 m+ X
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
5 T2 K' F# m* Y! A2 a, g- Y        // Callbacks  h9 u4 L- I) K6 j+ j' r/ B
        getBlock       blockRetrievalFn   // Retrieves a block from the local chain6 x$ u3 Y& i2 j) g2 _! |+ V
        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
8 A& u7 {" v2 K0 y6 Y        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer! V# K+ G. L8 x  R
        chainHeight    chainHeightFn      // Retrieves the current chain's height
# R5 R1 q" ~! C  p% M        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数7 d" u' x6 T% H
        dropPeer       peerDropFn         // Drops a peer for misbehaving
/ d& |# l! o1 Q( D  R7 A1 C2 _+ l        // Testing hooks
! |7 q) ]/ C: V5 G; k        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
4 b2 E  ^; d" t9 Q: [        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
1 @/ A* b4 @5 \. \, ]  e1 c        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch$ b& b2 E% X- }* H+ N2 V; T0 g
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)7 i" N) D  _. @. G
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
& E' G6 z: R- h0 y* z}
' Y' `9 f* `) m" O+ _8 M6 DNewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。
- o- s' N# j8 Q- `case  arriveTimeout-gatherSlack {
* N1 w/ Y5 m8 r; a6 V: V6 l$ P                        // Pick a random peer to retrieve from, reset all others& i/ f2 @" S! P5 r1 N! d) [. f
                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer& X4 a, C1 ^, m" U5 D1 ?: S, T
                        announce := announces[rand.Intn(len(announces))]4 u+ L$ K/ B& k) t1 a& @- A
                        f.forgetHash(hash)9 ]' _$ k& T5 q: w" L. w
                        // If the block still didn't arrive, queue for fetching) C( t( W) a9 p, U* `5 ?
                        // 本地还没有这个区块,创建获取区块的请求1 z! f% w* {2 ?. A# x
                        if f.getBlock(hash) == nil {
0 j+ l/ V& t6 u( R0 |                                request[announce.origin] = append(request[announce.origin], hash)
; H# Q0 R7 z7 O) F                                f.fetching[hash] = announce
! J- R, o. H( j: i2 Z/ U  Z- J                        }( B2 e& M& ~% X, ^2 p2 n7 m
                }
, n; c, e* t2 H5 w        }
( C& e( W4 Y7 g3 c        // Send out all block header requests
8 [# H1 a% K7 s; N( {0 I# y* ?+ I        // 把所有的request发送出去  ^" \- _) L* m0 J/ `2 @
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求: Y- E# Y: x# O! `& Z0 ~3 C
        for peer, hashes := range request {  o. A0 Z/ w2 P; X6 E8 \
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)6 D! `. G/ s) z/ L& ^( X
                // Create a closure of the fetch and schedule in on a new thread8 o$ c' z3 ]/ r9 g' }% A, S
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes# y; W0 W9 T: q$ f7 p
                go func() {
. ~; x4 u$ e: l2 z1 u) G; K                        if f.fetchingHook != nil {
+ f" u- Z7 @! k. R" y                                f.fetchingHook(hashes)/ B* |6 z$ u9 L
                        }
- Q: q9 K% ?0 P6 W                        for _, hash := range hashes {; v* U" n  z7 e' p# N
                                headerFetchMeter.Mark(1)
8 f7 o& G9 r6 s8 A# p' y$ d" E                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
; Y7 Y) }+ I0 N7 N! A. L& E                        }
% P& r' Y; A" {3 s* H* Z1 {9 S7 r9 \$ W                }()
' k3 b  O! P  t. Y/ `* }) Z* G+ B& G        }
* \" e- l) |$ \$ u3 Z        // Schedule the next fetch if blocks are still pending
8 g, W8 k+ x8 T  {! R' I+ F        f.rescheduleFetch(fetchTimer)
! s/ P* ]  |- T% Z3 R从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。/ b1 p) G' I4 k1 }: O2 X7 h
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)8 R$ w) f1 F3 _4 F5 ~$ l. X
// RequestOneHeader is a wrapper around the header query functions to fetch a
9 n5 h$ S, k* A// single header. It is used solely by the fetcher.+ j3 ^8 u9 W9 O/ @4 a/ Y5 h! Q
func (p *peer) RequestOneHeader(hash common.Hash) error {
) |! B0 ?" ~0 j        p.Log().Debug("Fetching single header", "hash", hash)5 `) v( h# V' @7 d. |, U) j. y
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})5 W. v3 Z* l& ~( y1 v
}
  e1 J* S4 ?& U/ X/ \2 |* n' R+ JGetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。2 y# R# v, F3 c: s2 x) X' {
···
+ Y: `2 r8 ^& p. {// handleMsg()& {4 B7 c/ q2 v+ c0 }+ a
// Block header query, collect the requested headers and reply5 M* o# t8 [0 r) R2 N7 @
case msg.Code == GetBlockHeadersMsg:
- v/ C$ k) `' s' k// Decode the complex header query! c  {0 q3 A# R- f8 }
var query getBlockHeadersData
8 l7 B9 q* }4 N# Oif err := msg.Decode(&query); err != nil {  m, G( M+ P1 o! A
return errResp(ErrDecode, “%v: %v”, msg, err)8 y" Q, i) o/ f5 i5 P! j& Z3 s
}
4 ^5 r% \4 l& E8 v% I% u1 |hashMode := query.Origin.Hash != (common.Hash{})
7 X7 Q+ T7 r3 ^9 N) v) ~3 L% m" Q. Y- C// Gather headers until the fetch or network limits is reached0 S7 R' H0 r, X7 e+ a1 W* t
// 收集区块头,直到达到限制
4 d- M, P& }7 Mvar (+ x1 |& T) \% I! _0 l6 m. f% F, |' M
        bytes   common.StorageSize; G5 C5 Q# B, ^/ O
        headers []*types.Header
- H& Y) K/ m- b6 ]- z( g        unknown bool* ~/ n" [/ s) Q, W7 F3 {6 A
)
, ?, ?2 N2 Y$ `# K// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量
' R% `0 r$ B) e8 F- B! p8 \for !unknown && len(headers)
# @$ D/ v$ Z, M2 j+ c`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。
! w3 L/ M! ]2 l1 ^2 V5 o// handleMsg()
8 K9 u7 _; z3 t) W# o' n) y3 u0 Ycase msg.Code == BlockHeadersMsg:
/ f' }9 G, D& I/ S: Y) m: ^5 F. q5 |1 T// A batch of headers arrived to one of our previous requests
' V' n, F; X# g/ I7 C, Bvar headers []*types.Header" W( S4 G9 v# T# F; d& v0 S4 W
if err := msg.Decode(&headers); err != nil {9 f) I( W6 M" [  l" r. Y
return errResp(ErrDecode, “msg %v: %v”, msg, err)9 n: ]' S0 c9 ^0 \& Y
}
& y: j- u- |" q! ]4 E// If no headers were received, but we’re expending a DAO fork check, maybe it’s that
; i8 t; ?7 @" \) {// 检查是不是当前DAO的硬分叉0 T. Q3 a' d% }/ t
if len(headers) == 0 && p.forkDrop != nil {
4 A% G' j. M7 G+ ]) G7 L// Possibly an empty reply to the fork header checks, sanity check TDs3 b: Z' j1 L& t- {/ Z) b0 G; Y
verifyDAO := true- k) [6 s+ T; g. b- A
        // If we already have a DAO header, we can check the peer's TD against it. If
8 Y( S6 C- q+ m6 ^# ^0 c. Z6 x5 c% P5 P        // the peer's ahead of this, it too must have a reply to the DAO check
7 T5 T9 p* h9 H9 q( C& b        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
. L, l# h+ n; F0 H                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
+ p* `8 I3 m* z+ X                        verifyDAO = false
/ U6 e' J7 v9 x: \3 y                }0 k+ P5 ?" R& O0 A1 R* v$ o
        }+ U" q9 q9 |. w% K2 w% D0 g3 y
        // If we're seemingly on the same chain, disable the drop timer0 `! C6 b2 p+ e' ^0 e+ x
        if verifyDAO {5 L& v/ T$ C8 Q, r2 S% c
                p.Log().Debug("Seems to be on the same side of the DAO fork")
* b, [# S  l" a1 @; ^% s* b2 \                p.forkDrop.Stop()8 T5 K- H8 V6 Z, e5 p
                p.forkDrop = nil
  _. M) ]6 M$ Y9 P- n. S                return nil
" R7 I/ J/ }) i0 w' D        }
& x( C; \4 p9 W9 Y}$ x# X% Q/ g# q. U3 ?
// Filter out any explicitly requested headers, deliver the rest to the downloader9 Z- I* _" T! t& ]  O) g$ L
// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
3 w8 q, P, m% x  }8 g9 U7 L9 [filter := len(headers) == 1- n& p% _, r( N
if filter {8 z+ u* `0 G. R. k  t
        // If it's a potential DAO fork check, validate against the rules
' M% N& V; i+ f        // 检查是否硬分叉
2 k; Z  w4 I# }/ ]. K) |: ]        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
; t3 V3 `/ x# h5 T9 A7 H                // Disable the fork drop timer
& {# q# K' ~; E: m. p                p.forkDrop.Stop()7 ]( j% R8 b3 A" p
                p.forkDrop = nil& z( T+ }" o/ w
                // Validate the header and either drop the peer or continue" T# Z4 ]9 j; d
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {  ], Z) f# m" [% h5 W
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")3 p3 p; O& b0 i: F
                        return err" ], r( l* p* _  i8 X0 B
                }
- C! O6 o! q, z7 w; \( ?. @                p.Log().Debug("Verified to be on the same side of the DAO fork")/ M1 D" c) C* d7 V) O$ R# [( w
                return nil
7 u5 ]$ A0 q3 Q% `        }9 z. j" d) Q, m. u
        // Irrelevant of the fork checks, send the header to the fetcher just in case
, k$ r: A/ I3 r- r' f! ]        // 使用fetcher过滤区块头
+ r& Q, b+ j4 g+ S0 c& p! H) K1 i        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
% N. O' \3 E+ w}
4 s6 ]' q) h& \7 S// 剩下的区块头交给downloader
5 @( l, y5 S, nif len(headers) > 0 || !filter {
8 q; k8 s- s) h9 n- b/ }" I        err := pm.downloader.DeliverHeaders(p.id, headers)4 g+ }$ c. P" R. P$ }
        if err != nil {
4 b" a* r7 Q2 f2 a7 {8 s                log.Debug("Failed to deliver headers", "err", err)) k) I1 p+ N$ @% H5 c, j0 r
        }, n0 e# o+ W- E/ O
}+ i) B" ^0 @2 ?4 o0 h  l, r
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。8 ^1 n# @( n$ f7 Z; z+ B
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
1 O+ Y2 k- W8 E% O7 {' v. q( H// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
+ Y- Q5 Y! J6 ]8 K$ Z5 Q3 Z// returning those that should be handled differently.$ L2 I% C2 r& x1 N$ b
// 寻找出fetcher请求的区块头
+ C9 R( \( e3 o3 X& K3 ~2 P' j) Ifunc (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
- v9 {7 }8 N7 _) xlog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
0 ]) T( A4 m" X8 `* p7 M// Send the filter channel to the fetcher$ }2 U7 G0 i) `5 m
// 任务通道
  z/ J; e9 w5 Y) Z! ~filter := make(chan *headerFilterTask). f3 C7 K$ J6 e& ]* n
select {
6 f1 j6 Q# I; N5 F/ h# t* k// 任务通道发送到这个通道# U9 j+ o; o7 ?6 R5 @  O' K
case f.headerFilter
7 m7 V' f9 ]" U! r: |) _9 r}
& h5 ~9 g* j) `$ S( Z! F接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:8 R( p9 K& F% E* X+ [/ D7 u
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。! K  o# J: R3 v
2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。
* |/ `2 W( k, o8 u3. 把`unknonw`中的区块返回给`handleMsg()`。
6 B6 P1 k- K+ X2 P5 \0 F2 e* b4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
2 u/ O5 [9 m6 D- J' K5. 把`compelete`的区块加入到`queued`。0 E8 I6 {- {. ^2 A7 P3 X0 m) l3 C( u
// fetcher.loop()
0 B8 ?* _( C4 c1 Fcase filter :=
# E  ~' x2 N/ f" L. h1 D  B// Split the batch of headers into unknown ones (to return to the caller),
$ A* ~: t' u5 V1 [// known incomplete ones (requiring body retrievals) and completed blocks.
& S) A) B9 Q. d; M. |- A. J. @) n// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放; m# J! U3 G: L1 B
// 还需要获取uncle和交易的区块
, t1 p, _, v( {+ z# lunknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
* J1 N( _5 j2 A! r! F// 遍历所有收到的header
; K, A$ T# P, l$ w1 \+ z# O" {" afor _, header := range task.headers {
6 a. {6 X+ ]9 M        hash := header.Hash()4 T$ z1 s* c6 \. \  J
        // Filter fetcher-requested headers from other synchronisation algorithms
' q) }. y! D. ^( _  o        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
$ X) ?/ E5 `7 b" |, I2 C( g/ x        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {6 R& y0 E6 ?- L  D3 ~. i6 G
                // If the delivered header does not match the promised number, drop the announcer
, d, _" Y  I+ n7 T- U! f, n                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。; s5 k6 C: x! \. Y! o* T- F
                if header.Number.Uint64() != announce.number {
" \7 G0 k2 F1 k' ?                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)8 E' Q# e! ~! y7 u
                        f.dropPeer(announce.origin)
5 `  e; Z# ?8 ?                        f.forgetHash(hash)+ K" r; K4 I% w" q$ x/ f
                        continue% k3 t! C* O4 P8 O  \* O0 C
                }
; \, G$ ~# K! C6 }$ c                // Only keep if not imported by other means
2 d& J( ~+ v+ l% _                // 本地链没有当前区块) d9 _1 S' x9 l% B' U0 Q9 {
                if f.getBlock(hash) == nil {
1 L$ h1 d6 E4 y! T& h                        announce.header = header
$ _. P0 E7 M; B& {: Y# y                        announce.time = task.time9 ], D, M% N, t5 ]2 y
                        // If the block is empty (header only), short circuit into the final import queue: x. `; f' x8 L; B7 `3 ~
                        // 如果区块没有交易和uncle,加入到complete% A2 z% B7 X, ^! a7 _* M
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {6 @6 H1 X  S$ l
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
) y4 m: o% b9 Z                                block := types.NewBlockWithHeader(header)$ M1 n& f" ^: F+ b' L" I* A( {/ V
                                block.ReceivedAt = task.time3 e, E" \- {) }' _. V* d* \
                                complete = append(complete, block)  N+ x7 x: v* D
                                f.completing[hash] = announce0 V1 Z' v% t4 N1 k
                                continue
1 B4 m4 F" }; N; V, A1 S4 S7 F- _& |                        }4 f2 s# y0 p- F$ {( F7 D5 V
                        // Otherwise add to the list of blocks needing completion& D7 k& _0 l2 j  L+ S  p
                        // 否则就是不完整的区块6 z$ c8 J3 l' H. N4 ?( q+ M) W
                        incomplete = append(incomplete, announce)
7 K! u9 {) v6 c5 ~                } else {  B( w, H3 J5 V! }- y
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
' l; L! F/ o4 C8 t3 ~2 M                        f.forgetHash(hash)+ h* S) A: t( V- E6 t& c) }8 [
                }
7 j/ L* _) |* x" M" \        } else {; r5 N) z, l. E
                // Fetcher doesn't know about it, add to the return list
4 G8 O* [4 Z$ W  m# I                // 没请求过的header
' w4 K& [2 ]! w9 X; `) U$ J                unknown = append(unknown, header)9 a; G) v8 Q7 p2 J" E2 B6 K9 T; g
        }* N6 x# ]! X; k; M+ u3 r
}3 F7 z! ]) @6 ~2 S  O9 y
// 把未知的区块头,再传递会filter
3 d" }9 _3 n6 K% M5 v' RheaderFilterOutMeter.Mark(int64(len(unknown)))3 F/ O2 S& F! m9 q: ^  @4 B, F
select {6 H' a7 c9 n  V% j8 p4 R9 M" W
case filter 2 V+ v# L' e, S; s, U
跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。
& k! F8 b) `/ j& h' v* O" s; Q// fetcher.loop(). H2 |( G$ ]8 i  [9 e
case
' e8 U, g0 i. u- E) O// 遍历所有待获取body的announce
3 x' Z, j1 I) \for hash, announces := range f.fetched {' |8 X0 y* {) O" n* e, X; M
        // Pick a random peer to retrieve from, reset all others
) o4 l# G  n6 c5 C6 d" L; B( n7 M        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
- x$ M! }# P6 i1 h& X0 C        announce := announces[rand.Intn(len(announces))], x9 [: Q' C, w$ Y! b
        f.forgetHash(hash)% i* D- B2 z8 T% }3 D3 v
        // If the block still didn't arrive, queue for completion
8 B0 R# @- V# ^" q8 u        // 如果本地没有这个区块,则放入到completing,创建请求
! c; l. N0 c, z! {: G        if f.getBlock(hash) == nil {: m7 e8 o  M+ w" U& {
                request[announce.origin] = append(request[announce.origin], hash)
* P* ?2 z2 s  M7 X7 r7 d' z: f9 z                f.completing[hash] = announce
1 K3 H$ z: w- Q' j        }
! X. R) ~, ^6 n+ ?/ p}
: T' D, f% H* M0 `. S/ y+ y// Send out all block body requests
# b9 H9 u$ r6 B* o2 b6 q" h// 发送所有的请求,获取body,依然是每个peer一个单独协程
( G4 Y+ w) a5 u; cfor peer, hashes := range request {
  Y% V' B& R3 j4 V  Z# c3 [" W        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
6 K6 K9 q5 u( G- r; T4 w, R  K        // Create a closure of the fetch and schedule in on a new thread! Q+ x, ?5 z" w1 {- _/ B; q. D
        if f.completingHook != nil {
/ ]5 g7 w; K( c8 t6 \/ G  @: q: I                f.completingHook(hashes)
. a# x/ E3 H$ I' v        }
/ {/ q( f- l4 o: L( J        bodyFetchMeter.Mark(int64(len(hashes)))# D* U9 O& s$ e2 S. d( H6 ?9 h
        go f.completing[hashes[0]].fetchBodies(hashes)4 O) Z! Q( O# d
}" D8 y1 y% s0 I$ p0 }8 M, m0 s
// Schedule the next fetch if blocks are still pending
! C' d* U4 i( |f.rescheduleComplete(completeTimer)
1 P7 `5 w2 m6 c. B`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。3 y! }8 A  c) b/ t+ }6 a3 s0 K
// handleMsg()" T: K; {: R; Z# P0 P
case msg.Code == GetBlockBodiesMsg:2 y3 J2 ^8 C0 Q- I+ Q# ^
// Decode the retrieval message
# H; j; }  T6 v& F0 EmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
: V$ Z2 h/ B6 G" P, T2 ?- A# Qif _, err := msgStream.List(); err != nil {
0 _; m7 c0 B4 z3 Qreturn err
: B3 l; ^; `" H+ w  k: T3 @* n}3 t, i6 P9 B; U# O  i4 l5 A
// Gather blocks until the fetch or network limits is reached
1 O6 Z5 I# i8 N2 F! v/ svar (
/ V, X8 E: s% v+ E& hhash   common.Hash
6 n; `  Y5 R2 _. c4 ybytes  int
6 ^; {! C" y# [9 ^" r0 m, nbodies []rlp.RawValue+ j) Z) h! }4 J) z8 \
)
: l& b& x$ T+ m- ?! {- U* N- b: S; _// 遍历所有请求2 a. A+ A& G+ P; k' Y/ E5 P
for bytes ) [  i6 w3 L8 f' B8 r% Z
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
! H" r6 z8 r9 {! C1 \" U// handleMsg()
0 r) b% d% d% ^* z# \case msg.Code == BlockBodiesMsg:
+ E: `! Q( v* s' _// A batch of block bodies arrived to one of our previous requests9 ~5 U- e0 X) B3 ~
var request blockBodiesData4 S/ \7 C3 z. m3 G& z0 h$ X/ N/ W# x
if err := msg.Decode(&request); err != nil {
7 h/ U. ]5 W/ }return errResp(ErrDecode, “msg %v: %v”, msg, err)
4 n- ?# n0 l! k. h}
( ~9 t: q2 k' c: ?0 `0 }// Deliver them all to the downloader for queuing  i7 p+ \# q1 ~$ E
// 传递给downloader去处理0 K; x1 T; F4 N# \2 W
transactions := make([][]*types.Transaction, len(request))# H1 o2 X4 k: E8 N6 c3 N
uncles := make([][]*types.Header, len(request))
3 N$ I/ R. s- J! m- Z/ O: |3 V7 w% Ifor i, body := range request {, Q; z" B; {" ^! q/ J
        transactions = body.Transactions
8 K2 r% c; P4 o5 a+ Y  s/ _) t        uncles = body.Uncles/ K- T4 _3 ~" o8 ?2 C
}- A. O! d  E. J6 p2 g
// Filter out any explicitly requested bodies, deliver the rest to the downloader
' }" `+ B' H; D// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
' J( S; q6 p/ _/ W% N2 s; lfilter := len(transactions) > 0 || len(uncles) > 0. O  M, e8 J& C& a/ X7 U
if filter {# V: I' E! V6 t; p! |, k6 Q6 w# v' ?
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
, n5 X* r+ l% M& L; W2 E}# R. n" ^4 z4 U) ]- L4 f, E1 c
// 剩下的body交给downloader
& n8 C' G8 w: {$ L' eif len(transactions) > 0 || len(uncles) > 0 || !filter {5 h/ i8 S6 ~/ A, B( S
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles): V- S( i& p6 r2 O) [
        if err != nil {
; U  n7 ?+ a8 b/ g9 C8 K                log.Debug("Failed to deliver bodies", "err", err)
1 e" N, O5 t  d% p( N6 G; J        }
" o6 n2 a" Z/ E  i; W& T5 z}4 e% t4 _; d9 @
过滤函数的原理也与Header相同。$ j9 X$ D: d: T: c0 I/ D5 u
// FilterBodies extracts all the block bodies that were explicitly requested by
9 V0 L! \8 u, r0 w& X) e// the fetcher, returning those that should be handled differently.
- e, R) T4 J/ Y- X1 K0 D// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理4 ^7 u8 R1 w" X' u/ L
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {; ^  F  E9 ^7 ?
log.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))) O+ [6 b# M" @3 c3 X4 x- _
// Send the filter channel to the fetcher
. d: p5 T( G; ^3 @filter := make(chan *bodyFilterTask)1 ^- {2 n/ G; _$ H
select {
: x7 k$ T$ |8 A- N1 X8 J# J7 n, ]case f.bodyFilter - u4 x3 M9 v) O( m9 |6 }5 P; F
}
  [3 Y* d3 y- h  x6 Q实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:  t! A# h0 E" f6 c! O7 b* a
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
- h$ R: c. r: h/ J  {9 I9 W2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。
: w' v) Q" l0 @& ^- r& ]3. 将`blocks`中的区块加入到`queued`,终结。# m  P6 p7 t& q' @" g+ _
case filter := ( ?8 ]( o/ w, z' `1 R& \
blocks := []*types.Block{}
, E- M( I' c7 P, R( x9 s6 G$ W// 获取的每个body的txs列表和uncle列表
8 w3 K7 S1 f0 U4 Y0 y; v" W// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body: @7 R9 a7 H' x! m/ k  ?. f
for i := 0; i
' j3 ?9 \$ {* z! g  F/ T7 X. T}
' A6 m  [* o& ?& b6 R7 `  b! m+ K+ W+ I9 n$ f! L& j3 s
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:
' ~$ E  P( U! M7 M+ c8 x1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
  E: a5 h, t  o* `: h) T( R$ w2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
% ~- b! ]  P5 X$ Z4 {( z最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。. c' i" j0 Y2 p3 `
6 X9 n) h; R9 r& a( L  m9 a8 q; i
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3