深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
182
0
0
无结构化的:+ l1 H, \. l$ x) T2 l9 A
这种p2p网络即最普通的,不对结构作特别设计的实现方案。
优点是结构简单易于组建,网络局部区域内个体可任意分布,( v( Z* e0 r8 R: N& V, f7 E
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。+ A, B7 a d1 O1 d: e- g3 L1 F3 d
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
所以往往需要将查询请求发遍整个网络(至少大多数个体),. A3 ]! s3 I7 \' r! d/ F/ f9 w" m
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。2 Q; v; |7 [% p' c4 l! Z& S
结构化的:
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
降低查询数据带来的资源消耗。
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构) {5 _' D- V6 J) U9 U- X/ b! }4 T
网络模型可以满足需求;
二、分布式hash表(DHT)
保存数据
(以下只是大致原理,具体的协议实现可能会有差异). M/ L0 I( k8 c2 _ w7 b
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。8 J8 @% D& o& I1 w: R* Y
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。7 r, z8 v& k K }
否则的话,把这个数据转发给距离最小的节点。" a4 F/ q6 g- `, a/ ]
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
获取数据% T y$ ^1 `$ a1 x
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;" p! r1 t7 ?. e% b8 D7 T# {
然后再计算它所知道的其它节点与这个 key 的距离。) a2 @( c$ Q R( m3 p0 m/ `( G
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
有的话就返回 value,没有的话就报错。, Y& i& x2 I0 A$ L4 T
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
三、以太坊中p2p通信的管理模块ProtocolManager E! U5 v2 l0 {% H, W
/geth.go
// Start creates a live P2P node and starts running it.. t3 [' Q: @; l6 W9 l1 Y7 a
func (n *Node) Start() error {2 \6 k( n( q; k9 A9 J5 k1 v* P
return n.node.Start()
}
/*/ H5 z3 |* q* Z* g
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;5 L( U+ ^4 z% `- J
然后将Node中载入的所有实现体中的Protocol都收集起来,: N4 W+ y# ]9 M
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,* ?. F/ J7 d' f- U; y& f
并将Server对象作为参数去逐一启动每个实现体。
*/+ \, [& n* x8 P4 q: ~
/node.go4 e5 w3 T; k* Q4 ?
// Start create a live P2P node and starts running it.# ^; g$ {6 @( a7 t0 \! s
func (n *Node) Start() error {
...' o, q5 r/ o9 r' S2 {( B g2 U
/*5 V8 ~/ `- D7 @
...# u# ?% R7 b( q# }2 E' O( L0 e
初始化serverConfig
*/
running := &p2p.Server{Config: n.serverConfig}( q P+ n" f8 q$ l. W: h6 t
...
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...); y+ o+ a$ T, D$ f, ^
}
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法1 c3 Z! W5 P: I2 Z4 N( I+ X2 {
return convertFileLockError(err)
}# W( G. e: f8 |; l" L
// Start each of the services. @: k P, k$ n8 n4 y
started := []reflect.Type{}/ U+ w& O7 J0 V0 a
for kind, service := range services {. C/ v' s: U, \1 X5 F4 N1 d( O
// Start the next service, stopping all previous upon failure$ ~8 {, P+ _' ^# W
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
if err := service.Start(running); err != nil {
for _, kind := range started {' y1 l& p& Y; D3 R, f1 {6 g3 n
services[kind].Stop()
}: T# O x. {3 w
running.Stop()
return err6 q7 w% O0 `$ O1 t
}
...
}
}
// Start starts running the server.
// Servers can not be re-used after stopping.& m) r- m$ E2 R7 n
func (srv *Server) Start() (err error) {
srv.lock.Lock()
//srv.lock为了避免多线程重复启动1 K0 M% _; y. n% ?# D% q
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")! R$ z, N8 s: P. Q
}; u) p j o, ?0 I% @$ d3 u
srv.running = true
srv.log = srv.Config.Logger* D% Q v' w6 w4 {1 o
if srv.log == nil {
srv.log = log.New()
}6 `, L3 M; h8 _! P' U
if srv.NoDial && srv.ListenAddr == "" {* Q- _" R/ Y9 s- X* [7 Y2 O
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
// static fields9 G L% d) N& j. K# p( [
if srv.PrivateKey == nil {; w' h# K1 \+ K8 }/ K+ g: B0 ?: X7 s4 _- r
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}( ?4 d! E& u" H# G6 o6 ]
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。
if srv.newTransport == nil {
srv.newTransport = newRLPX3 [" Y) E& x1 z5 s+ U4 w; J
}$ S0 g; Z, L/ N/ S
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn); H4 e5 m3 P; e" k
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)5 R% L4 P) l: ^* e0 T6 m) a
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
//srv.setupLocalNode()这里主要执行握手# H; S/ Y' K a
if err := srv.setupLocalNode(); err != nil {
return err
}$ m: _; T; e& M9 R5 c
if srv.ListenAddr != "" {
//监听TCP端口-->用于业务数据传输,基于RLPx协议)0 z5 I# A1 t* A
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
if err := srv.setupListening(); err != nil {( X7 z6 l" m: b }/ K; ~& A# ^
return err
}
}* g. p0 q3 L, C3 r3 B; y0 t$ H
//侦听UDP端口(用于结点发现内部会启动goroutine)7 O E# Q2 N5 K/ W+ F5 B
if err := srv.setupDiscovery(); err != nil {
return err
}
dynPeers := srv.maxDialedConns()8 [* K& J) m! \& A) F
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)% m) ?8 `3 d5 \! s
srv.loopWG.Add(1)
// 启动新线程发起TCP连接请求
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,. P5 _- d$ I: ]$ D
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
/*
case c := 0 {
if s.config.LightPeers >= srvr.MaxPeers {- ], @* o+ V- X4 N2 f
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested+ g8 c$ ^$ c) ?1 w" A
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {! Z( ~1 _; |. g2 v' p# Y b# o q
s.lesServer.Start(srvr)
}
return nil
}
/eth/handler.go; j/ S! K; K! h" T7 w5 u5 c2 W h
type ProtocolManager struct {9 y. o6 |8 l7 U6 U
networkID uint64+ j+ c' m5 S0 j% [: S
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)# f6 w0 s+ ~$ O" p
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
blockchain *core.BlockChain$ @* O" K5 P2 f0 t3 s8 U
chainconfig *params.ChainConfig, B3 P0 D( H6 r& D
maxPeers int
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。+ d$ |# T. y, h5 B
downloader *downloader.Downloader/ U/ ], V1 F+ Q8 D, ?* `. R
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
fetcher *fetcher.Fetcher
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
peers *peerSet( p8 D1 k% p- Y1 v2 Q
SubProtocols []p2p.Protocol3 Z* u- P8 q1 M8 Y" C* W- @
eventMux *event.TypeMux4 C* C/ N6 H3 H3 m! {
txsCh chan core.NewTxsEvent2 N- Q( ^; M: p) W' R9 T" N
txsSub event.Subscription& k) w6 X- P' s. S, j
minedBlockSub *event.TypeMuxSubscription
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
//当然在应用中,订阅也往往利用通道来实现事件通知。
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync9 B S5 c1 f) J' V [
quitSync chan struct{}* G) A4 v, I$ p0 Y, }
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading4 m7 @. F. ^3 u: k6 [, q" x& c" S
// and processing
wg sync.WaitGroup( R; x0 j. L& p4 n7 b
}
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。2 C" E5 g5 p' H9 j/ v. z, Q, q
func (pm *ProtocolManager) Start(maxPeers int) {# z& J4 i; X* t, E* L; C
pm.maxPeers = maxPeers
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。* Q6 F7 F* X w0 i0 p" R
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)$ F3 c2 a+ j7 H3 J( B' f
//订阅交易信息; u! V A1 t. N$ `
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)2 c. U, C, L4 c/ m, U" m; _- s
3 c+ c1 Z) P( A1 B, r9 B
go pm.txBroadcastLoop()2 L- E3 x8 ~6 L$ Q/ a( F( h
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息- U: W3 ?' _9 y: ]3 L
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}). b+ e/ k: C/ ^1 B; V
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。5 @0 @% `* b( k7 x9 z
go pm.minedBroadcastLoop()
// start sync handlers4 l1 K/ v$ @' e) `2 R, V/ I& c
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
go pm.syncer()$ _6 l8 {! A/ r# _) f; `6 Y
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
go pm.txsyncLoop()2 ^. ?/ O: r# ~% r3 g2 Q
}" `: j( u- H5 w5 p8 i) P
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,: e& ~$ s2 j$ {: S- j& V
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。5 s X' T/ ^3 f" Y# u
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {: V) Q+ l3 {/ P3 S3 H/ k
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers$ G# p) b7 E; a) X0 V1 o! C8 k) _
}) h/ A* h( i$ v, |3 x
p.Log().Debug("Ethereum peer connected", "name", p.Name())8 T' c" m/ p/ s* m6 w' d8 s
// Execute the Ethereum handshake7 \5 t$ w" c7 w, T( K
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()& a& p X; @8 d& Q: H5 a
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)& ]. Z) D) k y
//握手,与对方peer沟通己方的区块链状态
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)8 [: g# U1 Y+ A2 P/ M+ o
return err) S( P$ U) z. T* L: m H4 h- p# a
}/ f b0 P9 C9 B! z2 `
//初始化一个读写通道,用以跟对方peer相互数据传输。1 E5 t6 }/ b5 I, ?9 ]; v
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err& Z4 c/ [2 e# h/ I0 S N+ p: p: Z
}* G# @ O, E) d; _, W+ j
defer pm.removePeer(p.id)- f# \$ |) T6 W7 ?3 t' u
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。7 w# k* m5 G# o
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,4 g4 U9 t& k$ }
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。 w; e4 k* f! w
*/
pm.syncTransactions(p): b4 D- l; C/ R/ Y6 J2 B
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {; A1 X! ]8 \! b: r6 k' L
// Request the peer's DAO fork header for extra-data validation1 O% B$ r5 m+ ~5 j2 L
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {( o* Q! E/ s# e( Q6 j" p
p.Log().Debug("Timed out DAO fork-check, dropping")' l7 T, ^9 l7 e1 k2 Q
pm.removePeer(p.id) `, y( A! J" k* m$ V6 x9 N: K
})
// Make sure it's cleaned up if the peer dies off
defer func() {& J0 M4 ~ C4 a+ Z4 }
if p.forkDrop != nil {
p.forkDrop.Stop()9 }) N) J$ `0 u) C
p.forkDrop = nil
}" ^5 W) W8 R9 B) o
}()
}/ C. D2 i: O0 D8 w/ x
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,( h% F# P* f* f6 O9 ^
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。
// main loop. handle incoming messages.
for {9 x" F. [; h* G1 V) x/ J
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}4 c. n h, P0 Y6 W
}
}
成为第一个吐槽的人