Hi 游客

更多精彩,请登录!

比特池塘 区块链技术 正文

深入区块链以太坊源码之p2p通信

Mohammad61417
127 0 0
一、p2p网络中分为有结构和无结构的网络# M4 a7 F+ b& k# G4 z
无结构化的
+ Q5 ]' O9 Q1 {6 M2 z4 Q, f这种p2p网络即最普通的,不对结构作特别设计的实现方案。% S  I/ o# u# O/ X* H1 @$ {
优点是结构简单易于组建,网络局部区域内个体可任意分布,
/ z' B* R: C  Z. A反正此时网络结构对此也没有限制;特别是在应对大量新个体加
8 G3 ^9 N5 O5 t9 i8 V' H  [" P- s入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
% g; G1 l0 m8 s缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
2 _  Y2 {7 z% g& h% W* M所以往往需要将查询请求发遍整个网络(至少大多数个体),
% U; i5 t( o+ _( y7 G+ w这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。& C( t0 u4 E$ H% d* Z
结构化的:# L1 k: ]8 D9 C/ B! M6 w
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
& B. w  O+ c# [  ?6 X9 f: _2 P降低查询数据带来的资源消耗。
0 B5 Y8 M3 O# |/ P- T& X以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)4 S$ E& J1 e; Q$ c- e' `$ U
网络模型可以满足需求;
9 f+ b0 n% k6 ~" {1 B0 c二、分布式hash表(DHT)# L/ c4 r/ p) T
保存数据
+ N' o+ R6 B! ]: Y: {4 }0 ?(以下只是大致原理,具体的协议实现可能会有差异)0 I! B2 t, ^+ B1 X$ n8 Z0 g- m
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
4 P1 \; }/ k& q3 Z然后再计算它所知道的其它节点与这个 key 的距离。
# i9 u: r0 j9 p: N如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。
, Z; t* W8 j* d+ o否则的话,把这个数据转发给距离最小的节点。
4 j3 q" K2 W. C/ b4 z& j+ Y收到数据的另一个节点,也采用上述过程进行处理(递归处理)。5 k* r2 h  e/ }. Q9 ~& R6 Q' c6 F
获取数据
: v2 D; N1 O" r" `5 B- d4 N! j! H(以下只是大致原理,具体的协议实现可能会有差异)
6 G9 N) i. \6 f0 H. {当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;  k9 P) F1 p! r0 h/ ~* m! j9 j
然后再计算它所知道的其它节点与这个 key 的距离。
. b" ~6 A. h$ z, j  _) M: U如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。& F+ ], n5 r& {: ~
有的话就返回 value,没有的话就报错。
2 S! t# k/ w& G( ^8 ~( }, M6 c否则的话,把这个数据转发给距离最小的节点。
) N: k5 ^/ y* \' }4 N5 J( l收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
; Z9 t9 H  n4 I; G7 Q. ?' f三、以太坊中p2p通信的管理模块ProtocolManager" Z8 T4 B/ K) d3 p( I; o
/geth.go
, b- y* T, {: e5 d// Start creates a live P2P node and starts running it.! s+ [$ N7 P( i, F
func (n *Node) Start() error {/ _- G" |9 d* r
   return n.node.Start()
& o' N2 j" z( n  w}
' ^$ `$ _) Z- z# Z  D9 ]) k$ Y! O/*
$ H! T+ F0 l7 M" g) l3 Z1 J   Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。& T5 U% B! _2 Y# ^$ E( }5 L4 @
   Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;; R2 e  Z, Q) g
   然后将Node中载入的所有实现体中的Protocol都收集起来,7 p: t  B( u  o: S  j
   一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
; c; l3 O- q- S# _. W( b7 s1 U+ [   并将Server对象作为参数去逐一启动每个实现体。
1 M# i( R% D1 L- p$ d*/
2 G9 [. W& b5 U3 T/node.go2 @3 k, j0 w' n' }! ]" j; C
// Start create a live P2P node and starts running it.0 K' g+ a0 V3 s
func (n *Node) Start() error {8 h! f. l3 g0 [
   6 R' p& \2 m( s! J; q% y4 r
   ...
* |* I5 m. i0 _3 ]0 V   /*
% _' D5 Q; u* W* m5 Y. c) h           ...
1 b7 H; B5 ], I+ u* `" {. ]           初始化serverConfig6 E* m! G, i' D% K
   */! t8 w2 N6 l  K+ M( W6 P- v+ Q3 V
   running := &p2p.Server{Config: n.serverConfig}3 `1 _6 n2 y7 D* Z, C4 d' q+ g
   ...0 i( I9 B. U" q# |" ~
   // Gather the protocols and start the freshly assembled P2P server
! a' L6 u$ D1 N- ?# ?5 H! t   for _, service := range services {5 m. ]# J/ B1 ?& E% H$ d% P' V; ?
           running.Protocols = append(running.Protocols, service.Protocols()...)0 P& _( }3 }9 J/ V; u
   }+ E4 n6 N. P( H( U1 `. P  C
   if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
: l; ]: \' c& Q0 D  d3 p           return convertFileLockError(err)- b7 o! d# R9 t4 B
   }
( F6 y6 ]  n1 H+ k6 x% _5 F   // Start each of the services# H) P9 y2 j, k  q6 ]$ V" z
   started := []reflect.Type{}
, N( X/ i# _4 d  W/ b   for kind, service := range services {; t* p5 `/ {! y
           // Start the next service, stopping all previous upon failure
/ c; [& K& D1 z8 h  g* H" N           //启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
1 u, J. ~4 b1 w' B; m           if err := service.Start(running); err != nil {
, J4 U0 p5 j# f                   for _, kind := range started {2 v+ O4 F' c& k8 n8 ]7 U& @
                           services[kind].Stop()
' r2 R- T# n+ X) L& U* W* p+ s                   }6 z. o, O1 j* q0 ~1 T# r4 s
                   running.Stop()
5 |2 v0 v, |+ j' G5 Z8 O$ i) X                   return err7 ?8 J& P0 X! J( P% z4 w* e0 w- D3 ~
           }
' T! \  @, L9 R! k9 u1 ]           ...
. s% ?% Z9 O* x! g3 v7 C$ f   }  K% G1 R9 ?8 K- [+ d% |' _+ k
}" a5 t) s# L7 k: m$ x9 r
// Start starts running the server.
3 `1 q8 w" ^' M. u7 z# w// Servers can not be re-used after stopping.
5 N# a: A/ h3 v. {3 ?  g! Hfunc (srv *Server) Start() (err error) {
1 h: L0 q% N1 C" z- ]8 O$ a   srv.lock.Lock()- }0 z9 u8 e/ B& J5 I
   //srv.lock为了避免多线程重复启动/ A% ~, _% ]# Y
   defer srv.lock.Unlock()
+ m6 W' Z' q! C3 Z2 D) I* }0 O   if srv.running {6 v# `. w  Y$ m  v
           return errors.New("server already running")3 b& O/ F0 }9 _' X. E4 K
   }
- J' z0 h5 M9 Y/ {7 [/ q/ v   srv.running = true
) l5 b$ b( f0 x1 k: b& I  \   srv.log = srv.Config.Logger" v$ j" R' c; R9 y. N7 P3 s) W6 B
   if srv.log == nil {
7 i& J( M0 A7 w" N  ~3 T! I8 R           srv.log = log.New()
. n0 ^3 X* q7 b! w: x$ p+ M* Y   }
: i& S/ i: u& P8 U8 i   if srv.NoDial && srv.ListenAddr == "" {
, S9 Y/ E6 K( Y4 q6 o, g  w           srv.log.Warn("P2P server will be useless, neither dialing nor listening")
' p% b! c/ [, S# c   }, n- y* d( C1 k1 l! [
   // static fields! j) `/ q- C) q: N6 m
   if srv.PrivateKey == nil {4 Y7 v: B& f  u- W6 S; k0 }
           return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")) {% Q: s7 s( z0 I! r# S
   }! R& ]/ y! T% C& s! X
   //newTransport使用了newRLPX使用了rlpx.go中的网络协议。
- Z  l* p! S7 K! H6 v9 L' l   if srv.newTransport == nil {
8 k. o( U# N& r           srv.newTransport = newRLPX
" q2 W" D7 r+ v- [0 t( ]! U$ J' H   }7 A$ u/ W3 H) C9 r3 Y2 \- z* X8 ~- W
   if srv.Dialer == nil {
  ]6 L9 q$ f" U# [3 g3 b3 J           srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
5 ?8 o6 m- h. U0 k' \# s   }) c8 g7 I, E6 `5 l- p; h
   srv.quit = make(chan struct{})4 K3 S2 _6 V6 Y- A0 _+ j% x
   srv.addpeer = make(chan *conn)" v$ n  _4 w% V/ e) b1 y/ q3 J! D
   srv.delpeer = make(chan peerDrop)
4 }" M9 B3 G: _( F6 ]! X& ~   srv.posthandshake = make(chan *conn)+ T; J& y% b. {  ^' s
   srv.addstatic = make(chan *enode.Node)
5 }. k1 k7 f$ n$ H# R! S   srv.removestatic = make(chan *enode.Node); f1 F/ w  _5 Z) b
   srv.addtrusted = make(chan *enode.Node)
0 P4 Q+ Z+ c# z& m   srv.removetrusted = make(chan *enode.Node)
& L* j- [/ i' l9 R& i   srv.peerOp = make(chan peerOpFunc)
( ?& B6 S2 V, n! F   srv.peerOpDone = make(chan struct{})
' u5 G+ M0 R  \+ g. e   //srv.setupLocalNode()这里主要执行握手
5 q3 F. w) V) G9 k9 G( g# {9 n   if err := srv.setupLocalNode(); err != nil {, y; O6 `$ W$ ^; ]- @" A
           return err
7 o/ q0 G' \( v. X: |   }% i2 I7 J$ w% o: O# i8 [
   if srv.ListenAddr != "" {  _, e& K; y5 `) i/ @! f# W0 L( z
           //监听TCP端口-->用于业务数据传输,基于RLPx协议)
/ m( z- m* v5 Z" E$ ?% b; I: _' R           //在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
* t2 n, Q5 o- _9 }           if err := srv.setupListening(); err != nil {
2 {" `( |3 h7 |' G                   return err
  G9 @  A; N! }" q& X           }
5 A- [  D, j% f   }3 i0 e- a  t5 N; b- \# U" M
   //侦听UDP端口(用于结点发现内部会启动goroutine)% @& D2 A" S/ y& Y% f
   if err := srv.setupDiscovery(); err != nil {
' {' G9 y  X' Y& e7 _2 H           return err! Z' q/ \' ~- u
   }- m. w9 H/ z8 z& E& Q% H5 h" x
   dynPeers := srv.maxDialedConns()" ~8 O7 J  D4 v9 L! F, x- s: k
   dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
1 j$ d! b" @; ]! f% F   srv.loopWG.Add(1)
5 {0 F; ]# ^. M   // 启动新线程发起TCP连接请求1 z1 e* q. d" [
   //在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求," G; R% D& F; T, j  p+ M. ?% R
   //则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。3 O; P" ]# Q8 N+ e
   /*" d) F& E1 P6 T2 k  j
   case c :=  0 {" d  y1 J; V4 a
           if s.config.LightPeers >= srvr.MaxPeers {
3 j( J7 T# \3 j                   return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
4 ~: ]7 ^/ {- \           }% S2 F; J8 ]9 ~( `/ ~1 B* n
           maxPeers -= s.config.LightPeers
' A- s% T. }* x   }  v6 e/ A# n/ I- c% |1 ^
   // Start the networking layer and the light server if requested3 o; e3 e; Y! k8 Y
   s.protocolManager.Start(maxPeers)% H! S" K  v# @; X0 W2 Y* T
   if s.lesServer != nil {
, }5 g0 D7 O2 |1 Y+ R           s.lesServer.Start(srvr)
) [, C6 k+ B7 X6 W3 L5 J0 Z8 G8 s2 V   }
5 K3 h' m, R) a) B. `! }2 I" w' T   return nil7 _: `( U- ~+ Z- H3 U
}
7 r( h$ |- R% e/eth/handler.go3 |1 D! f/ e' m' Y  L4 r1 g
type ProtocolManager struct {
& V8 J& ^, c" ^& s% U2 g   networkID uint64+ s2 W$ H; e& {3 l
   fastSync  uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
% M5 b& S; U  y   acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)$ w$ @" d& ?+ `/ y
   txpool      txPool
$ L0 Z# L# J7 y8 g  T/ u   blockchain  *core.BlockChain
( P7 X0 |) O" l   chainconfig *params.ChainConfig
' U6 o) r; m4 B  D$ x   maxPeers    int* t- w+ k$ _* T. W4 K. ~, k
   //Downloader类型成员负责所有向相邻个体主动发起的同步流程。; }, }1 @3 a6 @4 x2 g
   downloader *downloader.Downloader5 x- f' E) f. s; r
   //Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排( ^" S2 Q! a0 \5 u
   fetcher    *fetcher.Fetcher+ F6 {( I  K) n: I; S( b9 i  n) c" j
   //用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
0 T9 e4 d) @/ [; j' J, w   peers      *peerSet
6 |2 X' r( B! f3 G% H2 u' X   SubProtocols []p2p.Protocol2 `1 U  Z; \! ]4 Q
   eventMux      *event.TypeMux# A  V! Z$ U9 E- K7 @) H  a4 V
   txsCh         chan core.NewTxsEvent
- T* t4 b3 H/ w/ S+ D: T   txsSub        event.Subscription6 D/ y* y; f5 M  h! q" G& R
   minedBlockSub *event.TypeMuxSubscription9 M; Y2 U8 j( K+ h+ V  N. P5 d
   * `% e2 b! N2 C+ O
   //通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。* |4 d, x( w% [& s
   //当然在应用中,订阅也往往利用通道来实现事件通知。4 K/ f" k: @: @8 o
   // channels for fetcher, syncer, txsyncLoop
) M* f; J/ v6 d   newPeerCh   chan *peer' Y& n; i& ~5 h4 D
   txsyncCh    chan *txsync
+ @7 E2 L' I: S8 i: E( P* `   quitSync    chan struct{}
, c1 |/ y' G# _+ V7 j, E/ I% W   noMorePeers chan struct{}4 v  D- k/ f, T( I3 m9 Y
   // wait group is used for graceful shutdowns during downloading- Q# S# P7 m" s! k; l! d! S0 t& Q3 d
   // and processing( j. |' i0 l! ^& |+ K
   wg sync.WaitGroup. D5 n+ f2 i6 P; f
}
& u/ s. l5 p, O; S' R   Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。5 u, c2 H8 T* ~+ T8 |0 @7 P9 g, |
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,+ e2 |$ `9 p0 ?5 H* \
这也标志着该以太坊个体p2p通信的全面启动。' w- l, ~& b/ m3 s: v: i3 Y
func (pm *ProtocolManager) Start(maxPeers int) {
5 G$ s# [8 f5 f! j7 D, `0 u/ t: \8 `& s   pm.maxPeers = maxPeers4 a) p, q" Q- D& ]
   // broadcast transactions
7 V) c- l) l$ r& ^. h   //广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
; [1 y% q- \+ n3 O9 c   //txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。& l+ z% R# D( Y' X8 b
   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
/ Q8 _. U4 I2 G5 E( }8 R   //订阅交易信息. Y5 C* v, a5 |6 c+ P/ b& |
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)6 i4 j1 ^9 e6 C: p' b3 v3 p
   " j7 K" ?& h4 A4 w0 f
   go pm.txBroadcastLoop()
+ l4 O$ i! J" J5 B8 V: ^" [+ ?9 B   //订阅挖矿消息。当新的Block被挖出来的时候会产生消息, Q; Y+ s" m# ~9 W" w
   // broadcast mined blocks
# T& p$ J' O/ y   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
: ~8 y5 `7 P6 B- m' f- t   //挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
/ Q0 t. T7 |' ~( _( D, Q   go pm.minedBroadcastLoop()3 ^" N) O8 J3 w+ N7 v" R# J
   // start sync handlers
0 D, a: r: Y: t, G7 L2 G   // 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。. {# C9 k1 E! J
   go pm.syncer()/ P% Q: P( {- T
   // txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,' {) T$ H# z* J0 D+ S$ K
   // 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
+ B* N* ]$ i9 \   go pm.txsyncLoop()& _# J2 D/ k+ ?/ }/ H( ^: p
}
9 {( m2 d3 G! U//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
$ x* O7 S# Y1 f8 r1 r2 K+ o7 U5 j   //会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。. b4 J) u$ V( X6 w! N" ~
//------------------go pm.txBroadcastLoop()-----------------------# y" D- O0 F, `
func (pm *ProtocolManager) txBroadcastLoop() {; r$ O( b$ o  \% t; a6 \
   for {
4 A  P% E& a6 U+ N9 p, ?' c           select {6 K  \; a$ M, A6 G% I+ K
           case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
  U& D' V' R( t; u% D5 f2 T           return p2p.DiscTooManyPeers
2 C) g/ V* n* H" X   }
/ d8 D6 Q2 a" ?   p.Log().Debug("Ethereum peer connected", "name", p.Name())
; _$ @2 X' d3 B* C7 E" W6 O: z9 c$ v' J4 y   // Execute the Ethereum handshake2 B! j/ N# Y9 l
   var (
  ^: U* t! K4 A! |: _           genesis = pm.blockchain.Genesis()9 L! M, F1 E" H$ w2 L
           head    = pm.blockchain.CurrentHeader()
, K, f% d4 u* |( ~           hash    = head.Hash()
$ B6 R5 F) w+ P/ Q1 Q           number  = head.Number.Uint64()
, _) {1 \( B: W           td      = pm.blockchain.GetTd(hash, number)3 _& ~( ^; J/ s/ S; k: a* y
   )
4 g- t& u% `0 k2 ^9 U( I- j   //握手,与对方peer沟通己方的区块链状态% ]$ j4 c" r7 u& m' h6 b
   if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
- f  }9 J) z% J- l' e! Z4 y1 J- j           p.Log().Debug("Ethereum handshake failed", "err", err)
, p( g% u. e$ F- P8 M# c           return err0 J) D2 j0 W4 s( z6 x+ p
   }; ~) V. B* s& m
   //初始化一个读写通道,用以跟对方peer相互数据传输。
# G3 r1 i* D, a# b/ R   if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
* h9 s9 D' v" K* K9 j$ c6 [( b           rw.Init(p.version)
8 `$ S. o0 |! T7 t   }
8 y- g+ U% j+ G5 [  X3 M* }   // Register the peer locally
, h5 W2 B  V: K8 n. O   //注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
+ A# D2 q+ D9 s. }1 m   if err := pm.peers.Register(p); err != nil {
2 O% ^$ I2 S5 o( y. U           p.Log().Error("Ethereum peer registration failed", "err", err)  u9 l; o9 B. @. b
           return err$ x9 J( f% G- ^4 u+ L
   }, F! a/ i# y+ }1 A6 L
   defer pm.removePeer(p.id)6 o- }. E4 V0 v$ G5 M! f
   //Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。$ N  V, y- C) j
   // Register the peer in the downloader. If the downloader considers it banned, we disconnect
% p, o& K" h5 T! \5 N' S" e* e: w   if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
0 L; X) m& i  k) R7 L3 ~, D7 p8 c- P           return err, u5 C) i1 }. ~$ r9 O
   }8 o9 [" [' ]2 l' i- g: N4 X: d, b
   // Propagate existing transactions. new transactions appearing
4 `5 I6 U: r; F) E" E- }   // after this will be sent via broadcasts.
0 O" F( O" Q. Q   /*
% s# y* @' B3 O: R: [; O0 \   调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,; L; e8 Y2 @- o* }
   推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
" b4 q. M* ?9 Z$ H. M( @6 B   中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
6 Y7 F+ V- r3 _! w6 I: Z   */
+ r# p+ o2 O8 t0 w2 e   pm.syncTransactions(p)0 h7 O  X0 L6 h6 l
   // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork3 ^% g! D+ j  r3 ^% ]
   if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {; x0 q  f( h( Y
           // Request the peer's DAO fork header for extra-data validation
+ R- i4 a: ]- n( s7 `           if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
! \2 U0 x5 C, w# {& F; N6 r. x1 ]                   return err+ O3 ]0 x  ]' Q5 l) |
           }
0 I. m4 \/ d7 i/ h           // Start a timer to disconnect if the peer doesn't reply in time
8 R) x. @, A! Q$ d9 b           p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
. |0 }: A2 _: m7 X$ o                   p.Log().Debug("Timed out DAO fork-check, dropping")" Z/ e% j3 C  p  ^
                   pm.removePeer(p.id)* f# S! Z. Q$ a4 X3 |  ?! _% N
           })2 P! |' b) j" x6 N
           // Make sure it's cleaned up if the peer dies off
' Z5 r6 C- f) X/ m  f6 y% L           defer func() {8 r5 N1 `& s6 E: h& A* k3 U' a% G
                   if p.forkDrop != nil {
1 C8 @, w0 y" a9 t6 u' F# u7 u$ t                           p.forkDrop.Stop()& ]# G% C/ G% {4 |& L+ `$ R
                           p.forkDrop = nil
7 V" L& H" m) }  m+ L% `* F0 L) V                   }
. v, ]# c4 C" |. I4 ^           }(). b" }% d- ~9 \6 w+ ~& [1 V: C  Y
   }  [8 ^7 K+ ~4 B$ X1 A! s
           //在无限循环中启动handleMsg(),当对方peer发出任何msg时,
% H$ T/ A9 o" M$ i8 N& [4 H           //handleMsg()可以捕捉相应类型的消息并在己方进行处理。9 O& [1 N+ Z5 ]1 i- k) j; l9 u4 ]5 \
   // main loop. handle incoming messages.
  d. x8 f/ ~) P1 W3 Q# W+ s+ B9 a   for {
' h4 q  m' o; D. S' a- i9 Z           if err := pm.handleMsg(p); err != nil {
5 C& S5 g4 x3 S1 ?# B# u                   p.Log().Debug("Ethereum message handling failed", "err", err)
, `- n- b, Z# Y9 V3 f, K                   return err
- q. J$ t. ^. o  Q# H           }
2 v; i( H! Q6 U   }
' H2 s6 T0 g9 g# v) E+ H$ N}
BitMere.com 比特池塘系信息发布平台,比特池塘仅提供信息存储空间服务。
声明:该文观点仅代表作者本人,本文不代表比特池塘立场,且不构成建议,请谨慎对待。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

成为第一个吐槽的人

Mohammad61417 小学生
  • 粉丝

    0

  • 关注

    0

  • 主题

    2