深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
240
0
0
无结构化的:; p6 x: L# B1 C: b5 Z% s1 F1 j8 o
这种p2p网络即最普通的,不对结构作特别设计的实现方案。1 I p5 g3 J( \9 H W2 s X
优点是结构简单易于组建,网络局部区域内个体可任意分布,
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,7 y2 I' m3 l, n( H/ E) y
所以往往需要将查询请求发遍整个网络(至少大多数个体),$ n( v1 J: W: |' y' M# z
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。, \3 ~9 a9 |" N" s4 A
结构化的:
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
降低查询数据带来的资源消耗。$ x& V& e) d, s3 e8 ^) b
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构). I: ?. H3 ^2 C4 @3 r
网络模型可以满足需求;
二、分布式hash表(DHT)" P# C% [+ i2 a" g! o0 g
保存数据
(以下只是大致原理,具体的协议实现可能会有差异)# w" L3 l) @* T8 E. K$ q
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。+ j( {; q: g- |0 i5 ?
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
获取数据( s! P- d3 _& R) Y4 J2 E
(以下只是大致原理,具体的协议实现可能会有差异)7 y6 n B5 [; e, \" S) o# o" D
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。9 o0 C3 r# U$ J- m. S% O
有的话就返回 value,没有的话就报错。1 K$ y. a- V/ v) r
否则的话,把这个数据转发给距离最小的节点。7 l6 M4 I3 r) K
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
三、以太坊中p2p通信的管理模块ProtocolManager
/geth.go" V0 E" m8 M1 A" U1 y
// Start creates a live P2P node and starts running it. M/ B4 _5 H' h& K2 H+ K& V
func (n *Node) Start() error {" h! L/ q( n( a; ^5 F8 R* L3 U
return n.node.Start()
}
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。: J. m9 R$ G6 G# a/ w
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;# O2 g$ H5 ? } [% p
然后将Node中载入的所有实现体中的Protocol都收集起来,0 g$ y% S) J0 ^( a/ C5 I4 x
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,: T9 f# F+ \4 J: j7 w
并将Server对象作为参数去逐一启动每个实现体。
*/
/node.go
// Start create a live P2P node and starts running it.
func (n *Node) Start() error {4 ?, _: |, A. j9 H; u
. _% |7 |* D4 i) ^3 i
...- ?2 |" t9 d0 B4 e+ v
/*5 g; I; ]6 h/ T" l
.../ F: c) C' @' B5 R& V( l0 @/ E
初始化serverConfig
*/
running := &p2p.Server{Config: n.serverConfig}
...
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...); a4 R3 Z& V; f9 I, v% T
}5 d( s5 [. j$ }! {/ s' z& r1 I
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法5 [* h5 v7 u6 n' r: B
return convertFileLockError(err)
}6 ~5 P) U7 {% G* |. _! v( k
// Start each of the services; B# _7 F; V1 [$ q c
started := []reflect.Type{}
for kind, service := range services {
// Start the next service, stopping all previous upon failure
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
if err := service.Start(running); err != nil {: e& s% e2 c, i+ @3 P, H+ l
for _, kind := range started {9 Q- i% p. _5 v u, ?; I
services[kind].Stop()
}
running.Stop()
return err
}
...
}
}# c: u% Y3 L$ o" L- P6 r8 z _
// Start starts running the server.! }; L4 u! R/ c8 d- B
// Servers can not be re-used after stopping.2 [0 D7 R* V' v
func (srv *Server) Start() (err error) {
srv.lock.Lock()
//srv.lock为了避免多线程重复启动
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true8 F2 b; M8 o- r) ]' [* \7 }* T/ P
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New()
}
if srv.NoDial && srv.ListenAddr == "" {9 f) {! L0 y7 C; t, d" s% I- j; z
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}, n. z& S" t( [! N
// static fields2 p8 x. B* c' q) U# y" p% \8 y$ m
if srv.PrivateKey == nil {/ C5 r7 l9 r# o8 I
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")& P, l, P6 q% n2 ?1 L3 r
}) P. E. O' B/ ]
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。. s3 p, Z: |" G. k1 H- _$ Q( p
if srv.newTransport == nil {
srv.newTransport = newRLPX6 a0 K+ ?* e' V' j; W8 {
}. L5 d) W. z# b# `
if srv.Dialer == nil {( l" p- H+ s0 Z8 \5 E( a/ } c% O2 D
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}; i5 }5 O) S& U# h2 b8 W
srv.quit = make(chan struct{})5 h4 \$ _* q5 x
srv.addpeer = make(chan *conn)( D. i" q% H7 |# I
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)( D, J# u5 E& n3 y$ V7 P! T0 J
srv.addstatic = make(chan *enode.Node)( R2 D# {: b$ r( f: O* W6 v7 u3 f5 B0 e
srv.removestatic = make(chan *enode.Node)3 K, U# l5 _' j* h5 t' a' p1 ?; T! V
srv.addtrusted = make(chan *enode.Node)2 I! I/ |4 Y" H+ D
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})6 o/ }; w- h0 x* ? H6 C/ O
//srv.setupLocalNode()这里主要执行握手
if err := srv.setupLocalNode(); err != nil {, _( G- n7 B: E/ r; W+ S. L
return err$ {8 |% w4 G ]! W
}7 j1 o$ R) I0 Z% B- i
if srv.ListenAddr != "" {: _6 l6 z5 \% e5 V( {9 v, `4 Z; }
//监听TCP端口-->用于业务数据传输,基于RLPx协议)) h' t/ J+ I& j7 F% u6 f
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
if err := srv.setupListening(); err != nil {
return err1 l8 P- B4 p; l( i
}
}$ A% x6 C8 }( O! F2 A( D. n# u+ g: a
//侦听UDP端口(用于结点发现内部会启动goroutine)
if err := srv.setupDiscovery(); err != nil {
return err
}- Y! c4 s8 H; H3 _
dynPeers := srv.maxDialedConns()" C# z% e9 s4 m" @! Y( U
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) s. c5 k" M+ |3 }! O e0 |
srv.loopWG.Add(1); l, U' O( ^4 B1 b6 C, R
// 启动新线程发起TCP连接请求
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,5 b) G" ?9 T7 a4 V: @8 a2 {9 e& |
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。: M( C: L: O7 ?
/*
case c := 0 {( d8 o/ T+ M: o1 u q
if s.config.LightPeers >= srvr.MaxPeers { L" ~& W# j" d! K- \' m* S0 C4 W
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}- I- ^ T3 K6 Q( m+ @
maxPeers -= s.config.LightPeers; f9 X& C$ W" s# m/ J" P; O. }
}6 \$ {7 Z9 k/ k0 j! s; W% @6 B
// Start the networking layer and the light server if requested3 @- E+ f! n( V" v2 Y: y
s.protocolManager.Start(maxPeers)2 C9 B8 F) T& k- P' O
if s.lesServer != nil {% H# a% L1 W, O% g5 A# u$ ]' ]" J
s.lesServer.Start(srvr)" ~. t% Z+ I/ l4 K a/ {* }
}$ D8 x% D# @0 k, A5 x! q
return nil
}
/eth/handler.go
type ProtocolManager struct {. m. [) j5 M8 \2 {. F
networkID uint64
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)8 M4 G- o: O! O. Y4 P* H. \
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)7 X% u' i& L5 }5 w! \& F7 B0 h
txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int& o$ ]' M7 z2 e! J) J, ^4 e$ X! |3 `
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。% `5 q. q, Q U4 z' Q, B
downloader *downloader.Downloader
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排$ q: v, R1 R% f+ [4 c1 S0 C
fetcher *fetcher.Fetcher1 \- _3 {: q3 b, h
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。3 z2 w, R8 [' [% `7 A( ^
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
) K8 \1 K- S+ ]: {0 K4 P- K- N0 G
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。, @) T8 d4 L* G4 R( i- K
//当然在应用中,订阅也往往利用通道来实现事件通知。
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync0 n# e/ G% n, u' x" ?- w5 O
quitSync chan struct{}3 s# s E/ L1 Y$ B
noMorePeers chan struct{}- X; a- `* `! k) G$ |
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup" d, C/ u. }! s0 ^ a" D
}0 u. V8 r1 y) K7 Q
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。4 ^5 ~% {, G" Z' {
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。2 J, e" I0 k# d( p8 D/ V
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)' f, K4 @$ j9 a% ^' x( ^1 D# T
//订阅交易信息
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) @7 _) P5 a, a4 `* S0 r
go pm.txBroadcastLoop()* A3 j% Q3 s3 a
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
go pm.minedBroadcastLoop()
// start sync handlers A. g2 d: q- k# H1 L7 ^* q
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。; q& _, l& ^8 y8 i2 y! ?9 }
go pm.syncer()$ F3 d& Z$ v6 N3 _
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,5 z R+ K9 z1 K' E7 M. c8 L
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。2 |5 n. q0 z! d. L
go pm.txsyncLoop()
}
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {
for {. J; k" t5 m5 }$ G9 j) x+ c
select {# E2 R; N% q; ^ v. f% b
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers' C5 a& o8 D% O2 y" I8 Q0 R) H
}: e- o" F7 M8 R9 g. W
p.Log().Debug("Ethereum peer connected", "name", p.Name())# a4 Q9 O9 T" W/ `" A. {
// Execute the Ethereum handshake' a! g* M& p7 Y, z3 K" G: L! Q1 y
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader(), f1 |3 N6 j" V4 p4 Y+ y
hash = head.Hash()" z2 B& U( J. Z
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
//握手,与对方peer沟通己方的区块链状态) d! E! H% w3 J* X& {' D& Y
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}) l6 ?' N0 h/ E. o: ?# Q: f# `
//初始化一个读写通道,用以跟对方peer相互数据传输。
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {0 S* z' g3 w; y( a1 }, s
rw.Init(p.version)
}5 w: |! {$ f8 U6 \, O& n. {
// Register the peer locally
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)4 g5 T* ^3 G. |
return err
}" X0 R& }, i2 k C# {
defer pm.removePeer(p.id)
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。% Q' U9 m/ r8 z
// Register the peer in the downloader. If the downloader considers it banned, we disconnect" G& \9 E* F( ^( C& B
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing$ I$ i. c; j D/ M0 J) u& U0 Z
// after this will be sent via broadcasts.
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,' h& N3 B+ t+ V n5 A
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。3 B H& z& Q' [9 V
*/" Y; ^/ \8 e) _4 r# m
pm.syncTransactions(p)5 Y* o. ?+ Y# h' o! ^+ E
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork' N$ V0 z3 w3 M- L
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation/ j( w9 w) `5 z! P8 U$ u
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time+ I; m5 i2 J( v
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")# u X' o ]; a* G: F' b
pm.removePeer(p.id)2 T1 D' N A% ]0 Y. X7 A
})
// Make sure it's cleaned up if the peer dies off% r2 L8 _, x- h/ \% O. k
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
}+ q! [! V3 ], H$ ?9 K
}()
}
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,1 T1 |8 i* d8 P0 w) W# w6 c# d
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。
// main loop. handle incoming messages.3 ?6 J6 s6 n/ c+ N
for {8 ]& h9 p) p# v M' q3 j3 `
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err/ Q- I4 @' \- V& k
}
}
}
成为第一个吐槽的人