Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
229 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。+ b! `) e) {. Q/ C, C) ?& K
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。6 F+ _2 I' C' \7 Q. h8 v
总体过程和传播策略. H# ?) V* b* O9 @, Y: _, ~
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?9 o. z: v+ q0 v5 `( Q1 u
总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。! a% B( m7 f# X/ w" H/ u( u" w
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。- v% o6 F; B* ?. m4 ]: {
  b( \- x) i# z/ l/ _! i3 O
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
! c- D  v- u" R  {& i假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。* h" q1 r5 W5 \. f
策略图的效果如图2,红色节点将区块传播给黄色节点:& ~. ]- ?5 e" w; f, n9 i
8 L( p( e: s5 w5 f

3 L# K' m* i# |收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
: [. ~; O2 c) n8 K* E! r0 L
5 S6 p% }1 h0 f$ H至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。9 [+ X, M- W, A3 G; _. H, }
Fetcher模块是干啥的! W4 G2 Z  I- \# L& T
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。" s3 T! c9 n2 V, M
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
8 w' B$ }, W# S8 q! g! @, C& B3 i6 @$ }& m% K
源码解读3 S  n: t8 ~5 N
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。* Y" W" C" Y+ a4 u  |5 L; |1 w
产块节点的传播新区块" L2 c& ^( D/ x" M
节点产生区块后,广播的流程可以表示为图4:
% c+ ]' U" o+ ^# q, V  u1 {! {发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去
2 C, V0 t2 m, B* J5 E6 V; A! J2 k+ r: K+ h

1 R  T  i1 m7 ?. u- A再看下代码上的细节。+ h2 Z4 O( V9 o8 I! z" {( K% D* q; T
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
! u! p! U3 L7 ?* R/ ?( C" q9 w$ i
/ Q# Q  y# b8 }" I7 B3 M8 q) P// Mined broadcast loop: a# y% H+ {/ q$ @  |! ^% p' `8 y
func (pm *ProtocolManager) minedBroadcastLoop() {
; O: B4 u; C  B8 m2 {        // automatically stops if unsubscribe
6 _/ C, v$ M! O8 y* ~8 C; J! r        for obj := range pm.minedBlockSub.Chan() {4 W4 [/ o: R- _& I* k+ K) b% a
                switch ev := obj.Data.(type) {: R' X( g3 Z6 x$ J- [6 A) F- V% v
                case core.NewMinedBlockEvent:
5 [  `/ x5 M5 }5 N: v) T! k2 i3 T, J! [1 e                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
- `2 B9 U# X5 _- A& t                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
/ ^$ {: a+ g% l: u& p                }
9 _/ W. ^( u3 ?        }
- r( u- n! ?9 `0 q: _3 l1 m}
( U6 t3 n, Q3 e% P6 ]3 Ypm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。) a! [0 D2 f, h: x3 s
// BroadcastBlock will either propagate a block to a subset of it's peers, or/ Z3 w0 y9 _/ D, u
// will only announce it's availability (depending what's requested).
+ ~3 o# ^+ Z# k: C2 ~func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {6 u# }( z) X5 T% K, y; S
        hash := block.Hash()0 ]5 z" l/ y4 r4 v4 w
        peers := pm.peers.PeersWithoutBlock(hash)- s+ R+ M( X4 j6 Q$ A
        // If propagation is requested, send to a subset of the peer
0 ^" Y: D5 q  Z3 z4 h# e# `* \- k        // 这种情况,要把区块广播给部分peer
- w( i5 H1 x5 q! X        if propagate {
$ }/ ]0 C9 f) H! K$ S% X                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid); p; }) m5 Y3 e+ a2 n% \3 Z2 C
                // 计算新的总难度7 m: Y( ]" I- _
                var td *big.Int$ c; F% T. ?3 |: p' ~9 Y6 x
                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
" z) x4 F; \3 @: Z                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
% c0 S( ?3 Y5 r/ r2 S" R                } else {
& P% O: ?$ k$ @6 u8 I& e                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
/ |7 l2 ^1 n+ C! Z                        return& @( u1 `6 v' z
                }
* p0 B, d! G+ T& P" W2 y" x1 ]; a' p                // Send the block to a subset of our peers
$ s  s. K, D, T6 {                // 广播区块给部分peer( d* O) D: |  C/ ]# c4 T: b9 A! R
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]/ }: }- z: X" x
                for _, peer := range transfer {
6 u  z6 J$ v* G5 d                        peer.AsyncSendNewBlock(block, td)
, a+ X# B- ^2 ~# K                }- c) L; A& G' |' x. b) ?7 }! T& t1 p
                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
2 f& j, |) T% Z8 T" O                return  N. x  e. I9 n3 G' y/ L
        }0 s! f8 x2 p7 ~! z! q
        // Otherwise if the block is indeed in out own chain, announce it+ M4 [5 X/ ?. R; {" x2 f
        // 把区块hash值广播给所有peer, M! l* P/ _" r# M0 b# s
        if pm.blockchain.HasBlock(hash, block.NumberU64()) {  C6 i1 F( K- q! v( t- ^) D/ N
                for _, peer := range peers {
4 u3 U4 A5 x2 o% ?) a& i  n3 H4 T                        peer.AsyncSendNewBlockHash(block)+ o2 O5 S$ F& y/ H2 {( j# i' |
                }
# k2 k/ L& u6 F                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))- `$ f/ v0 s7 e* h4 d
        }- |; k+ r  m4 u5 d% k$ G5 n6 I4 M
}3 X6 g* j  F. ?1 s
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。0 |7 V1 V9 \. M& d& O
// broadcast is a write loop that multiplexes block propagations, announcements
; f% G$ D; ?; b2 C) o; D4 ~; @// and transaction broadcasts into the remote peer. The goal is to have an async5 A& C4 x+ q! _6 `
// writer that does not lock up node internals.. y( w- ?3 G( w$ m8 Q! \3 ]
func (p *peer) broadcast() {1 K5 ]4 t3 I% m! U/ a& ?4 z9 o
        for {
( f2 L- g, t& A6 b- p8 I                select {
$ L8 Q# ^% l; l/ x                // 广播交易5 ~5 b+ ]/ W* t7 B" l7 }
                case txs :=
4 o9 M9 s; f- [- fPeer节点处理新区块
/ E/ r$ a: f, Y0 G! ~: z# q本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。- F6 ]0 f( k) @2 d- C" v5 I
这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。$ m; D4 |6 y4 I- d
消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
( _$ j& N7 q/ a4 Y
* \' a; m& {2 |" z& K8 [$ N8 O. U( n仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。" k. p) J8 h6 e3 o7 h0 W
NewBlockMsg的处理) {  O: @# h6 m" c1 d) m( I
本节介绍节点收到完整区块的处理,流程如下:
7 `4 l; L1 J3 H* g! E( I7 `* S首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。# f( \5 E. A1 t) u& {" O
将区块存入到fetcher的队列,调用fetcher.Enqueue。( ]3 p: q  c1 q" n- _" ~
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
: X: t$ _2 K( }* W9 J% V/ g5 b只看handle.Msg()的NewBlockMsg相关的部分。  t4 {9 w9 @, n4 @, }7 ?
case msg.Code == NewBlockMsg:
: M* p% `1 \) P" _  K        // Retrieve and decode the propagated block6 V$ d; j  I5 h7 d$ o2 U0 X
        // 收到新区块,解码,赋值接收数据% b+ \7 B$ m& x( m2 c& L
        var request newBlockData
& J* r9 G9 a, N( e2 ?        if err := msg.Decode(&request); err != nil {
$ F( N0 I& D; g9 C8 s. G                return errResp(ErrDecode, "%v: %v", msg, err)' ]0 T0 I- }0 t4 S
        }1 E) `* h+ n$ [2 B( K
        request.Block.ReceivedAt = msg.ReceivedAt
# E: Q! n" W2 O( @: e/ [        request.Block.ReceivedFrom = p9 f% {) p7 c  I
        // Mark the peer as owning the block and schedule it for import5 O1 q" }9 e+ W' c) d5 F4 |5 U
        // 标记peer知道这个区块, w! i  d9 g# ]. K4 I. M
        p.MarkBlock(request.Block.Hash())) T8 ]- I$ H* `) N# p
        // 为啥要如队列?已经得到完整的区块了
4 k( T* W4 P$ L; w! y        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块1 l) Y: H, c$ F
        pm.fetcher.Enqueue(p.id, request.Block)# {2 \5 P: A) C4 F
        // Assuming the block is importable by the peer, but possibly not yet done so,6 `9 |( g9 S& K8 M# r% J0 L) r# e8 h; d
        // calculate the head hash and TD that the peer truly must have.
3 {9 y& r) L$ g7 o        // 截止到parent区块的头和难度+ M  ~; R# I. `3 U3 N" ?
        var (6 e. X& h) d" Q
                trueHead = request.Block.ParentHash()
; V; D0 O9 ~% ~0 y% y' ]                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
; l3 B& o) I! o7 h; ^/ {" h        )) T/ z8 f2 Y; ~: X! V
        // Update the peers total difficulty if better than the previous0 }  {) z" @3 L' Z' \$ k
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步' ?- _+ ^* q3 [. x; i
        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。4 n0 C0 q# @6 t: `' ^1 w
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求7 b; ~6 z" Y1 P; t/ C
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
8 q' u6 O' Q5 o" t; H) n3 X2 L) v        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
& \3 p, P, [" e/ Q3 @                p.SetHead(trueHead, trueTD)
$ l# C5 W3 W7 V6 Z9 V0 R                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of+ ]& B, v/ i0 d
                // a singe block (as the true TD is below the propagated block), however this; x# d1 e" K6 R" p8 L4 Q2 V
                // scenario should easily be covered by the fetcher.
2 X! T+ S6 ]7 y( P6 `% {                currentBlock := pm.blockchain.CurrentBlock()3 W6 C8 U& g5 E- e
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
7 e$ d. d, A/ w/ j8 A6 _                        go pm.synchronise(p)
# B  E8 W# w( O; e# s                }+ _7 h" {# E# r2 _! r3 I
        }
9 i" s: n) ]% E3 t; C5 q//------------------------ 以上 handleMsg
, V$ N' q$ i1 @' p6 h  ^' O0 E5 Y. n// Enqueue tries to fill gaps the the fetcher's future import queue.
- Q" E. S/ O+ ^3 ]" @// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理% L+ c1 A/ O; n* Y% K
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
' ~) I* O! E% c8 b: E        op := &inject{4 N6 j: k  P: I5 D) Z
                origin: peer,1 M. @  c! e& H3 E8 w; k
                block:  block,
) W) u0 p, a8 S$ a3 j  I        }
1 ]3 k# F/ @$ q0 R1 f        select {( A- i+ }) O& ~$ @; N/ @
        case f.inject  blockLimit {8 }: l( U8 r- E2 r7 h' ?3 @
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)8 z" N6 O8 ~2 d& @6 v4 @
                propBroadcastDOSMeter.Mark(1): i  [, J9 [( n& _5 u5 a
                f.forgetHash(hash)2 Z: p2 r2 [5 Y1 B" y
                return
' r! f) ]4 d+ n" V, w6 b        }  M# d( J0 {& [3 L
        // Discard any past or too distant blocks  e. `5 C# r2 [2 T
        // 高度检查:未来太远的块丢弃
; K' W) D1 F# D% ?' F/ K& P( h        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {* M# F% V# _8 b) O' c" t
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)' H% w' ^. E" N0 L
                propBroadcastDropMeter.Mark(1)) r7 r: E8 P' e5 @* p
                f.forgetHash(hash)
( ^3 d! ]$ i3 Q) L                return( r; k$ z6 A8 G4 k
        }; ~: F6 \8 }% c, Q- a
        // Schedule the block for future importing
3 L: R; u. ]# _: h7 x8 H0 f        // 块先加入优先级队列,加入链之前,还有很多要做* \4 E6 z8 C3 B/ V. j. ]  W
        if _, ok := f.queued[hash]; !ok {
0 Q! b6 T1 D& k3 z) |2 S: f" L                op := &inject{4 g9 b, T0 w+ t) P% i" f# {
                        origin: peer,
% ]+ M9 t. F9 S2 x. j2 g# ~) q# `                        block:  block,
- L( ~: ?5 G: x2 s5 s4 Z: w9 W                }1 w5 F0 d1 z  T* @, Y
                f.queues[peer] = count
" _$ g( m8 ]5 F* a' Y                f.queued[hash] = op! l9 j1 v+ C* U; w' r# l2 f
                f.queue.Push(op, -float32(block.NumberU64()))) d' |# z& C/ K
                if f.queueChangeHook != nil {
% N, [4 J7 j- s& J) b                        f.queueChangeHook(op.block.Hash(), true)
' a; v0 r% J; D                }9 P7 P9 X. X; N) W, i
                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
- `( X# G+ q" P5 R; f2 }+ }        }
' Z4 y2 J" W6 O" f5 ?& Y# o}
+ ]  n9 r% _% r/ x, q7 I( `fetcher队列处理; l% R2 Q% Y) X9 W7 v2 d
本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?+ ~$ f0 f0 e' j: M  ?* d' e7 v
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。
- n9 T/ P$ N, u5 ?2 i区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash
5 q; B5 }: Y" O3 p: [5 A$ v// Loop is the main fetcher loop, checking and processing various notification& q& e/ o# V6 o# [$ O+ j* X7 A
// events.
9 w  o7 s% P  {! Hfunc (f *Fetcher) loop() {
4 B( K: U$ [5 a) J        // Iterate the block fetching until a quit is requested) P" q' d9 U9 U# J8 Z7 ?8 y3 Y  w
        fetchTimer := time.NewTimer(0)6 @6 {( ~* D/ \8 X* y
        completeTimer := time.NewTimer(0)5 \% {% T' W# `+ ]/ \, n( ?( R2 K
        for {, l* S- ^9 U9 H; j  D) A4 Y9 U
                // Clean up any expired block fetches
" k+ ^; M4 s! `5 d8 m9 u3 I* Y                // 清理过期的区块2 E$ Q/ o' m5 u. Q6 D% j. L
                for hash, announce := range f.fetching {$ I" }# h! V' B1 s7 H& r
                        if time.Since(announce.time) > fetchTimeout {
4 U4 l5 }7 P) K. f, ]  Z                                f.forgetHash(hash)
4 v# o8 s- p+ _! C: R8 K+ ]* [- Z                        }& ]4 c" R9 @/ @! X% Y
                }
, g: ?  c, f  j, {+ i                // Import any queued blocks that could potentially fit
4 ^! p; b3 D3 Z3 x0 p6 ^2 m                // 导入队列中合适的块, F+ [  C7 X- `8 ^* T1 C
                height := f.chainHeight()
9 Y# ~/ z1 t. s8 L1 X- h4 I                for !f.queue.Empty() {
# \. R% I- Z% K% U8 m( Z' ?5 X3 \; q                        op := f.queue.PopItem().(*inject)* ^! O* w. u  f5 l' o* J# `
                        hash := op.block.Hash()
0 U/ U1 a& H. d' h8 O5 Q6 N                        if f.queueChangeHook != nil {' W/ r3 p/ _7 q2 {5 N& K
                                f.queueChangeHook(hash, false)
5 |, g+ M& Y! c* ]- Q0 n5 e: {                        }- G0 J! O$ S) V) ]% I" p3 G
                        // If too high up the chain or phase, continue later% [$ M; P# h8 B  R: ~2 k" g! s& ]
                        // 块不是链需要的下一个块,再入优先级队列,停止循环2 E. H/ S6 [+ D7 o! O  o" ~
                        number := op.block.NumberU64()
5 H1 i: q8 c: m                        if number > height+1 {9 G3 j9 `) f& R5 h7 K) m- S
                                f.queue.Push(op, -float32(number))
! D* m: ?6 K( U0 I                                if f.queueChangeHook != nil {3 R8 X" x3 Z/ I9 w8 u7 Y
                                        f.queueChangeHook(hash, true)
4 A( V5 w6 D. \" j# P                                }, ~& D9 f- p0 y
                                break
% A$ E! g. T1 p/ L/ L                        }/ F1 y, q* V: ^, k
                        // Otherwise if fresh and still unknown, try and import2 g6 t+ v6 N4 D. W$ I0 ~$ S" o7 a/ r
                        // 高度正好是我们想要的,并且链上也没有这个块
5 r" D, a! Y& A4 g, Z1 ]                        if number+maxUncleDist ( v: O+ ?- f( [( f
func (f *Fetcher) insert(peer string, block *types.Block) {% t5 ^/ [4 ?0 b4 L: o. x
        hash := block.Hash()$ n  v+ z- @0 I' z1 C
        // Run the import on a new thread
) C; n4 k$ M. a: `. z        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)1 _' R0 a+ [4 T3 [
        go func() {
+ w! D6 O4 y, \6 N& [* @/ ~                defer func() { f.done
7 O2 P) |! w3 Y8 Z; Y4 ~2 b. Y- d9 fNewBlockHashesMsg的处理
* {! y6 @4 e" w2 M  g, e4 [本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。/ }$ J- o8 ?( e$ t
流程如下:# N  T% F7 y7 E" g
对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。9 }9 `5 ^" T3 F- B. x
关于announced下节再介绍。
. m- P9 a! \( U/ l, R( f& O
: p6 E5 ]" b+ p2 M2 y8 \// handleMsg()部分7 j5 |" _* P) n) A7 d1 X3 o% Y
case msg.Code == NewBlockHashesMsg:
" h% B+ I3 e5 t' v$ m/ [        var announces newBlockHashesData
! R: h' ]% }) ?+ q" S8 v( x        if err := msg.Decode(&announces); err != nil {
1 N# B& _4 ?( J% D# O- l, G& a) l; B                return errResp(ErrDecode, "%v: %v", msg, err)
% ^/ {7 u' Y6 O        }, m4 M' G/ S  j) S
        // Mark the hashes as present at the remote node
& `5 F3 P4 |) @0 M: P# t        for _, block := range announces {" \4 s- j" N/ a0 P+ H
                p.MarkBlock(block.Hash)" a+ a- u: r) @0 s- u- ^
        }
7 i' b1 ~+ i% t; m1 R( X        // Schedule all the unknown hashes for retrieval
7 r0 H' O0 l3 s# g  V$ [0 T        // 把本地链没有的块hash找出来,交给fetcher去下载
/ E8 r$ [9 M+ ^7 o        unknown := make(newBlockHashesData, 0, len(announces))
! P( l+ y5 E1 n' R! R        for _, block := range announces {$ G% a2 G; T4 N' ?/ H$ ~
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
# d/ o0 D6 f- Q7 @- m3 y4 G& ]8 L                        unknown = append(unknown, block)
$ w/ L, g* n' f5 S8 C+ v  I' v                }1 K5 u5 O- ]) L& X% X. o
        }
! @' x: n5 g. f# K        for _, block := range unknown {2 [0 ~1 l+ g$ T* V6 d
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
& G8 Q: I& H$ P+ t. q) i& P) _3 R- S6 M        }4 w( C# H6 ?3 E2 y
// Notify announces the fetcher of the potential availability of a new block in
  X  k" H; q* {6 Q, X5 B/ Y// the network.
$ D* J' W1 X$ s* e9 w9 M( f// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息+ @- Q: i# H& {* F
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,- D$ L. a3 |! Q5 c2 U. Z/ o  E
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {' R( x5 b/ ?; s/ R5 v
        block := &announce{$ o; |( J. B. J5 E) H4 F) c
                hash:        hash,: w: {4 l# E+ \: v9 u, ]
                number:      number,
- P* I5 F% a# e9 J- ~+ y! Y                time:        time,
+ W& r$ W# Z4 T$ m                origin:      peer,
0 m2 A2 r: R) W0 C5 M2 A                fetchHeader: headerFetcher,
) E. K# X$ M% u: m                fetchBodies: bodyFetcher,
# j, X9 _9 W' B' q        }
( D& k8 U7 i# M& ?; [/ J        select {
( I) Z5 ]0 ^" |/ a* {        case f.notify  hashLimit {) K* v# y& x$ U+ ~/ \3 p
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
) y- Z; n3 w/ |$ @  |                propAnnounceDOSMeter.Mark(1)4 g: Z( F( T) B$ O
                break
$ y" M( w* D- r( n        }
1 @1 d# I2 R3 i# a- V1 q        // If we have a valid block number, check that it's potentially useful0 K0 ~7 N9 ~* n8 z6 @7 G
        // 高度检查
7 D' E# R- {/ K8 N0 U0 ~* R        if notification.number > 0 {
6 z+ g$ J+ W. _8 q+ f, B" @5 d                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
8 I# p- L- k8 r( d* ~& G                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)( E1 r  k7 ]9 }: s, g/ O/ ?
                        propAnnounceDropMeter.Mark(1)
4 ?3 i5 C. a4 G                        break* B8 N: a! X8 ?4 ?: L- J4 C
                }: R) x3 K) w4 X* e+ p5 H$ g
        }, H) F$ S4 n" i+ i) \9 U0 w
        // All is well, schedule the announce if block's not yet downloading
- b/ J8 U8 |8 x- o        // 检查是否已经在下载,已下载则忽略
% y. x! c4 {; I7 j4 W: `6 `" J/ w        if _, ok := f.fetching[notification.hash]; ok {
! \9 Z- T! n' D& c- \                break
& G$ Q! m- G" w* t        }4 X& I( l* F) z+ o! `
        if _, ok := f.completing[notification.hash]; ok {
1 f/ @1 a) u  O( a- W* o. V0 \% s                break
9 F/ F- k  y) ^0 R2 e+ V        }
0 C) n, {. q2 p: v8 v" B2 G        // 更新peer已经通知给我们的区块数量  I% q' A3 ?" p! E
        f.announces[notification.origin] = count+ Q- B, x' T( N. H& ~
        // 把通知信息加入到announced,供调度
# V0 N' Z2 U8 F3 J# h, Z        f.announced[notification.hash] = append(f.announced[notification.hash], notification)
. C- V5 o* x8 M8 M        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
% f% f' N# X4 d% ]5 W$ `6 v                f.announceChangeHook(notification.hash, true)
. B0 G. t' H$ ^. r        }2 t9 v! D, ]: h$ C# H" g
        if len(f.announced) == 1 {
" M$ v8 m2 G# v8 H                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
! T& n2 v1 j+ L                f.rescheduleFetch(fetchTimer)
# d0 q9 S6 h  D" O2 z- Y        }& o9 L* F4 N, A& ^8 j- ]% B
fetcher获取完整区块! U. Y# _  ?% v( p2 F8 L' U0 k
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。& R( P8 s# Y! t6 q4 q" g( r1 a1 R
Fetcher的大头' a% W! O7 \1 M  h
Fetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。; D. C7 a! @; [4 I! y- Y! K+ y
宏观: |* l& C' I% s" \  L$ c
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。* p0 a! T" l' \3 G! e
2 N0 w3 v, L/ c' i/ v
再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:9 y! z4 C% X( F- S; i) y
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
' c, ]7 S- @. K% c! v7 Z# K" z; n% Q
/ p  B; R5 o5 \& S0 W  d2 K1 a9 Q2 m, b& V' O
微观
( ?% {! K2 q/ ?# @& J/ }# _接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
7 N# D/ U) k( A/ ^. v0 y8 O9 u首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
3 w5 [/ b* O# I0 \4 Y// Fetcher is responsible for accumulating block announcements from various peers
/ H2 f* y; g) _' ]// and scheduling them for retrieval.
/ i( v9 E2 `" ]4 y// 积累块通知,然后调度获取这些块
6 N# K8 l& {1 H1 C  W6 rtype Fetcher struct {
9 M3 O/ {! b( d( x$ q7 b1 ]        // Various event channels$ F& a' ~; O, E5 f& x$ W
    // 收到区块hash值的通道
6 w5 Z* P+ V5 S7 R# c( r        notify chan *announce, F: _  q, I) ~
    // 收到完整区块的通道
! @! ^- I# ?. F& F# o; {2 {/ J+ o        inject chan *inject
2 V# ?3 \# c7 c2 p/ {* K2 ]3 N2 x        blockFilter chan chan []*types.Block" W, }4 L- H9 {, z( W" |4 E+ o9 E; o
        // 过滤header的通道的通道
, g' E- ?+ X9 D- I        headerFilter chan chan *headerFilterTask
6 X: j2 m2 \6 D- X! J; @        // 过滤body的通道的通道0 l! ~% T. [/ J* {) ~1 U
        bodyFilter chan chan *bodyFilterTask1 G+ A2 I! }2 L0 j! l( Z7 k
        done chan common.Hash% d2 I8 Z6 d  P; F! w: a+ M; }$ {
        quit chan struct{}7 |* q* @( h5 e. o4 m
        // Announce states4 _7 t6 [% C' X) _3 y
        // Peer已经给了本节点多少区块头通知
3 m  g6 i5 E. ^7 [9 l6 ^& z        announces map[string]int // Per peer announce counts to prevent memory exhaustion3 |7 ~: M1 F- p
        // 已经announced的区块列表5 A1 F8 O" u& ?
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching4 q* g/ C" M' J) m" S  e. q
        // 正在fetching区块头的请求& M* Z- a. F5 X$ t1 `3 E. z" U* U8 k
        fetching map[common.Hash]*announce // Announced blocks, currently fetching8 O. j0 L) K, s/ t
        // 已经fetch到区块头,还差body的请求,用来获取body. S- y- e) q3 W. D' i
        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
. H$ q/ H7 A( P) {) b        // 已经得到区块头的
" ~' `* {6 ]# P3 D+ `3 }        completing map[common.Hash]*announce // Blocks with headers, currently body-completing7 A, s" n4 E( ^4 Q1 A* g
        // Block cache" \1 M0 B- u! ]: o" L! A# W0 R
        // queue,优先级队列,高度做优先级
3 |: p) T2 n1 j) V2 @; Z        // queues,统计peer通告了多少块
2 @* J2 D* {% C9 v        // queued,代表这个块如队列了,- I/ l# E+ p* Y3 K6 O
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)
( H. g. i; ^" g, k( z: x        queues map[string]int          // Per peer block counts to prevent memory exhaustion, x! ~8 |4 M% m$ |& [
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
  K: H8 f4 {1 A( R/ G        // Callbacks9 e% \8 u$ H3 F4 j: S2 b# h( P; I
        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
1 b% T: A( W, a        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
% A' i- r4 i5 p, J        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
5 X/ m. O5 T/ a! p6 y        chainHeight    chainHeightFn      // Retrieves the current chain's height2 r0 N8 A2 M' Z, P1 ?7 U+ j8 s
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数. x+ K" E& ?$ ^) j. }! D6 z
        dropPeer       peerDropFn         // Drops a peer for misbehaving
- x" l; P- @& S6 P4 e        // Testing hooks
2 l% q4 R0 u# X% x) Y+ I( n3 j        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
& Y9 p3 ?( j- B0 Q% ~# q* r3 y+ I        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue, x+ r. B! Z2 L$ Q. }6 q8 v# X$ e
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch9 S0 ]- z; n: P0 F
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62), l5 F7 _: n* F' j
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62). }8 _6 c' s. k& v
}# d7 j0 `" M5 Y3 n$ ^
NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。/ @5 a; h4 j+ e3 L
case  arriveTimeout-gatherSlack {+ N  J% s' R; q7 s0 Q
                        // Pick a random peer to retrieve from, reset all others
2 I- ^* x0 S: _5 {: a                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer
0 B- F1 o0 H4 F( S4 \7 T                        announce := announces[rand.Intn(len(announces))]  Y5 C; I: x$ w' E+ C) X
                        f.forgetHash(hash). C/ N2 _2 T: e; S5 s. T* a6 L0 N
                        // If the block still didn't arrive, queue for fetching! f$ N6 A' ?9 |6 K& @1 @! `
                        // 本地还没有这个区块,创建获取区块的请求3 t  T6 N4 u0 L+ i$ ?
                        if f.getBlock(hash) == nil {
$ U+ C3 M4 N) ?) S7 \- }6 q- f: s                                request[announce.origin] = append(request[announce.origin], hash)
. ]/ u6 q# _/ o1 x5 |5 K1 G                                f.fetching[hash] = announce
; K6 P: K1 V0 ]' M+ b/ h1 i( u                        }
# s6 X- |: k- A, ~$ T+ c2 [& _0 S. f" T                }
) R7 s1 ]6 ~5 Z* U- p; G0 O! C) V& ?        }
7 B3 X( ?2 S" \0 c$ o- l2 D5 ^        // Send out all block header requests
- D& h) E* _: j) L        // 把所有的request发送出去* x, j3 j$ |, z4 {
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求4 _" b) v2 ~) H4 N7 H
        for peer, hashes := range request {/ f/ A6 L3 U3 @3 n" W- \1 i( x0 k
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)' O/ f) d) E( S9 `# ]& z
                // Create a closure of the fetch and schedule in on a new thread2 x9 `: ?8 C5 }4 U% y3 ]
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
# y) D' n" M+ O% f" B                go func() {
5 Z3 L8 }" m4 S. G2 s                        if f.fetchingHook != nil {
  E0 ^0 S. f  i                                f.fetchingHook(hashes)  N- G' M" u: m- L+ B: M' J
                        }
# H. i9 |6 M+ G7 t                        for _, hash := range hashes {; ^) `' q" g! v5 _# r+ k- E- |- s& G
                                headerFetchMeter.Mark(1)
( L+ t3 ?$ b$ A/ g  m# d: @2 i  v                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
  t* J5 Q# b% O9 M                        }
" Q- q: G+ E  J: p9 p                }(); }. Q/ K; H8 v3 K' D! W$ i& _
        }
5 G' \  {  v6 @( L' N6 |        // Schedule the next fetch if blocks are still pending
/ G9 \. {2 N' x: j3 C/ S4 N: {        f.rescheduleFetch(fetchTimer)+ N/ t7 `/ m% I' G4 M8 ~/ Z3 Q2 k' T
从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
6 R. H# i9 u8 e$ j) V/ {# gpm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)8 `* V: b* K$ h6 z
// RequestOneHeader is a wrapper around the header query functions to fetch a0 N7 t3 W: I( P! b5 q
// single header. It is used solely by the fetcher.; W! g) \: R5 p& h9 V
func (p *peer) RequestOneHeader(hash common.Hash) error {
9 q& S" v3 U6 B, @# |% H5 V4 z        p.Log().Debug("Fetching single header", "hash", hash)6 y+ R. C8 A' B' L2 N/ @+ u
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
2 o" W6 q4 N6 p8 L8 O}" \: M+ D3 s" g+ z8 w( C7 w+ ^
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。) g, c1 ?, {. c, W; `
···' `- F1 V6 {0 X* J8 y
// handleMsg()
6 u) m# Z6 x- P6 R" G2 r7 s// Block header query, collect the requested headers and reply
. F! v& T* w! a) `: r( rcase msg.Code == GetBlockHeadersMsg:- n- U5 m% p) z9 }/ H. [% j% N6 L+ g
// Decode the complex header query
9 A' Q# c5 E; [( w  }var query getBlockHeadersData
, G1 B- Z8 a  eif err := msg.Decode(&query); err != nil {
& f2 n6 Y! C; W$ b' n2 B+ qreturn errResp(ErrDecode, “%v: %v”, msg, err)$ h/ @" O0 B" D2 s; t# l
}+ q6 [* J9 X7 c0 t' ~
hashMode := query.Origin.Hash != (common.Hash{}), [+ m/ n) J2 j( x  B0 g( |* e
// Gather headers until the fetch or network limits is reached! ]$ J( c2 T% `  v/ w
// 收集区块头,直到达到限制
+ Y! d6 ?% Z+ Pvar (* s5 I3 t# v9 c, `
        bytes   common.StorageSize
3 e/ X( X( \% h. \5 M: |8 D        headers []*types.Header
# P8 }* D# t/ Y        unknown bool
. X0 j5 k  V! s; `. ~8 T" })7 \* j4 N0 s5 v% `& @
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量. n# B# I+ L; i) ~5 J" I
for !unknown && len(headers)
. t5 y- n( V6 r/ J+ h/ j`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。8 m1 G% b$ [" F. f
// handleMsg()
( V3 t( i0 E8 x. G: ?case msg.Code == BlockHeadersMsg:
. J' V0 e8 Z# V// A batch of headers arrived to one of our previous requests( g( B7 T! u/ L  c. o. }
var headers []*types.Header9 L* s. B6 i6 ~
if err := msg.Decode(&headers); err != nil {+ G9 Z- n; z) {$ T/ ?! v7 F9 h# l
return errResp(ErrDecode, “msg %v: %v”, msg, err)! N4 x* i% c1 M  \6 j. G6 d+ H( f6 P
}$ b( O# `5 @7 {$ V8 k0 _- P
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that" o4 C3 o2 W! n- T
// 检查是不是当前DAO的硬分叉/ H3 Q3 v$ Q7 x
if len(headers) == 0 && p.forkDrop != nil {
2 j9 Z1 L. F; \+ a4 E. Z// Possibly an empty reply to the fork header checks, sanity check TDs( g* F' ?# h9 S
verifyDAO := true
: M& l' {% V% H) e# S( O) R        // If we already have a DAO header, we can check the peer's TD against it. If
, t8 ?% l2 G) c5 \- ^! F; E1 X/ C        // the peer's ahead of this, it too must have a reply to the DAO check
9 I! ]' d3 @  \5 h2 W7 b& [        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
3 M, a2 Z, }4 [3 R+ Z. q                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
% g) e3 u# c) U5 B+ w& j                        verifyDAO = false
0 f. C8 _7 E# `0 y& H                }& ^* ~1 o0 S, T( X- c2 k  S
        }5 G# o: o4 A5 L# W* \' j/ r
        // If we're seemingly on the same chain, disable the drop timer
1 X1 e( x: W, }+ v+ r+ I; Y& {        if verifyDAO {
/ V  j4 }6 v- _                p.Log().Debug("Seems to be on the same side of the DAO fork")
/ k& J% \. s: F+ `: R# m. ^                p.forkDrop.Stop()" U/ j6 c; t2 k& ?& J% O0 u
                p.forkDrop = nil3 u- A( W8 O3 L$ S- A; P
                return nil) O9 W, }4 R' U* o0 Z
        }
; q+ @& o8 `  w7 }( }/ l}2 m  e9 z. E' ?) X+ h
// Filter out any explicitly requested headers, deliver the rest to the downloader
6 t+ a9 e5 O, x- q0 V// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
( a5 _; Y) O5 E* t. p* O% ^; p1 ?filter := len(headers) == 1: G, |! v4 J3 }, }# |: Z4 r: t
if filter {, H8 f" d+ L( W* \$ M
        // If it's a potential DAO fork check, validate against the rules
' g# l0 z' r# D; @. v5 m9 H7 g        // 检查是否硬分叉
7 t0 e: n2 _/ u        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {  [) V2 Y$ q# }( Y+ d
                // Disable the fork drop timer
5 S2 t, y! R& T% {                p.forkDrop.Stop()
: _# P  \* ]. ]; m  A& }# t- {                p.forkDrop = nil1 p" n$ n8 j% z: s6 O, [9 K2 m
                // Validate the header and either drop the peer or continue; D; [# _. }9 E& w& d
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {9 u/ b) i+ W! D' Q
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
5 C  I6 }6 [. b                        return err- o- T) J; L' d' V3 O9 r6 b1 {8 i
                }
) L" P- e0 |% d# `: X                p.Log().Debug("Verified to be on the same side of the DAO fork")) X1 ~! y3 X1 p9 m  h. s) C) U3 l0 m
                return nil9 ]  I; x. r) g% K2 T
        }+ ]& z3 v6 Q% v+ h2 V0 p
        // Irrelevant of the fork checks, send the header to the fetcher just in case
4 q$ p0 B$ w" n( I        // 使用fetcher过滤区块头8 y1 h: n5 t4 w
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
( p" L3 p  G1 W; N" R' U! E4 d}( i4 k0 U& b% d' m& `4 m% d: ]
// 剩下的区块头交给downloader
% b& z  b- [  X8 _0 [( O$ b" yif len(headers) > 0 || !filter {
! v8 B. J7 o( T( F' w( g        err := pm.downloader.DeliverHeaders(p.id, headers); R3 b# C$ S  _4 E- s' ?* W; Y. K1 X
        if err != nil {+ S) F5 E7 p' M9 d" w9 N- s" |1 |  Q
                log.Debug("Failed to deliver headers", "err", err)
5 n/ ^. H# N/ X+ P        }
9 M: u) |+ B# B1 G  q& @}% E9 v6 d5 ]. S- d9 J& j
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。
( n& v: p9 L1 f+ S  ~  n+ U- ~每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
* G# m  u2 G* s. h// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
" V; k5 S9 F& Q9 k+ W. t5 A- f// returning those that should be handled differently.
; R# h% R# x$ _( L+ M! O// 寻找出fetcher请求的区块头
, C3 U9 U5 ^# Q. R# _func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
- R2 E% b! w4 r$ blog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))6 C& n0 ^! b0 L, c! k5 n3 d
// Send the filter channel to the fetcher, l& G8 W) E8 y/ u- }/ Y
// 任务通道
4 V. \! e2 `$ Kfilter := make(chan *headerFilterTask)* i" [9 f0 s  `
select {
3 L; y! M6 R; |// 任务通道发送到这个通道
' M6 r5 B0 r/ J6 N, ]case f.headerFilter ! y1 A' Y3 C4 h
}2 U! C% G6 n' g, L+ \5 \) Z+ C
接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:
0 ^- y& E- D& n7 s% ]! n$ Z0 n1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。( O/ v% |4 t% W' L# k$ R5 |0 }
2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。6 U7 Q0 R  ]5 D1 h# \0 S( _
3. 把`unknonw`中的区块返回给`handleMsg()`。, c8 s. M7 ^  q9 Z6 c, E
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
5 t" g) i( J+ \) }3 u  ~5. 把`compelete`的区块加入到`queued`。
  g+ C" z! X( a6 i// fetcher.loop()
" D/ {' M, l7 [( K* ?" acase filter :=
0 Y0 @& A8 b* d) {. y% K// Split the batch of headers into unknown ones (to return to the caller),: O9 ~" v, L- g. Y; \, |2 x4 Q
// known incomplete ones (requiring body retrievals) and completed blocks.; I  f9 {- ]' a) [1 F! J
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放- W+ t, G4 \( \/ E9 S/ G' e/ a
// 还需要获取uncle和交易的区块  \) a9 b  D. M" c2 B9 N6 f$ J$ g% P
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}# v5 J9 @- G. X3 }/ a. ]) [* c
// 遍历所有收到的header6 `' x/ s1 f! V5 Q
for _, header := range task.headers {
2 S$ G: p3 Z; A9 q& `' L% m* X' h        hash := header.Hash()! v& W) k& r  Y/ H. U4 L
        // Filter fetcher-requested headers from other synchronisation algorithms4 Y2 V  C/ v* L: Z5 g
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
6 C8 C6 ~/ H6 @7 L6 H        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {  l* \& x+ W$ d7 V
                // If the delivered header does not match the promised number, drop the announcer9 a, C$ f& _) c6 O: I
                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。) X) M+ U7 s0 w) \8 l0 O
                if header.Number.Uint64() != announce.number {
7 K/ R) q" B/ }                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
$ B7 V+ u# J% C3 X# p                        f.dropPeer(announce.origin)
8 ]9 Q( ?0 |% F) y4 Q8 O                        f.forgetHash(hash)3 U: s: f! f5 ]& o5 u  r+ f
                        continue4 p9 j+ m' m7 d7 \
                }/ Y( ~. f' h: l' R: ?% u
                // Only keep if not imported by other means1 T6 U+ v' L6 X% X2 L9 s. R4 a8 x; f7 a! D
                // 本地链没有当前区块/ [7 n: t' N3 j* }4 {
                if f.getBlock(hash) == nil {
  j6 t9 [; @0 x                        announce.header = header, k& E, t$ c+ g0 z1 g( z
                        announce.time = task.time3 ?1 C, }( b) o
                        // If the block is empty (header only), short circuit into the final import queue
, T& N& K$ |2 ~1 b  I( t1 K; t( J                        // 如果区块没有交易和uncle,加入到complete
1 Y- J  l5 i6 e! R3 |1 x                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {" g* Z- p5 J' M
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash()). O& D' f3 x! K/ P5 P1 V: V8 l
                                block := types.NewBlockWithHeader(header); B) p: @; U3 W* Q/ R! K) l
                                block.ReceivedAt = task.time+ T+ g7 ]% J( m* I. ?0 i+ o% u
                                complete = append(complete, block)
0 Q$ |* x( f! f8 B! g                                f.completing[hash] = announce2 a! f1 f, z, F' h" D6 X
                                continue
9 U- K: J: U; o                        }7 _2 W- \: }7 j) Q/ E
                        // Otherwise add to the list of blocks needing completion
, G/ ?6 _. D& f. r" E3 o                        // 否则就是不完整的区块
5 D5 b3 S9 ?8 ~& q, V  h5 i                        incomplete = append(incomplete, announce)
# z4 e+ `$ p( M                } else {
9 |- \# t* w# {1 j! Q; b                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())9 {$ v3 J' ]" Z" T$ ?0 g* T
                        f.forgetHash(hash)
1 @, g. h2 [- _9 {1 z  q                }; j+ @! ]. s9 j: ^5 ]8 w& Y* t; `
        } else {' m" w  y) f3 N8 o' e
                // Fetcher doesn't know about it, add to the return list
3 o* [  B- N4 e) U/ e                // 没请求过的header3 r) `! P8 b2 q1 J' E- x
                unknown = append(unknown, header)
; z/ r3 V+ f6 n' c1 A        }* ?+ n* i9 i9 I9 _( P9 _, W4 C
}
* M. J4 i- \1 Y) k9 P6 o// 把未知的区块头,再传递会filter
- w& z, B( s: R0 gheaderFilterOutMeter.Mark(int64(len(unknown)))
7 L; q; d) r3 B; A4 Z/ t" Fselect {
  G  i' r5 e/ t% B/ T( I6 }# ecase filter
! a% I8 i8 n/ n  c' h跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。
# c8 W" u1 ^# A) q// fetcher.loop()3 n, j# s/ b8 w9 d, A) B2 K
case 2 _8 ?' c  ?/ a. @* o: l3 Y
// 遍历所有待获取body的announce- g' X0 f/ ]8 z* `9 \: c
for hash, announces := range f.fetched {
' \: \* G3 f6 E9 u9 R) g% e2 P        // Pick a random peer to retrieve from, reset all others0 ~+ c2 p, f* O, F8 i& ?( M" V# x+ o/ {( b
        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了! m5 g& U( l/ H6 t$ F/ l
        announce := announces[rand.Intn(len(announces))]
1 a2 S! z2 B% V) t% i        f.forgetHash(hash)
2 T& ?1 r5 O( ~! l        // If the block still didn't arrive, queue for completion9 b* X7 U. `0 p5 `9 ?- ]
        // 如果本地没有这个区块,则放入到completing,创建请求
) Q% [( f( l+ s% |9 z; V! ?6 a        if f.getBlock(hash) == nil {
. W) l/ q, |2 b                request[announce.origin] = append(request[announce.origin], hash)0 W. o: Z: U# ]0 c
                f.completing[hash] = announce+ r6 G2 T/ k  a( K6 Q8 N" b
        }- {; l! G% e$ F  E: \& ?
}% [# O; s/ u2 ?: p6 i. L& [  t' ?3 i
// Send out all block body requests; U* D; B- q6 [& [% n) V, ^9 d- J
// 发送所有的请求,获取body,依然是每个peer一个单独协程
  }* Q. v0 Q6 Z3 t6 k! ffor peer, hashes := range request {+ f8 k8 Y5 N6 \: Q5 I, s3 N/ I
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)& J1 H5 u9 f2 s3 @7 n5 ?  H
        // Create a closure of the fetch and schedule in on a new thread. u2 }  L  O+ l" A9 c6 B0 f
        if f.completingHook != nil {2 ^- r& j% S. s
                f.completingHook(hashes)7 ^( I' ~1 c& [' G) ], R
        }
: P* N3 K. h# q4 x        bodyFetchMeter.Mark(int64(len(hashes)))" r& S/ }4 n! l( R
        go f.completing[hashes[0]].fetchBodies(hashes)
0 b2 [# \  g# Y" \, \}
  ]8 t. p% Q9 q0 N( _6 s2 F* J" v1 D// Schedule the next fetch if blocks are still pending
$ m1 Z8 g6 Z3 {  ^" t8 J5 X( Jf.rescheduleComplete(completeTimer)1 K! c' ]+ z# u2 a* c8 o
`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
* d$ }  t0 B4 M* ?( _7 N// handleMsg()
: ~* o" e: O# S* v  @5 vcase msg.Code == GetBlockBodiesMsg:$ u% }3 r2 ]) w# s+ B( A2 j
// Decode the retrieval message& X- u/ {! }# h& `& n/ m# [
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))0 ^  v; ]: B  I8 u2 U: m2 {6 L
if _, err := msgStream.List(); err != nil {$ X% }  `2 D: n* \* m  g
return err  H' Z$ Q- P* a, j( B
}
! f+ k! E- F# g8 _4 P4 Y// Gather blocks until the fetch or network limits is reached
7 V1 K2 [2 c; Y. _1 @* V% e' i4 ^var (4 ?. E7 h1 F# v; c, P3 b( m
hash   common.Hash. H. e2 i6 y% d* _. D& m! L
bytes  int
* Z1 L5 d6 `8 a7 sbodies []rlp.RawValue
$ `. l$ a2 y4 k3 @). _% ^8 m1 B5 G! {5 S9 w8 J5 s! v0 S( D
// 遍历所有请求. C( W) H/ d* i1 P4 ?
for bytes
* w; C2 J( b+ o# R响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
8 a5 Q) l; g3 x// handleMsg()" Q6 B3 ^3 p5 Q0 \* }+ [; D
case msg.Code == BlockBodiesMsg:
8 e: B/ N& D1 E% v+ ~" L; L, f// A batch of block bodies arrived to one of our previous requests
2 X  K: M2 l7 }# M9 }! e0 i- [var request blockBodiesData  h$ ?  e' R* J( X
if err := msg.Decode(&request); err != nil {
1 i7 n( w7 H7 x& [, U5 [return errResp(ErrDecode, “msg %v: %v”, msg, err)
0 x8 N9 p/ p3 m6 ?% S: @0 o}
2 J! p1 I* N; m$ ^5 `$ a) |# I$ d// Deliver them all to the downloader for queuing
, P5 z) R& k2 ]2 Z// 传递给downloader去处理
! d: e9 Z8 f0 utransactions := make([][]*types.Transaction, len(request))
) m: m) ]5 {  T5 L- |uncles := make([][]*types.Header, len(request))
* L  Z, N4 C( m: A' V2 }for i, body := range request {
+ V. z0 [) W3 D! A  |        transactions = body.Transactions
, ?' Y' i! K3 f( C        uncles = body.Uncles
. E, z7 \! v- f# Z}
, x  ]: D2 D, V' I+ N1 I9 b7 a// Filter out any explicitly requested bodies, deliver the rest to the downloader
" o0 N$ Y9 B, A4 x. o/ Y$ Z// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
8 [  U5 u. W  Q4 B4 P9 v: Ofilter := len(transactions) > 0 || len(uncles) > 0
3 [1 Y3 o! p! i  hif filter {: z: R* N( Y7 O
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
' [+ ]$ T# B( ^/ c; V+ |}- |  u# H6 f$ K  }# \2 Y+ Z
// 剩下的body交给downloader
4 n; c9 t2 A3 M6 `if len(transactions) > 0 || len(uncles) > 0 || !filter {$ P9 I( P$ w4 P' b3 Q# i6 Y, J
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
: ]* }0 ]9 B8 U3 B" S        if err != nil {/ i7 [2 U5 i$ x- J' v3 [# v
                log.Debug("Failed to deliver bodies", "err", err)
( @' o( h& c0 ?# o0 Y        }2 b0 x6 R4 d( e8 u; y* E
}
0 t3 d" V& O0 v* ]过滤函数的原理也与Header相同。" X8 i# t# X/ [; T! \
// FilterBodies extracts all the block bodies that were explicitly requested by
. V2 L2 W' b# N, \/ F: M// the fetcher, returning those that should be handled differently.! a9 _! N, \* `0 @
// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理* ?: d* x0 R( U& o+ Z# M- e$ N: i
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {! E1 O5 i" w. Y6 w# Q
log.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))# A' r. @! ~# H0 j
// Send the filter channel to the fetcher3 P+ h# n- D/ ^* S8 Y
filter := make(chan *bodyFilterTask)
& N% Z( ]( b1 H& g6 @) ^& Nselect {
! m0 A, M+ v9 l1 `. F! Jcase f.bodyFilter
+ @" I5 c3 I, k# u9 u}
( r( R, T& D2 S' k5 z实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:6 l* R9 _2 b! ~$ F
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。' h5 {- r  Z( T4 E
2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。+ y8 b1 e% R" |, N# t2 Z
3. 将`blocks`中的区块加入到`queued`,终结。
' L5 t1 c  x. d" `7 L4 z: E" }: Vcase filter := 4 ^/ I% h7 E# K& \
blocks := []*types.Block{}
" O& [1 f. F/ }% a. S0 S// 获取的每个body的txs列表和uncle列表) _0 p& |+ [2 V. e9 I8 W
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body5 l+ N. Q! j, t# a5 h1 I8 U
for i := 0; i
. F4 \4 M2 V% n5 a1 t8 V$ ~}; i0 \: `! ^/ X7 p# f- m4 S
: W5 U& _  l, i" W/ t
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:6 ]# d' D% k8 w% M3 E# b% g
1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
/ A2 u' O0 F* j3 z3 j6 y2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
4 V3 C0 m9 [8 S* k2 n/ ~. ?: i# @最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。
  c( X; H  I( S) r( {2 Z
+ k; M! G: {, r, ~7 A# J" b
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3