Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
171 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。
8 c" Q, w, g) w. t$ x3 |当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
) j; n6 E& i- I5 ^2 }总体过程和传播策略9 F  a7 k/ x3 w( P) \3 g
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
& _2 w/ x; p& o# ?: L: d总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。
5 \; C  {: f0 @' U先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。! c) }3 l6 v( |- T
8 X, a- D) D5 `) ]4 r, I- m
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
% b$ ^; r. C5 `( W假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。
0 s: i# Q9 b; K$ s' C, D: f, F) j策略图的效果如图2,红色节点将区块传播给黄色节点:4 ?1 f6 v- W" X$ Q
' l* S1 q6 b( R6 d# t* _7 z1 Q! n
& L2 f4 F2 E6 U
收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
% K! e# \2 k- G. e. b! ~3 }9 ]) q
) A& D0 F& G) o5 f至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。% J# E7 f5 f5 M; g9 k
Fetcher模块是干啥的6 {: E2 g0 s& D& n2 q
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。- n( t0 x9 S( \7 P' x' l
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块/ S" _! h$ ^' R0 |/ M8 @, C, e

* Y8 {' z3 m# f, v源码解读& n/ Z, y9 r8 b) L0 Z! ?
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。
! H& }6 a: f1 t1 U6 a+ j产块节点的传播新区块
! }" Z7 c3 m1 n  h7 c节点产生区块后,广播的流程可以表示为图4:
1 l% j+ r3 w$ T5 K发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去
$ F3 G. {3 P, W4 N: ^
$ P0 b6 S, _7 r: ^# W* t" ]* Z" E
( T' i/ [* ^9 D8 z0 J. z) S再看下代码上的细节。( Q- r1 [+ S6 k/ ^
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
5 u* s9 a6 u# R; v) D5 o- Y: v  w# ^7 `6 C5 H2 V
// Mined broadcast loop
7 ~% O# |6 u8 L4 dfunc (pm *ProtocolManager) minedBroadcastLoop() {
* j1 F$ b$ K. z0 X  H; ]        // automatically stops if unsubscribe
! G& ~5 W+ @$ y) j# a        for obj := range pm.minedBlockSub.Chan() {( ?7 |" G& }& Y3 G" L2 L
                switch ev := obj.Data.(type) {% N. u7 [% c% B$ C$ ?
                case core.NewMinedBlockEvent:
6 K4 S% h: Q0 A3 Y                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
; z8 _! M% T8 V' h! O; E! O                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest* L% @! u' B8 `9 a7 X0 p8 S! u
                }
5 a: ]. @6 k6 {) `' z% e' u4 R" N# X% U        }
7 E1 G, p9 y  G}
5 j8 G0 `2 o3 A0 P$ [; _7 Spm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。% C" f, L) x; w  z# R( S: r
// BroadcastBlock will either propagate a block to a subset of it's peers, or# P% v5 {9 m& W( T9 D9 T  ?
// will only announce it's availability (depending what's requested).; ]% C& a  P' ~# {* {
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
" Z, ^, r/ Q+ e- e3 M! j        hash := block.Hash()
3 O. Y5 ~0 S0 G1 L4 k" V  L1 x        peers := pm.peers.PeersWithoutBlock(hash)
9 k0 \/ I) a6 u5 N1 U9 \        // If propagation is requested, send to a subset of the peer
( r+ X; l% I9 M3 ?+ u% |( f        // 这种情况,要把区块广播给部分peer$ d: q  \, m% {
        if propagate {# e( v. ]$ B9 ^  E
                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
5 M  b, K  h9 i1 M; ?                // 计算新的总难度' d) S1 S, F- _8 e
                var td *big.Int
3 _5 D8 o; }" s7 N                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {7 S7 L- m3 g4 m$ x; }2 ~1 Y4 p
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))8 H% u, L# _* a( F% Y# q! P% h
                } else {
; N. ]7 K7 ^7 ]% a                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
" z8 w) v3 S$ _* W9 [- r4 ^2 D8 V                        return6 b: ^0 E  S4 C
                }
% E( L9 k* p$ T% {" `  [) r                // Send the block to a subset of our peers/ j+ F6 k6 _/ d/ a4 K
                // 广播区块给部分peer) x, ]2 N- Z" m/ Z- E0 D# V
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]
) q  e1 s# A- U; p$ r, d& M                for _, peer := range transfer {
2 r7 `6 \% L1 V1 C                        peer.AsyncSendNewBlock(block, td)
/ h. r% Q$ e9 ^                }
9 T# ~5 x/ @# }$ C- t9 O9 r( t                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))( \# ?1 {% b( @  h  K& O
                return
6 e6 X* A  Y' V8 }& y$ j, P9 Z        }
3 I1 t# K* i2 Y6 b  A        // Otherwise if the block is indeed in out own chain, announce it1 h4 n! A  B" x6 ^5 m3 W6 F+ U
        // 把区块hash值广播给所有peer
, {1 B: h8 G) R0 c) ^( n8 P        if pm.blockchain.HasBlock(hash, block.NumberU64()) {9 S& c% j' n- M$ U) v# I
                for _, peer := range peers {
# h& j( S3 ~( Q3 n                        peer.AsyncSendNewBlockHash(block)
) b$ @; w# w) W$ _! I                }
' ^$ M7 x" ?- [6 M" b                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
1 ]( S; W* J9 U$ U) G6 V# M4 o9 U        }
/ b  P% s9 S, I- b+ _: s( r1 D  `}
! U; ~* c' c  h8 l  Y& Xpeer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
- j  ]: \/ l' z. |// broadcast is a write loop that multiplexes block propagations, announcements5 Y( X& [& C4 K/ g
// and transaction broadcasts into the remote peer. The goal is to have an async
6 c9 y" m# q' P; t2 Y) ?- h( Z// writer that does not lock up node internals.8 r8 v4 \# O5 b3 w5 k: w
func (p *peer) broadcast() {& i) K1 A0 L0 g0 h+ x
        for {
; q" ~1 p! z; t. b* k                select {! B6 _" _7 g9 t* k6 y" o1 c. v
                // 广播交易" u: W  \, F( C: P. v! i
                case txs := 4 b. o0 T: x8 J- o- n
Peer节点处理新区块
4 P2 H* }- z0 C本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。
8 {3 j8 N$ I2 b+ ~. S4 D这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。
; v  e" v: f1 W& f, N消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。6 M2 I) Z' V" X- N: v
! \9 c* g5 I8 L; v
仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。3 M& v" _% z; G+ j0 q2 b- ~
NewBlockMsg的处理
& F/ F) R; _* B本节介绍节点收到完整区块的处理,流程如下:) {- J1 m; O6 f2 t* c. F6 H! u
首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。# [1 W7 a+ ?% v% }' ~' A$ K
将区块存入到fetcher的队列,调用fetcher.Enqueue。& T) E7 i: X# e4 E" O* T* b; {
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。; i2 W8 r7 [5 C( S
只看handle.Msg()的NewBlockMsg相关的部分。
+ E8 T: C  W1 D3 u0 h& {" dcase msg.Code == NewBlockMsg:
1 _- G2 G' C- O5 P  b( ?        // Retrieve and decode the propagated block9 ?1 j. }& c( B' W( B* Y
        // 收到新区块,解码,赋值接收数据
2 P* J/ `8 q* I        var request newBlockData" V3 J) w+ [% p4 N
        if err := msg.Decode(&request); err != nil {% {3 y3 e" G5 z5 a- H% z
                return errResp(ErrDecode, "%v: %v", msg, err)# L7 i% c  ~* U" K9 a
        }' @; x9 n5 V  a) c4 @
        request.Block.ReceivedAt = msg.ReceivedAt
: D6 @7 d4 e9 y% V2 K2 b" n5 C& A        request.Block.ReceivedFrom = p2 k2 L" j( ?+ c
        // Mark the peer as owning the block and schedule it for import
, G* V9 }$ f! @/ L1 a+ V        // 标记peer知道这个区块# P% d. C* \9 ~
        p.MarkBlock(request.Block.Hash())6 j8 a3 G7 k; W2 D
        // 为啥要如队列?已经得到完整的区块了
# u0 j1 a2 [& s' }        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块8 \& r- ^/ \$ I& V1 w
        pm.fetcher.Enqueue(p.id, request.Block)
& f# ^7 b0 A& W        // Assuming the block is importable by the peer, but possibly not yet done so,+ N/ r. K* [) b% y, ?
        // calculate the head hash and TD that the peer truly must have.9 R) V) Q  }2 ]' c  l' t% A
        // 截止到parent区块的头和难度1 p/ Q+ }. ]1 j* _% u7 F, }
        var (  D& N7 i& a4 t% B3 |$ a$ ^% z
                trueHead = request.Block.ParentHash()4 `  ]6 F2 z6 H& V- E) W
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())8 S7 w0 J) h1 ], N7 J# i
        )
! V4 n$ x4 |4 n' A$ }        // Update the peers total difficulty if better than the previous& y  u% j5 F- e  J% [% G1 W
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
3 Z$ N5 }- y9 L" |/ H        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。
9 x% u, q6 a, x4 H        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求: V  l& P* j: y
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
+ d. L# {8 k/ i2 z9 w; W  U( A5 b        if _, td := p.Head(); trueTD.Cmp(td) > 0 {/ w! b& r" q' w2 v3 w, \+ W
                p.SetHead(trueHead, trueTD)3 ]! v5 ^/ ^4 ?4 e5 F1 `3 O, K, m
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
' y; l3 ]' p( V0 H                // a singe block (as the true TD is below the propagated block), however this
& D; f3 Y. V/ L3 I8 l* C                // scenario should easily be covered by the fetcher.1 Y3 G! f7 y, Q0 Z( ?  y( D  H
                currentBlock := pm.blockchain.CurrentBlock()
0 x) y7 N1 c5 o# r4 }4 V6 C                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
! X( V9 c9 x! E8 Z( X; A7 C                        go pm.synchronise(p)
, v8 m9 D9 J7 n! e% h                }% g' _, w; s5 G, H6 z
        }0 \8 _" R8 F* ~/ j& N
//------------------------ 以上 handleMsg* D$ N' P( ~9 T& n- x9 X5 X& Z
// Enqueue tries to fill gaps the the fetcher's future import queue.
3 E/ y2 W8 Q2 v3 V- b( ]% m" ]// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理
# v1 G) z5 }6 kfunc (f *Fetcher) Enqueue(peer string, block *types.Block) error {
7 x* k2 A4 @! {" N. b9 V        op := &inject{
$ |/ M& a! \) E                origin: peer,4 v6 @  ], Q  |( F
                block:  block,. r0 H; V8 @. w  B$ b
        }* u8 m# [% g9 S. ^9 w& z' q% P
        select {6 O3 E; s+ s6 h, `9 G% D
        case f.inject  blockLimit {
9 N2 k; q6 t* E9 N) T                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
8 H) ~1 o5 K* d, |1 L4 s                propBroadcastDOSMeter.Mark(1)
) g4 H. x. [7 a# }- d                f.forgetHash(hash)
+ W, p, W% r/ r0 L0 x                return
+ Z6 e% S/ R. t6 d7 |        }
& y' C6 d8 y$ R0 h3 o  ~        // Discard any past or too distant blocks7 Q$ d% F8 N8 k: [
        // 高度检查:未来太远的块丢弃
, D* D6 i4 i" J  ?, l& s' T        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {
* J3 |! C2 y4 e* I: L) s+ j0 F: B                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
% \: Z9 _, L+ g& g5 [                propBroadcastDropMeter.Mark(1)) E! v' A8 t6 u% o# {* k! F; _
                f.forgetHash(hash)
) f% V. w6 j0 L5 q9 {7 h2 A8 ~                return
3 ], Q1 x0 M+ S# G$ O/ g6 m        }/ l6 u" D$ ]9 R' \
        // Schedule the block for future importing
( B4 U- q0 Z  e1 U; h' i: ~/ A        // 块先加入优先级队列,加入链之前,还有很多要做# @' j3 x4 f! S
        if _, ok := f.queued[hash]; !ok {
( _8 `+ S! A& U  P, H                op := &inject{( P' J1 w' Q; Y* O3 k
                        origin: peer,- }& V6 `; t) a/ y9 M" J! n; f3 i9 M; c
                        block:  block,
. c# g9 P/ O* S( ]& ^                }2 @6 V1 _7 I* L+ A
                f.queues[peer] = count
1 }1 U3 S, p9 c, L                f.queued[hash] = op
2 b5 N% _+ F3 w3 G9 v                f.queue.Push(op, -float32(block.NumberU64()))  n) H, T" h0 u  T
                if f.queueChangeHook != nil {. k4 U+ I3 e% O3 h- X0 |
                        f.queueChangeHook(op.block.Hash(), true)
6 R5 s3 h# ]: O0 x3 W; l% D4 L# q                }
+ v! M& Q! ^% X) n. s* R% Q                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
% M1 T3 A7 j4 ]" c  t        }! h9 `$ s% i* L  L! s
}
) |+ A8 O4 C$ h& c  ?0 o) g* i6 |! @fetcher队列处理
$ L% `6 a" B& i3 v本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?' b$ j1 h* j4 R/ j( `: V' ~
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。% f4 Y$ |$ j4 N0 O3 s
区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash6 i" ~' D5 C) A  |
// Loop is the main fetcher loop, checking and processing various notification6 J* [" k3 T! m5 F) u* K
// events./ P/ z$ U7 k4 Y" s4 b$ |
func (f *Fetcher) loop() {
8 C" S  Y& ?5 y- t. c% v        // Iterate the block fetching until a quit is requested
6 D1 W4 u$ V- ^- @* W6 n        fetchTimer := time.NewTimer(0)
/ ^1 Q# `  L/ a        completeTimer := time.NewTimer(0)2 B- N/ l4 x: v8 q$ ^
        for {+ E$ j8 w" c& X
                // Clean up any expired block fetches* h7 a; x- }4 q; T. k- P. ^# C
                // 清理过期的区块
& Q$ o4 s! l2 ?7 Q) _9 r2 d7 y: {                for hash, announce := range f.fetching {
0 R% @4 g. l6 E! E6 _& c% b                        if time.Since(announce.time) > fetchTimeout {/ s* @  ]7 j& ]9 r
                                f.forgetHash(hash)1 t" I4 Y: E' I
                        }
% }4 P8 g# Z+ }; J4 n0 I" _7 X4 N0 c                }
# z1 ^8 e7 }! ^/ P                // Import any queued blocks that could potentially fit* @+ G$ z* z% V5 R" n  j
                // 导入队列中合适的块. D, p/ r( O8 V! X
                height := f.chainHeight()
4 T7 y/ _% F5 f( Z! O$ ?. {                for !f.queue.Empty() {; q' g- D( v; O: H8 r* `0 R
                        op := f.queue.PopItem().(*inject)6 x7 K0 R" R* l
                        hash := op.block.Hash()1 t7 ^$ I1 E! o* i6 V
                        if f.queueChangeHook != nil {2 W2 D( Y7 n7 x$ i! e2 O7 N
                                f.queueChangeHook(hash, false)' h( H8 e: Y# C( H- T$ V
                        }6 O( F/ o% h  {
                        // If too high up the chain or phase, continue later) s6 f9 b( u! M2 s6 p
                        // 块不是链需要的下一个块,再入优先级队列,停止循环  m6 E$ l% m2 t
                        number := op.block.NumberU64()
1 S. G: y6 [8 x# [! H5 c% h7 w                        if number > height+1 {1 H6 ^4 F# o" f# u% _" D" x
                                f.queue.Push(op, -float32(number))
) j4 A% @! }# Y                                if f.queueChangeHook != nil {
1 {7 Y, s: Q& E/ L$ }5 D0 ~                                        f.queueChangeHook(hash, true)
+ i9 R0 ?" d1 d+ ^( x0 x  H                                }* H) p$ L0 u( s: S* l) B3 O
                                break" L" I. b9 a) A
                        }
! b3 a5 b- `1 ^& @6 k                        // Otherwise if fresh and still unknown, try and import
" X/ X& M) U' J& A                        // 高度正好是我们想要的,并且链上也没有这个块
# U7 k' A2 ~9 K* I5 G, _3 ~) G0 c' V                        if number+maxUncleDist - d$ O0 s( a4 M9 s5 Y) S. r
func (f *Fetcher) insert(peer string, block *types.Block) {/ O/ Q1 k! U% N* n+ `
        hash := block.Hash()
0 c- N1 d- X3 M1 f, m' R5 ^        // Run the import on a new thread9 B. e" Y3 B: H: h4 |: r0 w4 Z# M
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)/ t4 m- g! `! `& X0 p
        go func() {6 P$ y& N' Y1 E8 U6 z0 ^: F
                defer func() { f.done * j' i0 {& N. K$ ?( n* W" b" L  H
NewBlockHashesMsg的处理
$ l/ K4 J% F. v. T( M5 D5 Q本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。
" j' d: `& k% Y流程如下:5 M% q+ q" J% g
对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。2 Q$ S5 T: u0 D% q
关于announced下节再介绍。
' I1 f8 e$ w/ ~. P7 x- H9 t. w. W$ A- v" f4 G" k. L
// handleMsg()部分4 H/ W2 b2 B* g" r) \: G0 U3 s
case msg.Code == NewBlockHashesMsg:
- }+ i5 q+ `( G0 M% A' z9 @' t        var announces newBlockHashesData4 Y( H+ S  v9 U6 }
        if err := msg.Decode(&announces); err != nil {
' R$ p( j4 \# {2 |                return errResp(ErrDecode, "%v: %v", msg, err)
- G$ j8 d/ F6 O+ P        }1 Y* V$ B& s7 k4 h' }5 u* h5 x
        // Mark the hashes as present at the remote node
% Y5 ]+ o8 [5 @8 N        for _, block := range announces {
- w" I+ O% J) K0 G- E) v                p.MarkBlock(block.Hash)
, x6 S3 \6 P6 f& q2 f        }
' \5 Z3 O( q0 v/ N        // Schedule all the unknown hashes for retrieval
/ d& W/ v; E1 X- X4 i1 v        // 把本地链没有的块hash找出来,交给fetcher去下载
% I7 C: C) F, |        unknown := make(newBlockHashesData, 0, len(announces))
) D% }" I) H$ e' k4 i' ]        for _, block := range announces {
; j9 W7 h( }, [3 u! e; w                if !pm.blockchain.HasBlock(block.Hash, block.Number) {% m- l$ V+ c8 i0 t. T3 k# ]- H
                        unknown = append(unknown, block)' s4 E3 D/ r# Q2 `
                }
. X. x# P! t1 ^. u  b        }
% y) g' f) p: |  j        for _, block := range unknown {
8 k, u! A: B# _" S: \                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)0 @" @; \5 c+ l1 F
        }
# ]( R: D* t# i. K! |// Notify announces the fetcher of the potential availability of a new block in
% w* @5 Y! I3 u: X. V' a// the network.7 t& s' M: ^+ L4 s! {5 Y3 {( O; J
// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息
9 T% N! N+ a: Q+ v& z. h& y: d& Lfunc (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,  Q- {$ Y+ j- e0 k, K7 O
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {" b5 ~+ ~( M7 h, c  E
        block := &announce{" ?& g! U. T' \, ?1 j' z& a9 r
                hash:        hash,' Q4 K* g/ `  U. k! T7 {" x6 m
                number:      number,
: d7 {1 [: I2 D% M9 z                time:        time,
( M) S0 t2 V4 |                origin:      peer,
5 Z$ ]4 S2 [/ h' r                fetchHeader: headerFetcher,) Q* ~* n% g/ }9 ]3 Y! R
                fetchBodies: bodyFetcher,
& _1 p4 [) n- l7 Y4 X        }6 l& ?2 I9 ~4 m2 s
        select {
  x' ]$ O. S! ]! P        case f.notify  hashLimit {4 F  I  u0 k% ~+ N
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)% L4 }7 U+ D7 K/ U
                propAnnounceDOSMeter.Mark(1)
: H: {$ N  M, f7 G. ?, {6 {                break
+ c  B7 ~5 z# A  w$ j/ D4 ?4 ~        }- k$ G. K* e, T; Q! }; |, o
        // If we have a valid block number, check that it's potentially useful" W4 l3 K8 ]( C$ t3 n9 J/ Q
        // 高度检查- Q9 Y; h+ [, a
        if notification.number > 0 {
6 w/ j$ J& m1 k+ L1 T                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {8 q; W  l# F8 D
                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
- {# b; m5 L3 W; B' C* F                        propAnnounceDropMeter.Mark(1)
8 A! U( c' w8 b: u                        break* I) R. m2 h% I) a7 ^0 ~# B  K! Y
                }
3 n0 ^- o$ ~8 n& z9 B0 w" B        }
" _9 \% o1 b/ a        // All is well, schedule the announce if block's not yet downloading9 [) T, e% K  k& Z6 Y, [7 T* R* b
        // 检查是否已经在下载,已下载则忽略5 w9 m7 m. \9 J" ?- ^
        if _, ok := f.fetching[notification.hash]; ok {
* }( R8 y# w3 p% N                break
) z+ _. |7 @5 i9 S        }0 ^: j0 W( _" d6 N
        if _, ok := f.completing[notification.hash]; ok {. `0 _) n; {& u! M5 U
                break5 |* s9 E% W: Y, Z+ }* ]
        }
% ~, m# K: a$ s! I3 K2 C        // 更新peer已经通知给我们的区块数量3 q. Y( ?: v$ Y1 H; l
        f.announces[notification.origin] = count
  m2 k0 H; s" y3 P) V        // 把通知信息加入到announced,供调度
' k& W. p0 Z& @" w        f.announced[notification.hash] = append(f.announced[notification.hash], notification)1 G  e+ R/ ?4 Q9 a. z0 V0 }
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {/ [9 I4 {; |5 r9 P: e
                f.announceChangeHook(notification.hash, true)
9 P/ g' X/ i, Q. Z        }- s* p" X  T8 ]) S& `/ b! I
        if len(f.announced) == 1 {8 K" r; B& c6 H5 G
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
  S+ }6 F) a/ L) u/ F9 K                f.rescheduleFetch(fetchTimer)9 t9 S/ q) k. U. O. O
        }
/ H& `1 q9 r0 D. Gfetcher获取完整区块7 l  T1 s, Y% ^5 J2 T% a3 _
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。; H" ^' k( `5 @- f. F
Fetcher的大头
" d  C, {8 f+ H" ?5 n: \& s& yFetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
# [$ z6 I6 j# w9 j$ r6 {宏观, Y2 I# Y; ~- m2 z6 U% X
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。
, Y- D/ L1 J. q7 {+ F! C+ j* Y* H, N3 t, E2 Y
再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:) L% p/ `9 I% g, Y! a. r
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
) ?9 v$ Y3 ?2 d* {/ v1 a1 M4 [/ e% Z; o* V& X1 Y
) _: {# `, q7 e  M# V5 E
微观
2 S8 j* ^3 }8 x0 K0 X, T+ ]$ Q& f接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
: Q( m9 G* M. _; t首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
5 \- ]1 g( |; U// Fetcher is responsible for accumulating block announcements from various peers6 R: @) f' f9 f5 c
// and scheduling them for retrieval.$ B+ x, l1 `8 U4 ]' _4 U5 g
// 积累块通知,然后调度获取这些块! M; }' }; j0 F7 e$ Q! v6 @
type Fetcher struct {
7 H1 t$ ?/ Z0 e1 u( P        // Various event channels
& I- W' r; x2 M9 a    // 收到区块hash值的通道
: I) i8 E; R3 N' F7 b        notify chan *announce- W$ K5 R( ~% A. u2 p
    // 收到完整区块的通道
" C) J- e% k1 G8 \" z% `6 z        inject chan *inject
6 C9 {9 F0 G( d0 L! W, A        blockFilter chan chan []*types.Block& F. q# x  e* D/ P5 C
        // 过滤header的通道的通道
1 ^8 G1 S1 g& t  O        headerFilter chan chan *headerFilterTask
$ |/ a4 y* s& y$ D" H' e        // 过滤body的通道的通道
" y& w, y$ {' B% C# ~5 Z+ i) f        bodyFilter chan chan *bodyFilterTask8 ^; M1 |/ g/ x4 F* U6 D, \# c, G
        done chan common.Hash
! m- P/ s  V. M/ ]$ H        quit chan struct{}) {+ [& R: Y" Y8 U6 B4 j
        // Announce states
5 S  ~' F! G% Y% R: v1 _; U( A% H+ @5 a        // Peer已经给了本节点多少区块头通知1 p! |) Y' m5 |
        announces map[string]int // Per peer announce counts to prevent memory exhaustion
' ~0 _! P# I5 e0 {. V        // 已经announced的区块列表; A* w% Y9 A$ `6 ?
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
' N9 k  r7 [. z        // 正在fetching区块头的请求% Z4 U! T6 z& u) X, \) |9 y
        fetching map[common.Hash]*announce // Announced blocks, currently fetching
' f# l) L) g/ a' r5 Z        // 已经fetch到区块头,还差body的请求,用来获取body. I$ r; l& U. Z; L+ @5 L
        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval2 s9 X" m. c5 y- Y, \5 S
        // 已经得到区块头的9 Q4 ^" L8 v  h
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing6 h$ S# X  a) S/ _
        // Block cache
9 o' g2 [  w  m0 e( F        // queue,优先级队列,高度做优先级
. g, u9 d: M: q$ o; o4 ?% ^        // queues,统计peer通告了多少块" T) J( `# {# k! ~# ~
        // queued,代表这个块如队列了,+ o1 ^; ~5 k2 s- p
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)4 S5 n+ r0 \" S6 p: g1 @5 H
        queues map[string]int          // Per peer block counts to prevent memory exhaustion
7 ^$ U7 N# a( A! g  \9 P4 H        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)8 f" n5 `. f* K3 O# ~' c8 [
        // Callbacks
1 B4 w7 C, d! @( S! d        getBlock       blockRetrievalFn   // Retrieves a block from the local chain7 }9 V2 m: k7 v1 s
        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
$ Z. Q0 s3 b+ l5 o4 i' F% B1 s        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
6 S! e: q5 I* V2 ^1 f        chainHeight    chainHeightFn      // Retrieves the current chain's height
1 U- g" j. V& E0 k        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数
, F# G9 X. L4 a7 B0 `7 f        dropPeer       peerDropFn         // Drops a peer for misbehaving2 M4 z6 W. A/ ^- Y& D& y$ l7 P; a3 Q5 k
        // Testing hooks1 j7 e$ _0 p- n; X9 L# Y' q
        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list* Z# Q9 d: h% X2 x& N3 K/ |
        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
; P8 K" P, E) h) W4 O. q- R" N/ c        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch5 |3 {8 o# ^6 _% g5 I% {
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)1 p" W% y8 [, D; y3 y
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
" a# N8 Y+ b. r/ d! E: M1 |/ U9 R}
3 c; d, y; [1 j; ]NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。
4 x5 \2 n1 a7 d4 {( j- ocase  arriveTimeout-gatherSlack {
; J; o, l# X* I5 i0 E                        // Pick a random peer to retrieve from, reset all others
4 t# `. Y& z8 E6 S! V" H                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer
/ c& c" \5 g' k9 C                        announce := announces[rand.Intn(len(announces))]
& V" R9 T* X! B+ K, X- L                        f.forgetHash(hash)7 k- }5 d9 ~6 Y7 b  O
                        // If the block still didn't arrive, queue for fetching2 c' R6 J/ }& C' t" f, ?
                        // 本地还没有这个区块,创建获取区块的请求
% ?5 s& J8 k2 b, Y( Y                        if f.getBlock(hash) == nil {
  ~% X+ n$ P+ z+ J5 X) ?# I                                request[announce.origin] = append(request[announce.origin], hash)
1 N/ d1 ]# g9 c8 f! x                                f.fetching[hash] = announce9 Z4 c9 h+ M% F6 h
                        }
; d9 p8 S' l: V/ s( {                }
- F- @7 Z* s$ O/ L9 j+ ~        }
% Y3 {( Q, z- e        // Send out all block header requests3 n. x: C; {& m- S# X0 @
        // 把所有的request发送出去6 B) v8 z# r6 ~0 \
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求8 ^; Q+ E& e' K8 p# k  T
        for peer, hashes := range request {% T; @9 _& w8 D9 e" X- c
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
1 P2 A/ K# b8 |# j" o# t$ \/ o" j7 W6 @                // Create a closure of the fetch and schedule in on a new thread
. x' K2 |# @3 N7 j                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
# y. l* M& H2 o                go func() {
9 @( m' R- ^% b% b" N7 f8 W                        if f.fetchingHook != nil {
+ I! Z: D# j! O  _9 B4 O( Y                                f.fetchingHook(hashes)
5 `$ @7 M. l. U, e1 C- q: n                        }1 J8 I# j0 e5 P( S" [
                        for _, hash := range hashes {
' U' l, `. A* q; W* p                                headerFetchMeter.Mark(1)% D8 J! B1 L* \$ c
                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals7 O2 n7 s/ M0 C
                        }' \4 w6 W- Y) h3 N% s- @
                }()8 d$ o$ ?* W7 l. ~6 O2 \
        }" k, N" x0 s7 c% L: j: X4 B  ]
        // Schedule the next fetch if blocks are still pending7 f' P% O- A6 }
        f.rescheduleFetch(fetchTimer)
+ v/ W" y1 D: E8 ]从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。2 |6 k7 ]$ A3 Z
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)8 L# e" `+ x! k9 a4 r, E
// RequestOneHeader is a wrapper around the header query functions to fetch a
% _% `4 j9 L# n// single header. It is used solely by the fetcher.
, c; `/ l+ r- ?func (p *peer) RequestOneHeader(hash common.Hash) error {
# g, x/ A  W) ^3 r        p.Log().Debug("Fetching single header", "hash", hash)5 Z2 C: N) {6 M
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
* A2 l7 m# f, L' C3 Q' W}
; h3 x- A7 r7 f) p- dGetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。
* W* A# T' J# d; ]. M···
; r$ ], L3 S9 L" r: v$ ]1 U/ V// handleMsg()
" d% J+ D% \7 k7 j$ x# j- o// Block header query, collect the requested headers and reply
5 R0 ^: `( C% S9 lcase msg.Code == GetBlockHeadersMsg:7 Q9 X+ G8 o$ {, Z
// Decode the complex header query( N0 b, `% V& U; K. d; {
var query getBlockHeadersData
& k- r) a9 M; t+ j3 @if err := msg.Decode(&query); err != nil {6 g! N/ N& L/ s3 o
return errResp(ErrDecode, “%v: %v”, msg, err)( Z: B+ ~$ l2 c$ \# G' M; o
}9 n6 t8 l/ y. t: H4 N1 d
hashMode := query.Origin.Hash != (common.Hash{}). f% t2 M5 E. P& u
// Gather headers until the fetch or network limits is reached
- Q) v3 g, [4 A9 x4 s1 l6 Z// 收集区块头,直到达到限制, T2 C2 z6 @  [) `
var (9 M" p# [. J$ H2 C9 u
        bytes   common.StorageSize' c- i# g" }" z4 u
        headers []*types.Header
4 U4 n' I& {+ T7 I# `* z- ~        unknown bool; U$ ]9 U, ]5 R7 ?* {$ y& n
), O4 ~- Q- a' H, L  w
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量6 D/ K! r4 m' `. l+ H2 c" T; k. Y
for !unknown && len(headers)
" Y4 N: I1 ?% [9 J$ v8 T`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。0 v  ?: J! E5 a) G/ E. ^1 R
// handleMsg()
  f8 G1 K! i8 z5 N% Q/ ncase msg.Code == BlockHeadersMsg:1 X* e' O5 N* m% X  P9 N& c
// A batch of headers arrived to one of our previous requests: ^& _+ N8 J! {8 C& B, ^* X
var headers []*types.Header
( b! t3 P2 A( S0 vif err := msg.Decode(&headers); err != nil {
0 a# u6 w" L% T+ Y; w8 v8 Greturn errResp(ErrDecode, “msg %v: %v”, msg, err)2 \3 p" V: C* _# f5 O
}2 {' T# e- }3 u1 E) t/ d3 k
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that9 w/ j7 F* v9 @$ U+ J% t
// 检查是不是当前DAO的硬分叉6 Q) a( ?1 J$ f: }7 t7 |! f
if len(headers) == 0 && p.forkDrop != nil {
- U4 l9 l/ I4 I% f$ V% |* Y/ a// Possibly an empty reply to the fork header checks, sanity check TDs) R/ x9 C; V; w# f; [$ V7 ~, k
verifyDAO := true9 w7 \4 `: y9 H8 T' d
        // If we already have a DAO header, we can check the peer's TD against it. If
( e. r" y7 b- Q0 b( l( {, z0 x- m        // the peer's ahead of this, it too must have a reply to the DAO check
/ w5 J. [! Z5 E2 P        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {. P' V" r' U7 a' {
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
+ W) ~6 P6 z" s4 j  K8 g* v! L                        verifyDAO = false; f/ J( h1 P+ K# A: D& ~7 }# ?/ l$ i
                }
9 U; E9 [7 }: f' Q' N$ P        }
( @3 U) x6 B$ d& j8 D        // If we're seemingly on the same chain, disable the drop timer  o( `! g- z7 F9 s- t) {/ X& E
        if verifyDAO {
3 f2 C. k$ c1 T" c0 A1 [) M                p.Log().Debug("Seems to be on the same side of the DAO fork")7 b- @! ?7 v$ G
                p.forkDrop.Stop()
" k8 T& d+ w. o* y                p.forkDrop = nil( E& {, f3 ^2 B( t* c
                return nil
2 S) o* J1 w! \+ O! U, e1 x! K6 i- l        }
3 s6 v: n7 s! }0 C1 |) b- L- n}0 d- [. U$ X$ C" @2 F/ o9 p
// Filter out any explicitly requested headers, deliver the rest to the downloader
% O! T  y+ u9 D8 [% L/ o2 J1 \// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader7 k5 [' l! `  X/ ]" U. t% o
filter := len(headers) == 1; I7 s; T! b7 j7 X0 }; L8 Z
if filter {. t" Y  V2 E' H) C
        // If it's a potential DAO fork check, validate against the rules! E8 B( [$ P9 m( T# D, X
        // 检查是否硬分叉! P, r: u5 P6 f6 F; ~
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
0 L& K0 N. ?) I, @* s                // Disable the fork drop timer7 Y" {4 y4 U4 G  W5 n$ r
                p.forkDrop.Stop()
- R- e6 X% v# x/ V                p.forkDrop = nil
0 n) N( c" |9 O8 q5 k                // Validate the header and either drop the peer or continue
) p& c! U9 T4 z. P+ h; l                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {: p- M* U) c0 m4 D
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")2 X- X/ x+ x- V: S! z
                        return err
* x: S% M( N8 P: D, r                }  M* U1 E) h- u  u/ o
                p.Log().Debug("Verified to be on the same side of the DAO fork")
/ s2 o" R( X4 ]; G                return nil
" v! E6 I- Q+ N, r- U1 Y        }
2 X; H, Y: |, @, }. J4 f- n        // Irrelevant of the fork checks, send the header to the fetcher just in case! V1 T7 i& q- p6 }% Z8 U
        // 使用fetcher过滤区块头
, d" u& {5 H2 b# \' p& t: {/ X        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())8 R) H9 X9 v1 m2 H+ W- y; y5 o
}
3 P9 S! B3 Q- [// 剩下的区块头交给downloader
# u; e! u. R% {6 S% Nif len(headers) > 0 || !filter {) q4 s; o) `1 Z, L  B7 b
        err := pm.downloader.DeliverHeaders(p.id, headers). ~0 r3 G' p, z1 M3 f, l9 ?
        if err != nil {
# A: X( ~% |7 r& L8 ?1 G                log.Debug("Failed to deliver headers", "err", err)) t7 t0 k6 |1 @( N: c  k
        }
. Q- W6 I7 ~7 V& k- x}
" ^: p& a% b" z3 G+ O9 I6 [, I`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。" K* A% a4 g6 F/ F& E
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
# K0 ]5 x$ D' a# M) \. T2 A// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,! Z7 d; n- P6 H7 |
// returning those that should be handled differently.; P0 t% P* c4 [; m7 a
// 寻找出fetcher请求的区块头
) {: d6 E" F4 A: `5 ofunc (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
5 N7 ^3 l6 v+ j& flog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
5 U$ x, \" @3 S& e5 J// Send the filter channel to the fetcher; f* j$ u) J: C# R! H' m) `
// 任务通道- B7 q  Q( \% b1 F# C
filter := make(chan *headerFilterTask)
* @+ T5 ]9 u8 w6 R# v( Tselect {
0 B" Z$ o3 R7 x% z// 任务通道发送到这个通道
; i+ x; m$ v4 W) ^6 Qcase f.headerFilter # U. q7 a% M- O( Q+ J2 m
}7 P' b; [' b! c# r4 U. }, o
接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:9 ?; s3 I2 e9 [; a
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。
0 i$ W& L" S8 G" _" ^4 s. E2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。: M) D1 \9 Z; Y$ ^5 C* r
3. 把`unknonw`中的区块返回给`handleMsg()`。
/ i& o& b: N! O! A4 n4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。+ v5 n4 |% d' s) y3 d7 A, N
5. 把`compelete`的区块加入到`queued`。
4 V' O9 z2 Y% C7 K, Z8 P// fetcher.loop()
5 |! \4 ?$ p9 ecase filter := # ~# B2 j( W$ T' ]. M+ ^
// Split the batch of headers into unknown ones (to return to the caller),( _" ]2 N# d, k1 M# h
// known incomplete ones (requiring body retrievals) and completed blocks.
$ r+ C- U& `$ |/ F$ H// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放
7 O6 W- I* h' M+ r$ w+ O  U+ v% s// 还需要获取uncle和交易的区块3 P: a, E' @# `8 K, i( W
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}2 D- z- }$ C9 o
// 遍历所有收到的header9 U+ M% t/ b9 d- p& b' t
for _, header := range task.headers {
! l4 t9 }+ F' J' D' s4 t- `4 b        hash := header.Hash()
2 I% t1 l9 P* |        // Filter fetcher-requested headers from other synchronisation algorithms8 N: a) H9 Y/ F' r6 j1 b( {
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
+ u# }; ?- V" ?7 S) }# }  z        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
" ?' y/ ?; u# F* g                // If the delivered header does not match the promised number, drop the announcer  K. C( ?0 t; W5 X# v4 k- f
                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。
$ ^( b- y9 e  j, I/ A0 X0 A                if header.Number.Uint64() != announce.number {
4 g4 L4 V* z8 _$ Y$ V: [# q                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
/ _6 ^: A' _- a& N/ s                        f.dropPeer(announce.origin)" B; W  B- S5 P" }: s, R# h
                        f.forgetHash(hash): Z. z/ e" \6 F# \/ W4 S/ }$ r3 _" o
                        continue
: c0 C3 ^6 L* `4 t! d+ i8 k                }1 U$ t5 r: E! \  d
                // Only keep if not imported by other means" `' B! h! Q! G/ Q( c# K; E
                // 本地链没有当前区块
5 {: Z+ d& j/ W                if f.getBlock(hash) == nil {  l7 L% `8 B2 r' {7 u" }
                        announce.header = header
5 f7 P. V8 H+ \, o                        announce.time = task.time
1 F+ \& g. s% }0 [  }. l                        // If the block is empty (header only), short circuit into the final import queue
' v$ i: K; Q) n                        // 如果区块没有交易和uncle,加入到complete
) m. o1 L0 v; u5 x0 x7 P4 t                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
$ C$ C4 P8 d  A; f                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())$ ?  J. z- u% x# v- c$ p+ ]; A
                                block := types.NewBlockWithHeader(header)! [5 y+ a8 k& r& L" C, N5 L7 F
                                block.ReceivedAt = task.time' c/ l3 S. t2 N
                                complete = append(complete, block)
+ [9 `  B* [9 v3 r# ?( N                                f.completing[hash] = announce
" d; g/ N9 u- f6 h7 f- _                                continue
) M# m9 L0 e7 s% g; l8 V9 E                        }$ O2 h: o8 ^5 `7 L/ p$ P
                        // Otherwise add to the list of blocks needing completion! v' q$ H) d0 v& o4 `) R
                        // 否则就是不完整的区块
6 h, q4 ^5 w* z                        incomplete = append(incomplete, announce)6 I. k+ A  @; n+ O0 @
                } else {
7 k/ w! Z9 e! H- h1 B" B                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())7 E+ }, |; b$ W/ K; h$ z
                        f.forgetHash(hash)8 y3 L  \0 S& [
                }
5 q3 x8 [% d& O: u; F" y        } else {
* N7 y" S  q" ^( l5 m+ _, @5 l. y6 J) n                // Fetcher doesn't know about it, add to the return list( R; Z& G  U8 n4 v6 G
                // 没请求过的header
) t/ ?& m9 A2 x2 f# S) o: S  ?                unknown = append(unknown, header)
/ z# H' n4 Y1 ^( w! a        }
) I- F3 A* z! i, F4 I/ a0 y}
5 E- i; }, g) c; q) }- B4 W# Q// 把未知的区块头,再传递会filter
6 x6 N: ?- V3 [/ vheaderFilterOutMeter.Mark(int64(len(unknown)))
/ r; y% h) F# z. O8 H4 \, mselect {
: a8 H8 n/ J" ~9 C+ a' R  ]case filter 1 D! E, j( W. ~4 F4 y: R
跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。
8 s! X5 f8 d4 E# ^* J* ~: v1 M( X// fetcher.loop()
2 S3 F1 I5 k" ucase
) _% ]# R$ s: r4 B' Q# v; |8 J' l4 }! I// 遍历所有待获取body的announce
% I" T* ?$ Z% S8 U! Kfor hash, announces := range f.fetched {
" V8 Q5 B; `/ W2 U1 w% N# ]        // Pick a random peer to retrieve from, reset all others
# w2 Z1 e: e6 h$ v9 w) B; |        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了7 @2 {* g$ {5 D2 k0 d0 A$ D2 y! R
        announce := announces[rand.Intn(len(announces))]' G4 e/ n4 {3 Q  U  I6 I% W
        f.forgetHash(hash)1 G0 l5 j- ~( W: i1 F  H
        // If the block still didn't arrive, queue for completion
9 C' J* L. R) @& m. L! i% }        // 如果本地没有这个区块,则放入到completing,创建请求2 d# ~1 ]6 I- W& u7 B) c
        if f.getBlock(hash) == nil {" S- z1 N. d3 t0 {9 V& J
                request[announce.origin] = append(request[announce.origin], hash)
# f* e5 L3 `: y' U, W: N                f.completing[hash] = announce  M- n4 d. C( E) L
        }; B1 x. n: w) ^# z- r3 a. N
}5 Q$ ]$ T- G6 Z
// Send out all block body requests( @! x# L1 P1 T& b2 i8 n
// 发送所有的请求,获取body,依然是每个peer一个单独协程
& O7 J0 w9 w% Y4 p* ofor peer, hashes := range request {' V1 i* Y) a0 g" f0 {2 y
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
8 S- V8 z; B4 I        // Create a closure of the fetch and schedule in on a new thread2 C% e5 f$ f; B+ e) E* L/ g6 O: D
        if f.completingHook != nil {
. m8 }9 n  v+ ~' t  ]* j                f.completingHook(hashes)
: c% w2 |! i) d5 X; C        }8 n6 i) D: Q. i
        bodyFetchMeter.Mark(int64(len(hashes)))
1 B0 m" R: k9 L        go f.completing[hashes[0]].fetchBodies(hashes)4 |6 k; r+ D+ f4 J' |; i
}; Y" t* j4 y( N
// Schedule the next fetch if blocks are still pending
' K4 s! Z  P$ D: f% [f.rescheduleComplete(completeTimer)9 x8 S5 H  i: n: `
`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
& V! N) ^( x* h5 }4 V0 K0 e" [# v// handleMsg()
8 z" B1 W; }3 K5 Hcase msg.Code == GetBlockBodiesMsg:
9 _- d  F' e* ]6 C+ `+ O// Decode the retrieval message
! n- D* C# g) E) ~* x& |1 [4 PmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)), z6 W& K, U3 E4 z; }8 f
if _, err := msgStream.List(); err != nil {7 S3 w$ E! k: i  j8 S
return err1 P4 f1 a. ]5 q2 M. W2 [. Q& x
}0 \) D7 b% Q0 ~9 u  z
// Gather blocks until the fetch or network limits is reached
, x4 }% X- _, U  d4 V+ m3 E2 X0 \var (
! R1 f+ ?: f9 y0 x+ ?* Thash   common.Hash
2 e9 [# |+ a6 w# r0 l" sbytes  int
6 J' G8 k3 a, K7 `4 Ybodies []rlp.RawValue0 K$ j( P1 _& y+ A- d; H: p
), v6 c/ H! Q; O9 K: `* O
// 遍历所有请求% G3 P4 i& L2 p7 a. I4 |2 e1 T
for bytes 0 j/ W& o! y" ?6 ?9 K
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。& a3 n# ?6 y7 ~3 Q
// handleMsg(): n! \6 P& P: S0 C+ A8 F0 r
case msg.Code == BlockBodiesMsg:4 x5 T( r8 L; _, z' C$ x
// A batch of block bodies arrived to one of our previous requests* d( h; z0 A( h
var request blockBodiesData
  E1 e0 O6 ^. e5 Z: cif err := msg.Decode(&request); err != nil {. u8 r1 I! z3 W: K4 o
return errResp(ErrDecode, “msg %v: %v”, msg, err)
  H$ }7 E( j3 q% q) e}
8 ?  r  Z+ e, N8 A// Deliver them all to the downloader for queuing
5 W/ x" _- H5 D// 传递给downloader去处理3 K" t1 s4 \; s3 s
transactions := make([][]*types.Transaction, len(request))
9 S" R) T( V( y; I3 Y5 k" Z3 J8 Ouncles := make([][]*types.Header, len(request))9 _1 ^0 a; a9 x# a/ l0 [4 s
for i, body := range request {0 h7 w' L8 M* `# b# U! H1 k2 j& H
        transactions = body.Transactions2 {+ u* h8 W0 u  b" p6 {) a
        uncles = body.Uncles; k  O: R7 P# @2 Q8 v' C/ J; l
}. |) b2 a, a! S) f9 F5 ^
// Filter out any explicitly requested bodies, deliver the rest to the downloader8 W+ Y6 u) V& g+ q( W1 s0 j6 z/ t
// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
- M4 R3 e) P# a+ l7 r' [% @8 s& wfilter := len(transactions) > 0 || len(uncles) > 0
, x( i  _" @! N% d1 j: k# Fif filter {
! ?$ x& R, r' q7 b) l        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()): S+ F0 A. L  l. t! j! H
}
% k6 T" c; u- d) S/ x// 剩下的body交给downloader9 g( M& L$ ^# `1 l; \9 Y
if len(transactions) > 0 || len(uncles) > 0 || !filter {; s$ K  s8 h: O  M
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
$ \$ q( O! N- R; w7 u        if err != nil {
9 t) y* S  e) p; I                log.Debug("Failed to deliver bodies", "err", err), {1 L  ^& z% W
        }4 r4 t# @5 z4 C
}- x: w1 Z% Q( Y' u+ y! G+ g
过滤函数的原理也与Header相同。1 K; P2 ]/ f2 Y0 x
// FilterBodies extracts all the block bodies that were explicitly requested by
) V  \* n4 Q/ G$ \. O' T9 S9 h+ P// the fetcher, returning those that should be handled differently.
! T( n. q' H; F+ s- M* u// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理
0 _% n+ y% Z$ Cfunc (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
, c3 A: e4 I5 t. ?* i6 y# {9 Ilog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))
6 K% i- q0 ?9 v// Send the filter channel to the fetcher4 z- l- _; U4 \- v. y6 P  @; S- h
filter := make(chan *bodyFilterTask)
- [9 ]5 R7 G; I* V' eselect {
& \* \0 |" ]* Q' |( z4 }7 Tcase f.bodyFilter
, K+ y7 _9 }" E% T% r}* {2 L1 s% ~5 q: g2 s) ]5 u
实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:- U* O& B, d5 O2 s' P8 {
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
8 P$ r$ r1 m9 W' j! {% f" a2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。
1 U: \, L' y- D9 {3. 将`blocks`中的区块加入到`queued`,终结。  X3 e' C& ]4 G/ a
case filter :=
# G# v/ P' |/ O9 b" {! Jblocks := []*types.Block{}
* V' b8 x. {+ T4 |// 获取的每个body的txs列表和uncle列表8 J, e6 \/ E  u# M+ x0 N  H
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body+ K  y1 ^5 t! h8 d& T6 I
for i := 0; i
9 f1 ^* ?3 t* L, v% l0 j3 W1 i}
5 b+ V* e: @* }/ @7 b8 l+ \# ~: ^- x
$ [2 f2 v8 O: s$ s3 K' n7 `至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:
# q+ ?( g- c1 g2 T; {& e. _7 U- N7 |# c; s1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。, x; y' ]+ k6 r# k# g( J4 H
2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。) \$ S5 K6 ^4 e
最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。6 w- D% [. \: j" d  u

3 Y; h2 P+ |( E/ K) I) i
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3