深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
136
0
0
无结构化的:2 X& O% v: c6 _1 X/ y
这种p2p网络即最普通的,不对结构作特别设计的实现方案。5 _% I6 ]3 p, F# M
优点是结构简单易于组建,网络局部区域内个体可任意分布,9 ^% B8 M: c) ~7 X
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。: ^" K/ W$ _, y' R
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
所以往往需要将查询请求发遍整个网络(至少大多数个体),
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。- e: B# T8 v2 W8 M) i
结构化的:) y3 e, Z% O b
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
降低查询数据带来的资源消耗。
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
网络模型可以满足需求;
二、分布式hash表(DHT)) k k4 E5 n C+ ]; _( P+ R
保存数据
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;6 n4 O. k& d z. ]1 M3 q
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。8 F+ D5 s# r6 E( E0 W4 M
否则的话,把这个数据转发给距离最小的节点。6 y& C; b" {8 \- O/ B( Z: C* t
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。+ s3 [8 k- q/ s* ~* ]* T4 |$ S
获取数据! Z& l1 n2 U2 }
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
有的话就返回 value,没有的话就报错。
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。. Z" N/ p' s; o8 Z1 J; ^1 j
三、以太坊中p2p通信的管理模块ProtocolManager- Y1 m) z' H$ E3 _! K; u6 ^: S
/geth.go* r/ u: S0 ~( |
// Start creates a live P2P node and starts running it.( l3 ?/ ^& G( t9 I- l
func (n *Node) Start() error {
return n.node.Start()
}
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
然后将Node中载入的所有实现体中的Protocol都收集起来,- [$ Z' A# R' }/ u
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
并将Server对象作为参数去逐一启动每个实现体。
*/+ p% {( o7 A7 W; U ?
/node.go4 Q G! @7 l6 B
// Start create a live P2P node and starts running it.6 Q. V. o0 @1 T6 P( V( E: [$ W
func (n *Node) Start() error {* C' p' N; C* {
& V1 y2 P, b" F5 A$ B5 w
...
/*1 {' Z) G6 z! |# |% _$ K/ h
.../ n7 b( f& h* j
初始化serverConfig
*/9 J8 Z9 L ]' R8 \$ ]
running := &p2p.Server{Config: n.serverConfig}
...
// Gather the protocols and start the freshly assembled P2P server8 {# |- _$ k) m
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)7 H2 u6 p V. \
}/ G3 k/ h# F4 S8 e
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
return convertFileLockError(err)
}! y9 W" P$ ^9 ?2 _' a. E8 f
// Start each of the services) t1 t, X1 ?8 P9 {8 |
started := []reflect.Type{}( ?1 V2 ~, h$ @+ ~6 J9 r0 E
for kind, service := range services {, H. W* H7 S5 _6 T
// Start the next service, stopping all previous upon failure
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {. d8 ~5 z+ b: U' w b, `, n+ X
if err := service.Start(running); err != nil {
for _, kind := range started {* Y& v# r$ c2 D* q2 [" y
services[kind].Stop(), @/ y+ {& S$ a* h
}
running.Stop()
return err* p/ `# o# v1 c7 F) {+ ^. a# ^ g
}
... a, y! H) c* z- i$ E$ @* Q; [
}% N r+ n# N+ x& ^- ~- s% \
}) W) O& ^! m5 P* c( H! E2 \" J( I
// Start starts running the server.
// Servers can not be re-used after stopping.+ j1 s8 q5 P* Q! D4 k: e
func (srv *Server) Start() (err error) {2 r* J$ [" ?/ P0 @, j; _
srv.lock.Lock()6 o* E# \( W( z; y; p
//srv.lock为了避免多线程重复启动
defer srv.lock.Unlock()
if srv.running {" E$ r# E( H' C+ x9 `
return errors.New("server already running")
}% o" h7 \1 k( `8 H
srv.running = true
srv.log = srv.Config.Logger) \2 O( _: u+ Y8 m) T
if srv.log == nil {
srv.log = log.New()# q2 R2 R% Y5 r/ x
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}5 n: ~4 T# V0 L7 C/ b5 d/ i
// static fields4 Y' r# e" Y4 y! j; H% Q1 n' i. C
if srv.PrivateKey == nil {% x3 C, V3 j: D2 t. H
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。4 q5 [; `+ _1 f
if srv.newTransport == nil {$ e6 R) `% T4 U, B. O, u; Z# ^
srv.newTransport = newRLPX+ f4 \7 Z' D, W: r: Y
}/ W R* `. t0 l4 y2 @: m0 ?
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}/ V& a- T/ P! }# _4 I- ?/ E3 G4 x
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)% D2 q8 \6 r* b2 {" e6 ?# @
srv.delpeer = make(chan peerDrop): y# S- ], ]* ~ s9 O
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node), K3 E" h- G/ D& S
srv.removestatic = make(chan *enode.Node)' b/ \2 ]0 j( H
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)7 y6 d; N. K4 g: i0 K- X
srv.peerOp = make(chan peerOpFunc)6 ?! l" e, W$ |0 }* k4 o- L
srv.peerOpDone = make(chan struct{})/ w& k O4 J, ^$ X' e8 K
//srv.setupLocalNode()这里主要执行握手
if err := srv.setupLocalNode(); err != nil {& c% y4 P9 W {5 r) z
return err
}. A6 t1 `8 q; c6 P
if srv.ListenAddr != "" {
//监听TCP端口-->用于业务数据传输,基于RLPx协议)% w* p# u; s0 \) R1 B/ d
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
if err := srv.setupListening(); err != nil {
return err
}
}
//侦听UDP端口(用于结点发现内部会启动goroutine)
if err := srv.setupDiscovery(); err != nil {
return err& d) q. M; `5 e
}
dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
srv.loopWG.Add(1)8 [' I- L" _$ f$ t
// 启动新线程发起TCP连接请求
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
/*, x2 k( B/ D5 i% Q' W
case c := 0 {9 D! E& h5 s4 q2 m5 U
if s.config.LightPeers >= srvr.MaxPeers { ^: ~3 T. I: [4 t( `) Z
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)+ T4 k) _/ c* c" f& w
}
maxPeers -= s.config.LightPeers
}7 k; ]' n0 p& k! V
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)3 ?! \0 p2 V( N2 `& H
if s.lesServer != nil {; I2 o6 Z& p% k8 y w' l8 e- l" I( r: \* @
s.lesServer.Start(srvr)! H; m" w- T" J' d9 ~- s y- y
}3 C% P- O E& |5 U& a1 W5 K
return nil
}# Y0 N1 m! Z' K2 {
/eth/handler.go
type ProtocolManager struct {
networkID uint64
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)$ K9 u4 J T. E% A8 Q2 y v- K
txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig3 Y4 F5 [( H3 }
maxPeers int6 u# l' E" q+ ]; I- I* z
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。8 Y) A- W4 J1 W: h
downloader *downloader.Downloader. ?# g# D8 j% V `# o; r+ c
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排2 [$ c& S h( k6 P) X
fetcher *fetcher.Fetcher( l c Z3 `. i+ q& I; i; j( T' `6 {
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux$ z2 T$ w3 D% W3 @) [
txsCh chan core.NewTxsEvent+ C2 _! h2 Y$ i
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
$ f! y5 g* ]" Y0 t) ~
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。! |& X- T! a9 C9 S/ N" G
//当然在应用中,订阅也往往利用通道来实现事件通知。
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer* V( B4 _( U" _
txsyncCh chan *txsync# m2 _9 S) d1 h
quitSync chan struct{}! Q/ L# @" a3 n
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup$ I: J8 x. a" |; m) b' k
}9 a; w8 B% `7 k- Q0 L1 W* q
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数, [7 h7 v; q/ ^- z- B8 h, l- ?
这也标志着该以太坊个体p2p通信的全面启动。1 j: M7 c, n3 t& L2 t
func (pm *ProtocolManager) Start(maxPeers int) {0 K! V1 A7 }# b+ a
pm.maxPeers = maxPeers
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。5 A& K i/ z% E/ r [ R$ a
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)# q9 U# Y# J* ^% v% k& o3 K% T
//订阅交易信息% Y+ x) _" H: O2 y
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop(), Z# _3 o9 M% w- Y/ y1 D$ T( h! j
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。3 ^0 }" V+ ~9 T7 t6 A% D) B
go pm.minedBroadcastLoop()3 k$ Z* W1 U/ @( d" B ?+ d
// start sync handlers
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。+ n* a6 G- W C5 V. C
go pm.syncer()
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时," L' O/ T9 u8 V, a+ I
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。3 ^" t: m1 @0 G
go pm.txsyncLoop()
}5 C+ D% a N& ]9 \0 F+ k8 _
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,: w0 _* k) h! F* a
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。* S' | l6 A7 C. C
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {
for {' X$ @! j' ^) `6 C* ?5 X
select {* \. e3 v- f$ A ?
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers# y {0 Y4 z: s- U, J8 H& a1 c
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())3 N2 J* Q f0 A2 Q% r3 I/ y
// Execute the Ethereum handshake
var (, G/ i9 q) W( g( j, n6 V
genesis = pm.blockchain.Genesis()/ m/ ^- S* X1 w' b( [: p
head = pm.blockchain.CurrentHeader(). Y3 ^% f4 M7 C$ M6 C" n
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)" n( p1 x3 S. f) e& r5 M
)
//握手,与对方peer沟通己方的区块链状态$ J. a! Y- t7 Y; u& z% @
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {* R+ S' r) x7 B! T$ ~
p.Log().Debug("Ethereum handshake failed", "err", err)+ l0 n3 {2 W3 v. T* P
return err! l# ~) o$ x/ F( Q/ n& M
}8 U; c) R7 ~) r* h5 m% X4 @% o
//初始化一个读写通道,用以跟对方peer相互数据传输。
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {! u: Y- n& c6 c5 _/ B
rw.Init(p.version) ^6 R6 _2 }5 C: N; x5 N. b# i: E
}
// Register the peer locally
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。7 K# B) W8 P F2 \- l
if err := pm.peers.Register(p); err != nil {1 \% m9 ]2 b) W! k! K# R3 J
p.Log().Error("Ethereum peer registration failed", "err", err)
return err/ J% Z7 T+ I. O: D# r5 `/ p# g
}7 d0 R- i8 N1 B- @: m; b' f1 E: l
defer pm.removePeer(p.id)4 U: F+ e3 U$ J6 Y1 ]. G0 _- N
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}( Q, l, I+ W- U3 S) o* J0 g1 v
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。% y' O8 E5 R: s& b+ L9 [8 r" p0 } X
*/" H% R0 q( \, Q, F" q# {
pm.syncTransactions(p)2 n t% M4 P: ~6 T) n% p V
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork# k# X n2 j& r4 J6 m! a! h
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err( q4 o/ `6 _6 V. }1 A
}: u. H6 {) b5 n# |. x* o- Y
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)! }3 \ N. [" r' R! P/ j8 I
})! i2 Z; A+ t: `2 n1 i6 `
// Make sure it's cleaned up if the peer dies off
defer func() {; W2 E& A U- @5 f. n
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil8 m3 m; k' ?8 J8 z: y' Q! ]
}1 f; a2 k: X7 ^+ M; L$ D5 L
}()
}
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil { V8 \( P$ R9 }) \" N
p.Log().Debug("Ethereum message handling failed", "err", err)+ m* h- F5 X! F; _
return err+ ~6 j7 m# F6 a7 t
}, y+ R" P4 h. d) x/ l( b' f0 }1 @
}
}
成为第一个吐槽的人