Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
80 0 0
一、p2p网络中分为有结构和无结构的网络) v+ \9 ^% P& C
无结构化的# ~# V5 |2 I! k2 y' E% O% p
这种p2p网络即最普通的,不对结构作特别设计的实现方案。9 j# ^4 }+ n3 U% o, R. x& U
优点是结构简单易于组建,网络局部区域内个体可任意分布,9 D/ m# s3 P5 O8 O5 L, k2 j% L
反正此时网络结构对此也没有限制;特别是在应对大量新个体加8 s  U  c$ Q2 _! ?6 F1 A
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。. L3 j  g8 ~: v
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,8 @. U5 d5 x5 Z
所以往往需要将查询请求发遍整个网络(至少大多数个体),  Z3 C) C% ]# o/ `! r8 V
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
9 H; Q: w$ w/ \: I结构化的:# u0 _) z- |+ Z, p( Z: l4 A% }
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率," [& s+ r- R& P# E. w
降低查询数据带来的资源消耗。6 [$ t/ T/ Y) S$ w. W" y" O
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
1 @; {; r4 D/ _- o! B网络模型可以满足需求;
7 Q, `" p0 \# n9 d二、分布式hash表(DHT)8 ?  e. P7 S& Z% w  P
保存数据; s9 v2 p# G2 x$ @( V' J
(以下只是大致原理,具体的协议实现可能会有差异)3 C5 o1 _+ O8 i1 a, O
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;$ H" W6 |* y9 |: M; w
然后再计算它所知道的其它节点与这个 key 的距离。& [0 g) n: T1 o8 d  d. \" G+ O
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。* {- F2 H3 n: d4 P. A
否则的话,把这个数据转发给距离最小的节点。5 n  S5 b# p- |2 u
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
( @3 O+ D3 z0 a& ^获取数据
. J( v0 N6 @9 |$ R9 O- r, C  j(以下只是大致原理,具体的协议实现可能会有差异). X9 o! Q' X( ?6 c1 A3 l
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
4 {- p9 W, ^( R然后再计算它所知道的其它节点与这个 key 的距离。: g( A& W; a: u) u
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
* H& b+ [1 N( Y8 g( g% U$ i有的话就返回 value,没有的话就报错。
4 a! K' p: M0 W否则的话,把这个数据转发给距离最小的节点。4 C$ `7 f* }. T- A
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。$ B& e; x0 A6 j& b
三、以太坊中p2p通信的管理模块ProtocolManager
: _1 z5 x) @8 |' b8 z/geth.go# I0 ]4 x1 `* G7 Y4 [
// Start creates a live P2P node and starts running it." Z: _7 g0 K# a- }* Z* l
func (n *Node) Start() error {) g- Y6 w3 R! P1 w( W
   return n.node.Start()
# k: O" d4 V2 q! _7 l! E9 @, A}
$ C1 x1 E. P7 q* [) U! j. R/*
$ \8 E5 i2 ]- Q4 S) r: S8 x; [   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
6 y4 w6 g  x/ D. R# d  D# H, R   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
0 y! V# q/ i# Y   然后将Node中载入的所有实现体中的Protocol都收集起来,# z/ P# D) y* F# [" g0 s
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,: O; E! h# o' o2 b& T
   并将Server对象作为参数去逐一启动每个实现体。
7 G. t! |  R! b' C8 B*/
; p+ Q% W% L$ x$ A; U( a4 q. O5 o! Q! p/node.go
( u( p, z. [) ^( F$ G3 |// Start create a live P2P node and starts running it.. ~. [  o; w9 x8 y) S7 f8 F  s  I
func (n *Node) Start() error {
) o4 B! l6 C0 y0 W& Q! P   
2 @( m  \# F& L- @   ...
, e/ [( ^+ O* D8 \6 Y5 ?7 ]   /*
" B0 j" ?( Z, B6 p  T% @           ...
/ V; @& v( D( U/ v+ a# V+ e           初始化serverConfig
9 h9 }; H4 ^3 v8 z- U8 [" }   */
8 Z4 T$ B, a% {% X  Q) ?   running := &p2p.Server{Config: n.serverConfig}9 ?7 G. q- F0 X* J
   ...8 ?  W1 j* ^3 V3 R- M  D* o7 k# Z1 x2 x
   // Gather the protocols and start the freshly assembled P2P server
% v; n* G: L$ z; i   for _, service := range services {. t) r$ b! t% x! k) [4 G
           running.Protocols = append(running.Protocols, service.Protocols()...)
  U5 t  j9 }0 b/ Y' l   }
% I8 S8 ^6 W7 g% s8 c1 j  i2 k   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法* E- B' ~( g* E
           return convertFileLockError(err)
& F' Z2 a% L3 l9 I$ r; {$ a  {4 I   }2 c. p& w# ~$ Z* n2 e( Y, n
   // Start each of the services- ~6 E1 I- I4 s  N
   started := []reflect.Type{}- X- m9 R7 T1 X: g# E
   for kind, service := range services {- f" Q& c0 W. x# T4 `. o# ^
           // Start the next service, stopping all previous upon failure
+ \$ n- A4 H% L) c           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
6 g& {; ^- K/ Y( S$ Z           if err := service.Start(running); err != nil {
: T; J9 j# `9 n' c% B7 y                   for _, kind := range started {
. g4 D' C8 I) Q                           services[kind].Stop()
+ A8 o" w( d9 T                   }
3 m  F3 Q/ E* Z( U) ]                   running.Stop()' |5 k* k, f4 }! }
                   return err1 [* ?  F2 T9 N% w0 C
           }
+ H, v% H. n0 o5 S5 j& T           ...# P& }7 T" ]' _( t7 y. n
   }
* i2 Q4 `& \$ ]) `3 n$ d}
* J8 [" }5 k7 m" @$ ]// Start starts running the server.
8 B& [4 x) k+ J' _// Servers can not be re-used after stopping./ a1 s1 }1 F1 X& w2 Z4 V8 Q- X  x# L
func (srv *Server) Start() (err error) {: n) X; X2 L% {( @* ~& i+ d4 O+ L
   srv.lock.Lock()
1 W3 }6 m* O" u) w3 e$ Z& X0 k   //srv.lock为了避免多线程重复启动( Q2 }) t7 _: T& l
   defer srv.lock.Unlock()
* S3 S+ j4 |# i# y4 i2 y   if srv.running {
7 {, E7 \* J, {. \" }' d           return errors.New("server already running")
% i5 i: n, V7 `/ z9 a2 J   }8 S4 E; c' ?* z8 ~
   srv.running = true
% o2 Y  c% v  I  T   srv.log = srv.Config.Logger
  e0 i9 P/ i/ h) d( N   if srv.log == nil {
" q( x( R3 c0 l7 m" Q  ^) P           srv.log = log.New()
+ `8 u2 f% Z  p$ u   }& g! k0 ^* l8 ^
   if srv.NoDial && srv.ListenAddr == "" {
6 S5 \( _. s- L, C* x9 S           srv.log.Warn("P2P server will be useless, neither dialing nor listening")3 S% |4 b# i2 ]; l7 m
   }
6 S  s4 e5 Y( l   // static fields
0 j2 [# P$ M' ?$ v( R   if srv.PrivateKey == nil {  @+ W2 z' y+ L( {
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
0 o; v$ [4 b: W5 _   }9 F% j7 s) y4 G* f
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
% f& ]3 ^$ K# S/ }4 O/ K9 V% g   if srv.newTransport == nil {4 c; ^% j" s& Z! g2 O: m
           srv.newTransport = newRLPX' n( D6 B2 D7 j( F% `3 F% J
   }# e' Y5 s9 x& s: l6 D. ~
   if srv.Dialer == nil {
1 a! S* ^! T' s/ J/ Y5 K6 \           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
- e/ c) g' }3 p4 y' b   }
7 p4 ?" p) ?* B   srv.quit = make(chan struct{})
$ A6 j4 y9 r6 O2 Q   srv.addpeer = make(chan *conn)( F9 \' I0 a; c1 m1 v) r
   srv.delpeer = make(chan peerDrop)
3 G/ V1 X/ u" x   srv.posthandshake = make(chan *conn)5 Y' Q0 Q9 v/ B0 y
   srv.addstatic = make(chan *enode.Node)5 Q" b" z9 a' g. i+ H
   srv.removestatic = make(chan *enode.Node)  ^9 G+ m! v1 ^" e# S4 q
   srv.addtrusted = make(chan *enode.Node)
, k* ?$ i: t) W+ j1 ?9 {   srv.removetrusted = make(chan *enode.Node)# b% G! {( |8 @! l# O
   srv.peerOp = make(chan peerOpFunc)7 f' `+ y) P2 W; j, M
   srv.peerOpDone = make(chan struct{})
: Y) J) C3 d2 ~- }" g3 d   //srv.setupLocalNode()这里主要执行握手! c& X) B' e; o) d
   if err := srv.setupLocalNode(); err != nil {
/ D; Q' L& D% ]4 t. E; K- m           return err0 ~9 }' k' h3 r) h
   }
. _* a3 D  P+ \/ `7 o   if srv.ListenAddr != "" {
9 m8 |; U. j" C( _& j/ v; b           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
* G1 X6 q+ Y$ C/ d           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
4 C7 ?/ Z* B% H5 H: z. l  B1 A           if err := srv.setupListening(); err != nil {3 Y4 s, g1 g: B
                   return err4 Y; ^) r8 O9 F7 e9 ]- S  h/ \
           }
8 f8 U1 N- X1 k0 W% G. a4 h8 {   }
  E& T0 R! k: Y- e6 w2 d0 \2 m   //侦听UDP端口(用于结点发现内部会启动goroutine)( |( B: h, j9 c( L5 {) d; M0 C
   if err := srv.setupDiscovery(); err != nil {. O3 y/ J8 Q, r( A! s* m' }
           return err
1 t, K5 F0 ~) r# i) O5 G3 b   }8 t0 B( n* H- G3 k0 J8 M
   dynPeers := srv.maxDialedConns()
" @& Q" G7 G9 _7 k! B0 w, D   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)1 N/ @/ u2 y2 t: T* `% u
   srv.loopWG.Add(1)
" q9 S  K- z% o7 j( w7 V   // 启动新线程发起TCP连接请求
. \# Q8 S  `: \  E( b. k1 t) G   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,: C# E# t+ p& f  w- K. P0 K5 z2 X
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
( Q3 r6 ?4 y. n' d5 O, Z. J4 k5 s   /*, C2 f, X% K% @2 R$ E' E6 m
   case c :=  0 {9 f; y" I# _2 n* Z& q+ ]
           if s.config.LightPeers >= srvr.MaxPeers {) Y! F* Z: w! {: H3 ?
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
3 K* x& ?# v6 K& @           }4 D8 v& i: _" L" O; [4 q  D
           maxPeers -= s.config.LightPeers
8 j$ m, X* ?! v: T! [$ ?% v   }
' R# w& J6 C) z   // Start the networking layer and the light server if requested5 D  U5 Z# U: s* U: p3 E
   s.protocolManager.Start(maxPeers)
8 i$ e( ^. h. W4 k   if s.lesServer != nil {
0 G6 u! q/ @; U* f, v  N9 U3 S7 J7 J( @           s.lesServer.Start(srvr)) e. Q2 n3 S# r0 z
   }
& D( C' h8 p% z   return nil- G* T5 u  {- y+ e+ t* E% ]
}
/ y0 o! |$ ^3 F; |5 k/ b# a) n/eth/handler.go
' x3 x* L) A* b. ~# Htype ProtocolManager struct {# A( O( x$ G- R  e
   networkID uint64
) c, e& U/ @6 C* x3 O   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)" x# d  X3 \; n4 W6 M$ j5 K8 I4 u
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
5 s# ]5 V( ?6 Q   txpool      txPool
3 L6 [+ i$ L/ K; n7 r. R   blockchain  *core.BlockChain( S  R, U# Q+ \/ o- L5 S
   chainconfig *params.ChainConfig
5 w( k+ B+ k# _; ~   maxPeers    int9 ^" L5 T' C& }
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。0 D3 u7 ^* w& P' _: W+ u  G
   downloader *downloader.Downloader
5 `( C6 j2 T- y' K   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排1 B1 P! G6 [* }9 ?+ g( O1 U
   fetcher    *fetcher.Fetcher% j( z& I) o' @4 N: i
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
5 g# o( o% l; }7 D" i1 {   peers      *peerSet" W) c3 i: |7 A/ E! t1 j8 A8 V
   SubProtocols []p2p.Protocol
, {& Z7 f: f" h/ r3 ]   eventMux      *event.TypeMux
  J- |* D0 Z0 q$ U( C   txsCh         chan core.NewTxsEvent1 m' D$ L, k: R! D) g9 V
   txsSub        event.Subscription
3 X7 L5 Z* c. c   minedBlockSub *event.TypeMuxSubscription5 ?. v; z# I# c# ^9 l
   1 v0 K2 O( N0 y! V5 n( p8 l) g3 A9 r6 n
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
" W4 @9 C: c* `3 f( E   //当然在应用中,订阅也往往利用通道来实现事件通知。
9 T7 R+ q! }2 M' z   // channels for fetcher, syncer, txsyncLoop) z: @: E. t3 k: v3 E
   newPeerCh   chan *peer' \: Z& m7 X7 a/ Y
   txsyncCh    chan *txsync3 d$ u2 W8 y5 I& k0 B
   quitSync    chan struct{}
; H: Q5 O4 h- k! A" _   noMorePeers chan struct{}& h9 o# i$ W+ {' ?+ e$ _; U
   // wait group is used for graceful shutdowns during downloading
6 P; i, Y  {  h  J+ t8 G) ~   // and processing
& q/ j) |- Z! n( D0 f; Q   wg sync.WaitGroup! L+ [- R. B# P: l# _% I9 l. K, f
}" |- B! L2 J7 c" v" `
   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
6 M, N& }9 W' J) G$ o3 cProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,0 c: o4 Q( ?3 ~9 r) N. v2 U8 L
这也标志着该以太坊个体p2p通信的全面启动。) o, ~1 V" l6 S. H# W" h# `
func (pm *ProtocolManager) Start(maxPeers int) {
% D5 h  B! k+ \0 o" F# k   pm.maxPeers = maxPeers+ m( ^! q' o1 V! l
   // broadcast transactions
' Y& j/ K5 w4 g3 k% F4 i% B   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
' E; y+ m3 e3 m+ u9 C2 u& d# R; r   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
: s* X+ {. G2 c# Y% }$ x   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
+ W7 S; V1 ^3 ~$ \6 R2 \2 F   //订阅交易信息( C* V1 W% j" r" V' [
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)4 K" s. ]9 {' ~, d) k. d3 C7 G/ D
   
5 x% S  H. ]6 J" v7 B! E; p   go pm.txBroadcastLoop()8 j- ^. \) p: w0 }
   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
3 L& q  [+ Q9 Q& R+ {" d   // broadcast mined blocks" e  ^, `" z: l0 }
   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})6 Y" s. N' W4 y5 N/ h; _0 J; \3 \
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
/ }4 S: w+ p" }+ [4 U$ s   go pm.minedBroadcastLoop()& X* @  n) q% G& K: [* a3 \! L
   // start sync handlers
: S7 i5 ~5 E' e   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。$ g  z. y+ ^! n. _2 I2 j  C
   go pm.syncer()
' f& c( j$ I# _   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,) \& |* T) n% L: w+ P
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
6 Z/ D" ]2 v# M4 `   go pm.txsyncLoop()# U" D1 n9 S' Z  a
}
8 C, q9 |- {2 j- H6 m* `//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
& v4 M- S7 O2 x7 D8 L2 U   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
1 w# L0 c3 O$ G) x! C8 m% Z1 n//------------------go pm.txBroadcastLoop()-----------------------
8 s1 H" \( _& F+ E5 gfunc (pm *ProtocolManager) txBroadcastLoop() {
; o9 j4 C; S3 }) Y; r+ I1 @   for {
6 l- j# D6 ^8 y           select {
" m# q/ K4 U" q5 T+ h) u           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
6 m- G5 ~+ D+ B. G- ~; V, i# [           return p2p.DiscTooManyPeers8 d4 E% V/ q. S! ^0 j# L$ N( {) ?3 D) P
   }
. u  l- J6 ?# R6 L' q+ R   p.Log().Debug("Ethereum peer connected", "name", p.Name())8 A7 ^! m9 O$ i/ f
   // Execute the Ethereum handshake' G3 Q) j6 k- ~
   var (
9 B4 ?* |4 _4 j* W           genesis = pm.blockchain.Genesis()  g( o- [0 C; }( @7 T7 M
           head    = pm.blockchain.CurrentHeader()
+ q% \. [4 ]3 z8 V           hash    = head.Hash()
( K2 W5 l' M$ I0 D5 ~0 N           number  = head.Number.Uint64()1 y( X0 p! b1 F# S  P) e
           td      = pm.blockchain.GetTd(hash, number)
( k+ k# s& k1 J5 R3 B' K, Y9 A7 R   )
) Z& i4 W/ }3 P+ Y* n   //握手,与对方peer沟通己方的区块链状态
8 V( H1 E$ W! C6 k   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {- m5 D* v: R" ?: V7 _9 ?
           p.Log().Debug("Ethereum handshake failed", "err", err)6 z# s5 t' X; d) W% \& s& S
           return err
5 b$ ?4 j% {6 m) d2 Z( v   }8 h4 G0 p% |* a( Y* \
   //初始化一个读写通道,用以跟对方peer相互数据传输。9 V; R7 R5 t1 J  {. p) s6 e' [
   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {+ w! n3 P: [1 y% R% @/ g5 B; a. z
           rw.Init(p.version)3 I6 K" U5 `, Q! U3 ]8 @* W
   }2 R* S/ Y, o" B/ q
   // Register the peer locally4 T7 ?' _% r8 j$ i
   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。: r* \8 `  A' v! W
   if err := pm.peers.Register(p); err != nil {+ e4 I8 s3 }% E4 K# V
           p.Log().Error("Ethereum peer registration failed", "err", err)
& p. |  U4 c( i7 `% ]- n1 P( w2 O8 n           return err! t$ P( t8 Z% ~2 x$ f) D
   }! v) I5 G# h* G3 V% ]
   defer pm.removePeer(p.id)
; K+ A8 B, H) z2 r+ w9 L/ n   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
% i: j* T* c7 v) O4 a: z   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
# ?; x0 ]8 I0 @% [# Z; J7 M! y/ r* V8 u   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
$ _1 N% V, k0 m           return err
( y7 t9 c/ T" o* O   }( ?- h% H, Z. m1 J* E  z2 I
   // Propagate existing transactions. new transactions appearing7 E( M  b/ J9 N: o
   // after this will be sent via broadcasts.
" L" r" F" v  W) w' A9 }9 |( Z   /** m, Y' p0 }; ~3 x" I' n
   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,/ n- {/ l0 L5 u3 J3 Y( W. v
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()5 r9 q3 s) W# P- g! r
   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
% Q" `8 R7 ~' J2 c9 }7 A) j   */
3 A7 i0 T% |8 g* G   pm.syncTransactions(p)
( s  t4 N) |! K1 r2 i$ X   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
& a5 t# |6 p( O( B0 ~. M; T   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {  i: L6 `9 `$ Y
           // Request the peer's DAO fork header for extra-data validation+ H* ?7 ~  s3 V" D
           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {# h# f3 C; t$ ~6 T- T
                   return err: m5 ?) S8 n# g/ |
           }
9 {- {2 g" D% J' h$ ?           // Start a timer to disconnect if the peer doesn't reply in time
$ ], e' X2 w0 f0 ?& w* p6 u           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
$ `$ X& A) u1 R                   p.Log().Debug("Timed out DAO fork-check, dropping")
8 O+ n6 y& g7 ]                   pm.removePeer(p.id)  Y- {/ J. \* L# s/ X
           })" f4 d% N) T5 Z! b
           // Make sure it's cleaned up if the peer dies off2 e. h4 a$ a0 F! P6 ~
           defer func() {
, b% K8 @3 Z7 c% g                   if p.forkDrop != nil {$ X0 D7 L* [6 T5 g; K9 }
                           p.forkDrop.Stop()
& ]  e9 S* P& j" ]) h4 q3 u" N5 E                           p.forkDrop = nil
' l4 t/ X) w0 c                   }4 |- n( v0 ^0 c
           }()# W# Z6 d6 l2 k8 a# `
   }/ ~" f7 {7 X" U( h! U7 H5 P: b
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时," _# r; o8 S2 H% v  {7 ~
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
- R" ]$ {1 D& y9 I   // main loop. handle incoming messages.8 ^4 f0 W' a$ V
   for {
% S: A* y* F9 w6 u- R           if err := pm.handleMsg(p); err != nil {
# W- E$ }( Q" s8 }                   p.Log().Debug("Ethereum message handling failed", "err", err)" S0 L+ p- C8 G( q
                   return err
+ B' \# R2 o! O0 C- \           }) T4 i% v# t! a& U
   }) L# ^1 d% _8 I* e
}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2