深入区块链以太坊源码之p2p通信
Mohammad61417
发表于 2022-12-7 15:31:22
279
0
0
无结构化的:4 D* g; c z* Y F/ h. Z& P
这种p2p网络即最普通的,不对结构作特别设计的实现方案。- Y% W2 m; U8 b0 n: e7 U
优点是结构简单易于组建,网络局部区域内个体可任意分布,
反正此时网络结构对此也没有限制;特别是在应对大量新个体加& B. ~+ n$ A7 g: c6 T. Y1 D
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。8 B; C' {, C+ Y c
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,- O- k) x! Q" M v1 q L
所以往往需要将查询请求发遍整个网络(至少大多数个体),
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
结构化的:
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
降低查询数据带来的资源消耗。% |, x% h8 q& X T) k: r+ I
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
网络模型可以满足需求;5 Q3 c( i: X: Z5 o8 O' t! r& U
二、分布式hash表(DHT)
保存数据
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。+ Q7 O. Z* q. X: O, D& t
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。/ a! N7 o b! |* z3 x9 G6 ~
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。3 I& Y% b- x- b. N5 U. q! r
获取数据
(以下只是大致原理,具体的协议实现可能会有差异)& t: v: P" B5 m( [* @
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。& f7 \5 D& N% C1 i$ ]/ j
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
有的话就返回 value,没有的话就报错。
否则的话,把这个数据转发给距离最小的节点。$ a+ Q) k/ e& u$ B3 C7 Q
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
三、以太坊中p2p通信的管理模块ProtocolManager
/geth.go
// Start creates a live P2P node and starts running it.* B A9 T5 C$ m8 M- d
func (n *Node) Start() error {
return n.node.Start()4 Y+ b: o7 ~5 u
}
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。9 t$ s' W" `: {. v
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
然后将Node中载入的所有实现体中的Protocol都收集起来,
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,/ M3 n8 R) m( Q- @/ \
并将Server对象作为参数去逐一启动每个实现体。
*/( z( T9 r. y/ |: z# V
/node.go
// Start create a live P2P node and starts running it. c7 V3 w" ?0 r' y: `
func (n *Node) Start() error {; k4 {6 F5 C0 O6 E) c9 p: u0 f5 x. C
* g2 i# ~& e9 Q% [9 k
...$ C" P7 t6 [+ D! y/ G
/*1 q; s, Z) i( x7 m2 W0 N$ e# r
...
初始化serverConfig
*/( x7 {/ F; D9 k
running := &p2p.Server{Config: n.serverConfig}6 `: O5 {+ X$ b% P0 l8 Z) |
...
// Gather the protocols and start the freshly assembled P2P server& O- E1 f& n) C1 h5 H9 s+ b
for _, service := range services {/ N; C- C- g% ]2 F6 X7 |+ y
running.Protocols = append(running.Protocols, service.Protocols()...)6 ?3 i* G: ]; b+ h/ \
}; Q0 Y2 |. v/ l4 m/ P6 y6 D
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
return convertFileLockError(err)
}4 G& \% W( x7 r Z
// Start each of the services
started := []reflect.Type{}+ {# k1 Y! Z5 N8 _( v
for kind, service := range services {7 ~0 [* E. P# v! M5 ]& l
// Start the next service, stopping all previous upon failure
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
if err := service.Start(running); err != nil {" z8 w. F s; y. G, L
for _, kind := range started {% j6 p, l7 ^+ k, J5 p
services[kind].Stop()5 S& h5 m9 r. O) U2 ~; Z) N
}! f# {$ H9 e& ]1 V* a7 r7 O) j6 K/ S
running.Stop()
return err7 Y9 p% ` b$ n( \2 K7 d
}
...
}8 ]+ b, Y% z& P0 M& x- K
}
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
srv.lock.Lock()
//srv.lock为了避免多线程重复启动9 O2 {. G5 A% X0 p. q: C" O
defer srv.lock.Unlock()0 A" |8 q$ C/ p% _& E6 _1 N' l
if srv.running {0 _ R, c# i! G; G- g p' k/ G; A
return errors.New("server already running")" j3 ]- o; D- h) Z% @ D
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New(); T& s0 S) p' K) C3 t& L# j
}
if srv.NoDial && srv.ListenAddr == "" {+ q4 i- B0 W. w) t' n* \; b2 M
srv.log.Warn("P2P server will be useless, neither dialing nor listening")& _, A9 X3 Y/ L
}
// static fields
if srv.PrivateKey == nil {
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key"): t) A+ C9 {% n9 C
}. z& ]) F3 g/ |# w0 u
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。
if srv.newTransport == nil {
srv.newTransport = newRLPX; ^$ h5 c6 m/ p, c3 S
}
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}! n: p2 f0 ~" t$ o, ]
}
srv.quit = make(chan struct{})) @! i4 ^1 j$ k$ n- ]
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)6 B' t4 ?5 |) j! x ]- r( F$ N
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)6 }4 A1 W+ W/ M+ V& l7 g" s
srv.removestatic = make(chan *enode.Node)- z7 U+ C/ j* }6 q' Y
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)# } i) c& j J* M% c& _9 r8 r
srv.peerOpDone = make(chan struct{})
//srv.setupLocalNode()这里主要执行握手8 O; }' O* `4 u' ~ Q0 G. @! i: v
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {; n$ u& e1 b* m$ Y9 m9 d0 O
//监听TCP端口-->用于业务数据传输,基于RLPx协议)
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
if err := srv.setupListening(); err != nil {
return err
}
}0 H8 j7 l8 |; |$ M1 |$ ~, N
//侦听UDP端口(用于结点发现内部会启动goroutine)$ e I4 ]* V/ x% C- X
if err := srv.setupDiscovery(); err != nil {
return err
}
dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
srv.loopWG.Add(1)
// 启动新线程发起TCP连接请求+ P) X- v8 q. Q( e
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,1 V1 Z; ]# \5 e# }9 P+ d
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
/*. s; y) R& S5 i$ Q" B
case c := 0 {
if s.config.LightPeers >= srvr.MaxPeers {' S# b$ D8 p9 v! h2 \: x* ^
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)7 c6 W% |; N' X' v( A
}
maxPeers -= s.config.LightPeers. Z! t! A9 l6 o4 V2 C
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)) j! A* G: l6 x4 p! y
}
return nil+ V" ^( ^) `5 {; V( ]' K
}
/eth/handler.go" n+ v0 W a' b9 M
type ProtocolManager struct { b. Q( t1 t) Z# x$ ^, Z
networkID uint64/ l/ r( g6 R9 \* L0 g
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)$ i/ s9 V4 x: ]5 z. r
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)' k$ m4 f9 G, J2 r: b8 m
txpool txPool7 F% [5 V! T4 C; e! k1 Y
blockchain *core.BlockChain
chainconfig *params.ChainConfig5 F2 B' I) ^/ g( c% }
maxPeers int
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。
downloader *downloader.Downloader3 V1 Z) [$ _8 T: P' _
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
fetcher *fetcher.Fetcher
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
peers *peerSet
SubProtocols []p2p.Protocol! {7 |; z K" X6 i
eventMux *event.TypeMux$ z& _7 k8 s2 m l4 W7 P6 M2 c
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
* u" d7 v; h m2 F- p7 k
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
//当然在应用中,订阅也往往利用通道来实现事件通知。
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync- T2 w6 Q) m; e% ?# J+ {
quitSync chan struct{}1 p' `8 }: X Q' Z% t! h2 L
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
}
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。1 ~" K( R3 T4 I+ @
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers2 x# I& K0 m% d7 h
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)! ?# p& q- l) W! e
//订阅交易信息
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)/ _' H/ W, L5 g! |: }: u
go pm.txBroadcastLoop()
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息
// broadcast mined blocks6 p$ G* W" g1 M5 b5 s: z1 {6 s1 O
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})! q* w" l) N) i; _9 I
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。0 P' K# Y; g7 L# c3 P$ T; p
go pm.minedBroadcastLoop()4 J$ f5 W# R6 M+ @6 A
// start sync handlers
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。( L5 X* ^6 n) S9 v& U3 ?
go pm.syncer()) B/ }2 y0 G5 ]: x% U1 ^
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,: d0 X% u7 A; A! x' W3 l
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
go pm.txsyncLoop(); K2 {* Y2 }) B0 @, ]* |
}
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
//------------------go pm.txBroadcastLoop()-----------------------4 I) Q/ q p5 b" F* W; W
func (pm *ProtocolManager) txBroadcastLoop() {
for {+ [- d& D+ S) U& k. S7 `
select {
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {4 S3 r2 w; F: d a
return p2p.DiscTooManyPeers. G0 o6 Q7 u" q1 w: J0 X9 c
}; W. D5 g9 r" `: }
p.Log().Debug("Ethereum peer connected", "name", p.Name())
// Execute the Ethereum handshake& m3 i6 ^$ ^5 j- C$ _0 X3 [& I
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()6 b6 E' m7 y4 U/ O+ M4 I* h9 a
hash = head.Hash()
number = head.Number.Uint64()5 k1 Q! r! {) |# U( Q4 s! K
td = pm.blockchain.GetTd(hash, number)9 h U$ L7 m( m5 l
)2 M1 P$ ^( @ }: I2 P8 l9 o, z6 N
//握手,与对方peer沟通己方的区块链状态
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {, U W% c/ o! C, |. {8 ]
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
//初始化一个读写通道,用以跟对方peer相互数据传输。
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
if err := pm.peers.Register(p); err != nil {# f5 r1 G3 g% `
p.Log().Error("Ethereum peer registration failed", "err", err)
return err1 R( G [8 P/ K( A# Q" P
}$ p) i1 H. X) z% o. J p
defer pm.removePeer(p.id)
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err7 o3 Q1 `! F& r* w* @+ g- Y
}
// Propagate existing transactions. new transactions appearing1 F% g, p6 o+ x& q4 H
// after this will be sent via broadcasts.% U* e$ j. L) Y5 s) ~1 u4 U5 f
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,( f9 R5 }5 F# E/ i X+ I
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
*/9 g* \3 H3 ~5 x" n
pm.syncTransactions(p)1 g/ c$ m" ^/ F2 x6 D
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation* s m! x9 f1 Q% J) A6 F& L
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {, W% \' c3 A: a$ e& {1 L2 G
return err+ ?0 @$ A2 b0 x: Y+ `6 u; s0 N
}
// Start a timer to disconnect if the peer doesn't reply in time9 L' c* }1 S5 R2 e7 j3 x
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")8 V6 ?/ A- ^8 A/ x% e
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {; h. k# N4 x) n% G+ ]
p.forkDrop.Stop()9 v) Z2 m" K; j* r6 {: s: u
p.forkDrop = nil
}
}()
}
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。, ?6 c5 Z& i+ U5 F0 X+ K
// main loop. handle incoming messages.
for {. P, Y' V* u/ {2 ?! w- Z0 ]. ^* ~
if err := pm.handleMsg(p); err != nil {! r$ \3 I# [% z' v
p.Log().Debug("Ethereum message handling failed", "err", err)* a- k, k, C2 {" Q$ z* `! a) g
return err( n( T& b: `" k! K
}' E( p2 c. M! {% b# W
}
}
成为第一个吐槽的人