Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
228 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。
/ F. J* ~, `! y( y: ~; ^当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
1 l) N) u. w/ M总体过程和传播策略: d( y, d0 j9 p4 B2 N% b
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
$ |7 @* g: A; d  `总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。
- O8 L5 F5 `% F  s1 k, I& S先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。8 Y9 Y5 [$ j4 K8 W
2 r5 X# Y( V( t1 \- N
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
4 H4 Y1 r7 u" E: x' R$ K假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。
/ {6 `& |7 g. w1 v6 g; B6 N策略图的效果如图2,红色节点将区块传播给黄色节点:
2 |9 L6 a7 s2 ~2 m5 p, n
- ~1 |7 P/ e6 d& `5 U
2 Z- i' x; p, v# q' ^- u收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:( `) c$ X0 j+ X

/ ~, F3 a. L; M2 V# J* m2 ]6 y8 M至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。
( _" ?8 o1 S' J: uFetcher模块是干啥的
+ z+ _9 d* d5 D) E/ I. ?5 lfetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。
* I& ]+ i. x+ N- z( }5 Y& o5 b8 A如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
& ]5 U& n! S  W: a1 ]+ O- u* Z' r
, Q5 X) u- z0 X- j1 Y6 l源码解读8 Q8 g; w: S, R+ n0 u
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。; U% q, C: A4 K- ^
产块节点的传播新区块3 t1 M" i7 s: O5 j0 j% n
节点产生区块后,广播的流程可以表示为图4:/ l# X& c1 l- K, f9 S% v6 q6 M
发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去
2 P' k- E% \4 m# f6 D
  Q" ^/ ]1 [0 R+ ^/ r5 ?2 w3 ]: i; q* z3 G/ Y  r
再看下代码上的细节。: C1 U* K  e4 k* C! n, w8 U1 K
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。1 t8 ?2 M2 R. ]% P, d3 t
. p6 N% a  U1 Y( b
// Mined broadcast loop+ \3 p2 v! E, h0 x* V- f2 y! t
func (pm *ProtocolManager) minedBroadcastLoop() {
. [1 [9 ^1 p  H0 X# ?+ H4 m        // automatically stops if unsubscribe- |" B& K# ]0 ]
        for obj := range pm.minedBlockSub.Chan() {, T$ d+ K. M2 r# K# ~3 c
                switch ev := obj.Data.(type) {
! D) ]) k* O$ w                case core.NewMinedBlockEvent:
' L  k0 w. }" Z. P$ q# j                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
: m4 f0 y" p6 r' _6 f                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest0 F/ p. t, i, k7 w( W9 J
                }
7 L, J3 _  |  n# o8 Y& C* h        }. M9 ]9 T4 h/ g, a/ Z8 Z, H) l$ Q
}  t4 \7 v* s' M- I8 ]
pm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。# e6 C3 S1 V" a; t0 Z$ E# m" x
// BroadcastBlock will either propagate a block to a subset of it's peers, or8 c! U; j; ~' E
// will only announce it's availability (depending what's requested).$ c3 B' R3 v1 T) J* t# R
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {; i5 V1 B& @" E" h+ J/ K
        hash := block.Hash()
. P" u- \+ H( Q        peers := pm.peers.PeersWithoutBlock(hash)
" G5 \1 A* m0 [# }. a) e/ L        // If propagation is requested, send to a subset of the peer2 `$ C6 m+ x1 I2 C: F, n
        // 这种情况,要把区块广播给部分peer$ d' _3 J7 @1 T3 B. `
        if propagate {
* r$ d& h" r1 f3 P/ w6 ?/ X                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)9 J3 N3 |8 d% X- M( w+ Z0 \
                // 计算新的总难度
% a3 U9 C0 ^! r5 Y3 R5 O( w: L9 C                var td *big.Int
4 j# U! w- L( f3 X7 P  k: m                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
" D: }0 Q% f* [( T$ |# P) A( v1 }                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
0 z: P9 E; i% I" h, c9 w                } else {
# I% g0 R7 Y6 E, P" I- \                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)7 }+ X: U8 B, X5 n2 O3 j: Q
                        return
9 i) ~1 y0 S. m                }9 E( t, H0 G' u; S
                // Send the block to a subset of our peers5 Q; b6 D9 o  j  ~
                // 广播区块给部分peer$ U  X3 `$ U* }# T
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]
; S! W' K7 s9 I6 A2 @                for _, peer := range transfer {* `' T1 f% W: y, _! d
                        peer.AsyncSendNewBlock(block, td)
, K  ]( m1 f* a2 T5 b. b& @; y                }  W; S# {, [& }8 ^! F
                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))  h' W+ v9 N8 I; B( m
                return9 c4 S! ?( A$ [0 o9 v. D! |
        }- g; r9 ~' V) t; c$ x5 ]; X5 n0 h' l1 y
        // Otherwise if the block is indeed in out own chain, announce it
2 {8 K4 m+ f8 A  @- W) T6 X1 h        // 把区块hash值广播给所有peer9 \& t2 w. D$ J2 |6 Z
        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
5 N4 D- H* |* z; A+ t; `; R; I, y                for _, peer := range peers {
6 }5 {* V/ `$ F* W" z                        peer.AsyncSendNewBlockHash(block)9 h. t( z: ]- x  ?  c' F
                }
( |$ C) p* w4 T6 o; b                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))+ y& S) u1 X7 P  _6 N
        }
) B; `; \+ t9 l8 {- k}! ?5 o1 s; s! y& N' Z. l
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
( {3 k" ]6 O) G* J// broadcast is a write loop that multiplexes block propagations, announcements
9 W; c2 ?- v7 _- \$ S3 y) B" h// and transaction broadcasts into the remote peer. The goal is to have an async
% }. @! H' i9 v# Z; r// writer that does not lock up node internals.7 y. T4 w9 g1 e' N
func (p *peer) broadcast() {: L8 F0 x5 U: d8 n. ^
        for {% z  y$ F3 g- J' G
                select {8 F( j) c; C; g. d( r' [) P# b
                // 广播交易
% K( l9 w5 K9 S/ |$ c* J9 ?                case txs := % v" A5 b4 L" i  S& S
Peer节点处理新区块
" t4 d( v5 I. V. t7 O9 ]本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。+ p# `- O  J& R% E
这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。7 X( h$ C; N, H- S
消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。4 X2 u% m1 a% Q5 t# L. U$ v

7 A( ?0 E$ I" h; @( t* _% U仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。+ Z! c# t7 u3 v, s
NewBlockMsg的处理
9 K- c8 A5 ?% |5 A% B1 M) d本节介绍节点收到完整区块的处理,流程如下:
# V9 t$ E9 |# s; ]( G7 l  K首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。- x" y# a' _2 o4 \# A2 [. F( B( D
将区块存入到fetcher的队列,调用fetcher.Enqueue。
1 Y/ i& H% d9 `更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
- d/ o4 F0 N, f! y只看handle.Msg()的NewBlockMsg相关的部分。
, v0 Q# }3 B! t: l; c+ ?5 [$ Jcase msg.Code == NewBlockMsg:
# o1 I- V4 \2 f0 n        // Retrieve and decode the propagated block& r7 \, W! }: C
        // 收到新区块,解码,赋值接收数据* O* @7 q! d( \' ?
        var request newBlockData& s/ N% o2 T! {' {- I9 t4 \
        if err := msg.Decode(&request); err != nil {# G) v. S5 t  E; c9 }' O3 N
                return errResp(ErrDecode, "%v: %v", msg, err)
" ^9 t8 b3 c- v& J0 ?$ _! ~        }
% }) J$ E; V" l, p- [        request.Block.ReceivedAt = msg.ReceivedAt) _+ {/ g6 U& q
        request.Block.ReceivedFrom = p& B5 C- f0 L3 _* E  I
        // Mark the peer as owning the block and schedule it for import; M' ?6 P3 m* `, Y, w
        // 标记peer知道这个区块
  I2 f4 [/ Y( U. T        p.MarkBlock(request.Block.Hash())! U  ]: L5 E& z9 q" k. R
        // 为啥要如队列?已经得到完整的区块了
& {' X7 l9 q" H3 H- R5 [" @        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块) f; S( [' r6 \/ r, d
        pm.fetcher.Enqueue(p.id, request.Block)2 Q1 p4 @) h8 y' M  I
        // Assuming the block is importable by the peer, but possibly not yet done so,
' T3 d8 u4 h+ [: n        // calculate the head hash and TD that the peer truly must have.7 F4 G3 [7 o; [, B7 ^
        // 截止到parent区块的头和难度' I+ N* t6 ?7 O& m% i0 Y- O
        var (5 P/ a1 {* l" V7 a0 Y
                trueHead = request.Block.ParentHash()
$ [8 S8 R- ]& D" W6 l7 Q                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
+ a0 Y/ P) }0 i' g2 e' C# k        )
" U( U7 \8 @" z* ~  b        // Update the peers total difficulty if better than the previous: P$ n5 x7 g1 X) t3 I
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
0 b* m6 n  M7 L7 H* v: u        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。, q$ ]5 {- i( q* t6 ]0 ~
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求( t4 e/ Z. _5 {) h
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain- S( C- c! r4 T9 V2 l
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
0 ?8 G( [' R) H3 O                p.SetHead(trueHead, trueTD)
1 S4 x2 Q5 ^6 D# }                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
* a. a) v, M8 `0 M% B                // a singe block (as the true TD is below the propagated block), however this5 }% `6 z) V9 A, d+ {
                // scenario should easily be covered by the fetcher.
& z" q% F# q" A( a                currentBlock := pm.blockchain.CurrentBlock(); p6 F& J; j+ n" K8 c- O% X
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {" a4 S5 f5 {/ G' Q
                        go pm.synchronise(p)! i" T/ b+ b( b7 n- @
                }  {# o, r! q* L3 X( o, S
        }
4 z6 g/ ?- g3 Z# F) n; G0 Q//------------------------ 以上 handleMsg
* G. x' F: \3 I1 p4 x" H// Enqueue tries to fill gaps the the fetcher's future import queue.  h2 R' R3 Y& l+ B9 w/ h
// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理
3 D$ h, A$ ?8 [) a' l! H8 R' T2 b4 u7 Ifunc (f *Fetcher) Enqueue(peer string, block *types.Block) error {
/ n$ u+ ^$ e& U0 _  W        op := &inject{
% {1 s! E/ b& W0 b                origin: peer,) ~% O0 k& H0 L" Z2 _% Y
                block:  block,+ W, V$ E* E6 o7 |% v0 z3 h. _
        }: N) B5 C! H/ g4 D0 R
        select {
- W- _0 P2 P3 r+ ]        case f.inject  blockLimit {
% J7 \" Q+ u$ Y' N                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)0 H, v; c& w7 e8 i
                propBroadcastDOSMeter.Mark(1)7 g2 x1 M( q! I* r
                f.forgetHash(hash)! M4 e- l) U3 V' Z) E/ t" L0 c
                return+ q5 t& i3 V( i. l9 I6 N+ E) Q
        }
- w$ P2 S, o0 Q9 F! M2 R$ k7 B        // Discard any past or too distant blocks
5 l' S8 o* @# S& k- |3 J! u        // 高度检查:未来太远的块丢弃
, b6 k1 H) m7 n; |% X+ c        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {
) N* ~' d: `  L% F& F                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist). b8 L* |1 d) k# Z
                propBroadcastDropMeter.Mark(1)
, i6 X! b! `% T8 p                f.forgetHash(hash)4 l& H( Q  n2 R0 M, {$ ]2 M2 {7 |
                return
: Q3 {2 b: `9 u8 z        }$ p  Y$ p# }: l
        // Schedule the block for future importing% m3 j9 n+ S/ e& x
        // 块先加入优先级队列,加入链之前,还有很多要做
( ]6 `" E! W) \  z+ K  z        if _, ok := f.queued[hash]; !ok {5 v0 R0 W& \1 \2 F, k4 M
                op := &inject{
  H( Y( B2 V; G. @6 A                        origin: peer,
2 A7 ]6 }. b, |                        block:  block,  M) h5 a3 m! v4 k  T/ k4 @/ m
                }
7 G* n3 J1 Z6 }  |# Q                f.queues[peer] = count
# L: P5 Y) M  s  X% n                f.queued[hash] = op
) r! ~! K( B0 ^, E/ f                f.queue.Push(op, -float32(block.NumberU64()))
) _# ^% D$ k2 Y                if f.queueChangeHook != nil {
# P# z( F) }, ~# ?( E& A+ s                        f.queueChangeHook(op.block.Hash(), true)
. L' q# v* r4 z# }7 K0 C                }
  N! x1 o# X2 C" Q& Y  B                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
0 W3 n5 u3 Z) P$ y! K! M        }/ w  O* q7 a2 W# s4 @, r
}
, _# Z  _+ l' D$ k0 e) ?' Xfetcher队列处理
" X: G- ^& L8 R- A; Z' W# z, `% {本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?
. w' f. G7 M6 |( G3 U- D6 M8 M$ A8 B由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。
* H0 T8 U/ t. k4 }区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash
) }4 i& a; _: s// Loop is the main fetcher loop, checking and processing various notification
2 B2 G6 |* g9 d4 W3 }- g  l// events.6 _1 \" _9 U1 r* `: S. _
func (f *Fetcher) loop() {
' p" w' Y/ _7 a8 ?' F        // Iterate the block fetching until a quit is requested) M2 r5 o" I, {
        fetchTimer := time.NewTimer(0)
1 {/ E  f" G0 I; [' o2 _        completeTimer := time.NewTimer(0)
: d+ N# r3 O8 h* U- H        for {: H$ W" @- i; `6 u
                // Clean up any expired block fetches( U9 m" i9 I, W! _7 x' E; J4 U
                // 清理过期的区块" q+ s0 s! `% O
                for hash, announce := range f.fetching {  G& s1 ]1 `2 V0 w+ l
                        if time.Since(announce.time) > fetchTimeout {
; F0 @/ _8 |6 F5 D                                f.forgetHash(hash)
$ _& H2 A  ~* H6 T( W                        }' B+ h) L7 E& O9 C
                }
! H  a( t+ p7 Y9 S3 ]5 w9 S2 T1 n' Y                // Import any queued blocks that could potentially fit8 ?; r+ ^2 k, m$ `, x+ w$ H0 l' S
                // 导入队列中合适的块
8 P" {1 ]* ~; S* ~6 N$ }                height := f.chainHeight()
% t( z7 {+ {9 d1 R& P! u                for !f.queue.Empty() {
7 r: \4 P/ _) ]" b% e                        op := f.queue.PopItem().(*inject)
! t7 @* z* i7 q                        hash := op.block.Hash()
2 Y: i" Q( |6 A9 C# D8 {                        if f.queueChangeHook != nil {; Q) E9 o% m& I
                                f.queueChangeHook(hash, false)0 H8 Y8 ^7 j# t' s* k, B
                        }
, ~+ q( a$ q4 V( e6 ^; r1 h1 E                        // If too high up the chain or phase, continue later
1 i% M' v  b. j" |) q9 [$ X, W                        // 块不是链需要的下一个块,再入优先级队列,停止循环
: C+ P. J9 n" L- V, h8 W                        number := op.block.NumberU64()
* d  \& E2 [1 v$ }                        if number > height+1 {3 b( i6 ?# v* }. Z
                                f.queue.Push(op, -float32(number))
; u( N1 ]  \- z1 g* A                                if f.queueChangeHook != nil {1 K( s9 m5 m% i3 {/ ^' B1 Y
                                        f.queueChangeHook(hash, true)( |7 U1 }4 Z3 @( w5 E, o
                                }1 _# {) n! m3 P4 [  h
                                break
& p6 l) r9 n3 \7 q8 m+ D                        }! o- l# D% f' V* c0 ~, j
                        // Otherwise if fresh and still unknown, try and import
! V" u! c* ~4 g( O6 m: P% {                        // 高度正好是我们想要的,并且链上也没有这个块
) W2 V( L. |5 h                        if number+maxUncleDist
# Y  k; ]: n2 f6 L$ ]func (f *Fetcher) insert(peer string, block *types.Block) {4 O  g0 B& M& ^% c: f1 _$ {0 N! c
        hash := block.Hash()8 \0 h6 N6 f2 t. t2 d  k
        // Run the import on a new thread1 i' M% j2 ?; D5 f7 _* J  D
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
% O% V' G+ v4 P/ q/ @( j. Q4 w5 Z        go func() {
: d; n# ?& y- ]                defer func() { f.done
0 S! s. A* Y0 C$ A% G) tNewBlockHashesMsg的处理5 [% w/ W2 E6 g. b
本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。3 ^, a; _8 L% L" j; H" h
流程如下:- J) g6 X/ E8 F0 b# h
对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。1 D* _# F) C: q1 Z: b8 Q) C4 P, {
关于announced下节再介绍。
/ L- o- y3 @3 j) y6 P, h; D6 e6 A, B: I3 @
// handleMsg()部分
  ~5 ^" w, B( D* |7 lcase msg.Code == NewBlockHashesMsg:
) |8 N6 y! Z* e1 T: q, S1 r, V        var announces newBlockHashesData$ x* s/ z8 Q2 ^
        if err := msg.Decode(&announces); err != nil {( H; }' f3 E9 B- G1 b  C; l
                return errResp(ErrDecode, "%v: %v", msg, err)3 G# O+ e8 Z" X, S# z, I# J. M, {
        }
+ c- C. m: Y6 ]! R        // Mark the hashes as present at the remote node
! U/ {% p- W" l5 P6 i" M4 A+ v. c        for _, block := range announces {; y6 U5 E+ t1 b# q1 A5 l, {" D
                p.MarkBlock(block.Hash)
1 C  B* E9 Q: [        }/ A: w% ~; G, i+ i* S3 d3 c
        // Schedule all the unknown hashes for retrieval4 c* Y, Q3 U$ F8 v+ `
        // 把本地链没有的块hash找出来,交给fetcher去下载) F9 A# ~0 W6 F( F8 r. ^1 I
        unknown := make(newBlockHashesData, 0, len(announces))6 u& f. B. ^1 s& r9 z
        for _, block := range announces {
; A* }- l$ E) z                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
+ o5 w! C  S+ W  o6 O                        unknown = append(unknown, block)
1 e* H; Y/ o6 X) O9 c# @# q                }
8 ?9 b" t0 Q! s2 [8 ~  c) k# s        }
8 R" q) X/ {8 `! O. a        for _, block := range unknown {  k: j' I! E6 f, A
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
. ]4 Q" S4 M0 H        }" J0 h! X' U0 A% w5 X3 ~. K4 [
// Notify announces the fetcher of the potential availability of a new block in2 T  c& o# e& q5 }; B+ K9 [
// the network.- W& k2 ?- C5 k9 t6 l- `0 ]. k
// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息9 i' A& c# |% j$ J, T, D: t) L
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,: [) @$ n' \; m. G# _
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
6 ~! s) o6 I4 u; I# @        block := &announce{
- ]% V+ j+ g* r$ V' D+ M. h7 [                hash:        hash,# p  W( _& d" J) j4 A* N
                number:      number,
' B2 ?2 G) o- K* j. l                time:        time,# e$ g4 n: K2 a  c8 u* y
                origin:      peer,
& P& ?, C$ \/ n  h- L                fetchHeader: headerFetcher,( k' G/ t5 i/ l# C9 f1 Z2 B
                fetchBodies: bodyFetcher,
# d& y  x8 F& U. @        }
6 |# w8 k9 w  s8 s# L( }7 A        select {# M, |4 ?7 L0 c' L' [7 Y
        case f.notify  hashLimit {: k: _4 R5 C" l+ T
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)  U/ x) O; E6 Z! [4 @
                propAnnounceDOSMeter.Mark(1)
9 o1 V+ s: C4 b$ |. X5 A                break( |  g; d, h& o
        }
- d3 `* N" J% ~( {8 t3 q        // If we have a valid block number, check that it's potentially useful4 |6 K: f. k8 q2 ~: J
        // 高度检查2 P' @* P: [4 u9 g
        if notification.number > 0 {
, z$ c5 @6 K& v. j0 E: ]                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {" [9 ?$ z- D- j- }) E
                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)- u$ n  O! y- {
                        propAnnounceDropMeter.Mark(1)
  ~. k! E/ t+ D                        break5 \2 E+ r9 ^/ e! p8 F6 E. N
                }- l% y8 \; Y! M5 Z% K
        }& c  l6 m! d7 R' }  x. A2 N
        // All is well, schedule the announce if block's not yet downloading; t# Z: n, o+ k" Q, H8 Z2 h
        // 检查是否已经在下载,已下载则忽略- n: l" K; ]- M& X) h
        if _, ok := f.fetching[notification.hash]; ok {
3 H  J' |4 B! ]6 Q1 J" V+ o. J+ \                break$ C$ Q8 y( r$ ]2 G/ |
        }
  ]% t; d9 c. j7 _        if _, ok := f.completing[notification.hash]; ok {
3 L+ S( x8 a8 z, d                break1 U( l. ~( t- w# ^! ?$ _# Z
        }
8 \, a/ U8 l# q7 S  A. `/ \        // 更新peer已经通知给我们的区块数量
; S* [+ @, W% E        f.announces[notification.origin] = count
9 A5 l$ f" `" y4 f8 y. }        // 把通知信息加入到announced,供调度
) ?, o+ i+ f# t- J        f.announced[notification.hash] = append(f.announced[notification.hash], notification): }: J% f" N! {5 ]/ j9 G' z7 K
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
; V, Q) N  _1 Y& c2 P. \; a9 W                f.announceChangeHook(notification.hash, true)
& \7 {2 {( c- H0 k+ Q* [        }
+ f. j3 K; u# @0 v5 p, C* w        if len(f.announced) == 1 {4 G" F  I; g; u% }9 `; n* S9 m
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
, G5 O. _7 e. q6 |- ]2 W, ~                f.rescheduleFetch(fetchTimer)$ g& D  q# s& M4 i5 y
        }
+ J  g7 [# t3 x# e0 z3 D: O2 O) Efetcher获取完整区块- A5 ]. W2 ^0 l" I/ U$ q. N2 |4 X
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。/ P$ L  t& l' k8 N( M
Fetcher的大头
  M; W5 f6 W+ d  IFetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。- H6 F# t9 L" b; W8 Z3 Z
宏观* m: \5 C1 @* T
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。  Z' ~8 M" ^+ A- H

$ b/ @  x1 n' ?5 N. G' U再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:$ c3 `' i; C: j0 k4 y. d9 X& y
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
9 |. o( P) ?9 g! F" I/ p6 `, k+ R9 g2 ]' h& ^

0 E( s" l7 k* e8 e: B- W微观
0 J; Y' p1 M* R) h9 ?接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。, F1 N/ Y* R7 p7 Q( _1 p
首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
8 J( g6 O3 Q" H: S/ b5 Q# B1 Q// Fetcher is responsible for accumulating block announcements from various peers
0 k" i  |1 d0 ~// and scheduling them for retrieval.
  u4 N% t! R$ \' \6 ?. Z// 积累块通知,然后调度获取这些块
! W8 A: G4 A+ g3 F  ]type Fetcher struct {9 @& P6 y2 e2 t2 n8 h
        // Various event channels5 U1 @* l' s4 i7 Z3 O/ v
    // 收到区块hash值的通道  f1 U! D$ h. y! O* k. z% L
        notify chan *announce7 V3 v5 N& b7 `
    // 收到完整区块的通道
$ V. C! p  ~" c# s        inject chan *inject( ~5 J! \: n0 W
        blockFilter chan chan []*types.Block; B& ?% K& g$ Q8 B) O5 e
        // 过滤header的通道的通道2 }% p) N; h$ @9 B) ], E) V
        headerFilter chan chan *headerFilterTask2 ]! G+ ^+ h) H9 k! M3 D
        // 过滤body的通道的通道  Z; L' g; S/ w( `! l  M: @% P, n
        bodyFilter chan chan *bodyFilterTask/ ^$ V7 E6 l4 }% M+ U
        done chan common.Hash/ d4 i0 X# [2 {
        quit chan struct{}8 E$ g( n. r0 K- K" [: {
        // Announce states
% \6 S) Y3 J, I        // Peer已经给了本节点多少区块头通知  [8 ]! ?; ]5 ^
        announces map[string]int // Per peer announce counts to prevent memory exhaustion3 }" E! _& f' Q) Q- G& F
        // 已经announced的区块列表. w2 b5 B  u  N. o3 {% t
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
5 J+ C& E* [: h$ S5 A7 A        // 正在fetching区块头的请求
2 r9 ~5 p% |7 m3 o7 G        fetching map[common.Hash]*announce // Announced blocks, currently fetching: ]' T: {3 `. ^0 |
        // 已经fetch到区块头,还差body的请求,用来获取body
" K7 P1 |8 @+ }9 K) `& Q        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
% O6 S% F3 s* T. T' g1 P        // 已经得到区块头的  R; U7 w0 b4 K7 |- M/ k
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing
( X! f, u0 B6 E& t( M        // Block cache7 C. @- T6 e2 w6 P( Q( }
        // queue,优先级队列,高度做优先级; T) [2 z6 J# E# o
        // queues,统计peer通告了多少块
8 \4 I. S& W8 [: H2 U# m7 i8 a1 w& `        // queued,代表这个块如队列了,
! X9 M) l, D# q, d6 }; ]; h        queue  *prque.Prque            // Queue containing the import operations (block number sorted)% l" O/ H" \  ]7 k' f% l
        queues map[string]int          // Per peer block counts to prevent memory exhaustion" I. \1 B; J) M& _
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
. S6 C5 @# z* i; P! ]        // Callbacks
# Z% r4 z7 r4 [7 O        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
; T1 I# t7 `: R        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
$ u' p3 |& B/ p/ s& p& w! c) v; y" e0 o        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
. _5 d, I, i: v! [( S        chainHeight    chainHeightFn      // Retrieves the current chain's height8 H% T4 t, o0 @3 q" ~, y& X
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数) k2 z, [* F2 i" O# H
        dropPeer       peerDropFn         // Drops a peer for misbehaving$ V- C9 A( i8 q: t
        // Testing hooks
+ b- g" {9 l" V+ G+ o        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
  g5 g/ B# G! L$ n) P5 [        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue) D9 s/ T- o0 n, N- U/ l$ L
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
/ q( {' N" {/ ~/ r- N; ]        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)' h, x: X1 R& F' s: D) G# N( P- A
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62). m) d$ K- J7 ]
}
" A8 N8 t" }8 d5 iNewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。
  r4 F, f1 _5 wcase  arriveTimeout-gatherSlack {0 f- W/ f0 V; {
                        // Pick a random peer to retrieve from, reset all others
6 l. L+ S% p* a1 A2 x                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer( f' H; Z) q$ S! g( l( y% e0 h
                        announce := announces[rand.Intn(len(announces))]
( |5 l2 L; p) s* b& I- \9 V                        f.forgetHash(hash)  x, K8 j) H+ s! `
                        // If the block still didn't arrive, queue for fetching/ k( z8 d, }. k' G+ }/ z' {  O% H% Q
                        // 本地还没有这个区块,创建获取区块的请求5 l) ?* e" y& i9 e$ l
                        if f.getBlock(hash) == nil {0 N4 L, ]& S' W* m9 P  X' H1 A+ b
                                request[announce.origin] = append(request[announce.origin], hash)5 M! [! g; d( i( O
                                f.fetching[hash] = announce3 v+ y$ x9 g% |5 [+ [2 a* I0 ?
                        }
; f1 E- g* X( R" {: `" }- V, c3 X  h+ Q                }4 O9 D" T4 X1 t  R# x
        }
9 j+ |* H0 I2 R- p% P9 a        // Send out all block header requests
; K$ u; R8 d: g% v8 X( i& a        // 把所有的request发送出去" d% s7 j" {+ l3 t( m  X
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
/ s! T/ Z+ r) Y# O1 B        for peer, hashes := range request {( T: k# p& t' B. H2 s
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
; f% Y# [' V( H  _7 V# ^5 c                // Create a closure of the fetch and schedule in on a new thread& H& @: ~5 z' y# `: B/ J
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
5 n4 Y. Y) j: k3 f+ `% {                go func() {
! E& e3 K% w! F                        if f.fetchingHook != nil {* |) P9 Z$ X* A+ |
                                f.fetchingHook(hashes)+ X8 J# ]- ]/ Z! D' ?$ h
                        }
0 ]: _* X' R9 H  m                        for _, hash := range hashes {
8 ~2 Z2 w% v# \9 J                                headerFetchMeter.Mark(1)
( o, M, m9 S  Q& z9 ]                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals" A) u; e5 u! ?) R8 A/ ^
                        }$ L7 W9 ?/ ]' \
                }()0 x2 ~: x9 X, c
        }% D) p) z  ?* b- s! A
        // Schedule the next fetch if blocks are still pending% f( l) o' t4 w' H9 }
        f.rescheduleFetch(fetchTimer)
7 L% T1 G6 V/ O% H从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
2 {2 s# ?2 i* Zpm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies); W. F# `' Z4 z. j
// RequestOneHeader is a wrapper around the header query functions to fetch a
$ `/ y" q/ P) {; L+ j  b// single header. It is used solely by the fetcher.! C- M" Q0 \- S. [
func (p *peer) RequestOneHeader(hash common.Hash) error {
+ c& e( A$ h1 z1 B5 v" B, R: J. n        p.Log().Debug("Fetching single header", "hash", hash)7 p, y2 ~6 A3 ~) w
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})/ k4 o0 a4 L$ U/ O
}  A1 g1 ~. T' S. M' A
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。
& r' p& l! G$ n1 f# c& @···
' z+ G( y$ a+ R- J3 ]* B0 u// handleMsg(); q$ a/ \* [4 q6 y$ Z- B  d5 c
// Block header query, collect the requested headers and reply
% e! Y3 v4 o' F# p5 B& ncase msg.Code == GetBlockHeadersMsg:
  a9 R' @4 j* A. f( v( x; s( p// Decode the complex header query  S& ?7 G  [* c/ d7 E2 y
var query getBlockHeadersData0 }% z4 D) E; G$ d
if err := msg.Decode(&query); err != nil {3 E  d, c2 X- \; C2 d
return errResp(ErrDecode, “%v: %v”, msg, err)
: n! X  s/ P" N6 u" s" n}2 I1 @/ ?3 @6 O  U/ x- P
hashMode := query.Origin.Hash != (common.Hash{})
3 ^3 I; t7 n$ K2 @9 z% ?4 Q. F// Gather headers until the fetch or network limits is reached5 {8 }1 ]! b6 G- o
// 收集区块头,直到达到限制+ O. n- D8 F3 P$ Y- s
var (
5 Y* E  V/ B! P0 l, G        bytes   common.StorageSize; \4 ?5 d) p4 H3 r8 z2 E
        headers []*types.Header
8 ]8 K% l7 d& X9 }        unknown bool
; A, A$ s; j6 _3 S/ g! })
6 B/ j0 h% |1 n// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量
, u" i2 Q  M- \8 @5 q1 Tfor !unknown && len(headers)
) d) a5 `# O& ^) g/ @`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。% g5 D( M3 }5 x% X4 T2 |0 T
// handleMsg()0 J6 }8 V' m  G
case msg.Code == BlockHeadersMsg:& H1 P# a9 ~2 S3 p+ [
// A batch of headers arrived to one of our previous requests
( j3 U, v. \: o! l& Q: l) hvar headers []*types.Header
2 Y3 J: F" x6 H0 xif err := msg.Decode(&headers); err != nil {1 ^* G! i6 f* u
return errResp(ErrDecode, “msg %v: %v”, msg, err), M, ?) Y0 q( l$ [
}
5 V/ e: {2 j) v1 ~. I, U; ?// If no headers were received, but we’re expending a DAO fork check, maybe it’s that; Y: N, o* u, x
// 检查是不是当前DAO的硬分叉& k7 j# f0 m' I' L+ t
if len(headers) == 0 && p.forkDrop != nil {
( m1 y( T7 N) G. z+ g9 N  S// Possibly an empty reply to the fork header checks, sanity check TDs
9 f! D. h7 I  GverifyDAO := true
' _$ z. n, y1 N& B4 x        // If we already have a DAO header, we can check the peer's TD against it. If% R& H6 C( j1 D+ g  ~0 {$ j" ~# h
        // the peer's ahead of this, it too must have a reply to the DAO check
; d6 G: T( V* F) y3 u2 Y        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
4 }% D5 w, d0 Z4 l: \! _                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {9 N/ B0 H$ i1 X: a" u' t7 \
                        verifyDAO = false4 Q+ ~7 m2 E/ a7 g6 P
                }# A0 c6 J2 F: k
        }
+ ^; Z* E- }( E: O        // If we're seemingly on the same chain, disable the drop timer# r7 J& `- y. n- [0 V
        if verifyDAO {
, L/ q6 O1 ^7 D8 W2 U, Q$ t$ }                p.Log().Debug("Seems to be on the same side of the DAO fork")
) [/ |0 X3 T" G                p.forkDrop.Stop()
" O3 h6 c; X- y                p.forkDrop = nil% u- e$ |- I( ~) i) n
                return nil
* q; B( p3 b, R4 v        }
* S1 R, D  }; Y# k% H  |, Y/ |}
9 y2 v& r, e/ d0 {" M// Filter out any explicitly requested headers, deliver the rest to the downloader- T+ P1 F; \% P1 w. |
// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader$ V6 |& u, Z4 z2 z' c
filter := len(headers) == 19 K* x6 x+ w- j! j/ i. A" V
if filter {
3 c: n$ I7 i, b( m; _        // If it's a potential DAO fork check, validate against the rules
( w7 |; N5 L8 c2 c* a! w, e# V- ?        // 检查是否硬分叉1 B) Z$ l2 g2 ?! F/ v! @
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {1 k1 R& f1 I+ _6 w" h) r
                // Disable the fork drop timer( a; L) D( A8 o; E0 |
                p.forkDrop.Stop()) y+ H, _& A7 n: N/ e" w
                p.forkDrop = nil: j, S% H- W8 `+ i$ L5 o( G
                // Validate the header and either drop the peer or continue9 T/ J; p* s" U1 N
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {0 T/ B/ H7 B* U
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
, F- M/ S% |& k; k& w- b$ q                        return err. z; H1 ^  _+ U% z5 {! p( p# L; r
                }" t9 e* u/ q1 `0 C
                p.Log().Debug("Verified to be on the same side of the DAO fork")
* }" Z8 O5 T1 T* c+ \& M. U                return nil
! O$ I  i0 s8 O        }
/ Q3 G( _6 K" z$ f        // Irrelevant of the fork checks, send the header to the fetcher just in case( M) g2 S5 N9 ~
        // 使用fetcher过滤区块头/ y9 K$ N! {. \6 B& Y  N; u* F
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()); G3 @2 T1 Y9 Y2 R0 q
}2 ?3 x3 Q8 p5 x
// 剩下的区块头交给downloader
+ A0 ^. [! g# T3 `5 Jif len(headers) > 0 || !filter {
6 `5 Q* \8 R. q        err := pm.downloader.DeliverHeaders(p.id, headers)) ^- \% d2 }" B8 `* L
        if err != nil {
9 Z0 C: ^. S2 F# S0 ~% @/ l                log.Debug("Failed to deliver headers", "err", err)
$ N4 {4 V" Y# k' ~. _' y        }
) I0 I8 R: m' c  r}( _* H: z2 k0 O
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。
7 ~' v4 l  O- u1 @5 j* Q每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
0 }) B+ O# W: V9 w' v) N; ]1 ]// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,( f/ d2 |( ], y
// returning those that should be handled differently.
# Z% u: G$ q3 L/ C// 寻找出fetcher请求的区块头6 g# V0 C, C0 T$ ?$ I$ I- g
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
- g, }: v# S( V6 n% @! o6 C! Llog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
) X: o$ v& k$ }8 k+ }9 j// Send the filter channel to the fetcher
9 z, w8 D( Y6 G. d+ e// 任务通道' p3 I; ~+ H0 B# R
filter := make(chan *headerFilterTask)
+ x4 |% x( v+ e$ Q2 sselect {
# H6 V' y$ ?9 h) I. _3 H// 任务通道发送到这个通道5 B) F! d" O- z' r9 G: j
case f.headerFilter
' N4 P* m, I7 P% I0 V. s}
9 f: ~6 f, M3 G  V* L8 a* _* Q接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:
4 v: F2 f* U! p7 e$ v1 ~; ^$ M1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。8 g/ w; N; ^! ]$ ]6 U0 f3 j
2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。
4 @8 t; [+ D& K( n4 ^5 P. `3. 把`unknonw`中的区块返回给`handleMsg()`。
* b) |# i0 o9 K, {+ \& T4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
$ q- T) {4 [. S/ p! K5. 把`compelete`的区块加入到`queued`。5 f8 u" `$ r9 B/ Q9 T. N0 Z
// fetcher.loop()4 e/ o) X2 @) B- g: e, M( C
case filter := ( F/ E; C, S) ]' p: l8 G
// Split the batch of headers into unknown ones (to return to the caller),
3 {' K$ z7 O  A- D% s// known incomplete ones (requiring body retrievals) and completed blocks.: v/ R# f  b8 D  i2 ]: `2 S
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放
% `3 o1 `; s% k3 U; ?1 r// 还需要获取uncle和交易的区块3 S' U2 z; A5 s
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}  y( a, p* `& d4 s. s6 ], P
// 遍历所有收到的header
9 W& N# i0 w4 s) @* C# Sfor _, header := range task.headers {
4 @. C: H$ {# [/ F( @: S        hash := header.Hash()* @+ e/ b' [2 `$ \4 s
        // Filter fetcher-requested headers from other synchronisation algorithms# G! S, k+ h; _
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
2 U: P/ n7 m! V! W. M        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
& V) M/ |+ \: K$ g8 b/ S' B) s                // If the delivered header does not match the promised number, drop the announcer
0 g1 a4 e' ?% n% [. p                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。+ [% @% e  N' D: `0 E
                if header.Number.Uint64() != announce.number {' ?1 c8 ~" J. [% X, ?3 L+ T6 h
                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
8 M' w5 c, z0 T8 K4 i                        f.dropPeer(announce.origin)
7 Z7 G! K$ F3 q6 E+ w- N+ A                        f.forgetHash(hash)
2 [9 |! c' u# W" S8 n' v2 e                        continue
0 @0 r7 R& ~! l0 c# E4 O2 P                }
5 Z$ J+ D; i5 w& g7 l& G                // Only keep if not imported by other means) H. k" h4 I! n" {4 h6 P' n
                // 本地链没有当前区块
5 e2 Q5 @& _. q                if f.getBlock(hash) == nil {
( c+ f( T" {/ w' D2 D: X& B0 k                        announce.header = header
# [0 o- V- @) `0 M4 w! u" i. ?9 M# F2 i+ g                        announce.time = task.time
, R4 `; k8 L" S, S, T% g3 r                        // If the block is empty (header only), short circuit into the final import queue( r( j2 `& J2 @9 ~- v
                        // 如果区块没有交易和uncle,加入到complete' o/ z0 G# [# b$ q- U; k) T, L
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
" {2 {$ n( z# S3 B7 o% I+ ~& Y                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())* @5 H7 P% V! t2 I. s9 Q7 Q- S
                                block := types.NewBlockWithHeader(header)
4 L) g' G  s! C" D* z* a! |                                block.ReceivedAt = task.time
6 F* Y" f, l0 {" _* R                                complete = append(complete, block)
/ P0 M  b5 j+ c+ k+ z7 U6 J                                f.completing[hash] = announce
) g* ?- @0 F$ v6 C( ?9 L# c                                continue7 L9 E# k& D  b
                        }" H& {2 o" t# H4 U  `  [( _
                        // Otherwise add to the list of blocks needing completion, G* X8 B( c; y8 @7 d$ D/ f7 E
                        // 否则就是不完整的区块9 U. B" q. t+ k
                        incomplete = append(incomplete, announce)
) V6 G2 C6 J. R6 e$ Y( n                } else {
* X6 V; W. u9 j7 A                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
# {  ^2 v( A. L. N2 K                        f.forgetHash(hash)
  \" |8 z6 t. F2 M" ]$ Y                }
7 }7 U( A# \- @. `0 l        } else {2 u6 k' u6 I' |
                // Fetcher doesn't know about it, add to the return list% X. |7 U/ j$ {0 r  q
                // 没请求过的header
$ f/ ?1 O! O/ X$ O                unknown = append(unknown, header)
. |2 ^& a4 ^& P        }  @7 L; p$ A* Y- Y
}
  L! C  U- [- H: m0 F& J5 v" ~# [// 把未知的区块头,再传递会filter: V, L4 ~! J& x! t+ {
headerFilterOutMeter.Mark(int64(len(unknown)))
6 C1 v1 v( p- M- ?: F! J. ^& Lselect {
: U* [/ L4 p: N# t* ucase filter
; r) @5 X% |$ |. N9 e2 N3 i跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。( g1 C. H) @( @1 e# c. I; U
// fetcher.loop()
: H0 ?+ \# D! b3 `. o& q4 h1 zcase + C" f& N* W! ?5 A( V& f; e. N+ C! L
// 遍历所有待获取body的announce  E) J) G$ Q; Q, p. f& V; ~
for hash, announces := range f.fetched {
* l; p( G# O$ J% I        // Pick a random peer to retrieve from, reset all others5 B/ Y. Y& y, t# D4 }7 A0 M' }
        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
$ [: ?& r& h1 M7 G7 R        announce := announces[rand.Intn(len(announces))]! I9 b4 E3 Q  s* a! j4 W
        f.forgetHash(hash)
6 V* q0 d% i; _( ]+ Z% L        // If the block still didn't arrive, queue for completion
1 e& H; G2 ]& y  {        // 如果本地没有这个区块,则放入到completing,创建请求
2 a1 g% }- ]9 F3 X        if f.getBlock(hash) == nil {
" `# E( B% @; e' N  A, _$ C                request[announce.origin] = append(request[announce.origin], hash)$ D6 ^9 h+ L, P4 r
                f.completing[hash] = announce
+ p9 }. @( F) |, J8 J        }
" w8 M" d& ?# E3 b& T}& q: o7 b/ y7 C7 F1 ?9 k
// Send out all block body requests. R" d& P5 r; g& f
// 发送所有的请求,获取body,依然是每个peer一个单独协程6 i3 g" J2 j! v- H% o8 d) l
for peer, hashes := range request {
. Z, P; d/ j, F2 d        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
& |! D) f3 C- C$ O' I( m        // Create a closure of the fetch and schedule in on a new thread( {7 H7 y, |8 V' ~; a1 N
        if f.completingHook != nil {
7 k# X. p/ c& x) m( Z" X; L                f.completingHook(hashes)
; Z& j; b# \9 x: w- \7 T7 Q6 B        }
7 B! l+ d. c2 C  z. ]        bodyFetchMeter.Mark(int64(len(hashes)))2 J1 y$ ^* j6 t' a* [. d1 x
        go f.completing[hashes[0]].fetchBodies(hashes)
% o1 e/ K3 o  i9 |2 B9 l7 M}
, A0 o. o, ?9 L% I' J2 j5 ^; D% ^// Schedule the next fetch if blocks are still pending$ B# T2 D: C% i5 N& m) h% H; v
f.rescheduleComplete(completeTimer)
" c+ v& L8 g* y" }# j4 c2 \`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。( x5 C& p; k& N- G/ W
// handleMsg()2 k$ G6 ]- O+ i$ {6 g' N8 l) I
case msg.Code == GetBlockBodiesMsg:
# ?1 f, h+ Z5 S! r7 A4 m! G// Decode the retrieval message
% ^4 L$ ^6 h: N+ Q2 u2 }, X+ AmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
" D3 K$ E6 A& Kif _, err := msgStream.List(); err != nil {
1 R7 Z3 `4 H+ ^# @return err
* y; V+ p8 I* V5 n2 g}( g) F: P2 I3 ]$ A# l5 |4 v, d
// Gather blocks until the fetch or network limits is reached
8 @, N$ k, b: W1 I: u0 R6 m6 Gvar (
& E/ O" K# \. p" G6 z# Chash   common.Hash: Z9 ~3 L( u0 k3 |, S
bytes  int
) N4 ~  Q/ _8 r, L! k- S* ~0 B1 ebodies []rlp.RawValue
9 C9 d7 z5 `! b) ~6 i7 l)8 R0 A: W* k0 U- t/ |, e
// 遍历所有请求
, ^- j( k4 O  C& @- C2 s  x% Qfor bytes & k/ V6 v  F7 c; W; I
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。$ N' i) r; p& a% y" H) K* k' F  I
// handleMsg()
0 D7 c' i, T1 @9 hcase msg.Code == BlockBodiesMsg:
$ o4 f, z4 L7 V1 l! b( |// A batch of block bodies arrived to one of our previous requests
& w3 L8 O% c( v' w% c- Evar request blockBodiesData
" s. l+ `. J1 D" i3 x# Mif err := msg.Decode(&request); err != nil {  R3 G3 u0 o$ [8 u5 ?
return errResp(ErrDecode, “msg %v: %v”, msg, err). q7 z( s$ ]$ n& d; b
}, S' a( F1 `) X4 h8 R! p
// Deliver them all to the downloader for queuing" k/ a9 t5 c% j. D- `: v+ @
// 传递给downloader去处理
5 ]3 g5 W' a) w: S2 m# Btransactions := make([][]*types.Transaction, len(request))
# M# ?4 q( y, p+ I1 J( |) b( Wuncles := make([][]*types.Header, len(request)). o) D3 q; g9 Y; E1 C. q
for i, body := range request {
& V: b/ R: \4 T' N' M        transactions = body.Transactions
* A' ~0 |$ X% I        uncles = body.Uncles
- P9 I" `) u& @. J2 n}& x3 i2 N& N5 U+ }& P2 C
// Filter out any explicitly requested bodies, deliver the rest to the downloader# y; \7 ]9 e8 _; a9 k0 N- k
// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
* C9 b$ |  C  _+ `6 {filter := len(transactions) > 0 || len(uncles) > 0" Q: p7 P& B. s
if filter {
* k- ~+ F2 K" O1 e2 C5 H. k( z# Q        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())1 ^. T# o5 H0 t) p& u: A
}  N) x8 C' {# t& N! Y' A
// 剩下的body交给downloader
+ s1 _4 q2 ]0 N6 g* g6 e4 Qif len(transactions) > 0 || len(uncles) > 0 || !filter {# L0 J# L+ C- p; G* P, R: l% R
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)8 A: L6 P, C% Z& J
        if err != nil {
" Z, G1 j$ b! m8 u  X3 Y: d- f7 Z                log.Debug("Failed to deliver bodies", "err", err)
( ?$ k5 N  o; {, }' r        }: Y* r+ |+ x: m
}: i3 o! |& y0 p
过滤函数的原理也与Header相同。
3 ]" x4 g/ i0 P) z  {// FilterBodies extracts all the block bodies that were explicitly requested by8 A' T1 k5 w# b* g9 q8 H& Q
// the fetcher, returning those that should be handled differently.3 _4 r4 T& ?! v; W* r7 S! {
// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理
2 _" t% L  _: n' P% H+ J/ j. ufunc (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
# l* B+ |- N+ K0 blog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))
1 ^5 T5 Y/ a1 v! G// Send the filter channel to the fetcher! Z" p2 e5 I, W; R) |+ f
filter := make(chan *bodyFilterTask)) m8 o/ G0 R' L& a; ^
select {
# X! f. z( P+ V! u  E( [, W. Gcase f.bodyFilter
6 G: ^! r! L8 m5 g; @; _}) P1 a; \, O8 t3 s2 Y) ~/ ?6 n
实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:
% d% q  |+ X7 F% T8 O2 V' N1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。3 j# i% |/ e' g5 F
2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。" O& S# {# y- e* s- O
3. 将`blocks`中的区块加入到`queued`,终结。4 S/ J( @# Q9 b; L4 K5 y/ W
case filter :=
9 S/ _; y9 \; Ablocks := []*types.Block{}
) ^; x, i" R7 V8 f! ]: N0 k$ e// 获取的每个body的txs列表和uncle列表* j/ K1 L7 m, V, Z
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
& \2 G9 F+ B  T& pfor i := 0; i ) L& b; W* [/ q  z9 y# k
}+ i' C+ v! @" `
7 i, J+ d( J3 s, t! h2 o( j' \/ W% J
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:$ N3 _- s3 {& p( n* {$ u
1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
, B; T( B" I7 ~8 ^' T2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。1 ^5 b# Y$ m6 g$ K& z
最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。. \+ G( m- K1 d  o: l0 U$ Z

& H  p. m- K# q
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3