BerryKV: 基于Bitcask模型的KV存储

2023-04-07
7分钟阅读时长

Bitcask起源于Riak 这个分布式数据库,是一个Log-Structured Key Value存储引擎,并且使用Hash Table作为索引。

本文将探索一下Bitcask的设计并实现一个简单的基于Bitcask的KV数据库

架构

一个Bitcask实例其实就是一个文件夹,里面有多个data file,就是存储的持久化数据。其中只有一个active data file,表示当前可写入的文件,其余文件都是固定不变的older data file。当一个文件不论什么原因被关闭后,都不再可写,仅可读。

active data file只能采用追加写的方式,顺序写可以减少磁盘寻道时间。每一个KV对写入的格式是固定的,共有6个部分组成。
crc 循环冗余校验Cyclic Redundancy Check
tstamp 时间戳
ksz key的大小
vsz value的大小
key key
val value

在内存中,维护了一个hash table叫keydir,Bitcask中的每一个key映射到一个固定大小的结构体,这个结构体存储着在data file找到value的关键信息,分别有以下4个部分:
file_id 存储该key的文件id
vsz value的大小
val_pos 文件中的偏移量offset
tstamp 时间戳

操作

读一个key的值非常的简单,从keydir中获取到对应的结构体,根据file_id得到data file的id,根据val_pos得到这个key对应Entry的偏移量,然后根据对应的kszvsz就可以得到value的值了。在大多数情况下,文件系统的read ahead cache会让读操作比预期的快。

写操作首先将对应的Entry写入到当前的active data file中,并且更新keydir中对应的信息。旧的值依然保留在data file中,在之后的Merge操作会将这些无用的Entry合并掉。

删除

删除是特殊的写操作,只需要写入一个特殊的value值即表明删除了该key,在之后的Merge操作会合并掉已经删除的信息。同时需要将keydir中的索引删除。

Merge

这种简单的模型会造成很多空间上的浪费,data file中会存在很多已经无效的Entry,而Merge操作就是将这些无效的Entry去除的过程。Merge会合并所有older data file成一个或多个data file,其中仅保留仍然有效的key的最新的Entry

同时在Merge之后还会为每一个data file生成hint file,里面存储的内容和data file非常像,只是缺少了value

当数据库启动时,可以通过扫描hint file来快速构建keydir索引,完成启动。

BerryKV

BerryKV是用go实现的Bitcask模型的demo,用于加深对论文的理解。

Model

首先我们需要定义一个Berry对象,这个对象包含一个active data file用于写入数据,一个map记录id到older data file的映射关系,一个keydir用于索引。

type Berry struct {
	sync.Mutex
	active *DataFile
	olders map[int32]*DataFile
	keydir KeyDir
}

DataFile对象是一个log对象的实例,用于读写数据。包含文件id,文件描述符fd,以及当前的文件偏移量offset

type DataFile struct {
	id     int32
	offset int32
	fd     *os.File
}

DataFile中读取或写入时的单位对象叫做Entry,包含了论文中提到的crc、timestamp、keysize、valsize、key以及value。

type Entry struct {
	CheckSum  uint32
	Timestamp uint32
	KeySize   uint32
	ValSize   uint32
	Key       string
	Value     []byte
}

keydir是将key和一些元信息进行映射,方便后续查找。

type KeyDir map[string]Meta

type Meta struct {
	FileID      int32
	EntrySize   int32
	EntryOffset int32
	Timestamp   int32
}

Set操作

Set操作接收一个keyvalue,并将key和value关联起来。

func (b *Berry) Set(key, val string) error {
	b.Lock()
	defer b.Unlock()

	return b.set(b.active, key, []byte(val))
}

根据key和value新生成一个Entry对象,并且写入到active data file中。写入后同步更新keydir中的信息。

func (b *Berry) set(df *DataFile, key string, val []byte) error {
	e := NewEntry(key, val)
	data := e.Encode()

	offset, err := df.Write(data)
	if err != nil {
		return err
	}

	b.keydir[key] = Meta{
		FileID:      int32(df.ID()),
		EntrySize:   int32(len(data)),
		EntryOffset: offset,
		Timestamp:   int32(time.Now().Unix()),
	}
	return nil
}

其中Entry.Encode()方法将entry对象编码成二进制字节序列,方便后续写入到文件中。

func (e *Entry) Encode() []byte {
	size := e.Size()
	buf := make([]byte, size)
	binary.LittleEndian.PutUint32(buf[0:4], e.CheckSum)
	binary.LittleEndian.PutUint32(buf[4:8], e.Timestamp)
	binary.LittleEndian.PutUint32(buf[8:12], e.KeySize)
	binary.LittleEndian.PutUint32(buf[12:16], e.ValSize)
	copy(buf[headerSize:headerSize+e.KeySize], []byte(e.Key))
	copy(buf[headerSize+e.KeySize:], e.Value)
	return buf
}

Get操作

Get操作根据key寻找对应的value值。首先会在keydir中检索是否存在key,存在的话就在key最后写入的data file中读取value值。

func (b *Berry) Get(key string) (string, error) {
	b.Lock()
	defer b.Unlock()

	meta, ok := b.keydir[key]
	if !ok {
		return "", ErrKeyNotFound
	}

	return b.get(meta)
}

func (b *Berry) get(m Meta) (string, error) {
	fid := m.FileID
	var df *DataFile

	if fid == b.active.ID() {
		df = b.active
	} else {
		_, ok := b.olders[fid]
		if ok {
			df = b.olders[fid]
		}
	}

	if df == nil {
		return "", ErrDataFileNotFound
	}

	return df.Read(m.EntryOffset, m.EntrySize)
}

DataFile.Read()根据传入的偏移量和Entry大小读取Entry的值,并且解码,从中获取value值。Entry在解码过程中会校验crc值是否正确。

func (df *DataFile) Read(offset, size int32) (string, error) {
	buf := make([]byte, size)

	_, err := df.fd.ReadAt(buf, int64(offset))
	if err != nil && err != io.EOF {
		return "", err
	}

	e := &Entry{}
	err = e.Decode(buf)
	if err != nil {
		return "", err
	}

	return string(e.Value), nil
}

func (e *Entry) Decode(buf []byte) error {
	n := len(buf)
	if n < headerSize {
		return ErrEntryIllegal
	}

	e.CheckSum = binary.LittleEndian.Uint32(buf[0:4])
	e.Timestamp = binary.LittleEndian.Uint32(buf[4:8])
	e.KeySize = binary.LittleEndian.Uint32(buf[8:12])
	e.ValSize = binary.LittleEndian.Uint32(buf[12:16])

	if n != int(headerSize+e.KeySize+e.ValSize) {
		return ErrEntryIllegal
	}

	e.Key = string(buf[headerSize : headerSize+e.KeySize])
	e.Value = make([]byte, e.ValSize)
	copy(e.Value, buf[headerSize+e.KeySize:])
	crc := crc32.ChecksumIEEE(e.Value)
	if crc != e.CheckSum {
		return ErrCrcFail
	}
	return nil
}

Del操作

Del操作是特殊的Set操作,通过写入一个特殊值表示该key被删除,同时删除掉keydir中的索引。

func (b *Berry) Del(key string) error {
	b.Lock()
	defer b.Unlock()

	_, ok := b.keydir[key]
	if !ok {
		return nil
	}

	return b.del(key)
}

func (b *Berry) del(key string) error {
	e := NewEntry(key, []byte(SpecialVal))
	data := e.Encode()

	_, err := b.active.Write(data)
	if err != nil {
		return err
	}

	delete(b.keydir, key)

	return nil
}

Merge

Merge是删除过期Entry来压缩data file文件的操作,减少磁盘空间的消耗。触发Merge操作有很多的策略和方法,BerryKV采用了最简单的定时触发,只要到达一定时间就进行一次Merge操作。

Merge操作除了压缩data file外,还会生成一个Hint File用于记录keydir中的信息,方便db启动是快速创建keydir。

func (b *Berry) Merge(d time.Duration) {
	ticker := time.NewTicker(d).C

	for range ticker {
		b.Lock()
		b.merge()
		b.makeHintFile()
		b.Unlock()
	}
}

首先来关注一下Berry.merge()操作。该操作先创建了一个新的active data file,并将keydir中所有的key对应的value信息写入到新的active中,并且关闭当前的active和所有的olders,然后删除掉所有的data file文件,并将新的active文件替换老的active文件,后续所有的Entry都写入到该active中。

func (b *Berry) merge() error {
	// make a temp datafile
	tmpDir, err := os.MkdirTemp("", "merge")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmpDir)

	mdf, err := NewDataFile(tmpDir, 0)
	if err != nil {
		return err
	}

	// rewrite k-v into temp datafile
	for k := range b.keydir {
		v, _ := b.get(b.keydir[k])
		b.set(mdf, k, []byte(v))
	}

	// close active
	b.active.Close()

	// close all olders
	for _, df := range b.olders {
		df.Close()
	}

	b.olders = make(map[int32]*DataFile)

	// remove all datafile
	filepath.Walk(DataDir, func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		if filepath.Ext(path) == ".db" {
			err := os.Remove(path)
			if err != nil {
				return err
			}
		}
		return nil
	})

	// replace active datafile
	os.Rename(filepath.Join(tmpDir, fmt.Sprintf(DataFileNameFormat, 0)),
		filepath.Join(DataDir, fmt.Sprintf(DataFileNameFormat, 0)))

	b.active = mdf

	return nil
}

创建hint file就是将keydir序列化到文件中。

func (b *Berry) makeHintFile() error {
	path := filepath.Join(DataDir, HintFile)
	err := b.keydir.Encode(path)
	if err != nil {
		return err
	}

	return nil
}

func (k *KeyDir) Encode(path string) error {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer file.Close()

	encoder := gob.NewEncoder(file)
	err = encoder.Encode(k)
	if err != nil {
		return err
	}

	return nil
}

Check active file size

当active file的大小超过了文件的最大限制时,将当前active变成只读的older file,并新建一个active文件进行新的写入。

为了方便实现,这个操作也是定时触发检查。

func (b *Berry) CheckActiveFileSize(d time.Duration) {
	ticker := time.NewTicker(d).C

	for range ticker {
		b.checkActiveFileSize()
	}
}

func (b *Berry) checkActiveFileSize() error {
	b.Lock()
	defer b.Unlock()

	stat, err := b.active.fd.Stat()
	if err != nil {
		return err
	}

	size := stat.Size()
	if size < MaxDataFileSize {
		return nil
	}

	id := b.active.ID()
	b.olders[id] = b.active

	df, err := NewDataFile(DataDir, id+1)
	if err != nil {
		return err
	}

	b.active = df

	return nil
}

启动DB

当data file不论什么原因被关闭后,都会变成只读状态的older data file,所以db启动的时候,会先去把所有的data file都加载成到olders中,并生成一个新的data file作为active。如果有hint file存在,则解析hint file用于构建keydir。完成加载之后,创建两个协程定时进行Merge操作和Check File Size操作。

func New() (*Berry, error) {
	var maxID int32 = 0
	olders := make(map[int32]*DataFile)

	// get all datafiles
	files, err := filepath.Glob(fmt.Sprintf("%s/*.db", DataDir))
	if err != nil {
		return nil, err
	}

	for _, file := range files {
		filename := filepath.Base(file)
		id, err := strconv.ParseInt(strings.TrimPrefix(strings.TrimSuffix(filename, ".db"), "berry_"), 10, 32)
		if err != nil {
			return nil, err
		}

		fd, err := os.Open(file)
		if err != nil {
			return nil, err
		}

		stat, err := fd.Stat()
		if err != nil {
			return nil, err
		}

		df := &DataFile{
			id:     int32(id),
			fd:     fd,
			offset: int32(stat.Size()),
		}

		olders[int32(id)] = df

		if int32(id) > maxID {
			maxID = int32(id)
		}
	}

	activeDF, err := NewDataFile(DataDir, maxID+1)
	if err != nil {
		return nil, err
	}

	keydir := make(KeyDir)

	// check if a hint file already
	hintFile := filepath.Join(DataDir, HintFile)
	_, err = os.Stat(hintFile)
	if err == nil {
		keydir.Decode(hintFile)
	}

	b := &Berry{
		active: activeDF,
		olders: olders,
		keydir: keydir,
	}

	go b.CheckActiveFileSize(defaultCheckFileSizeInterval)

	go b.Merge(defaultMergeInterval)

	return b, nil
}

参考

github bitcask
bitcask paper