Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

以太坊源码分析:fetcher模块和区块传播

刘艳琴
153 0 0
从区块传播策略入手,介绍新区块是如何传播到远端节点,以及新区块加入到远端节点本地链的过程,同时会介绍fetcher模块,fetcher的功能是处理Peer通知的区块信息。在介绍过程中,还会涉及到p2p,eth等模块,不会专门介绍,而是专注区块的传播和加入区块链的过程。5 I5 U4 f# u2 |$ Q3 n' T
当前代码是以太坊Release 1.8,如果版本不同,代码上可能存在差异。
5 O: |" p7 u9 F$ I总体过程和传播策略$ a: l2 r2 X( r% {6 j* f7 D# `
本节从宏观角度介绍,节点产生区块后,为了传播给远端节点做了啥,远端节点收到区块后又做了什么,每个节点都连接了很多Peer,它传播的策略是什么样的?
9 P6 P  I$ v. x2 y5 l总体流程和策略可以总结为,传播给远端Peer节点,Peer验证区块无误后,加入到本地区块链,继续传播新区块信息。具体过程如下。5 v2 U. L: V1 {  y1 u6 b; \3 n
先看总体过程。产生区块后,miner模块会发布一个事件NewMinedBlockEvent,订阅事件的协程收到事件后,就会把新区块的消息,广播给它所有的peer,peer收到消息后,会交给自己的fetcher模块处理,fetcher进行基本的验证后,区块没问题,发现这个区块就是本地链需要的下一个区块,则交给blockChain进一步进行完整的验证,这个过程会执行区块所有的交易,无误后把区块加入到本地链,写入数据库,这个过程就是下面的流程图,图1。
6 m7 u: ~* Q" S, r; j6 k3 o6 ~- `% n% `1 F, ?7 }0 h: A1 V
总体流程图,能看到有个分叉,是因为节点传播新区块是有策略的。它的传播策略为:: g4 ]( Y& }) B* S' c6 @
假如节点连接了N个Peer,它只向Peer列表的sqrt(N)个Peer广播完整的区块消息。向所有的Peer广播只包含区块Hash的消息。) X. `6 q1 p0 e' x# i% E
策略图的效果如图2,红色节点将区块传播给黄色节点:# M9 E3 S1 M- T9 ]
+ `! j$ L; M2 \3 x+ n4 P0 u! o
1 |1 J! k) g8 [1 o4 J% n
收到区块Hash的节点,需要从发送给它消息的Peer那里获取对应的完整区块,获取区块后就会按照图1的流程,加入到fetcher队列,最终插入本地区块链后,将区块的Hash值广播给和它相连,但还不知道这个区块的Peer。非产生区块节点的策略图,如图3,黄色节点将区块Hash传播给青色节点:
; `, h" d- z+ X; C' s  _2 n7 o' f$ s- N% q
至此,可以看出以太坊采用以石击水的方式,像水纹一样,层层扩散新产生的区块。
, @9 X6 A0 Z4 o7 J# L$ B2 uFetcher模块是干啥的/ W& u8 D4 ~' z9 E! ]
fetcher模块的功能,就是收集其他Peer通知它的区块信息:1)完整的区块2)区块Hash消息。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。, v, ^- P7 C" C# w3 S! M4 i
如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块
* x$ b/ R) N2 Y4 _
5 U: v9 ?2 e8 u源码解读
1 U; U) I% O' l: F; n, Q* _. |本节介绍区块传播和处理的细节东西,方式仍然是先用图解释流程,再是代码流程。" L% g* Q# p9 w
产块节点的传播新区块
' t. e5 l! l& X' h1 f节点产生区块后,广播的流程可以表示为图4:4 x& P+ h- S- @7 p6 Q/ ^9 `
发布事件事件处理函数选择要广播完整的Peer,然后将区块加入到它们的队列事件处理函数把区块Hash添加到所有Peer的另外一个通知队列每个Peer的广播处理函数,会遍历它的待广播区块队列和通知队列,把数据封装成消息,调用P2P接口发送出去$ s4 r& Q6 T0 b+ g

2 {: F3 H) n8 H/ |. ?0 C- W) s- Z/ M7 D, w. g; B2 t
再看下代码上的细节。
$ Q" ?9 _& a6 P, T9 k$ Q. g& s- Yworker.wait()函数发布事件NewMinedBlockEvent。ProtocolManager.minedBroadcastLoop()是事件处理函数。它调用了2次pm.BroadcastBlock()。+ N# f5 M& Z6 @' d0 n" n

" E3 ^& E* y7 ~" W3 d$ l) d5 ~// Mined broadcast loop2 E% s, ?; w( J1 Q/ }6 U, C
func (pm *ProtocolManager) minedBroadcastLoop() {
" K: H6 h8 H' Y, a% Y' ]        // automatically stops if unsubscribe
9 _, H3 _" u0 ~; [3 ^$ [        for obj := range pm.minedBlockSub.Chan() {
) \- _6 w6 v  H) h  [' [8 X                switch ev := obj.Data.(type) {
1 l4 C/ L9 j4 }* w2 @& t  W0 S. a                case core.NewMinedBlockEvent:
3 L8 f+ a0 u/ a& D# |3 q                        pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
" z4 c" L; i( R! l) {" D& p                        pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest9 e. I. G$ B3 D, v+ A4 u: Y: l
                }" E* Y4 o) q( K) m$ `9 R# J
        }( O8 V( N/ O5 p9 z! y
}
- B9 o! I) T4 a8 v0 |: J: zpm.BroadcastBlock()的入参propagate为真时,向部分Peer广播完整的区块,调用peer.AsyncSendNewBlock(),否则向所有Peer广播区块头,调用peer.AsyncSendNewBlockHash(),这2个函数就是把数据放入队列,此处不再放代码。: P. r( t9 ^: {8 o' S) e3 A
// BroadcastBlock will either propagate a block to a subset of it's peers, or/ l/ R% B& @+ d+ |7 [* z" b
// will only announce it's availability (depending what's requested).
5 H7 R" s( K; p: [. M# hfunc (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
( ^7 i2 K. V3 y( i3 M' S! b1 j        hash := block.Hash(), K1 ^8 \( x' Q8 @- |  @* ^
        peers := pm.peers.PeersWithoutBlock(hash)
6 R' v+ e# F, l: V' |2 ~        // If propagation is requested, send to a subset of the peer( M2 @- g3 x. g  X4 i" m
        // 这种情况,要把区块广播给部分peer3 Q& ^& E/ L: h1 H% H9 u5 O
        if propagate {: i0 d9 v$ w) _; I; [0 A6 D% C
                // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)8 i5 l- D5 l, C5 f/ Q
                // 计算新的总难度# j8 d. K' H0 b! I9 E* B6 `
                var td *big.Int
% o) }/ v4 s, _/ J1 G' A/ \                if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
! O7 L1 o: ]1 C- x9 E3 G* g8 M                        td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))% M9 M! w; ]7 c. H" l4 d8 l( a
                } else {
1 [1 F! s+ ?  B# o# N: y3 @                        log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
2 j' A7 [0 A+ o, Z                        return
8 x' g7 K1 ?7 u                }
$ R6 B% t0 d& b- Q! O                // Send the block to a subset of our peers
/ r8 f1 m1 }4 m0 F' A                // 广播区块给部分peer
/ ~. p  o8 Y3 c" Q5 @+ `                transfer := peers[:int(math.Sqrt(float64(len(peers))))]
9 D- X* s: t# J                for _, peer := range transfer {# J2 X* K- i. B- D3 t3 E
                        peer.AsyncSendNewBlock(block, td)
  F% H0 V) H' F  [                }
7 J8 C8 \8 ^! K7 I: o3 \! b$ h                log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
' I: I* J8 a6 d; k0 `. T& M9 o                return
' N+ o7 a0 O) [  G  w' F: M: ]        }
- U0 ~) u. t9 g$ a2 |        // Otherwise if the block is indeed in out own chain, announce it
4 b7 h) k; N; a% h, x        // 把区块hash值广播给所有peer
1 E' Q& W& g0 h9 e        if pm.blockchain.HasBlock(hash, block.NumberU64()) {" z2 ~' Y% I, c& E! B
                for _, peer := range peers {
% C3 _% F* H! |" A$ r) V& x/ L/ c                        peer.AsyncSendNewBlockHash(block)
6 I* }. K+ M# n2 E                }
6 _; q1 H$ W+ @4 a2 ~                log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))3 ]* w2 ?! V# W% \4 ^
        }4 E' H4 z$ d4 J1 C
}, G8 `, d& s) s4 V5 A4 q4 @
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
( s, b' R8 m! u9 _' G$ R// broadcast is a write loop that multiplexes block propagations, announcements" ~1 g. K3 p8 K/ V7 D- \
// and transaction broadcasts into the remote peer. The goal is to have an async+ K/ m" E' V5 Y% C
// writer that does not lock up node internals.
  Q4 G0 k" z; Rfunc (p *peer) broadcast() {8 @0 h) r, Q; U" u. n/ E1 ]
        for {
7 j' R  _+ K! O# D3 K7 a, d                select {1 H. N$ F8 Q& F7 x( d& s1 A! P
                // 广播交易
8 b/ r% [2 c) U: ~5 Y: F                case txs := ( C3 T) N2 ^6 A, x
Peer节点处理新区块
5 L8 v7 @4 P8 w; @本节介绍远端节点收到2种区块同步消息的处理,其中NewBlockMsg的处理流程比较清晰,也简洁。NewBlockHashesMsg消息的处理就绕了2绕,从总体流程图1上能看出来,它需要先从给他发送消息Peer那里获取到完整的区块,剩下的流程和NewBlockMsg又一致了。& {7 h& l1 r, _. O3 D' C
这部分涉及的模块多,画出来有种眼花缭乱的感觉,但只要抓住上面的主线,代码看起来还是很清晰的。通过图5先看下整体流程。8 m$ G6 \7 W- e+ J4 V; }" r
消息处理的起点是ProtocolManager.handleMsg,NewBlockMsg的处理流程是蓝色标记的区域,红色区域是单独的协程,是fetcher处理队列中区块的流程,如果从队列中取出的区块是当前链需要的,校验后,调用blockchian.InsertChain()把区块插入到区块链,最后写入数据库,这是黄色部分。最后,绿色部分是NewBlockHashesMsg的处理流程,代码流程上是比较复杂的,为了能通过图描述整体流程,我把它简化掉了。
3 w; c+ b7 S" s$ k
- P2 Z" f# @1 m' s1 {; E9 f9 q8 |仔细看看这幅图,掌握整体的流程后,接下来看每个步骤的细节。2 \  K; T. Q6 p9 [
NewBlockMsg的处理' Q6 z- Q6 d( ]7 V! X: S2 D
本节介绍节点收到完整区块的处理,流程如下:
6 t1 `* q9 J/ |7 X! p# L3 Y首先进行RLP编解码,然后标记发送消息的Peer已经知道这个区块,这样本节点最后广播这个区块的Hash时,不会再发送给该Peer。# n, o0 Z+ }4 }+ ~/ Y4 j" i
将区块存入到fetcher的队列,调用fetcher.Enqueue。' S  e$ b( r3 ?( l
更新Peer的Head位置,然后判断本地链是否落后于Peer的链,如果是,则通过Peer更新本地链。
2 h1 _0 b& A7 i% v只看handle.Msg()的NewBlockMsg相关的部分。$ Z; j* G- @5 G
case msg.Code == NewBlockMsg:
& e0 Q0 I9 I- `& ?/ R7 u        // Retrieve and decode the propagated block
( c' G4 h1 ]  T- D# A        // 收到新区块,解码,赋值接收数据
6 W% |4 Z, f$ Q  ]        var request newBlockData
2 q  P( e7 ?0 r. c* S1 y        if err := msg.Decode(&request); err != nil {3 S0 s$ E; l5 _9 t* `2 I
                return errResp(ErrDecode, "%v: %v", msg, err)
( g/ }2 k; @; Q: c        }
3 O+ D* r5 |. G/ ~3 Y        request.Block.ReceivedAt = msg.ReceivedAt
" ]( i& U3 k# B# K        request.Block.ReceivedFrom = p% y/ g, p# ?$ a% k$ |' B) N+ J
        // Mark the peer as owning the block and schedule it for import8 Y# T  i5 W; B% W& K$ r
        // 标记peer知道这个区块
) [% {# s  Q& k1 M" F8 \& B        p.MarkBlock(request.Block.Hash())9 B( I9 G/ ^2 Y
        // 为啥要如队列?已经得到完整的区块了! f+ ?' n- _( F& p1 h
        // 答:存入fetcher的优先级队列,fetcher会从队列中选取当前高度需要的块
. {; L! r3 \6 |' V; d" B" e7 h        pm.fetcher.Enqueue(p.id, request.Block)
' d. s: f9 e% C) W0 f* ~        // Assuming the block is importable by the peer, but possibly not yet done so,
) Q8 ]% H6 ]) c        // calculate the head hash and TD that the peer truly must have.5 c/ m- U5 w% n6 W& N+ N) m
        // 截止到parent区块的头和难度
5 F  M& }) f% p5 ?3 _        var (! _5 a. G- s, a2 w
                trueHead = request.Block.ParentHash()5 j$ k: T$ u3 B5 l1 [, {% z
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())( v! @' u& a! \
        )$ H) }) }: c$ j! ^
        // Update the peers total difficulty if better than the previous$ w# N- \' D! S: k) N
        // 如果收到的块的难度大于peer之前的,以及自己本地的,就去和这个peer同步
. M" |6 }) q$ X1 d& R; w; c        // 问题:就只用了一下块里的hash指,为啥不直接使用这个块呢,如果这个块不能用,干嘛不少发送些数据,减少网络负载呢。
; g! u2 R* R# @        // 答案:实际上,这个块加入到了优先级队列中,当fetcher的loop检查到当前下一个区块的高度,正是队列中有的,则不再向peer请求. X4 z: p- w! T2 {" I% y
        // 该区块,而是直接使用该区块,检查无误后交给block chain执行insertChain
" U4 Q8 L1 N' z/ V        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
2 U: `5 `0 C# h2 S& [: {# J4 j3 Y                p.SetHead(trueHead, trueTD)
/ B9 g& q* T8 E( ?( a                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of9 j7 C" L, q9 \) Z$ q* i/ [1 ~- _9 f2 v
                // a singe block (as the true TD is below the propagated block), however this9 g+ X) J  @" L0 x& T  h
                // scenario should easily be covered by the fetcher.
& F4 r% K) z$ g: t. M! ~9 `  ]                currentBlock := pm.blockchain.CurrentBlock()* p- ]2 w. N) w6 ]) i5 `
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
3 s5 r  ?8 n* D& i* w* U/ ^) E5 `                        go pm.synchronise(p)
) V- I; o' [3 ]  U                }% w; v) B' m" F3 n+ K, M
        }
0 c. k5 E: @! N: a2 x//------------------------ 以上 handleMsg& j! P% }5 w4 \: j- [" k# `- n- l0 B. z
// Enqueue tries to fill gaps the the fetcher's future import queue.* W" v/ x  C: N4 [
// 发给inject通道,当前协程在handleMsg,通过通道发送给fetcher的协程处理
3 t$ X1 t, G. A4 C3 ffunc (f *Fetcher) Enqueue(peer string, block *types.Block) error {/ L! r9 [& f2 U6 R1 g3 m' c
        op := &inject{% t7 l5 |* x( n  ]. _
                origin: peer,. K( I/ u2 \/ h- f: D, {  W9 _
                block:  block,& R  W+ E: p% ?$ o3 V
        }
; o0 y" J- `7 ]$ x; ]7 E: r        select {+ Q+ E1 ?0 Q6 L7 Q- a1 F$ L
        case f.inject  blockLimit {; n7 D& X% s1 t
                log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
0 g8 y$ `- L) J3 ^1 B6 [                propBroadcastDOSMeter.Mark(1)
& s9 h7 D6 A' J. [                f.forgetHash(hash)
: F- r+ y- Q, y2 `7 R                return
- k! n# q# N' Q7 O3 Y9 S. k        }" i# X$ ?8 v0 i3 {
        // Discard any past or too distant blocks7 Y# E! e8 [* m
        // 高度检查:未来太远的块丢弃
1 n* ]9 p  H. l# g9 L" E        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist  maxQueueDist {% n; T& m$ O$ N+ u
                log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)' ^/ f6 C) _, b- h" b. ?
                propBroadcastDropMeter.Mark(1)
0 j/ x* h3 \5 c                f.forgetHash(hash). F- K4 V4 _' @( [; `; Z0 b! d
                return3 ^( S  f4 W9 O, K% ?9 i3 o
        }
2 P3 f: v# {& ]2 k        // Schedule the block for future importing
' M- q7 m$ u% E5 Q/ M8 Y7 O        // 块先加入优先级队列,加入链之前,还有很多要做
0 `, k, ?* \# W1 c$ N        if _, ok := f.queued[hash]; !ok {
4 [+ R: p0 D5 s                op := &inject{
  I+ l: R! o6 t                        origin: peer,8 P% U# j3 D1 J+ d6 Q3 c' O) c
                        block:  block,
' Z+ P5 W5 `! v6 Z  T                }
# m0 S2 Q" G3 i                f.queues[peer] = count6 f6 A* g* H( x% `, A
                f.queued[hash] = op- w4 i2 m' T" c" ^7 j$ R8 u7 x
                f.queue.Push(op, -float32(block.NumberU64()))* W" z3 i) O7 S+ `( g
                if f.queueChangeHook != nil {
% Z8 n; W: n  l" s; ?' l                        f.queueChangeHook(op.block.Hash(), true)6 h. N: L% o. ~5 R
                }& a/ J) r' w6 v$ s) u: D* w, Z
                log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
3 h6 l! K& U# b4 s. D- \# `- j        }. D$ m4 L! o1 q
}
  u& A: S7 _* w# U  @( `& W/ l4 hfetcher队列处理
/ g, m: X) ?/ N8 o$ m本节我们看看,区块加入队列后,fetcher如何处理区块,为何不直接校验区块,插入到本地链?; \, F9 ~( T# a
由于以太坊又Uncle的机制,节点可能收到老一点的一些区块。另外,节点可能由于网络原因,落后了几个区块,所以可能收到“未来”的一些区块,这些区块都不能直接插入到本地链。: p  H9 a! Q) R
区块入的队列是一个优先级队列,高度低的区块会被优先取出来。fetcher.loop是单独协程,不断运转,清理fecther中的事务和事件。首先会清理正在fetching的区块,但已经超时。然后处理优先级队列中的区块,判断高度是否是下一个区块,如果是则调用f.insert()函数,校验后调用BlockChain.InsertChain(),成功插入后,广播新区块的Hash
6 Q8 N( F$ ?+ H( Y/ h// Loop is the main fetcher loop, checking and processing various notification
) z# {7 h9 T9 V+ N5 p7 h// events.1 d# k% W, q% _: e+ E* r
func (f *Fetcher) loop() {
1 P/ g9 R4 D# l5 V; r$ N        // Iterate the block fetching until a quit is requested% i) f& w- i+ ~" o% H5 M2 c; i& h
        fetchTimer := time.NewTimer(0)/ @# w4 M  s( i4 V0 ~' J
        completeTimer := time.NewTimer(0)) r, C7 U8 ^% w0 y
        for {# s3 e6 l% J3 K( `- |" {! P" w6 s
                // Clean up any expired block fetches
7 j2 N  \1 @7 b: E- d$ _: ]* V                // 清理过期的区块
8 D: ~! Z0 Y. r: r& p3 b                for hash, announce := range f.fetching {3 \0 `- m/ x) o2 L) U# B. Y) @/ Y# y6 i
                        if time.Since(announce.time) > fetchTimeout {
, D* q' s* L3 H& s                                f.forgetHash(hash)
/ d, n9 e0 k% w$ q  N5 Z                        }! H( g' z; R$ s2 `; h4 t( Z
                }
8 T4 A) n6 A9 c$ p% _' \* O% V, f                // Import any queued blocks that could potentially fit# \3 {# C  g( X  {- l0 h
                // 导入队列中合适的块
5 `3 Q! a/ [- i* N- L7 l                height := f.chainHeight()
# L$ T2 I6 y, Y1 z+ K                for !f.queue.Empty() {% c# R1 S3 H) n; X
                        op := f.queue.PopItem().(*inject)* ]- E# T0 q* `, r; O" T4 [
                        hash := op.block.Hash()2 Q7 p( x" w7 @8 a9 U4 H
                        if f.queueChangeHook != nil {+ k9 }4 T# \- X/ P# C* A* q
                                f.queueChangeHook(hash, false)$ b2 U6 X8 j' o" I, j
                        }% \6 {! v! m3 T
                        // If too high up the chain or phase, continue later; Q4 h7 F6 x! k0 l7 z3 a' {
                        // 块不是链需要的下一个块,再入优先级队列,停止循环
. J7 `9 b4 a; }. l                        number := op.block.NumberU64()
7 p$ ~3 V+ K+ |7 v                        if number > height+1 {
5 G4 v7 `( {2 ~7 r" R0 d1 p; e                                f.queue.Push(op, -float32(number))2 j. [# ~" y, o: J+ w- t% E4 w
                                if f.queueChangeHook != nil {
8 |& V4 n6 }& Y# l                                        f.queueChangeHook(hash, true)# W3 b0 D* m! u: K# X/ s6 T
                                }
+ {/ n: V5 M+ D" n2 d8 W" O                                break
6 {6 z( `9 E; Y* y# ~                        }+ g* \& t  J3 G  c% r+ k; ?  \" l4 w
                        // Otherwise if fresh and still unknown, try and import
0 W! j9 l0 K0 s- R* N                        // 高度正好是我们想要的,并且链上也没有这个块. x* g, }- W% p- a5 \4 _* w
                        if number+maxUncleDist # b" V) b* n/ |2 P; M& p! L, @/ ]- H3 Z
func (f *Fetcher) insert(peer string, block *types.Block) {
/ P! ~1 P; _, O: ^0 h        hash := block.Hash()
4 T8 B  h) j  {/ I* x, N        // Run the import on a new thread- P. o) s, ^* T+ i
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
+ f$ J+ W9 f6 a! e- Y        go func() {
) a) `8 R$ c/ x3 A                defer func() { f.done
6 t% f% n( k7 j- ?NewBlockHashesMsg的处理
/ g/ R7 V# F# f. o0 D2 v% R6 |6 N本节介绍NewBlockHashesMsg的处理,其实,消息处理是简单的,而复杂一点的是从Peer哪获取完整的区块,下节再看。* f3 ?9 R5 X( u- P. z- Z/ n
流程如下:* [) f1 h/ z% l: t
对消息进行RLP解码,然后标记Peer已经知道此区块。寻找出本地区块链不存在的区块Hash值,把这些未知的Hash通知给fetcher。fetcher.Notify记录好通知信息,塞入notify通道,以便交给fetcher的协程。fetcher.loop()会对notify中的消息进行处理,确认区块并非DOS攻击,然后检查区块的高度,判断该区块是否已经在fetching或者comleting(代表已经下载区块头,在下载body),如果都没有,则加入到announced中,触发0s定时器,进行处理。
9 h# j; w# V( A9 y7 J关于announced下节再介绍。
7 z2 e" w* U5 q5 M; i9 l0 |2 g
( L8 A6 ~/ R3 ^$ Z: R+ b// handleMsg()部分2 Z/ s- \) w) E9 U: ]
case msg.Code == NewBlockHashesMsg:
9 u( [; A& b0 D        var announces newBlockHashesData
0 U5 O( W/ ?+ s8 p! ^9 e+ }        if err := msg.Decode(&announces); err != nil {
/ x2 M6 j: [+ X5 ?3 E1 G                return errResp(ErrDecode, "%v: %v", msg, err)
5 ]" X0 z: Z% F        }
5 n, E. X* M/ ~$ A7 G        // Mark the hashes as present at the remote node
' x9 H% f$ u: W9 @1 P8 t! v: p4 M        for _, block := range announces {7 ^- ?. L1 z) {5 o1 M$ S0 l
                p.MarkBlock(block.Hash)( J4 q+ Z  |; ~4 c9 u5 N
        }! @% H) |$ C, b( y5 B
        // Schedule all the unknown hashes for retrieval
( q! k) j2 H  a7 L9 f. g' R% M; f        // 把本地链没有的块hash找出来,交给fetcher去下载
$ x+ ~0 L, B" J3 o& k' Q$ C' v* [        unknown := make(newBlockHashesData, 0, len(announces))
% D( S# D# m% n* q9 `* G        for _, block := range announces {
: T+ R, q. k; w                if !pm.blockchain.HasBlock(block.Hash, block.Number) {+ K! Q; f) B0 h3 g9 s0 ~+ L5 e6 H; k
                        unknown = append(unknown, block)
6 E$ D& \3 m, R- T0 T" l                }2 S- \6 y* b% \* l1 R
        }
4 B. {! _/ q: ]3 w        for _, block := range unknown {! T; o3 Y; g. @( f0 s
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)8 U* q- V4 g3 m
        }) j  c; ~2 F6 G  j" z' s* `
// Notify announces the fetcher of the potential availability of a new block in
+ w$ k" T! w* Q. ?! [// the network.5 C* P$ b0 E2 K( Z; w, d
// 通知fetcher(自己)有新块产生,没有块实体,有hash、高度等信息! O9 j8 m$ R. ?4 g  t9 Z
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
- }/ b! M  g( o. n        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  p$ s& s2 O3 P* l        block := &announce{5 h* E& [8 L- H9 ]8 v+ o; \; M" q
                hash:        hash,5 w6 ]8 N( k& E/ X
                number:      number,( p9 |+ ~8 z* @5 ^, `: P
                time:        time,9 _4 z4 n; [$ @% y. Y) m% e" V5 z
                origin:      peer,
; _, k7 K  e% @  M9 c" f                fetchHeader: headerFetcher,9 X3 J6 z& a! |; o4 `$ y3 M0 {
                fetchBodies: bodyFetcher,$ X6 t. u: b) z
        }5 n+ \4 p/ S' X1 d
        select {
4 P3 ?& _* F' k% q        case f.notify  hashLimit {# z, H2 O+ {, A5 P# z9 L
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
- v1 o/ ^" @2 o+ Q; M; P4 V4 y                propAnnounceDOSMeter.Mark(1)5 u5 e: w3 q  j! _" K% {4 q  t
                break5 C% \4 H  q( ~' W9 w( O: ~7 E* Q
        }" [6 x( r- o3 A7 f9 B
        // If we have a valid block number, check that it's potentially useful* c7 r! k& n* K3 k  A6 P
        // 高度检查& d+ K2 M6 J# O* G
        if notification.number > 0 {3 j% r3 s9 M8 P% k
                if dist := int64(notification.number) - int64(f.chainHeight()); dist  maxQueueDist {
0 X% S4 Z% R+ r) t" n) }& m                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)2 @* I/ ?8 q  z5 z  N' E% t) h
                        propAnnounceDropMeter.Mark(1)
' T# r) P# _: l3 u                        break+ {# ?, Q2 E1 }% P  N. d8 D" ^
                }1 ~0 c4 ~% \( q# i7 \% c
        }
5 S5 }: ~# u/ A2 j9 J$ d        // All is well, schedule the announce if block's not yet downloading# G& l1 T3 j6 U( F
        // 检查是否已经在下载,已下载则忽略
1 C- h) C- G& g        if _, ok := f.fetching[notification.hash]; ok {
* C. A2 n' E; D                break
/ K: B" U; K! H: s        }( k  L5 ~$ t% _8 s. f
        if _, ok := f.completing[notification.hash]; ok {
& h) k  c2 a  S* j1 T! w- U& Q                break$ r* t- x3 f9 Y& }
        }
/ p8 B9 `# {! R7 n        // 更新peer已经通知给我们的区块数量; L1 H6 V) ~1 q% ~4 ?' g
        f.announces[notification.origin] = count/ a$ n2 L4 }- F9 X- E- F
        // 把通知信息加入到announced,供调度
0 |; K  z) [3 t1 A3 e4 O        f.announced[notification.hash] = append(f.announced[notification.hash], notification)$ Z/ k& a4 d% r
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
/ y6 e6 _) p3 l  j# J3 Q                f.announceChangeHook(notification.hash, true)8 y$ `9 i; |& g1 Z) X
        }- l! K& w# k. \7 z; h
        if len(f.announced) == 1 {3 {( C2 l- o9 R+ _" o/ ~* ?
                // 有通知放入到announced,则重设0s定时器,loop的另外一个分支会处理这些通知
$ |" u# w" T9 t4 a& D4 y' s* h  l                f.rescheduleFetch(fetchTimer)
$ f# V( f5 Z( l+ p" }        }! w1 Q$ o% L& g7 n
fetcher获取完整区块
! O% H* t, |) J" q$ m0 S% `5 C3 N$ C4 X本节介绍fetcher获取完整区块的过程,这也是fetcher最重要的功能,会涉及到fetcher至少80%的代码。单独拉放一大节吧。
) P  U5 D) Q& ]7 VFetcher的大头
, w7 g9 ~; G8 ^3 G+ L0 w" ]3 gFetcher最主要的功能就是获取完整的区块,然后在合适的实际交给InsertChain去验证和插入到本地区块链。我们还是从宏观入手,看Fetcher是如何工作的,一定要先掌握好宏观,因为代码层面上没有这么清晰。
  e# P1 b- [+ l# J宏观) L) c9 z- ?) {8 R0 o: O: y
首先,看两个节点是如何交互,获取完整区块,使用时序图的方式看一下,见图6,流程很清晰不再文字介绍。6 H$ d& J1 j: i) E; b/ L

, O# ?% M. L. U! C" r8 L7 V2 K再看下获取区块过程中,fetcher内部的状态转移,它使用状态来记录,要获取的区块在什么阶段,见图7。我稍微解释一下:
0 y8 f) m7 i: B# e& \) @收到NewBlockHashesMsg后,相关信息会记录到announced,进入announced状态,代表了本节点接收了消息。announced由fetcher协程处理,经过校验后,会向给他发送消息的Peer发送请求,请求该区块的区块头,然后进入fetching状态。获取区块头后,如果区块头表示没有交易和uncle,则转移到completing状态,并且使用区块头合成完整的区块,加入到queued优先级队列。获取区块头后,如果区块头表示该区块有交易和uncle,则转移到fetched状态,然后发送请求,请求交易和uncle,然后转移到completing状态。收到交易和uncle后,使用头、交易、uncle这3个信息,生成完整的区块,加入到队列queued。
2 j- Y7 h3 H' Y" I- ?4 t
; m; w3 C$ e% D' o0 o, U# [& G4 ?1 m( N
微观
! |4 g6 m/ ~2 Y- ~1 }; N" r接下来就是从代码角度看如何获取完整区块的流程了,有点多,看不懂的时候,再回顾下上面宏观的介绍图。
  E1 y8 O( f- \+ r) ^) q首先看Fetcher的定义,它存放了通信数据和状态管理,捡加注释的看,上文提到的状态,里面都有。
0 W6 ]! q8 N, z// Fetcher is responsible for accumulating block announcements from various peers3 n- X# |9 U# g& E, d  u3 V" _
// and scheduling them for retrieval.  j3 ~) t4 s2 H1 R9 Z" e
// 积累块通知,然后调度获取这些块3 n5 o( y' h4 D3 v! v9 B& _
type Fetcher struct {3 G" I# @$ y( |4 r
        // Various event channels
  P, L  C' y4 }- h) L    // 收到区块hash值的通道% g9 d* m( U7 I3 `  }
        notify chan *announce6 j  e* H* x$ O, J& o
    // 收到完整区块的通道! f7 T! M7 _3 X) F" c& i
        inject chan *inject8 m) l" M  s% S; [% ~% H" m
        blockFilter chan chan []*types.Block; L+ k/ s( C5 C* d& B# f# O$ ?
        // 过滤header的通道的通道
; E( B! d4 ]4 [9 O  Z, Y% c        headerFilter chan chan *headerFilterTask
; \6 I& k6 v, r0 a) E( ~        // 过滤body的通道的通道
6 J. [% K1 @( G, ~        bodyFilter chan chan *bodyFilterTask, B) ?5 ^  c  t/ ?. _3 F, |
        done chan common.Hash
8 u- Q% @& m: h2 v. }        quit chan struct{}
7 A0 n( m) e! s) {        // Announce states' O1 h% I$ j9 \7 n" x
        // Peer已经给了本节点多少区块头通知
% ]% T3 F. A+ Z9 K( v$ U        announces map[string]int // Per peer announce counts to prevent memory exhaustion3 S; f- @' U; }: }' c* ^4 S* W' a
        // 已经announced的区块列表; A# g5 j0 n- v  _
        announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching' f4 S8 B  `5 `+ ~8 \. D& l  k" B3 U7 M
        // 正在fetching区块头的请求
; V+ o4 [4 k" Z4 d3 {! y. R        fetching map[common.Hash]*announce // Announced blocks, currently fetching
" o8 o  r* B9 w1 A+ l2 `        // 已经fetch到区块头,还差body的请求,用来获取body
9 Q: L5 b* g9 c3 Z, ]+ i) a; F        fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval) B+ X: S; e% I+ d
        // 已经得到区块头的& U  ]. x; C6 T+ N  G* {# F8 N
        completing map[common.Hash]*announce // Blocks with headers, currently body-completing
6 B9 {( R& l5 K% Z8 y! `+ x# I        // Block cache4 [3 l; L8 E5 O) R* @0 x4 {. ]
        // queue,优先级队列,高度做优先级
- M; n8 U$ F5 N        // queues,统计peer通告了多少块
# e2 `/ N, B* [' _  T( U5 x        // queued,代表这个块如队列了,1 g3 p2 x, j( K' M# R
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)3 `& g0 ~2 F, e" r
        queues map[string]int          // Per peer block counts to prevent memory exhaustion
) F* }- E2 ^( r/ P# b        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
- r; O2 }( d" j1 N/ r0 |$ _( W4 o. H        // Callbacks
- W/ z- y! Z5 l- ]! M        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
* E1 b) D( v) @3 o4 F1 r        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work,验证区块头,包含了PoW验证
, f: i+ Q6 B4 N( C4 y        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,广播给peer
$ |# t: E! Y5 h        chainHeight    chainHeightFn      // Retrieves the current chain's height
! `3 Q9 ?2 Z1 U9 a        insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入区块到链的函数
2 a8 Y% r( m$ I  t        dropPeer       peerDropFn         // Drops a peer for misbehaving
' n* T1 K( F; c% v        // Testing hooks& j, _' D: @; J/ P, {2 E8 x: r
        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
. ?5 x' p% C3 h7 ?& A  X" M        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
! ?- w. ]! H! |. E& J% \9 w1 L        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
# w! Y2 V( _+ Y) T        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)( Z, \. v4 g! z9 j) A
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)" f3 n  E3 k9 x8 Q  R3 Z
}
" u! A- b# O- s2 ^NewBlockHashesMsg消息的处理前面的小节已经讲过了,不记得可向前翻看。这里从announced的状态处理说起。loop()        中,fetchTimer超时后,代表了收到了消息通知,需要处理,会从announced中选择出需要处理的通知,然后创建请求,请求区块头,由于可能有很多节点都通知了它某个区块的Hash,所以随机的从这些发送消息的Peer中选择一个Peer,发送请求的时候,为每个Peer都创建了单独的协程。. t; q! G8 t7 \( |" }  X
case  arriveTimeout-gatherSlack {2 r& x3 i, g" ^: }
                        // Pick a random peer to retrieve from, reset all others
5 K- i: }+ A1 [" j- X, Q                        // 可能有很多peer都发送了这个区块的hash值,随机选择一个peer/ Q+ g9 i, @) |2 q# ?+ W; W) E
                        announce := announces[rand.Intn(len(announces))]
" _7 F* U  N# C3 K                        f.forgetHash(hash)
7 p4 ^$ ]" D5 P; b$ Z7 O                        // If the block still didn't arrive, queue for fetching
* X0 W, H! E/ ]3 k1 O& ~" O" ^" a- }                        // 本地还没有这个区块,创建获取区块的请求- t1 p8 ?2 {# {& x5 J$ [9 e  h7 W
                        if f.getBlock(hash) == nil {; e; f2 t  u" M6 B8 O
                                request[announce.origin] = append(request[announce.origin], hash)
% S: N7 Y* b. R' X                                f.fetching[hash] = announce9 V4 O- Q1 h, T% Z9 ^' {6 @9 v
                        }+ I9 u" k4 f' h: W; v+ ], E) [
                }
6 U! ~7 [9 j' a) ?: ?        }
5 R; q* B" }1 p7 M" t2 p9 ~        // Send out all block header requests5 h7 |( D% g4 E* Y
        // 把所有的request发送出去
5 |: B; U; l9 _# H; L  j6 _        // 为每一个peer都创建一个协程,然后请求所有需要从该peer获取的请求
  R! u- j" O: z: I        for peer, hashes := range request {
5 A; ?# f9 P1 R  y; \9 x( y                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)" ]* [! V- H" G' Q+ v
                // Create a closure of the fetch and schedule in on a new thread0 f2 B) M9 q; C' N" p7 K
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes- v( x, S! F! c
                go func() {! ]" C7 ?& W+ Z# }- c
                        if f.fetchingHook != nil {
2 F4 b6 X. w0 V3 k; m7 s3 v                                f.fetchingHook(hashes)
4 }$ ^& x# B& R# {/ X0 H5 S9 Y                        }
* b7 P2 u% M# b( u3 z                        for _, hash := range hashes {% h4 I' F$ b$ k
                                headerFetchMeter.Mark(1)
- {8 j+ \, e  V4 [) o, H                                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
& g7 D4 d( v2 ]0 R                        }
/ R# l4 N4 @" S" U                }()2 w2 ?- {  B: f) _) Q, U
        }
* X0 s' f! l- w: V9 m        // Schedule the next fetch if blocks are still pending, M5 g8 O" c& V$ m
        f.rescheduleFetch(fetchTimer)* M# E2 M/ N/ e9 A2 y
从Notify的调用中,可以看出,fetcherHeader()的实际函数是RequestOneHeader(),该函数使用的消息是GetBlockHeadersMsg,可以用来请求多个区块头,不过fetcher只请求一个。
5 G% U0 @4 w( u7 c) E4 \pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
0 N, M( `, n2 o$ g9 @, B// RequestOneHeader is a wrapper around the header query functions to fetch a
" w5 b: ^$ C- h0 K// single header. It is used solely by the fetcher.
/ F* Q  R- H0 h/ rfunc (p *peer) RequestOneHeader(hash common.Hash) error {
% b8 W/ X8 I4 ^  N" }% O: [4 F        p.Log().Debug("Fetching single header", "hash", hash)1 u% P' g6 |1 j# t
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})7 w. Z; |' C* z# W
}
& y8 R( W( z0 t- q5 I+ }* V8 ], y8 kGetBlockHeadersMsg的处理如下:因为它是获取多个区块头的,所以处理起来比较“麻烦”,还好,fetcher只获取一个区块头,其处理在20行~33行,获取下一个区块头的处理逻辑,这里就不看了,最后调用SendBlockHeaders()将区块头发送给请求的节点,消息是BlockHeadersMsg。6 f/ k/ O' \! r/ p2 l( T! Q& \$ r+ B7 @' G
···# X+ M) n$ L0 z
// handleMsg()
' ^8 @7 i5 z# L- \9 M- {// Block header query, collect the requested headers and reply
, o; s0 ?0 Z- W* K! B! w; kcase msg.Code == GetBlockHeadersMsg:1 s$ X$ }2 b; n& D0 r
// Decode the complex header query
) R) v: ?8 L4 z* m- y- M$ {! `var query getBlockHeadersData
# A$ g9 t" k1 x) I: lif err := msg.Decode(&query); err != nil {
% q3 M! z* o+ s$ z2 greturn errResp(ErrDecode, “%v: %v”, msg, err)
1 o: t6 |4 F2 o# Q2 d. ]% h5 E8 a+ O, m2 l}  N/ o- q- L5 R# P* y4 b6 R3 S
hashMode := query.Origin.Hash != (common.Hash{})
$ ]$ f1 ^. C: Q! Y' j2 ?! {: s// Gather headers until the fetch or network limits is reached
0 J- X7 g) |& s8 P6 O// 收集区块头,直到达到限制
8 C: X' D, R) K/ c! b! zvar (3 X; f1 B. |- i! A2 A# G* B( b
        bytes   common.StorageSize9 {6 W6 `- F1 {+ [; x5 d* S
        headers []*types.Header; T; T3 w/ U6 b& `
        unknown bool% W  Q0 D* i9 Z  m+ G) X
)
" b9 I: o5 M) ?! \7 z/ H// 自己已知区块 && 少于查询的数量 && 大小小于2MB && 小于能下载的最大数量4 P6 M  `. r6 p
for !unknown && len(headers)
3 b& c3 z9 G8 ~* f+ g8 q`BlockHeadersMsg`的处理很有意思,因为`GetBlockHeadersMsg`并不是fetcher独占的消息,downloader也可以调用,所以,响应消息的处理需要分辨出是fetcher请求的,还是downloader请求的。它的处理逻辑是:fetcher先过滤收到的区块头,如果fetcher不要的,那就是downloader的,在调用`fetcher.FilterHeaders`的时候,fetcher就将自己要的区块头拿走了。
6 q; u# v+ m: e: D7 P9 N// handleMsg()& t! T$ ]6 T- E$ i% i/ G  @5 ^  j) i
case msg.Code == BlockHeadersMsg:. g& a# b# t& W" _7 A
// A batch of headers arrived to one of our previous requests. G  p1 {3 \" Y0 s, Z
var headers []*types.Header
8 `  z# `7 \% M/ P* g8 hif err := msg.Decode(&headers); err != nil {6 [3 C7 I& y* j2 P1 X& |& I
return errResp(ErrDecode, “msg %v: %v”, msg, err)
* i# g: h& G( x9 l7 O  [! s% C}, r( P( p7 \: j( U, x
// If no headers were received, but we’re expending a DAO fork check, maybe it’s that2 O+ E, p& ]! s; }6 h& ]; b8 u* A: e! a
// 检查是不是当前DAO的硬分叉
& z8 F  l4 V' P( l! Rif len(headers) == 0 && p.forkDrop != nil {" I; I' F5 \9 Y2 n
// Possibly an empty reply to the fork header checks, sanity check TDs+ E' `& ^8 |( V( h$ E
verifyDAO := true' a0 }; w) f# p4 `6 {  Z3 c
        // If we already have a DAO header, we can check the peer's TD against it. If
1 |- D1 Q1 D# X7 n$ L2 [; D        // the peer's ahead of this, it too must have a reply to the DAO check
! b6 u) a$ |8 r( J6 d7 Q        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
8 k2 t. \" S# I                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
" j0 l- [, Z$ c: h" s1 J, o% i                        verifyDAO = false# U# X( P) j, v0 o
                }! p4 o5 Q8 p2 m  u4 `! G
        }
9 ]+ o$ K( n! b  Z& {4 \        // If we're seemingly on the same chain, disable the drop timer
( L9 l$ W, O' p6 R; i        if verifyDAO {
5 ?# j6 h) x7 G% e" ?' a% A                p.Log().Debug("Seems to be on the same side of the DAO fork")7 X6 `: N/ ?! |& a
                p.forkDrop.Stop()
; [' j0 E& _5 D8 L                p.forkDrop = nil
9 M2 F* q# ?: Q: J. n                return nil, `: ?+ ?" u0 N- e3 f6 y7 x/ G$ K
        }
+ y! f( Y- ?. e1 b. z2 Q: C/ i}: B, p3 k. X& j' e  s/ L! D" y
// Filter out any explicitly requested headers, deliver the rest to the downloader
( z* ]" ?% H9 A! ^9 d// 过滤是不是fetcher请求的区块头,去掉fetcher请求的区块头再交给downloader
0 f8 x" d2 F; k1 p7 vfilter := len(headers) == 14 \% I6 F* u6 I
if filter {3 Z& l" }7 _1 ^) l
        // If it's a potential DAO fork check, validate against the rules
$ a. Q6 _' O3 Q6 U        // 检查是否硬分叉
4 C% a/ Y+ M; }& m' c" r1 w9 E        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
. p  v. a* q& I# G( Y2 n! ]  w                // Disable the fork drop timer
* [: [& T$ @( ^                p.forkDrop.Stop()
0 y" g" l: ]; X4 q& x& j                p.forkDrop = nil; L' G' Y. ]4 F6 ?3 D/ Q) Y
                // Validate the header and either drop the peer or continue
8 U1 X1 e0 z6 p" E                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {: M; ?2 }( x( C% n$ [
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")% b2 }* }) Q( b/ g8 K; n) Y7 L
                        return err
0 P; L: D& C6 I5 f3 p1 L$ z                }' a- [6 @% Q8 p+ K& [
                p.Log().Debug("Verified to be on the same side of the DAO fork")
4 b. a* t7 O+ |4 R% G- a, U) n, W! `# u                return nil
" b; ~8 f2 D3 ~# h% R6 S        }/ F4 o4 h4 Y+ X, E
        // Irrelevant of the fork checks, send the header to the fetcher just in case! v  t$ ?/ b/ ~3 B, w
        // 使用fetcher过滤区块头/ P8 k, [5 n( d! j+ w6 h8 i3 l
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())$ R. d0 k  [( ~1 C3 u
}
5 u7 [! n& f' F& \; E, {// 剩下的区块头交给downloader7 G; Z! Y& l* y: W* o
if len(headers) > 0 || !filter {
* l- ?8 D9 x0 `( B& s        err := pm.downloader.DeliverHeaders(p.id, headers)6 |9 _1 e1 t6 ~3 G( A; q
        if err != nil {6 p4 Z0 t- v1 Q; j: |3 W8 p
                log.Debug("Failed to deliver headers", "err", err)& u$ s( d/ O5 N# e
        }( N% z+ m' h( O$ p1 E
}3 N+ I9 ]( ?3 Y+ R8 M
`FilterHeaders()`是一个很有大智慧的函数,看起来耐人寻味,但实在妙。它要把所有的区块头,都传递给fetcher协程,还要获取fetcher协程处理后的结果。`fetcher.headerFilter`是存放通道的通道,而`filter`是存放包含区块头过滤任务的通道。它先把`filter`传递给了`headerFilter`,这样`fetcher`协程就在另外一段等待了,而后将`headerFilterTask`传入`filter`,`fetcher`就能读到数据了,处理后,再将数据写回`filter`而刚好被`FilterHeaders`函数处理了,该函数实际运行在`handleMsg()`的协程中。; j6 d2 @9 F3 i. _6 M9 J
每个Peer都会分配一个ProtocolManager然后处理该Peer的消息,但`fetcher`只有一个事件处理协程,如果不创建一个`filter`,fetcher哪知道是谁发给它的区块头呢?过滤之后,该如何发回去呢?
0 Y" E+ ?' Y4 H9 C" P/ n, Z// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  q4 Q/ w- Y- U% ?; [9 _6 B// returning those that should be handled differently.* M$ N4 o+ l4 t( @
// 寻找出fetcher请求的区块头
( f* j4 U5 O8 {6 Q8 Y0 c" Ffunc (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
* y6 E" u2 R  Z% m6 S+ Plog.Trace(“Filtering headers”, “peer”, peer, “headers”, len(headers))
" L" p) T% q7 G+ t4 x$ X// Send the filter channel to the fetcher
: ~6 a9 Y$ j& N2 h% f3 g// 任务通道
& M& C/ L7 L, B9 G" Gfilter := make(chan *headerFilterTask)
" u: i: Y2 q( N3 R. |( Cselect {1 }1 C3 l+ K  y9 X  N: X
// 任务通道发送到这个通道4 ~/ ~7 h+ u3 B% P( }5 n% J, i
case f.headerFilter , c4 ?$ b0 u" [! R1 }
}
  M' s$ w7 o; a1 v0 B# ~接下来要看f.headerFilter的处理,这段代码有90行,它做了一下几件事:; \5 k0 e* t, \! V* D
1. 从`f.headerFilter`取出`filter`,然后取出过滤任务`task`。! R( q7 Q( }, {1 H/ J* n
2. 它把区块头分成3类:`unknown`这不是分是要返回给调用者的,即`handleMsg()`, `incomplete`存放还需要获取body的区块头,`complete`存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中,具体的判断可看18行的注释,记住宏观中将的状态转移图。
3 c/ _6 Z4 o6 u& ~3. 把`unknonw`中的区块返回给`handleMsg()`。0 U( R5 R$ i0 @9 W
4. 把` incomplete`的区块头获取状态移动到`fetched`状态,然后触发定时器,以便去处理complete的区块。
2 y: P0 m7 Z' C5. 把`compelete`的区块加入到`queued`。/ l& b' O: d5 U3 M
// fetcher.loop()9 @, I( e1 t1 i
case filter :=
  o$ q, H9 m4 O5 U: O' V/ X4 o; a// Split the batch of headers into unknown ones (to return to the caller),2 @; S8 w6 e# X5 a/ r. C. F
// known incomplete ones (requiring body retrievals) and completed blocks.3 d1 G  g! `/ F3 w
// unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete放" K6 k/ N5 g8 R1 v0 e4 c9 S" ~" k$ ^
// 还需要获取uncle和交易的区块2 J, i3 s, o0 S; \7 h
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}2 K$ x# K! b1 g0 ~
// 遍历所有收到的header  O% }7 Z# `; o2 |5 l/ I
for _, header := range task.headers {/ i+ f( h( U: Y) c
        hash := header.Hash()
' T. I+ `; N& U  {+ o        // Filter fetcher-requested headers from other synchronisation algorithms# R0 e+ G, D4 @4 v/ H- }
        // 是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued; M$ S2 `" J/ T/ h5 h
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
+ M0 D8 x2 c  o; T. j: F                // If the delivered header does not match the promised number, drop the announcer
: g* B/ ^1 c9 A2 Q0 O- Y4 T$ l, i                // 高度校验,竟然不匹配,扰乱秩序,peer肯定是坏蛋。% j" C' Q4 L- c
                if header.Number.Uint64() != announce.number {
, s1 H" a; Y* a6 z8 S( F. Y0 z                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)2 O$ k# i- b! A* M, @/ s% Y. f
                        f.dropPeer(announce.origin)
, A$ y/ B0 p+ B$ Z, z                        f.forgetHash(hash)9 U7 w( |. m8 @* d" u# A
                        continue
. D0 y7 \) q/ k0 z' T) ?                }3 Z* C# N3 d" V9 a) y. z
                // Only keep if not imported by other means& u* }1 Q: ~0 j) |+ i( X
                // 本地链没有当前区块" O: `& K; Z  ?' A. R
                if f.getBlock(hash) == nil {
* D) O4 f8 l& ~' c" G                        announce.header = header8 ?+ t/ K" t% N3 W/ M3 x  |
                        announce.time = task.time
. }$ s; z; w' S" F3 P                        // If the block is empty (header only), short circuit into the final import queue+ i. P- {  J% H4 ?& w% b0 s
                        // 如果区块没有交易和uncle,加入到complete: D5 y2 l; F- Q3 M# N: L( R& Y
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {: d% J, [2 J& u& J* v3 Q$ @  K
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())" V6 n1 P+ s% F% s9 b
                                block := types.NewBlockWithHeader(header)2 Y* C' A% I4 \6 l- C
                                block.ReceivedAt = task.time
: i; o% d& m6 E$ T6 F2 y% n                                complete = append(complete, block)
: d8 x. G( l% u9 c, u: f+ O8 o( m! d  k                                f.completing[hash] = announce
4 v, t0 G. d/ q# {                                continue8 Q9 L0 v" J- a; b+ \( V
                        }: k3 b/ t! G1 v9 g5 a
                        // Otherwise add to the list of blocks needing completion9 M% h3 ~+ o8 h6 Q- H: j* N
                        // 否则就是不完整的区块
8 c' t  V$ c2 y) {* N2 y& K                        incomplete = append(incomplete, announce)& x2 T. H# w% T+ m
                } else {' B( n' \2 Y/ n/ X
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
) V9 u# m0 U, S  y0 F2 f                        f.forgetHash(hash)
" n  x) m% v( v                }) r4 L/ h2 U' T5 @! Q7 X' l7 n
        } else {
, W/ T! M) \4 h5 p) \: s- ]                // Fetcher doesn't know about it, add to the return list1 Z3 ]. B4 v4 v9 S% o' U$ B
                // 没请求过的header
9 I5 B4 @' Y, i                unknown = append(unknown, header)
7 b. p  f$ e3 f" y: p# J- t        }6 `7 u  Y6 i& |* ]" ]  t) O  s
}
$ s$ D9 E( o# `) B// 把未知的区块头,再传递会filter
. K: ?; h. e: |1 y4 N8 sheaderFilterOutMeter.Mark(int64(len(unknown)))7 }- C( Z& ~* f
select {* s! ~; t: [" U9 R  C
case filter ' \7 x3 X. v6 ?6 B3 U
跟随状态图的转义,剩下的工作是`fetched`转移到`completing`        ,上面的流程已经触发了`completeTimer`定时器,超时后就会处理,流程与请求Header类似,不再赘述,此时发送的请求消息是`GetBlockBodiesMsg`,实际调的函数是`RequestBodies`。# _4 l# E# r" u
// fetcher.loop()6 m  D7 |% e, l6 c/ r+ c1 B8 n, f# S
case * @7 U- m" C' i! j
// 遍历所有待获取body的announce
- Z; \8 c6 j, x' G, kfor hash, announces := range f.fetched {! [  n1 l: |5 I2 k8 P$ R
        // Pick a random peer to retrieve from, reset all others/ |; N  W4 V: A8 c
        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
/ c; `: H1 V+ i3 T: o+ o! w: U: `* I        announce := announces[rand.Intn(len(announces))]
* x. K. ~  W! w; U8 Y7 S( ^        f.forgetHash(hash); H/ r/ D$ x5 b+ J- Q+ o9 f2 X
        // If the block still didn't arrive, queue for completion0 V! a% ^9 _8 ?
        // 如果本地没有这个区块,则放入到completing,创建请求2 \8 X) D: \, j
        if f.getBlock(hash) == nil {0 ]) B3 |2 r2 k9 r) u
                request[announce.origin] = append(request[announce.origin], hash)
: \; U# ]3 V8 S' l* h                f.completing[hash] = announce
7 ]' X. p' Z" c2 B        }
2 Z& |' U2 _3 h- @: _. X. D}
  j' A& D$ Q/ \1 C+ s" }3 v// Send out all block body requests$ A. ]) Z5 R( b9 t- W3 N9 L6 \
// 发送所有的请求,获取body,依然是每个peer一个单独协程
: p8 F# O4 j3 H7 nfor peer, hashes := range request {8 O. P0 q. l4 ?" {8 r- z; V
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
* r- R+ V" Z$ i8 Y        // Create a closure of the fetch and schedule in on a new thread
( G$ J$ l3 J. D. g        if f.completingHook != nil {
9 t8 S( a) U3 o! ^8 z5 E                f.completingHook(hashes)9 u8 \0 Q+ e0 o" G  _7 t
        }
' M2 L  J/ z# o- D# q        bodyFetchMeter.Mark(int64(len(hashes)))3 f; R; [' }/ |1 ?$ p0 f
        go f.completing[hashes[0]].fetchBodies(hashes)
0 l3 e$ I! M6 h* i- `; j}9 ~/ n2 U* `7 [% l
// Schedule the next fetch if blocks are still pending  E4 @1 j3 l9 i) g3 ?( U; c. }
f.rescheduleComplete(completeTimer)* g  J; A3 Y8 E# F- z  D- I
`handleMsg()`处理该消息也是干净利落,直接获取RLP格式的body,然后发送响应消息。
$ B4 f, J3 Y5 p0 n// handleMsg()& @9 a7 x, s8 J% X$ Q% U
case msg.Code == GetBlockBodiesMsg:
7 d- i# b" `9 Y, d3 P4 V* g// Decode the retrieval message" N6 R/ {# C, ^( d: r7 ^! I; D
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))7 c% a0 S+ v2 a8 N* I3 v7 z- ~
if _, err := msgStream.List(); err != nil {( q1 K, t  S1 _* ^+ ?
return err. w5 N6 n8 G( @- F+ n; d- F
}
6 ]  q; g2 p' N" g1 O: m" p8 H6 Y// Gather blocks until the fetch or network limits is reached
: g) ^& _2 w+ avar (7 |. \% p  a0 T0 k" S8 H
hash   common.Hash
! v% y! e" O( [; Z0 p( S+ mbytes  int& b( O8 y! V( a9 X- r; f
bodies []rlp.RawValue
3 F2 ~% E, V& f7 k6 ^)3 L2 i9 O2 ?1 T3 d; A
// 遍历所有请求- V! W$ t# }6 M  d) D* d% V
for bytes   @4 c* }2 W6 [$ H8 V+ H. M6 Q/ z# I
响应消息`BlockBodiesMsg`的处理与处理获取header的处理原理相同,先交给fetcher过滤,然后剩下的才是downloader的。需要注意一点,响应消息里只包含交易列表和叔块列表。& X' k% r2 d+ F: }$ W
// handleMsg()
8 R: y1 j; I0 Ncase msg.Code == BlockBodiesMsg:
9 u7 T$ }1 D& z# J1 n% A1 H// A batch of block bodies arrived to one of our previous requests9 ^) r3 C4 @8 u4 s
var request blockBodiesData8 x7 C8 [# K2 |6 U) B( D: d! Y8 x; n
if err := msg.Decode(&request); err != nil {
8 e0 }( \' t; a2 g# jreturn errResp(ErrDecode, “msg %v: %v”, msg, err)
4 M8 @5 ]! X' e7 {}7 @( q9 v+ q  M) n7 U' G0 P
// Deliver them all to the downloader for queuing, v8 S9 {: W" Y; x" c1 F- B
// 传递给downloader去处理
9 C2 }; d$ `1 a1 qtransactions := make([][]*types.Transaction, len(request))
# W6 G% \7 q. \0 M3 H, N6 Tuncles := make([][]*types.Header, len(request))1 X# L% j. Z# v% a- K( O3 r
for i, body := range request {
( m+ o( e4 b1 b) r        transactions = body.Transactions4 a; u7 [4 u* V% h- R% i3 o1 m
        uncles = body.Uncles
7 x$ \( L' I. g7 s- j& |9 F}
, q2 N. @6 Y3 G! S& V4 u// Filter out any explicitly requested bodies, deliver the rest to the downloader4 D0 j+ n1 p* b1 X# M$ \% t; P9 r  `
// 先让fetcher过滤去fetcher请求的body,剩下的给downloader
) f; i7 n4 I$ s0 {. ^/ v3 \filter := len(transactions) > 0 || len(uncles) > 0
$ C: l- {4 ?0 @$ i4 Nif filter {/ q4 h. P$ |4 b# ]
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())! z  ~4 D, i# w" q
}# z) V% c5 |$ H- a9 ?, B
// 剩下的body交给downloader6 s6 e5 d( [7 h6 k
if len(transactions) > 0 || len(uncles) > 0 || !filter {$ J4 t" b1 G% p, A: e. @
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)5 F3 T. \& T, v+ l
        if err != nil {
, K1 N* p9 R' I  G& M  s  T+ S                log.Debug("Failed to deliver bodies", "err", err)' V# a5 |; Y: p" H) i# f
        }; b- d1 `- Y& ~, E3 B/ a
}
" {, m% L) {9 |过滤函数的原理也与Header相同。6 f5 f; x+ D/ V
// FilterBodies extracts all the block bodies that were explicitly requested by8 i& ]4 |+ N  J6 {# k2 M* D$ U
// the fetcher, returning those that should be handled differently.* p$ G5 ^# C9 I4 P# V7 L' Y/ ~
// 过去出fetcher请求的body,返回它没有处理的,过程类型header的处理( d- O, Z4 q2 \( [
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
9 N" p+ a. n( T9 _0 blog.Trace(“Filtering bodies”, “peer”, peer, “txs”, len(transactions), “uncles”, len(uncles))* \' c! {: [! j) j/ Y0 W7 {
// Send the filter channel to the fetcher
5 Q# l# o: e/ y! `( pfilter := make(chan *bodyFilterTask)
5 q: v3 g6 }* N, U+ ?; Bselect {
- ^9 r7 r  z, acase f.bodyFilter . T- J  x( Q, c2 k
}
% F% F- t$ c# t) }% v; G& \实际过滤body的处理瞧一下,这和Header的处理是不同的。直接看不点:- l$ \; k4 b: r, u" L) J
1. 它要的区块,单独取出来存到`blocks`中,它不要的继续留在`task`中。
  _/ d7 v# P$ n4 ]  u. T2. 判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。* y4 j+ `! F/ x8 d0 R% u' p/ T& d: m
3. 将`blocks`中的区块加入到`queued`,终结。& p; i2 a- u1 C/ t
case filter :=
, W4 y7 ^2 U8 V# g9 u& @. d5 Wblocks := []*types.Block{}
) J3 V3 l, i  P' ]// 获取的每个body的txs列表和uncle列表
- e. I$ e$ V& P0 h5 m( R, I( V// 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
. Z( l( P: u6 S7 }6 U9 v0 k% nfor i := 0; i 9 V& a2 w; v& H) T8 m/ M
}, O9 u% f3 s; z7 i

5 s& X0 e" n4 _, {3 \) I2 r至此,fetcher获取完整区块的流程讲完了,fetcher模块中80%的代码也都贴出来了,还有2个值得看看的函数:
+ v2 R: l/ E( g( Y3 M1. `forgetHash(hash common.Hash)`:用于清空指定hash指的记/状态录信息。
: o! n* X) {" h2. `forgetBlock(hash common.Hash)`:用于从队列中移除一个区块。
& M6 m0 d3 G' R: {6 ]- e最后了,再回到开始看看fetcher模块和新区块的传播流程,有没有豁然开朗。
) o# S' [  T1 ~. X
# m6 i4 T5 ~& t) H( \9 d$ y
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

刘艳琴 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    3