Commit fc9eec0c authored by kingwang's avatar kingwang Committed by 33cn

update 0130

parent 203c8aa0
......@@ -29,7 +29,7 @@ import (
*/
//ErrAPIEnv api的执行环境出问题,区块执行的时候,遇到这一个的错误需要retry
var ErrAPIEnv = errors.New("ErrAPIEnv")
var errAPIEnv = errors.New("ErrAPIEnv")
//ExecutorAPI 提供给执行器使用的接口
//因为合约是主链和平行链通用的,所以,主链和平行链都可以调用这套接口
......@@ -96,7 +96,7 @@ func (api *paraChainAPI) IsErr() bool {
func (api *paraChainAPI) QueryTx(param *types.ReqHash) (*types.TransactionDetail, error) {
data, err := api.grpcClient.QueryTransaction(context.Background(), param)
if err != nil {
err = ErrAPIEnv
err = errAPIEnv
}
return data, seterr(err, &api.errflag)
}
......@@ -104,7 +104,7 @@ func (api *paraChainAPI) QueryTx(param *types.ReqHash) (*types.TransactionDetail
func (api *paraChainAPI) GetRandNum(param *types.ReqRandHash) ([]byte, error) {
reply, err := api.grpcClient.QueryRandNum(context.Background(), param)
if err != nil {
err = ErrAPIEnv
err = errAPIEnv
return nil, seterr(err, &api.errflag)
}
return reply.Hash, nil
......@@ -113,7 +113,7 @@ func (api *paraChainAPI) GetRandNum(param *types.ReqRandHash) ([]byte, error) {
func (api *paraChainAPI) GetBlockByHashes(param *types.ReqHashes) (*types.BlockDetails, error) {
data, err := api.grpcClient.GetBlockByHashes(context.Background(), param)
if err != nil {
err = ErrAPIEnv
err = errAPIEnv
}
return data, seterr(err, &api.errflag)
}
......@@ -130,7 +130,7 @@ func IsGrpcError(err error) bool {
if err == nil {
return false
}
if err == ErrAPIEnv {
if err == errAPIEnv {
return true
}
if grpc.Code(err) == codes.Unknown {
......@@ -144,7 +144,7 @@ func IsQueueError(err error) bool {
if err == nil {
return false
}
if err == ErrAPIEnv {
if err == errAPIEnv {
return true
}
if err == queue.ErrQueueTimeout ||
......@@ -154,3 +154,8 @@ func IsQueueError(err error) bool {
}
return false
}
//IsAPIEnvError 是否是api执行环境的错误
func IsAPIEnvError(err error) bool {
return IsGrpcError(err) || IsQueueError(err)
}
......@@ -7,16 +7,25 @@ package executor
import (
"testing"
"github.com/33cn/chain33/common/db"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func newStateDbForTest(height int64) db.KV {
func newStateDbForTest(height int64) dbm.KV {
return NewStateDB(nil, nil, nil, &StateDBOption{Height: height})
}
func TestStateDBGet(t *testing.T) {
db := newStateDbForTest(0)
testDBGet(t, db)
}
func TestLocalDBGet(t *testing.T) {
db := NewLocalDB(nil)
testDBGet(t, db)
}
func testDBGet(t *testing.T, db dbm.KV) {
err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(t, err)
v, err := db.Get([]byte("k1"))
......@@ -72,13 +81,24 @@ func TestStateDBTxGetOld(t *testing.T) {
db.Begin()
db.Rollback()
db.Commit()
}
func TestStateDBTxGet(t *testing.T) {
db := newStateDbForTest(types.GetFork("ForkExecRollback"))
testTxGet(t, db)
}
func TestLocalDBTxGet(t *testing.T) {
db := NewLocalDB(nil)
testTxGet(t, db)
}
func testTxGet(t *testing.T, db dbm.KV) {
//新版本
db = newStateDbForTest(types.GetFork("ForkExecRollback"))
db.Begin()
err = db.Set([]byte("k1"), []byte("v1"))
err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(t, err)
v, err = db.Get([]byte("k1"))
v, err := db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
......@@ -87,13 +107,24 @@ func TestStateDBTxGetOld(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
//在非transaction中set,直接set成功,不能rollback
err = db.Set([]byte("k1"), []byte("v11"))
assert.Nil(t, err)
db.Begin()
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
//fork 之前有bug,这里读到了脏数据
assert.Equal(t, v, []byte("v11"))
err = db.Set([]byte("k1"), []byte("v12"))
assert.Nil(t, err)
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v12"))
db.Rollback()
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v11"))
}
......
......@@ -245,6 +245,10 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re
}
receipts[0], err = e.execTxOne(feelog, txs[0], index)
if err != nil {
//接口临时错误,取消执行
if api.IsAPIEnvError(err) {
return nil, err
}
//状态数据库回滚
if types.IsFork(e.height, "ForkExecRollback") {
e.stateDB.Rollback()
......@@ -256,6 +260,9 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re
receipts[i], err = e.execTxOne(receipts[i], txs[i], index+i)
if err != nil {
//reset other exec , and break!
if api.IsAPIEnvError(err) {
return nil, err
}
for k := 1; k < i; k++ {
receipts[k] = &types.Receipt{Ty: types.ExecPack}
}
......@@ -427,6 +434,9 @@ func (e *executor) execTx(tx *types.Transaction, index int) (*types.Receipt, err
}
}
elog.Debug("exec tx = ", "index", index, "execer", string(tx.Execer), "err", err)
if api.IsAPIEnvError(err) {
return nil, err
}
return feelog, nil
}
......
......@@ -217,7 +217,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
}
if tx.GroupCount == 0 {
receipt, err := execute.execTx(tx, index)
if api.IsGrpcError(err) || api.IsQueueError(err) {
if api.IsAPIEnvError(err) {
msg.Reply(exec.client.NewMessage("", types.EventReceipts, err))
return
}
......@@ -245,7 +245,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
panic("len(receiptlist) must be equal tx.GroupCount")
}
if err != nil {
if api.IsGrpcError(err) || api.IsQueueError(err) {
if api.IsAPIEnvError(err) {
msg.Reply(exec.client.NewMessage("", types.EventReceipts, err))
return
}
......
......@@ -10,6 +10,7 @@ import (
"encoding/hex"
"github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/queue"
_ "github.com/33cn/chain33/system"
drivers "github.com/33cn/chain33/system/dapp"
......@@ -149,3 +150,53 @@ func TestKeyLocalAllow(t *testing.T) {
err = isAllowLocalKey([]byte("user.p.para.paracross"), []byte("LODB-paracross-xxxx"))
assert.Nil(t, err)
}
func init() {
drivers.Register("demo", newdemoApp, 1)
types.AllowUserExec = append(types.AllowUserExec, []byte("demo"))
}
//ErrEnvAPI 测试
type demoApp struct {
*drivers.DriverBase
}
func newdemoApp() drivers.Driver {
demo := &demoApp{DriverBase: &drivers.DriverBase{}}
demo.SetChild(demo)
return demo
}
func (demo *demoApp) GetDriverName() string {
return "demo"
}
func (demo *demoApp) Exec(tx *types.Transaction, index int) (receipt *types.Receipt, err error) {
return nil, queue.ErrQueueTimeout
}
func TestExecutorErrAPIEnv(t *testing.T) {
minfee := types.GInt("MinFee")
types.SetMinFee(0)
defer types.SetMinFee(minfee)
q := queue.New("channel")
exec := &Executor{client: q.Client()}
execInit(nil)
var txs []*types.Transaction
genkey := util.TestPrivkeyList[0]
txs = append(txs, util.CreateTxWithExecer(genkey, "demo"))
txlist := &types.ExecTxList{
StateHash: nil,
Height: 1,
BlockTime: time.Now().Unix(),
Difficulty: 1,
MainHash: nil,
MainHeight: 1,
ParentHash: nil,
Txs: txs,
}
msg := queue.NewMessage(0, "", 1, txlist)
exec.procExecTxList(msg)
_, err := exec.client.WaitTimeout(msg, 100*time.Second)
assert.Equal(t, true, api.IsAPIEnvError(err))
}
package executor
import (
"github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
// LocalDB local db for store key value in local
type LocalDB struct {
db.TransactionDB
cache map[string][]byte
txcache map[string][]byte
keys []string
intx bool
client queue.Client
}
// NewLocalDB new local db
func NewLocalDB(client queue.Client) db.KVDB {
return &LocalDB{cache: make(map[string][]byte), client: client}
}
// Get get value from local db
func (l *LocalDB) Get(key []byte) ([]byte, error) {
value, err := l.get(key)
debugAccount("==lget==", key, value)
return value, err
}
func (l *LocalDB) get(key []byte) ([]byte, error) {
skey := string(key)
if l.intx && l.txcache != nil {
if value, ok := l.txcache[skey]; ok {
return value, nil
}
}
if value, ok := l.cache[skey]; ok {
return value, nil
}
if l.client == nil {
return nil, types.ErrNotFound
}
query := &types.LocalDBGet{Keys: [][]byte{key}}
msg := l.client.NewMessage("blockchain", types.EventLocalGet, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
if nil == resp.GetData().(*types.LocalReplyValue).Values {
return nil, types.ErrNotFound
}
value := resp.GetData().(*types.LocalReplyValue).Values[0]
if value == nil {
//panic(string(key))
return nil, types.ErrNotFound
}
l.cache[string(key)] = value
return value, nil
}
// Set set key value to local db
func (l *LocalDB) Set(key []byte, value []byte) error {
debugAccount("==lset==", key, value)
skey := string(key)
if l.intx {
if l.txcache == nil {
l.txcache = make(map[string][]byte)
}
l.keys = append(l.keys, skey)
setmap(l.txcache, skey, value)
} else {
setmap(l.cache, skey, value)
}
return nil
}
// BatchGet batch get values from local db
func (l *LocalDB) BatchGet(keys [][]byte) (values [][]byte, err error) {
for _, key := range keys {
v, err := l.Get(key)
if err != nil && err != types.ErrNotFound {
return nil, err
}
values = append(values, v)
}
return values, nil
}
// List 从数据库中查询数据列表,set 中的cache 更新不会影响这个list
func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) {
if l.client == nil {
return nil, types.ErrNotFound
}
query := &types.LocalDBList{Prefix: prefix, Key: key, Count: count, Direction: direction}
msg := l.client.NewMessage("blockchain", types.EventLocalList, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
values := resp.GetData().(*types.LocalReplyValue).Values
if values == nil {
//panic(string(key))
return nil, types.ErrNotFound
}
return values, nil
}
// PrefixCount 从数据库中查询指定前缀的key的数量
func (l *LocalDB) PrefixCount(prefix []byte) (count int64) {
if l.client == nil {
return 0
}
query := &types.ReqKey{Key: prefix}
msg := l.client.NewMessage("blockchain", types.EventLocalPrefixCount, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
count = resp.GetData().(*types.Int64).Data
return
}
//Begin 开启内存事务处理
func (l *LocalDB) Begin() {
l.intx = true
l.keys = nil
l.txcache = nil
}
// Rollback reset tx
func (l *LocalDB) Rollback() {
l.resetTx()
}
// Commit canche tx
func (l *LocalDB) Commit() {
for k, v := range l.txcache {
l.cache[k] = v
}
l.resetTx()
}
func (l *LocalDB) resetTx() {
l.intx = false
l.txcache = nil
l.keys = nil
}
......@@ -146,7 +146,7 @@ func (s *StateDB) get(key []byte) ([]byte, error) {
}
func debugAccount(prefix string, key []byte, value []byte) {
//println(prefix, string(key), value)
//println(prefix, string(key), string(value))
/*
if !types.Debug {
return
......@@ -174,6 +174,7 @@ func (s *StateDB) Set(key []byte, value []byte) error {
debugAccount("==set==", key, value)
skey := string(key)
if s.intx {
println("set intx")
if s.txcache == nil {
s.txcache = make(map[string][]byte)
}
......@@ -204,104 +205,3 @@ func (s *StateDB) BatchGet(keys [][]byte) (values [][]byte, err error) {
}
return values, nil
}
// LocalDB local db for store key value in local
type LocalDB struct {
db.TransactionDB
cache map[string][]byte
client queue.Client
}
// NewLocalDB new local db
func NewLocalDB(client queue.Client) db.KVDB {
return &LocalDB{cache: make(map[string][]byte), client: client}
}
// Get get value from local db
func (l *LocalDB) Get(key []byte) ([]byte, error) {
value, err := l.get(key)
debugAccount("==lget==", key, value)
return value, err
}
func (l *LocalDB) get(key []byte) ([]byte, error) {
if value, ok := l.cache[string(key)]; ok {
return value, nil
}
if l.client == nil {
return nil, types.ErrNotFound
}
query := &types.LocalDBGet{Keys: [][]byte{key}}
msg := l.client.NewMessage("blockchain", types.EventLocalGet, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
if nil == resp.GetData().(*types.LocalReplyValue).Values {
return nil, types.ErrNotFound
}
value := resp.GetData().(*types.LocalReplyValue).Values[0]
if value == nil {
//panic(string(key))
return nil, types.ErrNotFound
}
l.cache[string(key)] = value
return value, nil
}
// Set set key value to local db
func (l *LocalDB) Set(key []byte, value []byte) error {
debugAccount("==lset==", key, value)
setmap(l.cache, string(key), value)
return nil
}
// BatchGet batch get values from local db
func (l *LocalDB) BatchGet(keys [][]byte) (values [][]byte, err error) {
for _, key := range keys {
v, err := l.Get(key)
if err != nil && err != types.ErrNotFound {
return nil, err
}
values = append(values, v)
}
return values, nil
}
// List 从数据库中查询数据列表,set 中的cache 更新不会影响这个list
func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) {
if l.client == nil {
return nil, types.ErrNotFound
}
query := &types.LocalDBList{Prefix: prefix, Key: key, Count: count, Direction: direction}
msg := l.client.NewMessage("blockchain", types.EventLocalList, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
values := resp.GetData().(*types.LocalReplyValue).Values
if values == nil {
//panic(string(key))
return nil, types.ErrNotFound
}
return values, nil
}
// PrefixCount 从数据库中查询指定前缀的key的数量
func (l *LocalDB) PrefixCount(prefix []byte) (count int64) {
if l.client == nil {
return 0
}
query := &types.ReqKey{Key: prefix}
msg := l.client.NewMessage("blockchain", types.EventLocalPrefixCount, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
count = resp.GetData().(*types.Int64).Data
return
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment