深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
105
0
0
无结构化的:$ v( @1 C7 z) T! B( I
这种p2p网络即最普通的,不对结构作特别设计的实现方案。) w* g& ^3 a/ ]$ w1 K6 g6 Y2 \
优点是结构简单易于组建,网络局部区域内个体可任意分布,
反正此时网络结构对此也没有限制;特别是在应对大量新个体加8 a2 D. p* V' V) m0 t, v" w
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
所以往往需要将查询请求发遍整个网络(至少大多数个体),
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
结构化的:' o/ d, i H7 C
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
降低查询数据带来的资源消耗。# S' z) k. a& O s( P: t
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)0 o/ t7 E! k; I8 V1 d4 \" _ ?* P" j
网络模型可以满足需求;
二、分布式hash表(DHT): M- E# l( Y% I1 L; T
保存数据
(以下只是大致原理,具体的协议实现可能会有差异). m# @$ M% x& `, L
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;0 V# C4 i& z* l$ A* @: W F) Q
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。9 D, x' ^# e7 S' U
否则的话,把这个数据转发给距离最小的节点。2 ?: v- V9 u+ @5 q
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。( Z8 b" B Q) X4 M% v2 o
获取数据) y% I: q! I8 i# L* d* K
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;8 C) \% r/ V1 U
然后再计算它所知道的其它节点与这个 key 的距离。! B! a, `3 \# s6 t
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。' s. z% K4 e4 J( u
有的话就返回 value,没有的话就报错。
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。7 t+ X5 t, z. b! `% i) h
三、以太坊中p2p通信的管理模块ProtocolManager
/geth.go
// Start creates a live P2P node and starts running it.0 Y6 z2 z" H8 Y5 O' t6 ?; c! `1 h& d
func (n *Node) Start() error {' p7 m- t# o; J D
return n.node.Start()
}: C3 w9 e- X. Z) P8 i. o
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。8 {! g: j3 ~' A6 W! e; ?9 g3 f
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;7 h7 k+ Q8 n5 P. m" B2 x! h: x. n
然后将Node中载入的所有实现体中的Protocol都收集起来,
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象," V4 O* e1 ~, p8 C$ z0 }
并将Server对象作为参数去逐一启动每个实现体。
*/
/node.go/ d1 }2 l: v4 r6 b n; m* z$ h2 W
// Start create a live P2P node and starts running it.% P* H7 q; @; p- M
func (n *Node) Start() error {! t7 B L$ [! e6 k6 q3 I- q
t2 t0 k+ f ^ ?" b- @6 ]# J
...
/*1 m6 } s5 E M& J' G" V) a
...
初始化serverConfig7 c2 X5 E# b* M; k9 u8 d9 W+ y
*/
running := &p2p.Server{Config: n.serverConfig}8 B8 @1 q, _- V# A1 |2 c+ ?* ~
...7 |2 H0 k+ [- ~5 u8 r4 Q+ o
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services { i" }" q) h3 t- g) t( q
running.Protocols = append(running.Protocols, service.Protocols()...), k6 F7 |5 V E/ f
}
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法+ l$ t6 |0 A1 }! ^" g6 D
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}- L, e$ g2 b) Y! ?% i
for kind, service := range services {
// Start the next service, stopping all previous upon failure
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
if err := service.Start(running); err != nil {/ Q% `# ?' k# S
for _, kind := range started {
services[kind].Stop()
}
running.Stop()! Q: B/ r3 H6 H! l, \9 c& X1 Q
return err' x1 V5 h8 Q% Q
}
...
}
}; Z b- y% W* g
// Start starts running the server.0 M1 ?8 P, W7 G$ x# b1 L# E7 ?( |
// Servers can not be re-used after stopping.7 e+ N0 u5 C8 U) D+ E" ^
func (srv *Server) Start() (err error) {
srv.lock.Lock()
//srv.lock为了避免多线程重复启动: X# @: ^0 E2 @4 G
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}! Z& q1 e! ]1 w/ r0 c f- J' w
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {$ O, r( |6 `& }5 `' j5 ?% [9 s
srv.log = log.New()
}
if srv.NoDial && srv.ListenAddr == "" {3 n6 m, Q" n4 Y& S6 U$ k
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}3 A! |1 i- O, e/ e, j
// static fields+ A1 S! g% s1 G! v! I; _
if srv.PrivateKey == nil {+ u1 g8 f4 o6 Y! P3 k
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}* m$ f( {. v& |; v
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}0 r0 J, a, y* s/ Y- ]+ D
}& {) H" @; I2 i, X5 w" d. @+ p
srv.quit = make(chan struct{})6 N3 q5 X4 A) |, i+ L3 B
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)7 h% |+ m1 U% ~* Q2 c& o7 |3 Z
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)1 k2 v4 y! t+ H& L7 V! V9 J& d
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)0 E3 p( D' S% h8 S2 A* d7 q, a& K9 @
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{}): K4 {( D. I0 g% r
//srv.setupLocalNode()这里主要执行握手5 p- Y, k/ s, i0 |: ^) @7 [4 C+ I
if err := srv.setupLocalNode(); err != nil {
return err2 w5 B n% o+ y0 n$ h4 n% v* J+ q# P
}6 a! G% N+ S% [9 e1 [. ^( `
if srv.ListenAddr != "" {
//监听TCP端口-->用于业务数据传输,基于RLPx协议)
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接. K) z7 s4 d5 q7 d( \% D$ M# s- `
if err := srv.setupListening(); err != nil {
return err" ?. p2 r8 e' r! l4 y6 W, u6 Q7 u Q
}
}
//侦听UDP端口(用于结点发现内部会启动goroutine)3 Q% R C9 [- a0 L/ Y
if err := srv.setupDiscovery(); err != nil {: L) \! f# {1 Y3 K
return err
}
dynPeers := srv.maxDialedConns()% n7 f& o$ M' ?0 ~& H+ E. `0 A7 p1 j
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)) G1 U; L \1 q( O2 U( P. H: t
srv.loopWG.Add(1)- f, X4 X& m# G" M
// 启动新线程发起TCP连接请求
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
/*
case c := 0 {! Q3 i# S; p$ P, l
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}# r- D' W& X* i$ h3 R, _; |
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested7 p4 p5 ]& Y& r9 ^! j
s.protocolManager.Start(maxPeers)6 ?3 V9 X2 x7 w- y% n) x
if s.lesServer != nil {$ t: c; y1 S- Y4 R. M' Z' P) `
s.lesServer.Start(srvr)
}- V6 ?/ C3 J% N* a0 w1 C& z' \
return nil
}) z' E3 B( b; M" L3 T
/eth/handler.go
type ProtocolManager struct {
networkID uint648 u0 B9 H& v# D, h
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)* V, O: f# {3 m; G* q
txpool txPool( T, S8 Z- h, V* u6 @$ l0 y
blockchain *core.BlockChain! p# ~3 \. ~: f# J# w
chainconfig *params.ChainConfig
maxPeers int% D5 x7 O. ]/ c* X9 d2 a
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。
downloader *downloader.Downloader# d$ I8 `. O6 ~- R# q: [: y5 p7 T
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
fetcher *fetcher.Fetcher& |* M) W. J0 V9 @2 c
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
peers *peerSet
SubProtocols []p2p.Protocol7 O8 W" t9 u. ], a& b
eventMux *event.TypeMux9 t z! y0 p9 _" L/ P
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
# Q& N( {+ y+ A8 }& t) X$ H
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
//当然在应用中,订阅也往往利用通道来实现事件通知。
// channels for fetcher, syncer, txsyncLoop/ h% \1 T, u: l) z/ c
newPeerCh chan *peer
txsyncCh chan *txsync
quitSync chan struct{}" K, t! t' D! V9 S; d
noMorePeers chan struct{}8 O* B$ o$ O# ~& p" X5 a
// wait group is used for graceful shutdowns during downloading
// and processing# P; X6 {+ t$ W2 L+ w
wg sync.WaitGroup
}
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。/ l; i+ \( L. T8 S0 K' B2 |6 f
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。7 p8 T0 R" G, I8 E+ B5 m/ N
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。- {: y; g) S8 j1 _/ r- u+ F+ V+ G& J
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)2 v2 @# Z0 S i* c/ M
//订阅交易信息
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息7 }0 `/ _" p, g
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。5 M3 G( H; T* A- }
go pm.minedBroadcastLoop()# p/ Z1 U/ U) v4 b
// start sync handlers
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
go pm.syncer()3 G$ w7 u1 ?9 f. a- X" L" _
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
go pm.txsyncLoop()
}
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,! E* S, J. O$ K0 o+ _+ }) C7 z
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {
for {" c) S4 k! x3 F5 Z# j6 [ y. J
select {
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {5 U1 Z; \" Q. h1 [$ C
return p2p.DiscTooManyPeers
}" s, Y1 x0 m L# i# k! S
p.Log().Debug("Ethereum peer connected", "name", p.Name())2 T, `4 k* d" }8 @
// Execute the Ethereum handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()% u6 A: P) B0 `' k
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number); o0 ]$ M0 B' z* c
)$ Y5 }+ z6 o- u0 o4 W8 V
//握手,与对方peer沟通己方的区块链状态. ?9 `3 ^6 e$ i5 i" y3 ^% l9 r
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {( v1 r9 Y& m% h- G; |
p.Log().Debug("Ethereum handshake failed", "err", err)
return err- u+ C, F% p" k; h, I4 J& X
}& _: n9 k6 M* C* B" l" Q
//初始化一个读写通道,用以跟对方peer相互数据传输。& i; Y9 O% ?+ Y8 R
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)& i" H3 a/ ]/ j6 @) s9 m
}' h% T; W ^( |7 U K8 l
// Register the peer locally6 I) l( l: p+ _9 `: J8 o: w
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)* H% j) [0 @8 K
return err2 J; E7 X& G5 s }
}
defer pm.removePeer(p.id)- E5 u- P6 j/ |" `- m8 _
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。. p8 H" b" ^) A3 l
// Register the peer in the downloader. If the downloader considers it banned, we disconnect' q' |4 m, _# T! e
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {2 F! I/ Z) p" b1 s3 A5 K1 m
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,0 c2 X3 s* L. g7 P0 H$ a U
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop(), x C6 v4 T# K/ a) X+ g: `
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。2 q' R& W0 j! i, t/ U6 z8 l5 U; a
*/
pm.syncTransactions(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {+ J$ c" Y3 `* y/ ]
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {! E% C6 u4 }: u" s, H* `; ]
p.Log().Debug("Timed out DAO fork-check, dropping") }; m' {/ S! j) Q4 g
pm.removePeer(p.id)" \4 I6 X2 }- y3 C% g- x
})0 ^* ^9 N- e" h4 J& D& V# ^) E
// Make sure it's cleaned up if the peer dies off0 k7 G" P5 {/ v* n; J0 h$ ], @4 D$ ?
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()+ ^7 h( `- J$ I, ^0 [" l+ P! O
p.forkDrop = nil$ B% [# D) L: m s. I" \ z. v
}
}()
}1 Y. c z1 Y0 ?8 f* I Y
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,1 T( S8 i1 h# N+ @
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。5 R9 X3 @$ ?! v8 z' H" Y4 m+ n
// main loop. handle incoming messages.4 s- N. g6 g: |- a8 W$ O0 C; f
for {
if err := pm.handleMsg(p); err != nil {2 L: f1 g" ?5 d6 ~# y5 ~
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}6 S& L, @5 R+ e3 e/ o1 m* M" A. j
}3 Y) D$ M, w. w3 K
}
成为第一个吐槽的人