之前阅读过以太坊1.7.x版本的源码,现在升级到了1.9.25,很多源码都做了很大的改动,网上还没有相关的源码阅读,因此决定自己分享一下。
本文介绍挖矿相关的源码,其余部分不做赘述。
geth启动时,进入的是cmd/geth/main.go
中的main方法,执行main方法之前会先执行init方法,init方法中加载了很多命令行参数,关于命令行参数的介绍:Go语言命令行库urfave/cli简介 。
启动geth时,如果没有加console参数,会执行main.go中的geth方法:
func geth(ctx *cli.Context) error {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
prepare(ctx)//检查运行环境及机器配置等
stack, backend := makeFullNode(ctx)//stack就是node实例,backend是service
defer stack.Close()
startNode(ctx, stack, backend)
stack.Wait()
return nil
}
如果加了console参数,会执行
cmd/geth/consolecmd.go中的localConsole方法:
func localConsole(ctx *cli.Context) error {
// Create and start the node based on the CLI flags
prepare(ctx)//检查运行环境及机器配置等
stack, backend := makeFullNode(ctx)
startNode(ctx, stack, backend)
defer stack.Close()
.....
}
两处地方都是先执行makeFullNode方法,再执行startNode方法。
1,makeFullNode
func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
stack, cfg := makeConfigNode(ctx)//配置节点,配置级别:启动参数>配置文件>默认配置
backend := utils.RegisterEthService(stack, &cfg.Eth)//注册ethService
...
}
重点关注一下RegisterEthService方法中的New方法,位于eth/backend.go:
func RegisterEthService(stack *node.Node, cfg *eth.Config) ethapi.Backend {
...
backend, err := eth.New(stack, cfg)
...
return backend.APIBackend
}
func New(stack *node.Node, config *Config) (*Ethereum, error) {
// 设置轻节点及同步模式
// 设置gasPrice
// 设置快照缓存
// 设置创世块SetupGenesisBlock
// 初始化Ethereum对象等等
...
// 创建矿工,创建完成后等待启动停止信号
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
...
}
进入miner.New方法
func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
startCh: make(chan common.Address),
stopCh: make(chan struct{}),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
}
go miner.update()
return miner
}
1.1 newWorker
位于miner/worker.go
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
//初始化worker对象
worker := &worker{
config: config,
chainConfig: chainConfig,
engine: engine,
eth: eth,
mux: mux,
chain: eth.BlockChain(),
isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
...
//重点关注这几个loop
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
// 提交一个初始化的pending状态
if init {
worker.startCh <- struct{}{}
}
return worker
}
四个loop都是开启无限循环等待指示,接收到不同channel发送过来的命令会执行不同的操作
这些channel都定义在miner/worker.go中的type worker struct {}
中
1.1.1 mainLoop()监听的channel
newWorkCh
chainSideCh
txsCh
exitCh
txsSub.Err()
chainHeadSub.Err()
chainSideSub.Err()
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
执行commitNewWork方法
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
...
//检查系统时间和当前区块时间差异
//新建一个区块头,填入父区块号,区块高度,gasLimit,时间戳等
//检查coinbase是否存在
//Prepare方法是consenus包下面定义的Engine的接口,由consenus/ethash和clique包分别实现其所有方法,包含Prepare方法
//ethash实现的是pos算法,clique实现的是poa算法,poa算法主要运行在测试网上,之所以不在测试网也使用pos是因为测试网算力不足,pos会遭到攻击
//本文主要分析ethash算法的挖矿过程,除了特别说明,下文所有介绍的Engine接口的实现算法都是ethash算法
//Prepare方法是计算当前区块所需要的难度值,即header.Difficulty,方便后续计算hash的时候做比较(pos算法精髓)
if err := w.engine.Prepare(w.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
//检查是否有DAO硬分叉
//组装叔块,叔块不能距离当前块的高度超过7
//组装交易池中的交易
//组装成区块并提交新交易
w.commit(uncles, w.fullTaskHook, true, tstart)
}
commit方法
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
...
//FinalizeAndAssemble也是ethash包中的方法,根据传入参数组装成区块
block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
...
//将组装好的区块和数据发送给taskCh这个channel,该channel是在taskLoop中监听的channel,详见下文
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1)
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount,
"gas", block.GasUsed(), "fees", totalFees(block, receipts),
"elapsed", common.PrettyDuration(time.Since(start)))
...
}
chainSideCh
这个channel仅是用于测试环境,不再分析
txsCh
接收交易的channel,接收到交易后,如果共识引擎没有工作,就把交易提交到交易池中。如果共识引擎没在工作,但当前是dev环境或者poa共识,那么也会调用commitNewWork方法参与出块
exitCh
接收到退出信号,停止mainLoop循环
txsSub.Err()
订阅交易错误,停止mainLoop循环
chainHeadSub.Err()
订阅区块头错误,停止mainLoop循环
chainSideSub.Err()
订阅ChainSide错误,停止mainLoop循环
1.1.2 newWorkLoop()监听的channel
startCh
chainHeadCh
timer.C
resubmitIntervalCh
resubmitAdjustCh
exitCh
先看看commit和clearPending两个方法:
//中止正在执行的事务,然后重新提交一个新的任务,向newWorkCh通道发消息,mainLoop接收到消息开始工作
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)//0
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
// 清除旧的pending状态的任务,pendingTask是做什么用的会在taskLoop中说到
clearPending := func(number uint64) {
w.pendingMu.Lock()
for h, t := range w.pendingTasks {
if t.block.NumberU64()+staleThreshold <= number {
delete(w.pendingTasks, h)
}
}
w.pendingMu.Unlock()
}
startCh & chainHeadCh
都是先执行
clearPending
再执行commit
,向mainLoop传递newWork消息timer.C
每隔一段时间T(不能低于1秒),检查是否有新交易,如果有新交易,就把新交易也重新统计,方便找到更高手续费的交易,调用commit方法。如果没有新交易,则继续循环
resubmitIntervalCh
用于设置T的大小,即间隔多久会检查新交易,最小不能低于1秒,是通过api中的miner.SetRecommitInterval调用的
resubmitAdjustCh
用于增加或者减少矿工重新提交的时间间隔
exitCh
退出Loop
1.1.3 resultLoop()监听的channel
resultCh
exitCh
resultCh
case block := <-w.resultCh: // Short circuit when receiving empty result. if block == nil { continue } // Short circuit when receiving duplicate result caused by resubmitting. if w.chain.HasBlock(block.Hash(), block.NumberU64()) { continue } var ( sealhash = w.engine.SealHash(block.Header()) hash = block.Hash() ) w.pendingMu.RLock() task, exist := w.pendingTasks[sealhash] w.pendingMu.RUnlock() if !exist { log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash) continue } // Different block could share same sealhash, deep copy here to prevent write-write conflict. var ( receipts = make([]*types.Receipt, len(task.receipts)) logs []*types.Log ) for i, receipt := range task.receipts { // add block location fields receipt.BlockHash = hash receipt.BlockNumber = block.Number() receipt.TransactionIndex = uint(i) receipts[i] = new(types.Receipt) *receipts[i] = *receipt // Update the block hash in all logs since it is now available and not when the // receipt/log of individual transactions were created. for _, log := range receipt.Logs { log.BlockHash = hash } logs = append(logs, receipt.Logs...) } // Commit block and state to database. _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) if err != nil { log.Error("Failed writing block to chain", "err", err) continue } log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt))) // Broadcast the block and announce chain insertion event w.mux.Post(core.NewMinedBlockEvent{Block: block}) // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.NumberU64(), block.Hash())
主要做了4件事:
1,校验收到区块的hash是否在pendingTask内
2,尝试写入本地账本、更改世界状态
3,广播出去
4,把该块加入待确认的块中,等待确认
exitCh
退出Loop
1.1.4 taskLoop()监听的channel
taskCh
exitCh
taskCh
taskCh接收到的消息,即为后续需要执行的task
case task := <-w.taskCh: if w.newTaskHook != nil { w.newTaskHook(task) } // 先调用SealHash方法,返回一个区块hash sealHash := w.engine.SealHash(task.block.Header()) if sealHash == prev { continue } // Interrupt previous sealing operation interrupt() stopCh, prev = make(chan struct{}), sealHash if w.skipSealHook != nil && w.skipSealHook(task) { continue } w.pendingMu.Lock() //把该task加入pendingTasks,前文提到过,pending太久的任务会被删除 w.pendingTasks[sealHash] = task w.pendingMu.Unlock() //调用Seal方法开始挖矿,使用不同的nonce去计算,直到找到难度满足的nonce,找到区块后,发送到resultCh这个channel中,resultCh接收到的消息是在上文 //介绍的resultLoop中进行处理的 if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { log.Warn("Block sealing failed", "err", err) }
exitCh
退出Loop
整理如下图:
1.2 miner.update()
这个方法比较好理解
func (miner *Miner) update() {
//订阅downloader相关频道
events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
if !events.Closed() {
events.Unsubscribe()
}
}()
shouldStart := false
canStart := true
dlEventCh := events.Chan()
for {
select {
//接收到downloader频道的消息
case ev := <-dlEventCh:
if ev == nil {
// Unsubscription done, stop listening
dlEventCh = nil
continue
}
switch ev.Data.(type) {
//如果是开始下载的信号,就先同步再挖矿
case downloader.StartEvent:
wasMining := miner.Mining()
miner.worker.stop()
canStart = false
if wasMining {
// Resume mining after sync was finished
shouldStart = true
log.Info("Mining aborted due to sync")
}
//如果是下载失败的信号,就直接开始挖矿
case downloader.FailedEvent:
canStart = true
if shouldStart {
miner.SetEtherbase(miner.coinbase)
miner.worker.start()
}
//如果是下载完成的信号,也直接开始挖矿
case downloader.DoneEvent:
canStart = true
if shouldStart {
miner.SetEtherbase(miner.coinbase)
miner.worker.start()
}
// Stop reacting to downloader events
events.Unsubscribe()
}
//如果是miner.startCh传来了信号,则是开始挖矿的信号,检查条件满足后,调用miner.worker.start()方法
case addr := <-miner.startCh:
miner.SetEtherbase(addr)
if canStart {
miner.worker.start()
}
shouldStart = true
case <-miner.stopCh:
shouldStart = false
miner.worker.stop()
case <-miner.exitCh:
miner.worker.close()
return
}
}
}
miner.worker.start()方法向newWorkLoop中的startCh传递了消息,矿工开始工作
func (w *worker) start() {
atomic.StoreInt32(&w.running, 1)
w.startCh <- struct{}{}
}
2,startNode
func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
...
utils.StartNode(stack)
...
//账户相关操作
}
func StartNode(stack *node.Node) {
...
//如果启动参数上面带有--mine或者是--dev(开发测试模式)的话
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
//
...
//调用eth/api_backend.go中的startMining方法
if err := ethBackend.StartMining(threads); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
}
}
StartMining则会调用eth/backend.go中的StartMining方法
func (b *EthAPIBackend) StartMining(threads int) error {
return b.eth.StartMining(threads)
}
同样的,在控制台内调用miner.start()方法的时候,也会调用该方法
func (api *PrivateMinerAPI) Start(threads *int) error {
if threads == nil {
return api.e.StartMining(runtime.NumCPU())
}
return api.e.StartMining(*threads)
}
下面看看StartMining方法:
func (s *Ethereum) StartMining(threads int) error {
//设置挖矿线程数量
//设置gasPrice
//设置etherbase
//检查是否是poa
//最后开启新的协程去挖矿
go s.miner.Start(eb)
}
return nil
}
最后start方法很简单,就是把coinbase传递到startCh这个channel中去,还记得这个channel吗?在newWorkLoop中
func (miner *Miner) Start(coinbase common.Address) {
miner.startCh <- coinbase
}
至此以太坊挖矿启动的过程就分析完了,还有一个疑问,在启动了四个loop之后,为什么要调用下面的代码发送消息
if init {
worker.startCh <- struct{}{}
}
debug之后发现,这其实是为了提交一个初始化的pending状态,并且在后续的代码中还会检查work对象中running字段的状态及coinbase是否设置,如果没有设置,是不会启动挖矿的
转载请注明来源