Properly uninstall filters. Mining issue fixed #closes #365

* Added an additional tx state which is used to get the current nonce
* Refresh transient state each time a new canonical block is found
* Properly uninstall filters. Fixes a possible crash in RPC
pull/371/head
obscuren 10 years ago
parent 20aa6dde06
commit b2a225a52e
  1. 24
      cmd/mist/assets/examples/coin.html
  2. 30
      core/chain_manager.go
  3. 11
      rpc/packages.go
  4. 2
      rpc/util.go
  5. 4
      whisper/whisper.go
  6. 10
      xeth/xeth.go

@ -19,6 +19,7 @@
<span>Amount:</span> <span>Amount:</span>
<input type="text" id="amount" style="width:200px"> <input type="text" id="amount" style="width:200px">
<button onclick="transact()">Send</button> <button onclick="transact()">Send</button>
<span id="message"></span>
</div> </div>
<hr> <hr>
@ -95,17 +96,28 @@
} }
function transact() { function transact() {
var to = document.querySelector("#address").value; var to = document.querySelector("#address");
if( to.length == 0 ) { if( to.value.length == 0 ) {
to = "0x4205b06c2cfa0e30359edcab94543266cb6fa1d3"; to = "0x4205b06c2cfa0e30359edcab94543266cb6fa1d3";
} else { } else {
to = "0x"+to; if (to.value.substr(0,2) != "0x")
to.value = "0x"+to.value;
} }
var value = parseInt( document.querySelector("#amount").value ); var value = document.querySelector("#amount");
console.log("transact: ", to, " => ", value) var amount = parseInt( value.value );
console.log("transact: ", to.value, " => ", amount)
contract.send( to, value ); contract.send( to.value, amount );
to.value = "";
value.value = "";
var message = document.querySelector("#message")
message.innerHTML = "Submitted";
setTimeout(function() {
message.innerHTML = "";
}, 1000);
} }
refresh(); refresh();

@ -85,12 +85,14 @@ type ChainManager struct {
lastBlockHash []byte lastBlockHash []byte
transState *state.StateDB transState *state.StateDB
txState *state.StateDB
} }
func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager { func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager {
bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux} bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux}
bc.setLastBlock() bc.setLastBlock()
bc.transState = bc.State().Copy() bc.transState = bc.State().Copy()
bc.txState = bc.State().Copy()
return bc return bc
} }
@ -138,6 +140,19 @@ func (self *ChainManager) TransState() *state.StateDB {
return self.transState return self.transState
} }
func (self *ChainManager) TxState() *state.StateDB {
self.tsmu.RLock()
defer self.tsmu.RUnlock()
return self.txState
}
func (self *ChainManager) setTxState(state *state.StateDB) {
self.tsmu.Lock()
defer self.tsmu.Unlock()
self.txState = state
}
func (self *ChainManager) setTransState(statedb *state.StateDB) { func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb self.transState = statedb
} }
@ -363,6 +378,8 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
defer self.tsmu.Unlock() defer self.tsmu.Unlock()
for _, block := range chain { for _, block := range chain {
// Call in to the block processor and check for errors. It's likely that if one block fails
// all others will fail too (unless a known block is returned).
td, err := self.processor.Process(block) td, err := self.processor.Process(block)
if err != nil { if err != nil {
if IsKnownBlockErr(err) { if IsKnownBlockErr(err) {
@ -377,11 +394,15 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
} }
block.Td = td block.Td = td
var chain, split bool var canonical, split bool
self.mu.Lock() self.mu.Lock()
{ {
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
self.write(block) self.write(block)
cblock := self.currentBlock cblock := self.currentBlock
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
if td.Cmp(self.td) > 0 { if td.Cmp(self.td) > 0 {
if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 {
chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td)
@ -391,17 +412,18 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
self.setTotalDifficulty(td) self.setTotalDifficulty(td)
self.insert(block) self.insert(block)
chain = true canonical = true
} }
} }
self.mu.Unlock() self.mu.Unlock()
if chain { if canonical {
self.setTransState(state.New(block.Root(), self.db))
self.eventMux.Post(ChainEvent{block, td}) self.eventMux.Post(ChainEvent{block, td})
} }
if split { if split {
self.setTransState(state.New(block.Root(), self.db)) self.setTxState(state.New(block.Root(), self.db))
self.eventMux.Post(ChainSplitEvent{block}) self.eventMux.Post(ChainSplitEvent{block})
} }
} }

@ -126,10 +126,6 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error
self.logMut.Lock() self.logMut.Lock()
defer self.logMut.Unlock() defer self.logMut.Unlock()
if self.logs[id] == nil {
self.logs[id] = &logFilter{timeout: time.Now()}
}
self.logs[id].add(&state.StateLog{}) self.logs[id].add(&state.StateLog{})
} }
if args == "pending" { if args == "pending" {
@ -139,6 +135,7 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error
} }
id = self.filterManager.InstallFilter(filter) id = self.filterManager.InstallFilter(filter)
self.logs[id] = &logFilter{timeout: time.Now()}
*reply = id *reply = id
return nil return nil
@ -377,12 +374,10 @@ func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) e
args.Fn = func(msg xeth.WhisperMessage) { args.Fn = func(msg xeth.WhisperMessage) {
p.messagesMut.Lock() p.messagesMut.Lock()
defer p.messagesMut.Unlock() defer p.messagesMut.Unlock()
if p.messages[id] == nil {
p.messages[id] = &whisperFilter{timeout: time.Now()}
}
p.messages[id].add(msg) // = append(p.messages[id], msg) p.messages[id].add(msg) // = append(p.messages[id], msg)
} }
id = p.xeth.Whisper().Watch(args) id = p.xeth.Whisper().Watch(args)
p.messages[id] = &whisperFilter{timeout: time.Now()}
*reply = id *reply = id
return nil return nil
} }
@ -623,12 +618,14 @@ done:
self.messagesMut.Lock() self.messagesMut.Lock()
for id, filter := range self.logs { for id, filter := range self.logs {
if time.Since(filter.timeout) > 20*time.Second { if time.Since(filter.timeout) > 20*time.Second {
self.filterManager.UninstallFilter(id)
delete(self.logs, id) delete(self.logs, id)
} }
} }
for id, filter := range self.messages { for id, filter := range self.messages {
if time.Since(filter.timeout) > 20*time.Second { if time.Since(filter.timeout) > 20*time.Second {
self.xeth.Whisper().Unwatch(id)
delete(self.messages, id) delete(self.messages, id)
} }
} }

@ -108,6 +108,7 @@ func toLogs(logs state.Logs) (ls []Log) {
type whisperFilter struct { type whisperFilter struct {
messages []xeth.WhisperMessage messages []xeth.WhisperMessage
timeout time.Time timeout time.Time
id int
} }
func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) { func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) {
@ -123,6 +124,7 @@ func (w *whisperFilter) get() []xeth.WhisperMessage {
type logFilter struct { type logFilter struct {
logs state.Logs logs state.Logs
timeout time.Time timeout time.Time
id int
} }
func (l *logFilter) add(logs ...state.Log) { func (l *logFilter) add(logs ...state.Log) {

@ -127,6 +127,10 @@ func (self *Whisper) Watch(opts Filter) int {
}) })
} }
func (self *Whisper) Unwatch(id int) {
self.filters.Uninstall(id)
}
func (self *Whisper) Messages(id int) (messages []*Message) { func (self *Whisper) Messages(id int) (messages []*Message) {
filter := self.filters.Get(id) filter := self.filters.Get(id)
if filter != nil { if filter != nil {

@ -277,7 +277,7 @@ func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string)
} }
var err error var err error
state := self.eth.ChainManager().TransState() state := self.eth.ChainManager().TxState()
if balance := state.GetBalance(key.Address()); balance.Cmp(tx.Value()) < 0 { if balance := state.GetBalance(key.Address()); balance.Cmp(tx.Value()) < 0 {
return "", fmt.Errorf("insufficient balance. balance=%v tx=%v", balance, tx.Value()) return "", fmt.Errorf("insufficient balance. balance=%v tx=%v", balance, tx.Value())
} }
@ -289,10 +289,10 @@ func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string)
//fmt.Printf("create tx: %x %v\n", tx.Hash()[:4], tx.Nonce()) //fmt.Printf("create tx: %x %v\n", tx.Hash()[:4], tx.Nonce())
// Do some pre processing for our "pre" events and hooks // Do some pre processing for our "pre" events and hooks
block := self.chainManager.NewBlock(key.Address()) //block := self.chainManager.NewBlock(key.Address())
coinbase := state.GetOrNewStateObject(key.Address()) //coinbase := state.GetOrNewStateObject(key.Address())
coinbase.SetGasPool(block.GasLimit()) //coinbase.SetGasPool(block.GasLimit())
self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true) //self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true)
err = self.eth.TxPool().Add(tx) err = self.eth.TxPool().Add(tx)
if err != nil { if err != nil {

Loading…
Cancel
Save