neo-共识算法dBFT 源码解析
viplovezmy
发表于 2022-11-29 23:02:32
111
0
0
dbft改进自算法pbft算法,pbft算法通过多次网络请求确认,最终获得多数共识。其缺点在于随着随着节点的增加,网络开销呈指数级增长,网络通信膨胀,难以快速达成一致。neo的解决方案是通过投票选取出一定数量的节点作为记账人,由此减少网络消耗又可以兼顾交易速度,这也是dbft中d的由来。
代码结构说明
├── Consensus
│ ├── ChangeView.cs //viewchange 消息
│ ├── ConsensusContext.cs //共识上下文
│ ├── ConsensusMessage.cs //共识消息
│ ├── ConsensusMessageType.cs //共识消息类型 ChangeView/PrepareRequest/PrepareResponse
│ ├── ConsensusService.cs //共识核心代码
│ ├── ConsensusState.cs //节点共识状态
│ ├── PrepareRequest.cs //请求消息
│ └── PrepareResponse.cs //签名返回消息
共识状态变化流程
1:开启共识的节点分为两大类,非记账人和记账人节点,非记账人的不参与共识,记账人参与共识流程2:选择议长,Neo议长产生机制是根据当前块高度和记账人数量做MOD运算得到,议长实际上按顺序当选3:节点初始化,议长为primary节点,议员为backup节点。4:满足出块条件后议长发送PrepareRequest5:议员收到请求后,验证通过签名发送PrepareResponse6:记账节点接收到PrepareResponse后,节点保存对方的签名信息,检查如果超过三分之二则发送 block7:节点接收到block,PersistCompleted事件触发后整体重新初始化,
输入图片说明
共识上下文核心成员
public const uint Version = 0;
public ConsensusState State; //节点当前共识状态
public UInt256 PrevHash;
public uint BlockIndex; //块高度
public byte ViewNumber; //试图状态
public ECPoint[] Validators; //记账人
public int MyIndex; //当前记账人次序
public uint PrimaryIndex; //当前记账的记账人
public uint Timestamp;
public ulong Nonce;
public UInt160 NextConsensus; //共识标识
public UInt256[] TransactionHashes;
public Dictionary[U] Transactions;
public byte[][] Signatures; //记账人签名
public byte[] ExpectedView; //记账人试图
public KeyPair KeyPair;
public int M => Validators.Length - (Validators.Length - 1) / 3; //三分之二数量
ExpectedView 维护视图状态中,用于在议长无法正常工作时重新发起新一轮共识。
Signatures 用于维护共识过程中的确认状态。
节点共识状态
[Flags]
internal enum ConsensusState : byte
{
Initial = 0x00, // 0
Primary = 0x01, // 1
Backup = 0x02, // 10
RequestSent = 0x04, // 100
RequestReceived = 0x08, // 1000
SignatureSent = 0x10, // 10000
BlockSent = 0x20, // 100000
ViewChanging = 0x40, //1000000
}
代理节点选择
neo的dbft算法的delegate部分主要体现在一个函数上面,每次产生区块后会重新排序资产,选择前记账人数量的节点出来参与下一轮共识。
///
/// 获取下一个区块的记账人列表
///
/// 返回一组公钥,表示下一个区块的记账人列表
public ECPoint[] GetValidators()
{
lock (_validators)
{
if (_validators.Count == 0)
{
_validators.AddRange(GetValidators(Enumerable.Empty[Tr]()));
}
return _validators.ToArray();
}
}
public virtual IEnumerable GetValidators(IEnumerable[Tr] others)
{
DataCache[U] accounts = GetStates[U]();
DataCache validators = GetStates();
MetaDataCache validators_count = GetMetaData();
///更新账号资产情况
foreach (Transaction tx in others)
{
foreach (TransactionOutput output in tx.Outputs)
{
AccountState account = accounts.GetAndChange(output.ScriptHash, () => new AccountState(output.ScriptHash));
if (account.Balances.ContainsKey(output.AssetId))
account.Balances[output.AssetId] += output.Value;
else
account.Balances[output.AssetId] = output.Value;
if (output.AssetId.Equals(GoverningToken.Hash) && account.Votes.Length > 0)
{
foreach (ECPoint pubkey in account.Votes)
validators.GetAndChange(pubkey, () => new ValidatorState(pubkey)).Votes += output.Value;
validators_count.GetAndChange().Votes[account.Votes.Length - 1] += output.Value;
}
}
foreach (var group in tx.Inputs.GroupBy(p => p.PrevHash))
{
Transaction tx_prev = GetTransaction(group.Key, out int height);
foreach (CoinReference input in group)
{
TransactionOutput out_prev = tx_prev.Outputs[input.PrevIndex];
AccountState account = accounts.GetAndChange(out_prev.ScriptHash);
if (out_prev.AssetId.Equals(GoverningToken.Hash))
{
if (account.Votes.Length > 0)
{
foreach (ECPoint pubkey in account.Votes)
{
ValidatorState validator = validators.GetAndChange(pubkey);
validator.Votes -= out_prev.Value;
if (!validator.Registered && validator.Votes.Equals(Fixed8.Zero))
validators.Delete(pubkey);
}
validators_count.GetAndChange().Votes[account.Votes.Length - 1] -= out_prev.Value;
}
}
account.Balances[out_prev.AssetId] -= out_prev.Value;
}
}
switch (tx)
{
#pragma warning disable CS0612
case EnrollmentTransaction tx_enrollment:
validators.GetAndChange(tx_enrollment.PublicKey, () => new ValidatorState(tx_enrollment.PublicKey)).Registered = true;
break;
#pragma warning restore CS0612
case StateTransaction tx_state:
foreach (StateDescriptor descriptor in tx_state.Descriptors)
switch (descriptor.Type)
{
case StateType.Account:
ProcessAccountStateDescriptor(descriptor, accounts, validators, validators_count);
break;
case StateType.Validator:
ProcessValidatorStateDescriptor(descriptor, validators);
break;
}
break;
}
}
//排序
int count = (int)validators_count.Get().Votes.Select((p, i) => new
{
Count = i,
Votes = p
}).Where(p => p.Votes > Fixed8.Zero).ToArray().WeightedFilter(0.25, 0.75, p => p.Votes.GetData(), (p, w) => new
{
p.Count,
Weight = w
}).WeightedAverage(p => p.Count, p => p.Weight);
count = Math.Max(count, StandbyValidators.Length);
HashSet sv = new HashSet(StandbyValidators);
ECPoint[] pubkeys = validators.Find().Select(p => p.Value).Where(p => (p.Registered && p.Votes > Fixed8.Zero) || sv.Contains(p.PublicKey)).OrderByDescending(p => p.Votes).ThenBy(p => p.PublicKey).Select(p => p.PublicKey).Take(count).ToArray();
IEnumerable result;
if (pubkeys.Length == count)
{
result = pubkeys;
}
else
{
HashSet hashSet = new HashSet(pubkeys);
for (int i = 0; i p);
}
议长选择
在初始化共识状态的时候会设置PrimaryIndex,获知当前议长。原理就是简单的MOD运算。 这里有分为两种情况,如果节点正常则直接块高度和记账人数量mod运算即可,如果存在一场情况,则需要根据view_number进行调整。
//file /Consensus/ConsensusService.cs InitializeConsensus方法
if (view_number == 0)
context.Reset(wallet);
else
context.ChangeView(view_number);
//file /Consensus/ConsensusContext.cs
public void ChangeView(byte view_number)
{
int p = ((int)BlockIndex - view_number) % Validators.Length;
State &= ConsensusState.SignatureSent;
ViewNumber = view_number;
PrimaryIndex = p >= 0 ? (uint)p : (uint)(p + Validators.Length);//当前记账人
if (State == ConsensusState.Initial)
{
TransactionHashes = null;
Signatures = new byte[Validators.Length][];
}
ExpectedView[MyIndex] = view_number;
_header = null;
}
//file /Consensus/ConsensusContext.cs
public void Reset(Wallet wallet)
{
State = ConsensusState.Initial;
PrevHash = Blockchain.Default.CurrentBlockHash;
BlockIndex = Blockchain.Default.Height + 1;
ViewNumber = 0;
Validators = Blockchain.Default.GetValidators();
MyIndex = -1;
PrimaryIndex = BlockIndex % (uint)Validators.Length; //当前记账人
TransactionHashes = null;
Signatures = new byte[Validators.Length][];
ExpectedView = new byte[Validators.Length];
KeyPair = null;
for (int i = 0; i
状态初始化
如果是议长则状态标记为ConsensusState.Primary,同时改变定时器触发事件,再上次出块15s后触发。议员则设置状态为 ConsensusState.Backup,时间调整为30s后触发,如果议长不能正常工作,则这个触发器会开始起作用(具体后边再详细分析)。
//file /Consensus/ConsensusContext.cs
private void InitializeConsensus(byte view_number)
{
lock (context)
{
if (view_number == 0)
context.Reset(wallet);
else
context.ChangeView(view_number);
if (context.MyIndex 1)
{ //广播自身的交易
InvPayload invPayload = InvPayload.Create(InventoryType.TX, context.TransactionHashes.Skip(1).ToArray());
foreach (RemoteNode node in localNode.GetRemoteNodes())
node.EnqueueMessage("inv", invPayload);
}
timer_height = context.BlockIndex;
timer_view = view_number;
TimeSpan span = DateTime.Now - block_received_time;
if (span >= Blockchain.TimePerBlock)
timer.Change(0, Timeout.Infinite);
else
timer.Change(Blockchain.TimePerBlock - span, Timeout.InfiniteTimeSpan);
}
else
{
context.State = ConsensusState.Backup;
timer_height = context.BlockIndex;
timer_view = view_number;
timer.Change(TimeSpan.FromSeconds(Blockchain.SecondsPerBlock
议长发起请求
议长到了该记账的时间后,执行下面方法,发送MakePrepareRequest请求,自身状态转变为RequestSent,也设置了30s后重复触发定时器(同样也是再议长工作异常时起效)。
//file /Consensus/ConsensusContext.cs
private void OnTimeout(object state)
{
lock (context)
{
if (timer_height != context.BlockIndex || timer_view != context.ViewNumber) return;
Log($"timeout: height={timer_height} view={timer_view} state={context.State}");
if (context.State.HasFlag(ConsensusState.Primary) && !context.State.HasFlag(ConsensusState.RequestSent))
{
Log($"send perpare request: height={timer_height} view={timer_view}");
context.State |= ConsensusState.RequestSent;
if (!context.State.HasFlag(ConsensusState.SignatureSent))
{
context.Timestamp = Math.Max(DateTime.Now.ToTimestamp(), Blockchain.Default.GetHeader(context.PrevHash).Timestamp + 1);
context.Signatures[context.MyIndex] = context.MakeHeader().Sign(context.KeyPair);
}
SignAndRelay(context.MakePrepareRequest());
timer.Change(TimeSpan.FromSeconds(Blockchain.SecondsPerBlock
议员广播签名信息
议员接收到PrepareRequest,会对节点信息进行验证,自身不存在的交易会去同步交易。交易同步完成后,验证通过会进行发送PrepareResponse ,包含自己的签名信息,表示自己已经通过了节点验证。状态转变为
ConsensusState.SignatureSent。
//file /Consensus/ConsensusContext.cs
private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest message)
{
Log($"{nameof(OnPrepareRequestReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} tx={message.TransactionHashes.Length}");
if (!context.State.HasFlag(ConsensusState.Backup) || context.State.HasFlag(ConsensusState.RequestReceived))
return;
if (payload.ValidatorIndex != context.PrimaryIndex) return;
if (payload.Timestamp DateTime.Now.AddMinutes(10).ToTimestamp())
{
Log($"Timestamp incorrect: {payload.Timestamp}");
return;
}
context.State |= ConsensusState.RequestReceived;
context.Timestamp = payload.Timestamp;
context.Nonce = message.Nonce;
context.NextConsensus = message.NextConsensus;
context.TransactionHashes = message.TransactionHashes;
context.Transactions = new Dictionary[U]();
if (!Crypto.Default.VerifySignature(context.MakeHeader().GetHashData(), message.Signature, context.Validators[payload.ValidatorIndex].EncodePoint(false))) return;
context.Signatures = new byte[context.Validators.Length][];
context.Signatures[payload.ValidatorIndex] = message.Signature;
Dictionary[U] mempool = LocalNode.GetMemoryPool().ToDictionary(p => p.Hash);
foreach (UInt256 hash in context.TransactionHashes.Skip(1))
{
if (mempool.TryGetValue(hash, out Transaction tx))
if (!AddTransaction(tx, false))
return;
}
if (!AddTransaction(message.MinerTransaction, true)) return;
if (context.Transactions.Count !context.Transactions.ContainsKey(i)).ToArray();
LocalNode.AllowHashes(hashes);
InvPayload msg = InvPayload.Create(InventoryType.TX, hashes);
foreach (RemoteNode node in localNode.GetRemoteNodes())
node.EnqueueMessage("getdata", msg);
}
}
//file /Consensus/ConsensusContext.cs
private bool AddTransaction(Transaction tx, bool verify)
{
if (Blockchain.Default.ContainsTransaction(tx.Hash) ||
(verify && !tx.Verify(context.Transactions.Values)) ||
!CheckPolicy(tx))
{
Log($"reject tx: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}");
RequestChangeView();
return false;
}
context.Transactions[tx.Hash] = tx;
if (context.TransactionHashes.Length == context.Transactions.Count)
{
if (Blockchain.GetConsensusAddress(Blockchain.Default.GetValidators(context.Transactions.Values).ToArray()).Equals(context.NextConsensus))
{
Log($"send perpare response");
context.State |= ConsensusState.SignatureSent;
context.Signatures[context.MyIndex] = context.MakeHeader().Sign(context.KeyPair);
SignAndRelay(context.MakePrepareResponse(context.Signatures[context.MyIndex]));
CheckSignatures();
}
else
{
RequestChangeView();
return false;
}
}
return true;
}
共识达成后广播区块
其他节点接收到PrepareResponse后,在自己的签名列表中记录对方的签名信息,再检查自己的签名列表是否有超过三分之二的签名了,有则判断共识达成开始广播生成的区块。状态状变为ConsensusState.BlockSent。
private void CheckSignatures()
{
if (context.Signatures.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p)))
{
Contract contract = Contract.CreateMultiSigContract(context.M, context.Validators);
Block block = context.MakeHeader();
ContractParametersContext sc = new ContractParametersContext(block);
for (int i = 0, j = 0; i context.Transactions).ToArray();
Log($"relay block: {block.Hash}");
if (!localNode.Relay(block))
Log($"reject block: {block.Hash}");
context.State |= ConsensusState.BlockSent;
}
}
Blockchain_PersistCompleted后恢复状态
区块广播后,节点接收到这个信息,会保存区块,保存完成后触发PersistCompleted事件,最终回到初始状态,重新开始新一轮的共识。
private void Blockchain_PersistCompleted(object sender, Block block)
{
Log($"persist block: {block.Hash}");
block_received_time = DateTime.Now;
InitializeConsensus(0);
}
错误分析
如果议长无法完成记账任务
这种情况下议长在超出时间之后依然无法达成共识,此时议员会增长自身的ExpectedView值,并且广播出去,如果检查到三分之二的成员viewnumber都加了1,则在这些相同viewnumber的节点之间开始新一轮共识,重新选择议长,发起共识。如果此时原来的议长恢复正常,time到时间后会增长自身的viewnumber,慢慢追上当前的视图状态。
成为第一个吐槽的人