深入区块链以太坊源码之p2p通信
Mohammad61417
post on 2022-12-7 15:31:22
55
0
0
无结构化的:
这种p2p网络即最普通的,不对结构作特别设计的实现方案。
优点是结构简单易于组建,网络局部区域内个体可任意分布,
反正此时网络结构对此也没有限制;特别是在应对大量新个体加
入网络和旧个体离开网络(“churn”)时它的表现非常稳定。
缺点在于在该网络中查找数据的效率太低,因为没有预知信息,
所以往往需要将查询请求发遍整个网络(至少大多数个体),
这会占用很大一部分网络资源,并大大拖慢网络中其他业务运行。
结构化的:
这种p2p网络中的个体分布经过精心设计,主要目的是为了提高查询数据的效率,
降低查询数据带来的资源消耗。
以太坊采用了不需要结构化的结构,经过改进的非结构化(比如设计好相邻个体列表peerSet结构)
网络模型可以满足需求;
二、分布式hash表(DHT)
保存数据
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点得到了新加入的数据(K/V),它会先计算自己与新数据的 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么这个数据就保持在自己这里。
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
获取数据
(以下只是大致原理,具体的协议实现可能会有差异)
当某个节点接收到查询数据的请求(key),它会先计算自己与 key 之间的“距离”;
然后再计算它所知道的其它节点与这个 key 的距离。
如果计算下来,自己与 key 的距离最小,那么就在自己这里找有没有 key 对应的 value。
有的话就返回 value,没有的话就报错。
否则的话,把这个数据转发给距离最小的节点。
收到数据的另一个节点,也采用上述过程进行处理(递归处理)。
三、以太坊中p2p通信的管理模块ProtocolManager
/geth.go
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
return n.node.Start()
}
/*
Protocol:容纳应用程序所要求的回调函数等.并通过p2p.Server{}在新连接建立后,将其传递给通信对象peer。
Node.Start()中首先会创建p2p.Server{},此时Server中的Protocol[]还是空的;
然后将Node中载入的所有实现体中的Protocol都收集起来,
一并交给Server对象,作为Server.Protocols列表;然后启动Server对象,
并将Server对象作为参数去逐一启动每个实现体。
*/
/node.go
// Start create a live P2P node and starts running it.
func (n *Node) Start() error {
...
/*
...
初始化serverConfig
*/
running := &p2p.Server{Config: n.serverConfig}
...
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil { //见下面的(srv *Server)Start方法
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
for kind, service := range services {
// Start the next service, stopping all previous upon failure
//启动每个services通过下面的方法func (s *Ethereum) Start(srvr *p2p.Server) error {
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
...
}
}
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
srv.lock.Lock()
//srv.lock为了避免多线程重复启动
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New()
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
// static fields
if srv.PrivateKey == nil {
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
//newTransport使用了newRLPX使用了rlpx.go中的网络协议。
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
//srv.setupLocalNode()这里主要执行握手
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {
//监听TCP端口-->用于业务数据传输,基于RLPx协议)
//在setupListening中有个go srv.listenLoop()去监听某个端口有无主动发来的IP连接
if err := srv.setupListening(); err != nil {
return err
}
}
//侦听UDP端口(用于结点发现内部会启动goroutine)
if err := srv.setupDiscovery(); err != nil {
return err
}
dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
srv.loopWG.Add(1)
// 启动新线程发起TCP连接请求
//在run()函数中,监听srv.addpeer通道有没有信息如果有远端peer发来连接请求,
//则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
/*
case c := 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
/eth/handler.go
type ProtocolManager struct {
networkID uint64
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
//Downloader类型成员负责所有向相邻个体主动发起的同步流程。
downloader *downloader.Downloader
//Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后做出安排
fetcher *fetcher.Fetcher
//用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
//通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。
//当然在应用中,订阅也往往利用通道来实现事件通知。
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
quitSync chan struct{}
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
}
Start()函数是ProtocolManager的启动函数,它会在eth.Ethereum.Start()中被主动调用。
ProtocolManager.Start()会启用4个单独线程(goroutine,协程)去分别执行4个函数,
这也标志着该以太坊个体p2p通信的全面启动。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
//广播交易的通道。 txsCh会作为txpool的TxPreEvent订阅通道。
//txpool有了这种消息会通知给这个txsCh。 广播交易的goroutine会把这个消息广播出去。
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
//订阅交易信息
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
//订阅挖矿消息。当新的Block被挖出来的时候会产生消息
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
//挖矿广播 goroutine 当挖出来的时候需要尽快的广播到网络上面去。
go pm.minedBroadcastLoop()
// start sync handlers
// 同步器负责周期性地与网络同步,下载散列和块以及处理通知处理程序。
go pm.syncer()
// txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,
// 转发所有当前待处理的事务。为了最小化出口带宽使用,我们一次只发送一个小包。
go pm.txsyncLoop()
}
//txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,
//会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
//------------------go pm.txBroadcastLoop()-----------------------
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := = pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())
// Execute the Ethereum handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
//握手,与对方peer沟通己方的区块链状态
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
//初始化一个读写通道,用以跟对方peer相互数据传输。
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
//注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
//Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
/*
调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,
推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()
中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。
*/
pm.syncTransactions(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
}
}()
}
//在无限循环中启动handleMsg(),当对方peer发出任何msg时,
//handleMsg()可以捕捉相应类型的消息并在己方进行处理。
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}
}
BitMere.com is Information release platform,just provides information storage space services.
The opinions expressed are solely those of the author,Does not constitute advice, please treat with caution.
The opinions expressed are solely those of the author,Does not constitute advice, please treat with caution.
Write the first review