Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
230 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。
' D: V* n4 E* g当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
3 U: G, t& @" h! u( A. l- t总体过程和传播策略
: I& P! j# V0 H9 n" |2 s本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?" F0 p. `, Z5 a; R8 Z4 L. y% m2 m7 h
总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。7 x4 `2 h5 q! @3 J+ b+ c
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。* h# D# D3 Q0 H2 n) }
. t* |6 Q4 [% R
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
8 W3 _2 a8 y5 n6 k3 y  H, |+ G假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。6 @3 H# W$ u( M5 J
策略图的效果如图2,红色节点将区块传播给黄色节点:
: C+ P& R- }9 B4 d7 i
. T3 c, ^! n; l/ U6 T/ Q1 f" S
. o5 ^; K( x) U( N收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
/ ?, k; E* i# Q" z! R7 }* @0 D8 C, I1 N9 o  _3 Y; T# v
至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。
$ W' Z. o  G/ K- GFetcher模块是干啥的
/ V! d, ?" l- z8 \; @* ~fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。! c9 Q. ?  \$ J/ U- }# c. S. v- i& O3 j5 G
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块* y- b6 O' R! c7 I

& B% P% h3 P, V2 a1 v; `- w* A源码解读# n! t" I: p9 j+ O1 q
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。0 Q/ i# Z7 i7 c6 @; x
产块节点的传播新区块5 H4 p) [! n, B5 F$ S; H
节点产生区块后,广播的流程可以表示为图4:
6 }+ ?% E% P6 M$ ?$ q6 t3 [发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去: z' R2 x# U( f! [3 Z- e$ H/ U8 P

) g5 F7 G4 I5 b1 e; ?( J% d! a3 D6 m& E4 |. q2 i' l$ z0 b* ]
再看下代码上的细节。4 C! t  K! o* u
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。3 \  U, M6 Q$ ^$ s( o
# N) e' }/ f0 C: [! Q2 m1 p* C' L
// Mined broadcast loop& ?) g) t+ S' |/ ~: K2 y- j
func (pm *ProtocolManager) minedBroadcastLoop() {
  {1 M: p! P8 d0 k        // automatically stops if unsubscribe
4 s( V( `7 |& `. n; b% c4 v# f, F2 f! Y        for obj := range pm.minedBlockSub.Chan() {
' q; [! a! Z( w! A/ `                switch ev := obj.Data.(type) {9 q+ q; Z$ ?% s3 Z4 ]! W
                case core.NewMinedBlockEvent:
" |' j/ A* d+ S% z: y                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers  H. N" {" v) A. F# m
                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest- y% v6 ~* M% }
                }* m1 h( `' v2 {, F
        }6 V8 M) G0 x1 c6 A; x/ F; U
}
# W& F( r/ r# D8 q+ n: S! k! Dpm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。7 Y7 f% c. ~+ S, O
// BroadcastBlock will either propagate a block to a subset of it's peers, or
0 U8 Z% W( i$ x  D/ i// will only announce it's availability (depending what's requested).0 D- W& Q! }, @
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
, X8 ~$ c  w; D- i- w9 L% u  K        hash := block.Hash()9 b9 b, b2 A8 h7 u8 A% l& q' i
        peers := pm.peers.PeersWithoutBlock(hash)# Z, S& [. m( l  c# b8 N: l
        // If propagation is requested, send to a subset of the peer; \" g  V* ^: x2 }+ T& t
        // 这种情况,要把区块广播给部分peer8 w. g- M& u) P$ M% G3 p! F
        if propagate {
1 l# F4 s% k) c4 c* v                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)6 w* [1 x2 F# {# d2 z
                // 计算新的总难度
* D7 r$ F: h% u* b& g# R                var td *big.Int
1 W$ R/ k* ~: [6 k) ]8 c+ B                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {. W, E6 n' S7 V) ?
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
: x+ }  r! b4 m: H; \! K6 ?                } else {
7 n9 A& z: i2 ^7 y' q                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
/ J* p- P/ }0 r+ `: H1 P, A) e                        return
% U/ c) _( A; q7 d                }
1 e5 f8 G" C' T/ e0 S) H) ^( J                // Send the block to a subset of our peers) Q5 }( {1 M7 {3 q: T
                // 广播区块给部分peer! L$ v# P7 N- q
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]8 g* f0 ^. Z" I* O
                for _, peer := range transfer {
( K9 I4 ?# A" m3 R: ?                        peer.AsyncSendNewBlock(block, td)! N$ ~; j8 f4 o9 n' L; u4 t$ u
                }
& |1 G$ K! h* D' R2 _6 u                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))7 F% T; B/ g/ ?' t6 F, l
                return
6 S  T- q% t; N# s' T        }/ T+ r* t7 o) E' Q
        // Otherwise if the block is indeed in out own chain, announce it
  N- K6 J7 G" r; E# f5 B. c        // 把区块hash值广播给所有peer
! i5 j% O% k1 j* B" L% H        if pm.blockchain.HasBlock(hash, block.NumberU64()) {7 T# `) M2 M9 s' h1 |" q
                for _, peer := range peers {' W5 e, Y1 s7 G% z' `8 F5 J- X5 n" m, o
                        peer.AsyncSendNewBlockHash(block)
; j  k5 g- e7 P, h3 \6 c# B, E                }9 R' {4 s2 x5 v6 H0 J
                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
% \- j" a4 k9 v. u; @3 U# ~        }9 {, s! O; V, p+ u# z; s/ P
}
7 [+ U7 w* A1 a; P- x) J: Bpeer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
. o4 p- Z$ b6 R3 }. B# h$ C# U// broadcast is a write loop that multiplexes block propagations, announcements
$ S' \$ y+ ?4 r9 {6 m+ W7 h5 Q// and transaction broadcasts into the remote peer. The goal is to have an async
2 j7 k. s9 _+ ]// writer that does not lock up node internals.; k* C* q; x( P5 Y" x2 n
func (p *peer) broadcast() {
4 g* l1 b  d5 Q, \/ [9 j/ \        for {! _" M3 u7 |* z3 v- j
                select {4 U# _( p! g9 g' h8 ]0 ~3 c! b+ q
                // 广播交易0 n0 n- P  y$ ^' N0 k  e3 D8 k$ N  p
                case txs :=
: o4 T  f8 v4 q! B& X3 xPeer节点处理新区块
0 |; Z/ a% ?0 [4 u/ B本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。
) p7 J! r" c0 S; u$ n# d这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。
1 j& N: A# Z$ ~7 O- J( j0 V消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
7 L- y0 s6 h6 X: p$ j0 D) J+ ?2 \( f' u) D
仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。! S6 g6 h8 ^8 W" j- `
NewBlockMsg的处理
: U% ~) t* H8 u3 |本节介绍节点收到完整区块的处理,流程如下:
" Q( T$ N8 W- u首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。
, W$ R( h! z) J# L4 J) ?# E3 g将区块存入到fetcher的队列,调用fetcher.Enqueue。0 W! {, _5 s1 P
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。1 [( k9 i* K8 J& w3 O& c# N0 v
只看handle.Msg()的NewBlockMsg相关的部分。
: ?0 L! G# }! W- F$ acase msg.Code == NewBlockMsg:- K1 T, v% {, x& y8 C6 G
        // Retrieve and decode the propagated block; a5 E' h& G. S6 X
        // 收到新区块,解码,赋值接收数据
& h/ F' A/ ]/ z        var request newBlockData
. o0 }, a9 h. H9 m1 b0 G        if err := msg.Decode(&request); err != nil {  U) K# ~3 h( y' U
                return errResp(ErrDecode, "%v: %v", msg, err)
! u: Y' g5 J1 @% E1 X2 M        }5 u( P: {6 ?8 v
        request.Block.ReceivedAt = msg.ReceivedAt
  z0 ~6 N  ]9 q" [        request.Block.ReceivedFrom = p
& m& _  z  q4 g3 {% g( V: A        // Mark the peer as owning the block and schedule it for import
- N/ |! r- y6 J: Z        // 标记peer知道这个区块
* F7 f  v0 h9 R6 i' Q% d% ^        p.MarkBlock(request.Block.Hash())
# S6 T5 ~4 x. I        // 为啥要如队列?已经得到完整的区块了
4 y% m- R& c- P) ?! K        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块, j: w3 X4 Y  A0 e* S
        pm.fetcher.Enqueue(p.id, request.Block)
" E4 b+ u, ~9 S- s& ?        // Assuming the block is importable by the peer, but possibly not yet done so,) k0 I  r% Z. F( I( A9 F' Y
        // calculate the head hash and TD that the peer truly must have.$ I0 `: R% T% s& X  _. S
        // 截止到parent区块的头和难度1 F# Y% I7 e' [  c
        var (
" o3 {  }" M8 A- v8 X                trueHead = request.Block.ParentHash()
2 r. \1 h6 B, a$ C                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
8 S9 g% Z1 b! O5 i        )
4 h4 _/ a; a( z+ Y$ Y' A' s        // Update the peers total difficulty if better than the previous6 P( `3 G/ S$ G5 m; B$ ~+ F
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
' s8 j, p# b& U) B        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。- K$ O) z' a$ \; d+ i
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求5 R* P4 E3 f# F5 l/ }5 ?6 q/ I
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
* Z* r2 L" I( T8 d: \  X        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
; s' r. V8 K7 F: t/ n# f                p.SetHead(trueHead, trueTD)
! j# z5 `; V: @) [$ L                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of5 F$ r% m6 t) ]. L1 o
                // a singe block (as the true TD is below the propagated block), however this# T+ u0 G, _9 ]
                // scenario should easily be covered by the fetcher.
8 @' }2 D* Z3 r3 p6 E, h) O+ o                currentBlock := pm.blockchain.CurrentBlock()
% O. |! y8 ], r3 m                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {+ }1 T, l: g; w2 K5 _' K6 `$ _+ I
                        go pm.synchronise(p)
) d- l4 J( G- }                }
) E4 \2 n8 I4 r+ s8 {1 }& d+ e/ H' q        }' E4 r2 t& j2 `- F  g& Z
//------------------------ 以上 handleMsg. |; q3 |: U0 K- \
// Enqueue tries to fill gaps the the fetcher's future import queue.
$ o$ B% j# t( {" ~% d: T8 _// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理, @" r3 X( W5 S" @
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {$ {) E1 Y! h+ A' D5 A. S/ x3 j2 `
        op := &inject{: h2 h" y3 K& {% f) f. N3 J
                origin: peer,
/ K7 I' @5 E8 C/ B% {* H                block:  block,
4 |0 ~' x: k! I. n+ _        }6 v6 P$ z! e: A& O2 Q; f
        select {7 u8 w! f! B# t/ o. L$ ^
        case f.inject  blockLimit {
9 p: H3 w) b" l+ |* i4 e                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)6 K! S; w5 X2 D3 g
                propBroadcastDOSMeter.Mark(1)& ]) F0 g7 Y& ~3 f& |$ ~, j3 p  c
                f.forgetHash(hash): {1 F8 t, S* l9 Y
                return
1 p0 O! n. Z1 d) B9 W3 d1 z; j        }# N% p" V* N5 B( ]" ~
        // Discard any past or too distant blocks
9 [( [( W% x0 @7 Z        // 高度检查:未来太远的块丢弃
. u( }" P/ _) }7 }: }( C; o        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {" q1 i' t) L6 B5 d  s$ G
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
$ V/ P( g. v( _5 P9 M0 A                propBroadcastDropMeter.Mark(1)
2 j. L6 A6 i& w# H& [; l                f.forgetHash(hash)
3 \6 A0 Z- f$ U7 i                return
3 h" X5 X6 }4 }        }
/ t+ ~; T! L" |  N% D0 W        // Schedule the block for future importing
( N* U* J( r/ M( ~& |' O        // 块先加入优先级队列,加入链之前,还有很多要做5 S& i3 ]3 s* e8 w
        if _, ok := f.queued[hash]; !ok {
7 h. }' }& ~# P  x                op := &inject{0 j& ~% T" t7 P5 c
                        origin: peer,4 [/ {7 [5 E6 e2 l5 K/ o
                        block:  block,7 z+ c+ E- y* k+ g9 ~' @# x
                }
# _" D& u* ?9 b& z- N+ u  V. z                f.queues[peer] = count
$ l, ]5 Q1 m1 F; B: {                f.queued[hash] = op
- M" U1 z+ i4 @9 Z" [0 o* C& G                f.queue.Push(op, -float32(block.NumberU64()))! v0 G8 m! c+ @" G# ?
                if f.queueChangeHook != nil {
7 o6 [" ~7 V0 ]2 x; t: ~$ N8 ?6 `                        f.queueChangeHook(op.block.Hash(), true)* @" z9 l4 H( |  A/ Q( F5 o' z
                }
, Z- @  c; {" G" U                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
; O" O/ N% c: t  y        }' i8 _$ V. e: I
}
: Y( e, q! k2 `- m/ u( gfetcher队列处理
: v: v5 p' g2 U1 x9 Y( ~1 S) S4 Q本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?. H8 s; a: l6 n7 y" y& T
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。7 s9 Y4 e4 u9 w5 i4 R
区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash) h: d7 I8 H! e7 p0 N* I& C, x) G
// Loop is the main fetcher loop, checking and processing various notification
, f1 h" j. f+ f// events.% j" D0 e: a4 k) ~9 S
func (f *Fetcher) loop() {
2 P9 `& B9 [* F6 V" ^        // Iterate the block fetching until a quit is requested
) u# G7 `3 w% r# n, w9 Y        fetchTimer := time.NewTimer(0)
5 \6 @5 a  h  t$ \! x: `4 B        completeTimer := time.NewTimer(0)# M# i; d1 A' i' J! b; P" y
        for {* ?/ [6 x# x2 I0 ^
                // Clean up any expired block fetches
8 j8 _# l! K' g2 j4 c, J% d                // 清理过期的区块' i+ _$ @* J$ U
                for hash, announce := range f.fetching {' f7 \6 d5 E, [6 @
                        if time.Since(announce.time) > fetchTimeout {0 x$ b8 H' F4 a3 ~$ C! W5 ]  K
                                f.forgetHash(hash)* j) a1 [/ Y# j( e( q* {
                        }- G, |# x0 k/ I% p- O) [7 R
                }7 X7 g7 g, D: k! r! z  t' R! y
                // Import any queued blocks that could potentially fit2 e9 Q  e4 p% Q) m/ ^1 f9 m
                // 导入队列中合适的块8 P0 }: e9 }* r& _1 o
                height := f.chainHeight()
. N1 J/ W" n1 G' ^! n7 P9 W+ D! w5 k                for !f.queue.Empty() {
4 b! J8 |1 D% F7 K( D7 l+ H! Z                        op := f.queue.PopItem().(*inject)
$ c  i9 e  X* C, [% h                        hash := op.block.Hash()
( B. H  q+ b5 Y0 S3 V4 R                        if f.queueChangeHook != nil {
" M. X" Z$ |4 M7 V& c6 l! l* N                                f.queueChangeHook(hash, false)
5 T3 A# ?0 `+ Q5 u' i8 I2 n                        }
1 o6 H5 @  k8 q9 Q3 [2 T2 y                        // If too high up the chain or phase, continue later
0 R/ p% f8 d% |5 L  D7 `                        // 块不是链需要的下一个块,再入优先级队列,停止循环; U8 a# Z& p- u
                        number := op.block.NumberU64()9 _* `4 s0 b" t8 G
                        if number > height+1 {$ A) J; z: f. M
                                f.queue.Push(op, -float32(number))
' ^9 L7 E6 a0 X                                if f.queueChangeHook != nil {& C% M8 Q0 ^+ Q2 \0 x) q1 H1 u1 G& {
                                        f.queueChangeHook(hash, true)
; O+ g8 q, I. a3 A. ]- |$ C3 [                                }
! e0 R# u" Y$ Q, @                                break
2 A; t* h4 a" V$ U  m                        }) A' k* L/ h# _( o. w6 q, g+ v
                        // Otherwise if fresh and still unknown, try and import4 _' _8 W5 N, v1 Z$ L+ R
                        // 高度正好是我们想要的,并且链上也没有这个块* j$ H( B2 |2 J- S- f
                        if number+maxUncleDist
2 Q6 q! ?6 c$ h: X" cfunc (f *Fetcher) insert(peer string, block *types.Block) {
( |( |3 v- b' T; I, l$ M# }        hash := block.Hash()4 p/ \0 M% u( y8 _
        // Run the import on a new thread
' h3 O. k6 M' ~* ?% M        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
) V) h; |' K! E5 x; r) @( l* r        go func() {
" `4 x5 G* y% X" Y                defer func() { f.done - y% U, u, p; j( i/ Z8 V
NewBlockHashesMsg的处理
; j4 g0 {# |# H$ z本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。
% {* N  v. \% M% i流程如下:
! J3 R0 \8 ?0 X, c3 i. I0 b2 N对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。
1 s/ [8 @( J. ]关于announced下节再介绍。
1 ^  R5 D8 _$ q. H0 ]& U& T: t3 m+ e
// handleMsg()部分. x5 q& ~8 I& D& _4 ]$ v
case msg.Code == NewBlockHashesMsg:, N' g; G- E8 d) }7 E. y7 E& l
        var announces newBlockHashesData' T5 x; `3 T/ d) v* z3 Q! Z0 i
        if err := msg.Decode(&announces); err != nil {
+ X# L, Y' b* I" N1 Q                return errResp(ErrDecode, "%v: %v", msg, err)& T- z2 A2 C3 `( Y/ ~& }9 r% X
        }
3 N* I% G8 n, H        // Mark the hashes as present at the remote node
, l& Z4 a1 i, W  |; N        for _, block := range announces {
! R* i6 G) C! }5 v0 R! T6 v                p.MarkBlock(block.Hash)
0 y( ?# ~7 T# S4 c        }
2 l9 G( T# M# t3 @7 S! U        // Schedule all the unknown hashes for retrieval- J) \8 T/ S, L
        // 把本地链没有的块hash找出来,交给fetcher去下载
' @7 N$ a  s( a        unknown := make(newBlockHashesData, 0, len(announces))
7 k. Q/ F: o0 r# h& H6 E        for _, block := range announces {  Y7 f; \$ U1 @& L
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {3 p, i% d8 l7 z- G- U
                        unknown = append(unknown, block)9 d; O1 H5 ~( Z; i4 @' _
                }
% ^, C& W1 F/ T/ F        }6 z* _8 U3 \% o
        for _, block := range unknown {7 z, u2 Y% J' o0 J" i  ~5 ]+ E
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)7 k1 P7 C' }9 p& T7 O" D
        }' M  i6 K, v; Q) V+ z. F
// Notify announces the fetcher of the potential availability of a new block in
- ^- D) i5 b0 u  l// the network.
7 N3 ^# G8 w. T5 s! y// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息2 L7 V: z( `" Y2 W- j& v. {
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
% y8 x! E% H$ T. b        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
* o# {7 X! h8 x+ k. {5 B7 X. P        block := &announce{
& a3 }, ?3 X  {                hash:        hash,
' [" h+ `7 v2 P0 x5 S& P9 \                number:      number,- T2 U9 L. l& K# I9 [
                time:        time,$ G; g. T" B* p2 U. U) Q. ~1 m
                origin:      peer,
. ], D$ l: U( ?+ S1 c4 d# l' \/ m                fetchHeader: headerFetcher,
; k' A4 s6 D* d  a* N                fetchBodies: bodyFetcher,) w- Z: q% x3 ]8 x, J: k+ I1 f
        }
3 f4 `* ^. u9 w4 C& z" E' P0 G# S        select {5 D: w: v9 f- m- s$ a: D, ^' c; F8 T
        case f.notify  hashLimit {
. u- M, s5 T% T$ C                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
0 _8 |# [& A; y$ n. z* ?                propAnnounceDOSMeter.Mark(1)) s$ ~, E0 s5 t, J" P
                break
' n7 N8 C5 G6 l        }& t% B  d. H, V/ f5 l2 g
        // If we have a valid block number, check that it's potentially useful
! B; L! X( G3 S( `6 Z( H: o# ^% U2 C        // 高度检查- l0 W& J3 f) q9 [. ?( ~$ Z8 E( C
        if notification.number > 0 {$ [! a1 `0 B8 m: L
                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
1 C; s2 G4 t+ [8 b7 G                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)3 c5 p0 Z6 a) e- H; O" B8 d
                        propAnnounceDropMeter.Mark(1)4 a2 Q5 k9 m, p3 D
                        break4 a$ o/ R; J% A
                }( {" q; H$ p' L7 y. H& N3 g& u' g5 w
        }2 i1 @8 e; k3 }1 y
        // All is well, schedule the announce if block's not yet downloading8 [; C( `; O% M+ `0 r5 Z$ @+ a
        // 检查是否已经在下载,已下载则忽略
" N. [- G! n0 C6 H( X        if _, ok := f.fetching[notification.hash]; ok {
8 Y  M+ i8 t. f" f. {                break
$ n) y; B6 d# e4 u/ B; x( J        }
2 {. p# W( P5 O  W0 A& \* x        if _, ok := f.completing[notification.hash]; ok {
- c$ m9 g5 s4 _$ P7 O5 u                break
# e) l; V( s; A, l: q# n        }( d# F- @5 b+ e8 a# X( ?8 H
        // 更新peer已经通知给我们的区块数量
, M- j7 d: i4 Y$ u+ v        f.announces[notification.origin] = count
2 I& |2 d8 u8 l, D( f, P        // 把通知信息加入到announced,供调度
, [4 K/ R. \0 n4 q- T# _% s        f.announced[notification.hash] = append(f.announced[notification.hash], notification)
5 z2 G* ~% y9 e+ K) v/ k4 c0 R        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
; R7 Y2 ~+ [) c, w8 j                f.announceChangeHook(notification.hash, true)  \' C+ v/ y0 K4 w4 l
        }
6 C! _$ w% z: b        if len(f.announced) == 1 {
  }9 H* j' |; Y: G' I) r/ f                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知3 N3 w: o* p( @1 F3 z3 w$ y
                f.rescheduleFetch(fetchTimer)
! s/ b4 z  K5 u- _3 V$ ?6 Q% X        }( y4 R2 Q9 x+ j. R: {. \! v
fetcher获取完整区块
' S  }3 [/ L2 e' [3 L9 t本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
, l" i1 B) T" a; `; B  }3 h! J  jFetcher的大头
# @5 x: }5 F- Z3 F" ]3 lFetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。6 b' N0 J$ @9 Z4 e& I  k
宏观* I6 z- s2 e& H
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。  [' G1 D9 r" r3 H# @- s" Z
7 [, N) z. }; N$ ^4 F0 G: A
再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:' ~$ l8 c: b! a
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。7 ~5 j; R: ?4 V" M$ F3 ^& g7 O
; q9 j* [- U8 F. m- E* f0 `$ q+ Y7 l

9 V& }- a. T( r4 Y1 @1 I/ G" @微观/ |& A4 U2 `7 L
接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
, {' v7 l& T- U首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
2 a* T1 [1 d2 ~! t. k0 [+ Y// Fetcher is responsible for accumulating block announcements from various peers4 t! X' |5 q" S, q/ h9 B
// and scheduling them for retrieval.3 s+ ^! R6 M( b. O8 \
// 积累块通知,然后调度获取这些块6 v: e4 S1 Z' M: X
type Fetcher struct {( k; C; g4 w  t- M
        // Various event channels
( b, o6 j1 {8 c; }7 q0 _% B    // 收到区块hash值的通道
1 \" J) e% }4 ]        notify chan *announce
3 k/ w& u" k$ [" o$ R9 T3 e    // 收到完整区块的通道
. N" {; P% n* ?8 |        inject chan *inject
  R; ?1 ?( v2 n% g  d$ @+ Z; g, F        blockFilter chan chan []*types.Block
+ F5 }) W8 I& w% H. X" ]        // 过滤header的通道的通道$ h5 ?7 b, ?. B+ G6 J! j
        headerFilter chan chan *headerFilterTask1 H' P$ y$ Q" ]% Y3 x5 |/ o5 e
        // 过滤body的通道的通道. A! t# ]4 F& ?( U( h
        bodyFilter chan chan *bodyFilterTask% T( L, ~. D) Z
        done chan common.Hash% T2 @  j4 Y' P, h$ x
        quit chan struct{}6 t4 `1 T. M2 Z% W9 r
        // Announce states
( N3 Y& ^+ V! ]- {. s        // Peer已经给了本节点多少区块头通知
1 m# k5 r' q- L, d3 O' L- U" N        announces map[string]int // Per peer announce counts to prevent memory exhaustion
" d1 u. e5 Q  Y1 H9 D        // 已经announced的区块列表
# p5 w0 H/ c5 n6 s% {) L2 i        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
' D* s5 N/ k, z- E. I9 e* X( |2 S        // 正在fetching区块头的请求+ k' ~. |* x+ A% P5 O+ q  O' L
        fetching map[common.Hash]*announce // Announced blocks, currently fetching
; U8 ]) c+ j+ `* \4 l        // 已经fetch到区块头,还差body的请求,用来获取body
' Z) W- D: E- @9 M6 j        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
0 N( I( K1 e6 y4 V  I$ T/ I* t        // 已经得到区块头的& [$ p/ p1 g: U" _$ w9 Q
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing! M7 _/ q; O7 Y- q* U; C( ?+ ?0 ]4 Z0 U
        // Block cache3 a+ d+ J) `& v8 _! C! P2 s
        // queue,优先级队列,高度做优先级3 g! i' c/ m( f; _
        // queues,统计peer通告了多少块
8 ?+ l* c' b- ^4 f# _/ ~5 O; `8 w# k! I2 B  D        // queued,代表这个块如队列了,. S* N. ~2 C+ h3 F2 @  T
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)
% U' X$ `* t( @/ |2 e. g        queues map[string]int          // Per peer block counts to prevent memory exhaustion2 G8 U& T5 |7 l* N
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)# e: }$ Y, y7 U5 e
        // Callbacks
5 ?/ m8 e( \4 F  G8 l: T        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
' c( ^/ C9 k3 R" k/ @, x1 |        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
) d$ a* r( n, k) @* t/ s        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer% g+ P% E) E* _3 Y
        chainHeight    chainHeightFn      // Retrieves the current chain's height/ ^' d; q8 K5 Q8 F5 T+ X7 ]# K
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数
0 B$ s- o- z9 Y& i, D' ]        dropPeer       peerDropFn         // Drops a peer for misbehaving
& k* x" r7 I. g; W1 {" C/ G        // Testing hooks
3 ?5 H& ]. ]2 B& F; V        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list4 e2 n) a* b8 T- ~! H) Z
        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
% [( ?- [; c: T8 k& F        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch8 z# c2 H* l2 M) S7 X
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
/ b" A" a) K6 ^" g7 X, E* J* m' A+ ]        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
7 K/ G5 M& r0 N* [: _) \3 n! B2 u( s4 K- y}7 n+ a- o- z. m  N; z5 a
NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。6 Z8 X, f6 B8 t2 O/ `
case  arriveTimeout-gatherSlack {
" f) D. o  W2 {: X6 r                        // Pick a random peer to retrieve from, reset all others" U6 ]; S3 ]" C+ g
                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer
9 G0 U4 T  N0 I. N                        announce := announces[rand.Intn(len(announces))]
) d+ m2 B6 d* P: ]4 F, e1 w- C                        f.forgetHash(hash)
6 b, c8 D* J4 u/ H9 {, K                        // If the block still didn't arrive, queue for fetching
% }; r3 }9 u" r0 F) ^                        // 本地还没有这个区块,创建获取区块的请求
9 w. a) n0 A( z                        if f.getBlock(hash) == nil {4 Y0 g( `" v0 |$ A6 `8 Y1 G
                                request[announce.origin] = append(request[announce.origin], hash)
$ _$ F% I% v7 z/ T9 o: z$ a9 F                                f.fetching[hash] = announce
/ y* v: v5 d0 h4 p" ]                        }
( y3 ^% L$ H* |1 r5 Y4 {  _0 H. i: T                }
! G8 Y! Z5 G( `+ \$ v9 Z        }. Z+ R$ @; n, q# m
        // Send out all block header requests9 t( l1 o. D$ v1 }
        // 把所有的request发送出去9 ^/ {3 d8 e& v( t  w3 y. y
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
( E' S/ b* u$ @; m. B0 G        for peer, hashes := range request {, ~8 I' y7 h9 x0 r6 u
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
. ^) e* d+ y* H- o* U8 X2 B                // Create a closure of the fetch and schedule in on a new thread
# G/ X, ~. K1 h4 e" S+ B                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes/ `, \! s+ u$ a! G. R4 H
                go func() {
( ]3 r9 L; h! t+ u/ Y% u( I                        if f.fetchingHook != nil {
! k/ r, i3 X: m8 _2 P                                f.fetchingHook(hashes), U( i1 r: r  a8 g, L; h
                        }
5 z4 N5 t! s7 T! R# O* u                        for _, hash := range hashes {
8 v4 a' }  t% u' k1 R                                headerFetchMeter.Mark(1)$ R1 r! F- L/ `# S2 s. \0 u8 y
                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
) s9 C! r. |3 |+ ~3 N8 l0 k% Q0 i                        }% j0 W2 q" j% B& r8 T2 K
                }()3 o6 R  f# S% w' X
        }2 q# [% Y; R  D" u8 M3 G! z  U# Z
        // Schedule the next fetch if blocks are still pending
1 ^4 @% s5 O3 R- R1 X2 E2 i0 m        f.rescheduleFetch(fetchTimer)' F6 q( k) V# W: C! Q/ C$ R* T. ~$ ?
从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
' e+ q) \- o9 H5 M6 n' K& h* `pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)$ w! x' S& T5 p. Q
// RequestOneHeader is a wrapper around the header query functions to fetch a
& A/ a, U0 L- |// single header. It is used solely by the fetcher.: z( d) ?  Q& x. g+ O
func (p *peer) RequestOneHeader(hash common.Hash) error {
; |' I, J) F5 [; A* y# b        p.Log().Debug("Fetching single header", "hash", hash)
. H0 f8 d% s: h5 c# }6 c        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
; B* r3 H7 a% g5 P8 k}9 l% b6 |5 T, U' Q* O% [1 e
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。* [0 J% N" _5 x" |  G8 U: B
···, R4 t9 X" D' ^9 i
// handleMsg()
% x$ M( w( \; r% k; ^5 ^// Block header query, collect the requested headers and reply: n8 r% s3 U6 G6 H" d$ ?
case msg.Code == GetBlockHeadersMsg:: u; a0 A) f+ V9 T0 m
// Decode the complex header query
+ l. F1 E0 {% H! j1 }% R  ^/ m& R: evar query getBlockHeadersData
3 K5 V+ Y! [* |" V+ D! R8 ]if err := msg.Decode(&query); err != nil {
" q4 o; o2 Q5 preturn errResp(ErrDecode, “%v: %v”, msg, err)5 c0 b- u* Q0 ~. j% x2 S
}  {# [. V( T3 s  ?0 v
hashMode := query.Origin.Hash != (common.Hash{})
+ j3 V' f" e. s! C// Gather headers until the fetch or network limits is reached2 ~# |' R- ^5 ^# a
// 收集区块头,直到达到限制( N/ a! y1 C7 `
var (
8 \3 }5 a5 Z4 V! X; f) x        bytes   common.StorageSize
& B9 }1 K, p: [- ]        headers []*types.Header
$ ~1 [/ x5 n' e$ i  S* e5 w        unknown bool
- W; ^" l2 q& {: u8 c- X# u, J), ^% X, I; U0 p5 x
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量
5 q! `9 t' z3 ~  n. Hfor !unknown && len(headers) 3 R6 D: m6 K" l6 S
`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。# q3 D  C( L% Y% X/ \# x, m8 K$ m
// handleMsg()
0 |2 {' x& L, ^# Hcase msg.Code == BlockHeadersMsg:
$ a4 f& l3 E/ q3 t9 b6 v// A batch of headers arrived to one of our previous requests
. s$ g4 g5 x( o  b% S3 Lvar headers []*types.Header2 |9 O: Z  z" c: G# F
if err := msg.Decode(&headers); err != nil {
* y/ P' l$ @* G+ w7 B/ ]$ }! ^+ Preturn errResp(ErrDecode, “msg %v: %v”, msg, err)& Y3 P9 Z' N# o6 ~
}% e/ v& N8 ]; I$ x1 D" Y# m8 Y- m2 k3 q
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that8 \' F5 m) V: j! W1 W* J+ |
// 检查是不是当前DAO的硬分叉' D6 I4 V; r. i: D' O# a$ v
if len(headers) == 0 && p.forkDrop != nil {
: g8 [/ F3 T! \7 I/ q& e8 N, C// Possibly an empty reply to the fork header checks, sanity check TDs% \' O. q& }; ^  T* w2 T6 ?
verifyDAO := true. B) F% ]; ]1 p$ [
        // If we already have a DAO header, we can check the peer's TD against it. If
, i: {6 f0 z1 Y$ G$ G; m        // the peer's ahead of this, it too must have a reply to the DAO check
0 s9 o. R) D0 B" v: p! E, X" C        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {/ Y/ l3 N; D9 q2 v; s+ C& A
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
, |% {3 ?( v5 b# l1 B) k" G0 S* y                        verifyDAO = false
1 T1 T& j2 d4 L' N0 {& e                }
8 t* t# k6 M7 o7 o, |, L3 ^* h0 o% ^        }8 E" r; q' s- \- R) f, b, h
        // If we're seemingly on the same chain, disable the drop timer4 N, s8 D. }. c. o
        if verifyDAO {
% ?3 j/ T3 I3 I5 }" I+ P                p.Log().Debug("Seems to be on the same side of the DAO fork")5 A( s1 s5 Q  O; \! T& V$ a
                p.forkDrop.Stop()
# \5 {! ~) ?9 b                p.forkDrop = nil
9 J; |. a$ ]1 c2 r                return nil: k. f% e( j% b' g; C0 z  s. z
        }  @/ E+ E2 o3 Z- A. ?; X
}( W' ?6 a# \( Y3 `# d
// Filter out any explicitly requested headers, deliver the rest to the downloader
0 ~& i' R1 Q) N0 s8 X- e. n* P// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
# H, U4 C7 ]& w, K) F  t% `/ Q/ pfilter := len(headers) == 12 L8 j6 E8 o1 I9 c; D+ a# Q9 G
if filter {
2 |& v4 R4 y' R' w; e        // If it's a potential DAO fork check, validate against the rules- s5 m& h  h6 n0 ]. P5 s
        // 检查是否硬分叉( Q. s5 M& M, z" x0 n: j* t
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {  U, b3 d7 H* b! o0 O* @" {( t
                // Disable the fork drop timer
9 j: ^! ^% i; b5 q. H9 Z% Q. m                p.forkDrop.Stop()
1 ?3 d9 b4 ]" c( W                p.forkDrop = nil
4 U# _" }  q. x                // Validate the header and either drop the peer or continue
# {! L$ B: V- L4 H9 g4 }& p7 F                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
9 }0 d* [0 _- |" z* }) X0 ?; C                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
, |, u3 @6 v* F8 Z                        return err
- h3 f' q+ {) D: a6 x/ x) `# U                }
* F' D5 Q4 y- [                p.Log().Debug("Verified to be on the same side of the DAO fork")1 M' ^2 C" `% H4 J* ^
                return nil* Q# S  z0 N4 Y$ s  a
        }
+ N5 Y4 r% l, I1 \: g% S; h( @        // Irrelevant of the fork checks, send the header to the fetcher just in case
5 f* p! M2 U! q/ `7 b3 g        // 使用fetcher过滤区块头& m* M1 ?' }- j3 ^7 L  x
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
- p& j4 N" {! t2 g. J  Y}
- t* L' }5 W8 }: s// 剩下的区块头交给downloader) {  u- G' B, R) y1 V. o5 E" z! }# o
if len(headers) > 0 || !filter {4 ~/ ^" y  C% Z8 D1 }, N
        err := pm.downloader.DeliverHeaders(p.id, headers)8 Q- J* q8 c$ ?1 z8 ?
        if err != nil {
" m3 m5 x( H' v4 g( N5 G  z4 R9 h                log.Debug("Failed to deliver headers", "err", err)$ I! g) T+ W. c  ?+ [8 ^
        }
$ c) Q/ |( s( I, B+ m5 u8 A5 c. F, F}+ g8 H  r  M' N% A- y
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。; H$ F. j# H+ n1 G) A
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?) r2 l/ a! W  a/ }6 e+ q/ f
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
4 v$ s7 i+ E% z  U. Z// returning those that should be handled differently.3 J4 J1 c( {% H: K' h8 ]6 y8 _
// 寻找出fetcher请求的区块头
, k% U- ~0 i6 kfunc (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {  |2 b  }( p' @& b8 [
log.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
) O+ B3 J% |; S6 j0 y: E& I9 r7 t// Send the filter channel to the fetcher% }' z) z5 ~, M" a6 s
// 任务通道+ u! G* C4 H* P+ ?
filter := make(chan *headerFilterTask)
: C1 ~" R+ V- m1 Jselect {5 K! ^1 p1 |/ x: H; y( @
// 任务通道发送到这个通道- t1 [4 d6 E. u% F3 z/ V& @# w! B. m
case f.headerFilter
# w: l/ }; ?8 w) R/ i( A, u. K}
# h0 v% v9 g7 t9 f9 o: i6 K4 }接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:6 l, l: Q. ~6 X1 h0 @$ g
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。
+ S5 I0 j  z6 Y% u2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。  c0 W$ `/ V6 s& h% ?1 p2 I# J
3. 把`unknonw`中的区块返回给`handleMsg()`。5 J1 l, Y/ r3 O; Q
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。( Z+ }8 [7 Q1 v8 N  g! O
5. 把`compelete`的区块加入到`queued`。9 `* O6 M9 l2 k* [/ S/ A  V' P
// fetcher.loop()0 E' O& v# M! s% z4 D6 s. S
case filter :=
. U0 ?1 L6 |; U6 Z; w// Split the batch of headers into unknown ones (to return to the caller),  N4 A3 z" E1 ~0 }1 F
// known incomplete ones (requiring body retrievals) and completed blocks.% H; y1 f2 {3 m) f
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放6 R/ i$ A' Y# P8 X
// 还需要获取uncle和交易的区块# ^2 R/ U3 L! {# ]
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
9 x- Z' X$ o% ~- J4 ~$ I3 I// 遍历所有收到的header
. O" J, t: P, a1 O9 a# Xfor _, header := range task.headers {
. S- ^! Y4 Z9 r  V/ R6 W! b. I4 ~5 Y$ N        hash := header.Hash()
( N+ C2 d* B! W' D        // Filter fetcher-requested headers from other synchronisation algorithms
- m0 w' w  {+ t1 t8 j- ?3 S        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
6 ^- T" I- z7 D& o+ }& W        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {/ r$ K0 i/ F0 j& z  E  x
                // If the delivered header does not match the promised number, drop the announcer8 [( J% S0 ~) E1 R4 Y% K9 f
                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。( A9 |" e7 a4 S; G6 v2 w3 ?/ m
                if header.Number.Uint64() != announce.number {6 D8 L" b: y5 @4 c3 L7 n
                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)/ l1 @; V7 ]1 h
                        f.dropPeer(announce.origin)
# F" c) h. B+ `8 I5 k+ p                        f.forgetHash(hash)- V4 U  G' f6 t
                        continue
! t$ L! i8 a3 m' A" k( [% i9 U                }! T0 Z. d; j3 t. e
                // Only keep if not imported by other means
5 T  Y2 O1 `  q7 g9 D+ t: x                // 本地链没有当前区块( v# D4 O1 S. ?
                if f.getBlock(hash) == nil {0 B* c/ q' S9 X+ y
                        announce.header = header
& f/ T# A& f; I4 Z' L                        announce.time = task.time5 Z' C" \8 v7 n6 m8 @: z
                        // If the block is empty (header only), short circuit into the final import queue! M6 O, O0 C+ b  o
                        // 如果区块没有交易和uncle,加入到complete1 R' j* M8 c3 Q1 B+ P
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {, l7 a6 z: m) I8 k9 g% {
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())- _) f  u2 V: e1 q2 e
                                block := types.NewBlockWithHeader(header)& N3 d7 ?( g" c
                                block.ReceivedAt = task.time
9 L. y; }" f+ W% v8 T, {, S7 U                                complete = append(complete, block)
6 z: D/ L- l0 A( ^" E                                f.completing[hash] = announce
' |" f4 }/ y! t  X! X5 V                                continue6 Y( a7 i% y- t. O9 O# K
                        }, o: _( }6 ~: s0 k: p- j3 z
                        // Otherwise add to the list of blocks needing completion
- B" ?5 B8 s5 Y7 z' `: B7 L% K                        // 否则就是不完整的区块/ o. ?6 l5 D* z3 }; t9 b$ @
                        incomplete = append(incomplete, announce)
) U7 i: C2 @5 z6 `                } else {1 [; _4 w& R8 q
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())6 `' H# O5 \8 V. Y
                        f.forgetHash(hash)7 z' m, W3 s; S9 z% Y
                }$ d( C0 W/ |! @, d; r6 s3 b# v5 Z
        } else {7 q3 t$ M1 b7 C$ w, S4 H1 V
                // Fetcher doesn't know about it, add to the return list) A- u7 f* ^$ `; y- B& {; I, K
                // 没请求过的header
" i% Z( @3 R+ ~, ~6 \5 P8 P5 u: w                unknown = append(unknown, header)
/ p! e4 a$ h. y+ l+ o! b; z        }
7 @, T# X( m2 q1 J# \$ z5 N}
1 R0 ~: T  `4 h% `- z// 把未知的区块头,再传递会filter# E& X3 J) x! ?
headerFilterOutMeter.Mark(int64(len(unknown)))
" z8 ~, f; S0 F& E. `; A, eselect {
5 Q3 m/ o0 U0 R6 Rcase filter
+ d/ E( G9 A% D0 {跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。
% K7 v5 D6 q. _( v! Z// fetcher.loop()
; c( f7 `! w& P+ c' {case
: d5 T3 Q, _8 K5 t// 遍历所有待获取body的announce4 N2 N5 i, }7 Y6 k* C; c
for hash, announces := range f.fetched {$ }# }9 r3 i8 u% \
        // Pick a random peer to retrieve from, reset all others( t! D8 ?$ e5 H2 |/ W
        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
6 b- Y% {6 H, M& T6 i+ d        announce := announces[rand.Intn(len(announces))]
6 C$ E3 u+ K! D$ l        f.forgetHash(hash)
2 B$ k, c- I8 w6 h        // If the block still didn't arrive, queue for completion6 R8 f7 m: q* `) C" r, d4 _
        // 如果本地没有这个区块,则放入到completing,创建请求2 [0 Q- s  b. B+ D# [
        if f.getBlock(hash) == nil {, `' X# b, Z6 `
                request[announce.origin] = append(request[announce.origin], hash)" S8 Y( E2 w" e9 z
                f.completing[hash] = announce- F3 Q4 y3 Q7 K+ E
        }6 e8 X8 S2 }1 k; x
}" K0 `3 w- [: D0 S
// Send out all block body requests% W3 H0 o! A& N2 n" w# H
// 发送所有的请求,获取body,依然是每个peer一个单独协程# f/ W8 c: q9 t; |
for peer, hashes := range request {
8 o$ X+ R) }2 J5 @2 u1 l/ k0 ]% o' Z        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
/ {4 {( u) U, _) s" }( z        // Create a closure of the fetch and schedule in on a new thread
/ c* B( _; l- I4 v# S        if f.completingHook != nil {
( I0 `' o' A/ }9 R+ M; y. l0 m' m9 D                f.completingHook(hashes)- o9 f7 Y* V2 L7 C
        }
8 [5 X/ @; y$ ~6 q2 g1 H$ L; O' m        bodyFetchMeter.Mark(int64(len(hashes)))! D: u+ I3 A  n3 L; s8 b4 k( k
        go f.completing[hashes[0]].fetchBodies(hashes)( ]+ A2 w! s  a. ~5 r
}! H$ L: _' g2 G# s. ]0 R
// Schedule the next fetch if blocks are still pending0 q" i8 c1 i' V
f.rescheduleComplete(completeTimer)
9 z. R3 c- s4 [; ~* }' x6 J/ O`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。1 c' D, L9 j4 o7 Y
// handleMsg()
5 s$ Y/ }( m2 T. j, ycase msg.Code == GetBlockBodiesMsg:" K" S, w$ R* n! Z0 w' m1 E# j
// Decode the retrieval message
; B# B# a3 E1 a0 {& R+ t4 T1 y7 WmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ A  z0 W, e8 K/ ]$ Vif _, err := msgStream.List(); err != nil {0 ~3 N  o2 z$ {+ P
return err6 ?+ E( b3 x7 ]6 u5 K! w
}
" @& d  F1 s5 w- J// Gather blocks until the fetch or network limits is reached
0 s: M* O7 c3 Pvar (; {! `. E- x6 A( _+ K4 j  p
hash   common.Hash7 m5 i8 S! i( f  X! K8 r
bytes  int- D( `! N+ `4 E# S# O9 Z
bodies []rlp.RawValue' P, h" a5 b! |( q: i( G7 P6 |
)& D" ~& l2 {" m: y, q8 A' N) j
// 遍历所有请求8 ?  S; \# X1 }" @/ G
for bytes
8 z* a, T4 y8 ~. w响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
# ^% N4 E" h! w// handleMsg()1 |1 U1 f& i3 I! H! m# X& U" C4 j& e
case msg.Code == BlockBodiesMsg:
- [( m2 ~" j* `- U; A7 U0 ]// A batch of block bodies arrived to one of our previous requests
$ ?- w* i  t+ s6 y( P& _% H2 x6 P- v% E8 ~var request blockBodiesData
; _& h0 D/ ?: U! b3 L- gif err := msg.Decode(&request); err != nil {2 [* U/ U1 G$ D2 m* l! r2 ^
return errResp(ErrDecode, “msg %v: %v”, msg, err)1 l. `3 x! u4 t4 o2 `1 K8 X
}3 p/ f' }, R* ?  T5 v6 S
// Deliver them all to the downloader for queuing
8 E0 c9 X7 ~  `/ z7 m4 P// 传递给downloader去处理
& }0 L8 k% {; v8 Htransactions := make([][]*types.Transaction, len(request)). G4 d" W$ z1 u; {! G  ]; L
uncles := make([][]*types.Header, len(request)), K/ N: n7 \/ P# O6 o9 R% B- j9 h
for i, body := range request {4 J- ]" D# i$ [2 F! s% i  y! f
        transactions = body.Transactions
" v9 l$ u4 j( `        uncles = body.Uncles3 [$ m: G% O/ N. h+ I8 J/ |
}
, A  S2 W: V; ?) p// Filter out any explicitly requested bodies, deliver the rest to the downloader0 V; J7 e  U* }6 {
// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
  p' s; u; Q- o' b8 [5 q0 W% X+ r% {- wfilter := len(transactions) > 0 || len(uncles) > 0: N! A  m; C% U; ^* S$ h0 y2 ~
if filter {0 p1 [& N7 S! n, R. t* D% v
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
+ K* h; J* U7 C" _  m. F9 p}
) r# A: s8 w/ F  o* X" C5 ]- A& o// 剩下的body交给downloader" b( W# G6 P, U9 s
if len(transactions) > 0 || len(uncles) > 0 || !filter {
6 Z8 P& S! d$ J- T4 E1 Z3 G7 w        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
$ J, {6 l$ L7 `, e$ `2 A        if err != nil {) X+ `7 N' K& @, z
                log.Debug("Failed to deliver bodies", "err", err)& f) V3 s" F1 M6 C7 G) y  U
        }9 \2 R' h/ g0 H7 a: m. S! K
}
. N0 `; `7 S6 r2 s& i% _8 b过滤函数的原理也与Header相同。
6 j" d+ y3 b. f6 ~// FilterBodies extracts all the block bodies that were explicitly requested by& ?  [% a! X' \+ m1 @
// the fetcher, returning those that should be handled differently.
2 O1 r0 z0 D8 W// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理' x6 x; |* f* A' R) q$ }& P
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
( w% T3 c7 \- ?5 T& ?2 Glog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))
# h6 P& l" }# F- z// Send the filter channel to the fetcher
( g& p  G7 i" h8 h% T( Z' {* Jfilter := make(chan *bodyFilterTask)
: y. g1 q( \) l, v; C9 |select {1 ?$ ?3 Z/ i6 i- w7 G- [
case f.bodyFilter 2 \; ?( u  R7 Z) w! H& E" [
}1 H+ H: T8 [' K7 k6 A
实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:4 z8 Z3 s" n/ F) T  b4 W, e
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。8 Y3 r3 n* b1 B% h& e  z2 a8 a
2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。8 ]7 E) s' E! J# J/ p
3. 将`blocks`中的区块加入到`queued`,终结。
# o1 A  V" z5 L1 G" |& }1 @5 N/ bcase filter :=
; _6 W/ W; X4 K1 y, Cblocks := []*types.Block{}, {( n8 x6 P3 ~( e" Y, S8 ~6 y3 @
// 获取的每个body的txs列表和uncle列表
$ w: b0 _  E+ w* j( c  Y// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body. [3 H# j7 z9 c: e7 @' f: z
for i := 0; i * \" Z4 @- g; m! J% h% ?+ c
}
8 E9 }7 V* g* P# k: o/ B% u3 h2 @% M3 |! K
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:6 a" X3 q0 [* M0 I
1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
+ m1 j( K4 G3 M2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
3 I7 G6 B+ j. }3 D3 ~, i- O最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。& p/ R. {. T$ U% z

$ k- V: _+ ?4 ^$ @
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3