Skip to content

Commit

Permalink
combine memtables before flushing to L0
Browse files Browse the repository at this point in the history
Taken from PR #1696, commit b21f591
  • Loading branch information
mangalaman93 committed Feb 18, 2023
1 parent 328c10d commit c569be1
Showing 1 changed file with 58 additions and 10 deletions.
68 changes: 58 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error {
}
count += len(b.Entries)
var i uint64
var err error
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
i++
if i%100 == 0 {
Expand Down Expand Up @@ -1010,10 +1011,16 @@ func arenaSize(opt Options) int64 {

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
var iter y.Iterator
if ft.itr != nil {
iter = ft.itr
} else {
iter = ft.mt.sl.NewUniIterator(false)
}
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
for iter.Rewind(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
Expand All @@ -1029,16 +1036,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {

type flushTask struct {
mt *memTable
itr y.Iterator
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed.
if ft.mt.sl.Empty() {
return nil
}

// ft.mt could be nil with ft.itr being the valid field.
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
defer builder.Close()
Expand Down Expand Up @@ -1074,11 +1078,51 @@ func (db *DB) handleFlushTask(ft flushTask) error {
func (db *DB) flushMemtable(lc *z.Closer) error {
defer lc.Done()

var sz int64
var itrs []y.Iterator
var mts []*memTable
slurp := func() {
for {
select {
case more, ok := <-db.flushChan:
if !ok {
return
}
if more.mt == nil {
continue
}
sl := more.mt.sl
itrs = append(itrs, sl.NewUniIterator(false))
mts = append(mts, more.mt)

sz += sl.MemSize()
if sz > db.opt.MemTableSize {
return
}
default:
return
}
}
}

for ft := range db.flushChan {
if ft.mt == nil {
// We close db.flushChan now, instead of sending a nil ft.mt.
continue
}
sz = ft.mt.sl.MemSize()
// Reset of itrs, mts etc. is being done below.
y.AssertTrue(len(itrs) == 0 && len(mts) == 0)
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
mts = append(mts, ft.mt)

// Pick more memtables, so we can really fill up the L0 table.
slurp()

// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
ft.mt = nil
ft.itr = table.NewMergeIterator(itrs, false)

for {
err := db.handleFlushTask(ft)
if err == nil {
Expand All @@ -1089,9 +1133,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
for _, mt := range mts {
y.AssertTrue(mt == db.imm[0])
db.imm = db.imm[1:]
mt.DecrRef() // Return memory.
}
db.lock.Unlock()

break
Expand All @@ -1100,6 +1146,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)
}
// Reset everything.
itrs, mts, sz = itrs[:0], mts[:0], 0
}
return nil
}
Expand Down

0 comments on commit c569be1

Please sign in to comment.