Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
188 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。0 g5 h9 N7 N% C' `2 Z& }" i
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。8 x9 Q" X+ y* @( h! D3 @9 B
总体过程和传播策略
$ z9 N+ o, f. x( w本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
7 m) a* D, v& b% D总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。
" [. Y6 [7 i- O6 v先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。- O# h: e3 `  R: N0 O

# G) b2 h) F1 w5 O总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
9 u4 K- g& ^1 W$ o假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。* E* ]) ]* Z" i; L3 x8 ^5 P  D
策略图的效果如图2,红色节点将区块传播给黄色节点:
/ [3 [( |* z8 L4 G  `1 E
! i" G# @( K. M6 ~. A$ ]) x8 q: a( q' p* J# M( H, R: V
收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:/ I/ ]/ R) r$ {0 b6 b3 I+ \9 r

! N. j- {- q4 p  a至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。
' s2 V$ ^2 a$ t# jFetcher模块是干啥的$ D, ]4 C( B& O! i4 ~' K0 U. N9 p
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。0 ]3 O' s2 b1 ^2 x1 u+ o, J  c0 x
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
, g( h4 R3 `) S
- }  s6 E2 G) @; v. a0 y4 U源码解读0 N1 H# u* ^' [9 l" s* _" @/ t5 K
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。% N; b! Q9 O  S+ f, M; R  v; @
产块节点的传播新区块
) x6 R0 ~( ^% y9 k# b节点产生区块后,广播的流程可以表示为图4:
& c* N. n( ^) {% ^1 I发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去
" R% a+ H, ]0 i8 Q" x
  E. o5 g: K6 \' {7 z* O
- x: t  z. q& u: g/ C再看下代码上的细节。
# y# `8 c- B8 M7 z8 c9 a$ rworker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。  x: G3 H+ D) U" {
* A( b& D) @5 g/ ^, H8 ]
// Mined broadcast loop
" z- H1 S' j* L2 e  t: w1 |func (pm *ProtocolManager) minedBroadcastLoop() {2 t4 \4 h6 r+ s% N: l' M
        // automatically stops if unsubscribe* `) @5 a3 J3 L- {& q  j# ~
        for obj := range pm.minedBlockSub.Chan() {
8 g! ?# s$ n7 c8 ]( a                switch ev := obj.Data.(type) {
( a/ _0 Y9 U' J# R" F                case core.NewMinedBlockEvent:
% g# X' t0 x0 Z" w' ]                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
) E$ o: M. O$ t8 E3 v& z                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
6 @- u- J6 ]1 H5 Q                }0 T) I1 p8 ]8 w# |, b
        }" l' D7 G, B. V: z# u/ O! D# b4 r# @
}8 p- N. T( I& a. Y, R
pm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。
' Y9 h" f0 q" m, |! `// BroadcastBlock will either propagate a block to a subset of it's peers, or% d+ G( d4 A2 w" M
// will only announce it's availability (depending what's requested).' R, E" `7 W% z: E
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
% `* k0 ?( _7 k" f4 {  c        hash := block.Hash()% P1 C6 J2 |6 o5 i& v4 k- |+ f* r
        peers := pm.peers.PeersWithoutBlock(hash)
2 e+ p9 r# O7 {( k% z        // If propagation is requested, send to a subset of the peer
8 `, z' `" r7 A. S( y        // 这种情况,要把区块广播给部分peer
  J4 |/ h3 J, F) m. h        if propagate {
1 D5 E! g% @- p# P1 w) h                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
* D5 o: r2 [) J                // 计算新的总难度
! A+ Z6 S. A( M/ I% l                var td *big.Int) Y; L3 Z9 E" k5 X
                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {3 x1 J% F) [/ j2 }
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))4 ?6 ~: Y& D6 j! g2 @4 ]1 s
                } else {
- U! y; B6 v4 U8 z- Q                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)1 n6 f( `" F: [
                        return* ?1 J" H. C4 y+ b. N
                }+ `# S; h$ b! x5 o
                // Send the block to a subset of our peers
7 s0 j4 o6 N+ A* J- P                // 广播区块给部分peer/ m( S7 }- t! l* x' E
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]
5 d2 D2 ~# n" Q# L                for _, peer := range transfer {( `0 p5 b) K  s3 X: g  {
                        peer.AsyncSendNewBlock(block, td)) z1 E  r/ x2 A6 S. n8 N
                }" ]0 ?3 p7 h7 V, ^* A- w5 f3 X
                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))* }% J+ Z! c, ]. Z& `
                return  @7 [( ^. O- c& n! T8 Y
        }
4 v( a9 ]3 W% U! N! I% w        // Otherwise if the block is indeed in out own chain, announce it
. n2 I( u. U; z: K3 d' k$ q        // 把区块hash值广播给所有peer
9 R6 {5 C. U  J2 ?2 _0 ?- s        if pm.blockchain.HasBlock(hash, block.NumberU64()) {! `: H5 q7 A  U7 E2 [4 c
                for _, peer := range peers {' i- E5 }2 t! k1 B4 A9 l
                        peer.AsyncSendNewBlockHash(block)
/ m; X! k9 |: {  \- J                }
6 B/ |1 w% h) i5 s+ I                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))5 \) T4 A& R$ @" ]* k  F+ ?) _
        }" [+ d3 V6 Z. B8 n7 r2 Y: Y+ r" [
}
% m; J& X. c- Tpeer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。. W, F* `2 Y& A7 h  g" Z5 b
// broadcast is a write loop that multiplexes block propagations, announcements! ~  Y, e8 L' P; I
// and transaction broadcasts into the remote peer. The goal is to have an async( M" w( A. i: R5 h
// writer that does not lock up node internals.
( d, r7 Y: B0 Ofunc (p *peer) broadcast() {: M& C% q4 q( R* y1 B
        for {) E0 h+ n) c$ `/ Y# h
                select {6 k& D$ R8 g1 }+ j! I% J
                // 广播交易" V( r( X) f( ^% G
                case txs :=
% v* S# M: w' L, C& ^Peer节点处理新区块
7 U8 _; B6 E5 \/ K5 z$ p本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。" x# Y, m$ D2 ]# ?
这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。, M; f4 @: _/ T3 n. ^6 X
消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。1 Q9 o# z/ D3 b+ P
* R& r( e1 S. Q2 I! T
仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。
; u; O$ [' `: M( F" fNewBlockMsg的处理3 z9 Y2 [% M& N: I( {9 S
本节介绍节点收到完整区块的处理,流程如下:+ [' B9 z4 {6 v5 p% S' h. H! H
首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。
' u5 B0 _0 j3 K! P6 J将区块存入到fetcher的队列,调用fetcher.Enqueue。9 h2 H+ w; z- W( [. {
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。/ y5 Z+ h" L+ ]2 B  S
只看handle.Msg()的NewBlockMsg相关的部分。
, H( r7 U( i+ Kcase msg.Code == NewBlockMsg:
0 ^/ b% ?0 g- P" J1 E        // Retrieve and decode the propagated block
( V" X- m. R6 J4 P2 l: o        // 收到新区块,解码,赋值接收数据0 X9 t, Y) W- v5 Q9 O
        var request newBlockData
" z) f) n9 d6 P; C/ I4 j        if err := msg.Decode(&request); err != nil {, C, y$ x8 z. L# V! c, W! p8 s+ o
                return errResp(ErrDecode, "%v: %v", msg, err)
( x0 R' t# W/ n        }
( f! b; v. X8 b( a" h9 k/ v! T        request.Block.ReceivedAt = msg.ReceivedAt
$ c; z3 a' R5 |" I        request.Block.ReceivedFrom = p
; f5 o5 L% Y. @1 w' l0 g        // Mark the peer as owning the block and schedule it for import
% b, {6 S6 \! M! z        // 标记peer知道这个区块
1 L" O* w! N/ ~' i2 @8 K& }$ t* l- {        p.MarkBlock(request.Block.Hash())
. V8 b5 f% n7 e. L% W5 s  e6 d. i8 y        // 为啥要如队列?已经得到完整的区块了) i2 c( T+ U; X7 B9 x" I
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块
# I) i1 |" [2 W# V8 @  F$ L        pm.fetcher.Enqueue(p.id, request.Block)
/ q4 g, \* {7 ^+ w' K        // Assuming the block is importable by the peer, but possibly not yet done so,
9 K+ f8 u9 T- [; O. @: g        // calculate the head hash and TD that the peer truly must have.9 |, U. P0 \; w1 R# n! F
        // 截止到parent区块的头和难度" |& l: u* D1 S( X; |8 D: @5 j  G
        var ($ D6 h; _( K7 {
                trueHead = request.Block.ParentHash()
. D  Q- O1 G& v8 P. W8 J                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())) j9 T2 g5 ]2 ?9 S
        )1 |; g( p" ?5 B7 @$ {' M; t9 x
        // Update the peers total difficulty if better than the previous
4 {9 n0 ~1 ?! D5 v6 r7 N        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
: x4 e, {/ a  g9 M" O+ I4 Y        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。
6 ~; K1 q8 H2 A% j' O' e        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求$ ?- i  X. \1 N! |6 a
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
8 T/ h1 D# ^; Q: t& b        if _, td := p.Head(); trueTD.Cmp(td) > 0 {2 R# a2 d# m3 G# b" a7 |
                p.SetHead(trueHead, trueTD)
# q! r; a4 X: Q6 i2 o: n                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of7 u9 r" v" a2 W: y
                // a singe block (as the true TD is below the propagated block), however this
7 \2 ^& t0 t7 _: ~                // scenario should easily be covered by the fetcher.
) G+ m0 d) y  p" ~+ i                currentBlock := pm.blockchain.CurrentBlock()  I2 u. |( {% J4 V0 C$ {' X8 ?$ h
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {$ o0 {% n& x  {8 C5 O
                        go pm.synchronise(p)
8 _) `9 X- i% G                }4 z* |& r& @# S0 [/ T& D& d- R
        }# j' f) Q. x0 d- H9 i+ a9 Y
//------------------------ 以上 handleMsg1 _) q1 \% ~. V4 z+ ?
// Enqueue tries to fill gaps the the fetcher's future import queue." e0 e& q& e: C4 u
// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理+ }' H6 K9 t; Q' S6 O8 M& k
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {4 y: t* G& v! {, U5 @
        op := &inject{
0 m6 f# ^% u' {$ I, R2 b- ^9 v                origin: peer,
! T3 E6 z* ~( t% j1 C5 l1 O                block:  block,! V, K1 }) a! S0 N( m6 ^
        }0 G. y8 e7 w* E# d4 s7 j8 i
        select {# X# ?- a, x& f2 p2 O
        case f.inject  blockLimit {9 y* J+ Q, E5 k8 l* y5 A) I
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
: ?- N% ]5 A( G1 R                propBroadcastDOSMeter.Mark(1)7 s4 E* B: M( W1 J
                f.forgetHash(hash)
; W! O4 `# F* y: i9 K* U                return
+ Z- K; d* ]& H        }; M$ n* E- n, j$ I" x8 m
        // Discard any past or too distant blocks
& x: K- l; i+ F9 m; i, f        // 高度检查:未来太远的块丢弃
* m7 m( R# R4 L        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {
/ v! s7 N) P$ p/ o% g                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
) j9 j" ~3 |7 {# ^- v+ h9 g) V( V                propBroadcastDropMeter.Mark(1)) ?1 \, a* r6 U7 _* W
                f.forgetHash(hash). R1 o- C6 d" H8 ~7 c6 N" G' h
                return7 ?$ Q. e1 ^# |$ a3 z1 p
        }% r2 j# O! `& }1 N$ Z2 d5 d
        // Schedule the block for future importing& G* O+ |% n8 ^
        // 块先加入优先级队列,加入链之前,还有很多要做
0 |% }, {1 T/ @$ V4 h9 n        if _, ok := f.queued[hash]; !ok {7 _" ~: ?- {: O9 s, A( J
                op := &inject{
9 d# G! m% e) W. k8 ?                        origin: peer,
% z! g% b$ P7 N5 g8 Q/ l                        block:  block,
1 x- Q( S* b  K5 A                }
) X; y0 `/ ^' u- L% s                f.queues[peer] = count
4 k5 _3 |. }, I% c) e+ l! q2 J                f.queued[hash] = op* J  Z7 J) x& k8 w/ A
                f.queue.Push(op, -float32(block.NumberU64()))
7 `1 X2 X9 ~4 X                if f.queueChangeHook != nil {* S' [  ^/ d5 a; l! P% E
                        f.queueChangeHook(op.block.Hash(), true)  ~0 Q4 O# s  B: n6 T2 j
                }
7 F% c+ m7 _0 {7 f0 n5 w  f* q+ x                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
' l* n6 U4 I  m        }# w8 N6 j) i5 q' R. l5 E1 t% P3 j
}
8 G7 ]( f# ?" d$ Y5 A& Ffetcher队列处理
* r2 p# u: t$ H+ n本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?
7 M; u; w* B/ {% c2 B由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。
! H' Q$ o' R0 j; j区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash& z" j$ N) P9 G- ^* X5 N
// Loop is the main fetcher loop, checking and processing various notification/ Y9 ]. `4 U, ~: ^6 a% ^! J+ ]8 f, P
// events.0 C$ S; \+ p" f, l8 t. H7 j7 x" M3 I
func (f *Fetcher) loop() {7 I- b% }1 Y2 U9 n. T
        // Iterate the block fetching until a quit is requested
% [5 G2 M  T' ?* M; s        fetchTimer := time.NewTimer(0)3 q7 d* O8 X; O" _
        completeTimer := time.NewTimer(0)" s( T, q6 v$ ~8 ]# s0 O# ]
        for {5 c: F7 a* L/ Y
                // Clean up any expired block fetches' c+ z6 d5 c/ _' r6 d0 B/ N5 u. O* Q" j
                // 清理过期的区块; Y! b* P& Q% M
                for hash, announce := range f.fetching {
2 b$ a& a& e0 N! ]" ?                        if time.Since(announce.time) > fetchTimeout {
$ N# ~1 x9 g9 p3 n. l                                f.forgetHash(hash)
' J6 r+ K+ d" [                        }
$ Q3 @6 c* O8 O8 L                }, s! r7 {1 U0 Y" Y
                // Import any queued blocks that could potentially fit' n3 ^5 C- h  h2 L" S% G3 L
                // 导入队列中合适的块; P0 y8 |0 K; m. P1 I& j
                height := f.chainHeight()
( N1 p5 c8 D8 S% E! |. U                for !f.queue.Empty() {0 H$ X! m4 N/ ]
                        op := f.queue.PopItem().(*inject)) U4 H2 T0 t$ r8 l1 n' X3 u3 Z8 M. T
                        hash := op.block.Hash()0 W7 x6 k# x3 d( _) h8 {
                        if f.queueChangeHook != nil {
: m% V8 W! E; X9 p                                f.queueChangeHook(hash, false)- V; `$ n! S; Y% g: \# @. P0 u
                        }
. I$ p5 V/ [/ ^8 `- Z  x                        // If too high up the chain or phase, continue later" n% j( I4 g& I9 e
                        // 块不是链需要的下一个块,再入优先级队列,停止循环
9 k: y+ v% X# I  l0 ^                        number := op.block.NumberU64()
2 a" D" A, @$ p5 Y3 R                        if number > height+1 {
1 g7 g7 B' s3 K4 u" {                                f.queue.Push(op, -float32(number))$ E6 i; ^3 g5 M) f) m6 ~
                                if f.queueChangeHook != nil {
1 U3 e4 ~8 h/ ?+ R                                        f.queueChangeHook(hash, true)
  O- _6 @, I1 H$ q* D, G  a                                }
8 G4 E( ?4 U+ V6 Q7 x3 h- S4 q% m! Z                                break
1 w3 c+ U# Q, ?1 I+ R( L                        }
* v3 n( n7 _$ r! |6 _                        // Otherwise if fresh and still unknown, try and import* ~; [7 u( ^4 \
                        // 高度正好是我们想要的,并且链上也没有这个块3 k% `9 u. k5 @
                        if number+maxUncleDist
% [3 z: S, O5 z6 T& W; i8 Gfunc (f *Fetcher) insert(peer string, block *types.Block) {
; m5 `, J) k( `4 u        hash := block.Hash()% ~+ h* w: C7 q* ~* P" S2 O
        // Run the import on a new thread5 o: ^$ S6 @0 _" F' [
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
6 V6 ^+ M4 U; F/ B8 R4 C, x# I( Q        go func() {
0 ]' a# ]& K1 S! ], W0 I! k( Q, K                defer func() { f.done
0 J8 Z" p9 j. @" eNewBlockHashesMsg的处理" `+ l. I8 A  Q) p+ z9 z( C1 ]
本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。, j. B- Y8 k/ T- n# S. L4 i4 I
流程如下:
, ~# l7 W- m1 S( L3 p- d, j" ~) Q对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。8 |/ ^8 q+ n5 E& F
关于announced下节再介绍。
0 V) l/ V1 `3 J) O( |$ j
& s5 f/ M; @' v$ n3 k// handleMsg()部分5 k% l8 n" ]5 T* a( u6 b; u
case msg.Code == NewBlockHashesMsg:
& J: V" b% `  V( q0 K. R9 `        var announces newBlockHashesData' f# ^4 J" x" o/ g( C( R1 j
        if err := msg.Decode(&announces); err != nil {7 c7 G& p# A# s
                return errResp(ErrDecode, "%v: %v", msg, err)
2 a& s: t* N! U/ Y% V' k        }
/ W- b- {9 _# P! a) s, z5 z. V8 \        // Mark the hashes as present at the remote node% Q' \; o( N, ]: O' T6 Y
        for _, block := range announces {
* }, l! A# i/ P& g                p.MarkBlock(block.Hash)
4 N7 m; M, t8 s- [        }
, x# h0 |1 m& g" e' ^4 s  b  [        // Schedule all the unknown hashes for retrieval+ _; ^( V3 a1 u* u
        // 把本地链没有的块hash找出来,交给fetcher去下载
0 w1 b3 U7 v- ~( K2 S% I        unknown := make(newBlockHashesData, 0, len(announces))  g3 [& V* z6 E7 w9 @" T
        for _, block := range announces {: ?! F4 Z/ Y3 a  e
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {1 R' G( [! f, g
                        unknown = append(unknown, block). p! R( z) D  m/ v9 W3 B
                }
; K$ T3 @$ X$ l        }
+ I6 R; \" ~+ M7 V- F        for _, block := range unknown {
+ b1 Z, I- v2 Q$ S. `. x" x0 ^                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)" L- B6 D2 j% w# U0 d2 ?, C/ x0 v
        }
8 I, j% E. |+ q( c2 D5 X8 z" a// Notify announces the fetcher of the potential availability of a new block in
3 j% R, [6 _/ ^: _2 u8 P- ~// the network.5 }+ N$ |$ h+ E: O" @# S* N0 N
// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息/ F2 N) s) \3 `* L0 f/ p1 G" `% ^9 `
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
* j& j8 w; P5 r  ?% K        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {+ F( @7 Q3 Y' B: O8 c
        block := &announce{
! p9 B/ e+ C; _5 a8 m: m  F                hash:        hash,3 y" _/ v- a3 I
                number:      number,
1 {) {' A. d* A. W& P+ n                time:        time,
) s! D9 z5 |' B1 v                origin:      peer,
' o2 B. L8 u) t                fetchHeader: headerFetcher,4 U' S  p8 v9 a
                fetchBodies: bodyFetcher,. r* t6 K- U0 A3 m; w+ m  [. h
        }
; t& h. P* n5 k6 b3 O3 M4 N0 x        select {
8 |8 }' ^3 X6 i0 q' W        case f.notify  hashLimit {
% S7 U. h1 J& t+ Y. o- _                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
; i2 c! S$ R/ }- }7 l                propAnnounceDOSMeter.Mark(1)6 F, y) f( x- Q% g- d# l
                break. G0 W4 v- ]1 S1 n9 D
        }
# r" V1 P0 x" V6 X        // If we have a valid block number, check that it's potentially useful
) w1 E* a/ u* {/ h        // 高度检查+ L( e  ^3 d9 ~' X- d* ~, ]
        if notification.number > 0 {+ z7 Q( N" P% g+ {. m, f
                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
- g& Q* t- X# P                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)" Z, C" Z" J2 k2 Q
                        propAnnounceDropMeter.Mark(1)+ a# m* m/ ]$ ?' p% _
                        break: ^& q% u0 k: v% g
                }
  `9 t5 a5 t5 t# l/ A0 |        }
& f* y2 |) ]- W9 N. g+ I3 a+ s        // All is well, schedule the announce if block's not yet downloading! I$ t/ S  m6 B
        // 检查是否已经在下载,已下载则忽略0 A% b# _; b4 A" Z4 j- g4 G
        if _, ok := f.fetching[notification.hash]; ok {
  j$ u2 x/ p0 z+ {. d' K. R                break
2 H: `0 I/ E, i/ ?2 E' h1 d        }
- f7 X- ~1 U! X9 ?2 p0 w. w& R9 \        if _, ok := f.completing[notification.hash]; ok {& g  d; C  Y' p% q2 `
                break" C5 ~4 X, J7 x
        }
& h: e+ e' s: x        // 更新peer已经通知给我们的区块数量
/ i8 E! E: G3 g7 s        f.announces[notification.origin] = count. n* Z5 R" }0 g8 z
        // 把通知信息加入到announced,供调度
- T3 E: S: s9 R2 @        f.announced[notification.hash] = append(f.announced[notification.hash], notification): ?* w8 |9 V0 j! P- p( ~3 `
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {% r2 y: }7 C" r# n  n
                f.announceChangeHook(notification.hash, true)- W: y$ M+ b- e! C& Z( ]* g7 d
        }
* \) |$ ^$ @$ A& M; c$ `; a        if len(f.announced) == 1 {
) Y: E/ \9 Q7 A: ~" _1 e8 |- P. j                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知$ ^7 t. _7 d8 j6 ~# A
                f.rescheduleFetch(fetchTimer)' x. O% C5 X% ]# p2 C
        }  h) f- {8 g% b3 k& e
fetcher获取完整区块6 M* y* O0 E! |8 B. K2 L1 M
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
8 X4 q5 D; o) |/ c3 i' e/ ZFetcher的大头2 m  `2 ?# _7 Y/ `1 G7 G& {8 S
Fetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。5 m3 S% B1 x0 W
宏观$ P& g) o: \" d9 z  O9 t
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。2 v( b" v0 B# M' o; u
' `% Y+ M1 s* B& V
再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:1 C- l/ x/ T6 J% v& m+ C& D
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
* d1 R: {$ x" u3 w) k9 {) `& |% f( m& I* \, ]

7 b# ]5 q, V9 p, V6 p' B; x; r6 l微观
& k" C) Z% \7 w% W0 p接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
& X1 B) e8 m# v5 A3 T首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
9 k7 ?3 s9 x" ?7 J% _// Fetcher is responsible for accumulating block announcements from various peers5 {: @2 r4 Z0 x9 _( x
// and scheduling them for retrieval.3 b! b4 M# h! A" K- v
// 积累块通知,然后调度获取这些块
3 M: z0 Z) J1 ytype Fetcher struct {3 m8 c, V5 p* B( W% {) V# e
        // Various event channels
! \" O5 F' y5 U8 I  Y* W4 x    // 收到区块hash值的通道
  h9 t: ]: i5 n  i9 z        notify chan *announce% E! z9 y, O2 ^4 |" D/ d1 X* p
    // 收到完整区块的通道
$ H1 I: r/ p3 R3 E( p+ J        inject chan *inject
  C' ]0 |" s. B) U$ _        blockFilter chan chan []*types.Block. A1 k0 J' m' \
        // 过滤header的通道的通道
8 q1 L/ a4 ]) a; L( g. N        headerFilter chan chan *headerFilterTask) W# a& M9 y6 t1 t! M
        // 过滤body的通道的通道6 K7 @; C- T6 Q9 r
        bodyFilter chan chan *bodyFilterTask0 s1 h/ V. r3 j8 q3 }* O" c
        done chan common.Hash
; d& ^. q* z" z* f) y+ [& Y# W7 P2 I        quit chan struct{}
0 C+ F9 r) y; [        // Announce states+ ~' ~* Z* j4 U1 x6 e6 L
        // Peer已经给了本节点多少区块头通知
8 L% ^& o% n. @$ O; o9 Q        announces map[string]int // Per peer announce counts to prevent memory exhaustion
& M. z% \' ?) Q% ~. V8 V: Y8 A        // 已经announced的区块列表% i, s# U! ^" W9 u: b* x
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching. E9 |! Q* }6 A( \# z
        // 正在fetching区块头的请求& f* l( c$ V5 c! E! N; a$ }8 o
        fetching map[common.Hash]*announce // Announced blocks, currently fetching: ?( L4 U  ?, ]+ h$ J
        // 已经fetch到区块头,还差body的请求,用来获取body
$ w% Z& n3 B: A  T& Y$ y; z+ W        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval/ T2 i% @  \$ V. T- b! c- x" W
        // 已经得到区块头的& X0 ?+ [! f+ V* V0 v: |% g
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing* i" H6 X5 F1 V# F1 Z/ F
        // Block cache
4 z: J. d, Y/ X) d  v' T( F& h        // queue,优先级队列,高度做优先级$ F+ Z: B8 d! r. U
        // queues,统计peer通告了多少块9 a4 T" W7 Y9 _; I4 M
        // queued,代表这个块如队列了," B  R; e7 _. v+ u/ V: L& N: E2 E4 e2 v
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)( s: O" v. ?* y7 Q$ A1 F0 _
        queues map[string]int          // Per peer block counts to prevent memory exhaustion
! z7 d5 z0 Q# b% L! U; F$ |        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
# Q2 [% [$ T0 w. t9 _% J' t        // Callbacks
/ ^, \9 f0 o" L9 b: j        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
" B! F9 P: e  _* h% D0 t        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
; _( O6 q' o  p        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
! L. P7 S$ z/ D3 H5 i        chainHeight    chainHeightFn      // Retrieves the current chain's height! W( \8 v9 K" _; C# P! V1 N! U) a
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数- _+ s0 ^0 ?" ?1 t9 s
        dropPeer       peerDropFn         // Drops a peer for misbehaving3 Q3 J5 m7 o; n4 `. p
        // Testing hooks: h5 r( O' @' t" s/ ]" H
        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
, E2 l4 T6 O# y0 ~9 }        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue: t# H+ T8 k/ d! R+ K. f
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch: m% n* Z' ?( {1 N: Q* {
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
4 `5 T0 }  R1 t2 q/ n; }' `* F: l7 r        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
+ T4 p0 \$ c0 O- k, v% k8 ^}
1 S2 n/ W+ T; T, i; o: K. U4 Y  q5 ]NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。$ c: n9 e. s+ m# B8 k
case  arriveTimeout-gatherSlack {9 ^  m/ }! ?8 w2 T7 [! ]
                        // Pick a random peer to retrieve from, reset all others
( z+ G3 P% U. F! S3 t                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer1 V1 Y% t6 \2 F. z* ^8 ?+ A
                        announce := announces[rand.Intn(len(announces))]
0 H/ X$ L$ _/ {+ c# I( _                        f.forgetHash(hash)
# m2 F/ k, v0 R% ~                        // If the block still didn't arrive, queue for fetching
" o& c) p+ l7 ]* s; ]                        // 本地还没有这个区块,创建获取区块的请求5 j# g0 h" y7 {/ d. J' h" k) l
                        if f.getBlock(hash) == nil {1 n! Z  K( {6 x7 P! J, Y( {6 `. D$ i
                                request[announce.origin] = append(request[announce.origin], hash)
9 t) e0 d+ S3 m$ w4 Z                                f.fetching[hash] = announce
: _2 [/ i: X/ C  O/ X                        }5 M$ c0 \7 V7 e0 G) T5 P2 U8 _
                }8 T1 i( h3 V+ G( ~6 o4 I
        }0 O* @8 @: [) U* ~7 i
        // Send out all block header requests
: B2 Y. F8 f# I: x1 \. K        // 把所有的request发送出去
8 F6 D: C7 w5 K* y1 \, h) g2 l        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
( a5 F; e3 N* {        for peer, hashes := range request {! l, m/ J( k. b% ~& |7 ]* N
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
' f) X* F$ e7 [! K. X: Q; w, Q% D                // Create a closure of the fetch and schedule in on a new thread+ H- W+ Y: W0 _, S( \7 W$ J3 Z
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes2 T8 G, n% x! b; }' a( d5 P
                go func() {0 L/ Q6 g3 S6 a" F4 b3 K
                        if f.fetchingHook != nil {3 D+ w6 m$ a% o; \: j
                                f.fetchingHook(hashes)0 i# f9 l, |. z2 e3 x- ^) E
                        }4 T& Q& `5 C; [' e, l" O! d8 |
                        for _, hash := range hashes {
9 ?! m) J. d* J                                headerFetchMeter.Mark(1)
6 v& c; k: e- a- u6 u+ A" B                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals5 g" h6 i, D/ W- `( x$ o
                        }
2 X+ u+ z1 b" |, @                }(): W/ d1 Y- Q, j& N; R
        }
- f9 p0 f) d& m, \% |6 q        // Schedule the next fetch if blocks are still pending( X/ r, I9 ~/ i. h8 H# ~1 n
        f.rescheduleFetch(fetchTimer)
* N3 [- }6 D" [3 u+ H0 z9 m从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。9 G: p& h" s+ D9 {* Q- V
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)7 V7 {* A1 E7 f& ], g6 l
// RequestOneHeader is a wrapper around the header query functions to fetch a
5 p0 c0 g6 Y3 t0 V/ m" ?// single header. It is used solely by the fetcher., ^/ T( m% o) _( P; G
func (p *peer) RequestOneHeader(hash common.Hash) error {
$ l2 k0 @$ m3 S1 H        p.Log().Debug("Fetching single header", "hash", hash)
. Y* v7 i# ~" F( }2 p- I        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})' b+ B; z9 h) _# r
}
' V+ k5 z, s7 \' y4 OGetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。' s$ V' T7 F$ g3 `2 z5 w7 o
···
* F7 f% K; R8 S) s" i( Q, K// handleMsg()# j" X0 j/ m1 t, D  Y0 \) D
// Block header query, collect the requested headers and reply
5 `, L' c7 v/ m0 Zcase msg.Code == GetBlockHeadersMsg:7 M" B  y: G/ t  y1 ^9 t4 ^
// Decode the complex header query
9 y% ]: M! Q& Q6 tvar query getBlockHeadersData
4 P+ p" y" A5 g; a+ bif err := msg.Decode(&query); err != nil {+ P1 g3 R5 t" C
return errResp(ErrDecode, “%v: %v”, msg, err)
- H7 ^  u8 e- v# {& t7 _; u5 Y}
; @( ?" J: j  _hashMode := query.Origin.Hash != (common.Hash{})
# c0 p. C1 Z. L4 b- v5 `- s5 l8 c) s3 C// Gather headers until the fetch or network limits is reached
  N# ]4 b( l- F) j- c' V% ]. i// 收集区块头,直到达到限制  k* x6 S- H  K7 ^1 x, ^6 y5 [8 T0 W1 Y
var (, Q! s% j/ [2 Z
        bytes   common.StorageSize/ y  v. }% K- _% ^
        headers []*types.Header
7 G. B& Q# N$ o. N) y9 V/ N5 l        unknown bool
) k: {+ k; N& P1 z)
8 ?, }1 V8 Y8 c2 }4 t1 |// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量: O3 G. m' Z6 O
for !unknown && len(headers) . A: o  g5 c. W# }$ z
`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。  m: }+ {6 I$ M$ f8 m
// handleMsg(); [/ m7 C# y& k% X9 a% I
case msg.Code == BlockHeadersMsg:
7 J; E+ U) V; h7 s, B7 Z$ v' Y# v- `// A batch of headers arrived to one of our previous requests4 `/ v  ^$ k/ Q& I
var headers []*types.Header
% y9 Y7 X. M: ]8 I; _* kif err := msg.Decode(&headers); err != nil {
. B8 G2 e2 a. l4 areturn errResp(ErrDecode, “msg %v: %v”, msg, err)
1 L7 V% q* l/ h: b/ H  g) u}5 |6 |  m. k# [8 g* w5 L+ @8 o
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that+ w7 w( h( F. y0 [7 n$ O* S9 c
// 检查是不是当前DAO的硬分叉/ o4 Q+ N' I- }, q& h
if len(headers) == 0 && p.forkDrop != nil {
4 R- Z* M# P3 {// Possibly an empty reply to the fork header checks, sanity check TDs. J6 s: T5 b( T, u! P
verifyDAO := true' P% q/ b5 a; }: K- N5 {: C
        // If we already have a DAO header, we can check the peer's TD against it. If
" a, f4 [; l* K* t6 u2 O: x2 r! m        // the peer's ahead of this, it too must have a reply to the DAO check
$ P4 c0 }* R; [4 L        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
+ g5 [% X0 s* `  m# R1 a2 A( b2 m                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
- o6 o7 S4 S9 j$ J5 o% M& f4 ?1 Y" O                        verifyDAO = false1 E& m! o: F. h; m
                }+ R; `2 ?5 P% j$ _' m$ |: d
        }3 @4 A2 D- f( D+ q1 [
        // If we're seemingly on the same chain, disable the drop timer
3 U! z0 H; R  }1 C% E        if verifyDAO {( d" a3 h# i5 T
                p.Log().Debug("Seems to be on the same side of the DAO fork")
# C8 g! y% j$ B                p.forkDrop.Stop()+ F1 K1 S0 z7 P
                p.forkDrop = nil: ?3 Q6 ?3 J! M3 t1 [4 A& a8 S
                return nil
! K1 s7 \+ \  }# L7 S        }
( U# J9 @$ K; i- @1 Y; ^$ I- j- c0 _}
# Q# ^0 E% W8 E/ d5 L- N8 ^; j, W// Filter out any explicitly requested headers, deliver the rest to the downloader
" Y$ G3 Y* E- \8 g: ]0 y// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader( N* G* N" m& b1 o$ G1 k  H4 Y$ o! a
filter := len(headers) == 14 J& V7 ^! w, H9 U5 \% Y4 D
if filter {9 D" Y1 U9 q1 T
        // If it's a potential DAO fork check, validate against the rules
$ v& e7 g5 |! q3 I9 y$ B        // 检查是否硬分叉  s. M3 N, z1 w5 e. l0 F9 e6 Z0 s
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
6 V4 @, R, I0 }, }4 f                // Disable the fork drop timer
7 r7 z; O. y/ Y# Q* Q* c1 R9 W6 I                p.forkDrop.Stop()/ q/ {2 f4 D' o6 t
                p.forkDrop = nil
9 }* @4 |- I+ L                // Validate the header and either drop the peer or continue
( {6 {# k/ [, m1 \8 ?  D" g                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
8 T! `& o0 t1 ]( {                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")) T$ j# c/ l* N- Z
                        return err
' X8 j; ]: ]4 X1 `: c# U                }$ A$ A' I% F. |* z2 n7 d3 s
                p.Log().Debug("Verified to be on the same side of the DAO fork")
6 U' s9 @0 D; w                return nil$ p  F) B3 r7 ~% n' c! I7 E/ K
        }) c5 |2 b( o& [' l5 o1 v
        // Irrelevant of the fork checks, send the header to the fetcher just in case2 [2 I8 w- I: s) S2 s+ D( W  a' q
        // 使用fetcher过滤区块头. t# o1 O* g7 p+ D; c; i8 c+ c! R* e
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())" Y8 h; o% v0 O5 v2 g  H
}
9 u: ?% ]( Z! L. o; w+ H3 a& o// 剩下的区块头交给downloader0 W! `/ X7 v2 N* M1 |' n
if len(headers) > 0 || !filter {3 m4 l; _9 U" \, h: F4 T
        err := pm.downloader.DeliverHeaders(p.id, headers)* E2 I* }6 r% \8 q( }: J$ ]
        if err != nil {
; B. f" B8 ?$ n/ c5 F. B                log.Debug("Failed to deliver headers", "err", err)6 \6 r1 Q0 O) Y: h: I" g+ z
        }, @5 r7 P6 f7 i
}
+ q( ?! M" @+ L7 V`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。
6 S! T2 V5 P! y* k& \% R1 N每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?  I: a1 S' w' p) J5 D
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,- J; J% {& n, P9 ~
// returning those that should be handled differently.6 c$ [/ G0 C, u- a( L% `* m
// 寻找出fetcher请求的区块头, w- S, l5 X$ \+ k( i
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {3 ], m9 K; {( P; e/ z& {
log.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))4 ?6 q& A8 z4 s# [# `6 A" w
// Send the filter channel to the fetcher8 s4 d) p. U* S; X, E% w5 Z
// 任务通道
% ^/ f9 P( l! A: P; s5 a( `0 D+ o4 bfilter := make(chan *headerFilterTask)
5 Z3 J& v' n- C2 N+ aselect {
% J( e9 N' A$ p1 h7 J// 任务通道发送到这个通道
0 `. U: ?' ?  P1 M; Z9 ^9 acase f.headerFilter
, s6 X4 y. P7 E% G# \2 z}: f$ N3 }9 K2 v$ `2 J1 v6 ?
接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:) {2 p" L" V, _1 O, H: ^
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。
+ y$ Y5 D8 |" A5 D; O) t0 q8 E2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。3 y6 E  C8 v1 t% p/ S; ?: S- q% F
3. 把`unknonw`中的区块返回给`handleMsg()`。
4 `. V' n! c+ ^+ N$ @1 C4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
0 P. x& [; F5 M& s% b5. 把`compelete`的区块加入到`queued`。
5 H3 L3 P3 {# a7 q, d& \( R// fetcher.loop()* K, f: S, t! p" h& ~
case filter :=
/ T; s0 h! l9 R// Split the batch of headers into unknown ones (to return to the caller),
; B2 ?! D) r' T. q; I- d4 R// known incomplete ones (requiring body retrievals) and completed blocks.
% w$ o: a6 D' t& ]7 S* ^// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放% i' E2 N" o+ F$ a8 J
// 还需要获取uncle和交易的区块
" t9 e# X( ?9 E( }' runknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}9 T* l$ Z7 t; u: J7 l) L
// 遍历所有收到的header
* P3 p" a8 w& C2 t+ D7 d+ cfor _, header := range task.headers {
! |* ^+ r: L$ Y0 F$ ]* o: `        hash := header.Hash()
. [8 C' u( v8 j- E8 ~3 Z' y        // Filter fetcher-requested headers from other synchronisation algorithms6 \1 l' R/ _# W, p4 x1 T* ]5 u# p
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
* u2 d6 M6 i2 u9 }$ T        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
) x& M# }; o& W- Q4 a3 m6 v6 U                // If the delivered header does not match the promised number, drop the announcer
% J9 o- S$ U: x5 o' {9 m2 T                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。3 X+ B/ w5 o- [0 p  n
                if header.Number.Uint64() != announce.number {
+ a3 {4 ?' a9 E0 e+ {                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)5 Q6 j* f, n  [
                        f.dropPeer(announce.origin)
$ Q& Q) N" ]2 a2 x( d                        f.forgetHash(hash)& a( H8 c# c: Q% O) ^% r9 t7 [
                        continue& z/ a/ X1 Q2 ?
                }# J1 ^& f( u8 v4 f4 Z  F
                // Only keep if not imported by other means
6 {# w4 {4 K5 i! I6 o( d                // 本地链没有当前区块
4 R; d+ a3 W$ }, O' f                if f.getBlock(hash) == nil {
9 c8 \$ g0 t( ^                        announce.header = header
$ H6 @8 `# F, H7 m                        announce.time = task.time
- k. M, q4 D4 P* ~                        // If the block is empty (header only), short circuit into the final import queue$ G' G7 {% w- X# @& P( h2 |
                        // 如果区块没有交易和uncle,加入到complete" j- X/ P- w; Z* E( E! g
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
4 n1 g3 t9 W' l6 B- _) d                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
( p% ^% V7 _4 z! i                                block := types.NewBlockWithHeader(header)
& z% R- [1 k2 o  U7 I' p                                block.ReceivedAt = task.time
. @  S' G# a/ N                                complete = append(complete, block)9 ]& @( }6 W* H& a
                                f.completing[hash] = announce
/ i/ A. J9 I9 d! j  Y                                continue
) `* }1 V6 _( N. k# ^! R* Z                        }; @3 D( y8 G% K- M3 T* M* L
                        // Otherwise add to the list of blocks needing completion, d/ ]9 y- W' E8 Y
                        // 否则就是不完整的区块
! H! h6 i. O. ~3 N3 B. E                        incomplete = append(incomplete, announce)
5 i4 q7 }4 D; D5 o6 y- N                } else {
. t# [3 X& P. c6 j                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
5 S; U- Z) t/ N; L# u2 g1 C                        f.forgetHash(hash), E9 b9 D2 E8 ~# c5 C- k2 C  F. L
                }- C2 `$ _( c2 w1 e! F
        } else {
% T/ ^, K, s( ~: z                // Fetcher doesn't know about it, add to the return list, I5 {7 E) O: n: j1 N, j+ Y. j
                // 没请求过的header
( G$ ?8 ~) ], H  Z! J4 {2 s                unknown = append(unknown, header)/ l) |: b: e- }" ]% W) |" H0 y4 F
        }( A3 p. W/ |4 X( k
}
9 \+ ~7 t. o! o& ]- `% P  {// 把未知的区块头,再传递会filter+ n$ x# ]: L1 }1 ~2 l
headerFilterOutMeter.Mark(int64(len(unknown)))
( _+ ?: {. |: `* ]select {
" s# {; V5 d! j9 J. X! t- _case filter 7 g7 O# R) C0 s
跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。% D) G$ H2 D' N) m( U  ~( p0 A( L
// fetcher.loop()
3 p8 X$ b! M& J& t/ T. ]' t' icase
' A$ n* \- H" }2 o$ g8 b' H// 遍历所有待获取body的announce) ^' }: J4 y- H( ?8 Q+ ~
for hash, announces := range f.fetched {
: z* ^1 M: I8 ?+ a- f5 a5 s        // Pick a random peer to retrieve from, reset all others
: j: c9 e6 _+ p; r% u; B& ~        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了% f1 Y( e& D) L/ k& U. E
        announce := announces[rand.Intn(len(announces))]
& n1 y" t! I& a$ O3 `; d        f.forgetHash(hash)
0 b7 q( }& n0 w* K3 p# l5 t        // If the block still didn't arrive, queue for completion
- r6 G* g% N. Y$ ~- I. x* e        // 如果本地没有这个区块,则放入到completing,创建请求
9 M: }, y  K* A- A0 @        if f.getBlock(hash) == nil {
* d6 `, s/ B1 t( `3 m                request[announce.origin] = append(request[announce.origin], hash)
- x( n% R3 k0 H5 a$ y: s                f.completing[hash] = announce
7 Q5 j4 }: H! L1 c$ w        }' V# W0 z( {7 A
}
& W, R0 x  G* S! w3 K" y6 r5 ?, P// Send out all block body requests
& f4 h+ X1 l' i3 e7 q7 T// 发送所有的请求,获取body,依然是每个peer一个单独协程
( Q2 Z3 V4 ~8 r5 E! cfor peer, hashes := range request {
" l. U) e; \0 D9 L% ^/ ~1 u* V0 s        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
. y4 o# S/ q5 V" E' F# I; R        // Create a closure of the fetch and schedule in on a new thread0 T$ v! m; {. y$ {/ Y; e7 x
        if f.completingHook != nil {
2 y8 w$ A2 }( ~+ W6 T$ o, k5 b                f.completingHook(hashes)4 u+ T, h% I9 B( L2 ~6 H
        }( w3 [$ H6 U8 R. q6 ?) f
        bodyFetchMeter.Mark(int64(len(hashes))), F( T" ]; L2 N6 X( d& {7 ~
        go f.completing[hashes[0]].fetchBodies(hashes)3 i' n1 \( Q- i5 N* l) b
}5 l9 g' ?) t7 [* |, O0 I' S1 e) }
// Schedule the next fetch if blocks are still pending
1 P- Z. M% ^6 w8 bf.rescheduleComplete(completeTimer)- @8 J; ]9 h& D; b% i/ T' V
`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
0 s3 C+ s; l& ?// handleMsg(): l! F% {5 c7 O0 Z& v. G1 b
case msg.Code == GetBlockBodiesMsg:8 @+ d6 v( `' a3 r( k- W
// Decode the retrieval message
7 q' P  t3 R3 Y' c% K- YmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
, ]* x" a& d: Gif _, err := msgStream.List(); err != nil {
. R$ @! ]9 ^- D% e' Q9 Y1 _* q+ Greturn err  {. `" V  |) n' j& W9 n. ~# [2 q
}8 }1 j& F0 }* O) p( @
// Gather blocks until the fetch or network limits is reached
$ E* Z9 M& _" [0 ^var (
! W9 G: C$ F' t2 O, p/ w, thash   common.Hash) u7 G0 t3 t2 R- e
bytes  int
' u, y- P  [1 p# _( H6 _bodies []rlp.RawValue
& ]' {( J8 P: T7 c+ a  F4 c$ e)6 U  o2 R7 ^7 H& w) j
// 遍历所有请求# V' ~# ^- m$ y6 `
for bytes ! H  ^0 x' d$ r0 K3 \9 |) a, h. B, ~
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。' ~# M) P* b( _$ b7 |$ F0 p8 ~0 A
// handleMsg()
; h! L9 x) z# ~, P2 p, hcase msg.Code == BlockBodiesMsg:
0 E0 b2 I# ?3 N% M6 F; y// A batch of block bodies arrived to one of our previous requests, U0 f! E, c- v- J8 |/ y, U
var request blockBodiesData% ?* |- w' x; o% f
if err := msg.Decode(&request); err != nil {
& s1 Y" |, d2 O# N6 p/ breturn errResp(ErrDecode, “msg %v: %v”, msg, err)0 E) I0 G7 M( V# Z, a: ~: o
}
* ?5 p8 E# ]& {9 f% r# Y// Deliver them all to the downloader for queuing) M; D/ g/ ^5 a7 w) O# O
// 传递给downloader去处理
$ l8 d% z& i% v2 q% X9 X3 c- u- Btransactions := make([][]*types.Transaction, len(request))
6 R+ a) o& I2 v% C1 t- E8 ouncles := make([][]*types.Header, len(request))
; H# D& T' U+ q5 {9 S$ z* Rfor i, body := range request {3 R9 T" I7 z# H$ Z. D
        transactions = body.Transactions
$ C: p  U+ Z% |+ s4 u        uncles = body.Uncles
- V4 c4 G" w/ h+ N& H}1 D; {2 F6 {* t: }1 F0 C
// Filter out any explicitly requested bodies, deliver the rest to the downloader( n, K1 O2 h: u5 m7 Z: U4 K
// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
8 B' o% w; M- L6 i+ U2 @filter := len(transactions) > 0 || len(uncles) > 0
$ p2 T% o( V- ~0 cif filter {% `0 A$ z) j4 E; Y; }0 N, x, S
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
; }, l; Y4 D9 t}
7 U8 p" `5 O! I% ?5 f// 剩下的body交给downloader
0 K% K, C+ R5 C: jif len(transactions) > 0 || len(uncles) > 0 || !filter {5 N1 G- i. m/ C8 P$ z' o
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
% k$ v- o8 ^- z* S+ V+ j        if err != nil {
2 e, v# ~" a! z+ z                log.Debug("Failed to deliver bodies", "err", err); O4 w$ f6 _  E$ }# H
        }7 _" I* f" ^" l
}
/ s) Y* w) H6 @5 Q) n8 C7 D$ h过滤函数的原理也与Header相同。; a' \/ r4 v5 T6 ^0 O
// FilterBodies extracts all the block bodies that were explicitly requested by* }4 L2 o' H' l- b5 J
// the fetcher, returning those that should be handled differently.
9 k: ]  h! W3 Y: g// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理
6 f7 C( ?+ b% |7 j6 Z: h; qfunc (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {) R+ R, m- a- O. a$ u4 G. P" f
log.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))- Y& g) d' e3 N6 x3 d8 d* Y# T# f# {: h
// Send the filter channel to the fetcher
! J1 D  W: A/ z7 w* Ifilter := make(chan *bodyFilterTask)
- v+ [/ ^4 D; O$ pselect {( K, L" J, A: h+ w  T
case f.bodyFilter . B3 V" T$ t' Y3 g' ]) q: r% p
}( f4 G, ~' q! X& p$ k+ r/ e1 j) R$ a
实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:" C) X( `+ Q0 C4 h$ m2 k; D
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
2 ]! [$ T; O+ U+ Q2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。5 Z5 R  P/ K% u' P+ o: }- \* h  g
3. 将`blocks`中的区块加入到`queued`,终结。
: ^0 [9 L2 [( p: J/ Qcase filter :=
$ K+ s' ?( V$ iblocks := []*types.Block{}  d+ A  e/ Z6 E0 k5 B  D
// 获取的每个body的txs列表和uncle列表
5 P0 i$ A3 r6 Y: y// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
! B1 a7 {3 X2 P' Rfor i := 0; i : C( `& w4 M1 y0 t8 V% F
}, t6 L2 |( }0 P* M3 h+ p: m
7 H3 o- B+ H/ \: e% P2 T. x
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:2 F* e! |6 h+ |) i/ m
1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。. O! A- |6 n# u4 Q6 W5 Z$ h
2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。) w0 b1 K6 L: L3 U
最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。+ a# g: N8 I% o
5 [  c" ?# i3 J. V" F$ o
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3