Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
240 0 0
一、p2p网络中分为有结构和无结构的网络
$ z3 _6 c+ k' V7 M无结构化的; p6 x: L# B1 C: b5 Z% s1 F1 j8 o
这种p2p网络即最普通的,不对结构作特别设计的实现方案。1 I  p5 g3 J( \9 H  W2 s  X
优点是结构简单易于组建,网络局部区域内个体可任意分布,
/ `* I8 y# ?. Z* ~' F9 j反正此时网络结构对此也没有限制;特别是在应对大量新个体加
/ M! H/ [& O) s: H5 K8 B" q/ ?0 g3 A入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
  o7 L* Y, L$ B$ w/ }缺点在于在该网络中查找数据的效率太低,因为没有预知信息,7 y2 I' m3 l, n( H/ E) y
所以往往需要将查询请求发遍整个网络(至少大多数个体),$ n( v1 J: W: |' y' M# z
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。, \3 ~9 a9 |" N" s4 A
结构化的:
; @% S. S+ x+ b# W2 Y3 U这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
4 r" [5 [" O$ y1 u6 O# q! m9 n# [降低查询数据带来的资源消耗。$ x& V& e) d, s3 e8 ^) b
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构). I: ?. H3 ^2 C4 @3 r
网络模型可以满足需求;
6 E2 w8 w& s9 J3 r. j' B) O5 |二、分布式hash表(DHT)" P# C% [+ i2 a" g! o0 g
保存数据
4 W1 o; Z% [4 ~4 t8 L8 Y3 p" v(以下只是大致原理,具体的协议实现可能会有差异)# w" L3 l) @* T8 E. K$ q
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
. w/ W+ Q- A- C3 L: ]  X8 ~" d5 x) r然后再计算它所知道的其它节点与这个 key 的距离。
8 S$ n0 T6 ~  S# j% F如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。+ j( {; q: g- |0 i5 ?
否则的话,把这个数据转发给距离最小的节点。
! k6 a) D& a$ d( [1 n收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
6 a9 W2 V% G$ W2 Y/ |4 J6 d6 W- e8 k获取数据( s! P- d3 _& R) Y4 J2 E
(以下只是大致原理,具体的协议实现可能会有差异)7 y6 n  B5 [; e, \" S) o# o" D
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
" l, u8 [) F' ^+ O: E然后再计算它所知道的其它节点与这个 key 的距离。
% e" M2 f, S8 Z; M! _如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。9 o0 C3 r# U$ J- m. S% O
有的话就返回 value,没有的话就报错。1 K$ y. a- V/ v) r
否则的话,把这个数据转发给距离最小的节点。7 l6 M4 I3 r) K
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
9 H2 L/ o  m1 u, I三、以太坊中p2p通信的管理模块ProtocolManager
9 W  b+ n- }- _# n) _1 Y/geth.go" V0 E" m8 M1 A" U1 y
// Start creates a live P2P node and starts running it.  M/ B4 _5 H' h& K2 H+ K& V
func (n *Node) Start() error {" h! L/ q( n( a; ^5 F8 R* L3 U
   return n.node.Start()
6 s8 {+ R6 a' M" {  |}
& T- O4 r/ u: R1 c/*
: t* Z( d- h; c: Z; s   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。: J. m9 R$ G6 G# a/ w
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;# O2 g$ H5 ?  }  [% p
   然后将Node中载入的所有实现体中的Protocol都收集起来,0 g$ y% S) J0 ^( a/ C5 I4 x
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,: T9 f# F+ \4 J: j7 w
   并将Server对象作为参数去逐一启动每个实现体。
" F2 U. m& ?1 \0 s- v5 X, m2 y*/
- r+ O) C7 c- E: w. H7 u/node.go
1 v+ y- Z" {' V  W// Start create a live P2P node and starts running it.
2 u  Q7 z! ~% _" x0 o. jfunc (n *Node) Start() error {4 ?, _: |, A. j9 H; u
   . _% |7 |* D4 i) ^3 i
   ...- ?2 |" t9 d0 B4 e+ v
   /*5 g; I; ]6 h/ T" l
           .../ F: c) C' @' B5 R& V( l0 @/ E
           初始化serverConfig
( ^# B& j4 e" V5 q; h) w+ u3 V   */
8 s6 D! j! x9 b! Z   running := &p2p.Server{Config: n.serverConfig}
/ H0 U& f- A4 Z6 Z3 [( k& k   ...
5 U% h) D; C" ^; T% e+ l) Y" X! @   // Gather the protocols and start the freshly assembled P2P server
0 w( x/ d- N4 O/ ~( o8 W0 A   for _, service := range services {
- x. N5 d/ i3 f7 W0 ?           running.Protocols = append(running.Protocols, service.Protocols()...); a4 R3 Z& V; f9 I, v% T
   }5 d( s5 [. j$ }! {/ s' z& r1 I
   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法5 [* h5 v7 u6 n' r: B
           return convertFileLockError(err)
2 f3 I$ J7 h6 f, k2 N2 m   }6 ~5 P) U7 {% G* |. _! v( k
   // Start each of the services; B# _7 F; V1 [$ q  c
   started := []reflect.Type{}
# T" A. K" ?5 v0 {0 H   for kind, service := range services {
, B6 B0 ^  E, ~4 C9 V8 E           // Start the next service, stopping all previous upon failure
. \" H- Q; E% p+ e* {           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
" q& ?8 [! x# j* [: |* B           if err := service.Start(running); err != nil {: e& s% e2 c, i+ @3 P, H+ l
                   for _, kind := range started {9 Q- i% p. _5 v  u, ?; I
                           services[kind].Stop()
9 o1 Z: _3 L' B, n' I. H1 u                   }
6 m* Z+ c. q6 a" O                   running.Stop()
" S  K5 Y( r; l7 A                   return err
2 ]- t1 m# K2 v( t) w( T) y$ ]  I           }
' n7 P7 d9 W$ \7 c           ...
* t2 l& M" B* {3 B# `' j   }
. J7 W4 s; j! t* P, r7 @}# c: u% Y3 L$ o" L- P6 r8 z  _
// Start starts running the server.! }; L4 u! R/ c8 d- B
// Servers can not be re-used after stopping.2 [0 D7 R* V' v
func (srv *Server) Start() (err error) {
" Y3 D7 Q1 e' U; T, X1 |$ u   srv.lock.Lock()
9 V* K# Q/ p: D5 r5 C   //srv.lock为了避免多线程重复启动
! A( C+ B- k: T9 O; R   defer srv.lock.Unlock()
: g* }; x5 i2 r9 q: K! }6 T& d  s& `   if srv.running {
# ~+ V( e" d! V/ h7 L           return errors.New("server already running")
9 P3 s+ ?( S+ G5 ~: B% j1 C   }
( B/ z8 K$ Y. |   srv.running = true8 F2 b; M8 o- r) ]' [* \7 }* T/ P
   srv.log = srv.Config.Logger
, @, ^# a# |* K3 O9 x5 J  o   if srv.log == nil {
' d# ^6 k3 _2 \8 e2 o# g" p           srv.log = log.New()
) a/ |3 E- M6 e  |. N' ^! G   }
3 s9 q0 V: Z; R1 W   if srv.NoDial && srv.ListenAddr == "" {9 f) {! L0 y7 C; t, d" s% I- j; z
           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
: `1 |5 y. P1 l   }, n. z& S" t( [! N
   // static fields2 p8 x. B* c' q) U# y" p% \8 y$ m
   if srv.PrivateKey == nil {/ C5 r7 l9 r# o8 I
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")& P, l, P6 q% n2 ?1 L3 r
   }) P. E. O' B/ ]
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。. s3 p, Z: |" G. k1 H- _$ Q( p
   if srv.newTransport == nil {
( ^- U3 s5 a/ E  `' K           srv.newTransport = newRLPX6 a0 K+ ?* e' V' j; W8 {
   }. L5 d) W. z# b# `
   if srv.Dialer == nil {( l" p- H+ s0 Z8 \5 E( a/ }  c% O2 D
           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
7 D, K4 ]8 R; I   }; i5 }5 O) S& U# h2 b8 W
   srv.quit = make(chan struct{})5 h4 \$ _* q5 x
   srv.addpeer = make(chan *conn)( D. i" q% H7 |# I
   srv.delpeer = make(chan peerDrop)
8 L4 @/ {% p' `   srv.posthandshake = make(chan *conn)( D, J# u5 E& n3 y$ V7 P! T0 J
   srv.addstatic = make(chan *enode.Node)( R2 D# {: b$ r( f: O* W6 v7 u3 f5 B0 e
   srv.removestatic = make(chan *enode.Node)3 K, U# l5 _' j* h5 t' a' p1 ?; T! V
   srv.addtrusted = make(chan *enode.Node)2 I! I/ |4 Y" H+ D
   srv.removetrusted = make(chan *enode.Node)
2 C) d" g; W1 u5 U3 r1 {' k; `# @   srv.peerOp = make(chan peerOpFunc)
. {9 w0 f& \7 b; i2 v0 }' g   srv.peerOpDone = make(chan struct{})6 o/ }; w- h0 x* ?  H6 C/ O
   //srv.setupLocalNode()这里主要执行握手
* w. x9 z( M& T, q) t( M: |   if err := srv.setupLocalNode(); err != nil {, _( G- n7 B: E/ r; W+ S. L
           return err$ {8 |% w4 G  ]! W
   }7 j1 o$ R) I0 Z% B- i
   if srv.ListenAddr != "" {: _6 l6 z5 \% e5 V( {9 v, `4 Z; }
           //监听TCP端口-->用于业务数据传输,基于RLPx协议)) h' t/ J+ I& j7 F% u6 f
           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
2 \3 }7 R9 t: |! X: k8 N           if err := srv.setupListening(); err != nil {
+ ~% u" ~, d* f8 K  }) \                   return err1 l8 P- B4 p; l( i
           }
; Y' X! a( J+ e* R$ [; P* c8 U   }$ A% x6 C8 }( O! F2 A( D. n# u+ g: a
   //侦听UDP端口(用于结点发现内部会启动goroutine)
, f5 c# i9 G9 C  @  d  K, S   if err := srv.setupDiscovery(); err != nil {
# _. |" ^4 j3 ?9 g% e           return err
* F9 r( K# x5 c- @+ b+ R* {( Y   }- Y! c4 s8 H; H3 _
   dynPeers := srv.maxDialedConns()" C# z% e9 s4 m" @! Y( U
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)  s. c5 k" M+ |3 }! O  e0 |
   srv.loopWG.Add(1); l, U' O( ^4 B1 b6 C, R
   // 启动新线程发起TCP连接请求
: C4 P8 d5 S# }3 P/ d0 W8 a   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,5 b) G" ?9 T7 a4 V: @8 a2 {9 e& |
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。: M( C: L: O7 ?
   /*
4 ?3 D4 z! C, i9 D3 k   case c :=  0 {( d8 o/ T+ M: o1 u  q
           if s.config.LightPeers >= srvr.MaxPeers {  L" ~& W# j" d! K- \' m* S0 C4 W
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
7 H: o6 C% Q3 F, b           }- I- ^  T3 K6 Q( m+ @
           maxPeers -= s.config.LightPeers; f9 X& C$ W" s# m/ J" P; O. }
   }6 \$ {7 Z9 k/ k0 j! s; W% @6 B
   // Start the networking layer and the light server if requested3 @- E+ f! n( V" v2 Y: y
   s.protocolManager.Start(maxPeers)2 C9 B8 F) T& k- P' O
   if s.lesServer != nil {% H# a% L1 W, O% g5 A# u$ ]' ]" J
           s.lesServer.Start(srvr)" ~. t% Z+ I/ l4 K  a/ {* }
   }$ D8 x% D# @0 k, A5 x! q
   return nil
' e: q! x4 ~# J* j6 F7 \5 P}
+ ~9 `7 c+ _* x" H/eth/handler.go
( q" f5 q5 U( t& C2 H- ~type ProtocolManager struct {. m. [) j5 M8 \2 {. F
   networkID uint64
$ m* ^$ P. E; \8 N   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)8 M4 G- o: O! O. Y4 P* H. \
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)7 X% u' i& L5 }5 w! \& F7 B0 h
   txpool      txPool
; \/ t6 o0 v  f   blockchain  *core.BlockChain
. C; h( x: Y  X1 ]2 S   chainconfig *params.ChainConfig
3 k: m5 k7 A2 p5 P& Z7 A   maxPeers    int& o$ ]' M7 z2 e! J) J, ^4 e$ X! |3 `
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。% `5 q. q, Q  U4 z' Q, B
   downloader *downloader.Downloader
. N( ]! \- j- C. r& K! [   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排$ q: v, R1 R% f+ [4 c1 S0 C
   fetcher    *fetcher.Fetcher1 \- _3 {: q3 b, h
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。3 z2 w, R8 [' [% `7 A( ^
   peers      *peerSet
0 b9 X; n0 R) P/ D   SubProtocols []p2p.Protocol
1 K" t8 K# e; g5 Z, c5 ~3 @   eventMux      *event.TypeMux
, P3 X( |" f  b2 x8 P7 H+ W   txsCh         chan core.NewTxsEvent
! e; e% A2 L! C" A5 A6 Y   txsSub        event.Subscription
: o6 \: ]! x. w& O/ l4 y% `8 e   minedBlockSub *event.TypeMuxSubscription
5 _% Q: Y2 p8 P3 {   ) K8 \1 K- S+ ]: {0 K4 P- K- N0 G
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。, @) T8 d4 L* G4 R( i- K
   //当然在应用中,订阅也往往利用通道来实现事件通知。
# _7 Z/ G7 b; ^1 B* v/ A   // channels for fetcher, syncer, txsyncLoop
0 P& [$ C7 \$ l+ K0 o   newPeerCh   chan *peer
  D2 t! T2 L; U0 ]0 i   txsyncCh    chan *txsync0 n# e/ G% n, u' x" ?- w5 O
   quitSync    chan struct{}3 s# s  E/ L1 Y$ B
   noMorePeers chan struct{}- X; a- `* `! k) G$ |
   // wait group is used for graceful shutdowns during downloading
, ^/ M. s; O! ~' U  o   // and processing
) E! t* C& Q) |$ |8 K   wg sync.WaitGroup" d, C/ u. }! s0 ^  a" D
}0 u. V8 r1 y) K7 Q
   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
6 I' [* o& M5 m) W$ @" BProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
1 s! b" e4 f1 T9 G这也标志着该以太坊个体p2p通信的全面启动。4 ^5 ~% {, G" Z' {
func (pm *ProtocolManager) Start(maxPeers int) {
/ Z. m/ Q9 e1 W8 X+ s3 I! r   pm.maxPeers = maxPeers
, J" S" R. q. H. j- C* [& R' x& [   // broadcast transactions
" n  `3 O  Z1 a8 X1 w: D   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。2 J, e" I0 k# d( p8 D/ V
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
8 Q& @- |1 `2 s: m- j   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)' f, K4 @$ j9 a% ^' x( ^1 D# T
   //订阅交易信息
  ?3 E) j1 S0 I7 Z, m3 }# Q   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)  @7 _) P5 a, a4 `* S0 r
   
- S8 e( O, k$ x( ]8 Y   go pm.txBroadcastLoop()* A3 j% Q3 s3 a
   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
+ z( |3 q0 y; @5 m! r0 I   // broadcast mined blocks
* W  b" m9 Y: W9 c   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
8 d- f( B" d9 {- u# M   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
! p7 b# Z3 F# s   go pm.minedBroadcastLoop()
* l4 j2 H+ O6 F7 X" `4 B   // start sync handlers  A. g2 d: q- k# H1 L7 ^* q
   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。; q& _, l& ^8 y8 i2 y! ?9 }
   go pm.syncer()$ F3 d& Z$ v6 N3 _
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,5 z  R+ K9 z1 K' E7 M. c8 L
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。2 |5 n. q0 z! d. L
   go pm.txsyncLoop()
, r# g- e, e6 T# j0 T" B8 ?+ r  W}
" d! M  V9 _: A: Z//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
4 a; O7 v6 m" W( _' F3 I   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
5 N6 O6 z  u. c9 x' [* k: T//------------------go pm.txBroadcastLoop()-----------------------
8 }; D; _" ~% R+ B! p) sfunc (pm *ProtocolManager) txBroadcastLoop() {
) N4 g/ @  D5 G/ W! p4 m* _7 F   for {. J; k" t5 m5 }$ G9 j) x+ c
           select {# E2 R; N% q; ^  v. f% b
           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
* m, K( P. Y7 G+ `5 k9 W6 N. d           return p2p.DiscTooManyPeers' C5 a& o8 D% O2 y" I8 Q0 R) H
   }: e- o" F7 M8 R9 g. W
   p.Log().Debug("Ethereum peer connected", "name", p.Name())# a4 Q9 O9 T" W/ `" A. {
   // Execute the Ethereum handshake' a! g* M& p7 Y, z3 K" G: L! Q1 y
   var (
4 ~$ p3 K, V: W7 W9 d8 L" O5 x           genesis = pm.blockchain.Genesis()
! b4 ]$ D- L4 P1 u1 f. F4 `) Y           head    = pm.blockchain.CurrentHeader(), f1 |3 N6 j" V4 p4 Y+ y
           hash    = head.Hash()" z2 B& U( J. Z
           number  = head.Number.Uint64()
# M: ?, O/ H9 N2 P$ E% E           td      = pm.blockchain.GetTd(hash, number)
- ?' e3 j9 s, p- p# n+ o& e   )
0 I9 u  e3 w9 P" m8 F# c   //握手,与对方peer沟通己方的区块链状态) d! E! H% w3 J* X& {' D& Y
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
7 s" \" A) @# V. Z6 i) a7 x           p.Log().Debug("Ethereum handshake failed", "err", err)
1 ^* [! m) Z% O5 k/ n. m           return err
& ?, b3 I5 g, x   }) l6 ?' N0 h/ E. o: ?# Q: f# `
   //初始化一个读写通道,用以跟对方peer相互数据传输。
) N$ H6 ?# E0 v* v" W4 z( b% H   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {0 S* z' g3 w; y( a1 }, s
           rw.Init(p.version)
/ U, z: {) x2 p   }5 w: |! {$ f8 U6 \, O& n. {
   // Register the peer locally
6 J- j7 y6 U$ b/ I  s   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
) F! N- y" B+ x0 y3 I9 @. }   if err := pm.peers.Register(p); err != nil {
4 v+ z' J6 g+ O3 E/ s" H1 y2 G( a           p.Log().Error("Ethereum peer registration failed", "err", err)4 g5 T* ^3 G. |
           return err
8 I5 n8 D8 N# M* d   }" X0 R& }, i2 k  C# {
   defer pm.removePeer(p.id)
/ y1 L- x3 h" q   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。% Q' U9 m/ r8 z
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect" G& \9 E* F( ^( C& B
   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
3 s- |9 V( Y" L5 f           return err
% U; H1 Z7 P( }# x5 A' n8 e   }
6 t# Y( F' O: l5 H+ Y3 ?0 F   // Propagate existing transactions. new transactions appearing$ I$ i. c; j  D/ M0 J) u& U0 Z
   // after this will be sent via broadcasts.
! @& D7 T5 @1 W# r   /*
# R, V* E; r5 v# j9 L9 C   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,' h& N3 B+ t+ V  n5 A
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
/ v- d$ w$ f3 G0 b7 q. a   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。3 B  H& z& Q' [9 V
   */" Y; ^/ \8 e) _4 r# m
   pm.syncTransactions(p)5 Y* o. ?+ Y# h' o! ^+ E
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork' N$ V0 z3 w3 M- L
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
, @$ ^9 H1 O. o1 r2 X           // Request the peer's DAO fork header for extra-data validation/ j( w9 w) `5 z! P8 U$ u
           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
- S0 n8 _/ v5 P                   return err
2 E, I3 V1 A7 O7 X) l           }
7 M. Y3 |( {* X5 N( w           // Start a timer to disconnect if the peer doesn't reply in time+ I; m5 i2 J( v
           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
" `7 v6 @' u' }# P) f                   p.Log().Debug("Timed out DAO fork-check, dropping")# u  X' o  ]; a* G: F' b
                   pm.removePeer(p.id)2 T1 D' N  A% ]0 Y. X7 A
           })
  l8 T! k0 i% z* V& O1 ^! x           // Make sure it's cleaned up if the peer dies off% r2 L8 _, x- h/ \% O. k
           defer func() {
% t+ ~: h/ ~. b6 p) e+ Y# w                   if p.forkDrop != nil {
; U% @* N- N2 v" j4 k$ ~                           p.forkDrop.Stop()
) ^' m/ z/ ], c7 H! k' Z                           p.forkDrop = nil
# d! X: [. W: Q$ R$ H- }9 S                   }+ q! [! V3 ], H$ ?9 K
           }()
. b% N! y! y' Q- ]   }
8 V; I) ~1 v3 U3 _           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,1 T1 |8 i* d8 P0 w) W# w6 c# d
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
4 T$ S1 e, a) B4 G0 }  K" n   // main loop. handle incoming messages.3 ?6 J6 s6 n/ c+ N
   for {8 ]& h9 p) p# v  M' q3 j3 `
           if err := pm.handleMsg(p); err != nil {
3 D: l/ s/ s; Z! E& Q" |                   p.Log().Debug("Ethereum message handling failed", "err", err)
3 h' [1 q' K8 T" U, \4 T                   return err/ Q- I4 @' \- V& k
           }
, h: L+ S& J. b6 F" C1 E   }
3 @& t- @( T' z! e1 e( a}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2