Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
133 0 0
一、p2p网络中分为有结构和无结构的网络2 k, ~7 E; ?2 ]. ?$ |7 j/ A2 J
无结构化的. B- d2 B$ V' H/ p9 p6 T6 \2 B
这种p2p网络即最普通的,不对结构作特别设计的实现方案。
  ]' g6 |; P  x5 X9 A, h, z( B优点是结构简单易于组建,网络局部区域内个体可任意分布,9 N5 l* {5 a9 @* L. ?; @7 \- Q
反正此时网络结构对此也没有限制;特别是在应对大量新个体加- S3 N3 e& X* ~  ^
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
/ X0 d" s" N$ O6 I' V. ]缺点在于在该网络中查找数据的效率太低,因为没有预知信息,* E6 ~; R8 m4 e5 f3 t
所以往往需要将查询请求发遍整个网络(至少大多数个体),
/ r0 k- R) y. a+ a这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。/ E5 x3 m9 p. j  d2 \" f
结构化的:
: E( s6 j! R" M/ @2 @这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,- X4 s; r9 L' ]+ G1 e
降低查询数据带来的资源消耗。
2 Q. j7 t3 H( n/ y! n% e以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
1 t& u1 O: L; w" }! {网络模型可以满足需求;
& U5 f& p1 i/ z; _; I% {2 z% j6 `二、分布式hash表(DHT)0 C: i# {. S* K* g- }( o2 w
保存数据! v5 M1 I9 q, g0 g
(以下只是大致原理,具体的协议实现可能会有差异)
5 D$ k& c, W6 I/ f; c% v3 s) }当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
- s  o5 F! P1 J& Y* T3 c然后再计算它所知道的其它节点与这个 key 的距离。
, ~6 B- W' B% \如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。! A) `. q; j8 w' t% ?8 {+ M6 [! R& F
否则的话,把这个数据转发给距离最小的节点。
& h7 q" U: n" v) C- a收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
- F' W1 W4 P( H$ p) @' y5 G获取数据% d; S0 D! ~/ p* S& b- V: v( Z
(以下只是大致原理,具体的协议实现可能会有差异)! J3 X4 @/ S8 h* Z* H' Y- i
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;+ V/ B3 |4 o7 J) f) d2 O* y) Z
然后再计算它所知道的其它节点与这个 key 的距离。/ a" F5 Q, A& f4 K! Z
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
$ r* \* a6 T0 C" T* g  D1 i. v有的话就返回 value,没有的话就报错。
* e3 `: L* H: ~' N2 h* C否则的话,把这个数据转发给距离最小的节点。
# S) x2 i3 ~- S收到数据的另一个节点,也采用上述过程进行处理(递归处理)。; E+ q; k( u5 S
三、以太坊中p2p通信的管理模块ProtocolManager& G4 c* I3 i1 k$ u3 O+ u$ V
/geth.go* n2 ^% }# s1 Y9 U5 }
// Start creates a live P2P node and starts running it.; @2 |2 [' ?% c( ]# P  y
func (n *Node) Start() error {- s6 d7 ^. G% S
   return n.node.Start()" v& J! V, G6 o, c+ c4 R" U9 [
}& T! ]& T3 J9 T+ T0 ]0 }
/*5 F/ \& ^/ J) v4 A2 z3 ^8 c7 C
   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。) o5 U2 T1 b4 h3 Q5 Y
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;( h# Y, U$ k! E8 X( _
   然后将Node中载入的所有实现体中的Protocol都收集起来,
1 m. @6 v1 s8 k1 L' Q8 s. F   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
9 S5 V' |3 k4 V, L& N   并将Server对象作为参数去逐一启动每个实现体。
  v1 `& |6 f1 \/ R8 C*/) T9 C* B# q  u  p) |2 k/ {
/node.go
1 [: J% i* S% Z! j. e3 U// Start create a live P2P node and starts running it.6 I, R: U6 J) W7 Q2 X
func (n *Node) Start() error {
* T; _- ]5 G! ]' f8 Z   
9 ^$ r$ K" N; V$ B   ...3 |4 E4 j2 i- C  I* s: i& X, i
   /*
+ Q0 ]( P* N) e           ...( x" I/ @1 K) x  N, T( e- [) y
           初始化serverConfig
$ [+ x1 v2 Y5 c" i  b8 x5 P6 v: {   */
+ z0 @4 q9 k2 D# f+ ]4 [   running := &p2p.Server{Config: n.serverConfig}6 t9 N0 f, s8 m: {  E2 B
   ...
& R5 }2 y7 l( E   // Gather the protocols and start the freshly assembled P2P server
4 ]3 z) s6 F0 t$ y  ~   for _, service := range services {, m1 L0 _. t5 j3 D) y1 W2 [3 P0 S
           running.Protocols = append(running.Protocols, service.Protocols()...)
$ |8 s) h& `/ E/ K" Y   }: W% p2 _# Y2 c' r- _3 a8 h% c0 g% g
   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法, p9 Y# d+ n: v9 q' |
           return convertFileLockError(err)
: F- U5 U" M$ A0 l( J! J% |; R   }; q( H  Q! F% b7 E- M
   // Start each of the services
+ a1 Q/ y. k- F; \* D3 d   started := []reflect.Type{}" ]% \2 M% h* {
   for kind, service := range services {; U4 y- }% [, K8 Y6 m+ H: {% e1 m7 A$ h
           // Start the next service, stopping all previous upon failure
- h8 }  F4 }! N5 \           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
' x0 b' V$ ?; H1 P! l           if err := service.Start(running); err != nil {
, i( ^4 b, M# M: R1 v                   for _, kind := range started {0 d6 V# ^3 W8 `' \0 z" B# X
                           services[kind].Stop()
0 s$ @+ Z7 m) c. K4 ^                   }* ]' _7 w5 C# F. l0 G
                   running.Stop()3 s+ |5 {+ P+ {' V
                   return err" T$ I" O% b% Y4 J( B, {1 }
           }4 u0 S) `$ D3 P! E3 N1 `
           ...
) D* V  R, \9 ^! Y   }
6 [9 j. B4 n4 f}$ h, F2 n5 c% C* \7 ?
// Start starts running the server.
  {) T( B' V& f, m2 w! z// Servers can not be re-used after stopping.0 n8 S2 R5 S( Q# ?% w
func (srv *Server) Start() (err error) {
$ C: K1 ?' ]$ g5 h4 k! Q4 Z   srv.lock.Lock()
  U' M" G* D( \# p7 i8 Q- t8 n6 w   //srv.lock为了避免多线程重复启动
; h  g0 t  A$ _5 O; @; d* _, ~( g   defer srv.lock.Unlock()
% C: q2 `  S! F) L& w6 G   if srv.running {
: F; h& G) W3 x6 X* `) `- Z6 V           return errors.New("server already running"): s( ]7 J$ x% l" U
   }
2 [" }4 {7 P5 d/ j4 K, D   srv.running = true$ O' k) L+ ~+ l. n7 M2 x
   srv.log = srv.Config.Logger- k0 ?" i% C  U+ g% j3 W* N/ N
   if srv.log == nil {" z/ ?' \1 u$ `
           srv.log = log.New()
6 g( u# D/ r7 l# l% k" V   }
7 C& E, R9 M# L   if srv.NoDial && srv.ListenAddr == "" {
( z( j6 R6 [: ?( R# Z, Y           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
3 F3 i: W( n3 L, `# M   }+ a: k. O' |* w' l' U
   // static fields
& w; P% N' `/ e* V  _   if srv.PrivateKey == nil {* w: N) Z8 C1 `
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")6 A6 O. \0 r4 a& l8 s; f! v0 L
   }  U2 l/ [- U' p) W- l' q  a1 A6 d
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
9 O+ U: J; e1 E  l   if srv.newTransport == nil {
, M- l9 P' e* n           srv.newTransport = newRLPX
$ I9 f) w+ h$ E- d( d! G   }( B5 f6 H1 c# N( h
   if srv.Dialer == nil {
+ y- {1 H9 s. P, s           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}. A8 K8 x+ K' g6 o3 [
   }
* s: F7 j6 t: |: |. N" Q' `" j   srv.quit = make(chan struct{})  P; d5 ?: Q7 c5 l! E
   srv.addpeer = make(chan *conn)7 y, b6 }' l7 c% C
   srv.delpeer = make(chan peerDrop)
7 @( \6 k+ y* D; o   srv.posthandshake = make(chan *conn)
3 a- V5 _& ?7 ^) B   srv.addstatic = make(chan *enode.Node)
- v! q/ h! o5 g9 X2 [7 I   srv.removestatic = make(chan *enode.Node)
+ R; o1 f* A2 t. J: d( k/ V   srv.addtrusted = make(chan *enode.Node)0 P* U+ g% U0 @% o+ V4 G" {
   srv.removetrusted = make(chan *enode.Node)
3 Q. `! P, _' U, H% ?   srv.peerOp = make(chan peerOpFunc)
+ c3 U8 }' d4 J2 M/ ~& b$ U   srv.peerOpDone = make(chan struct{})
( X: W5 q, T4 o, N; P& I   //srv.setupLocalNode()这里主要执行握手, k6 l/ u) V/ e# J3 i$ v
   if err := srv.setupLocalNode(); err != nil {) O6 f5 {7 e/ g2 B8 U+ g. L% w+ S
           return err* R( \7 p- h! \* a4 d% |
   }7 \3 Z. l: w8 o9 F0 c
   if srv.ListenAddr != "" {
! w- ?1 G; W, z1 }1 ^           //监听TCP端口-->用于业务数据传输,基于RLPx协议)/ P) y1 I4 j7 l/ z2 V- L+ P
           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接/ t1 |: h1 h; j- a& Q3 q) o7 ?" w; d
           if err := srv.setupListening(); err != nil {
# s/ r( I/ n* n5 a; x7 \' ]                   return err
! E# k5 w) Q' D           }
, r1 L$ P: l4 G# [7 G   }
6 j3 [2 N! \) x+ q5 s2 }/ d   //侦听UDP端口(用于结点发现内部会启动goroutine)
& L# h# \2 s5 `9 s. s5 b4 K; p   if err := srv.setupDiscovery(); err != nil {9 r0 R  s! ]9 \( }. ^" X
           return err
' B/ \$ \+ A3 G# B( [! X   }
: d0 v5 d( j( v0 c5 j   dynPeers := srv.maxDialedConns()+ g1 ]6 I3 Z# ~# ^# y
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)" g" O. H0 n/ i
   srv.loopWG.Add(1)
& }0 I) t" w7 s7 F& ?! T. o. _   // 启动新线程发起TCP连接请求
$ w7 |9 _# K4 m" F7 Y6 b5 k   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,0 V8 S4 E- ?6 A
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。* m; h2 F0 H& o  Q% `+ l- T1 N
   /*; u  g8 R/ ~, i$ M) Z
   case c :=  0 {
+ r5 t; l/ I! b" A8 c, {8 t           if s.config.LightPeers >= srvr.MaxPeers {
! [- E* g  w  g                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
: P% y, n2 [: h4 h$ F5 J0 ?! [7 T* E           }
8 m+ o1 E" |. o  o           maxPeers -= s.config.LightPeers. r1 Z) y# j8 I- Y5 J
   }+ {, M2 g) W- ~, W
   // Start the networking layer and the light server if requested* L2 K- S1 q- D7 b% f1 G; r
   s.protocolManager.Start(maxPeers)
& y6 Q, d& v" t   if s.lesServer != nil {1 T' \( `1 G+ J
           s.lesServer.Start(srvr)$ \9 i! e0 G+ h
   }! i- M0 b- [4 W
   return nil" Z" A- A+ |9 U
}* j; Y" k# E7 l1 ~+ m: l1 m
/eth/handler.go# H$ w5 y7 V9 p! P5 Z
type ProtocolManager struct {
# y: x4 ^9 j: G+ w. Z   networkID uint64, `* H" C1 p# N
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)0 U, f" {/ P3 T$ F" I* Z" B/ o
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
' O' O# ^5 e6 _# \9 _- O   txpool      txPool0 z  U( D0 o! n7 d; Q- w8 c
   blockchain  *core.BlockChain6 e8 ~! G3 R3 J( a- g  A2 Y& O
   chainconfig *params.ChainConfig
: b& L2 a1 g9 u   maxPeers    int
, j5 j5 s" ?$ w: R: I- w   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。+ }* r) e/ }% Q2 H! Q1 T
   downloader *downloader.Downloader
9 t3 l2 D) f% f- \% g9 J   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
" V1 }1 S" W9 o! N# b9 U   fetcher    *fetcher.Fetcher; u" W" s$ Q5 ~/ C
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
6 j3 A$ a% ~) I   peers      *peerSet$ g7 v9 z( \  ]1 q
   SubProtocols []p2p.Protocol
; q3 }  G  u0 }9 _  n   eventMux      *event.TypeMux
+ U) U; s$ \" g$ z  b   txsCh         chan core.NewTxsEvent
# R/ M" p) T7 s1 M4 Y- l- G   txsSub        event.Subscription
/ `) V: D3 Z$ O# S8 }# |  w, u, _$ A   minedBlockSub *event.TypeMuxSubscription) e+ v+ ~) e! n( }8 V
   
" W0 V  R& T, P& S1 z! D   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
+ i  {/ K: o7 B2 X7 x+ M   //当然在应用中,订阅也往往利用通道来实现事件通知。
/ D- D/ ?! a  r6 `# A   // channels for fetcher, syncer, txsyncLoop
" Y/ ~; _0 V; `6 t& T   newPeerCh   chan *peer6 O: d9 F- t. c
   txsyncCh    chan *txsync" ^; c1 k' \/ z5 ~, d2 n' O$ _
   quitSync    chan struct{}
5 ?& N5 r# |9 f& c   noMorePeers chan struct{}* ], u% y, S. G9 A/ G
   // wait group is used for graceful shutdowns during downloading! c2 |- e6 n& X8 O
   // and processing
/ d' u/ M* ?0 l0 Y7 [( D   wg sync.WaitGroup
* e' r& f9 f: w5 G* m8 F- C}
5 z1 @5 f+ U: m9 p   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。, o8 F2 U, R& w$ {; K1 T
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数," T5 U5 f0 {0 _6 s7 k' O, H4 p
这也标志着该以太坊个体p2p通信的全面启动。
" z$ U2 t, y) x& }' Q- q" efunc (pm *ProtocolManager) Start(maxPeers int) {2 l- n- Q+ @, |  s- W. t% H* v( l1 C
   pm.maxPeers = maxPeers
' F/ M$ J& ], p   // broadcast transactions
% f1 c6 Z4 |5 N. n) V6 N+ q   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。: r$ Q( n6 Q3 m$ {- H; B" J
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
2 X8 W7 E1 W: b+ m' U! N   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
, f9 L; B2 }1 ~' {: e  ^6 [4 M   //订阅交易信息/ j5 x' ^2 A% M$ \( r1 t- W
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
* d" S0 k; D- [- [# {8 s) q   
1 v7 D0 |" @  ]4 ]$ R   go pm.txBroadcastLoop()6 h6 n3 w* X4 }$ N1 Y2 d
   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息; ]5 y8 W* [: ^
   // broadcast mined blocks: R4 {3 H8 u9 P
   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
8 |4 n7 |; h+ g/ d1 J- D4 X% W   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
8 z$ g* J& h9 B6 M   go pm.minedBroadcastLoop()
; D" o. P1 z% u% A, F3 k   // start sync handlers
( u5 P4 U$ N  Z% v   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。/ A% G9 ~4 f& G
   go pm.syncer()
1 a7 a+ E3 B$ F   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
6 b& i9 ?6 C+ V   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
; x8 h! L2 F( k/ b3 A7 h2 N) K   go pm.txsyncLoop()
* h0 f3 a) W: P+ n6 b}) j( o* {& ~, j& c* `: O% K2 @
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,7 p* P( e/ ~4 `7 ^
   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
+ p" S, m8 O- E7 k0 F//------------------go pm.txBroadcastLoop()-----------------------+ r0 `3 {$ n9 z1 q. U/ I" j4 {& Z) [
func (pm *ProtocolManager) txBroadcastLoop() {
( d/ U( r7 |6 a5 j   for {  z2 M/ ~& R# ?* ?" O
           select {
6 p$ q. \' Z) a$ ?/ N9 {0 Z! j           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
" o! C8 v6 Q9 g& q$ G. {4 c; {           return p2p.DiscTooManyPeers
2 u. c$ V" E- d& x8 e% |   }
+ X5 b$ C) Z/ \: [& B# k/ `* ^   p.Log().Debug("Ethereum peer connected", "name", p.Name())$ x/ i1 }2 f, `, @
   // Execute the Ethereum handshake- x. S& O5 o( Z9 U% p/ f& p% X
   var (
# O/ A; V& }' }/ D3 [4 Z           genesis = pm.blockchain.Genesis()
# ?& v+ P: f2 P6 m           head    = pm.blockchain.CurrentHeader(); W4 l. J: [! n: U* H+ ?' j! e
           hash    = head.Hash()" h% }% U$ J: g+ }
           number  = head.Number.Uint64()
* z1 e9 B( z) ~. U3 ~           td      = pm.blockchain.GetTd(hash, number)' X" _9 {3 y. J: }5 O6 e
   )
! n, `9 V) d6 T* y   //握手,与对方peer沟通己方的区块链状态
% r+ f) s. z  |3 E* J2 D% ^' J# j6 E   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
6 U" i/ a3 N3 {           p.Log().Debug("Ethereum handshake failed", "err", err)
7 k3 P% {& c. Q9 P$ U* ?           return err5 P0 E4 p1 [4 o& C, k( e* F
   }: a* [5 m) H1 L) @6 j
   //初始化一个读写通道,用以跟对方peer相互数据传输。
3 t% V# w; G% Z) v) W0 P2 `   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {: m  @# ]8 P/ C. e  ^9 w% x
           rw.Init(p.version)# E1 y& s2 {3 l5 C* R
   }* r9 B) C1 e3 |2 b* \. w
   // Register the peer locally
7 ]' a/ ~0 J( R( C   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。" E# N$ ~% p( w9 P, R
   if err := pm.peers.Register(p); err != nil {; e' R' T# [- A8 b: e# x- |% \  B
           p.Log().Error("Ethereum peer registration failed", "err", err)) u  O$ ^; [; Y
           return err
4 e, Z6 S& ~7 N) x   }
: x3 B! P5 b9 h4 c) g   defer pm.removePeer(p.id)
; C5 D2 {1 \# Z1 M3 q   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。2 K$ e: h7 ]7 S  i% I
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
0 X" ~/ [8 W) e+ a# X$ U2 o" p   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
* F& @: [4 j2 |( J% `5 I           return err
0 L5 A% C( L. ]& y! U  m   }7 I" V% B0 d# T+ ?2 u
   // Propagate existing transactions. new transactions appearing
; J  ?- h! b$ C" H0 V   // after this will be sent via broadcasts.* f" p3 S2 H1 s9 V& l& u0 H0 D- V
   /*
( E" _) g" l- \( ]/ h   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,- F9 l% ?8 G7 c/ s; J
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()# l3 b; ~) D+ U5 V
   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。7 B9 D- G( l# C; \; W, I% }
   */. [$ ~' C. p9 [/ k' P: Y) v' r% _
   pm.syncTransactions(p)
3 s: ]7 q! A8 j  B  h5 j0 B   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork# n* P; r8 w9 k) `
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {/ G4 H. k" J# g. Q. B
           // Request the peer's DAO fork header for extra-data validation
; X7 F+ \9 Z' i: q           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
9 J4 ?$ @1 a* `' _' v6 H/ w                   return err5 j1 M! K9 o3 }
           }! o, c, G0 r- v! [/ y
           // Start a timer to disconnect if the peer doesn't reply in time
; Q+ O# A1 m6 q2 v; A7 }* Q% w           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {5 t# H3 S5 m) q! f' y3 _/ U# H. H
                   p.Log().Debug("Timed out DAO fork-check, dropping")
' g) J0 q# n$ A0 g4 B+ [6 ~  `                   pm.removePeer(p.id)
0 s& k5 g6 U2 j# l# i! H           })
: Z2 f# t# r# i           // Make sure it's cleaned up if the peer dies off
7 V+ t2 o' W- q: J; X* m$ W           defer func() {/ U! i. Q: Y( ~4 [( _) k5 B/ E
                   if p.forkDrop != nil {' p6 b5 K. ^+ J" c
                           p.forkDrop.Stop()1 h, |, y+ B3 {  F6 `$ `" g
                           p.forkDrop = nil
3 n* g# W( M0 |* G                   }
0 n. f6 b0 {) }/ f( L6 Q           }()
# v- E" a' y& b( s   }. S. E" W! @$ f4 O- w' D
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时," C1 @7 ^7 n8 I/ _  ^! _* q; O( ~
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。2 |# u) I9 r" F
   // main loop. handle incoming messages.
7 F% w# c1 \) j$ O   for {
5 Z. t9 c, N* N           if err := pm.handleMsg(p); err != nil {
# r0 m  F! d) E                   p.Log().Debug("Ethereum message handling failed", "err", err)
8 I& P3 u, X7 w, W                   return err
) \8 F0 e  L' M! M9 ?4 e9 r/ x           }
& z+ Z5 O" q6 E- n0 B9 o. A   }/ e2 F8 h9 N$ G0 ~9 C$ u5 F
}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2