以太坊1.9挖矿启动过程源码阅读

之前阅读过以太坊1.7.x版本的源码,现在升级到了1.9.25,很多源码都做了很大的改动,网上还没有相关的源码阅读,因此决定自己分享一下。

本文介绍挖矿相关的源码,其余部分不做赘述。

geth启动时,进入的是cmd/geth/main.go中的main方法,执行main方法之前会先执行init方法,init方法中加载了很多命令行参数,关于命令行参数的介绍:Go语言命令行库urfave/cli简介

启动geth时,如果没有加console参数,会执行main.go中的geth方法:

func geth(ctx *cli.Context) error {
   if args := ctx.Args(); len(args) > 0 {
      return fmt.Errorf("invalid command: %q", args[0])
   }

   prepare(ctx)//检查运行环境及机器配置等
   stack, backend := makeFullNode(ctx)//stack就是node实例,backend是service
   defer stack.Close()

   startNode(ctx, stack, backend)
   stack.Wait()
   return nil
}

如果加了console参数,会执行

cmd/geth/consolecmd.go中的localConsole方法:

func localConsole(ctx *cli.Context) error {
   // Create and start the node based on the CLI flags
   prepare(ctx)//检查运行环境及机器配置等
   stack, backend := makeFullNode(ctx)
   startNode(ctx, stack, backend)
   defer stack.Close()
    
    .....
}

两处地方都是先执行makeFullNode方法,再执行startNode方法。

1,makeFullNode

func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
   stack, cfg := makeConfigNode(ctx)//配置节点,配置级别:启动参数>配置文件>默认配置

   backend := utils.RegisterEthService(stack, &cfg.Eth)//注册ethService
 
 ...
}

重点关注一下RegisterEthService方法中的New方法,位于eth/backend.go:

func RegisterEthService(stack *node.Node, cfg *eth.Config) ethapi.Backend {
   ...
   backend, err := eth.New(stack, cfg)
   ...
   return backend.APIBackend
}
func New(stack *node.Node, config *Config) (*Ethereum, error) {

   // 设置轻节点及同步模式
   // 设置gasPrice
   // 设置快照缓存
   // 设置创世块SetupGenesisBlock
   // 初始化Ethereum对象等等

   ...
   
   // 创建矿工,创建完成后等待启动停止信号
   eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
   eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))

   ...
}

进入miner.New方法

func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner {
   miner := &Miner{
      eth:     eth,
      mux:     mux,
      engine:  engine,
      exitCh:  make(chan struct{}),
      startCh: make(chan common.Address),
      stopCh:  make(chan struct{}),
      worker:  newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
   }
   go miner.update()

   return miner
}

1.1 newWorker

位于miner/worker.go

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
    //初始化worker对象
   worker := &worker{
      config:             config,
      chainConfig:        chainConfig,
      engine:             engine,
      eth:                eth,
      mux:                mux,
      chain:              eth.BlockChain(),
      isLocalBlock:       isLocalBlock,
      localUncles:        make(map[common.Hash]*types.Block),
      remoteUncles:       make(map[common.Hash]*types.Block),
      unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
      pendingTasks:       make(map[common.Hash]*task),
      txsCh:              make(chan core.NewTxsEvent, txChanSize),
      chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
      chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
      newWorkCh:          make(chan *newWorkReq),
      taskCh:             make(chan *task),
      resultCh:           make(chan *types.Block, resultQueueSize),
      exitCh:             make(chan struct{}),
      startCh:            make(chan struct{}, 1),
      resubmitIntervalCh: make(chan time.Duration),
      resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
   }
   ...
   //重点关注这几个loop
   go worker.mainLoop()
   go worker.newWorkLoop(recommit)
   go worker.resultLoop()
   go worker.taskLoop()

   // 提交一个初始化的pending状态
   if init {
      worker.startCh <- struct{}{}
   }
   return worker
}

四个loop都是开启无限循环等待指示,接收到不同channel发送过来的命令会执行不同的操作

这些channel都定义在miner/worker.go中的type worker struct {}

1.1.1 mainLoop()监听的channel
newWorkCh
chainSideCh
txsCh
exitCh
txsSub.Err()
chainHeadSub.Err()
chainSideSub.Err()
  • newWorkCh
case req := <-w.newWorkCh:
   w.commitNewWork(req.interrupt, req.noempty, req.timestamp)

执行commitNewWork方法

func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
  ...
  //检查系统时间和当前区块时间差异
  //新建一个区块头,填入父区块号,区块高度,gasLimit,时间戳等
  //检查coinbase是否存在
  
  //Prepare方法是consenus包下面定义的Engine的接口,由consenus/ethash和clique包分别实现其所有方法,包含Prepare方法
  //ethash实现的是pos算法,clique实现的是poa算法,poa算法主要运行在测试网上,之所以不在测试网也使用pos是因为测试网算力不足,pos会遭到攻击
  //本文主要分析ethash算法的挖矿过程,除了特别说明,下文所有介绍的Engine接口的实现算法都是ethash算法
  //Prepare方法是计算当前区块所需要的难度值,即header.Difficulty,方便后续计算hash的时候做比较(pos算法精髓)
      if err := w.engine.Prepare(w.chain, header); err != nil {
        log.Error("Failed to prepare header for mining", "err", err)
        return
    }
   //检查是否有DAO硬分叉
   //组装叔块,叔块不能距离当前块的高度超过7
   //组装交易池中的交易
   
   //组装成区块并提交新交易
   w.commit(uncles, w.fullTaskHook, true, tstart)
}

commit方法

func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
   ...
   //FinalizeAndAssemble也是ethash包中的方法,根据传入参数组装成区块
   block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
   ...
      //将组装好的区块和数据发送给taskCh这个channel,该channel是在taskLoop中监听的channel,详见下文
      case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
         w.unconfirmed.Shift(block.NumberU64() - 1)
         log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
            "uncles", len(uncles), "txs", w.current.tcount,
            "gas", block.GasUsed(), "fees", totalFees(block, receipts),
            "elapsed", common.PrettyDuration(time.Since(start)))
    ...
}
  • chainSideCh

    这个channel仅是用于测试环境,不再分析

  • txsCh

    接收交易的channel,接收到交易后,如果共识引擎没有工作,就把交易提交到交易池中。如果共识引擎没在工作,但当前是dev环境或者poa共识,那么也会调用commitNewWork方法参与出块

  • exitCh

    接收到退出信号,停止mainLoop循环

  • txsSub.Err()

    订阅交易错误,停止mainLoop循环

  • chainHeadSub.Err()

    订阅区块头错误,停止mainLoop循环

  • chainSideSub.Err()

    订阅ChainSide错误,停止mainLoop循环

1.1.2 newWorkLoop()监听的channel
startCh
chainHeadCh
timer.C
resubmitIntervalCh
resubmitAdjustCh
exitCh

先看看commit和clearPending两个方法:

//中止正在执行的事务,然后重新提交一个新的任务,向newWorkCh通道发消息,mainLoop接收到消息开始工作
commit := func(noempty bool, s int32) {
   if interrupt != nil {
      atomic.StoreInt32(interrupt, s)
   }
   interrupt = new(int32)//0
   w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
   timer.Reset(recommit)
   atomic.StoreInt32(&w.newTxs, 0)
}
// 清除旧的pending状态的任务,pendingTask是做什么用的会在taskLoop中说到
clearPending := func(number uint64) {
   w.pendingMu.Lock()
   for h, t := range w.pendingTasks {
      if t.block.NumberU64()+staleThreshold <= number {
         delete(w.pendingTasks, h)
      }
   }
   w.pendingMu.Unlock()
}
  • startCh & chainHeadCh

    都是先执行clearPending再执行commit,向mainLoop传递newWork消息

  • timer.C

    每隔一段时间T(不能低于1秒),检查是否有新交易,如果有新交易,就把新交易也重新统计,方便找到更高手续费的交易,调用commit方法。如果没有新交易,则继续循环

  • resubmitIntervalCh

    用于设置T的大小,即间隔多久会检查新交易,最小不能低于1秒,是通过api中的miner.SetRecommitInterval调用的

  • resubmitAdjustCh

    用于增加或者减少矿工重新提交的时间间隔

  • exitCh

    退出Loop

1.1.3 resultLoop()监听的channel
resultCh
exitCh
  • resultCh
    case block := <-w.resultCh:
       // Short circuit when receiving empty result.
       if block == nil {
          continue
       }
       // Short circuit when receiving duplicate result caused by resubmitting.
       if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
          continue
       }
       var (
          sealhash = w.engine.SealHash(block.Header())
          hash     = block.Hash()
       )
       w.pendingMu.RLock()
       task, exist := w.pendingTasks[sealhash]
       w.pendingMu.RUnlock()
       if !exist {
          log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
          continue
       }
       // Different block could share same sealhash, deep copy here to prevent write-write conflict.
       var (
          receipts = make([]*types.Receipt, len(task.receipts))
          logs     []*types.Log
       )
       for i, receipt := range task.receipts {
          // add block location fields
          receipt.BlockHash = hash
          receipt.BlockNumber = block.Number()
          receipt.TransactionIndex = uint(i)
    
          receipts[i] = new(types.Receipt)
          *receipts[i] = *receipt
          // Update the block hash in all logs since it is now available and not when the
          // receipt/log of individual transactions were created.
          for _, log := range receipt.Logs {
             log.BlockHash = hash
          }
          logs = append(logs, receipt.Logs...)
       }
       // Commit block and state to database.
       _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
       if err != nil {
          log.Error("Failed writing block to chain", "err", err)
          continue
       }
       log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
          "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
    
       // Broadcast the block and announce chain insertion event
       w.mux.Post(core.NewMinedBlockEvent{Block: block})
    
       // Insert the block into the set of pending ones to resultLoop for confirmations
       w.unconfirmed.Insert(block.NumberU64(), block.Hash())
    

    主要做了4件事:

    1,校验收到区块的hash是否在pendingTask内

    2,尝试写入本地账本、更改世界状态

    3,广播出去

    4,把该块加入待确认的块中,等待确认

  • exitCh

    退出Loop

1.1.4 taskLoop()监听的channel
taskCh
exitCh
  • taskCh

    taskCh接收到的消息,即为后续需要执行的task

    case task := <-w.taskCh:
       if w.newTaskHook != nil {
          w.newTaskHook(task)
       }
       // 先调用SealHash方法,返回一个区块hash
       sealHash := w.engine.SealHash(task.block.Header())
       if sealHash == prev {
          continue
       }
       // Interrupt previous sealing operation
       interrupt()
       stopCh, prev = make(chan struct{}), sealHash
    
       if w.skipSealHook != nil && w.skipSealHook(task) {
          continue
       }
       w.pendingMu.Lock()
       
       //把该task加入pendingTasks,前文提到过,pending太久的任务会被删除
       w.pendingTasks[sealHash] = task
       w.pendingMu.Unlock()
       
       //调用Seal方法开始挖矿,使用不同的nonce去计算,直到找到难度满足的nonce,找到区块后,发送到resultCh这个channel中,resultCh接收到的消息是在上文
       //介绍的resultLoop中进行处理的
       if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
          log.Warn("Block sealing failed", "err", err)
       }
    
  • exitCh

退出Loop

整理如下图:

1.2 miner.update()

这个方法比较好理解

func (miner *Miner) update() {
   //订阅downloader相关频道
   events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
   defer func() {
      if !events.Closed() {
         events.Unsubscribe()
      }
   }()

   shouldStart := false
   canStart := true
   dlEventCh := events.Chan()
   for {
      select {
      //接收到downloader频道的消息
      case ev := <-dlEventCh:
         if ev == nil {
            // Unsubscription done, stop listening
            dlEventCh = nil
            continue
         }
         switch ev.Data.(type) {
         //如果是开始下载的信号,就先同步再挖矿
         case downloader.StartEvent:
            wasMining := miner.Mining()
            miner.worker.stop()
            canStart = false
            if wasMining {
               // Resume mining after sync was finished
               shouldStart = true
               log.Info("Mining aborted due to sync")
            }
         //如果是下载失败的信号,就直接开始挖矿
         case downloader.FailedEvent:
            canStart = true
            if shouldStart {
               miner.SetEtherbase(miner.coinbase)
               miner.worker.start()
            }
         //如果是下载完成的信号,也直接开始挖矿
         case downloader.DoneEvent:
            canStart = true
            if shouldStart {
               miner.SetEtherbase(miner.coinbase)
               miner.worker.start()
            }
            // Stop reacting to downloader events
            events.Unsubscribe()
         }
      //如果是miner.startCh传来了信号,则是开始挖矿的信号,检查条件满足后,调用miner.worker.start()方法
      case addr := <-miner.startCh:
         miner.SetEtherbase(addr)
         if canStart {
            miner.worker.start()
         }
         shouldStart = true
      case <-miner.stopCh:
         shouldStart = false
         miner.worker.stop()
      case <-miner.exitCh:
         miner.worker.close()
         return
      }
   }
}

miner.worker.start()方法向newWorkLoop中的startCh传递了消息,矿工开始工作

func (w *worker) start() {
   atomic.StoreInt32(&w.running, 1)
   w.startCh <- struct{}{}
}

2,startNode

func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
   ...
   utils.StartNode(stack) 
   ...
   //账户相关操作
}   
func StartNode(stack *node.Node) {
   ...
   //如果启动参数上面带有--mine或者是--dev(开发测试模式)的话
   if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
       //
       ...
       //调用eth/api_backend.go中的startMining方法
       if err := ethBackend.StartMining(threads); err != nil {
            utils.Fatalf("Failed to start mining: %v", err)
        }
   }
}   

StartMining则会调用eth/backend.go中的StartMining方法

func (b *EthAPIBackend) StartMining(threads int) error {
   return b.eth.StartMining(threads)
}

同样的,在控制台内调用miner.start()方法的时候,也会调用该方法

func (api *PrivateMinerAPI) Start(threads *int) error {
   if threads == nil {
      return api.e.StartMining(runtime.NumCPU())
   }
   return api.e.StartMining(*threads)
}

下面看看StartMining方法:

func (s *Ethereum) StartMining(threads int) error {
  //设置挖矿线程数量
  //设置gasPrice
  //设置etherbase
  //检查是否是poa
  //最后开启新的协程去挖矿
      go s.miner.Start(eb)
   }
   return nil
}

最后start方法很简单,就是把coinbase传递到startCh这个channel中去,还记得这个channel吗?在newWorkLoop中

func (miner *Miner) Start(coinbase common.Address) {
   miner.startCh <- coinbase
}

至此以太坊挖矿启动的过程就分析完了,还有一个疑问,在启动了四个loop之后,为什么要调用下面的代码发送消息

if init {
      worker.startCh <- struct{}{}
   }

debug之后发现,这其实是为了提交一个初始化的pending状态,并且在后续的代码中还会检查work对象中running字段的状态及coinbase是否设置,如果没有设置,是不会启动挖矿的


转载请注明来源

×

喜欢就点赞,疼爱就打赏