Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
169 0 0
一、p2p网络中分为有结构和无结构的网络
5 z& h8 C% G8 @: e7 Q无结构化的; U5 i8 `$ Z7 [9 @6 h$ c
这种p2p网络即最普通的,不对结构作特别设计的实现方案。: O: W$ `5 |# B) Z0 a
优点是结构简单易于组建,网络局部区域内个体可任意分布,
) u6 ]% k" @. }反正此时网络结构对此也没有限制;特别是在应对大量新个体加
% o+ }0 p- j- U入网络和旧个体离开网络(“churn”)时它的表现非常稳定。" N8 r$ E0 {5 O* V
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
! ^' L/ e/ J# u* X( _) D& n$ l所以往往需要将查询请求发遍整个网络(至少大多数个体),
2 P+ {3 s; y2 C" C" z0 Y% b这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。7 L- Q  f. R7 e2 r& H- }5 j  `
结构化的:& Z. O- s! s! @% n, N) I8 e7 M
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
& C9 u( O5 C' R4 q& s降低查询数据带来的资源消耗。
/ }; z0 h) U/ n' ?8 t$ O9 C以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
1 R' r4 p# b% W9 C# O网络模型可以满足需求;7 |  l; ?9 e3 l! F- m8 A+ _/ X
二、分布式hash表(DHT)
9 T5 e: s' d  F) N+ h5 b& n保存数据
3 v' D3 m' X) L2 d3 s6 ](以下只是大致原理,具体的协议实现可能会有差异)8 M# z# O9 x  ~- {5 ]5 a
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
! D0 W4 a: o7 X8 D$ H9 `' \- A然后再计算它所知道的其它节点与这个 key 的距离。
1 P0 Q" A9 p. Z如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。; y* \0 s' n+ F0 m! ~( u
否则的话,把这个数据转发给距离最小的节点。
, e$ u3 Y) c& b0 r, N& j5 f- h收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
' }5 s5 L! x* [' H6 G获取数据  T$ v4 q$ ~2 w! |  b  t, p
(以下只是大致原理,具体的协议实现可能会有差异): x, m1 U- g! F" Y6 N
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;+ ?+ h9 y% t; d$ @' v" ^
然后再计算它所知道的其它节点与这个 key 的距离。
0 {5 I' G- X; U3 V& j如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。6 s% d8 G/ p' k& e# E( O
有的话就返回 value,没有的话就报错。- Y3 G" f1 i& s! h! t% D
否则的话,把这个数据转发给距离最小的节点。% @: ?/ ^8 I" ~
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。) f  B7 u/ W; y, T  o
三、以太坊中p2p通信的管理模块ProtocolManager
5 o9 r  G# M4 j7 E0 F) O5 z# p/geth.go  U9 O& u' @: ]) y% j% k9 f
// Start creates a live P2P node and starts running it.& C4 K3 O* G+ A0 ^! [+ J
func (n *Node) Start() error {0 K( L0 \" J" b' G8 ^2 n
   return n.node.Start()
0 N  A* H1 c  ]" T}
6 m3 L- k; v' x7 u$ \% j/*
) e' W$ }0 |" |, b) ~   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
( O' I) B( N4 u+ e4 E5 D; B   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
1 e8 U' J0 k- Z2 H4 Q% Y   然后将Node中载入的所有实现体中的Protocol都收集起来,* V: ]4 {6 ~6 U* `2 J
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,9 s, R# y) a# ?9 L" o. i! U. |9 L
   并将Server对象作为参数去逐一启动每个实现体。4 F* y* Q1 Z# L
*/0 J" z; q% V8 Q6 U% u
/node.go9 |- h( B" J' j: {  m
// Start create a live P2P node and starts running it.
1 l: Q: R0 l; t' G' t' M" mfunc (n *Node) Start() error {+ N' B& Z. {$ g; _% p% x, T/ b! F
   % T! a: l1 a* C1 o
   ...  H5 c4 W0 m0 ~% v1 ?1 y* b
   /*8 P1 M; W6 w0 _" Y4 k* ^/ Q& p
           ...
( R9 f+ A6 o6 H2 T, W% \9 Y& U           初始化serverConfig
6 p" l) x# P5 b3 Y- s   */3 p( d3 L- v* T2 r7 U' Q& A3 \3 M
   running := &p2p.Server{Config: n.serverConfig}+ d% f+ o  k" e6 K
   ...
4 L9 W9 n8 i% E% r   // Gather the protocols and start the freshly assembled P2P server# I' W$ v2 Z+ r; [4 d" w
   for _, service := range services {
8 C2 @% o4 L# w# t4 d- x           running.Protocols = append(running.Protocols, service.Protocols()...)8 n) K& x. o6 i; }
   }
- c$ W! N# }3 c5 C   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法' M! U9 I- a0 q7 K+ ]9 t
           return convertFileLockError(err)6 ?2 ]+ D$ E# f* a0 k
   }
* n4 W4 C6 ^  w. Q- ^) _# I   // Start each of the services. y: A$ C6 m4 p& q- e2 w5 i9 `" R
   started := []reflect.Type{}7 I/ W2 G( j. A: {" w8 f
   for kind, service := range services {
/ {( q5 P" i- w# Z! Y           // Start the next service, stopping all previous upon failure
) R& K( F7 ]5 }5 K9 e' S; S  X           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
0 e( j* ?9 G# j* S1 X1 w           if err := service.Start(running); err != nil {
( O! K5 N1 ]4 r, l5 J                   for _, kind := range started {
* O  w$ P) {, Y! L2 J- \4 q$ b5 n                           services[kind].Stop()' V/ r: Q$ m) w+ F. v+ f
                   }
$ p" {7 `. g5 j( c1 e                   running.Stop()
( }( w% d0 x% A$ Q                   return err
6 @7 @, a8 [* w' l$ F& w" r: o           }& o, U' N2 F  T; A
           ...
: A/ E, j/ T- Q  r% _1 @5 S   }
" @! k$ F, }& S' ~; P}
& I5 S% s  n" }: H( m// Start starts running the server.+ `6 l% p* p7 H7 e* ]
// Servers can not be re-used after stopping., x0 u& l; k2 u
func (srv *Server) Start() (err error) {
0 P- j8 T  H2 U" o. {   srv.lock.Lock()
) i  g7 @1 C, C   //srv.lock为了避免多线程重复启动
' g$ S# s9 n, X2 u0 v% l4 R4 x   defer srv.lock.Unlock()& s9 f" }6 I$ z
   if srv.running {& C6 t" P* C# T7 i. d3 E, M
           return errors.New("server already running")
: S0 W# L; P' O* K% f   }
! @7 z/ v- V5 D2 K, S* j! D- v* @   srv.running = true# q/ x4 }0 Y& I+ X
   srv.log = srv.Config.Logger' l6 o' D& {8 _4 i3 A2 c
   if srv.log == nil {
; |* K' K$ Z) Q) x; S7 Q7 y1 h           srv.log = log.New()
! a5 ]4 g: x# K( `8 t% ~' p' @   }
$ [  j* L+ t0 O+ ^) y9 }' i( C   if srv.NoDial && srv.ListenAddr == "" {
. C# b, w' N0 c: j$ E2 V* ~' N, x           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
4 X4 G* [6 X; S- g" W( P   }
8 A; b2 T4 S3 K0 y   // static fields
# V. L9 ^1 `- P$ ^$ B   if srv.PrivateKey == nil {/ M) V) Z  ?' ?2 Z9 d
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
2 k* g& T- _3 f7 D" L# W   }/ m# R# D6 B8 J( F2 j
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。" ]  [4 B9 q  C
   if srv.newTransport == nil {
4 @5 d# @9 t+ I9 o- ^' @- S           srv.newTransport = newRLPX4 E$ S5 |& O( J8 e* W8 J) n8 _
   }' ^& f$ ~3 l8 z. m3 m+ V5 {
   if srv.Dialer == nil {* k. B$ A, X) W4 O  X4 U
           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
$ y7 `% a% w- g4 X0 e' e7 s# t, h9 m3 E4 [   }( X' W: @) s7 c) u5 ~
   srv.quit = make(chan struct{})& `$ \) X$ T& M
   srv.addpeer = make(chan *conn)* N  d( b% |5 F; @" }
   srv.delpeer = make(chan peerDrop)" i1 ^+ R% k$ ~5 h( X2 Q
   srv.posthandshake = make(chan *conn): H4 V# L  I7 D* d" N7 v5 N
   srv.addstatic = make(chan *enode.Node)
' H) n. ^; a; R( G$ s3 |   srv.removestatic = make(chan *enode.Node)3 p( v9 H3 k5 g
   srv.addtrusted = make(chan *enode.Node)
  h& _2 [- T1 }$ D- \2 {   srv.removetrusted = make(chan *enode.Node)
( Q# Z# e- }# L0 I  i   srv.peerOp = make(chan peerOpFunc)7 k4 k5 o8 \4 F2 C* m
   srv.peerOpDone = make(chan struct{})$ [! q# \( P8 |; w
   //srv.setupLocalNode()这里主要执行握手
# r# _* z0 G) [( H, D- _3 e/ n  s   if err := srv.setupLocalNode(); err != nil {
% f7 p* q9 o0 z) P& I' a  Y( I           return err  g+ v* h" T6 ~  W2 J
   }
5 p; f! Z9 R+ k* a   if srv.ListenAddr != "" {
7 l5 r- ~0 @* x2 n           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
7 V$ w8 D5 `2 @5 r/ ?% u           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
* a6 ^7 C7 N6 |7 z  f) F; X           if err := srv.setupListening(); err != nil {6 Y, J3 [) u3 l( p( u$ M
                   return err. C. s* h) l7 `2 d) e
           }% F; j/ U) Y" B& O3 T4 Q
   }
% I0 x$ X/ M$ V) J   //侦听UDP端口(用于结点发现内部会启动goroutine)7 ?, D1 A& n9 o( `9 o8 H& o
   if err := srv.setupDiscovery(); err != nil {4 l, Y' O9 j' B
           return err6 O! S( o4 Y2 e
   }
$ _% l/ c. x5 J2 T   dynPeers := srv.maxDialedConns()
) ?3 t+ Z7 m8 G# Z   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
/ Z7 L! c- n8 I% \' W   srv.loopWG.Add(1)
8 H2 m6 h9 w: Q   // 启动新线程发起TCP连接请求) p1 `9 }/ x! d$ }& N
   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,2 B% W; @# f/ o1 m7 E0 M
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
. V  J$ g/ d" |0 P   /*# x7 E( f5 a& s5 }. g+ z
   case c :=  0 {
: O% Y& `9 F2 c  M8 H           if s.config.LightPeers >= srvr.MaxPeers {
# `6 A8 O/ M, }  N                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
2 S* H1 k0 q1 X7 M0 ]4 p           }
. s9 G0 ~2 o' {* p  J  p           maxPeers -= s.config.LightPeers
0 F+ Y8 g4 f! b3 H3 C" S$ T   }1 g6 l% y3 p+ i9 F1 E. H
   // Start the networking layer and the light server if requested
8 P! m( L5 Q$ |$ U2 C/ h4 E   s.protocolManager.Start(maxPeers)
, ?, r0 ]& i* ]  V( h   if s.lesServer != nil {
) T4 m. Z( ^, J7 }* w5 t- Q0 h           s.lesServer.Start(srvr)
( w) M# |1 W# p5 \+ h8 f% ]. _, C   }5 a* e! Y6 k2 ^8 Z
   return nil
: L$ ^5 a& i6 n! E9 w8 I+ j}
( ]5 D! @( f& }" j/ q. K/eth/handler.go
" z9 V4 `6 g; K" L4 t5 wtype ProtocolManager struct {5 j2 a4 a1 ~. O" e7 ?1 {, b" G
   networkID uint64# L3 l% W  ^" W) L- E( L  X  X
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
2 ]1 k" K: Z1 {  r   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
5 L7 L4 ^, \% p4 p) I   txpool      txPool; a! Y( o9 b# u. r2 c5 _0 }
   blockchain  *core.BlockChain2 P% o, U$ O# Q1 w  y* F, t
   chainconfig *params.ChainConfig
' |& \5 y0 K! B& y6 C7 M- e  ?/ Z8 t   maxPeers    int
$ ~6 Z, l, a" l1 m   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
0 D- E" P; M0 O   downloader *downloader.Downloader
) H* C# z+ e; H3 ^& @4 f5 n8 i   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
" X/ A- i0 S. A. W4 r. [, P   fetcher    *fetcher.Fetcher' X1 W2 Q& T- ]) C) M: s
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。  l4 t3 Z. U3 {# k7 T
   peers      *peerSet
! p" [5 Y9 V3 T2 y9 f. P) D7 N0 M+ C   SubProtocols []p2p.Protocol" n! M! D$ H& F
   eventMux      *event.TypeMux
" P  i0 {$ L0 x$ i8 s   txsCh         chan core.NewTxsEvent; t" m* m2 z1 C, {
   txsSub        event.Subscription: x7 x, f& Y, ~) {' ^$ z- P
   minedBlockSub *event.TypeMuxSubscription
$ H+ l& |/ |6 Z$ i  J   
. M- [; g6 E* a; V5 h0 i4 |8 _   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
% }. o5 `: \/ B& K, p   //当然在应用中,订阅也往往利用通道来实现事件通知。2 L: M% q, A8 R# P6 ?" P, M0 l
   // channels for fetcher, syncer, txsyncLoop5 `& N" O/ k+ X- {2 q( j
   newPeerCh   chan *peer
" _0 X$ i7 g% |3 p5 _2 v4 U   txsyncCh    chan *txsync! B- a- J& ^$ P2 a* ~
   quitSync    chan struct{}
4 C  P3 |& w" g9 G, x, N   noMorePeers chan struct{}' ?- x6 }% \9 Y/ k8 |
   // wait group is used for graceful shutdowns during downloading9 u0 @5 i: @+ \5 d: i; \; v! S
   // and processing
$ j7 o$ g0 L# q' \. s: g# J   wg sync.WaitGroup
% C0 T7 d6 ?" U; x2 o: w; ~}
, W5 Q1 x) b0 U  K" a2 T   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。# X1 ~0 C$ b# B8 \$ d9 b1 r
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
- c5 K4 M( W4 l- w这也标志着该以太坊个体p2p通信的全面启动。
( t7 K/ K( ^/ x6 I9 @' efunc (pm *ProtocolManager) Start(maxPeers int) {0 b& ]3 y/ W& o" ^9 d' J4 v5 N
   pm.maxPeers = maxPeers3 q! u# s; f) }& `9 ]+ O
   // broadcast transactions) ?, I) j; P+ t$ T7 r/ O6 i# S
   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
# |3 M! ^. L2 S: o5 Q" W" [   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
' R/ J7 h$ J3 @2 h, O1 S" ?   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)$ D1 w5 ~' k# E& m. C+ \  V; l
   //订阅交易信息) f' M" W$ [/ P5 ^# V* \6 S: y
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
, q) h  F/ N/ M4 A   7 }3 f: M/ J. V( n8 o6 L2 M
   go pm.txBroadcastLoop()
  a* D) Q8 P+ u+ b   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息6 _8 L/ t( z& C
   // broadcast mined blocks
& g# t# Z; y/ K  ]5 I) h- k, t   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})  r$ k$ S" H- D
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。3 }! I' I+ B4 ]+ k+ B
   go pm.minedBroadcastLoop()
' k& ^' A' a3 X+ d   // start sync handlers
, i9 a9 c& r6 m- c# }; q   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。$ B4 C4 r" d9 L* ^' T
   go pm.syncer()4 M4 b2 P: K' o2 q0 ~! e% l
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,0 D  U) O$ v  Y: M/ _
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。4 U9 r! O; l1 L5 l
   go pm.txsyncLoop()8 ?) P0 C- Z3 k9 ^! O! q
}
9 p) o$ v6 M/ C& e% R0 o//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,$ `8 D: _' q: k1 S$ G2 p; `
   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。# P  @* k4 L+ P: H/ ~
//------------------go pm.txBroadcastLoop()-----------------------* m9 C$ J3 m- b/ n5 w
func (pm *ProtocolManager) txBroadcastLoop() {6 n/ |: C3 x; ?0 A" }
   for {7 [% z6 ^3 D; `
           select {
( e4 o0 f4 X# L3 X8 H% q$ K           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {# A  R& ]/ n3 m2 o3 N
           return p2p.DiscTooManyPeers
  q0 C, t2 U* }! g9 O% ~' W   }, ?- @: V' U5 E- [# m! `
   p.Log().Debug("Ethereum peer connected", "name", p.Name())
, c% }5 b/ f. ~" M- q   // Execute the Ethereum handshake4 Q# x( \5 {' f0 H- J" L
   var (- I. G8 ~: u" w9 g9 T& ^" ?7 p! s
           genesis = pm.blockchain.Genesis()
# x6 v. X2 q' h           head    = pm.blockchain.CurrentHeader()
% X- ]' b) ~$ r' b           hash    = head.Hash()
: t9 K% m/ Z- [. N% m/ X* Y2 r2 M           number  = head.Number.Uint64()
$ A, `2 W" Y# j/ S4 W$ x8 l           td      = pm.blockchain.GetTd(hash, number)
$ M9 r( Q5 E9 E4 c4 @   )4 O8 ^  q; k" M, K5 D1 E8 }0 U
   //握手,与对方peer沟通己方的区块链状态; F4 T  S2 A! r" ~( }6 ?8 l, ]% M
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
* T$ d8 E( E+ Z1 K           p.Log().Debug("Ethereum handshake failed", "err", err). R% F" l1 m# Q2 M: k4 M! v
           return err
0 ?7 a, x, R5 k6 Y' ?  e2 T! ~   }
5 c9 R  B4 |/ h2 k2 d1 C   //初始化一个读写通道,用以跟对方peer相互数据传输。
" V( m' m/ w4 K  u/ N8 g   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
3 n8 m5 c* G6 _9 |/ v1 Z           rw.Init(p.version)4 `0 u" ]- v6 _0 m/ C
   }
% F; b* o4 I  x3 v   // Register the peer locally
0 N& {+ }6 t$ S) v7 q   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
+ L* M; G- ~4 P% z0 Z   if err := pm.peers.Register(p); err != nil {# o+ i& s  X# V7 F
           p.Log().Error("Ethereum peer registration failed", "err", err)# F& v6 S# _9 U  ?3 G
           return err2 O, N8 R' k; i8 N
   }
/ [! H8 C5 B$ ?   defer pm.removePeer(p.id)0 p( f0 c. D: o" v5 N/ p9 g3 h
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。" \5 F! I+ q' \
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
! d) z2 c. D. _! A( y   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
" P, E: F- U  s; m1 L" Y) G, A           return err/ ^& t: K, Y8 Q+ A7 P: |) N+ k3 M1 N
   }
; Q; H+ H+ E( n/ W0 _- {% r# M   // Propagate existing transactions. new transactions appearing
9 q5 {. B0 ~7 {, W; s+ E7 E   // after this will be sent via broadcasts.: N( ^8 M) ^3 Q2 {. O
   /*6 D0 T: v. H8 ]4 X
   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,& x2 {6 s/ H8 n
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
! M1 r3 S! z3 I5 {/ w   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
& {' Y# \) l6 j   */
+ [) ?& x1 C9 H+ Y* Y$ ?( i   pm.syncTransactions(p)
2 H" A2 f2 g7 A. x   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork6 @, F& {  h5 V
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {5 J3 d/ d, e% a' W% b
           // Request the peer's DAO fork header for extra-data validation
7 O6 p6 c) X' o) o/ A- E8 J. Y# w           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
4 G- P# E/ M7 a                   return err* h' q& ?- F0 g1 i! D9 t
           }
* W9 L. U; H1 i4 y5 K* d7 C& f( X           // Start a timer to disconnect if the peer doesn't reply in time+ L# K& d2 M0 @  v- G8 ]" Z9 R
           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {: W' y# N  Q$ c+ f" N7 e& i
                   p.Log().Debug("Timed out DAO fork-check, dropping")4 O1 \; i4 b4 E: a! A( {
                   pm.removePeer(p.id)# ~$ M& B2 [8 z2 J/ C
           })# a6 l  v8 ^" F) x
           // Make sure it's cleaned up if the peer dies off1 _: o4 u& }+ V4 r( q4 A; j6 ^
           defer func() {
1 @6 E& r5 S1 B/ ]5 Y                   if p.forkDrop != nil {
$ g: q. e! j+ T, d5 k. E# J                           p.forkDrop.Stop()
5 `: ^0 D' l+ O7 O1 V                           p.forkDrop = nil
3 @4 }* y1 H2 Q5 s  h2 b) _                   }
2 O0 N; x& |* @           }()
) I7 ]/ [/ `) ^0 _: h+ N   }# v) Q4 h! J0 n2 Y* c; l
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,
$ Z8 W" V3 l6 b( c( l           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
! \* s- {2 Y7 ]" i4 k   // main loop. handle incoming messages.# l' q1 P: p8 G) z2 ]' g  z
   for {
* W2 {- Q$ w) j1 W7 l6 u0 W& w           if err := pm.handleMsg(p); err != nil {- o2 i3 C) Y8 k  ]% e' k
                   p.Log().Debug("Ethereum message handling failed", "err", err): O! V5 f( E" ]. v& _
                   return err9 l! S6 `3 s" H' {& o
           }
/ L  Q- Z4 ^! U! Z8 O* s! ~   }1 p9 a2 i4 n! \8 ?
}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2