Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
136 0 0
一、p2p网络中分为有结构和无结构的网络
( d  u$ P) {: w) \) g7 x! {4 v无结构化的2 X& O% v: c6 _1 X/ y
这种p2p网络即最普通的,不对结构作特别设计的实现方案。5 _% I6 ]3 p, F# M
优点是结构简单易于组建,网络局部区域内个体可任意分布,9 ^% B8 M: c) ~7 X
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
5 }- P* ^6 s* @1 u3 \2 O入网络和旧个体离开网络(“churn”)时它的表现非常稳定。: ^" K/ W$ _, y' R
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
( T1 {) m4 H- Z7 v0 i所以往往需要将查询请求发遍整个网络(至少大多数个体),
1 ?/ U+ u, Y6 J' M3 L! A7 r. w2 P这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。- e: B# T8 v2 W8 M) i
结构化的:) y3 e, Z% O  b
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
3 k7 _! q/ D# v/ w: {1 O降低查询数据带来的资源消耗。
" o, B& Q1 T& G- m9 l以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
, n+ v8 S* w2 h. p& s1 a2 j* O2 P网络模型可以满足需求;
. s0 p- R4 W) D9 a' @" s! G二、分布式hash表(DHT)) k  k4 E5 n  C+ ]; _( P+ R
保存数据
+ s! Q- ^& o6 P1 |6 I(以下只是大致原理,具体的协议实现可能会有差异)
$ c+ v# Q& K1 E当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;6 n4 O. k& d  z. ]1 M3 q
然后再计算它所知道的其它节点与这个 key 的距离。
9 P% ~0 T, p, n9 u/ X( o9 Y如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。8 F+ D5 s# r6 E( E0 W4 M
否则的话,把这个数据转发给距离最小的节点。6 y& C; b" {8 \- O/ B( Z: C* t
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。+ s3 [8 k- q/ s* ~* ]* T4 |$ S
获取数据! Z& l1 n2 U2 }
(以下只是大致原理,具体的协议实现可能会有差异)
* T9 u! w; y. V5 T+ `+ Q$ C- }当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
) Z$ i7 {: n7 j4 O  @( S然后再计算它所知道的其它节点与这个 key 的距离。
4 V8 L0 @8 I6 a( m/ A* O如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
) {7 q0 M2 n$ [  z有的话就返回 value,没有的话就报错。
2 _1 l5 H1 \* E9 y3 U否则的话,把这个数据转发给距离最小的节点。
* Z4 p9 {# \, m# v6 o' b& ]收到数据的另一个节点,也采用上述过程进行处理(递归处理)。. Z" N/ p' s; o8 Z1 J; ^1 j
三、以太坊中p2p通信的管理模块ProtocolManager- Y1 m) z' H$ E3 _! K; u6 ^: S
/geth.go* r/ u: S0 ~( |
// Start creates a live P2P node and starts running it.( l3 ?/ ^& G( t9 I- l
func (n *Node) Start() error {
' U# b8 f* a' r2 @   return n.node.Start()
% ]0 [0 u1 j; e5 s}
, i9 V4 v5 G6 a* S1 M- Q$ ]+ E9 m/*
. Y, I2 u! }6 ~( p" B5 E   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
( U. f* A+ R: q% @1 ]   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
9 g4 W' L( N+ K/ v: ?0 x8 N; [/ N   然后将Node中载入的所有实现体中的Protocol都收集起来,- [$ Z' A# R' }/ u
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
6 U6 y) y. ^; t" j5 [3 m   并将Server对象作为参数去逐一启动每个实现体。
2 t& U- R9 m. H# R0 Z' K' X* n*/+ p% {( o7 A7 W; U  ?
/node.go4 Q  G! @7 l6 B
// Start create a live P2P node and starts running it.6 Q. V. o0 @1 T6 P( V( E: [$ W
func (n *Node) Start() error {* C' p' N; C* {
   & V1 y2 P, b" F5 A$ B5 w
   ...
# W7 ?5 G# T7 w0 K% J7 L   /*1 {' Z) G6 z! |# |% _$ K/ h
           .../ n7 b( f& h* j
           初始化serverConfig
8 }- e# [" ]; `% Y: y9 o1 I' g   */9 J8 Z9 L  ]' R8 \$ ]
   running := &p2p.Server{Config: n.serverConfig}
8 ^  M+ b: v. Z3 C; }   ...
: g/ t6 L7 l& J3 x   // Gather the protocols and start the freshly assembled P2P server8 {# |- _$ k) m
   for _, service := range services {
" X; V( U+ N) ^- u/ b           running.Protocols = append(running.Protocols, service.Protocols()...)7 H2 u6 p  V. \
   }/ G3 k/ h# F4 S8 e
   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
5 a- x0 A2 `0 {* ?1 G           return convertFileLockError(err)
/ M! t- R! y$ r2 o7 ^   }! y9 W" P$ ^9 ?2 _' a. E8 f
   // Start each of the services) t1 t, X1 ?8 P9 {8 |
   started := []reflect.Type{}( ?1 V2 ~, h$ @+ ~6 J9 r0 E
   for kind, service := range services {, H. W* H7 S5 _6 T
           // Start the next service, stopping all previous upon failure
' T1 {* ~2 R* Z5 W$ a           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {. d8 ~5 z+ b: U' w  b, `, n+ X
           if err := service.Start(running); err != nil {
4 ]3 p. g- L, x; x8 C& E! Q8 f0 n                   for _, kind := range started {* Y& v# r$ c2 D* q2 [" y
                           services[kind].Stop(), @/ y+ {& S$ a* h
                   }
8 s/ v" J0 O/ D) k                   running.Stop()
% U1 I0 U4 ]- {8 r4 I. R                   return err* p/ `# o# v1 c7 F) {+ ^. a# ^  g
           }
- Y; P+ E6 g' p  J4 V3 Q8 D           ...  a, y! H) c* z- i$ E$ @* Q; [
   }% N  r+ n# N+ x& ^- ~- s% \
}) W) O& ^! m5 P* c( H! E2 \" J( I
// Start starts running the server.
. k) w$ P8 i( x8 i/ q// Servers can not be re-used after stopping.+ j1 s8 q5 P* Q! D4 k: e
func (srv *Server) Start() (err error) {2 r* J$ [" ?/ P0 @, j; _
   srv.lock.Lock()6 o* E# \( W( z; y; p
   //srv.lock为了避免多线程重复启动
8 w4 |' Y0 {! i' W4 O- A" z   defer srv.lock.Unlock()
8 U7 @4 V. C; |! ^   if srv.running {" E$ r# E( H' C+ x9 `
           return errors.New("server already running")
' \' R& {% u' g7 E   }% o" h7 \1 k( `8 H
   srv.running = true
" p+ D3 e8 R0 |5 V' h   srv.log = srv.Config.Logger) \2 O( _: u+ Y8 m) T
   if srv.log == nil {
# }9 z5 z$ F& T5 W6 q2 W8 v8 [5 _* _4 ]           srv.log = log.New()# q2 R2 R% Y5 r/ x
   }
& f# f5 f/ j& N! A/ c   if srv.NoDial && srv.ListenAddr == "" {
. l  R0 V% q& P* e- }           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
! z2 E. N/ P% {" M  x  y- i   }5 n: ~4 T# V0 L7 C/ b5 d/ i
   // static fields4 Y' r# e" Y4 y! j; H% Q1 n' i. C
   if srv.PrivateKey == nil {% x3 C, V3 j: D2 t. H
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
2 b3 M1 V; H+ ~1 v   }
$ S5 Q. A* m7 n; h" {& ]   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。4 q5 [; `+ _1 f
   if srv.newTransport == nil {$ e6 R) `% T4 U, B. O, u; Z# ^
           srv.newTransport = newRLPX+ f4 \7 Z' D, W: r: Y
   }/ W  R* `. t0 l4 y2 @: m0 ?
   if srv.Dialer == nil {
  P6 Z  a/ ]3 M4 i5 f           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}/ V& a- T/ P! }# _4 I- ?/ E3 G4 x
   }
" X/ e! J# Z, r6 T1 e0 E   srv.quit = make(chan struct{})
7 {5 }5 z  ?  W0 ]$ S- [1 d$ }% b   srv.addpeer = make(chan *conn)% D2 q8 \6 r* b2 {" e6 ?# @
   srv.delpeer = make(chan peerDrop): y# S- ], ]* ~  s9 O
   srv.posthandshake = make(chan *conn)
" D+ `( u0 D) |0 K& V* P   srv.addstatic = make(chan *enode.Node), K3 E" h- G/ D& S
   srv.removestatic = make(chan *enode.Node)' b/ \2 ]0 j( H
   srv.addtrusted = make(chan *enode.Node)
9 K8 i* w2 g9 B6 U2 P   srv.removetrusted = make(chan *enode.Node)7 y6 d; N. K4 g: i0 K- X
   srv.peerOp = make(chan peerOpFunc)6 ?! l" e, W$ |0 }* k4 o- L
   srv.peerOpDone = make(chan struct{})/ w& k  O4 J, ^$ X' e8 K
   //srv.setupLocalNode()这里主要执行握手
3 b. r; N, v, ^6 W   if err := srv.setupLocalNode(); err != nil {& c% y4 P9 W  {5 r) z
           return err
& G2 L# W( S' B; ]   }. A6 t1 `8 q; c6 P
   if srv.ListenAddr != "" {
( u' P# Q$ e( k, F           //监听TCP端口-->用于业务数据传输,基于RLPx协议)% w* p# u; s0 \) R1 B/ d
           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
# u# w2 N1 _  _& D- z; {; T           if err := srv.setupListening(); err != nil {
% Y1 U( L& a( b. N" z! `! u                   return err
. b& m0 Q$ A* O" K8 s  V+ F0 y           }
4 y/ X5 @2 g. q& A' Z# U4 e   }
) p3 s4 C& U% t# t   //侦听UDP端口(用于结点发现内部会启动goroutine)
7 x- b2 e  j/ u. D   if err := srv.setupDiscovery(); err != nil {
( d/ s3 A0 z6 H9 ?+ [* ?           return err& d) q. M; `5 e
   }
/ k' |4 V. e9 A5 S; o7 p   dynPeers := srv.maxDialedConns()
6 H9 ~7 h" h- g- Y& @- y' n: d4 a   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
5 h) \1 v7 N$ t2 p, E, `   srv.loopWG.Add(1)8 [' I- L" _$ f$ t
   // 启动新线程发起TCP连接请求
- _0 a$ F7 _/ [& |2 f" j% t& N   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
5 N  s6 Q4 K' t! a4 @1 ?: |   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
' G! E; W0 V. R   /*, x2 k( B/ D5 i% Q' W
   case c :=  0 {9 D! E& h5 s4 q2 m5 U
           if s.config.LightPeers >= srvr.MaxPeers {  ^: ~3 T. I: [4 t( `) Z
                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)+ T4 k) _/ c* c" f& w
           }
( f1 k! {' ]7 m7 `  Z/ @           maxPeers -= s.config.LightPeers
. f3 u$ C9 C8 x* C3 f   }7 k; ]' n0 p& k! V
   // Start the networking layer and the light server if requested
5 i1 p, N. b  j   s.protocolManager.Start(maxPeers)3 ?! \0 p2 V( N2 `& H
   if s.lesServer != nil {; I2 o6 Z& p% k8 y  w' l8 e- l" I( r: \* @
           s.lesServer.Start(srvr)! H; m" w- T" J' d9 ~- s  y- y
   }3 C% P- O  E& |5 U& a1 W5 K
   return nil
& u4 k' M2 C0 _4 V( I}# Y0 N1 m! Z' K2 {
/eth/handler.go
1 o& O4 |, Q9 `, Wtype ProtocolManager struct {
+ m) s( D: x8 t# Z, Q; S5 m   networkID uint64
8 R$ ?" Y* k( j) f   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
; Y3 }; I. }" y# y& z' O9 p/ X  V$ ]   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)$ K9 u4 J  T. E% A8 Q2 y  v- K
   txpool      txPool
' y- S# Z7 A1 l3 S$ h+ _$ n6 B' u   blockchain  *core.BlockChain
8 _$ A. b- v5 [. r2 l5 F   chainconfig *params.ChainConfig3 Y4 F5 [( H3 }
   maxPeers    int6 u# l' E" q+ ]; I- I* z
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。8 Y) A- W4 J1 W: h
   downloader *downloader.Downloader. ?# g# D8 j% V  `# o; r+ c
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排2 [$ c& S  h( k6 P) X
   fetcher    *fetcher.Fetcher( l  c  Z3 `. i+ q& I; i; j( T' `6 {
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
. v  B# d2 ]- {8 q: @! L   peers      *peerSet
* D4 h7 J- }' m9 Y" k5 W   SubProtocols []p2p.Protocol
) y+ @' U# }% L% z! I) @7 O   eventMux      *event.TypeMux$ z2 T$ w3 D% W3 @) [
   txsCh         chan core.NewTxsEvent+ C2 _! h2 Y$ i
   txsSub        event.Subscription
6 |6 ]; j6 a9 \) |5 k, M   minedBlockSub *event.TypeMuxSubscription
& Y3 b( Q$ o' e. h, J- h   $ f! y5 g* ]" Y0 t) ~
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。! |& X- T! a9 C9 S/ N" G
   //当然在应用中,订阅也往往利用通道来实现事件通知。
( ~3 C" |' i4 l8 f   // channels for fetcher, syncer, txsyncLoop
- s/ `5 ~0 V* u8 i. d/ p   newPeerCh   chan *peer* V( B4 _( U" _
   txsyncCh    chan *txsync# m2 _9 S) d1 h
   quitSync    chan struct{}! Q/ L# @" a3 n
   noMorePeers chan struct{}
& u, H- V3 E# K, Y! H2 }# U   // wait group is used for graceful shutdowns during downloading
! I* Q* R0 `- p! \. H' W   // and processing
' F0 M5 ]4 e* E& r2 E   wg sync.WaitGroup$ I: J8 x. a" |; m) b' k
}9 a; w8 B% `7 k- Q0 L1 W* q
   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
. J6 T/ d( c( I( e  a; XProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,  [7 h7 v; q/ ^- z- B8 h, l- ?
这也标志着该以太坊个体p2p通信的全面启动。1 j: M7 c, n3 t& L2 t
func (pm *ProtocolManager) Start(maxPeers int) {0 K! V1 A7 }# b+ a
   pm.maxPeers = maxPeers
: Y& b  w+ _# q7 u   // broadcast transactions
( Z! i: z- L* n/ |% ^; w   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。5 A& K  i/ z% E/ r  [  R$ a
   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
& `9 ~8 p; c7 Z   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)# q9 U# Y# J* ^% v% k& o3 K% T
   //订阅交易信息% Y+ x) _" H: O2 y
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
. C/ a. o+ s0 Y' T+ J   
5 g% t+ p3 a. }" h1 g# [   go pm.txBroadcastLoop(), Z# _3 o9 M% w- Y/ y1 D$ T( h! j
   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息
! n+ z6 o6 ~2 A2 ~0 h7 O+ E4 G   // broadcast mined blocks
7 j% p% i7 q- {+ B   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
( J3 R9 U( Y2 Y& C9 g   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。3 ^0 }" V+ ~9 T7 t6 A% D) B
   go pm.minedBroadcastLoop()3 k$ Z* W1 U/ @( d" B  ?+ d
   // start sync handlers
% k$ r0 m9 c/ H$ ]1 O8 K   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。+ n* a6 G- W  C5 V. C
   go pm.syncer()
% J0 u5 L  E. U: z0 d   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时," L' O/ T9 u8 V, a+ I
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。3 ^" t: m1 @0 G
   go pm.txsyncLoop()
) I, R# ]! q# |( H+ @# t7 S4 B}5 C+ D% a  N& ]9 \0 F+ k8 _
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,: w0 _* k) h! F* a
   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。* S' |  l6 A7 C. C
//------------------go pm.txBroadcastLoop()-----------------------
- N# A7 _2 B( A: Y8 Ofunc (pm *ProtocolManager) txBroadcastLoop() {
* [8 z& b" k8 s2 g   for {' X$ @! j' ^) `6 C* ?5 X
           select {* \. e3 v- f$ A  ?
           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
% }5 n0 P6 D, q' n' R0 U, E/ I# M           return p2p.DiscTooManyPeers# y  {0 Y4 z: s- U, J8 H& a1 c
   }
4 E$ p* T, w( U# @, C   p.Log().Debug("Ethereum peer connected", "name", p.Name())3 N2 J* Q  f0 A2 Q% r3 I/ y
   // Execute the Ethereum handshake
4 x* c; P) \$ ~. ]* ^   var (, G/ i9 q) W( g( j, n6 V
           genesis = pm.blockchain.Genesis()/ m/ ^- S* X1 w' b( [: p
           head    = pm.blockchain.CurrentHeader(). Y3 ^% f4 M7 C$ M6 C" n
           hash    = head.Hash()
0 I3 f) Z) N/ `% E% c; x           number  = head.Number.Uint64()
4 i/ v' c" H# p6 R           td      = pm.blockchain.GetTd(hash, number)" n( p1 x3 S. f) e& r5 M
   )
( F) ]* H5 Z& B   //握手,与对方peer沟通己方的区块链状态$ J. a! Y- t7 Y; u& z% @
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {* R+ S' r) x7 B! T$ ~
           p.Log().Debug("Ethereum handshake failed", "err", err)+ l0 n3 {2 W3 v. T* P
           return err! l# ~) o$ x/ F( Q/ n& M
   }8 U; c) R7 ~) r* h5 m% X4 @% o
   //初始化一个读写通道,用以跟对方peer相互数据传输。
' X1 k* A! `* V3 M. Z- ]5 I8 o) e   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {! u: Y- n& c6 c5 _/ B
           rw.Init(p.version)  ^6 R6 _2 }5 C: N; x5 N. b# i: E
   }
, p0 S' n! g' E& y( p$ y   // Register the peer locally
0 K0 C$ q) H* y6 \) h  c  \7 J   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。7 K# B) W8 P  F2 \- l
   if err := pm.peers.Register(p); err != nil {1 \% m9 ]2 b) W! k! K# R3 J
           p.Log().Error("Ethereum peer registration failed", "err", err)
0 I, b( m- V, y) M* r, s+ Z           return err/ J% Z7 T+ I. O: D# r5 `/ p# g
   }7 d0 R- i8 N1 B- @: m; b' f1 E: l
   defer pm.removePeer(p.id)4 U: F+ e3 U$ J6 Y1 ]. G0 _- N
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
; w; z9 Z- M$ g1 I# T; D$ \4 X, c   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
/ m/ Z1 e# ?4 \* B& E! k   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
6 K- Q8 Q: z# @/ E4 K6 o; X! B) W           return err
6 t4 X5 Z) J6 J; J5 o7 x+ }   }( Q, l, I+ W- U3 S) o* J0 g1 v
   // Propagate existing transactions. new transactions appearing
  E! K) i8 V" S( T; B$ N& z   // after this will be sent via broadcasts.
6 p6 N+ `* @3 }" O. Y   /*
4 n- l3 @! c9 s" ^" I& R+ u+ X   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
% a. U7 j) y; F: U5 T   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
- T7 \; p, h2 q8 N# V   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。% y' O8 E5 R: s& b+ L9 [8 r" p0 }  X
   */" H% R0 q( \, Q, F" q# {
   pm.syncTransactions(p)2 n  t% M4 P: ~6 T) n% p  V
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork# k# X  n2 j& r4 J6 m! a! h
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
. {. J( d! R9 `" \  w$ J0 y           // Request the peer's DAO fork header for extra-data validation
/ N8 L. [# z( m1 b           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
+ {  U" o1 G6 _7 i4 j. {$ V  X                   return err( q4 o/ `6 _6 V. }1 A
           }: u. H6 {) b5 n# |. x* o- Y
           // Start a timer to disconnect if the peer doesn't reply in time
7 }# M3 d7 V0 B* A/ L           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
7 o3 N( p( l3 D1 V; f                   p.Log().Debug("Timed out DAO fork-check, dropping")
3 {& ]7 n' E  I) }% q8 f# I. ?                   pm.removePeer(p.id)! }3 \  N. [" r' R! P/ j8 I
           })! i2 Z; A+ t: `2 n1 i6 `
           // Make sure it's cleaned up if the peer dies off
& d+ u" v+ D% m8 T2 C           defer func() {; W2 E& A  U- @5 f. n
                   if p.forkDrop != nil {
$ T* A/ k# C, o- G% h$ _                           p.forkDrop.Stop()
' E9 D/ m/ z# E. U                           p.forkDrop = nil8 m3 m; k' ?8 J8 z: y' Q! ]
                   }1 f; a2 k: X7 ^+ M; L$ D5 L
           }()
, o6 h7 M1 X" J   }
  n3 y# A" o+ L" p" Z- N$ a           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,
% P! h+ A. S' E6 y( P; y           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。
9 z8 j& s6 U9 W& C   // main loop. handle incoming messages.
- R1 l+ A+ ^" ~8 P  f% S7 @4 ]/ u   for {
6 g- }2 F6 g0 M' F2 }4 Y           if err := pm.handleMsg(p); err != nil {  V8 \( P$ R9 }) \" N
                   p.Log().Debug("Ethereum message handling failed", "err", err)+ m* h- F5 X! F; _
                   return err+ ~6 j7 m# F6 a7 t
           }, y+ R" P4 h. d) x/ l( b' f0 }1 @
   }
9 X" h: L; O6 Y! t( V& R6 z9 J/ w  ~- l}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2