Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
259 0 0
一、p2p网络中分为有结构和无结构的网络% ]1 |- t# ?  v" B, n2 Q* k
无结构化的. ]5 m9 a) h+ G; g/ N  `
这种p2p网络即最普通的,不对结构作特别设计的实现方案。
; G) q* |" D; e# j1 E( J优点是结构简单易于组建,网络局部区域内个体可任意分布,
; g8 t& g0 B5 K9 \) R0 h反正此时网络结构对此也没有限制;特别是在应对大量新个体加! J, d1 }/ }# Z9 |, n  G. V- x
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。" Q/ w3 d9 A. J1 {: t+ Q7 f4 o  E
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
+ ?3 M! O/ n7 Z0 Y: s# G1 ~1 ^所以往往需要将查询请求发遍整个网络(至少大多数个体),
+ C( ~  S1 F% }这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。- B# x2 S& X& x! l& k; {( j
结构化的:: L7 J0 H$ e, E5 L
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,* p# ~5 [; I5 D6 k% r( a
降低查询数据带来的资源消耗。6 u4 l: d) f* [6 o% T- t
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)( E2 @% @" e  J$ s, e3 _
网络模型可以满足需求;
  H1 p3 a8 O) \  k, G! e% b; K; m二、分布式hash表(DHT)
% {; d& U8 B9 B保存数据$ ]  b5 |, C+ t
(以下只是大致原理,具体的协议实现可能会有差异)
( H0 S: @/ `1 K1 \当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;- K, M, t6 V. w1 l
然后再计算它所知道的其它节点与这个 key 的距离。6 k% @  Q9 M) R4 C$ Y
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。! F' g; E  S; H7 b# U  h' h" y" |. [+ Z
否则的话,把这个数据转发给距离最小的节点。
6 j% g8 k* [6 j# t8 ]收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
' E- }, {. Z- E# H5 f! U2 T, W获取数据
3 c! d5 |3 m6 t0 E) N6 ^0 K(以下只是大致原理,具体的协议实现可能会有差异)2 ]3 |3 Q" v  ?) M4 V
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;' w; W/ X, P# r' q
然后再计算它所知道的其它节点与这个 key 的距离。) F$ h# B2 o; G. }' X2 r0 K/ h5 o
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。& N! E( u( Q* {' Y
有的话就返回 value,没有的话就报错。
/ y1 X9 |7 u4 b8 M: A& Q否则的话,把这个数据转发给距离最小的节点。. G3 h" |: @$ \+ R) g
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。. J4 a+ N6 b) ], }4 B6 T5 @6 L1 S
三、以太坊中p2p通信的管理模块ProtocolManager2 B. B& R, M% g% P
/geth.go
# J* K3 O' B2 T5 B& T, |// Start creates a live P2P node and starts running it.
5 B) G, T1 g: w, vfunc (n *Node) Start() error {
2 j% ~: b4 S# d  v$ b   return n.node.Start()
+ H! t/ k/ Q! ~2 B# W; {5 u}
( _9 f2 N2 ]7 U/*
1 \# B9 Q6 r$ j6 q   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。( @8 a% ~4 [1 y/ |) ?
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;7 ^1 V+ \$ f8 V- P
   然后将Node中载入的所有实现体中的Protocol都收集起来,
( g! f0 X4 i/ C6 E6 p   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
8 c2 D8 l( F2 R: }   并将Server对象作为参数去逐一启动每个实现体。
- V0 W: s+ M  ?3 K! V: k! K+ p' K*/5 J3 v9 M8 ^* k0 p1 g
/node.go
7 ?5 D8 e5 v/ P# @// Start create a live P2P node and starts running it.
- Y' g3 |: j5 l; gfunc (n *Node) Start() error {* E9 Z! I) c# r( p# Z
   
! j9 w- W2 y4 y9 \& ]   ...( r1 Q4 @$ E1 S4 h
   /*; g  Q! r2 l) l1 C8 C5 u  v
           ...
; P' h/ q1 t% E$ D; G2 ?           初始化serverConfig
" A  l- C6 O7 N' B7 ^4 U   */
) U3 p: b) v" Z1 @9 Q8 F: r" a! c* I   running := &p2p.Server{Config: n.serverConfig}5 z; J4 L5 v" ?. j; ]
   ...
) y$ h0 b% J0 m& {  T   // Gather the protocols and start the freshly assembled P2P server
6 ^/ L6 V/ A  a, ^   for _, service := range services {
/ S2 J1 {/ ]6 n" R! T           running.Protocols = append(running.Protocols, service.Protocols()...)) a1 ^, a# `7 b  f3 J7 d
   }5 F: |4 a4 V. X2 K
   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
, F" \8 J( Y. M           return convertFileLockError(err)$ u" F7 N6 x8 w& ~* ?; ?1 c3 Q
   }) g# Y7 t" L+ y1 g0 V$ |
   // Start each of the services4 D' e  b! G/ s  w$ r/ n0 O  u
   started := []reflect.Type{}0 ]8 C; {- `$ s0 w+ A
   for kind, service := range services {
3 }: F. g1 [5 |* n$ U7 S/ j           // Start the next service, stopping all previous upon failure" F# j3 t4 L( r
           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {- A" l. b# _0 O
           if err := service.Start(running); err != nil {
% L! d' K: k$ T/ P                   for _, kind := range started {
. x+ T  C  m* }% I                           services[kind].Stop()2 E# q- X8 A$ u0 ?
                   }
$ e$ @4 q4 g. c9 o* q9 P  h  N                   running.Stop()" j% ~, |/ [% l7 }& u$ z
                   return err8 t0 C- O; s# Q9 _1 f' p
           }
2 T1 }* s6 G1 k* c           .... N! u# }9 H2 Y, F3 H- u, N9 i
   }  S) ^% f9 Q8 ^0 r1 c  ~+ g+ i5 y2 N
}1 C; f* r& q( V7 x6 h
// Start starts running the server.
( D! k) y* Y& o1 |2 l// Servers can not be re-used after stopping.7 r+ g5 u) s8 v7 c  \: h) N1 \
func (srv *Server) Start() (err error) {- E) ~$ d6 Q1 D- B. }5 @
   srv.lock.Lock()
/ |6 ^4 x/ k; M* `   //srv.lock为了避免多线程重复启动0 F0 g) x* p, q
   defer srv.lock.Unlock()0 z# e! O( a# S7 U/ n: n2 Q( t* |8 j
   if srv.running {
$ Y/ o# a5 q4 `0 x/ [: g* ?1 W. W4 g* K           return errors.New("server already running")
( n! j% C1 H9 t+ x. C2 Q5 @   }
" P- a" M' o9 c, n& l   srv.running = true
4 G$ m& A2 W  h& [   srv.log = srv.Config.Logger, i. g& z! Y4 J: ]; k/ h
   if srv.log == nil {
5 l. [/ ?9 I! V/ V1 A2 {; j           srv.log = log.New()9 Y' c& s5 k, r1 ]0 W# r
   }
+ [# t# m, ~0 L   if srv.NoDial && srv.ListenAddr == "" {
# u5 {" V: `; X+ C0 T           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
% C9 y( a+ c- g' y   }
3 w  ~! g% g8 l! G   // static fields! c/ C# R# m5 ]/ h  y4 G# H6 y# n; @
   if srv.PrivateKey == nil {  B/ S1 H% R" _
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
, x& [% K7 l0 A   }
1 R1 D1 p2 f( G6 [& j1 E   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
( Z* B! n- `, ]! V1 z# ^  X   if srv.newTransport == nil {. n: d( c- [1 p8 B# i4 k+ m
           srv.newTransport = newRLPX
) t6 R* t& Y! M( H2 {& t   }& P8 |( L& U) x
   if srv.Dialer == nil {
: k/ n3 F! E1 g; ]5 A6 _           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}6 I0 q3 b, t3 N/ j6 W
   }
8 ]& w8 M9 s  ~& |2 m7 |$ u. Z( T   srv.quit = make(chan struct{})6 r7 t+ e; ~; U" A
   srv.addpeer = make(chan *conn)
3 i2 A2 l/ N& e" S! h. \   srv.delpeer = make(chan peerDrop)
# r! @+ K0 r2 \+ G3 Y3 {; A2 C   srv.posthandshake = make(chan *conn)
0 R3 w& w) b+ g+ x+ }' ?, T3 g/ p   srv.addstatic = make(chan *enode.Node)1 p( D  U" o' ?0 U/ v* }9 l
   srv.removestatic = make(chan *enode.Node)
+ x$ w6 b7 @7 H% S, u9 L5 u   srv.addtrusted = make(chan *enode.Node)
% Q! Q. j) X) U1 f& k1 N+ m3 \/ h   srv.removetrusted = make(chan *enode.Node)0 Q8 k' ?8 y6 Z0 Z% ?" r& b
   srv.peerOp = make(chan peerOpFunc): N, F8 c: L/ }* G& K1 W
   srv.peerOpDone = make(chan struct{})
1 b: b  x6 Q- f* p* F! v! P   //srv.setupLocalNode()这里主要执行握手
- s6 O$ x' L: Y   if err := srv.setupLocalNode(); err != nil {
6 U% q# _) s1 j& v; z# d( [+ e% O9 N           return err( Y5 G  U6 H& P0 v% S
   }8 W7 g  \: W: v5 r8 f! d
   if srv.ListenAddr != "" {
# t' F- ?0 t8 h2 _# ?/ u, C0 k           //监听TCP端口-->用于业务数据传输,基于RLPx协议), L5 u$ _2 h3 L/ P3 N" c7 S
           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接6 [/ T5 {( Q! k% a" ]3 U& v* T6 X
           if err := srv.setupListening(); err != nil {  `0 ?5 \) m1 w
                   return err
- J( @- ~( D/ q( O( @           }# R* A, d- s" U6 `
   }
( ~5 d. f$ \5 ~1 H1 ?* E$ H  o   //侦听UDP端口(用于结点发现内部会启动goroutine)
2 S& ]  e+ }  H1 b+ q# L   if err := srv.setupDiscovery(); err != nil {: b, j; M0 ~8 r# Y- n: `4 ~) K
           return err
/ ?4 J4 [' N$ w2 A# \   }
* r, E- c- e5 L+ _   dynPeers := srv.maxDialedConns()6 `; |! P8 G+ i7 R( ~
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)5 K; Q2 N! L5 J0 C5 J2 _5 d9 H
   srv.loopWG.Add(1)
6 w3 e6 Q4 K) U/ d) ~   // 启动新线程发起TCP连接请求2 j  G4 [' l) @3 \2 B  g: ?& ?5 u! V
   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
) g5 q3 H7 o: }) ^+ F   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
) X4 S# A) H* N  |' Z0 \+ Y  V- A   /*
- o5 y2 E4 _$ K$ N   case c :=  0 {5 [% F& m. ~( |* O; a- T1 c& e! h
           if s.config.LightPeers >= srvr.MaxPeers {& t3 O+ s/ Q8 ^! Y; T
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)$ \, i% D8 V. Q- D& z
           }
2 s! d8 K" b1 K6 u% K; B8 K           maxPeers -= s.config.LightPeers
. i" Y- W3 Y) B$ q- M7 C# O   }
* k' o5 J; n$ D   // Start the networking layer and the light server if requested$ k$ m: n+ S! W6 Y1 B- u1 O* y
   s.protocolManager.Start(maxPeers)
" q1 N0 @* W# D7 M. v  T   if s.lesServer != nil {3 s. W" e0 J' z+ s
           s.lesServer.Start(srvr)4 ?# J/ P5 l0 O1 h) G' a) M1 c
   }
( y8 X; g2 o) j8 X% \7 ]& K, T   return nil
7 d; O- ?! S6 }5 {( Z% t}7 I( P. A" [- }9 O0 d
/eth/handler.go
  a6 v* R) E6 X9 u8 n3 qtype ProtocolManager struct {0 x/ c3 T: U! u" v( Z6 s  {3 U5 s
   networkID uint64
( P3 F% r; s2 S4 O' d' f   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)9 T; v1 s7 c/ S6 N' y* ^
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
: {% j/ _' |. g; t   txpool      txPool. ^7 r" l6 l# r
   blockchain  *core.BlockChain
  ?" V7 Q! ^+ }; C) p7 L5 a+ U, Q   chainconfig *params.ChainConfig' u6 F, U3 N  ~" ~6 t
   maxPeers    int
) d+ n/ y5 W) J" |' \  N) M  [   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
( s2 x, R' Z, d   downloader *downloader.Downloader
* c, ]( R  C  _: P; p   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
1 w3 G; t7 U) D! A2 b4 q! q& u   fetcher    *fetcher.Fetcher
4 R0 d0 n7 D. r; Q: S6 f   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。9 b' W" k. Q+ o5 T" g' N
   peers      *peerSet
1 h1 J% g' Q# H  p* ^   SubProtocols []p2p.Protocol' E% r  [% y$ {
   eventMux      *event.TypeMux  ^. c8 ^- S0 {
   txsCh         chan core.NewTxsEvent
" S( F, e5 C# e, M   txsSub        event.Subscription5 n4 n6 f- j- y! T/ H
   minedBlockSub *event.TypeMuxSubscription
, Y) m6 \4 |9 S# b0 P2 g   
' p1 w' U7 j1 n2 h- }   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。: P- `/ ^# I2 s( n+ O8 c
   //当然在应用中,订阅也往往利用通道来实现事件通知。6 Z8 a  g. j% t* n+ M  G
   // channels for fetcher, syncer, txsyncLoop
! r( c) X- W% [7 J5 f9 A1 k4 b9 O   newPeerCh   chan *peer
# P; T3 X" m# O3 [   txsyncCh    chan *txsync: h( v( D/ s3 p$ z5 M: Y4 R/ d
   quitSync    chan struct{}0 B1 S- I# }7 ~2 C" b
   noMorePeers chan struct{}
4 J: X/ u0 U0 y5 i1 g  P   // wait group is used for graceful shutdowns during downloading$ \4 g3 l" Z! X. _+ N
   // and processing' t2 p# b8 B3 w! v
   wg sync.WaitGroup
1 v9 b- F- f* J& V}
7 K) D( X, V# o   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
; I6 N' R! B/ ^, ]4 r* cProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
- Z0 Z/ H3 U' L$ t这也标志着该以太坊个体p2p通信的全面启动。
7 }8 Y5 h2 L+ M! Z. Efunc (pm *ProtocolManager) Start(maxPeers int) {
. u! G' ^; z- p  Z" P% D: H   pm.maxPeers = maxPeers7 s9 k* }* r- P( M% E' D
   // broadcast transactions6 b) N  S' f. S% F" q% z
   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。, T( ?* J& B/ j) o7 h7 `+ k
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
4 |1 h0 F, C% d, t: u/ r  n/ y   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)* k' J/ j* S) g  ]+ e
   //订阅交易信息
- u) f6 h9 }: ^   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
6 ~9 V( j0 y% G   7 _, t/ F9 O3 W; A; l
   go pm.txBroadcastLoop()
* @/ p! s, [1 P, }   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
0 a' r% T( u8 Q% t7 w  @  G3 i   // broadcast mined blocks1 `( n/ z$ j# q4 G  O" Y% X$ e
   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})4 b1 \# S. O3 w0 o  W6 C0 ]4 C- ^* W
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
# P7 L4 t% k. @- J   go pm.minedBroadcastLoop()
! w, Z2 j+ \9 `   // start sync handlers% o0 Y- O4 a. _! [2 @" |
   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。1 j& b; q" z/ y
   go pm.syncer()
: I4 u2 O7 n8 n4 I: V9 D   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
7 R& b, i2 T& h9 k! L1 n& g0 |( r   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。( L  H: [' A. t" k! x- I6 ?! z
   go pm.txsyncLoop()8 G6 l" ?" E# h: h" p) H
}# b8 |  B. L7 X9 V, i( |3 f: w
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
7 g. e, u8 b- O( L5 z. }/ k   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。2 b1 S, a* q7 z( M' M% B
//------------------go pm.txBroadcastLoop()-----------------------
& U9 n, ^% f  T2 ?func (pm *ProtocolManager) txBroadcastLoop() {# n& F3 i" h& s8 Q4 P( n3 t8 ]
   for {$ Y* a9 K# x2 G5 o4 H2 |
           select {% X6 K4 G3 b5 M  _' E
           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
3 _" |* e! q6 x0 Z0 p* `1 h           return p2p.DiscTooManyPeers- P  h% w" L6 h) N
   }
: m: X" I; z3 N2 ^1 N   p.Log().Debug("Ethereum peer connected", "name", p.Name())7 L3 @$ u8 {$ J# W& b9 M- t
   // Execute the Ethereum handshake
4 c' M9 M+ m) m8 r3 H   var (
% o& p1 W/ G. r! t2 `           genesis = pm.blockchain.Genesis()
0 k3 M. E% R* b+ s           head    = pm.blockchain.CurrentHeader()
+ ?0 @# s) |% p# w8 g           hash    = head.Hash()
+ c4 Y7 w3 R5 k           number  = head.Number.Uint64()
2 ~6 [  Q/ \7 P; r. j/ L           td      = pm.blockchain.GetTd(hash, number)
" s7 ]$ a, }- L4 R7 \' \4 O9 r   )
- o+ g* A' X2 U! |) G   //握手,与对方peer沟通己方的区块链状态  M; a0 o; C' h0 [' G6 j
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
: ~! c. \- u  }" E. J           p.Log().Debug("Ethereum handshake failed", "err", err)
) Q! N% _7 f: K7 f! {8 p           return err
  E4 r% u/ Z& v" E   }
. q3 q( \% H+ r$ b1 ^+ T5 O   //初始化一个读写通道,用以跟对方peer相互数据传输。! s! ]/ G& O5 o, C8 @- J) m  D! f
   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
, d1 F: G" B6 V2 [  e8 F           rw.Init(p.version)
& j3 t, X& e/ Q% Q   }
- M/ j8 D$ ~8 A" n9 Q- E8 W1 Y   // Register the peer locally( o# O% P+ T9 X" G" w0 U" q: k
   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。, s  t: Z# c" K$ {
   if err := pm.peers.Register(p); err != nil {" a5 R5 a6 [* V, s, |
           p.Log().Error("Ethereum peer registration failed", "err", err)" q! m! ^2 V/ h. u9 d$ G8 r3 _$ [
           return err
& V, F4 }  ^) |. y   }9 f: g* {7 }3 T# N4 U% y
   defer pm.removePeer(p.id)7 s) u3 L* O% R6 ]' O) D
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。7 m% F0 _, O3 W# E6 l* W9 R
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect1 k, {" J3 P! K9 S! e
   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {6 |& n$ s3 l! P% x9 {. G' m
           return err
8 k7 X/ v# y# J9 w   }. L8 |6 V  {3 [2 u' f
   // Propagate existing transactions. new transactions appearing- |9 m, m6 r( v3 C  B8 C2 B
   // after this will be sent via broadcasts.- h" _. j4 s8 h4 Z0 Z
   /*4 ]4 \9 y" Q, X" m( n# ?8 E
   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,, `2 I8 C5 E( e; K" X2 N& @$ [' f* R' z  V
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
' P0 w/ x: \' R! }   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
! j2 G1 J/ n0 Z- n5 @. v4 G   */
  Q  g' O8 Q6 L$ h2 }   pm.syncTransactions(p)
  }, ]4 d. ]; Z$ {% g2 L' A   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork* k! |9 Y) ?( k: y& q
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
4 V  d3 y) {8 |% H1 }           // Request the peer's DAO fork header for extra-data validation
8 w. {; R/ f: L. N% v           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {8 J- C6 {  Y/ Z
                   return err
  I* [9 o$ s; J. v$ M4 {2 s           }
% n, C+ q, J& W: f) k           // Start a timer to disconnect if the peer doesn't reply in time
# q6 d: Z& c* u/ }1 |4 |           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {/ I+ f5 U6 Q& u- U' Y7 @3 _. h
                   p.Log().Debug("Timed out DAO fork-check, dropping")! [6 j6 Z5 R" K0 x/ b6 A, e
                   pm.removePeer(p.id)
' O! \+ Q* M" O4 ~% h# ^           }), U/ k0 f' Y5 F& h8 b& ]# X$ F  I. X
           // Make sure it's cleaned up if the peer dies off( t  y  x" Y8 V' q) [% \! w, g, G
           defer func() {
5 @5 X0 d0 ]: ~2 C: e+ y                   if p.forkDrop != nil {) p1 Z, d5 R/ F3 N0 d$ I0 O0 Q
                           p.forkDrop.Stop()
. d& t, w7 \% t  z. j# ?) Y                           p.forkDrop = nil' g+ B# m& q0 e2 K% o9 P: ]3 S
                   }& q8 x7 O( h7 t+ n5 ?
           }()7 }9 A) k2 T" V+ F7 w! G
   }$ Y" ^; q( J$ H; a2 o) I3 h
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,
& B3 l( |, R: t. A           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。8 s/ t4 m' E0 t( |4 A: a3 U% @/ K: L
   // main loop. handle incoming messages.- g3 Q+ K, Y& o0 k4 a9 o
   for {
( }2 d& ~5 L/ `" y! m  F           if err := pm.handleMsg(p); err != nil {# |" f5 a/ q# @" P/ r8 @# _
                   p.Log().Debug("Ethereum message handling failed", "err", err)) ], D. b8 D9 r! I( b; F6 t! E/ h" A
                   return err
8 ]+ f& r5 C7 F           }
: R0 s  O0 i) u. [   }% N2 T6 b4 I8 z- n* C; {8 p
}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2