Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
157 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。( V# Y3 d; c; F+ ?% y
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
/ D: u- `: _# X% _+ c/ W总体过程和传播策略
. u3 x  T! J* ]9 I1 j. q0 p本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
3 E4 L' t" q* B0 p1 E. c- H总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。
7 @+ h3 M7 `2 t先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。( w) G- ]( b+ k" U: H4 R  Z+ y; g' V# F
. E. T; D" k5 E( @. V$ |  q, ]
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
. w. B) ]. [7 O  t7 e* f& u: K假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。
: t7 Q' ?1 X3 m! F5 I; Z+ M策略图的效果如图2,红色节点将区块传播给黄色节点:
# L3 A; l& p% U2 T; m- s, A! E1 B! w. p5 @
1 v. y, y3 G5 l7 [: a/ _' @: X, S
收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
$ u8 v/ j4 |+ D
! H; c5 ~& y# e3 |4 |# g至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。* E" E1 N: c/ E9 K
Fetcher模块是干啥的5 y1 o/ |2 S0 }1 @  c# y7 ^) w$ V0 W  k
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。0 _; |5 r2 r: l0 c1 E
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块+ U, P- a, X% G, u& `5 U
2 W) G7 v4 _4 ?4 G
源码解读6 v+ ]9 i( g% A) v" L7 X
本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。6 ~* P% @( @/ j* ^& v6 H
产块节点的传播新区块  |6 H7 \* ^% [
节点产生区块后,广播的流程可以表示为图4:& @* _. e0 v% A/ t2 d9 Q& a3 T
发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去
& Y# J& f: r! n) q0 w" V
1 t' O7 t8 o) L$ `8 Y. Y" ^
" k% ]( ]( n. A8 ]- p0 s再看下代码上的细节。7 A. I' g! u" ^, D2 L- c) c
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。; H4 K% q, Z; Q- U7 J7 K+ e
! w! ^1 P! x9 p5 A$ ~% X
// Mined broadcast loop' g# d) d7 E7 i) W; n8 W
func (pm *ProtocolManager) minedBroadcastLoop() {
  N! T# S# }" V: j/ a7 A: W" h        // automatically stops if unsubscribe
9 p9 M( ^' ^0 o        for obj := range pm.minedBlockSub.Chan() {# h' {$ [8 j$ f
                switch ev := obj.Data.(type) {. {& O+ [# V" d8 A. E# f+ Y
                case core.NewMinedBlockEvent:8 {' f: H- ]- p" S7 c) r- |1 |1 w
                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers. E9 [7 g8 |$ N* _
                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest7 e# Y6 @9 ~: S) J+ `& b$ B2 L
                }
4 v5 y1 b8 r5 D9 h0 Y) E  R; N7 ]+ ?        }
0 l/ ^! u. m, h, \}
; C7 ]7 R0 _" J& q7 U* `pm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。
7 E6 g# \2 u2 b// BroadcastBlock will either propagate a block to a subset of it's peers, or
; p$ h: w6 D8 ]  \, m& O) M1 h3 i% j// will only announce it's availability (depending what's requested).+ v9 g; x$ C1 P; z+ o
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {5 H6 V/ X5 z# U- w- G$ j2 X7 K
        hash := block.Hash()' f7 Y* v" h" b* l( i. }, N7 m
        peers := pm.peers.PeersWithoutBlock(hash)
; g4 H. R; Z- y        // If propagation is requested, send to a subset of the peer
3 ]7 S! e2 B2 q: B9 P' S        // 这种情况,要把区块广播给部分peer
, d; ~& v+ e) T& L9 r        if propagate {
  S: R% P& L3 F2 D3 u& Z  b                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
1 `0 m- k6 C' u8 r                // 计算新的总难度: Y/ W' i4 k& J0 F5 C" b
                var td *big.Int; z9 v& F. I1 A3 ^5 y7 D. I8 O/ Y8 N
                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
1 I6 N( i6 m+ P3 C                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
+ G9 F. n; N: _* W' Q; Y4 s                } else {
; _8 K4 ]: K8 y9 v9 W1 M                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
- t4 @6 g8 |0 a. m% m0 F                        return
1 _  H! W# b! }! I6 ~                }7 A0 _. }+ F2 n& r3 n
                // Send the block to a subset of our peers, V# z6 k: ?8 n5 A$ y* _' _
                // 广播区块给部分peer
! S4 P* G% U3 h8 w( n2 i- b                transfer := peers[:int(math.Sqrt(float64(len(peers))))]# r+ g' j& @& P5 R
                for _, peer := range transfer {
' W3 K1 I2 U, P5 ]$ @4 S                        peer.AsyncSendNewBlock(block, td)$ J5 G" |5 h/ V4 v
                }% N( I5 Z* K7 w1 c$ p, W
                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
! m' r, _, O0 Z: c" r* ]3 Q" i                return7 H- G9 x5 O4 W, w( i) a+ M' _$ D
        }/ m0 G5 d, K- z4 Y' t6 i
        // Otherwise if the block is indeed in out own chain, announce it
% v% I1 V% G" C/ o        // 把区块hash值广播给所有peer
5 P1 w9 n' B1 J+ l        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
& Q3 U! X- F) v1 p7 c: n/ `0 U+ V0 m, ~                for _, peer := range peers {
6 Q% M$ s1 M) ^& A- O5 s                        peer.AsyncSendNewBlockHash(block)7 g+ t* M/ q- B% g: R9 Q
                }. B8 W7 n, T( C. V1 o  Q& C
                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))4 x: S3 S) L( @- w
        }9 ?5 Z* e2 P: F" x
}
2 Y8 u1 j5 R" G7 ~peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。4 [9 }( X; x0 ]7 ^9 C
// broadcast is a write loop that multiplexes block propagations, announcements
# L- y4 _& k) R9 w" O4 |: Z$ F7 \8 T// and transaction broadcasts into the remote peer. The goal is to have an async! l( ^- ]) _' Y/ C& O9 Q# t
// writer that does not lock up node internals.% d# P6 u3 T+ R5 a
func (p *peer) broadcast() {5 R3 n/ `1 r8 _  i. G9 \# z- `. ]
        for {
5 a' C- E! P. a/ K; J                select {
& q: }& \* \: p. \* J                // 广播交易
0 r# }) x" L, D* V1 c                case txs :=
4 D: T- x1 s# ^/ s3 RPeer节点处理新区块
6 m6 [8 i6 M, w, n本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。
8 d8 I( S+ U$ u# j, w这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。
# ~) w: b$ ^: K( Z消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
  l- p* J  j8 k$ ~$ A3 A
# y+ u1 j2 }  n( y仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。
- C' x9 |7 {& a4 BNewBlockMsg的处理( Z, Y6 h' S! x+ S4 D" O
本节介绍节点收到完整区块的处理,流程如下:
" L5 x# @- \( H; \首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。( S5 Z/ h2 }1 P8 J5 n8 V6 }2 z
将区块存入到fetcher的队列,调用fetcher.Enqueue。8 w8 y. B' n, l8 N
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
2 V8 A/ w+ @6 o* D( |' X! q只看handle.Msg()的NewBlockMsg相关的部分。- g, h, G0 p. ~& c
case msg.Code == NewBlockMsg:
. r, V6 W" C% v: H6 a        // Retrieve and decode the propagated block
$ Q1 M" w( |8 m4 o3 `& g8 g        // 收到新区块,解码,赋值接收数据, O4 _; Q! M8 v( C$ `
        var request newBlockData6 K( d2 ^: D. ]' K( a
        if err := msg.Decode(&request); err != nil {
* v; _2 Z+ R2 z- _  T                return errResp(ErrDecode, "%v: %v", msg, err)% q: _* F% d9 ?. r
        }7 [# e6 v7 ~8 |; b2 s
        request.Block.ReceivedAt = msg.ReceivedAt' R5 C5 U7 p1 `5 A) v
        request.Block.ReceivedFrom = p2 I# Y2 C" V  d& u: z
        // Mark the peer as owning the block and schedule it for import1 y: p0 t0 X5 B0 ]( s
        // 标记peer知道这个区块
) l+ H: b* Q6 b        p.MarkBlock(request.Block.Hash())& d& J+ ^4 D* k4 H  x! D+ V
        // 为啥要如队列?已经得到完整的区块了1 }5 N$ n, ]2 s
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块. K* C% o( x5 E1 {( Z( w9 P% F
        pm.fetcher.Enqueue(p.id, request.Block)
+ G, r1 b8 q+ X- ]0 \) l        // Assuming the block is importable by the peer, but possibly not yet done so,
. z0 g$ W( V4 o. z        // calculate the head hash and TD that the peer truly must have.$ T, z) z* F, ]5 C
        // 截止到parent区块的头和难度- v) t+ F, d( R# k! Y3 r
        var (
; E9 A/ ~$ ^9 q! b' I                trueHead = request.Block.ParentHash()* c* X& ~+ s5 e% }
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())* S0 v6 c/ P( e& L. [$ q! J
        )/ {: C5 |9 ^3 w. w% [) [3 Y9 K6 o3 D2 T
        // Update the peers total difficulty if better than the previous
6 b7 }* w6 |+ h! n% W        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步" E5 ]$ Y8 \+ l9 a- |
        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。* P. O) R& z% M3 q$ Y8 V
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求
- I. U0 ?3 e# x) U% A        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain+ d/ J) @" L3 f2 @5 q
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
# K, w* w5 L- d/ c3 ^2 H( @7 w0 T# A                p.SetHead(trueHead, trueTD)" t. g4 C  L2 Z/ }" e' l
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
  z8 K( N5 E1 w8 u$ \3 H: O                // a singe block (as the true TD is below the propagated block), however this$ n7 j/ G/ d$ ~1 l6 s" Z
                // scenario should easily be covered by the fetcher., b# P# @0 ?6 V2 c" G
                currentBlock := pm.blockchain.CurrentBlock()8 |. V7 e6 V' `6 A) h
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
/ ?6 L) o+ I5 Z( F" h" T                        go pm.synchronise(p)# s3 y1 r& G! {' I: K; S& I! c/ W/ J
                }7 }0 Z! L' i+ X2 _; A9 I+ q
        }
/ B- ^% u" L! @//------------------------ 以上 handleMsg9 _# O# V# m- k; c
// Enqueue tries to fill gaps the the fetcher's future import queue.
& r4 S" H  U7 D" V7 X// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理9 x8 r+ D- F: c3 a2 ^2 {# `
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
$ T- l9 M1 A! u8 A- i8 ~" q        op := &inject{- j# \/ b0 n% Y1 w7 ?  V% [( l
                origin: peer,
0 S# b, C9 ^& D) M3 \! p+ L                block:  block,; C% n3 v( R8 a. u* R2 @2 j
        }- V8 ]; H' ^$ l
        select {
: |5 a  R3 ^! \7 n' W6 H. v        case f.inject  blockLimit {
- R1 P  b1 g2 ~                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)' a! w) G; @3 j! @8 k* s2 N6 n
                propBroadcastDOSMeter.Mark(1)
1 Z, B% u+ W2 i* P7 _                f.forgetHash(hash)9 o: \& l' h2 G/ @5 \7 n3 Z
                return
: x* Z% J' }: r        }& z9 r6 _' o' u: {# ?
        // Discard any past or too distant blocks! V1 J" |! U1 S8 n9 r
        // 高度检查:未来太远的块丢弃
1 E& w+ R' i0 L" R- {! b        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {
( c. w3 ?3 P/ ~8 T7 l2 _% B) m$ k                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist). @4 I( ]- n5 I
                propBroadcastDropMeter.Mark(1)
) Z! `% G4 @2 y+ v3 U8 {                f.forgetHash(hash)& H( S9 w5 }" a$ l$ ~* ^( K
                return
7 E3 j+ ]# r6 o9 e: a* j        }
: Z1 ?, \! R7 K' z        // Schedule the block for future importing
# d- W+ S) K3 q: b        // 块先加入优先级队列,加入链之前,还有很多要做
7 s4 i; P  K. `- h8 ^+ k        if _, ok := f.queued[hash]; !ok {
2 c1 Z( M, L* n9 i                op := &inject{
3 d3 e, g7 G6 i, z4 o                        origin: peer,  c( x# S; ?' f
                        block:  block,
* p; T, {; ^% i. Y/ o                }
; I- W# P& i+ g                f.queues[peer] = count$ F" t: h. d( k# C2 i1 X( c$ [( B
                f.queued[hash] = op0 d6 l' D9 I8 h. s! g
                f.queue.Push(op, -float32(block.NumberU64()))$ }6 |9 q$ C" a
                if f.queueChangeHook != nil {8 u. k$ Q' u: ~! W
                        f.queueChangeHook(op.block.Hash(), true)
: ?' {& e) V: I9 v+ R0 w5 r8 T2 c% _$ P                }6 J# N% s6 y$ Z! [/ y
                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
. X+ R3 q. |6 F' `' f        }$ |: u" w9 _: r: {- U& Y/ h0 M
}5 i* s5 d: ]$ r- E$ O  ^
fetcher队列处理
5 O1 A+ a4 C8 O4 A本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?4 U" T  E5 ]" A$ H7 r
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。0 p3 |6 ]: k- z% ~- M" x5 X
区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash) u9 B4 A: \2 [# Z2 o8 r6 d: T
// Loop is the main fetcher loop, checking and processing various notification  B6 V! V; P2 B9 }" n9 n
// events.
0 s6 c: I8 g& s- ^func (f *Fetcher) loop() {
+ J2 q$ Z% z9 A( ~; K1 W# E8 j        // Iterate the block fetching until a quit is requested
5 `) r: h3 a! m$ D        fetchTimer := time.NewTimer(0). f5 Y" M& u* i: X7 r$ a
        completeTimer := time.NewTimer(0)5 U/ ^# D% i5 T$ {$ v) l
        for {  ^4 i4 L; m8 d5 O0 h
                // Clean up any expired block fetches  n5 t$ |. @+ S: |
                // 清理过期的区块
# N, q8 v, W2 l8 s3 V1 }5 N                for hash, announce := range f.fetching {# O  r0 B/ X  [- [) b2 i
                        if time.Since(announce.time) > fetchTimeout {
$ L) Y% J, E0 H* W2 t  m4 n                                f.forgetHash(hash)
! k6 }- R( M# @3 c; i" ?/ }                        }
% U8 Q: G! L$ L# l5 L/ H) r$ N                }
, l6 Y( X1 A' M- o/ D) }- W                // Import any queued blocks that could potentially fit
* i" _$ G  f( h; c4 F                // 导入队列中合适的块
7 _9 Y6 O0 |- ]9 |( }/ Z5 D( w                height := f.chainHeight()
$ z$ N8 r- L+ E# W5 ^# |                for !f.queue.Empty() {
) M" m% s. }$ s$ x/ T; t. g                        op := f.queue.PopItem().(*inject)
% M" q; W6 A% R: _' r8 Y                        hash := op.block.Hash()* H5 [# X, j2 y+ z/ A6 {  P
                        if f.queueChangeHook != nil {9 F0 S% n, T6 B# D; L9 D) a, n
                                f.queueChangeHook(hash, false), h$ k6 V$ f( e* i: ^1 [9 X
                        }. [/ {5 |8 M3 u# G6 r( \: ?
                        // If too high up the chain or phase, continue later
0 @- ~* [; }7 Z5 T% p, ~# P                        // 块不是链需要的下一个块,再入优先级队列,停止循环/ z7 r; q- P7 q( u1 s' X
                        number := op.block.NumberU64()) S3 ~3 e  W$ H) ?5 I
                        if number > height+1 {
; _* |/ O2 S( J* M" u8 z( Z- I6 a8 P                                f.queue.Push(op, -float32(number))* q4 _5 n$ W- s3 W3 l
                                if f.queueChangeHook != nil {
& T9 ^. l: R$ D. ^- r# _/ ?, U; ]                                        f.queueChangeHook(hash, true)
9 E$ m( M5 L; J; J; S                                }, E* o8 i! e' @( Q3 {0 B, v
                                break) w3 ?- |7 I+ L
                        }
9 Z" R( H3 [5 K# j6 Q4 }                        // Otherwise if fresh and still unknown, try and import3 m: T/ e+ N: N
                        // 高度正好是我们想要的,并且链上也没有这个块
' M4 I* w4 W" ?- P" Y) q0 B                        if number+maxUncleDist
& v6 g2 `, T$ y- k( S4 D' Qfunc (f *Fetcher) insert(peer string, block *types.Block) {2 C) i. m; u$ o5 a& i. B8 H
        hash := block.Hash()
- P: K# J$ J8 ?. r        // Run the import on a new thread5 M7 f0 m& H* x) x6 u- w  B
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)2 l, V" \+ `6 x2 }
        go func() {
. {# _1 z* @+ m7 A0 v% L% |# m7 ?8 n                defer func() { f.done
; s! w" ]/ d+ H$ W2 A3 FNewBlockHashesMsg的处理: Q: M2 e5 t0 B1 I3 @$ s
本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。. x. w1 u2 K* }$ b
流程如下:  n* @/ A$ i) V" T5 p# V
对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。
; W" o4 d) V  ?, M, B& K关于announced下节再介绍。
6 {# \/ ~0 B/ s% V! ?" U+ @# _" j6 u" X. |! Q( H& [
// handleMsg()部分1 r% |' S/ \. X) X. U' h
case msg.Code == NewBlockHashesMsg:& Y: P8 q: f( ^) y/ T0 r0 @
        var announces newBlockHashesData
: G. D3 F+ g9 k& o' R  O' Y( B# ?        if err := msg.Decode(&announces); err != nil {
+ A5 k% b: |: P- P) e$ `1 E                return errResp(ErrDecode, "%v: %v", msg, err)
5 C! Q  c* Y9 m% {( m        }
/ T9 Y4 w, v3 `# V        // Mark the hashes as present at the remote node, `0 q: f& G6 s: p, s% W
        for _, block := range announces {
; p+ E% K+ W' l/ r$ W                p.MarkBlock(block.Hash)2 ~; V8 K8 n* p5 ]
        }( X4 x$ `  g- a3 c3 C
        // Schedule all the unknown hashes for retrieval: l# h% \0 X5 s/ M1 o" B( x
        // 把本地链没有的块hash找出来,交给fetcher去下载5 D; X/ x0 U, _* l( r
        unknown := make(newBlockHashesData, 0, len(announces))% v, g& p4 s# N
        for _, block := range announces {
9 ]0 X# H: W% ], F+ ]" G$ N+ i                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
! n" q9 r7 U/ S  v8 {1 {2 `0 P; {                        unknown = append(unknown, block)
- o0 _# U4 b6 a( @& k5 D                }
7 `* h8 }8 _3 R( G: k% W0 U        }
( _$ O6 h) h+ o  }        for _, block := range unknown {8 o# x  A' R" ?6 k: K4 v  N
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
1 N' Z$ J3 G$ `        }
/ h( L$ q2 _4 }' Y' s' D& o& l// Notify announces the fetcher of the potential availability of a new block in+ D; z, |0 N( Z
// the network.
8 v" s/ O6 X' B) [$ x5 |// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息
( U' _8 d9 \& ^0 `func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,8 r9 H/ E+ j! R* |7 v! D
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {' x( |3 W  T9 O$ m7 \+ j
        block := &announce{
# W2 R( P3 S# a$ X                hash:        hash,
1 s$ W4 j  C3 u  t                number:      number,# \  @' @" W; N& T, x/ l& c6 J
                time:        time,3 \5 n9 Z  u8 I) |1 A
                origin:      peer,
% `9 }7 K/ a+ K: P                fetchHeader: headerFetcher,
# h* Z3 S+ c/ K8 h                fetchBodies: bodyFetcher,: n4 g/ v$ g5 ^! z2 V
        }* x9 q- k  F! E/ c* [6 O' c
        select {
4 h% M4 T, N7 j: k        case f.notify  hashLimit {9 X* @" Y% [! C7 f9 Y9 V0 v7 A
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
0 [8 F2 o* s  I; A; X" ^                propAnnounceDOSMeter.Mark(1)8 _# J' e% t8 |. a1 S/ r; c
                break- D- s# m( y: k) |) r( @
        }
9 o5 T& G, j+ t% H; d+ |        // If we have a valid block number, check that it's potentially useful
- Y4 `# h# H) G; N7 z: \        // 高度检查
4 S8 i% v; |" R6 ~% h        if notification.number > 0 {2 h$ u2 {# v! n3 ^
                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
: V& d" j( n6 @" M3 I2 x8 w                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)8 E/ P5 I% a# T% G+ {9 y
                        propAnnounceDropMeter.Mark(1)
+ }4 }6 F! k5 S, Y& A& o# j1 t  }                        break
5 l+ r# `" j9 X/ r" D2 v                }
* v" r1 w0 x5 u/ e        }5 Y; \5 g3 O5 I; [$ U0 \
        // All is well, schedule the announce if block's not yet downloading
6 h0 A, @5 r& H" O, X        // 检查是否已经在下载,已下载则忽略* x: l' |$ Q7 r
        if _, ok := f.fetching[notification.hash]; ok {( I) c- O4 z; _3 m. {. W6 _& x
                break
3 h* R1 ?; J8 \        }, z  |3 h+ @. T& S3 {
        if _, ok := f.completing[notification.hash]; ok {
. {( F. ~. b* _6 l, d8 X                break
' Y9 E6 ]- \; ^/ J) Y        }
: k9 y& d' [: K6 s% ?% l        // 更新peer已经通知给我们的区块数量3 z$ [# e. }* X5 f0 }2 F: W" }0 ?: H
        f.announces[notification.origin] = count
& c; c2 p. o6 Q        // 把通知信息加入到announced,供调度! o9 d9 u3 X2 U, S: `: V! J
        f.announced[notification.hash] = append(f.announced[notification.hash], notification)1 J7 u( G9 R; |* }$ x9 a# p2 J6 N1 Z
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
( \- q3 F$ L9 `# Z; l% {                f.announceChangeHook(notification.hash, true)2 y8 R' [3 C7 M. X/ Q. z3 p) L
        }
( R# [. o6 {- p5 q4 _7 i$ F# [" J        if len(f.announced) == 1 {- n6 s0 j2 y+ \; S: l
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知. r5 x/ O  O; |! L( k$ M. y% C
                f.rescheduleFetch(fetchTimer): p' L' t' d3 V7 _' m# k' t/ x* j
        }* c1 I/ L- m/ Y+ p8 @
fetcher获取完整区块
' X. f8 j" s; Z9 B& p本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
4 \& P7 `% |. w- b7 kFetcher的大头3 F6 a& M* b8 A3 J
Fetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
6 C% n7 c7 l0 \  s: I0 q, Y% s宏观! s& w9 u8 ~7 [: W
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。
' V( ?4 T; D6 H7 Z% y% l; c
. f; i: R5 w) v0 t2 e再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:$ U  k% G" S! F+ }
收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
+ B( G4 T' t" Y9 y2 Y: B7 X
5 }+ ?7 W) i1 _, d; \% v4 m. ?- F) g
, e* j0 z6 r2 ?7 J3 p; q微观
% m( j) V# O7 R- X2 f3 w# t9 {接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。4 ^" k1 t* ]$ }; ]6 K7 k' A
首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
: E4 ~! V( z5 C, ?, u# n' B; s// Fetcher is responsible for accumulating block announcements from various peers3 W7 S/ w+ {# p* T' J5 n
// and scheduling them for retrieval.8 F, F  e" K( p: C# V  r
// 积累块通知,然后调度获取这些块4 R8 d6 Q7 R. R( b6 p+ q+ D
type Fetcher struct {. e' K) ^: R% |% X$ J
        // Various event channels
6 y+ ]) m% }  \% W% f$ o9 B    // 收到区块hash值的通道
9 s* A6 M& M# e+ @$ N! N7 N        notify chan *announce5 i2 w, d) G! I
    // 收到完整区块的通道4 {; o  R* ?  w7 B9 ], n
        inject chan *inject2 t9 d; }4 A  w: H) o
        blockFilter chan chan []*types.Block
" X$ o1 j* D1 ^6 x- `        // 过滤header的通道的通道- e  y. Y- z% |' d) j/ v% Q2 W
        headerFilter chan chan *headerFilterTask' {+ e7 ?* e! ~9 z
        // 过滤body的通道的通道
0 D% {# d' @1 P+ h& }        bodyFilter chan chan *bodyFilterTask0 \. r9 I' J- i7 S* n% c
        done chan common.Hash
4 S, q& A" ^( w3 B" M' |) G5 n, N        quit chan struct{}+ B* U: \0 i! Z4 N
        // Announce states1 Y. J5 j/ A3 K
        // Peer已经给了本节点多少区块头通知6 B6 @. @3 }' [5 G1 L3 B1 c/ I" f6 M
        announces map[string]int // Per peer announce counts to prevent memory exhaustion
" b- l2 y( Z1 a/ U        // 已经announced的区块列表& P. g) X/ c" L, Z  U( R3 ~8 X: x
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
- t& o% l- W- N1 m        // 正在fetching区块头的请求% P( w1 ]8 T) W# g8 A
        fetching map[common.Hash]*announce // Announced blocks, currently fetching4 G+ a2 j  n6 v$ w( ?8 L
        // 已经fetch到区块头,还差body的请求,用来获取body! b0 x+ s% |; _8 ^4 @& O7 S
        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
& O( H9 U9 L4 v3 }4 J% m/ G) l2 Z        // 已经得到区块头的% P0 Z( Z# L3 ]8 e5 @/ u
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing$ ~$ g4 M9 T8 t2 }5 Z
        // Block cache( w# P5 L0 U$ l+ x" J' f
        // queue,优先级队列,高度做优先级
% u3 P; u5 q/ n1 ~. c+ n        // queues,统计peer通告了多少块
6 y0 z" c) d! m( b5 Z( R# E  E        // queued,代表这个块如队列了,
% p' T! P; }% N8 _! m        queue  *prque.Prque            // Queue containing the import operations (block number sorted)* c- l5 ~( G  S% l+ {
        queues map[string]int          // Per peer block counts to prevent memory exhaustion
! P, y% Z7 B2 X7 Y, G7 Q& H        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)5 i' a' M$ P+ T7 ]
        // Callbacks
6 V4 a. F8 ~* d' ?        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
0 A! \! E/ R1 M1 }        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证. S# h) B8 s5 x0 K8 J$ [$ v& x
        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer' R+ v" s. k  P# o" L
        chainHeight    chainHeightFn      // Retrieves the current chain's height1 ]% Y) {; Q! U7 u$ \0 p4 M
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数
5 I1 h9 d! p7 d# W        dropPeer       peerDropFn         // Drops a peer for misbehaving# N1 q' S/ ?: |1 S
        // Testing hooks* d6 l# q5 v( l* {4 v+ N: o
        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
1 T7 V* v; r1 V        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
; h8 d' f, O4 m; ]$ h6 V" J        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
6 U: I: n  {6 G$ t; ]        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
- t/ A/ \) h$ z( R) Z8 ]2 M* T; T. O        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
1 t/ w' |- f4 C4 @; |( @}9 j! m" y* Z% T* P8 H6 A
NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。% P) d% X5 Y0 o) a2 a/ [7 @" L
case  arriveTimeout-gatherSlack {
% M' U+ m  L6 f2 O# T" ?                        // Pick a random peer to retrieve from, reset all others
3 V, ]5 n& r3 V6 a! J4 O                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer
! Z5 _  z; [. c- v, u                        announce := announces[rand.Intn(len(announces))]
6 O  k% H9 F7 c8 S                        f.forgetHash(hash)& T3 u. B" N6 u$ q# p
                        // If the block still didn't arrive, queue for fetching
. h/ t" ~+ J" T3 C( c: e/ Z! o1 l# E1 [                        // 本地还没有这个区块,创建获取区块的请求. J4 l2 Y+ |: J7 m2 n
                        if f.getBlock(hash) == nil {3 `! Z) \" p7 U  n8 X0 J0 W6 ]
                                request[announce.origin] = append(request[announce.origin], hash)# x) k# w$ z( F' q
                                f.fetching[hash] = announce
/ q8 \9 |3 P3 L6 q, [                        }: g! d  l- |- V$ G5 q
                }2 E5 y- E# h0 v* Y, v9 e8 L+ s
        }3 _2 w0 @+ J% `5 u" k
        // Send out all block header requests3 l. D+ s& X& E
        // 把所有的request发送出去4 e% f  Q: k+ q1 h
        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求( X0 s1 s8 d# D
        for peer, hashes := range request {; l8 X2 O  h/ Z# O% Z4 Z
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
+ B# y1 D. b' `$ t* n                // Create a closure of the fetch and schedule in on a new thread
/ t9 h7 l. J3 ~4 P                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
) }, [: k1 @$ ]3 p                go func() {  q* w. W$ q9 _" G
                        if f.fetchingHook != nil {: a) @" b! o8 P
                                f.fetchingHook(hashes)
& l& \6 M; Y+ U$ b8 s( {                        }  m( Z* X" `: @# r5 S$ u0 ~- z
                        for _, hash := range hashes {
& X, Q$ @' T; h" a1 u0 S; b3 \" F                                headerFetchMeter.Mark(1)
; f! {. c! m/ h/ [8 r                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
" C3 X. |# Y3 ^9 h3 i8 N$ O% ~4 q                        }
' G* G1 S8 W2 ~& z. b* U8 u5 h. f                }()( K' A3 H9 K( C- `! y
        }) o. l* j- u7 C5 B
        // Schedule the next fetch if blocks are still pending
2 r4 |" U, x& R1 |        f.rescheduleFetch(fetchTimer); S, {+ }) ?& D! G) o& @. f1 N
从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
0 G# f$ k' Y% |  Bpm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
" U7 W! `0 w4 e: c8 g6 U0 w// RequestOneHeader is a wrapper around the header query functions to fetch a! C7 k: P$ N9 d/ p
// single header. It is used solely by the fetcher.  e( [/ j( u7 c% g- C
func (p *peer) RequestOneHeader(hash common.Hash) error {6 K; B2 O2 {0 [: W
        p.Log().Debug("Fetching single header", "hash", hash)
; G& S! V/ M  v, B6 X% a        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
4 F' S3 \8 M+ l/ y: y( e}1 ^$ b- I' Q+ I, y, [+ Z  {
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。
  h; L$ R2 i1 C6 a  t. y6 J. d* Q···4 A- n5 S& v/ F7 v5 x& Y
// handleMsg()3 m5 n! z; r' |8 D; w
// Block header query, collect the requested headers and reply
8 Y5 g% S9 P" W3 G6 \7 u" Lcase msg.Code == GetBlockHeadersMsg:7 I# E1 Y2 k# C- g$ x
// Decode the complex header query
7 ?$ k& o) G# D* y6 \8 D0 g/ xvar query getBlockHeadersData
, }  C! _  w) M* @' N+ |5 j7 wif err := msg.Decode(&query); err != nil {
/ x9 z6 v# R$ a! o# dreturn errResp(ErrDecode, “%v: %v”, msg, err)7 ]" e3 P0 l, _. ]1 N* f, M* {
}
" M+ A: S7 v0 m  C6 m. f: lhashMode := query.Origin.Hash != (common.Hash{})
/ ~# Q% C* y' m1 w% j// Gather headers until the fetch or network limits is reached) M# v* g/ j3 ]
// 收集区块头,直到达到限制' b" E' o' o" \, l6 w& H% }
var (9 U5 a8 B0 p, }1 L4 w( K
        bytes   common.StorageSize* X7 H$ h) P, n& m5 _3 `% Q5 U
        headers []*types.Header
: A8 C( f3 B5 E0 U! C        unknown bool
9 ?+ e; j& j9 `, ]5 N2 X0 H" {)
. w1 q: b' Q6 t' @! }// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量0 m1 `2 s5 Q4 R7 T8 N; \1 O2 @; [
for !unknown && len(headers)
8 k8 O4 K* B, {6 |; S& R: A`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。8 |2 Y: u7 }; f& M/ E+ W; ]& K6 d  m
// handleMsg()
$ q7 K& T3 ~7 E6 ocase msg.Code == BlockHeadersMsg:: n/ ^6 Y7 e0 j  m8 B& ^4 }7 O/ N
// A batch of headers arrived to one of our previous requests
5 W+ m  ]7 n0 D4 Zvar headers []*types.Header& W: _4 S6 e' M# ^* Y% g/ x( e
if err := msg.Decode(&headers); err != nil {
2 I" f) X  N4 r4 Jreturn errResp(ErrDecode, “msg %v: %v”, msg, err)# }; J' x& L# l: _# ?4 o: t1 n2 ~
}
3 R( V$ ^; y, e8 D// If no headers were received, but we’re expending a DAO fork check, maybe it’s that
2 R& [, l- y) l9 Z7 H1 |// 检查是不是当前DAO的硬分叉. I( K" H% q9 g/ M3 K8 s
if len(headers) == 0 && p.forkDrop != nil {
1 H  i( W: e  W6 ~0 _4 M// Possibly an empty reply to the fork header checks, sanity check TDs
# G7 [8 ?: ?3 j, T. J3 NverifyDAO := true
7 M  E( n' o2 P) ^  v, C        // If we already have a DAO header, we can check the peer's TD against it. If  D: H( N" a6 r$ q0 Q2 K
        // the peer's ahead of this, it too must have a reply to the DAO check4 _8 q9 O% j5 i0 ~
        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {; Z5 s2 s0 P  p/ v* Y6 I
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {2 r4 a- ~# M% n& J# c
                        verifyDAO = false4 N/ A' v8 d- x* \% s3 ?6 o& x
                }
+ a: ]* ~! v1 G) |2 Q1 w4 n" H        }
9 s/ G5 h- n1 n! r" F; m1 ^        // If we're seemingly on the same chain, disable the drop timer
+ Z9 _* k! k9 B' ^6 t1 y        if verifyDAO {3 Y7 b2 c/ ?% \7 _2 W+ e9 z/ d
                p.Log().Debug("Seems to be on the same side of the DAO fork")
  V$ ^6 d! t$ n' I                p.forkDrop.Stop()) ~9 C! V" I: y6 M. M4 }1 c) U% ~
                p.forkDrop = nil
, X8 j. B9 c3 I% W! `6 e* }                return nil
4 Y, j$ v& ]" z/ t        }
- z) n( o" ^2 n# f5 r}
) D5 ?1 A. a4 t9 D( J// Filter out any explicitly requested headers, deliver the rest to the downloader
& a: w7 r1 n6 ~( \  {// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
+ S9 G/ d5 m9 e6 Z  o9 j6 afilter := len(headers) == 1( \7 ?3 D; T: ^$ r, j/ j3 L
if filter {
- f8 \( q$ R8 u' R8 L/ v8 {        // If it's a potential DAO fork check, validate against the rules8 I% \! Y  x3 A3 v) w; ]
        // 检查是否硬分叉
+ x( b, U: p6 f( G        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
5 E! N' `# V" N0 h- v$ s# v; w                // Disable the fork drop timer6 t6 [" H/ r( w; B" ?
                p.forkDrop.Stop(). H) u9 t- ~  l6 f
                p.forkDrop = nil$ ?) G3 m  P) K* H
                // Validate the header and either drop the peer or continue
9 A" q- \- t  E' x" Z# R* z, o) q; T                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {3 d4 S0 T. H9 S6 x
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
6 k2 J* w# ~% W) S* u/ ~! z2 F                        return err: u* x, Y3 Y5 d: A" r5 S
                }
  H, ]& ~* l, c                p.Log().Debug("Verified to be on the same side of the DAO fork")  \4 q" V% g! B
                return nil
2 k- c# f6 I( o: f% c* n$ Z3 l/ E* \        }& `# D4 C  n; r% _2 K
        // Irrelevant of the fork checks, send the header to the fetcher just in case' C; G5 @1 c8 [  a$ N
        // 使用fetcher过滤区块头' |- T3 ]  `3 ]- ^
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
' }: m2 k5 b8 A8 D& G0 C5 r}7 ]6 Z8 S" E. y2 Z
// 剩下的区块头交给downloader$ m& C* S$ ]% V( |
if len(headers) > 0 || !filter {
  t8 o! _" I- k- b# y        err := pm.downloader.DeliverHeaders(p.id, headers)/ F. d9 ]( j7 I5 @- r6 m- a
        if err != nil {
, v8 r# d$ w3 o2 e) r                log.Debug("Failed to deliver headers", "err", err)
1 |" Z( ~+ w' W. h; G0 p7 P8 o        }
' t/ r& R& |/ n" e& h  ?# a5 G4 [}: {) h$ s% _2 K4 [; p& j) t
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。
" M% a2 L7 z% o每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
$ t) u0 F+ v1 a4 b8 {3 f  T0 W// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
; I. c* g! b1 ~- h- s// returning those that should be handled differently.
/ E0 e1 P" C5 ]/ H0 O" f// 寻找出fetcher请求的区块头* F" e9 s6 |9 D# h3 p: k
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
7 X: s' u6 K" d" }# \& |8 q; Klog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
* [8 V  R6 E4 N// Send the filter channel to the fetcher
( S) F8 w9 N# F3 T6 b4 @/ O// 任务通道" j, ~. p4 \  i5 j- Z9 U
filter := make(chan *headerFilterTask)
; q) t, p. d, b/ `# D  Cselect {
& o) A# W; d5 l0 {: E6 V// 任务通道发送到这个通道
' n2 ~' d# Q+ i. g: w# X. Fcase f.headerFilter
# U& s. ?8 e9 h( S$ O% p}1 e/ o. z5 N' Y
接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:
0 |" G1 ?: w4 w, \1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。
+ s3 c3 `( j+ A% @- _& l2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。( g$ x5 d- A" r' c; I7 C4 w# v& x
3. 把`unknonw`中的区块返回给`handleMsg()`。  w: Z# _% i8 g, A7 ]2 g
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。# x% l* ^7 }3 V3 S2 Y* F
5. 把`compelete`的区块加入到`queued`。& G: K# f& ?! ^8 b
// fetcher.loop()# Z% [' x4 S* {* H! R/ L: |. a- }
case filter := % a+ z8 Z  z2 W: L+ B( ?
// Split the batch of headers into unknown ones (to return to the caller),1 _2 O- C9 R/ k- i, N- H1 [
// known incomplete ones (requiring body retrievals) and completed blocks.8 F0 n5 D; K) F# \
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放
0 J+ \+ F6 U( A  S% `  K' F// 还需要获取uncle和交易的区块
" G  H2 K0 h' t, a6 Qunknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}# I. z& X3 {# D1 ^* @
// 遍历所有收到的header0 d2 `$ y2 S' V8 u, v
for _, header := range task.headers {
; d8 i: S( Q& ^: ?- S' `        hash := header.Hash()
/ E4 C5 j* B  L        // Filter fetcher-requested headers from other synchronisation algorithms
# e% y- i8 V# H        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued7 L3 c3 }: K( E6 `) i( @" f
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
7 _7 P' C7 ]$ e5 M  Z+ S/ i                // If the delivered header does not match the promised number, drop the announcer! a0 Y% O) E( f% M, D
                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。
% R' O, B8 o' C9 v! Z                if header.Number.Uint64() != announce.number {
  I) B  M6 x2 k% S$ X                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
5 Q3 D5 D, _$ K                        f.dropPeer(announce.origin)% |) i* E6 U/ i* L2 x2 p
                        f.forgetHash(hash)
3 x! G2 f# s( o$ k  a! D7 k                        continue' [2 h& j& ~' r, c
                }
' e- L* _' t3 T5 ^) p; e                // Only keep if not imported by other means
/ h' [# B' }% l( u6 p! S5 u7 W# Q                // 本地链没有当前区块
2 \2 c& }7 N$ k) ?* \: m' Y                if f.getBlock(hash) == nil {0 y' q" k; v1 T! S8 P
                        announce.header = header) F% ^, v' x4 t6 i. I
                        announce.time = task.time
4 Q+ A7 ^2 |# H5 ]; A3 p2 A                        // If the block is empty (header only), short circuit into the final import queue
" o7 A5 S/ s9 A- c                        // 如果区块没有交易和uncle,加入到complete
2 l' E# ^* K/ V8 ?: h8 X                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
( c2 k. J3 r- b$ H# S                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())/ n, I% f: t3 Q' H4 t) {1 o
                                block := types.NewBlockWithHeader(header)& D( N. v. F5 W
                                block.ReceivedAt = task.time* ~7 L2 I3 J. u; [2 `
                                complete = append(complete, block)
; U* I+ {1 Z  U$ m& i( h                                f.completing[hash] = announce
# Z0 z' B- ~6 ]# q, W! |                                continue
$ H6 W) Q6 ~0 a: Z                        }, d3 p2 u3 L: V
                        // Otherwise add to the list of blocks needing completion
2 q' T4 n+ x. D! J: j, m                        // 否则就是不完整的区块( s+ C- A& V% Z; r, O1 }
                        incomplete = append(incomplete, announce)
. U: h/ A- `, L/ f; {' L6 l                } else {* M/ q& S7 u$ D5 m9 g4 w& R* a
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash()), y% q( k% E& M# ]7 @2 K
                        f.forgetHash(hash)0 i. T( o2 m6 b# f& h8 j
                }
3 b' n; Z% U( t' f  Y& R" q        } else {
$ V6 E% a6 G& r3 [) S7 W                // Fetcher doesn't know about it, add to the return list
/ C; }1 S8 w. e2 L# \: x                // 没请求过的header
$ a- f3 T" g( ~1 y0 Q, n9 J                unknown = append(unknown, header)
& \! l0 C7 X+ [  r        }
% s8 Q1 n# H, s- J, \}# G  E; a: \2 ^; H! j8 T: G
// 把未知的区块头,再传递会filter& o3 ]+ y$ y+ x. e3 y( ^
headerFilterOutMeter.Mark(int64(len(unknown)))
1 P; j0 x" Y; e  V* w( ~1 aselect {4 {+ k* u. ~* i
case filter
, s, A8 Z8 k4 @; p9 h5 }4 E跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。5 v% ~& Q+ N$ n6 `- W& G, v' a- b
// fetcher.loop()
& G# K' ]3 l/ `case
( j- h) z6 y" L$ }4 w/ J// 遍历所有待获取body的announce& V( L2 b2 a1 a' B* F. j8 ]% j9 i. g2 o
for hash, announces := range f.fetched {8 w, c3 w% B( \+ ^1 L/ u* b. s
        // Pick a random peer to retrieve from, reset all others
2 Y# ^5 o! u& B) h2 q0 J! E        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
2 l% o4 ?( K9 Y, K) d3 @$ i7 k4 m        announce := announces[rand.Intn(len(announces))]" w% W- [  N1 ]! c) ?
        f.forgetHash(hash); W: @( n: |3 W% g, H
        // If the block still didn't arrive, queue for completion
" U( a( v9 p6 r: |5 D, ]0 A        // 如果本地没有这个区块,则放入到completing,创建请求4 T& Q. X% m5 U) a7 A+ k
        if f.getBlock(hash) == nil {: g% z1 U5 K2 p" M! B
                request[announce.origin] = append(request[announce.origin], hash)7 F, _' B4 |' n8 S6 P
                f.completing[hash] = announce
) [" `9 S% K9 ?        }
, x* z) r9 G" _8 R7 T( l. q: E}- p2 i% E4 @- Z& T0 R! y
// Send out all block body requests
% R3 i- k1 f6 Y// 发送所有的请求,获取body,依然是每个peer一个单独协程2 z& W. D, H$ u* i
for peer, hashes := range request {
4 G( J/ u8 ?2 w/ Y' G8 q        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)5 d6 G9 r* h/ V: r1 ~" x* W1 x" r) Q' B
        // Create a closure of the fetch and schedule in on a new thread
! y% _" w3 l. f2 H, S% N) p  d        if f.completingHook != nil {8 D' o+ X/ D, s
                f.completingHook(hashes)5 |' S& p; l& r, _
        }# S. Z) x; H# {; J- [
        bodyFetchMeter.Mark(int64(len(hashes)))0 J: `. x4 t7 {1 d8 a
        go f.completing[hashes[0]].fetchBodies(hashes)4 X% G2 i/ A8 I
}1 f4 ^/ \, M4 Q0 F# {7 }# U/ }
// Schedule the next fetch if blocks are still pending7 ?1 o+ F  N% Y- ^
f.rescheduleComplete(completeTimer)
3 G  n2 t: p; B- K% u`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
- W- g' W# o( M" B- R// handleMsg()" B3 B+ s6 a5 _+ v
case msg.Code == GetBlockBodiesMsg:. U! ?, V, f- p' c+ b
// Decode the retrieval message  |( s2 \0 Q& ?. W
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
9 B# J; m, n, R6 R& o# B9 wif _, err := msgStream.List(); err != nil {1 i3 l: K: H1 T: |1 q
return err7 h: v- S% l- {! P
}
! m$ H- ~0 m: ~// Gather blocks until the fetch or network limits is reached& B/ f8 j. X7 Z* T* G- A
var (
" o7 y+ H$ e7 i# W, [hash   common.Hash
! A6 Z* [- F! k0 i. o% S/ C) Lbytes  int" n/ j) ^. s; t' P1 l* j$ h! K
bodies []rlp.RawValue
1 _3 Q% C' W( g' ~" K)) z) |1 P/ Y6 A1 v* d6 m5 T
// 遍历所有请求2 @( e4 O+ [! i2 [9 U
for bytes % n+ e0 G* m2 h  `
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
( y9 C3 y  C/ n+ {- \3 e& [// handleMsg()$ B6 @. Y5 q$ B
case msg.Code == BlockBodiesMsg:
2 U0 y  H" ?$ e+ O: h  ~8 d' _, z* U2 N// A batch of block bodies arrived to one of our previous requests
6 I! [" W. W' C5 ]  ~var request blockBodiesData7 h* \" g* c* ?$ a: `: k
if err := msg.Decode(&request); err != nil {  h. |5 ^8 {" Y4 [2 ?5 s9 u/ Q
return errResp(ErrDecode, “msg %v: %v”, msg, err)
9 y6 _& y  _' g3 q7 }: N$ ]+ i}
# g4 |( J+ U; O/ a' R( Q, \// Deliver them all to the downloader for queuing
* Z# f- s8 M3 M$ ^// 传递给downloader去处理
' v1 X& j, a1 O2 t: xtransactions := make([][]*types.Transaction, len(request))+ y) H6 q- m/ r& ]; d
uncles := make([][]*types.Header, len(request))
4 p/ v9 i& B! T+ a  _+ ifor i, body := range request {8 ~$ C( r5 a  |5 M, y  a5 D9 E
        transactions = body.Transactions: z9 D' U' i8 r  Z! D
        uncles = body.Uncles' \7 Z0 z) l: u% W& H3 s, Q
}6 X! y9 s: ~4 i7 }& a
// Filter out any explicitly requested bodies, deliver the rest to the downloader
; Z, t) p1 ^' k5 I// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
$ x3 W# j: [- Pfilter := len(transactions) > 0 || len(uncles) > 0
% [8 ?4 a3 N* s8 Q. t0 sif filter {2 M4 N5 ]/ a- Q% |5 S6 F
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
& X  k  ~+ A9 U}
, {+ ^2 i# c5 m1 F# X* N( {& V// 剩下的body交给downloader
+ [3 t# Z; ?" g, e9 Eif len(transactions) > 0 || len(uncles) > 0 || !filter {+ r1 g; [% M; H8 O+ D! {; l$ j* }
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
7 m* \) s( ~( h8 Z: m        if err != nil {
2 s+ L0 A, l9 g2 v                log.Debug("Failed to deliver bodies", "err", err)7 c! [2 }5 S+ u* M; q0 H, S3 ?3 r
        }; H0 y3 C3 v7 B/ W3 n( \9 e
}2 a. M  s% ~" t0 u; o
过滤函数的原理也与Header相同。- r4 F/ n0 v% D, Q3 c$ P' j
// FilterBodies extracts all the block bodies that were explicitly requested by/ _4 c6 [# m1 j9 Q/ _
// the fetcher, returning those that should be handled differently.$ @8 k( H; q3 q* Y- O
// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理
8 Y, m( H# x9 x5 i" i; Efunc (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
& K5 ~  e1 _1 llog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))7 B, J. J) g9 ^% O8 y) d' E
// Send the filter channel to the fetcher
4 j" w( ^' b" `, y3 ?* p- z6 @filter := make(chan *bodyFilterTask)6 Y& F5 d% V% z  q. s
select {- Q2 C( L" `/ G1 z! F, n
case f.bodyFilter
  K/ w" f3 q' |0 L9 [* R}
& Z5 S- n- o! C& M$ j9 J实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:2 F9 u' f! u2 d, \9 d4 x
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。6 E2 S4 E4 \- b2 P( {$ l
2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。
& l3 b. r( k" m/ S; q: T+ x3. 将`blocks`中的区块加入到`queued`,终结。" n9 E3 H- b$ I8 n2 D+ t
case filter :=
, L/ k" C5 @( }& x+ Lblocks := []*types.Block{}3 m4 t& ~: W- m' C) t. q5 n1 n* `
// 获取的每个body的txs列表和uncle列表
/ h* |5 f: q& f4 O* d// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
2 A! v* X/ K3 M- z9 t1 w) T) ifor i := 0; i 8 v- w6 h5 P" Y2 `3 ?
}
; Z9 U. @5 j' P. l) v, l1 R* g  Q
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:* B; l  v- W$ J1 d0 }
1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。# @5 R8 O  y/ F5 O* N
2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
3 n* t! E$ c: M% g最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。
! k& \3 P% E" `' q* \
" m) {+ R8 Y& z  n1 E  V- X
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3