当前位置:首页 > 区块链新闻 > 正文

Cosmos整体流程

来源: 互联网时间:2019-02-26 15:00:01

Cosmos主要的源码其实是在SDK部分,听名字也可以理解出来,直接用这个SDK就可以写出一条不考虑底层的区块链来,但是做为中继链的一个代表,理想和现实并不是那么完美的结合在一起。

目前区块链跨链的难点在于,网络异构、共识算法不兼容等,而解决这些问题,都意味着巨大的投入和风险。Cosmos的目的是想建立一个区块链互联网,所以他把网络和共识抽象出来,专门做了一层。但是这样做的方法,虽然从理论上讲是没有问题的,可开发上难度还是增加了,开发者必须适应新的套路和不同的设计方法,怎么办?

弄个SDK,隔离变化,软件界的通用手段。

一、SDK的架构

看一下架构图:

上图可以看出来,其实SDK就是为了APP服务的,图中的应用程序其实就是给的例子,让大家能快速上手。然后另外两部分一个是和抽象层(共识和P2P)通信的,另外一个是用来调用各种插件的。

SDK从开始到现在,也进行了好几次比较大的改动了,至于今后会不会再有大的改动,也不敢肯定。所以说,做成插件化,是一个最好的选择,到时候看谁不顺眼,直接搞掉就可以了,喜欢谁,把插件接进来就OK。

1、plugins层

在插件层其实图中画的并不是很完全只是一个示意。主要的几个插件包括staking、IBC、 bank、 auth、 governance 、tx、 keys等几个。staking主要是控制Atom持有者相关贡献。类似一个汇率机制,动态变化。IBC其实就是链间通信机制,因为各个通信链是通过插件插入到HUB中进行通信,所以需要一个相应的通信机制来保证通信的安全性。governance这个模块目前在源码中看好像注释了不少,只保留了较少的东西,它主要是治理相关的实现,如提议、投票等。bank其实就是提供了一系列的通信接口(资产转移的),所以叫“银行”。

2、APP层

这一层基本没啥可说的,应该就是客户开发的APP,但是为了能让客户迅速进入,提供了三个相关的Demo。其中Basecoin是第一个完成的,是一个相对完整的应用,实现了SDK的核心模块的扩展,提供了诸如帐户管理、管理交易类型、处理存储等。

其它两个都是相关的扩展。

3、BaseApp

这一层主要是ABCI的通信,和Tendermint进行交互,Cosmos的核心就在这里。

二、源码流程

1、启动流程

从主程序的接口来分析源码:

这里只分析前两步,最后一步等分析Tendermint时再展开分析。

func NewGaiaApp(logger log.Logger, db dbm.DB) *GaiaApp {

cdc := MakeCodec()

// create your application object

//创建一个相关的APP,其它所有的APP都可以按照这个方法

var app = &GaiaApp{

BaseApp: bam.NewBaseApp(appName, cdc, logger, db),

cdc: cdc,

keyMain: sdk.NewKVStoreKey("main"),

keyAccount: sdk.NewKVStoreKey("acc"),

keyIBC: sdk.NewKVStoreKey("ibc"),

keyStake: sdk.NewKVStoreKey("stake"),

keySlashing: sdk.NewKVStoreKey("slashing"),

}

// define the accountMapper

//帐户管理--从KVSTROE抽象

app.accountMapper = auth.NewAccountMapper(

app.cdc,

app.keyAccount, // target store

&auth.BaseAccount{}, // prototype

)

// add handlers

//添加各种操作——它们都从KVSTORE抽象出来,但是它们的抽象度更高,或者可以认为是accountMapper的更高一层。

//处理帐户的操作,再抽象一层

app.coinKeeper = bank.NewKeeper(app.accountMapper)

app.ibcMapper = ibc.NewMapper(app.cdc, app.keyIBC, app.RegisterCodespace(ibc.DefaultCodespace))

//处理Atom

app.stakeKeeper = stake.NewKeeper(app.cdc, app.keyStake, app.coinKeeper, app.RegisterCodespace(stake.DefaultCodespace))

//设置惩罚机制操作者

app.slashingKeeper = slashing.NewKeeper(app.cdc, app.keySlashing, app.stakeKeeper, app.RegisterCodespace(slashing.DefaultCodespace))

// register message routes

//这个是重点,在这里注册路由的句柄

app.Router().

AddRoute("bank", bank.NewHandler(app.coinKeeper)).

AddRoute("ibc", ibc.NewHandler(app.ibcMapper, app.coinKeeper)).

AddRoute("stake", stake.NewHandler(app.stakeKeeper))

// initialize BaseApp

//初始化相关参数

app.SetInitChainer(app.initChainer)

app.SetBeginBlocker(app.BeginBlocker)

app.SetEndBlocker(app.EndBlocker)

//设置权限控制句柄

app.SetAnteHandler(auth.NewAnteHandler(app.accountMapper, app.feeCollectionKeeper))

//从KV数据库加载相关数据--在当前版本中,IVAL存储是KVStore基础的实现

app.MountStoresIAVL(app.keyMain, app.keyAccount, app.keyIBC, app.keyStake, app.keySlashing)

err := app.LoadLatestVersion(app.keyMain)

if err != nil {

cmn.Exit(err.Error())

}

return app

}

// custom tx codec

//将相关的编码器注册到相关的各方

func MakeCodec() *wire.Codec {

var cdc = wire.NewCodec()

ibc.RegisterWire(cdc)

bank.RegisterWire(cdc)

stake.RegisterWire(cdc)

slashing.RegisterWire(cdc)

auth.RegisterWire(cdc)

sdk.RegisterWire(cdc)

wire.RegisterCrypto(cdc)

return cdc

}

//其下为具体的上面的HANDLER的设置

// application updates every end block

func (app *GaiaApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock) abci.ResponseBeginBlock {

tags := slashing.BeginBlocker(ctx, req, app.slashingKeeper)

return abci.ResponseBeginBlock{

Tags: tags.ToKVPairs(),

}

}

// application updates every end block

func (app *GaiaApp) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock {

validatorUpdates := stake.EndBlocker(ctx, app.stakeKeeper)

return abci.ResponseEndBlock{

ValidatorUpdates: validatorUpdates,

}

}

// custom logic for gaia initialization

func (app *GaiaApp) initChainer(ctx sdk.Context, req abci.RequestInitChain) abci.ResponseInitChain {

stateJSON := req.AppStateBytes

// TODO is this now the whole genesis file?

var genesisState GenesisState

err := app.cdc.UnmarshalJSON(stateJSON, &genesisState)

if err != nil {

panic(err) // TODO https://github.com/cosmos/cosmos-sdk/issues/468

// return sdk.ErrGenesisParse("").TraceCause(err, "")

}

// load the accounts

for _, gacc := range genesisState.Accounts {

acc := gacc.ToAccount()

app.accountMapper.SetAccount(ctx, acc)

}

// load the initial stake information

stake.InitGenesis(ctx, app.stakeKeeper, genesisState.StakeData)

return abci.ResponseInitChain{}

}

这里面需要说明的是,Mapper和Keeper。记得在写数据库程序的时候,有几种方法,一种是直接连接操作数据库,拿到结果,这种方法最原始,但是权力也最大,想怎么操作就怎么操作。后来有了可以使用封装对象,这样访问数据库就被控制了起来,但是仍然是可以访问很多原始的东西。现在主流的使用的是Mybaits什么的,抽象的更厉害,基本上与你无关的数据,你根本不知道在哪儿了。

Mapper和Keeper就是干这个的,前者抽象度一般,后者更高一些。目的就是限制模块对功能访问的方式。按照最小权限原则来提供访问机制。这样,安全性和不必要的异常的出现就被控制起来,使得应用上更容易扩展。

这里其实主要是governance和slashing,前者主要是控制提议和投票等,后者主要是防止有人做恶,然后从staking中slash掉你的Atom。说白了就是把你的抵押的钱没收。这里顺道说一下这个原则:Atom的持有者可以是验证人也可以是委托人,委托人可以根据他们对验证人的认知和具体的情况将token委托给验证人,验证人即可代理Atom资产并从每个出块奖励中得到大部分,另外有一小部分给委托人,还有一小部分供节点的自运行。而为了保证验证人的诚实,向区块链中发布不正确的数据的恶意验证人会失去他们的Atom。这就叫做slashing。

2、ABCI接口分析

在整个的SDK的流程中,调用ABCI同Tendermint进行通信是一个重要的机制。虽然这篇并不讨论Tendermint,但是相关的ABCI的接口得说明一下,否则在SDK的流程调用中不明白相关的规则,就会导致对整个流程的理解无法正常进行。ABCI有三种消息类型,DeliverTx,CheckTx, Commit。其中DeliverTx和BeginBlock和EndBlock两个接口有关系。

1、InitChain

在上面的流程介绍过app.initChain的方法,它会被Tendermint在启动时调用一次,用来初始化各种相关的Message,比如共识层的参数和最初的验证人的集合数据。当然,肯定还会有决定信息处理的方式。在白皮书中提到,你可以在此处将引进的数据结构进行JSON编码,然后在调用这个函数的期间,对这些信息进行填充并存储。

// Implements ABCI

// InitChain runs the initialization logic directly on the CommitMultiStore and commits it.

func (app *BaseApp) InitChain(req abci.RequestInitChain) (res abci.ResponseInitChain) {

if app.initChainer == nil {

return

}

// Initialize the deliver state and run initChain

app.setDeliverState(abci.Header{})

app.initChainer(app.deliverState.ctx, req) // no error

// NOTE: we don't commit, but BeginBlock for block 1

// starts from this deliverState

return

}

func (app *BaseApp) setDeliverState(header abci.Header) {

ms := app.cms.CacheMultiStore()

app.deliverState = &state{

ms: ms,

ctx: sdk.NewContext(ms, header, false, nil, app.Logger),

}

}

当这些信息被正确的处理后,比如是一个帐户相关的信息,那么就可以使用它来进行交易的处理了。

2、BeginBlock

在上面提到过Tendermint的三种消息,其中的交易处理消息DeliverTx,它就是在区块开始被调用前,在这个接口中处理验证人签名的信息。如果大家写过数据库的底层操作,这个东西应该和它非常类似,不外乎是Begin准备,End结束,清扫资源。不过使用它的时候也需要注意,它和其它的相类似的操作一样,在这两个函数的处理过程中,不应该包含过多的和过于复杂的操作,导致整个消息的阻塞。

如果在这二者中出现了不合理的循环等,就有可能导致应用程序APP的假死。

// application updates every end block

func (app *GaiaApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock) abci.ResponseBeginBlock {

tags := slashing.BeginBlocker(ctx, req, app.slashingKeeper)

return abci.ResponseBeginBlock{

Tags: tags.ToKVPairs(),

}

}

// slashing begin block functionality

func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, sk Keeper) (tags sdk.Tags) {

// Tag the height

heightBytes := make([]byte, 8)

binary.LittleEndian.PutUint64(heightBytes, uint64(req.Header.Height))

tags = sdk.NewTags("height", heightBytes)

// Deal with any equivocation evidence

for _, evidence := range req.ByzantineValidators {

pk, err := tmtypes.PB2TM.PubKey(evidence.Validator.PubKey)

if err != nil {

panic(err)

}

switch string(evidence.Type) {

case tmtypes.ABCIEvidenceTypeDuplicateVote:

//处理验证器在同一高度签名两个块

sk.handleDoubleSign(ctx, evidence.Height, evidence.Time, pk)

default:

ctx.Logger().With("module", "x/slashing").Error(fmt.Sprintf("Ignored unknown evidence type: %s", string(evidence.Type)))

}

}

// Iterate over all the validators which *should* have signed this block

for _, validator := range req.Validators {

present := validator.SignedLastBlock

pubkey, err := tmtypes.PB2TM.PubKey(validator.Validator.PubKey)

if err != nil {

panic(err)

}

sk.handleValidatorSignature(ctx, pubkey, present)

}

return

}

3、EndBlock

响应上一个函数接口,在DeliverTx消息处理完成所有的交易后调用,主要用来对验证人集合的结果进行维护。

// Implements ABCI

func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) {

if app.endBlocker != nil {

res = app.endBlocker(app.deliverState.ctx, req)

} else {

res.ValidatorUpdates = app.valUpdates

}

return

}

4、Commit

当处理完成交易后,应该把完成的交易从内存持久化到硬盘上,并根据创建返回被下一个Tendermint区块需要的默克尔树的Root哈希值。这个哈希值 的作用在区块链中基本是一样的,用来验证合法性。

// Implements ABCI

func (app *BaseApp) Commit() (res abci.ResponseCommit) {

header := app.deliverState.ctx.BlockHeader()

/*

// Write the latest Header to the store

headerBytes, err := proto.Marshal(&header)

if err != nil {

panic(err)

}

app.db.SetSync(dbHeaderKey, headerBytes)

*/

// Write the Deliver state and commit the MultiStore

app.deliverState.ms.Write()

commitID := app.cms.Commit()

app.Logger.Debug("Commit synced",

"commit", commitID,

)

// Reset the Check state to the latest committed

// NOTE: safe because Tendermint holds a lock on the mempool for Commit.

// Use the header from this latest block.

app.setCheckState(header)

// Empty the Deliver state

app.deliverState = nil

return abci.ResponseCommit{

Data: commitID.Hash,

}

}

5、Query

这个就不多说了吧,你总得给别人一个看一看的机会。

// Implements ABCI.

// Delegates to CommitMultiStore if it implements Queryable

func (app *BaseApp) Query(req abci.RequestQuery) (res abci.ResponseQuery) {

path := strings.Split(req.Path, "/")

// first element is empty string

if len(path) > 0 && path[0] == "" {

path = path[1:]

}

// "/app" prefix for special application queries

if len(path) >= 2 && path[0] == "app" {

var result sdk.Result

switch path[1] {

case "simulate":

txBytes := req.Data

tx, err := app.txDecoder(txBytes)

if err != nil {

result = err.Result()

} else {

result = app.Simulate(tx)

}

default:

result = sdk.ErrUnknownRequest(fmt.Sprintf("Unknown query: %s", path)).Result()

}

value := app.cdc.MustMarshalBinary(result)

return abci.ResponseQuery{

Code: uint32(sdk.ABCICodeOK),

Value: value,

}

}

// "/store" prefix for store queries

if len(path) >= 1 && path[0] == "store" {

queryable, ok := app.cms.(sdk.Queryable)

if !ok {

msg := "multistore doesn't support queries"

return sdk.ErrUnknownRequest(msg).QueryResult()

}

req.Path = "/" + strings.Join(path[1:], "/")

return queryable.Query(req)

}

// "/p2p" prefix for p2p queries

if len(path) >= 4 && path[0] == "p2p" {

if path[1] == "filter" {

if path[2] == "addr" {

return app.FilterPeerByAddrPort(path[3])

}

if path[2] == "pubkey" {

return app.FilterPeerByPubKey(path[3])

}

}

}

msg := "unknown query path"

return sdk.ErrUnknownRequest(msg).QueryResult()

}

6、CheckTx

所有的拥有交易池的区块链,基本上在进池前后都要搞一些事情,包括对各种合法性的检查,目的只有一个,防止千辛万苦才生产出来的区块打包一些没用的交易。在Cosmos中也会有这种手段,在前面提到过AnteHandler,通过其对发送者授权,确定在交易前有足够的手续费,不过它和以太坊有些类似,如果交易失败,这笔费用仍然没有了,收不回去。

// Implements ABCI

func (app *BaseApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) {

// Decode the Tx.

var result sdk.Result

var tx, err = app.txDecoder(txBytes)

if err != nil {

result = err.Result()

} else {

result = app.runTx(runTxModeCheck, txBytes, tx)

}

return abci.ResponseCheckTx{

Code: uint32(result.Code),

Data: result.Data,

Log: result.Log,

GasWanted: result.GasWanted,

GasUsed: result.GasUsed,

Fee: cmn.KI64Pair{

[]byte(result.FeeDenom),

result.FeeAmount,

},

Tags: result.Tags,

}

}

3、IBC通信源码

在前面的代码中初始化时需要对路由进行注册,在这里同样会有路由的实际注册过程,先看一看提供的命令处理方式:

// IBC transfer command

func IBCTransferCmd(cdc *wire.Codec) *cobra.Command {

cmd := &cobra.Command{

Use: "transfer",

RunE: func(cmd *cobra.Command, args []string) error {

ctx := context.NewCoreContextFromViper().WithDecoder(authcmd.GetAccountDecoder(cdc))

// get the from address

from, err := ctx.GetFromAddress()

if err != nil {

return err

}

// build the message

msg, err := buildMsg(from)

if err != nil {

return err

}

// get password

res, err := ctx.EnsureSignBuildBroadcast(ctx.FromAddressName, msg, cdc)

if err != nil {

return err

}

fmt.Printf("Committed at block %d. Hash: %s\n", res.Height, res.Hash.String())

return nil

},

}

cmd.Flags().String(flagTo, "", "Address to send coins")

cmd.Flags().String(flagAmount, "", "Amount of coins to send")

cmd.Flags().String(flagChain, "", "Destination chain to send coins")

return cmd

}

处理传输命令,进入中继环节处理:

// flags--代表从一个空间转向另外一个窠

const (

FlagFromChainID = "from-chain-id"

FlagFromChainNode = "from-chain-node"

FlagToChainID = "to-chain-id"

FlagToChainNode = "to-chain-node"

)

type relayCommander struct {

cdc *wire.Codec

address sdk.Address

decoder auth.AccountDecoder

mainStore string

ibcStore string

accStore string

logger log.Logger

}

// IBC relay command

func IBCRelayCmd(cdc *wire.Codec) *cobra.Command {

cmdr := relayCommander{

cdc: cdc,

decoder: authcmd.GetAccountDecoder(cdc),

ibcStore: "ibc",

mainStore: "main",

accStore: "acc",

logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)),

}

cmd := &cobra.Command{

Use: "relay",

Run: cmdr.runIBCRelay,

}

cmd.Flags().String(FlagFromChainID, "", "Chain ID for ibc node to check outgoing packets")

cmd.Flags().String(FlagFromChainNode, "tcp://localhost:46657", "<host>:<port> to tendermint rpc interface for this chain")

cmd.Flags().String(FlagToChainID, "", "Chain ID for ibc node to broadcast incoming packets")

cmd.Flags().String(FlagToChainNode, "tcp://localhost:36657", "<host>:<port> to tendermint rpc interface for this chain")

cmd.MarkFlagRequired(FlagFromChainID)

cmd.MarkFlagRequired(FlagFromChainNode)

cmd.MarkFlagRequired(FlagToChainID)

cmd.MarkFlagRequired(FlagToChainNode)

viper.BindPFlag(FlagFromChainID, cmd.Flags().Lookup(FlagFromChainID))

viper.BindPFlag(FlagFromChainNode, cmd.Flags().Lookup(FlagFromChainNode))

viper.BindPFlag(FlagToChainID, cmd.Flags().Lookup(FlagToChainID))

viper.BindPFlag(FlagToChainNode, cmd.Flags().Lookup(FlagToChainNode))

return cmd

}

//启动遍历监听

func (c relayCommander) runIBCRelay(cmd *cobra.Command, args []string) {

fromChainID := viper.GetString(FlagFromChainID)

fromChainNode := viper.GetString(FlagFromChainNode)

toChainID := viper.GetString(FlagToChainID)

toChainNode := viper.GetString(FlagToChainNode)

address, err := context.NewCoreContextFromViper().GetFromAddress()

if err != nil {

panic(err)

}

c.address = address

c.loop(fromChainID, fromChainNode, toChainID, toChainNode)

}

func (c relayCommander) loop(fromChainID, fromChainNode, toChainID,

toChainNode string) {

ctx := context.NewCoreContextFromViper()

// get password

passphrase, err := ctx.GetPassphraseFromStdin(ctx.FromAddressName)

if err != nil {

panic(err)

}

ingressKey := ibc.IngressSequenceKey(fromChainID)

OUTER:

for {

time.Sleep(5 * time.Second)

processedbz, err := query(toChainNode, ingressKey, c.ibcStore)

if err != nil {

panic(err)

}

var processed int64

if processedbz == nil {

processed = 0

} else if err = c.cdc.UnmarshalBinary(processedbz, &processed); err != nil {

panic(err)

}

lengthKey := ibc.EgressLengthKey(toChainID)

egressLengthbz, err := query(fromChainNode, lengthKey, c.ibcStore)

if err != nil {

c.logger.Error("Error querying outgoing packet list length", "err", err)

continue OUTER //TODO replace with continue (I think it should just to the correct place where OUTER is now)

}

var egressLength int64

if egressLengthbz == nil {

egressLength = 0

} else if err = c.cdc.UnmarshalBinary(egressLengthbz, &egressLength); err != nil {

panic(err)

}

if egressLength > processed {

c.logger.Info("Detected IBC packet", "number", egressLength-1)

}

seq := c.getSequence(toChainNode)

for i := processed; i < egressLength; i++ {

egressbz, err := query(fromChainNode, ibc.EgressKey(toChainID, i), c.ibcStore)

if err != nil {

c.logger.Error("Error querying egress packet", "err", err)

continue OUTER // TODO replace to break, will break first loop then send back to the beginning (aka OUTER)

}

err = c.broadcastTx(seq, toChainNode, c.refine(egressbz, i, passphrase))

seq++

if err != nil {

c.logger.Error("Error broadcasting ingress packet", "err", err)

continue OUTER // TODO replace to break, will break first loop then send back to the beginning (aka OUTER)

}

c.logger.Info("Relayed IBC packet", "number", i)

}

}

}

func (c relayCommander) broadcastTx(seq int64, node string, tx []byte) error {

_, err := context.NewCoreContextFromViper().WithNodeURI(node).WithSequence(seq + 1).BroadcastTx(tx)

return err

}

//处理接收的消息

func (c relayCommander) refine(bz []byte, sequence int64, passphrase string) []byte {

var packet ibc.IBCPacket

if err := c.cdc.UnmarshalBinary(bz, &packet); err != nil {

panic(err)

}

msg := ibc.IBCReceiveMsg{

IBCPacket: packet,

Relayer: c.address,

Sequence: sequence,

}

ctx := context.NewCoreContextFromViper().WithSequence(sequence)

res, err := ctx.SignAndBuild(ctx.FromAddressName, passphrase, msg, c.cdc)

if err != nil {

panic(err)

}

return res

}

通过一个中继节点来监听两条不同的链,进行消息的路由注册来达到自动跨链交易,Cosmos提供的这个方式还是比较不错的。至少,不用自己再犯愁怎么做。但是这个有一个前提,需要注册一下:

// RegisterRoutes - Central function to define routes that get registered by the main application

func RegisterRoutes(ctx context.CoreContext, r *mux.Router, cdc *wire.Codec, kb keys.Keybase) {

r.HandleFunc("/ibc/{destchain}/{address}/send", TransferRequestHandlerFn(cdc, kb, ctx)).Methods("POST")

}

三、总结

通过上面的代码分析,可以看出,ABCI和IBC两个模块,是运行整个Cosmos的一个基础。Cosmos-SDK把这几个模块有机的抽象到一起,并提供了基础的交易、通信等功能。新的区块链可以从它上面调用或者继承Example中的例程,只关心区块链功能的开发,短时间内就可以方便的开发出一条公链来。

免责声明:

1.本文内容综合整理自互联网,观点仅代表作者本人,不代表本站立场。

2.资讯内容不构成投资建议,投资者应独立决策并自行承担风险。

你可能感兴趣

    error