From c54d123b31cc66f81de815493ab83f019190f241 Mon Sep 17 00:00:00 2001 From: zsfelfoldi Date: Wed, 22 Apr 2015 02:31:59 +0200 Subject: [PATCH] setTimeout and sendAsync implemented added and eval queue for serializing JSRE vm execution --- cmd/geth/js.go | 4 + jsre/jsre.go | 233 +++++++++++++++++++++++++++++++++++++++++++--- jsre/jsre_test.go | 28 ++++++ rpc/jeth.go | 69 +++++++++----- 4 files changed, 299 insertions(+), 35 deletions(-) diff --git a/cmd/geth/js.go b/cmd/geth/js.go index a545de1d0..d8c26eb2f 100644 --- a/cmd/geth/js.go +++ b/cmd/geth/js.go @@ -103,6 +103,7 @@ func (js *jsre) apiBindings() { t, _ := js.re.Get("jeth") jethObj := t.Object() jethObj.Set("send", jeth.Send) + jethObj.Set("sendAsync", jeth.Send) err := js.re.Compile("bignumber.js", re.BigNumber_JS) if err != nil { @@ -172,8 +173,10 @@ func (self *jsre) UnlockAccount(addr []byte) bool { func (self *jsre) exec(filename string) error { if err := self.re.Exec(filename); err != nil { + self.re.Stop(false) return fmt.Errorf("Javascript Error: %v", err) } + self.re.Stop(true) return nil } @@ -201,6 +204,7 @@ func (self *jsre) interactive() { if self.atexit != nil { self.atexit() } + self.re.Stop(false) } func (self *jsre) withHistory(op func(*os.File)) { diff --git a/jsre/jsre.go b/jsre/jsre.go index a49422a12..7549b5e1e 100644 --- a/jsre/jsre.go +++ b/jsre/jsre.go @@ -3,10 +3,11 @@ package jsre import ( "fmt" "io/ioutil" - - "github.com/robertkrimen/otto" + "sync" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/robertkrimen/otto" ) /* @@ -20,59 +21,261 @@ It provides some helper functions to type JSRE struct { assetPath string vm *otto.Otto + + evalQueue chan *evalReq + stopEventLoop chan bool + loopWg sync.WaitGroup +} + +// jsTimer is a single timer instance with a callback function +type jsTimer struct { + timer *time.Timer + duration time.Duration + interval bool + call otto.FunctionCall } +// evalResult is a structure to store the result of any serialized vm execution +type evalResult struct { + result otto.Value + err error +} + +// evalReq is a serialized vm execution request put in evalQueue and processed by runEventLoop +type evalReq struct { + fn func(res *evalResult) + done chan bool + res evalResult +} + +// runtime must be stopped with Stop() after use and cannot be used after stopping func New(assetPath string) *JSRE { re := &JSRE{ - assetPath, - otto.New(), + assetPath: assetPath, + vm: otto.New(), } // load prettyprint func definition re.vm.Run(pp_js) re.vm.Set("loadScript", re.loadScript) + re.evalQueue = make(chan *evalReq) + re.stopEventLoop = make(chan bool) + re.loopWg.Add(1) + go re.runEventLoop() + return re } +// this function runs a piece of JS code either in a serialized way (when useEQ is true) or instantly, circumventing the evalQueue +func (self *JSRE) run(src interface{}, useEQ bool) (value otto.Value, err error) { + if useEQ { + done := make(chan bool) + req := &evalReq{ + fn: func(res *evalResult) { + res.result, res.err = self.vm.Run(src) + }, + done: done, + } + self.evalQueue <- req + <-done + return req.res.result, req.res.err + } else { + return self.vm.Run(src) + } +} + +/* +This function runs the main event loop from a goroutine that is started + when JSRE is created. Use Stop() before exiting to properly stop it. +The event loop processes vm access requests from the evalQueue in a + serialized way and calls timer callback functions at the appropriate time. + +Exported functions always access the vm through the event queue. You can + call the functions of the otto vm directly to circumvent the queue. These + functions should be used if and only if running a routine that was already + called from JS through an RPC call. +*/ +func (self *JSRE) runEventLoop() { + registry := map[*jsTimer]*jsTimer{} + ready := make(chan *jsTimer) + + newTimer := func(call otto.FunctionCall, interval bool) (*jsTimer, otto.Value) { + + delay, _ := call.Argument(1).ToInteger() + if 0 >= delay { + delay = 1 + } + timer := &jsTimer{ + duration: time.Duration(delay) * time.Millisecond, + call: call, + interval: interval, + } + registry[timer] = timer + + timer.timer = time.AfterFunc(timer.duration, func() { + ready <- timer + }) + + value, err := call.Otto.ToValue(timer) + if err != nil { + panic(err) + } + + return timer, value + } + + setTimeout := func(call otto.FunctionCall) otto.Value { + _, value := newTimer(call, false) + return value + } + + setInterval := func(call otto.FunctionCall) otto.Value { + _, value := newTimer(call, true) + return value + } + + clearTimeout := func(call otto.FunctionCall) otto.Value { + timer, _ := call.Argument(0).Export() + if timer, ok := timer.(*jsTimer); ok { + timer.timer.Stop() + delete(registry, timer) + } + return otto.UndefinedValue() + } + + var waitForCallbacks bool + +loop: + for { + select { + case timer := <-ready: + // execute callback, remove/reschedule the timer + var arguments []interface{} + if len(timer.call.ArgumentList) > 2 { + tmp := timer.call.ArgumentList[2:] + arguments = make([]interface{}, 2+len(tmp)) + for i, value := range tmp { + arguments[i+2] = value + } + } else { + arguments = make([]interface{}, 1) + } + arguments[0] = timer.call.ArgumentList[0] + _, err := self.vm.Call(`Function.call.call`, nil, arguments...) + + if err != nil { + break loop + } + if timer.interval { + timer.timer.Reset(timer.duration) + } else { + delete(registry, timer) + if waitForCallbacks && (len(registry) == 0) { + break loop + } + } + case evalReq := <-self.evalQueue: + // run the code, send the result back + self.vm.Set("setTimeout", setTimeout) + self.vm.Set("setInterval", setInterval) + self.vm.Set("clearTimeout", clearTimeout) + self.vm.Set("clearInterval", clearTimeout) + evalReq.fn(&evalReq.res) + close(evalReq.done) + if waitForCallbacks && (len(registry) == 0) { + break loop + } + case waitForCallbacks = <-self.stopEventLoop: + if !waitForCallbacks || (len(registry) == 0) { + break loop + } + } + } + + for _, timer := range registry { + timer.timer.Stop() + delete(registry, timer) + } + + self.loopWg.Done() +} + +// stops the event loop before exit, optionally waits for all timers to expire +func (self *JSRE) Stop(waitForCallbacks bool) { + self.stopEventLoop <- waitForCallbacks + self.loopWg.Wait() +} + // Exec(file) loads and runs the contents of a file // if a relative path is given, the jsre's assetPath is used func (self *JSRE) Exec(file string) error { - return self.exec(common.AbsolutePath(self.assetPath, file)) + return self.exec(common.AbsolutePath(self.assetPath, file), true) } -func (self *JSRE) exec(path string) error { +// circumvents the eval queue, see runEventLoop +func (self *JSRE) execWithoutEQ(file string) error { + return self.exec(common.AbsolutePath(self.assetPath, file), false) +} + +func (self *JSRE) exec(path string, useEQ bool) error { code, err := ioutil.ReadFile(path) if err != nil { return err } - _, err = self.vm.Run(code) + _, err = self.run(code, useEQ) return err } +// assigns value v to a variable in the JS environment func (self *JSRE) Bind(name string, v interface{}) (err error) { - self.vm.Set(name, v) + self.Set(name, v) return } +// runs a piece of JS code func (self *JSRE) Run(code string) (otto.Value, error) { - return self.vm.Run(code) + return self.run(code, true) } +// returns the value of a variable in the JS environment func (self *JSRE) Get(ns string) (otto.Value, error) { - return self.vm.Get(ns) + done := make(chan bool) + req := &evalReq{ + fn: func(res *evalResult) { + res.result, res.err = self.vm.Get(ns) + }, + done: done, + } + self.evalQueue <- req + <-done + return req.res.result, req.res.err } +// assigns value v to a variable in the JS environment func (self *JSRE) Set(ns string, v interface{}) error { - return self.vm.Set(ns, v) + done := make(chan bool) + req := &evalReq{ + fn: func(res *evalResult) { + res.err = self.vm.Set(ns, v) + }, + done: done, + } + self.evalQueue <- req + <-done + return req.res.err } +/* +Executes a JS script from inside the currently executing JS code. +Should only be called from inside an RPC routine. +*/ func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value { file, err := call.Argument(0).ToString() if err != nil { return otto.FalseValue() } - if err := self.Exec(file); err != nil { + if err := self.execWithoutEQ(file); err != nil { // loadScript is only called from inside js fmt.Println("err:", err) return otto.FalseValue() } @@ -80,6 +283,7 @@ func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value { return otto.TrueValue() } +// uses the "prettyPrint" JS function to format a value func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) { var method otto.Value v, err = self.vm.ToValue(v) @@ -93,6 +297,7 @@ func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) { return method.Call(method, v) } +// creates an otto value from a go type func (self *JSRE) ToVal(v interface{}) otto.Value { result, err := self.vm.ToValue(v) if err != nil { @@ -102,6 +307,7 @@ func (self *JSRE) ToVal(v interface{}) otto.Value { return result } +// evaluates JS function and returns result in a pretty printed string format func (self *JSRE) Eval(code string) (s string, err error) { var val otto.Value val, err = self.Run(code) @@ -115,11 +321,12 @@ func (self *JSRE) Eval(code string) (s string, err error) { return fmt.Sprintf("%v", val), nil } +// compiles and then runs a piece of JS code func (self *JSRE) Compile(fn string, src interface{}) error { script, err := self.vm.Compile(fn, src) if err != nil { return err } - self.vm.Run(script) + self.run(script, true) return nil } diff --git a/jsre/jsre_test.go b/jsre/jsre_test.go index 667ed4bdc..5eaca2b91 100644 --- a/jsre/jsre_test.go +++ b/jsre/jsre_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "testing" + "time" ) type testNativeObjectBinding struct { @@ -43,6 +44,31 @@ func TestExec(t *testing.T) { if exp != got { t.Errorf("expected '%v', got '%v'", exp, got) } + jsre.Stop(false) +} + +func TestNatto(t *testing.T) { + jsre := New("/tmp") + + ioutil.WriteFile("/tmp/test.js", []byte(`setTimeout(function(){msg = "testMsg"}, 1);`), os.ModePerm) + err := jsre.Exec("test.js") + if err != nil { + t.Errorf("expected no error, got %v", err) + } + time.Sleep(time.Millisecond * 10) + val, err := jsre.Run("msg") + if err != nil { + t.Errorf("expected no error, got %v", err) + } + if !val.IsString() { + t.Errorf("expected string value, got %v", val) + } + exp := "testMsg" + got, _ := val.ToString() + if exp != got { + t.Errorf("expected '%v', got '%v'", exp, got) + } + jsre.Stop(false) } func TestBind(t *testing.T) { @@ -59,6 +85,7 @@ func TestBind(t *testing.T) { t.Errorf("expected no error, got %v", err) } t.Logf("no: %v", pp) + jsre.Stop(false) } func TestLoadScript(t *testing.T) { @@ -81,4 +108,5 @@ func TestLoadScript(t *testing.T) { if exp != got { t.Errorf("expected '%v', got '%v'", exp, got) } + jsre.Stop(false) } diff --git a/rpc/jeth.go b/rpc/jeth.go index e83212bb5..4739316b2 100644 --- a/rpc/jeth.go +++ b/rpc/jeth.go @@ -2,8 +2,7 @@ package rpc import ( "encoding/json" - "fmt" - // "fmt" + "github.com/ethereum/go-ethereum/jsre" "github.com/robertkrimen/otto" ) @@ -18,12 +17,12 @@ func NewJeth(ethApi *EthereumApi, toVal func(interface{}) otto.Value, re *jsre.J return &Jeth{ethApi, toVal, re} } -func (self *Jeth) err(code int, msg string, id interface{}) (response otto.Value) { +func (self *Jeth) err(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) { rpcerr := &RpcErrorObject{code, msg} - self.re.Set("ret_jsonrpc", jsonrpcver) - self.re.Set("ret_id", id) - self.re.Set("ret_error", rpcerr) - response, _ = self.re.Run(` + call.Otto.Set("ret_jsonrpc", jsonrpcver) + call.Otto.Set("ret_id", id) + call.Otto.Set("ret_error", rpcerr) + response, _ = call.Otto.Run(` ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, error: ret_error }; `) return @@ -32,27 +31,53 @@ func (self *Jeth) err(code int, msg string, id interface{}) (response otto.Value func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { reqif, err := call.Argument(0).Export() if err != nil { - return self.err(-32700, err.Error(), nil) + return self.err(call, -32700, err.Error(), nil) } jsonreq, err := json.Marshal(reqif) - var req RpcRequest - err = json.Unmarshal(jsonreq, &req) - - var respif interface{} - err = self.ethApi.GetRequestReply(&req, &respif) + var reqs []RpcRequest + batch := true + err = json.Unmarshal(jsonreq, &reqs) if err != nil { - fmt.Printf("error: %s\n", err) - return self.err(-32603, err.Error(), req.Id) + reqs = make([]RpcRequest, 1) + err = json.Unmarshal(jsonreq, &reqs[0]) + batch = false + } + + call.Otto.Set("response_len", len(reqs)) + call.Otto.Run("var ret_response = new Array(response_len);") + + for i, req := range reqs { + var respif interface{} + err = self.ethApi.GetRequestReply(&req, &respif) + if err != nil { + return self.err(call, -32603, err.Error(), req.Id) + } + call.Otto.Set("ret_jsonrpc", jsonrpcver) + call.Otto.Set("ret_id", req.Id) + + res, _ := json.Marshal(respif) + + call.Otto.Set("ret_result", string(res)) + call.Otto.Set("response_idx", i) + response, err = call.Otto.Run(` + ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) }; + `) + } + + if !batch { + call.Otto.Run("ret_response = ret_response[0];") + } + + if call.Argument(1).IsObject() { + call.Otto.Set("callback", call.Argument(1)) + call.Otto.Run(` + if (Object.prototype.toString.call(callback) == '[object Function]') { + callback(null, ret_response); + } + `) } - self.re.Set("ret_jsonrpc", jsonrpcver) - self.re.Set("ret_id", req.Id) - res, _ := json.Marshal(respif) - self.re.Set("ret_result", string(res)) - response, err = self.re.Run(` - ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) }; - `) return }