Commit 9f5828d8 authored by liuyuhang's avatar liuyuhang

modify mult del mavl method

parent 314762f8
...@@ -356,39 +356,49 @@ func DelMavl(db dbm.DB) { ...@@ -356,39 +356,49 @@ func DelMavl(db dbm.DB) {
defer wg.Done() defer wg.Done()
setDelMavl(delMavlStateStart) setDelMavl(delMavlStateStart)
defer setDelMavl(delMavlStateEnd) defer setDelMavl(delMavlStateEnd)
isDel := delMavlData(db) prefix := ""
if isDel { loop := false
isDelMavlData = true for {
kmlog.Info("DelMavl success") kmlog.Debug("start once del mavl")
loop, prefix = delMavlData(db, prefix)
if !loop {
break
}
kmlog.Debug("end once del mavl")
time.Sleep(time.Second * 1)
} }
} }
func delMavlData(db dbm.DB) bool { func delMavlData(db dbm.DB, prefix string) (bool, string) {
it := db.Iterator(nil, nil, true) it := db.Iterator([]byte(prefix), types.EmptyValue, false)
defer it.Close() defer it.Close()
batch := db.NewBatch(true) batch := db.NewBatch(false)
count := 0 count := 0
const onceCount = 200 const onceCount = 50
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
return false return false, ""
} }
if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除 if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除
batch.Delete(it.Key()) batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
dbm.MustWrite(batch) dbm.MustWrite(batch)
batch.Reset() batch.Reset()
time.Sleep(time.Millisecond * 100)
count++ count++
} }
} }
if count > onceCount { if count > onceCount {
return false if it.Next() {
return true, string(it.Key())
}
return true, ""
} }
} }
batch.Set(genDelMavlKey(mvccPrefix), []byte("")) batch.Set(genDelMavlKey(mvccPrefix), []byte(""))
dbm.MustWrite(batch) dbm.MustWrite(batch)
return true isDelMavlData = true
kmlog.Info("DelMavl success")
return false, ""
} }
func genDelMavlKey(prefix []byte) []byte { func genDelMavlKey(prefix []byte) []byte {
...@@ -429,35 +439,23 @@ func deletePrunedMavl(db dbm.DB) { ...@@ -429,35 +439,23 @@ func deletePrunedMavl(db dbm.DB) {
defer wg.Done() defer wg.Done()
setDelPrunedMavl(delPrunedMavlStarting) setDelPrunedMavl(delPrunedMavlStarting)
defer setDelPrunedMavl(delPruneMavlEnd) defer setDelPrunedMavl(delPruneMavlEnd)
prefixS := []string{hashNodePrefix, leafNodePrefix, leafKeyCountPrefix, oldLeafKeyCountPrefix}
for { for _, str := range prefixS {
loop := deletePrunedMavlData(db, hashNodePrefix) for {
if !loop { stat := deletePrunedMavlData(db, str)
break if stat == 0 {
} return
} } else if stat == 1 {
for { break
loop := deletePrunedMavlData(db, leafNodePrefix) } else {
if !loop { time.Sleep(time.Millisecond * 100)
break }
}
}
for {
loop := deletePrunedMavlData(db, leafKeyCountPrefix)
if !loop {
break
}
}
for {
loop := deletePrunedMavlData(db, oldLeafKeyCountPrefix)
if !loop {
break
} }
} }
} }
func deletePrunedMavlData(db dbm.DB, prefix string) (loop bool) { func deletePrunedMavlData(db dbm.DB, prefix string) (status int) {
it := db.Iterator([]byte(prefix), nil, true) it := db.Iterator([]byte(prefix), nil, false)
defer it.Close() defer it.Close()
count := 0 count := 0
const onceCount = 200 const onceCount = 200
...@@ -465,20 +463,19 @@ func deletePrunedMavlData(db dbm.DB, prefix string) (loop bool) { ...@@ -465,20 +463,19 @@ func deletePrunedMavlData(db dbm.DB, prefix string) (loop bool) {
batch := db.NewBatch(false) batch := db.NewBatch(false)
for it.Next(); it.Valid(); it.Next() { //第一个不做删除 for it.Next(); it.Valid(); it.Next() { //第一个不做删除
if quit { if quit {
return false return 0 // quit
} }
batch.Delete(it.Key()) batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
dbm.MustWrite(batch) dbm.MustWrite(batch)
batch.Reset() batch.Reset()
time.Sleep(time.Millisecond * 100)
count++ count++
} }
if count > onceCount { if count > onceCount {
return true return 2 //loop
} }
} }
dbm.MustWrite(batch) dbm.MustWrite(batch)
} }
return false return 1 // this prefix Iterator over
} }
...@@ -573,7 +573,14 @@ func TestDelMavlData(t *testing.T) { ...@@ -573,7 +573,14 @@ func TestDelMavlData(t *testing.T) {
db.Set([]byte("key22"), []byte("value22")) db.Set([]byte("key22"), []byte("value22"))
quit = false quit = false
delMavlData(db) prefix := ""
loop := false
for {
loop, prefix = delMavlData(db, prefix)
if !loop {
break
}
}
v, err := db.Get([]byte(mvccPrefix)) v, err := db.Get([]byte(mvccPrefix))
require.NoError(t, err) require.NoError(t, err)
...@@ -748,15 +755,15 @@ func TestDeletePrunedMavl(t *testing.T) { ...@@ -748,15 +755,15 @@ func TestDeletePrunedMavl(t *testing.T) {
require.Equal(t, v1, []byte("v1")) require.Equal(t, v1, []byte("v1"))
//测试再加入一条数据,即两条时候 //测试再加入一条数据,即两条时候
store.GetDB().Set([]byte(fmt.Sprintln(hashNodePrefix, "456")), []byte("v2")) store.GetDB().Set([]byte(fmt.Sprintln(hashNodePrefix, "123")), []byte("v1"))
deletePrunedMavlData(store.GetDB(), hashNodePrefix) deletePrunedMavlData(store.GetDB(), hashNodePrefix)
v1, err = store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "123"))) v1, err = store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "456")))
require.Error(t, err) require.Error(t, err)
require.Equal(t, v1, []uint8([]byte(nil))) require.Equal(t, v1, []uint8([]byte(nil)))
v2, err := store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "456"))) v2, err := store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "123")))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, v2, []byte("v2")) require.Equal(t, v2, []byte("v1"))
wg.Add(1) wg.Add(1)
go deletePrunedMavl(store.GetDB()) go deletePrunedMavl(store.GetDB())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment