深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
280
0
0
无结构化的:
这种p2p网络即最普通的,不对结构作特别设计的实现方案。1 o' e6 t* C4 h5 }" i4 r
优点是结构简单易于组建,网络局部区域内个体可任意分布,$ a$ ` V3 Q. ?- i* Y" w
反正此时网络结构对此也没有限制;特别是在应对大量新个体加+ P. f& ]- n. {4 I+ v/ m7 g+ Q
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
所以往往需要将查询请求发遍整个网络(至少大多数个体),% P6 Q+ i) l; I
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。! j' [0 ~8 v+ I
结构化的:" Y9 E$ i$ F Y& e s$ E- ^
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,- K& b% L5 j( w) n2 m/ o X W
降低查询数据带来的资源消耗。$ |0 r# g# ]" v9 }9 o
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
网络模型可以满足需求;
二、分布式hash表(DHT)
保存数据
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。
否则的话,把这个数据转发给距离最小的节点。2 w& `8 w7 Q G: L
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。3 p/ C/ N) ~% j- q% ~: [0 J
获取数据
(以下只是大致原理,具体的协议实现可能会有差异)6 i: l- L. C3 e. C p& i8 c* @( d
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;( Q) d' X* @5 n7 B5 T0 t1 R
然后再计算它所知道的其它节点与这个 key 的距离。; Y( s! i$ U0 ], E. t( x2 B7 S
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
有的话就返回 value,没有的话就报错。
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。* s7 e& o9 P8 a9 N4 K# D
三、以太坊中p2p通信的管理模块ProtocolManager
/geth.go
// Start creates a live P2P node and starts running it.* s3 ^3 V0 d, @& @
func (n *Node) Start() error {
return n.node.Start()" F! n. H8 b3 |0 \* [$ ~: {& @
}$ q1 N* a0 j( I1 Q
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;% j2 }# y% i* O( k' v# L8 I
然后将Node中载入的所有实现体中的Protocol都收集起来,
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
并将Server对象作为参数去逐一启动每个实现体。) d0 U! n% O" y g2 \0 E% E l3 a5 f* ?
*/
/node.go
// Start create a live P2P node and starts running it.: I5 }3 y- G; y: ]+ x& g' \7 w
func (n *Node) Start() error {3 O( @# s2 K0 X% K8 _. q
...
/*
...% W+ S3 z- g. c2 F8 X$ \1 E
初始化serverConfig1 T8 R' j3 W- w8 g
*/5 i' g5 @" c6 Q- O. y0 v
running := &p2p.Server{Config: n.serverConfig}
...- Y5 b6 x& F* P4 m/ I5 K( o
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)7 F8 [3 U3 T& I* S" m
}
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
return convertFileLockError(err)0 V& ?6 S4 g/ {# r
}
// Start each of the services: h ?# g8 c. Q
started := []reflect.Type{}
for kind, service := range services {- ?; q$ v( u0 w2 r9 C. s& A4 P! I
// Start the next service, stopping all previous upon failure5 Z: S/ q0 L/ E- e0 `6 l: c
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {, h0 W ~/ s- |( e7 v5 n
if err := service.Start(running); err != nil {
for _, kind := range started {% e/ x2 r* s" o* d( A- S/ j
services[kind].Stop(); z/ {& Z7 a$ x
}+ w0 ~* v$ ^2 ?, z X' {! D" [2 `! O
running.Stop()8 G, z! m ]1 f2 @, I& g
return err
}
...2 h" l8 h/ a8 W' Q
}7 N8 g# w n7 y+ g& x8 `1 p
}" @3 @2 n% ]4 W) K
// Start starts running the server.1 ~3 T# C, O$ Y* Z- [% x! m
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {0 g/ N+ K' @1 Z) P# C
srv.lock.Lock()
//srv.lock为了避免多线程重复启动
defer srv.lock.Unlock()0 t# U7 j: W! F2 j& z
if srv.running {2 N6 {: o9 }, D4 S$ {: ?9 s
return errors.New("server already running")
}$ \: Y# S$ m1 G) J
srv.running = true% m: I" n& x1 b( z1 U, X
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New()" L5 t# c! U l: P. z4 I4 q9 i8 S
}4 E: Y6 l5 D7 t4 ?4 o
if srv.NoDial && srv.ListenAddr == "" { L0 e- v) k" _2 \
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}3 X- k. G2 h' J. j" ]1 B
// static fields4 c3 Q6 h+ I( G! M8 z
if srv.PrivateKey == nil {/ D4 {# K: P/ [; m
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。
if srv.newTransport == nil {
srv.newTransport = newRLPX
}4 e& E( P; ?$ J9 L4 f7 F. d
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}2 u" w6 ^5 y; y U, O, ~6 x
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn), j( i0 f0 d9 `4 o( ^& R9 O
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)# F0 ^! w- Y+ ^6 h
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)0 e0 O3 f$ _1 Y/ Z/ b3 F
srv.peerOp = make(chan peerOpFunc)! ~; c7 H6 ]" d# a% ^: O$ Q# x
srv.peerOpDone = make(chan struct{})
//srv.setupLocalNode()这里主要执行握手
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {! T3 [4 ?0 x1 G8 h" z! ?
//监听TCP端口-->用于业务数据传输,基于RLPx协议)
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
if err := srv.setupListening(); err != nil {, i7 d' W% a- O: k
return err
}7 {! l5 n4 F1 I$ C
}2 { }" ^! j* A2 p
//侦听UDP端口(用于结点发现内部会启动goroutine)# K6 h) q% U7 U3 p' p8 n( e
if err := srv.setupDiscovery(); err != nil {4 k; r* K6 b9 `
return err6 {7 n; p6 _! z: X$ E( b
}+ l' M5 s: I6 H' o
dynPeers := srv.maxDialedConns()5 v6 N. C- J" s5 j! Q6 F7 Z1 T
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)( b2 F: t0 B$ j. M; G
srv.loopWG.Add(1)
// 启动新线程发起TCP连接请求
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,! }9 U* q3 E( X9 x* \& I4 ]
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。) v; s& J) e" @
/*
case c := 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)$ u2 o% m6 _. ?
}+ k$ c/ p5 F/ V. a
maxPeers -= s.config.LightPeers5 o. @' S" N0 O+ z/ Z; S9 K1 n
}# h, e7 z5 U* v7 ~ B
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers): L9 f3 o$ W: w \% W% T
if s.lesServer != nil {
s.lesServer.Start(srvr)
}( w# i4 O* y/ v2 e8 u
return nil
}) m4 R# p+ l# J% ^; P/ g! k: l
/eth/handler.go- q' u' q, _, p Y
type ProtocolManager struct {1 C6 @) J; P1 P' A
networkID uint64
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)+ i$ Q9 d2 H% f
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool0 d8 l) n: x" n% B0 s4 v- ^. c
blockchain *core.BlockChain
chainconfig *params.ChainConfig) W) \' c7 ~4 J* N$ Z# ?
maxPeers int* e* k& G0 g3 [9 N; i7 K2 P
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。
downloader *downloader.Downloader5 \6 \1 f$ l0 A
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排$ _; t6 y2 O% p1 _1 U- L
fetcher *fetcher.Fetcher
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux* k R5 ]& \5 w, d
txsCh chan core.NewTxsEvent
txsSub event.Subscription& u# R. z- w' U; i: D
minedBlockSub *event.TypeMuxSubscription
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
//当然在应用中,订阅也往往利用通道来实现事件通知。( t7 n; P+ i& `9 ?+ F: n9 w' B
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer: @6 ~+ I% U" {0 ] S
txsyncCh chan *txsync0 C0 o+ G* i! g A- m* b
quitSync chan struct{}
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading+ {% t* j6 H; Q. a
// and processing e4 ^2 [4 `, ~& \& r7 ^: E
wg sync.WaitGroup1 K3 I! P# V! E! W O7 z- H
}
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。9 b* g9 k/ H+ S1 v- s/ {
func (pm *ProtocolManager) Start(maxPeers int) {/ D5 h7 F2 T- S$ P' J2 g2 @6 i
pm.maxPeers = maxPeers# ] b0 x [8 L- j5 y0 }
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。7 x5 y3 Z/ h; _7 Y$ A+ s; w* x* e; e
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。: p. N3 b* [3 e* b
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)) y4 O6 X9 i. m$ Y+ a9 y1 }
//订阅交易信息
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)/ C$ B# K2 L# l- b; x. x
3 w+ P0 B+ f& Y0 z. v
go pm.txBroadcastLoop()
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息3 \! T4 I7 n! ~" P
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})( W: d) [4 r: D3 q" r
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
go pm.minedBroadcastLoop(): g1 W( A2 N* b. A, s% I
// start sync handlers
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
go pm.syncer()2 o, |% j9 T+ ]/ ]7 p
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
go pm.txsyncLoop(). i {2 ?7 [) A5 [! w3 o1 {4 t
}$ Y, s! \% o w! {
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件," l2 O1 E6 Y3 c0 N7 y; w
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {5 j- o) G x3 \0 c5 Q
for {9 |) h" J: D& \: T! Q0 X
select {
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {. G( {! ^3 C$ T3 j0 v9 R
return p2p.DiscTooManyPeers4 }# }4 l4 {' |0 \ k
}3 n O' i0 y$ X9 Z# @$ `3 s
p.Log().Debug("Ethereum peer connected", "name", p.Name()). g8 u1 N" B( ~. m6 T
// Execute the Ethereum handshake
var (/ }: ]2 @" I- S
genesis = pm.blockchain.Genesis()/ L j! R% ^0 @0 Z6 v
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()2 x. j. ^. d% |5 r7 K( W$ |
td = pm.blockchain.GetTd(hash, number)
)( d4 c8 x9 }0 f0 }+ Y9 ?/ N9 F( R6 \
//握手,与对方peer沟通己方的区块链状态
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {- B( q5 J/ v6 o7 g. m7 ]6 A
p.Log().Debug("Ethereum handshake failed", "err", err)
return err2 T' d8 F3 `/ c
}
//初始化一个读写通道,用以跟对方peer相互数据传输。& Z# Y: b/ N, i, W
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)9 ]. ?; W+ B" k: H
}
// Register the peer locally9 z: R) f6 Z h
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)/ L" m* n% S0 _, q) f# I3 q
return err2 e4 E `! y2 A3 x1 k1 H7 q: P% p
}
defer pm.removePeer(p.id)
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。9 l3 T/ Z* R, R
// Register the peer in the downloader. If the downloader considers it banned, we disconnect4 M/ v) U h# @- H* L9 R; ]
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {. _. t4 p" ~+ e& e2 I7 w# p c, N
return err
}
// Propagate existing transactions. new transactions appearing! o: y7 L6 w$ h% b5 d
// after this will be sent via broadcasts.
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop(), E; @$ _9 X4 {4 `' n5 I) v: H& }
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。 A: v; m2 U% i f
*/' ^! x/ h# X5 I
pm.syncTransactions(p)( p- @6 s3 T- L9 ]
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork7 \+ Y, \6 q( Z8 m
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation$ D4 c. K# X& v8 a, U6 [8 c& `
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {/ L1 {0 l# r2 G" K$ L
return err5 B% j1 T6 M' I% k
}9 v- o |/ W- I# Y% j, \
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {3 s; k, Z7 {( {7 Q( L3 F( z8 Z
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})5 ?: C" l" Q6 M7 `
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {9 o, S6 j9 J' z7 B1 V7 R; m
p.forkDrop.Stop()
p.forkDrop = nil
}
}()
}
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,6 b9 w3 W( O7 _' z2 C
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。
// main loop. handle incoming messages.! |; n0 k/ }- x3 F/ A4 Y6 Y
for { o+ t7 O! D4 o+ w
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}5 @, w8 X& z" Z# f
}
成为第一个吐槽的人