Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
182 0 0
一、p2p网络中分为有结构和无结构的网络
$ Z7 R& z& J% d7 ]" n2 j无结构化的+ l1 H, \. l$ x) T2 l9 A
这种p2p网络即最普通的,不对结构作特别设计的实现方案。
$ n. Z/ h2 D6 r& s: y7 v: y. |3 k$ [优点是结构简单易于组建,网络局部区域内个体可任意分布,( v( Z* e0 r8 R: N& V, f7 E
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
* x- n& V- V/ C, ?, U( B入网络和旧个体离开网络(“churn”)时它的表现非常稳定。+ A, B7 a  d1 O1 d: e- g3 L1 F3 d
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
& p" z$ y5 C* M# n( r5 I所以往往需要将查询请求发遍整个网络(至少大多数个体),. A3 ]! s3 I7 \' r! d/ F/ f9 w" m
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。2 Q; v; |7 [% p' c4 l! Z& S
结构化的:
8 P( f- `$ J) G& o& J这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
8 u: a3 x7 Y" J: {7 S降低查询数据带来的资源消耗。
7 r( P0 O# U* Z, k以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)  {5 _' D- V6 J) U9 U- X/ b! }4 T
网络模型可以满足需求;
" V1 g$ L9 k& N) u2 r& r二、分布式hash表(DHT)
$ I3 O# ?2 u2 ^( V6 Y$ I保存数据
  ^, |/ [9 y+ f; }% O( h- O(以下只是大致原理,具体的协议实现可能会有差异). M/ L0 I( k8 c2 _  w7 b
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
# I* O! G  F# A; x然后再计算它所知道的其它节点与这个 key 的距离。8 J8 @% D& o& I1 w: R* Y
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。7 r, z8 v& k  K  }
否则的话,把这个数据转发给距离最小的节点。" a4 F/ q6 g- `, a/ ]
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
" T' L( r. Z7 g$ Y/ `' d: b2 W获取数据% T  y$ ^1 `$ a1 x
(以下只是大致原理,具体的协议实现可能会有差异)
" h* [( z5 V* D; k) r+ ^' o当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;" p! r1 t7 ?. e% b8 D7 T# {
然后再计算它所知道的其它节点与这个 key 的距离。) a2 @( c$ Q  R( m3 p0 m/ `( G
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
+ F1 o' y6 A" H8 z! V: j% g+ j& q7 {有的话就返回 value,没有的话就报错。, Y& i& x2 I0 A$ L4 T
否则的话,把这个数据转发给距离最小的节点。
0 Y0 n* K! {0 o0 S* h收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
& P7 i$ @/ Y6 `6 U' }三、以太坊中p2p通信的管理模块ProtocolManager  E! U5 v2 l0 {% H, W
/geth.go
6 g! X# g: C& A- [  [$ ?// Start creates a live P2P node and starts running it.. t3 [' Q: @; l6 W9 l1 Y7 a
func (n *Node) Start() error {2 \6 k( n( q; k9 A9 J5 k1 v* P
   return n.node.Start()
& i8 v' V# i( N0 e: e}
( T1 B* \2 S9 l/*/ H5 z3 |* q* Z* g
   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
* [4 {/ |- ], V- d/ ~$ {! G! ]3 j   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;5 L( U+ ^4 z% `- J
   然后将Node中载入的所有实现体中的Protocol都收集起来,: N4 W+ y# ]9 M
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,* ?. F/ J7 d' f- U; y& f
   并将Server对象作为参数去逐一启动每个实现体。
. l: V. r$ h( O$ A/ h*/+ \, [& n* x8 P4 q: ~
/node.go4 e5 w3 T; k* Q4 ?
// Start create a live P2P node and starts running it.# ^; g$ {6 @( a7 t0 \! s
func (n *Node) Start() error {
6 Y1 I1 v& u6 H& t8 P# P* U6 |   
: c; k/ |" B! W   ...' o, q5 r/ o9 r' S2 {( B  g2 U
   /*5 V8 ~/ `- D7 @
           ...# u# ?% R7 b( q# }2 E' O( L0 e
           初始化serverConfig
3 w7 t  b/ r; z- C. O/ C& m* k   */
( z3 z) p  d" A, m: ^0 l: l8 w   running := &p2p.Server{Config: n.serverConfig}( q  P+ n" f8 q$ l. W: h6 t
   ...
# U6 q* _) u! a# \   // Gather the protocols and start the freshly assembled P2P server
" _. _4 g" V9 C4 h3 f+ O; e4 x* d   for _, service := range services {
8 T5 f* T1 b: v) U, U1 W           running.Protocols = append(running.Protocols, service.Protocols()...); y+ o+ a$ T, D$ f, ^
   }
, y0 Y7 I+ x3 O3 \* f   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法1 c3 Z! W5 P: I2 Z4 N( I+ X2 {
           return convertFileLockError(err)
. r7 v  ]% q- ]  P) t0 ]5 y   }# W( G. e: f8 |; l" L
   // Start each of the services. @: k  P, k$ n8 n4 y
   started := []reflect.Type{}/ U+ w& O7 J0 V0 a
   for kind, service := range services {. C/ v' s: U, \1 X5 F4 N1 d( O
           // Start the next service, stopping all previous upon failure$ ~8 {, P+ _' ^# W
           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
. z5 y: z8 L+ C  d: k+ f           if err := service.Start(running); err != nil {
* O$ W( R2 j1 B4 W                   for _, kind := range started {' y1 l& p& Y; D3 R, f1 {6 g3 n
                           services[kind].Stop()
& R+ p* A; _; k% O  h: o; F; F                   }: T# O  x. {3 w
                   running.Stop()
1 K0 @' ?  J7 D) _! `7 y                   return err6 q7 w% O0 `$ O1 t
           }
3 m2 L0 u5 \; R9 }% c           ...
" C& X; ^7 {; E( Y  s   }
0 Z/ _; K3 c2 y) x}
6 \, U- w  X8 U4 l# a' }3 ^// Start starts running the server.
8 d. n% ?3 m! q, ~2 i8 U// Servers can not be re-used after stopping.& m) r- m$ E2 R7 n
func (srv *Server) Start() (err error) {
/ s- Z% G' B3 u1 o- {9 E% j# c   srv.lock.Lock()
- g9 \  G$ p% R9 h2 A/ \6 @" s   //srv.lock为了避免多线程重复启动1 K0 M% _; y. n% ?# D% q
   defer srv.lock.Unlock()
1 @; G3 f$ Q# C: y, j   if srv.running {
6 [+ T- g1 {0 |7 M           return errors.New("server already running")! R$ z, N8 s: P. Q
   }; u) p  j  o, ?0 I% @$ d3 u
   srv.running = true
- x. r$ B( u$ u: K' H9 L. o1 o1 v6 {   srv.log = srv.Config.Logger* D% Q  v' w6 w4 {1 o
   if srv.log == nil {
& [% _+ t2 Z8 L% X0 V; L           srv.log = log.New()
0 P0 Z  m  D( `7 ^  O) h$ Y7 r' n5 O   }6 `, L3 M; h8 _! P' U
   if srv.NoDial && srv.ListenAddr == "" {* Q- _" R/ Y9 s- X* [7 Y2 O
           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
1 H9 K# c  v9 n7 g2 z! M   }
& m, M4 w. u5 B  P. F5 m; e   // static fields9 G  L% d) N& j. K# p( [
   if srv.PrivateKey == nil {; w' h# K1 \+ K8 }/ K+ g: B0 ?: X7 s4 _- r
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
; o# F# M$ \) b% H9 ?9 A+ \   }( ?4 d! E& u" H# G6 o6 ]
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
1 `) P: n) u6 s  }0 g7 l% n   if srv.newTransport == nil {
% a6 ~( q: q$ i; X' E* u           srv.newTransport = newRLPX3 [" Y) E& x1 z5 s+ U4 w; J
   }$ S0 g; Z, L/ N/ S
   if srv.Dialer == nil {
0 g) [: T0 v# Z* L           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  o0 B* f$ ]- t+ M8 H( D5 ~" z   }
1 O& r; o3 z2 _- G4 r   srv.quit = make(chan struct{})
2 o5 C  r! C/ ~/ v7 o   srv.addpeer = make(chan *conn); H4 e5 m3 P; e" k
   srv.delpeer = make(chan peerDrop)
5 \. m9 F* Z( {. y   srv.posthandshake = make(chan *conn)
2 I  {! F+ R2 X% J1 i$ ^8 b* p   srv.addstatic = make(chan *enode.Node)5 R% L4 P) l: ^* e0 T6 m) a
   srv.removestatic = make(chan *enode.Node)
0 n) E; ^1 N3 q! J8 F$ q   srv.addtrusted = make(chan *enode.Node)
( ?1 z. ^3 ]% _. f" a/ [, _; ~   srv.removetrusted = make(chan *enode.Node)
. H& I! J, h- I) n) l+ d# @% a& C   srv.peerOp = make(chan peerOpFunc)
+ B) T: \/ n$ F   srv.peerOpDone = make(chan struct{})
1 N, |, m  J$ N8 p   //srv.setupLocalNode()这里主要执行握手# H; S/ Y' K  a
   if err := srv.setupLocalNode(); err != nil {
# M) o$ `: b1 ^7 J+ o! G& m           return err
0 G0 t* i, R# }+ A# C; {! \3 z   }$ m: _; T; e& M9 R5 c
   if srv.ListenAddr != "" {
# A3 A  z* T$ h; X5 g- ~           //监听TCP端口-->用于业务数据传输,基于RLPx协议)0 z5 I# A1 t* A
           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
" B/ ?6 r# h9 [: O1 |% [           if err := srv.setupListening(); err != nil {( X7 z6 l" m: b  }/ K; ~& A# ^
                   return err
: @) w8 z: P) e8 p4 Q4 f! n. [           }
3 Y$ E/ n* E  v  K. s% n0 Q9 G  P1 G1 @2 x   }* g. p0 q3 L, C3 r3 B; y0 t$ H
   //侦听UDP端口(用于结点发现内部会启动goroutine)7 O  E# Q2 N5 K/ W+ F5 B
   if err := srv.setupDiscovery(); err != nil {
+ \. p; ^% I: }8 K' v- ^. G           return err
6 n' f# t- N7 d- B. T   }
* |7 M2 b6 R% w9 Q; Q   dynPeers := srv.maxDialedConns()8 [* K& J) m! \& A) F
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)% m) ?8 `3 d5 \! s
   srv.loopWG.Add(1)
) I' @. k  F! E/ [1 S- {7 n   // 启动新线程发起TCP连接请求
: c' G- k2 v, J   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,. P5 _- d$ I: ]$ D
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
  ~& ?( {5 E' m' ^3 l8 `   /*
% g# f$ o. Z6 S6 O5 c   case c :=  0 {
. X( H0 a3 _; O$ R% j           if s.config.LightPeers >= srvr.MaxPeers {- ], @* o+ V- X4 N2 f
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
* }4 z7 o9 d# i0 f6 a7 ?/ J" k3 E           }
6 E( O+ r4 |. P3 R% @           maxPeers -= s.config.LightPeers
* u+ k+ X, I' }* R   }
+ M6 a. N" G1 {  G   // Start the networking layer and the light server if requested+ g8 c$ ^$ c) ?1 w" A
   s.protocolManager.Start(maxPeers)
3 m. ~. s4 _6 |   if s.lesServer != nil {! Z( ~1 _; |. g2 v' p# Y  b# o  q
           s.lesServer.Start(srvr)
) m/ A1 d6 M, O/ i1 }   }
9 r9 J9 b  |7 x! T+ a1 q% j( [1 @   return nil
- e; v3 X* F7 v, A: ]4 x/ g2 E}
5 C' |. |" A8 z/eth/handler.go; j/ S! K; K! h" T7 w5 u5 c2 W  h
type ProtocolManager struct {9 y. o6 |8 l7 U6 U
   networkID uint64+ j+ c' m5 S0 j% [: S
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)# f6 w0 s+ ~$ O" p
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
* A$ n6 H0 N; {8 G$ k   txpool      txPool
; J$ e. O* ]) L( q   blockchain  *core.BlockChain$ @* O" K5 P2 f0 t3 s8 U
   chainconfig *params.ChainConfig, B3 P0 D( H6 r& D
   maxPeers    int
, J+ }  R; A: l) x# C) z% M& X   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。+ d$ |# T. y, h5 B
   downloader *downloader.Downloader/ U/ ], V1 F+ Q8 D, ?* `. R
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
* _2 J& @. A; |* a7 v   fetcher    *fetcher.Fetcher
7 O: j# S' H- ?( y) \& |   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
; b- {7 h" z8 I% O- W/ M) i0 Q7 V) z6 J* y8 c   peers      *peerSet( p8 D1 k% p- Y1 v2 Q
   SubProtocols []p2p.Protocol3 Z* u- P8 q1 M8 Y" C* W- @
   eventMux      *event.TypeMux4 C* C/ N6 H3 H3 m! {
   txsCh         chan core.NewTxsEvent2 N- Q( ^; M: p) W' R9 T" N
   txsSub        event.Subscription& k) w6 X- P' s. S, j
   minedBlockSub *event.TypeMuxSubscription
* |  u/ k0 T+ E3 b2 C( a   
0 v9 `, g* l* [5 u3 I   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
/ s5 P- e3 y5 I& ?" g4 ?) e/ Q0 g$ S   //当然在应用中,订阅也往往利用通道来实现事件通知。
2 Q" @( l. y; {9 G# a: L   // channels for fetcher, syncer, txsyncLoop
6 \! u+ G1 H  B+ Y   newPeerCh   chan *peer
  a4 G9 H& q2 {! W' _9 r! ]   txsyncCh    chan *txsync9 B  S5 c1 f) J' V  [
   quitSync    chan struct{}* G) A4 v, I$ p0 Y, }
   noMorePeers chan struct{}
+ |: X/ N2 F+ Y) }8 K   // wait group is used for graceful shutdowns during downloading4 m7 @. F. ^3 u: k6 [, q" x& c" S
   // and processing
/ o3 D/ |; K% V# U" u( }   wg sync.WaitGroup( R; x0 j. L& p4 n7 b
}
9 P1 R! W/ ~1 Z1 ^7 L   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
3 l# ?( a) D& Y8 qProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
9 Y$ ~3 p7 V# T: s: J4 t这也标志着该以太坊个体p2p通信的全面启动。2 C" E5 g5 p' H9 j/ v. z, Q, q
func (pm *ProtocolManager) Start(maxPeers int) {# z& J4 i; X* t, E* L; C
   pm.maxPeers = maxPeers
" q% k# P! N; X   // broadcast transactions
8 z; s  ], P3 @  K3 _0 e% A) Z   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。* Q6 F7 F* X  w0 i0 p" R
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
4 v/ z$ j% {5 V; b   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)$ F3 c2 a+ j7 H3 J( B' f
   //订阅交易信息; u! V  A1 t. N$ `
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)2 c. U, C, L4 c/ m, U" m; _- s
   3 c+ c1 Z) P( A1 B, r9 B
   go pm.txBroadcastLoop()2 L- E3 x8 ~6 L$ Q/ a( F( h
   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息- U: W3 ?' _9 y: ]3 L
   // broadcast mined blocks
$ ]9 B! Q$ F' r* v: H. c0 }   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}). b+ e/ k: C/ ^1 B; V
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。5 @0 @% `* b( k7 x9 z
   go pm.minedBroadcastLoop()
( u3 Z$ B- z* o- X2 ~   // start sync handlers4 l1 K/ v$ @' e) `2 R, V/ I& c
   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
. B* d0 ?  i1 E3 i$ u5 {" d   go pm.syncer()$ _6 l8 {! A/ r# _) f; `6 Y
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
2 W; p+ e# g) K( U$ f3 L   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
! |) Z+ g5 ]: F# n* C. f+ S# a& F   go pm.txsyncLoop()2 ^. ?/ O: r# ~% r3 g2 Q
}" `: j( u- H5 w5 p8 i) P
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,: e& ~$ s2 j$ {: S- j& V
   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。5 s  X' T/ ^3 f" Y# u
//------------------go pm.txBroadcastLoop()-----------------------
- ^) q2 ~" \# i# ?: ~, Qfunc (pm *ProtocolManager) txBroadcastLoop() {
/ {, ]/ q; a! h4 E* T   for {
5 S: X& S7 o% q           select {: V) Q+ l3 {/ P3 S3 H/ k
           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
  v- q- s& v' U% l+ o7 {8 q           return p2p.DiscTooManyPeers$ G# p) b7 E; a) X0 V1 o! C8 k) _
   }) h/ A* h( i$ v, |3 x
   p.Log().Debug("Ethereum peer connected", "name", p.Name())8 T' c" m/ p/ s* m6 w' d8 s
   // Execute the Ethereum handshake7 \5 t$ w" c7 w, T( K
   var (
5 c. j" b% Q! u  F9 {: R           genesis = pm.blockchain.Genesis()
6 ]. V. a# _9 H, k+ h8 x& W           head    = pm.blockchain.CurrentHeader()
  V0 Z) J# s8 J           hash    = head.Hash()& a& p  X; @8 d& Q: H5 a
           number  = head.Number.Uint64()
5 b1 Z# t& \% _           td      = pm.blockchain.GetTd(hash, number)
) e: }& H  X; W" W1 I+ ?   )& ]. Z) D) k  y
   //握手,与对方peer沟通己方的区块链状态
5 g7 T2 `# J, g" ?4 Q% a. C; l2 T   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
1 n" U' A. N# f( J0 C5 y9 y           p.Log().Debug("Ethereum handshake failed", "err", err)8 [: g# U1 Y+ A2 P/ M+ o
           return err) S( P$ U) z. T* L: m  H4 h- p# a
   }/ f  b0 P9 C9 B! z2 `
   //初始化一个读写通道,用以跟对方peer相互数据传输。1 E5 t6 }/ b5 I, ?9 ]; v
   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
9 R4 l$ s7 Q* e4 p1 a2 [% M           rw.Init(p.version)
( [- b0 Z0 \6 y) ^* W   }
; \- z( V: p, b" a1 |$ m  O0 A9 A   // Register the peer locally
. b( K0 o, N' q! W! p1 Q   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
4 _. F! e7 }0 h4 i1 i1 g   if err := pm.peers.Register(p); err != nil {
: U3 l7 N! w! M/ q; ~6 l           p.Log().Error("Ethereum peer registration failed", "err", err)
7 w3 q! k# x# t) n9 T           return err& Z4 c/ [2 e# h/ I0 S  N+ p: p: Z
   }* G# @  O, E) d; _, W+ j
   defer pm.removePeer(p.id)- f# \$ |) T6 W7 ?3 t' u
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。7 w# k* m5 G# o
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
2 f; [6 a) A  }$ c  o4 W, B8 ^   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
9 S9 A  X) b" R7 B           return err
$ g# X8 ]7 l/ U/ e7 w, e7 U7 S, D   }
4 B7 V  R8 ]2 x" k4 B% k   // Propagate existing transactions. new transactions appearing
. a# t, u$ s+ v& \# s; C   // after this will be sent via broadcasts.
) K% U; n4 x  n; C& H" }! V- z$ v   /*
+ B8 O3 Z8 Y5 n+ ^   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,4 g4 U9 t& k$ }
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
, S5 T3 f' y- _$ I' ^   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。  w; e4 k* f! w
   */
5 c9 b& I/ d: }5 K& B. E   pm.syncTransactions(p): b4 D- l; C/ R/ Y6 J2 B
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
6 ~' P& S6 T4 H& d1 D/ b   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {; A1 X! ]8 \! b: r6 k' L
           // Request the peer's DAO fork header for extra-data validation1 O% B$ r5 m+ ~5 j2 L
           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
# ]; w3 p# g  [                   return err
  ?0 v4 k4 W5 d+ U0 H" f* R. d           }
1 K% F( q+ \: B* w           // Start a timer to disconnect if the peer doesn't reply in time
! i0 b- l! P! X9 s  g           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {( o* Q! E/ s# e( Q6 j" p
                   p.Log().Debug("Timed out DAO fork-check, dropping")' l7 T, ^9 l7 e1 k2 Q
                   pm.removePeer(p.id)  `, y( A! J" k* m$ V6 x9 N: K
           })
! M6 |* T2 {, Q, _  ]           // Make sure it's cleaned up if the peer dies off
3 [' H# i4 K9 N2 G3 L  s# _           defer func() {& J0 M4 ~  C4 a+ Z4 }
                   if p.forkDrop != nil {
  _- p/ ~! Z, Z8 \1 {                           p.forkDrop.Stop()9 }) N) J$ `0 u) C
                           p.forkDrop = nil
  k4 @, R8 k. U                   }" ^5 W) W8 R9 B) o
           }()
' ?& O9 z; z1 r   }/ C. D2 i: O0 D8 w/ x
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,( h% F# P* f* f6 O9 ^
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
+ r" l, s% D- k' Q0 T/ I   // main loop. handle incoming messages.
% R5 e& v: O( T7 X6 j+ c   for {9 x" F. [; h* G1 V) x/ J
           if err := pm.handleMsg(p); err != nil {
, d9 @' L# ?3 h/ P/ b- f. m7 V( j                   p.Log().Debug("Ethereum message handling failed", "err", err)
$ R" s( x# M+ p                   return err
: ~- Q8 U2 q. ^           }4 c. n  h, P0 Y6 W
   }
( G& m  k( C& P) L9 x  H' p}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2