Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
104 0 0
一、p2p网络中分为有结构和无结构的网络
5 v7 l9 s# W1 Y2 a) {无结构化的
  }( B# \9 F* J* [这种p2p网络即最普通的,不对结构作特别设计的实现方案。
! T& C" b/ g" v- J8 T0 `优点是结构简单易于组建,网络局部区域内个体可任意分布,
+ e# I1 |: w% r' i) f8 ^" k反正此时网络结构对此也没有限制;特别是在应对大量新个体加
! z+ M+ y2 q8 P/ m' e- x入网络和旧个体离开网络(“churn”)时它的表现非常稳定。6 m6 ~) w, [) p0 d
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
! D6 R( i3 ~" g& `( R4 u所以往往需要将查询请求发遍整个网络(至少大多数个体),' Y" o3 D9 K2 A; J3 ^
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。8 i4 ^, ~8 K% P- x" c
结构化的:
3 y8 x! ?, A$ Y9 ~) b' m. |/ h这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
0 F! c6 X; D$ m) M" @& l2 d1 S! _降低查询数据带来的资源消耗。3 e1 C* X4 q, {! J/ @* ^
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)% J7 v) ~6 M# W1 {# B2 z
网络模型可以满足需求;  {0 J# X' m4 J3 G$ [0 w8 g8 o5 }3 C
二、分布式hash表(DHT)
! c, F' e* Z' ^* C6 }保存数据
5 T" T" v' l( _) y9 g(以下只是大致原理,具体的协议实现可能会有差异)
* ?8 U$ \9 W# ~  P% E0 ~/ R当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
) v; s$ H2 ^8 }+ \' X  @然后再计算它所知道的其它节点与这个 key 的距离。
3 R* h& q1 M+ L  N# ~7 @1 R3 v! j如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。) N( p( q! V3 `: _, I6 s/ A
否则的话,把这个数据转发给距离最小的节点。/ ?5 ~7 X/ n  N
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
4 p# g3 m! a9 _/ a# y$ Z1 x: u$ f获取数据6 a  {- o" e& O8 G5 ?% j
(以下只是大致原理,具体的协议实现可能会有差异)
/ ^  X# K8 E8 e: \当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;1 v& d- Q% Y1 s. r# R6 {
然后再计算它所知道的其它节点与这个 key 的距离。
( y: I' H7 K: ]: r9 @. \8 A如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。* H5 o% U$ \6 n% ]! J, t1 K
有的话就返回 value,没有的话就报错。5 g$ _8 ^; S+ r7 t  Z3 s# o, D) j
否则的话,把这个数据转发给距离最小的节点。
$ D4 v1 p, V  r9 W9 h$ p& {收到数据的另一个节点,也采用上述过程进行处理(递归处理)。& [4 H: S# ], Q9 a
三、以太坊中p2p通信的管理模块ProtocolManager
/ o& V% Z+ t4 _: O# v* s3 U/ j/geth.go
. \" Z- Z- r6 R3 `( p% c/ k+ j// Start creates a live P2P node and starts running it.
: k3 L. Z8 P  p' i3 h7 [$ B$ Ffunc (n *Node) Start() error {) W; u& y. u: B; W* r: x
   return n.node.Start()
1 W( H! r) X& ?}3 h; l9 x' t- r# x- ~
/*
; n, i3 o, g8 f! m* }   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
8 s, s$ F+ t, ?: v   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;3 w+ Z8 |( V7 w: o% W/ l$ ^. J! h0 L
   然后将Node中载入的所有实现体中的Protocol都收集起来,2 s9 O/ h4 g% h2 X# `
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,7 s4 H* p! D4 C/ i4 f, F5 v
   并将Server对象作为参数去逐一启动每个实现体。
. j$ [$ C9 D& R/ H  N*/
' [: v5 \" a6 T* i+ l/node.go+ l8 g! l$ J; \5 Y
// Start create a live P2P node and starts running it.0 I5 m% ^0 d1 z
func (n *Node) Start() error {
( T  b5 F  t$ c   
  w7 x2 p& O4 [' E7 S! c3 U: P   ...
( i. @7 o* z0 A1 V   /*! j! L' N) M+ o# c# \' N
           ...2 {5 u% X" B& G$ X, n5 b9 e
           初始化serverConfig
  T( G, L8 {% b( b  V1 U+ u! o% b   */0 C: a2 o1 M; T) W1 H: }
   running := &p2p.Server{Config: n.serverConfig}
6 E2 Z5 I8 u$ M: M   ...
; a' |9 s4 H" K  l$ m8 Z3 E% @& y" ?   // Gather the protocols and start the freshly assembled P2P server
# D# D  V; \- W- x% M   for _, service := range services {( V7 `0 k& Z. L6 i2 Q
           running.Protocols = append(running.Protocols, service.Protocols()...)3 Y/ ^% `5 \  F0 g
   }
: R$ y9 ~' P' i$ Q/ i5 `   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
2 l7 j9 X* U2 }4 a, x* _3 I           return convertFileLockError(err)
+ g/ O0 |6 o( a) N6 O) T$ F   }3 \" X% w6 n' D; v0 w
   // Start each of the services
5 R3 H" ~$ N0 ?! m% F   started := []reflect.Type{}
. K6 ]( i7 N/ V3 \6 k& U   for kind, service := range services {" J2 k+ @0 x# Y4 r
           // Start the next service, stopping all previous upon failure
8 [0 i$ Z: @1 C$ M           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
: d5 ~3 }" N/ Z, K$ d1 L7 Y4 G# j           if err := service.Start(running); err != nil {# w, K( n3 R: s$ v  O+ H
                   for _, kind := range started {% }0 k: o8 b1 r6 F
                           services[kind].Stop()
, ?; j" H1 H+ M                   }6 B& m7 b) b; I" d; [, z/ m9 ^
                   running.Stop()1 C' [; S4 s& k3 J
                   return err, ~2 Y( \( L9 F8 v: k+ l8 d
           }
) p* O8 K' N# ?3 }  L$ x6 T           ...
) H/ y+ f9 @( G/ C* `! g   }6 x( H' ~0 n7 H
}& C7 c9 E( n0 Q1 V, H9 M
// Start starts running the server.
$ H, x: P) X9 u# C) P: T( X6 O// Servers can not be re-used after stopping.5 ?- E" n6 Q2 o4 A$ Q' b, R9 p
func (srv *Server) Start() (err error) {2 `. f9 G# o2 _  p3 i
   srv.lock.Lock()
7 Z1 m" P' T% I   //srv.lock为了避免多线程重复启动9 a$ H8 j# v/ X3 g- o1 Q
   defer srv.lock.Unlock()) s( y" k2 i6 _1 r
   if srv.running {
- k' A# C8 r, D+ t; @           return errors.New("server already running")
8 r- E6 A+ f" P" {+ G2 L: v   }
9 p; `! {+ U9 ?2 W9 v   srv.running = true
) a' g; {; S# n' b1 G, ^* o" K% x   srv.log = srv.Config.Logger. M5 C- d' W: r- R/ w
   if srv.log == nil {
8 e, p! m. Z2 }! q4 q+ [7 u           srv.log = log.New(); i- J( V: Y3 X! D
   }
* [% A4 ~) @7 I) g# J4 w; s; |# O   if srv.NoDial && srv.ListenAddr == "" {
' n+ V) p* y: S6 {; y           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
; Q3 ]: b3 I; _. T8 C   }* X- e( l  ~6 c- G
   // static fields$ K6 j( w( @+ v5 D
   if srv.PrivateKey == nil {/ n. ~) C- D# z8 \9 R; [
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key"). F( T5 L! U4 S8 T
   }9 z) W" n3 k" U3 n' r/ f& F
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
0 T) h1 w: N* G+ \   if srv.newTransport == nil {& V# W- }& T$ W1 L' m
           srv.newTransport = newRLPX. d8 U4 ]( H* L# ^& f/ R
   }+ [0 n" [) B" c' }4 v- u
   if srv.Dialer == nil {! R, L0 N1 E7 i1 [8 j# ?
           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
5 N/ h( J/ z) C2 C* ]   }
1 y0 d4 y. Q) P) l) r   srv.quit = make(chan struct{})+ Y! D9 N/ L' |# _
   srv.addpeer = make(chan *conn)
0 C, l4 v: t) F( b/ p$ |   srv.delpeer = make(chan peerDrop)" h* t5 K5 s2 }6 Z5 s
   srv.posthandshake = make(chan *conn)
% C+ O% \3 @! l5 I   srv.addstatic = make(chan *enode.Node)+ r. P1 V3 h) q8 r* D0 ~
   srv.removestatic = make(chan *enode.Node)" D6 ]1 o0 Z, |% ]' m
   srv.addtrusted = make(chan *enode.Node)
2 h! t- Y- u! c7 \% n; j6 Q   srv.removetrusted = make(chan *enode.Node)( r/ Z) p6 a% u) Q
   srv.peerOp = make(chan peerOpFunc)3 a$ H: k% L: e- x; n% `( b
   srv.peerOpDone = make(chan struct{})8 j5 ^. x1 w% _8 r1 Q/ B4 B
   //srv.setupLocalNode()这里主要执行握手
# P9 V+ r  r3 p   if err := srv.setupLocalNode(); err != nil {
6 @- d/ X' k7 N* m) V           return err3 `5 ^% x3 q2 f* n$ M: O) \
   }
! I6 @( Q" i, U4 K6 \# L+ Y, b   if srv.ListenAddr != "" {: M2 @" [( T5 P* c* ^' V' R
           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
5 H7 `7 z' n9 L           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接: `8 y  m: U  I: t$ ^* @
           if err := srv.setupListening(); err != nil {
/ e/ w5 m* ]7 F. R5 n9 i$ F# @& j! G                   return err0 z: i6 H0 @1 N6 q1 P
           }$ i  \; R/ @- b
   }- c% Q3 X7 e$ A' {6 j9 v; \1 a
   //侦听UDP端口(用于结点发现内部会启动goroutine)6 F! `, `8 |# C; f: |/ |5 w% N) d$ R
   if err := srv.setupDiscovery(); err != nil {
/ |3 [, F) P: y$ V! [+ J+ \. r           return err9 }2 m3 V4 x+ q- L$ g
   }$ x4 J- t9 E! l$ E1 D3 O  n$ ?
   dynPeers := srv.maxDialedConns()% f9 ^* K1 Z: B# E0 W: }. i
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
/ v7 s! s" W# o2 U$ t   srv.loopWG.Add(1)
$ v6 ~! |, w3 E( b8 V3 K   // 启动新线程发起TCP连接请求
, [0 i! ^: r. C& {  @2 Z   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
% E8 Y( n  e& j; u! {% P  G# a   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。( F7 b: ?! ~$ T: z7 C9 P& X( z5 ?
   /*
9 G, w, J  d  o/ G9 {   case c :=  0 {
, W1 H  L( U4 p8 x! H. ?1 a0 \           if s.config.LightPeers >= srvr.MaxPeers {' G% z+ n; E! c: B2 S9 _" O- g
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)% g5 k& z1 B5 e" f" @  L
           }
1 `# J% C) r: v+ b. g/ N- y$ ^3 D           maxPeers -= s.config.LightPeers: @  F$ w& N0 B) J
   }
4 j9 X3 r) K3 Y/ I6 T" n9 ^6 b% f   // Start the networking layer and the light server if requested) t6 ?8 M- u4 L7 r5 c
   s.protocolManager.Start(maxPeers)
& K; a# t! k' [4 j9 y. e/ U   if s.lesServer != nil {
" l3 ^9 |2 X; A) x4 o. ^! S           s.lesServer.Start(srvr), U) U0 B2 V1 w+ K
   }
# K2 j$ w$ C% y4 g   return nil
9 B4 Q- v- ~, G8 D& Z- F}6 l5 J- h( U9 T1 |
/eth/handler.go
/ K7 C5 R( l5 K% Ttype ProtocolManager struct {' \/ ^5 n9 N  m4 E7 ]* A$ l
   networkID uint64" L) N$ Z1 @- t5 `' k
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
$ n0 j3 K! L2 m+ k1 J   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)' F# F0 t4 [) r
   txpool      txPool
* S. M! ]- a" F7 T8 j   blockchain  *core.BlockChain
% p" E0 q( P( R5 S/ T6 t0 e  X   chainconfig *params.ChainConfig
, o' {' d# i/ m/ ~; M7 R$ N" e  R   maxPeers    int; F7 @5 V) N/ c. c( ]! J* H; s
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
% d$ D# _' I$ d& L0 C2 ~" x   downloader *downloader.Downloader: \, S5 K( \& k2 y9 N% ~
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排4 _$ J/ t* P) g+ }, ~
   fetcher    *fetcher.Fetcher: U# L& e! [' q9 K3 T
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。  s6 h% }" w# z9 U
   peers      *peerSet
, b1 ~2 E# A; d   SubProtocols []p2p.Protocol
* y* [9 t/ ?0 O2 T& |   eventMux      *event.TypeMux7 p  |; c9 R3 W# |+ `
   txsCh         chan core.NewTxsEvent
1 t* w( e$ f4 e2 R: Z  u" y' P   txsSub        event.Subscription0 n8 ~2 @* k6 ^0 V4 c' `4 S- X
   minedBlockSub *event.TypeMuxSubscription
4 Y! W, k$ k% Z; ], U   
. y4 b8 h& D/ L8 D   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
+ H- H/ P. j4 j+ w   //当然在应用中,订阅也往往利用通道来实现事件通知。5 |. y! n" [5 G2 \! c# Q
   // channels for fetcher, syncer, txsyncLoop; T" ^1 \" k/ E
   newPeerCh   chan *peer# O; U: Y* o! H2 N9 U
   txsyncCh    chan *txsync
4 F1 o" J" Y7 V' x! W$ p1 c* _- A+ Y  s   quitSync    chan struct{}6 v# I6 I7 p! {3 p# c
   noMorePeers chan struct{}6 u; t7 ]: v2 t+ O+ l) i& t
   // wait group is used for graceful shutdowns during downloading7 W$ X% f2 ^$ R" [1 e2 L! ~
   // and processing
- |* j+ \# N: c1 g3 t" r0 c   wg sync.WaitGroup5 `# Z& U4 s: k3 m' y% |
}$ {/ G; ]; y. P8 J4 ^$ m4 k( w
   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
0 J, ^) X5 l) K: o, Q, uProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
- a: f' w5 [" ~3 H7 q这也标志着该以太坊个体p2p通信的全面启动。
7 f6 F$ O# H/ j% b& e' hfunc (pm *ProtocolManager) Start(maxPeers int) {' \5 _' g7 k3 A0 \- t# z" \
   pm.maxPeers = maxPeers3 F* P5 a  H8 M- N0 v. K- _/ W
   // broadcast transactions
# w3 y4 E# S2 D- Y   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。/ c/ V/ p% T( P' m$ [! }  d; G
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。8 i' I# z, A2 S9 G9 Z) R4 m
   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)( L2 u/ F& H* a: B3 S
   //订阅交易信息
. B8 A9 L) O4 b   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
0 Y& d4 T' O1 _5 j5 l4 Q) T4 l( y   
7 m8 A0 D6 ~! c# Z; E# X   go pm.txBroadcastLoop()& ~/ s4 N* X6 d
   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
8 o7 p) C( ]! {4 |% X4 K& K: I9 N   // broadcast mined blocks
% @8 t5 a/ `' u3 I   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})* T0 t4 b. W; G: a/ e
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。! R# p3 t% I9 z/ E7 v. F
   go pm.minedBroadcastLoop()8 J1 ]# |- w  [- J4 V* d
   // start sync handlers
+ ^: M: Y! Q3 G   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
, z& d" h0 Y  O1 t   go pm.syncer(). t1 q( n) }3 ?- n0 \2 N
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,/ D; {0 s3 i4 _6 K
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。  g+ J# t+ v4 G% c7 z+ t
   go pm.txsyncLoop()
% m+ U% z' |) G$ h/ R0 M}
. y% r# ~5 ^; h- D% N: V; A//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
# H7 I1 N# a  ?4 e   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
% Q8 K; Z' _* ?1 D3 a//------------------go pm.txBroadcastLoop()-----------------------
2 a$ _  R0 A, M8 E- D0 k0 R0 Rfunc (pm *ProtocolManager) txBroadcastLoop() {
' c9 R6 F! I/ z: _4 [$ M$ N   for {
+ ^8 {8 a! u+ T           select {7 l6 ^% t1 }1 X/ l% M
           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {  j# R8 _/ r. B9 j2 L1 s
           return p2p.DiscTooManyPeers8 y/ x8 Z4 I# E
   }, `  X- c# A/ Y& [4 K
   p.Log().Debug("Ethereum peer connected", "name", p.Name())
# X5 f2 V( N0 P   // Execute the Ethereum handshake
, L) l; d! \; }8 W* T( N. M   var (6 V+ |4 b' K9 Q8 N
           genesis = pm.blockchain.Genesis()
3 Q) \2 J' Z9 J7 i8 U1 J           head    = pm.blockchain.CurrentHeader()
9 g8 \( q  G) `, g           hash    = head.Hash()% }, `" G9 F9 r- G% {/ |
           number  = head.Number.Uint64()
7 W  j1 V! R5 }/ {+ i, K           td      = pm.blockchain.GetTd(hash, number)
/ r' h, Q" g$ i) U5 n& Y   )
2 O" m$ u" n  G) C   //握手,与对方peer沟通己方的区块链状态! d' ?  Q  T* s4 t* ^
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {. C4 R0 D" e% \" y6 G# d$ L
           p.Log().Debug("Ethereum handshake failed", "err", err): P4 v! p( x1 _5 A8 P1 o1 E
           return err
4 q, P/ O5 H# V) a   }& i! q$ H& I- C6 C; G
   //初始化一个读写通道,用以跟对方peer相互数据传输。& a; ?$ H% P0 T& I5 a
   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {$ j+ R4 Y2 n' s
           rw.Init(p.version)
6 `) C6 d  k+ d% X' K$ u% z; d   }- V. u6 H/ ~- R: l5 D
   // Register the peer locally: T" {1 P; c+ I# P4 ^5 G6 ~8 }
   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。( I" p4 Q" t2 k; Z' ~: Q8 Q
   if err := pm.peers.Register(p); err != nil {, `% B3 U6 R3 }& W- G5 N$ Y2 b
           p.Log().Error("Ethereum peer registration failed", "err", err)
2 f$ b' T# h7 L! L+ I           return err
; E0 P0 r1 G' X& `   }
! K$ }6 E. _7 o: k! f8 @   defer pm.removePeer(p.id)
3 ?+ _: |. a5 H4 e2 @   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
6 c3 _# D% E4 u   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
/ L- A' ]* W( y) _4 U   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
$ x+ x/ m1 y/ Z4 f, k; T% K  R. a           return err; j. G1 ~- U% P" [0 b, E$ X' |
   }
& l' X. U. s( R   // Propagate existing transactions. new transactions appearing
- s, H/ i! j' H5 y9 s- X   // after this will be sent via broadcasts.0 ?& Q& H2 W4 T5 N0 g6 Y
   /*' I5 r2 X* S: L) j5 A5 l$ e
   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
& R4 V4 u5 J1 w$ _8 W* v   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
) Z3 i7 t0 K% ?! I# ?! {# T% f, P( D   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。6 R/ b. B* a6 x% \2 _2 `
   */. a" W8 ^! D8 p. F
   pm.syncTransactions(p)2 ]4 Y' E) C  u" a6 j1 g' `* M6 h
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
% d) y7 m( Y2 V) [% T$ b8 U   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
9 E3 P. M. `% D; A2 d' F           // Request the peer's DAO fork header for extra-data validation
( j0 l2 k- R' X1 y: \1 w           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
0 O' B: m- R5 B                   return err
) ^9 a) J! R* y& M+ B$ `           }* P; a0 O6 p$ N8 V" c& l
           // Start a timer to disconnect if the peer doesn't reply in time4 x- h) Q5 F$ T, @! c6 N
           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {7 B# O2 h7 z& @7 T
                   p.Log().Debug("Timed out DAO fork-check, dropping")
% t* }: }* W& `6 N+ X! b                   pm.removePeer(p.id)8 |  f- {9 @5 y5 G3 D, v
           }): x) j( l, ^. t" B) H; n
           // Make sure it's cleaned up if the peer dies off
% m# Y+ g( K: n! H# E           defer func() {  c7 a& k- `* _3 z( H
                   if p.forkDrop != nil {
! \( B% ]/ Q7 [2 ?& ~. t/ z+ E! v                           p.forkDrop.Stop()/ f, j8 x( _: |
                           p.forkDrop = nil
' C. E2 Y( B; t7 G( G9 k1 d                   }# e7 n: y2 Q& i3 J( I
           }()
. ?  M7 g' Q# s+ A& S# H   }( ?! |( X$ p- a5 e  J
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,9 k# e, r+ P9 N* {: F
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。$ Q  L& G0 q  Z$ `* H
   // main loop. handle incoming messages.
6 @: x1 c+ X5 A) J7 H   for {! m2 f* ~2 ^) v& t- A
           if err := pm.handleMsg(p); err != nil {
& z. M  ?1 ~* {6 C* w5 G' c- s                   p.Log().Debug("Ethereum message handling failed", "err", err)
: y1 k$ N$ c. j) |/ K8 Z0 x4 \                   return err
1 j) }% Z* U( m( T! s: m' Z5 ^           }: F5 c+ H( `) _8 O: t7 X
   }
, }. R! h: u& d4 r3 j1 t}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2