BerryKV: 基于Bitcask模型的KV存储
Bitcask起源于Riak 这个分布式数据库,是一个Log-Structured
Key Value存储引擎,并且使用Hash Table作为索引。
本文将探索一下Bitcask的设计并实现一个简单的基于Bitcask的KV数据库
架构
一个Bitcask实例其实就是一个文件夹,里面有多个data file,就是存储的持久化数据。其中只有一个active data file
,表示当前可写入的文件,其余文件都是固定不变的older data file
。当一个文件不论什么原因被关闭后,都不再可写,仅可读。
active data file
只能采用追加写的方式,顺序写可以减少磁盘寻道时间。每一个KV对写入的格式是固定的,共有6个部分组成。crc
循环冗余校验Cyclic Redundancy Checktstamp
时间戳ksz
key的大小vsz
value的大小key
keyval
value
在内存中,维护了一个hash table叫keydir
,Bitcask中的每一个key映射到一个固定大小的结构体,这个结构体存储着在data file找到value的关键信息,分别有以下4个部分:file_id
存储该key的文件idvsz
value的大小val_pos
文件中的偏移量offsettstamp
时间戳
操作
读
读一个key
的值非常的简单,从keydir
中获取到对应的结构体,根据file_id
得到data file
的id,根据val_pos
得到这个key
对应Entry
的偏移量,然后根据对应的ksz
和vsz
就可以得到value
的值了。在大多数情况下,文件系统的read ahead cache
会让读操作比预期的快。
写
写操作首先将对应的Entry
写入到当前的active data file
中,并且更新keydir
中对应的信息。旧的值依然保留在data file
中,在之后的Merge
操作会将这些无用的Entry
合并掉。
删除
删除是特殊的写操作,只需要写入一个特殊的value
值即表明删除了该key
,在之后的Merge
操作会合并掉已经删除的信息。同时需要将keydir
中的索引删除。
Merge
这种简单的模型会造成很多空间上的浪费,data file
中会存在很多已经无效的Entry
,而Merge
操作就是将这些无效的Entry
去除的过程。Merge
会合并所有older data file
成一个或多个data file
,其中仅保留仍然有效的key
的最新的Entry
。
同时在Merge
之后还会为每一个data file
生成hint file
,里面存储的内容和data file
非常像,只是缺少了value
。
当数据库启动时,可以通过扫描hint file
来快速构建keydir
索引,完成启动。
BerryKV
BerryKV是用go实现的Bitcask模型的demo,用于加深对论文的理解。
Model
首先我们需要定义一个Berry
对象,这个对象包含一个active data file用于写入数据,一个map记录id到older data file的映射关系,一个keydir用于索引。
type Berry struct {
sync.Mutex
active *DataFile
olders map[int32]*DataFile
keydir KeyDir
}
DataFile
对象是一个log对象的实例,用于读写数据。包含文件id,文件描述符fd,以及当前的文件偏移量offset
type DataFile struct {
id int32
offset int32
fd *os.File
}
从DataFile
中读取或写入时的单位对象叫做Entry
,包含了论文中提到的crc、timestamp、keysize、valsize、key以及value。
type Entry struct {
CheckSum uint32
Timestamp uint32
KeySize uint32
ValSize uint32
Key string
Value []byte
}
keydir是将key和一些元信息进行映射,方便后续查找。
type KeyDir map[string]Meta
type Meta struct {
FileID int32
EntrySize int32
EntryOffset int32
Timestamp int32
}
Set操作
Set
操作接收一个key
和value
,并将key和value关联起来。
func (b *Berry) Set(key, val string) error {
b.Lock()
defer b.Unlock()
return b.set(b.active, key, []byte(val))
}
根据key和value新生成一个Entry对象,并且写入到active data file中。写入后同步更新keydir中的信息。
func (b *Berry) set(df *DataFile, key string, val []byte) error {
e := NewEntry(key, val)
data := e.Encode()
offset, err := df.Write(data)
if err != nil {
return err
}
b.keydir[key] = Meta{
FileID: int32(df.ID()),
EntrySize: int32(len(data)),
EntryOffset: offset,
Timestamp: int32(time.Now().Unix()),
}
return nil
}
其中Entry.Encode()
方法将entry对象编码成二进制字节序列,方便后续写入到文件中。
func (e *Entry) Encode() []byte {
size := e.Size()
buf := make([]byte, size)
binary.LittleEndian.PutUint32(buf[0:4], e.CheckSum)
binary.LittleEndian.PutUint32(buf[4:8], e.Timestamp)
binary.LittleEndian.PutUint32(buf[8:12], e.KeySize)
binary.LittleEndian.PutUint32(buf[12:16], e.ValSize)
copy(buf[headerSize:headerSize+e.KeySize], []byte(e.Key))
copy(buf[headerSize+e.KeySize:], e.Value)
return buf
}
Get操作
Get
操作根据key寻找对应的value值。首先会在keydir中检索是否存在key,存在的话就在key最后写入的data file中读取value值。
func (b *Berry) Get(key string) (string, error) {
b.Lock()
defer b.Unlock()
meta, ok := b.keydir[key]
if !ok {
return "", ErrKeyNotFound
}
return b.get(meta)
}
func (b *Berry) get(m Meta) (string, error) {
fid := m.FileID
var df *DataFile
if fid == b.active.ID() {
df = b.active
} else {
_, ok := b.olders[fid]
if ok {
df = b.olders[fid]
}
}
if df == nil {
return "", ErrDataFileNotFound
}
return df.Read(m.EntryOffset, m.EntrySize)
}
DataFile.Read()
根据传入的偏移量和Entry大小读取Entry的值,并且解码,从中获取value值。Entry在解码过程中会校验crc值是否正确。
func (df *DataFile) Read(offset, size int32) (string, error) {
buf := make([]byte, size)
_, err := df.fd.ReadAt(buf, int64(offset))
if err != nil && err != io.EOF {
return "", err
}
e := &Entry{}
err = e.Decode(buf)
if err != nil {
return "", err
}
return string(e.Value), nil
}
func (e *Entry) Decode(buf []byte) error {
n := len(buf)
if n < headerSize {
return ErrEntryIllegal
}
e.CheckSum = binary.LittleEndian.Uint32(buf[0:4])
e.Timestamp = binary.LittleEndian.Uint32(buf[4:8])
e.KeySize = binary.LittleEndian.Uint32(buf[8:12])
e.ValSize = binary.LittleEndian.Uint32(buf[12:16])
if n != int(headerSize+e.KeySize+e.ValSize) {
return ErrEntryIllegal
}
e.Key = string(buf[headerSize : headerSize+e.KeySize])
e.Value = make([]byte, e.ValSize)
copy(e.Value, buf[headerSize+e.KeySize:])
crc := crc32.ChecksumIEEE(e.Value)
if crc != e.CheckSum {
return ErrCrcFail
}
return nil
}
Del操作
Del
操作是特殊的Set
操作,通过写入一个特殊值表示该key被删除,同时删除掉keydir中的索引。
func (b *Berry) Del(key string) error {
b.Lock()
defer b.Unlock()
_, ok := b.keydir[key]
if !ok {
return nil
}
return b.del(key)
}
func (b *Berry) del(key string) error {
e := NewEntry(key, []byte(SpecialVal))
data := e.Encode()
_, err := b.active.Write(data)
if err != nil {
return err
}
delete(b.keydir, key)
return nil
}
Merge
Merge
是删除过期Entry来压缩data file文件的操作,减少磁盘空间的消耗。触发Merge
操作有很多的策略和方法,BerryKV
采用了最简单的定时触发,只要到达一定时间就进行一次Merge操作。
Merge操作除了压缩data file外,还会生成一个Hint File
用于记录keydir中的信息,方便db启动是快速创建keydir。
func (b *Berry) Merge(d time.Duration) {
ticker := time.NewTicker(d).C
for range ticker {
b.Lock()
b.merge()
b.makeHintFile()
b.Unlock()
}
}
首先来关注一下Berry.merge()
操作。该操作先创建了一个新的active data file,并将keydir中所有的key对应的value信息写入到新的active中,并且关闭当前的active和所有的olders,然后删除掉所有的data file文件,并将新的active文件替换老的active文件,后续所有的Entry都写入到该active中。
func (b *Berry) merge() error {
// make a temp datafile
tmpDir, err := os.MkdirTemp("", "merge")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
mdf, err := NewDataFile(tmpDir, 0)
if err != nil {
return err
}
// rewrite k-v into temp datafile
for k := range b.keydir {
v, _ := b.get(b.keydir[k])
b.set(mdf, k, []byte(v))
}
// close active
b.active.Close()
// close all olders
for _, df := range b.olders {
df.Close()
}
b.olders = make(map[int32]*DataFile)
// remove all datafile
filepath.Walk(DataDir, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if filepath.Ext(path) == ".db" {
err := os.Remove(path)
if err != nil {
return err
}
}
return nil
})
// replace active datafile
os.Rename(filepath.Join(tmpDir, fmt.Sprintf(DataFileNameFormat, 0)),
filepath.Join(DataDir, fmt.Sprintf(DataFileNameFormat, 0)))
b.active = mdf
return nil
}
创建hint file就是将keydir序列化到文件中。
func (b *Berry) makeHintFile() error {
path := filepath.Join(DataDir, HintFile)
err := b.keydir.Encode(path)
if err != nil {
return err
}
return nil
}
func (k *KeyDir) Encode(path string) error {
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
encoder := gob.NewEncoder(file)
err = encoder.Encode(k)
if err != nil {
return err
}
return nil
}
Check active file size
当active file的大小超过了文件的最大限制时,将当前active变成只读的older file,并新建一个active文件进行新的写入。
为了方便实现,这个操作也是定时触发检查。
func (b *Berry) CheckActiveFileSize(d time.Duration) {
ticker := time.NewTicker(d).C
for range ticker {
b.checkActiveFileSize()
}
}
func (b *Berry) checkActiveFileSize() error {
b.Lock()
defer b.Unlock()
stat, err := b.active.fd.Stat()
if err != nil {
return err
}
size := stat.Size()
if size < MaxDataFileSize {
return nil
}
id := b.active.ID()
b.olders[id] = b.active
df, err := NewDataFile(DataDir, id+1)
if err != nil {
return err
}
b.active = df
return nil
}
启动DB
当data file不论什么原因被关闭后,都会变成只读状态的older data file,所以db启动的时候,会先去把所有的data file都加载成到olders中,并生成一个新的data file作为active。如果有hint file存在,则解析hint file用于构建keydir。完成加载之后,创建两个协程定时进行Merge操作和Check File Size操作。
func New() (*Berry, error) {
var maxID int32 = 0
olders := make(map[int32]*DataFile)
// get all datafiles
files, err := filepath.Glob(fmt.Sprintf("%s/*.db", DataDir))
if err != nil {
return nil, err
}
for _, file := range files {
filename := filepath.Base(file)
id, err := strconv.ParseInt(strings.TrimPrefix(strings.TrimSuffix(filename, ".db"), "berry_"), 10, 32)
if err != nil {
return nil, err
}
fd, err := os.Open(file)
if err != nil {
return nil, err
}
stat, err := fd.Stat()
if err != nil {
return nil, err
}
df := &DataFile{
id: int32(id),
fd: fd,
offset: int32(stat.Size()),
}
olders[int32(id)] = df
if int32(id) > maxID {
maxID = int32(id)
}
}
activeDF, err := NewDataFile(DataDir, maxID+1)
if err != nil {
return nil, err
}
keydir := make(KeyDir)
// check if a hint file already
hintFile := filepath.Join(DataDir, HintFile)
_, err = os.Stat(hintFile)
if err == nil {
keydir.Decode(hintFile)
}
b := &Berry{
active: activeDF,
olders: olders,
keydir: keydir,
}
go b.CheckActiveFileSize(defaultCheckFileSizeInterval)
go b.Merge(defaultMergeInterval)
return b, nil
}