Commit 67c844cc authored by caopingcp's avatar caopingcp Committed by vipwzw

tendermint add unit test

parent 14659e28
...@@ -91,9 +91,9 @@ timeoutPrecommit=1000 ...@@ -91,9 +91,9 @@ timeoutPrecommit=1000
timeoutPrecommitDelta=500 timeoutPrecommitDelta=500
timeoutCommit=500 timeoutCommit=500
skipTimeoutCommit=false skipTimeoutCommit=false
createEmptyBlocks=false createEmptyBlocks=true
createEmptyBlocksInterval=0 createEmptyBlocksInterval=1
validatorNodes=["127.0.0.1:46656"] validatorNodes=["127.0.0.1:46656", "127.0.0.2:46656"]
[store] [store]
name="kvdb" name="kvdb"
......
...@@ -87,7 +87,7 @@ type ConsensusState struct { ...@@ -87,7 +87,7 @@ type ConsensusState struct {
broadcastChannel chan<- MsgInfo broadcastChannel chan<- MsgInfo
ourID ID ourID ID
status uint32 // 0-stop, 1-start status uint32 // 0-stop, 1-start
Quit chan struct{} quit chan struct{}
txsAvailable chan int64 txsAvailable chan int64
begCons time.Time begCons time.Time
...@@ -103,7 +103,7 @@ func NewConsensusState(client *Client, state State, blockExec *BlockExecutor) *C ...@@ -103,7 +103,7 @@ func NewConsensusState(client *Client, state State, blockExec *BlockExecutor) *C
internalMsgQueue: make(chan MsgInfo, msgQueueSize), internalMsgQueue: make(chan MsgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(), timeoutTicker: NewTimeoutTicker(),
Quit: make(chan struct{}), quit: make(chan struct{}),
txsAvailable: make(chan int64, 1), txsAvailable: make(chan int64, 1),
begCons: time.Time{}, begCons: time.Time{},
} }
...@@ -205,7 +205,7 @@ func (cs *ConsensusState) Start() { ...@@ -205,7 +205,7 @@ func (cs *ConsensusState) Start() {
func (cs *ConsensusState) Stop() { func (cs *ConsensusState) Stop() {
atomic.CompareAndSwapUint32(&cs.status, 1, 0) atomic.CompareAndSwapUint32(&cs.status, 1, 0)
cs.timeoutTicker.Stop() cs.timeoutTicker.Stop()
cs.Quit <- struct{}{} cs.quit <- struct{}{}
} }
//------------------------------------------------------------ //------------------------------------------------------------
...@@ -383,7 +383,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { ...@@ -383,7 +383,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// if the timeout is relevant to the rs // if the timeout is relevant to the rs
// go to the next step // go to the next step
cs.handleTimeout(ti, rs) cs.handleTimeout(ti, rs)
case <-cs.Quit: case <-cs.quit:
// NOTE: the internalMsgQueue may have signed messages from our // NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because // priv_val that haven't hit the WAL, but its ok because
// priv_val tracks LastSig // priv_val tracks LastSig
...@@ -667,7 +667,13 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) { ...@@ -667,7 +667,13 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
// Decide on block // Decide on block
if cs.ValidBlock != nil { if cs.ValidBlock != nil {
// If there is valid block, choose that. // If there is valid block, PreExec that.
pblockNew := cs.client.PreExecBlock(cs.ValidBlock.Data, false)
if pblockNew == nil {
tendermintlog.Error("defaultDecideProposal PreExecBlock fail")
return
}
cs.ValidBlock.Data = pblockNew
block = cs.ValidBlock block = cs.ValidBlock
} else { } else {
// Create a new proposal block from state/txs from the mempool. // Create a new proposal block from state/txs from the mempool.
...@@ -737,6 +743,10 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock) ...@@ -737,6 +743,10 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock)
proposerAddr := cs.privValidator.GetAddress() proposerAddr := cs.privValidator.GetAddress()
block = cs.state.MakeBlock(cs.Height, int64(cs.Round), pblock, commit, proposerAddr) block = cs.state.MakeBlock(cs.Height, int64(cs.Round), pblock, commit, proposerAddr)
baseTx := cs.createBaseTx(block.TendermintBlock) baseTx := cs.createBaseTx(block.TendermintBlock)
if baseTx == nil {
tendermintlog.Error("createProposalBlock createBaseTx fail")
return nil
}
block.Data.Txs[0] = baseTx block.Data.Txs[0] = baseTx
block.Data.TxHash = merkle.CalcMerkleRoot(block.Data.Txs) block.Data.TxHash = merkle.CalcMerkleRoot(block.Data.Txs)
pblockNew := cs.client.PreExecBlock(block.Data, false) pblockNew := cs.client.PreExecBlock(block.Data, false)
...@@ -748,17 +758,21 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock) ...@@ -748,17 +758,21 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock)
return block return block
} }
func (cs *ConsensusState) createBaseTx(block *tmtypes.TendermintBlock) (tx *types.Transaction) { func (cs *ConsensusState) createBaseTx(block *tmtypes.TendermintBlock) *types.Transaction {
var state *tmtypes.State var state *tmtypes.State
if cs.Height == 1 { if cs.Height == 1 {
state = &tmtypes.State{} genState := cs.client.GenesisState()
if genState == nil {
return nil
}
state = SaveState(*genState)
} else { } else {
state = cs.client.csStore.LoadStateFromStore() state = cs.client.csStore.LoadStateFromStore()
if state == nil { if state == nil {
panic("createBaseTx LoadStateFromStore fail") return nil
} }
} }
tx = CreateBlockInfoTx(cs.client.pubKey, state, block) tx := CreateBlockInfoTx(cs.client.pubKey, state, block)
return tx return tx
} }
...@@ -1160,8 +1174,9 @@ func (cs *ConsensusState) defaultSetProposal(proposal *tmtypes.Proposal) error { ...@@ -1160,8 +1174,9 @@ func (cs *ConsensusState) defaultSetProposal(proposal *tmtypes.Proposal) error {
} }
// Verify POLRound, which must be -1 or in range [0, proposal.Round). // Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound != -1 || if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.Round >= proposal.POLRound) { (proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
tendermintlog.Error("Invalid POLRound", "POLRound", proposal.POLRound, "Round", proposal.Round)
return ErrInvalidProposalPOLRound return ErrInvalidProposalPOLRound
} }
......
{"genesis_time":"0001-01-01T00:00:00Z","chain_id":"test-chain-Ep9EcD","validators":[{"pub_key":{"type":"ed25519","data":"220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19"},"power":10,"name":""}],"app_hash":""} {"genesis_time":"2019-10-25T10:25:24.027375266+08:00","chain_id":"chain33-ROPNZn","validators":[{"pub_key":{"type":"ed25519","data":"8D80E15927EF2854C78D981015BD2AD469867957081357D0FADD88871752A7E1"},"power":10,"name":""}],"app_hash":null}
\ No newline at end of file
...@@ -3,11 +3,14 @@ package tendermint ...@@ -3,11 +3,14 @@ package tendermint
import ( import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net"
"sync" "sync"
"testing" "testing"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
ttypes "github.com/33cn/plugin/plugin/consensus/tendermint/types"
tmtypes "github.com/33cn/plugin/plugin/dapp/valnode/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
...@@ -15,8 +18,8 @@ var ( ...@@ -15,8 +18,8 @@ var (
secureConnCrypto crypto.Crypto secureConnCrypto crypto.Crypto
sum = 0 sum = 0
mutx sync.Mutex mutx sync.Mutex
privKey = "B3DC4C0725884EBB7264B92F1D8D37584A64ADE1799D997EC64B4FE3973E08DE220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19" privKey = "23278EA4CFE8B00360EBB376F2BBFAC345136EE5BC4549532C394C0AF2B80DFE8D80E15927EF2854C78D981015BD2AD469867957081357D0FADD88871752A7E1"
expectAddress = "02A13174B92727C4902DB099E51A3339F48BD45E" expectAddress = "07FE011CE6F4C458FD9D417ED38CB262A4364FA1"
) )
func init() { func init() {
...@@ -79,19 +82,31 @@ func TestIP2IPPort(t *testing.T) { ...@@ -79,19 +82,31 @@ func TestIP2IPPort(t *testing.T) {
fmt.Println("TestIP2IPPort ok") fmt.Println("TestIP2IPPort ok")
} }
func TestNodeFunc(t *testing.T) {
node := &Node{Version: "1.1.1", Network: "net1"}
assert.NotNil(t, node.CompatibleWith(NodeInfo{Version: "1.1", Network: "net1"}))
assert.NotNil(t, node.CompatibleWith(NodeInfo{Version: "2.1.1", Network: "net1"}))
assert.NotNil(t, node.CompatibleWith(NodeInfo{Version: "1.1.1", Network: "net2"}))
assert.Nil(t, node.CompatibleWith(NodeInfo{Version: "1.2.3", Network: "net1"}))
assert.False(t, isIpv6(net.IP{127, 0, 0, 1}))
assert.True(t, isIpv6(net.IP{0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01}))
fmt.Println("TestNodeFunc ok")
}
func TestPeerSet(t *testing.T) { func TestPeerSet(t *testing.T) {
testSet := NewPeerSet() testSet := NewPeerSet()
assert.Equal(t, false, testSet.Has("1")) assert.Equal(t, false, testSet.Has("1"))
peer1 := &peerConn{id: "1", ip: []byte("1.1.1.1")} peer1 := &peerConn{id: "1", ip: net.IP{127, 0, 0, 1}}
testSet.Add(peer1) testSet.Add(peer1)
assert.Equal(t, true, testSet.Has("1")) assert.Equal(t, true, testSet.Has("1"))
assert.Equal(t, true, testSet.HasIP([]byte("1.1.1.1"))) assert.Equal(t, true, testSet.HasIP(net.IP{127, 0, 0, 1}))
err := testSet.Add(peer1) err := testSet.Add(peer1)
assert.NotNil(t, err) assert.NotNil(t, err)
peer2 := &peerConn{id: "2", ip: []byte("1.1.1.2")} peer2 := &peerConn{id: "2", ip: net.IP{127, 0, 0, 2}}
testSet.Add(peer2) testSet.Add(peer2)
assert.Equal(t, true, testSet.Has("2")) assert.Equal(t, true, testSet.Has("2"))
assert.Equal(t, 2, testSet.Size()) assert.Equal(t, 2, testSet.Size())
...@@ -99,7 +114,142 @@ func TestPeerSet(t *testing.T) { ...@@ -99,7 +114,142 @@ func TestPeerSet(t *testing.T) {
testSet.Remove(peer1) testSet.Remove(peer1)
assert.Equal(t, 1, testSet.Size()) assert.Equal(t, 1, testSet.Size())
assert.Equal(t, false, testSet.Has("1")) assert.Equal(t, false, testSet.Has("1"))
assert.Equal(t, false, testSet.HasIP([]byte("1.1.1.1"))) assert.Equal(t, false, testSet.HasIP(net.IP{127, 0, 0, 1}))
fmt.Println("TestPeerSet ok") fmt.Println("TestPeerSet ok")
} }
func TestPeerConn(t *testing.T) {
pc := &peerConn{id: "3", ip: net.IP{127, 0, 0, 3}, outbound: true, persistent: false}
_, err := pc.RemoteAddr()
assert.NotNil(t, err)
assert.True(t, pc.IsOutbound())
assert.False(t, pc.IsPersistent())
pc.sendQueue = make(chan MsgInfo, maxSendQueueSize)
assert.False(t, pc.Send(MsgInfo{}))
assert.False(t, pc.TrySend(MsgInfo{}))
pc.started = 1
assert.True(t, pc.Send(MsgInfo{}))
assert.True(t, pc.TrySend(MsgInfo{}))
testUpdateStateRoutine(t, pc)
fmt.Println("TestPeerConn ok")
}
func testUpdateStateRoutine(t *testing.T, pc *peerConn) {
pc.quitUpdate = make(chan struct{})
pc.updateStateQueue = make(chan MsgInfo)
pc.state = &PeerConnState{
ip: pc.ip,
PeerRoundState: ttypes.PeerRoundState{
Height: int64(2),
Round: 0,
Step: ttypes.RoundStepCommit,
Proposal: true,
ProposalBlockHash: []byte("ProposalBlockHash@2"),
LastCommitRound: 0,
CatchupCommitRound: 0,
},
}
ps := pc.state
pc.waitQuit.Add(1)
go pc.updateStateRoutine()
//NewRoundStepID msg
rsMsg := &tmtypes.NewRoundStepMsg{
Height: int64(3),
Round: int32(1),
Step: int32(3),
SecondsSinceStartTime: int32(1),
LastCommitRound: int32(1),
}
pc.updateStateQueue <- MsgInfo{ttypes.NewRoundStepID, rsMsg, ID("TEST"), pc.ip.String()}
pc.updateStateQueue <- MsgInfo{TypeID: byte(0x00)}
assert.Equal(t, int64(3), ps.Height)
assert.Equal(t, 1, ps.Round)
assert.Equal(t, ttypes.RoundStepPropose, ps.Step)
assert.Equal(t, false, ps.Proposal)
assert.Equal(t, 1, ps.LastCommitRound)
assert.Equal(t, -1, ps.CatchupCommitRound)
//SetHasProposal
proposal := &tmtypes.Proposal{
Height: int64(3),
Round: int32(1),
POLRound: int32(1),
Blockhash: []byte("ProposalBlockHash@3"),
}
ps.SetHasProposal(proposal)
assert.True(t, ps.Proposal)
assert.Equal(t, 1, ps.ProposalPOLRound)
assert.Equal(t, []byte("ProposalBlockHash@3"), ps.ProposalBlockHash)
//SetHasProposalBlock
block := &ttypes.TendermintBlock{
TendermintBlock: &tmtypes.TendermintBlock{
Header: &tmtypes.TendermintBlockHeader{
Height: int64(3),
Round: int64(1),
},
},
}
ps.SetHasProposalBlock(block)
assert.True(t, ps.ProposalBlock)
//ValidBlockID msg
validBlockMsg := &tmtypes.ValidBlockMsg{
Height: int64(3),
Round: int32(1),
Blockhash: []byte("ValidBlockHash@3"),
IsCommit: false,
}
pc.updateStateQueue <- MsgInfo{ttypes.ValidBlockID, validBlockMsg, ID("TEST"), pc.ip.String()}
pc.updateStateQueue <- MsgInfo{TypeID: byte(0x00)}
assert.Equal(t, []byte("ValidBlockHash@3"), ps.ProposalBlockHash)
//HasVoteID msg
hasVoteMsg := &tmtypes.HasVoteMsg{
Height: int64(3),
Round: int32(1),
Type: int32(ttypes.VoteTypePrevote),
Index: int32(1),
}
ps.EnsureVoteBitArrays(int64(3), 2)
ps.EnsureVoteBitArrays(int64(2), 2)
assert.False(t, ps.Prevotes.GetIndex(1))
pc.updateStateQueue <- MsgInfo{ttypes.HasVoteID, hasVoteMsg, ID("TEST"), pc.ip.String()}
pc.updateStateQueue <- MsgInfo{TypeID: byte(0x00)}
assert.True(t, ps.Prevotes.GetIndex(1))
//ProposalPOLID msg
proposalPOL := ps.Prevotes.TendermintBitArray
proposalPOLMsg := &tmtypes.ProposalPOLMsg{
Height: int64(3),
ProposalPOLRound: int32(1),
ProposalPOL: proposalPOL,
}
pc.updateStateQueue <- MsgInfo{ttypes.ProposalPOLID, proposalPOLMsg, ID("TEST"), pc.ip.String()}
pc.updateStateQueue <- MsgInfo{TypeID: byte(0x00)}
assert.EqualValues(t, proposalPOL, ps.ProposalPOL.TendermintBitArray)
//PickSendVote
ttypes.Init()
vals := make([]*ttypes.Validator, 2)
votes := ttypes.NewVoteSet("TEST", 3, 1, ttypes.VoteTypePrevote, &ttypes.ValidatorSet{Validators: vals})
assert.False(t, pc.PickSendVote(votes))
assert.Equal(t, int64(3), ps.GetHeight())
assert.NotNil(t, ps.GetRoundState())
assert.Nil(t, ps.getVoteBitArray(3, 1, byte(0x03)))
assert.NotNil(t, ps.getVoteBitArray(3, 1, ttypes.VoteTypePrecommit))
assert.Nil(t, ps.getVoteBitArray(2, 1, ttypes.VoteTypePrevote))
assert.NotNil(t, ps.getVoteBitArray(2, 1, ttypes.VoteTypePrecommit))
ps.ensureCatchupCommitRound(3, 2, 2)
assert.Equal(t, 2, ps.CatchupCommitRound)
assert.NotNil(t, ps.CatchupCommit)
assert.Nil(t, ps.getVoteBitArray(3, 2, ttypes.VoteTypePrevote))
assert.NotNil(t, ps.getVoteBitArray(3, 2, ttypes.VoteTypePrecommit))
pc.quitUpdate <- struct{}{}
pc.waitQuit.Wait()
fmt.Println("testUpdateStateRoutine ok")
}
...@@ -85,8 +85,10 @@ type peerConn struct { ...@@ -85,8 +85,10 @@ type peerConn struct {
started uint32 //atomic started uint32 //atomic
stopped uint32 // atomic stopped uint32 // atomic
quit chan struct{} quitSend chan struct{}
waitQuit sync.WaitGroup quitUpdate chan struct{}
quitBeat chan struct{}
waitQuit sync.WaitGroup
transferChannel chan MsgInfo transferChannel chan MsgInfo
...@@ -404,7 +406,9 @@ func (pc *peerConn) Start() error { ...@@ -404,7 +406,9 @@ func (pc *peerConn) Start() error {
pc.pongChannel = make(chan struct{}) pc.pongChannel = make(chan struct{})
pc.sendQueue = make(chan MsgInfo, maxSendQueueSize) pc.sendQueue = make(chan MsgInfo, maxSendQueueSize)
pc.sendBuffer = make([]byte, 0, MaxMsgPacketPayloadSize) pc.sendBuffer = make([]byte, 0, MaxMsgPacketPayloadSize)
pc.quit = make(chan struct{}) pc.quitSend = make(chan struct{})
pc.quitUpdate = make(chan struct{})
pc.quitBeat = make(chan struct{})
pc.state = &PeerConnState{ip: pc.ip, PeerRoundState: ttypes.PeerRoundState{ pc.state = &PeerConnState{ip: pc.ip, PeerRoundState: ttypes.PeerRoundState{
Round: -1, Round: -1,
ProposalPOLRound: -1, ProposalPOLRound: -1,
...@@ -413,7 +417,7 @@ func (pc *peerConn) Start() error { ...@@ -413,7 +417,7 @@ func (pc *peerConn) Start() error {
}} }}
pc.updateStateQueue = make(chan MsgInfo, maxSendQueueSize) pc.updateStateQueue = make(chan MsgInfo, maxSendQueueSize)
pc.heartbeatQueue = make(chan proto.Message, 100) pc.heartbeatQueue = make(chan proto.Message, 100)
pc.waitQuit.Add(5) //sendRoutine, updateStateRoutine,gossipDataRoutine,gossipVotesRoutine,queryMaj23Routine pc.waitQuit.Add(5) //heartbeatRoutine, updateStateRoutine,gossipDataRoutine,gossipVotesRoutine,queryMaj23Routine
go pc.sendRoutine() go pc.sendRoutine()
go pc.recvRoutine() go pc.recvRoutine()
...@@ -430,20 +434,18 @@ func (pc *peerConn) Start() error { ...@@ -430,20 +434,18 @@ func (pc *peerConn) Start() error {
func (pc *peerConn) Stop() { func (pc *peerConn) Stop() {
if atomic.CompareAndSwapUint32(&pc.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&pc.stopped, 0, 1) {
if pc.heartbeatQueue != nil { pc.quitSend <- struct{}{}
close(pc.heartbeatQueue) pc.quitUpdate <- struct{}{}
pc.heartbeatQueue = nil pc.quitBeat <- struct{}{}
}
if pc.quit != nil { pc.waitQuit.Wait()
close(pc.quit) tendermintlog.Info("peerConn stop waitQuit", "peerIP", pc.ip.String())
tendermintlog.Info("peerConn stop quit wait", "peerIP", pc.ip.String())
pc.waitQuit.Wait()
tendermintlog.Info("peerConn stop quit wait finish", "peerIP", pc.ip.String())
pc.quit = nil
}
close(pc.sendQueue) close(pc.sendQueue)
pc.sendQueue = nil
pc.transferChannel = nil pc.transferChannel = nil
pc.CloseConn() pc.CloseConn()
tendermintlog.Info("peerConn stop finish", "peerIP", pc.ip.String())
} }
} }
...@@ -460,8 +462,9 @@ func (pc *peerConn) stopForError(r interface{}) { ...@@ -460,8 +462,9 @@ func (pc *peerConn) stopForError(r interface{}) {
tendermintlog.Error("peerConn recovered panic", "error", r, "peer", pc.ip.String()) tendermintlog.Error("peerConn recovered panic", "error", r, "peer", pc.ip.String())
if pc.onPeerError != nil { if pc.onPeerError != nil {
pc.onPeerError(pc, r) pc.onPeerError(pc, r)
} else {
pc.Stop()
} }
pc.Stop()
} }
func (pc *peerConn) sendRoutine() { func (pc *peerConn) sendRoutine() {
...@@ -469,8 +472,7 @@ func (pc *peerConn) sendRoutine() { ...@@ -469,8 +472,7 @@ func (pc *peerConn) sendRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-pc.quit: case <-pc.quitSend:
pc.waitQuit.Done()
break FOR_LOOP break FOR_LOOP
case msg := <-pc.sendQueue: case msg := <-pc.sendQueue:
bytes, err := proto.Marshal(msg.Msg) bytes, err := proto.Marshal(msg.Msg)
...@@ -519,6 +521,7 @@ FOR_LOOP: ...@@ -519,6 +521,7 @@ FOR_LOOP:
} }
} }
} }
tendermintlog.Info("peerConn stop sendRoutine", "peerIP", pc.ip.String())
} }
func (pc *peerConn) recvRoutine() { func (pc *peerConn) recvRoutine() {
...@@ -542,7 +545,6 @@ FOR_LOOP: ...@@ -542,7 +545,6 @@ FOR_LOOP:
if err != nil { if err != nil {
tendermintlog.Error("Connection failed @ recvRoutine", "conn", pc, "err", err) tendermintlog.Error("Connection failed @ recvRoutine", "conn", pc, "err", err)
pc.stopForError(err) pc.stopForError(err)
panic(fmt.Sprintf("peerConn recvRoutine packetTypeMsg failed :%v", err))
} }
pkt.Bytes = buf2 pkt.Bytes = buf2
} }
...@@ -591,23 +593,24 @@ FOR_LOOP: ...@@ -591,23 +593,24 @@ FOR_LOOP:
} }
close(pc.pongChannel) close(pc.pongChannel)
for range pc.pongChannel { close(pc.heartbeatQueue)
// Drain close(pc.updateStateQueue)
} tendermintlog.Info("peerConn stop recvRoutine", "peerIP", pc.ip.String())
} }
func (pc *peerConn) updateStateRoutine() { func (pc *peerConn) updateStateRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-pc.quitUpdate:
pc.waitQuit.Done()
break FOR_LOOP
case msg := <-pc.updateStateQueue: case msg := <-pc.updateStateQueue:
typeID := msg.TypeID typeID := msg.TypeID
if typeID == ttypes.NewRoundStepID { if typeID == ttypes.NewRoundStepID {
pc.state.ApplyNewRoundStepMessage(msg.Msg.(*tmtypes.NewRoundStepMsg)) pc.state.ApplyNewRoundStepMessage(msg.Msg.(*tmtypes.NewRoundStepMsg))
} else if typeID == ttypes.ValidBlockID { } else if typeID == ttypes.ValidBlockID {
pc.state.ApplyValidBlockMessage(msg.Msg.(*tmtypes.ValidBlockMsg)) pc.state.ApplyValidBlockMessage(msg.Msg.(*tmtypes.ValidBlockMsg))
} else if typeID == ttypes.CommitStepID {
pc.state.ApplyCommitStepMessage(msg.Msg.(*tmtypes.CommitStepMsg))
} else if typeID == ttypes.HasVoteID { } else if typeID == ttypes.HasVoteID {
pc.state.ApplyHasVoteMessage(msg.Msg.(*tmtypes.HasVoteMsg)) pc.state.ApplyHasVoteMessage(msg.Msg.(*tmtypes.HasVoteMsg))
} else if typeID == ttypes.VoteSetMaj23ID { } else if typeID == ttypes.VoteSetMaj23ID {
...@@ -655,41 +658,37 @@ FOR_LOOP: ...@@ -655,41 +658,37 @@ FOR_LOOP:
pc.state.ApplyVoteSetBitsMessage(tmp, nil) pc.state.ApplyVoteSetBitsMessage(tmp, nil)
} }
} else { } else {
tendermintlog.Error("msg not deal in updateStateRoutine", "msgTypeName", msg.Msg.String()) tendermintlog.Error("Unknown message type in updateStateRoutine", "msg", msg)
} }
case <-pc.quit:
pc.waitQuit.Done()
break FOR_LOOP
} }
} }
close(pc.updateStateQueue) tendermintlog.Info("peerConn stop updateStateRoutine", "peerIP", pc.ip.String())
for range pc.updateStateQueue {
// Drain
}
} }
func (pc *peerConn) heartbeatRoutine() { func (pc *peerConn) heartbeatRoutine() {
FOR_LOOP:
for { for {
heartbeat, ok := <-pc.heartbeatQueue select {
if !ok { case <-pc.quitBeat:
tendermintlog.Debug("heartbeatQueue closed") pc.waitQuit.Done()
return break FOR_LOOP
case heartbeat := <-pc.heartbeatQueue:
msg := heartbeat.(*tmtypes.Heartbeat)
tendermintlog.Debug("Received proposal heartbeat message",
"height", msg.Height, "round", msg.Round, "sequence", msg.Sequence,
"valIdx", msg.ValidatorIndex, "valAddr", msg.ValidatorAddress)
} }
msg := heartbeat.(*tmtypes.Heartbeat)
tendermintlog.Debug("Received proposal heartbeat message",
"height", msg.Height, "round", msg.Round, "sequence", msg.Sequence,
"valIdx", msg.ValidatorIndex, "valAddr", msg.ValidatorAddress)
} }
tendermintlog.Info("peerConn stop heartbeatRoutine", "peerIP", pc.ip.String())
} }
func (pc *peerConn) gossipDataRoutine() { func (pc *peerConn) gossipDataRoutine() {
OUTER_LOOP: OUTER_LOOP:
for { for {
// Manage disconnects from self or peer. // Manage disconnects from self or peer.
if !pc.IsRunning() { if !pc.IsRunning() {
tendermintlog.Error("Stopping gossipDataRoutine for peer")
pc.waitQuit.Done() pc.waitQuit.Done()
tendermintlog.Info("peerConn stop gossipDataRoutine", "peerIP", pc.ip.String())
return return
} }
...@@ -790,8 +789,8 @@ OUTER_LOOP: ...@@ -790,8 +789,8 @@ OUTER_LOOP:
for { for {
// Manage disconnects from self or peer. // Manage disconnects from self or peer.
if !pc.IsRunning() { if !pc.IsRunning() {
tendermintlog.Info("Stopping gossipVotesRoutine for peer")
pc.waitQuit.Done() pc.waitQuit.Done()
tendermintlog.Info("peerConn stop gossipVotesRoutine", "peerIP", pc.ip.String())
return return
} }
...@@ -915,8 +914,8 @@ OUTER_LOOP: ...@@ -915,8 +914,8 @@ OUTER_LOOP:
for { for {
// Manage disconnects from self or peer. // Manage disconnects from self or peer.
if !pc.IsRunning() { if !pc.IsRunning() {
tendermintlog.Info("Stopping queryMaj23Routine for peer")
pc.waitQuit.Done() pc.waitQuit.Done()
tendermintlog.Info("peerConn stop queryMaj23Routine", "peerIP", pc.ip.String())
return return
} }
...@@ -1294,16 +1293,6 @@ func (ps *PeerConnState) ApplyNewRoundStepMessage(msg *tmtypes.NewRoundStepMsg) ...@@ -1294,16 +1293,6 @@ func (ps *PeerConnState) ApplyNewRoundStepMessage(msg *tmtypes.NewRoundStepMsg)
} }
} }
// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerConnState) ApplyCommitStepMessage(msg *tmtypes.CommitStepMsg) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
}
// ApplyValidBlockMessage updates the peer state for the new valid block. // ApplyValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerConnState) ApplyValidBlockMessage(msg *tmtypes.ValidBlockMsg) { func (ps *PeerConnState) ApplyValidBlockMessage(msg *tmtypes.ValidBlockMsg) {
ps.mtx.Lock() ps.mtx.Lock()
......
{"address":"02A13174B92727C4902DB099E51A3339F48BD45E","pub_key":{"type":"ed25519","data":"220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19"},"last_height":0,"last_round":0,"last_step":0,"priv_key":{"type":"ed25519","data":"B3DC4C0725884EBB7264B92F1D8D37584A64ADE1799D997EC64B4FE3973E08DE220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19"}} {"address":"07FE011CE6F4C458FD9D417ED38CB262A4364FA1","pub_key":{"type":"ed25519","data":"8D80E15927EF2854C78D981015BD2AD469867957081357D0FADD88871752A7E1"},"last_height":0,"last_round":0,"last_step":0,"priv_key":{"type":"ed25519","data":"23278EA4CFE8B00360EBB376F2BBFAC345136EE5BC4549532C394C0AF2B80DFE8D80E15927EF2854C78D981015BD2AD469867957081357D0FADD88871752A7E1"}}
\ No newline at end of file \ No newline at end of file
...@@ -10,9 +10,7 @@ import ( ...@@ -10,9 +10,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"sync" "sync"
"time"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
...@@ -24,10 +22,6 @@ import ( ...@@ -24,10 +22,6 @@ import (
const fee = 1e6 const fee = 1e6
var (
r *rand.Rand
)
// State is a short description of the latest committed block of the Tendermint consensus. // State is a short description of the latest committed block of the Tendermint consensus.
// It keeps all information necessary to validate new blocks, // It keeps all information necessary to validate new blocks,
// including the last validator set and the consensus params. // including the last validator set and the consensus params.
...@@ -91,6 +85,9 @@ func (s State) Copy() State { ...@@ -91,6 +85,9 @@ func (s State) Copy() State {
// Equals returns true if the States are identical. // Equals returns true if the States are identical.
func (s State) Equals(s2 State) bool { func (s State) Equals(s2 State) bool {
if s.Bytes() == nil || s2.Bytes() == nil {
return false
}
return bytes.Equal(s.Bytes(), s2.Bytes()) return bytes.Equal(s.Bytes(), s2.Bytes())
} }
...@@ -98,7 +95,7 @@ func (s State) Equals(s2 State) bool { ...@@ -98,7 +95,7 @@ func (s State) Equals(s2 State) bool {
func (s State) Bytes() []byte { func (s State) Bytes() []byte {
sbytes, err := json.Marshal(s) sbytes, err := json.Marshal(s)
if err != nil { if err != nil {
fmt.Printf("Error reading GenesisDoc: %v", err) fmt.Printf("Error State Marshal: %v", err)
return nil return nil
} }
return sbytes return sbytes
...@@ -214,7 +211,6 @@ type CSStateDB struct { ...@@ -214,7 +211,6 @@ type CSStateDB struct {
// NewStateDB make a new one // NewStateDB make a new one
func NewStateDB(client *Client, state State) *CSStateDB { func NewStateDB(client *Client, state State) *CSStateDB {
r = rand.New(rand.NewSource(time.Now().UnixNano()))
return &CSStateDB{ return &CSStateDB{
client: client, client: client,
state: state, state: state,
...@@ -229,8 +225,8 @@ func LoadState(state *tmtypes.State) State { ...@@ -229,8 +225,8 @@ func LoadState(state *tmtypes.State) State {
LastBlockTotalTx: state.GetLastBlockTotalTx(), LastBlockTotalTx: state.GetLastBlockTotalTx(),
LastBlockID: ttypes.BlockID{BlockID: *state.LastBlockID}, LastBlockID: ttypes.BlockID{BlockID: *state.LastBlockID},
LastBlockTime: state.LastBlockTime, LastBlockTime: state.LastBlockTime,
Validators: nil, Validators: &ttypes.ValidatorSet{Validators: make([]*ttypes.Validator, 0), Proposer: &ttypes.Validator{}},
LastValidators: nil, LastValidators: &ttypes.ValidatorSet{Validators: make([]*ttypes.Validator, 0), Proposer: &ttypes.Validator{}},
LastHeightValidatorsChanged: state.LastHeightValidatorsChanged, LastHeightValidatorsChanged: state.LastHeightValidatorsChanged,
ConsensusParams: ttypes.ConsensusParams{BlockSize: ttypes.BlockSize{}, TxSize: ttypes.TxSize{}, BlockGossip: ttypes.BlockGossip{}, EvidenceParams: ttypes.EvidenceParams{}}, ConsensusParams: ttypes.ConsensusParams{BlockSize: ttypes.BlockSize{}, TxSize: ttypes.TxSize{}, BlockGossip: ttypes.BlockGossip{}, EvidenceParams: ttypes.EvidenceParams{}},
LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged, LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged,
...@@ -241,15 +237,11 @@ func LoadState(state *tmtypes.State) State { ...@@ -241,15 +237,11 @@ func LoadState(state *tmtypes.State) State {
if array := validators.GetValidators(); array != nil { if array := validators.GetValidators(); array != nil {
targetArray := make([]*ttypes.Validator, len(array)) targetArray := make([]*ttypes.Validator, len(array))
LoadValidators(targetArray, array) LoadValidators(targetArray, array)
stateTmp.Validators = &ttypes.ValidatorSet{Validators: targetArray, Proposer: nil} stateTmp.Validators.Validators = targetArray
} }
if proposer := validators.GetProposer(); proposer != nil { if proposer := validators.GetProposer(); proposer != nil {
if stateTmp.Validators == nil { if val, err := LoadProposer(proposer); err == nil {
tendermintlog.Error("LoadState validator is nil but proposer") stateTmp.Validators.Proposer = val
} else {
if val, err := LoadProposer(proposer); err == nil {
stateTmp.Validators.Proposer = val
}
} }
} }
} }
...@@ -257,15 +249,11 @@ func LoadState(state *tmtypes.State) State { ...@@ -257,15 +249,11 @@ func LoadState(state *tmtypes.State) State {
if array := lastValidators.GetValidators(); array != nil { if array := lastValidators.GetValidators(); array != nil {
targetArray := make([]*ttypes.Validator, len(array)) targetArray := make([]*ttypes.Validator, len(array))
LoadValidators(targetArray, array) LoadValidators(targetArray, array)
stateTmp.LastValidators = &ttypes.ValidatorSet{Validators: targetArray, Proposer: nil} stateTmp.LastValidators.Validators = targetArray
} }
if proposer := lastValidators.GetProposer(); proposer != nil { if proposer := lastValidators.GetProposer(); proposer != nil {
if stateTmp.LastValidators == nil { if val, err := LoadProposer(proposer); err == nil {
tendermintlog.Error("LoadState last validator is nil but proposer") stateTmp.LastValidators.Proposer = val
} else {
if val, err := LoadProposer(proposer); err == nil {
stateTmp.LastValidators.Proposer = val
}
} }
} }
} }
...@@ -306,32 +294,22 @@ func (csdb *CSStateDB) LoadState() State { ...@@ -306,32 +294,22 @@ func (csdb *CSStateDB) LoadState() State {
// LoadValidators by height // LoadValidators by height
func (csdb *CSStateDB) LoadValidators(height int64) (*ttypes.ValidatorSet, error) { func (csdb *CSStateDB) LoadValidators(height int64) (*ttypes.ValidatorSet, error) {
if height == 0 { csdb.mtx.Lock()
return nil, nil defer csdb.mtx.Unlock()
if height < 1 {
return nil, ttypes.ErrHeightLessThanOne
} }
if csdb.state.LastBlockHeight == height { if csdb.state.LastBlockHeight == height {
return csdb.state.Validators, nil return csdb.state.Validators, nil
} }
blockInfo, err := csdb.client.QueryBlockInfoByHeight(height) state := csdb.client.LoadBlockState(height)
if err != nil { if state == nil {
tendermintlog.Error("LoadValidators GetBlockInfo failed", "error", err) return nil, errors.New("ErrLoadBlockState")
panic(fmt.Sprintf("LoadValidators GetBlockInfo failed:%v", err))
}
var state State
if blockInfo == nil {
tendermintlog.Error("LoadValidators", "msg", "block height is not 0 but blockinfo is nil")
panic(fmt.Sprintf("LoadValidators block height is %v but block info is nil", height))
} else {
csState := blockInfo.GetState()
if csState == nil {
tendermintlog.Error("LoadValidators", "msg", "blockInfo.GetState is nil")
return nil, fmt.Errorf("LoadValidators get state from block info is nil")
}
state = LoadState(csState)
} }
return state.Validators.Copy(), nil load := LoadState(state)
return load.Validators.Copy(), nil
} }
func saveConsensusParams(dest *tmtypes.ConsensusParams, source ttypes.ConsensusParams) { func saveConsensusParams(dest *tmtypes.ConsensusParams, source ttypes.ConsensusParams) {
...@@ -474,7 +452,7 @@ func CreateBlockInfoTx(pubkey string, state *tmtypes.State, block *tmtypes.Tende ...@@ -474,7 +452,7 @@ func CreateBlockInfoTx(pubkey string, state *tmtypes.State, block *tmtypes.Tende
action := &tmtypes.ValNodeAction{Value: nput, Ty: tmtypes.ValNodeActionBlockInfo} action := &tmtypes.ValNodeAction{Value: nput, Ty: tmtypes.ValNodeActionBlockInfo}
tx := &types.Transaction{Execer: []byte("valnode"), Payload: types.Encode(action), Fee: fee} tx := &types.Transaction{Execer: []byte("valnode"), Payload: types.Encode(action), Fee: fee}
tx.To = address.ExecAddress("valnode") tx.To = address.ExecAddress("valnode")
tx.Nonce = r.Int63() tx.Nonce = random.Int63()
tx.Sign(types.SECP256K1, getprivkey(pubkey)) tx.Sign(types.SECP256K1, getprivkey(pubkey))
return tx return tx
......
...@@ -7,13 +7,13 @@ package tendermint ...@@ -7,13 +7,13 @@ package tendermint
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"math/rand"
"os" "os"
"time" "time"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
dbm "github.com/33cn/chain33/common/db" dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/common/log/log15" "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/consensus" drivers "github.com/33cn/chain33/system/consensus"
cty "github.com/33cn/chain33/system/dapp/coins/types" cty "github.com/33cn/chain33/system/dapp/coins/types"
...@@ -46,11 +46,13 @@ var ( ...@@ -46,11 +46,13 @@ var (
peerGossipSleepDuration int32 = 100 peerGossipSleepDuration int32 = 100
peerQueryMaj23SleepDuration int32 = 2000 peerQueryMaj23SleepDuration int32 = 2000
zeroHash [32]byte zeroHash [32]byte
random *rand.Rand
) )
func init() { func init() {
drivers.Reg("tendermint", New) drivers.Reg("tendermint", New)
drivers.QueryData.Register("tendermint", &Client{}) drivers.QueryData.Register("tendermint", &Client{})
random = rand.New(rand.NewSource(time.Now().UnixNano()))
} }
// Client Tendermint implementation // Client Tendermint implementation
...@@ -63,7 +65,6 @@ type Client struct { ...@@ -63,7 +65,6 @@ type Client struct {
pubKey string pubKey string
csState *ConsensusState csState *ConsensusState
csStore *ConsensusStore // save consensus state csStore *ConsensusStore // save consensus state
evidenceDB dbm.DB
crypto crypto.Crypto crypto crypto.Crypto
node *Node node *Node
txsAvailable chan int64 txsAvailable chan int64
...@@ -152,9 +153,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -152,9 +153,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
return nil return nil
} }
// Make Evidence Reactor
evidenceDB := DefaultDBProvider("evidence")
cr, err := crypto.New(types.GetSignName("", types.ED25519)) cr, err := crypto.New(types.GetSignName("", types.ED25519))
if err != nil { if err != nil {
tendermintlog.Error("NewTendermintClient", "err", err) tendermintlog.Error("NewTendermintClient", "err", err)
...@@ -186,7 +184,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -186,7 +184,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
privKey: priv, privKey: priv,
pubKey: pubkey, pubKey: pubkey,
csStore: NewConsensusStore(), csStore: NewConsensusStore(),
evidenceDB: evidenceDB,
crypto: cr, crypto: cr,
txsAvailable: make(chan int64, 1), txsAvailable: make(chan int64, 1),
stopC: make(chan struct{}, 1), stopC: make(chan struct{}, 1),
...@@ -198,7 +195,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -198,7 +195,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
} }
// PrivValidator returns the Node's PrivValidator. // PrivValidator returns the Node's PrivValidator.
// XXX: for convenience only!
func (client *Client) PrivValidator() ttypes.PrivValidator { func (client *Client) PrivValidator() ttypes.PrivValidator {
return client.privValidator return client.privValidator
} }
...@@ -208,6 +204,16 @@ func (client *Client) GenesisDoc() *ttypes.GenesisDoc { ...@@ -208,6 +204,16 @@ func (client *Client) GenesisDoc() *ttypes.GenesisDoc {
return client.genesisDoc return client.genesisDoc
} }
// GenesisState returns the Node's GenesisState.
func (client *Client) GenesisState() *State {
state, err := MakeGenesisState(client.genesisDoc)
if err != nil {
tendermintlog.Error("GenesisState", "err", err)
return nil
}
return &state
}
// Close TODO:may need optimize // Close TODO:may need optimize
func (client *Client) Close() { func (client *Client) Close() {
client.node.Stop() client.node.Stop()
...@@ -249,9 +255,9 @@ OuterLoop: ...@@ -249,9 +255,9 @@ OuterLoop:
// load state // load state
var state State var state State
if client.GetCurrentHeight() == 0 { if client.GetCurrentHeight() == 0 {
genState, err := MakeGenesisState(client.genesisDoc) genState := client.GenesisState()
if err != nil { if genState == nil {
panic(fmt.Sprintf("StartConsensus MakeGenesisState fail:%v", err)) panic("StartConsensus GenesisState fail")
} }
state = genState.Copy() state = genState.Copy()
} else if client.GetCurrentHeight() <= client.csStore.LoadStateHeight() { } else if client.GetCurrentHeight() <= client.csStore.LoadStateHeight() {
...@@ -336,21 +342,22 @@ func (client *Client) CreateGenesisTx() (ret []*types.Transaction) { ...@@ -336,21 +342,22 @@ func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
} }
func (client *Client) getBlockInfoTx(current *types.Block) (*tmtypes.ValNodeAction, error) { func (client *Client) getBlockInfoTx(current *types.Block) (*tmtypes.ValNodeAction, error) {
//检查第一个笔交易的execs, 以及执行状态 //检查第一笔交易
if len(current.Txs) == 0 { if len(current.Txs) == 0 {
return nil, types.ErrEmptyTx return nil, types.ErrEmptyTx
} }
baseTx := current.Txs[0] baseTx := current.Txs[0]
//判断交易类型和执行情况
var valAction tmtypes.ValNodeAction var valAction tmtypes.ValNodeAction
err := types.Decode(baseTx.GetPayload(), &valAction) err := types.Decode(baseTx.GetPayload(), &valAction)
if err != nil { if err != nil {
return nil, err return nil, err
} }
//检查交易类型
if valAction.GetTy() != tmtypes.ValNodeActionBlockInfo { if valAction.GetTy() != tmtypes.ValNodeActionBlockInfo {
return nil, ttypes.ErrBaseTxType return nil, ttypes.ErrBaseTxType
} }
//判断交易执行是否OK //检查交易内容
if valAction.GetBlockInfo() == nil { if valAction.GetBlockInfo() == nil {
return nil, ttypes.ErrBlockInfoTx return nil, ttypes.ErrBlockInfoTx
} }
...@@ -389,9 +396,10 @@ func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail ...@@ -389,9 +396,10 @@ func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail
return nil return nil
} }
// ProcEvent ... // ProcEvent reply not support action err
func (client *Client) ProcEvent(msg *queue.Message) bool { func (client *Client) ProcEvent(msg *queue.Message) bool {
return false msg.ReplyErr("Client", types.ErrActionNotSupport)
return true
} }
// CreateBlock a routine monitor whether some transactions available and tell client by available channel // CreateBlock a routine monitor whether some transactions available and tell client by available channel
...@@ -519,7 +527,7 @@ func (client *Client) WaitBlock(height int64) bool { ...@@ -519,7 +527,7 @@ func (client *Client) WaitBlock(height int64) bool {
retry++ retry++
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if retry >= 100 { if retry >= 100 {
tendermintlog.Warn("Wait block fail", "height", height, "CurrentHeight", newHeight) tendermintlog.Error("Wait block fail", "height", height, "CurrentHeight", newHeight)
return false return false
} }
} }
...@@ -527,8 +535,8 @@ func (client *Client) WaitBlock(height int64) bool { ...@@ -527,8 +535,8 @@ func (client *Client) WaitBlock(height int64) bool {
// QueryValidatorsByHeight ... // QueryValidatorsByHeight ...
func (client *Client) QueryValidatorsByHeight(height int64) (*tmtypes.ValNodes, error) { func (client *Client) QueryValidatorsByHeight(height int64) (*tmtypes.ValNodes, error) {
if height <= 0 { if height < 1 {
return nil, types.ErrInvalidParam return nil, ttypes.ErrHeightLessThanOne
} }
req := &tmtypes.ReqNodeInfo{Height: height} req := &tmtypes.ReqNodeInfo{Height: height}
param, err := proto.Marshal(req) param, err := proto.Marshal(req)
...@@ -551,89 +559,56 @@ func (client *Client) QueryValidatorsByHeight(height int64) (*tmtypes.ValNodes, ...@@ -551,89 +559,56 @@ func (client *Client) QueryValidatorsByHeight(height int64) (*tmtypes.ValNodes,
return msg.GetData().(types.Message).(*tmtypes.ValNodes), nil return msg.GetData().(types.Message).(*tmtypes.ValNodes), nil
} }
// QueryBlockInfoByHeight ... // QueryBlockInfoByHeight get blockInfo and block by height
func (client *Client) QueryBlockInfoByHeight(height int64) (*tmtypes.TendermintBlockInfo, error) { func (client *Client) QueryBlockInfoByHeight(height int64) (*tmtypes.TendermintBlockInfo, *types.Block, error) {
if height <= 0 { if height < 1 {
return nil, types.ErrInvalidParam return nil, nil, ttypes.ErrHeightLessThanOne
} }
req := &tmtypes.ReqBlockInfo{Height: height} block, err := client.RequestBlock(height)
param, err := proto.Marshal(req)
if err != nil { if err != nil {
tendermintlog.Error("QueryBlockInfoByHeight marshal", "err", err) return nil, nil, err
return nil, types.ErrInvalidParam
} }
msg := client.GetQueueClient().NewMessage("execs", types.EventBlockChainQuery, valAction, err := client.getBlockInfoTx(block)
&types.ChainExecutor{Driver: "valnode", FuncName: "GetBlockInfoByHeight", StateHash: zeroHash[:], Param: param})
err = client.GetQueueClient().Send(msg, true)
if err != nil { if err != nil {
tendermintlog.Error("QueryBlockInfoByHeight send", "err", err) return nil, nil, err
return nil, err
} }
msg, err = client.GetQueueClient().Wait(msg) return valAction.GetBlockInfo(), block, nil
if err != nil {
return nil, err
}
return msg.GetData().(types.Message).(*tmtypes.TendermintBlockInfo), nil
} }
// LoadBlockCommit by height // LoadBlockCommit by height
func (client *Client) LoadBlockCommit(height int64) *tmtypes.TendermintCommit { func (client *Client) LoadBlockCommit(height int64) *tmtypes.TendermintCommit {
blockInfo, err := client.QueryBlockInfoByHeight(height) blockInfo, _, err := client.QueryBlockInfoByHeight(height)
if err != nil { if err != nil {
tendermintlog.Error("LoadBlockCommit GetBlockInfo fail", "err", err) tendermintlog.Error("LoadBlockCommit GetBlockInfo fail", "err", err)
return nil return nil
} }
if blockInfo == nil {
tendermintlog.Error("LoadBlockCommit get nil block info")
return nil
}
return blockInfo.GetBlock().GetLastCommit() return blockInfo.GetBlock().GetLastCommit()
} }
// LoadBlockState by height // LoadBlockState by height
func (client *Client) LoadBlockState(height int64) *tmtypes.State { func (client *Client) LoadBlockState(height int64) *tmtypes.State {
blockInfo, err := client.QueryBlockInfoByHeight(height) blockInfo, _, err := client.QueryBlockInfoByHeight(height)
if err != nil { if err != nil {
tendermintlog.Error("LoadBlockState GetBlockInfo fail", "err", err) tendermintlog.Error("LoadBlockState GetBlockInfo fail", "err", err)
return nil return nil
} }
if blockInfo == nil {
tendermintlog.Error("LoadBlockState get nil block info")
return nil
}
return blockInfo.GetState() return blockInfo.GetState()
} }
// LoadProposalBlock by height // LoadProposalBlock by height
func (client *Client) LoadProposalBlock(height int64) *tmtypes.TendermintBlock { func (client *Client) LoadProposalBlock(height int64) *tmtypes.TendermintBlock {
block, err := client.RequestBlock(height) blockInfo, block, err := client.QueryBlockInfoByHeight(height)
if err != nil { if err != nil {
tendermintlog.Error("LoadProposal by height failed", "curHeight", client.GetCurrentHeight(), "requestHeight", height, "error", err) tendermintlog.Error("LoadProposal GetBlockInfo fail", "err", err)
return nil return nil
} }
blockInfo, err := client.QueryBlockInfoByHeight(height)
if err != nil {
panic(fmt.Sprintf("LoadProposal GetBlockInfo failed:%v", err))
}
if blockInfo == nil {
tendermintlog.Error("LoadProposal get nil block info")
return nil
}
proposalBlock := blockInfo.GetBlock() proposalBlock := blockInfo.GetBlock()
if proposalBlock != nil { proposalBlock.Data = block
proposalBlock.Data = block
txHash := merkle.CalcMerkleRoot(proposalBlock.Data.Txs)
tendermintlog.Debug("LoadProposalBlock txs hash", "height", proposalBlock.Header.Height, "tx-hash", fmt.Sprintf("%X", txHash))
}
return proposalBlock return proposalBlock
} }
// Query_IsHealthy query whether consensus is sync // Query_IsHealthy query whether consensus is sync
func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error) { func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
isHealthy := false isHealthy := false
if client.IsCaughtUp() && client.GetCurrentHeight() <= client.csState.GetRoundState().Height+1 { if client.IsCaughtUp() && client.GetCurrentHeight() <= client.csState.GetRoundState().Height+1 {
isHealthy = true isHealthy = true
...@@ -643,9 +618,6 @@ func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error) ...@@ -643,9 +618,6 @@ func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error)
// Query_NodeInfo query validator node info // Query_NodeInfo query validator node info
func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) { func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
nodes := client.csState.GetRoundState().Validators.Validators nodes := client.csState.GetRoundState().Validators.Validators
validators := make([]*tmtypes.Validator, 0) validators := make([]*tmtypes.Validator, 0)
for _, node := range nodes { for _, node := range nodes {
......
...@@ -16,8 +16,6 @@ import ( ...@@ -16,8 +16,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/33cn/chain33/blockchain" "github.com/33cn/chain33/blockchain"
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/limits" "github.com/33cn/chain33/common/limits"
...@@ -29,8 +27,10 @@ import ( ...@@ -29,8 +27,10 @@ import (
"github.com/33cn/chain33/rpc" "github.com/33cn/chain33/rpc"
"github.com/33cn/chain33/store" "github.com/33cn/chain33/store"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
ty "github.com/33cn/plugin/plugin/consensus/tendermint/types"
pty "github.com/33cn/plugin/plugin/dapp/norm/types" pty "github.com/33cn/plugin/plugin/dapp/norm/types"
ty "github.com/33cn/plugin/plugin/dapp/valnode/types" vty "github.com/33cn/plugin/plugin/dapp/valnode/types"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc" "google.golang.org/grpc"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system"
...@@ -39,7 +39,7 @@ import ( ...@@ -39,7 +39,7 @@ import (
) )
var ( var (
random *rand.Rand r *rand.Rand
loopCount = 3 loopCount = 3
conn *grpc.ClientConn conn *grpc.ClientConn
c types.Chain33Client c types.Chain33Client
...@@ -50,7 +50,7 @@ func init() { ...@@ -50,7 +50,7 @@ func init() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
random = rand.New(rand.NewSource(types.Now().UnixNano())) r = rand.New(rand.NewSource(types.Now().UnixNano()))
log.SetLogLevel("info") log.SetLogLevel("info")
} }
func TestTendermintPerf(t *testing.T) { func TestTendermintPerf(t *testing.T) {
...@@ -128,7 +128,6 @@ func createConn() error { ...@@ -128,7 +128,6 @@ func createConn() error {
return err return err
} }
c = types.NewChain33Client(conn) c = types.NewChain33Client(conn)
r = rand.New(rand.NewSource(types.Now().UnixNano()))
return nil return nil
} }
...@@ -164,7 +163,7 @@ func prepareTxList() *types.Transaction { ...@@ -164,7 +163,7 @@ func prepareTxList() *types.Transaction {
action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut} action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut}
tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: fee} tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: fee}
tx.To = address.ExecAddress("norm") tx.To = address.ExecAddress("norm")
tx.Nonce = random.Int63() tx.Nonce = r.Int63()
tx.Sign(types.SECP256K1, getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944")) tx.Sign(types.SECP256K1, getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"))
return tx return tx
} }
...@@ -198,11 +197,11 @@ func AddNode() { ...@@ -198,11 +197,11 @@ func AddNode() {
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
return return
} }
nput := &ty.ValNodeAction_Node{Node: &ty.ValNode{PubKey: pubkeybyte, Power: int64(2)}} nput := &vty.ValNodeAction_Node{Node: &vty.ValNode{PubKey: pubkeybyte, Power: int64(2)}}
action := &ty.ValNodeAction{Value: nput, Ty: ty.ValNodeActionUpdate} action := &vty.ValNodeAction{Value: nput, Ty: vty.ValNodeActionUpdate}
tx := &types.Transaction{Execer: []byte("valnode"), Payload: types.Encode(action), Fee: fee} tx := &types.Transaction{Execer: []byte("valnode"), Payload: types.Encode(action), Fee: fee}
tx.To = address.ExecAddress("valnode") tx.To = address.ExecAddress("valnode")
tx.Nonce = random.Int63() tx.Nonce = r.Int63()
tx.Sign(types.SECP256K1, getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944")) tx.Sign(types.SECP256K1, getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"))
reply, err := c.SendTransaction(context.Background(), tx) reply, err := c.SendTransaction(context.Background(), tx)
...@@ -217,11 +216,75 @@ func AddNode() { ...@@ -217,11 +216,75 @@ func AddNode() {
} }
func CheckState(t *testing.T, client *Client) { func CheckState(t *testing.T, client *Client) {
state := client.csState.GetState()
assert.NotEmpty(t, state)
_, curVals := state.GetValidators()
assert.NotEmpty(t, curVals)
assert.True(t, state.Equals(state.Copy()))
_, vals := client.csState.GetValidators()
assert.Len(t, vals, 1)
storeHeight := client.csStore.LoadStateHeight()
assert.True(t, storeHeight > 0)
sc := client.csState.LoadCommit(storeHeight)
assert.NotEmpty(t, sc)
bc := client.csState.LoadCommit(storeHeight - 1)
assert.NotEmpty(t, bc)
assert.NotEmpty(t, client.LoadBlockState(storeHeight))
assert.NotEmpty(t, client.LoadProposalBlock(storeHeight))
assert.Nil(t, client.LoadBlockCommit(0))
assert.Nil(t, client.LoadBlockState(0))
assert.Nil(t, client.LoadProposalBlock(0))
csdb := client.csState.blockExec.db
assert.NotEmpty(t, csdb)
assert.NotEmpty(t, csdb.LoadState())
valset, err := csdb.LoadValidators(storeHeight - 1)
assert.Nil(t, err)
assert.NotEmpty(t, valset)
genState, err := MakeGenesisStateFromFile("genesis.json")
assert.Nil(t, err)
assert.Equal(t, genState.LastBlockHeight, int64(0))
assert.Equal(t, client.csState.Prevote(0), 1000*time.Millisecond)
assert.Equal(t, client.csState.Precommit(0), 1000*time.Millisecond)
assert.Equal(t, client.csState.PeerGossipSleep(), 100*time.Millisecond)
assert.Equal(t, client.csState.PeerQueryMaj23Sleep(), 2000*time.Millisecond)
assert.Equal(t, client.csState.IsProposer(), true)
assert.Nil(t, client.csState.GetPrevotesState(state.LastBlockHeight, 0, nil))
assert.Nil(t, client.csState.GetPrecommitsState(state.LastBlockHeight, 0, nil))
assert.NotEmpty(t, client.PrivValidator())
assert.Len(t, client.GenesisDoc().Validators, 1)
msg1, err := client.Query_IsHealthy(&types.ReqNil{}) msg1, err := client.Query_IsHealthy(&types.ReqNil{})
assert.Nil(t, err) assert.Nil(t, err)
flag := msg1.(*ty.IsHealthy).IsHealthy flag := msg1.(*vty.IsHealthy).IsHealthy
assert.Equal(t, true, flag) assert.Equal(t, true, flag)
_, err = client.Query_NodeInfo(&types.ReqNil{}) msg2, err := client.Query_NodeInfo(&types.ReqNil{})
assert.Nil(t, err) assert.Nil(t, err)
tvals := msg2.(*vty.ValidatorSet).Validators
assert.Len(t, tvals, 1)
err = client.CommitBlock(client.GetCurrentBlock())
assert.Nil(t, err)
}
func TestCompareHRS(t *testing.T) {
assert.Equal(t, CompareHRS(1, 1, ty.RoundStepNewHeight, 1, 1, ty.RoundStepNewHeight), 0)
assert.Equal(t, CompareHRS(1, 1, ty.RoundStepPrevote, 2, 1, ty.RoundStepNewHeight), -1)
assert.Equal(t, CompareHRS(1, 1, ty.RoundStepPrevote, 1, 2, ty.RoundStepNewHeight), -1)
assert.Equal(t, CompareHRS(1, 1, ty.RoundStepPrevote, 1, 1, ty.RoundStepPrecommit), -1)
assert.Equal(t, CompareHRS(2, 1, ty.RoundStepNewHeight, 1, 1, ty.RoundStepPrevote), 1)
assert.Equal(t, CompareHRS(1, 2, ty.RoundStepNewHeight, 1, 1, ty.RoundStepPrevote), 1)
assert.Equal(t, CompareHRS(1, 1, ty.RoundStepPrecommit, 1, 1, ty.RoundStepPrevote), 1)
fmt.Println("TestCompareHRS ok")
} }
...@@ -7,6 +7,8 @@ package types ...@@ -7,6 +7,8 @@ package types
import "errors" import "errors"
var ( var (
// ErrHeightLessThanOne error type
ErrHeightLessThanOne = errors.New("ErrHeightLessThanOne")
// ErrBaseTxType error type // ErrBaseTxType error type
ErrBaseTxType = errors.New("ErrBaseTxType") ErrBaseTxType = errors.New("ErrBaseTxType")
// ErrBlockInfoTx error type // ErrBlockInfoTx error type
......
...@@ -32,17 +32,16 @@ const ( ...@@ -32,17 +32,16 @@ const (
RoundStepCommit = RoundStepType(0x08) // Entered commit state machine RoundStepCommit = RoundStepType(0x08) // Entered commit state machine
// NOTE: RoundStepNewHeight acts as RoundStepCommitWait. // NOTE: RoundStepNewHeight acts as RoundStepCommitWait.
NewRoundStepID = byte(0x02) NewRoundStepID = byte(0x01)
CommitStepID = byte(0x03) ProposalID = byte(0x02)
ProposalID = byte(0x04) ProposalPOLID = byte(0x03)
ProposalPOLID = byte(0x05) VoteID = byte(0x04)
VoteID = byte(0x06) HasVoteID = byte(0x05)
HasVoteID = byte(0x07) VoteSetMaj23ID = byte(0x06)
VoteSetMaj23ID = byte(0x08) VoteSetBitsID = byte(0x07)
VoteSetBitsID = byte(0x09) ProposalHeartbeatID = byte(0x08)
ProposalHeartbeatID = byte(0x0a) ProposalBlockID = byte(0x09)
ProposalBlockID = byte(0x0b) ValidBlockID = byte(0x0a)
ValidBlockID = byte(0x0c)
PacketTypePing = byte(0xff) PacketTypePing = byte(0xff)
PacketTypePong = byte(0xfe) PacketTypePong = byte(0xfe)
...@@ -52,7 +51,6 @@ const ( ...@@ -52,7 +51,6 @@ const (
func InitMessageMap() { func InitMessageMap() {
MsgMap = map[byte]reflect.Type{ MsgMap = map[byte]reflect.Type{
NewRoundStepID: reflect.TypeOf(tmtypes.NewRoundStepMsg{}), NewRoundStepID: reflect.TypeOf(tmtypes.NewRoundStepMsg{}),
CommitStepID: reflect.TypeOf(tmtypes.CommitStepMsg{}),
ProposalID: reflect.TypeOf(tmtypes.Proposal{}), ProposalID: reflect.TypeOf(tmtypes.Proposal{}),
ProposalPOLID: reflect.TypeOf(tmtypes.ProposalPOLMsg{}), ProposalPOLID: reflect.TypeOf(tmtypes.ProposalPOLMsg{}),
VoteID: reflect.TypeOf(tmtypes.Vote{}), VoteID: reflect.TypeOf(tmtypes.Vote{}),
......
...@@ -135,10 +135,6 @@ message ValidBlockMsg { ...@@ -135,10 +135,6 @@ message ValidBlockMsg {
bool isCommit = 4; bool isCommit = 4;
} }
message CommitStepMsg {
int64 height = 1;
}
message ProposalPOLMsg { message ProposalPOLMsg {
int64 height = 1; int64 height = 1;
int32 proposalPOLRound = 2; int32 proposalPOLRound = 2;
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment