Commit 88db31ff authored by vipwzw's avatar vipwzw

update chain33 master

parent cf38751f
......@@ -10,6 +10,8 @@ datadir*
.idea
.vscode
cmd/chain33/chain33
build/cert.pem
build/key.pem
build/chain33*
build/datadir
build/bityuan*
......
......@@ -195,6 +195,16 @@ func (bs *BlockStore) initQuickIndex(height int64) {
bs.saveQuickIndexFlag()
}
func (bs *BlockStore) isSeqCBExist(name string) bool {
value, err := bs.db.Get(calcSeqCBKey([]byte(name)))
if err == nil {
var cb types.BlockSeqCB
err = types.Decode(value, &cb)
return err == nil
}
return false
}
func (bs *BlockStore) seqCBNum() int64 {
counts := dbm.NewListHelper(bs.db).PrefixCount(seqCBPrefix)
return counts
......@@ -204,6 +214,8 @@ func (bs *BlockStore) addBlockSeqCB(cb *types.BlockSeqCB) error {
if len(cb.Name) > 128 || len(cb.URL) > 1024 {
return types.ErrInvalidParam
}
storeLog.Info("addBlockSeqCB", "key", string(calcSeqCBKey([]byte(cb.Name))), "value", cb)
return bs.db.SetSync(calcSeqCBKey([]byte(cb.Name)), types.Encode(cb))
}
......@@ -461,9 +473,9 @@ func (bs *BlockStore) LoadBlockByHash(hash []byte) (*types.BlockDetail, error) {
return &blockdetail, nil
}
//SaveBlock 批量保存blocks信息到db数据库中
func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDetail, sequence int64) error {
//SaveBlock 批量保存blocks信息到db数据库中,并返回最新的sequence值
func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDetail, sequence int64) (int64, error) {
var lastSequence int64 = -1
height := blockdetail.Block.Height
if len(blockdetail.Receipts) == 0 && len(blockdetail.Block.Txs) != 0 {
storeLog.Error("SaveBlock Receipts is nil ", "height", height)
......@@ -478,7 +490,7 @@ func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDe
body, err := proto.Marshal(&blockbody)
if err != nil {
storeLog.Error("SaveBlock Marshal blockbody", "height", height, "hash", common.ToHex(hash), "error", err)
return err
return lastSequence, err
}
storeBatch.Set(calcHashToBlockBodyKey(hash), body)
......@@ -499,7 +511,7 @@ func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDe
header, err := proto.Marshal(&blockheader)
if err != nil {
storeLog.Error("SaveBlock Marshal blockheader", "height", height, "hash", common.ToHex(hash), "error", err)
return err
return lastSequence, err
}
storeBatch.Set(calcHashToBlockHeaderKey(hash), header)
......@@ -517,19 +529,19 @@ func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDe
if isRecordBlockSequence || isParaChain {
//存储记录block序列执行的type add
err = bs.saveBlockSequence(storeBatch, hash, height, AddBlock, sequence)
lastSequence, err = bs.saveBlockSequence(storeBatch, hash, height, AddBlock, sequence)
if err != nil {
storeLog.Error("SaveBlock SaveBlockSequence", "height", height, "hash", common.ToHex(hash), "error", err)
return err
return lastSequence, err
}
}
storeLog.Debug("SaveBlock success", "blockheight", height, "hash", common.ToHex(hash))
return nil
return lastSequence, nil
}
//DelBlock 删除block信息从db数据库中
func (bs *BlockStore) DelBlock(storeBatch dbm.Batch, blockdetail *types.BlockDetail, sequence int64) error {
func (bs *BlockStore) DelBlock(storeBatch dbm.Batch, blockdetail *types.BlockDetail, sequence int64) (int64, error) {
var lastSequence int64 = -1
height := blockdetail.Block.Height
hash := blockdetail.Block.Hash()
......@@ -546,15 +558,15 @@ func (bs *BlockStore) DelBlock(storeBatch dbm.Batch, blockdetail *types.BlockDet
if isRecordBlockSequence || isParaChain {
//存储记录block序列执行的type del
err := bs.saveBlockSequence(storeBatch, hash, height, DelBlock, sequence)
lastSequence, err := bs.saveBlockSequence(storeBatch, hash, height, DelBlock, sequence)
if err != nil {
storeLog.Error("DelBlock SaveBlockSequence", "height", height, "hash", common.ToHex(hash), "error", err)
return err
return lastSequence, err
}
}
storeLog.Debug("DelBlock success", "blockheight", height, "hash", common.ToHex(hash))
return nil
return lastSequence, nil
}
//GetTx 通过tx hash 从db数据库中获取tx交易信息
......@@ -874,25 +886,28 @@ func (bs *BlockStore) setSeqCBLastNum(name []byte, num int64) error {
return bs.db.SetSync(caclSeqCBLastNumKey(name), types.Encode(&types.Int64{Data: num}))
}
//Seq的合法值从0开始的,所以没有获取到或者获取失败都应该返回-1
func (bs *BlockStore) getSeqCBLastNum(name []byte) int64 {
bytes, err := bs.db.Get(caclSeqCBLastNumKey(name))
if bytes == nil || err != nil {
if err != dbm.ErrNotFoundInDb {
return -1
storeLog.Error("getSeqCBLastNum", "error", err)
}
return 0
return -1
}
n, err := decodeHeight(bytes)
if err != nil {
return 0
return -1
}
storeLog.Error("getSeqCBLastNum", "name", string(name), "num", n)
return n
}
//SaveBlockSequence 存储block 序列执行的类型用于blockchain的恢复
//获取当前的序列号,将此序列号加1存储本block的hash ,当主链使能isRecordBlockSequence
// 平行链使能isParaChain时,sequence序列号是传入的
func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, height int64, Type int64, sequence int64) error {
func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, height int64, Type int64, sequence int64) (int64, error) {
var blockSequence types.BlockSequence
var newSequence int64
......@@ -921,7 +936,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh
BlockSequenceByte, err := proto.Marshal(&blockSequence)
if err != nil {
storeLog.Error("SaveBlockSequence Marshal BlockSequence", "hash", common.ToHex(hash), "error", err)
return err
return newSequence, err
}
// seq->hash
......@@ -935,7 +950,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh
Sequencebytes := types.Encode(&types.Int64{Data: newSequence})
storeBatch.Set(LastSequence, Sequencebytes)
return nil
return newSequence, nil
}
//LoadBlockBySequence 通过seq高度获取BlockDetail信息
......
......@@ -112,7 +112,7 @@ func TestBlockChain(t *testing.T) {
testRemoveOrphanBlock(t, blockchain)
testLoadBlockBySequence(t, blockchain)
testAddBlockSeqCB(t, blockchain)
testProcDelParaChainBlockMsg(t, mock33, blockchain)
testProcAddParaChainBlockMsg(t, mock33, blockchain)
......@@ -901,3 +901,33 @@ func testProcBlockChainFork(t *testing.T, blockchain *blockchain.BlockChain) {
blockchain.ProcBlockChainFork(curheight-1, curheight+256, "self")
chainlog.Info("testProcBlockChainFork end --------------------")
}
func testAddBlockSeqCB(t *testing.T, blockchain *blockchain.BlockChain) {
chainlog.Info("testAddBlockSeqCB begin ---------------------")
cb := &types.BlockSeqCB{
Name: "test",
URL: "http://192.168.1.107:15760",
Encode: "json",
}
err := blockchain.ProcAddBlockSeqCB(cb)
require.NoError(t, err)
cbs, err := blockchain.ProcListBlockSeqCB()
require.NoError(t, err)
exist := false
for _, temcb := range cbs.Items {
if temcb.Name == cb.Name {
exist = true
}
}
if !exist {
t.Error("testAddBlockSeqCB listSeqCB fail", "cb", cb, "cbs", cbs)
}
num := blockchain.ProcGetSeqCBLastNum(cb.Name)
if num != -1 {
t.Error("testAddBlockSeqCB getSeqCBLastNum", "num", num, "name", cb.Name)
}
chainlog.Info("testAddBlockSeqCB end -------------------------")
}
......@@ -85,6 +85,12 @@ func (chain *BlockChain) ProcRecvMsg() {
go chain.processMsg(msg, reqnum, chain.localPrefixCount)
case types.EventAddBlockSeqCB:
go chain.processMsg(msg, reqnum, chain.addBlockSeqCB)
case types.EventListBlockSeqCB:
go chain.processMsg(msg, reqnum, chain.listBlockSeqCB)
case types.EventGetSeqCBLastNum:
go chain.processMsg(msg, reqnum, chain.getSeqCBLastNum)
default:
go chain.processMsg(msg, reqnum, chain.unknowMsg)
}
......@@ -96,18 +102,36 @@ func (chain *BlockChain) unknowMsg(msg queue.Message) {
}
func (chain *BlockChain) addBlockSeqCB(msg queue.Message) {
if chain.blockStore.seqCBNum() >= MaxSeqCB {
msg.Reply(chain.client.NewMessage("rpc", types.EventAddBlockSeqCB, types.ErrTooManySeqCB))
return
reply := &types.Reply{
IsOk: true,
}
cb := (msg.Data).(*types.BlockSeqCB)
err := chain.blockStore.addBlockSeqCB(cb)
err := chain.ProcAddBlockSeqCB(cb)
if err != nil {
msg.Reply(chain.client.NewMessage("rpc", types.EventAddBlockSeqCB, err))
reply.IsOk = false
reply.Msg = []byte(err.Error())
msg.Reply(chain.client.NewMessage("rpc", types.EventAddBlockSeqCB, reply))
return
}
chain.pushseq.addTask(cb)
msg.ReplyErr("EventAddBlockSeqCB", nil)
msg.Reply(chain.client.NewMessage("rpc", types.EventAddBlockSeqCB, reply))
}
func (chain *BlockChain) listBlockSeqCB(msg queue.Message) {
cbs, err := chain.ProcListBlockSeqCB()
if err != nil {
chainlog.Error("listBlockSeqCB", "err", err.Error())
msg.Reply(chain.client.NewMessage("rpc", types.EventListBlockSeqCB, err))
return
}
msg.Reply(chain.client.NewMessage("rpc", types.EventListBlockSeqCB, cbs))
}
func (chain *BlockChain) getSeqCBLastNum(msg queue.Message) {
data := (msg.Data).(*types.ReqString)
num := chain.ProcGetSeqCBLastNum(data.Data)
lastNum := &types.Int64{Data: num}
msg.Reply(chain.client.NewMessage("rpc", types.EventGetSeqCBLastNum, lastNum))
}
func (chain *BlockChain) queryTx(msg queue.Message) {
......
......@@ -274,7 +274,6 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *types.BlockDetail)
//将本block信息存储到数据库中,并更新bestchain的tip节点
func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetail) (*types.BlockDetail, error) {
//blockchain close 时不再处理block
if atomic.LoadInt32(&b.isclosed) == 1 {
return nil, types.ErrIsClosed
......@@ -293,6 +292,8 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
}
var err error
var lastSequence int64
block := blockdetail.Block
prevStateHash := b.bestChain.Tip().statehash
//广播或者同步过来的blcok需要调用执行模块
......@@ -308,27 +309,25 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
}
//要更新node的信息
if node.pid == "self" {
//update node info
prevhash := node.hash
node.statehash = blockdetail.Block.GetStateHash()
node.hash = blockdetail.Block.Hash()
b.index.UpdateNode(prevhash, node)
}
beg := types.Now()
// 写入磁盘
//批量将block信息写入磁盘
beg := types.Now()
// 写入磁盘 批量将block信息写入磁盘
newbatch := b.blockStore.NewBatch(sync)
//保存tx信息到db中 (newbatch, blockdetail)
//保存tx信息到db中
err = b.blockStore.AddTxs(newbatch, blockdetail)
if err != nil {
chainlog.Error("connectBlock indexTxs:", "height", block.Height, "err", err)
return nil, err
}
//chainlog.Debug("connectBlock AddTxs!", "height", block.Height, "batchsync", sync)
//保存block信息到db中
err = b.blockStore.SaveBlock(newbatch, blockdetail, node.sequence)
lastSequence, err = b.blockStore.SaveBlock(newbatch, blockdetail, node.sequence)
if err != nil {
chainlog.Error("connectBlock SaveBlock:", "height", block.Height, "err", err)
return nil, err
......@@ -348,8 +347,6 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
return nil, err
}
blocktd = new(big.Int).Add(difficulty, parenttd)
//chainlog.Error("connectBlock Difficulty", "height", block.Height, "parenttd.td", difficulty.BigToCompact(parenttd))
//chainlog.Error("connectBlock Difficulty", "height", block.Height, "self.td", difficulty.BigToCompact(blocktd))
}
err = b.blockStore.SaveTdByBlockHash(newbatch, blockdetail.Block.Hash(), blocktd)
......@@ -389,12 +386,16 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
b.SendBlockBroadcast(blockdetail)
}
}
b.pushseq.updateSeq(node.sequence)
//目前非平行链并开启isRecordBlockSequence功能
if isRecordBlockSequence && !isParaChain {
b.pushseq.updateSeq(lastSequence)
}
return blockdetail, nil
}
//从主链中删除blocks
func (b *BlockChain) disconnectBlock(node *blockNode, blockdetail *types.BlockDetail, sequence int64) error {
var lastSequence int64
// 只能从 best chain tip节点开始删除
if !bytes.Equal(node.hash, b.bestChain.Tip().hash) {
chainlog.Error("disconnectBlock:", "height", blockdetail.Block.Height, "node.hash", common.ToHex(node.hash), "bestChain.top.hash", common.ToHex(b.bestChain.Tip().hash))
......@@ -412,7 +413,7 @@ func (b *BlockChain) disconnectBlock(node *blockNode, blockdetail *types.BlockDe
}
//从db中删除block相关的信息
err = b.blockStore.DelBlock(newbatch, blockdetail, sequence)
lastSequence, err = b.blockStore.DelBlock(newbatch, blockdetail, sequence)
if err != nil {
chainlog.Error("disconnectBlock DelBlock:", "height", blockdetail.Block.Height, "err", err)
return err
......@@ -450,7 +451,11 @@ func (b *BlockChain) disconnectBlock(node *blockNode, blockdetail *types.BlockDe
chainlog.Debug("disconnectBlock success", "newtipnode.height", newtipnode.height, "node.parent.height", node.parent.height)
chainlog.Debug("disconnectBlock success", "newtipnode.hash", common.ToHex(newtipnode.hash), "delblock.parent.hash", common.ToHex(blockdetail.Block.GetParentHash()))
b.pushseq.updateSeq(node.sequence)
//目前非平行链并开启isRecordBlockSequence功能
if isRecordBlockSequence && !isParaChain {
b.pushseq.updateSeq(lastSequence)
}
return nil
}
......
......@@ -11,6 +11,7 @@ import (
"github.com/33cn/chain33/types"
)
//pushNotify push Notify
type pushNotify struct {
cb chan *types.BlockSeqCB
seq chan int64
......@@ -41,13 +42,16 @@ func (p *pushseq) init() {
}
}
func (p *pushseq) updateLastSeq() {
//只更新本cb的seq值,每次add一个新cb时如果刷新所有的cb,会耗时很长在初始化时
func (p *pushseq) updateLastSeq(name string) {
last, err := p.store.LoadBlockLastSequence()
if err != nil {
chainlog.Error("listSeqCB", "err", err)
chainlog.Error("LoadBlockLastSequence", "err", err)
return
}
p.updateSeq(last)
notify := p.cmds[name]
notify.seq <- last
}
//每个name 有一个task
......@@ -57,6 +61,7 @@ func (p *pushseq) addTask(cb *types.BlockSeqCB) {
if notify, ok := p.cmds[cb.Name]; ok {
notify.cb <- cb
if cb.URL == "" {
chainlog.Debug("delete callback", "cb", cb)
delete(p.cmds, cb.Name)
}
return
......@@ -67,14 +72,23 @@ func (p *pushseq) addTask(cb *types.BlockSeqCB) {
}
p.cmds[cb.Name].cb <- cb
p.runTask(p.cmds[cb.Name])
//更新最新的seq
p.updateLastSeq()
p.updateLastSeq(cb.Name)
chainlog.Debug("runTask callback", "cb", cb)
}
func (p *pushseq) updateSeq(seq int64) {
p.mu.Lock()
defer p.mu.Unlock()
for _, notify := range p.cmds {
//如果有seq, 那么先读一个出来
select {
case <-notify.seq:
default:
}
//再写入seq(一定不会block,因为加了lock,不存在两个同时写channel的情况)
notify.seq <- seq
}
}
......@@ -127,8 +141,8 @@ func (p *pushseq) runTask(input pushNotify) {
err = p.postData(cb, data)
if err != nil {
chainlog.Error("postdata", "err", err)
//sleep 10s
p.trigeRun(run, 10000*time.Millisecond)
//sleep 60s
p.trigeRun(run, 60000*time.Millisecond)
continue
}
//update seqid
......@@ -141,6 +155,7 @@ func (p *pushseq) runTask(input pushNotify) {
func (p *pushseq) postData(cb *types.BlockSeqCB, data *types.BlockSeq) (err error) {
var postdata []byte
if cb.Encode == "json" {
postdata, err = types.PBToJSON(data)
if err != nil {
......@@ -149,6 +164,7 @@ func (p *pushseq) postData(cb *types.BlockSeqCB, data *types.BlockSeq) (err erro
} else {
postdata = types.Encode(data)
}
//post data in body
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
......@@ -163,6 +179,7 @@ func (p *pushseq) postData(cb *types.BlockSeqCB, data *types.BlockSeq) (err erro
if err != nil {
return err
}
req.Header.Set("Content-Type", "text/plain")
req.Header.Set("Content-Encoding", "gzip")
resp, err := p.client.Do(req)
......@@ -175,8 +192,10 @@ func (p *pushseq) postData(cb *types.BlockSeqCB, data *types.BlockSeq) (err erro
return err
}
if string(body) != "ok" && string(body) != "OK" {
chainlog.Error("postData fail", "cb.name", cb.Name, "body", string(body))
return types.ErrPushSeqPostData
}
chainlog.Debug("postData success", "cb.name", cb.Name, "SeqNum", data.Num)
p.store.setSeqCBLastNum([]byte(cb.Name), data.Num)
return nil
}
......
......@@ -86,3 +86,40 @@ func (chain *BlockChain) ProcGetSeqByHash(hash []byte) (int64, error) {
return seq, err
}
//ProcAddBlockSeqCB 添加seq callback
func (chain *BlockChain) ProcAddBlockSeqCB(cb *types.BlockSeqCB) error {
if cb == nil {
return types.ErrInvalidParam
}
if chain.blockStore.seqCBNum() >= MaxSeqCB && !chain.blockStore.isSeqCBExist(cb.Name) {
return types.ErrTooManySeqCB
}
err := chain.blockStore.addBlockSeqCB(cb)
if err != nil {
return err
}
chain.pushseq.addTask(cb)
return nil
}
//ProcListBlockSeqCB 列出所有已经设置的seq callback
func (chain *BlockChain) ProcListBlockSeqCB() (*types.BlockSeqCBs, error) {
cbs, err := chain.blockStore.listSeqCB()
if err != nil {
chainlog.Error("ProcListBlockSeqCB", "err", err.Error())
return nil, err
}
var listSeqCBs types.BlockSeqCBs
listSeqCBs.Items = append(listSeqCBs.Items, cbs...)
return &listSeqCBs, nil
}
//ProcGetSeqCBLastNum 获取指定name的callback已经push的最新seq num
func (chain *BlockChain) ProcGetSeqCBLastNum(name string) int64 {
num := chain.blockStore.getSeqCBLastNum([]byte(name))
return num
}
......@@ -5,4 +5,6 @@ COPY chain33 chain33
COPY chain33-cli chain33-cli
COPY chain33.toml ./
RUN ./chain33-cli cert --host=127.0.0.1
CMD ["/root/chain33", "-f", "/root/chain33.toml"]
......@@ -11,6 +11,29 @@ type QueueProtocolAPI struct {
mock.Mock
}
// AddSeqCallBack provides a mock function with given fields: param
func (_m *QueueProtocolAPI) AddSeqCallBack(param *types.BlockSeqCB) (*types.Reply, error) {
ret := _m.Called(param)
var r0 *types.Reply
if rf, ok := ret.Get(0).(func(*types.BlockSeqCB) *types.Reply); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Reply)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.BlockSeqCB) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Close provides a mock function with given fields:
func (_m *QueueProtocolAPI) Close() {
_m.Called()
......@@ -453,6 +476,29 @@ func (_m *QueueProtocolAPI) GetSeed(param *types.GetSeedByPw) (*types.ReplySeed,
return r0, r1
}
// GetSeqCallBackLastNum provides a mock function with given fields: param
func (_m *QueueProtocolAPI) GetSeqCallBackLastNum(param *types.ReqString) (*types.Int64, error) {
ret := _m.Called(param)
var r0 *types.Int64
if rf, ok := ret.Get(0).(func(*types.ReqString) *types.Int64); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Int64)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqString) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetTransactionByAddr provides a mock function with given fields: param
func (_m *QueueProtocolAPI) GetTransactionByAddr(param *types.ReqAddr) (*types.ReplyTxInfos, error) {
ret := _m.Called(param)
......@@ -591,6 +637,29 @@ func (_m *QueueProtocolAPI) IsSync() (*types.Reply, error) {
return r0, r1
}
// ListSeqCallBack provides a mock function with given fields:
func (_m *QueueProtocolAPI) ListSeqCallBack() (*types.BlockSeqCBs, error) {
ret := _m.Called()
var r0 *types.BlockSeqCBs
if rf, ok := ret.Get(0).(func() *types.BlockSeqCBs); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockSeqCBs)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// LocalGet provides a mock function with given fields: param
func (_m *QueueProtocolAPI) LocalGet(param *types.LocalDBGet) (*types.LocalReplyValue, error) {
ret := _m.Called(param)
......
......@@ -983,3 +983,45 @@ func (q *QueueProtocol) GetTicketCount() (*types.Int64, error) {
}
return nil, types.ErrTypeAsset
}
// AddSeqCallBack Add Seq CallBack
func (q *QueueProtocol) AddSeqCallBack(param *types.BlockSeqCB) (*types.Reply, error) {
msg, err := q.query(blockchainKey, types.EventAddBlockSeqCB, param)
if err != nil {
log.Error("AddSeqCallBack", "Error", err.Error())
return nil, err
}
if reply, ok := msg.GetData().(*types.Reply); ok {
return reply, nil
}
return nil, types.ErrTypeAsset
}
// ListSeqCallBack List Seq CallBacks
func (q *QueueProtocol) ListSeqCallBack() (*types.BlockSeqCBs, error) {
msg, err := q.query(blockchainKey, types.EventListBlockSeqCB, &types.ReqNil{})
if err != nil {
log.Error("ListSeqCallBack", "Error", err.Error())
return nil, err
}
if reply, ok := msg.GetData().(*types.BlockSeqCBs); ok {
return reply, nil
}
return nil, types.ErrTypeAsset
}
// GetSeqCallBackLastNum Get Seq Call Back Last Num
func (q *QueueProtocol) GetSeqCallBackLastNum(param *types.ReqString) (*types.Int64, error) {
msg, err := q.query(blockchainKey, types.EventGetSeqCBLastNum, param)
if err != nil {
log.Error("ListSeqCallBack", "Error", err.Error())
return nil, err
}
if reply, ok := msg.GetData().(*types.Int64); ok {
return reply, nil
}
return nil, types.ErrTypeAsset
}
......@@ -124,4 +124,11 @@ type QueueProtocolAPI interface {
// close chain33
CloseQueue() (*types.Reply, error)
// --------------- other interfaces end
// types.EventAddBlockSeqCB
AddSeqCallBack(param *types.BlockSeqCB) (*types.Reply, error)
// types.EventListBlockSeqCB
ListSeqCallBack() (*types.BlockSeqCBs, error)
// types.EventGetSeqCBLastNum
GetSeqCallBackLastNum(param *types.ReqString) (*types.Int64, error)
}
......@@ -46,6 +46,9 @@ grpcBindAddr="localhost:8802"
whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"]
enableTLS=false
certFile="cert.pem"
keyFile="key.pem"
[mempool]
maxTxNumPerAccount=100
......
......@@ -59,6 +59,9 @@ grpcBindAddr="localhost:8802"
whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"]
enableTLS=false
certFile="cert.pem"
keyFile="key.pem"
[mempool]
poolCacheSize=10240
......
......@@ -59,6 +59,9 @@ grpcBindAddr="localhost:8802"
whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"]
enableTLS=true
certFile="cert.pem"
keyFile="key.pem"
[mempool]
poolCacheSize=10240
......
......@@ -11,6 +11,7 @@ import (
"testing"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/merkle"
_ "github.com/33cn/chain33/system"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
......@@ -214,6 +215,34 @@ func TestExecBlock2(t *testing.T) {
}
}
var zeroHash [32]byte
func TestSameTx(t *testing.T) {
mock33 := newMockNode()
defer mock33.Close()
newblock := &types.Block{}
newblock.Height = 1
newblock.BlockTime = types.Now().Unix()
newblock.ParentHash = zeroHash[:]
newblock.Txs = util.GenNoneTxs(mock33.GetGenesisKey(), 3)
hash1 := merkle.CalcMerkleRoot(newblock.Txs)
newblock.Txs = append(newblock.Txs, newblock.Txs[2])
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
assert.Equal(t, hash1, newblock.TxHash)
_, _, err := util.ExecBlock(mock33.GetClient(), nil, newblock, true, true)
assert.Equal(t, types.ErrTxDup, err)
//情况2
//[tx1,xt2,tx3,tx4,tx5,tx6] and [tx1,xt2,tx3,tx4,tx5,tx6,tx5,tx6]
newblock.Txs = util.GenNoneTxs(mock33.GetGenesisKey(), 6)
hash1 = merkle.CalcMerkleRoot(newblock.Txs)
newblock.Txs = append(newblock.Txs, newblock.Txs[4:]...)
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
assert.Equal(t, hash1, newblock.TxHash)
_, _, err = util.ExecBlock(mock33.GetClient(), nil, newblock, true, true)
assert.Equal(t, types.ErrTxDup, err)
}
func TestExecBlock(t *testing.T) {
mock33 := newMockNode()
defer mock33.Close()
......
......@@ -47,6 +47,11 @@ func (g *Grpc) CreateTransaction(ctx context.Context, in *pb.CreateTxIn) (*pb.Un
if err != nil {
return nil, err
}
//decode protocol buffer
err = pb.Decode(in.Payload, msg)
if err != nil {
return nil, err
}
reply, err := pb.CallCreateTx(string(in.Execer), in.ActionName, msg)
if err != nil {
return nil, err
......
......@@ -80,7 +80,10 @@ func (j *JSONRPCServer) Listen() (int, error) {
if err != nil {
errstr = err.Error()
}
funcName := strings.Split(client.Method, ".")[len(strings.Split(client.Method, "."))-1]
if !checkFilterPrintFuncBlacklist(funcName) {
log.Debug("JSONRPCServer", "request", string(data), "err", errstr)
}
if err != nil {
writeError(w, r, 0, fmt.Sprintf(`parse request err %s`, err.Error()))
return
......@@ -88,7 +91,7 @@ func (j *JSONRPCServer) Listen() (int, error) {
//Release local request
ipaddr := net.ParseIP(ip)
if !ipaddr.IsLoopback() {
funcName := strings.Split(client.Method, ".")[len(strings.Split(client.Method, "."))-1]
//funcName := strings.Split(client.Method, ".")[len(strings.Split(client.Method, "."))-1]
if checkJrpcFuncBlacklist(funcName) || !checkJrpcFuncWhitelist(funcName) {
writeError(w, r, client.ID, fmt.Sprintf(`The %s method is not authorized!`, funcName))
return
......@@ -109,7 +112,11 @@ func (j *JSONRPCServer) Listen() (int, error) {
})
handler = co.Handler(handler)
if !rpcCfg.EnableTLS {
go http.Serve(listener, handler)
} else {
go http.ServeTLS(listener, handler, rpcCfg.CertFile, rpcCfg.KeyFile)
}
return listener.Addr().(*net.TCPAddr).Port, nil
}
......
......@@ -165,43 +165,8 @@ func (c *Chain33) GetBlocks(in rpctypes.BlockParam, result *interface{}) error {
{
var blockDetails rpctypes.BlockDetails
items := reply.GetItems()
for _, item := range items {
var bdtl rpctypes.BlockDetail
var block rpctypes.Block
block.BlockTime = item.Block.GetBlockTime()
block.Height = item.Block.GetHeight()
block.Version = item.Block.GetVersion()
block.ParentHash = common.ToHex(item.Block.GetParentHash())
block.StateHash = common.ToHex(item.Block.GetStateHash())
block.TxHash = common.ToHex(item.Block.GetTxHash())
txs := item.Block.GetTxs()
if in.Isdetail && len(txs) != len(item.Receipts) { //只有获取详情时才需要校验txs和Receipts的数量是否相等CHAIN33-540
return types.ErrDecode
}
for _, tx := range txs {
tran, err := rpctypes.DecodeTx(tx)
if err != nil {
continue
}
block.Txs = append(block.Txs, tran)
}
bdtl.Block = &block
for i, rp := range item.Receipts {
var recp rpctypes.ReceiptData
recp.Ty = rp.GetTy()
for _, log := range rp.Logs {
recp.Logs = append(recp.Logs,
&rpctypes.ReceiptLog{Ty: log.Ty, Log: common.ToHex(log.GetLog())})
}
rd, err := rpctypes.DecodeLog(txs[i].Execer, &recp)
if err != nil {
continue
}
bdtl.Receipts = append(bdtl.Receipts, rd)
}
blockDetails.Items = append(blockDetails.Items, &bdtl)
if err := convertBlockDetails(items, &blockDetails, in.Isdetail); err != nil {
return err
}
*result = &blockDetails
}
......@@ -1139,7 +1104,14 @@ func (c *Chain33) GetBlockByHashes(in rpctypes.ReqHashes, result *interface{}) e
if err != nil {
return err
}
*result = reply
{
var blockDetails rpctypes.BlockDetails
items := reply.Items
if err := convertBlockDetails(items, &blockDetails, !in.DisableDetail); err != nil {
return err
}
*result = &blockDetails
}
return nil
}
......@@ -1148,16 +1120,11 @@ func (c *Chain33) CreateTransaction(in *rpctypes.CreateTxIn, result *interface{}
if in == nil {
return types.ErrInvalidParam
}
exec := types.LoadExecutorType(in.Execer)
if exec == nil {
return types.ErrExecNameNotAllow
}
tx, err := exec.CreateTx(in.ActionName, in.Payload)
btx, err := types.CallCreateTxJSON(in.Execer, in.ActionName, in.Payload)
if err != nil {
log.Error("CreateTransaction", "err", err.Error())
return err
}
*result = hex.EncodeToString(types.Encode(tx))
*result = hex.EncodeToString(btx)
return nil
}
......@@ -1177,3 +1144,80 @@ func (c *Chain33) GetExecBalance(in *types.ReqGetExecBalance, result *interface{
*result = hex.EncodeToString(types.Encode(resp))
return nil
}
// AddSeqCallBack add Seq CallBack
func (c *Chain33) AddSeqCallBack(in *types.BlockSeqCB, result *interface{}) error {
reply, err := c.cli.AddSeqCallBack(in)
log.Error("AddSeqCallBack", "err", err, "reply", reply)
if err != nil {
return err
}
var resp rpctypes.Reply
resp.IsOk = reply.GetIsOk()
resp.Msg = string(reply.GetMsg())
*result = &resp
return nil
}
// ListSeqCallBack List Seq CallBack
func (c *Chain33) ListSeqCallBack(in *types.ReqNil, result *interface{}) error {
resp, err := c.cli.ListSeqCallBack()
if err != nil {
return err
}
*result = resp
return nil
}
// GetSeqCallBackLastNum Get Seq Call Back Last Num
func (c *Chain33) GetSeqCallBackLastNum(in *types.ReqString, result *interface{}) error {
resp, err := c.cli.GetSeqCallBackLastNum(in)
if err != nil {
return err
}
*result = resp
return nil
}
func convertBlockDetails(details []*types.BlockDetail, retDetails *rpctypes.BlockDetails, isDetail bool) error {
for _, item := range details {
var bdtl rpctypes.BlockDetail
var block rpctypes.Block
block.BlockTime = item.Block.GetBlockTime()
block.Height = item.Block.GetHeight()
block.Version = item.Block.GetVersion()
block.ParentHash = common.ToHex(item.Block.GetParentHash())
block.StateHash = common.ToHex(item.Block.GetStateHash())
block.TxHash = common.ToHex(item.Block.GetTxHash())
txs := item.Block.GetTxs()
if isDetail && len(txs) != len(item.Receipts) { //只有获取详情时才需要校验txs和Receipts的数量是否相等CHAIN33-540
return types.ErrDecode
}
for _, tx := range txs {
tran, err := rpctypes.DecodeTx(tx)
if err != nil {
continue
}
block.Txs = append(block.Txs, tran)
}
bdtl.Block = &block
for i, rp := range item.Receipts {
var recp rpctypes.ReceiptData
recp.Ty = rp.GetTy()
for _, log := range rp.Logs {
recp.Logs = append(recp.Logs,
&rpctypes.ReceiptLog{Ty: log.Ty, Log: common.ToHex(log.GetLog())})
}
rd, err := rpctypes.DecodeLog(txs[i].Execer, &recp)
if err != nil {
continue
}
bdtl.Receipts = append(bdtl.Receipts, rd)
}
retDetails.Items = append(retDetails.Items, &bdtl)
}
return nil
}
......@@ -1229,7 +1229,7 @@ func TestChain33_CreateTransaction(t *testing.T) {
in := &rpctypes.CreateTxIn{Execer: "notExist", ActionName: "x", Payload: []byte("x")}
err = client.CreateTransaction(in, &result)
assert.Equal(t, types.ErrExecNameNotAllow, err)
assert.Equal(t, types.ErrNotSupport, err)
in = &rpctypes.CreateTxIn{Execer: types.ExecName("coins"), ActionName: "notExist", Payload: []byte("x")}
err = client.CreateTransaction(in, &result)
......
......@@ -7,6 +7,7 @@ package jsonclient
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
......@@ -21,6 +22,8 @@ import (
type JSONClient struct {
url string
prefix string
tlsVerify bool
client *http.Client
}
func addPrefix(prefix, name string) string {
......@@ -32,12 +35,21 @@ func addPrefix(prefix, name string) string {
// NewJSONClient produce a json object
func NewJSONClient(url string) (*JSONClient, error) {
return &JSONClient{url: url, prefix: "Chain33"}, nil
return New("Chain33", url, false)
}
// New produce a jsonclient by perfix and url
func New(prefix, url string) (*JSONClient, error) {
return &JSONClient{url: url, prefix: prefix}, nil
func New(prefix, url string, tlsVerify bool) (*JSONClient, error) {
httpcli := http.DefaultClient
if strings.Contains(url, "https") { //暂不校验tls证书
httpcli = &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: !tlsVerify}}}
}
return &JSONClient{
url: url,
prefix: prefix,
tlsVerify: tlsVerify,
client: httpcli,
}, nil
}
type clientRequest struct {
......@@ -63,7 +75,7 @@ func (client *JSONClient) Call(method string, params, resp interface{}) error {
return err
}
//println("request JsonStr", string(data), "")
postresp, err := http.Post(client.url, "application/json", bytes.NewBuffer(data))
postresp, err := client.client.Post(client.url, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
......
......@@ -14,8 +14,8 @@ import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip" // register gzip
)
......@@ -26,6 +26,7 @@ var (
grpcFuncWhitelist = make(map[string]bool)
jrpcFuncBlacklist = make(map[string]bool)
grpcFuncBlacklist = make(map[string]bool)
rpcFilterPrintFuncBlacklist = make(map[string]bool)
)
// Chain33 a channel client
......@@ -149,6 +150,14 @@ func NewGRpcServer(c queue.Client, api client.QueueProtocolAPI) *Grpcserver {
return handler(ctx, req)
}
opts = append(opts, grpc.UnaryInterceptor(interceptor))
if rpcCfg.EnableTLS {
creds, err := credentials.NewServerTLSFromFile(rpcCfg.CertFile, rpcCfg.KeyFile)
if err != nil {
panic(err)
}
credsOps := grpc.Creds(creds)
opts = append(opts, credsOps)
}
server := grpc.NewServer(opts...)
s.s = server
types.RegisterChain33Server(server, s.grpc)
......@@ -182,6 +191,7 @@ func InitCfg(cfg *types.RPC) {
InitGrpcFuncWhitelist(cfg)
InitJrpcFuncBlacklist(cfg)
InitGrpcFuncBlacklist(cfg)
InitFilterPrintFuncBlacklist()
}
// New produce a rpc by cfg
......@@ -346,3 +356,19 @@ func InitGrpcFuncBlacklist(cfg *types.RPC) {
grpcFuncBlacklist[funcName] = true
}
}
// InitFilterPrintFuncBlacklist rpc模块打印requet信息时需要过滤掉一些敏感接口的入参打印,比如钱包密码相关的
func InitFilterPrintFuncBlacklist() {
rpcFilterPrintFuncBlacklist["UnLock"] = true
rpcFilterPrintFuncBlacklist["SetPasswd"] = true
rpcFilterPrintFuncBlacklist["GetSeed"] = true
rpcFilterPrintFuncBlacklist["SaveSeed"] = true
rpcFilterPrintFuncBlacklist["ImportPrivkey"] = true
}
func checkFilterPrintFuncBlacklist(funcName string) bool {
if _, ok := rpcFilterPrintFuncBlacklist[funcName]; ok {
return true
}
return false
}
......@@ -122,6 +122,7 @@ func DecodeTx(tx *types.Transaction) (*Transaction, error) {
GroupCount: tx.GroupCount,
Header: common.ToHex(tx.Header),
Next: common.ToHex(tx.Next),
Hash: common.ToHex(tx.Hash()),
}
if result.Amount != 0 {
result.AmountFmt = strconv.FormatFloat(float64(result.Amount)/float64(types.Coin), 'f', 4, 64)
......
......@@ -83,6 +83,7 @@ type Transaction struct {
GroupCount int32 `json:"groupCount,omitempty"`
Header string `json:"header,omitempty"`
Next string `json:"next,omitempty"`
Hash string `json:"hash,omitempty"`
}
// ReceiptLog defines receipt log command
......
......@@ -47,6 +47,12 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.WaitTxMs == 0 {
subcfg.WaitTxMs = 1000
}
if subcfg.Genesis == "" {
subcfg.Genesis = cfg.Genesis
}
if subcfg.GenesisBlockTime == 0 {
subcfg.GenesisBlockTime = cfg.GenesisBlockTime
}
solo := &Client{c, &subcfg, time.Duration(subcfg.WaitTxMs) * time.Millisecond}
c.SetChild(solo)
return solo
......
......@@ -35,6 +35,9 @@ func BlockCmd() *cobra.Command {
GetBlockByHashsCmd(),
GetBlockSequencesCmd(),
GetLastBlockSequenceCmd(),
AddBlockSeqCallBackCmd(),
ListBlockSeqCallBackCmd(),
GetSeqCallBackLastNumCmd(),
)
return cmd
......@@ -295,3 +298,89 @@ func getblockbyhashs(cmd *cobra.Command, args []string) {
//ctx.SetResultCb(parseQueryTxsByHashesRes)
ctx.Run()
}
// AddBlockSeqCallBackCmd add block sequence call back
func AddBlockSeqCallBackCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "add_callback",
Short: "add block sequence call back",
Run: addblockSeqCallBackCmd,
}
addblockSeqCallBackCmdFlags(cmd)
return cmd
}
func addblockSeqCallBackCmdFlags(cmd *cobra.Command) {
cmd.Flags().StringP("name", "n", "", "call back name")
cmd.MarkFlagRequired("name")
cmd.Flags().StringP("url", "u", "", "call back URL")
cmd.MarkFlagRequired("url")
cmd.Flags().StringP("encode", "e", "", "data encode type,json or proto buff")
cmd.MarkFlagRequired("encode")
}
func addblockSeqCallBackCmd(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
name, _ := cmd.Flags().GetString("name")
url, _ := cmd.Flags().GetString("url")
encode, _ := cmd.Flags().GetString("encode")
params := types.BlockSeqCB{
Name: name,
URL: url,
Encode: encode,
}
var res rpctypes.Reply
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.AddSeqCallBack", params, &res)
ctx.Run()
}
// ListBlockSeqCallBackCmd list block sequence call back
func ListBlockSeqCallBackCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "list_callback",
Short: "list block sequence call back",
Run: listBlockSeqCallBackCmd,
}
return cmd
}
func listBlockSeqCallBackCmd(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res types.BlockSeqCBs
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.ListSeqCallBack", nil, &res)
ctx.Run()
}
// GetSeqCallBackLastNumCmd Get Seq Call Back Last Num
func GetSeqCallBackLastNumCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "last_callback_sequence",
Short: "last call back sequence by name",
Run: getSeqCallBackLastNumCmd,
}
getSeqCallBackLastNumCmdFlags(cmd)
return cmd
}
func getSeqCallBackLastNumCmdFlags(cmd *cobra.Command) {
cmd.Flags().StringP("name", "n", "", "call back name")
cmd.MarkFlagRequired("name")
}
func getSeqCallBackLastNumCmd(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
name, _ := cmd.Flags().GetString("name")
params := types.ReqString{
Data: name,
}
var res types.Int64
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetSeqCallBackLastNum", params, &res)
ctx.Run()
}
package commands
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"net"
"os"
"strings"
"time"
"github.com/spf13/cobra"
)
//CertCmd generate cert
func CertCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "cert",
Short: "generate cert",
Run: func(cmd *cobra.Command, args []string) {
host, _ := cmd.Flags().GetString("host")
validFrom, _ := cmd.Flags().GetString("start-date")
ecdsaCurve, _ := cmd.Flags().GetString("ecdsa-curve")
rsaBits, _ := cmd.Flags().GetInt("rsa-bits")
isCA, _ := cmd.Flags().GetBool("ca")
validFor, _ := cmd.Flags().GetDuration("duration")
certGenerate(host, validFrom, ecdsaCurve, rsaBits, isCA, validFor)
},
}
addCertFlags(cmd)
return cmd
}
func addCertFlags(cmd *cobra.Command) {
cmd.Flags().StringP("host", "", "", "Comma-separated hostnames and IPs to generate a certificate for")
cmd.Flags().StringP("start-date", "", "", "Creation date formatted as Jan 1 15:04:05 2011")
cmd.Flags().StringP("ecdsa-curve", "", "", "ECDSA curve to use to generate a key. Valid values are P224, P256 (recommended), P384, P521")
cmd.Flags().IntP("rsa-bits", "", 2048, "Size of RSA key to generate. Ignored if --ecdsa-curve is set")
cmd.Flags().Bool("ca", false, "whether this cert should be its own Certificate Authority")
cmd.Flags().Duration("duration", 365*24*time.Hour, "Duration that certificate is valid for")
cmd.MarkFlagRequired("host")
}
func publicKey(priv interface{}) interface{} {
switch k := priv.(type) {
case *rsa.PrivateKey:
return &k.PublicKey
case *ecdsa.PrivateKey:
return &k.PublicKey
default:
return nil
}
}
func pemBlockForKey(priv interface{}) *pem.Block {
switch k := priv.(type) {
case *rsa.PrivateKey:
return &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(k)}
case *ecdsa.PrivateKey:
b, err := x509.MarshalECPrivateKey(k)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to marshal ECDSA private key: %v", err)
os.Exit(2)
}
return &pem.Block{Type: "EC PRIVATE KEY", Bytes: b}
default:
return nil
}
}
func certGenerate(host, validFrom, ecdsaCurve string, rsaBits int, isCA bool, validFor time.Duration) {
var priv interface{}
var err error
switch ecdsaCurve {
case "":
priv, err = rsa.GenerateKey(rand.Reader, rsaBits)
case "P224":
priv, err = ecdsa.GenerateKey(elliptic.P224(), rand.Reader)
case "P256":
priv, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
case "P384":
priv, err = ecdsa.GenerateKey(elliptic.P384(), rand.Reader)
case "P521":
priv, err = ecdsa.GenerateKey(elliptic.P521(), rand.Reader)
default:
fmt.Fprintf(os.Stderr, "Unrecognized elliptic curve: %q", ecdsaCurve)
os.Exit(1)
}
if err != nil {
fmt.Fprintf(os.Stderr, "failed to generate private key: %s\n", err)
os.Exit(1)
}
var notBefore time.Time
if len(validFrom) == 0 {
notBefore = time.Now()
} else {
notBefore, err = time.Parse("Jan 2 15:04:05 2006", validFrom)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse creation date: %s\n", err)
os.Exit(1)
}
}
notAfter := notBefore.Add(validFor)
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to generate serial number: %s", err)
os.Exit(1)
}
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: pkix.Name{
Organization: []string{"Acme Co"},
},
NotBefore: notBefore,
NotAfter: notAfter,
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
}
hosts := strings.Split(host, ",")
for _, h := range hosts {
if ip := net.ParseIP(h); ip != nil {
template.IPAddresses = append(template.IPAddresses, ip)
} else {
template.DNSNames = append(template.DNSNames, h)
}
}
if isCA {
template.IsCA = true
template.KeyUsage |= x509.KeyUsageCertSign
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, publicKey(priv), priv)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create certificate: %s", err)
os.Exit(1)
}
certOut, err := os.Create("cert.pem")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open cert.pem for writing: %s", err)
os.Exit(1)
}
if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
fmt.Fprintf(os.Stderr, "failed to write data to cert.pem: %s", err)
os.Exit(1)
}
if err := certOut.Close(); err != nil {
fmt.Fprintf(os.Stderr, "error closing cert.pem: %s", err)
os.Exit(1)
}
fmt.Print("wrote cert.pem\n")
keyOut, err := os.OpenFile("key.pem", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open key.pem for writing: %s", err)
os.Exit(1)
}
if err := pem.Encode(keyOut, pemBlockForKey(priv)); err != nil {
fmt.Fprintf(os.Stderr, "failed to write data to key.pem: %s", err)
os.Exit(1)
}
if err := keyOut.Close(); err != nil {
fmt.Fprintf(os.Stderr, "error closing key.pem: %s", err)
os.Exit(1)
}
fmt.Print("wrote key.pem\n")
}
......@@ -58,6 +58,7 @@ type TxResult struct {
GroupCount int32 `json:"groupCount,omitempty"`
Header string `json:"header,omitempty"`
Next string `json:"next,omitempty"`
Hash string `json:"hash,omitempty"`
}
// ReceiptAccountTransfer defines receipt account transfer
......
......@@ -39,6 +39,7 @@ func DecodeTransaction(tx *rpctypes.Transaction) *TxResult {
GroupCount: tx.GroupCount,
Header: tx.Header,
Next: tx.Next,
Hash: tx.Hash,
}
return result
}
......
......@@ -353,7 +353,7 @@ func parseTxHeight(expire string) error {
return err
}
if txHeight <= 0 {
fmt.Printf("txHeight should be grate to 0")
//fmt.Printf("txHeight should be grate to 0")
return errors.New("txHeight should be grate to 0")
}
......
......@@ -6,7 +6,6 @@
package types
import (
"encoding/json"
"reflect"
"github.com/33cn/chain33/common/address"
......@@ -60,13 +59,6 @@ func (m ManageType) Amount(tx *types.Transaction) (int64, error) {
return 0, nil
}
// CreateTx create a tx
// TODO not going to change the implementation, complete the reconfiguration of the structure first
func (m ManageType) CreateTx(action string, message json.RawMessage) (*types.Transaction, error) {
var tx *types.Transaction
return tx, nil
}
// GetLogMap get log for map
func (m *ManageType) GetLogMap() map[int64]*types.LogInfo {
return logmap
......@@ -86,3 +78,8 @@ func (m ManageType) GetRealToAddr(tx *types.Transaction) string {
func (m ManageType) GetTypeMap() map[string]int32 {
return actionName
}
// GetName reset name
func (m *ManageType) GetName() string {
return ManageX
}
......@@ -142,6 +142,9 @@ type RPC struct {
JrpcFuncBlacklist []string `protobuf:"bytes,7,rep,name=jrpcFuncBlacklist" json:"jrpcFuncBlacklist,omitempty"`
GrpcFuncBlacklist []string `protobuf:"bytes,8,rep,name=grpcFuncBlacklist" json:"grpcFuncBlacklist,omitempty"`
MainnetJrpcAddr string `protobuf:"bytes,9,opt,name=mainnetJrpcAddr" json:"mainnetJrpcAddr,omitempty"`
EnableTLS bool `protobuf:"varint,10,opt,name=enableTLS" json:"enableTLS,omitempty"`
CertFile string `protobuf:"varint,11,opt,name=certFile" json:"certFile,omitempty"`
KeyFile string `protobuf:"varint,12,opt,name=keyFile" json:"keyFile,omitempty"`
}
// Exec 配置
......
......@@ -136,6 +136,9 @@ const (
EventWalletCreateTx = 129
EventStoreList = 130
EventStoreListReply = 131
EventListBlockSeqCB = 132
EventGetSeqCBLastNum = 133
//exec
EventBlockChainQuery = 212
EventConsensusQuery = 213
......
......@@ -12,7 +12,7 @@ import (
"unicode"
"github.com/33cn/chain33/common/address"
proto "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/proto"
)
func init() {
......@@ -113,17 +113,37 @@ func CallExecNewTx(execName, action string, param interface{}) ([]byte, error) {
func CallCreateTx(execName, action string, param Message) ([]byte, error) {
exec := LoadExecutorType(execName)
if exec == nil {
tlog.Error("callExecNewTx", "Error", "exec not found")
tlog.Error("CallCreateTx", "Error", "exec not found")
return nil, ErrNotSupport
}
// param is interface{type, var-nil}, check with nil always fail
if param == nil {
tlog.Error("callExecNewTx", "Error", "param in nil")
tlog.Error("CallCreateTx", "Error", "param in nil")
return nil, ErrInvalidParam
}
tx, err := exec.Create(action, param)
if err != nil {
tlog.Error("callExecNewTx", "Error", err)
tlog.Error("CallCreateTx", "Error", err)
return nil, err
}
return FormatTxEncode(execName, tx)
}
//CallCreateTxJSON create tx by json
func CallCreateTxJSON(execName, action string, param json.RawMessage) ([]byte, error) {
exec := LoadExecutorType(execName)
if exec == nil {
tlog.Error("CallCreateTxJSON", "Error", "exec not found")
return nil, ErrNotSupport
}
// param is interface{type, var-nil}, check with nil always fail
if param == nil {
tlog.Error("CallCreateTxJSON", "Error", "param in nil")
return nil, ErrInvalidParam
}
tx, err := exec.CreateTx(action, param)
if err != nil {
tlog.Error("CallCreateTxJSON", "Error", err)
return nil, err
}
return FormatTxEncode(execName, tx)
......@@ -655,7 +675,7 @@ func (base *ExecTypeBase) GetAction(action string) (Message, error) {
return nil, ErrActionNotSupport
}
//CreateTx 构造tx交易重构完成后删除
//CreateTx 通过json rpc 创建交易
func (base *ExecTypeBase) CreateTx(action string, msg json.RawMessage) (*Transaction, error) {
data, err := base.GetAction(action)
if err != nil {
......@@ -714,7 +734,10 @@ func (base *ExecTypeBase) CreateTransaction(action string, data Message) (tx *Tr
tymap := base.child.GetTypeMap()
if tyid, ok := tymap[action]; ok {
field.Set(reflect.ValueOf(tyid))
return &Transaction{Payload: Encode(value)}, nil
tx := &Transaction{
Payload: Encode(value),
}
return tx, nil
}
return nil, ErrActionNotSupport
}
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
func TestLoadExecutorType(t *testing.T) {
exec := LoadExecutorType("manage")
assert.NotEqual(t, exec, nil)
assert.Equal(t, exec.GetName(), "manage")
exec = LoadExecutorType("coins")
assert.NotEqual(t, exec, nil)
assert.Equal(t, exec.GetName(), "coins")
exec = LoadExecutorType("xxxx")
assert.Equal(t, exec, nil)
}
func TestFormatTx(t *testing.T) {
tx := &Transaction{
Payload: []byte("this is a test."),
}
tx, err := FormatTx("user.p.none", tx)
assert.Equal(t, err, nil)
assert.Equal(t, tx.Execer, []byte("user.p.none"))
fee, _ := tx.GetRealFee(GInt("MinFee"))
assert.Equal(t, tx.Fee, fee)
}
func TestFormatTxEncode(t *testing.T) {
data, err := FormatTxEncode("coins", &Transaction{
Payload: []byte("this is a test."),
})
assert.Equal(t, err, nil)
var tx Transaction
err = Decode(data, &tx)
assert.Equal(t, err, nil)
assert.Equal(t, tx.Execer, []byte("coins"))
}
func TestCallCreateTxJSON(t *testing.T) {
modify := &ModifyConfig{
Key: "token-finisher",
Value: "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
Op: "add",
Addr: "",
}
data, err := json.Marshal(modify)
assert.Equal(t, err, nil)
result, err := CallCreateTxJSON("manage", "Modify", data)
assert.Equal(t, err, nil)
assert.NotEqual(t, result, nil)
var tx Transaction
err = Decode(result, &tx)
assert.Equal(t, err, nil)
assert.Equal(t, tx.Execer, []byte("manage"))
fee, _ := tx.GetRealFee(GInt("MinFee"))
assert.Equal(t, tx.Fee, fee)
_, err = CallCreateTxJSON("coins", "Modify", data)
assert.NotEqual(t, err, nil)
_, err = CallCreateTxJSON("xxxx", "xxx", data)
assert.NotEqual(t, err, nil)
modify = &ModifyConfig{
Key: "token-finisher",
Value: "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
Op: "delete",
Addr: "",
}
data, err = json.Marshal(modify)
assert.Equal(t, err, nil)
result, err = CallCreateTxJSON("manage", "Modify", data)
assert.Equal(t, err, nil)
assert.NotEqual(t, result, nil)
err = Decode(result, &tx)
assert.Equal(t, err, nil)
assert.Equal(t, tx.Execer, []byte("manage"))
fee, _ = tx.GetRealFee(GInt("MinFee"))
assert.Equal(t, tx.Fee, fee)
}
func TestCallCreateTx(t *testing.T) {
modify := &ModifyConfig{
Key: "token-finisher",
Value: "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
Op: "add",
Addr: "",
}
result, err := CallCreateTx("manage", "Modify", modify)
assert.Equal(t, err, nil)
assert.NotEqual(t, result, nil)
var tx Transaction
err = Decode(result, &tx)
assert.Equal(t, err, nil)
assert.Equal(t, tx.Execer, []byte("manage"))
fee, _ := tx.GetRealFee(GInt("MinFee"))
assert.Equal(t, tx.Fee, fee)
_, err = CallCreateTx("coins", "Modify", modify)
assert.NotEqual(t, err, nil)
_, err = CallCreateTx("xxxx", "xxx", modify)
assert.NotEqual(t, err, nil)
modify = &ModifyConfig{
Key: "token-finisher",
Value: "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
Op: "delete",
Addr: "",
}
result, err = CallCreateTx("manage", "Modify", modify)
assert.Equal(t, err, nil)
assert.NotEqual(t, result, nil)
err = Decode(result, &tx)
assert.Equal(t, err, nil)
assert.Equal(t, tx.Execer, []byte("manage"))
fee, _ = tx.GetRealFee(GInt("MinFee"))
assert.Equal(t, tx.Fee, fee)
}
......@@ -50,6 +50,10 @@ message BlockSeqCB {
string encode = 3;
}
message BlockSeqCBs {
repeated BlockSeqCB items = 1;
}
message BlockSeq {
int64 num = 1;
BlockSequence seq = 2;
......
......@@ -58,6 +58,9 @@ grpcBindAddr="localhost:8802"
whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"]
enableTLS=false
certFile="cert.pem"
keyFile="key.pem"
[mempool]
poolCacheSize=102400
......
......@@ -6,7 +6,9 @@ package cli
import (
"fmt"
"net/http"
"os"
"strings"
"github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/pluginmgr"
......@@ -43,6 +45,7 @@ var closeCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(
commands.CertCmd(),
commands.AccountCmd(),
commands.BlockCmd(),
commands.BTYCmd(),
......@@ -60,8 +63,34 @@ func init() {
)
}
func testTLS(RPCAddr string) string {
rpcaddr := RPCAddr
if strings.HasPrefix(rpcaddr, "https://") {
return RPCAddr
}
if !strings.HasPrefix(rpcaddr, "http://") {
return RPCAddr
}
//test tls ok
if rpcaddr[len(rpcaddr)-1] != '/' {
rpcaddr += "/"
}
rpcaddr += "test"
resp, err := http.Get(rpcaddr)
if err != nil {
return "https://" + RPCAddr[7:]
}
defer resp.Body.Close()
if resp.StatusCode == 200 {
return RPCAddr
}
return "https://" + RPCAddr[7:]
}
//Run :
func Run(RPCAddr, ParaName string) {
//test tls is enable
RPCAddr = testTLS(RPCAddr)
pluginmgr.AddCmd(rootCmd)
log.SetLogLevel("error")
types.S("RPCAddr", RPCAddr)
......
......@@ -135,7 +135,8 @@ func GetSeed(db dbm.DB, password string) (string, error) {
}
seed, err := AesgcmDecrypter([]byte(password), Encryptedseed)
if err != nil {
return "", err
seedlog.Error("GetSeed", "AesgcmDecrypter err", err)
return "", types.ErrInputPassword
}
return string(seed), nil
}
......
......@@ -30,16 +30,18 @@ func (wallet *Wallet) parseExpire(expire string) (int64, error) {
if len(expire) == 0 {
return 0, errors.New("Expire string should not be empty")
}
if expire[0] == 'H' && expire[1] == ':' {
txHeight, err := strconv.ParseInt(expire[2:], 10, 64)
if err != nil {
return 0, err
}
if txHeight <= 0 {
fmt.Printf("txHeight should be grate to 0")
//fmt.Printf("txHeight should be grate to 0")
return 0, errors.New("txHeight should be grate to 0")
}
if txHeight+types.TxHeightFlag < txHeight {
return 0, errors.New("txHeight overflow")
}
return txHeight + types.TxHeightFlag, nil
}
......@@ -538,8 +540,12 @@ func (wallet *Wallet) ProcSendToAddress(SendToAddress *types.ReqWalletSendToAddr
}
Balance := accounts[0].Balance
amount := SendToAddress.GetAmount()
//amount必须大于等于0
if amount < 0 {
return nil, types.ErrAmount
}
if !SendToAddress.IsToken {
if Balance < amount+wallet.FeeAmount {
if Balance-amount < wallet.FeeAmount {
return nil, types.ErrInsufficientBalance
}
} else {
......@@ -547,7 +553,6 @@ func (wallet *Wallet) ProcSendToAddress(SendToAddress *types.ReqWalletSendToAddr
if Balance < wallet.FeeAmount {
return nil, types.ErrInsufficientBalance
}
if nil == accTokenMap[SendToAddress.TokenSymbol] {
tokenAccDB, err := account.NewAccountDB("token", SendToAddress.TokenSymbol, nil)
if err != nil {
......
......@@ -479,9 +479,11 @@ func testProcSendToAddress(t *testing.T, wallet *Wallet) {
msg = wallet.client.NewMessage("wallet", types.EventWalletSendToAddress, withdraw)
wallet.client.Send(msg, true)
resp, err = wallet.client.Wait(msg)
require.NoError(t, err)
replyHash = resp.GetData().(*types.ReplyHash)
println("withdraw tx", "ReplyHash", common.ToHex(replyHash.Hash))
//返回ErrAmount错误
assert.Equal(t, string(err.Error()), types.ErrAmount.Error())
require.Error(t, err)
//replyHash = resp.GetData().(*types.ReplyHash)
//println("withdraw tx", "ReplyHash", common.ToHex(replyHash.Hash))
println("TestProcSendToAddress end")
println("--------------------------")
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment