Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
162 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。+ S3 q0 y& }5 \6 P" I& p+ R
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
: d/ R3 N5 J. c; g! B$ j5 C/ c& q总体过程和传播策略$ g4 ~. g( O. f! T3 O8 `
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
& v" o  u6 A2 q总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。$ ~& }; |1 U% A! H: `6 I
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。6 i6 T; X9 O1 e* F; n, Q

; g. _# o7 Z2 M% {( x+ R总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:
, C$ D6 M  E9 a假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。
( W) k, f; D/ L策略图的效果如图2,红色节点将区块传播给黄色节点:
" V0 u2 M1 I7 K) r. M" m; k1 K0 B' a; j& Y+ s% T/ a
- @6 z2 F5 P' z& i
收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
: o" M8 _1 I/ A' T8 J* O/ B, q0 Q) ^, b) Z9 |
至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。
0 A3 U, j) M7 b0 N' J) B) LFetcher模块是干啥的) d/ k- _, L1 \3 b$ R0 g) R' o
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。
3 ~+ z5 |6 q6 O. o: s& c  W) w如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
9 d" x# g3 I# p- S
' g# f4 `' n! Z3 P8 x, i$ f源码解读
, Z; J& o& Z4 j( D; }, j/ I  n& U' O本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。( U: ^$ Z) P, p  E1 K! e
产块节点的传播新区块
! Q7 r6 a0 R7 J; G  y  F) q! q* i节点产生区块后,广播的流程可以表示为图4:1 V* S% O/ j/ ?# X
发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去' O+ q4 m& B/ Z& d6 C4 A

. B& s$ v& F, `; J' \8 P4 b% x) N9 e" u/ q
再看下代码上的细节。" h$ f9 t) J% d; Z. Z
worker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。
% E2 A$ _  ?% W/ Z  r+ \( D8 i. Z2 P# t$ [) e; Z+ p1 F8 y
// Mined broadcast loop
3 Q' T) y+ h6 N+ I! R# Yfunc (pm *ProtocolManager) minedBroadcastLoop() {
3 u& S$ {: N# s7 f7 F+ `        // automatically stops if unsubscribe6 s3 {0 r+ [- S" Q
        for obj := range pm.minedBlockSub.Chan() {
% ^/ |# ?; y0 @% O                switch ev := obj.Data.(type) {6 e4 A2 h/ b" g4 |
                case core.NewMinedBlockEvent:
( W5 P3 |, c' r6 L( M                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
7 s2 p# B8 Q" P: {                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
: {  I  G+ ^3 v/ W$ y# D                }
4 ?. o+ M$ n* a3 [2 u% E% u0 s        }
4 Q" l) h) a. _) O9 A}
" M( d% e% H/ @1 x. Vpm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。
) x+ D4 K4 U, k// BroadcastBlock will either propagate a block to a subset of it's peers, or* v3 A* _$ P/ H% H( f6 B- T
// will only announce it's availability (depending what's requested).: t1 h6 i5 T% J! ^3 ^/ a# D
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {) I1 \; Q+ G; ~5 b
        hash := block.Hash()
$ @3 E2 G$ V# ~5 }: J5 T- C        peers := pm.peers.PeersWithoutBlock(hash)" }$ w* w9 R' T/ J" I( b- ~
        // If propagation is requested, send to a subset of the peer$ p6 Q0 u' F1 U2 X
        // 这种情况,要把区块广播给部分peer' p8 E  y% D( J' G
        if propagate {
8 j: u$ E5 g- n) a5 D                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
; `3 b1 @% Z$ o& p. {& T) W8 J, ?                // 计算新的总难度
) T% j+ w+ z# W) B                var td *big.Int! M0 R! T; |+ }7 r" D7 F
                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {6 E6 ?$ ?! J" v- \& l: m
                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))( I/ `2 N4 {; n$ G2 P# C
                } else {
; T( T0 u9 B% ^* R/ m                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
* v6 n. c. E' u" ]                        return2 N; s: d) X: @% m7 z
                }
& w6 X( ]/ E' {                // Send the block to a subset of our peers8 D  L$ |( t8 _3 m
                // 广播区块给部分peer$ M- M5 N1 k1 J6 _
                transfer := peers[:int(math.Sqrt(float64(len(peers))))]" Y2 D% g3 m8 r8 y4 x! z$ ~1 z
                for _, peer := range transfer {
/ k4 Y2 _0 r7 m# W1 V/ X                        peer.AsyncSendNewBlock(block, td)* x* _4 ?7 `9 C, f
                }
* ^7 k/ B) g6 ~+ A                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
7 n& ?, [0 ^4 E5 [/ c+ u                return4 H4 t4 c# F' N* O; M
        }  L* m# n6 i  @3 u5 a6 V
        // Otherwise if the block is indeed in out own chain, announce it
* K( O) ]. l2 I3 M0 v# m        // 把区块hash值广播给所有peer
" M- p8 k# b9 y& B( O& ~        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
. g( ?8 u0 I. c& M5 O                for _, peer := range peers {4 Y8 o4 D$ I( @+ j, J& C1 M- k
                        peer.AsyncSendNewBlockHash(block)2 ~% L+ C9 }2 J
                }4 K; F) {% p( B3 l/ ?  s6 z. G
                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))  M1 _  K  |" t) [% d" S7 H1 r
        }0 z  \7 t1 e0 n" N2 V! ~. }" Y
}5 E# x- S5 c2 g/ g- t
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
/ q% b. `' v! \2 i$ z% e6 }// broadcast is a write loop that multiplexes block propagations, announcements
7 x! {% |% a: D9 z# A// and transaction broadcasts into the remote peer. The goal is to have an async# v% n' z2 W6 F3 K: e+ M  l' L
// writer that does not lock up node internals.
) s$ p' J$ H7 r4 @1 U3 Rfunc (p *peer) broadcast() {
: Q$ k, O4 N  n0 N$ T$ P        for {
. J1 J# r/ O+ d$ y                select {
$ `4 c" G7 T$ l  P7 p" ]                // 广播交易% g# b9 d# d$ u0 ?; L7 U
                case txs := 9 V4 H  a! m2 c- F7 u
Peer节点处理新区块* F* }, Z) ?7 y5 w& n9 B; [
本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。
9 J5 C' F9 d0 A+ F这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。+ O5 F! k+ _) `$ K& W! g' _
消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
( r1 O  j5 b' m* j) W) d, V
9 v9 Q4 |, P* R! G# B仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。/ s  K1 ^! t) [8 L" x9 T- Y
NewBlockMsg的处理: b( s3 m" O0 _; q, F
本节介绍节点收到完整区块的处理,流程如下:
% B2 q( G0 A, H( |# a+ P% P, U首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。; z* \) r( `  C1 f$ h
将区块存入到fetcher的队列,调用fetcher.Enqueue。
" r& {0 i2 ~3 y0 q, h% R更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。" I0 J0 I. V0 k7 Z8 t
只看handle.Msg()的NewBlockMsg相关的部分。+ a. Q* c3 |  t: x0 Z
case msg.Code == NewBlockMsg:: \* I$ ~7 O, E5 I  E6 l
        // Retrieve and decode the propagated block
/ [0 p' k: }* Z( L  K        // 收到新区块,解码,赋值接收数据+ c3 w( E# }: D1 q$ H/ i0 s9 L
        var request newBlockData4 Y% J9 O( s( m4 Z# q
        if err := msg.Decode(&request); err != nil {) x- Y" |: f% v* W6 z
                return errResp(ErrDecode, "%v: %v", msg, err)9 C4 \2 f+ @" a+ ]" _
        }7 e* Q; J7 V% |$ g  x
        request.Block.ReceivedAt = msg.ReceivedAt
, P3 p# S$ E& O5 b! b9 ~        request.Block.ReceivedFrom = p
$ ]7 E4 H6 v9 @3 u# @        // Mark the peer as owning the block and schedule it for import" B% \7 R; W& _
        // 标记peer知道这个区块
3 l- n4 n! L) W4 Q% Z. G: p        p.MarkBlock(request.Block.Hash())' a. ~5 h( m7 Q0 u; ?. ]7 V. O, p  W4 A
        // 为啥要如队列?已经得到完整的区块了/ x+ n/ X' x4 p& c
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块+ Q# I% s. l+ _4 s7 A% R. `; u
        pm.fetcher.Enqueue(p.id, request.Block)
! c: z) C  i0 o6 }$ ^" j        // Assuming the block is importable by the peer, but possibly not yet done so,
+ v/ b; _/ r2 t1 ^" r( o7 v( v7 @9 @        // calculate the head hash and TD that the peer truly must have.
) Z. \; _8 M% l  U0 u        // 截止到parent区块的头和难度
" T" D# I2 J, U- c5 |6 `. Y        var (
9 z9 K* c2 H9 p2 M, D' I/ ~8 v2 _                trueHead = request.Block.ParentHash()
! x+ q, y1 e: b" r. B                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())$ q6 M  w6 V+ z5 s) v: v
        )4 \" t6 {' B2 W) B2 \  U5 n
        // Update the peers total difficulty if better than the previous( M$ a- Q' G% {& ]1 `) n$ B* [
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步0 x, ]  n  d7 C! }8 x8 }% w, J
        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。8 y& s9 y& T* k% j8 U) \0 Q1 x! |
        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求. L! q5 c9 K( G! q! ]3 o
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain8 O3 o) ]7 k2 m
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {; I# D% X4 c  Y7 O
                p.SetHead(trueHead, trueTD)& z  z' w- C" W* m/ X9 |7 e' k0 e
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
! S$ L2 Q7 a9 \- m                // a singe block (as the true TD is below the propagated block), however this
8 F; I/ @- s- U' Z                // scenario should easily be covered by the fetcher.) ]# d" g& y' [7 h1 {
                currentBlock := pm.blockchain.CurrentBlock()! N7 x' Z  N  q  A8 V
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {8 u/ G) ^# Z. `
                        go pm.synchronise(p)
, {, e! ]8 @5 |5 o( }, x6 {- Q7 K                }
6 _4 C: ~8 r/ |2 F, `" }* X        }
0 \. x3 L6 U5 x9 M//------------------------ 以上 handleMsg
8 d! Z' H$ g  W0 |# W// Enqueue tries to fill gaps the the fetcher's future import queue.
% R% A( V5 [* w: j0 L0 l// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理
5 c9 z1 f/ J# d% q5 d$ ?! u; Y% m: n, Ifunc (f *Fetcher) Enqueue(peer string, block *types.Block) error {
4 u! F* f1 P# n7 H8 G0 f        op := &inject{
/ i% D8 K3 U$ F                origin: peer,
, ~2 @& m9 s6 m# l: d                block:  block,
- b3 B" R1 F2 `& E) Y0 W' t# ~* w% n        }/ H  u: l7 }% B6 I4 V
        select {: k9 H3 h# C2 Z6 y9 `9 z
        case f.inject  blockLimit {0 R  W" ~  F1 J/ D& P
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)9 m3 R6 [4 i& N# g, u4 s5 I4 B
                propBroadcastDOSMeter.Mark(1)
) e6 H1 \) m" h# m                f.forgetHash(hash)
9 E+ b8 \2 H! Z) R: e3 Q                return
0 a* ?- j: P. y" u# C        }& w* q* Y4 r; t- y
        // Discard any past or too distant blocks
3 k3 D  J; `5 @. i* ?        // 高度检查:未来太远的块丢弃
, U! F; L# n) f! C' B8 {6 F# p        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {( F2 z  T) F' q) I
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
, x1 g- l! G! P6 f. B: T1 S/ q+ h& ?                propBroadcastDropMeter.Mark(1)7 L& |+ b" G! Y9 |
                f.forgetHash(hash)2 F! D! v" h# t) X% E3 X
                return
* M7 W" c3 v* p/ {$ J8 w        }
+ k) N) c, j$ q' P4 J) U        // Schedule the block for future importing0 o, C9 A% P) k) p  j6 _
        // 块先加入优先级队列,加入链之前,还有很多要做
+ y/ d7 k( L: x: g, x        if _, ok := f.queued[hash]; !ok {
! c% B! x0 d5 {* d& {. B; H                op := &inject{- R2 j' }/ C2 B. N
                        origin: peer,, Q8 i3 K5 N1 \) @, S' Y! p+ W+ O! F
                        block:  block,! \9 L5 R7 u7 v' k; i
                }( S5 f+ @# ?$ U5 c5 F% V
                f.queues[peer] = count$ ]& k7 }; Y5 i+ S6 i, n& m
                f.queued[hash] = op
3 s3 y  F5 ^/ v8 p  ]                f.queue.Push(op, -float32(block.NumberU64()))9 N: W4 r& R0 g5 O. j/ J3 T0 R, P
                if f.queueChangeHook != nil {. p$ w# p$ w. `/ u% |
                        f.queueChangeHook(op.block.Hash(), true)
3 h7 [8 X( h+ p; T! d9 J3 n0 u6 z                }; s2 }9 I! g7 q" b$ |
                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())2 u6 i! @8 ?% C. L" J
        }7 }9 |/ I% K  T; y
}1 Q$ P4 y5 X# K: ^- |: n2 x3 y
fetcher队列处理/ O/ k4 }  B# d1 A
本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?. i4 o7 @3 u) N1 N6 I% p3 J% x
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。. N4 G( U6 S/ ~* i7 n! M
区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash" I1 U2 ~5 o7 G- v" Z1 a! p
// Loop is the main fetcher loop, checking and processing various notification
' d: \% |9 S/ a2 {' Y5 T// events.
: N+ B. _* c: I  }func (f *Fetcher) loop() {
: W; s3 N" x4 f4 U6 ?        // Iterate the block fetching until a quit is requested- w+ T) ?$ V" L  Z. k
        fetchTimer := time.NewTimer(0)8 r& |8 w4 _: }
        completeTimer := time.NewTimer(0)$ M0 z9 q4 I- N0 t: t9 _& ^
        for {! Y1 e; [3 }7 Q* u. h' D% m
                // Clean up any expired block fetches) M, c' J# X! {
                // 清理过期的区块  A) z# a; o! q% d+ M
                for hash, announce := range f.fetching {4 j* g' W3 G8 G  m
                        if time.Since(announce.time) > fetchTimeout {, R# q- h- \- h( n3 F
                                f.forgetHash(hash)
- P0 k/ S' b$ p& }! B+ X! |8 l                        }
1 i: Q. H$ ~7 w3 K. A5 d( X1 e                }+ X5 Q, v# q' G) @$ i4 h
                // Import any queued blocks that could potentially fit' P. I' A8 ]6 n/ H6 w& l
                // 导入队列中合适的块+ J- r! x# i: t1 m5 T- H
                height := f.chainHeight()$ {5 {0 \" q, N( X" |
                for !f.queue.Empty() {
8 `2 [& |$ ^0 N* R9 k9 A  l' N                        op := f.queue.PopItem().(*inject)
) i/ O/ \! B0 U0 t- |                        hash := op.block.Hash()1 N) u5 N4 b4 n# u- y6 o5 M6 o
                        if f.queueChangeHook != nil {
1 p$ Z% A: _2 F2 x8 j! W                                f.queueChangeHook(hash, false)  K3 [/ q( ]7 b! r; k
                        }' R0 y/ @5 d' a% p4 n  d7 Z4 S
                        // If too high up the chain or phase, continue later9 v1 F; F  u; a4 L- `+ P7 _  M
                        // 块不是链需要的下一个块,再入优先级队列,停止循环
' s+ p7 p3 d1 l, H                        number := op.block.NumberU64()
% [* ?: E  A: C$ C  e+ a                        if number > height+1 {+ X  ~+ q7 s$ G8 ^5 Z! O
                                f.queue.Push(op, -float32(number)), _4 _) }8 C: O
                                if f.queueChangeHook != nil {
% U! u- s0 K# ?$ b8 `" z" N                                        f.queueChangeHook(hash, true), m7 f* Z) ?# w/ @" ]  M
                                }
' {' \& D" X5 T3 p- ?                                break
# D" p6 u# \2 a9 Y                        }4 W9 B5 ~- p) c1 ^$ Q5 v
                        // Otherwise if fresh and still unknown, try and import
6 R' A/ x# J4 G' Z+ l/ P                        // 高度正好是我们想要的,并且链上也没有这个块) v- e% v( L2 c4 I8 K3 }
                        if number+maxUncleDist 5 R* }. o' i+ H: X
func (f *Fetcher) insert(peer string, block *types.Block) {6 {1 S4 N3 H& x. g/ j
        hash := block.Hash()
# v& t% p7 r- I8 P# L, d        // Run the import on a new thread
, Z0 i+ S' Y: ^! r4 {! Z        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)2 Z$ r. U( u! O7 b
        go func() {. W5 B3 D/ m! M# |7 g" O
                defer func() { f.done + _+ B! I. Z3 r: I0 g
NewBlockHashesMsg的处理
  x% \) s% y% ?7 X3 m本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。8 n/ E1 x% Z/ y# \# J; Q
流程如下:
1 r9 Q- R% }0 i* K+ [3 K对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。" J: A" u$ `  Q2 d& b* @
关于announced下节再介绍。
$ w( p- P" T: d  D
% F  U3 F. \8 g; Y, T  \// handleMsg()部分
( ~0 M  f* T7 l- {: Kcase msg.Code == NewBlockHashesMsg:) v2 Y; e# }" A4 v  W
        var announces newBlockHashesData  U9 U1 @6 F6 D. u2 J
        if err := msg.Decode(&announces); err != nil {8 W2 e7 w/ H- ]
                return errResp(ErrDecode, "%v: %v", msg, err)
8 J4 J- h7 B' D+ y3 ]        }: t: [0 H& {% j, Z: c4 z
        // Mark the hashes as present at the remote node3 ~( W( R# Z3 M* u! n7 y1 A2 n: X
        for _, block := range announces {
. I/ B8 g2 Y. [0 F$ z/ ~$ d                p.MarkBlock(block.Hash)/ b% `  h3 d+ M
        }0 Q* k. N' G3 ?
        // Schedule all the unknown hashes for retrieval
9 K: o' |9 P+ _. A        // 把本地链没有的块hash找出来,交给fetcher去下载
( x* t1 V+ ^& B) l* \& ?        unknown := make(newBlockHashesData, 0, len(announces))
! V( S0 f7 X* s        for _, block := range announces {" ~1 g1 I2 g/ I; O# C
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
3 J1 @# ?$ F2 [. s$ E4 y" W! U: |( R                        unknown = append(unknown, block)& m3 [; k; V- C8 s1 X. k
                }' g5 `' p& F# I/ S
        }
- _, h8 Z' v: d6 s! `( l" p        for _, block := range unknown {
+ H5 ~3 N& B& p7 M! M9 s                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
' d8 |$ i; C# V9 x# {" W: J        }$ l- s3 {% D1 F, u% g
// Notify announces the fetcher of the potential availability of a new block in$ Q$ \+ P! K: x1 W& g5 J) ?( G% W% k
// the network.
" ~& C0 V5 M! d& _8 t// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息# i* P* ]2 M$ }, h. o! q. @  M- O" v
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,$ A' ]% A& o) G
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  I, c2 b1 I$ H: j  c, X        block := &announce{
/ Y6 d" D% }9 A, s. `1 E% U                hash:        hash,; L  J; a2 O' q& D7 W' O
                number:      number,
* C, S4 S/ f6 v9 I, V& D, U                time:        time,& w( s( W( t( W3 K2 ^/ V! }4 i4 [) n
                origin:      peer,
3 |. X6 H* `1 g6 N5 l' J( L% l                fetchHeader: headerFetcher,
$ o: F9 a# D9 ~! E                fetchBodies: bodyFetcher,$ |5 C$ ~5 k8 b( N
        }. W9 G, e& l" b4 [- x. s
        select {
" z4 W' z7 B4 R        case f.notify  hashLimit {7 d+ Q4 o4 R$ V$ L5 \) X- i
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
# o0 u, x0 u5 d  r% _: P- v                propAnnounceDOSMeter.Mark(1)
: z+ J' c9 h2 f. D8 w  q4 f( \: j                break
& a& e4 y8 p0 o, m7 p# T) D' s6 [        }
* A, _; A$ O2 @4 F- H' g        // If we have a valid block number, check that it's potentially useful
7 J- ~; B5 k5 ?7 F, t! V# o  x        // 高度检查5 b" w& G4 s" K" g
        if notification.number > 0 {  d" Z+ w4 Z) s
                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
3 S: b3 r- X3 x3 q8 P( G; {                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
: K" K0 l8 T( _                        propAnnounceDropMeter.Mark(1)
# M* r+ y2 U) R" }. V  N* F, b                        break; T& X' }1 f2 t7 Q! g  ?' N6 `' H3 _
                }
# K1 O& w& {: v$ J$ G5 ^        }9 Y4 X) k: t% d- n' T
        // All is well, schedule the announce if block's not yet downloading
7 `2 a* _0 K6 _6 O! N( W  c: Y, _        // 检查是否已经在下载,已下载则忽略: H( Q0 T8 L6 e3 j( {: z
        if _, ok := f.fetching[notification.hash]; ok {
! R. b0 A$ J: @& W8 Y5 D9 C4 n                break9 i% ^7 _/ ~0 q, Y! Z$ e* {- l: ~
        }
, ~4 C' T7 l& z1 O        if _, ok := f.completing[notification.hash]; ok {1 e/ o- g: p: ^. G( P
                break9 o8 n, X; ^9 ~4 {. Z  t" t5 N
        }3 m7 V. O8 n9 F. V# D5 d8 w& C
        // 更新peer已经通知给我们的区块数量& j0 S, j# R7 G. ]* C
        f.announces[notification.origin] = count
0 O+ z9 y4 Q" M/ }7 X        // 把通知信息加入到announced,供调度
* u, H5 B* h" i        f.announced[notification.hash] = append(f.announced[notification.hash], notification)
# w! X+ T- r* Z        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {+ t3 n9 c' S6 z3 E" l( n
                f.announceChangeHook(notification.hash, true)
' Y! D& }6 t8 w        }
% k0 F  t8 }! ]        if len(f.announced) == 1 {  i8 x" B9 c; O' m  m
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
+ R5 @9 `  f- h/ p. @5 o                f.rescheduleFetch(fetchTimer). M: C% k1 Q, r$ v( ^
        }6 j# i5 s. L- D$ m4 K1 W) @
fetcher获取完整区块0 i+ [# o( y( \
本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
. d& M/ Z& l& YFetcher的大头2 i2 J8 W% U6 s. Q/ P& ]; s) T
Fetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
  C3 q  H4 N2 A( o宏观0 d) o6 A/ {3 {& k- \( M
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。( |. V4 t& l9 f# I; y- s
) z3 }/ w4 A1 d
再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:
0 D& V' L) A! }# s  b& |. i收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。" J/ Q) m$ a5 v0 k, C  B; g

8 d% B. ?. P% l' t& |# J/ E
; q( Q2 X2 s, \' }" b, `微观" K4 z; Y5 M$ ?/ K
接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
. B) J7 A' l# J/ P7 [" g1 ?. E. ^首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。; ^; F0 z0 B" F6 h. s; T
// Fetcher is responsible for accumulating block announcements from various peers
$ R, B, g. A! q' r% Z# T// and scheduling them for retrieval.
; \9 y) E4 l) }* o// 积累块通知,然后调度获取这些块
2 [; q- V& Z; k0 W* C% }, |type Fetcher struct {% Z& e# K5 c; d4 y! Z& e9 |# |
        // Various event channels
, J; M9 D+ ^# K5 a9 c+ }: h    // 收到区块hash值的通道6 A( k, A9 t0 |
        notify chan *announce" H  x  r: C1 D' t4 c
    // 收到完整区块的通道5 ?5 U% m5 ^& ~, D* z) K3 x
        inject chan *inject
6 I8 N6 ~0 H$ ?% d3 w        blockFilter chan chan []*types.Block
" {( X! Y5 D' w+ _  g        // 过滤header的通道的通道
' a+ ?5 g  {  H        headerFilter chan chan *headerFilterTask2 n5 K+ \/ \. B9 u* j5 C' J
        // 过滤body的通道的通道, R; g2 k( K4 c% Q% z6 ~" Q
        bodyFilter chan chan *bodyFilterTask
% g( B3 ?7 Y5 T) I6 h7 h- ]/ l        done chan common.Hash* a+ [+ q+ a7 `( H# _1 W
        quit chan struct{}
1 D) b; Z/ D. B/ H+ O        // Announce states
# D& s$ {8 K  R5 G* `: y, n4 Z        // Peer已经给了本节点多少区块头通知
: K3 ^- E) j- M) y7 B! l5 d. v        announces map[string]int // Per peer announce counts to prevent memory exhaustion# V/ \; p( b# [) r0 o% D
        // 已经announced的区块列表1 d( S0 s  j0 Q. X* A8 ?
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
; O, ?7 d5 F$ r- T2 P        // 正在fetching区块头的请求; n5 v4 R8 I& A* N$ W
        fetching map[common.Hash]*announce // Announced blocks, currently fetching
  n, Q) ^5 |+ f9 D        // 已经fetch到区块头,还差body的请求,用来获取body; j$ m. v& S- P- h$ t
        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
9 y1 `( e9 p5 N" v        // 已经得到区块头的: K0 H9 Q# n# H: ^! E) i5 n+ B! F8 X
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing. u& c4 k  x* T% e, t/ `" k4 e
        // Block cache# ]9 c6 Z& y' C. C6 R
        // queue,优先级队列,高度做优先级
  v5 k# E+ [$ w9 j6 S1 f! V        // queues,统计peer通告了多少块3 @2 k% S- z- u( M
        // queued,代表这个块如队列了,
$ ]* Q2 Y, E6 y        queue  *prque.Prque            // Queue containing the import operations (block number sorted)
! p& K0 d) t8 P/ O1 u8 T        queues map[string]int          // Per peer block counts to prevent memory exhaustion! B8 N1 W" E! V$ l$ R
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)3 b- ~  d( z' F  ^2 l$ }$ V
        // Callbacks; v$ q' t2 I( J( j& |6 y+ _3 P
        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
! l$ M( q# }5 ]        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证) ]% @$ [8 l" e+ P" _! d
        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
: a: I7 P2 v8 G. e3 j& U        chainHeight    chainHeightFn      // Retrieves the current chain's height( @- S) B7 e/ w: B  V3 W% {" `$ Z
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数
+ H5 v. T4 _$ s4 ^        dropPeer       peerDropFn         // Drops a peer for misbehaving, X# ^+ U8 J0 y) }, ^8 i
        // Testing hooks
. P6 a2 ^2 L( p3 s* [        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
9 w( @  }3 [9 c8 X# Z) `1 X! l        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
5 T5 ]5 L, X: A8 x8 f+ \) o        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch4 J( f. M- m$ T$ v
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
* H( J! T% Y% G        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62); F% O) {) T& L4 O+ ?
}% U3 Z* `3 m6 Z9 k+ U- g6 Q
NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。
* u/ a+ q  I9 V+ q" K! x7 ~7 Gcase  arriveTimeout-gatherSlack {
4 U9 i& W2 G  ?3 o, k                        // Pick a random peer to retrieve from, reset all others
  d% p8 z8 K4 K                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer' C5 O$ K3 x- O( S# e* v
                        announce := announces[rand.Intn(len(announces))]: }# K. v7 C; u
                        f.forgetHash(hash)
1 t; j* F( A; m( R% w8 h6 z! n8 U                        // If the block still didn't arrive, queue for fetching+ b3 d1 W0 R# T
                        // 本地还没有这个区块,创建获取区块的请求+ C3 c' m0 ]; o6 _( R6 l
                        if f.getBlock(hash) == nil {
  X0 U" S+ [) Y& x+ f) Y                                request[announce.origin] = append(request[announce.origin], hash)
; U) z% L/ O3 u% D' I: V3 \                                f.fetching[hash] = announce
; D' o+ ?1 o9 ?6 ?# C& K% |) S                        }3 l& Z, D2 W6 U
                }4 q/ d& I# k1 K# ?
        }
! Q; `& U, K% u( S        // Send out all block header requests- m9 p# T; d; S* H! ]0 V
        // 把所有的request发送出去
- O" \7 q/ i1 ~4 W: H        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
# U! L* A4 `( K3 }. `        for peer, hashes := range request {
4 L) r1 t4 W6 g6 `3 }                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
$ \# ?3 z  U* ~& A+ _                // Create a closure of the fetch and schedule in on a new thread
3 d/ o' I. L% c5 a* w0 k                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
! h8 I: @( p  i# g* F6 U& L                go func() {; f0 c1 l& f' u) v
                        if f.fetchingHook != nil {
# q# ~" A+ t3 z& x                                f.fetchingHook(hashes)
( B0 o+ ?8 S% v3 [                        }
$ I( ~# [- E$ b( `( _) }* z, J" ^$ a+ w                        for _, hash := range hashes {: e& o& a  t- O! ?
                                headerFetchMeter.Mark(1)
1 d- e' M. M* @7 n. Q) I$ ^                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
6 Q$ R$ N% O8 a6 L6 M/ X                        }1 q; D/ _  x/ K- p* Z; M
                }()+ k* a$ c2 n0 A1 Z! X+ O
        }
  B5 |9 V2 U6 M* K        // Schedule the next fetch if blocks are still pending4 [' R0 a" K* D8 k5 j0 R
        f.rescheduleFetch(fetchTimer)
* q! O1 d9 J. S; S: Y3 W( C从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
+ v: C+ m2 b- H& i2 Hpm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)/ z! _, ]# l8 x7 h0 W3 Y/ d
// RequestOneHeader is a wrapper around the header query functions to fetch a
+ j6 ~, k6 u- T: |/ P4 \// single header. It is used solely by the fetcher.
+ Y6 u! b7 X; f, z& v6 a; pfunc (p *peer) RequestOneHeader(hash common.Hash) error {
0 V& {  S$ D1 w# |9 }        p.Log().Debug("Fetching single header", "hash", hash)
+ F/ Q: T0 ?, a9 g/ E3 u" M        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}): r* l8 H: v& I1 n* w
}: I4 M' s" j! |" s
GetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。
$ X2 @* a4 a2 n···! D8 d% C+ u: {5 G  M% R
// handleMsg()- [9 ?4 Q% \: i+ W. w+ H4 [
// Block header query, collect the requested headers and reply
# u$ @9 ?: W/ E/ y$ q5 J) Ycase msg.Code == GetBlockHeadersMsg:( c% J& o  H3 ^5 @1 q) R
// Decode the complex header query
( B( E6 C9 F. d3 U0 S/ j5 M; zvar query getBlockHeadersData
% W- i( L8 u( u' Fif err := msg.Decode(&query); err != nil {
3 Q. M0 F4 j/ M. S8 z9 @return errResp(ErrDecode, “%v: %v”, msg, err)8 c6 W3 L! L' N, ^0 U$ ]
}
; }5 A6 Q; @+ t9 AhashMode := query.Origin.Hash != (common.Hash{})0 r7 |1 v! F( B" c
// Gather headers until the fetch or network limits is reached7 r! u9 U! ?& k& K7 d# m' p
// 收集区块头,直到达到限制4 q. G) Z2 s. ^6 A# y; h
var (
. F% m* T8 [/ f! S3 |1 A" d        bytes   common.StorageSize6 R& t+ @( `- Z5 U* I
        headers []*types.Header
5 M2 m1 W; i# d        unknown bool4 j1 T; R1 b& _; V7 t- v& Q4 `
)) \, s* v. G: D8 I' Y1 E( G
// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量
+ r; L- J8 X9 s/ `for !unknown && len(headers)
1 n- O, w, G' V8 u, m; p0 E  D`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。
( p1 k+ T5 S  J# c// handleMsg()! t/ y+ Y! C3 t- s" }' ?& Y
case msg.Code == BlockHeadersMsg:& X; a% m9 _1 `
// A batch of headers arrived to one of our previous requests7 y, f. C/ i% V1 h/ B
var headers []*types.Header& U5 H* ^/ t: D, S: B& R8 b4 V; q
if err := msg.Decode(&headers); err != nil {
, C7 [: r- d8 i) ]+ \# u+ L0 g+ @  yreturn errResp(ErrDecode, “msg %v: %v”, msg, err)/ E0 t  S8 q( K
}! A1 S. v! a( e* g  i* p3 l. j
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that
* G8 ~+ ?$ f' g1 n% t! q9 T// 检查是不是当前DAO的硬分叉
& c, o$ J6 ]' Z6 U  M" r9 A" Nif len(headers) == 0 && p.forkDrop != nil {
! f; r4 U9 x( z// Possibly an empty reply to the fork header checks, sanity check TDs
7 T6 Z+ d/ n/ e! t! X* kverifyDAO := true* a5 _+ D4 }+ J- @6 m; A- b6 v
        // If we already have a DAO header, we can check the peer's TD against it. If
( r: D$ Q# {: W0 A        // the peer's ahead of this, it too must have a reply to the DAO check2 f7 i( s8 v4 V6 u+ f
        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
0 }5 d* `- W8 N                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {1 k; @! y2 |- y, D- a
                        verifyDAO = false
) k# V( X" c8 q                }7 y/ P& V# i) ^* {2 M& q9 F6 ~" [
        }
1 K( S  D" y0 w4 q1 _        // If we're seemingly on the same chain, disable the drop timer- c' Y& f5 ^) m1 b
        if verifyDAO {9 S5 m  Y& s0 [2 e2 K9 }+ M
                p.Log().Debug("Seems to be on the same side of the DAO fork")5 Y* W& g6 J1 V) y# j
                p.forkDrop.Stop()/ k0 m& A2 N0 A( J  L4 ?0 N# ^
                p.forkDrop = nil: s# d7 i5 P( r9 t1 V0 H% y
                return nil" i4 |. I/ M7 C& d1 b9 [# G" K* Z2 h
        }6 Q$ |+ }/ P+ Z1 s3 n
}% \+ n/ p  \* R- O+ W
// Filter out any explicitly requested headers, deliver the rest to the downloader) N* B* J6 Z$ H0 `
// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
2 S5 R7 u3 E3 }7 k& h* ]; Z9 z9 m' qfilter := len(headers) == 1
2 P' |# O' K& b* X; iif filter {
: R1 N1 i* z2 H1 V        // If it's a potential DAO fork check, validate against the rules  F1 z* |7 Z. m# g& T( D, p) o
        // 检查是否硬分叉
* W6 l# [" }# u0 A9 J$ ~        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
$ V; }8 ]5 k/ j& g7 \                // Disable the fork drop timer
) b& {$ k6 o# b  p/ e3 ~- n                p.forkDrop.Stop()
/ U4 [+ r( N; Z' x5 W- t+ L                p.forkDrop = nil+ V. w5 f3 s9 _
                // Validate the header and either drop the peer or continue
! d3 e% P1 [- `                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {* [$ I' D; v5 l2 Y- L; C
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")/ {/ @# B) L4 {+ W* H+ M' V
                        return err
7 g5 d, G" Q. t" V, N                }
  s5 k! Y8 Z, P7 O  V. E' x  F7 ^( v. _                p.Log().Debug("Verified to be on the same side of the DAO fork")
& f- y0 f; j0 W0 A                return nil- o- |3 Y& n$ F" y5 D
        }- C: D% a6 v; v  `' I
        // Irrelevant of the fork checks, send the header to the fetcher just in case
! A, F" F9 ^" D& \        // 使用fetcher过滤区块头4 _4 n9 @3 V- O/ x  _5 U7 A
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())$ R; w# v: _7 r8 C! {( \
}
, w7 ]$ ^6 v$ ~+ a+ O" Y/ u// 剩下的区块头交给downloader
  n( z; V/ R0 ^if len(headers) > 0 || !filter {
! Q/ f* x/ w% P- R& |, d2 J8 ?        err := pm.downloader.DeliverHeaders(p.id, headers)
1 ^9 x# E: d! W  u2 s        if err != nil {
  l. q& l2 `/ i, |: w. |                log.Debug("Failed to deliver headers", "err", err)
- z" T  b' ]  D/ Z" n3 a/ ~, b  M        }/ I1 x; u: p# L, Z% B7 A
}
: {4 f  u' y8 V! t+ X( m7 f" J. T) ^`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。. A- K! H% f6 ^! m& C
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
  O3 `5 X2 c6 H; _# U7 m// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,; C7 I$ G5 k0 F' j4 r$ ]
// returning those that should be handled differently.
8 L" J4 S6 ^1 @& j( ^" I$ b4 {" t// 寻找出fetcher请求的区块头! I6 s' L- u' P( U5 p9 D
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {% X$ B) n/ f( t
log.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
1 j4 P9 y( E" G* v0 {// Send the filter channel to the fetcher
! L3 \) v2 M" F5 ?; e) J# B// 任务通道
/ r& T2 ]: S; o! d) K3 u2 e7 v: ]filter := make(chan *headerFilterTask)
* E/ p6 P1 ^/ |/ G, g  Vselect {
* P% H. K' \8 d4 }// 任务通道发送到这个通道
) i0 [) \5 k  rcase f.headerFilter , X! l" c, r( d$ t
}
: {  S8 I% t2 T! Z接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:# Q7 W7 b6 g7 h5 t, q& ]; i
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。7 z' }% \) T. n1 r4 v
2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。
5 F1 J, w3 U* W" y9 m3. 把`unknonw`中的区块返回给`handleMsg()`。
1 n8 P' |' _* E) w4 Y4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。3 h8 S) c9 G6 R% m2 G
5. 把`compelete`的区块加入到`queued`。+ p4 ]. \5 Z- s$ W2 I- j
// fetcher.loop()
( z$ |' b8 N/ K- K2 l% l$ [" ~case filter :=
/ }* g( y; l$ }- Z- |// Split the batch of headers into unknown ones (to return to the caller),' c& w. |0 k* ?$ d2 N
// known incomplete ones (requiring body retrievals) and completed blocks.; |+ H4 g6 x# e
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放
' G0 l- Y% H% ^// 还需要获取uncle和交易的区块
( }6 x6 u% n" R, ~unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}( N3 @! A0 o; \# X! S: R& |
// 遍历所有收到的header' x) j9 u. \0 o
for _, header := range task.headers {0 W. z( m" B4 [2 R6 t6 y7 ^
        hash := header.Hash()
0 i0 H. Z. C2 X3 V0 G* o4 i% k& j        // Filter fetcher-requested headers from other synchronisation algorithms6 r& V  B) ]& M" o5 a
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued+ z. ?6 j" t" U( c& v* J: `
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
/ N7 j9 s  J- g+ ~/ D( A                // If the delivered header does not match the promised number, drop the announcer
0 C& z1 A) E0 J* s, t                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。# j0 ~+ Z1 S1 S0 F$ M' \
                if header.Number.Uint64() != announce.number {- R& ]! f7 S7 A* i$ \' q6 k$ ~* J
                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)& u( [/ d/ T, B3 _  C5 p- a' y; E
                        f.dropPeer(announce.origin)$ g- W3 F& d/ g# \' l0 c
                        f.forgetHash(hash)
" C+ a1 T$ S2 a$ _# q+ k                        continue! ^: D0 ]% z3 {1 a: X- m
                }
5 @7 {! C  q$ \! Q9 g3 J: R                // Only keep if not imported by other means' M. Y/ B; e$ w" y3 w
                // 本地链没有当前区块( o$ o! o* p5 j, v, l
                if f.getBlock(hash) == nil {
8 X0 ]7 R% n- D' ]0 X- I                        announce.header = header
2 F% m! Q/ j8 `2 L* l                        announce.time = task.time7 Y% p7 _) y& y7 G3 B' W
                        // If the block is empty (header only), short circuit into the final import queue
) E6 j* U, e- p; ?- ]                        // 如果区块没有交易和uncle,加入到complete
: }0 s/ l7 c8 a/ \- a5 `- r, o                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {3 x' k  h% _4 U1 I0 g" v2 z
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
+ {7 h" D. q. h' @8 B                                block := types.NewBlockWithHeader(header)
) ]  e3 b  @. s4 ?7 J                                block.ReceivedAt = task.time6 C+ H/ w$ ~/ Z3 [
                                complete = append(complete, block)
9 o8 A9 V$ p9 h- Z  a7 m# q, V                                f.completing[hash] = announce/ Z9 h8 G% Z. I: L$ K$ z* a
                                continue/ M+ o$ z* {$ X
                        }
, G8 g( Q. l  q% T7 K: S                        // Otherwise add to the list of blocks needing completion
, Q/ N! \1 L$ l" E( F. r" B                        // 否则就是不完整的区块
6 e0 \. i( t: x  u0 j0 x                        incomplete = append(incomplete, announce)0 X, D* V! ~! V) {, E
                } else {- ^! c& B" @1 L; k9 v9 Q2 q) O
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
% m$ I& [6 ^3 k, B                        f.forgetHash(hash)
& `, p! M6 y0 B9 \$ ]+ T                }* P4 o( X9 L" F, P! H0 {+ w$ a
        } else {; k& }! J4 N+ {6 d4 k0 {8 n3 n
                // Fetcher doesn't know about it, add to the return list
) {% }' ~" ^. o1 I+ n                // 没请求过的header/ O, o% X8 v: {- e! u; m
                unknown = append(unknown, header)$ N' S: G# Y& f
        }
+ Q/ x2 `/ q9 U0 k; ~}
" v; F# a3 n6 F0 E- J// 把未知的区块头,再传递会filter, w6 N$ W! i& d9 Y! K
headerFilterOutMeter.Mark(int64(len(unknown)))
( z4 r! h3 k) |select {
  E0 D! }4 L' g- g* T: p$ \case filter
) U8 t7 d$ G) f" w跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。- a6 u0 [9 O* `3 N( Y0 Y
// fetcher.loop()+ e  D; H! @- e7 @/ t7 E
case 3 ~/ M& F9 n2 T3 G' e# T
// 遍历所有待获取body的announce+ k, M3 f: O) s  `
for hash, announces := range f.fetched {
& w. U9 Q* R9 w. D! @2 v        // Pick a random peer to retrieve from, reset all others
8 l! E/ H+ G7 w        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
. S4 F( [% o: _$ p9 P) l        announce := announces[rand.Intn(len(announces))]
+ s' i& T0 y, I8 z        f.forgetHash(hash), V$ F( S" W% Q5 n
        // If the block still didn't arrive, queue for completion
( \9 L1 [$ A& j: ?1 v        // 如果本地没有这个区块,则放入到completing,创建请求
- a4 F1 [& A# _) _        if f.getBlock(hash) == nil {
# R$ L$ h3 j7 u0 P0 c% p3 B& G                request[announce.origin] = append(request[announce.origin], hash)
& x& y) j" M  R$ ?% ~; @9 s                f.completing[hash] = announce* ^7 T0 x+ N( |% a0 O. e# V
        }, |& L# t) `) N9 q" V
}0 C6 Q+ Z" g4 ~* {9 I: o
// Send out all block body requests
% I! L7 \  ~3 ~( I  ~6 F// 发送所有的请求,获取body,依然是每个peer一个单独协程8 \% @0 g# d8 I9 F
for peer, hashes := range request {& O4 }. \) E. c( c3 E
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
1 Z! M* V+ L1 K2 C) L! r        // Create a closure of the fetch and schedule in on a new thread: J5 n, U' i7 X9 F1 w8 f* N4 b, K
        if f.completingHook != nil {
- O0 V! `6 ~, o$ z: k  o                f.completingHook(hashes)
3 g4 w, s' v# e        }
2 F. P1 P. D# w9 d: _9 O6 _        bodyFetchMeter.Mark(int64(len(hashes)))) R6 T8 g% _0 K7 x
        go f.completing[hashes[0]].fetchBodies(hashes)
  {; K' B4 q5 d7 O) j, Z, a}
: D. D3 N6 B# P& J2 D/ M// Schedule the next fetch if blocks are still pending
8 a/ t! e5 w/ U1 I; ?; L% c6 ?f.rescheduleComplete(completeTimer)
8 l! W3 H5 `+ Z4 V`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。6 ]  \' f$ g" k# s
// handleMsg()% {7 n7 ^% }4 X6 K) E) z& A
case msg.Code == GetBlockBodiesMsg:' l! d- [0 _6 Z. M1 @8 @1 `
// Decode the retrieval message
+ O1 o3 W3 p0 V3 Q; pmsgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
$ I  `5 l9 U  zif _, err := msgStream.List(); err != nil {
/ ~' O$ m1 [8 ~" ureturn err
: z$ K/ K% P2 P7 e' h: o  Z}
# {4 m# N) V. c( e// Gather blocks until the fetch or network limits is reached
( ~* A% n9 H! Z1 tvar (9 z' ?1 B2 F+ E) u* z! E
hash   common.Hash
; s0 c( O- L9 }1 F$ kbytes  int
  N) v! f3 N+ }3 E) Q3 ~5 C$ Z+ nbodies []rlp.RawValue
2 `+ s0 A/ V. Y6 P! s& o# @+ R)
0 [+ @' z9 r2 v" [// 遍历所有请求
" L; o1 j8 K6 n1 l: g& ^0 mfor bytes
0 p" H( v- n$ C! S; O1 v: c, ]- t- V响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。
2 J6 a) y# h& S; o6 ?: [: |// handleMsg()0 X( v. w1 i' i- L) W3 e) M. m
case msg.Code == BlockBodiesMsg:
2 p0 ~+ Y9 o+ S; y- c% T( A// A batch of block bodies arrived to one of our previous requests
/ I3 f( f* J; Mvar request blockBodiesData
1 d- l* E6 ^' g6 m7 D, ~5 cif err := msg.Decode(&request); err != nil {
5 W0 x3 Q7 {. V) Y8 g3 }5 {6 Xreturn errResp(ErrDecode, “msg %v: %v”, msg, err)
% |# @9 p5 O  h}
: d) n6 n2 k+ M// Deliver them all to the downloader for queuing% W( R4 G" u$ e
// 传递给downloader去处理. y6 \! A  P1 D: ], Q( ^5 D5 G
transactions := make([][]*types.Transaction, len(request))
' v" ]6 |1 e2 z: o" Duncles := make([][]*types.Header, len(request))
8 b' R% C0 b& }+ L& _1 s0 Yfor i, body := range request {5 Z. }1 Q3 B  Z* p
        transactions = body.Transactions
8 I) j6 {# a0 Q. b        uncles = body.Uncles
* j0 m# t5 E1 i  f9 [# T- b}
  _, L' H. E* _  t6 S, r* _// Filter out any explicitly requested bodies, deliver the rest to the downloader
# Q' Y) }! |" u5 y+ S% ]// 先让fetcher过滤去fetcher请求的body,剩下的给downloader7 T& e" p% Y4 Q
filter := len(transactions) > 0 || len(uncles) > 03 r6 u% _" [1 K4 N+ `$ W
if filter {! r' a4 }0 r& R: c
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())2 y5 i! A- k& d. b# c" Z: M5 \
}
. _) X' U9 X' O$ x+ W8 ]. f. n// 剩下的body交给downloader
4 f- L2 e: q1 P( g$ R6 y4 b/ Bif len(transactions) > 0 || len(uncles) > 0 || !filter {
& j5 d8 `2 Y. A) k$ S; Y, u        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
: c" O9 B2 J6 I        if err != nil {8 a6 M- e5 U& [* U, b3 ^
                log.Debug("Failed to deliver bodies", "err", err)1 n( ]# j( `, f) l$ s
        }4 M- g  X  J: T2 ]1 o5 K
}+ P$ M% C% R6 k7 Q6 d  _# f  W
过滤函数的原理也与Header相同。5 J' D! ?- |7 |* ~, F2 m; a+ J
// FilterBodies extracts all the block bodies that were explicitly requested by6 x* q8 R: ^, m7 b, ~- A6 Y
// the fetcher, returning those that should be handled differently.9 E0 x7 c6 B# T$ ~& a
// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理7 r& P3 _2 ]5 F% \
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
1 m" h1 `' e+ u% G! tlog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))
/ S8 f0 b1 c- @- ]$ T/ u// Send the filter channel to the fetcher
$ U! [+ V' G0 T: l: `- n6 \" c' a+ g6 V& nfilter := make(chan *bodyFilterTask)
, {) |2 r3 g" Y% `. Qselect {0 `! O6 F1 j1 R/ P' c+ I
case f.bodyFilter . c4 L  v7 z8 U1 j( f8 a, S8 ^
}
; Q8 {" ~: g- q3 N; y: Z# R$ e实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:
4 f8 ]9 j6 Y! m$ E$ c1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
, }6 O; s9 Q& |  u' M$ H2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。5 i7 I0 u& Y/ x7 i( }  b
3. 将`blocks`中的区块加入到`queued`,终结。
8 }8 n* A( `% {case filter := ' V* z* _/ F. T$ z0 p9 U4 Q) o2 ^
blocks := []*types.Block{}
% |' T% q+ E7 `5 A// 获取的每个body的txs列表和uncle列表# F! @5 z+ _# W7 Z5 C2 ~! T
// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body5 G5 P8 e+ H) z2 e1 {
for i := 0; i
7 H& v& H5 a$ F% R% d}- c* b0 r0 H2 v+ b& r
: w& _" \/ K) Y/ I
至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:
7 Q3 p- L  X! b5 B' _1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。3 R& ?7 t" H; k( P) Z: [5 F! I
2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。. L6 S8 m5 {' Q& _+ [1 O1 ]1 o
最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。
  u4 z0 i& w* q& o: b( X4 C
; s% Y& C2 G# ~
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3