Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
105 0 0
一、p2p网络中分为有结构和无结构的网络
6 e; R7 E1 \  p无结构化的$ v( @1 C7 z) T! B( I
这种p2p网络即最普通的,不对结构作特别设计的实现方案。) w* g& ^3 a/ ]$ w1 K6 g6 Y2 \
优点是结构简单易于组建,网络局部区域内个体可任意分布,
5 M, }: ~6 ]+ S4 A; u6 U7 u反正此时网络结构对此也没有限制;特别是在应对大量新个体加8 a2 D. p* V' V) m0 t, v" w
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
+ w+ u* J4 v  _- f: W8 n缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
2 p8 O* F' B7 Y4 b/ }* G+ n所以往往需要将查询请求发遍整个网络(至少大多数个体),
4 ~+ a0 w1 \$ X% ~这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
/ V* m7 h# X0 O6 A1 l. H/ n结构化的:' o/ d, i  H7 C
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
" z' ?3 g! e7 h5 u0 ?+ E* `" D) \" l降低查询数据带来的资源消耗。# S' z) k. a& O  s( P: t
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)0 o/ t7 E! k; I8 V1 d4 \" _  ?* P" j
网络模型可以满足需求;
8 B  [$ ~$ n, n4 ?6 C二、分布式hash表(DHT): M- E# l( Y% I1 L; T
保存数据
, J9 x( }2 u! ?3 g. B% ~0 a(以下只是大致原理,具体的协议实现可能会有差异). m# @$ M% x& `, L
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;0 V# C4 i& z* l$ A* @: W  F) Q
然后再计算它所知道的其它节点与这个 key 的距离。
  @$ Y0 \. @  B0 A* r如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。9 D, x' ^# e7 S' U
否则的话,把这个数据转发给距离最小的节点。2 ?: v- V9 u+ @5 q
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。( Z8 b" B  Q) X4 M% v2 o
获取数据) y% I: q! I8 i# L* d* K
(以下只是大致原理,具体的协议实现可能会有差异)
9 V6 D, O5 K/ _: N4 H$ g3 n; g# W* P当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;8 C) \% r/ V1 U
然后再计算它所知道的其它节点与这个 key 的距离。! B! a, `3 \# s6 t
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。' s. z% K4 e4 J( u
有的话就返回 value,没有的话就报错。
* A( L0 v& }% w; k+ k否则的话,把这个数据转发给距离最小的节点。
0 g2 W! |) I( Z' o9 O1 ~收到数据的另一个节点,也采用上述过程进行处理(递归处理)。7 t+ X5 t, z. b! `% i) h
三、以太坊中p2p通信的管理模块ProtocolManager
5 K8 L( H$ G+ E# i2 K5 _/geth.go
# k% e5 o! T1 |( O' H// Start creates a live P2P node and starts running it.0 Y6 z2 z" H8 Y5 O' t6 ?; c! `1 h& d
func (n *Node) Start() error {' p7 m- t# o; J  D
   return n.node.Start()
& ?' [. U* \$ A/ z}: C3 w9 e- X. Z) P8 i. o
/*
( z& i0 L6 Q$ Q! D! v/ p4 v7 x   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。8 {! g: j3 ~' A6 W! e; ?9 g3 f
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;7 h7 k+ Q8 n5 P. m" B2 x! h: x. n
   然后将Node中载入的所有实现体中的Protocol都收集起来,
% X  X( P: P+ b! ^   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象," V4 O* e1 ~, p8 C$ z0 }
   并将Server对象作为参数去逐一启动每个实现体。
2 E7 [- K* O1 `- a* d% n6 a  \*/
8 j  G6 A6 ~3 S# t/node.go/ d1 }2 l: v4 r6 b  n; m* z$ h2 W
// Start create a live P2P node and starts running it.% P* H7 q; @; p- M
func (n *Node) Start() error {! t7 B  L$ [! e6 k6 q3 I- q
     t2 t0 k+ f  ^  ?" b- @6 ]# J
   ...
4 ~" |& m2 F- l, E9 X8 D/ s   /*1 m6 }  s5 E  M& J' G" V) a
           ...
0 q7 C9 ^7 X% j) t. A( C           初始化serverConfig7 c2 X5 E# b* M; k9 u8 d9 W+ y
   */
8 w: L3 {  V: }1 Q   running := &p2p.Server{Config: n.serverConfig}8 B8 @1 q, _- V# A1 |2 c+ ?* ~
   ...7 |2 H0 k+ [- ~5 u8 r4 Q+ o
   // Gather the protocols and start the freshly assembled P2P server
9 S& T( v5 a( ?2 H   for _, service := range services {  i" }" q) h3 t- g) t( q
           running.Protocols = append(running.Protocols, service.Protocols()...), k6 F7 |5 V  E/ f
   }
) G/ M* e) }% w! [7 ~0 y8 `/ p   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法+ l$ t6 |0 A1 }! ^" g6 D
           return convertFileLockError(err)
8 W" w* a  Z+ ]7 O! L# y   }
# J/ s8 |2 Y% X  Z( ~* |3 p   // Start each of the services
4 |8 [; g/ _4 e" z9 b6 C; @3 z" k   started := []reflect.Type{}- L, e$ g2 b) Y! ?% i
   for kind, service := range services {
# F/ O+ ^8 B5 ]           // Start the next service, stopping all previous upon failure
8 c* {2 X) a  U2 H( ?3 E           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
' y- b% L0 T! g" B6 c2 `8 K           if err := service.Start(running); err != nil {/ Q% `# ?' k# S
                   for _, kind := range started {
( ]( J1 B; _& G* f1 y; D2 d: U                           services[kind].Stop()
3 B1 C6 E; B: R( [- R$ t                   }
% @1 M- t5 O8 f  r                   running.Stop()! Q: B/ r3 H6 H! l, \9 c& X1 Q
                   return err' x1 V5 h8 Q% Q
           }
& i7 P8 B. C9 A) I/ L  ?           ...
) G9 q+ R' J8 |# c7 i   }
0 W' f" D1 e- s9 L' U1 I0 q& d}; Z  b- y% W* g
// Start starts running the server.0 M1 ?8 P, W7 G$ x# b1 L# E7 ?( |
// Servers can not be re-used after stopping.7 e+ N0 u5 C8 U) D+ E" ^
func (srv *Server) Start() (err error) {
2 ]6 z) L. m9 [" [6 m1 X! E   srv.lock.Lock()
+ e/ \) t* F6 z) d   //srv.lock为了避免多线程重复启动: X# @: ^0 E2 @4 G
   defer srv.lock.Unlock()
! @0 h6 y1 ~0 h/ T( y' D, k   if srv.running {
# u, p9 ~' k3 ]  j5 V0 [& v           return errors.New("server already running")
0 L6 V' [4 w! k: d   }! Z& q1 e! ]1 w/ r0 c  f- J' w
   srv.running = true
) C' ]' Z5 r( G: U7 N, [   srv.log = srv.Config.Logger
% D* a7 p0 k% I9 r" G4 m* P   if srv.log == nil {$ O, r( |6 `& }5 `' j5 ?% [9 s
           srv.log = log.New()
6 ^; a: d6 }7 G* K; v% I- z   }
; x1 U& C) M$ b% I- j* I$ a5 u4 \   if srv.NoDial && srv.ListenAddr == "" {3 n6 m, Q" n4 Y& S6 U$ k
           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
, Y7 c2 ~& K! Z: Y  t   }3 A! |1 i- O, e/ e, j
   // static fields+ A1 S! g% s1 G! v! I; _
   if srv.PrivateKey == nil {+ u1 g8 f4 o6 Y! P3 k
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
. O# a+ |  Z1 O. `, o   }* m$ f( {. v& |; v
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
  N' K) y6 b, G/ C   if srv.newTransport == nil {
4 B& Y3 B0 M, x# d" a8 v% V           srv.newTransport = newRLPX
" @* d! A: p- u6 i6 G: D   }
; f+ y6 M) K8 g& h6 N: D  ^0 \# i# J   if srv.Dialer == nil {
) O" G$ `8 g% ?           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}0 r0 J, a, y* s/ Y- ]+ D
   }& {) H" @; I2 i, X5 w" d. @+ p
   srv.quit = make(chan struct{})6 N3 q5 X4 A) |, i+ L3 B
   srv.addpeer = make(chan *conn)
7 N# ]" v) `& ~& N   srv.delpeer = make(chan peerDrop)7 h% |+ m1 U% ~* Q2 c& o7 |3 Z
   srv.posthandshake = make(chan *conn)
  L% V: X2 ~( V+ X! k   srv.addstatic = make(chan *enode.Node)1 k2 v4 y! t+ H& L7 V! V9 J& d
   srv.removestatic = make(chan *enode.Node)
9 d, Z* T& G0 Q" O. R1 `8 x   srv.addtrusted = make(chan *enode.Node)0 E3 p( D' S% h8 S2 A* d7 q, a& K9 @
   srv.removetrusted = make(chan *enode.Node)
5 W  V3 k1 [) K   srv.peerOp = make(chan peerOpFunc)
( k( O( {& d) t+ U6 W% O   srv.peerOpDone = make(chan struct{}): K4 {( D. I0 g% r
   //srv.setupLocalNode()这里主要执行握手5 p- Y, k/ s, i0 |: ^) @7 [4 C+ I
   if err := srv.setupLocalNode(); err != nil {
- j- k8 G: n4 y& ]           return err2 w5 B  n% o+ y0 n$ h4 n% v* J+ q# P
   }6 a! G% N+ S% [9 e1 [. ^( `
   if srv.ListenAddr != "" {
2 l$ _8 R0 C: p2 K, [2 h3 Y/ Z# `           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
. `% m9 |" k$ O# {. P0 N  z4 ~           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接. K) z7 s4 d5 q7 d( \% D$ M# s- `
           if err := srv.setupListening(); err != nil {
$ p9 G! V- K1 f  z; D& V                   return err" ?. p2 r8 e' r! l4 y6 W, u6 Q7 u  Q
           }
; ]5 V9 T4 D7 R7 [1 I% f, D   }
1 e: J, M, Q4 x* ~   //侦听UDP端口(用于结点发现内部会启动goroutine)3 Q% R  C9 [- a0 L/ Y
   if err := srv.setupDiscovery(); err != nil {: L) \! f# {1 Y3 K
           return err
  j& @, |( H3 t* Y   }
: B7 Z0 Q; _7 \' H0 z" A/ t3 W   dynPeers := srv.maxDialedConns()% n7 f& o$ M' ?0 ~& H+ E. `0 A7 p1 j
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)) G1 U; L  \1 q( O2 U( P. H: t
   srv.loopWG.Add(1)- f, X4 X& m# G" M
   // 启动新线程发起TCP连接请求
0 B* m! d! s) ?/ [8 w   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
  n. @/ ]: `$ O$ @   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
: a, b- z- N/ ]   /*
5 \! [: ^" [. C9 {- l   case c :=  0 {! Q3 i# S; p$ P, l
           if s.config.LightPeers >= srvr.MaxPeers {
4 _+ f: ~2 ?% T3 S( e2 i                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
$ W/ g$ v7 k+ w           }# r- D' W& X* i$ h3 R, _; |
           maxPeers -= s.config.LightPeers
* A$ @9 Y  M  L  l* w   }
4 {4 p9 `0 N8 @; O   // Start the networking layer and the light server if requested7 p4 p5 ]& Y& r9 ^! j
   s.protocolManager.Start(maxPeers)6 ?3 V9 X2 x7 w- y% n) x
   if s.lesServer != nil {$ t: c; y1 S- Y4 R. M' Z' P) `
           s.lesServer.Start(srvr)
. J2 R$ b( T( e( Z. ^   }- V6 ?/ C3 J% N* a0 w1 C& z' \
   return nil
( D8 ^) ~( Q4 ^& G. M) X}) z' E3 B( b; M" L3 T
/eth/handler.go
& P' J; y4 W3 i8 rtype ProtocolManager struct {
7 g5 g  s+ `. A8 Y/ Q! O% ?   networkID uint648 u0 B9 H& v# D, h
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
+ X8 L; W% f- `: _   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)* V, O: f# {3 m; G* q
   txpool      txPool( T, S8 Z- h, V* u6 @$ l0 y
   blockchain  *core.BlockChain! p# ~3 \. ~: f# J# w
   chainconfig *params.ChainConfig
* ^7 b& x+ z5 L# E0 e" T# B   maxPeers    int% D5 x7 O. ]/ c* X9 d2 a
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
, G- `- n' W3 p. a) o5 h( c. |   downloader *downloader.Downloader# d$ I8 `. O6 ~- R# q: [: y5 p7 T
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
0 ^5 S  m& h3 J5 R( H   fetcher    *fetcher.Fetcher& |* M) W. J0 V9 @2 c
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
' @7 `: x& r2 k1 |1 c   peers      *peerSet
2 P2 e0 O/ B# L- F7 y) e" b% m- c   SubProtocols []p2p.Protocol7 O8 W" t9 u. ], a& b
   eventMux      *event.TypeMux9 t  z! y0 p9 _" L/ P
   txsCh         chan core.NewTxsEvent
5 l' ?: V+ E9 t   txsSub        event.Subscription
" d6 g7 \/ @  e& V4 h! X2 q/ R: w   minedBlockSub *event.TypeMuxSubscription
9 y# u2 R/ U. Z1 m   # Q& N( {+ y+ A8 }& t) X$ H
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
" F! i3 u6 v$ t& R$ s. \7 C   //当然在应用中,订阅也往往利用通道来实现事件通知。
2 e1 H8 c7 x' H4 n   // channels for fetcher, syncer, txsyncLoop/ h% \1 T, u: l) z/ c
   newPeerCh   chan *peer
: J, ^  ]# u, E7 \( m   txsyncCh    chan *txsync
( n7 U* _/ @5 r/ j   quitSync    chan struct{}" K, t! t' D! V9 S; d
   noMorePeers chan struct{}8 O* B$ o$ O# ~& p" X5 a
   // wait group is used for graceful shutdowns during downloading
7 z/ R; A) A9 V! B. ]   // and processing# P; X6 {+ t$ W2 L+ w
   wg sync.WaitGroup
$ b8 O* V9 v9 |6 V0 ]& f6 `}
$ x' F# o1 L9 z; S* K   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。/ l; i+ \( L. T8 S0 K' B2 |6 f
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
5 t$ k) H2 l7 u+ ?这也标志着该以太坊个体p2p通信的全面启动。
9 t6 i0 w2 x7 C) s( O' X2 nfunc (pm *ProtocolManager) Start(maxPeers int) {
  ~1 S4 u4 Z, n3 V7 N1 N   pm.maxPeers = maxPeers
9 e  w5 ?( o, K3 h   // broadcast transactions
! q; P1 B# c2 n8 [" e# M! ?$ O- E   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。7 p8 T0 R" G, I8 E+ B5 m/ N
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。- {: y; g) S8 j1 _/ r- u+ F+ V+ G& J
   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)2 v2 @# Z0 S  i* c/ M
   //订阅交易信息
8 q6 L+ P" y; a  x, b9 Q   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
( h% K& }4 e' Y, U   
% V6 _" J) G( k7 }2 j   go pm.txBroadcastLoop()
) y7 R! |3 l% j# K- e4 q9 h+ V1 y   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息7 }0 `/ _" p, g
   // broadcast mined blocks
! \6 n* r+ h; i   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
3 r4 t; q2 V2 y" X   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。5 M3 G( H; T* A- }
   go pm.minedBroadcastLoop()# p/ Z1 U/ U) v4 b
   // start sync handlers
/ Q% w* {4 |- w   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
- ]9 Q( }# S" t6 z2 |0 G! }   go pm.syncer()3 G$ w7 u1 ?9 f. a- X" L" _
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
& V) I6 w! ?$ F/ s6 |+ E& o# s   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
$ h- G9 M4 N5 E! l   go pm.txsyncLoop()
% @  Q$ E' b: H/ v}
# Q* b- ?8 g- w5 w; s! d5 s+ [//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,! E* S, J. O$ K0 o+ _+ }) C7 z
   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
  c; k1 a& o1 d! e//------------------go pm.txBroadcastLoop()-----------------------
% {3 c: d9 \! G$ Efunc (pm *ProtocolManager) txBroadcastLoop() {
! r# h- O( D) C" l! W- _   for {" c) S4 k! x3 F5 Z# j6 [  y. J
           select {
! _6 h7 F4 x. \           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {5 U1 Z; \" Q. h1 [$ C
           return p2p.DiscTooManyPeers
, N; o, `& y5 F- a' z& v( d9 a8 s. N   }" s, Y1 x0 m  L# i# k! S
   p.Log().Debug("Ethereum peer connected", "name", p.Name())2 T, `4 k* d" }8 @
   // Execute the Ethereum handshake
0 s. E0 y) H  x* N   var (
; T1 ?9 e- h! P0 j' A( B           genesis = pm.blockchain.Genesis()
: ^) K! Q* A! E1 _  i" E           head    = pm.blockchain.CurrentHeader()
: D  K8 L- H6 S; b+ h+ K. d6 C           hash    = head.Hash()% u6 A: P) B0 `' k
           number  = head.Number.Uint64()
0 s; F6 J! h; h2 M           td      = pm.blockchain.GetTd(hash, number); o0 ]$ M0 B' z* c
   )$ Y5 }+ z6 o- u0 o4 W8 V
   //握手,与对方peer沟通己方的区块链状态. ?9 `3 ^6 e$ i5 i" y3 ^% l9 r
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {( v1 r9 Y& m% h- G; |
           p.Log().Debug("Ethereum handshake failed", "err", err)
( k* y$ U: s4 C, m' t           return err- u+ C, F% p" k; h, I4 J& X
   }& _: n9 k6 M* C* B" l" Q
   //初始化一个读写通道,用以跟对方peer相互数据传输。& i; Y9 O% ?+ Y8 R
   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
$ o6 N' H) O0 Z, L' G* s; M1 V           rw.Init(p.version)& i" H3 a/ ]/ j6 @) s9 m
   }' h% T; W  ^( |7 U  K8 l
   // Register the peer locally6 I) l( l: p+ _9 `: J8 o: w
   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
) p2 i4 w" X7 K   if err := pm.peers.Register(p); err != nil {
+ T1 L# x8 m' q           p.Log().Error("Ethereum peer registration failed", "err", err)* H% j) [0 @8 K
           return err2 J; E7 X& G5 s  }
   }
5 B4 b0 f, @7 Z6 h, _: q   defer pm.removePeer(p.id)- E5 u- P6 j/ |" `- m8 _
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。. p8 H" b" ^) A3 l
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect' q' |4 m, _# T! e
   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {2 F! I/ Z) p" b1 s3 A5 K1 m
           return err
7 Q4 c. ~+ X2 O- M  s$ ^   }
# K  a* Z! h2 G2 f& u   // Propagate existing transactions. new transactions appearing
/ F. I6 S6 Z/ W8 j6 T9 V: L# A. O   // after this will be sent via broadcasts.
4 ^7 I% u8 [0 ~# p   /*
% w, ~5 G% V+ T6 k+ o   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,0 c2 X3 s* L. g7 P0 H$ a  U
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop(), x  C6 v4 T# K/ a) X+ g: `
   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。2 q' R& W0 j! i, t/ U6 z8 l5 U; a
   */
" I% {% t/ K, U, b- H2 E   pm.syncTransactions(p)
) a6 \; q4 _3 M1 q' I3 p   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
$ E7 j6 M8 v: i   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
+ _: v+ E/ v% t7 i6 ]+ X           // Request the peer's DAO fork header for extra-data validation
6 }; k, U5 ~/ `% g) P9 K" J' q2 ]           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {+ J$ c" Y3 `* y/ ]
                   return err
' R+ a4 J8 n& J- x# e           }
( i5 K1 S0 c7 Z3 Q; n, L           // Start a timer to disconnect if the peer doesn't reply in time
, P2 s; ]) [# M8 N           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {! E% C6 u4 }: u" s, H* `; ]
                   p.Log().Debug("Timed out DAO fork-check, dropping")  }; m' {/ S! j) Q4 g
                   pm.removePeer(p.id)" \4 I6 X2 }- y3 C% g- x
           })0 ^* ^9 N- e" h4 J& D& V# ^) E
           // Make sure it's cleaned up if the peer dies off0 k7 G" P5 {/ v* n; J0 h$ ], @4 D$ ?
           defer func() {
! z9 c: F, J6 E! K                   if p.forkDrop != nil {
6 Y2 c& s7 U& w& P" }$ C# {                           p.forkDrop.Stop()+ ^7 h( `- J$ I, ^0 [" l+ P! O
                           p.forkDrop = nil$ B% [# D) L: m  s. I" \  z. v
                   }
8 M4 b& o* M' `& M# O3 C           }()
6 o# t2 [0 e5 O! V2 _% r   }1 Y. c  z1 Y0 ?8 f* I  Y
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,1 T( S8 i1 h# N+ @
           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。5 R9 X3 @$ ?! v8 z' H" Y4 m+ n
   // main loop. handle incoming messages.4 s- N. g6 g: |- a8 W$ O0 C; f
   for {
: f5 [" Y$ b% a# g           if err := pm.handleMsg(p); err != nil {2 L: f1 g" ?5 d6 ~# y5 ~
                   p.Log().Debug("Ethereum message handling failed", "err", err)
2 g8 V  T! _7 v: p4 O                   return err
9 l  F9 _/ x1 R# Z- k3 e4 O: P           }6 S& L, @5 R+ e3 e/ o1 m* M" A. j
   }3 Y) D$ M, w. w3 K
}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2