Update goleveldb

Jakob Borg 2014-12-29 12:22:58 +01:00
parent d90b2c1d52
commit 8c7f1421c6
20 changed files with 1451 additions and 1395 deletions

Godeps/Godeps.json (generated)

@@ -1,6 +1,6 @@
{ {
"ImportPath": "github.com/syncthing/syncthing", "ImportPath": "github.com/syncthing/syncthing",
"GoVersion": "go1.4rc1", "GoVersion": "go1.4",
"Packages": [ "Packages": [
"./cmd/..." "./cmd/..."
], ],
@@ -31,7 +31,7 @@
}, },
{ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb", "ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "97e257099d2ab9578151ba85e2641e2cd14d3ca8" "Rev": "63c9e642efad852f49e20a6f90194cae112fd2ac"
}, },
{ {
"ImportPath": "github.com/syndtr/gosnappy/snappy", "ImportPath": "github.com/syndtr/gosnappy/snappy",
@@ -51,23 +51,19 @@
}, },
{ {
"ImportPath": "golang.org/x/crypto/bcrypt", "ImportPath": "golang.org/x/crypto/bcrypt",
"Comment": "null-236", "Rev": "731db29863ea7213d9556d0170afb38987f401d4"
"Rev": "69e2a90ed92d03812364aeb947b7068dc42e561e"
}, },
{ {
"ImportPath": "golang.org/x/crypto/blowfish", "ImportPath": "golang.org/x/crypto/blowfish",
"Comment": "null-236", "Rev": "731db29863ea7213d9556d0170afb38987f401d4"
"Rev": "69e2a90ed92d03812364aeb947b7068dc42e561e"
}, },
{ {
"ImportPath": "golang.org/x/text/transform", "ImportPath": "golang.org/x/text/transform",
"Comment": "null-112", "Rev": "985ee5acfaf1ff6712c7c99438752f8e09416ccb"
"Rev": "2f707e0ad64637ca1318279be7201f5ed19c4050"
}, },
{ {
"ImportPath": "golang.org/x/text/unicode/norm", "ImportPath": "golang.org/x/text/unicode/norm",
"Comment": "null-112", "Rev": "985ee5acfaf1ff6712c7c99438752f8e09416ccb"
"Rev": "2f707e0ad64637ca1318279be7201f5ed19c4050"
} }
] ]
} }


@@ -8,152 +8,669 @@
package cache package cache
import ( import (
"sync"
"sync/atomic" "sync/atomic"
"unsafe"
"github.com/syndtr/goleveldb/leveldb/util"
) )
// SetFunc is the function that will be called by Namespace.Get to create // Cacher provides an interface to implement caching functionality.
// a cache object, if charge is less than one than the cache object will // An implementation must be goroutine-safe.
// not be registered to cache tree, if value is nil then the cache object type Cacher interface {
// will not be created. // Capacity returns cache capacity.
type SetFunc func() (charge int, value interface{})
// DelFin is the function that will be called as the result of a delete operation.
// Exist == true is indication that the object is exist, and pending == true is
// indication of deletion already happen but haven't done yet (wait for all handles
// to be released). And exist == false means the object doesn't exist.
type DelFin func(exist, pending bool)
// PurgeFin is the function that will be called as the result of a purge operation.
type PurgeFin func(ns, key uint64)
// Cache is a cache tree. A cache instance must be goroutine-safe.
type Cache interface {
// SetCapacity sets cache tree capacity.
SetCapacity(capacity int)
// Capacity returns cache tree capacity.
Capacity() int Capacity() int
// Used returns used cache tree capacity. // SetCapacity sets cache capacity.
Used() int SetCapacity(capacity int)
// Size returns entire alive cache objects size. // Promote promotes the 'cache node'.
Size() int Promote(n *Node)
// NumObjects returns number of alive objects. // Ban evicts the 'cache node' and prevent subsequent 'promote'.
NumObjects() int Ban(n *Node)
// GetNamespace gets cache namespace with the given id. // Evict evicts the 'cache node'.
// GetNamespace is never return nil. Evict(n *Node)
GetNamespace(id uint64) Namespace
// PurgeNamespace purges cache namespace with the given id from this cache tree. // EvictNS evicts 'cache node' with the given namespace.
// Also read Namespace.Purge. EvictNS(ns uint64)
PurgeNamespace(id uint64, fin PurgeFin)
// ZapNamespace detaches cache namespace with the given id from this cache tree. // EvictAll evicts all 'cache node'.
// Also read Namespace.Zap. EvictAll()
ZapNamespace(id uint64)
// Purge purges all cache namespace from this cache tree. // Close closes the 'cache tree'
// This is behave the same as calling Namespace.Purge method on all cache namespace. Close() error
Purge(fin PurgeFin)
// Zap detaches all cache namespace from this cache tree.
// This is behave the same as calling Namespace.Zap method on all cache namespace.
Zap()
} }
// Namespace is a cache namespace. A namespace instance must be goroutine-safe. // Value is a 'cacheable object'. It may implement util.Releaser, if
type Namespace interface { // so the Release method will be called once the object is released.
// Get gets cache object with the given key. type Value interface{}
// If cache object is not found and setf is not nil, Get will atomically creates
// the cache object by calling setf. Otherwise Get will returns nil.
//
// The returned cache handle should be released after use by calling Release
// method.
Get(key uint64, setf SetFunc) Handle
// Delete removes cache object with the given key from cache tree. type CacheGetter struct {
// A deleted cache object will be released as soon as all of its handles have Cache *Cache
// been released. NS uint64
// Delete only happen once, subsequent delete will consider cache object doesn't
// exist, even if the cache object ins't released yet.
//
// If not nil, fin will be called if the cache object doesn't exist or when
// finally be released.
//
// Delete returns true if such cache object exist and never been deleted.
Delete(key uint64, fin DelFin) bool
// Purge removes all cache objects within this namespace from cache tree.
// This is the same as doing delete on all cache objects.
//
// If not nil, fin will be called on all cache objects when its finally be
// released.
Purge(fin PurgeFin)
// Zap detaches namespace from cache tree and release all its cache objects.
// A zapped namespace can never be filled again.
// Calling Get on zapped namespace will always return nil.
Zap()
} }
// Handle is a cache handle. func (g *CacheGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
type Handle interface { return g.Cache.Get(g.NS, key, setFunc)
// Release releases this cache handle. This method can be safely called mutiple
// times.
Release()
// Value returns value of this cache handle.
// Value will returns nil after this cache handle have be released.
Value() interface{}
} }
// The hash tables implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu, Kunlong Zhang, and Michael Spear. ACM Symposium on Principles of Distributed Computing, Jul 2014.
const ( const (
DelNotExist = iota mInitialSize = 1 << 4
DelExist mOverflowThreshold = 1 << 5
DelPendig mOverflowGrowThreshold = 1 << 7
) )
// Namespace state. type mBucket struct {
type nsState int mu sync.Mutex
node []*Node
const ( frozen bool
nsEffective nsState = iota
nsZapped
)
// Node state.
type nodeState int
const (
nodeZero nodeState = iota
nodeEffective
nodeEvicted
nodeDeleted
)
// Fake handle.
type fakeHandle struct {
value interface{}
fin func()
once uint32
} }
func (h *fakeHandle) Value() interface{} { func (b *mBucket) freeze() []*Node {
if atomic.LoadUint32(&h.once) == 0 { b.mu.Lock()
return h.value defer b.mu.Unlock()
if !b.frozen {
b.frozen = true
}
return b.node
}
func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
b.mu.Lock()
if b.frozen {
b.mu.Unlock()
return
}
// Scan the node.
for _, n := range b.node {
if n.hash == hash && n.ns == ns && n.key == key {
atomic.AddInt32(&n.ref, 1)
b.mu.Unlock()
return true, false, n
}
}
// Get only.
if noset {
b.mu.Unlock()
return true, false, nil
}
// Create node.
n = &Node{
r: r,
hash: hash,
ns: ns,
key: key,
ref: 1,
}
// Add node to bucket.
b.node = append(b.node, n)
bLen := len(b.node)
b.mu.Unlock()
// Update counter.
grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
if bLen > mOverflowThreshold {
grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
}
// Grow.
if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
nhLen := len(h.buckets) << 1
nh := &mNode{
buckets: make([]unsafe.Pointer, nhLen),
mask: uint32(nhLen) - 1,
pred: unsafe.Pointer(h),
growThreshold: int32(nhLen * mOverflowThreshold),
shrinkThreshold: int32(nhLen >> 1),
}
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
if !ok {
panic("BUG: failed swapping head")
}
go nh.initBuckets()
}
return true, true, n
}
func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
b.mu.Lock()
if b.frozen {
b.mu.Unlock()
return
}
// Scan the node.
var (
n *Node
bLen int
)
for i := range b.node {
n = b.node[i]
if n.ns == ns && n.key == key {
if atomic.LoadInt32(&n.ref) == 0 {
deleted = true
// Call releaser.
if n.value != nil {
if r, ok := n.value.(util.Releaser); ok {
r.Release()
}
n.value = nil
}
// Remove node from bucket.
b.node = append(b.node[:i], b.node[i+1:]...)
bLen = len(b.node)
}
break
}
}
b.mu.Unlock()
if deleted {
// Call OnDel.
for _, f := range n.onDel {
f()
}
// Update counter.
atomic.AddInt32(&r.size, int32(n.size)*-1)
shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
if bLen >= mOverflowThreshold {
atomic.AddInt32(&h.overflow, -1)
}
// Shrink.
if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
nhLen := len(h.buckets) >> 1
nh := &mNode{
buckets: make([]unsafe.Pointer, nhLen),
mask: uint32(nhLen) - 1,
pred: unsafe.Pointer(h),
growThreshold: int32(nhLen * mOverflowThreshold),
shrinkThreshold: int32(nhLen >> 1),
}
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
if !ok {
panic("BUG: failed swapping head")
}
go nh.initBuckets()
}
}
return true, deleted
}
type mNode struct {
buckets []unsafe.Pointer // []*mBucket
mask uint32
pred unsafe.Pointer // *mNode
resizeInProgess int32
overflow int32
growThreshold int32
shrinkThreshold int32
}
func (n *mNode) initBucket(i uint32) *mBucket {
if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
return b
}
p := (*mNode)(atomic.LoadPointer(&n.pred))
if p != nil {
var node []*Node
if n.mask > p.mask {
// Grow.
pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
if pb == nil {
pb = p.initBucket(i & p.mask)
}
m := pb.freeze()
// Split nodes.
for _, x := range m {
if x.hash&n.mask == i {
node = append(node, x)
}
}
} else {
// Shrink.
pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
if pb0 == nil {
pb0 = p.initBucket(i)
}
pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
if pb1 == nil {
pb1 = p.initBucket(i + uint32(len(n.buckets)))
}
m0 := pb0.freeze()
m1 := pb1.freeze()
// Merge nodes.
node = make([]*Node, 0, len(m0)+len(m1))
node = append(node, m0...)
node = append(node, m1...)
}
b := &mBucket{node: node}
if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
if len(node) > mOverflowThreshold {
atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
}
return b
}
}
return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
}
func (n *mNode) initBuckets() {
for i := range n.buckets {
n.initBucket(uint32(i))
}
atomic.StorePointer(&n.pred, nil)
}
// Cache is a 'cache map'.
type Cache struct {
mu sync.RWMutex
mHead unsafe.Pointer // *mNode
nodes int32
size int32
cacher Cacher
closed bool
}
// NewCache creates a new 'cache map'. The cacher is optional and
// may be nil.
func NewCache(cacher Cacher) *Cache {
h := &mNode{
buckets: make([]unsafe.Pointer, mInitialSize),
mask: mInitialSize - 1,
growThreshold: int32(mInitialSize * mOverflowThreshold),
shrinkThreshold: 0,
}
for i := range h.buckets {
h.buckets[i] = unsafe.Pointer(&mBucket{})
}
r := &Cache{
mHead: unsafe.Pointer(h),
cacher: cacher,
}
return r
}
func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
h := (*mNode)(atomic.LoadPointer(&r.mHead))
i := hash & h.mask
b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
if b == nil {
b = h.initBucket(i)
}
return h, b
}
func (r *Cache) delete(n *Node) bool {
for {
h, b := r.getBucket(n.hash)
done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
if done {
return deleted
}
}
return false
}
// Nodes returns the number of 'cache node' in the map.
func (r *Cache) Nodes() int {
return int(atomic.LoadInt32(&r.nodes))
}
// Size returns the sum of 'cache node' sizes in the map.
func (r *Cache) Size() int {
return int(atomic.LoadInt32(&r.size))
}
// Capacity returns cache capacity.
func (r *Cache) Capacity() int {
if r.cacher == nil {
return 0
}
return r.cacher.Capacity()
}
// SetCapacity sets cache capacity.
func (r *Cache) SetCapacity(capacity int) {
if r.cacher != nil {
r.cacher.SetCapacity(capacity)
}
}
// Get gets 'cache node' with the given namespace and key.
// If cache node is not found and setFunc is not nil, Get will atomically create
// the 'cache node' by calling setFunc. Otherwise Get returns nil.
//
// The returned 'cache handle' should be released after use by calling Release
// method.
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return nil
}
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
if done {
if n != nil {
n.mu.Lock()
if n.value == nil {
if setFunc == nil {
n.mu.Unlock()
n.unref()
return nil
}
n.size, n.value = setFunc()
if n.value == nil {
n.size = 0
n.mu.Unlock()
n.unref()
return nil
}
atomic.AddInt32(&r.size, int32(n.size))
}
n.mu.Unlock()
if r.cacher != nil {
r.cacher.Promote(n)
}
return &Handle{unsafe.Pointer(n)}
}
break
}
} }
return nil return nil
} }
func (h *fakeHandle) Release() { // Delete removes and bans 'cache node' with the given namespace and key.
if !atomic.CompareAndSwapUint32(&h.once, 0, 1) { // A banned 'cache node' will never be inserted into the 'cache tree'. Ban is
// only attributed to the particular 'cache node', so when a 'cache node'
// is recreated it will not be banned.
//
// If onDel is not nil, then it will be executed if such 'cache node'
// doesn't exist or once the 'cache node' is released.
//
// Delete returns true if such 'cache node' exists.
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return false
}
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
done, _, n := b.get(r, h, hash, ns, key, true)
if done {
if n != nil {
if onDel != nil {
n.mu.Lock()
n.onDel = append(n.onDel, onDel)
n.mu.Unlock()
}
if r.cacher != nil {
r.cacher.Ban(n)
}
n.unref()
return true
}
break
}
}
if onDel != nil {
onDel()
}
return false
}
// Evict evicts 'cache node' with the given namespace and key. This will
// simply call Cacher.Evict.
//
// Evict returns true if such 'cache node' exists.
func (r *Cache) Evict(ns, key uint64) bool {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return false
}
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
done, _, n := b.get(r, h, hash, ns, key, true)
if done {
if n != nil {
if r.cacher != nil {
r.cacher.Evict(n)
}
n.unref()
return true
}
break
}
}
return false
}
// EvictNS evicts 'cache node' with the given namespace. This will
// simply call Cacher.EvictNS.
func (r *Cache) EvictNS(ns uint64) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return return
} }
if h.fin != nil {
h.fin() if r.cacher != nil {
h.fin = nil r.cacher.EvictNS(ns)
} }
} }
// EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return
}
if r.cacher != nil {
r.cacher.EvictAll()
}
}
// Close closes the 'cache map' and releases all 'cache node'.
func (r *Cache) Close() error {
r.mu.Lock()
if !r.closed {
r.closed = true
if r.cacher != nil {
if err := r.cacher.Close(); err != nil {
return err
}
}
h := (*mNode)(r.mHead)
h.initBuckets()
for i := range h.buckets {
b := (*mBucket)(h.buckets[i])
for _, n := range b.node {
// Call releaser.
if n.value != nil {
if r, ok := n.value.(util.Releaser); ok {
r.Release()
}
n.value = nil
}
// Call OnDel.
for _, f := range n.onDel {
f()
}
}
}
}
r.mu.Unlock()
return nil
}
// Node is a 'cache node'.
type Node struct {
r *Cache
hash uint32
ns, key uint64
mu sync.Mutex
size int
value Value
ref int32
onDel []func()
CacheData unsafe.Pointer
}
// NS returns this 'cache node' namespace.
func (n *Node) NS() uint64 {
return n.ns
}
// Key returns this 'cache node' key.
func (n *Node) Key() uint64 {
return n.key
}
// Size returns this 'cache node' size.
func (n *Node) Size() int {
return n.size
}
// Value returns this 'cache node' value.
func (n *Node) Value() Value {
return n.value
}
// Ref returns this 'cache node' ref counter.
func (n *Node) Ref() int32 {
return atomic.LoadInt32(&n.ref)
}
// GetHandle returns a handle for this 'cache node'.
func (n *Node) GetHandle() *Handle {
if atomic.AddInt32(&n.ref, 1) <= 1 {
panic("BUG: Node.GetHandle on zero ref")
}
return &Handle{unsafe.Pointer(n)}
}
func (n *Node) unref() {
if atomic.AddInt32(&n.ref, -1) == 0 {
n.r.delete(n)
}
}
func (n *Node) unrefLocked() {
if atomic.AddInt32(&n.ref, -1) == 0 {
n.r.mu.RLock()
if !n.r.closed {
n.r.delete(n)
}
n.r.mu.RUnlock()
}
}
type Handle struct {
n unsafe.Pointer // *Node
}
func (h *Handle) Value() Value {
n := (*Node)(atomic.LoadPointer(&h.n))
if n != nil {
return n.value
}
return nil
}
func (h *Handle) Release() {
nPtr := atomic.LoadPointer(&h.n)
if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
n := (*Node)(nPtr)
n.unrefLocked()
}
}
func murmur32(ns, key uint64, seed uint32) uint32 {
const (
m = uint32(0x5bd1e995)
r = 24
)
k1 := uint32(ns >> 32)
k2 := uint32(ns)
k3 := uint32(key >> 32)
k4 := uint32(key)
k1 *= m
k1 ^= k1 >> r
k1 *= m
k2 *= m
k2 ^= k2 >> r
k2 *= m
k3 *= m
k3 ^= k3 >> r
k3 *= m
k4 *= m
k4 ^= k4 >> r
k4 *= m
h := seed
h *= m
h ^= k1
h *= m
h ^= k2
h *= m
h ^= k3
h *= m
h ^= k4
h ^= h >> 13
h *= m
h ^= h >> 15
return h
}
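The rewritten package replaces the old Namespace/SetFunc surface with a flat map keyed by (ns, key). A minimal usage sketch of the new API, assuming only the vendored import path recorded in Godeps.json:

    package main

    import (
        "fmt"

        "github.com/syndtr/goleveldb/leveldb/cache"
    )

    func main() {
        // A 'cache map' backed by the new LRU cacher. NewCache(nil) is
        // also legal; nodes then live only while handles reference them.
        c := cache.NewCache(cache.NewLRU(128))
        defer c.Close()

        // Get finds the node or atomically creates it via setFunc,
        // which returns the node's size and value.
        h := c.Get(0, 42, func() (int, cache.Value) {
            return 1, "hello"
        })
        fmt.Println(h.Value()) // hello

        // Handles must be released after use; Release is safe to call
        // more than once thanks to the CAS in Handle.Release.
        h.Release()

        // A nil setFunc turns Get into a pure lookup.
        if h := c.Get(0, 42, nil); h != nil {
            fmt.Println(h.Value())
            h.Release()
        }
    }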


@@ -13,11 +13,26 @@ import (
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
"unsafe"
) )
type int32o int32
func (o *int32o) acquire() {
if atomic.AddInt32((*int32)(o), 1) != 1 {
panic("BUG: invalid ref")
}
}
func (o *int32o) Release() {
if atomic.AddInt32((*int32)(o), -1) != 0 {
panic("BUG: invalid ref")
}
}
type releaserFunc struct { type releaserFunc struct {
fn func() fn func()
value interface{} value Value
} }
func (r releaserFunc) Release() { func (r releaserFunc) Release() {
@@ -26,8 +41,8 @@ func (r releaserFunc) Release() {
} }
} }
func set(ns Namespace, key uint64, value interface{}, charge int, relf func()) Handle { func set(c *Cache, ns, key uint64, value Value, charge int, relf func()) *Handle {
return ns.Get(key, func() (int, interface{}) { return c.Get(ns, key, func() (int, Value) {
if relf != nil { if relf != nil {
return charge, releaserFunc{relf, value} return charge, releaserFunc{relf, value}
} else { } else {
@@ -36,7 +51,246 @@ func set(ns Namespace, key uint64, value interface{}, charge int, relf func()) Handle {
}) })
} }
func TestCache_HitMiss(t *testing.T) { func TestCacheMap(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU())
nsx := []struct {
nobjects, nhandles, concurrent, repeat int
}{
{10000, 400, 50, 3},
{100000, 1000, 100, 10},
}
var (
objects [][]int32o
handles [][]unsafe.Pointer
)
for _, x := range nsx {
objects = append(objects, make([]int32o, x.nobjects))
handles = append(handles, make([]unsafe.Pointer, x.nhandles))
}
c := NewCache(nil)
wg := new(sync.WaitGroup)
var done int32
for ns, x := range nsx {
for i := 0; i < x.concurrent; i++ {
wg.Add(1)
go func(ns, i, repeat int, objects []int32o, handles []unsafe.Pointer) {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for j := len(objects) * repeat; j >= 0; j-- {
key := uint64(r.Intn(len(objects)))
h := c.Get(uint64(ns), key, func() (int, Value) {
o := &objects[key]
o.acquire()
return 1, o
})
if v := h.Value().(*int32o); v != &objects[key] {
t.Fatalf("#%d invalid value: want=%p got=%p", ns, &objects[key], v)
}
if objects[key] != 1 {
t.Fatalf("#%d invalid object %d: %d", ns, key, objects[key])
}
if !atomic.CompareAndSwapPointer(&handles[r.Intn(len(handles))], nil, unsafe.Pointer(h)) {
h.Release()
}
}
}(ns, i, x.repeat, objects[ns], handles[ns])
}
go func(handles []unsafe.Pointer) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for atomic.LoadInt32(&done) == 0 {
i := r.Intn(len(handles))
h := (*Handle)(atomic.LoadPointer(&handles[i]))
if h != nil && atomic.CompareAndSwapPointer(&handles[i], unsafe.Pointer(h), nil) {
h.Release()
}
time.Sleep(time.Millisecond)
}
}(handles[ns])
}
go func() {
handles := make([]*Handle, 100000)
for atomic.LoadInt32(&done) == 0 {
for i := range handles {
handles[i] = c.Get(999999999, uint64(i), func() (int, Value) {
return 1, 1
})
}
for _, h := range handles {
h.Release()
}
}
}()
wg.Wait()
atomic.StoreInt32(&done, 1)
for _, handles0 := range handles {
for i := range handles0 {
h := (*Handle)(atomic.LoadPointer(&handles0[i]))
if h != nil && atomic.CompareAndSwapPointer(&handles0[i], unsafe.Pointer(h), nil) {
h.Release()
}
}
}
for ns, objects0 := range objects {
for i, o := range objects0 {
if o != 0 {
t.Fatalf("invalid object #%d.%d: ref=%d", ns, i, o)
}
}
}
}
func TestCacheMap_NodesAndSize(t *testing.T) {
c := NewCache(nil)
if c.Nodes() != 0 {
t.Errorf("invalid nodes counter: want=%d got=%d", 0, c.Nodes())
}
if c.Size() != 0 {
t.Errorf("invalid size counter: want=%d got=%d", 0, c.Size())
}
set(c, 0, 1, 1, 1, nil)
set(c, 0, 2, 2, 2, nil)
set(c, 1, 1, 3, 3, nil)
set(c, 2, 1, 4, 1, nil)
if c.Nodes() != 4 {
t.Errorf("invalid nodes counter: want=%d got=%d", 4, c.Nodes())
}
if c.Size() != 7 {
t.Errorf("invalid size counter: want=%d got=%d", 4, c.Size())
}
}
func TestLRUCache_Capacity(t *testing.T) {
c := NewCache(NewLRU(10))
if c.Capacity() != 10 {
t.Errorf("invalid capacity: want=%d got=%d", 10, c.Capacity())
}
set(c, 0, 1, 1, 1, nil).Release()
set(c, 0, 2, 2, 2, nil).Release()
set(c, 1, 1, 3, 3, nil).Release()
set(c, 2, 1, 4, 1, nil).Release()
set(c, 2, 2, 5, 1, nil).Release()
set(c, 2, 3, 6, 1, nil).Release()
set(c, 2, 4, 7, 1, nil).Release()
set(c, 2, 5, 8, 1, nil).Release()
if c.Nodes() != 7 {
t.Errorf("invalid nodes counter: want=%d got=%d", 7, c.Nodes())
}
if c.Size() != 10 {
t.Errorf("invalid size counter: want=%d got=%d", 10, c.Size())
}
c.SetCapacity(9)
if c.Capacity() != 9 {
t.Errorf("invalid capacity: want=%d got=%d", 9, c.Capacity())
}
if c.Nodes() != 6 {
t.Errorf("invalid nodes counter: want=%d got=%d", 6, c.Nodes())
}
if c.Size() != 8 {
t.Errorf("invalid size counter: want=%d got=%d", 8, c.Size())
}
}
func TestCacheMap_NilValue(t *testing.T) {
c := NewCache(NewLRU(10))
h := c.Get(0, 0, func() (size int, value Value) {
return 1, nil
})
if h != nil {
t.Error("cache handle is non-nil")
}
if c.Nodes() != 0 {
t.Errorf("invalid nodes counter: want=%d got=%d", 0, c.Nodes())
}
if c.Size() != 0 {
t.Errorf("invalid size counter: want=%d got=%d", 0, c.Size())
}
}
func TestLRUCache_GetLatency(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU())
const (
concurrentSet = 30
concurrentGet = 3
duration = 3 * time.Second
delay = 3 * time.Millisecond
maxkey = 100000
)
var (
set, getHit, getAll int32
getMaxLatency, getDuration int64
)
c := NewCache(NewLRU(5000))
wg := &sync.WaitGroup{}
until := time.Now().Add(duration)
for i := 0; i < concurrentSet; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for time.Now().Before(until) {
c.Get(0, uint64(r.Intn(maxkey)), func() (int, Value) {
time.Sleep(delay)
atomic.AddInt32(&set, 1)
return 1, 1
}).Release()
}
}(i)
}
for i := 0; i < concurrentGet; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
mark := time.Now()
if mark.Before(until) {
h := c.Get(0, uint64(r.Intn(maxkey)), nil)
latency := int64(time.Now().Sub(mark))
m := atomic.LoadInt64(&getMaxLatency)
if latency > m {
atomic.CompareAndSwapInt64(&getMaxLatency, m, latency)
}
atomic.AddInt64(&getDuration, latency)
if h != nil {
atomic.AddInt32(&getHit, 1)
h.Release()
}
atomic.AddInt32(&getAll, 1)
} else {
break
}
}
}(i)
}
wg.Wait()
getAvglatency := time.Duration(getDuration) / time.Duration(getAll)
t.Logf("set=%d getHit=%d getAll=%d getMaxLatency=%v getAvgLatency=%v",
set, getHit, getAll, time.Duration(getMaxLatency), getAvglatency)
if getAvglatency > delay/3 {
t.Errorf("get avg latency > %v: got=%v", delay/3, getAvglatency)
}
}
func TestLRUCache_HitMiss(t *testing.T) {
cases := []struct { cases := []struct {
key uint64 key uint64
value string value string
@@ -54,14 +308,13 @@ func TestCache_HitMiss(t *testing.T) {
} }
setfin := 0 setfin := 0
c := NewLRUCache(1000) c := NewCache(NewLRU(1000))
ns := c.GetNamespace(0)
for i, x := range cases { for i, x := range cases {
set(ns, x.key, x.value, len(x.value), func() { set(c, 0, x.key, x.value, len(x.value), func() {
setfin++ setfin++
}).Release() }).Release()
for j, y := range cases { for j, y := range cases {
h := ns.Get(y.key, nil) h := c.Get(0, y.key, nil)
if j <= i { if j <= i {
// should hit // should hit
if h == nil { if h == nil {
@@ -85,7 +338,7 @@ func TestCache_HitMiss(t *testing.T) {
for i, x := range cases { for i, x := range cases {
finalizerOk := false finalizerOk := false
ns.Delete(x.key, func(exist, pending bool) { c.Delete(0, x.key, func() {
finalizerOk = true finalizerOk = true
}) })
@@ -94,7 +347,7 @@ func TestCache_HitMiss(t *testing.T) {
} }
for j, y := range cases { for j, y := range cases {
h := ns.Get(y.key, nil) h := c.Get(0, y.key, nil)
if j > i { if j > i {
// should hit // should hit
if h == nil { if h == nil {
@@ -122,20 +375,19 @@ func TestCache_HitMiss(t *testing.T) {
} }
func TestLRUCache_Eviction(t *testing.T) { func TestLRUCache_Eviction(t *testing.T) {
c := NewLRUCache(12) c := NewCache(NewLRU(12))
ns := c.GetNamespace(0) o1 := set(c, 0, 1, 1, 1, nil)
o1 := set(ns, 1, 1, 1, nil) set(c, 0, 2, 2, 1, nil).Release()
set(ns, 2, 2, 1, nil).Release() set(c, 0, 3, 3, 1, nil).Release()
set(ns, 3, 3, 1, nil).Release() set(c, 0, 4, 4, 1, nil).Release()
set(ns, 4, 4, 1, nil).Release() set(c, 0, 5, 5, 1, nil).Release()
set(ns, 5, 5, 1, nil).Release() if h := c.Get(0, 2, nil); h != nil { // 1,3,4,5,2
if h := ns.Get(2, nil); h != nil { // 1,3,4,5,2
h.Release() h.Release()
} }
set(ns, 9, 9, 10, nil).Release() // 5,2,9 set(c, 0, 9, 9, 10, nil).Release() // 5,2,9
for _, key := range []uint64{9, 2, 5, 1} { for _, key := range []uint64{9, 2, 5, 1} {
h := ns.Get(key, nil) h := c.Get(0, key, nil)
if h == nil { if h == nil {
t.Errorf("miss for key '%d'", key) t.Errorf("miss for key '%d'", key)
} else { } else {
@@ -147,7 +399,7 @@ func TestLRUCache_Eviction(t *testing.T) {
} }
o1.Release() o1.Release()
for _, key := range []uint64{1, 2, 5} { for _, key := range []uint64{1, 2, 5} {
h := ns.Get(key, nil) h := c.Get(0, key, nil)
if h == nil { if h == nil {
t.Errorf("miss for key '%d'", key) t.Errorf("miss for key '%d'", key)
} else { } else {
@@ -158,7 +410,7 @@ func TestLRUCache_Eviction(t *testing.T) {
} }
} }
for _, key := range []uint64{3, 4, 9} { for _, key := range []uint64{3, 4, 9} {
h := ns.Get(key, nil) h := c.Get(0, key, nil)
if h != nil { if h != nil {
t.Errorf("hit for key '%d'", key) t.Errorf("hit for key '%d'", key)
if x := h.Value().(int); x != int(key) { if x := h.Value().(int); x != int(key) {
@@ -169,487 +421,150 @@ func TestLRUCache_Eviction(t *testing.T) {
} }
} }
func TestLRUCache_SetGet(t *testing.T) { func TestLRUCache_Evict(t *testing.T) {
c := NewLRUCache(13) c := NewCache(NewLRU(6))
ns := c.GetNamespace(0) set(c, 0, 1, 1, 1, nil).Release()
for i := 0; i < 200; i++ { set(c, 0, 2, 2, 1, nil).Release()
n := uint64(rand.Intn(99999) % 20) set(c, 1, 1, 4, 1, nil).Release()
set(ns, n, n, 1, nil).Release() set(c, 1, 2, 5, 1, nil).Release()
if h := ns.Get(n, nil); h != nil { set(c, 2, 1, 6, 1, nil).Release()
if h.Value() == nil { set(c, 2, 2, 7, 1, nil).Release()
t.Errorf("key '%d' contains nil value", n)
for ns := 0; ns < 3; ns++ {
for key := 1; key < 3; key++ {
if h := c.Get(uint64(ns), uint64(key), nil); h != nil {
h.Release()
} else { } else {
if x := h.Value().(uint64); x != n { t.Errorf("Cache.Get on #%d.%d return nil", ns, key)
t.Errorf("invalid value for key '%d' want '%d', got '%d'", n, n, x)
}
} }
}
}
if ok := c.Evict(0, 1); !ok {
t.Error("first Cache.Evict on #0.1 return false")
}
if ok := c.Evict(0, 1); ok {
t.Error("second Cache.Evict on #0.1 return true")
}
if h := c.Get(0, 1, nil); h != nil {
t.Errorf("Cache.Get on #0.1 return non-nil: %v", h.Value())
}
c.EvictNS(1)
if h := c.Get(1, 1, nil); h != nil {
t.Errorf("Cache.Get on #1.1 return non-nil: %v", h.Value())
}
if h := c.Get(1, 2, nil); h != nil {
t.Errorf("Cache.Get on #1.2 return non-nil: %v", h.Value())
}
c.EvictAll()
for ns := 0; ns < 3; ns++ {
for key := 1; key < 3; key++ {
if h := c.Get(uint64(ns), uint64(key), nil); h != nil {
t.Errorf("Cache.Get on #%d.%d return non-nil: %v", ns, key, h.Value())
}
}
}
}
func TestLRUCache_Delete(t *testing.T) {
delFuncCalled := 0
delFunc := func() {
delFuncCalled++
}
c := NewCache(NewLRU(2))
set(c, 0, 1, 1, 1, nil).Release()
set(c, 0, 2, 2, 1, nil).Release()
if ok := c.Delete(0, 1, delFunc); !ok {
t.Error("Cache.Delete on #1 return false")
}
if h := c.Get(0, 1, nil); h != nil {
t.Errorf("Cache.Get on #1 return non-nil: %v", h.Value())
}
if ok := c.Delete(0, 1, delFunc); ok {
t.Error("Cache.Delete on #1 return true")
}
h2 := c.Get(0, 2, nil)
if h2 == nil {
t.Error("Cache.Get on #2 return nil")
}
if ok := c.Delete(0, 2, delFunc); !ok {
t.Error("(1) Cache.Delete on #2 return false")
}
if ok := c.Delete(0, 2, delFunc); !ok {
t.Error("(2) Cache.Delete on #2 return false")
}
set(c, 0, 3, 3, 1, nil).Release()
set(c, 0, 4, 4, 1, nil).Release()
c.Get(0, 2, nil).Release()
for key := 2; key <= 4; key++ {
if h := c.Get(0, uint64(key), nil); h != nil {
h.Release() h.Release()
} else { } else {
t.Errorf("key '%d' doesn't exist", n) t.Errorf("Cache.Get on #%d return nil", key)
}
}
}
func TestLRUCache_Purge(t *testing.T) {
c := NewLRUCache(3)
ns1 := c.GetNamespace(0)
o1 := set(ns1, 1, 1, 1, nil)
o2 := set(ns1, 2, 2, 1, nil)
ns1.Purge(nil)
set(ns1, 3, 3, 1, nil).Release()
for _, key := range []uint64{1, 2, 3} {
h := ns1.Get(key, nil)
if h == nil {
t.Errorf("miss for key '%d'", key)
} else {
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
h.Release()
}
}
o1.Release()
o2.Release()
for _, key := range []uint64{1, 2} {
h := ns1.Get(key, nil)
if h != nil {
t.Errorf("hit for key '%d'", key)
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
h.Release()
}
}
}
type testingCacheObjectCounter struct {
created uint
released uint
}
func (c *testingCacheObjectCounter) createOne() {
c.created++
}
func (c *testingCacheObjectCounter) releaseOne() {
c.released++
}
type testingCacheObject struct {
t *testing.T
cnt *testingCacheObjectCounter
ns, key uint64
releaseCalled bool
}
func (x *testingCacheObject) Release() {
if !x.releaseCalled {
x.releaseCalled = true
x.cnt.releaseOne()
} else {
x.t.Errorf("duplicate setfin NS#%d KEY#%d", x.ns, x.key)
}
}
func TestLRUCache_ConcurrentSetGet(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU())
seed := time.Now().UnixNano()
t.Logf("seed=%d", seed)
const (
N = 2000000
M = 4000
C = 3
)
var set, get uint32
wg := &sync.WaitGroup{}
c := NewLRUCache(M / 4)
for ni := uint64(0); ni < C; ni++ {
r0 := rand.New(rand.NewSource(seed + int64(ni)))
r1 := rand.New(rand.NewSource(seed + int64(ni) + 1))
ns := c.GetNamespace(ni)
wg.Add(2)
go func(ns Namespace, r *rand.Rand) {
for i := 0; i < N; i++ {
x := uint64(r.Int63n(M))
o := ns.Get(x, func() (int, interface{}) {
atomic.AddUint32(&set, 1)
return 1, x
})
if v := o.Value().(uint64); v != x {
t.Errorf("#%d invalid value, got=%d", x, v)
}
o.Release()
}
wg.Done()
}(ns, r0)
go func(ns Namespace, r *rand.Rand) {
for i := 0; i < N; i++ {
x := uint64(r.Int63n(M))
o := ns.Get(x, nil)
if o != nil {
atomic.AddUint32(&get, 1)
if v := o.Value().(uint64); v != x {
t.Errorf("#%d invalid value, got=%d", x, v)
}
o.Release()
}
}
wg.Done()
}(ns, r1)
}
wg.Wait()
t.Logf("set=%d get=%d", set, get)
}
func TestLRUCache_Finalizer(t *testing.T) {
const (
capacity = 100
goroutines = 100
iterations = 10000
keymax = 8000
)
cnt := &testingCacheObjectCounter{}
c := NewLRUCache(capacity)
type instance struct {
seed int64
rnd *rand.Rand
nsid uint64
ns Namespace
effective int
handles []Handle
handlesMap map[uint64]int
delete bool
purge bool
zap bool
wantDel int
delfinCalled int
delfinCalledAll int
delfinCalledEff int
purgefinCalled int
}
instanceGet := func(p *instance, key uint64) {
h := p.ns.Get(key, func() (charge int, value interface{}) {
to := &testingCacheObject{
t: t, cnt: cnt,
ns: p.nsid,
key: key,
}
p.effective++
cnt.createOne()
return 1, releaserFunc{func() {
to.Release()
p.effective--
}, to}
})
p.handles = append(p.handles, h)
p.handlesMap[key] = p.handlesMap[key] + 1
}
instanceRelease := func(p *instance, i int) {
h := p.handles[i]
key := h.Value().(releaserFunc).value.(*testingCacheObject).key
if n := p.handlesMap[key]; n == 0 {
t.Fatal("key ref == 0")
} else if n > 1 {
p.handlesMap[key] = n - 1
} else {
delete(p.handlesMap, key)
}
h.Release()
p.handles = append(p.handles[:i], p.handles[i+1:]...)
p.handles[len(p.handles) : len(p.handles)+1][0] = nil
}
seed := time.Now().UnixNano()
t.Logf("seed=%d", seed)
instances := make([]*instance, goroutines)
for i := range instances {
p := &instance{}
p.handlesMap = make(map[uint64]int)
p.seed = seed + int64(i)
p.rnd = rand.New(rand.NewSource(p.seed))
p.nsid = uint64(i)
p.ns = c.GetNamespace(p.nsid)
p.delete = i%6 == 0
p.purge = i%8 == 0
p.zap = i%12 == 0 || i%3 == 0
instances[i] = p
}
runr := rand.New(rand.NewSource(seed - 1))
run := func(rnd *rand.Rand, x []*instance, init func(p *instance) bool, fn func(p *instance, i int) bool) {
var (
rx []*instance
rn []int
)
if init == nil {
rx = append([]*instance{}, x...)
rn = make([]int, len(x))
} else {
for _, p := range x {
if init(p) {
rx = append(rx, p)
rn = append(rn, 0)
}
}
}
for len(rx) > 0 {
i := rand.Intn(len(rx))
if fn(rx[i], rn[i]) {
rn[i]++
} else {
rx = append(rx[:i], rx[i+1:]...)
rn = append(rn[:i], rn[i+1:]...)
}
} }
} }
// Get and release. h2.Release()
run(runr, instances, nil, func(p *instance, i int) bool { if h := c.Get(0, 2, nil); h != nil {
if i < iterations { t.Errorf("Cache.Get on #2 return non-nil: %v", h.Value())
if len(p.handles) == 0 || p.rnd.Int()%2 == 0 { }
instanceGet(p, uint64(p.rnd.Intn(keymax)))
} else { if delFuncCalled != 4 {
instanceRelease(p, p.rnd.Intn(len(p.handles))) t.Errorf("delFunc isn't called 4 times: got=%d", delFuncCalled)
} }
return true }
} else {
return false func TestLRUCache_Close(t *testing.T) {
relFuncCalled := 0
relFunc := func() {
relFuncCalled++
}
delFuncCalled := 0
delFunc := func() {
delFuncCalled++
}
c := NewCache(NewLRU(2))
set(c, 0, 1, 1, 1, relFunc).Release()
set(c, 0, 2, 2, 1, relFunc).Release()
h3 := set(c, 0, 3, 3, 1, relFunc)
if h3 == nil {
t.Error("Cache.Get on #3 return nil")
}
if ok := c.Delete(0, 3, delFunc); !ok {
t.Error("Cache.Delete on #3 return false")
}
c.Close()
if relFuncCalled != 3 {
t.Errorf("relFunc isn't called 3 times: got=%d", relFuncCalled)
}
if delFuncCalled != 1 {
t.Errorf("delFunc isn't called 1 times: got=%d", delFuncCalled)
}
}
func BenchmarkLRUCache(b *testing.B) {
c := NewCache(NewLRU(10000))
b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for pb.Next() {
key := uint64(r.Intn(1000000))
c.Get(0, key, func() (int, Value) {
return 1, key
}).Release()
} }
}) })
if used, cap := c.Used(), c.Capacity(); used > cap {
t.Errorf("Used > capacity, used=%d cap=%d", used, cap)
}
// Check effective objects.
for i, p := range instances {
if int(p.effective) < len(p.handlesMap) {
t.Errorf("#%d effective objects < acquired handle, eo=%d ah=%d", i, p.effective, len(p.handlesMap))
}
}
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// First delete.
run(runr, instances, func(p *instance) bool {
p.wantDel = p.effective
return p.delete
}, func(p *instance, i int) bool {
key := uint64(i)
if key < keymax {
_, wantExist := p.handlesMap[key]
gotExist := p.ns.Delete(key, func(exist, pending bool) {
p.delfinCalledAll++
if exist {
p.delfinCalledEff++
}
})
if !gotExist && wantExist {
t.Errorf("delete on NS#%d KEY#%d not found", p.nsid, key)
}
return true
} else {
return false
}
})
// Second delete.
run(runr, instances, func(p *instance) bool {
p.delfinCalled = 0
return p.delete
}, func(p *instance, i int) bool {
key := uint64(i)
if key < keymax {
gotExist := p.ns.Delete(key, func(exist, pending bool) {
if exist && !pending {
t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.nsid, key)
}
p.delfinCalled++
})
if gotExist {
t.Errorf("delete on NS#%d KEY#%d found", p.nsid, key)
}
return true
} else {
if p.delfinCalled != keymax {
t.Errorf("(2) NS#%d not all delete fin called, diff=%d", p.nsid, keymax-p.delfinCalled)
}
return false
}
})
// Purge.
run(runr, instances, func(p *instance) bool {
return p.purge
}, func(p *instance, i int) bool {
p.ns.Purge(func(ns, key uint64) {
p.purgefinCalled++
})
return false
})
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Release.
run(runr, instances, func(p *instance) bool {
return !p.zap
}, func(p *instance, i int) bool {
if len(p.handles) > 0 {
instanceRelease(p, len(p.handles)-1)
return true
} else {
return false
}
})
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Zap.
run(runr, instances, func(p *instance) bool {
return p.zap
}, func(p *instance, i int) bool {
p.ns.Zap()
p.handles = nil
p.handlesMap = nil
return false
})
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
if notrel, used := int(cnt.created-cnt.released), c.Used(); notrel != used {
t.Errorf("Invalid used value, want=%d got=%d", notrel, used)
}
c.Purge(nil)
for _, p := range instances {
if p.delete {
if p.delfinCalledAll != keymax {
t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.nsid, p.purge, p.zap, keymax-p.delfinCalledAll)
}
if p.delfinCalledEff != p.wantDel {
t.Errorf("#%d not all effective delete fin called, diff=%d", p.nsid, p.wantDel-p.delfinCalledEff)
}
if p.purge && p.purgefinCalled > 0 {
t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.nsid, p.delete, p.zap, p.purgefinCalled)
}
} else {
if p.purge {
if p.purgefinCalled != p.wantDel {
t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.nsid, p.delete, p.zap, p.wantDel-p.purgefinCalled)
}
}
}
}
if cnt.created != cnt.released {
t.Errorf("Some cache object weren't released, created=%d released=%d", cnt.created, cnt.released)
}
}
func BenchmarkLRUCache_Set(b *testing.B) {
c := NewLRUCache(0)
ns := c.GetNamespace(0)
b.ResetTimer()
for i := uint64(0); i < uint64(b.N); i++ {
set(ns, i, "", 1, nil)
}
}
func BenchmarkLRUCache_Get(b *testing.B) {
c := NewLRUCache(0)
ns := c.GetNamespace(0)
b.ResetTimer()
for i := uint64(0); i < uint64(b.N); i++ {
set(ns, i, "", 1, nil)
}
b.ResetTimer()
for i := uint64(0); i < uint64(b.N); i++ {
ns.Get(i, nil)
}
}
func BenchmarkLRUCache_Get2(b *testing.B) {
c := NewLRUCache(0)
ns := c.GetNamespace(0)
b.ResetTimer()
for i := uint64(0); i < uint64(b.N); i++ {
set(ns, i, "", 1, nil)
}
b.ResetTimer()
for i := uint64(0); i < uint64(b.N); i++ {
ns.Get(i, func() (charge int, value interface{}) {
return 0, nil
})
}
}
func BenchmarkLRUCache_Release(b *testing.B) {
c := NewLRUCache(0)
ns := c.GetNamespace(0)
handles := make([]Handle, b.N)
for i := uint64(0); i < uint64(b.N); i++ {
handles[i] = set(ns, i, "", 1, nil)
}
b.ResetTimer()
for _, h := range handles {
h.Release()
}
}
func BenchmarkLRUCache_SetRelease(b *testing.B) {
capacity := b.N / 100
if capacity <= 0 {
capacity = 10
}
c := NewLRUCache(capacity)
ns := c.GetNamespace(0)
b.ResetTimer()
for i := uint64(0); i < uint64(b.N); i++ {
set(ns, i, "", 1, nil).Release()
}
}
func BenchmarkLRUCache_SetReleaseTwice(b *testing.B) {
capacity := b.N / 100
if capacity <= 0 {
capacity = 10
}
c := NewLRUCache(capacity)
ns := c.GetNamespace(0)
b.ResetTimer()
na := b.N / 2
nb := b.N - na
for i := uint64(0); i < uint64(na); i++ {
set(ns, i, "", 1, nil).Release()
}
for i := uint64(0); i < uint64(nb); i++ {
set(ns, i, "", 1, nil).Release()
}
} }
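The Delete tests above hinge on the ban semantics documented in the new cache code. A condensed sketch (hypothetical demo function, using the same public API as the tests):

    package demo // hypothetical package

    import (
        "fmt"

        "github.com/syndtr/goleveldb/leveldb/cache"
    )

    // DeleteDemo: Delete bans the node, the onDel callback fires once the
    // last handle is released, and later lookups miss until a setFunc
    // recreates the node (unbanned, per the Delete doc comment).
    func DeleteDemo() {
        c := cache.NewCache(cache.NewLRU(10))
        defer c.Close()

        h := c.Get(0, 1, func() (int, cache.Value) { return 1, "v" })
        c.Delete(0, 1, func() { fmt.Println("node released") })

        h.Release() // last reference dropped; "node released" prints here

        if c.Get(0, 1, nil) != nil {
            panic("deleted node should be gone")
        }
    }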


@@ -0,0 +1,195 @@
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package cache
import (
"sync"
"unsafe"
)
type lruNode struct {
n *Node
h *Handle
ban bool
next, prev *lruNode
}
func (n *lruNode) insert(at *lruNode) {
x := at.next
at.next = n
n.prev = at
n.next = x
x.prev = n
}
func (n *lruNode) remove() {
if n.prev != nil {
n.prev.next = n.next
n.next.prev = n.prev
n.prev = nil
n.next = nil
} else {
panic("BUG: removing removed node")
}
}
type lru struct {
mu sync.Mutex
capacity int
used int
recent lruNode
}
func (r *lru) reset() {
r.recent.next = &r.recent
r.recent.prev = &r.recent
r.used = 0
}
func (r *lru) Capacity() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.capacity
}
func (r *lru) SetCapacity(capacity int) {
var evicted []*lruNode
r.mu.Lock()
r.capacity = capacity
for r.used > r.capacity {
rn := r.recent.prev
if rn == nil {
panic("BUG: invalid LRU used or capacity counter")
}
rn.remove()
rn.n.CacheData = nil
r.used -= rn.n.Size()
evicted = append(evicted, rn)
}
r.mu.Unlock()
for _, rn := range evicted {
rn.h.Release()
}
}
func (r *lru) Promote(n *Node) {
var evicted []*lruNode
r.mu.Lock()
if n.CacheData == nil {
if n.Size() <= r.capacity {
rn := &lruNode{n: n, h: n.GetHandle()}
rn.insert(&r.recent)
n.CacheData = unsafe.Pointer(rn)
r.used += n.Size()
for r.used > r.capacity {
rn := r.recent.prev
if rn == nil {
panic("BUG: invalid LRU used or capacity counter")
}
rn.remove()
rn.n.CacheData = nil
r.used -= rn.n.Size()
evicted = append(evicted, rn)
}
}
} else {
rn := (*lruNode)(n.CacheData)
if !rn.ban {
rn.remove()
rn.insert(&r.recent)
}
}
r.mu.Unlock()
for _, rn := range evicted {
rn.h.Release()
}
}
func (r *lru) Ban(n *Node) {
r.mu.Lock()
if n.CacheData == nil {
n.CacheData = unsafe.Pointer(&lruNode{n: n, ban: true})
} else {
rn := (*lruNode)(n.CacheData)
if !rn.ban {
rn.remove()
rn.ban = true
r.used -= rn.n.Size()
r.mu.Unlock()
rn.h.Release()
rn.h = nil
return
}
}
r.mu.Unlock()
}
func (r *lru) Evict(n *Node) {
r.mu.Lock()
rn := (*lruNode)(n.CacheData)
if rn == nil || rn.ban {
r.mu.Unlock()
return
}
n.CacheData = nil
r.mu.Unlock()
rn.h.Release()
}
func (r *lru) EvictNS(ns uint64) {
var evicted []*lruNode
r.mu.Lock()
for e := r.recent.prev; e != &r.recent; {
rn := e
e = e.prev
if rn.n.NS() == ns {
rn.remove()
rn.n.CacheData = nil
r.used -= rn.n.Size()
evicted = append(evicted, rn)
}
}
r.mu.Unlock()
for _, rn := range evicted {
rn.h.Release()
}
}
func (r *lru) EvictAll() {
r.mu.Lock()
back := r.recent.prev
for rn := back; rn != &r.recent; rn = rn.prev {
rn.n.CacheData = nil
}
r.reset()
r.mu.Unlock()
for rn := back; rn != &r.recent; rn = rn.prev {
rn.h.Release()
}
}
func (r *lru) Close() error {
return nil
}
// NewLRU creates a new LRU cache.
func NewLRU(capacity int) Cacher {
r := &lru{capacity: capacity}
r.reset()
return r
}
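NewCache accepts any Cacher, so the LRU above is just one policy. A sketch of the minimum contract — a hypothetical no-op cacher that never retains nodes:

    package demo // hypothetical package

    import "github.com/syndtr/goleveldb/leveldb/cache"

    // nopCacher satisfies cache.Cacher but never takes a node reference,
    // so entries live only as long as callers hold handles to them.
    type nopCacher struct{ capacity int }

    func (c *nopCacher) Capacity() int         { return c.capacity }
    func (c *nopCacher) SetCapacity(n int)     { c.capacity = n }
    func (c *nopCacher) Promote(n *cache.Node) {} // an LRU keeps n.GetHandle() here
    func (c *nopCacher) Ban(n *cache.Node)     {}
    func (c *nopCacher) Evict(n *cache.Node)   {}
    func (c *nopCacher) EvictNS(ns uint64)     {}
    func (c *nopCacher) EvictAll()             {}
    func (c *nopCacher) Close() error          { return nil }

    var _ cache.Cacher = (*nopCacher)(nil) // compile-time interface check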


@@ -1,622 +0,0 @@
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package cache
import (
"sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/util"
)
// The LLRB implementation were taken from https://github.com/petar/GoLLRB.
// Which contains the following header:
//
// Copyright 2010 Petar Maymounkov. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// lruCache represent a LRU cache state.
type lruCache struct {
mu sync.Mutex
recent lruNode
table map[uint64]*lruNs
capacity int
used, size, alive int
}
// NewLRUCache creates a new initialized LRU cache with the given capacity.
func NewLRUCache(capacity int) Cache {
c := &lruCache{
table: make(map[uint64]*lruNs),
capacity: capacity,
}
c.recent.rNext = &c.recent
c.recent.rPrev = &c.recent
return c
}
func (c *lruCache) Capacity() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.capacity
}
func (c *lruCache) Used() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.used
}
func (c *lruCache) Size() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.size
}
func (c *lruCache) NumObjects() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.alive
}
// SetCapacity set cache capacity.
func (c *lruCache) SetCapacity(capacity int) {
c.mu.Lock()
c.capacity = capacity
c.evict()
c.mu.Unlock()
}
// GetNamespace return namespace object for given id.
func (c *lruCache) GetNamespace(id uint64) Namespace {
c.mu.Lock()
defer c.mu.Unlock()
if ns, ok := c.table[id]; ok {
return ns
}
ns := &lruNs{lru: c, id: id}
c.table[id] = ns
return ns
}
func (c *lruCache) ZapNamespace(id uint64) {
c.mu.Lock()
if ns, exist := c.table[id]; exist {
ns.zapNB()
delete(c.table, id)
}
c.mu.Unlock()
}
func (c *lruCache) PurgeNamespace(id uint64, fin PurgeFin) {
c.mu.Lock()
if ns, exist := c.table[id]; exist {
ns.purgeNB(fin)
}
c.mu.Unlock()
}
// Purge purge entire cache.
func (c *lruCache) Purge(fin PurgeFin) {
c.mu.Lock()
for _, ns := range c.table {
ns.purgeNB(fin)
}
c.mu.Unlock()
}
func (c *lruCache) Zap() {
c.mu.Lock()
for _, ns := range c.table {
ns.zapNB()
}
c.table = make(map[uint64]*lruNs)
c.mu.Unlock()
}
func (c *lruCache) evict() {
top := &c.recent
for n := c.recent.rPrev; c.used > c.capacity && n != top; {
if n.state != nodeEffective {
panic("evicting non effective node")
}
n.state = nodeEvicted
n.rRemove()
n.derefNB()
c.used -= n.charge
n = c.recent.rPrev
}
}
type lruNs struct {
lru *lruCache
id uint64
rbRoot *lruNode
state nsState
}
func (ns *lruNs) rbGetOrCreateNode(h *lruNode, key uint64) (hn, n *lruNode) {
if h == nil {
n = &lruNode{ns: ns, key: key}
return n, n
}
if key < h.key {
hn, n = ns.rbGetOrCreateNode(h.rbLeft, key)
if hn != nil {
h.rbLeft = hn
} else {
return nil, n
}
} else if key > h.key {
hn, n = ns.rbGetOrCreateNode(h.rbRight, key)
if hn != nil {
h.rbRight = hn
} else {
return nil, n
}
} else {
return nil, h
}
if rbIsRed(h.rbRight) && !rbIsRed(h.rbLeft) {
h = rbRotLeft(h)
}
if rbIsRed(h.rbLeft) && rbIsRed(h.rbLeft.rbLeft) {
h = rbRotRight(h)
}
if rbIsRed(h.rbLeft) && rbIsRed(h.rbRight) {
rbFlip(h)
}
return h, n
}
func (ns *lruNs) getOrCreateNode(key uint64) *lruNode {
hn, n := ns.rbGetOrCreateNode(ns.rbRoot, key)
if hn != nil {
ns.rbRoot = hn
ns.rbRoot.rbBlack = true
}
return n
}
func (ns *lruNs) rbGetNode(key uint64) *lruNode {
h := ns.rbRoot
for h != nil {
switch {
case key < h.key:
h = h.rbLeft
case key > h.key:
h = h.rbRight
default:
return h
}
}
return nil
}
func (ns *lruNs) getNode(key uint64) *lruNode {
return ns.rbGetNode(key)
}
func (ns *lruNs) rbDeleteNode(h *lruNode, key uint64) *lruNode {
if h == nil {
return nil
}
if key < h.key {
if h.rbLeft == nil { // key not present. Nothing to delete
return h
}
if !rbIsRed(h.rbLeft) && !rbIsRed(h.rbLeft.rbLeft) {
h = rbMoveLeft(h)
}
h.rbLeft = ns.rbDeleteNode(h.rbLeft, key)
} else {
if rbIsRed(h.rbLeft) {
h = rbRotRight(h)
}
// If @key equals @h.key and no right children at @h
if h.key == key && h.rbRight == nil {
return nil
}
if h.rbRight != nil && !rbIsRed(h.rbRight) && !rbIsRed(h.rbRight.rbLeft) {
h = rbMoveRight(h)
}
// If @key equals @h.key, and (from above) 'h.Right != nil'
if h.key == key {
var x *lruNode
h.rbRight, x = rbDeleteMin(h.rbRight)
if x == nil {
panic("logic")
}
x.rbLeft, h.rbLeft = h.rbLeft, nil
x.rbRight, h.rbRight = h.rbRight, nil
x.rbBlack = h.rbBlack
h = x
} else { // Else, @key is bigger than @h.key
h.rbRight = ns.rbDeleteNode(h.rbRight, key)
}
}
return rbFixup(h)
}
func (ns *lruNs) deleteNode(key uint64) {
ns.rbRoot = ns.rbDeleteNode(ns.rbRoot, key)
if ns.rbRoot != nil {
ns.rbRoot.rbBlack = true
}
}
func (ns *lruNs) rbIterateNodes(h *lruNode, pivot uint64, iter func(n *lruNode) bool) bool {
if h == nil {
return true
}
if h.key >= pivot {
if !ns.rbIterateNodes(h.rbLeft, pivot, iter) {
return false
}
if !iter(h) {
return false
}
}
return ns.rbIterateNodes(h.rbRight, pivot, iter)
}
func (ns *lruNs) iterateNodes(iter func(n *lruNode) bool) {
ns.rbIterateNodes(ns.rbRoot, 0, iter)
}
func (ns *lruNs) Get(key uint64, setf SetFunc) Handle {
ns.lru.mu.Lock()
defer ns.lru.mu.Unlock()
if ns.state != nsEffective {
return nil
}
var n *lruNode
if setf == nil {
n = ns.getNode(key)
if n == nil {
return nil
}
} else {
n = ns.getOrCreateNode(key)
}
switch n.state {
case nodeZero:
charge, value := setf()
if value == nil {
ns.deleteNode(key)
return nil
}
if charge < 0 {
charge = 0
}
n.value = value
n.charge = charge
n.state = nodeEvicted
ns.lru.size += charge
ns.lru.alive++
fallthrough
case nodeEvicted:
if n.charge == 0 {
break
}
// Insert to recent list.
n.state = nodeEffective
n.ref++
ns.lru.used += n.charge
ns.lru.evict()
fallthrough
case nodeEffective:
// Bump to front.
n.rRemove()
n.rInsert(&ns.lru.recent)
case nodeDeleted:
// Do nothing.
default:
panic("invalid state")
}
n.ref++
return &lruHandle{node: n}
}
func (ns *lruNs) Delete(key uint64, fin DelFin) bool {
ns.lru.mu.Lock()
defer ns.lru.mu.Unlock()
if ns.state != nsEffective {
if fin != nil {
fin(false, false)
}
return false
}
n := ns.getNode(key)
if n == nil {
if fin != nil {
fin(false, false)
}
return false
}
switch n.state {
case nodeEffective:
ns.lru.used -= n.charge
n.state = nodeDeleted
n.delfin = fin
n.rRemove()
n.derefNB()
case nodeEvicted:
n.state = nodeDeleted
n.delfin = fin
case nodeDeleted:
if fin != nil {
fin(true, true)
}
return false
default:
panic("invalid state")
}
return true
}
func (ns *lruNs) purgeNB(fin PurgeFin) {
if ns.state == nsEffective {
var nodes []*lruNode
ns.iterateNodes(func(n *lruNode) bool {
nodes = append(nodes, n)
return true
})
for _, n := range nodes {
switch n.state {
case nodeEffective:
ns.lru.used -= n.charge
n.state = nodeDeleted
n.purgefin = fin
n.rRemove()
n.derefNB()
case nodeEvicted:
n.state = nodeDeleted
n.purgefin = fin
case nodeDeleted:
default:
panic("invalid state")
}
}
}
}
func (ns *lruNs) Purge(fin PurgeFin) {
ns.lru.mu.Lock()
ns.purgeNB(fin)
ns.lru.mu.Unlock()
}
func (ns *lruNs) zapNB() {
if ns.state == nsEffective {
ns.state = nsZapped
ns.iterateNodes(func(n *lruNode) bool {
if n.state == nodeEffective {
ns.lru.used -= n.charge
n.rRemove()
}
ns.lru.size -= n.charge
n.state = nodeDeleted
n.fin()
return true
})
ns.rbRoot = nil
}
}
func (ns *lruNs) Zap() {
ns.lru.mu.Lock()
ns.zapNB()
delete(ns.lru.table, ns.id)
ns.lru.mu.Unlock()
}
type lruNode struct {
ns *lruNs
rNext, rPrev *lruNode
rbLeft, rbRight *lruNode
rbBlack bool
key uint64
value interface{}
charge int
ref int
state nodeState
delfin DelFin
purgefin PurgeFin
}
func (n *lruNode) rInsert(at *lruNode) {
x := at.rNext
at.rNext = n
n.rPrev = at
n.rNext = x
x.rPrev = n
}
func (n *lruNode) rRemove() bool {
if n.rPrev == nil {
return false
}
n.rPrev.rNext = n.rNext
n.rNext.rPrev = n.rPrev
n.rPrev = nil
n.rNext = nil
return true
}
func (n *lruNode) fin() {
if r, ok := n.value.(util.Releaser); ok {
r.Release()
}
if n.purgefin != nil {
if n.delfin != nil {
panic("conflicting delete and purge fin")
}
n.purgefin(n.ns.id, n.key)
n.purgefin = nil
} else if n.delfin != nil {
n.delfin(true, false)
n.delfin = nil
}
}
func (n *lruNode) derefNB() {
n.ref--
if n.ref == 0 {
if n.ns.state == nsEffective {
// Remove elemement.
n.ns.deleteNode(n.key)
n.ns.lru.size -= n.charge
n.ns.lru.alive--
n.fin()
}
n.value = nil
} else if n.ref < 0 {
panic("leveldb/cache: lruCache: negative node reference")
}
}
func (n *lruNode) deref() {
n.ns.lru.mu.Lock()
n.derefNB()
n.ns.lru.mu.Unlock()
}
type lruHandle struct {
node *lruNode
once uint32
}
func (h *lruHandle) Value() interface{} {
if atomic.LoadUint32(&h.once) == 0 {
return h.node.value
}
return nil
}
func (h *lruHandle) Release() {
if !atomic.CompareAndSwapUint32(&h.once, 0, 1) {
return
}
h.node.deref()
h.node = nil
}
func rbIsRed(h *lruNode) bool {
if h == nil {
return false
}
return !h.rbBlack
}
func rbRotLeft(h *lruNode) *lruNode {
x := h.rbRight
if x.rbBlack {
panic("rotating a black link")
}
h.rbRight = x.rbLeft
x.rbLeft = h
x.rbBlack = h.rbBlack
h.rbBlack = false
return x
}
func rbRotRight(h *lruNode) *lruNode {
x := h.rbLeft
if x.rbBlack {
panic("rotating a black link")
}
h.rbLeft = x.rbRight
x.rbRight = h
x.rbBlack = h.rbBlack
h.rbBlack = false
return x
}
func rbFlip(h *lruNode) {
h.rbBlack = !h.rbBlack
h.rbLeft.rbBlack = !h.rbLeft.rbBlack
h.rbRight.rbBlack = !h.rbRight.rbBlack
}
func rbMoveLeft(h *lruNode) *lruNode {
rbFlip(h)
if rbIsRed(h.rbRight.rbLeft) {
h.rbRight = rbRotRight(h.rbRight)
h = rbRotLeft(h)
rbFlip(h)
}
return h
}
func rbMoveRight(h *lruNode) *lruNode {
rbFlip(h)
if rbIsRed(h.rbLeft.rbLeft) {
h = rbRotRight(h)
rbFlip(h)
}
return h
}
func rbFixup(h *lruNode) *lruNode {
if rbIsRed(h.rbRight) {
h = rbRotLeft(h)
}
if rbIsRed(h.rbLeft) && rbIsRed(h.rbLeft.rbLeft) {
h = rbRotRight(h)
}
if rbIsRed(h.rbLeft) && rbIsRed(h.rbRight) {
rbFlip(h)
}
return h
}
func rbDeleteMin(h *lruNode) (hn, n *lruNode) {
if h == nil {
return nil, nil
}
if h.rbLeft == nil {
return nil, h
}
if !rbIsRed(h.rbLeft) && !rbIsRed(h.rbLeft.rbLeft) {
h = rbMoveLeft(h)
}
h.rbLeft, n = rbDeleteMin(h.rbLeft)
return rbFixup(h), n
}


@@ -9,14 +9,12 @@ package leveldb
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"math/rand"
"testing"
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
"io"
"math/rand"
"testing"
) )
const ctValSize = 1000 const ctValSize = 1000
@@ -33,8 +31,8 @@ func newDbCorruptHarnessWopt(t *testing.T, o *opt.Options) *dbCorruptHarness {
func newDbCorruptHarness(t *testing.T) *dbCorruptHarness { func newDbCorruptHarness(t *testing.T) *dbCorruptHarness {
return newDbCorruptHarnessWopt(t, &opt.Options{ return newDbCorruptHarnessWopt(t, &opt.Options{
BlockCache: cache.NewLRUCache(100), BlockCacheCapacity: 100,
Strict: opt.StrictJournalChecksum, Strict: opt.StrictJournalChecksum,
}) })
} }
@@ -269,9 +267,9 @@ func TestCorruptDB_TableIndex(t *testing.T) {
func TestCorruptDB_MissingManifest(t *testing.T) { func TestCorruptDB_MissingManifest(t *testing.T) {
rnd := rand.New(rand.NewSource(0x0badda7a)) rnd := rand.New(rand.NewSource(0x0badda7a))
h := newDbCorruptHarnessWopt(t, &opt.Options{ h := newDbCorruptHarnessWopt(t, &opt.Options{
BlockCache: cache.NewLRUCache(100), BlockCacheCapacity: 100,
Strict: opt.StrictJournalChecksum, Strict: opt.StrictJournalChecksum,
WriteBuffer: 1000 * 60, WriteBuffer: 1000 * 60,
}) })
h.build(1000) h.build(1000)


@@ -823,8 +823,8 @@ func (db *DB) GetProperty(name string) (value string, err error) {
case p == "blockpool": case p == "blockpool":
value = fmt.Sprintf("%v", db.s.tops.bpool) value = fmt.Sprintf("%v", db.s.tops.bpool)
case p == "cachedblock": case p == "cachedblock":
if bc := db.s.o.GetBlockCache(); bc != nil { if db.s.tops.bcache != nil {
value = fmt.Sprintf("%d", bc.Size()) value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
} else { } else {
value = "<nil>" value = "<nil>"
} }
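The block cache is now owned by the table ops (db.s.tops.bcache) instead of living in the options, but callers still read it through the same property. A sketch, assuming the usual "leveldb." property-name prefix:

    import (
        "fmt"

        "github.com/syndtr/goleveldb/leveldb"
    )

    // printCachedBlock is a hypothetical helper; per the hunk above the
    // property reports the cache size, or "<nil>" when caching is disabled.
    func printCachedBlock(db *leveldb.DB) {
        if v, err := db.GetProperty("leveldb.cachedblock"); err == nil {
            fmt.Println("cached block bytes:", v)
        }
    }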


@@ -8,6 +8,7 @@ package leveldb
import ( import (
"container/list" "container/list"
"fmt"
"runtime" "runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -89,6 +90,10 @@ func (db *DB) newSnapshot() *Snapshot {
return snap return snap
} }
func (snap *Snapshot) String() string {
return fmt.Sprintf("leveldb.Snapshot{%d}", snap.elem.seq)
}
// Get gets the value for the given key. It returns ErrNotFound if // Get gets the value for the given key. It returns ErrNotFound if
// the DB does not contains the key. // the DB does not contains the key.
// //
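The added String method makes snapshots printable, which is handy in logs. A sketch using the public snapshot API (GetSnapshot/Release; imports as in the previous sketch, and the printed number is assumed to be the snapshot's sequence):

    // snapshotDemo is a hypothetical helper showing the new String output.
    func snapshotDemo(db *leveldb.DB) error {
        snap, err := db.GetSnapshot()
        if err != nil {
            return err
        }
        defer snap.Release()
        fmt.Println(snap) // e.g. leveldb.Snapshot{42}
        return nil
    }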


@@ -1271,7 +1271,7 @@ func TestDB_DeletionMarkers2(t *testing.T) {
} }
func TestDB_CompactionTableOpenError(t *testing.T) { func TestDB_CompactionTableOpenError(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{CachedOpenFiles: -1}) h := newDbHarnessWopt(t, &opt.Options{OpenFilesCacheCapacity: -1})
defer h.close() defer h.close()
im := 10 im := 10
@@ -1629,8 +1629,8 @@ func TestDB_ManualCompaction(t *testing.T) {
func TestDB_BloomFilter(t *testing.T) { func TestDB_BloomFilter(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{ h := newDbHarnessWopt(t, &opt.Options{
BlockCache: opt.NoCache, DisableBlockCache: true,
Filter: filter.NewBloomFilter(10), Filter: filter.NewBloomFilter(10),
}) })
defer h.close() defer h.close()
@ -2066,8 +2066,8 @@ func TestDB_GetProperties(t *testing.T) {
func TestDB_GoleveldbIssue72and83(t *testing.T) { func TestDB_GoleveldbIssue72and83(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{ h := newDbHarnessWopt(t, &opt.Options{
WriteBuffer: 1 * opt.MiB, WriteBuffer: 1 * opt.MiB,
CachedOpenFiles: 3, OpenFilesCacheCapacity: 3,
}) })
defer h.close() defer h.close()
@ -2200,7 +2200,7 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
func TestDB_TransientError(t *testing.T) { func TestDB_TransientError(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{ h := newDbHarnessWopt(t, &opt.Options{
WriteBuffer: 128 * opt.KiB, WriteBuffer: 128 * opt.KiB,
CachedOpenFiles: 3, OpenFilesCacheCapacity: 3,
DisableCompactionBackoff: true, DisableCompactionBackoff: true,
}) })
defer h.close() defer h.close()
@ -2410,7 +2410,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
CompactionTableSize: 43 * opt.KiB, CompactionTableSize: 43 * opt.KiB,
CompactionExpandLimitFactor: 1, CompactionExpandLimitFactor: 1,
CompactionGPOverlapsFactor: 1, CompactionGPOverlapsFactor: 1,
BlockCache: opt.NoCache, DisableBlockCache: true,
} }
s, err := newSession(stor, o) s, err := newSession(stor, o)
if err != nil { if err != nil {
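Taken together, the test updates spell out the option renames. A compile-checkable summary of the mapping (the package name here is arbitrary):

package optmigration

import "github.com/syndtr/goleveldb/leveldb/opt"

// Old field on the left, replacement on the right, per the hunks above:
//   BlockCache: opt.NoCache -> DisableBlockCache: true
//   BlockCache: <a cache>   -> BlockCacheCapacity (LRU built internally)
//   CachedOpenFiles: n      -> OpenFilesCacheCapacity: n
var migrated = &opt.Options{
	DisableBlockCache:      true,
	OpenFilesCacheCapacity: -1, // negative still means "no open-files cache"
}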

View File

@@ -112,9 +112,9 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
 		db.writeDelay += time.Since(start)
 		db.writeDelayN++
 	} else if db.writeDelayN > 0 {
+		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
 		db.writeDelay = 0
 		db.writeDelayN = 0
-		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
 	}
 	return
 }
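The reorder here is a small correctness fix: the counters were previously zeroed before being logged, so the delay message could only ever report N·0 T·0s. Logging first preserves the accumulated values.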

View File

@@ -17,14 +17,14 @@ import (
 var _ = testutil.Defer(func() {
 	Describe("Leveldb external", func() {
 		o := &opt.Options{
-			BlockCache:           opt.NoCache,
+			DisableBlockCache:      true,
 			BlockRestartInterval:   5,
 			BlockSize:              80,
 			Compression:            opt.NoCompression,
-			CachedOpenFiles:      -1,
+			OpenFilesCacheCapacity: -1,
 			Strict:                 opt.StrictAll,
 			WriteBuffer:            1000,
 			CompactionTableSize:    2000,
 		}

 		Describe("write test", func() {

View File

@@ -106,7 +106,7 @@ func (ik iKey) assert() {
 		panic("leveldb: nil iKey")
 	}
 	if len(ik) < 8 {
-		panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid length", ik, len(ik)))
+		panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid length", []byte(ik), len(ik)))
 	}
 }
@@ -124,7 +124,7 @@ func (ik iKey) parseNum() (seq uint64, kt kType) {
 	num := ik.num()
 	seq, kt = uint64(num>>8), kType(num&0xff)
 	if kt > ktVal {
-		panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid type %#x", ik, len(ik), kt))
+		panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid type %#x", []byte(ik), len(ik), kt))
 	}
 	return
 }
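The explicit []byte conversion matters because fmt's %q verb consults a String method when the operand implements fmt.Stringer, so formatting an already-invalid iKey could re-enter its own key decoding while building a panic message. A standalone sketch of the effect; the ikey type here is a stand-in, not the real one:

package main

import "fmt"

// ikey is a stand-in for leveldb's iKey: a byte slice with a String method.
type ikey []byte

func (ik ikey) String() string {
	// The real method decodes sequence/type and validates the key,
	// which is exactly what should not re-run while reporting a bad key.
	return "decoded-form"
}

func main() {
	bad := ikey("short")
	fmt.Printf("%q\n", bad)         // "decoded-form": goes through String
	fmt.Printf("%q\n", []byte(bad)) // "short": raw bytes, no String call
}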

View File

@@ -20,8 +20,9 @@ const (
 	GiB = MiB * 1024
 )

-const (
-	DefaultBlockCacheSize              = 8 * MiB
+var (
+	DefaultBlockCacher                 = LRUCacher
+	DefaultBlockCacheCapacity          = 8 * MiB
 	DefaultBlockRestartInterval        = 16
 	DefaultBlockSize                   = 4 * KiB
 	DefaultCompactionExpandLimitFactor = 25
@@ -33,7 +34,8 @@ const (
 	DefaultCompactionTotalSize           = 10 * MiB
 	DefaultCompactionTotalSizeMultiplier = 10.0
 	DefaultCompressionType               = SnappyCompression
-	DefaultCachedOpenFiles               = 500
+	DefaultOpenFilesCacher               = LRUCacher
+	DefaultOpenFilesCacheCapacity        = 500
 	DefaultMaxMemCompationLevel          = 2
 	DefaultNumLevel                      = 7
 	DefaultWriteBuffer                   = 4 * MiB
@@ -41,22 +43,33 @@ const (
 	DefaultWriteL0SlowdownTrigger = 8
 )

-type noCache struct{}
+// Cacher is a caching algorithm.
+type Cacher interface {
+	New(capacity int) cache.Cacher
+}

-func (noCache) SetCapacity(capacity int)                     {}
-func (noCache) Capacity() int                                { return 0 }
-func (noCache) Used() int                                    { return 0 }
-func (noCache) Size() int                                    { return 0 }
-func (noCache) NumObjects() int                              { return 0 }
-func (noCache) GetNamespace(id uint64) cache.Namespace       { return nil }
-func (noCache) PurgeNamespace(id uint64, fin cache.PurgeFin) {}
-func (noCache) ZapNamespace(id uint64)                       {}
-func (noCache) Purge(fin cache.PurgeFin)                     {}
-func (noCache) Zap()                                         {}
+type CacherFunc struct {
+	NewFunc func(capacity int) cache.Cacher
+}

-var NoCache cache.Cache = noCache{}
+func (f *CacherFunc) New(capacity int) cache.Cacher {
+	if f.NewFunc != nil {
+		return f.NewFunc(capacity)
+	}
+	return nil
+}
+
+func noCacher(int) cache.Cacher { return nil }
+
+var (
+	// LRUCacher is the LRU-cache algorithm.
+	LRUCacher = &CacherFunc{cache.NewLRU}
+
+	// NoCacher is the value to disable caching algorithm.
+	NoCacher = &CacherFunc{}
+)

-// Compression is the per-block compression algorithm to use.
+// Compression is the 'sorted table' block compression algorithm to use.
 type Compression uint

 func (c Compression) String() string {
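The Cacher indirection decouples option plumbing from cache construction, so any algorithm can be plugged in through CacherFunc. A sketch of wiring one up, here just re-wrapping the built-in LRU:

package main

import (
	"github.com/syndtr/goleveldb/leveldb/cache"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

// myCacher mirrors how opt.LRUCacher itself is defined.
var myCacher = &opt.CacherFunc{NewFunc: cache.NewLRU}

var opts = &opt.Options{
	BlockCacher:        myCacher,
	BlockCacheCapacity: 16 * opt.MiB,
	OpenFilesCacher:    opt.NoCacher, // disable by algorithm rather than capacity
}

func main() {}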
@@ -133,16 +146,17 @@ type Options struct {
 	// The default value is nil
 	AltFilters []filter.Filter

-	// BlockCache provides per-block caching for LevelDB. Specify NoCache to
-	// disable block caching.
+	// BlockCacher provides cache algorithm for LevelDB 'sorted table' block caching.
+	// Specify NoCacher to disable caching algorithm.
 	//
-	// By default LevelDB will create LRU-cache with capacity of BlockCacheSize.
-	BlockCache cache.Cache
+	// The default value is LRUCacher.
+	BlockCacher Cacher

-	// BlockCacheSize defines the capacity of the default 'block cache'.
+	// BlockCacheCapacity defines the capacity of the 'sorted table' block caching.
+	// Use -1 for zero, this has same effect with specifying NoCacher to BlockCacher.
 	//
 	// The default value is 8MiB.
-	BlockCacheSize int
+	BlockCacheCapacity int

 	// BlockRestartInterval is the number of keys between restart points for
 	// delta encoding of keys.
@@ -156,13 +170,6 @@ type Options struct {
 	// The default value is 4KiB.
 	BlockSize int

-	// CachedOpenFiles defines number of open files to kept around when not
-	// in-use, the counting includes still in-use files.
-	// Set this to negative value to disable caching.
-	//
-	// The default value is 500.
-	CachedOpenFiles int
-
 	// CompactionExpandLimitFactor limits compaction size after expanded.
 	// This will be multiplied by table size limit at compaction target level.
 	//
@@ -237,11 +244,17 @@ type Options struct {
 	// The default value uses the same ordering as bytes.Compare.
 	Comparer comparer.Comparer

-	// Compression defines the per-block compression to use.
+	// Compression defines the 'sorted table' block compression to use.
 	//
 	// The default value (DefaultCompression) uses snappy compression.
 	Compression Compression

+	// DisableBlockCache allows disable use of cache.Cache functionality on
+	// 'sorted table' block.
+	//
+	// The default value is false.
+	DisableBlockCache bool
+
 	// DisableCompactionBackoff allows disable compaction retry backoff.
 	//
 	// The default value is false.
@@ -288,6 +301,18 @@ type Options struct {
 	// The default is 7.
 	NumLevel int

+	// OpenFilesCacher provides cache algorithm for open files caching.
+	// Specify NoCacher to disable caching algorithm.
+	//
+	// The default value is LRUCacher.
+	OpenFilesCacher Cacher
+
+	// OpenFilesCacheCapacity defines the capacity of the open files caching.
+	// Use -1 for zero, this has same effect with specifying NoCacher to OpenFilesCacher.
+	//
+	// The default value is 500.
+	OpenFilesCacheCapacity int
+
 	// Strict defines the DB strict level.
 	Strict Strict
@@ -320,18 +345,22 @@ func (o *Options) GetAltFilters() []filter.Filter {
 	return o.AltFilters
 }

-func (o *Options) GetBlockCache() cache.Cache {
-	if o == nil {
+func (o *Options) GetBlockCacher() Cacher {
+	if o == nil || o.BlockCacher == nil {
+		return DefaultBlockCacher
+	} else if o.BlockCacher == NoCacher {
 		return nil
 	}
-	return o.BlockCache
+	return o.BlockCacher
 }

-func (o *Options) GetBlockCacheSize() int {
-	if o == nil || o.BlockCacheSize <= 0 {
-		return DefaultBlockCacheSize
+func (o *Options) GetBlockCacheCapacity() int {
+	if o == nil || o.BlockCacheCapacity <= 0 {
+		return DefaultBlockCacheCapacity
+	} else if o.BlockCacheCapacity == -1 {
+		return 0
 	}
-	return o.BlockCacheSize
+	return o.BlockCacheCapacity
 }

 func (o *Options) GetBlockRestartInterval() int {
@@ -348,15 +377,6 @@ func (o *Options) GetBlockSize() int {
 	return o.BlockSize
 }

-func (o *Options) GetCachedOpenFiles() int {
-	if o == nil || o.CachedOpenFiles == 0 {
-		return DefaultCachedOpenFiles
-	} else if o.CachedOpenFiles < 0 {
-		return 0
-	}
-	return o.CachedOpenFiles
-}
-
 func (o *Options) GetCompactionExpandLimit(level int) int {
 	factor := DefaultCompactionExpandLimitFactor
 	if o != nil && o.CompactionExpandLimitFactor > 0 {
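One wrinkle worth noting: as the new getter reads here, the first branch tests BlockCacheCapacity <= 0, so the == -1 branch can never be reached and -1 falls back to the default rather than to zero; the "-1 for zero" intent in the doc comment would need the first check to be == 0. A standalone probe of the logic exactly as written above (not a call into the library):

package main

import "fmt"

const defaultBlockCacheCapacity = 8 * 1024 * 1024

// getBlockCacheCapacity reproduces the getter shown in the hunk.
func getBlockCacheCapacity(v int) int {
	if v <= 0 {
		return defaultBlockCacheCapacity
	} else if v == -1 {
		return 0 // unreachable: -1 already satisfied v <= 0
	}
	return v
}

func main() {
	fmt.Println(getBlockCacheCapacity(0))  // 8388608 (default)
	fmt.Println(getBlockCacheCapacity(-1)) // 8388608, not 0: the -1 branch is dead
	fmt.Println(getBlockCacheCapacity(42)) // 42
}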
@@ -494,6 +514,25 @@ func (o *Options) GetNumLevel() int {
 	return o.NumLevel
 }

+func (o *Options) GetOpenFilesCacher() Cacher {
+	if o == nil || o.OpenFilesCacher == nil {
+		return DefaultOpenFilesCacher
+	}
+	if o.OpenFilesCacher == NoCacher {
+		return nil
+	}
+	return o.OpenFilesCacher
+}
+
+func (o *Options) GetOpenFilesCacheCapacity() int {
+	if o == nil || o.OpenFilesCacheCapacity <= 0 {
+		return DefaultOpenFilesCacheCapacity
+	} else if o.OpenFilesCacheCapacity == -1 {
+		return 0
+	}
+	return o.OpenFilesCacheCapacity
+}
+
 func (o *Options) GetStrict(strict Strict) bool {
 	if o == nil || o.Strict == 0 {
 		return DefaultStrict&strict != 0

View File

@@ -7,7 +7,6 @@
 package leveldb

 import (
-	"github.com/syndtr/goleveldb/leveldb/cache"
 	"github.com/syndtr/goleveldb/leveldb/filter"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 )
@@ -32,13 +31,6 @@ func (s *session) setOptions(o *opt.Options) {
 			no.AltFilters[i] = &iFilter{filter}
 		}
 	}

-	// Block cache.
-	switch o.GetBlockCache() {
-	case nil:
-		no.BlockCache = cache.NewLRUCache(o.GetBlockCacheSize())
-	case opt.NoCache:
-		no.BlockCache = nil
-	}
-
 	// Comparer.
 	s.icmp = &iComparer{o.GetComparer()}
 	no.Comparer = s.icmp

View File

@@ -73,7 +73,7 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 		stCompPtrs: make([]iKey, o.GetNumLevel()),
 	}
 	s.setOptions(o)
-	s.tops = newTableOps(s, s.o.GetCachedOpenFiles())
+	s.tops = newTableOps(s)
 	s.setVersion(newVersion(s))
 	s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
 	return
@@ -82,9 +82,6 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 // Close session.
 func (s *session) close() {
 	s.tops.close()
-	if bc := s.o.GetBlockCache(); bc != nil {
-		bc.Purge(nil)
-	}
 	if s.manifest != nil {
 		s.manifest.Close()
 	}
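Cache ownership moves with this change: the session no longer purges an externally supplied block cache on close. Both the open-files cache and the block cache are now created by newTableOps and torn down in tOps.close, as the table.go hunks below show.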

View File

@@ -221,7 +221,7 @@ func (fs *fileStorage) GetManifest() (f File, err error) {
 			fs.log(fmt.Sprintf("skipping %s: invalid file name", fn))
 			continue
 		}
-		if _, e1 := strconv.ParseUint(fn[7:], 10, 0); e1 != nil {
+		if _, e1 := strconv.ParseUint(fn[8:], 10, 0); e1 != nil {
 			fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", fn, e1))
 			continue
 		}
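This looks like an off-by-one fix in where the numeric suffix starts: for a name of the form CURRENT.&lt;num&gt;, "CURRENT" is seven bytes and the digits begin after the separator at index 7, so slicing from 7 swallowed the separator and made ParseUint reject every candidate.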

View File

@@ -286,10 +286,10 @@ func (x *tFilesSortByNum) Less(i, j int) bool {
 // Table operations.
 type tOps struct {
 	s      *session
-	cache   cache.Cache
-	cacheNS cache.Namespace
+	cache  *cache.Cache
+	bcache *cache.Cache
 	bpool  *util.BufferPool
 }

 // Creates an empty table and returns table writer.
@@ -338,26 +338,28 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
 // Opens table. It returns a cache handle, which should
 // be released after use.
-func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
+func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
 	num := f.file.Num()
-	ch = t.cacheNS.Get(num, func() (charge int, value interface{}) {
+	ch = t.cache.Get(0, num, func() (size int, value cache.Value) {
 		var r storage.Reader
 		r, err = f.file.Open()
 		if err != nil {
 			return 0, nil
 		}

-		var bcacheNS cache.Namespace
-		if bc := t.s.o.GetBlockCache(); bc != nil {
-			bcacheNS = bc.GetNamespace(num)
+		var bcache *cache.CacheGetter
+		if t.bcache != nil {
+			bcache = &cache.CacheGetter{Cache: t.bcache, NS: num}
 		}

 		var tr *table.Reader
-		tr, err = table.NewReader(r, int64(f.size), storage.NewFileInfo(f.file), bcacheNS, t.bpool, t.s.o.Options)
+		tr, err = table.NewReader(r, int64(f.size), storage.NewFileInfo(f.file), bcache, t.bpool, t.s.o.Options)
 		if err != nil {
 			r.Close()
 			return 0, nil
 		}
 		return 1, tr
 	})
 	if ch == nil && err == nil {
 		err = ErrClosed
@@ -412,16 +414,14 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
 // no one use the the table.
 func (t *tOps) remove(f *tFile) {
 	num := f.file.Num()
-	t.cacheNS.Delete(num, func(exist, pending bool) {
-		if !pending {
-			if err := f.file.Remove(); err != nil {
-				t.s.logf("table@remove removing @%d %q", num, err)
-			} else {
-				t.s.logf("table@remove removed @%d", num)
-			}
-			if bc := t.s.o.GetBlockCache(); bc != nil {
-				bc.ZapNamespace(num)
-			}
+	t.cache.Delete(0, num, func() {
+		if err := f.file.Remove(); err != nil {
+			t.s.logf("table@remove removing @%d %q", num, err)
+		} else {
+			t.s.logf("table@remove removed @%d", num)
+		}
+		if t.bcache != nil {
+			t.bcache.EvictNS(num)
 		}
 	})
 }
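The reworked cache package drives both call sites above: Get either returns an existing node under a (namespace, key) pair or invokes the setter to build it, and Delete bans the node and runs the callback once it is no longer referenced. A standalone sketch against the vendored package:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/cache"
)

func main() {
	c := cache.NewCache(cache.NewLRU(128)) // LRU-managed capacity of 128 units
	// Get creates the entry on a miss; the setter returns (size, value).
	h := c.Get(0, 42, func() (size int, value cache.Value) {
		return 1, "table-reader-42"
	})
	fmt.Println(h.Value()) // table-reader-42
	h.Release()            // handles must be released after use

	// Delete prevents further hits and fires the callback once the node
	// has no outstanding references.
	c.Delete(0, 42, func() { fmt.Println("removed") })
	c.Close()
}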
@@ -429,18 +429,34 @@ func (t *tOps) remove(f *tFile) {
 // Closes the table ops instance. It will close all tables,
 // regadless still used or not.
 func (t *tOps) close() {
-	t.cache.Zap()
 	t.bpool.Close()
+	t.cache.Close()
+	if t.bcache != nil {
+		t.bcache.Close()
+	}
 }

 // Creates new initialized table ops instance.
-func newTableOps(s *session, cacheCap int) *tOps {
-	c := cache.NewLRUCache(cacheCap)
+func newTableOps(s *session) *tOps {
+	var (
+		cacher cache.Cacher
+		bcache *cache.Cache
+	)
+	if s.o.GetOpenFilesCacheCapacity() > 0 {
+		cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
+	}
+	if !s.o.DisableBlockCache {
+		var bcacher cache.Cacher
+		if s.o.GetBlockCacheCapacity() > 0 {
+			bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity())
+		}
+		bcache = cache.NewCache(bcacher)
+	}
 	return &tOps{
 		s:      s,
-		cache:  c,
-		cacheNS: c.GetNamespace(0),
+		cache:  cache.NewCache(cacher),
+		bcache: bcache,
 		bpool:  util.NewBufferPool(s.o.GetBlockSize() + 5),
 	}
 }
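The namespace scheme ties these pieces together: table readers live in the open-files cache under namespace 0, keyed by file number, while each table's blocks live in the shared block cache under a namespace equal to that file number. That is what lets remove drop a whole table's blocks with a single EvictNS call.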

View File

@@ -509,7 +509,7 @@ type Reader struct {
 	mu     sync.RWMutex
 	fi     *storage.FileInfo
 	reader io.ReaderAt
-	cache  cache.Namespace
+	cache  *cache.CacheGetter
 	err    error
 	bpool  *util.BufferPool
 	// Options
@@ -613,18 +613,22 @@ func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error)
 func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
 	if r.cache != nil {
-		var err error
-		ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) {
-			if !fillCache {
-				return 0, nil
-			}
-			var b *block
-			b, err = r.readBlock(bh, verifyChecksum)
-			if err != nil {
-				return 0, nil
-			}
-			return cap(b.data), b
-		})
+		var (
+			err error
+			ch  *cache.Handle
+		)
+		if fillCache {
+			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
+				var b *block
+				b, err = r.readBlock(bh, verifyChecksum)
+				if err != nil {
+					return 0, nil
+				}
+				return cap(b.data), b
+			})
+		} else {
+			ch = r.cache.Get(bh.offset, nil)
+		}
 		if ch != nil {
 			b, ok := ch.Value().(*block)
 			if !ok {
@@ -667,18 +671,22 @@ func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
 func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
 	if r.cache != nil {
-		var err error
-		ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) {
-			if !fillCache {
-				return 0, nil
-			}
-			var b *filterBlock
-			b, err = r.readFilterBlock(bh)
-			if err != nil {
-				return 0, nil
-			}
-			return cap(b.data), b
-		})
+		var (
+			err error
+			ch  *cache.Handle
+		)
+		if fillCache {
+			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
+				var b *filterBlock
+				b, err = r.readFilterBlock(bh)
+				if err != nil {
+					return 0, nil
+				}
+				return cap(b.data), b
+			})
+		} else {
+			ch = r.cache.Get(bh.offset, nil)
+		}
 		if ch != nil {
 			b, ok := ch.Value().(*filterBlock)
 			if !ok {
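Both cached-read paths get the same restructuring: the fillCache test moves out of the setter, so a read that should not populate the cache now passes a nil setFunc and Get degrades to a pure lookup, instead of invoking a setter that always answered (0, nil).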
@@ -980,7 +988,7 @@ func (r *Reader) Release() {
 // The fi, cache and bpool is optional and can be nil.
 //
 // The returned table reader instance is goroutine-safe.
-func NewReader(f io.ReaderAt, size int64, fi *storage.FileInfo, cache cache.Namespace, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
+func NewReader(f io.ReaderAt, size int64, fi *storage.FileInfo, cache *cache.CacheGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
 	if f == nil {
 		return nil, errors.New("leveldb/table: nil file")
 	}
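NewReader now takes a *cache.CacheGetter instead of a namespace object; the getter is just a (cache, namespace) pair, so many table files can share one block cache without key collisions. A minimal sketch of constructing one, with the file number 7 being a hypothetical value:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/cache"
)

func main() {
	shared := cache.NewCache(cache.NewLRU(1 << 20)) // one block cache for all tables
	// One getter per table file: same cache, distinct namespace.
	g := &cache.CacheGetter{Cache: shared, NS: 7}
	h := g.Get(1234, func() (size int, value cache.Value) {
		return 1, "block@offset-1234"
	})
	fmt.Println(h.Value())
	h.Release()
	shared.Close()
}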

View File

@@ -16,7 +16,7 @@ import (
 	"unicode/utf8"
 )

-type lowerCaseASCII struct{ transform.NopResetter }
+type lowerCaseASCII struct{ NopResetter }

 func (lowerCaseASCII) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
 	n := len(src)
@@ -34,7 +34,7 @@ func (lowerCaseASCII) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, er
 var errYouMentionedX = errors.New("you mentioned X")

-type dontMentionX struct{ transform.NopResetter }
+type dontMentionX struct{ NopResetter }

 func (dontMentionX) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
 	n := len(src)
@@ -52,7 +52,7 @@ func (dontMentionX) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err
 // doublerAtEOF is a strange Transformer that transforms "this" to "tthhiiss",
 // but only if atEOF is true.
-type doublerAtEOF struct{ transform.NopResetter }
+type doublerAtEOF struct{ NopResetter }

 func (doublerAtEOF) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
 	if !atEOF {
@@ -71,7 +71,7 @@ func (doublerAtEOF) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err
 // rleDecode and rleEncode implement a toy run-length encoding: "aabbbbbbbbbb"
 // is encoded as "2a10b". The decoding is assumed to not contain any numbers.
-type rleDecode struct{ transform.NopResetter }
+type rleDecode struct{ NopResetter }

 func (rleDecode) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
 loop:
@@ -104,7 +104,7 @@ loop:
 }

 type rleEncode struct {
-	transform.NopResetter
+	NopResetter
 	// allowStutter means that "xxxxxxxx" can be encoded as "5x3x"
 	// instead of always as "8x".
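Dropping the transform. qualifier on NopResetter suggests these tests now compile as part of package transform itself (internal rather than external tests), so the embedded helper is referenced unqualified.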

View File

@@ -341,7 +341,7 @@ func main() {
 	if doUpgrade {
 		// Use leveldb database locks to protect against concurrent upgrades
-		_, err = leveldb.OpenFile(filepath.Join(confDir, "index"), &opt.Options{CachedOpenFiles: 100})
+		_, err = leveldb.OpenFile(filepath.Join(confDir, "index"), &opt.Options{OpenFilesCacheCapacity: 100})
 		if err != nil {
 			l.Fatalln("Cannot upgrade, database seems to be locked. Is another copy of Syncthing already running?")
 		}
@@ -489,7 +489,7 @@ func syncthingMain() {
 		readRateLimit = ratelimit.NewBucketWithRate(float64(1000*opts.MaxRecvKbps), int64(5*1000*opts.MaxRecvKbps))
 	}

-	db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), &opt.Options{CachedOpenFiles: 100})
+	db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), &opt.Options{OpenFilesCacheCapacity: 100})
 	if err != nil {
 		l.Fatalln("Cannot open database:", err, "- Is another copy of Syncthing already running?")
 	}