|
|
@ -41,10 +41,10 @@ import ( |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerSubscribe(t *testing.T) { |
|
|
|
func TestStreamerSubscribe(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top) |
|
|
|
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top) |
|
|
@ -55,10 +55,10 @@ func TestStreamerSubscribe(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerRequestSubscription(t *testing.T) { |
|
|
|
func TestStreamerRequestSubscription(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", false) |
|
|
|
stream := NewStream("foo", "", false) |
|
|
|
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top) |
|
|
|
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top) |
|
|
@ -146,10 +146,10 @@ func (self *testServer) Close() { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { |
|
|
|
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { |
|
|
|
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { |
|
|
|
return newTestClient(t), nil |
|
|
|
return newTestClient(t), nil |
|
|
@ -239,10 +239,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { |
|
|
|
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", false) |
|
|
|
stream := NewStream("foo", "", false) |
|
|
|
|
|
|
|
|
|
|
@ -306,10 +306,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { |
|
|
|
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
|
|
|
|
|
|
|
@ -372,10 +372,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { |
|
|
|
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
return newTestServer(t, 0), nil |
|
|
|
return newTestServer(t, 0), nil |
|
|
@ -416,10 +416,10 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { |
|
|
|
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
|
|
|
|
|
|
|
@ -479,10 +479,10 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { |
|
|
|
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
|
|
|
|
|
|
|
@ -544,10 +544,10 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { |
|
|
|
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
stream := NewStream("foo", "", true) |
|
|
|
|
|
|
|
|
|
|
@ -643,10 +643,10 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { |
|
|
|
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(nil) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
return newTestServer(t, 10), nil |
|
|
|
return newTestServer(t, 10), nil |
|
|
@ -780,10 +780,10 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) { |
|
|
|
Syncing: SyncingDisabled, |
|
|
|
Syncing: SyncingDisabled, |
|
|
|
MaxPeerServers: maxPeerServers, |
|
|
|
MaxPeerServers: maxPeerServers, |
|
|
|
}) |
|
|
|
}) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
return newTestServer(t, 0), nil |
|
|
|
return newTestServer(t, 0), nil |
|
|
@ -854,10 +854,10 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ |
|
|
|
tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ |
|
|
|
MaxPeerServers: maxPeerServers, |
|
|
|
MaxPeerServers: maxPeerServers, |
|
|
|
}) |
|
|
|
}) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { |
|
|
|
return newTestServer(t, 0), nil |
|
|
|
return newTestServer(t, 0), nil |
|
|
@ -940,10 +940,10 @@ func TestHasPriceImplementation(t *testing.T) { |
|
|
|
Retrieval: RetrievalDisabled, |
|
|
|
Retrieval: RetrievalDisabled, |
|
|
|
Syncing: SyncingDisabled, |
|
|
|
Syncing: SyncingDisabled, |
|
|
|
}) |
|
|
|
}) |
|
|
|
defer teardown() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
t.Fatal(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
defer teardown() |
|
|
|
|
|
|
|
|
|
|
|
if r.prices == nil { |
|
|
|
if r.prices == nil { |
|
|
|
t.Fatal("No prices implementation available for the stream protocol") |
|
|
|
t.Fatal("No prices implementation available for the stream protocol") |
|
|
@ -1177,6 +1177,7 @@ starts the simulation, waits for SyncUpdateDelay in order to kick off |
|
|
|
stream registration, then tests that there are subscriptions. |
|
|
|
stream registration, then tests that there are subscriptions. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
func TestGetSubscriptionsRPC(t *testing.T) { |
|
|
|
func TestGetSubscriptionsRPC(t *testing.T) { |
|
|
|
|
|
|
|
|
|
|
|
// arbitrarily set to 4
|
|
|
|
// arbitrarily set to 4
|
|
|
|
nodeCount := 4 |
|
|
|
nodeCount := 4 |
|
|
|
// run with more nodes if `longrunning` flag is set
|
|
|
|
// run with more nodes if `longrunning` flag is set
|
|
|
@ -1188,19 +1189,16 @@ func TestGetSubscriptionsRPC(t *testing.T) { |
|
|
|
// holds the msg code for SubscribeMsg
|
|
|
|
// holds the msg code for SubscribeMsg
|
|
|
|
var subscribeMsgCode uint64 |
|
|
|
var subscribeMsgCode uint64 |
|
|
|
var ok bool |
|
|
|
var ok bool |
|
|
|
var expectedMsgCount = 0 |
|
|
|
var expectedMsgCount counter |
|
|
|
|
|
|
|
|
|
|
|
// this channel signalizes that the expected amount of subscriptiosn is done
|
|
|
|
// this channel signalizes that the expected amount of subscriptiosn is done
|
|
|
|
allSubscriptionsDone := make(chan struct{}) |
|
|
|
allSubscriptionsDone := make(chan struct{}) |
|
|
|
lock := sync.RWMutex{} |
|
|
|
|
|
|
|
// after the test, we need to reset the subscriptionFunc to the default
|
|
|
|
// after the test, we need to reset the subscriptionFunc to the default
|
|
|
|
defer func() { subscriptionFunc = doRequestSubscription }() |
|
|
|
defer func() { subscriptionFunc = doRequestSubscription }() |
|
|
|
|
|
|
|
|
|
|
|
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
|
|
|
|
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
|
|
|
|
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { |
|
|
|
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { |
|
|
|
lock.Lock() |
|
|
|
expectedMsgCount.inc() |
|
|
|
expectedMsgCount++ |
|
|
|
|
|
|
|
lock.Unlock() |
|
|
|
|
|
|
|
doRequestSubscription(r, p, bin, subs) |
|
|
|
doRequestSubscription(r, p, bin, subs) |
|
|
|
return true |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
@ -1290,24 +1288,24 @@ func TestGetSubscriptionsRPC(t *testing.T) { |
|
|
|
select { |
|
|
|
select { |
|
|
|
case <-allSubscriptionsDone: |
|
|
|
case <-allSubscriptionsDone: |
|
|
|
case <-ctx.Done(): |
|
|
|
case <-ctx.Done(): |
|
|
|
t.Fatal("Context timed out") |
|
|
|
return errors.New("Context timed out") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount) |
|
|
|
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count()) |
|
|
|
//now iterate again, this time we call each node via RPC to get its subscriptions
|
|
|
|
//now iterate again, this time we call each node via RPC to get its subscriptions
|
|
|
|
realCount := 0 |
|
|
|
realCount := 0 |
|
|
|
for _, node := range nodes { |
|
|
|
for _, node := range nodes { |
|
|
|
//create rpc client
|
|
|
|
//create rpc client
|
|
|
|
client, err := node.Client() |
|
|
|
client, err := node.Client() |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatalf("create node 1 rpc client fail: %v", err) |
|
|
|
return fmt.Errorf("create node 1 rpc client fail: %v", err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//ask it for subscriptions
|
|
|
|
//ask it for subscriptions
|
|
|
|
pstreams := make(map[string][]string) |
|
|
|
pstreams := make(map[string][]string) |
|
|
|
err = client.Call(&pstreams, "stream_getPeerSubscriptions") |
|
|
|
err = client.Call(&pstreams, "stream_getPeerSubscriptions") |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
t.Fatal(err) |
|
|
|
return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) |
|
|
|
} |
|
|
|
} |
|
|
|
//length of the subscriptions can not be smaller than number of peers
|
|
|
|
//length of the subscriptions can not be smaller than number of peers
|
|
|
|
log.Debug("node subscriptions", "node", node.String()) |
|
|
|
log.Debug("node subscriptions", "node", node.String()) |
|
|
@ -1324,8 +1322,9 @@ func TestGetSubscriptionsRPC(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
// every node is mutually subscribed to each other, so the actual count is half of it
|
|
|
|
// every node is mutually subscribed to each other, so the actual count is half of it
|
|
|
|
if realCount/2 != expectedMsgCount { |
|
|
|
emc := expectedMsgCount.count() |
|
|
|
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, expectedMsgCount) |
|
|
|
if realCount/2 != emc { |
|
|
|
|
|
|
|
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc) |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
}) |
|
|
|
}) |
|
|
@ -1333,3 +1332,26 @@ func TestGetSubscriptionsRPC(t *testing.T) { |
|
|
|
t.Fatal(result.Error) |
|
|
|
t.Fatal(result.Error) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// counter is used to concurrently increment
|
|
|
|
|
|
|
|
// and read an integer value.
|
|
|
|
|
|
|
|
type counter struct { |
|
|
|
|
|
|
|
v int |
|
|
|
|
|
|
|
mu sync.RWMutex |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Increment the counter.
|
|
|
|
|
|
|
|
func (c *counter) inc() { |
|
|
|
|
|
|
|
c.mu.Lock() |
|
|
|
|
|
|
|
defer c.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
c.v++ |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Read the counter value.
|
|
|
|
|
|
|
|
func (c *counter) count() int { |
|
|
|
|
|
|
|
c.mu.RLock() |
|
|
|
|
|
|
|
defer c.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return c.v |
|
|
|
|
|
|
|
} |
|
|
|