Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
140 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。$ b# ]' K! H/ j
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
+ s* |" N7 C9 P" [总体过程和传播策略$ W. q, V5 W% w/ T
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
( n# K% k# ^& L/ F* \! u总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。( K) C9 J* l6 F  U
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。. f* W1 |3 S; C

/ s7 m& s( i0 {4 a( a总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
, P+ E% u$ X$ {1 r% k* t: ?假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。% @6 _4 W4 P) C& C
策略图的效果如图2,红色节点将区块传播给黄色节点:
1 W: u3 F# J3 `* L
; `. u3 r2 U* Y5 y$ Q9 X+ A' k& h7 @+ i6 O  P+ `- \% G" d" T4 O* V
收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:; i2 p( q! x$ l- ]& S
; U" p/ m# z" A" n: f; K
至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。: F" ~6 E8 F8 T2 r' p
Fetcher模块是干啥的
9 t* @+ d, j5 n/ C# H2 L) T. mfetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。
8 c, z$ ^0 x. d, Q; @如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块2 l$ F8 w( |9 A/ R  c" [9 i
/ l, ^6 b3 @- E" U' K& \
源码解读
; \( l2 m# i( Q& S' H, `本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。
6 S  [0 @0 H- N产块节点的传播新区块
5 f+ ~% L1 y0 t+ d6 |) E0 M) {节点产生区块后,广播的流程可以表示为图4:
2 G$ M+ E, L5 w* Y6 X发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去( F& s# o( z' _* q7 ]: X: _

0 N& W/ y/ j6 u5 j% G  k, \6 w+ @/ R- x, X
再看下代码上的细节。; L8 ]& L- {8 n$ J
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
, q8 j1 N# Y% s
& \% L3 F( C/ e! ^  v& B0 j9 B7 u// Mined broadcast loop
. q2 v( \$ ]/ b. B6 Mfunc (pm *ProtocolManager) minedBroadcastLoop() {
. I/ W  q/ X# v4 K) N+ _        // automatically stops if unsubscribe0 R, C) ]; I" y1 k
        for obj := range pm.minedBlockSub.Chan() {
0 C: n( k4 I4 F& |: ~                switch ev := obj.Data.(type) {
9 u- v9 w9 L5 x% L  |                case core.NewMinedBlockEvent:
4 |( J( Z8 Z0 E- |* b5 Z                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers  G( H7 B+ A! B& C
                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest: J* V$ I+ R% b! J
                }
7 B3 e& {" M, x$ O  Z" F2 K! q8 H        }  g( c7 _2 A: X: y4 l. \+ f1 g
}5 M7 y/ z4 \) S: `0 M
pm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。
9 y2 s. A# U3 q7 u  Z! s// BroadcastBlock will either propagate a block to a subset of it's peers, or
8 R# r5 ~7 O* Q* f# c# l9 O9 S// will only announce it's availability (depending what's requested).
8 C: k% ?( k$ f, b: a* |func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {4 i5 b# u- v1 W. ?! b- o9 T) i1 y( l
        hash := block.Hash()
% T  n) m3 z( d' l0 J* ~% p        peers := pm.peers.PeersWithoutBlock(hash)4 N- b9 O! ]1 _9 m# }
        // If propagation is requested, send to a subset of the peer$ O2 E) S6 Z7 Y; Q4 ?
        // 这种情况,要把区块广播给部分peer5 N: u5 ^6 t" J  ?# G0 k7 }( L7 I
        if propagate {. ^3 E1 m; R. l9 D
                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)! p# S  W4 `) ]" ]- @
                // 计算新的总难度
8 q* f: l8 Q5 P* l3 a, M                var td *big.Int
7 o' g& G) {4 W. @: Y) U9 `                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
5 |$ D' W3 X! N7 U9 B                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))5 j7 w. m- t6 j% C
                } else {# P/ e9 t' G/ X1 S+ I% }; A- d. f
                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)' {% n( B$ P2 {4 K+ _( u3 l( G
                        return& I' I( B; a/ b! i( e
                }7 K. Y# k' ~7 b* h/ h5 K
                // Send the block to a subset of our peers) [# T7 e- k2 E; U* u
                // 广播区块给部分peer8 B6 ~4 O0 B/ }1 ]
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]
& r5 y4 z( k) X! _* }+ J                for _, peer := range transfer {
9 Z2 E' F5 g" |6 ^7 U7 a                        peer.AsyncSendNewBlock(block, td)
3 r: ~. \1 K, K: Z                }
! M6 j3 L9 b# w. t% j                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
6 @2 Y1 j2 k- K8 @. _# |                return* m2 U1 H* @, d- K2 K" _
        }) n+ c" V& u1 V' ^. Q9 o$ U! F$ \6 i! `
        // Otherwise if the block is indeed in out own chain, announce it
3 K0 k1 M6 a# e: Q* K* H& X        // 把区块hash值广播给所有peer
/ Q; C8 l* E% t; z5 E        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
/ \1 Y8 z5 ]& N, l" R% j( r                for _, peer := range peers {
  D, L  o2 F( `+ a                        peer.AsyncSendNewBlockHash(block)9 t4 H( d# I$ j9 R- d1 d
                }
7 H+ a- w. f# A  Q8 J% z. z2 f                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
; T8 H# j$ H/ w* M: t! O; v        }2 ~2 S6 x# Y/ Y7 T
}
8 }$ k6 J2 B7 p* F. k# q) |peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
6 g1 K5 w& x1 q// broadcast is a write loop that multiplexes block propagations, announcements, |, L! z  f* V! J
// and transaction broadcasts into the remote peer. The goal is to have an async
% W8 w' I, u. v7 o( x6 o// writer that does not lock up node internals.
$ ?8 v( D# b* p9 l5 b5 u* Pfunc (p *peer) broadcast() {2 y; V6 [; |" U! u
        for {
) R' n  c4 ^. F$ U& ~' z                select {
+ b5 s% `+ l( }0 M& c                // 广播交易6 t4 n- c5 ?0 v5 d% d# Q, ]
                case txs := + H0 r* f5 ]  @: ~; x
Peer节点处理新区块
# w1 k! J9 ?2 W. n+ I2 y本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。8 q( A% G" f3 ^" v$ Z
这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。( X+ ?/ R# U) s! S" v$ `8 y" S( U
消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。/ U" O- `8 ~8 v- Z+ P6 T

. w# `( q% W: ?7 F仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。5 e+ a# v. @8 U# B. o
NewBlockMsg的处理
2 h0 x# ~1 y1 [2 M9 x* `. B本节介绍节点收到完整区块的处理,流程如下:
! o7 N6 q* e, v0 X& C8 I首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。
8 k( J8 S0 J: V# z* e& O+ o" t1 }将区块存入到fetcher的队列,调用fetcher.Enqueue。8 N0 s5 K$ Y8 O" u
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
3 p0 {! T+ }3 f2 G/ r3 ^' ^只看handle.Msg()的NewBlockMsg相关的部分。; C7 z$ Q% B2 }2 }
case msg.Code == NewBlockMsg:. l, O1 g9 O6 B6 z" u
        // Retrieve and decode the propagated block1 `. |# h& ]% t8 L1 \' J! S- B
        // 收到新区块,解码,赋值接收数据" n$ U- G4 V1 }0 `/ a6 b& V* B. v
        var request newBlockData
) P) ^: ^6 j4 n1 k        if err := msg.Decode(&request); err != nil {
5 @4 E4 r  Y6 p4 p5 }/ @1 |) L                return errResp(ErrDecode, "%v: %v", msg, err)
* A) i  Y7 k" R5 N" N3 R        }
& p0 N3 u8 E5 W! Q1 ^  ]        request.Block.ReceivedAt = msg.ReceivedAt, K8 r# q. v' b2 e
        request.Block.ReceivedFrom = p
# o9 a$ \' f9 e2 Y0 `4 B4 V        // Mark the peer as owning the block and schedule it for import
' F; E1 B$ Z" n) u9 d4 F. h        // 标记peer知道这个区块
2 M4 j! A9 A/ i2 e        p.MarkBlock(request.Block.Hash())
& B5 y8 z  a$ n# b# r        // 为啥要如队列?已经得到完整的区块了$ F2 C8 y1 f, n* }% |
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块
% U& o: ^" r' f; D/ f+ J3 p        pm.fetcher.Enqueue(p.id, request.Block)# H* r+ w. L6 |
        // Assuming the block is importable by the peer, but possibly not yet done so,  Q, c0 Q4 C0 L. \  }
        // calculate the head hash and TD that the peer truly must have., F( p' T. I6 z
        // 截止到parent区块的头和难度
8 \2 x$ W$ u0 D. R. c$ \8 ?( Q        var () @9 v- S# C5 n; n
                trueHead = request.Block.ParentHash()
1 m/ u, P( k: q4 g! O                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
0 r1 J" E) s' J  H) i        )
( Q# }( b9 @7 {$ b        // Update the peers total difficulty if better than the previous
- O  |3 m# g) E! `5 z) D3 M        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
' i! J; X. z) \6 M        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。
2 k2 p0 S! k& Z+ q4 R# r        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求
3 S+ x4 B6 x+ A: p$ y3 S        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
/ v& I, \3 |7 j        if _, td := p.Head(); trueTD.Cmp(td) > 0 {7 {; N* T% c2 H( u
                p.SetHead(trueHead, trueTD)
2 R+ [7 b) P  L/ H% E, p: V                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of& L7 L* ]5 C! P* U1 |7 T
                // a singe block (as the true TD is below the propagated block), however this$ X# c6 P9 s/ Q  ]1 _- |! q7 e9 s
                // scenario should easily be covered by the fetcher.
, t6 z  C% H! l                currentBlock := pm.blockchain.CurrentBlock()
: C4 `( @( B+ g9 x                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {7 \( u3 z5 S: s% I  o% B: k8 A( |
                        go pm.synchronise(p)
; x: \& j2 y- Z2 y7 L$ k                }3 M, e' I3 x2 Q4 n; U9 z, _
        }( i6 M0 I, t, G
//------------------------ 以上 handleMsg
5 i* s8 W! k0 q3 D  q  }# [// Enqueue tries to fill gaps the the fetcher's future import queue.3 t7 p+ P# V0 g9 A) ^# i
// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理
- J' t3 a$ S5 _9 |( Mfunc (f *Fetcher) Enqueue(peer string, block *types.Block) error {
% Z' p& w- S8 f: m- o0 s& R        op := &inject{
7 ?0 M: P; G3 V" h5 n; b                origin: peer,
9 F2 v. n; ?3 K# d! [: ]' t                block:  block,
1 r+ p/ v( H; z) H        }
) T! `% c3 i' h1 s' N$ _; ^3 P        select {
  O% o8 s1 x' C# L        case f.inject  blockLimit {7 D0 Q9 C0 \# K  Y& @- s& p+ s% M
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)" `5 g' i2 s' D7 l6 n9 B
                propBroadcastDOSMeter.Mark(1)
4 }. w( i2 b0 D8 ~9 U* E2 R6 t                f.forgetHash(hash)( S2 z. w2 o6 N* h! B
                return. A/ s5 X# z9 e: Z: o9 E$ T- d1 ?
        }
1 ^5 Z. H3 r! N$ j' v        // Discard any past or too distant blocks) l5 f5 Z# {; Y/ ]( a2 C6 W
        // 高度检查:未来太远的块丢弃" i: `, I  N6 O! h: k& l
        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {; |( V& x/ e, M& ~4 B
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
  y, E8 u& y5 L- ~& d/ [$ {                propBroadcastDropMeter.Mark(1)
% o1 R! |/ ^  M- v+ C% d                f.forgetHash(hash): e. r+ Y2 E( ]5 j# k8 @" M
                return2 G  l5 W3 Y" |8 I4 B- W
        }4 k2 w3 e0 x4 b. U8 m. b: Y
        // Schedule the block for future importing
, U! s0 {5 s4 k8 {        // 块先加入优先级队列,加入链之前,还有很多要做9 ~. `- S+ z/ n# S  C
        if _, ok := f.queued[hash]; !ok {6 ~6 G1 |+ l' P/ p
                op := &inject{
+ F9 {  v7 Z* j/ G7 ?& p1 n5 O+ m                        origin: peer,  i. N6 y3 j8 D5 {  O0 p
                        block:  block,( k# h9 s9 F% o8 i9 c
                }6 e8 `7 R9 A. ?+ w" G3 h1 u+ t
                f.queues[peer] = count7 ?* D$ Z* ^" ?* x% b* h; x
                f.queued[hash] = op
  z5 H# P; ~" P1 {0 C$ y- v                f.queue.Push(op, -float32(block.NumberU64()))
8 h! m1 x% i( H8 H2 J2 S                if f.queueChangeHook != nil {7 D& i2 L% Z1 I# J. h/ i
                        f.queueChangeHook(op.block.Hash(), true)3 e& k  g: `( |6 J1 U
                }
, H9 j$ v3 ^, [                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
* \4 a" u) N6 b' k: W        }! s* Q+ I! g" {, C* l4 c0 Y) l; D+ P
}7 }' I& [- o. h5 j
fetcher队列处理2 B& s% W. Q4 B% E0 E7 j( Y9 s
本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?
4 J. |3 k+ i* m, F3 ]+ o6 Z" w6 K由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。
4 [4 A7 b- R  Z+ K; X区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash% F! H. j' ^! H6 l- U$ \  I
// Loop is the main fetcher loop, checking and processing various notification2 u9 a9 L% V$ D$ E- |+ b
// events.4 n0 z8 x. l0 n6 z
func (f *Fetcher) loop() {# v3 H' n7 u; g
        // Iterate the block fetching until a quit is requested
! h/ {- f) S8 Y& o/ M7 {  b% y$ `        fetchTimer := time.NewTimer(0)
* @4 z0 ^' R4 u$ s9 l) g" A        completeTimer := time.NewTimer(0)' z4 h9 N) Q( w+ ]
        for {
# X: i3 v3 C  }2 ?. X                // Clean up any expired block fetches
5 G% [/ W+ f; O) z9 H4 f; R  G                // 清理过期的区块* R' Z; _: e% L# i+ `5 u
                for hash, announce := range f.fetching {' K, A2 ~, `3 j  _
                        if time.Since(announce.time) > fetchTimeout {3 b0 {  |0 B; A5 w" q; _. `
                                f.forgetHash(hash)4 p6 f' s1 `" p$ J0 \& G* N
                        }
( I* c9 H& r! K$ ^6 H2 m1 [. X! m6 _                }. j( F* X6 U/ {) i- x4 w6 J
                // Import any queued blocks that could potentially fit6 ]7 X" a8 N( y% B2 D2 g" I
                // 导入队列中合适的块
8 p5 |4 Q' D- Q0 A                height := f.chainHeight()
1 A" {0 V2 e% ~9 E! a                for !f.queue.Empty() {4 y' N* H. y3 _* l9 ?0 [
                        op := f.queue.PopItem().(*inject)) D9 u6 V* j8 k3 T2 j' _
                        hash := op.block.Hash()
1 b5 x# _, y& b                        if f.queueChangeHook != nil {
# Q, ^5 d8 Z" l  n4 }                                f.queueChangeHook(hash, false). I( T( r) ]2 s
                        }
7 Z9 [' a) z" l5 V, T2 Z. X+ @% V$ M                        // If too high up the chain or phase, continue later) B9 P% n- Z  B9 ~9 T
                        // 块不是链需要的下一个块,再入优先级队列,停止循环
; q/ ~" Y0 E8 ~/ z; L3 s8 i                        number := op.block.NumberU64(): }$ t6 R6 b5 }: f) H
                        if number > height+1 {/ G. a1 L/ t6 I. A
                                f.queue.Push(op, -float32(number))5 J3 h' P' j: e: U  L0 C
                                if f.queueChangeHook != nil {6 m% _) y6 g' E' F- z1 x
                                        f.queueChangeHook(hash, true)
# ^2 O( L  T1 ]                                }
; o9 X# i- z: {6 o# ^2 O                                break
, U: P( W% |. m9 {1 r                        }
+ E5 E' _, j( m                        // Otherwise if fresh and still unknown, try and import
2 n% H9 }% j, G1 j3 H1 d5 A                        // 高度正好是我们想要的,并且链上也没有这个块! K- ?( ~8 |) h. U
                        if number+maxUncleDist
/ i/ [& Z( R2 b6 ]8 }5 Hfunc (f *Fetcher) insert(peer string, block *types.Block) {+ |$ p' o0 j; \
        hash := block.Hash()
! l9 y: S3 [/ f+ x        // Run the import on a new thread
  X8 W4 \' G9 V+ X        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
' _- \- B' R) ~% A: k, F        go func() {
+ F9 S0 X9 Y! I9 e0 i6 I( g                defer func() { f.done 5 m8 \$ D2 u' {6 q" O
NewBlockHashesMsg的处理
' ^* _. W3 h8 O& ]2 u9 F; E. r, Y; U本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。
+ `! j7 d6 G) W' n9 P4 x流程如下:+ n, _$ ~% h" H: J4 s* [) R
对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。. B3 w# x4 E2 b: s
关于announced下节再介绍。
' O$ Q* D7 f9 I2 Q) ?
0 O. \2 i( h+ y! M// handleMsg()部分
9 v% \: W* {6 Z1 e6 l1 jcase msg.Code == NewBlockHashesMsg:
' J) L" j5 M/ f# f% d' \* K; g+ Z        var announces newBlockHashesData
- X, M0 N* t8 V3 {, m2 [1 [, n* @        if err := msg.Decode(&announces); err != nil {: G1 {1 [+ w: k5 q) w. ^
                return errResp(ErrDecode, "%v: %v", msg, err)
' X9 V. t* @" D! s3 O4 a        }* S2 C2 k- I. E' ]0 ?3 {7 ~/ X9 d
        // Mark the hashes as present at the remote node
" U8 e; |: f/ h6 l! g        for _, block := range announces {
) q: b3 g% Y4 l. ^- C                p.MarkBlock(block.Hash)- ]9 I- w2 O8 U6 C0 G$ S/ @
        }6 \$ k- l# v; M
        // Schedule all the unknown hashes for retrieval+ a% v6 N- s6 `
        // 把本地链没有的块hash找出来,交给fetcher去下载
2 c/ U9 D" i! p; Z        unknown := make(newBlockHashesData, 0, len(announces))2 d0 {. |" L* j8 e, B
        for _, block := range announces {
3 Z$ d% O+ x, e; _7 q7 ~* g                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
& |6 H0 }$ ]$ T                        unknown = append(unknown, block)2 _$ S; F6 n; \
                }
, x9 q6 p" U& n        }/ z$ W$ u. `* X
        for _, block := range unknown {" P/ ?! o6 G5 G" O& Q
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
+ k3 h# u+ }4 ]; p6 }' z        }- N" y% S4 x" M  v
// Notify announces the fetcher of the potential availability of a new block in
( S+ H+ S) }! {1 o; r% X5 T" R$ N// the network.0 ?8 w0 d; V; k( t; q
// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息
* @. V! I7 V+ }9 Xfunc (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,0 h+ T6 V3 S8 }- e
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {( }2 Y/ @8 l4 U1 I4 Z' C/ O1 B
        block := &announce{
, N% F& r6 A# s2 C4 K                hash:        hash,, t. l  l& K0 i5 _
                number:      number,
8 _. X$ u( K8 O* F                time:        time,1 q" ^; |  D9 g/ _8 {
                origin:      peer,& Z* N: d9 K5 H
                fetchHeader: headerFetcher,
$ p- X( q2 R( p                fetchBodies: bodyFetcher,3 N' I" j+ k8 U8 A5 |, g! l7 s: x
        }
% d. G' T- M; g: c1 l- G" h        select {2 w; P0 g* `* b; f9 b+ {
        case f.notify  hashLimit {, @/ j" |; r1 H7 R  k: G& H# H
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)( p5 ^/ h- W2 o; E
                propAnnounceDOSMeter.Mark(1)
! Y/ p/ q# b$ _: ?; @- c# ?                break) S  p5 X: x* U
        }
# H. ^9 }; n2 v5 m' w, k        // If we have a valid block number, check that it's potentially useful
+ Z" `! l# B5 Z- @2 m        // 高度检查$ g' s, M0 K* Z
        if notification.number > 0 {
5 I( Y- R. j2 T, b) Q  E! U# D3 L6 ^/ e                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
. |) m4 E% K( [$ R9 c7 Q9 F9 F$ n                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
/ S6 n$ J2 ~* H8 Y+ N  S; H% D                        propAnnounceDropMeter.Mark(1)
  Z+ _! p' k- M1 B; t                        break
% ?, X5 I; Z% W& p                }8 H1 S9 A* E7 E" [, _2 f2 a
        }
( J5 ~" Y& }0 g        // All is well, schedule the announce if block's not yet downloading# O& o& v$ S; ^3 D4 @0 H! L
        // 检查是否已经在下载,已下载则忽略; B  I( Y% T) j# M: n/ c- ?! @  k( u
        if _, ok := f.fetching[notification.hash]; ok {
0 r. ~/ M7 b6 \. R4 O2 S                break6 t7 i7 Q8 K2 f, `4 J
        }2 [$ d$ e+ f/ a7 b8 o
        if _, ok := f.completing[notification.hash]; ok {- {, x# I7 i/ D
                break
, b, r6 T& U- }5 Q, ]4 T5 s3 B        }
/ `3 _. }. U- t/ [& H7 n2 |8 i        // 更新peer已经通知给我们的区块数量5 U6 a8 h( e; r. _2 P
        f.announces[notification.origin] = count4 Q' d- ]2 ?7 w; ?- Y
        // 把通知信息加入到announced,供调度
' g+ r! _8 i! C        f.announced[notification.hash] = append(f.announced[notification.hash], notification)! n6 P1 L+ Q  \2 a1 B9 Q$ T9 W
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
: o7 Y( p' @- y- q( |                f.announceChangeHook(notification.hash, true)
8 X9 g7 R; L  b3 {7 C8 J        }
/ `; `9 }9 T: ?" b" O/ G        if len(f.announced) == 1 {9 N( [+ p! h6 |) B* ~
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知9 W+ e8 S4 t! K& j+ I: [& p' n3 _
                f.rescheduleFetch(fetchTimer)
& f  q0 S' W+ e$ g9 }& n. O        }8 ]* m! g3 ?0 O, w! [  i2 j
fetcher获取完整区块
0 X/ W4 W, |& O/ S% w, ]2 l本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。$ ^% @% B" }2 V' c! T0 V
Fetcher的大头
. n1 j, Y0 w2 D* T0 ~$ o) KFetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
5 y5 w+ N' E- Q, |+ \宏观* W/ ~: l  A  H. l
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。' {3 [2 E8 C# B

0 x( n; X' p7 N' A再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:
/ w8 p: B6 z, v6 j! I5 b收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
5 ^$ U' G& \. f
! Y6 Q( h4 x% r4 F+ S( Y$ L$ N' ?# s9 k5 u2 k
微观/ ]$ A/ u/ c! v' J4 m, R% t2 u/ N! K
接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。/ J) s+ ^  i$ D3 _" n
首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
3 Y" q( |# h3 Z5 k$ P# {( {// Fetcher is responsible for accumulating block announcements from various peers
6 w+ D2 y( _, Q% s5 T// and scheduling them for retrieval.& R/ [# q: a0 K
// 积累块通知,然后调度获取这些块0 V, T8 F2 V! {2 L' E( q' q. A
type Fetcher struct {  x7 r5 G7 J& T1 x1 D
        // Various event channels
7 H! L9 u3 \  }4 b    // 收到区块hash值的通道
1 _0 p5 b. H( }: W$ x( I0 w        notify chan *announce+ V8 ?' m) Y) t
    // 收到完整区块的通道1 _  E: Y+ {, b9 e& G  t) \
        inject chan *inject
+ c) i4 z8 W# L5 O3 F2 w        blockFilter chan chan []*types.Block1 Z5 i* K) J( H2 }$ R; j
        // 过滤header的通道的通道% o0 n3 L8 O% V, d) m! ?; ~+ D
        headerFilter chan chan *headerFilterTask; G/ p  I/ R) I4 ]; [6 Q* Y/ V) z7 ^3 ]
        // 过滤body的通道的通道
3 U2 Q3 ]8 `, Q5 ]3 R7 ]1 u        bodyFilter chan chan *bodyFilterTask
( F) o5 x& V$ u+ F( Y7 H) k# F        done chan common.Hash" U0 B( g) X! {( @$ d
        quit chan struct{}
0 z& ^+ a; d) u/ f        // Announce states3 T( {1 \3 U( `, X. E
        // Peer已经给了本节点多少区块头通知7 q% c/ S1 r4 u, r
        announces map[string]int // Per peer announce counts to prevent memory exhaustion
' x) z, c: V+ A5 E/ t1 @        // 已经announced的区块列表
% I3 G% g1 h1 T2 ~- u        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
# [" w# }5 o/ \1 Y2 C/ }$ D: ^        // 正在fetching区块头的请求& y7 }+ f$ M, b5 k6 M' f+ v
        fetching map[common.Hash]*announce // Announced blocks, currently fetching& D, h. |! j3 ~7 P8 \( X# Z
        // 已经fetch到区块头,还差body的请求,用来获取body  {% Y2 @$ G; T
        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
* v% d3 |. X% [8 p; }2 u        // 已经得到区块头的3 f5 e* a7 N. P4 L  P. t
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing
- S7 A+ K1 a$ f, W        // Block cache0 u- |; F1 ~  k+ P4 M) t6 b
        // queue,优先级队列,高度做优先级
9 n4 l5 Z% V8 y6 O0 v6 ?        // queues,统计peer通告了多少块% @/ B1 _# g# E6 n! j
        // queued,代表这个块如队列了,
9 a( q6 B8 V9 j% [1 A" n  `        queue  *prque.Prque            // Queue containing the import operations (block number sorted)( Y. f9 L. X9 v1 b
        queues map[string]int          // Per peer block counts to prevent memory exhaustion- e. E* K, Y, B/ X1 V
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
: A- r! D4 J" U  X' D6 B        // Callbacks  V/ c$ O/ H6 Y% d/ h0 O+ J
        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
) r% b7 T/ J; N9 y        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
+ Y2 A; G2 ^% E; n) a5 v" c: K        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer- }2 T1 i1 k, @2 ]! z5 k( J
        chainHeight    chainHeightFn      // Retrieves the current chain's height4 O* B  W0 J2 p- {0 M2 J: ]( S
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数& x$ l& w+ Y1 e
        dropPeer       peerDropFn         // Drops a peer for misbehaving
9 X7 Q* r! i0 o        // Testing hooks
2 Y8 @$ H; t% j; m        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list0 j* t5 ?/ e4 T! S2 |8 x2 h3 t
        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue+ _: |* o% N. B7 u
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
% Z/ I2 _" d7 S+ G6 u        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
" R  ?9 E1 [2 l& k$ V        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)" b8 ~0 t, R, `' K# _
}
! i7 s. |9 J( w3 R+ zNewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。
8 N8 V: K( k$ F$ [5 B: Z- ?8 h9 lcase  arriveTimeout-gatherSlack {* U: `/ t" L# [7 Y2 }
                        // Pick a random peer to retrieve from, reset all others4 w4 {" L' P5 [: }
                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer+ v' K- S9 X% K+ w% y5 O" [
                        announce := announces[rand.Intn(len(announces))]
: n1 _$ ?$ K0 E$ \& s* @0 j                        f.forgetHash(hash)
  T) x4 r; [' q: g, R: q. r0 s+ M                        // If the block still didn't arrive, queue for fetching
( P9 O! ^+ V% O: k; Z& P& Q$ `' u3 `                        // 本地还没有这个区块,创建获取区块的请求' |, k2 d) k3 \; a' S
                        if f.getBlock(hash) == nil {/ U2 {3 W2 z' V$ }
                                request[announce.origin] = append(request[announce.origin], hash)
) h6 q. w5 V/ r                                f.fetching[hash] = announce
" _* i) \# E3 I& q: y) |7 ~4 a                        }. s3 T" d+ ?' n7 l6 {. ?: ~5 [4 E
                }% x! F. h, |2 p7 C+ s8 ?7 l
        }6 y# F6 W4 z, Y
        // Send out all block header requests
0 V! {" [. F+ W2 ]' q+ n        // 把所有的request发送出去
0 @0 Q  v# W+ p  P        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求9 Y% P7 V: h$ M3 p6 n9 O( i
        for peer, hashes := range request {
5 \5 g4 _1 M+ ?' x, i                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)" m4 {$ B$ J* y7 x' R: G
                // Create a closure of the fetch and schedule in on a new thread
  Y# a, e9 c: \, f$ R( P                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
( [5 W4 Q8 B; ~4 R7 ]5 p: P! ^                go func() {! j; h9 v" O1 ?) v/ a
                        if f.fetchingHook != nil {
: r! _, M2 Y" ]( ^8 J) w                                f.fetchingHook(hashes)0 R. i0 L7 K9 u  S6 l9 j5 l
                        }4 Y' }( P: x2 k- l
                        for _, hash := range hashes {
6 ~1 U* \/ a4 M" l3 m- ]                                headerFetchMeter.Mark(1)
+ U- p5 o3 w, Y2 M6 D+ h                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals  U" p- S; w& S! w. ^) Z/ p
                        }- b6 a% N0 F3 {. ?9 g) V+ A
                }()
' v* @' q8 P6 O        }; N: C1 J* L& z
        // Schedule the next fetch if blocks are still pending4 |7 v" |3 n" n% U5 `0 T
        f.rescheduleFetch(fetchTimer)( P' J) r# g5 o  g/ p
从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。7 @3 W6 C. \8 R- Y: T7 \* w0 _
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)6 h1 ?- n" s& Q4 g* j
// RequestOneHeader is a wrapper around the header query functions to fetch a: H* L% T# L7 w. \% D
// single header. It is used solely by the fetcher.
7 o6 F  W% C7 x+ N1 `% e: w$ lfunc (p *peer) RequestOneHeader(hash common.Hash) error {% V0 J2 u4 y0 F: z4 c) i
        p.Log().Debug("Fetching single header", "hash", hash)' j  ~5 K4 t: V& {+ e( T
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})% w$ T+ H1 Q$ R1 K8 q4 J
}
3 f% B5 m3 O. f1 G% p* ZGetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。
) q  T3 L- G/ `; b- {6 @+ c···4 c. J, b3 P% Q' ^) P) S
// handleMsg()
+ p0 \6 ]5 m8 n/ q6 l- `3 [// Block header query, collect the requested headers and reply. V3 G8 M: f- i
case msg.Code == GetBlockHeadersMsg:) h. s) Y; s" C' l, P, c) F. N1 a
// Decode the complex header query
8 c/ V4 }/ N6 B, S, E) }var query getBlockHeadersData
/ ~9 ?+ y6 g* g( G4 w1 O+ ^& xif err := msg.Decode(&query); err != nil {
1 f# a" ^, I- p! hreturn errResp(ErrDecode, “%v: %v”, msg, err)0 o- w) X1 A4 T
}
; \/ W, \3 T$ @  ~hashMode := query.Origin.Hash != (common.Hash{})
: n+ Z, U" c$ n: o' z// Gather headers until the fetch or network limits is reached
, U3 r! k7 d8 A// 收集区块头,直到达到限制& A, F. ?% G" i- ]+ ~; p& Q
var (+ ^4 c; i: K3 J* C" n6 F2 p2 N$ P- q
        bytes   common.StorageSize
) F* M0 p, k3 s" Q' c        headers []*types.Header
# c1 @4 x( ]; T1 v' w5 f        unknown bool, ]7 N* {1 M( X2 {3 m3 I0 f
)! b: k' K! N: O
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量
: R3 d) A7 l# \" u' M5 c& Cfor !unknown && len(headers) ! C5 ~% B) L- X7 h
`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。; q0 S1 n+ F, }! ~
// handleMsg()
  D# p) S0 F- _) `) D2 J5 Scase msg.Code == BlockHeadersMsg:3 {5 J5 {: T4 M
// A batch of headers arrived to one of our previous requests' N) p+ a; u$ t- w2 b- m& A+ g2 x
var headers []*types.Header8 v. G) r, ?# {6 w; k
if err := msg.Decode(&headers); err != nil {
+ r) I% h2 W  ~4 Greturn errResp(ErrDecode, “msg %v: %v”, msg, err)9 `  ~" U  g' d/ y, F
}% L* h8 j) }) M- E$ g, [' Q! V
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that
3 g5 f5 v" g3 i5 _// 检查是不是当前DAO的硬分叉1 T, U4 L( p4 e
if len(headers) == 0 && p.forkDrop != nil {* K; w! O8 v1 N4 G2 N
// Possibly an empty reply to the fork header checks, sanity check TDs
. a, F) D0 u8 t- t- dverifyDAO := true
5 l3 C# U1 N. Q        // If we already have a DAO header, we can check the peer's TD against it. If* Y' p* }( X6 B9 w- w  a9 ?9 L7 `
        // the peer's ahead of this, it too must have a reply to the DAO check
5 f* p  J1 H$ ^, _" d& J5 B        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
* U: ^8 S! k6 I* R1 C% X5 j& B5 G                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
2 n2 _4 q$ s$ Q7 _) \4 F8 b) p* \) H                        verifyDAO = false
' W, d0 Z  U; S! D- \                }4 a1 j) q' ]9 [: k) U# _: g% e
        }
, ?! k" Y; k: F1 x1 B( \        // If we're seemingly on the same chain, disable the drop timer6 H  X8 R* Z, e- s% D# n* ^4 b
        if verifyDAO {
2 F( y& H' K! q# B* E                p.Log().Debug("Seems to be on the same side of the DAO fork")
9 l9 E' r$ I  `# P                p.forkDrop.Stop()
5 n4 S. U# ~3 H                p.forkDrop = nil" }5 }; {5 L0 t0 M
                return nil
7 e+ @3 A; h# S        }1 W0 Z: p* Y$ p; m: J
}
! V; H( F1 ?* D' [// Filter out any explicitly requested headers, deliver the rest to the downloader
. b; M. V2 y. E9 `# m- X// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
: C$ y8 S. ~2 `2 h! f: @8 @# M( Wfilter := len(headers) == 1' B% B, D6 ]) N% _; \# d5 d) W) |
if filter {. v; ~- W0 v- D% e# c
        // If it's a potential DAO fork check, validate against the rules, ^; m7 [; N" E4 e, d4 F8 K6 v
        // 检查是否硬分叉) c0 O: [& t7 \( g! ]: g
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
& {2 ^9 Y3 A# c5 W$ p3 s                // Disable the fork drop timer. v' ]/ K' C, I- W
                p.forkDrop.Stop()
1 y8 N1 ~) z% L1 k8 q                p.forkDrop = nil
' B6 Y8 X( x( `: }; E; U                // Validate the header and either drop the peer or continue3 k* `" k' f- ]; |5 P# W
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {% d2 P3 \# E5 M/ v  L% b* }
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
, `& ]1 D( v7 {$ q9 L0 Q  \) M                        return err
; D5 _' t2 z4 ]9 |$ q2 I. [1 u4 z                }5 J% g+ l6 i2 A5 V+ d
                p.Log().Debug("Verified to be on the same side of the DAO fork"); h6 T( M- i+ Y7 P. g: d- d. w8 _
                return nil4 P' ~$ J; u0 r
        }
- ?  I4 S# i  ?" r( b% C        // Irrelevant of the fork checks, send the header to the fetcher just in case
! t: L7 p1 X5 v0 [) o/ a2 n+ J        // 使用fetcher过滤区块头
0 q: R- S/ @) K3 G        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
6 N. Q5 M7 Z: R4 d/ R}
1 \* O4 K" d# {- D( z0 u& R6 y// 剩下的区块头交给downloader
' v5 \3 _2 ]' i, C3 c# Wif len(headers) > 0 || !filter {
' |$ I3 M6 j, f# l$ [1 U6 \4 Q        err := pm.downloader.DeliverHeaders(p.id, headers)
# V% R' |$ |  G( i' t$ G        if err != nil {) l5 `+ ^: Z! y! ]& R
                log.Debug("Failed to deliver headers", "err", err)
' Z# h8 E2 @% M; A% Q        }4 x( ]1 R  c6 P; F4 U! d
}
( U1 l; r7 N3 c9 S" R, K`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。' V7 O  g0 W  M$ `" ^9 r: R" _
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
0 o  t. D, `& q) q6 `' N// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
# e* x6 h! N0 D( N9 x) N9 ]// returning those that should be handled differently.' T/ D, J7 g% i4 E1 J& l
// 寻找出fetcher请求的区块头
# M1 L  L/ [, Z. c4 }: j, c. gfunc (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
5 _4 n3 z$ _  M9 Jlog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
+ W& ]  I! U! a2 Y* v// Send the filter channel to the fetcher2 n. q1 o9 `; W( x0 a
// 任务通道
9 s7 _2 ^" |1 \. q0 cfilter := make(chan *headerFilterTask)
+ G% Y& A* z" Q+ Qselect {
& C: W! z3 B5 b( `& c9 K  `# u% x// 任务通道发送到这个通道
5 i# {3 g8 w: wcase f.headerFilter * y- C. B+ H1 k
}& Q1 P* [, \* |5 d* v
接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:
! r( [% c  D5 f3 D9 m% u. n1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。9 }% y/ q* h1 p* `7 w
2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。; O8 L- J# W. J6 V* \
3. 把`unknonw`中的区块返回给`handleMsg()`。, f2 F) r. Q; |+ E  U3 L3 U
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。& t% g0 c% O: g" M- ?
5. 把`compelete`的区块加入到`queued`。
+ F5 X' ?0 g% n, B3 g1 _+ u// fetcher.loop()* `5 M% ^  B: @/ B
case filter :=
+ {4 ]% D: Z  f, [# Y// Split the batch of headers into unknown ones (to return to the caller),
* _% Q: b4 D% `) |// known incomplete ones (requiring body retrievals) and completed blocks.
2 x: U/ ?  @# g7 T9 T// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放
. f  b$ {. I" G1 C% F, S8 G5 X// 还需要获取uncle和交易的区块; D0 E; e1 \+ O9 o. P
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}6 S  l/ N( g- Y
// 遍历所有收到的header8 S8 N- c4 {- a; C0 V
for _, header := range task.headers {# M4 K: V( _) u8 G2 h0 ^1 `; F
        hash := header.Hash()
5 m2 s/ ]: v8 z$ |. O+ \        // Filter fetcher-requested headers from other synchronisation algorithms; F. m0 W0 a1 O( i! M2 d( V
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
4 M. u# }& _+ C* e7 q        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {; _+ O/ Y/ X6 {2 K' ~0 l
                // If the delivered header does not match the promised number, drop the announcer- d/ [: p/ b7 e3 l4 Q4 }( I, G& o/ T
                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。
' ]7 ]1 D! g" C! I                if header.Number.Uint64() != announce.number {
+ J8 a; h( T& n- A& z                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)  F. T. C2 O2 y+ ]0 h
                        f.dropPeer(announce.origin)+ f% G2 U4 h# C' s: o! t
                        f.forgetHash(hash)
7 L1 B" T- d. C9 {% A                        continue- m3 k3 T) J' `) g% I( N0 b
                }4 \& i. ~' G( E0 m# ~: T) B( L
                // Only keep if not imported by other means
* p* L9 P4 _1 e8 \- V( t                // 本地链没有当前区块) t5 j6 K" y) V& \/ s' F
                if f.getBlock(hash) == nil {7 t5 f* c& w; S, i( r% q
                        announce.header = header
( M" w0 ^! L3 _; {                        announce.time = task.time
) `) E; k4 ]/ i( x                        // If the block is empty (header only), short circuit into the final import queue; L6 j2 r% y# }) p% E. }! @
                        // 如果区块没有交易和uncle,加入到complete
! a& D$ k* o, k/ ~                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {) |9 M3 A  X* n7 O: x3 S4 J' P
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
3 v- M; J. E, U! k) m                                block := types.NewBlockWithHeader(header)' L$ D' T. O, A2 K
                                block.ReceivedAt = task.time4 _1 |6 t) H8 a6 L; {. Q; _
                                complete = append(complete, block)% j' b9 C) [/ g" r6 z
                                f.completing[hash] = announce& c3 b2 A5 W$ `& y& v
                                continue5 a! E# B5 I/ o/ T
                        }
& q& X0 E3 [- A! c" @9 \                        // Otherwise add to the list of blocks needing completion: T5 s" V. w) K7 O  m
                        // 否则就是不完整的区块
! f& h# |% G+ U/ T& W4 @8 A" V                        incomplete = append(incomplete, announce)" |: G1 Z) c7 d5 ]1 |6 r
                } else {1 {1 T. E$ B  Q0 F7 @: F
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
$ D. H; U" Q9 c5 R0 k                        f.forgetHash(hash)
. n! o. G+ D$ V5 N0 l* i                }7 L7 Z: u2 j" R  h+ V
        } else {+ I- ?  A1 B7 v2 D( h
                // Fetcher doesn't know about it, add to the return list2 E6 m) _! c& {* u
                // 没请求过的header1 K$ b4 L% b$ s: J) v( v4 P
                unknown = append(unknown, header)7 m4 @, Q  r6 T( G' W
        }+ g1 l8 [& l0 g( e8 g* |
}
" s. y$ b4 U6 H) A# ^1 I6 F// 把未知的区块头,再传递会filter
. S" U6 T- {  Q) E  B- O! QheaderFilterOutMeter.Mark(int64(len(unknown)))9 Q( F, q& h: g) }0 A
select {; k; r* d; ^. c
case filter
1 t5 s* t' F# B跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。1 C8 L8 v* Z  E% q5 ], R. a$ A+ R& t
// fetcher.loop()" N5 _5 ~/ R+ v1 h( \
case
- h, W' E$ L  f* ~5 `* f// 遍历所有待获取body的announce
' ]$ T& C% F6 q4 ffor hash, announces := range f.fetched {
6 A, q7 I& `% M5 E# m! E        // Pick a random peer to retrieve from, reset all others
  x( K3 U- t: z) ^7 ]        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了8 m1 B, n7 o* |* x% B( M$ F
        announce := announces[rand.Intn(len(announces))]
6 u- T- E$ e; G! o2 d! B7 ?        f.forgetHash(hash)
7 t5 m- ~+ D5 D' Z0 y5 U( L        // If the block still didn't arrive, queue for completion, q: F+ v* {4 n/ E! c
        // 如果本地没有这个区块,则放入到completing,创建请求
! a/ L' s1 }: S( B        if f.getBlock(hash) == nil {
: j3 \- r- }4 p) T! T                request[announce.origin] = append(request[announce.origin], hash)- @: |! i3 u; S8 S' k
                f.completing[hash] = announce, }, @' d4 `$ H" o9 G3 M
        }, T' _  Z% c( e7 e3 U
}
# h( N& D5 k' e; a// Send out all block body requests
8 r8 N# ?" x' a7 O1 X: M// 发送所有的请求,获取body,依然是每个peer一个单独协程) @6 l$ e% Y, Q4 ~
for peer, hashes := range request {
4 b  |; X5 w: G. k3 e        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
  I$ _+ `4 J: K. d6 O6 m8 y% V" u        // Create a closure of the fetch and schedule in on a new thread, v* ~. Y: o+ r) h" {
        if f.completingHook != nil {, @) K0 R$ p* K2 q- N
                f.completingHook(hashes)
% ]. k- h1 j- P# Q2 H: m8 ]+ R: V        }; [+ y4 }2 x' Q4 u1 f
        bodyFetchMeter.Mark(int64(len(hashes)))' i8 f1 ]9 W' [1 d
        go f.completing[hashes[0]].fetchBodies(hashes)
! |' D, w; s& |}
- O* W. x  k; {4 U5 x// Schedule the next fetch if blocks are still pending& d8 K7 A' y7 O
f.rescheduleComplete(completeTimer)
; C: A: x- v' P, S: E`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
+ o0 y, S- G/ l// handleMsg()  j2 B  M( ~3 |& ~
case msg.Code == GetBlockBodiesMsg:
1 w" i2 H* F4 @2 u// Decode the retrieval message7 C# ]5 W/ x8 L8 Q/ D4 t7 J
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
/ e- ?2 f; M1 O6 \8 Fif _, err := msgStream.List(); err != nil {
  U' p2 ]% v; b1 E$ Z' mreturn err
! f6 s/ x2 S- c8 [/ T7 v}
9 ~" e) y+ s# f// Gather blocks until the fetch or network limits is reached
! b7 S! M! J0 Lvar (% @8 U( @6 b; q3 t" C% P3 n
hash   common.Hash! V9 [+ v) v; [8 S1 ^, m3 U
bytes  int- ?/ x- W% j$ v# U0 T9 E
bodies []rlp.RawValue
8 U- y6 q$ s3 a( Q& |, u% H)6 d( V% u) Z0 M4 `  W" u4 P. N
// 遍历所有请求9 T+ x! {' D- }, X
for bytes
1 b5 s; W% O/ r( M; M响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
2 ]1 a5 w# W. k( Q4 q9 C: @// handleMsg()* ]) H3 [; z0 c
case msg.Code == BlockBodiesMsg:# E* q" c* }0 i: T/ |4 c
// A batch of block bodies arrived to one of our previous requests( v  \, k0 n* D0 h5 k5 Q
var request blockBodiesData) Q0 m% h: S* P$ T
if err := msg.Decode(&request); err != nil {; S2 u" f/ c$ h7 l( v+ }' D( p
return errResp(ErrDecode, “msg %v: %v”, msg, err)! I$ ~/ J' W) ]; Q2 g4 V
}8 ~4 Z5 \' w# _9 E* f; i
// Deliver them all to the downloader for queuing$ O! ?* [! A4 L( E& ?, I5 o3 Y
// 传递给downloader去处理
; ^. ?$ E4 E" w0 ~* Ttransactions := make([][]*types.Transaction, len(request))
% j1 F- v' h. A( V; c* O1 y0 Tuncles := make([][]*types.Header, len(request))
# V0 w9 p6 Q; n- Yfor i, body := range request {
' e# m( O' {. D- e9 P8 A. h        transactions = body.Transactions* F2 v% z3 {6 D
        uncles = body.Uncles
  o3 ]- E" S9 N5 {" b; C}
. h, X% U0 ?' @; z( p; `// Filter out any explicitly requested bodies, deliver the rest to the downloader
3 E1 R/ v# ~, }; s2 G) ^// 先让fetcher过滤去fetcher请求的body,剩下的给downloader7 g( k, [# y  x  k+ f  z' G
filter := len(transactions) > 0 || len(uncles) > 0# W/ c3 r5 c- }  U& ^
if filter {' u! w: C: B8 l& e7 h" T! f9 ?
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
- Z( P0 a; r3 m/ u( P! m}
7 b( `) \2 k5 i+ p// 剩下的body交给downloader
7 L! F6 ]7 x) H+ V% N* pif len(transactions) > 0 || len(uncles) > 0 || !filter {, `, t, c, V/ Z7 ~4 Q1 I
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
4 `& P8 m# G( L        if err != nil {
0 u) h  o2 B, ^4 `/ O                log.Debug("Failed to deliver bodies", "err", err), W: b7 r/ Y$ t
        }) q0 f! b: x# W$ J( R
}
3 _+ A% Z5 \0 \+ d5 [过滤函数的原理也与Header相同。
' u9 D4 X3 c3 ]// FilterBodies extracts all the block bodies that were explicitly requested by1 a3 x( |4 j4 b( Q1 V( F8 t
// the fetcher, returning those that should be handled differently.
2 L/ J' L/ c+ p% r3 A// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理" J5 K! ]4 I: J0 [
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {. b0 M- k: ?1 A
log.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))6 h2 c' }0 c6 P( @# ]0 _' _
// Send the filter channel to the fetcher
# |# Z) Z" K. B1 ]8 p8 B2 Xfilter := make(chan *bodyFilterTask)2 @# p3 j2 ^) f  a: B/ L2 x
select {
; _7 a4 ^# X/ i7 _case f.bodyFilter
  Y- m9 h) w3 ?8 T4 t$ H! O}& y0 O- T9 `8 V2 Q0 q7 {( P" }9 S
实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:0 F& Y! F( c- I" e% l9 ~) I# m* K4 h/ W
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
+ o5 p% h6 q$ l- t# V$ I! g2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。
6 c) d. A' [. H' W3. 将`blocks`中的区块加入到`queued`,终结。
% u6 O6 M( U$ Ecase filter :=
+ s: r3 b" D* I5 J+ h0 D# C) Pblocks := []*types.Block{}
# v( I' G$ E; _0 G; D+ A" ~// 获取的每个body的txs列表和uncle列表; v! _1 T8 F4 M. r9 H& V
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body3 v3 m0 i& O6 q# N
for i := 0; i
/ g/ V# C: s$ Y# ~}
# ^- L' a8 h' ~2 r3 ~- w0 @: p
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:
: x( g" x3 j. T/ \1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
5 j0 J5 i- [, J2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
. c/ k- [4 R" F& W7 g3 m/ e最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。( a$ N6 R7 G* M& P$ z* {) y
& z; i0 |* H' N5 G6 R
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3