forked from mirror/go-ethereum
Merge pull request #19550 from ethersphere/swarm-rather-stable
swarm v0.4-rc1ChrisChinchilla-patch-3
commit
494f5d448a
File diff suppressed because it is too large
Load Diff
@ -1,85 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package api |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"path" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/swarm/storage" |
|
||||||
) |
|
||||||
|
|
||||||
type Response struct { |
|
||||||
MimeType string |
|
||||||
Status int |
|
||||||
Size int64 |
|
||||||
// Content []byte
|
|
||||||
Content string |
|
||||||
} |
|
||||||
|
|
||||||
// implements a service
|
|
||||||
//
|
|
||||||
// DEPRECATED: Use the HTTP API instead
|
|
||||||
type Storage struct { |
|
||||||
api *API |
|
||||||
} |
|
||||||
|
|
||||||
func NewStorage(api *API) *Storage { |
|
||||||
return &Storage{api} |
|
||||||
} |
|
||||||
|
|
||||||
// Put uploads the content to the swarm with a simple manifest speficying
|
|
||||||
// its content type
|
|
||||||
//
|
|
||||||
// DEPRECATED: Use the HTTP API instead
|
|
||||||
func (s *Storage) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (storage.Address, func(context.Context) error, error) { |
|
||||||
return s.api.Put(ctx, content, contentType, toEncrypt) |
|
||||||
} |
|
||||||
|
|
||||||
// Get retrieves the content from bzzpath and reads the response in full
|
|
||||||
// It returns the Response object, which serialises containing the
|
|
||||||
// response body as the value of the Content field
|
|
||||||
// NOTE: if error is non-nil, sResponse may still have partial content
|
|
||||||
// the actual size of which is given in len(resp.Content), while the expected
|
|
||||||
// size is resp.Size
|
|
||||||
//
|
|
||||||
// DEPRECATED: Use the HTTP API instead
|
|
||||||
func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) { |
|
||||||
uri, err := Parse(path.Join("bzz:/", bzzpath)) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
addr, err := s.api.Resolve(ctx, uri.Addr) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
reader, mimeType, status, _, err := s.api.Get(ctx, nil, addr, uri.Path) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
quitC := make(chan bool) |
|
||||||
expsize, err := reader.Size(ctx, quitC) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
body := make([]byte, expsize) |
|
||||||
size, err := reader.Read(body) |
|
||||||
if int64(size) == expsize { |
|
||||||
err = nil |
|
||||||
} |
|
||||||
return &Response{mimeType, status, expsize, string(body[:size])}, err |
|
||||||
} |
|
@ -1,56 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package api |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"testing" |
|
||||||
) |
|
||||||
|
|
||||||
func testStorage(t *testing.T, f func(*Storage, bool)) { |
|
||||||
testAPI(t, func(api *API, toEncrypt bool) { |
|
||||||
f(NewStorage(api), toEncrypt) |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
func TestStoragePutGet(t *testing.T) { |
|
||||||
testStorage(t, func(api *Storage, toEncrypt bool) { |
|
||||||
content := "hello" |
|
||||||
exp := expResponse(content, "text/plain", 0) |
|
||||||
// exp := expResponse([]byte(content), "text/plain", 0)
|
|
||||||
ctx := context.TODO() |
|
||||||
bzzkey, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("unexpected error: %v", err) |
|
||||||
} |
|
||||||
err = wait(ctx) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("unexpected error: %v", err) |
|
||||||
} |
|
||||||
bzzhash := bzzkey.Hex() |
|
||||||
// to check put against the API#Get
|
|
||||||
resp0 := testGet(t, api.api, bzzhash, "") |
|
||||||
checkResponse(t, resp0, exp) |
|
||||||
|
|
||||||
// check storage#Get
|
|
||||||
resp, err := api.Get(context.TODO(), bzzhash) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("unexpected error: %v", err) |
|
||||||
} |
|
||||||
checkResponse(t, &testResponse{nil, resp}, exp) |
|
||||||
}) |
|
||||||
} |
|
@ -0,0 +1,218 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package chunk |
||||||
|
|
||||||
|
import ( |
||||||
|
"encoding/binary" |
||||||
|
"errors" |
||||||
|
"sync/atomic" |
||||||
|
"time" |
||||||
|
) |
||||||
|
|
||||||
|
var ( |
||||||
|
errExists = errors.New("already exists") |
||||||
|
errNA = errors.New("not available yet") |
||||||
|
errNoETA = errors.New("unable to calculate ETA") |
||||||
|
errTagNotFound = errors.New("tag not found") |
||||||
|
) |
||||||
|
|
||||||
|
// State is the enum type for chunk states
|
||||||
|
type State = uint32 |
||||||
|
|
||||||
|
const ( |
||||||
|
StateSplit State = iota // chunk has been processed by filehasher/swarm safe call
|
||||||
|
StateStored // chunk stored locally
|
||||||
|
StateSeen // chunk previously seen
|
||||||
|
StateSent // chunk sent to neighbourhood
|
||||||
|
StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere
|
||||||
|
) |
||||||
|
|
||||||
|
// Tag represents info on the status of new chunks
|
||||||
|
type Tag struct { |
||||||
|
Uid uint32 // a unique identifier for this tag
|
||||||
|
Name string // a name tag for this tag
|
||||||
|
Address Address // the associated swarm hash for this tag
|
||||||
|
total int64 // total chunks belonging to a tag
|
||||||
|
split int64 // number of chunks already processed by splitter for hashing
|
||||||
|
seen int64 // number of chunks already seen
|
||||||
|
stored int64 // number of chunks already stored locally
|
||||||
|
sent int64 // number of chunks sent for push syncing
|
||||||
|
synced int64 // number of chunks synced with proof
|
||||||
|
startedAt time.Time // tag started to calculate ETA
|
||||||
|
} |
||||||
|
|
||||||
|
// New creates a new tag, stores it by the name and returns it
|
||||||
|
// it returns an error if the tag with this name already exists
|
||||||
|
func NewTag(uid uint32, s string, total int64) *Tag { |
||||||
|
t := &Tag{ |
||||||
|
Uid: uid, |
||||||
|
Name: s, |
||||||
|
startedAt: time.Now(), |
||||||
|
total: total, |
||||||
|
} |
||||||
|
return t |
||||||
|
} |
||||||
|
|
||||||
|
// Inc increments the count for a state
|
||||||
|
func (t *Tag) Inc(state State) { |
||||||
|
var v *int64 |
||||||
|
switch state { |
||||||
|
case StateSplit: |
||||||
|
v = &t.split |
||||||
|
case StateStored: |
||||||
|
v = &t.stored |
||||||
|
case StateSeen: |
||||||
|
v = &t.seen |
||||||
|
case StateSent: |
||||||
|
v = &t.sent |
||||||
|
case StateSynced: |
||||||
|
v = &t.synced |
||||||
|
} |
||||||
|
atomic.AddInt64(v, 1) |
||||||
|
} |
||||||
|
|
||||||
|
// Get returns the count for a state on a tag
|
||||||
|
func (t *Tag) Get(state State) int64 { |
||||||
|
var v *int64 |
||||||
|
switch state { |
||||||
|
case StateSplit: |
||||||
|
v = &t.split |
||||||
|
case StateStored: |
||||||
|
v = &t.stored |
||||||
|
case StateSeen: |
||||||
|
v = &t.seen |
||||||
|
case StateSent: |
||||||
|
v = &t.sent |
||||||
|
case StateSynced: |
||||||
|
v = &t.synced |
||||||
|
} |
||||||
|
return atomic.LoadInt64(v) |
||||||
|
} |
||||||
|
|
||||||
|
// GetTotal returns the total count
|
||||||
|
func (t *Tag) Total() int64 { |
||||||
|
return atomic.LoadInt64(&t.total) |
||||||
|
} |
||||||
|
|
||||||
|
// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
|
||||||
|
// is meant to be called when splitter finishes for input streams of unknown size
|
||||||
|
func (t *Tag) DoneSplit(address Address) int64 { |
||||||
|
total := atomic.LoadInt64(&t.split) |
||||||
|
atomic.StoreInt64(&t.total, total) |
||||||
|
t.Address = address |
||||||
|
return total |
||||||
|
} |
||||||
|
|
||||||
|
// Status returns the value of state and the total count
|
||||||
|
func (t *Tag) Status(state State) (int64, int64, error) { |
||||||
|
count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total) |
||||||
|
if total == 0 { |
||||||
|
return count, total, errNA |
||||||
|
} |
||||||
|
switch state { |
||||||
|
case StateSplit, StateStored, StateSeen: |
||||||
|
return count, total, nil |
||||||
|
case StateSent, StateSynced: |
||||||
|
stored := atomic.LoadInt64(&t.stored) |
||||||
|
if stored < total { |
||||||
|
return count, total - seen, errNA |
||||||
|
} |
||||||
|
return count, total - seen, nil |
||||||
|
} |
||||||
|
return count, total, errNA |
||||||
|
} |
||||||
|
|
||||||
|
// ETA returns the time of completion estimated based on time passed and rate of completion
|
||||||
|
func (t *Tag) ETA(state State) (time.Time, error) { |
||||||
|
cnt, total, err := t.Status(state) |
||||||
|
if err != nil { |
||||||
|
return time.Time{}, err |
||||||
|
} |
||||||
|
if cnt == 0 || total == 0 { |
||||||
|
return time.Time{}, errNoETA |
||||||
|
} |
||||||
|
diff := time.Since(t.startedAt) |
||||||
|
dur := time.Duration(total) * diff / time.Duration(cnt) |
||||||
|
return t.startedAt.Add(dur), nil |
||||||
|
} |
||||||
|
|
||||||
|
// MarshalBinary marshals the tag into a byte slice
|
||||||
|
func (tag *Tag) MarshalBinary() (data []byte, err error) { |
||||||
|
buffer := make([]byte, 4) |
||||||
|
binary.BigEndian.PutUint32(buffer, tag.Uid) |
||||||
|
encodeInt64Append(&buffer, tag.total) |
||||||
|
encodeInt64Append(&buffer, tag.split) |
||||||
|
encodeInt64Append(&buffer, tag.seen) |
||||||
|
encodeInt64Append(&buffer, tag.stored) |
||||||
|
encodeInt64Append(&buffer, tag.sent) |
||||||
|
encodeInt64Append(&buffer, tag.synced) |
||||||
|
|
||||||
|
intBuffer := make([]byte, 8) |
||||||
|
|
||||||
|
n := binary.PutVarint(intBuffer, tag.startedAt.Unix()) |
||||||
|
buffer = append(buffer, intBuffer[:n]...) |
||||||
|
|
||||||
|
n = binary.PutVarint(intBuffer, int64(len(tag.Address))) |
||||||
|
buffer = append(buffer, intBuffer[:n]...) |
||||||
|
|
||||||
|
buffer = append(buffer, tag.Address[:]...) |
||||||
|
|
||||||
|
buffer = append(buffer, []byte(tag.Name)...) |
||||||
|
|
||||||
|
return buffer, nil |
||||||
|
} |
||||||
|
|
||||||
|
// UnmarshalBinary unmarshals a byte slice into a tag
|
||||||
|
func (tag *Tag) UnmarshalBinary(buffer []byte) error { |
||||||
|
if len(buffer) < 13 { |
||||||
|
return errors.New("buffer too short") |
||||||
|
} |
||||||
|
tag.Uid = binary.BigEndian.Uint32(buffer) |
||||||
|
buffer = buffer[4:] |
||||||
|
|
||||||
|
tag.total = decodeInt64Splice(&buffer) |
||||||
|
tag.split = decodeInt64Splice(&buffer) |
||||||
|
tag.seen = decodeInt64Splice(&buffer) |
||||||
|
tag.stored = decodeInt64Splice(&buffer) |
||||||
|
tag.sent = decodeInt64Splice(&buffer) |
||||||
|
tag.synced = decodeInt64Splice(&buffer) |
||||||
|
|
||||||
|
t, n := binary.Varint(buffer) |
||||||
|
tag.startedAt = time.Unix(t, 0) |
||||||
|
buffer = buffer[n:] |
||||||
|
|
||||||
|
t, n = binary.Varint(buffer) |
||||||
|
buffer = buffer[n:] |
||||||
|
if t > 0 { |
||||||
|
tag.Address = buffer[:t] |
||||||
|
} |
||||||
|
tag.Name = string(buffer[t:]) |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func encodeInt64Append(buffer *[]byte, val int64) { |
||||||
|
intBuffer := make([]byte, 8) |
||||||
|
n := binary.PutVarint(intBuffer, val) |
||||||
|
*buffer = append(*buffer, intBuffer[:n]...) |
||||||
|
} |
||||||
|
|
||||||
|
func decodeInt64Splice(buffer *[]byte) int64 { |
||||||
|
val, n := binary.Varint((*buffer)) |
||||||
|
*buffer = (*buffer)[n:] |
||||||
|
return val |
||||||
|
} |
@ -0,0 +1,273 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package chunk |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"sync" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
) |
||||||
|
|
||||||
|
var ( |
||||||
|
allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced} |
||||||
|
) |
||||||
|
|
||||||
|
// TestTagSingleIncrements tests if Inc increments the tag state value
|
||||||
|
func TestTagSingleIncrements(t *testing.T) { |
||||||
|
tg := &Tag{total: 10} |
||||||
|
|
||||||
|
tc := []struct { |
||||||
|
state uint32 |
||||||
|
inc int |
||||||
|
expcount int64 |
||||||
|
exptotal int64 |
||||||
|
}{ |
||||||
|
{state: StateSplit, inc: 10, expcount: 10, exptotal: 10}, |
||||||
|
{state: StateStored, inc: 9, expcount: 9, exptotal: 9}, |
||||||
|
{state: StateSeen, inc: 1, expcount: 1, exptotal: 10}, |
||||||
|
{state: StateSent, inc: 9, expcount: 9, exptotal: 9}, |
||||||
|
{state: StateSynced, inc: 9, expcount: 9, exptotal: 9}, |
||||||
|
} |
||||||
|
|
||||||
|
for _, tc := range tc { |
||||||
|
for i := 0; i < tc.inc; i++ { |
||||||
|
tg.Inc(tc.state) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
for _, tc := range tc { |
||||||
|
if tg.Get(tc.state) != tc.expcount { |
||||||
|
t.Fatalf("not incremented") |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestTagStatus is a unit test to cover Tag.Status method functionality
|
||||||
|
func TestTagStatus(t *testing.T) { |
||||||
|
tg := &Tag{total: 10} |
||||||
|
tg.Inc(StateSeen) |
||||||
|
tg.Inc(StateSent) |
||||||
|
tg.Inc(StateSynced) |
||||||
|
|
||||||
|
for i := 0; i < 10; i++ { |
||||||
|
tg.Inc(StateSplit) |
||||||
|
tg.Inc(StateStored) |
||||||
|
} |
||||||
|
for _, v := range []struct { |
||||||
|
state State |
||||||
|
expVal int64 |
||||||
|
expTotal int64 |
||||||
|
}{ |
||||||
|
{state: StateStored, expVal: 10, expTotal: 10}, |
||||||
|
{state: StateSplit, expVal: 10, expTotal: 10}, |
||||||
|
{state: StateSeen, expVal: 1, expTotal: 10}, |
||||||
|
{state: StateSent, expVal: 1, expTotal: 9}, |
||||||
|
{state: StateSynced, expVal: 1, expTotal: 9}, |
||||||
|
} { |
||||||
|
val, total, err := tg.Status(v.state) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
if val != v.expVal { |
||||||
|
t.Fatalf("should be %d, got %d", v.expVal, val) |
||||||
|
} |
||||||
|
if total != v.expTotal { |
||||||
|
t.Fatalf("expected total to be %d, got %d", v.expTotal, total) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// tests ETA is precise
|
||||||
|
func TestTagETA(t *testing.T) { |
||||||
|
now := time.Now() |
||||||
|
maxDiff := 100000 // 100 microsecond
|
||||||
|
tg := &Tag{total: 10, startedAt: now} |
||||||
|
time.Sleep(100 * time.Millisecond) |
||||||
|
tg.Inc(StateSplit) |
||||||
|
eta, err := tg.ETA(StateSplit) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
diff := time.Until(eta) - 9*time.Since(now) |
||||||
|
if int(diff) > maxDiff { |
||||||
|
t.Fatalf("ETA is not precise, got diff %v > .1ms", diff) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestTagConcurrentIncrements tests Inc calls concurrently
|
||||||
|
func TestTagConcurrentIncrements(t *testing.T) { |
||||||
|
tg := &Tag{} |
||||||
|
n := 1000 |
||||||
|
wg := sync.WaitGroup{} |
||||||
|
wg.Add(5 * n) |
||||||
|
for _, f := range allStates { |
||||||
|
go func(f State) { |
||||||
|
for j := 0; j < n; j++ { |
||||||
|
go func() { |
||||||
|
tg.Inc(f) |
||||||
|
wg.Done() |
||||||
|
}() |
||||||
|
} |
||||||
|
}(f) |
||||||
|
} |
||||||
|
wg.Wait() |
||||||
|
for _, f := range allStates { |
||||||
|
v := tg.Get(f) |
||||||
|
if v != int64(n) { |
||||||
|
t.Fatalf("expected state %v to be %v, got %v", f, n, v) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
|
||||||
|
func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { |
||||||
|
ts := NewTags() |
||||||
|
n := 100 |
||||||
|
wg := sync.WaitGroup{} |
||||||
|
wg.Add(10 * 5 * n) |
||||||
|
for i := 0; i < 10; i++ { |
||||||
|
s := string([]byte{uint8(i)}) |
||||||
|
tag, err := ts.New(s, int64(n)) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
for _, f := range allStates { |
||||||
|
go func(tag *Tag, f State) { |
||||||
|
for j := 0; j < n; j++ { |
||||||
|
go func() { |
||||||
|
tag.Inc(f) |
||||||
|
wg.Done() |
||||||
|
}() |
||||||
|
} |
||||||
|
}(tag, f) |
||||||
|
} |
||||||
|
} |
||||||
|
wg.Wait() |
||||||
|
i := 0 |
||||||
|
ts.Range(func(k, v interface{}) bool { |
||||||
|
i++ |
||||||
|
uid := k.(uint32) |
||||||
|
for _, f := range allStates { |
||||||
|
tag, err := ts.Get(uid) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
stateVal := tag.Get(f) |
||||||
|
if stateVal != int64(n) { |
||||||
|
t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) |
||||||
|
} |
||||||
|
} |
||||||
|
return true |
||||||
|
|
||||||
|
}) |
||||||
|
if i != 10 { |
||||||
|
t.Fatal("not enough tagz") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the
|
||||||
|
// tag Address (byte slice) contains some arbitrary value
|
||||||
|
func TestMarshallingWithAddr(t *testing.T) { |
||||||
|
tg := NewTag(111, "test/tag", 10) |
||||||
|
tg.Address = []byte{0, 1, 2, 3, 4, 5, 6} |
||||||
|
|
||||||
|
for _, f := range allStates { |
||||||
|
tg.Inc(f) |
||||||
|
} |
||||||
|
|
||||||
|
b, err := tg.MarshalBinary() |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
|
||||||
|
unmarshalledTag := &Tag{} |
||||||
|
err = unmarshalledTag.UnmarshalBinary(b) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
|
||||||
|
if unmarshalledTag.Uid != tg.Uid { |
||||||
|
t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) |
||||||
|
} |
||||||
|
|
||||||
|
if unmarshalledTag.Name != tg.Name { |
||||||
|
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) |
||||||
|
} |
||||||
|
|
||||||
|
for _, state := range allStates { |
||||||
|
uv, tv := unmarshalledTag.Get(state), tg.Get(state) |
||||||
|
if uv != tv { |
||||||
|
t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
if unmarshalledTag.Total() != tg.Total() { |
||||||
|
t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) |
||||||
|
} |
||||||
|
|
||||||
|
if len(unmarshalledTag.Address) != len(tg.Address) { |
||||||
|
t.Fatalf("tag addresses length mismatch, want %d, got %d", len(tg.Address), len(unmarshalledTag.Address)) |
||||||
|
} |
||||||
|
|
||||||
|
if !bytes.Equal(unmarshalledTag.Address, tg.Address) { |
||||||
|
t.Fatalf("expected tag address to be %v got %v", unmarshalledTag.Address, tg.Address) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly
|
||||||
|
// when the tag Address (byte slice) is empty in this case
|
||||||
|
func TestMarshallingNoAddr(t *testing.T) { |
||||||
|
tg := NewTag(111, "test/tag", 10) |
||||||
|
for _, f := range allStates { |
||||||
|
tg.Inc(f) |
||||||
|
} |
||||||
|
|
||||||
|
b, err := tg.MarshalBinary() |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
|
||||||
|
unmarshalledTag := &Tag{} |
||||||
|
err = unmarshalledTag.UnmarshalBinary(b) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
|
||||||
|
if unmarshalledTag.Uid != tg.Uid { |
||||||
|
t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) |
||||||
|
} |
||||||
|
|
||||||
|
if unmarshalledTag.Name != tg.Name { |
||||||
|
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) |
||||||
|
} |
||||||
|
|
||||||
|
for _, state := range allStates { |
||||||
|
uv, tv := unmarshalledTag.Get(state), tg.Get(state) |
||||||
|
if uv != tv { |
||||||
|
t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
if unmarshalledTag.Total() != tg.Total() { |
||||||
|
t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) |
||||||
|
} |
||||||
|
|
||||||
|
if len(unmarshalledTag.Address) != len(tg.Address) { |
||||||
|
t.Fatalf("expected tag addresses to be equal length") |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,96 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package chunk |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"errors" |
||||||
|
"math/rand" |
||||||
|
"sync" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/sctx" |
||||||
|
) |
||||||
|
|
||||||
|
// Tags hold tag information indexed by a unique random uint32
|
||||||
|
type Tags struct { |
||||||
|
tags *sync.Map |
||||||
|
rng *rand.Rand |
||||||
|
} |
||||||
|
|
||||||
|
// NewTags creates a tags object
|
||||||
|
func NewTags() *Tags { |
||||||
|
return &Tags{ |
||||||
|
tags: &sync.Map{}, |
||||||
|
rng: rand.New(rand.NewSource(time.Now().Unix())), |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// New creates a new tag, stores it by the name and returns it
|
||||||
|
// it returns an error if the tag with this name already exists
|
||||||
|
func (ts *Tags) New(s string, total int64) (*Tag, error) { |
||||||
|
t := &Tag{ |
||||||
|
Uid: ts.rng.Uint32(), |
||||||
|
Name: s, |
||||||
|
startedAt: time.Now(), |
||||||
|
total: total, |
||||||
|
} |
||||||
|
if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { |
||||||
|
return nil, errExists |
||||||
|
} |
||||||
|
return t, nil |
||||||
|
} |
||||||
|
|
||||||
|
// All returns all existing tags in Tags' sync.Map
|
||||||
|
// Note that tags are returned in no particular order
|
||||||
|
func (ts *Tags) All() (t []*Tag) { |
||||||
|
ts.tags.Range(func(k, v interface{}) bool { |
||||||
|
t = append(t, v.(*Tag)) |
||||||
|
|
||||||
|
return true |
||||||
|
}) |
||||||
|
|
||||||
|
return t |
||||||
|
} |
||||||
|
|
||||||
|
// Get returns the undelying tag for the uid or an error if not found
|
||||||
|
func (ts *Tags) Get(uid uint32) (*Tag, error) { |
||||||
|
t, ok := ts.tags.Load(uid) |
||||||
|
if !ok { |
||||||
|
return nil, errors.New("tag not found") |
||||||
|
} |
||||||
|
return t.(*Tag), nil |
||||||
|
} |
||||||
|
|
||||||
|
// GetFromContext gets a tag from the tag uid stored in the context
|
||||||
|
func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) { |
||||||
|
uid := sctx.GetTag(ctx) |
||||||
|
t, ok := ts.tags.Load(uid) |
||||||
|
if !ok { |
||||||
|
return nil, errTagNotFound |
||||||
|
} |
||||||
|
return t.(*Tag), nil |
||||||
|
} |
||||||
|
|
||||||
|
// Range exposes sync.Map's iterator
|
||||||
|
func (ts *Tags) Range(fn func(k, v interface{}) bool) { |
||||||
|
ts.tags.Range(fn) |
||||||
|
} |
||||||
|
|
||||||
|
func (ts *Tags) Delete(k interface{}) { |
||||||
|
ts.tags.Delete(k) |
||||||
|
} |
@ -0,0 +1,48 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package chunk |
||||||
|
|
||||||
|
import "testing" |
||||||
|
|
||||||
|
func TestAll(t *testing.T) { |
||||||
|
ts := NewTags() |
||||||
|
|
||||||
|
ts.New("1", 1) |
||||||
|
ts.New("2", 1) |
||||||
|
|
||||||
|
all := ts.All() |
||||||
|
|
||||||
|
if len(all) != 2 { |
||||||
|
t.Fatalf("expected length to be 2 got %d", len(all)) |
||||||
|
} |
||||||
|
|
||||||
|
if n := all[0].Total(); n != 1 { |
||||||
|
t.Fatalf("expected tag 0 total to be 1 got %d", n) |
||||||
|
} |
||||||
|
|
||||||
|
if n := all[1].Total(); n != 1 { |
||||||
|
t.Fatalf("expected tag 1 total to be 1 got %d", n) |
||||||
|
} |
||||||
|
|
||||||
|
ts.New("3", 1) |
||||||
|
all = ts.All() |
||||||
|
|
||||||
|
if len(all) != 3 { |
||||||
|
t.Fatalf("expected length to be 3 got %d", len(all)) |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,309 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package stream |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"fmt" |
||||||
|
"reflect" |
||||||
|
"sort" |
||||||
|
"sync" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/node" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/enode" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" |
||||||
|
"github.com/ethereum/go-ethereum/swarm/chunk" |
||||||
|
"github.com/ethereum/go-ethereum/swarm/network" |
||||||
|
"github.com/ethereum/go-ethereum/swarm/network/simulation" |
||||||
|
"github.com/ethereum/go-ethereum/swarm/state" |
||||||
|
) |
||||||
|
|
||||||
|
// TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff
|
||||||
|
// function for various arguments.
|
||||||
|
func TestSyncSubscriptionsDiff(t *testing.T) { |
||||||
|
max := network.NewKadParams().MaxProxDisplay |
||||||
|
for _, tc := range []struct { |
||||||
|
po, prevDepth, newDepth int |
||||||
|
subBins, quitBins []int |
||||||
|
}{ |
||||||
|
{ |
||||||
|
po: 0, prevDepth: -1, newDepth: 0, |
||||||
|
subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 1, prevDepth: -1, newDepth: 0, |
||||||
|
subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 2, prevDepth: -1, newDepth: 0, |
||||||
|
subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 0, prevDepth: -1, newDepth: 1, |
||||||
|
subBins: []int{0}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 1, prevDepth: -1, newDepth: 1, |
||||||
|
subBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 2, prevDepth: -1, newDepth: 2, |
||||||
|
subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 3, prevDepth: -1, newDepth: 2, |
||||||
|
subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 1, prevDepth: -1, newDepth: 2, |
||||||
|
subBins: []int{1}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 0, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16
|
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 1, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16
|
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 0, prevDepth: 0, newDepth: 1, // 0-16 -> 0
|
||||||
|
quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 0, prevDepth: 0, newDepth: 2, // 0-16 -> 0
|
||||||
|
quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 1, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16
|
||||||
|
quitBins: []int{0}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 1, prevDepth: 1, newDepth: 0, // 1-16 -> 0-16
|
||||||
|
subBins: []int{0}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 4, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16
|
||||||
|
quitBins: []int{0}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16
|
||||||
|
quitBins: []int{0, 1, 2, 3}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4
|
||||||
|
quitBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 4, prevDepth: 5, newDepth: 0, // 4 -> 0-16
|
||||||
|
subBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
po: 4, prevDepth: 5, newDepth: 6, // 4 -> 4
|
||||||
|
}, |
||||||
|
} { |
||||||
|
subBins, quitBins := syncSubscriptionsDiff(tc.po, tc.prevDepth, tc.newDepth, max) |
||||||
|
if fmt.Sprint(subBins) != fmt.Sprint(tc.subBins) { |
||||||
|
t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got subBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, subBins, tc.subBins) |
||||||
|
} |
||||||
|
if fmt.Sprint(quitBins) != fmt.Sprint(tc.quitBins) { |
||||||
|
t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got quitBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, quitBins, tc.quitBins) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestUpdateSyncingSubscriptions validates that syncing subscriptions are correctly
|
||||||
|
// made on initial node connections and that subscriptions are correctly changed
|
||||||
|
// when kademlia neighbourhood depth is changed by connecting more nodes.
|
||||||
|
func TestUpdateSyncingSubscriptions(t *testing.T) { |
||||||
|
sim := simulation.New(map[string]simulation.ServiceFunc{ |
||||||
|
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { |
||||||
|
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) |
||||||
|
if err != nil { |
||||||
|
return nil, nil, err |
||||||
|
} |
||||||
|
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ |
||||||
|
SyncUpdateDelay: 100 * time.Millisecond, |
||||||
|
Syncing: SyncingAutoSubscribe, |
||||||
|
}, nil) |
||||||
|
cleanup = func() { |
||||||
|
r.Close() |
||||||
|
clean() |
||||||
|
} |
||||||
|
bucket.Store("bzz-address", addr) |
||||||
|
return r, cleanup, nil |
||||||
|
}, |
||||||
|
}) |
||||||
|
defer sim.Close() |
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) |
||||||
|
defer cancel() |
||||||
|
|
||||||
|
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { |
||||||
|
// initial nodes, first one as pivot center of the start
|
||||||
|
ids, err := sim.AddNodesAndConnectStar(10) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
// pivot values
|
||||||
|
pivotRegistryID := ids[0] |
||||||
|
pivotRegistry := sim.Service("streamer", pivotRegistryID).(*Registry) |
||||||
|
pivotKademlia := pivotRegistry.delivery.kad |
||||||
|
// nodes proximities from the pivot node
|
||||||
|
nodeProximities := make(map[string]int) |
||||||
|
for _, id := range ids[1:] { |
||||||
|
bzzAddr, ok := sim.NodeItem(id, "bzz-address") |
||||||
|
if !ok { |
||||||
|
t.Fatal("no bzz address for node") |
||||||
|
} |
||||||
|
nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) |
||||||
|
} |
||||||
|
// wait until sync subscriptions are done for all nodes
|
||||||
|
waitForSubscriptions(t, pivotRegistry, ids[1:]...) |
||||||
|
|
||||||
|
// check initial sync streams
|
||||||
|
err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
// add more nodes until the depth is changed
|
||||||
|
prevDepth := pivotKademlia.NeighbourhoodDepth() |
||||||
|
var noDepthChangeChecked bool // true it there was a check when no depth is changed
|
||||||
|
for { |
||||||
|
ids, err := sim.AddNodes(5) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
// add new nodes to sync subscriptions check
|
||||||
|
for _, id := range ids { |
||||||
|
bzzAddr, ok := sim.NodeItem(id, "bzz-address") |
||||||
|
if !ok { |
||||||
|
t.Fatal("no bzz address for node") |
||||||
|
} |
||||||
|
nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) |
||||||
|
} |
||||||
|
err = sim.Net.ConnectNodesStar(ids, pivotRegistryID) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
waitForSubscriptions(t, pivotRegistry, ids...) |
||||||
|
|
||||||
|
newDepth := pivotKademlia.NeighbourhoodDepth() |
||||||
|
// depth is not changed, check if streams are still correct
|
||||||
|
if newDepth == prevDepth { |
||||||
|
err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
noDepthChangeChecked = true |
||||||
|
} |
||||||
|
// do the final check when depth is changed and
|
||||||
|
// there has been at least one check
|
||||||
|
// for the case when depth is not changed
|
||||||
|
if newDepth != prevDepth && noDepthChangeChecked { |
||||||
|
// check sync streams for changed depth
|
||||||
|
return checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) |
||||||
|
} |
||||||
|
prevDepth = newDepth |
||||||
|
} |
||||||
|
}) |
||||||
|
if result.Error != nil { |
||||||
|
t.Fatal(result.Error) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// waitForSubscriptions is a test helper function that blocks until
|
||||||
|
// stream server subscriptions are established on the provided registry
|
||||||
|
// to the nodes with provided IDs.
|
||||||
|
func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) { |
||||||
|
t.Helper() |
||||||
|
|
||||||
|
for retries := 0; retries < 100; retries++ { |
||||||
|
subs := r.api.GetPeerServerSubscriptions() |
||||||
|
if allSubscribed(subs, ids) { |
||||||
|
return |
||||||
|
} |
||||||
|
time.Sleep(50 * time.Millisecond) |
||||||
|
} |
||||||
|
t.Fatalf("missing subscriptions") |
||||||
|
} |
||||||
|
|
||||||
|
// allSubscribed returns true if nodes with ids have subscriptions
|
||||||
|
// in provided subs map.
|
||||||
|
func allSubscribed(subs map[string][]string, ids []enode.ID) bool { |
||||||
|
for _, id := range ids { |
||||||
|
if s, ok := subs[id.String()]; !ok || len(s) == 0 { |
||||||
|
return false |
||||||
|
} |
||||||
|
} |
||||||
|
return true |
||||||
|
} |
||||||
|
|
||||||
|
// checkSyncStreamsWithRetry is calling checkSyncStreams with retries.
|
||||||
|
func checkSyncStreamsWithRetry(r *Registry, nodeProximities map[string]int) (err error) { |
||||||
|
for retries := 0; retries < 5; retries++ { |
||||||
|
err = checkSyncStreams(r, nodeProximities) |
||||||
|
if err == nil { |
||||||
|
return nil |
||||||
|
} |
||||||
|
time.Sleep(500 * time.Millisecond) |
||||||
|
} |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
// checkSyncStreams validates that registry contains expected sync
|
||||||
|
// subscriptions to nodes with proximities in a map nodeProximities.
|
||||||
|
func checkSyncStreams(r *Registry, nodeProximities map[string]int) error { |
||||||
|
depth := r.delivery.kad.NeighbourhoodDepth() |
||||||
|
maxPO := r.delivery.kad.MaxProxDisplay |
||||||
|
for id, po := range nodeProximities { |
||||||
|
wantStreams := syncStreams(po, depth, maxPO) |
||||||
|
gotStreams := nodeStreams(r, id) |
||||||
|
|
||||||
|
if r.getPeer(enode.HexID(id)) == nil { |
||||||
|
// ignore removed peer
|
||||||
|
continue |
||||||
|
} |
||||||
|
|
||||||
|
if !reflect.DeepEqual(gotStreams, wantStreams) { |
||||||
|
return fmt.Errorf("node %s got streams %v, want %v", id, gotStreams, wantStreams) |
||||||
|
} |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// syncStreams returns expected sync streams that need to be
|
||||||
|
// established between a node with kademlia neighbourhood depth
|
||||||
|
// and a node with proximity order po.
|
||||||
|
func syncStreams(po, depth, maxPO int) (streams []string) { |
||||||
|
start, end := syncBins(po, depth, maxPO) |
||||||
|
for bin := start; bin < end; bin++ { |
||||||
|
streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), false).String()) |
||||||
|
streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true).String()) |
||||||
|
} |
||||||
|
return streams |
||||||
|
} |
||||||
|
|
||||||
|
// nodeStreams returns stream server subscriptions on a registry
|
||||||
|
// to the peer with provided id.
|
||||||
|
func nodeStreams(r *Registry, id string) []string { |
||||||
|
streams := r.api.GetPeerServerSubscriptions()[id] |
||||||
|
sort.Strings(streams) |
||||||
|
return streams |
||||||
|
} |
@ -1,82 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package storage |
|
||||||
|
|
||||||
// this is a clone of an earlier state of the ethereum ethdb/database
|
|
||||||
// no need for queueing/caching
|
|
||||||
|
|
||||||
import ( |
|
||||||
"github.com/ethereum/go-ethereum/metrics" |
|
||||||
"github.com/syndtr/goleveldb/leveldb" |
|
||||||
"github.com/syndtr/goleveldb/leveldb/iterator" |
|
||||||
"github.com/syndtr/goleveldb/leveldb/opt" |
|
||||||
) |
|
||||||
|
|
||||||
const openFileLimit = 128 |
|
||||||
|
|
||||||
type LDBDatabase struct { |
|
||||||
db *leveldb.DB |
|
||||||
} |
|
||||||
|
|
||||||
func NewLDBDatabase(file string) (*LDBDatabase, error) { |
|
||||||
// Open the db
|
|
||||||
db, err := leveldb.OpenFile(file, &opt.Options{OpenFilesCacheCapacity: openFileLimit}) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
database := &LDBDatabase{db: db} |
|
||||||
|
|
||||||
return database, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (db *LDBDatabase) Put(key []byte, value []byte) error { |
|
||||||
metrics.GetOrRegisterCounter("ldbdatabase.put", nil).Inc(1) |
|
||||||
|
|
||||||
return db.db.Put(key, value, nil) |
|
||||||
} |
|
||||||
|
|
||||||
func (db *LDBDatabase) Get(key []byte) ([]byte, error) { |
|
||||||
metrics.GetOrRegisterCounter("ldbdatabase.get", nil).Inc(1) |
|
||||||
|
|
||||||
dat, err := db.db.Get(key, nil) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return dat, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (db *LDBDatabase) Delete(key []byte) error { |
|
||||||
return db.db.Delete(key, nil) |
|
||||||
} |
|
||||||
|
|
||||||
func (db *LDBDatabase) NewIterator() iterator.Iterator { |
|
||||||
metrics.GetOrRegisterCounter("ldbdatabase.newiterator", nil).Inc(1) |
|
||||||
|
|
||||||
return db.db.NewIterator(nil, nil) |
|
||||||
} |
|
||||||
|
|
||||||
func (db *LDBDatabase) Write(batch *leveldb.Batch) error { |
|
||||||
metrics.GetOrRegisterCounter("ldbdatabase.write", nil).Inc(1) |
|
||||||
|
|
||||||
return db.db.Write(batch, nil) |
|
||||||
} |
|
||||||
|
|
||||||
func (db *LDBDatabase) Close() { |
|
||||||
// Close the leveldb database
|
|
||||||
db.db.Close() |
|
||||||
} |
|
File diff suppressed because it is too large
Load Diff
@ -1,788 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package storage |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"context" |
|
||||||
"encoding/binary" |
|
||||||
"fmt" |
|
||||||
"io/ioutil" |
|
||||||
"os" |
|
||||||
"strconv" |
|
||||||
"strings" |
|
||||||
"testing" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common" |
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk" |
|
||||||
"github.com/ethereum/go-ethereum/swarm/log" |
|
||||||
"github.com/ethereum/go-ethereum/swarm/storage/mock/mem" |
|
||||||
"github.com/ethereum/go-ethereum/swarm/testutil" |
|
||||||
ldberrors "github.com/syndtr/goleveldb/leveldb/errors" |
|
||||||
) |
|
||||||
|
|
||||||
type testDbStore struct { |
|
||||||
*LDBStore |
|
||||||
dir string |
|
||||||
} |
|
||||||
|
|
||||||
func newTestDbStore(mock bool, trusted bool) (*testDbStore, func(), error) { |
|
||||||
dir, err := ioutil.TempDir("", "bzz-storage-test") |
|
||||||
if err != nil { |
|
||||||
return nil, func() {}, err |
|
||||||
} |
|
||||||
|
|
||||||
var db *LDBStore |
|
||||||
storeparams := NewDefaultStoreParams() |
|
||||||
params := NewLDBStoreParams(storeparams, dir) |
|
||||||
params.Po = testPoFunc |
|
||||||
|
|
||||||
if mock { |
|
||||||
globalStore := mem.NewGlobalStore() |
|
||||||
addr := common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed") |
|
||||||
mockStore := globalStore.NewNodeStore(addr) |
|
||||||
|
|
||||||
db, err = NewMockDbStore(params, mockStore) |
|
||||||
} else { |
|
||||||
db, err = NewLDBStore(params) |
|
||||||
} |
|
||||||
|
|
||||||
cleanup := func() { |
|
||||||
if db != nil { |
|
||||||
db.Close() |
|
||||||
} |
|
||||||
err = os.RemoveAll(dir) |
|
||||||
if err != nil { |
|
||||||
panic(fmt.Sprintf("db cleanup failed: %v", err)) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
return &testDbStore{db, dir}, cleanup, err |
|
||||||
} |
|
||||||
|
|
||||||
func testPoFunc(k Address) (ret uint8) { |
|
||||||
basekey := make([]byte, 32) |
|
||||||
return uint8(Proximity(basekey, k[:])) |
|
||||||
} |
|
||||||
|
|
||||||
func testDbStoreRandom(n int, mock bool, t *testing.T) { |
|
||||||
db, cleanup, err := newTestDbStore(mock, true) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
testStoreRandom(db, n, t) |
|
||||||
} |
|
||||||
|
|
||||||
func testDbStoreCorrect(n int, mock bool, t *testing.T) { |
|
||||||
db, cleanup, err := newTestDbStore(mock, false) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
testStoreCorrect(db, n, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestMarkAccessed(t *testing.T) { |
|
||||||
db, cleanup, err := newTestDbStore(false, true) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
|
|
||||||
h := GenerateRandomChunk(chunk.DefaultSize) |
|
||||||
|
|
||||||
db.Put(context.Background(), h) |
|
||||||
|
|
||||||
var index dpaDBIndex |
|
||||||
addr := h.Address() |
|
||||||
idxk := getIndexKey(addr) |
|
||||||
|
|
||||||
idata, err := db.db.Get(idxk) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
decodeIndex(idata, &index) |
|
||||||
|
|
||||||
if index.Access != 0 { |
|
||||||
t.Fatalf("Expected the access index to be %d, but it is %d", 0, index.Access) |
|
||||||
} |
|
||||||
|
|
||||||
db.MarkAccessed(addr) |
|
||||||
db.writeCurrentBatch() |
|
||||||
|
|
||||||
idata, err = db.db.Get(idxk) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
decodeIndex(idata, &index) |
|
||||||
|
|
||||||
if index.Access != 1 { |
|
||||||
t.Fatalf("Expected the access index to be %d, but it is %d", 1, index.Access) |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
func TestDbStoreRandom_1(t *testing.T) { |
|
||||||
testDbStoreRandom(1, false, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestDbStoreCorrect_1(t *testing.T) { |
|
||||||
testDbStoreCorrect(1, false, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestDbStoreRandom_1k(t *testing.T) { |
|
||||||
testDbStoreRandom(1000, false, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestDbStoreCorrect_1k(t *testing.T) { |
|
||||||
testDbStoreCorrect(1000, false, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestMockDbStoreRandom_1(t *testing.T) { |
|
||||||
testDbStoreRandom(1, true, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestMockDbStoreCorrect_1(t *testing.T) { |
|
||||||
testDbStoreCorrect(1, true, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestMockDbStoreRandom_1k(t *testing.T) { |
|
||||||
testDbStoreRandom(1000, true, t) |
|
||||||
} |
|
||||||
|
|
||||||
func TestMockDbStoreCorrect_1k(t *testing.T) { |
|
||||||
testDbStoreCorrect(1000, true, t) |
|
||||||
} |
|
||||||
|
|
||||||
func testDbStoreNotFound(t *testing.T, mock bool) { |
|
||||||
db, cleanup, err := newTestDbStore(mock, false) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
|
|
||||||
_, err = db.Get(context.TODO(), ZeroAddr) |
|
||||||
if err != ErrChunkNotFound { |
|
||||||
t.Errorf("Expected ErrChunkNotFound, got %v", err) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func TestDbStoreNotFound(t *testing.T) { |
|
||||||
testDbStoreNotFound(t, false) |
|
||||||
} |
|
||||||
func TestMockDbStoreNotFound(t *testing.T) { |
|
||||||
testDbStoreNotFound(t, true) |
|
||||||
} |
|
||||||
|
|
||||||
func testIterator(t *testing.T, mock bool) { |
|
||||||
var i int |
|
||||||
var poc uint |
|
||||||
chunkcount := 32 |
|
||||||
chunkkeys := NewAddressCollection(chunkcount) |
|
||||||
chunkkeysResults := NewAddressCollection(chunkcount) |
|
||||||
|
|
||||||
db, cleanup, err := newTestDbStore(mock, false) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
|
|
||||||
chunks := GenerateRandomChunks(chunk.DefaultSize, chunkcount) |
|
||||||
|
|
||||||
for i = 0; i < len(chunks); i++ { |
|
||||||
chunkkeys[i] = chunks[i].Address() |
|
||||||
err := db.Put(context.TODO(), chunks[i]) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("dbStore.Put failed: %v", err) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
for i = 0; i < len(chunkkeys); i++ { |
|
||||||
log.Trace(fmt.Sprintf("Chunk array pos %d/%d: '%v'", i, chunkcount, chunkkeys[i])) |
|
||||||
} |
|
||||||
i = 0 |
|
||||||
for poc = 0; poc <= 255; poc++ { |
|
||||||
err := db.SyncIterator(0, uint64(chunkkeys.Len()), uint8(poc), func(k Address, n uint64) bool { |
|
||||||
log.Trace(fmt.Sprintf("Got key %v number %d poc %d", k, n, uint8(poc))) |
|
||||||
chunkkeysResults[n] = k |
|
||||||
i++ |
|
||||||
return true |
|
||||||
}) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("Iterator call failed: %v", err) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
for i = 0; i < chunkcount; i++ { |
|
||||||
if !bytes.Equal(chunkkeys[i], chunkkeysResults[i]) { |
|
||||||
t.Fatalf("Chunk put #%d key '%v' does not match iterator's key '%v'", i, chunkkeys[i], chunkkeysResults[i]) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
func TestIterator(t *testing.T) { |
|
||||||
testIterator(t, false) |
|
||||||
} |
|
||||||
func TestMockIterator(t *testing.T) { |
|
||||||
testIterator(t, true) |
|
||||||
} |
|
||||||
|
|
||||||
func benchmarkDbStorePut(n int, mock bool, b *testing.B) { |
|
||||||
db, cleanup, err := newTestDbStore(mock, true) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
b.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
benchmarkStorePut(db, n, b) |
|
||||||
} |
|
||||||
|
|
||||||
func benchmarkDbStoreGet(n int, mock bool, b *testing.B) { |
|
||||||
db, cleanup, err := newTestDbStore(mock, true) |
|
||||||
defer cleanup() |
|
||||||
if err != nil { |
|
||||||
b.Fatalf("init dbStore failed: %v", err) |
|
||||||
} |
|
||||||
benchmarkStoreGet(db, n, b) |
|
||||||
} |
|
||||||
|
|
||||||
func BenchmarkDbStorePut_500(b *testing.B) { |
|
||||||
benchmarkDbStorePut(500, false, b) |
|
||||||
} |
|
||||||
|
|
||||||
func BenchmarkDbStoreGet_500(b *testing.B) { |
|
||||||
benchmarkDbStoreGet(500, false, b) |
|
||||||
} |
|
||||||
|
|
||||||
func BenchmarkMockDbStorePut_500(b *testing.B) { |
|
||||||
benchmarkDbStorePut(500, true, b) |
|
||||||
} |
|
||||||
|
|
||||||
func BenchmarkMockDbStoreGet_500(b *testing.B) { |
|
||||||
benchmarkDbStoreGet(500, true, b) |
|
||||||
} |
|
||||||
|
|
||||||
// TestLDBStoreWithoutCollectGarbage tests that we can put a number of random chunks in the LevelDB store, and
|
|
||||||
// retrieve them, provided we don't hit the garbage collection
|
|
||||||
func TestLDBStoreWithoutCollectGarbage(t *testing.T) { |
|
||||||
capacity := 50 |
|
||||||
n := 10 |
|
||||||
|
|
||||||
ldb, cleanup := newLDBStore(t) |
|
||||||
ldb.setCapacity(uint64(capacity)) |
|
||||||
defer cleanup() |
|
||||||
|
|
||||||
chunks, err := mputRandomChunks(ldb, n) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err.Error()) |
|
||||||
} |
|
||||||
|
|
||||||
log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) |
|
||||||
|
|
||||||
for _, ch := range chunks { |
|
||||||
ret, err := ldb.Get(context.TODO(), ch.Address()) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
if !bytes.Equal(ret.Data(), ch.Data()) { |
|
||||||
t.Fatal("expected to get the same data back, but got smth else") |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
if ldb.entryCnt != uint64(n) { |
|
||||||
t.Fatalf("expected entryCnt to be equal to %v, but got %v", n, ldb.entryCnt) |
|
||||||
} |
|
||||||
|
|
||||||
if ldb.accessCnt != uint64(2*n) { |
|
||||||
t.Fatalf("expected accessCnt to be equal to %v, but got %v", 2*n, ldb.accessCnt) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// TestLDBStoreCollectGarbage tests that we can put more chunks than LevelDB's capacity, and
|
|
||||||
// retrieve only some of them, because garbage collection must have partially cleared the store
|
|
||||||
// Also tests that we can delete chunks and that we can trigger garbage collection
|
|
||||||
func TestLDBStoreCollectGarbage(t *testing.T) { |
|
||||||
|
|
||||||
// below max ronud
|
|
||||||
initialCap := defaultMaxGCRound / 100 |
|
||||||
cap := initialCap / 2 |
|
||||||
t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage) |
|
||||||
|
|
||||||
if testutil.RaceEnabled { |
|
||||||
t.Skip("only the simplest case run as others are flaky with race") |
|
||||||
// Note: some tests fail consistently and even locally with `-race`
|
|
||||||
} |
|
||||||
|
|
||||||
t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage) |
|
||||||
|
|
||||||
// at max round
|
|
||||||
cap = initialCap |
|
||||||
t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage) |
|
||||||
t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage) |
|
||||||
|
|
||||||
// more than max around, not on threshold
|
|
||||||
cap = initialCap + 500 |
|
||||||
t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage) |
|
||||||
t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage) |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
func testLDBStoreCollectGarbage(t *testing.T) { |
|
||||||
params := strings.Split(t.Name(), "/") |
|
||||||
capacity, err := strconv.Atoi(params[2]) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
n, err := strconv.Atoi(params[3]) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
ldb, cleanup := newLDBStore(t) |
|
||||||
ldb.setCapacity(uint64(capacity)) |
|
||||||
defer cleanup() |
|
||||||
|
|
||||||
// retrieve the gc round target count for the db capacity
|
|
||||||
ldb.startGC(capacity) |
|
||||||
roundTarget := ldb.gc.target |
|
||||||
|
|
||||||
// split put counts to gc target count threshold, and wait for gc to finish in between
|
|
||||||
var allChunks []Chunk |
|
||||||
remaining := n |
|
||||||
for remaining > 0 { |
|
||||||
var putCount int |
|
||||||
if remaining < roundTarget { |
|
||||||
putCount = remaining |
|
||||||
} else { |
|
||||||
putCount = roundTarget |
|
||||||
} |
|
||||||
remaining -= putCount |
|
||||||
chunks, err := mputRandomChunks(ldb, putCount) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err.Error()) |
|
||||||
} |
|
||||||
allChunks = append(allChunks, chunks...) |
|
||||||
ldb.lock.RLock() |
|
||||||
log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n) |
|
||||||
ldb.lock.RUnlock() |
|
||||||
|
|
||||||
waitGc(ldb) |
|
||||||
} |
|
||||||
|
|
||||||
// attempt gets on all put chunks
|
|
||||||
var missing int |
|
||||||
for _, ch := range allChunks { |
|
||||||
ret, err := ldb.Get(context.TODO(), ch.Address()) |
|
||||||
if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { |
|
||||||
missing++ |
|
||||||
continue |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
if !bytes.Equal(ret.Data(), ch.Data()) { |
|
||||||
t.Fatal("expected to get the same data back, but got smth else") |
|
||||||
} |
|
||||||
|
|
||||||
log.Trace("got back chunk", "chunk", ret) |
|
||||||
} |
|
||||||
|
|
||||||
// all surplus chunks should be missing
|
|
||||||
expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget) |
|
||||||
if missing != expectMissing { |
|
||||||
t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", expectMissing, missing) |
|
||||||
} |
|
||||||
|
|
||||||
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) |
|
||||||
} |
|
||||||
|
|
||||||
// TestLDBStoreAddRemove tests that we can put and then delete a given chunk
|
|
||||||
func TestLDBStoreAddRemove(t *testing.T) { |
|
||||||
ldb, cleanup := newLDBStore(t) |
|
||||||
ldb.setCapacity(200) |
|
||||||
defer cleanup() |
|
||||||
|
|
||||||
n := 100 |
|
||||||
chunks, err := mputRandomChunks(ldb, n) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf(err.Error()) |
|
||||||
} |
|
||||||
|
|
||||||
for i := 0; i < n; i++ { |
|
||||||
// delete all even index chunks
|
|
||||||
if i%2 == 0 { |
|
||||||
ldb.Delete(chunks[i].Address()) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) |
|
||||||
|
|
||||||
for i := 0; i < n; i++ { |
|
||||||
ret, err := ldb.Get(context.TODO(), chunks[i].Address()) |
|
||||||
|
|
||||||
if i%2 == 0 { |
|
||||||
// expect even chunks to be missing
|
|
||||||
if err == nil { |
|
||||||
t.Fatal("expected chunk to be missing, but got no error") |
|
||||||
} |
|
||||||
} else { |
|
||||||
// expect odd chunks to be retrieved successfully
|
|
||||||
if err != nil { |
|
||||||
t.Fatalf("expected no error, but got %s", err) |
|
||||||
} |
|
||||||
|
|
||||||
if !bytes.Equal(ret.Data(), chunks[i].Data()) { |
|
||||||
t.Fatal("expected to get the same data back, but got smth else") |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func testLDBStoreRemoveThenCollectGarbage(t *testing.T) { |
|
||||||
t.Skip("flaky with -race flag") |
|
||||||
|
|
||||||
params := strings.Split(t.Name(), "/") |
|
||||||
capacity, err := strconv.Atoi(params[2]) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
n, err := strconv.Atoi(params[3]) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
ldb, cleanup := newLDBStore(t) |
|
||||||
defer cleanup() |
|
||||||
ldb.setCapacity(uint64(capacity)) |
|
||||||
|
|
||||||
// put capacity count number of chunks
|
|
||||||
chunks := make([]Chunk, n) |
|
||||||
for i := 0; i < n; i++ { |
|
||||||
c := GenerateRandomChunk(chunk.DefaultSize) |
|
||||||
chunks[i] = c |
|
||||||
log.Trace("generate random chunk", "idx", i, "chunk", c) |
|
||||||
} |
|
||||||
|
|
||||||
for i := 0; i < n; i++ { |
|
||||||
err := ldb.Put(context.TODO(), chunks[i]) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
waitGc(ldb) |
|
||||||
|
|
||||||
// delete all chunks
|
|
||||||
// (only count the ones actually deleted, the rest will have been gc'd)
|
|
||||||
deletes := 0 |
|
||||||
for i := 0; i < n; i++ { |
|
||||||
if ldb.Delete(chunks[i].Address()) == nil { |
|
||||||
deletes++ |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) |
|
||||||
|
|
||||||
if ldb.entryCnt != 0 { |
|
||||||
t.Fatalf("ldb.entrCnt expected 0 got %v", ldb.entryCnt) |
|
||||||
} |
|
||||||
|
|
||||||
// the manual deletes will have increased accesscnt, so we need to add this when we verify the current count
|
|
||||||
expAccessCnt := uint64(n) |
|
||||||
if ldb.accessCnt != expAccessCnt { |
|
||||||
t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.accessCnt) |
|
||||||
} |
|
||||||
|
|
||||||
// retrieve the gc round target count for the db capacity
|
|
||||||
ldb.startGC(capacity) |
|
||||||
roundTarget := ldb.gc.target |
|
||||||
|
|
||||||
remaining := n |
|
||||||
var puts int |
|
||||||
for remaining > 0 { |
|
||||||
var putCount int |
|
||||||
if remaining < roundTarget { |
|
||||||
putCount = remaining |
|
||||||
} else { |
|
||||||
putCount = roundTarget |
|
||||||
} |
|
||||||
remaining -= putCount |
|
||||||
for putCount > 0 { |
|
||||||
ldb.Put(context.TODO(), chunks[puts]) |
|
||||||
ldb.lock.RLock() |
|
||||||
log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n, "puts", puts, "remaining", remaining, "roundtarget", roundTarget) |
|
||||||
ldb.lock.RUnlock() |
|
||||||
puts++ |
|
||||||
putCount-- |
|
||||||
} |
|
||||||
|
|
||||||
waitGc(ldb) |
|
||||||
} |
|
||||||
|
|
||||||
// expect first surplus chunks to be missing, because they have the smallest access value
|
|
||||||
expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget) |
|
||||||
for i := 0; i < expectMissing; i++ { |
|
||||||
_, err := ldb.Get(context.TODO(), chunks[i].Address()) |
|
||||||
if err == nil { |
|
||||||
t.Fatalf("expected surplus chunk %d to be missing, but got no error", i) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// expect last chunks to be present, as they have the largest access value
|
|
||||||
for i := expectMissing; i < n; i++ { |
|
||||||
ret, err := ldb.Get(context.TODO(), chunks[i].Address()) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("chunk %v: expected no error, but got %s", i, err) |
|
||||||
} |
|
||||||
if !bytes.Equal(ret.Data(), chunks[i].Data()) { |
|
||||||
t.Fatal("expected to get the same data back, but got smth else") |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// TestLDBStoreCollectGarbageAccessUnlikeIndex tests garbage collection where accesscount differs from indexcount
|
|
||||||
func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) { |
|
||||||
|
|
||||||
capacity := defaultMaxGCRound / 100 * 2 |
|
||||||
n := capacity - 1 |
|
||||||
|
|
||||||
ldb, cleanup := newLDBStore(t) |
|
||||||
ldb.setCapacity(uint64(capacity)) |
|
||||||
defer cleanup() |
|
||||||
|
|
||||||
chunks, err := mputRandomChunks(ldb, n) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err.Error()) |
|
||||||
} |
|
||||||
log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) |
|
||||||
|
|
||||||
// set first added capacity/2 chunks to highest accesscount
|
|
||||||
for i := 0; i < capacity/2; i++ { |
|
||||||
_, err := ldb.Get(context.TODO(), chunks[i].Address()) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("fail add chunk #%d - %s: %v", i, chunks[i].Address(), err) |
|
||||||
} |
|
||||||
} |
|
||||||
_, err = mputRandomChunks(ldb, 2) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err.Error()) |
|
||||||
} |
|
||||||
|
|
||||||
// wait for garbage collection to kick in on the responsible actor
|
|
||||||
waitGc(ldb) |
|
||||||
|
|
||||||
var missing int |
|
||||||
for i, ch := range chunks[2 : capacity/2] { |
|
||||||
ret, err := ldb.Get(context.TODO(), ch.Address()) |
|
||||||
if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { |
|
||||||
t.Fatalf("fail find chunk #%d - %s: %v", i, ch.Address(), err) |
|
||||||
} |
|
||||||
|
|
||||||
if !bytes.Equal(ret.Data(), ch.Data()) { |
|
||||||
t.Fatal("expected to get the same data back, but got smth else") |
|
||||||
} |
|
||||||
log.Trace("got back chunk", "chunk", ret) |
|
||||||
} |
|
||||||
|
|
||||||
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) |
|
||||||
} |
|
||||||
|
|
||||||
func TestCleanIndex(t *testing.T) { |
|
||||||
if testutil.RaceEnabled { |
|
||||||
t.Skip("disabled because it times out with race detector") |
|
||||||
} |
|
||||||
|
|
||||||
capacity := 5000 |
|
||||||
n := 3 |
|
||||||
|
|
||||||
ldb, cleanup := newLDBStore(t) |
|
||||||
ldb.setCapacity(uint64(capacity)) |
|
||||||
defer cleanup() |
|
||||||
|
|
||||||
chunks, err := mputRandomChunks(ldb, n) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
// remove the data of the first chunk
|
|
||||||
po := ldb.po(chunks[0].Address()[:]) |
|
||||||
dataKey := make([]byte, 10) |
|
||||||
dataKey[0] = keyData |
|
||||||
dataKey[1] = byte(po) |
|
||||||
// dataKey[2:10] = first chunk has storageIdx 0 on [2:10]
|
|
||||||
if _, err := ldb.db.Get(dataKey); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
if err := ldb.db.Delete(dataKey); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
// remove the gc index row for the first chunk
|
|
||||||
gcFirstCorrectKey := make([]byte, 9) |
|
||||||
gcFirstCorrectKey[0] = keyGCIdx |
|
||||||
if err := ldb.db.Delete(gcFirstCorrectKey); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
// warp the gc data of the second chunk
|
|
||||||
// this data should be correct again after the clean
|
|
||||||
gcSecondCorrectKey := make([]byte, 9) |
|
||||||
gcSecondCorrectKey[0] = keyGCIdx |
|
||||||
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1)) |
|
||||||
gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1) |
|
||||||
copy(warpedGCVal[1:], gcSecondCorrectVal) |
|
||||||
if err := ldb.db.Delete(gcSecondCorrectKey); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
if err := ldb.CleanGCIndex(); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
// the index without corresponding data should have been deleted
|
|
||||||
idxKey := make([]byte, 33) |
|
||||||
idxKey[0] = keyIndex |
|
||||||
copy(idxKey[1:], chunks[0].Address()) |
|
||||||
if _, err := ldb.db.Get(idxKey); err == nil { |
|
||||||
t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
// the two other indices should be present
|
|
||||||
copy(idxKey[1:], chunks[1].Address()) |
|
||||||
if _, err := ldb.db.Get(idxKey); err != nil { |
|
||||||
t.Fatalf("expected chunk 1 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
copy(idxKey[1:], chunks[2].Address()) |
|
||||||
if _, err := ldb.db.Get(idxKey); err != nil { |
|
||||||
t.Fatalf("expected chunk 2 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
// first gc index should still be gone
|
|
||||||
if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil { |
|
||||||
t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
// second gc index should still be fixed
|
|
||||||
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil { |
|
||||||
t.Fatalf("expected gc 1 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
// third gc index should be unchanged
|
|
||||||
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2)) |
|
||||||
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil { |
|
||||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
c, err := ldb.db.Get(keyEntryCnt) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
|
|
||||||
// entrycount should now be one less
|
|
||||||
entryCount := binary.BigEndian.Uint64(c) |
|
||||||
if entryCount != 2 { |
|
||||||
t.Fatalf("expected entrycnt to be 2, was %d", c) |
|
||||||
} |
|
||||||
|
|
||||||
// the chunks might accidentally be in the same bin
|
|
||||||
// if so that bin counter will now be 2 - the highest added index.
|
|
||||||
// if not, the total of them will be 3
|
|
||||||
poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())} |
|
||||||
if poBins[0] == poBins[1] { |
|
||||||
poBins = poBins[:1] |
|
||||||
} |
|
||||||
|
|
||||||
var binTotal uint64 |
|
||||||
var currentBin [2]byte |
|
||||||
currentBin[0] = keyDistanceCnt |
|
||||||
if len(poBins) == 1 { |
|
||||||
currentBin[1] = poBins[0] |
|
||||||
c, err := ldb.db.Get(currentBin[:]) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
binCount := binary.BigEndian.Uint64(c) |
|
||||||
if binCount != 2 { |
|
||||||
t.Fatalf("expected entrycnt to be 2, was %d", binCount) |
|
||||||
} |
|
||||||
} else { |
|
||||||
for _, bin := range poBins { |
|
||||||
currentBin[1] = bin |
|
||||||
c, err := ldb.db.Get(currentBin[:]) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
binCount := binary.BigEndian.Uint64(c) |
|
||||||
binTotal += binCount |
|
||||||
|
|
||||||
} |
|
||||||
if binTotal != 3 { |
|
||||||
t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// check that the iterator quits properly
|
|
||||||
chunks, err = mputRandomChunks(ldb, 4100) |
|
||||||
if err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
po = ldb.po(chunks[4099].Address()[:]) |
|
||||||
dataKey = make([]byte, 10) |
|
||||||
dataKey[0] = keyData |
|
||||||
dataKey[1] = byte(po) |
|
||||||
binary.BigEndian.PutUint64(dataKey[2:], 4099+3) |
|
||||||
if _, err := ldb.db.Get(dataKey); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
if err := ldb.db.Delete(dataKey); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
if err := ldb.CleanGCIndex(); err != nil { |
|
||||||
t.Fatal(err) |
|
||||||
} |
|
||||||
|
|
||||||
// entrycount should now be one less of added chunks
|
|
||||||
c, err = ldb.db.Get(keyEntryCnt) |
|
||||||
if err != nil { |
|
||||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey) |
|
||||||
} |
|
||||||
entryCount = binary.BigEndian.Uint64(c) |
|
||||||
if entryCount != 4099+2 { |
|
||||||
t.Fatalf("expected entrycnt to be 2, was %d", c) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Note: waitGc does not guarantee that we wait 1 GC round; it only
|
|
||||||
// guarantees that if the GC is running we wait for that run to finish
|
|
||||||
// ticket: https://github.com/ethersphere/go-ethereum/issues/1151
|
|
||||||
func waitGc(ldb *LDBStore) { |
|
||||||
<-ldb.gc.runC |
|
||||||
ldb.gc.runC <- struct{}{} |
|
||||||
} |
|
@ -1,251 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package storage |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"path/filepath" |
|
||||||
"sync" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/metrics" |
|
||||||
"github.com/ethereum/go-ethereum/swarm/log" |
|
||||||
"github.com/ethereum/go-ethereum/swarm/storage/mock" |
|
||||||
) |
|
||||||
|
|
||||||
type LocalStoreParams struct { |
|
||||||
*StoreParams |
|
||||||
ChunkDbPath string |
|
||||||
Validators []ChunkValidator `toml:"-"` |
|
||||||
} |
|
||||||
|
|
||||||
func NewDefaultLocalStoreParams() *LocalStoreParams { |
|
||||||
return &LocalStoreParams{ |
|
||||||
StoreParams: NewDefaultStoreParams(), |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
//this can only finally be set after all config options (file, cmd line, env vars)
|
|
||||||
//have been evaluated
|
|
||||||
func (p *LocalStoreParams) Init(path string) { |
|
||||||
if p.ChunkDbPath == "" { |
|
||||||
p.ChunkDbPath = filepath.Join(path, "chunks") |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// LocalStore is a combination of inmemory db over a disk persisted db
|
|
||||||
// implements a Get/Put with fallback (caching) logic using any 2 ChunkStores
|
|
||||||
type LocalStore struct { |
|
||||||
Validators []ChunkValidator |
|
||||||
memStore *MemStore |
|
||||||
DbStore *LDBStore |
|
||||||
mu sync.Mutex |
|
||||||
} |
|
||||||
|
|
||||||
// This constructor uses MemStore and DbStore as components
|
|
||||||
func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalStore, error) { |
|
||||||
ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath) |
|
||||||
dbStore, err := NewMockDbStore(ldbparams, mockStore) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return &LocalStore{ |
|
||||||
memStore: NewMemStore(params.StoreParams, dbStore), |
|
||||||
DbStore: dbStore, |
|
||||||
Validators: params.Validators, |
|
||||||
}, nil |
|
||||||
} |
|
||||||
|
|
||||||
func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { |
|
||||||
ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath) |
|
||||||
dbStore, err := NewLDBStore(ldbparams) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
localStore := &LocalStore{ |
|
||||||
memStore: NewMemStore(params.StoreParams, dbStore), |
|
||||||
DbStore: dbStore, |
|
||||||
Validators: params.Validators, |
|
||||||
} |
|
||||||
return localStore, nil |
|
||||||
} |
|
||||||
|
|
||||||
// isValid returns true if chunk passes any of the LocalStore Validators.
|
|
||||||
// isValid also returns true if LocalStore has no Validators.
|
|
||||||
func (ls *LocalStore) isValid(chunk Chunk) bool { |
|
||||||
// by default chunks are valid. if we have 0 validators, then all chunks are valid.
|
|
||||||
valid := true |
|
||||||
|
|
||||||
// ls.Validators contains a list of one validator per chunk type.
|
|
||||||
// if one validator succeeds, then the chunk is valid
|
|
||||||
for _, v := range ls.Validators { |
|
||||||
if valid = v.Validate(chunk); valid { |
|
||||||
break |
|
||||||
} |
|
||||||
} |
|
||||||
return valid |
|
||||||
} |
|
||||||
|
|
||||||
// Put is responsible for doing validation and storage of the chunk
|
|
||||||
// by using configured ChunkValidators, MemStore and LDBStore.
|
|
||||||
// If the chunk is not valid, its GetErrored function will
|
|
||||||
// return ErrChunkInvalid.
|
|
||||||
// This method will check if the chunk is already in the MemStore
|
|
||||||
// and it will return it if it is. If there is an error from
|
|
||||||
// the MemStore.Get, it will be returned by calling GetErrored
|
|
||||||
// on the chunk.
|
|
||||||
// This method is responsible for closing Chunk.ReqC channel
|
|
||||||
// when the chunk is stored in memstore.
|
|
||||||
// After the LDBStore.Put, it is ensured that the MemStore
|
|
||||||
// contains the chunk with the same data, but nil ReqC channel.
|
|
||||||
func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error { |
|
||||||
if !ls.isValid(chunk) { |
|
||||||
return ErrChunkInvalid |
|
||||||
} |
|
||||||
|
|
||||||
log.Trace("localstore.put", "key", chunk.Address()) |
|
||||||
ls.mu.Lock() |
|
||||||
defer ls.mu.Unlock() |
|
||||||
|
|
||||||
_, err := ls.memStore.Get(ctx, chunk.Address()) |
|
||||||
if err == nil { |
|
||||||
return nil |
|
||||||
} |
|
||||||
if err != nil && err != ErrChunkNotFound { |
|
||||||
return err |
|
||||||
} |
|
||||||
ls.memStore.Put(ctx, chunk) |
|
||||||
err = ls.DbStore.Put(ctx, chunk) |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
// Has queries the underlying DbStore if a chunk with the given address
|
|
||||||
// is being stored there.
|
|
||||||
// Returns true if it is stored, false if not
|
|
||||||
func (ls *LocalStore) Has(ctx context.Context, addr Address) bool { |
|
||||||
return ls.DbStore.Has(ctx, addr) |
|
||||||
} |
|
||||||
|
|
||||||
// Get(chunk *Chunk) looks up a chunk in the local stores
|
|
||||||
// This method is blocking until the chunk is retrieved
|
|
||||||
// so additional timeout may be needed to wrap this call if
|
|
||||||
// ChunkStores are remote and can have long latency
|
|
||||||
func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) { |
|
||||||
ls.mu.Lock() |
|
||||||
defer ls.mu.Unlock() |
|
||||||
|
|
||||||
return ls.get(ctx, addr) |
|
||||||
} |
|
||||||
|
|
||||||
func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) { |
|
||||||
chunk, err = ls.memStore.Get(ctx, addr) |
|
||||||
|
|
||||||
if err != nil && err != ErrChunkNotFound { |
|
||||||
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
if err == nil { |
|
||||||
metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) |
|
||||||
go ls.DbStore.MarkAccessed(addr) |
|
||||||
return chunk, nil |
|
||||||
} |
|
||||||
|
|
||||||
metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) |
|
||||||
chunk, err = ls.DbStore.Get(ctx, addr) |
|
||||||
if err != nil { |
|
||||||
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
ls.memStore.Put(ctx, chunk) |
|
||||||
return chunk, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error { |
|
||||||
ls.mu.Lock() |
|
||||||
defer ls.mu.Unlock() |
|
||||||
|
|
||||||
_, err := ls.get(ctx, addr) |
|
||||||
if err == nil { |
|
||||||
return nil |
|
||||||
} |
|
||||||
return func(context.Context) error { |
|
||||||
return err |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func (ls *LocalStore) BinIndex(po uint8) uint64 { |
|
||||||
return ls.DbStore.BinIndex(po) |
|
||||||
} |
|
||||||
|
|
||||||
func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { |
|
||||||
return ls.DbStore.SyncIterator(from, to, po, f) |
|
||||||
} |
|
||||||
|
|
||||||
// Close the local store
|
|
||||||
func (ls *LocalStore) Close() { |
|
||||||
ls.DbStore.Close() |
|
||||||
} |
|
||||||
|
|
||||||
// Migrate checks the datastore schema vs the runtime schema and runs
|
|
||||||
// migrations if they don't match
|
|
||||||
func (ls *LocalStore) Migrate() error { |
|
||||||
actualDbSchema, err := ls.DbStore.GetSchema() |
|
||||||
if err != nil { |
|
||||||
log.Error(err.Error()) |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
if actualDbSchema == CurrentDbSchema { |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema) |
|
||||||
|
|
||||||
if actualDbSchema == DbSchemaNone { |
|
||||||
ls.migrateFromNoneToPurity() |
|
||||||
actualDbSchema = DbSchemaPurity |
|
||||||
} |
|
||||||
|
|
||||||
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
if actualDbSchema == DbSchemaPurity { |
|
||||||
if err := ls.migrateFromPurityToHalloween(); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
actualDbSchema = DbSchemaHalloween |
|
||||||
} |
|
||||||
|
|
||||||
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (ls *LocalStore) migrateFromNoneToPurity() { |
|
||||||
// delete chunks that are not valid, i.e. chunks that do not pass
|
|
||||||
// any of the ls.Validators
|
|
||||||
ls.DbStore.Cleanup(func(c Chunk) bool { |
|
||||||
return !ls.isValid(c) |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
func (ls *LocalStore) migrateFromPurityToHalloween() error { |
|
||||||
return ls.DbStore.CleanGCIndex() |
|
||||||
} |
|
@ -0,0 +1,204 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package localstore |
||||||
|
|
||||||
|
import ( |
||||||
|
"archive/tar" |
||||||
|
"context" |
||||||
|
"encoding/hex" |
||||||
|
"fmt" |
||||||
|
"io" |
||||||
|
"io/ioutil" |
||||||
|
"sync" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/chunk" |
||||||
|
"github.com/ethereum/go-ethereum/swarm/log" |
||||||
|
"github.com/ethereum/go-ethereum/swarm/shed" |
||||||
|
) |
||||||
|
|
||||||
|
const ( |
||||||
|
// filename in tar archive that holds the information
|
||||||
|
// about exported data format version
|
||||||
|
exportVersionFilename = ".swarm-export-version" |
||||||
|
// legacy version for previous LDBStore
|
||||||
|
legacyExportVersion = "1" |
||||||
|
// current export format version
|
||||||
|
currentExportVersion = "2" |
||||||
|
) |
||||||
|
|
||||||
|
// Export writes a tar structured data to the writer of
|
||||||
|
// all chunks in the retrieval data index. It returns the
|
||||||
|
// number of chunks exported.
|
||||||
|
func (db *DB) Export(w io.Writer) (count int64, err error) { |
||||||
|
tw := tar.NewWriter(w) |
||||||
|
defer tw.Close() |
||||||
|
|
||||||
|
if err := tw.WriteHeader(&tar.Header{ |
||||||
|
Name: exportVersionFilename, |
||||||
|
Mode: 0644, |
||||||
|
Size: int64(len(currentExportVersion)), |
||||||
|
}); err != nil { |
||||||
|
return 0, err |
||||||
|
} |
||||||
|
if _, err := tw.Write([]byte(currentExportVersion)); err != nil { |
||||||
|
return 0, err |
||||||
|
} |
||||||
|
|
||||||
|
err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { |
||||||
|
hdr := &tar.Header{ |
||||||
|
Name: hex.EncodeToString(item.Address), |
||||||
|
Mode: 0644, |
||||||
|
Size: int64(len(item.Data)), |
||||||
|
} |
||||||
|
if err := tw.WriteHeader(hdr); err != nil { |
||||||
|
return false, err |
||||||
|
} |
||||||
|
if _, err := tw.Write(item.Data); err != nil { |
||||||
|
return false, err |
||||||
|
} |
||||||
|
count++ |
||||||
|
return false, nil |
||||||
|
}, nil) |
||||||
|
|
||||||
|
return count, err |
||||||
|
} |
||||||
|
|
||||||
|
// Import reads a tar structured data from the reader and
|
||||||
|
// stores chunks in the database. It returns the number of
|
||||||
|
// chunks imported.
|
||||||
|
func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { |
||||||
|
tr := tar.NewReader(r) |
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background()) |
||||||
|
defer cancel() |
||||||
|
|
||||||
|
errC := make(chan error) |
||||||
|
doneC := make(chan struct{}) |
||||||
|
tokenPool := make(chan struct{}, 100) |
||||||
|
var wg sync.WaitGroup |
||||||
|
go func() { |
||||||
|
var ( |
||||||
|
firstFile = true |
||||||
|
// if exportVersionFilename file is not present
|
||||||
|
// assume legacy version
|
||||||
|
version = legacyExportVersion |
||||||
|
) |
||||||
|
for { |
||||||
|
hdr, err := tr.Next() |
||||||
|
if err != nil { |
||||||
|
if err == io.EOF { |
||||||
|
break |
||||||
|
} |
||||||
|
select { |
||||||
|
case errC <- err: |
||||||
|
case <-ctx.Done(): |
||||||
|
} |
||||||
|
} |
||||||
|
if firstFile { |
||||||
|
firstFile = false |
||||||
|
if hdr.Name == exportVersionFilename { |
||||||
|
data, err := ioutil.ReadAll(tr) |
||||||
|
if err != nil { |
||||||
|
select { |
||||||
|
case errC <- err: |
||||||
|
case <-ctx.Done(): |
||||||
|
} |
||||||
|
} |
||||||
|
version = string(data) |
||||||
|
continue |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
if len(hdr.Name) != 64 { |
||||||
|
log.Warn("ignoring non-chunk file", "name", hdr.Name) |
||||||
|
continue |
||||||
|
} |
||||||
|
|
||||||
|
keybytes, err := hex.DecodeString(hdr.Name) |
||||||
|
if err != nil { |
||||||
|
log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) |
||||||
|
continue |
||||||
|
} |
||||||
|
|
||||||
|
data, err := ioutil.ReadAll(tr) |
||||||
|
if err != nil { |
||||||
|
select { |
||||||
|
case errC <- err: |
||||||
|
case <-ctx.Done(): |
||||||
|
} |
||||||
|
} |
||||||
|
key := chunk.Address(keybytes) |
||||||
|
|
||||||
|
var ch chunk.Chunk |
||||||
|
switch version { |
||||||
|
case legacyExportVersion: |
||||||
|
// LDBStore Export exported chunk data prefixed with the chunk key.
|
||||||
|
// That is not necessary, as the key is in the chunk filename,
|
||||||
|
// but backward compatibility needs to be preserved.
|
||||||
|
ch = chunk.NewChunk(key, data[32:]) |
||||||
|
case currentExportVersion: |
||||||
|
ch = chunk.NewChunk(key, data) |
||||||
|
default: |
||||||
|
select { |
||||||
|
case errC <- fmt.Errorf("unsupported export data version %q", version): |
||||||
|
case <-ctx.Done(): |
||||||
|
} |
||||||
|
} |
||||||
|
tokenPool <- struct{}{} |
||||||
|
wg.Add(1) |
||||||
|
|
||||||
|
go func() { |
||||||
|
_, err := db.Put(ctx, chunk.ModePutUpload, ch) |
||||||
|
select { |
||||||
|
case errC <- err: |
||||||
|
case <-ctx.Done(): |
||||||
|
wg.Done() |
||||||
|
<-tokenPool |
||||||
|
default: |
||||||
|
_, err := db.Put(ctx, chunk.ModePutUpload, ch) |
||||||
|
if err != nil { |
||||||
|
errC <- err |
||||||
|
} |
||||||
|
wg.Done() |
||||||
|
<-tokenPool |
||||||
|
} |
||||||
|
}() |
||||||
|
|
||||||
|
count++ |
||||||
|
} |
||||||
|
wg.Wait() |
||||||
|
close(doneC) |
||||||
|
}() |
||||||
|
|
||||||
|
// wait for all chunks to be stored
|
||||||
|
for { |
||||||
|
select { |
||||||
|
case err := <-errC: |
||||||
|
if err != nil { |
||||||
|
return count, err |
||||||
|
} |
||||||
|
case <-ctx.Done(): |
||||||
|
return count, ctx.Err() |
||||||
|
default: |
||||||
|
select { |
||||||
|
case <-doneC: |
||||||
|
return count, nil |
||||||
|
default: |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,80 @@ |
|||||||
|
// Copyright 2019 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package localstore |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"context" |
||||||
|
"testing" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/chunk" |
||||||
|
) |
||||||
|
|
||||||
|
// TestExportImport constructs two databases, one to put and export
|
||||||
|
// chunks and another one to import and validate that all chunks are
|
||||||
|
// imported.
|
||||||
|
func TestExportImport(t *testing.T) { |
||||||
|
db1, cleanup1 := newTestDB(t, nil) |
||||||
|
defer cleanup1() |
||||||
|
|
||||||
|
var chunkCount = 100 |
||||||
|
|
||||||
|
chunks := make(map[string][]byte, chunkCount) |
||||||
|
for i := 0; i < chunkCount; i++ { |
||||||
|
ch := generateTestRandomChunk() |
||||||
|
|
||||||
|
_, err := db1.Put(context.Background(), chunk.ModePutUpload, ch) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
chunks[string(ch.Address())] = ch.Data() |
||||||
|
} |
||||||
|
|
||||||
|
var buf bytes.Buffer |
||||||
|
|
||||||
|
c, err := db1.Export(&buf) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
wantChunksCount := int64(len(chunks)) |
||||||
|
if c != wantChunksCount { |
||||||
|
t.Errorf("got export count %v, want %v", c, wantChunksCount) |
||||||
|
} |
||||||
|
|
||||||
|
db2, cleanup2 := newTestDB(t, nil) |
||||||
|
defer cleanup2() |
||||||
|
|
||||||
|
c, err = db2.Import(&buf, false) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
if c != wantChunksCount { |
||||||
|
t.Errorf("got import count %v, want %v", c, wantChunksCount) |
||||||
|
} |
||||||
|
|
||||||
|
for a, want := range chunks { |
||||||
|
addr := chunk.Address([]byte(a)) |
||||||
|
ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr) |
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
got := ch.Data() |
||||||
|
if !bytes.Equal(got, want) { |
||||||
|
t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,52 @@ |
|||||||
|
package localstore |
||||||
|
|
||||||
|
import ( |
||||||
|
"github.com/ethereum/go-ethereum/swarm/log" |
||||||
|
"github.com/syndtr/goleveldb/leveldb" |
||||||
|
"github.com/syndtr/goleveldb/leveldb/opt" |
||||||
|
) |
||||||
|
|
||||||
|
// The DB schema we want to use. The actual/current DB schema might differ
|
||||||
|
// until migrations are run.
|
||||||
|
const CurrentDbSchema = DbSchemaSanctuary |
||||||
|
|
||||||
|
// There was a time when we had no schema at all.
|
||||||
|
const DbSchemaNone = "" |
||||||
|
|
||||||
|
// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
|
||||||
|
const DbSchemaPurity = "purity" |
||||||
|
|
||||||
|
// "halloween" is here because we had a screw in the garbage collector index.
|
||||||
|
// Because of that we had to rebuild the GC index to get rid of erroneous
|
||||||
|
// entries and that takes a long time. This schema is used for bookkeeping,
|
||||||
|
// so rebuild index will run just once.
|
||||||
|
const DbSchemaHalloween = "halloween" |
||||||
|
|
||||||
|
const DbSchemaSanctuary = "sanctuary" |
||||||
|
|
||||||
|
// returns true if legacy database is in the datadir
|
||||||
|
func IsLegacyDatabase(datadir string) bool { |
||||||
|
|
||||||
|
var ( |
||||||
|
legacyDbSchemaKey = []byte{8} |
||||||
|
) |
||||||
|
|
||||||
|
db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128}) |
||||||
|
if err != nil { |
||||||
|
log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err) |
||||||
|
return false |
||||||
|
} |
||||||
|
defer db.Close() |
||||||
|
|
||||||
|
data, err := db.Get(legacyDbSchemaKey, nil) |
||||||
|
if err != nil { |
||||||
|
if err == leveldb.ErrNotFound { |
||||||
|
// if we haven't found anything under the legacy db schema key- we are not on legacy
|
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
log.Error("got an unexpected error fetching legacy name from the database", "err", err) |
||||||
|
} |
||||||
|
log.Trace("checking if database scheme is legacy", "schema name", string(data)) |
||||||
|
return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity |
||||||
|
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue