Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
280 0 0
一、p2p网络中分为有结构和无结构的网络
2 [1 c# P, c, j4 e' N. ^无结构化的
% {0 k* W% E0 A5 J" P这种p2p网络即最普通的,不对结构作特别设计的实现方案。1 o' e6 t* C4 h5 }" i4 r
优点是结构简单易于组建,网络局部区域内个体可任意分布,$ a$ `  V3 Q. ?- i* Y" w
反正此时网络结构对此也没有限制;特别是在应对大量新个体加+ P. f& ]- n. {4 I+ v/ m7 g+ Q
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
0 i2 h& v5 W$ h' w0 S缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
& b. l: S& q  S所以往往需要将查询请求发遍整个网络(至少大多数个体),% P6 Q+ i) l; I
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。! j' [0 ~8 v+ I
结构化的:" Y9 E$ i$ F  Y& e  s$ E- ^
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,- K& b% L5 j( w) n2 m/ o  X  W
降低查询数据带来的资源消耗。$ |0 r# g# ]" v9 }9 o
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
: f& z7 U. P/ ]1 T2 t; v, w网络模型可以满足需求;
8 Y) U0 v  `5 k2 Z7 V/ k二、分布式hash表(DHT)
9 t9 |$ f  m6 P% ]9 E# r4 A6 Y2 p保存数据
4 N3 e# \( g0 l(以下只是大致原理,具体的协议实现可能会有差异)
1 @  ?! O0 @. O# g; `: W6 j当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
) ?" U$ c7 U; y& K8 r然后再计算它所知道的其它节点与这个 key 的距离。
4 Q" O$ ]9 B! p/ w3 }/ E1 Z$ b如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。
* w; p) [  F! p4 @* |6 S" v否则的话,把这个数据转发给距离最小的节点。2 w& `8 w7 Q  G: L
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。3 p/ C/ N) ~% j- q% ~: [0 J
获取数据
) ?! R5 t4 u  ^' V, i& \+ b" s. N3 B(以下只是大致原理,具体的协议实现可能会有差异)6 i: l- L. C3 e. C  p& i8 c* @( d
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;( Q) d' X* @5 n7 B5 T0 t1 R
然后再计算它所知道的其它节点与这个 key 的距离。; Y( s! i$ U0 ], E. t( x2 B7 S
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
, h; p. J7 c" z; @% B有的话就返回 value,没有的话就报错。
2 [9 ~, q$ P2 E7 p" E否则的话,把这个数据转发给距离最小的节点。
" H: T8 c: _( s7 n: r% D' h0 K' a收到数据的另一个节点,也采用上述过程进行处理(递归处理)。* s7 e& o9 P8 a9 N4 K# D
三、以太坊中p2p通信的管理模块ProtocolManager
2 d% q3 _! D4 S( s2 z1 o/geth.go
8 r; ~: n' c. X, O% A// Start creates a live P2P node and starts running it.* s3 ^3 V0 d, @& @
func (n *Node) Start() error {
# J; ^4 U4 s9 h! K* n9 ]2 V9 B: y   return n.node.Start()" F! n. H8 b3 |0 \* [$ ~: {& @
}$ q1 N* a0 j( I1 Q
/*
4 l" b: k! L8 W( e4 B: e   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
/ F8 B' \/ S" f  z! ?2 X   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;% j2 }# y% i* O( k' v# L8 I
   然后将Node中载入的所有实现体中的Protocol都收集起来,
( J7 p; T0 F) e* I3 s9 S3 n   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
% f$ i9 n. [2 g5 k4 c: v4 q; m   并将Server对象作为参数去逐一启动每个实现体。) d0 U! n% O" y  g2 \0 E% E  l3 a5 f* ?
*/
3 O: t5 _1 l! q6 k/node.go
: `! |$ J* y1 Y" \; D) L/ o/ z// Start create a live P2P node and starts running it.: I5 }3 y- G; y: ]+ x& g' \7 w
func (n *Node) Start() error {3 O( @# s2 K0 X% K8 _. q
   
: D) T+ J3 `0 x8 G/ J   ...
- J7 m; i1 L4 j, E* E   /*
" h: o6 b! m( J: q& T5 t9 W           ...% W+ S3 z- g. c2 F8 X$ \1 E
           初始化serverConfig1 T8 R' j3 W- w8 g
   */5 i' g5 @" c6 Q- O. y0 v
   running := &p2p.Server{Config: n.serverConfig}
- {5 N" g# f$ v6 |   ...- Y5 b6 x& F* P4 m/ I5 K( o
   // Gather the protocols and start the freshly assembled P2P server
- S) Z) b; N2 ^$ W, V   for _, service := range services {
& L; d1 M$ ?. b+ i, {0 L           running.Protocols = append(running.Protocols, service.Protocols()...)7 F8 [3 U3 T& I* S" m
   }
5 M' ~# t! a7 @8 o( e   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
$ p* E3 a3 ^8 e: e           return convertFileLockError(err)0 V& ?6 S4 g/ {# r
   }
0 P* @1 B9 E7 `' i   // Start each of the services: h  ?# g8 c. Q
   started := []reflect.Type{}
4 z# T% l8 V  T6 G  [! R5 r$ x   for kind, service := range services {- ?; q$ v( u0 w2 r9 C. s& A4 P! I
           // Start the next service, stopping all previous upon failure5 Z: S/ q0 L/ E- e0 `6 l: c
           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {, h0 W  ~/ s- |( e7 v5 n
           if err := service.Start(running); err != nil {
3 u; e& E+ {4 b$ }                   for _, kind := range started {% e/ x2 r* s" o* d( A- S/ j
                           services[kind].Stop(); z/ {& Z7 a$ x
                   }+ w0 ~* v$ ^2 ?, z  X' {! D" [2 `! O
                   running.Stop()8 G, z! m  ]1 f2 @, I& g
                   return err
9 H) \  {( i3 ]" q3 g           }
; K3 C( {6 s( z1 M5 k# S! c           ...2 h" l8 h/ a8 W' Q
   }7 N8 g# w  n7 y+ g& x8 `1 p
}" @3 @2 n% ]4 W) K
// Start starts running the server.1 ~3 T# C, O$ Y* Z- [% x! m
// Servers can not be re-used after stopping.
- ^( i' A4 l) r2 N$ W' cfunc (srv *Server) Start() (err error) {0 g/ N+ K' @1 Z) P# C
   srv.lock.Lock()
9 [9 M$ g' m# Y- I3 k6 B+ H3 N   //srv.lock为了避免多线程重复启动
, T" u: Z+ X; l3 S9 B& A2 l   defer srv.lock.Unlock()0 t# U7 j: W! F2 j& z
   if srv.running {2 N6 {: o9 }, D4 S$ {: ?9 s
           return errors.New("server already running")
! ]/ k0 W! C5 c, W. C$ Z* m   }$ \: Y# S$ m1 G) J
   srv.running = true% m: I" n& x1 b( z1 U, X
   srv.log = srv.Config.Logger
. c3 e1 v9 L+ o* b  S- Z8 x   if srv.log == nil {
' u( j( Y' o7 M* e9 ]+ U           srv.log = log.New()" L5 t# c! U  l: P. z4 I4 q9 i8 S
   }4 E: Y6 l5 D7 t4 ?4 o
   if srv.NoDial && srv.ListenAddr == "" {  L0 e- v) k" _2 \
           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
+ X8 ?' p3 Q3 j+ Y5 f   }3 X- k. G2 h' J. j" ]1 B
   // static fields4 c3 Q6 h+ I( G! M8 z
   if srv.PrivateKey == nil {/ D4 {# K: P/ [; m
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
9 l" {7 q1 _* K) z' s   }
( d6 @$ u0 ]% u5 x( {8 O, N   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
, h' u$ |( O% D* H) }   if srv.newTransport == nil {
* z; X* a& }& w& {# v           srv.newTransport = newRLPX
& M! u- {" t+ g" R; t. v   }4 e& E( P; ?$ J9 L4 f7 F. d
   if srv.Dialer == nil {
: G4 Y! G3 C6 u9 w* e" H; D2 n           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
8 i: t6 H9 `7 \7 [8 U   }2 u" w6 ^5 y; y  U, O, ~6 x
   srv.quit = make(chan struct{})
9 Z& s& |' v( C8 ]6 N   srv.addpeer = make(chan *conn), j( i0 f0 d9 `4 o( ^& R9 O
   srv.delpeer = make(chan peerDrop)
. Z/ V5 U4 d0 s( t( @' r   srv.posthandshake = make(chan *conn)
2 A) \' W/ J6 |! D, e) u   srv.addstatic = make(chan *enode.Node)# F0 ^! w- Y+ ^6 h
   srv.removestatic = make(chan *enode.Node)
3 w. L0 ^* `/ R   srv.addtrusted = make(chan *enode.Node)
! g% ]' D8 n( R   srv.removetrusted = make(chan *enode.Node)0 e0 O3 f$ _1 Y/ Z/ b3 F
   srv.peerOp = make(chan peerOpFunc)! ~; c7 H6 ]" d# a% ^: O$ Q# x
   srv.peerOpDone = make(chan struct{})
8 Q! V8 G/ z: o+ @$ |) X9 w7 d   //srv.setupLocalNode()这里主要执行握手
/ s! F/ H; K* n: T5 s9 k" u1 P6 r   if err := srv.setupLocalNode(); err != nil {
/ t, v2 b" @, J# x( N           return err
& l! [" ?! J" s* P, [9 e   }
  g4 }) [' Q2 U- r+ @   if srv.ListenAddr != "" {! T3 [4 ?0 x1 G8 h" z! ?
           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
: p6 p! A& Q- J/ U! e           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
# }8 V* i7 o( D$ P: e- C. U           if err := srv.setupListening(); err != nil {, i7 d' W% a- O: k
                   return err
* C) H- I9 Y! @$ T2 l           }7 {! l5 n4 F1 I$ C
   }2 {  }" ^! j* A2 p
   //侦听UDP端口(用于结点发现内部会启动goroutine)# K6 h) q% U7 U3 p' p8 n( e
   if err := srv.setupDiscovery(); err != nil {4 k; r* K6 b9 `
           return err6 {7 n; p6 _! z: X$ E( b
   }+ l' M5 s: I6 H' o
   dynPeers := srv.maxDialedConns()5 v6 N. C- J" s5 j! Q6 F7 Z1 T
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)( b2 F: t0 B$ j. M; G
   srv.loopWG.Add(1)
# T  x8 i% Y; |   // 启动新线程发起TCP连接请求
( c4 \+ X0 E0 R% F- o% ]! E   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,! }9 U* q3 E( X9 x* \& I4 ]
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。) v; s& J) e" @
   /*
9 [$ s% j6 W$ t+ T% p) X   case c :=  0 {
6 W+ Y2 g; x  _( Z: p0 K6 K& E0 P           if s.config.LightPeers >= srvr.MaxPeers {
9 \- \; _9 D( J8 l& q1 W; H                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)$ u2 o% m6 _. ?
           }+ k$ c/ p5 F/ V. a
           maxPeers -= s.config.LightPeers5 o. @' S" N0 O+ z/ Z; S9 K1 n
   }# h, e7 z5 U* v7 ~  B
   // Start the networking layer and the light server if requested
' W* y  R. p/ [/ {8 B2 ~+ o" h   s.protocolManager.Start(maxPeers): L9 f3 o$ W: w  \% W% T
   if s.lesServer != nil {
8 }1 E% S5 w$ B           s.lesServer.Start(srvr)
! t3 F$ {% y- I, D   }( w# i4 O* y/ v2 e8 u
   return nil
" C. I; s( F, \8 ^' _! G}) m4 R# p+ l# J% ^; P/ g! k: l
/eth/handler.go- q' u' q, _, p  Y
type ProtocolManager struct {1 C6 @) J; P1 P' A
   networkID uint64
; R$ v  b2 c7 ]) p   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)+ i$ Q9 d2 H% f
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
& b; {1 ]8 F. ]$ d0 N   txpool      txPool0 d8 l) n: x" n% B0 s4 v- ^. c
   blockchain  *core.BlockChain
! j5 N  {4 z" q) W   chainconfig *params.ChainConfig) W) \' c7 ~4 J* N$ Z# ?
   maxPeers    int* e* k& G0 g3 [9 N; i7 K2 P
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
' B, m: C5 i  b1 k! R   downloader *downloader.Downloader5 \6 \1 f$ l0 A
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排$ _; t6 y2 O% p1 _1 U- L
   fetcher    *fetcher.Fetcher
% g% H, K5 K6 w8 R3 C   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
! I1 l. Z+ V: |) V: t' x# H   peers      *peerSet
- B3 N/ o5 ^$ I5 |+ Z6 `8 B   SubProtocols []p2p.Protocol
' q5 U% R9 q# @! B   eventMux      *event.TypeMux* k  R5 ]& \5 w, d
   txsCh         chan core.NewTxsEvent
; D1 j) c# L6 ?$ K2 z! T* }3 |- _/ L   txsSub        event.Subscription& u# R. z- w' U; i: D
   minedBlockSub *event.TypeMuxSubscription
) v: s/ A5 i" u$ x0 o   
# i* F9 y5 |8 E* V3 A5 D  x   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
, N0 i( ^* b0 t9 N- D/ J   //当然在应用中,订阅也往往利用通道来实现事件通知。( t7 n; P+ i& `9 ?+ F: n9 w' B
   // channels for fetcher, syncer, txsyncLoop
7 s5 w7 o5 m0 d# A3 C2 C   newPeerCh   chan *peer: @6 ~+ I% U" {0 ]  S
   txsyncCh    chan *txsync0 C0 o+ G* i! g  A- m* b
   quitSync    chan struct{}
# g$ v+ k; U6 [. T7 B0 c   noMorePeers chan struct{}
( r( v/ r$ L  b% J: U( F% G( Q$ J! @9 D   // wait group is used for graceful shutdowns during downloading+ {% t* j6 H; Q. a
   // and processing  e4 ^2 [4 `, ~& \& r7 ^: E
   wg sync.WaitGroup1 K3 I! P# V! E! W  O7 z- H
}
: R5 Z" U$ i7 H5 j* |. x   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
9 Q0 B' Q; r" J3 F3 tProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
) K) X9 n! P  I) j  x- ?  C0 \2 }这也标志着该以太坊个体p2p通信的全面启动。9 b* g9 k/ H+ S1 v- s/ {
func (pm *ProtocolManager) Start(maxPeers int) {/ D5 h7 F2 T- S$ P' J2 g2 @6 i
   pm.maxPeers = maxPeers# ]  b0 x  [8 L- j5 y0 }
   // broadcast transactions
+ A- b2 n+ `9 S( F/ L: N- O/ N1 d   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。7 x5 y3 Z/ h; _7 Y$ A+ s; w* x* e; e
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。: p. N3 b* [3 e* b
   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)) y4 O6 X9 i. m$ Y+ a9 y1 }
   //订阅交易信息
9 t) T' D$ Y7 X   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)/ C$ B# K2 L# l- b; x. x
   3 w+ P0 B+ f& Y0 z. v
   go pm.txBroadcastLoop()
2 @3 N7 x/ |- E   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息3 \! T4 I7 n! ~" P
   // broadcast mined blocks
# B( w' [  {3 z4 h   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})( W: d) [4 r: D3 q" r
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
" ^5 V/ V* C. V6 J   go pm.minedBroadcastLoop(): g1 W( A2 N* b. A, s% I
   // start sync handlers
0 `+ f' X8 t  N0 w   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
) R' S) k% L5 J+ h  b2 {, _   go pm.syncer()2 o, |% j9 T+ ]/ ]7 p
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
. F9 G# K; ^0 c. K/ B   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
0 X( w- f. A. a( p1 t: p   go pm.txsyncLoop(). i  {2 ?7 [) A5 [! w3 o1 {4 t
}$ Y, s! \% o  w! {
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件," l2 O1 E6 Y3 c0 N7 y; w
   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
% L: r) N- I5 a, }/ t# e1 {: J8 |//------------------go pm.txBroadcastLoop()-----------------------
5 H1 a- D1 G( n9 ^& o+ pfunc (pm *ProtocolManager) txBroadcastLoop() {5 j- o) G  x3 \0 c5 Q
   for {9 |) h" J: D& \: T! Q0 X
           select {
4 f3 W/ F' r# P# C           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {. G( {! ^3 C$ T3 j0 v9 R
           return p2p.DiscTooManyPeers4 }# }4 l4 {' |0 \  k
   }3 n  O' i0 y$ X9 Z# @$ `3 s
   p.Log().Debug("Ethereum peer connected", "name", p.Name()). g8 u1 N" B( ~. m6 T
   // Execute the Ethereum handshake
! N& p# d) R3 S   var (/ }: ]2 @" I- S
           genesis = pm.blockchain.Genesis()/ L  j! R% ^0 @0 Z6 v
           head    = pm.blockchain.CurrentHeader()
) ], x# Z: E- j; t% t           hash    = head.Hash()
4 R) N8 Z4 u8 Z  R           number  = head.Number.Uint64()2 x. j. ^. d% |5 r7 K( W$ |
           td      = pm.blockchain.GetTd(hash, number)
% V* p9 z# w/ G9 j7 x* r7 X) U   )( d4 c8 x9 }0 f0 }+ Y9 ?/ N9 F( R6 \
   //握手,与对方peer沟通己方的区块链状态
: X! {' S2 \/ j' A3 f1 `   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {- B( q5 J/ v6 o7 g. m7 ]6 A
           p.Log().Debug("Ethereum handshake failed", "err", err)
- c2 C8 T% }, ]9 @( h1 |& k           return err2 T' d8 F3 `/ c
   }
% d. y7 G" [- p) |, Z   //初始化一个读写通道,用以跟对方peer相互数据传输。& Z# Y: b/ N, i, W
   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
) ~4 q+ F& O) B* @  O$ n* ]           rw.Init(p.version)9 ]. ?; W+ B" k: H
   }
( A6 d5 Q! Y, [0 d  d   // Register the peer locally9 z: R) f6 Z  h
   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
$ \& \( f1 q1 W: p' W* _   if err := pm.peers.Register(p); err != nil {
+ L5 P/ Y0 W9 ~' u0 o: _           p.Log().Error("Ethereum peer registration failed", "err", err)/ L" m* n% S0 _, q) f# I3 q
           return err2 e4 E  `! y2 A3 x1 k1 H7 q: P% p
   }
" p- Z* a5 F# Z  R0 G' c3 c7 U0 R   defer pm.removePeer(p.id)
8 I: {" [  h3 B9 ?- j   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。9 l3 T/ Z* R, R
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect4 M/ v) U  h# @- H* L9 R; ]
   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {. _. t4 p" ~+ e& e2 I7 w# p  c, N
           return err
8 ?. e( U4 k4 \" H   }
% q$ G# G  P1 G# v8 M* r& n   // Propagate existing transactions. new transactions appearing! o: y7 L6 w$ h% b5 d
   // after this will be sent via broadcasts.
0 Y+ u5 z) A& r  T' W   /*
8 X+ }: ]5 C  m6 G) o3 }: E   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
3 I2 G. g+ c! g% J/ i   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop(), E; @$ _9 X4 {4 `' n5 I) v: H& }
   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。  A: v; m2 U% i  f
   */' ^! x/ h# X5 I
   pm.syncTransactions(p)( p- @6 s3 T- L9 ]
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork7 \+ Y, \6 q( Z8 m
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
* M1 R4 d+ v  }! u           // Request the peer's DAO fork header for extra-data validation$ D4 c. K# X& v8 a, U6 [8 c& `
           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {/ L1 {0 l# r2 G" K$ L
                   return err5 B% j1 T6 M' I% k
           }9 v- o  |/ W- I# Y% j, \
           // Start a timer to disconnect if the peer doesn't reply in time
1 o! F4 }; @6 C           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {3 s; k, Z7 {( {7 Q( L3 F( z8 Z
                   p.Log().Debug("Timed out DAO fork-check, dropping")
2 y5 @9 z- m% [                   pm.removePeer(p.id)
# Y8 S& E  e4 f4 k- x  B: X6 Z           })5 ?: C" l" Q6 M7 `
           // Make sure it's cleaned up if the peer dies off
! s. p- k% x( o1 ~3 E           defer func() {
/ i0 c7 L: m6 |) O7 L                   if p.forkDrop != nil {9 o, S6 j9 J' z7 B1 V7 R; m
                           p.forkDrop.Stop()
8 h9 ?, a$ N3 _) y. T% d4 d# g: \                           p.forkDrop = nil
* F) {8 h3 u+ S2 D1 G, ?                   }
0 A! o9 e* B/ m* y. V5 V           }()
% E' G- u4 c* ?8 `" z   }
3 m2 G$ ]7 _/ Z3 A: T  w           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,6 b9 w3 W( O7 _' z2 C
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
, V! j3 p! g  s) \   // main loop. handle incoming messages.! |; n0 k/ }- x3 F/ A4 Y6 Y
   for {  o+ t7 O! D4 o+ w
           if err := pm.handleMsg(p); err != nil {
7 F- v7 @# f9 I( d% }( x2 ^/ A3 }                   p.Log().Debug("Ethereum message handling failed", "err", err)
7 C6 K+ C/ \" j# z5 C                   return err
( a5 D# ~- p2 j) Z* i( S           }
4 Y' G3 ~$ y: P& b- t/ m   }5 @, w8 X& z" Z# f
}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2