Commit 143311fe authored by liuyuhang's avatar liuyuhang

modify

parent 28de2d98
...@@ -131,7 +131,7 @@ returnAddr="1KcCVZLSQYRUwE5EXTsAoQs9LuJW6xwfQa" ...@@ -131,7 +131,7 @@ returnAddr="1KcCVZLSQYRUwE5EXTsAoQs9LuJW6xwfQa"
count=10000 count=10000
[store] [store]
name="mavl" name="kvmvccmavl"
driver="leveldb" driver="leveldb"
dbPath="datadir/mavltree" dbPath="datadir/mavltree"
dbCache=128 dbCache=128
......
...@@ -1092,6 +1092,31 @@ func (bs *BlockStore) SetUpgradeMeta(meta *types.UpgradeMeta) error { ...@@ -1092,6 +1092,31 @@ func (bs *BlockStore) SetUpgradeMeta(meta *types.UpgradeMeta) error {
return bs.db.SetSync(version.LocalDBMeta, verByte) return bs.db.SetSync(version.LocalDBMeta, verByte)
} }
//SetStoreUpgradeMeta 获取存在blockchain中的Store的数据库版本号
func (bs *BlockStore) GetStoreUpgradeMeta() (*types.UpgradeMeta, error) {
ver := types.UpgradeMeta{}
version, err := bs.db.Get(version.StoreDBMeta)
if err != nil && err != dbm.ErrNotFoundInDb {
return nil, err
}
if len(version) == 0 {
return &types.UpgradeMeta{Version: "0.0.0"}, nil
}
err = types.Decode(version, &ver)
if err != nil {
return nil, err
}
storeLog.Info("GetStoreUpgradeMeta", "blockchain db version", ver)
return &ver, nil
}
//SetStoreUpgradeMeta 设置blockchain中的Store的数据库版本号
func (bs *BlockStore) SetStoreUpgradeMeta(meta *types.UpgradeMeta) error {
verByte := types.Encode(meta)
storeLog.Info("SetStoreUpgradeMeta", "meta", meta)
return bs.db.SetSync(version.StoreDBMeta, verByte)
}
//isRecordBlockSequence配置的合法性检测 //isRecordBlockSequence配置的合法性检测
func (bs *BlockStore) isRecordBlockSequenceValid(chain *BlockChain) { func (bs *BlockStore) isRecordBlockSequenceValid(chain *BlockChain) {
lastHeight := bs.Height() lastHeight := bs.Height()
......
...@@ -128,7 +128,6 @@ func TestBlockChain(t *testing.T) { ...@@ -128,7 +128,6 @@ func TestBlockChain(t *testing.T) {
testWriteBlockToDbTemp(t, blockchain) testWriteBlockToDbTemp(t, blockchain)
testReadBlockToExec(t, blockchain) testReadBlockToExec(t, blockchain)
testReExecBlock(t, blockchain) testReExecBlock(t, blockchain)
testReExecBlockMsg(t, mock33, blockchain)
} }
func testProcAddBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, blockchain *blockchain.BlockChain) { func testProcAddBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, blockchain *blockchain.BlockChain) {
...@@ -1115,17 +1114,6 @@ func testWriteBlockToDbTemp(t *testing.T, chain *blockchain.BlockChain) { ...@@ -1115,17 +1114,6 @@ func testWriteBlockToDbTemp(t *testing.T, chain *blockchain.BlockChain) {
func testReExecBlock(t *testing.T, chain *blockchain.BlockChain) { func testReExecBlock(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("ReExecBlock begin ---------------------") chainlog.Info("ReExecBlock begin ---------------------")
curheight := chain.GetBlockHeight() curheight := chain.GetBlockHeight()
chain.ProcessReExecBlock(0, curheight) chain.ReExecBlock(0, curheight)
chainlog.Info("ReExecBlock end ---------------------") chainlog.Info("ReExecBlock end ---------------------")
} }
func testReExecBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, chain *blockchain.BlockChain) {
var err error
client := mock33.GetClient()
msg1 := client.NewMessage("blockchain", types.EventReExecBlock, &types.ReqInt{Height: 8})
err = client.Send(msg1, true)
require.NoError(t, err)
_, err = client.Wait(msg1)
require.NoError(t, err)
time.Sleep(time.Millisecond * 20)
}
...@@ -90,8 +90,6 @@ func (chain *BlockChain) ProcRecvMsg() { ...@@ -90,8 +90,6 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventGetSeqCBLastNum: case types.EventGetSeqCBLastNum:
go chain.processMsg(msg, reqnum, chain.getSeqCBLastNum) go chain.processMsg(msg, reqnum, chain.getSeqCBLastNum)
case types.EventReExecBlock:
go chain.processMsg(msg, reqnum, chain.reExecBlock)
default: default:
go chain.processMsg(msg, reqnum, chain.unknowMsg) go chain.processMsg(msg, reqnum, chain.unknowMsg)
} }
...@@ -135,17 +133,6 @@ func (chain *BlockChain) getSeqCBLastNum(msg *queue.Message) { ...@@ -135,17 +133,6 @@ func (chain *BlockChain) getSeqCBLastNum(msg *queue.Message) {
msg.Reply(chain.client.NewMessage("rpc", types.EventGetSeqCBLastNum, lastNum)) msg.Reply(chain.client.NewMessage("rpc", types.EventGetSeqCBLastNum, lastNum))
} }
func (chain *BlockChain) reExecBlock(msg *queue.Message) {
data := (msg.Data).(*types.ReqInt)
curHeight := chain.GetBlockHeight()
if curHeight < data.Height {
msg.Reply(chain.client.NewMessage("store", types.EventReExecBlock, &types.ReplyString{Data: "none"}))
return
}
msg.Reply(chain.client.NewMessage("store", types.EventReExecBlock, &types.ReplyString{Data: "need"}))
chain.ProcessReExecBlock(data.Height, curHeight)
}
func (chain *BlockChain) queryTx(msg *queue.Message) { func (chain *BlockChain) queryTx(msg *queue.Message) {
txhash := (msg.Data).(*types.ReqHash) txhash := (msg.Data).(*types.ReqHash)
TransactionDetail, err := chain.ProcQueryTxMsg(txhash.Hash) TransactionDetail, err := chain.ProcQueryTxMsg(txhash.Hash)
......
...@@ -10,8 +10,6 @@ import ( ...@@ -10,8 +10,6 @@ import (
"math/big" "math/big"
"sync/atomic" "sync/atomic"
"fmt"
"github.com/33cn/chain33/client/api" "github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/difficulty" "github.com/33cn/chain33/common/difficulty"
...@@ -601,40 +599,6 @@ func (b *BlockChain) ProcessDelParaChainBlock(broadcast bool, blockdetail *types ...@@ -601,40 +599,6 @@ func (b *BlockChain) ProcessDelParaChainBlock(broadcast bool, blockdetail *types
return nil, true, false, nil return nil, true, false, nil
} }
// ProcessReExecBlock 从对应高度本地重新执行区块
func (b *BlockChain) ProcessReExecBlock(startHeight, curHeight int64) {
var prevStateHash []byte
if startHeight > 0 {
blockdetail, err := b.GetBlock(startHeight - 1)
if err != nil {
panic(fmt.Sprintf("get height=%d err, this not allow fail", startHeight-1))
}
prevStateHash = blockdetail.Block.StateHash
}
for i := startHeight; i <= curHeight; i++ {
blockdetail, err := b.GetBlock(i)
if err != nil {
panic(fmt.Sprintf("get height=%d err, this not allow fail", i))
}
block := blockdetail.Block
err = execBlockUpgrade(b.client, prevStateHash, block, false)
if err != nil {
panic(fmt.Sprintf("execBlockEx height=%d err=%s, this not allow fail", i, err.Error()))
}
prevStateHash = block.StateHash
}
// 通知执行结束
msg := b.client.NewMessage("store", types.EventReExecBlock, &types.ReplyString{Data: "over"})
err := b.client.Send(msg, true)
if err != nil {
return
}
_, err = b.client.Wait(msg)
if err != nil {
return
}
}
// IsRecordFaultErr 检测此错误是否要记录到故障错误中 // IsRecordFaultErr 检测此错误是否要记录到故障错误中
func IsRecordFaultErr(err error) bool { func IsRecordFaultErr(err error) bool {
return err != types.ErrFutureBlock && !api.IsGrpcError(err) && !api.IsQueueError(err) return err != types.ErrFutureBlock && !api.IsGrpcError(err) && !api.IsQueueError(err)
......
...@@ -29,7 +29,7 @@ func (chain *BlockChain) UpgradeChain() { ...@@ -29,7 +29,7 @@ func (chain *BlockChain) UpgradeChain() {
} }
if chain.needReIndex(meta) { if chain.needReIndex(meta) {
//如果没有开始重建index,那么先del all keys //如果没有开始重建index,那么先del all keys
if !meta.Indexing { if !meta.Starting {
chainlog.Info("begin del all keys") chainlog.Info("begin del all keys")
chain.blockStore.delAllKeys() chain.blockStore.delAllKeys()
chainlog.Info("end del all keys") chainlog.Info("end del all keys")
...@@ -38,7 +38,7 @@ func (chain *BlockChain) UpgradeChain() { ...@@ -38,7 +38,7 @@ func (chain *BlockChain) UpgradeChain() {
//reindex 的过程中,会每个高度都去更新meta //reindex 的过程中,会每个高度都去更新meta
chain.reIndex(start, curheight) chain.reIndex(start, curheight)
meta := &types.UpgradeMeta{ meta := &types.UpgradeMeta{
Indexing: false, Starting: false,
Version: version.GetLocalDBVersion(), Version: version.GetLocalDBVersion(),
Height: 0, Height: 0,
} }
...@@ -59,7 +59,7 @@ func (chain *BlockChain) reIndex(start, end int64) { ...@@ -59,7 +59,7 @@ func (chain *BlockChain) reIndex(start, end int64) {
} }
func (chain *BlockChain) needReIndex(meta *types.UpgradeMeta) bool { func (chain *BlockChain) needReIndex(meta *types.UpgradeMeta) bool {
if meta.Indexing { //正在index if meta.Starting { //正在index
return true return true
} }
v1 := meta.Version v1 := meta.Version
...@@ -89,7 +89,7 @@ func (chain *BlockChain) reIndexOne(height int64) error { ...@@ -89,7 +89,7 @@ func (chain *BlockChain) reIndexOne(height int64) error {
panic(err) panic(err)
} }
meta := &types.UpgradeMeta{ meta := &types.UpgradeMeta{
Indexing: true, Starting: true,
Version: version.GetLocalDBVersion(), Version: version.GetLocalDBVersion(),
Height: height + 1, Height: height + 1,
} }
......
...@@ -202,6 +202,8 @@ dbPath="datadir/mavltree" ...@@ -202,6 +202,8 @@ dbPath="datadir/mavltree"
dbCache=128 dbCache=128
# 数据库版本 # 数据库版本
localdbVersion="1.0.0" localdbVersion="1.0.0"
# 状态数据库版本
storedbVersion="1.0.0"
[store.sub.mavl] [store.sub.mavl]
# 是否使能mavl加前缀 # 是否使能mavl加前缀
......
...@@ -12,8 +12,10 @@ var ( ...@@ -12,8 +12,10 @@ var (
WalletVerKey = []byte("WalletVerKey") WalletVerKey = []byte("WalletVerKey")
BlockChainVerKey = []byte("BlockChainVerKey") BlockChainVerKey = []byte("BlockChainVerKey")
LocalDBMeta = []byte("LocalDBMeta") LocalDBMeta = []byte("LocalDBMeta")
StoreDBMeta = []byte("StoreDBMeta")
MavlTreeVerKey = []byte("MavlTreeVerKey") MavlTreeVerKey = []byte("MavlTreeVerKey")
localversion = "1.0.0" localversion = "1.0.0"
storeversion = "1.0.0"
appversion = "1.0.0" appversion = "1.0.0"
GitCommit string GitCommit string
) )
...@@ -49,6 +51,22 @@ func SetLocalDBVersion(version string) { ...@@ -49,6 +51,22 @@ func SetLocalDBVersion(version string) {
} }
} }
//GetStoreDBVersion 数据库版本解析
/*
格式: v1.v2.v3
如果: v1 升级了, 那么意味着localdb 需要 重新 reindex
*/
func GetStoreDBVersion() string {
return localversion
}
//SetStoreDBVersion 通过设置版本号,强制重建数据库
func SetStoreDBVersion(version string) {
if version != "" {
localversion = version
}
}
//GetAppVersion 获取应用 app 的版本 //GetAppVersion 获取应用 app 的版本
func GetAppVersion() string { func GetAppVersion() string {
return appversion return appversion
......
...@@ -160,26 +160,6 @@ func decodeTx(hexstr string) (*types.Transaction, error) { ...@@ -160,26 +160,6 @@ func decodeTx(hexstr string) (*types.Transaction, error) {
return &tx, nil return &tx, nil
} }
// SendRawTransaction send rawtransaction by p2p
func (c *channelClient) SendRawTransaction(param *types.SignedTx) (*types.Reply, error) {
if param == nil {
err := types.ErrInvalidParam
log.Error("SendRawTransaction", "Error", err)
return nil, err
}
var tx types.Transaction
err := types.Decode(param.GetUnsign(), &tx)
if err == nil {
tx.Signature = &types.Signature{
Ty: param.GetTy(),
Pubkey: param.GetPubkey(),
Signature: param.GetSign(),
}
return c.SendTx(&tx)
}
return nil, err
}
// GetAddrOverview get overview of address // GetAddrOverview get overview of address
func (c *channelClient) GetAddrOverview(parm *types.ReqAddr) (*types.AddrOverview, error) { func (c *channelClient) GetAddrOverview(parm *types.ReqAddr) (*types.AddrOverview, error) {
err := address.CheckAddress(parm.Addr) err := address.CheckAddress(parm.Addr)
......
...@@ -80,11 +80,6 @@ func (g *Grpc) CreateRawTxGroup(ctx context.Context, in *pb.CreateTransactionGro ...@@ -80,11 +80,6 @@ func (g *Grpc) CreateRawTxGroup(ctx context.Context, in *pb.CreateTransactionGro
return &pb.UnsignTx{Data: reply}, nil return &pb.UnsignTx{Data: reply}, nil
} }
// SendRawTransaction send rawtransaction
func (g *Grpc) SendRawTransaction(ctx context.Context, in *pb.SignedTx) (*pb.Reply, error) {
return g.cli.SendRawTransaction(in)
}
// QueryTransaction query transaction by grpc // QueryTransaction query transaction by grpc
func (g *Grpc) QueryTransaction(ctx context.Context, in *pb.ReqHash) (*pb.TransactionDetail, error) { func (g *Grpc) QueryTransaction(ctx context.Context, in *pb.ReqHash) (*pb.TransactionDetail, error) {
return g.cli.QueryTx(in) return g.cli.QueryTx(in)
......
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt"
"time" "time"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
...@@ -83,36 +82,6 @@ func (c *Chain33) CreateNoBalanceTransaction(in *types.NoBalanceTx, result *stri ...@@ -83,36 +82,6 @@ func (c *Chain33) CreateNoBalanceTransaction(in *types.NoBalanceTx, result *stri
return nil return nil
} }
// SendRawTransaction send rawtransacion
func (c *Chain33) SendRawTransaction(in rpctypes.SignedTx, result *interface{}) error {
var stx types.SignedTx
var err error
stx.Pubkey, err = hex.DecodeString(in.Pubkey)
if err != nil {
return err
}
stx.Sign, err = hex.DecodeString(in.Sign)
if err != nil {
return err
}
stx.Unsign, err = hex.DecodeString(in.Unsign)
if err != nil {
return err
}
stx.Ty = in.Ty
reply, err := c.cli.SendRawTransaction(&stx)
if err != nil {
return err
}
if reply.IsOk {
*result = "0x" + hex.EncodeToString(reply.Msg)
return nil
}
return fmt.Errorf(string(reply.Msg))
}
// SendTransaction send transaction // SendTransaction send transaction
func (c *Chain33) SendTransaction(in rpctypes.RawParm, result *interface{}) error { func (c *Chain33) SendTransaction(in rpctypes.RawParm, result *interface{}) error {
var parm types.Transaction var parm types.Transaction
......
...@@ -86,9 +86,6 @@ func (store *BaseStore) SetQueueClient(c queue.Client) { ...@@ -86,9 +86,6 @@ func (store *BaseStore) SetQueueClient(c queue.Client) {
} }
store.done <- struct{}{} store.done <- struct{}{}
}() }()
if store.child != nil {
store.child.ProcEvent(nil)
}
} }
//Wait wait for basestore ready //Wait wait for basestore ready
......
...@@ -81,6 +81,8 @@ type Consensus struct { ...@@ -81,6 +81,8 @@ type Consensus struct {
Genesis string `protobuf:"bytes,4,opt,name=genesis" json:"genesis,omitempty"` Genesis string `protobuf:"bytes,4,opt,name=genesis" json:"genesis,omitempty"`
HotkeyAddr string `protobuf:"bytes,5,opt,name=hotkeyAddr" json:"hotkeyAddr,omitempty"` HotkeyAddr string `protobuf:"bytes,5,opt,name=hotkeyAddr" json:"hotkeyAddr,omitempty"`
ForceMining bool `protobuf:"varint,6,opt,name=forceMining" json:"forceMining,omitempty"` ForceMining bool `protobuf:"varint,6,opt,name=forceMining" json:"forceMining,omitempty"`
// 配置挖矿的合约名单
MinerExecs []string `protobuf:"bytes,7,rep,name=minerExecs" json:"minerExecs,omitempty"`
} }
// Wallet 配置 // Wallet 配置
...@@ -109,6 +111,8 @@ type Store struct { ...@@ -109,6 +111,8 @@ type Store struct {
DbCache int32 `protobuf:"varint,4,opt,name=dbCache" json:"dbCache,omitempty"` DbCache int32 `protobuf:"varint,4,opt,name=dbCache" json:"dbCache,omitempty"`
// 数据库版本 // 数据库版本
LocalDBVersion string `protobuf:"bytes,5,opt,name=localdbVersion" json:"localdbVersion,omitempty"` LocalDBVersion string `protobuf:"bytes,5,opt,name=localdbVersion" json:"localdbVersion,omitempty"`
// 数据库版本
StoreDBVersion string `protobuf:"bytes,5,opt,name=storedbVersion" json:"storedbVersion,omitempty"`
} }
// BlockChain 配置 // BlockChain 配置
......
...@@ -20,8 +20,6 @@ service chain33 { ...@@ -20,8 +20,6 @@ service chain33 {
//交易接口 //交易接口
rpc CreateRawTransaction(CreateTx) returns (UnsignTx) {} rpc CreateRawTransaction(CreateTx) returns (UnsignTx) {}
rpc CreateRawTxGroup(CreateTransactionGroup) returns (UnsignTx) {} rpc CreateRawTxGroup(CreateTransactionGroup) returns (UnsignTx) {}
//发送签名后交易
rpc SendRawTransaction(SignedTx) returns (Reply) {}
// 根据哈希查询交易 // 根据哈希查询交易
rpc QueryTransaction(ReqHash) returns (TransactionDetail) {} rpc QueryTransaction(ReqHash) returns (TransactionDetail) {}
// 发送交易 // 发送交易
......
...@@ -76,13 +76,6 @@ message NoBalanceTx { ...@@ -76,13 +76,6 @@ message NoBalanceTx {
string expire = 4; string expire = 4;
} }
message SignedTx {
bytes unsign = 1;
bytes sign = 2;
bytes pubkey = 3;
int32 ty = 4;
}
message Transaction { message Transaction {
bytes execer = 1; bytes execer = 1;
bytes payload = 2; bytes payload = 2;
...@@ -244,7 +237,7 @@ message UserWrite { ...@@ -244,7 +237,7 @@ message UserWrite {
} }
message UpgradeMeta { message UpgradeMeta {
bool indexing = 1; bool starting = 1;
string version = 2; string version = 2;
int64 height = 3; int64 height = 3;
} }
\ No newline at end of file
...@@ -134,6 +134,7 @@ func RunChain33(name string) { ...@@ -134,6 +134,7 @@ func RunChain33(name string) {
//开始区块链模块加载 //开始区块链模块加载
//channel, rabitmq 等 //channel, rabitmq 等
version.SetLocalDBVersion(cfg.Store.LocalDBVersion) version.SetLocalDBVersion(cfg.Store.LocalDBVersion)
version.SetStoreDBVersion(cfg.Store.StoreDBVersion)
version.SetAppVersion(cfg.Version) version.SetAppVersion(cfg.Version)
log.Info(cfg.Title + "-app:" + version.GetAppVersion() + " chain33:" + version.GetVersion() + " localdb:" + version.GetLocalDBVersion()) log.Info(cfg.Title + "-app:" + version.GetAppVersion() + " chain33:" + version.GetVersion() + " localdb:" + version.GetLocalDBVersion())
log.Info("loading queue") log.Info("loading queue")
...@@ -155,12 +156,13 @@ func RunChain33(name string) { ...@@ -155,12 +156,13 @@ func RunChain33(name string) {
log.Info("loading blockchain module") log.Info("loading blockchain module")
chain := blockchain.New(cfg.BlockChain) chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client()) chain.SetQueueClient(q.Client())
chain.UpgradeChain()
log.Info("loading store module") log.Info("loading store module")
s := store.New(cfg.Store, sub.Store) s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client()) s.SetQueueClient(q.Client())
chain.Upgrade()
log.Info("loading consensus module") log.Info("loading consensus module")
cs := consensus.New(cfg.Consensus, sub.Consensus) cs := consensus.New(cfg.Consensus, sub.Consensus)
cs.SetQueueClient(q.Client()) cs.SetQueueClient(q.Client())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment