深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
259
0
0
无结构化的:. ]5 m9 a) h+ G; g/ N `
这种p2p网络即最普通的,不对结构作特别设计的实现方案。
优点是结构简单易于组建,网络局部区域内个体可任意分布,
反正此时网络结构对此也没有限制;特别是在应对大量新个体加! J, d1 }/ }# Z9 |, n G. V- x
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。" Q/ w3 d9 A. J1 {: t+ Q7 f4 o E
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
所以往往需要将查询请求发遍整个网络(至少大多数个体),
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。- B# x2 S& X& x! l& k; {( j
结构化的:: L7 J0 H$ e, E5 L
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,* p# ~5 [; I5 D6 k% r( a
降低查询数据带来的资源消耗。6 u4 l: d) f* [6 o% T- t
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)( E2 @% @" e J$ s, e3 _
网络模型可以满足需求;
二、分布式hash表(DHT)
保存数据$ ] b5 |, C+ t
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;- K, M, t6 V. w1 l
然后再计算它所知道的其它节点与这个 key 的距离。6 k% @ Q9 M) R4 C$ Y
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。! F' g; E S; H7 b# U h' h" y" |. [+ Z
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
获取数据
(以下只是大致原理,具体的协议实现可能会有差异)2 ]3 |3 Q" v ?) M4 V
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;' w; W/ X, P# r' q
然后再计算它所知道的其它节点与这个 key 的距离。) F$ h# B2 o; G. }' X2 r0 K/ h5 o
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。& N! E( u( Q* {' Y
有的话就返回 value,没有的话就报错。
否则的话,把这个数据转发给距离最小的节点。. G3 h" |: @$ \+ R) g
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。. J4 a+ N6 b) ], }4 B6 T5 @6 L1 S
三、以太坊中p2p通信的管理模块ProtocolManager2 B. B& R, M% g% P
/geth.go
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
return n.node.Start()
}
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。( @8 a% ~4 [1 y/ |) ?
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;7 ^1 V+ \$ f8 V- P
然后将Node中载入的所有实现体中的Protocol都收集起来,
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
并将Server对象作为参数去逐一启动每个实现体。
*/5 J3 v9 M8 ^* k0 p1 g
/node.go
// Start create a live P2P node and starts running it.
func (n *Node) Start() error {* E9 Z! I) c# r( p# Z
...( r1 Q4 @$ E1 S4 h
/*; g Q! r2 l) l1 C8 C5 u v
...
初始化serverConfig
*/
running := &p2p.Server{Config: n.serverConfig}5 z; J4 L5 v" ?. j; ]
...
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)) a1 ^, a# `7 b f3 J7 d
}5 F: |4 a4 V. X2 K
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
return convertFileLockError(err)$ u" F7 N6 x8 w& ~* ?; ?1 c3 Q
}) g# Y7 t" L+ y1 g0 V$ |
// Start each of the services4 D' e b! G/ s w$ r/ n0 O u
started := []reflect.Type{}0 ]8 C; {- `$ s0 w+ A
for kind, service := range services {
// Start the next service, stopping all previous upon failure" F# j3 t4 L( r
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {- A" l. b# _0 O
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()2 E# q- X8 A$ u0 ?
}
running.Stop()" j% ~, |/ [% l7 }& u$ z
return err8 t0 C- O; s# Q9 _1 f' p
}
.... N! u# }9 H2 Y, F3 H- u, N9 i
} S) ^% f9 Q8 ^0 r1 c ~+ g+ i5 y2 N
}1 C; f* r& q( V7 x6 h
// Start starts running the server.
// Servers can not be re-used after stopping.7 r+ g5 u) s8 v7 c \: h) N1 \
func (srv *Server) Start() (err error) {- E) ~$ d6 Q1 D- B. }5 @
srv.lock.Lock()
//srv.lock为了避免多线程重复启动0 F0 g) x* p, q
defer srv.lock.Unlock()0 z# e! O( a# S7 U/ n: n2 Q( t* |8 j
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger, i. g& z! Y4 J: ]; k/ h
if srv.log == nil {
srv.log = log.New()9 Y' c& s5 k, r1 ]0 W# r
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
// static fields! c/ C# R# m5 ]/ h y4 G# H6 y# n; @
if srv.PrivateKey == nil { B/ S1 H% R" _
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。
if srv.newTransport == nil {. n: d( c- [1 p8 B# i4 k+ m
srv.newTransport = newRLPX
}& P8 |( L& U) x
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}6 I0 q3 b, t3 N/ j6 W
}
srv.quit = make(chan struct{})6 r7 t+ e; ~; U" A
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)1 p( D U" o' ?0 U/ v* }9 l
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)0 Q8 k' ?8 y6 Z0 Z% ?" r& b
srv.peerOp = make(chan peerOpFunc): N, F8 c: L/ }* G& K1 W
srv.peerOpDone = make(chan struct{})
//srv.setupLocalNode()这里主要执行握手
if err := srv.setupLocalNode(); err != nil {
return err( Y5 G U6 H& P0 v% S
}8 W7 g \: W: v5 r8 f! d
if srv.ListenAddr != "" {
//监听TCP端口-->用于业务数据传输,基于RLPx协议), L5 u$ _2 h3 L/ P3 N" c7 S
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接6 [/ T5 {( Q! k% a" ]3 U& v* T6 X
if err := srv.setupListening(); err != nil { `0 ?5 \) m1 w
return err
}# R* A, d- s" U6 `
}
//侦听UDP端口(用于结点发现内部会启动goroutine)
if err := srv.setupDiscovery(); err != nil {: b, j; M0 ~8 r# Y- n: `4 ~) K
return err
}
dynPeers := srv.maxDialedConns()6 `; |! P8 G+ i7 R( ~
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)5 K; Q2 N! L5 J0 C5 J2 _5 d9 H
srv.loopWG.Add(1)
// 启动新线程发起TCP连接请求2 j G4 [' l) @3 \2 B g: ?& ?5 u! V
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
/*
case c := 0 {5 [% F& m. ~( |* O; a- T1 c& e! h
if s.config.LightPeers >= srvr.MaxPeers {& t3 O+ s/ Q8 ^! Y; T
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)$ \, i% D8 V. Q- D& z
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested$ k$ m: n+ S! W6 Y1 B- u1 O* y
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {3 s. W" e0 J' z+ s
s.lesServer.Start(srvr)4 ?# J/ P5 l0 O1 h) G' a) M1 c
}
return nil
}7 I( P. A" [- }9 O0 d
/eth/handler.go
type ProtocolManager struct {0 x/ c3 T: U! u" v( Z6 s {3 U5 s
networkID uint64
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)9 T; v1 s7 c/ S6 N' y* ^
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool. ^7 r" l6 l# r
blockchain *core.BlockChain
chainconfig *params.ChainConfig' u6 F, U3 N ~" ~6 t
maxPeers int
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。
downloader *downloader.Downloader
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
fetcher *fetcher.Fetcher
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。9 b' W" k. Q+ o5 T" g' N
peers *peerSet
SubProtocols []p2p.Protocol' E% r [% y$ {
eventMux *event.TypeMux ^. c8 ^- S0 {
txsCh chan core.NewTxsEvent
txsSub event.Subscription5 n4 n6 f- j- y! T/ H
minedBlockSub *event.TypeMuxSubscription
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。: P- `/ ^# I2 s( n+ O8 c
//当然在应用中,订阅也往往利用通道来实现事件通知。6 Z8 a g. j% t* n+ M G
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync: h( v( D/ s3 p$ z5 M: Y4 R/ d
quitSync chan struct{}0 B1 S- I# }7 ~2 C" b
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading$ \4 g3 l" Z! X. _+ N
// and processing' t2 p# b8 B3 w! v
wg sync.WaitGroup
}
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers7 s9 k* }* r- P( M% E' D
// broadcast transactions6 b) N S' f. S% F" q% z
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。, T( ?* J& B/ j) o7 h7 `+ k
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)* k' J/ j* S) g ]+ e
//订阅交易信息
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
7 _, t/ F9 O3 W; A; l
go pm.txBroadcastLoop()
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息
// broadcast mined blocks1 `( n/ z$ j# q4 G O" Y% X$ e
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})4 b1 \# S. O3 w0 o W6 C0 ]4 C- ^* W
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
go pm.minedBroadcastLoop()
// start sync handlers% o0 Y- O4 a. _! [2 @" |
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。1 j& b; q" z/ y
go pm.syncer()
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。( L H: [' A. t" k! x- I6 ?! z
go pm.txsyncLoop()8 G6 l" ?" E# h: h" p) H
}# b8 | B. L7 X9 V, i( |3 f: w
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。2 b1 S, a* q7 z( M' M% B
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {# n& F3 i" h& s8 Q4 P( n3 t8 ]
for {$ Y* a9 K# x2 G5 o4 H2 |
select {% X6 K4 G3 b5 M _' E
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers- P h% w" L6 h) N
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())7 L3 @$ u8 {$ J# W& b9 M- t
// Execute the Ethereum handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
//握手,与对方peer沟通己方的区块链状态 M; a0 o; C' h0 [' G6 j
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
//初始化一个读写通道,用以跟对方peer相互数据传输。! s! ]/ G& O5 o, C8 @- J) m D! f
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally( o# O% P+ T9 X" G" w0 U" q: k
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。, s t: Z# c" K$ {
if err := pm.peers.Register(p); err != nil {" a5 R5 a6 [* V, s, |
p.Log().Error("Ethereum peer registration failed", "err", err)" q! m! ^2 V/ h. u9 d$ G8 r3 _$ [
return err
}9 f: g* {7 }3 T# N4 U% y
defer pm.removePeer(p.id)7 s) u3 L* O% R6 ]' O) D
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。7 m% F0 _, O3 W# E6 l* W9 R
// Register the peer in the downloader. If the downloader considers it banned, we disconnect1 k, {" J3 P! K9 S! e
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {6 |& n$ s3 l! P% x9 {. G' m
return err
}. L8 |6 V {3 [2 u' f
// Propagate existing transactions. new transactions appearing- |9 m, m6 r( v3 C B8 C2 B
// after this will be sent via broadcasts.- h" _. j4 s8 h4 Z0 Z
/*4 ]4 \9 y" Q, X" m( n# ?8 E
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,, `2 I8 C5 E( e; K" X2 N& @$ [' f* R' z V
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
*/
pm.syncTransactions(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork* k! |9 Y) ?( k: y& q
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {8 J- C6 { Y/ Z
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {/ I+ f5 U6 Q& u- U' Y7 @3 _. h
p.Log().Debug("Timed out DAO fork-check, dropping")! [6 j6 Z5 R" K0 x/ b6 A, e
pm.removePeer(p.id)
}), U/ k0 f' Y5 F& h8 b& ]# X$ F I. X
// Make sure it's cleaned up if the peer dies off( t y x" Y8 V' q) [% \! w, g, G
defer func() {
if p.forkDrop != nil {) p1 Z, d5 R/ F3 N0 d$ I0 O0 Q
p.forkDrop.Stop()
p.forkDrop = nil' g+ B# m& q0 e2 K% o9 P: ]3 S
}& q8 x7 O( h7 t+ n5 ?
}()7 }9 A) k2 T" V+ F7 w! G
}$ Y" ^; q( J$ H; a2 o) I3 h
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。8 s/ t4 m' E0 t( |4 A: a3 U% @/ K: L
// main loop. handle incoming messages.- g3 Q+ K, Y& o0 k4 a9 o
for {
if err := pm.handleMsg(p); err != nil {# |" f5 a/ q# @" P/ r8 @# _
p.Log().Debug("Ethereum message handling failed", "err", err)) ], D. b8 D9 r! I( b; F6 t! E/ h" A
return err
}
}% N2 T6 b4 I8 z- n* C; {8 p
}
成为第一个吐槽的人