Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
161 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。5 V  ^4 Y% C" c
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。) N; }  d7 q! {$ T, W
总体过程和传播策略; w! g  q8 _) b/ x- @2 C
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
: t8 w8 q! b- ^3 z7 y' q8 h( {总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。
9 _! ^' ~7 G0 X- g先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。3 z, O, T  G1 v# q5 }  O
; n# l& B3 }; j; T1 _" ?
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:3 [; s0 y# r. K" Z+ }2 @; s! q
假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。0 D" r6 \: ]2 w' ?. C
策略图的效果如图2,红色节点将区块传播给黄色节点:
0 ^) h1 E+ v  O
+ |. g+ \" W6 P. G+ K. D# q. V
4 z) K0 R( Z9 h; l! H7 \收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:* `0 C9 R& |) O) u- {6 V

7 r! j1 U! C. [3 T" `至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。5 R* \0 m( [  k# f- G; p, j+ T$ z: J
Fetcher模块是干啥的5 @( |' Q& d1 R4 Y" {
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。
( k; Q2 @5 f2 L1 D2 L- W如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
% r8 j' f, ]' ~! j* l) g2 k$ `/ J2 c, b: a1 B+ Y
源码解读1 C8 W4 L% S$ d/ K5 U& n7 E( O
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。$ g  S3 a/ ~4 i4 `' {$ b
产块节点的传播新区块
& J+ R- K4 _5 o! ~0 L& N0 Q% M节点产生区块后,广播的流程可以表示为图4:( I" H) D1 p$ O/ c) a
发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去- o0 _3 V( n! A. Y% {1 B. A$ j

3 u( k+ O9 x7 W  p/ \. J  W7 S5 K/ t- K: a3 g
再看下代码上的细节。" B$ k3 C) c6 ^/ n
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
. C( ?, t, \9 l0 Q8 ^/ u
6 |6 X9 |  h! C! l8 _8 P// Mined broadcast loop& z: N# x6 k8 L8 |4 h1 z
func (pm *ProtocolManager) minedBroadcastLoop() {8 P; A9 m2 \; s$ E
        // automatically stops if unsubscribe
7 G) q( w0 L- |2 A. J2 K( s        for obj := range pm.minedBlockSub.Chan() {7 m3 N/ @4 ?: g, S% m& x" |
                switch ev := obj.Data.(type) {: K% v# I* I# H2 L" r' F- B; M
                case core.NewMinedBlockEvent:5 Q  B# |; z. J; n) t- s
                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers: K' U( |( `. s$ |. M
                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
% x' l& z9 U; s! t                }
7 d0 e' [, g4 H( A        }
7 x+ P2 O3 s) ~1 j$ ^" V}% d+ _( h" j. W# ]% Q: f2 d0 R
pm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。
2 c, V. |! G4 u) @' z6 g// BroadcastBlock will either propagate a block to a subset of it's peers, or
+ n2 ?6 N( f; r4 d+ _// will only announce it's availability (depending what's requested).3 E5 h8 ?8 W+ s
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {% F5 Q% x7 R: a0 O; _1 ]
        hash := block.Hash()* C  I( t: |1 `# p
        peers := pm.peers.PeersWithoutBlock(hash)" a  d) z9 c! ~
        // If propagation is requested, send to a subset of the peer2 o9 \4 v( n" @
        // 这种情况,要把区块广播给部分peer
, M- G6 M2 N; {' p        if propagate {' S0 r( s* Q. l, L+ D1 S4 v! Z
                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
5 Q" v, g5 j. @7 s+ B4 {  i                // 计算新的总难度$ E# N9 ?  R% s1 L
                var td *big.Int
2 e# w2 H9 T& o& T                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {# `. U0 J: d3 z/ f' U* D0 y
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))- O. n4 `; Q. d0 r$ L) l8 i- ?
                } else {
% U9 d9 J3 _! e- `# {9 b3 N                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
8 s! x. {9 [3 ?                        return
4 w* v& S, L; n+ ^, n9 g                }
( l1 I! N2 ~# N4 ]  k1 l2 T                // Send the block to a subset of our peers9 ^& E4 C3 Q" R) l( {' L- O# Z
                // 广播区块给部分peer" ?% _: ]! S) i8 x3 `) r
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]: }: V, r* g+ w2 ]( t1 O! X
                for _, peer := range transfer {. d( H. R& Z0 z, W& n- o. T( b- B, n
                        peer.AsyncSendNewBlock(block, td)7 o3 M" h3 L) }3 t; U1 f+ G
                }5 n- p; a: k- R/ o  o0 ?
                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
3 ^: z) r' M2 t/ C: ^                return
# t3 l2 C( {1 D3 ?        }
' P) R. H- U) k        // Otherwise if the block is indeed in out own chain, announce it' m) ~0 J  y* }: \  N; A( w
        // 把区块hash值广播给所有peer
( i1 ]1 h% o0 J4 l        if pm.blockchain.HasBlock(hash, block.NumberU64()) {" P; U% P4 r2 ~) b3 Y5 u" H2 f
                for _, peer := range peers {
+ @+ d$ H8 U. T  d7 O                        peer.AsyncSendNewBlockHash(block)
, q, P5 t2 u* r+ {4 ~% g5 @                }* D- @" c- E9 Z5 e* b
                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))4 ]( ?* |$ S8 ^7 i0 u3 d% m
        }6 E- ^- h" T1 o' W7 B, L" N
}$ }4 `  q0 F& P& C; C
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。" `5 V% D0 @& }* I# T" ^
// broadcast is a write loop that multiplexes block propagations, announcements
3 {- t1 }: T4 K/ j// and transaction broadcasts into the remote peer. The goal is to have an async
* X* M. Q* Q4 c/ C// writer that does not lock up node internals.
6 F+ s8 P- w) t/ J$ I/ v/ c. Vfunc (p *peer) broadcast() {! V: H' ~$ }' b4 G
        for {
$ p! ]6 X; d2 {                select {: p; a, b6 o: W+ n- P8 h5 x9 |
                // 广播交易6 s+ S, W6 V& ]' T
                case txs :=
* I& p# A0 s, hPeer节点处理新区块
: s# A" w4 t2 S1 F; Q1 H本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。
& U' `  b1 q0 x% ^7 g; e' k! W3 l这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。
7 R5 v( x$ X; x- w" G6 j消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。8 t) c: g0 o$ |9 a& g* r! r

+ G& ]; G" x$ R& }. |仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。2 e% T( T1 H5 G) a* {! R- t) Q. P7 f$ p7 Y
NewBlockMsg的处理6 z' X8 |" s7 o& d. f
本节介绍节点收到完整区块的处理,流程如下:
+ g  I9 Q! L9 L  z( ]首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。3 V9 \$ _! ]- D; i- u/ g
将区块存入到fetcher的队列,调用fetcher.Enqueue。
+ S# s8 H( j! q: S( X8 n更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。7 w$ w+ C$ i" v8 `7 w8 S
只看handle.Msg()的NewBlockMsg相关的部分。1 D" z( h' C  K9 k; `( U
case msg.Code == NewBlockMsg:5 P- ?! {2 y% _" d* a: W( v5 a/ N
        // Retrieve and decode the propagated block
' e! g* d9 L, ^. R- u# @        // 收到新区块,解码,赋值接收数据
) ]- Y0 W$ w( d/ N        var request newBlockData
& h' n" l% I" [! `( R        if err := msg.Decode(&request); err != nil {
1 J2 d8 h! o) |3 {+ ^3 D5 ~( Y                return errResp(ErrDecode, "%v: %v", msg, err)
! |* l" L1 o9 \  w/ [6 {, @$ V4 y: }        }4 i. i! V! |8 ~) P; C
        request.Block.ReceivedAt = msg.ReceivedAt
# H* L: A7 Q. o0 v) Q( O+ i# N        request.Block.ReceivedFrom = p
% n* {4 E6 e1 @7 L. ?) H" B        // Mark the peer as owning the block and schedule it for import$ n6 C; _, e: k# [; u0 K" s; [: t
        // 标记peer知道这个区块  y+ {2 r5 A1 }4 @3 K; m
        p.MarkBlock(request.Block.Hash())3 g& V8 @; _. q4 ?1 I( |+ N. ^
        // 为啥要如队列?已经得到完整的区块了/ R) H% d5 S1 V  L3 [
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块
7 h  x: |9 Z- Y$ l        pm.fetcher.Enqueue(p.id, request.Block)' K) l1 W9 O, }) v  T
        // Assuming the block is importable by the peer, but possibly not yet done so,
. x, B6 W2 ]5 |& q, g& c% h, S        // calculate the head hash and TD that the peer truly must have.
1 E, X' j9 E) e% @        // 截止到parent区块的头和难度7 ~% A9 Z+ y5 z0 [6 Y
        var (
: I- O& u  I; Q                trueHead = request.Block.ParentHash()
# |: |. R, J  u- v                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())3 l' d9 P) t; s& t. _; v
        )( I9 q1 f9 G2 T
        // Update the peers total difficulty if better than the previous
# s; o: T) @5 z  Q! {6 _        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
, k7 n: Y/ Y+ ~+ q8 m1 R        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。
! g3 C1 b5 I3 _5 C7 ]# i2 Y! ]: H. O        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求! n) n# ]  e$ ]# k
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain6 V% @& k' J# I. Y3 z" t
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {& N' M) M8 V6 u
                p.SetHead(trueHead, trueTD); Q1 ^# X+ ^0 r! w1 u0 f& F
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
6 b0 `/ Q3 F5 ^% ^                // a singe block (as the true TD is below the propagated block), however this6 d# R2 M) R$ f: o& a/ C% c: Y
                // scenario should easily be covered by the fetcher.  @. f# }6 k, N0 ~! \6 s
                currentBlock := pm.blockchain.CurrentBlock()
! {6 H( X; P( C6 j; z7 o                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {/ {& ^: L5 `/ k' O
                        go pm.synchronise(p)% q7 F! S0 _# h# D
                }7 j5 u8 j% E; \' ?) y$ I. i
        }; }% R# [. w  G* z, J
//------------------------ 以上 handleMsg
( y- J& m9 U8 S9 y% X1 [// Enqueue tries to fill gaps the the fetcher's future import queue.
5 a5 J5 u* {- Z9 p/ }// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理& I# H) A# K; T& r. k8 W- ~
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {* T* a3 E0 w8 u8 Z. y/ R8 Q8 N
        op := &inject{" j7 d9 Q$ w* T4 O
                origin: peer,1 F" g9 N) a& ]! J' C% X; L2 B
                block:  block,
6 U% l1 T# }8 y. @9 p. B        }; r( \' D% r" a! y# ~0 [0 ?) F" [& c
        select {6 ?) A! a$ t0 O) J7 L
        case f.inject  blockLimit {8 {3 w3 Y- Q6 _, p* c/ d  U
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)( z; Z" q1 D9 ~* v& ~. M/ K1 Q
                propBroadcastDOSMeter.Mark(1)  Z4 i6 P) d7 X4 A6 p
                f.forgetHash(hash)
, p4 p2 x8 c8 c/ l& {                return
1 w+ s7 O. ^9 T" }! h        }. d1 y/ q4 X5 `3 W
        // Discard any past or too distant blocks% z5 t) f0 @7 \( f8 Y3 v
        // 高度检查:未来太远的块丢弃
3 Y) k* B1 I1 N) B* Z! r6 a3 ~        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {, {: J( o  S# n+ `8 W" {5 r
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
; d; f( U; X5 O$ Y4 C: ]* c% j1 u. C8 M                propBroadcastDropMeter.Mark(1)2 V# t$ l; {1 o; A1 [& D
                f.forgetHash(hash)
- w4 X0 r4 W! H4 i. C/ ?                return  t+ \$ e" j2 p( M
        }
) v6 ^6 g" h' g2 t3 J        // Schedule the block for future importing6 `5 X$ W5 h; {3 Z
        // 块先加入优先级队列,加入链之前,还有很多要做
: B. ]9 B9 P' p4 c! T& s        if _, ok := f.queued[hash]; !ok {
; X4 |7 u) m7 u+ c# m                op := &inject{
* S" P* q' K# z; ]9 B                        origin: peer,$ {5 T3 c# Q2 R" z  L
                        block:  block," Z* p- n9 S" z5 ]+ G) Z
                }% i# x$ u, ]/ x( `
                f.queues[peer] = count" j$ q) Z1 `% q+ C5 i" H! O* v
                f.queued[hash] = op. Y  K) G$ D* s# y! v0 Z! E0 H
                f.queue.Push(op, -float32(block.NumberU64()))
' K$ `2 p  n0 d2 v) k                if f.queueChangeHook != nil {
  s: g( s; [  U' ?; u                        f.queueChangeHook(op.block.Hash(), true)6 @8 v* m/ K, J3 U6 R* x' l7 a
                }9 {# t  \# E# `% r% Y+ h
                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
( J( I" @0 b# E- z4 A/ @        }
8 H2 a. d2 Y' r% Q& ?}
, R0 Q& q+ H2 M9 ifetcher队列处理
% {* X3 I4 y; F% u本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?
) [, l2 J3 P6 k. E: ]由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。8 ~; L8 U- |; X! _
区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash
' Y$ }( }# J8 I2 }// Loop is the main fetcher loop, checking and processing various notification
) e3 q9 C) I$ f0 D$ }, `// events.+ W9 r& C: L( F; T$ R( G, ~
func (f *Fetcher) loop() {
. q. f. i9 \, f: f2 s7 X" B        // Iterate the block fetching until a quit is requested
% y+ [4 B; b0 _; `1 x7 `        fetchTimer := time.NewTimer(0)
/ M. e0 j8 b/ b1 H" L/ y6 p        completeTimer := time.NewTimer(0): I3 N+ C3 X/ R. h
        for {
/ P' L% `# C4 G                // Clean up any expired block fetches
/ ]+ ^9 m- O9 Y- f% F/ R                // 清理过期的区块
! Q0 D* K' u. R# O. j! m                for hash, announce := range f.fetching {
7 ~7 b" p% J" ?- }3 u; Q                        if time.Since(announce.time) > fetchTimeout {& F& V$ N, |" A& Y
                                f.forgetHash(hash)8 E( s0 w. v+ I, m8 I
                        }1 e& }3 S$ Z0 E& u9 ]7 ]7 o# ]
                }
5 n" a+ N' V$ T                // Import any queued blocks that could potentially fit
9 Q4 W& Q4 p% H# n                // 导入队列中合适的块
: M7 c6 k7 w: \+ y. S                height := f.chainHeight()1 c! C6 ^) h/ |' a& o( J
                for !f.queue.Empty() {
. {% N5 Y; D6 t) L, A                        op := f.queue.PopItem().(*inject)
- [. e' d) d' L8 s) _                        hash := op.block.Hash()- t6 P0 u( H; g
                        if f.queueChangeHook != nil {; v1 Y( _6 |; m- l3 a- i
                                f.queueChangeHook(hash, false)
0 i, X* L5 q1 c, S5 ]" F6 [                        }
1 G2 J0 X% ~! I. T- z                        // If too high up the chain or phase, continue later' ?' w8 {0 C, N1 }9 b0 `) v
                        // 块不是链需要的下一个块,再入优先级队列,停止循环
. b& @3 |* N( P3 u, n6 p& ^4 v7 {; Z                        number := op.block.NumberU64(). n) ]  e8 F4 `" m
                        if number > height+1 {
1 W& R$ l% `$ S1 [% i                                f.queue.Push(op, -float32(number))
( y$ j1 j) g1 J' ~: R$ Z% c+ f3 z                                if f.queueChangeHook != nil {
  H% g6 F3 \5 P- z) e/ W9 E9 O                                        f.queueChangeHook(hash, true)# {- F7 }8 Z4 J  T9 q! T
                                }
/ k# @4 Q. q( `( i                                break
1 K1 O/ N, ~9 f" d; ]7 U1 W                        }/ |# T$ w- q7 Q4 T" N( i9 T  m( k
                        // Otherwise if fresh and still unknown, try and import9 w" i6 l! Y5 E% e
                        // 高度正好是我们想要的,并且链上也没有这个块( E) [4 l0 |: @9 n* K
                        if number+maxUncleDist . j3 H6 L: O/ S. c* k8 V
func (f *Fetcher) insert(peer string, block *types.Block) {, q3 L" V; `4 B1 }* k) @6 Y
        hash := block.Hash()
1 _# @% M* G5 X+ K& y        // Run the import on a new thread
6 K- ~3 a! O3 v% D8 T        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
9 h% h, l. F  s1 u7 s+ }0 |        go func() {8 s4 d' y: H3 \+ z' Z+ S& N: z8 H
                defer func() { f.done
$ I0 U4 k! E. L' k) {NewBlockHashesMsg的处理" q# y( n& P/ ^3 Y4 y) H) v! |" v
本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。
$ o' s9 H# D( m1 @; S4 ?7 b6 D流程如下:
2 t: Q3 a' k* z% V) E2 W# C对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。
. b& L( J- D9 Q5 S5 a& |关于announced下节再介绍。8 [/ S9 _* T" I, i, N* x+ K( C8 r
& ^( _5 {9 f8 n9 Y# U/ i
// handleMsg()部分2 n& c# }3 p, a
case msg.Code == NewBlockHashesMsg:, b7 t) s/ d0 m' i! I
        var announces newBlockHashesData) _0 A7 Y( Q) c
        if err := msg.Decode(&announces); err != nil {5 Q0 ~* B9 z& [2 h7 ]
                return errResp(ErrDecode, "%v: %v", msg, err)
0 c3 t, y3 t  |$ M2 }& P        }
+ Z  y, L+ F1 E% }7 j% z        // Mark the hashes as present at the remote node% z+ u) J+ E2 ~6 M! E
        for _, block := range announces {
3 A! y) Q6 c; Q& s3 N                p.MarkBlock(block.Hash)8 M& O$ s3 E* i) h+ }3 y
        }3 {* s- W8 V9 o0 I
        // Schedule all the unknown hashes for retrieval
) H6 U/ r8 Z' ?6 @! z        // 把本地链没有的块hash找出来,交给fetcher去下载- H9 j- L5 B5 A$ z$ a) N- B
        unknown := make(newBlockHashesData, 0, len(announces)). P! n2 n1 b; f9 t2 E% p
        for _, block := range announces {
. [5 j- q$ D4 y" x/ m                if !pm.blockchain.HasBlock(block.Hash, block.Number) {: w9 r4 `8 y) p4 u# x' X  E# E
                        unknown = append(unknown, block)
) [" ]/ H) G- @                }
, ?6 j* y) W1 j9 R1 o# I% N        }3 Z# S1 _) _9 p& d+ I& e1 M3 w
        for _, block := range unknown {
5 q5 A# f' h! A% y$ k/ F! f* @                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)# A4 q1 \% E- R+ x! [
        }
* u  b5 y% ~" ~* Z// Notify announces the fetcher of the potential availability of a new block in. j1 r- E+ A9 ]7 ^
// the network.# X* A$ h- L! y8 ]" B- {
// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息
1 x4 X, M$ j" T, U8 u* o3 m4 w; nfunc (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,$ H0 n0 S% m0 T
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {$ Z4 g1 b7 f( N4 l
        block := &announce{; b  a$ i& s8 ~' B, v$ f. Q8 b% G
                hash:        hash,! @1 E9 J4 g0 U, s# W6 K) m( ~
                number:      number,
' l, Y! a0 v8 ?& X! j                time:        time,
2 N+ j4 n0 A1 R# Y+ W                origin:      peer,, a; k0 `( K( [  A* W
                fetchHeader: headerFetcher,
: v+ n( z' |/ Q/ ]                fetchBodies: bodyFetcher,* Z( v% {+ E  S* W
        }
3 X, |1 E. l; g* P        select {! _/ `5 D- d1 |6 ?/ y7 e
        case f.notify  hashLimit {. Z& N, E0 _# c7 P  V
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
& |* T# _" y/ p& J9 [: u                propAnnounceDOSMeter.Mark(1)
; v( W$ Q! e3 {6 ^7 F" ?                break; y0 J* Z; H% ]3 r
        }
6 _4 ^3 j4 a5 f9 W( v        // If we have a valid block number, check that it's potentially useful
$ V3 l/ Q$ r& K& N: \% v        // 高度检查
4 \) {/ l" i" L8 y. }5 l) o0 }        if notification.number > 0 {
5 J! @0 r- |0 b! R3 t: u                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {4 ~! D7 F1 l- v1 \
                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
7 ]; T1 s0 u! y. d) ?$ t                        propAnnounceDropMeter.Mark(1)1 m- K) }3 w9 S# F7 [
                        break
  q% s5 r8 n9 z$ r; Z* ~                }/ P9 d7 K3 U5 r! O; a
        }
, w0 F1 L9 _) Q% ?        // All is well, schedule the announce if block's not yet downloading
6 k1 u8 q8 O( }' ?! c        // 检查是否已经在下载,已下载则忽略
: B; v# y  n; R7 b7 P. `        if _, ok := f.fetching[notification.hash]; ok {' G1 h* `& j7 O% @
                break5 P' j+ U% g. P5 U& X0 v2 {6 l
        }
5 u. D, I+ y6 @* J( d        if _, ok := f.completing[notification.hash]; ok {# V6 s6 A) e& {# }6 F
                break
8 @( y% Y# P5 I: R$ a        }
3 \0 D  A# Q. a- V/ `& |7 X        // 更新peer已经通知给我们的区块数量5 j! j6 A" ?$ P  c% ?
        f.announces[notification.origin] = count' g9 Y. l  Z8 \& ]
        // 把通知信息加入到announced,供调度
; z) v3 @9 H. t* c9 s4 b9 h        f.announced[notification.hash] = append(f.announced[notification.hash], notification)1 L: J3 L' S7 S1 q, @
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
, f. `( x( N) {+ g                f.announceChangeHook(notification.hash, true)9 a% `$ h& `( q* }; e
        }7 p9 v. }7 j- o* S
        if len(f.announced) == 1 {& l4 C/ N4 i" M  b$ \
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
1 H! q2 h7 Q' k1 h  [                f.rescheduleFetch(fetchTimer)4 `3 n3 Z' N1 f/ f4 Q
        }# U% }6 n  |! e- d" o  `: B
fetcher获取完整区块( h: o+ j+ g6 p2 N/ x
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
$ Q" h0 x- M) R+ h- o8 D$ sFetcher的大头
4 u8 q+ s  X5 I5 E' X; P9 r: }Fetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
% e0 p. q: u! J4 l+ D9 O宏观% c" w7 H, j7 E1 D  v
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。
+ [$ M3 E- |3 V3 s$ J
  Z6 `3 P7 @$ m) F) p7 V6 `再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:% W- y7 M. c% o: ?, F% q
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
4 S8 \/ a- h' j  v
7 {, E" T: J0 |/ p$ Z2 K4 L# B; F
微观. G; s2 t$ G8 C. y3 \/ u. R) g
接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
' b* P: }3 d5 U, A- p首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
4 i; L& n. E/ y+ ?1 J. I' r! V# D: A// Fetcher is responsible for accumulating block announcements from various peers
) w5 {  A5 f$ j) Z  ?// and scheduling them for retrieval.
$ ^+ ^0 A, p- X// 积累块通知,然后调度获取这些块
& E! V' f8 a/ R# l' C0 P4 a/ p: M2 }type Fetcher struct {- @/ @$ }+ I6 Y$ x6 S7 M/ x) O
        // Various event channels
' _4 S# d& s, X8 K    // 收到区块hash值的通道" u1 O1 k' Z# Q6 x( L* h- _- I
        notify chan *announce! h4 O( U8 {- r6 x
    // 收到完整区块的通道9 U& ^& h' B' o  a9 ~9 ?
        inject chan *inject
6 [' D) g; k. C2 E; n        blockFilter chan chan []*types.Block7 K1 D* L& h! C( ?4 ]" T
        // 过滤header的通道的通道
: M' [+ `# k& Q8 o: W        headerFilter chan chan *headerFilterTask
- L+ p4 K8 U5 b% E9 V5 O0 C0 y        // 过滤body的通道的通道
" a. C" V- L( [& l2 I        bodyFilter chan chan *bodyFilterTask
3 {" d/ d7 `5 v0 l" B2 `# ^        done chan common.Hash* V& Q2 O4 ^' J2 ]
        quit chan struct{}
* \) v# q4 Q& }, g) ~' M! }        // Announce states* I9 x/ U4 s+ k
        // Peer已经给了本节点多少区块头通知4 L/ }8 s& [7 n1 ^9 F% ~
        announces map[string]int // Per peer announce counts to prevent memory exhaustion
( W' i) H+ t% {5 o6 C        // 已经announced的区块列表
" t. R3 P3 O% `9 `, \5 z( x        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching+ m3 E5 L; P$ p; B2 u
        // 正在fetching区块头的请求4 f. P3 }$ l, ^* C
        fetching map[common.Hash]*announce // Announced blocks, currently fetching
2 Q" E% l3 M4 o$ X' E0 V# r8 T        // 已经fetch到区块头,还差body的请求,用来获取body
$ |- w0 y( ]; l        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval# R9 ?# x; g3 ?2 H
        // 已经得到区块头的" t* G3 l( J0 [& R. U! ?: l1 f) E
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing# Y9 }: J; c" V( L% E
        // Block cache  M) {2 Y5 r( K9 o/ `0 Z
        // queue,优先级队列,高度做优先级
8 ~& x2 T% j7 [  r/ J        // queues,统计peer通告了多少块& J; \8 ]7 g* }: R$ n, b7 k
        // queued,代表这个块如队列了,* }* o4 ]0 _9 ]
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)- V, X- V. \: y) Z! C3 U6 q
        queues map[string]int          // Per peer block counts to prevent memory exhaustion+ q& G  ^, {# [, k7 J
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)# q5 e' ?: Y" e! O9 |! Y# z
        // Callbacks
2 {$ c$ a% k* V$ x' Y7 x2 Z( y        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
. a3 @4 L3 @/ s1 p  g$ D  w& E        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
& @5 D9 j1 b: A: ]2 {        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer8 G0 X  M5 v9 c$ H
        chainHeight    chainHeightFn      // Retrieves the current chain's height+ n) G" c  m& b3 l3 a
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数% X4 V4 u$ {3 v) \' N2 |5 j4 t' Y- `
        dropPeer       peerDropFn         // Drops a peer for misbehaving
2 N% G7 _* O- m8 R        // Testing hooks
3 d& S+ B7 Z& I' W- p        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list5 J9 @. |* }5 x' W+ m% i4 W( ~
        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue5 g# D$ k" r/ U( ]  Z
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch* q* e& }' F, O# p3 s& L9 `
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)  `& _, b# O+ k; W0 R* v6 y
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
) D( M) n2 f0 |  a" h}
% L* f, W6 z3 b! v6 D7 o% KNewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。, z( S7 J  `/ q+ V% `
case  arriveTimeout-gatherSlack {- R  N* x6 j1 P- d
                        // Pick a random peer to retrieve from, reset all others
, t* ~, y3 ?9 A6 z                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer
8 B; K2 t& I% m  i- D/ W# T                        announce := announces[rand.Intn(len(announces))]  q/ y1 B) }/ L5 A/ K& O! ?& G
                        f.forgetHash(hash)
/ h5 m7 ?* v* ]8 E0 ?) ^                        // If the block still didn't arrive, queue for fetching
% p* @& _0 P  K; K! G5 \: ?# I                        // 本地还没有这个区块,创建获取区块的请求
  `9 C; z! Z$ U& ?  D                        if f.getBlock(hash) == nil {
9 n; m: M( {8 o& n$ q7 M7 v. w1 {, q: [                                request[announce.origin] = append(request[announce.origin], hash)
2 G/ s- n  O2 y: S5 y, a" c- x' U3 @                                f.fetching[hash] = announce
0 `( F0 g1 v2 E: A' J9 D9 a                        }7 _0 {. |& t1 [# k3 q4 z9 W
                }
5 P* z2 L" O, X9 `4 V/ i+ U        }3 |( i8 ~! `' ]& i8 _+ c( N  p: K
        // Send out all block header requests( \: r7 L- y- S( u* i1 c2 y
        // 把所有的request发送出去
1 Z0 C  q( V2 i( }        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
; e* b5 Y' n0 ~        for peer, hashes := range request {
  c& M* G9 ~; m% l8 v: g                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)9 }# L/ w/ |. p0 B$ g
                // Create a closure of the fetch and schedule in on a new thread& v- J% f; Q3 E1 K/ V1 h0 F
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes8 q% ~. i0 e% K# d. N. R
                go func() {
0 A$ W+ L$ F) c, H5 d; o5 P                        if f.fetchingHook != nil {
; O- {2 _" {( [$ f6 H; w3 C: a3 G                                f.fetchingHook(hashes)
; \: a3 K0 w2 i6 S$ O! U+ T                        }- }% A: {  Z0 y" d, A
                        for _, hash := range hashes {' g+ f" B% c, E" B1 i9 t# X  O
                                headerFetchMeter.Mark(1); I, |  `8 X) u4 C' `& ^2 }7 P0 }
                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
1 }2 A! t; m+ h                        }
  l, ]. B" _. O+ o3 O. o/ H& m                }()" N8 n5 o. E8 F% M( ~
        }
5 @. {: {' v) V        // Schedule the next fetch if blocks are still pending
/ T( G/ l3 F/ u2 b6 p! J" D        f.rescheduleFetch(fetchTimer)4 O2 B# H! |, N5 I5 y, ?
从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。7 u$ A+ i& ^. [1 c
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
) U0 |6 D2 R" `5 U// RequestOneHeader is a wrapper around the header query functions to fetch a
* f; t/ U7 k/ s6 H7 p// single header. It is used solely by the fetcher.
0 \% p8 B5 V3 `* ufunc (p *peer) RequestOneHeader(hash common.Hash) error {
2 O9 U5 a, k6 k        p.Log().Debug("Fetching single header", "hash", hash)
' F* Z. }- {* J        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})6 E" S. B! s1 C8 h; l
}( V8 V1 s/ _' X8 |. m: P
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。9 h  j* ^$ O& b3 ]7 H
···& a9 n' i. S4 t. a
// handleMsg()0 Z* z0 c, `& C3 Z5 `* k
// Block header query, collect the requested headers and reply% p$ X, I# Y6 ^  l: i( L% p  W/ E# ]
case msg.Code == GetBlockHeadersMsg:
& ^9 x+ x3 B* ]9 f  L! B, J+ Y4 e  X# A// Decode the complex header query
. S  E0 S, ?3 dvar query getBlockHeadersData+ O! O: T( `  \
if err := msg.Decode(&query); err != nil {* s8 @# _' `8 v7 X
return errResp(ErrDecode, “%v: %v”, msg, err)) R$ l. U, l/ P# S
}3 H- p8 P, E; k" G/ F
hashMode := query.Origin.Hash != (common.Hash{})
8 d7 d- B. E6 {: K7 D! }0 P// Gather headers until the fetch or network limits is reached
" @2 M1 V3 g2 D4 [: g, d( K// 收集区块头,直到达到限制
( R/ n0 H- s! K6 v- ?' uvar (" T" q; ~. t" h3 T% g0 U0 @
        bytes   common.StorageSize4 P( @. r. _! K4 q5 G& h8 b
        headers []*types.Header  F$ ?2 Y# e( M
        unknown bool; W6 c2 A5 s  q+ c% o
)0 p+ w5 H& z4 j7 D. \9 h
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量
9 N* w: ?/ I5 k/ T4 nfor !unknown && len(headers) ) c7 v1 x8 x0 i- J" A5 U7 y2 v
`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。
7 L# ?/ ]9 E0 p// handleMsg()
5 l, F! y5 a  G4 s3 {case msg.Code == BlockHeadersMsg:
# q& M, @( n0 E3 a) h3 t// A batch of headers arrived to one of our previous requests1 r& @8 }' ]1 |' v9 D8 c8 [
var headers []*types.Header+ f6 D% _% V, O; h6 E
if err := msg.Decode(&headers); err != nil {# A2 E  h0 O, C# n7 V7 V
return errResp(ErrDecode, “msg %v: %v”, msg, err)
! J8 r* n+ x. |6 K}
4 p- l  S7 M0 B; F% d& I; z4 j// If no headers were received, but we’re expending a DAO fork check, maybe it’s that% s: P4 J' z* D6 U
// 检查是不是当前DAO的硬分叉
5 |! E# r$ l; qif len(headers) == 0 && p.forkDrop != nil {
. A. x4 A. z* Z* G! `" T// Possibly an empty reply to the fork header checks, sanity check TDs
8 k& n, Z1 i) D7 wverifyDAO := true. `( Y3 s! ^+ R! ?, C- a' Z" m
        // If we already have a DAO header, we can check the peer's TD against it. If
5 q0 J; i# ?- U! \7 u1 R7 X! `        // the peer's ahead of this, it too must have a reply to the DAO check
5 j4 m% j- {! Y4 X( ~        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
. L, J4 T! t  p2 @( a$ |                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
1 S3 m: e' P6 d                        verifyDAO = false
7 ^: f# \7 i2 w0 w9 X* S/ H                }
9 o0 \$ j. R8 r; Y        }
4 O% H9 w/ ?' F+ k* Q" R        // If we're seemingly on the same chain, disable the drop timer" g  y  K) c6 ~" j
        if verifyDAO {, d3 A- S! G: r% E  f
                p.Log().Debug("Seems to be on the same side of the DAO fork")- n( a, b3 i) J- Z2 b% Z: s+ g7 h
                p.forkDrop.Stop()3 E- a5 `% k# U# a& a
                p.forkDrop = nil3 Q8 M( n9 w7 I. ^% |
                return nil
' G" \- z2 G' i) F0 V; J        }
0 f: x* y5 n4 W1 s1 X! z! X}
, i+ O9 i. f$ l5 T7 z/ z. A// Filter out any explicitly requested headers, deliver the rest to the downloader9 F: O% D9 C. @
// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
3 x2 D" x+ ^9 |3 h6 ~) M7 vfilter := len(headers) == 1
, i2 s) F" ~  G# a) P/ n( Aif filter {
- e5 B  y+ R2 x7 i        // If it's a potential DAO fork check, validate against the rules
5 t! j9 e' ]& |        // 检查是否硬分叉4 b' h. f' Q3 a& N3 _, x5 U
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
. i* y% y9 l  }0 @0 }- a8 o; c7 i" {                // Disable the fork drop timer
6 N/ X/ M. P- W" b# P                p.forkDrop.Stop()! `- U: u" v6 ~8 l) [- Q: o% B1 i
                p.forkDrop = nil
% y& f) j0 \/ \; f! {- ?1 W                // Validate the header and either drop the peer or continue
8 _9 H2 t/ [1 o3 e% w  n$ j$ N( T                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
. d( y; \% W! c                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")+ t9 }- w0 l) @. `9 i% H
                        return err# k) L( B8 h/ U% h) t
                }
) k" d( F# h8 L5 E7 g                p.Log().Debug("Verified to be on the same side of the DAO fork")
* d  i0 z7 A/ D; n                return nil. q* G' c3 b& r9 \, u
        }+ \7 K! K' W. V( N3 s+ h
        // Irrelevant of the fork checks, send the header to the fetcher just in case
9 G" D8 v* i- r1 p* K$ r$ N        // 使用fetcher过滤区块头3 ~4 j' `% ^: J* _. j* _( {
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())0 {7 \: V. M6 R' H4 m
}
" ^9 a* C# L3 U// 剩下的区块头交给downloader. m7 U  o' G. u7 z6 f
if len(headers) > 0 || !filter {, N8 x. o7 i' \& G
        err := pm.downloader.DeliverHeaders(p.id, headers)+ d' e3 |( z6 X) `7 h* W# Y
        if err != nil {- M0 L3 N9 |1 Z5 Q) F5 D2 s1 f
                log.Debug("Failed to deliver headers", "err", err)
$ u' U0 m! T- s+ }        }$ Y) M9 Y' Z7 c. c; w2 J2 I7 u7 C% O) w
}' i% n, b( e- G  ?  M' B
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。& H0 |: c/ k1 _& i* O% k7 j
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?2 a+ N3 g( W9 U6 v+ y8 q
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,( {( x; y3 G4 G' S( @
// returning those that should be handled differently.# ~" [' c' q( F( V# \5 W) A
// 寻找出fetcher请求的区块头
' q1 C  x! M, m+ V- efunc (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {8 {6 S  U* ^( t, W
log.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
3 b& [: S( i  S: c) f9 s% x' ?// Send the filter channel to the fetcher! m( W/ i+ a4 w0 J  C
// 任务通道) F. D3 |) }) @7 g; j
filter := make(chan *headerFilterTask)- M8 O9 h% S3 |# O# m
select {4 Y' N3 d  }+ W: l) j
// 任务通道发送到这个通道
$ a* o2 j' Y1 gcase f.headerFilter ' a5 T% p9 A* y5 M4 H
}
. g2 z3 N6 O" B0 P5 ]- s: p接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:
3 |- q! M( j6 ]+ V1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。
. C$ E& Z. T8 w" ?1 R3 A9 ]- w. ~2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。
7 y4 B: W9 T6 h( T3. 把`unknonw`中的区块返回给`handleMsg()`。) h: L( s' _6 D% e* b& I
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
" z1 R2 B: R/ a! n) k5. 把`compelete`的区块加入到`queued`。! p/ }$ ?" l, r) b, S% M3 t
// fetcher.loop()7 N$ t9 m  ?" N9 Z3 g; c! r( i  M
case filter :=
0 S9 h: f% f( Y$ k& j: M& Z! c// Split the batch of headers into unknown ones (to return to the caller),
; M9 [) L0 D8 @5 F6 i" Q7 e6 S+ l# S// known incomplete ones (requiring body retrievals) and completed blocks.3 D8 \) e) U/ Z- M$ g# s
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放
% T+ r5 C  }$ w+ O. e// 还需要获取uncle和交易的区块
- I- }; J# b% B: \+ f+ S: A+ Z3 f, Funknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
) O/ L* a. @: k+ I; Z// 遍历所有收到的header
% n8 b4 i5 B" e) ~3 Qfor _, header := range task.headers {
8 Q- \7 N( \" j' _- ?! u        hash := header.Hash()
+ M/ J" m2 \- c/ e' @- O4 J        // Filter fetcher-requested headers from other synchronisation algorithms$ f9 H$ Y' P- ~3 C6 ^
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued9 M0 E( t6 K: i* T5 j
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {# X+ E3 {$ J& i* J0 B7 [! i
                // If the delivered header does not match the promised number, drop the announcer, D; |, F% U  x3 f; J
                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。
2 A7 P# h" h1 c, n                if header.Number.Uint64() != announce.number {
; w8 x' e! p) C* z2 G+ s$ d7 m, E                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
1 E8 ]5 _3 q/ `# Y8 T3 g                        f.dropPeer(announce.origin)0 R4 y7 d9 D/ |
                        f.forgetHash(hash)' T6 }" g5 T: q9 [; P
                        continue
& z/ m3 S( i$ V                }
5 x( |$ G8 a0 i& g                // Only keep if not imported by other means
0 R, Y8 ]8 V3 ~3 ?  a                // 本地链没有当前区块
. L0 T7 t) a2 e" ]; H( N. c                if f.getBlock(hash) == nil {* R; @1 `- w2 a9 P5 \$ Y  {
                        announce.header = header
" n! L" w0 Z* h: n/ }                        announce.time = task.time, L, j4 A) V( m2 k! g
                        // If the block is empty (header only), short circuit into the final import queue& g' ?2 o0 O/ Y0 l7 x6 x
                        // 如果区块没有交易和uncle,加入到complete. y; b" M* w& S5 F
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
8 Q. i& N+ p: B! I( x                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())7 O9 w, `9 q+ A/ `- b$ S. [
                                block := types.NewBlockWithHeader(header)9 `( F" J& L: h6 n4 M
                                block.ReceivedAt = task.time
+ ]* V5 _6 {, z1 W                                complete = append(complete, block)' G/ B. C% h6 [1 p; e' w
                                f.completing[hash] = announce
' J! k7 ^0 E' M5 I( N* Z                                continue- \! Q4 J& T# u/ C0 |
                        }4 ^5 W! _0 ~8 s# G4 N6 D: t4 \
                        // Otherwise add to the list of blocks needing completion
- M  _' f% O5 ]+ e                        // 否则就是不完整的区块5 a2 d0 f* m( f4 G* D
                        incomplete = append(incomplete, announce)
! X4 B* y( o7 K6 B- D                } else {/ k2 K: n* K  v( v$ I' v8 L- q
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
# C1 N2 a& \5 W  h4 i7 d/ {                        f.forgetHash(hash)
2 Y3 T2 H9 Z; O* M) u/ `5 T                }
0 L' T5 @* P4 o6 W# Q* [7 ~        } else {
9 i4 G" G' t5 v4 W/ m: |- i                // Fetcher doesn't know about it, add to the return list
; O) C: \& K. L9 g% M                // 没请求过的header, S" C( X# c4 v$ t' P. I$ V
                unknown = append(unknown, header)
  p( L. p& n- |/ C        }
7 a& G8 Q1 s3 C. A}
0 d( {! M' c7 v, o# K4 u/ `// 把未知的区块头,再传递会filter
7 D( j6 s  k* @3 ~1 H2 ~: LheaderFilterOutMeter.Mark(int64(len(unknown)))
  Y9 I( u3 g$ X- E* {) nselect {3 g$ r/ l, X1 ]+ I! |
case filter ! l2 _$ {) c3 Z& c( `  K
跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。. e+ x3 v# N. {' c2 `  A/ `/ g9 P' U
// fetcher.loop()  U$ r  N6 ~' q  F# z
case 5 a  E+ t7 R2 e/ ~# A9 o
// 遍历所有待获取body的announce
- Y9 ?9 V9 ]* ofor hash, announces := range f.fetched {/ X) f- F) ~) I3 o
        // Pick a random peer to retrieve from, reset all others
. E3 D" L' [+ }" z) @9 N        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
6 T! W+ I. n9 G7 S2 |* H        announce := announces[rand.Intn(len(announces))]* A2 P( g  K2 \, n5 T
        f.forgetHash(hash)6 F, I/ y& o% Y; p$ M3 {( G3 E. Q
        // If the block still didn't arrive, queue for completion& ?/ s$ S, f0 H2 C
        // 如果本地没有这个区块,则放入到completing,创建请求5 l3 Y& R& ~' c2 A& K
        if f.getBlock(hash) == nil {$ [2 V* Z  h4 M6 ]
                request[announce.origin] = append(request[announce.origin], hash)5 F6 e4 Y8 k2 L) |) H; M- x1 o) ^
                f.completing[hash] = announce
2 ?& f& L) s$ V, C6 U: \5 ?& M: U        }
0 z6 d3 M! |4 C" L. K' c+ A( C* G}
6 g) q# g( y  [: H: r3 A% _// Send out all block body requests  w( ?; \$ [5 w; B# q* o1 p
// 发送所有的请求,获取body,依然是每个peer一个单独协程. a% o7 P+ R+ l7 N$ V
for peer, hashes := range request {% S! Z, u) ]  v0 l+ [
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)' ^$ U8 W) F  j+ d2 F8 e0 O9 L
        // Create a closure of the fetch and schedule in on a new thread$ z* G( ]4 a1 N/ S& g! _
        if f.completingHook != nil {
" ]  s2 S' c# ~                f.completingHook(hashes)
+ c, e# h" A& |# E7 P' m4 h+ m        }
7 V  b7 |9 R/ s/ N9 I        bodyFetchMeter.Mark(int64(len(hashes)))& Y& S* w1 s1 o! O2 U( q- x
        go f.completing[hashes[0]].fetchBodies(hashes)
3 C7 N4 j* D# P% N/ ]( N* j- S}& F. O1 `/ d8 a) V" X: l: z# ]) f( z# y
// Schedule the next fetch if blocks are still pending  x6 h3 t5 n: @3 k  }! _/ |( h5 y4 y
f.rescheduleComplete(completeTimer)
- b" }3 j7 r2 H. f! Z8 ^6 {`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
- K3 r; {+ ^, I// handleMsg()5 y2 ]. ^5 }5 W' ^* U# {9 K7 S
case msg.Code == GetBlockBodiesMsg:
( Q1 [' s  E/ e0 w5 m  d// Decode the retrieval message
9 t2 P( b. U" W2 G' dmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))" }) O2 t2 X: S4 l; S# x
if _, err := msgStream.List(); err != nil {
/ T1 c- }9 l/ c# E6 j+ c: mreturn err
4 h' j( w8 ^' X5 @0 Y6 y) F}6 o. _7 H, N; H' k$ `4 X
// Gather blocks until the fetch or network limits is reached% w3 M1 f) s/ ]! S# T' g( X
var (
& i* o9 c6 W/ s  hhash   common.Hash
8 A: u2 b! I7 h- Cbytes  int
# V3 a; X! p1 J( k- O/ Gbodies []rlp.RawValue
5 C0 H9 o4 l1 g- g0 C)7 e/ j6 x: N' k4 c
// 遍历所有请求( ]0 m  I7 H2 Q  t4 ]' E
for bytes 2 `3 _" G& g& E
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
6 X0 j" e+ i5 M9 E+ S9 Z" x! g$ X// handleMsg()5 O: H$ ?8 q( [  S3 Q
case msg.Code == BlockBodiesMsg:
6 s. ?$ O! N: p; U/ H7 @2 L// A batch of block bodies arrived to one of our previous requests+ T+ p1 ]$ a6 _( t* @
var request blockBodiesData# @1 W5 \; N  U" L. H4 n* F
if err := msg.Decode(&request); err != nil {; I8 a. l* N+ d  ]' i  j# V9 a# }( q
return errResp(ErrDecode, “msg %v: %v”, msg, err)
0 G4 z- `$ ?: g2 `3 f" f" c5 z+ `3 ^& g}
9 D2 r; k3 y& f" ?& Z- b  Q+ }# w// Deliver them all to the downloader for queuing
& b) D7 g+ }1 {( P// 传递给downloader去处理4 N7 g. m6 J* u: g* j) r
transactions := make([][]*types.Transaction, len(request))
1 j2 k- L+ [/ Q6 r! |uncles := make([][]*types.Header, len(request))4 p% r7 l+ S$ k! ~
for i, body := range request {4 U9 o! F3 Y6 p: I+ |
        transactions = body.Transactions
% \: y. ?& r! U4 G8 r        uncles = body.Uncles
. {; z  }# v9 F4 M- t}
/ B  d$ |( q. n5 y// Filter out any explicitly requested bodies, deliver the rest to the downloader
& ?+ @- ]8 u! j5 D% `3 ~// 先让fetcher过滤去fetcher请求的body,剩下的给downloader% I: |) ?( M, L
filter := len(transactions) > 0 || len(uncles) > 0
+ x/ u' \8 a9 g& z/ d( p1 |- U6 k+ kif filter {5 P+ u6 ?# O4 e# b, y* i# U
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
* t. ^4 }  {# m! t}! X0 e5 D+ I/ e0 I% X) u* A
// 剩下的body交给downloader
+ V% P. d# K2 R4 |  sif len(transactions) > 0 || len(uncles) > 0 || !filter {
, |) a. N- d3 `3 t- i        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
% O; F- D3 P. t" o/ P3 H        if err != nil {) \* \4 _; P6 F6 ^
                log.Debug("Failed to deliver bodies", "err", err)8 ]+ I- R4 |9 d% o5 F- C' n
        }2 o% O" d: _( n2 D9 w
}" O9 o0 }! d. k5 E
过滤函数的原理也与Header相同。
- t. a/ X2 ]- g0 E, s// FilterBodies extracts all the block bodies that were explicitly requested by1 z% c5 W, F/ X5 k4 n
// the fetcher, returning those that should be handled differently.
1 o4 W# P) z6 D4 K8 r// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理/ f" s+ r$ d" n0 v
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
; l" v# Z5 k3 p) f& rlog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))
4 r5 b1 h1 R0 c+ o  @// Send the filter channel to the fetcher8 t/ w! v* ]- f# d+ Z
filter := make(chan *bodyFilterTask)
" T7 C) z! v; k+ A  M  q9 K2 \select {* f2 }7 m, o1 e/ Z- s, g9 o
case f.bodyFilter   s1 J/ i: L  e0 G& D! D7 f* B
}6 I7 ]+ }" f/ y& [1 L2 J: a
实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:
! E" Z+ X- ~6 M( P* x7 ]8 [" w) C1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
4 }" j* w) I/ r7 \2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。
% P0 E7 G9 L& V7 v' @+ L3. 将`blocks`中的区块加入到`queued`,终结。6 Q" R1 z: }$ v( B; N4 n
case filter := $ [) O9 @. ?  }3 Q) b# Q* `
blocks := []*types.Block{}
* R7 }4 G5 g, f8 A// 获取的每个body的txs列表和uncle列表( Y( Q8 W+ @9 s. g8 h
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
* p( L! v- \& M5 V! Pfor i := 0; i
; p. p; v1 b* q6 x: q* g- _5 q}9 Z7 ?; e( c" ?/ e
* q! J* h* N4 R, Y# z- j  c
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:
9 y, e/ t& Q! \7 i1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
% [* K6 s1 j" T, L2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
7 a. j( @, M- e; V) [3 N: h+ U最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。$ T( Y* T6 f+ o8 q: e, U2 y, Q

+ \- g0 t* D+ M! q
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3