Commit 62debb8b authored by linj's avatar linj

impl trade.Upgrade without batch

parent 38f7fb5f
package executor package executor
import ( import (
"bytes"
dbm "github.com/33cn/chain33/common/db" dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
pty "github.com/33cn/plugin/plugin/dapp/trade/types" pty "github.com/33cn/plugin/plugin/dapp/trade/types"
...@@ -32,40 +30,48 @@ const ( ...@@ -32,40 +30,48 @@ const (
// Upgrade 实现升级接口 // Upgrade 实现升级接口
func (t *trade) Upgrade() error { func (t *trade) Upgrade() error {
localDB := t.GetLocalDB() localDB := t.GetLocalDB()
return TradeUpdateLocalDBV2(localDB, 0) err := TradeUpdateLocalDBV2(localDB)
if err != nil {
tradelog.Error("Upgrade failed", "err", err)
return errors.Cause(err)
}
return nil
} }
// TradeUpdateLocalDBV2 trade 本地数据库升级 // TradeUpdateLocalDBV2 trade 本地数据库升级
// from 1 to 2 // from 1 to 2
func TradeUpdateLocalDBV2(localDB dbm.KVDB, total int) error { func TradeUpdateLocalDBV2(localDB dbm.KVDB) error {
// 外部不指定, 强制分批执行
if total <= 0 {
total = 10000
}
toVersion := 2 toVersion := 2
version, err := getVersion(localDB) version, err := getVersion(localDB)
if err != nil { if err != nil {
tradelog.Error("TradeUpdateLocalDBV2 get version", "err", err) errors.Wrap(err, "TradeUpdateLocalDBV2 get version")
return errors.Cause(err) return err
} }
if version >= toVersion { if version >= toVersion {
return nil return nil
} }
err = UpdateLocalDBPart2(localDB, total) err = UpdateLocalDBPart2(localDB)
if err != nil {
errors.Wrap(err, "TradeUpdateLocalDBV2 UpdateLocalDBPart2")
return err
}
err = UpdateLocalDBPart1(localDB)
if err != nil { if err != nil {
errors.Wrap(err, "TradeUpdateLocalDBV2 UpdateLocalDBPart1")
return err return err
} }
// TODO input DB to KVDB err = setVersion(localDB, toVersion)
err = UpdateLocalDBPart1(nil, total)
if err != nil { if err != nil {
errors.Wrap(err, "TradeUpdateLocalDBV2 setVersion")
return err return err
} }
return setVersion(localDB, toVersion) return nil
} }
// UpdateLocalDBPart1 手动生成KV,需要在原有数据库中删除 // UpdateLocalDBPart1 手动生成KV,需要在原有数据库中删除
func UpdateLocalDBPart1(localDB dbm.DB, total int) error { func UpdateLocalDBPart1(localDB dbm.KVDB) error {
prefixes := []string{ prefixes := []string{
sellOrderSHTAS, sellOrderSHTAS,
sellOrderASTS, sellOrderASTS,
...@@ -79,138 +85,90 @@ func UpdateLocalDBPart1(localDB dbm.DB, total int) error { ...@@ -79,138 +85,90 @@ func UpdateLocalDBPart1(localDB dbm.DB, total int) error {
} }
for _, prefix := range prefixes { for _, prefix := range prefixes {
err := delOnePrefix(localDB, prefix, total) err := delOnePrefix(localDB, prefix)
if err != nil {
tradelog.Error("UpdateLocalDBPart1 failed", "err", err)
return errors.Cause(err)
}
}
return nil
}
func delOnePrefix(localDB dbm.DB, prefix string, total int) (err error) {
allDeleted := false
for !allDeleted {
allDeleted, err = delOnePrefixLimit(localDB, prefix, total)
if err != nil { if err != nil {
errors.Wrapf(err, "UpdateLocalDBPart1 delOnePrefix: %s", prefix)
return err return err
} }
} }
return nil return nil
} }
// 删除指定前缀的N个记录 // delOnePrefix 删除指定前缀的记录
// DB interface -> included IteratorDB interface func delOnePrefix(localDB dbm.KVDB, prefix string) error {
// IteratorDB interface -> Iterator func return Iterator interface
// Iterator interface -> Key func
// 先写入固定的 start, end, 所有key 在这中间, 然后慢慢处理
func delOnePrefixLimit(localDB dbm.DB, prefix string, total int) (allDeleted bool, err error) {
start := []byte(prefix) start := []byte(prefix)
err = localDB.SetSync(start, []byte("")) keys, err := localDB.List(start, nil, 0, dbm.ListASC|dbm.ListKeyOnly)
if err != nil { if err != nil {
return allDeleted, errors.Wrap(err, "SetSync for prefix start:"+prefix) return err
}
keys := make([][]byte, total)
count := 0
it := localDB.Iterator(start, nil, false)
for it.Rewind(); it.Valid(); it.Next() {
keys[count] = it.Key()
count++
if count == total {
break
}
} }
for _, key := range keys {
batch := localDB.NewBatch(false) err = localDB.Set(key, nil)
for i := 0; i < count; i++ { if err != nil {
// 保护下, 避免测试其他bug,误删其他key, 破坏数据库 return err
if !bytes.HasPrefix(keys[i], start) {
tradelog.Error("delOnePrefixLimit delete key not match prefix", "prefix", prefix, "key", string(keys[i]))
panic("bug: " + "delOnePrefixLimit delete key not match prefix: " + prefix + " " + string(keys[i]))
} }
tradelog.Debug("delOnePrefixLimit", "KEY", string(keys[i]))
batch.Delete(keys[i])
}
err = batch.Write()
if err != nil {
return allDeleted, errors.Wrap(err, "batch.Write when delete prefix keys: "+prefix)
} }
return count < total, nil return nil
} }
// UpdateLocalDBPart2 升级order // UpdateLocalDBPart2 升级order
// order 从 v1 升级到 v2 // order 从 v1 升级到 v2
// 通过tableV1 删除, 通过tableV2 添加, 无需通过每个区块扫描对应的交易 // 通过tableV1 删除, 通过tableV2 添加, 无需通过每个区块扫描对应的交易
func UpdateLocalDBPart2(kvdb dbm.KVDB, total int) error { func UpdateLocalDBPart2(kvdb dbm.KVDB) error {
return upgradeOrder(kvdb, total) return upgradeOrder(kvdb)
} }
func upgradeOrder(kvdb dbm.KVDB, total int) (err error) { func upgradeOrder(kvdb dbm.KVDB) (err error) {
allDeleted := false
for !allDeleted {
allDeleted, err = upgradeOrderLimit(kvdb, total)
if err != nil {
return err
}
}
return nil
}
func upgradeOrderLimit(kvdb dbm.KVDB, total int) (allDeleted bool, err error) {
tab2 := NewOrderTableV2(kvdb) tab2 := NewOrderTableV2(kvdb)
tab := NewOrderTable(kvdb) tab := NewOrderTable(kvdb)
q1 := tab.GetQuery(kvdb) q1 := tab.GetQuery(kvdb)
var order1 pty.LocalOrder var order1 pty.LocalOrder
rows, err := q1.List("key", &order1, []byte(""), int32(total), 0) rows, err := q1.List("key", &order1, []byte(""), 0, 0)
if err != nil && err != types.ErrNotFound { if err != nil {
return false, errors.Wrap(err, "upgradeOrderLimit list from order v1 table") if err == types.ErrNotFound {
return nil
}
return errors.Wrap(err, "upgradeOrderLimit list from order v1 table")
} }
for _, row := range rows { for _, row := range rows {
o1, ok := row.Data.(*pty.LocalOrder) o1, ok := row.Data.(*pty.LocalOrder)
if !ok { if !ok {
return false, errors.Wrap(types.ErrTypeAsset, "decode order v1") return errors.Wrap(types.ErrTypeAsset, "decode order v1")
} }
err = tab2.Add(o1) err = tab2.Add(o1)
if err != nil { if err != nil {
return false, errors.Wrap(err, "upgradeOrderLimit add to order v2 table") return errors.Wrap(err, "upgradeOrderLimit add to order v2 table")
} }
err = tab.Del([]byte(o1.GetKey())) err = tab.Del([]byte(o1.GetKey()))
if err != nil { if err != nil {
return false, errors.Wrap(err, "upgradeOrderLimit add to order v2 table") return errors.Wrap(err, "upgradeOrderLimit add to order v2 table")
} }
} }
kvs, err := tab2.Save() kvs, err := tab2.Save()
if err != nil { if err != nil {
return false, errors.Wrap(err, "upgradeOrderLimit save-add to order v2 table") return errors.Wrap(err, "upgradeOrderLimit save-add to order v2 table")
} }
kvs2, err := tab.Save() kvs2, err := tab.Save()
if err != nil { if err != nil {
return false, errors.Wrap(err, "upgradeOrderLimit save-del to order v1 table") return errors.Wrap(err, "upgradeOrderLimit save-del to order v1 table")
} }
kvs = append(kvs, kvs2...) kvs = append(kvs, kvs2...)
kvdb.Begin()
for _, kv := range kvs { for _, kv := range kvs {
tradelog.Debug("upgradeOrderLimit", "KEY", string(kv.GetKey())) tradelog.Debug("upgradeOrderLimit", "KEY", string(kv.GetKey()))
err = kvdb.Set(kv.GetKey(), kv.GetValue()) err = kvdb.Set(kv.GetKey(), kv.GetValue())
if err != nil { if err != nil {
err = errors.Wrap(err, "upgradeOrderLimit sed localdb")
break break
} }
} }
if err != nil { if err != nil {
kvdb.Rollback() return errors.Wrap(err, "upgradeOrderLimit kvdb set")
return false, errors.Wrap(err, "upgradeOrderLimit kvdb set")
} }
err = kvdb.Commit() return nil
if err != nil {
kvdb.Rollback()
return false, errors.Wrap(err, "upgradeOrderLimit kvdb set")
}
return len(rows) < total, nil
} }
// localdb Version // localdb Version
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment