Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
279 0 0
一、p2p网络中分为有结构和无结构的网络2 a  o# ~: V, B. C2 W9 j9 q
无结构化的4 D* g; c  z* Y  F/ h. Z& P
这种p2p网络即最普通的,不对结构作特别设计的实现方案。- Y% W2 m; U8 b0 n: e7 U
优点是结构简单易于组建,网络局部区域内个体可任意分布,
/ \& W0 R- K5 ?  @' N0 J反正此时网络结构对此也没有限制;特别是在应对大量新个体加& B. ~+ n$ A7 g: c6 T. Y1 D
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。8 B; C' {, C+ Y  c
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,- O- k) x! Q" M  v1 q  L
所以往往需要将查询请求发遍整个网络(至少大多数个体),
) Y1 A0 {% w" o3 j$ \这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
% R# Q' v- i" G- q! C- _) {, F7 S结构化的:
' Z" y# F% {7 O7 z" M9 x这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
( h8 T! V1 x# w2 L- ^) g# O% Z降低查询数据带来的资源消耗。% |, x% h8 q& X  T) k: r+ I
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
: k) F4 Q1 P& k: ?1 E1 {, B9 Y; x网络模型可以满足需求;5 Q3 c( i: X: Z5 o8 O' t! r& U
二、分布式hash表(DHT)
1 d$ Q! |! K3 y) H& i% r保存数据
! x! C$ _" C& w; ]; ](以下只是大致原理,具体的协议实现可能会有差异)
1 P2 A& @/ G+ a9 Y. |当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
: N6 C) E* E8 [然后再计算它所知道的其它节点与这个 key 的距离。+ Q7 O. Z* q. X: O, D& t
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。/ a! N7 o  b! |* z3 x9 G6 ~
否则的话,把这个数据转发给距离最小的节点。
* i/ P4 u. M/ L, r& P0 O0 P5 Y收到数据的另一个节点,也采用上述过程进行处理(递归处理)。3 I& Y% b- x- b. N5 U. q! r
获取数据
! O* N; U& |6 o: E8 p. i: J" m2 B( f(以下只是大致原理,具体的协议实现可能会有差异)& t: v: P" B5 m( [* @
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
! ~4 l! W! @+ K1 m; @6 [# o然后再计算它所知道的其它节点与这个 key 的距离。& f7 \5 D& N% C1 i$ ]/ j
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
+ ?8 G/ }5 Q- j) t( V; p2 y$ G有的话就返回 value,没有的话就报错。
, B+ B8 I1 m) v. E! M7 D: x否则的话,把这个数据转发给距离最小的节点。$ a+ Q) k/ e& u$ B3 C7 Q
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
$ E+ E3 H  }; p; C2 e三、以太坊中p2p通信的管理模块ProtocolManager
# g/ |% z2 _9 v8 ~) V$ f3 h/geth.go
4 Q5 |5 _& U8 ~- W4 c& ^// Start creates a live P2P node and starts running it.* B  A9 T5 C$ m8 M- d
func (n *Node) Start() error {
& @+ c: v* A) I7 g" P* R3 r. n   return n.node.Start()4 Y+ b: o7 ~5 u
}
. w+ [  T8 V" Q! T% e& a% S$ Q/*
+ ^1 Y: ]5 h# p0 E   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。9 t$ s' W" `: {. v
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
4 E, C# D4 J1 B! t. Q   然后将Node中载入的所有实现体中的Protocol都收集起来,
  Q9 b! {& [3 \2 U& u  ?( Q   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,/ M3 n8 R) m( Q- @/ \
   并将Server对象作为参数去逐一启动每个实现体。
6 E9 J. i) Y, _3 B9 ^*/( z( T9 r. y/ |: z# V
/node.go
% H& A* x* E4 y) G$ C' ?& M// Start create a live P2P node and starts running it.  c7 V3 w" ?0 r' y: `
func (n *Node) Start() error {; k4 {6 F5 C0 O6 E) c9 p: u0 f5 x. C
   * g2 i# ~& e9 Q% [9 k
   ...$ C" P7 t6 [+ D! y/ G
   /*1 q; s, Z) i( x7 m2 W0 N$ e# r
           ...
2 T) e0 [9 Z+ u7 k6 z           初始化serverConfig
( Y! A; I* j5 p& f4 K   */( x7 {/ F; D9 k
   running := &p2p.Server{Config: n.serverConfig}6 `: O5 {+ X$ b% P0 l8 Z) |
   ...
4 l* v9 M! F* G- I6 ]* v& V   // Gather the protocols and start the freshly assembled P2P server& O- E1 f& n) C1 h5 H9 s+ b
   for _, service := range services {/ N; C- C- g% ]2 F6 X7 |+ y
           running.Protocols = append(running.Protocols, service.Protocols()...)6 ?3 i* G: ]; b+ h/ \
   }; Q0 Y2 |. v/ l4 m/ P6 y6 D
   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
9 n" y4 \7 e, A; G           return convertFileLockError(err)
4 B) o7 y; w" \# w! g   }4 G& \% W( x7 r  Z
   // Start each of the services
' n1 |$ @% f  u6 e   started := []reflect.Type{}+ {# k1 Y! Z5 N8 _( v
   for kind, service := range services {7 ~0 [* E. P# v! M5 ]& l
           // Start the next service, stopping all previous upon failure
" Q& p) p0 `- w2 J           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
" p$ v, w! V% Z/ z. s8 q           if err := service.Start(running); err != nil {" z8 w. F  s; y. G, L
                   for _, kind := range started {% j6 p, l7 ^+ k, J5 p
                           services[kind].Stop()5 S& h5 m9 r. O) U2 ~; Z) N
                   }! f# {$ H9 e& ]1 V* a7 r7 O) j6 K/ S
                   running.Stop()
& F1 s" E. P! b; @" X- N                   return err7 Y9 p% `  b$ n( \2 K7 d
           }
8 {+ n5 P' u. l           ...
4 j# ?# h( V- z5 p5 S9 R  }7 o   }8 ]+ b, Y% z& P0 M& x- K
}
8 ]5 @) z5 |: ?  W// Start starts running the server.
. u# V4 n& r2 Z2 _' `' n) m3 d" ^8 t// Servers can not be re-used after stopping.
% W1 y$ z& Y- o2 M0 F8 \func (srv *Server) Start() (err error) {
5 W* Y& z- B( i   srv.lock.Lock()
/ `0 _) E5 b) w! @- m2 F  Z   //srv.lock为了避免多线程重复启动9 O2 {. G5 A% X0 p. q: C" O
   defer srv.lock.Unlock()0 A" |8 q$ C/ p% _& E6 _1 N' l
   if srv.running {0 _  R, c# i! G; G- g  p' k/ G; A
           return errors.New("server already running")" j3 ]- o; D- h) Z% @  D
   }
- T2 k- q( {4 I* A, [   srv.running = true
( H% h8 [" j, L; g( k$ L) B   srv.log = srv.Config.Logger
5 B, S+ R3 l: b, I3 M   if srv.log == nil {
( c1 L, K7 _, b0 ~9 G' x! k           srv.log = log.New(); T& s0 S) p' K) C3 t& L# j
   }
( P6 r2 w3 B+ U2 O8 [7 }2 v; m   if srv.NoDial && srv.ListenAddr == "" {+ q4 i- B0 W. w) t' n* \; b2 M
           srv.log.Warn("P2P server will be useless, neither dialing nor listening")& _, A9 X3 Y/ L
   }
' J' [/ L( u/ T- Y( ^, j   // static fields
! o3 D0 r+ l( O! I" l! U  ?( `   if srv.PrivateKey == nil {
+ p' }7 a1 w" e2 e           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key"): t) A+ C9 {% n9 C
   }. z& ]) F3 g/ |# w0 u
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
2 J: B) [5 K; N! L- \   if srv.newTransport == nil {
8 |% K! E  J6 Q! a# p           srv.newTransport = newRLPX; ^$ h5 c6 m/ p, c3 S
   }
& U$ M2 J* h0 k0 J" g! P: K   if srv.Dialer == nil {
) U4 m+ V( G& ]; t           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}! n: p2 f0 ~" t$ o, ]
   }
# E! n' D( r" E5 Y4 B   srv.quit = make(chan struct{})) @! i4 ^1 j$ k$ n- ]
   srv.addpeer = make(chan *conn)
4 r7 Z7 p- p7 \+ s* C5 E3 f6 S   srv.delpeer = make(chan peerDrop)6 B' t4 ?5 |) j! x  ]- r( F$ N
   srv.posthandshake = make(chan *conn)
& s6 \2 j6 \! C8 R: P# [- k2 `   srv.addstatic = make(chan *enode.Node)6 }4 A1 W+ W/ M+ V& l7 g" s
   srv.removestatic = make(chan *enode.Node)- z7 U+ C/ j* }6 q' Y
   srv.addtrusted = make(chan *enode.Node)
! G( b0 K  f; y7 s   srv.removetrusted = make(chan *enode.Node)
+ p) ~$ J5 Q2 z+ O$ O1 a7 `4 V   srv.peerOp = make(chan peerOpFunc)# }  i) c& j  J* M% c& _9 r8 r
   srv.peerOpDone = make(chan struct{})
3 f5 o/ Z  ~; F6 n3 u! _   //srv.setupLocalNode()这里主要执行握手8 O; }' O* `4 u' ~  Q0 G. @! i: v
   if err := srv.setupLocalNode(); err != nil {
" o8 P+ O5 I% j+ @0 B1 B           return err
, v' H& a* I+ n- O   }
- J  ]. p7 V& i) R& V3 `# T/ B   if srv.ListenAddr != "" {; n$ u& e1 b* m$ Y9 m9 d0 O
           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
4 T. @0 v$ l. j7 l           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
' ]6 q5 T. E7 |& S0 A           if err := srv.setupListening(); err != nil {
, ?) B  x# q: q1 A$ E                   return err
: k& t6 I# [/ N! h* x& X           }
; |4 r" _) I$ Q, s   }0 H8 j7 l8 |; |$ M1 |$ ~, N
   //侦听UDP端口(用于结点发现内部会启动goroutine)$ e  I4 ]* V/ x% C- X
   if err := srv.setupDiscovery(); err != nil {
" ^" Y4 N; N- d: x, F           return err
1 W3 P0 M* Z4 j! ~9 o   }
) E6 A& ^" D, [( {! M   dynPeers := srv.maxDialedConns()
' |/ A7 u9 @: ^. j   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
* n; f) X  ~# u+ r& r! l$ Q+ l   srv.loopWG.Add(1)
; n9 {5 Z9 G& E7 R" `3 R   // 启动新线程发起TCP连接请求+ P) X- v8 q. Q( e
   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,1 V1 Z; ]# \5 e# }9 P+ d
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
6 \2 t' `" f  o) i+ C( m   /*. s; y) R& S5 i$ Q" B
   case c :=  0 {
6 P; `7 Z- E, t           if s.config.LightPeers >= srvr.MaxPeers {' S# b$ D8 p9 v! h2 \: x* ^
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)7 c6 W% |; N' X' v( A
           }
" ^: s, {$ ^$ Q- e0 I           maxPeers -= s.config.LightPeers. Z! t! A9 l6 o4 V2 C
   }
! v0 v) p; ]- i" S' w& @$ {   // Start the networking layer and the light server if requested
- {  B+ _1 v. |! _  d3 Y9 w   s.protocolManager.Start(maxPeers)
9 Y. E4 I. t) M   if s.lesServer != nil {
6 L8 Z( \+ h, q2 u8 n           s.lesServer.Start(srvr)) j! A* G: l6 x4 p! y
   }
8 L1 E9 j6 I4 t6 k* `   return nil+ V" ^( ^) `5 {; V( ]' K
}
5 _8 t+ w8 A- v8 @0 ]/eth/handler.go" n+ v0 W  a' b9 M
type ProtocolManager struct {  b. Q( t1 t) Z# x$ ^, Z
   networkID uint64/ l/ r( g6 R9 \* L0 g
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)$ i/ s9 V4 x: ]5 z. r
   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)' k$ m4 f9 G, J2 r: b8 m
   txpool      txPool7 F% [5 V! T4 C; e! k1 Y
   blockchain  *core.BlockChain
9 C; N, k2 W5 h8 F& \# N* M1 U2 f   chainconfig *params.ChainConfig5 F2 B' I) ^/ g( c% }
   maxPeers    int
: g" u! M. v# L   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。
# q, B! o" R. O1 O8 Y7 O; R   downloader *downloader.Downloader3 V1 Z) [$ _8 T: P' _
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
; V# e1 i; j7 C; F7 I   fetcher    *fetcher.Fetcher
2 m  j1 l  x( b$ _6 P4 [: C! U   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
5 f5 [- ]* Z' h: x( n4 l$ }6 l- |   peers      *peerSet
; s& x% j4 S6 `) z$ ^/ g! ^   SubProtocols []p2p.Protocol! {7 |; z  K" X6 i
   eventMux      *event.TypeMux$ z& _7 k8 s2 m  l4 W7 P6 M2 c
   txsCh         chan core.NewTxsEvent
* K8 R2 t# o9 v4 Y, p0 E; H! S   txsSub        event.Subscription
7 I/ z" A3 [( C% E& z. s1 W   minedBlockSub *event.TypeMuxSubscription
; [7 G5 J. n6 x' C9 E   * u" d7 v; h  m2 F- p7 k
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
. D8 h1 B8 H( c3 z5 b   //当然在应用中,订阅也往往利用通道来实现事件通知。
& Z. m: S' z7 Z( y   // channels for fetcher, syncer, txsyncLoop
  z: ]' X) b, y8 A2 w   newPeerCh   chan *peer
0 }+ |; i  V$ X  N+ F- M   txsyncCh    chan *txsync- T2 w6 Q) m; e% ?# J+ {
   quitSync    chan struct{}1 p' `8 }: X  Q' Z% t! h2 L
   noMorePeers chan struct{}
" ^6 m. g0 Z/ L) P2 P! {0 C   // wait group is used for graceful shutdowns during downloading
; i7 d) H: v* B, k5 v- b6 f6 f   // and processing
3 H+ A5 M' W6 m5 d7 _   wg sync.WaitGroup
( h5 ^; j2 ]+ {8 k}
1 N, Z& }* y, k" v- [   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。1 ~" K( R3 T4 I+ @
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
& ?) U/ ~  J! |, y* U' L这也标志着该以太坊个体p2p通信的全面启动。
8 Q; Z0 z9 O, n/ x, @) T( |func (pm *ProtocolManager) Start(maxPeers int) {
" S5 J) f) E7 c; }7 x9 \- B4 C- ^   pm.maxPeers = maxPeers2 x# I& K0 m% d7 h
   // broadcast transactions
5 h6 S9 N+ i& k4 h! i! u1 m   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
2 v1 K: c% j# g+ T  R; ]8 f8 S' _; Z   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
* Z8 u+ o4 ~9 h5 v   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)! ?# p& q- l) W! e
   //订阅交易信息
$ s& _* g4 l4 n1 [0 ?  h   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)/ _' H/ W, L5 g! |: }: u
   
: s6 B+ F; t2 R3 T/ p   go pm.txBroadcastLoop()
3 Z; |. g1 O2 @8 l! N3 H. m; X   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
0 N5 l1 z3 i. ]4 [" l6 h   // broadcast mined blocks6 p$ G* W" g1 M5 b5 s: z1 {6 s1 O
   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})! q* w" l) N) i; _9 I
   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。0 P' K# Y; g7 L# c3 P$ T; p
   go pm.minedBroadcastLoop()4 J$ f5 W# R6 M+ @6 A
   // start sync handlers
( F! u* @* U. G6 ^. W* @7 L   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。( L5 X* ^6 n) S9 v& U3 ?
   go pm.syncer()) B/ }2 y0 G5 ]: x% U1 ^
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,: d0 X% u7 A; A! x' W3 l
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
: M$ n* p+ ~7 k2 q( ?   go pm.txsyncLoop(); K2 {* Y2 }) B0 @, ]* |
}
* S& L5 S3 ?4 z- O% [//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
7 {! p5 S4 t# v; W2 I* `4 K   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
  U: i! ^9 u. c/ U//------------------go pm.txBroadcastLoop()-----------------------4 I) Q/ q  p5 b" F* W; W
func (pm *ProtocolManager) txBroadcastLoop() {
4 t% D. O  V3 X( k9 @$ L   for {+ [- d& D+ S) U& k. S7 `
           select {
  f9 k4 X9 U: m% y           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {4 S3 r2 w; F: d  a
           return p2p.DiscTooManyPeers. G0 o6 Q7 u" q1 w: J0 X9 c
   }; W. D5 g9 r" `: }
   p.Log().Debug("Ethereum peer connected", "name", p.Name())
$ N% w* u/ E6 U! _   // Execute the Ethereum handshake& m3 i6 ^$ ^5 j- C$ _0 X3 [& I
   var (
( O3 ~) T5 x5 x' \- W           genesis = pm.blockchain.Genesis()
& j" _( K9 e7 K           head    = pm.blockchain.CurrentHeader()6 b6 E' m7 y4 U/ O+ M4 I* h9 a
           hash    = head.Hash()
$ W# v3 M8 L: f0 X           number  = head.Number.Uint64()5 k1 Q! r! {) |# U( Q4 s! K
           td      = pm.blockchain.GetTd(hash, number)9 h  U$ L7 m( m5 l
   )2 M1 P$ ^( @  }: I2 P8 l9 o, z6 N
   //握手,与对方peer沟通己方的区块链状态
9 a4 _( K, [6 S3 H3 Q: d   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {, U  W% c/ o! C, |. {8 ]
           p.Log().Debug("Ethereum handshake failed", "err", err)
) |% M" }( }0 P8 n2 P           return err
% x1 `0 E! b2 q) p. r7 k* L   }
' a6 m+ x, A) w5 {2 D/ m& G! _   //初始化一个读写通道,用以跟对方peer相互数据传输。
3 c( I, D# x, z6 V0 d% `   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
/ ^  i/ A# \/ u/ r: z           rw.Init(p.version)
% p8 e$ p: {4 E& l5 z# ~% m) ^# W   }
, |$ {+ k( i9 w% x. L4 `% ]0 G   // Register the peer locally
0 Q$ U6 m$ ~, R' e   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
1 W2 g6 _7 x/ E8 D   if err := pm.peers.Register(p); err != nil {# f5 r1 G3 g% `
           p.Log().Error("Ethereum peer registration failed", "err", err)
" Q# n2 s: t+ J           return err1 R( G  [8 P/ K( A# Q" P
   }$ p) i1 H. X) z% o. J  p
   defer pm.removePeer(p.id)
0 q9 W" {, r( T! j: f& z   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
; V  L+ A' N, O; _. a   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
# \; U2 ?% v3 I$ [. v6 |   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
; V8 ]% u- Y! ~6 Z$ H7 r. V/ F  g           return err7 o3 Q1 `! F& r* w* @+ g- Y
   }
+ M+ Z/ N# f8 D   // Propagate existing transactions. new transactions appearing1 F% g, p6 o+ x& q4 H
   // after this will be sent via broadcasts.% U* e$ j. L) Y5 s) ~1 u4 U5 f
   /*
5 S( P0 o. p+ E$ e- l, I; j   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,( f9 R5 }5 F# E/ i  X+ I
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
/ C% t: c1 R( l* E9 S" B   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
9 U* b4 s! h7 x( _2 ]   */9 g* \3 H3 ~5 x" n
   pm.syncTransactions(p)1 g/ c$ m" ^/ F2 x6 D
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
$ d. i; S3 F, D; J3 q   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
! }* q' M6 d- V3 M- Y           // Request the peer's DAO fork header for extra-data validation* s  m! x9 f1 Q% J) A6 F& L
           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {, W% \' c3 A: a$ e& {1 L2 G
                   return err+ ?0 @$ A2 b0 x: Y+ `6 u; s0 N
           }
8 Q$ o/ \/ l, p- E! G2 w           // Start a timer to disconnect if the peer doesn't reply in time9 L' c* }1 S5 R2 e7 j3 x
           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
, p8 q* W, G) g% W$ \! J                   p.Log().Debug("Timed out DAO fork-check, dropping")8 V6 ?/ A- ^8 A/ x% e
                   pm.removePeer(p.id)
/ A! D3 M7 `, f" k; g4 R           })
2 K. @( y1 K$ U0 Y) z  ?8 Q9 U  d           // Make sure it's cleaned up if the peer dies off
& H! z( m1 L. |' a$ O: m) }7 ^           defer func() {
7 g! C3 C" H6 W# `3 w                   if p.forkDrop != nil {; h. k# N4 x) n% G+ ]
                           p.forkDrop.Stop()9 v) Z2 m" K; j* r6 {: s: u
                           p.forkDrop = nil
- n3 Z2 r! R9 g) u9 k; \& w                   }
/ z7 |3 T  \7 W; Z           }()
$ A" g7 M. Z/ x$ C   }
0 N0 R! _7 d+ K' |6 M           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,
" Q, b+ C, |6 l- I+ S. x           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。, ?6 c5 Z& i+ U5 F0 X+ K
   // main loop. handle incoming messages.
. q% l1 Z0 H- w5 Q& n   for {. P, Y' V* u/ {2 ?! w- Z0 ]. ^* ~
           if err := pm.handleMsg(p); err != nil {! r$ \3 I# [% z' v
                   p.Log().Debug("Ethereum message handling failed", "err", err)* a- k, k, C2 {" Q$ z* `! a) g
                   return err( n( T& b: `" k! K
           }' E( p2 c. M! {% b# W
   }
- }  W# \4 q5 ?! a7 `8 M}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2