Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
281 0 0
一、p2p网络中分为有结构和无结构的网络
$ t3 d* Z9 L! L) z) S5 @7 X无结构化的
# D' v- e+ [8 Z, D1 x这种p2p网络即最普通的,不对结构作特别设计的实现方案。7 ~6 p% ]0 {6 z
优点是结构简单易于组建,网络局部区域内个体可任意分布,3 |, M% v3 K2 h4 f0 M4 _
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
7 f0 c0 O% i$ _$ Z$ R入网络和旧个体离开网络(“churn”)时它的表现非常稳定。1 N/ ]7 r9 x( V8 k- b2 v- H
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
/ N9 r! q& N! g; T' }* `# B所以往往需要将查询请求发遍整个网络(至少大多数个体),
. S1 ^/ N) _8 H+ d这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
" p3 R3 Q, L% M. x4 ?& f6 h9 A结构化的:
: M( z& ?" J& e, h) i' y这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,( Q% h6 J! n2 b
降低查询数据带来的资源消耗。
& q* |+ l9 M( \6 B% S$ T4 f以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构): r! V( I4 D1 a- D1 S- @5 F
网络模型可以满足需求;
; n4 Z' }) x( G3 P8 g3 s二、分布式hash表(DHT): S- I( p% X5 G: I1 K) \
保存数据+ P- U% D, c4 {: ~& ?6 O( s
(以下只是大致原理,具体的协议实现可能会有差异)
( f" i$ W8 N8 m: s! a6 N. ^* z4 G当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;/ P3 a* o9 o, t% {! Z
然后再计算它所知道的其它节点与这个 key 的距离。
6 {/ r8 [: w) B如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。
+ o7 }: O2 J% J# S2 L否则的话,把这个数据转发给距离最小的节点。
) t" s( q/ Z) D8 |: B* K& N收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
3 P: b4 I  |/ H  G& E5 h% Q. a获取数据5 H2 M, J: s3 R4 P* ]8 d% W
(以下只是大致原理,具体的协议实现可能会有差异)
* H% ~2 L% T, y8 p- S3 c4 T当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;3 H" s' R' w2 l
然后再计算它所知道的其它节点与这个 key 的距离。, H8 y: S1 H3 Y
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
0 q* _* T& E/ w6 N0 v有的话就返回 value,没有的话就报错。
, ^0 t& S  k' W: |否则的话,把这个数据转发给距离最小的节点。
$ E- E7 _4 ]; g8 j" z收到数据的另一个节点,也采用上述过程进行处理(递归处理)。" i. L- P3 ]# ~+ `5 T
三、以太坊中p2p通信的管理模块ProtocolManager; h8 ~: @0 V! L5 Y3 \
/geth.go
! q1 y  c3 L% q  C// Start creates a live P2P node and starts running it.9 P1 s6 K- P' Y! w, h
func (n *Node) Start() error {0 s# @1 X& Z# R! x3 w7 R7 F
   return n.node.Start()
8 t: X4 r+ o7 i0 @, l' }' y}' h4 c% H& f- Z6 u
/*
4 Z& l' B. c0 n4 c6 Z: n8 ^2 I+ ~   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。% K/ I. j# z% g1 p! C( l' u
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;* b0 y+ g  V) K* L$ L) E) Q7 l
   然后将Node中载入的所有实现体中的Protocol都收集起来,
+ W# L$ _4 K5 J; R! Z, T   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
: Z) w( C9 f4 u" d5 F   并将Server对象作为参数去逐一启动每个实现体。
3 p* D! u3 u) [* R7 b7 n  ~0 Z*/4 r0 N) O& ?: {5 Z- f
/node.go8 S) G! L7 t2 o9 c: Q  k+ v, H7 H
// Start create a live P2P node and starts running it.
% b% e, H7 m, p7 |func (n *Node) Start() error {6 W8 i) T# [7 w, ]2 I
   1 P- t4 _% E- i
   ...2 n( `/ p" P: l& b
   /*
; o2 ]) |, L3 C- N0 b: o  R. ]4 r           ...
8 Q$ F. I1 A& @, V$ G. S8 j           初始化serverConfig: d7 h+ x! q/ \: t8 N
   *// [- `. }! E  T: ]/ v
   running := &p2p.Server{Config: n.serverConfig}
! \8 {! j) G8 b" K. Q7 F   ...
1 t* J$ i7 X; h- ?   // Gather the protocols and start the freshly assembled P2P server1 {# W2 g0 _$ _
   for _, service := range services {& X& j% H0 A- l' r( a9 m. C  u
           running.Protocols = append(running.Protocols, service.Protocols()...)
, ~& Q0 F3 j2 ~- D' Q. [   }
' @$ t+ H" A% Y9 f8 f1 n   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
% }( E7 _) B0 R; M1 W4 S           return convertFileLockError(err)% W( I7 E" I; Y% g$ J
   }
4 k0 D+ g; U+ {: G# d: T  {   // Start each of the services5 `4 R; E5 q. {5 r8 A3 J3 M
   started := []reflect.Type{}* J+ `9 x2 x. n* K) D5 n0 x, t
   for kind, service := range services {
; k+ ~9 z* A4 J$ ?) k. @$ D  H9 Z1 k           // Start the next service, stopping all previous upon failure! S. k. Y0 [! e8 z
           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {: D& t. A0 A3 b
           if err := service.Start(running); err != nil {
& ?8 [! i! c/ [; \$ {4 i                   for _, kind := range started {( ]* H+ ~9 H3 s( \
                           services[kind].Stop()
& x: Z. m1 ^, {5 Z- I& q( k                   }: l+ d  r9 o% @
                   running.Stop()0 Y# e: r9 A& |9 F- u0 T( K
                   return err
+ H: R. O# J- P" ^/ E           }1 Z# x6 K) {# u& y8 o
           ...  T8 X+ T' o2 N" B! O7 l0 F! W  f+ o; _
   }
6 w% C2 T9 [( f3 ?4 n$ N3 x}
. ]5 x- a. U% r) h// Start starts running the server.
0 V# _& w# A7 M+ g' C// Servers can not be re-used after stopping.
  g; @) K) k0 M2 {func (srv *Server) Start() (err error) {
1 c- f# Y( D* I9 x" ^0 n   srv.lock.Lock()7 w& D, d/ G1 }7 |$ G
   //srv.lock为了避免多线程重复启动
+ b* C' Q) k3 l% `* F" U   defer srv.lock.Unlock()/ G) p1 Q$ ~4 y
   if srv.running {+ r, K1 W7 e5 D, Q5 K" S' R
           return errors.New("server already running"); a& u, B( P* u3 ^' q6 F% f
   }  F& l0 `7 l( Y. Z
   srv.running = true
& A( {- l7 T0 _' V6 t   srv.log = srv.Config.Logger
  Z( @$ I, N/ D( z   if srv.log == nil {. @7 |# _' l! n1 ?& J. c8 l
           srv.log = log.New()
/ j; p# |1 Q! s: b, {' [/ m   }3 i% N) f" R0 p$ }
   if srv.NoDial && srv.ListenAddr == "" {2 m& _; K0 {* a; I0 s& x" V, H
           srv.log.Warn("P2P server will be useless, neither dialing nor listening")6 T/ n/ v! B2 i& {% j$ x: W
   }5 N# O2 k& O7 n0 }, I
   // static fields3 |3 a. q6 r0 O; Y
   if srv.PrivateKey == nil {6 b9 T- b6 s8 Q  I
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key"), l$ d( V% E$ Y4 K: F, B
   }
, o- w- U6 q5 X7 H- o8 r   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
, l' _; i5 u: N7 j   if srv.newTransport == nil {
/ F, Q  A& H3 I2 ~9 Y. C; }           srv.newTransport = newRLPX* C  ^* `  F% u6 ]2 `
   }
5 j0 a8 ]: S+ Y7 {9 s5 ?   if srv.Dialer == nil {
$ e: {* a! }  I' g1 C; e& Z           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}% R+ O  C) b# ~1 b" h# W
   }5 J' s7 G* o' q  q: z5 o, Z
   srv.quit = make(chan struct{})5 L/ }( k8 ^- X
   srv.addpeer = make(chan *conn)8 G: {4 a+ H- R: x3 a0 C' [
   srv.delpeer = make(chan peerDrop)
* Z( }% ]6 j, P   srv.posthandshake = make(chan *conn)
7 B& h) T8 c$ s' a& c$ m   srv.addstatic = make(chan *enode.Node)
- N+ J' k" l/ c   srv.removestatic = make(chan *enode.Node)% F% o* e0 i% G4 m# n
   srv.addtrusted = make(chan *enode.Node), R- P& @2 }9 X# |6 T4 k. L
   srv.removetrusted = make(chan *enode.Node)3 k0 C! v1 o, o* Y1 H- _% t
   srv.peerOp = make(chan peerOpFunc)
7 l* K2 l7 _! O   srv.peerOpDone = make(chan struct{})
  g' S7 N0 ?. S   //srv.setupLocalNode()这里主要执行握手7 G% W9 D4 D4 l2 _# A" f
   if err := srv.setupLocalNode(); err != nil {! t# m# y/ P$ W9 B" D
           return err1 i" \: @# E/ U
   }! ~, {2 k2 h/ D# [) @+ n% z9 S
   if srv.ListenAddr != "" {
, u' ]7 x8 I; r4 {& M: \           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
. h8 `2 J* m$ L9 v/ i$ G- l- ^* \           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
, |# c6 x1 F2 A) t- f& N           if err := srv.setupListening(); err != nil {
- D0 O% T$ \3 L; ^8 f                   return err
) m3 V) h+ m/ ?+ _  O3 e" I           }- @& ?3 P% F2 s0 ^) i! C) R0 ^& ?; j
   }
! A$ S- I: g9 e% e   //侦听UDP端口(用于结点发现内部会启动goroutine)1 e  x0 a2 I- o, |
   if err := srv.setupDiscovery(); err != nil {
/ O/ ^8 }% R3 Y           return err% S2 e8 F4 p3 U# I2 g" l0 e
   }
' @. W) u4 W$ {4 R   dynPeers := srv.maxDialedConns()
% T% H& I8 c0 S( U3 o   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)  d" \/ X8 z. S0 r  Q
   srv.loopWG.Add(1)
. x9 \8 s6 `0 Y, m   // 启动新线程发起TCP连接请求7 c6 f  H9 Z- h9 o
   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
+ @2 x7 Y) `6 @5 a; R5 ?& [/ x   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。+ z2 K9 K1 Y! _; T" S! K% ?
   /*
  b, o" @0 r* M3 F) l. L   case c :=  0 {& b5 e, G! W2 K. t, x& Y7 o2 v
           if s.config.LightPeers >= srvr.MaxPeers {
( w% i+ |: B+ `+ P% `4 Z                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
) r& m  |3 D8 o( L' m           }( w' A4 {$ R9 ^: K- ?- H+ b1 b! x
           maxPeers -= s.config.LightPeers
( O# {2 e& S4 ~+ F   }
. g5 E- z' j9 y- S6 \2 T. \. ~1 X+ _   // Start the networking layer and the light server if requested: ^0 F' O1 w: p- J
   s.protocolManager.Start(maxPeers)1 b3 [9 N. Y: J7 \6 w* c! N
   if s.lesServer != nil {; u: Z6 D' C. E! n) Q: E' I
           s.lesServer.Start(srvr)
0 Q7 E' @- \% I   }$ ^; n% i. E' Z* z- T4 ?
   return nil
. M3 e1 c+ Z+ U' W; L. h+ ?5 @1 K8 Z}0 U. T+ k- W0 A+ v  Z
/eth/handler.go9 V  P; |/ X' w3 j; F
type ProtocolManager struct {( z# Z4 l3 }) j5 M8 M
   networkID uint64
/ f8 v" L& W$ U+ f9 L   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
8 I7 {/ s- |' X& N5 U( ]   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)) U8 S* V" V/ {1 \6 B$ _+ `, d  w7 H+ H2 s
   txpool      txPool
' ?6 E0 N& I; U, W   blockchain  *core.BlockChain5 Z* D! Z3 \! T; Y
   chainconfig *params.ChainConfig1 _  G# \- s$ Y- U' ^6 A4 j
   maxPeers    int9 z) A3 T( a6 Q- b- r3 b& F
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
( N" B4 b5 X5 u; m# f   downloader *downloader.Downloader
: l4 w3 u# ?! o# o) W   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
- \6 X% W# K: N5 g: C- l/ P   fetcher    *fetcher.Fetcher
, @  ~8 A  k0 h( k& \7 ^. s) p   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
+ D. l: i% `1 Q: M: O   peers      *peerSet
8 c. b, d% C0 `   SubProtocols []p2p.Protocol* F# w# h7 z& k% Y
   eventMux      *event.TypeMux
/ @$ r+ ^6 h8 i" S4 o8 o   txsCh         chan core.NewTxsEvent
7 ^6 T9 h" l; @% r4 C   txsSub        event.Subscription9 x4 \+ c, h2 O; c
   minedBlockSub *event.TypeMuxSubscription8 g; V. H1 p/ t% `2 g1 S* k
   ! c0 j7 P& a% }/ R( u
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
7 l6 z2 _# h1 m( h- L   //当然在应用中,订阅也往往利用通道来实现事件通知。  E; t' [( }3 P
   // channels for fetcher, syncer, txsyncLoop; e! i0 g+ I* e9 F+ W2 y
   newPeerCh   chan *peer2 @. q$ I$ U6 s. z- Z
   txsyncCh    chan *txsync
' N$ M; l- [, n8 R9 C9 K0 C   quitSync    chan struct{}
1 o: c; G1 M( f+ A! y   noMorePeers chan struct{}  C7 _4 L' [: h! ^; \& N
   // wait group is used for graceful shutdowns during downloading
/ E0 c7 m, a/ v/ |4 G   // and processing% Z/ s5 j( s: M" u
   wg sync.WaitGroup
, k& D( I$ p  p" K/ \: s}3 ?8 S" A6 B" u4 @
   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
/ v  d, k) T+ x6 iProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,9 o. ?- X2 L/ {4 P/ C9 s
这也标志着该以太坊个体p2p通信的全面启动。
5 L; z' o3 g% Z4 Efunc (pm *ProtocolManager) Start(maxPeers int) {
0 X; [  n4 d0 J. m3 B: Z   pm.maxPeers = maxPeers; `6 H) a3 H+ Z; u8 s2 q
   // broadcast transactions
0 p7 }6 a2 h8 v  ?" {$ D1 g% N   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
2 `, J- Q2 t( ]. b9 Y3 r   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
7 Z7 Z* c; a3 s( s  X/ M0 [   pm.txsCh = make(chan core.NewTxsEvent, txChanSize); C/ g& d3 v9 B9 c0 ?7 u
   //订阅交易信息
- w6 ^3 E% G+ y2 I0 ~   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)& `* M6 r# |7 y
   ) G4 @# i  z% Z
   go pm.txBroadcastLoop()
5 E6 V% H- m% {" O8 Y) z   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
. ?; w( m: v' d/ N0 t* {/ L   // broadcast mined blocks
$ C7 R7 ?. k) S. f6 N9 D   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})2 O8 X$ N4 y4 s& F* L+ t, l
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。, x- J( S# T! y9 d- N* l2 Z  I
   go pm.minedBroadcastLoop()
; C% x2 _( v( }) o- W2 ^' k   // start sync handlers
% S7 ?" E) N/ [2 x   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。) \1 _! j: Q1 Y; U- Z% q8 X
   go pm.syncer()
6 I0 w9 a0 \& q% o2 O8 q+ }2 i5 _   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,1 p4 T+ l/ k$ ]# Q3 R; c2 b4 r# a
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。! ]6 J7 K: W+ Q: ^* ?$ b2 D$ O
   go pm.txsyncLoop()" o* a$ |" n- k
}- j: m5 l! x7 c
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
) u( u+ z% M* I- v2 O* V8 @   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。5 I3 V: t, m( F" t& Y) T9 \! v1 t) `
//------------------go pm.txBroadcastLoop()-----------------------( V% p4 `1 D" B: ^$ `
func (pm *ProtocolManager) txBroadcastLoop() {" l7 l. P9 p; r! @6 B  o
   for {2 }  w4 A+ i& Q3 N; S
           select {
  u8 V# j: {; D$ J! Q' @5 ~           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {; Y& l& G( V( D) \* u8 R
           return p2p.DiscTooManyPeers" z+ X: T1 s* i7 @
   }
4 d  N6 {* G) b& N   p.Log().Debug("Ethereum peer connected", "name", p.Name())
' Q6 K; P' A/ c& Z* ?! o   // Execute the Ethereum handshake
7 N- v" }, Z1 f5 {- |% C   var (. y! s. \+ z8 E
           genesis = pm.blockchain.Genesis(): A( e+ t2 U6 O$ K! ~: E
           head    = pm.blockchain.CurrentHeader()0 B( f% @# ~/ b) d/ \
           hash    = head.Hash()
- u. T0 `* e# Y3 }. E, @           number  = head.Number.Uint64()% u) {- o+ |6 O0 l, u' |
           td      = pm.blockchain.GetTd(hash, number)
' ]; x" e3 ~8 T2 t' l   )' \) T. N6 e5 O* D3 U' ?
   //握手,与对方peer沟通己方的区块链状态
1 j5 I+ d0 J' R   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {6 \# E' Q, S4 {
           p.Log().Debug("Ethereum handshake failed", "err", err)
' N2 n; T- f* r0 a7 q: \% s4 S           return err
& k, ^/ f8 Z0 E: C% _% C, W+ b   }' g: Z* y" C7 c1 [! a1 a! M  _: _" U
   //初始化一个读写通道,用以跟对方peer相互数据传输。
9 m% ^# K# h' W# ]5 W  \% t   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
; M6 {6 n% b! |4 k/ N/ {- b- l. N           rw.Init(p.version)
6 k% h  E' ~. I" T& n  p: w; ~   }
8 ?7 h2 Q2 m! q+ Z' x# J   // Register the peer locally
9 ?% {2 ]% y1 L$ b7 m   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
7 F6 V0 _! x" ?% _9 a2 Q, Z1 w   if err := pm.peers.Register(p); err != nil {; V! h, }) Z. [0 o+ q/ M
           p.Log().Error("Ethereum peer registration failed", "err", err): Q0 z: @: `* S2 N6 O9 m6 ~  w
           return err: |; y! Z9 N5 X# n$ ?! G  q
   }0 Q& m. C8 I! p9 i# {2 b
   defer pm.removePeer(p.id)8 c/ g; ]- E6 u8 D! p6 M  \5 f- V
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。2 L& }" D7 B2 P' o8 u( A
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
' T! d7 z( I! ?# \9 N- T   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
0 \3 j+ C  X3 o; H           return err8 W. A+ s& W$ f1 w- [! D
   }0 M- W8 l; X; Q: o
   // Propagate existing transactions. new transactions appearing7 `+ C& h6 T3 {
   // after this will be sent via broadcasts.6 e! x7 \2 i2 {  H
   /*1 j; G* V6 Q: J: v$ @
   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
' _: X0 D  ~0 G1 f# i9 f) U   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop(); C5 d4 o7 m0 e3 B. m" Z" v
   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
4 N) R! v* j% M' B, f   */! l3 T5 ~, J, U& W! Z
   pm.syncTransactions(p)
# f, S9 y9 P4 g' a   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork) L2 S: p% v/ Q6 n5 l% e. w
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {9 k7 q; N  c& f4 q
           // Request the peer's DAO fork header for extra-data validation# M6 C& S. e# v$ |) j3 W
           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {% j, [8 W% }3 |: }6 n. P  h4 _
                   return err8 |+ P+ d. w2 r+ m6 |5 k/ Z, g
           }- N+ O4 A- {8 J% g: e3 B2 G( J
           // Start a timer to disconnect if the peer doesn't reply in time
7 s% P7 x% p* t           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {' z. c. z2 j( ^9 K& |' Y+ L4 u' D
                   p.Log().Debug("Timed out DAO fork-check, dropping")
! m/ w4 Z( g- }; j                   pm.removePeer(p.id)1 I+ o% d% @7 r3 R1 K2 o
           })2 f" a) Z  f1 K$ e6 i' e
           // Make sure it's cleaned up if the peer dies off( t0 R5 B1 x, R  r- X. E
           defer func() {. _5 l; |( u# ?$ Y$ x! }! A
                   if p.forkDrop != nil {
  ?# V+ r! K4 f0 F3 X                           p.forkDrop.Stop()" |8 Q1 Z6 n7 b
                           p.forkDrop = nil4 W- F0 l% Q5 ~( n2 g* Q' Z& a
                   }
8 t- k$ ]6 m6 C. X2 K. [           }()6 C& {' z* g; t
   }& e* r: u' r2 O+ x
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,
% h4 n9 J0 ~7 z           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
6 ~7 S. N1 M% P; c- }+ a! ]   // main loop. handle incoming messages.
) [0 n( A5 H8 S( A$ ]   for {
! [! e& L  G  ~# F  ^) h& f           if err := pm.handleMsg(p); err != nil {( x7 s" \0 D. d! J* c% m% N
                   p.Log().Debug("Ethereum message handling failed", "err", err)9 W) h) {7 \4 S) Y5 |2 m
                   return err
% a2 ^# h5 p- x% e+ F; A; C; V( j           }
2 f  p6 {( k' u' G   }
& J0 l, J: }) ^' s4 C}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2