Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support LRU and LFU eviction #167

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aof/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (persister *Persister) GenerateRDBForReplication(rdbFilename string, listen
if err != nil {
return err
}
ctx.tmpFile.Close()
err = os.Rename(ctx.tmpFile.Name(), rdbFilename)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ type ServerProperties struct {

// config file path
CfPath string `cfg:"cf,omitempty"`

//eviction
MaxmemoryPolicy string `cfg:"maxmemory-policy"`
LfuDecayTime int32 `cfg:"lfu-decay-time"`
LfuLogFactor int32 `cfg:"lfu-log-factor"`
LfuInitVal uint8 `cfg:"lfu-init-val"`
MaxmemorySamples int `cfg:"maxmemory-samples"`
// MB
Maxmemory uint64 `cfg:"maxmemory"`
}

type ServerInfo struct {
Expand Down
171 changes: 171 additions & 0 deletions database/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package database

import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/protocol"
"strings"
)

func init() {

}

var commandTable = make(map[string]*godisCommand)

const (
Write = "write"
Readonly = "readonly"
Denyoom = "denyoom"
Admin = "admin"
Pubsub = "pubsub"
Noscript = "noscript"
Random = "random"
SortForScript = "sortforscript"
Loading = "loading"
Stale = "stale"
SkipMonitor = "skip_monitor"
Asking = "asking"
Fast = "fast"
Movablekeys = "movablekeys"
)

type godisCommand struct {
name string
arity int
signs []string
firstKey int
lastKey int
stepNumber int
prepare PreFunc
}

func RegisterGodisCommand(name string, arity int, signs []string, firstKey int, lastKey int, stepNumber int, prepare PreFunc) {
name = strings.ToLower(name)
commandTable[name] = &godisCommand{
name: name,
arity: arity,
signs: signs,
firstKey: firstKey,
lastKey: lastKey,
stepNumber: stepNumber,
prepare: prepare,
}
}

func execCommand(args [][]byte) redis.Reply {
n := len(args)
if n > 1 {
subCommand := strings.ToUpper(string(args[1]))
if subCommand == "INFO" {
return getCommands(args[2:])
} else if subCommand == "COUNT" {
return protocol.MakeIntReply(int64(len(commandTable)))
} else if subCommand == "GETKEYS" {
if n < 2 {
return protocol.MakeErrReply("Unknown subcommand or wrong number of arguments for '" + subCommand + "'")
}
return getKeys(args[2:])
} else {
return protocol.MakeErrReply("Unknow subcomand or wrong number of arguments for '" + subCommand + "'")
}
} else {
return getAllGodisCommandReply()
}
}

func getKeys(args [][]byte) redis.Reply {
key := string(args[0])
command, ok := commandTable[key]
if !ok {
return protocol.MakeErrReply("Invalid command specified")
}
arity := command.arity
if arity > 0 {
if len(args) != arity {
return protocol.MakeErrReply("Invalid number of arguments specified for command")
}
} else {
if len(args) < -arity {
return protocol.MakeErrReply("Invalid number of arguments specified for command")
}
}

prepare := command.prepare
if prepare == nil {
return protocol.MakeErrReply("The command has no key arguments")
}
writeKeys, readKeys := prepare(args[1:])

keys := append(writeKeys, readKeys...)
replies := make([]redis.Reply, len(keys))
for i, key := range keys {
replies[i] = protocol.MakeBulkReply([]byte(key))
}
return protocol.MakeMultiRawReply(replies)
}

func getCommands(args [][]byte) redis.Reply {
replies := make([]redis.Reply, len(args))

for i, v := range args {
reply, ok := commandTable[string(v)]
if ok {
replies[i] = reply.ToReply()
} else {
replies[i] = protocol.MakeNullBulkReply()
}
}

return protocol.MakeMultiRawReply(replies)
}

func getAllGodisCommandReply() redis.Reply {
replies := make([]redis.Reply, len(commandTable))
i := 0
for _, v := range commandTable {
replies[i] = v.ToReply()
i++
}
return protocol.MakeMultiRawReply(replies)
}

func (g *godisCommand) ToReply() redis.Reply {
args := make([]redis.Reply, 6)
args[0] = protocol.MakeBulkReply([]byte(g.name))
args[1] = protocol.MakeIntReply(int64(g.arity))
signs := make([]redis.Reply, len(g.signs))
for i, v := range g.signs {
signs[i] = protocol.MakeStatusReply(v)
}
args[2] = protocol.MakeMultiRawReply(signs)
args[3] = protocol.MakeIntReply(int64(g.firstKey))
args[4] = protocol.MakeIntReply(int64(g.lastKey))
args[5] = protocol.MakeIntReply(int64(g.stepNumber))

return protocol.MakeMultiRawReply(args)
}

func init() {
RegisterGodisCommand("Command", 0, []string{Random, Loading, Stale}, 0, 0, 0, nil)

RegisterGodisCommand("Keys", 2, []string{Readonly, SortForScript}, 0, 0, 0, nil)
RegisterGodisCommand("Auth", 2, []string{Noscript, Loading, Stale, SkipMonitor, Fast}, 0, 0, 0, nil)
RegisterGodisCommand("Info", -1, []string{Random, Loading, Stale}, 0, 0, 0, nil)
RegisterGodisCommand("Slaveof", 3, []string{Admin, Noscript, Stale}, 0, 0, 0, nil)
RegisterGodisCommand("Subscribe", -2, []string{Pubsub, Noscript, Loading, Stale}, 0, 0, 0, nil)
RegisterGodisCommand("Publish", 3, []string{Pubsub, Noscript, Loading, Fast}, 0, 0, 0, nil)
RegisterGodisCommand("FlushAll", -1, []string{Write}, 0, 0, 0, nil)
RegisterGodisCommand("FlushDb", -1, []string{Write}, 0, 0, 0, nil)
RegisterGodisCommand("Save", 1, []string{Admin, Noscript}, 0, 0, 0, nil)
RegisterGodisCommand("BgSave", 1, []string{Admin, Noscript}, 0, 0, 0, nil)
RegisterGodisCommand("Select", 2, []string{Loading, Fast}, 0, 0, 0, nil)
RegisterGodisCommand("Replconf", -1, []string{Admin, Noscript, Loading, Stale}, 0, 0, 0, nil)
RegisterGodisCommand("Replconf", 3, []string{Readonly, Admin, Noscript}, 0, 0, 0, nil)

// transaction command
RegisterGodisCommand("Multi", 1, []string{Noscript, Fast}, 0, 0, 0, nil)
RegisterGodisCommand("Discard", 1, []string{Noscript, Fast}, 0, 0, 0, nil)
RegisterGodisCommand("Exec", 1, []string{Noscript, SkipMonitor}, 0, 0, 0, nil)
RegisterGodisCommand("Watch", 1, []string{Noscript, Fast}, 1, -1, 1, readAllKeys)

}
124 changes: 117 additions & 7 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@
package database

import (
"strings"
"time"

"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/database/eviction"

"github.com/hdt3213/godis/datastruct/dict"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/mem"
"github.com/hdt3213/godis/lib/timewheel"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"

"strings"
"time"
"runtime"

)

const (
Expand All @@ -26,8 +34,13 @@ type DB struct {
// key -> expireTime (time.Time)
ttlMap *dict.ConcurrentDict
// key -> version(uint32)

// key -> eviction(uint32)
evictionMap dict.Dict
evictionPolicy eviction.MaxmemoryPolicy
versionMap *dict.ConcurrentDict


// addaof is used to add command to aof
addAof func(CmdLine)
}
Expand All @@ -50,10 +63,14 @@ type UndoFunc func(db *DB, args [][]byte) []CmdLine
// makeDB create DB instance
func makeDB() *DB {
db := &DB{
data: dict.MakeConcurrent(dataDictSize),
ttlMap: dict.MakeConcurrent(ttlDictSize),
versionMap: dict.MakeConcurrent(dataDictSize),
addAof: func(line CmdLine) {},

data: dict.MakeConcurrent(dataDictSize),
ttlMap: dict.MakeConcurrent(ttlDictSize),
versionMap: dict.MakeConcurrent(dataDictSize),
evictionMap: dict.MakeConcurrent(dataDictSize),
evictionPolicy: makeEvictionPolicy(),
addAof: func(line CmdLine) {},

}
return db
}
Expand All @@ -65,6 +82,8 @@ func makeBasicDB() *DB {
ttlMap: dict.MakeConcurrent(ttlDictSize),
versionMap: dict.MakeConcurrent(dataDictSize),
addAof: func(line CmdLine) {},
evictionMap: dict.MakeSimple(),
evictionPolicy: makeEvictionPolicy(),
}
return db
}
Expand Down Expand Up @@ -115,7 +134,12 @@ func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {
write, read := prepare(cmdLine[1:])
db.addVersion(write...)
db.RWLocks(write, read)
defer db.RWUnLocks(write, read)
db.initEvictionMark(write)
db.updateEvictionMark(read)
defer func() {
db.RWUnLocks(write, read)
db.Eviction()
}()
fun := cmd.executor
return fun(db, cmdLine[1:])
}
Expand Down Expand Up @@ -176,6 +200,7 @@ func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
func (db *DB) Remove(key string) {
db.data.RemoveWithLock(key)
db.ttlMap.Remove(key)
db.evictionMap.Remove(key)
taskKey := genExpireTask(key)
timewheel.Cancel(taskKey)
}
Expand Down Expand Up @@ -294,3 +319,88 @@ func (db *DB) ForEach(cb func(key string, data *database.DataEntity, expiration
return cb(key, entity, expiration)
})
}

//eviction
func (db *DB) Eviction() {
// is not out of max-memory,no need to lock
if db.evictionPolicy == nil || !mem.GetMaxMemoryState(nil) {
return
}
mem.Lock.Lock()
defer mem.Lock.Unlock()
var memFreed uint64 = 0
var memToFree uint64
mem.GetMaxMemoryState(memToFree)
for memFreed < memToFree {
var keys []string
if db.evictionPolicy.IsAllKeys() {
keys = db.data.RandomDistinctKeys(config.Properties.MaxmemorySamples)
} else {
keys = db.ttlMap.RandomDistinctKeys(config.Properties.MaxmemorySamples)
}

marks := make([]eviction.KeyMark, config.Properties.MaxmemorySamples)
for i, key := range keys {
mark, _ := db.evictionMap.Get(key)
marks[i] = eviction.KeyMark{
Key: key,
Mark: mark.(int32),
}
}
key := db.evictionPolicy.Eviction(marks)
delta := mem.UsedMemory()
db.Remove(key)
runtime.GC()
delta -= mem.UsedMemory()
memFreed += delta
db.addAof(utils.ToCmdLine2("DEL", key))
}

}

//MakeEviction make a new mark about a key
func (db *DB) initEvictionMark(keys []string) {
if db.evictionPolicy == nil {
return
}
mark := db.evictionPolicy.MakeMark()
for _, key := range keys {
db.evictionMap.Put(key, mark)
}
}

//UpdateMark update mark about eviction
func (db *DB) updateEvictionMark(keys []string) {
if db.evictionPolicy == nil {
return
}
for _, key := range keys {
mark, exists := db.evictionMap.Get(key)
if !exists {
continue
}
l := mark.(int32)
updateMark := db.evictionPolicy.UpdateMark(l)
db.evictionMap.Put(key, updateMark)
}
}

func makeEvictionPolicy() eviction.MaxmemoryPolicy {
policy := config.Properties.MaxmemoryPolicy
if policy == "volatile-lru" {
return &eviction.LRUPolicy{
AllKeys: false,
}
} else if policy == "volatile-lfu" {
return &eviction.LRUPolicy{}
} else if policy == "allkeys-lru" {
return &eviction.LRUPolicy{
AllKeys: true,
}
} else if policy == "allkeys-lfu" {
return &eviction.LRUPolicy{
AllKeys: true,
}
}
return nil
}
13 changes: 13 additions & 0 deletions database/eviction/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package eviction

type MaxmemoryPolicy interface {
MakeMark() int32
UpdateMark(int32) int32
Eviction([]KeyMark) string
IsAllKeys() bool
}

type KeyMark struct {
Key string
Mark int32
}
Loading