Commit 680eb2a9 authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33

parent e1b404bd
......@@ -25,7 +25,10 @@ func init() {
go func() {
for {
time.Sleep(60 * time.Second)
if len(globalPointerMap) > 10 {
gloabalMu.Lock()
pointerLen := len(globalPointerMap)
gloabalMu.Unlock()
if pointerLen > 10 {
println("<==>global pointer count = ", len(globalPointerMap))
}
}
......
......@@ -12,10 +12,15 @@ import (
"net"
"sort"
"time"
log "github.com/33cn/chain33/common/log/log15"
)
const ntpEpochOffset = 2208988800
// 如果获取的ntp时间和自己时间相差太大,超过阈值,保守起见,不采用
const safeDeltaScope = 300 * 1000 * int64(time.Millisecond)
//ErrNetWorkDealy error
var ErrNetWorkDealy = errors.New("ErrNetWorkDealy")
......@@ -129,6 +134,11 @@ func GetNtpTime(host string) (time.Time, error) {
d1 := t2.Sub(t1)
d2 := t3.Sub(t4)
delt := (d1 + d2) / 2
if delt > time.Duration(safeDeltaScope) || delt < time.Duration(-safeDeltaScope) {
log.Error("GetNtpTime", "host", host, "delt", delt, "RxSec", rsp.RxTimeSec, "RxNs", rsp.RxTimeFrac, "TxSec", rsp.TxTimeSec, "TxNs", rsp.TxTimeFrac)
log.Error("GetNtpTime", "delt", delt, "t1", t1, "t2", t2, "t3", t3, "now", t4, "d1", d1, "d2", d2)
return time.Time{}, errors.New("WrongNtpDelteTime")
}
return t4.Add(delt), nil
}
......@@ -147,8 +157,9 @@ func GetRealTime(hosts []string) time.Time {
ch := make(chan time.Duration, len(hosts))
for i := 0; i < len(hosts); i++ {
go func(host string) {
ntptime, err := getTimeRetry(host, 10)
ntptime, err := getTimeRetry(host, 1)
if ntptime.IsZero() || err != nil {
println("getTimeRetry", err.Error())
ch <- time.Duration(math.MaxInt64)
} else {
dt := time.Until(ntptime)
......@@ -168,7 +179,7 @@ func GetRealTime(hosts []string) time.Time {
calclist := make([]time.Duration, len(dtlist))
copy(calclist, dtlist)
sort.Sort(durationSlice(calclist))
calclist = maxSubList(calclist)
calclist = maxSubList(calclist, time.Millisecond*100)
if len(calclist) < q {
continue
}
......@@ -189,28 +200,25 @@ func abs(t time.Duration) time.Duration {
return t
}
func maxSubList(list []time.Duration) (sub []time.Duration) {
func maxSubList(list []time.Duration, dt time.Duration) (sub []time.Duration) {
if len(list) == 0 {
return list
}
var start int
var next int
for i := 0; i < len(list); i++ {
var nextheight time.Duration
if i+1 == len(list) {
nextheight = math.MaxInt64
var nextTime time.Duration
next = i + 1
if next == len(list) {
nextTime = math.MaxInt64
} else {
nextheight = list[i+1]
nextTime = list[next]
}
var start int
var end int
if abs(nextheight-list[i]) > time.Millisecond*100 {
end = i
if len(sub) < (end - start) {
sub = list[start:end]
if abs(nextTime-list[i]) > dt {
if len(sub) < (next-start) && (next-start) > 1 {
sub = list[start:next]
}
start = i
end = i
} else {
end = i + 1
start = next
}
}
return sub
......
......@@ -5,7 +5,6 @@
package common
import (
"fmt"
"testing"
"time"
......@@ -37,6 +36,19 @@ func TestGetRealTime(t *testing.T) {
if nettime2.IsZero() {
return
}
fmt.Println(nettime, nettime2)
assert.Equal(t, nettime2.Sub(nettime)/time.Second, delt/time.Second)
}
func TestSubList(t *testing.T) {
sub := maxSubList([]time.Duration{1, 2, 3, 10, 21, 22, 23, 24, 35}, time.Duration(10))
assert.Equal(t, len(sub), 4)
assert.Equal(t, sub[0], time.Duration(1))
sub = maxSubList([]time.Duration{2, 3, 10, 21, 22, 23, 24, 35}, time.Duration(10))
assert.Equal(t, len(sub), 4)
assert.Equal(t, sub[0], time.Duration(21))
sub = maxSubList([]time.Duration{2, 3, 10, 21, 22, 23, 24}, time.Duration(10))
assert.Equal(t, len(sub), 4)
assert.Equal(t, sub[0], time.Duration(21))
}
......@@ -224,7 +224,7 @@ func (e *executor) loadDriver(tx *types.Transaction, index int) (c drivers.Drive
return exec
}
func (e *executor) execTxGroup(exec *Executor, txs []*types.Transaction, index int) ([]*types.Receipt, error) {
func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Receipt, error) {
txgroup := &types.Transactions{Txs: txs}
err := e.checkTxGroup(txgroup, index)
if err != nil {
......@@ -237,8 +237,7 @@ func (e *executor) execTxGroup(exec *Executor, txs []*types.Transaction, index i
//开启内存事务处理,假设系统只有一个thread 执行
//如果系统执行失败,回滚到这个状态
rollbackLog := copyReceipt(feelog)
e.stateDB.Begin()
e.localDB.Begin()
e.begin()
receipts := make([]*types.Receipt, len(txs))
for i := 1; i < len(txs); i++ {
receipts[i] = &types.Receipt{Ty: types.ExecPack}
......@@ -251,12 +250,10 @@ func (e *executor) execTxGroup(exec *Executor, txs []*types.Transaction, index i
}
//状态数据库回滚
if types.IsFork(e.height, "ForkExecRollback") {
e.stateDB.Rollback()
e.rollback()
}
e.localDB.Rollback()
return receipts, nil
}
exec.execLocalSameTime(e, txs[0], receipts[0], index)
for i := 1; i < len(txs); i++ {
//如果有一笔执行失败了,那么全部回滚
receipts[i], err = e.execTxOne(receipts[i], txs[i], index+i)
......@@ -273,14 +270,11 @@ func (e *executor) execTxGroup(exec *Executor, txs []*types.Transaction, index i
receipts[0] = rollbackLog
}
//撤销所有的数据库更新
e.stateDB.Rollback()
e.localDB.Rollback()
e.rollback()
return receipts, nil
}
exec.execLocalSameTime(e, txs[i], receipts[i], index+i)
}
e.stateDB.Commit()
e.localDB.Commit()
e.commit()
return receipts, nil
}
......@@ -335,7 +329,7 @@ func copyReceipt(feelog *types.Receipt) *types.Receipt {
func (e *executor) execTxOne(feelog *types.Receipt, tx *types.Transaction, index int) (*types.Receipt, error) {
//只有到pack级别的,才会增加index
e.stateDB.(*StateDB).StartTx()
e.startTx()
receipt, err := e.Exec(tx, index)
if err != nil {
elog.Error("exec tx error = ", "err", err, "exec", string(tx.Execer), "action", tx.ActionName())
......@@ -349,23 +343,37 @@ func (e *executor) execTxOne(feelog *types.Receipt, tx *types.Transaction, index
//1. statedb 中 Set的 key 必须是 在 receipt.GetKV() 这个集合中
//2. receipt.GetKV() 中的 key, 必须符合权限控制要求
memkvset := e.stateDB.(*StateDB).GetSetKeys()
feelog, err = e.checkKV(feelog, memkvset, receipt.GetKV())
err = e.checkKV(memkvset, receipt.GetKV())
if err != nil {
errlog := &types.ReceiptLog{Ty: types.TyLogErr, Log: []byte(err.Error())}
feelog.Logs = append(feelog.Logs, errlog)
return feelog, err
}
feelog, err = e.checkKeyAllow(feelog, tx, index, receipt.GetKV())
if err != nil {
return feelog, err
}
err = e.execLocalSameTime(tx, feelog, index)
if err != nil {
elog.Error("execLocalSameTime", "err", err)
errlog := &types.ReceiptLog{Ty: types.TyLogErr, Log: []byte(err.Error())}
feelog.Logs = append(feelog.Logs, errlog)
return feelog, err
}
if receipt != nil {
feelog.KV = append(feelog.KV, receipt.KV...)
feelog.Logs = append(feelog.Logs, receipt.Logs...)
feelog.Ty = receipt.Ty
}
if types.IsFork(e.height, "ForkStateDBSet") {
for _, v := range feelog.KV {
e.stateDB.Set(v.Key, v.Value)
}
}
return feelog, nil
}
func (e *executor) checkKV(feelog *types.Receipt, memset []string, kvs []*types.KeyValue) (*types.Receipt, error) {
func (e *executor) checkKV(memset []string, kvs []*types.KeyValue) error {
keys := make(map[string]bool)
for _, kv := range kvs {
k := kv.GetKey()
......@@ -375,12 +383,10 @@ func (e *executor) checkKV(feelog *types.Receipt, memset []string, kvs []*types.
if _, ok := keys[key]; !ok {
elog.Error("err memset key", "key", key)
//非法的receipt,交易执行失败
errlog := &types.ReceiptLog{Ty: types.TyLogErr, Log: []byte(types.ErrNotAllowMemSetKey.Error())}
feelog.Logs = append(feelog.Logs, errlog)
return feelog, types.ErrNotAllowMemSetKey
return types.ErrNotAllowMemSetKey
}
}
return feelog, nil
return nil
}
func (e *executor) checkKeyAllow(feelog *types.Receipt, tx *types.Transaction, index int, kvs []*types.KeyValue) (*types.Receipt, error) {
......@@ -398,7 +404,52 @@ func (e *executor) checkKeyAllow(feelog *types.Receipt, tx *types.Transaction, i
return feelog, nil
}
func (e *executor) execTx(tx *types.Transaction, index int) (*types.Receipt, error) {
func (e *executor) begin() {
matchfork := types.IsFork(e.height, "ForkExecRollback")
if matchfork {
if e.stateDB != nil {
e.stateDB.Begin()
}
if e.localDB != nil {
e.localDB.Begin()
}
}
}
func (e *executor) commit() {
matchfork := types.IsFork(e.height, "ForkExecRollback")
if matchfork {
if e.stateDB != nil {
e.stateDB.Commit()
}
if e.localDB != nil {
e.localDB.Commit()
}
}
}
func (e *executor) startTx() {
if e.stateDB != nil {
e.stateDB.(*StateDB).StartTx()
}
if e.localDB != nil {
e.localDB.(*LocalDB).StartTx()
}
}
func (e *executor) rollback() {
matchfork := types.IsFork(e.height, "ForkExecRollback")
if matchfork {
if e.stateDB != nil {
e.stateDB.Rollback()
}
if e.localDB != nil {
e.localDB.Rollback()
}
}
}
func (e *executor) execTx(exec *Executor, tx *types.Transaction, index int) (*types.Receipt, error) {
if e.height == 0 { //genesis block 不检查手续费
receipt, err := e.Exec(tx, index)
if err != nil {
......@@ -424,19 +475,12 @@ func (e *executor) execTx(tx *types.Transaction, index int) (*types.Receipt, err
return nil, err
}
//ignore err
matchfork := types.IsFork(e.height, "ForkExecRollback")
if matchfork {
e.stateDB.Begin()
}
e.begin()
feelog, err = e.execTxOne(feelog, tx, index)
if err != nil {
if matchfork {
e.stateDB.Rollback()
}
e.rollback()
} else {
if matchfork {
e.stateDB.Commit()
}
e.commit()
}
elog.Debug("exec tx = ", "index", index, "execer", string(tx.Execer), "err", err)
if api.IsAPIEnvError(err) {
......@@ -459,3 +503,62 @@ func (e *executor) isAllowExec(key []byte, tx *types.Transaction, index int) boo
height := e.height
return isAllowKeyWrite(key, realExecer, tx, height)
}
func (e *executor) isExecLocalSameTime(tx *types.Transaction, index int) bool {
exec := e.loadDriver(tx, index)
return exec.ExecutorOrder() == drivers.ExecLocalSameTime
}
func (e *executor) checkPrefix(execer []byte, kvs []*types.KeyValue) error {
for i := 0; i < len(kvs); i++ {
err := isAllowLocalKey(execer, kvs[i].Key)
if err != nil {
//测试的情况下,先panic,实际情况下会删除返回错误
panic(err)
//return err
}
}
return nil
}
func (e *executor) execLocalSameTime(tx *types.Transaction, receipt *types.Receipt, index int) error {
if e.isExecLocalSameTime(tx, index) {
var r = &types.ReceiptData{}
if receipt != nil {
r.Ty = receipt.Ty
r.Logs = receipt.Logs
}
_, err := e.execLocalTx(tx, r, index)
return err
}
return nil
}
func (e *executor) execLocalTx(tx *types.Transaction, r *types.ReceiptData, index int) (*types.LocalDBSet, error) {
kv, err := e.execLocal(tx, r, index)
if err == types.ErrActionNotSupport {
return nil, nil
}
if err != nil {
return nil, err
}
memkvset := e.localDB.(*LocalDB).GetSetKeys()
if kv != nil && kv.KV != nil {
err := e.checkKV(memkvset, kv.KV)
if err != nil {
return nil, types.ErrNotAllowMemSetLocalKey
}
err = e.checkPrefix(tx.Execer, kv.KV)
if err != nil {
return nil, err
}
for _, kv := range kv.KV {
e.localDB.Set(kv.Key, kv.Value)
}
} else {
if len(memkvset) > 0 {
return nil, types.ErrNotAllowMemSetLocalKey
}
}
return kv, nil
}
......@@ -232,7 +232,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
continue
}
if tx.GroupCount == 0 {
receipt, err := execute.execTx(tx, index)
receipt, err := execute.execTx(exec, tx, index)
if api.IsAPIEnvError(err) {
msg.Reply(exec.client.NewMessage("", types.EventReceipts, err))
return
......@@ -242,7 +242,6 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
continue
}
//update local
exec.execLocalSameTime(execute, tx, receipt, index)
receipts = append(receipts, receipt)
index++
continue
......@@ -257,7 +256,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
receipts = append(receipts, types.NewErrReceipt(types.ErrTxGroupCount))
continue
}
receiptlist, err := execute.execTxGroup(exec, datas.Txs[i:i+int(tx.GroupCount)], index)
receiptlist, err := execute.execTxGroup(datas.Txs[i:i+int(tx.GroupCount)], index)
i = i + int(tx.GroupCount) - 1
if len(receiptlist) > 0 && len(receiptlist) != int(tx.GroupCount) {
panic("len(receiptlist) must be equal tx.GroupCount")
......@@ -279,22 +278,6 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
&types.Receipts{Receipts: receipts}))
}
func (exec *Executor) execLocalSameTime(execute *executor, tx *types.Transaction, receipt *types.Receipt, index int) {
e := execute.loadDriver(tx, index)
if e.ExecutorOrder() == drivers.ExecLocalSameTime {
var r = &types.ReceiptData{}
if receipt != nil {
r.Ty = receipt.Ty
r.Logs = receipt.Logs
}
_, err := exec.execLocalTx(execute, tx, r, index)
//ignore err, only print err
if err != nil {
elog.Debug("ExecLocal Same Time", "err", err)
}
}
}
func (exec *Executor) procExecAddBlock(msg queue.Message) {
datas := msg.GetData().(*types.BlockDetail)
b := datas.Block
......@@ -344,7 +327,8 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) {
}
for i := 0; i < len(b.Txs); i++ {
tx := b.Txs[i]
kv, err := exec.execLocalTx(execute, tx, datas.Receipts[i], i)
execute.localDB.(*LocalDB).StartTx()
kv, err := execute.execLocalTx(tx, datas.Receipts[i], i)
if err != nil {
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, err))
return
......@@ -356,26 +340,6 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) {
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, &kvset))
}
func (exec *Executor) execLocalTx(execute *executor, tx *types.Transaction, r *types.ReceiptData, index int) (*types.LocalDBSet, error) {
kv, err := execute.execLocal(tx, r, index)
if err == types.ErrActionNotSupport {
return nil, nil
}
if err != nil {
return nil, err
}
if kv != nil && kv.KV != nil {
err := exec.checkPrefix(tx.Execer, kv.KV)
if err != nil {
return nil, err
}
for _, kv := range kv.KV {
execute.localDB.Set(kv.Key, kv.Value)
}
}
return kv, nil
}
func (exec *Executor) procExecDelBlock(msg queue.Message) {
datas := msg.GetData().(*types.BlockDetail)
b := datas.Block
......@@ -430,7 +394,7 @@ func (exec *Executor) procExecDelBlock(msg queue.Message) {
return
}
if kv != nil && kv.KV != nil {
err := exec.checkPrefix(tx.Execer, kv.KV)
err := execute.checkPrefix(tx.Execer, kv.KV)
if err != nil {
msg.Reply(exec.client.NewMessage("", types.EventDelBlock, err))
return
......@@ -441,18 +405,6 @@ func (exec *Executor) procExecDelBlock(msg queue.Message) {
msg.Reply(exec.client.NewMessage("", types.EventDelBlock, &kvset))
}
func (exec *Executor) checkPrefix(execer []byte, kvs []*types.KeyValue) error {
for i := 0; i < len(kvs); i++ {
err := isAllowLocalKey(execer, kvs[i].Key)
if err != nil {
//测试的情况下,先panic,实际情况下会删除返回错误
panic(err)
//return err
}
}
return nil
}
// Close close executor
func (exec *Executor) Close() {
elog.Info("exec module closed")
......
......@@ -307,6 +307,10 @@ func (demo *demoApp) Exec(tx *types.Transaction, index int) (receipt *types.Rece
if err != nil && err != types.ErrNotFound {
return nil, err
}
if seterrkey {
println("set err key value")
demo.GetLocalDB().Set([]byte("key1"), []byte("value1"))
}
receipt = &types.Receipt{Ty: types.ExecOk}
receipt.KV = append(receipt.KV, &types.KeyValue{
Key: demoCalcStateKey(addr, id),
......@@ -402,3 +406,38 @@ func TestExecLocalSameTime0(t *testing.T) {
}
}
}
var seterrkey = false
func TestExecLocalSameTimeSetErrKey(t *testing.T) {
mock33 := newMockNode()
defer mock33.Close()
orderflag = 1
seterrkey = true
genkey := mock33.GetGenesisKey()
genaddr := mock33.GetGenesisAddress()
mock33.WaitHeight(0)
block := mock33.GetBlock(0)
assert.Equal(t, mock33.GetAccount(block.StateHash, genaddr).Balance, 100000000*types.Coin)
var txs []*types.Transaction
addr1, priv1 := util.Genaddress()
txs = append(txs, util.CreateCoinsTx(genkey, addr1, 1e8))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
if err != nil {
t.Error(err)
return
}
for i, receipt := range detail.Receipts {
if i == 0 {
assert.Equal(t, receipt.GetTy(), int32(2))
}
if i >= 1 {
assert.Equal(t, receipt.GetTy(), int32(1))
assert.Equal(t, len(receipt.Logs), 2)
assert.Equal(t, receipt.Logs[1].Ty, int32(1))
}
}
}
......@@ -12,14 +12,15 @@ import (
//数据的get set 主要经过 cache
//如果需要进行list, 那么把get set 的内容加入到 后端数据库
type LocalDB struct {
cache map[string][]byte
txcache map[string][]byte
keys []string
intx bool
kvs []*types.KeyValue
txid *types.Int64
client queue.Client
api client.QueueProtocolAPI
cache map[string][]byte
txcache map[string][]byte
keys []string
intx bool
hasbegin bool
kvs []*types.KeyValue
txid *types.Int64
client queue.Client
api client.QueueProtocolAPI
}
//NewLocalDB 创建一个新的LocalDB
......@@ -44,6 +45,17 @@ func (l *LocalDB) resetTx() {
l.intx = false
l.txcache = nil
l.keys = nil
l.hasbegin = false
}
// StartTx reset state db keys
func (l *LocalDB) StartTx() {
l.keys = nil
}
// GetSetKeys get state db set keys
func (l *LocalDB) GetSetKeys() (keys []string) {
return l.keys
}
//Begin 开始一个事务
......@@ -51,14 +63,23 @@ func (l *LocalDB) Begin() {
l.intx = true
l.keys = nil
l.txcache = nil
l.hasbegin = false
}
func (l *LocalDB) begin() {
err := l.api.LocalBegin(l.txid)
if err != nil {
panic(err)
}
}
//第一次save 的时候,远程做一个 begin 操作,开始事务
func (l *LocalDB) save() error {
if l.kvs != nil {
if !l.hasbegin {
l.begin()
l.hasbegin = true
}
param := &types.LocalDBSet{Txid: l.txid.Data}
param.KV = l.kvs
err := l.api.LocalSet(param)
......@@ -79,8 +100,10 @@ func (l *LocalDB) Commit() error {
if err != nil {
return err
}
if l.hasbegin {
err = l.api.LocalCommit(l.txid)
}
l.resetTx()
err = l.api.LocalCommit(l.txid)
return err
}
......@@ -94,11 +117,13 @@ func (l *LocalDB) Close() error {
//Rollback 回滚修改
func (l *LocalDB) Rollback() {
l.resetTx()
err := l.api.LocalRollback(l.txid)
if err != nil {
panic(err)
if l.hasbegin {
err := l.api.LocalRollback(l.txid)
if err != nil {
panic(err)
}
}
l.resetTx()
}
//Get 获取key
......
......@@ -203,6 +203,7 @@ superManager=[
#但是我们可以替换
[fork.system]
ForkChainParamV1= 0
ForkStateDBSet=-1
ForkCheckTxDup=0
ForkBlockHash= 1
ForkMinerTime= 0
......
......@@ -158,6 +158,7 @@ var (
ErrTxGroupNotSupport = errors.New("ErrTxGroupNotSupport")
ErrNotAllowKey = errors.New("ErrNotAllowKey")
ErrNotAllowMemSetKey = errors.New("ErrNotAllowMemSetKey")
ErrNotAllowMemSetLocalKey = errors.New("ErrNotAllowMemSetLocalKey")
ErrDataBaseDamage = errors.New("ErrDataBaseDamage")
ErrIndex = errors.New("ErrIndex")
ErrTxGroupParaCount = errors.New("ErrTxGroupParaCount")
......
......@@ -210,7 +210,7 @@ func SetTestNetFork() {
systemFork.SetFork("chain33", "ForkTxGroupPara", 806578)
systemFork.SetFork("chain33", "ForkCheckBlockTime", 1200000)
systemFork.SetFork("chain33", "ForkMultiSignAddress", 1298600)
systemFork.SetFork("chain33", "ForkStateDBSet", MaxHeight)
}
func setLocalFork() {
......
......@@ -156,6 +156,7 @@ saveTokenTxList=false
[fork.system]
ForkChainParamV1= 0
ForkChainParamV2= -1
ForkStateDBSet=-1
ForkCheckTxDup=0
ForkBlockHash= 1
ForkMinerTime= 0
......
......@@ -147,6 +147,7 @@ ForkV16Withdraw= 480000
ForkV22ExecRollback= 706531
ForkV23TxHeight= 806578
ForkV24TxGroupPara= 806578
ForkStateDBSet=-1
[fork.sub.manage]
Enable=120000
......
......@@ -174,6 +174,7 @@ name2="ticket-bityuanv5-enable"
[fork.system]
ForkChainParamV1= 10
ForkChainParamV2= 10
ForkStateDBSet=-1
ForkCheckTxDup=0
ForkBlockHash= 1
ForkMinerTime= 10
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment