Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
227 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。
; E( h; n6 K" a' p. {. J, f当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。$ r9 {- k) K2 r2 ^7 R
总体过程和传播策略
+ Q' E1 K1 G9 u/ }) n) ~' o; H本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
1 |5 Y2 o$ G$ H总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。( `* ]; {0 L6 z9 ~$ Q
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。% t  f: a/ [+ x7 P! {) q' Q4 m

/ _. Q" j0 j2 p/ ^- U: S* N8 M8 j* k总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
' f+ a- h# ]5 N; T假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。
# ?6 L8 N6 d7 y4 B! h策略图的效果如图2,红色节点将区块传播给黄色节点:
+ E3 o9 y0 m  X. e2 k- u) |; |) R7 b( D, X; j; j

6 ^' M5 w  d2 v收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
; D2 _  o: {8 I- s4 G6 V0 A5 O6 q4 u2 j; P; c
至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。
* i' U- p6 f4 g; o: Y7 w4 `' @, P* y4 NFetcher模块是干啥的
. M) A, K% ~- J0 o: D, gfetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。" N- z; E2 Y# s' t* G: h
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
# f0 e9 f: a* m% m9 ]$ R- Z; y0 E
/ F; l5 y( ]1 s源码解读
  q2 ?* e- E+ `; R5 O+ i4 O; F) \本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。0 z" X. ~" n& e7 F; D
产块节点的传播新区块9 p* L: R: T; `# X7 f& {
节点产生区块后,广播的流程可以表示为图4:
9 x9 N. T: z. N& `& a' L7 P+ U发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去0 c5 u4 i0 d: c3 |; Z1 ~

0 s( J  X! e. s6 ~$ I) {/ z2 w* I1 z( }5 @6 }4 a1 J# o6 s
再看下代码上的细节。
/ B( h& L% N! ]+ `7 }  D8 Jworker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
" k6 }& z4 I7 a* I! ?6 i  ]  A6 r  N' D2 v. T1 v) F7 M' y5 Q
// Mined broadcast loop% M( N' O2 H' C5 \! {6 v8 N
func (pm *ProtocolManager) minedBroadcastLoop() {
: x, M, H- D. g$ S* k. m, [" j        // automatically stops if unsubscribe' @9 [6 V; Q9 i6 W4 G
        for obj := range pm.minedBlockSub.Chan() {
8 v# {7 E# v" N                switch ev := obj.Data.(type) {0 H# g+ m7 N; Y$ F- W* l
                case core.NewMinedBlockEvent:
* E  a8 t& o, {) [  v& @2 _1 ]                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
' y+ A/ p! Z3 p                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest& C$ g' h. \, e, U) G7 Q, d" f
                }4 {' ~$ \6 G- J' Z" f; N
        }
; N& E! [0 z  E% ?/ t9 c}3 S! v4 i: Q6 W7 d3 z
pm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。3 {  y0 S) c$ W6 J# d% }) Y
// BroadcastBlock will either propagate a block to a subset of it's peers, or
; o5 C  u2 m* W: o// will only announce it's availability (depending what's requested).' T9 M5 K8 B/ f; I: t- V
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
! Y9 A9 s# h8 r8 b( B; P: O% C        hash := block.Hash()$ z5 V- A4 w7 ^  U2 F/ R
        peers := pm.peers.PeersWithoutBlock(hash)
9 {/ b4 X. |0 i, k; F) \        // If propagation is requested, send to a subset of the peer; P. m+ v  n- P; F+ p, V
        // 这种情况,要把区块广播给部分peer
% R4 R; G+ ?; N& |- C& e  M1 u        if propagate {+ p) J. U: g  M* q+ C9 Q2 W& E" m/ v
                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
+ Y5 w. D: z/ H/ T- H                // 计算新的总难度) F1 o5 O2 v( ?% \2 f" B6 I
                var td *big.Int
, K. I% H* }% z& k& ~* Z- x3 {* k; Q                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {# Q/ q, ?# X9 G* H1 }: H3 c
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))$ D+ q9 O# g7 }) q3 f8 a
                } else {- X# N1 g1 c; s
                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)! t  ]  E' z4 |1 U+ {0 _1 D
                        return7 z( F2 H  u" Y$ |  o2 d
                }! d6 f9 J0 i. g( u; p7 [9 I
                // Send the block to a subset of our peers
$ _: E1 k5 ^( m                // 广播区块给部分peer3 @; W3 f3 h+ G( _$ V
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]& J2 f3 u6 _& O$ L
                for _, peer := range transfer {
# U( H$ D( B, S. j                        peer.AsyncSendNewBlock(block, td)
3 q0 N# T: H; D                }2 ]1 T% D9 ^3 k% S4 S8 T* P  @
                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
& Q+ Q) Z! Z# ]: H( K8 L                return
% {  p% v2 ~! z2 @5 v        }
0 S$ p+ x( l7 D4 S$ G' j        // Otherwise if the block is indeed in out own chain, announce it" i4 }8 U( M5 g  l( m
        // 把区块hash值广播给所有peer
- M! Y( G+ k$ m- Q3 l+ P        if pm.blockchain.HasBlock(hash, block.NumberU64()) {+ R' Q2 [& {# ]
                for _, peer := range peers {
' _& V6 I* ]) ^" J4 `: w( F                        peer.AsyncSendNewBlockHash(block)& J$ F( b* d4 w$ u0 E/ T
                }
; S2 K6 e" \/ d8 c) i                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
- K1 z- w; V; ^- P0 Z, C3 l        }! D! a! }* v/ _# R
}
4 |# n% y( d. t  npeer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
8 Y: }5 ^$ J9 l// broadcast is a write loop that multiplexes block propagations, announcements/ P$ u* @1 i# d( n2 }3 |
// and transaction broadcasts into the remote peer. The goal is to have an async
$ [% V( f- M2 F2 b* F// writer that does not lock up node internals.# K) A/ l" j1 h# l( Z0 g1 U
func (p *peer) broadcast() {
4 P# v/ b" e: O5 X* V        for {
. ?! S. e7 [7 a+ n: D' J                select {0 h  d$ ^- p( J6 a; [
                // 广播交易
1 b% o  g/ h2 M7 p& y9 h( T                case txs :=
# C  v% K2 p( k: {Peer节点处理新区块% ?8 }, R, r; A( F) Z8 a4 h
本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。
: A3 m$ i, {3 P这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。
3 h, ^% c$ N' X& e- u! i消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
8 w$ k# V0 A- C5 o- E- V. F
' m4 V- H$ N$ C. V) R仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。
$ }- o5 N) `1 E/ A6 QNewBlockMsg的处理
% B* J% b/ h6 {, F: _" S' R本节介绍节点收到完整区块的处理,流程如下:' r1 ~7 |$ o# O8 E2 z# Z
首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。- w8 O0 I" Q! D9 z% p$ V& w* [
将区块存入到fetcher的队列,调用fetcher.Enqueue。
0 s5 g- {& h( E更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
) Z. x, O' O0 c+ A" y只看handle.Msg()的NewBlockMsg相关的部分。' F: o5 Q. _; y2 R7 O
case msg.Code == NewBlockMsg:
5 d. e5 z- D2 u8 y2 u        // Retrieve and decode the propagated block
# F% ?1 V! R' Z6 G! s+ n5 c: X8 N        // 收到新区块,解码,赋值接收数据: ]; r+ W; Y' W0 L
        var request newBlockData
7 N8 I8 y% a7 y        if err := msg.Decode(&request); err != nil {6 J4 n+ ^7 w' `) E
                return errResp(ErrDecode, "%v: %v", msg, err)
6 V7 M4 K7 s& O3 ~  T' i        }
0 P4 ]  a% x# x7 `- C5 a# }9 E        request.Block.ReceivedAt = msg.ReceivedAt2 n7 u3 D7 U3 m& i9 a8 O5 ^
        request.Block.ReceivedFrom = p( b$ X: T$ S. m3 B; s- \" Q
        // Mark the peer as owning the block and schedule it for import
& G4 ~# P' F  x# J4 Y        // 标记peer知道这个区块
+ w! m2 m$ ~/ f+ `2 b8 V3 `        p.MarkBlock(request.Block.Hash())
( L( o- H% ^, a" m0 A. M/ ]! ?        // 为啥要如队列?已经得到完整的区块了2 i7 o' m+ l$ [3 \7 h
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块1 ^/ a3 s2 z3 V* @1 O
        pm.fetcher.Enqueue(p.id, request.Block)4 ^- [. \4 ]& D8 A6 c
        // Assuming the block is importable by the peer, but possibly not yet done so,( w% v8 [9 G4 f# B+ R
        // calculate the head hash and TD that the peer truly must have.
$ X0 u3 d) v4 a' u% K( N        // 截止到parent区块的头和难度) q! l9 G7 s2 ~) h7 z9 j# z/ H
        var (
- a, U3 x$ @4 n  W' U                trueHead = request.Block.ParentHash()+ U+ I; v- W! b2 ?+ c# N
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
2 y# ?6 j, _; f  n- g        ); x- A4 d  L  w) X3 T. b2 `' L
        // Update the peers total difficulty if better than the previous1 Z! l* m3 _  @& a# l% t
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步; Q: c  _1 J/ S( ?
        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。6 f" E/ U8 A- X( g& w" R
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求
3 ~! G! i4 ]& l5 V9 `9 G        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain6 b8 g. K+ e. [" J+ ?
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {4 G1 U; R) P" H6 j7 D
                p.SetHead(trueHead, trueTD)' r) D, [5 Q  t3 a
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of7 s0 s* t# `: A" ~
                // a singe block (as the true TD is below the propagated block), however this+ b; b; A/ X) i( B6 C
                // scenario should easily be covered by the fetcher.9 L4 Q+ k+ i$ j$ j
                currentBlock := pm.blockchain.CurrentBlock()
) l8 r; n) H. J* n: X! @( [3 t                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
- L+ X( S" l  l9 g3 \3 \                        go pm.synchronise(p)
. m7 c: W- h- A                }
/ c* J! c, t( V, e3 g1 F$ f8 L        }
6 M% B& W' Y1 T: T6 h* h! T//------------------------ 以上 handleMsg3 X+ Q% y* k" Y: s. q- W
// Enqueue tries to fill gaps the the fetcher's future import queue.  W+ A; o% q- L( {
// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理/ h- g$ F/ t- k7 K1 J! u/ F( D* v
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {8 D$ s" O" N0 J# x% a7 n
        op := &inject{  y2 g- _2 ~- _; J
                origin: peer,7 _* s: h1 ]  E
                block:  block,
3 P2 B# L$ S6 t7 ^" K. o# o) y        }6 E3 c8 z/ z" I
        select {
$ Y% p+ p9 U$ X( K" X& s$ P9 ~; `        case f.inject  blockLimit {
) S) [% l/ g+ ^/ L; s                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)( X: t" x$ m' {6 v) |& S) r! e
                propBroadcastDOSMeter.Mark(1)
3 |8 }4 u7 W  g1 K: Q, e) Z                f.forgetHash(hash). p, L+ R9 b2 `( w( V! z, M4 j
                return
  p% a9 D9 O( H: X/ p) s/ ~        }
1 D8 k" u' z6 A- S2 `9 c% c% X        // Discard any past or too distant blocks
0 g1 ]$ N7 K0 b. ]; ~1 m. h        // 高度检查:未来太远的块丢弃1 j8 [3 L3 i% l" ?, X
        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {: {. {" a9 f8 O% s: ~4 C/ W
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
7 f4 m! ^1 c7 d) K                propBroadcastDropMeter.Mark(1)
4 _, O4 @" N% n3 K+ v& s                f.forgetHash(hash); A0 [1 |1 T0 L: B; N3 F
                return
( y' y7 q5 Q+ j( d5 B7 s7 l, }7 B        }
) j; k( c; |' b. a0 e& K9 w        // Schedule the block for future importing
6 _# q& i0 g+ m. z, v9 v        // 块先加入优先级队列,加入链之前,还有很多要做
" ^+ x" \) Q" z; e7 Y2 c. o        if _, ok := f.queued[hash]; !ok {8 M! L- Y" |' g4 W2 |) w
                op := &inject{
% T0 i5 _! g% `! \' [  }. W                        origin: peer,- u9 v' G7 {) ?9 b! c  Z: r
                        block:  block,
. K( h) E8 X$ z8 q7 e6 }                }
- g5 F5 ~- C7 A, K0 k                f.queues[peer] = count
. }6 b7 r$ F- K                f.queued[hash] = op! m: {* p  l: z$ o  N4 ], b1 C. ^
                f.queue.Push(op, -float32(block.NumberU64()))
" z1 K* x% F7 F- b& R9 Y: Z4 j                if f.queueChangeHook != nil {- s2 b7 N* [. _
                        f.queueChangeHook(op.block.Hash(), true)
: {3 o7 [0 @6 ?; o/ P                }, I, c( e! @9 M* w5 S7 v
                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
/ d9 ?2 R/ M6 |        }
6 T% ], y; T2 t3 P8 ~# q}
0 A: S# y( s2 A8 i5 b9 X. ?9 ]fetcher队列处理
+ ?: D$ j3 {5 K5 o* h0 Q本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?
7 E1 E% i# ]& Z# D* |5 a) D1 d由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。
) m  K& ]: |. L& z3 l( |, S" k区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash
. g4 q$ L0 f4 z+ s0 H* A. E// Loop is the main fetcher loop, checking and processing various notification
& s+ O7 ~) u: K2 X, J# y// events.6 L  y0 Q+ k% D
func (f *Fetcher) loop() {: r6 L; y0 [. |8 f7 V9 B
        // Iterate the block fetching until a quit is requested6 W5 G' V( s: Y
        fetchTimer := time.NewTimer(0)! j* U% d* E! x7 O' I
        completeTimer := time.NewTimer(0)3 ~) x& N" E6 J$ f# e% A) }/ z" Q
        for {
: t8 `; ~% g; C8 B% y+ N                // Clean up any expired block fetches% n( {0 ]  Q8 n5 p4 S. w2 J/ b
                // 清理过期的区块
" Q+ x' J2 ~/ M; G/ R                for hash, announce := range f.fetching {5 d+ [0 ~3 J- r8 c0 m
                        if time.Since(announce.time) > fetchTimeout {
& Q* j9 `  E) \2 `; f  a( q                                f.forgetHash(hash), b/ E* Y3 W, ~5 d- f3 U: w- g$ D
                        }
& d4 V1 V( Y/ U$ A/ F5 q                }5 u8 I/ [) N. |& N6 h' N- r1 l
                // Import any queued blocks that could potentially fit
6 \) ~% r* {6 d( v                // 导入队列中合适的块
2 q( w. U  s5 v) Q, q: r- ~                height := f.chainHeight()) b* G# ^' ^6 T- W: N' h
                for !f.queue.Empty() {
8 m! \/ T3 a: P0 }9 N; f                        op := f.queue.PopItem().(*inject)
- [. g/ B1 o. g( ?                        hash := op.block.Hash()
& s. m# x; U1 W0 ]0 y/ X4 \                        if f.queueChangeHook != nil {
0 I% i/ \, H. A                                f.queueChangeHook(hash, false): r7 M- y+ t5 @* m. V
                        }
2 T7 P2 R8 [( l5 R* @; A                        // If too high up the chain or phase, continue later
! ~6 x9 |7 A) u1 g8 Q5 O7 e                        // 块不是链需要的下一个块,再入优先级队列,停止循环
8 ~8 F2 i- o' E                        number := op.block.NumberU64()# s. V: R6 e4 {6 `
                        if number > height+1 {# ^4 K* G5 v* g6 ^7 E% W
                                f.queue.Push(op, -float32(number))
2 j. I, X: A5 }( A                                if f.queueChangeHook != nil {! a  t  ^3 j& v. K# P5 p4 M' v- t
                                        f.queueChangeHook(hash, true). {- k: ?6 }+ c! V  q
                                }
6 X8 A) h& V) j& a                                break
' E. n; f- \  B8 E! h                        }' l6 Y1 o7 b' K% }% A5 M
                        // Otherwise if fresh and still unknown, try and import
8 [+ L$ @+ s' L# [% g  ^                        // 高度正好是我们想要的,并且链上也没有这个块
& S+ h6 R5 z/ Y% D' q1 g                        if number+maxUncleDist 6 h( p/ u' C9 j
func (f *Fetcher) insert(peer string, block *types.Block) {
  T9 I* k% a; w6 N/ z. i/ [/ ~1 x3 L        hash := block.Hash()' Q. u4 n( \8 N& a+ \
        // Run the import on a new thread/ _9 f/ v$ L4 Z# R# |
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)2 C3 g1 D) f+ ?' t" W
        go func() {
0 x# H; h) c! e+ N5 Z                defer func() { f.done
5 I  o8 P0 r* i* ]: v2 XNewBlockHashesMsg的处理. B) k* `7 @9 g! \- l0 |
本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。' \0 t. i" X/ F$ O* `  E9 h
流程如下:
. \* r1 R- T; Y7 z0 R% R: H对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。
- ]6 |* ]" |3 D" l( I/ ]关于announced下节再介绍。
# @: A3 ~7 m5 D# R* P/ \0 ^7 ~2 L8 G" q, B) |5 I
// handleMsg()部分4 t7 ~* K: A) [: O/ l
case msg.Code == NewBlockHashesMsg:$ k& E, O8 t  f. x4 j
        var announces newBlockHashesData
4 Z" u$ x  `4 l' ^        if err := msg.Decode(&announces); err != nil {1 M( O# @% T) P5 i( X
                return errResp(ErrDecode, "%v: %v", msg, err)
# Z0 F& U8 M5 A0 L6 v        }
+ j+ G1 l  Q, X1 j% ~7 h$ z" Q  R        // Mark the hashes as present at the remote node
$ \4 r, K, l' g( r3 n* S        for _, block := range announces {# l1 o( q8 C- g8 O
                p.MarkBlock(block.Hash)* }$ Y$ m; h# x% |) o. g1 q
        }
2 Y, h! M! I) V        // Schedule all the unknown hashes for retrieval
( o" q' l5 s: U        // 把本地链没有的块hash找出来,交给fetcher去下载( G  @$ i  W1 Z2 N- U$ h
        unknown := make(newBlockHashesData, 0, len(announces))
' m9 c& p3 t9 O  J/ ^: U/ z; G, c        for _, block := range announces {) q( K( e  L' ~3 q) W
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {* b; g: h7 b, ~6 \/ m5 }# Q" d
                        unknown = append(unknown, block)
5 H8 d3 j! |/ w, Z6 e# ?, M                }: z) g0 K# f  K' X
        }2 [% W1 D3 I: [) E9 J
        for _, block := range unknown {# W5 i- h& _2 j6 `5 u2 Y
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
" o# r* R3 U8 U, c7 M        }
' [) H( U) N" U' Q. T! Z; A3 b) x// Notify announces the fetcher of the potential availability of a new block in
/ `/ `' }: i* ~9 A8 H// the network.
) {+ J, a5 M* K% U+ u// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息6 g5 w: c5 j. ~4 R3 i
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,. i3 b0 M) W! G! n( ]* G
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {/ W  ?: X2 u! F
        block := &announce{( D5 D! x% K; i; P, V# P: d! u
                hash:        hash,
+ }3 r! d( h2 y                number:      number,
8 B. W+ K, d6 R/ S8 a. V) c  u0 m                time:        time,: P% m% C6 y; F& x
                origin:      peer,
5 \; R& r! A( }& ^/ Z* h7 a                fetchHeader: headerFetcher,) n. u5 @' K; N3 J( S1 F3 Q
                fetchBodies: bodyFetcher,) P) [$ a! ?( {3 J
        }
; m+ d8 y. q$ k$ ~3 L        select {- v$ J: ^7 N  `
        case f.notify  hashLimit {
) r0 V1 ^/ ~6 ?7 o, h0 [3 Z5 e                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)% p  }$ W3 w) s' q7 w8 f
                propAnnounceDOSMeter.Mark(1)
2 \: n8 A3 }1 K                break
; S5 Q: h9 W7 d- k% T        }
' \  W; w" e3 x! G( Q. k; ^        // If we have a valid block number, check that it's potentially useful
( M7 _! i, E- ?        // 高度检查% B# \$ p- m) g" L0 u
        if notification.number > 0 {
* u  U( G; d+ T                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
' H9 G" m. B+ |2 _+ Y& }                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)' T4 y* ^% i) G) s3 ^
                        propAnnounceDropMeter.Mark(1)& r0 A' A" ]& W; z& D' B
                        break! l$ Z& g- R; V- C' p
                }
; ^  q" r9 c+ o$ Y/ Y6 A" |        }
$ E6 J. f+ R& d! i2 |, D5 l4 Z# F        // All is well, schedule the announce if block's not yet downloading& C, m( \. R( @
        // 检查是否已经在下载,已下载则忽略' X1 I/ l3 m4 M0 T# p$ ]
        if _, ok := f.fetching[notification.hash]; ok {
: h  H1 e6 S, Z                break3 F/ _8 P) |/ z+ ]. `. t  P' j/ \
        }
7 a) I# I* {, ~        if _, ok := f.completing[notification.hash]; ok {+ b. S" w/ }3 o' K& r# h* p/ M
                break0 K! ~0 Y9 I) ^; h$ m$ y+ Y
        }; M6 W5 z+ r' z9 @! _0 n8 ^
        // 更新peer已经通知给我们的区块数量
' {* W: M( a1 k! R- j: R; ?1 z        f.announces[notification.origin] = count
5 x9 P2 x0 N! w$ T        // 把通知信息加入到announced,供调度
# T8 W  ?8 P0 W; F5 R        f.announced[notification.hash] = append(f.announced[notification.hash], notification)- x# L: @9 n/ T
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {0 x2 F. C+ C, d0 n- O/ H; z
                f.announceChangeHook(notification.hash, true)1 M$ M2 D* B8 D
        }5 {$ g. d0 {3 R9 o" V( C' ?, y7 m
        if len(f.announced) == 1 {
$ D' m9 ^: O. B  F5 y8 ^$ K                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知6 g: j3 g, Z5 M, ^; h6 S
                f.rescheduleFetch(fetchTimer)) h# F1 l! I9 ~- g
        }5 {) r  Y( F9 \+ O2 c4 I  _5 Y
fetcher获取完整区块6 t" @% k' x' G) K% F3 `# C
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。7 M4 W. {% f" z
Fetcher的大头! ]* ], I7 H; \! y& [6 ?# h
Fetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。- E5 U  `4 {. H
宏观
8 P' D% h* |) d$ V) |6 ]首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。
7 y& e/ J* [! c# H* E+ c/ G
* ?, J2 m# h7 w5 d' r8 Z, ]再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:) J! w$ Q1 [% c  G5 ~
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
/ U0 `7 z7 O) [+ H8 `/ u  o8 C
; Z; E" @" ^/ o8 ]$ t+ J4 ]. S+ K* R# @% R- x, ]
微观( P1 t) z5 ]  q
接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
$ v  `% ^4 O2 D0 C首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。3 w9 L+ F- m2 M6 R. k# p% K
// Fetcher is responsible for accumulating block announcements from various peers
9 E5 Z2 s2 F4 k0 c+ p// and scheduling them for retrieval.6 D# k7 H4 P5 q6 f, `3 R" R
// 积累块通知,然后调度获取这些块
7 q- A& }* ~* `# v: Utype Fetcher struct {  @4 z% k/ K7 B
        // Various event channels( k. l" E3 }" G, @
    // 收到区块hash值的通道$ ~- j+ M1 e( t7 ]* D1 x! q0 f
        notify chan *announce
+ h2 H4 p5 i1 ~    // 收到完整区块的通道+ C6 I/ e& ^7 d# G' X) J
        inject chan *inject
5 j/ E# \7 j. `( r' c        blockFilter chan chan []*types.Block
* ^% h% [$ `: d) k- I        // 过滤header的通道的通道# _+ O$ C1 I+ k
        headerFilter chan chan *headerFilterTask
& i2 }$ d  V+ i/ w/ q        // 过滤body的通道的通道
8 P" q1 _5 N, Z( e$ B* ~( d        bodyFilter chan chan *bodyFilterTask
$ U+ r- o9 X8 W3 V        done chan common.Hash
/ S$ Q9 f: V; x0 a        quit chan struct{}
$ |" H. U* y1 j2 h( `$ C" J" S        // Announce states
" i! h4 ]$ {1 K0 L4 o0 S        // Peer已经给了本节点多少区块头通知
* n3 d  H$ l: c: U  Y        announces map[string]int // Per peer announce counts to prevent memory exhaustion
& c5 Z* R9 R4 ~  o+ U        // 已经announced的区块列表3 t. K4 a: L5 d7 Z: H* n
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching# A9 z+ m5 C- \2 C' q
        // 正在fetching区块头的请求7 C1 r1 I+ b4 r1 e4 }# R8 \/ A
        fetching map[common.Hash]*announce // Announced blocks, currently fetching
( y% A9 Z2 r: P        // 已经fetch到区块头,还差body的请求,用来获取body
5 Z, h, g* ?* K( h3 x2 V" c        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval* p* H3 O; `+ x
        // 已经得到区块头的* t1 R) Q' A  w" P* U
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing/ I. o8 s0 [% N" f$ D5 c- a
        // Block cache+ N. ^3 D5 L8 f: v
        // queue,优先级队列,高度做优先级
0 c' o' X; e% \1 X2 [" I# ~! u        // queues,统计peer通告了多少块
2 G1 b9 u' d, f* k6 A        // queued,代表这个块如队列了,3 D7 K5 B8 x+ A& c. I: s
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)# q4 e% z- ^# x8 _% f: z3 L8 y
        queues map[string]int          // Per peer block counts to prevent memory exhaustion
1 T0 A8 d" \: f8 ~. Z        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
" K! f% D7 F! R4 L. C4 y        // Callbacks; B' X  E  f% c! e' N7 l8 K2 f  w* U
        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
( F7 l9 c: T' D% W# h0 m$ \# `        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证* Y/ C9 X9 J4 m, U
        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
6 d, b' w2 x& p! ~, E; X! O        chainHeight    chainHeightFn      // Retrieves the current chain's height
% M; [0 W$ t8 i        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数
3 t  S( v! _7 \6 L; @9 S( j        dropPeer       peerDropFn         // Drops a peer for misbehaving
5 l& Y0 g- W( B" v: T        // Testing hooks% O. J4 _( \3 K* I: w
        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
% `2 N- C- g- C3 @( B; q        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue* G# s- n6 Q# V# K8 f
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch4 R8 W  E" D# m: j3 P6 `
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62), g. H' r6 h* L1 `2 J9 T% a
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)6 B7 s& P* T% Z1 \3 b" f3 j) g
}
6 R7 _/ H' y6 g) ~/ N1 dNewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。- D% c/ Q! q8 R
case  arriveTimeout-gatherSlack {
! ]' n- y% A, t) [7 X) k7 r2 s, }                        // Pick a random peer to retrieve from, reset all others: r# C8 x. K9 y. U* e3 Q" D3 u
                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer
/ M- `2 s1 D: G. \                        announce := announces[rand.Intn(len(announces))]
7 L' u; `9 r3 R" m4 O2 y0 C) v                        f.forgetHash(hash)3 X  K8 f7 G8 M
                        // If the block still didn't arrive, queue for fetching9 L1 J' @) f2 \# y# x8 N: E
                        // 本地还没有这个区块,创建获取区块的请求
' e8 r$ Z8 y( s% u, G" W3 _                        if f.getBlock(hash) == nil {
% {/ {1 S+ I+ t2 Z" d/ V4 C' N  i( I                                request[announce.origin] = append(request[announce.origin], hash)+ k" k: P. y: z2 a. p
                                f.fetching[hash] = announce
' q. S0 b1 ~- r$ c# }                        }( V+ H1 T6 F0 j  ]
                }3 B' b% ^+ q) |  n
        }
, X, x3 b, W% @9 z5 |8 s        // Send out all block header requests
7 }3 y& ~, Z! z8 L        // 把所有的request发送出去3 Q; E: o9 Y' }: O% ^3 Q7 |% r6 G+ ?% q
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
! V9 d6 e. C, K8 o, Q        for peer, hashes := range request {
$ c, q: r' J( \1 Y* S( X5 h                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
; o  E+ \# r1 U# G1 ^                // Create a closure of the fetch and schedule in on a new thread6 {  O7 W- T" W4 e- f
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes, I% N1 {$ f) o( l
                go func() {3 B- d3 Y$ Y) p) ^5 u! z
                        if f.fetchingHook != nil {+ Q* {$ m/ D6 X$ V, e  m
                                f.fetchingHook(hashes). M9 s' P0 g/ f9 Y& O" X, c
                        }
9 w4 @1 d5 y+ l3 s4 Z3 Y  b                        for _, hash := range hashes {
1 o# \3 X1 u) q                                headerFetchMeter.Mark(1)% |* M) Z. R7 }7 r6 \, f) ]* \0 w
                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals0 F6 D8 P$ P' t4 U+ E% T
                        }
  H: z1 p3 I$ _/ q8 ^6 u2 s0 o                }()) {6 M* ?( G' p) ~' t4 |; h# |
        }) B+ n5 s# v! \- O+ Z
        // Schedule the next fetch if blocks are still pending) ?/ g  V, s: I/ g8 k, J$ D- X
        f.rescheduleFetch(fetchTimer)
% Y, ]+ v# g' w. p! F; x从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
3 r- t6 F9 Z$ R9 T2 a# P, T3 Ipm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)* p( j2 T  z% M1 p2 K9 S5 d+ f% c- g
// RequestOneHeader is a wrapper around the header query functions to fetch a) o( y6 @: ~8 f: @
// single header. It is used solely by the fetcher.9 j# X9 l/ J& ]5 q  k
func (p *peer) RequestOneHeader(hash common.Hash) error {
. l) m% {( [+ d- l4 @        p.Log().Debug("Fetching single header", "hash", hash)
3 h5 Y; b5 I) J        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})1 c# _9 V; b. s: @: _9 Y
}) W! U% l- m1 j! p- T  \
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。
: v! |# f! \2 \···
9 h9 _% R- j" }9 L2 S// handleMsg()
( Q0 I  ?- Z# s0 R8 b& P7 T// Block header query, collect the requested headers and reply
& p3 t# _' N0 f- B: G% e8 O( @5 ?9 tcase msg.Code == GetBlockHeadersMsg:$ E" |1 p' T6 G0 b3 P
// Decode the complex header query
2 p9 x4 [, Q, P3 \. `% a# fvar query getBlockHeadersData
4 d4 K/ \& p5 r$ D8 o  V7 N. Qif err := msg.Decode(&query); err != nil {; A2 ~  e6 y- X  ?* n  C$ m
return errResp(ErrDecode, “%v: %v”, msg, err)3 T7 A: D- S1 T0 g2 T# o! l/ `* z4 X+ c
}
; U9 A! O/ D/ E5 c+ ?  h' e8 `hashMode := query.Origin.Hash != (common.Hash{})' S/ ^  y. x) E/ l% H
// Gather headers until the fetch or network limits is reached
5 n1 ?! s1 b* @$ E3 M. L( T// 收集区块头,直到达到限制
$ D) Y& A( u5 A- R, r5 ~5 y+ cvar (
/ B$ ?& c8 a4 I9 i# X# l        bytes   common.StorageSize" l' o5 Z, m7 D& |& q0 C
        headers []*types.Header
" {# s0 t5 z1 P( C$ k        unknown bool4 p7 E. ?! [3 f
)/ ]$ L' h! s5 N4 U7 }9 h
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量. m, |. c+ F; c- k, u' C  Z# f
for !unknown && len(headers)
; p! m+ w& z7 G. {`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。
( j! z" }' B0 [# f. |. ]  Q// handleMsg()
" Z1 e9 v# J3 X3 ], \! Q: vcase msg.Code == BlockHeadersMsg:" B# w- g2 |; v, j+ c
// A batch of headers arrived to one of our previous requests0 f# F0 r0 @: i: Q. Y2 Y5 w, A
var headers []*types.Header$ h, Q3 O5 c, ^3 Y7 ~6 F
if err := msg.Decode(&headers); err != nil {$ X  P/ k# E1 K& Q+ A8 N6 b6 ~
return errResp(ErrDecode, “msg %v: %v”, msg, err)5 |7 @8 A3 `/ L# o
}
6 k- X5 O, S6 _5 f9 v: a5 |// If no headers were received, but we’re expending a DAO fork check, maybe it’s that- d. U. D. ^8 J' W" N  C; r! P0 g6 s
// 检查是不是当前DAO的硬分叉4 ^/ ]* Q( L/ _; Q
if len(headers) == 0 && p.forkDrop != nil {8 t8 T- @; @  u2 l* y2 Z  F, ?6 h
// Possibly an empty reply to the fork header checks, sanity check TDs
+ ^1 t- N5 o, O. N1 sverifyDAO := true' n; Y* p% ~- _, b5 l9 c8 }
        // If we already have a DAO header, we can check the peer's TD against it. If. ~+ g* w% n4 j" ]
        // the peer's ahead of this, it too must have a reply to the DAO check
& {( w1 u8 z( f& @! e2 h4 j( z; b        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {8 z' q* D4 M/ S8 W0 u* G3 {. ]
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
% J; f8 o+ k& U5 g* r/ f                        verifyDAO = false
5 C' Q4 u0 K- Z* o, Y, e+ a                }, e+ C+ |2 g6 V. N+ ~* C; \0 V
        }
( A/ m8 D$ n) f- P& k5 Q        // If we're seemingly on the same chain, disable the drop timer
8 ]9 B% e6 P! q        if verifyDAO {; q  g  I+ i+ r" n1 ]9 q& @# N  m
                p.Log().Debug("Seems to be on the same side of the DAO fork")
: u" h. ]) O7 L1 n                p.forkDrop.Stop()# A9 W: Q7 x( j* W! O0 a' K3 F  V
                p.forkDrop = nil
/ e& q$ U) F  _- o                return nil% ?1 q) ~! n. X7 Y- [  F
        }: Y0 N7 A5 h9 s8 L" ?; T- a! g
}
. E1 P! ]2 e4 m9 W0 q// Filter out any explicitly requested headers, deliver the rest to the downloader
, q$ L- x$ ~8 Y9 U$ U// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
- l9 |& y$ W0 C& n! K* C% Kfilter := len(headers) == 1
8 H& X: u7 e( |$ f, Zif filter {
6 X2 i2 t; W9 X! R- @        // If it's a potential DAO fork check, validate against the rules9 j, ]! p3 U; T; Z% z  {, f
        // 检查是否硬分叉2 p9 s  _5 \2 A, ?2 q
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {; Z  ~; B1 S- y: G: o8 o
                // Disable the fork drop timer, c: g* o( W  q8 @1 R7 ]3 B) D" I! F/ I
                p.forkDrop.Stop()( E+ h0 y7 ]# e; I) `- A  e& l( K/ @
                p.forkDrop = nil
! Y6 d1 j- L9 r2 h" U                // Validate the header and either drop the peer or continue
4 [1 G% w- D. Z1 v# l# F                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
+ F5 o( j2 b" u" O' m: ~! h( G. O                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")0 R* N( p9 c$ U- p5 s
                        return err6 O5 p; W0 P- y* q( U
                }9 Z' s5 Q7 i; q0 m' ~: x; L% Z
                p.Log().Debug("Verified to be on the same side of the DAO fork")
1 T1 n4 E7 h: D& J! h+ K; e+ P                return nil  u- f  g: |1 ~1 o3 d0 B
        }$ L$ E2 s* b) t: c0 g; r: \
        // Irrelevant of the fork checks, send the header to the fetcher just in case
; R6 V# s2 K! L: }7 ]: i: j        // 使用fetcher过滤区块头
- b! M+ O4 ~' ?) L1 A* b% d        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
0 u  M$ G8 E; ^$ ~$ X}
! ^3 X* i  W+ x" p  }) X# E! r// 剩下的区块头交给downloader
' ^, M/ X/ f3 I$ p( Fif len(headers) > 0 || !filter {5 f) x% G2 k' f0 Y( U* O6 t9 ?
        err := pm.downloader.DeliverHeaders(p.id, headers)! ^% D5 P2 N3 `
        if err != nil {9 x5 W' ~+ ]' W/ z0 o! m% B
                log.Debug("Failed to deliver headers", "err", err)
6 L! a4 E' p6 f6 Z! _        }& w) M/ Q$ z, m% O
}
/ L* p, b5 S- }' d8 q7 Q, n`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。
% r  I6 A2 ?) ?, _8 Y( ]: w; E每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
8 u, b( J& t2 b, f// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
, N  f/ n7 n1 z// returning those that should be handled differently.
2 }8 [* `5 n) l3 f2 U5 J7 z  S// 寻找出fetcher请求的区块头& ^: X2 ]; H5 l& F$ a8 s* ^3 j
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
% u' A  n0 G2 ^7 hlog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
1 k5 N$ P" u! Q2 n3 r$ v// Send the filter channel to the fetcher8 u. z! s% e; `- d# v  y
// 任务通道
  h7 u; T) X! C6 e. z0 U7 vfilter := make(chan *headerFilterTask)5 r: g0 a9 m0 d0 F
select {
) \! N. e# l+ I1 O" L8 i// 任务通道发送到这个通道
5 r0 Y9 {# E# O, W, ~, d. Acase f.headerFilter % s" |0 N/ I6 W
}; |9 r) y; ~! J4 r+ c" {
接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:0 P6 ]% V! }9 t/ H$ V* i- ^% S
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。
4 `; b5 c  ~5 |$ I' x2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。
" ]' g, A  U2 l3 O, f& E2 `3 F3. 把`unknonw`中的区块返回给`handleMsg()`。* _0 o; _+ ^6 P: a3 T
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
" b+ v' K8 X% q! T, x( V9 E5. 把`compelete`的区块加入到`queued`。
% z* [3 k( U) a5 ^& E// fetcher.loop()& m- s. j& J! ]* y- ]  @
case filter :=
1 ?$ X' T6 C, E7 g/ U+ s5 ]9 }3 C// Split the batch of headers into unknown ones (to return to the caller),1 Y# b* Z6 o7 Q( p+ o
// known incomplete ones (requiring body retrievals) and completed blocks.
# H7 i* A2 q+ f/ T1 c# Q0 r// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放+ `1 e; u# V+ o. {" b
// 还需要获取uncle和交易的区块
, Q$ p4 U+ T  E8 o5 eunknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
: r3 o7 l( m- e  O) l( U. {// 遍历所有收到的header, }+ d; u: z7 f$ B% B2 }# M# F
for _, header := range task.headers {- H% y* e: x8 I. }6 Y0 d) G6 v
        hash := header.Hash()- K9 ~  k) u% U" N( Y4 r2 n
        // Filter fetcher-requested headers from other synchronisation algorithms
2 R6 `8 V9 w5 `0 Z# S        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
" Z5 d; \7 o2 H  X/ \        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
% J. h5 i  t  E+ V6 M! O3 ]                // If the delivered header does not match the promised number, drop the announcer
& T+ g. j" z! O2 @* N1 i                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。: ~0 k: `; A' t* i% X3 ~0 x
                if header.Number.Uint64() != announce.number {
- \$ ]; Z9 Y3 C1 w. d1 M7 A                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number); ]4 K* Y/ {9 K) T/ \7 T
                        f.dropPeer(announce.origin)
1 h' w, S3 i4 Y. E                        f.forgetHash(hash)9 {$ i" e! p( U1 i. b3 G
                        continue; y  K  E% o4 S; a4 Z+ F2 P8 F$ ^4 n1 p
                }- e3 i9 z5 W: q! W' F
                // Only keep if not imported by other means
# u7 E$ H) M7 q5 o$ \; W% ?                // 本地链没有当前区块% A6 \6 r% E: e2 N4 Z* c+ g
                if f.getBlock(hash) == nil {
8 |5 u' L' c0 o. L                        announce.header = header
% w4 Z  v" y; ^" T- _                        announce.time = task.time, w4 P( E) l% H; I2 i
                        // If the block is empty (header only), short circuit into the final import queue) k6 }' ?" p* K, e+ v" l( n1 I' s
                        // 如果区块没有交易和uncle,加入到complete
  v& V/ |& c  Z5 l                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {: a. ~+ l( k8 ]. M
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
- {! ^- S6 A9 {' q) ]  [                                block := types.NewBlockWithHeader(header)
" \. s' a& {7 A; Y3 J  b                                block.ReceivedAt = task.time
0 `. |% o" e3 c/ |' Y/ b; ]                                complete = append(complete, block)
; E. m  |+ C; M* {# ?! V8 S( g: {                                f.completing[hash] = announce
+ I, u( b; o. g+ `( o) m2 d; N- O                                continue
+ \& J; s) ?3 h3 k                        }3 |( \$ P% g" G' v
                        // Otherwise add to the list of blocks needing completion
6 a% {$ }3 q( d. [  s! Q9 _                        // 否则就是不完整的区块% f5 }2 o8 E+ n1 [2 \3 z6 O+ B
                        incomplete = append(incomplete, announce)
3 E: Z# k7 [# n/ L# G1 t. ?                } else {  i  [9 ^$ I# z0 X2 {
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash()), R3 Y+ j+ L0 C/ t
                        f.forgetHash(hash)3 H+ Z2 s% B1 d! E+ ~  V) Y. i0 k
                }: S$ k2 n! b- H, Y# a
        } else {/ M. V2 e, |4 _& c5 B6 @
                // Fetcher doesn't know about it, add to the return list
/ J" U: ~. g. g' A                // 没请求过的header
+ p1 }* @0 Z& ^% a2 k5 `, ?                unknown = append(unknown, header)$ w! V9 P  `$ ^9 k% d
        }
; J# L7 P+ b2 D- H5 k% G( K% x}
( u. \( G" K# Q3 X% m// 把未知的区块头,再传递会filter
8 c- h8 c: P1 M* j" L. ^headerFilterOutMeter.Mark(int64(len(unknown)))' P  p6 V) L+ d8 r
select {7 V7 e+ [; `( |4 n8 K
case filter
2 Z0 R" a/ z1 z" ^3 K跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。
6 d, ~* u5 n& ]% O3 x5 @// fetcher.loop()- U4 _7 N1 I4 E2 }& Y
case
! L% v! i/ ?' e# ]) j* N+ r// 遍历所有待获取body的announce8 e7 H3 L& F8 s6 K! X
for hash, announces := range f.fetched {. W% o' ^( k* N
        // Pick a random peer to retrieve from, reset all others
8 y0 C  I  [! s" _( C0 h8 Z        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
8 ~1 ]" V1 @  O; m: Z        announce := announces[rand.Intn(len(announces))]
; P1 n; t5 d  O2 v; A5 f4 a1 a        f.forgetHash(hash). j* b1 N$ C2 [& ?
        // If the block still didn't arrive, queue for completion
# ]) V, e; {# S+ d        // 如果本地没有这个区块,则放入到completing,创建请求4 A- g2 E6 A+ Z: f
        if f.getBlock(hash) == nil {# s$ \& y" C3 q( N' E
                request[announce.origin] = append(request[announce.origin], hash)5 N9 c1 G' {# {' B
                f.completing[hash] = announce
# N' Y& r5 {3 n0 @8 c( j( ^        }  A. f% W" l6 S% O. s; f( ]# }; W
}
& [- q7 }1 _( }+ t' o- I- ^) C// Send out all block body requests. [) B2 Z. t6 t" f4 Y; k
// 发送所有的请求,获取body,依然是每个peer一个单独协程
- w$ ^6 f, Y+ p7 tfor peer, hashes := range request {' r% b1 M  \/ @) k( S
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
7 i4 J4 x$ g( ?) ?        // Create a closure of the fetch and schedule in on a new thread' z" C* ?/ }& b7 A3 B1 m( o& \
        if f.completingHook != nil {& c3 ^4 M% w% v( T+ I
                f.completingHook(hashes)
' _: E& f  G1 G- x' @  ~1 c6 A        }
6 g( ]9 L. ?/ _0 j* b/ p0 M        bodyFetchMeter.Mark(int64(len(hashes)))
1 A2 @) K( U9 [, u' z7 ~        go f.completing[hashes[0]].fetchBodies(hashes)3 ?, T. W" q1 S
}
$ V7 }6 j* ~6 ^! x/ k+ n// Schedule the next fetch if blocks are still pending6 s7 r9 d( M! R/ [( ~9 ~& @! ]' l
f.rescheduleComplete(completeTimer)
, Z, `: `$ X8 G`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。4 _# y# S5 y; {) j
// handleMsg()4 F5 m' }! @: h2 a2 k( W
case msg.Code == GetBlockBodiesMsg:
! E3 X8 D, }3 b+ c" m  f// Decode the retrieval message
$ I% }! Z; D! }1 H8 JmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
" F; ]9 b* p3 Y5 o! H0 w6 k$ zif _, err := msgStream.List(); err != nil {# a5 g, T3 Z' ~
return err% Q9 `1 A' D0 Y2 |! S( g. y
}8 B' O5 b% R4 M3 ~. x1 K
// Gather blocks until the fetch or network limits is reached
% ]  h! Q8 c4 Yvar (
: E1 \5 J4 @9 V: N+ Lhash   common.Hash; D  D$ c% g0 J% L( K" w
bytes  int
; I( p9 A+ [! i0 y8 gbodies []rlp.RawValue" V% g) B* }: A( Q/ X2 F
)
+ y) I. n0 J9 m" P( ]* _// 遍历所有请求9 W0 A; l8 m+ M6 l& p) e
for bytes   ^) @4 T, L; o  W8 p
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。3 {$ t* m& m/ e0 Q
// handleMsg()
6 j" ~% N$ y. b) c. ^case msg.Code == BlockBodiesMsg:
6 B+ o  w: `, g% ?( I// A batch of block bodies arrived to one of our previous requests
' h6 v5 l0 _* x+ M5 ?/ yvar request blockBodiesData
: ?- f* H* s: M, f/ X1 c. Fif err := msg.Decode(&request); err != nil {
5 S' G& m9 l% R" l; A: hreturn errResp(ErrDecode, “msg %v: %v”, msg, err)7 ~' o: }0 d! |9 f/ b7 U( f7 S
}1 y' ?  Z$ e; S8 Z' i
// Deliver them all to the downloader for queuing* P2 x2 V2 b& v/ l
// 传递给downloader去处理
! a, G% q& H* V/ O7 H* t, p8 w, Itransactions := make([][]*types.Transaction, len(request))4 l' [& g/ G3 Z- o
uncles := make([][]*types.Header, len(request))
7 z; n4 |5 d* c& u# y* ffor i, body := range request {3 G* `( E: ]$ A% F
        transactions = body.Transactions, I' L  D: F4 N' S
        uncles = body.Uncles
" @6 K; _8 i9 Y" q  Q, `}
5 J9 e2 L5 w5 L5 h// Filter out any explicitly requested bodies, deliver the rest to the downloader
0 [8 r9 B/ s1 U7 y5 ^// 先让fetcher过滤去fetcher请求的body,剩下的给downloader% T* T6 n9 ~$ f+ A
filter := len(transactions) > 0 || len(uncles) > 03 _- J8 ^" s9 Y1 b
if filter {3 w) @3 ]% ^1 M, _' m9 Y+ |$ b
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
+ s& Q4 a  ^& q$ N}* x4 `  T1 f6 E! e# V# W" ~8 c1 O
// 剩下的body交给downloader
7 A0 P$ h9 C- u, W% Nif len(transactions) > 0 || len(uncles) > 0 || !filter {
) u( P5 w+ C- z        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)* P- D* c- y9 V: |( p6 T
        if err != nil {6 ^% t) `' c% D  o& _. X
                log.Debug("Failed to deliver bodies", "err", err)5 M, X0 p" l  G4 {7 T
        }8 ~2 ^6 [: V8 n% e& R
}+ }6 d9 |! k. l7 _3 m' o0 Q. u: E
过滤函数的原理也与Header相同。3 Y+ B& b6 v) L7 Q, ]4 \
// FilterBodies extracts all the block bodies that were explicitly requested by1 v* Z: w9 Q! c7 u
// the fetcher, returning those that should be handled differently.  s' y- W1 h2 o( U: x  f% p5 T
// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理$ f' G8 \4 E( r7 c& t7 I, V9 K
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
9 M1 D+ N5 Y+ q# k; D( k/ e( x9 clog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))4 t0 T! |& L8 ^1 l/ f- h6 A$ M' O# u
// Send the filter channel to the fetcher
+ q# O( D) H) ]  C3 Hfilter := make(chan *bodyFilterTask): u& L9 U* ^. F( }' m5 \
select {) i" t. U% _* Z
case f.bodyFilter
3 m. L: K! c2 i; E6 ?}
0 [5 ^; c. a$ e( I6 c: y* f- W# [" o1 ^实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:9 ?" B/ I: g+ }
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
6 ~4 d5 w: s. o$ Y$ |2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。. A2 t# ]# J5 Q8 \) z
3. 将`blocks`中的区块加入到`queued`,终结。
- q" V# p) G1 a% hcase filter := . H  C5 F! N( E; k
blocks := []*types.Block{}
- _: H* ^( H6 K3 M// 获取的每个body的txs列表和uncle列表/ x. s3 b& x0 o! A% Y  M
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
1 W- y9 @0 f; {for i := 0; i 7 M5 j/ U6 ?. Y. J0 H- J+ U
}
' H/ p7 R1 G3 U) F2 e2 o3 R
% J  R' A- [- g% [; Y至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:4 |- J/ Z2 t# l
1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。. z  t$ l3 o4 i" L) a& W
2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
( v/ J" z1 S% L$ i6 n最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。
  S' G2 ]. f9 I" H5 @' ^9 U& H7 q) Z* p
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3