Refactor merge policy #17025

Closed

wants to merge 53 commits

Changes from 25 commits

Commits (53)
72b57ee
refactor merge policy
w-zr Jun 14, 2024
5de9055
reduce single object merge frequency
w-zr Jun 14, 2024
7090769
reduce multi object merge frequency
w-zr Jun 14, 2024
753b45c
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jun 19, 2024
a8a0c19
rename merge scheduler
w-zr Jun 19, 2024
af35bdf
reduce multi object merge freqency
w-zr Jun 19, 2024
8c2461c
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jun 19, 2024
d1cf9ed
copy single object targets
w-zr Jun 19, 2024
97351e3
improve log print & limit cpu usage
w-zr Jun 19, 2024
19598b7
add some log
w-zr Jun 19, 2024
322f3cd
remove unnessary lock
w-zr Jun 19, 2024
e408457
fix deadlock
w-zr Jun 20, 2024
09b8a44
add lock to get createdAt
w-zr Jun 20, 2024
19f8e90
remove some log
w-zr Jun 20, 2024
7dd9710
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jun 20, 2024
0cb13ee
revert some change
w-zr Jun 20, 2024
5f357b1
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jun 20, 2024
d677326
refactor lock
w-zr Jun 20, 2024
0b4e413
Merge branch 'main' into refacor-merge-policy
mergify[bot] Jun 20, 2024
279c5e7
fix sca error
w-zr Jun 20, 2024
36c2da9
Merge branch 'refacor-merge-policy' of github.com:w-zr/matrixone into…
w-zr Jun 20, 2024
0558812
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jul 3, 2024
35dbb64
use deltaLoc to determin single object merge
w-zr Jul 3, 2024
8dcadd9
reduce merge size
w-zr Jul 3, 2024
2bd515f
enable cn merge
w-zr Jul 3, 2024
9fb7e4a
reduce multi object merge size
w-zr Jul 3, 2024
9ff911a
reduce log
w-zr Jul 4, 2024
7b27e16
dn merge
w-zr Jul 4, 2024
f67cf50
refactor
w-zr Jul 4, 2024
1e6424d
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jul 4, 2024
2211bd8
fix merge error
w-zr Jul 4, 2024
f714768
fix bug
w-zr Jul 5, 2024
8c8d6bd
reduce multi obj merge
w-zr Jul 5, 2024
6931f5c
refresh mem info before exec
w-zr Jul 5, 2024
dcd6407
log obj size
w-zr Jul 5, 2024
d7b3ab4
humanreadable bytes
w-zr Jul 5, 2024
693c305
fix bug
w-zr Jul 5, 2024
28ce572
restrict active merge blk
w-zr Jul 5, 2024
001e98d
remove merging object
w-zr Jul 5, 2024
e529636
fix error
w-zr Jul 5, 2024
43fe5c7
add min delete limit
w-zr Jul 5, 2024
200d8c5
disable single obj policy
w-zr Jul 8, 2024
5f9baab
do not consider cpu usage
w-zr Jul 8, 2024
58a85b9
release memory restrict
w-zr Jul 8, 2024
43613e5
fix
w-zr Jul 8, 2024
dcdf6ef
add restiction
w-zr Jul 8, 2024
52b62a2
do not merge same table too frequently
w-zr Jul 8, 2024
bd5e72a
Merge remote-tracking branch 'upstream/main' into refacor-merge-policy
w-zr Jul 9, 2024
a108a59
remove restriction
w-zr Jul 9, 2024
ff6af73
fix error
w-zr Jul 9, 2024
20946c2
merge overlapped objects with most cnt
w-zr Jul 10, 2024
2e82447
reuse memory
w-zr Jul 10, 2024
6486bb3
do not consider big gap
w-zr Jul 11, 2024
6 changes: 3 additions & 3 deletions pkg/cnservice/server_task.go
@@ -273,11 +273,11 @@ func (s *service) registerExecutorsLocked() {
stats := objectio.ObjectStats(b)
objs[i] = stats.ObjectName().String()
}
sql := fmt.Sprintf("select mo_ctl('DN', 'MERGEOBJECTS', '%s.%s:%s')",
mergeTask.DbName, mergeTask.TableName, strings.Join(objs, ","))
sql := fmt.Sprintf("select mo_ctl('DN', 'MERGEOBJECTS', '.%d.%d:%s')",
mergeTask.TblId, mergeTask.AccountId, strings.Join(objs, ","))
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
opts := executor.Options{}.WithAccountID(mergeTask.AccountId).WithWaitCommittedLogApplied()
opts := executor.Options{}.WithWaitCommittedLogApplied()
_, err = s.sqlExecutor.Exec(ctx, sql, opts)
return err
},
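The hunk above switches the mo_ctl MERGEOBJECTS target from database/table names to numeric table and account IDs, and drops the per-account executor option since the account is now carried in the target string. A minimal sketch of the new target construction, using a hypothetical helper and simplified field types (the real code reads them from mergeTask):

```go
package main

import (
	"fmt"
	"strings"
)

// buildMergeSQL is a hypothetical helper mirroring the statement built in
// registerExecutorsLocked after this change: the table is addressed as
// ".<tableID>.<accountID>" followed by a comma-separated list of object names.
func buildMergeSQL(tblID uint64, accountID uint32, objs []string) string {
	return fmt.Sprintf("select mo_ctl('DN', 'MERGEOBJECTS', '.%d.%d:%s')",
		tblID, accountID, strings.Join(objs, ","))
}

func main() {
	// Example: merge two objects of table 1001 under account 0.
	fmt.Println(buildMergeSQL(1001, 0, []string{"obj-a", "obj-b"}))
}
```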
98 changes: 50 additions & 48 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
@@ -967,65 +967,67 @@ func (r *runner) tryCompactTree(entry *logtail.DirtyTreeEntry, force bool) {
dirtyTree := entry.GetTree().GetTable(table.ID)
_, endTs := entry.GetTimeRange()

stats := table.Stats
stats.Lock()
defer stats.Unlock()
func() {
stats := table.Stats
stats.Lock()
defer stats.Unlock()

if force {
logutil.Infof("[flushtabletail] force flush %v-%s", table.ID, table.GetLastestSchemaLocked().Name)
if err := r.fireFlushTabletail(table, dirtyTree, endTs); err == nil {
stats.ResetDeadlineWithLock()
}
return
}

if force {
logutil.Infof("[flushtabletail] force flush %v-%s", table.ID, table.GetLastestSchemaLocked().Name)
if err := r.fireFlushTabletail(table, dirtyTree, endTs); err == nil {
if stats.LastFlush.IsEmpty() {
// first boot, just bail out, and never enter this branch again
stats.LastFlush = stats.LastFlush.Next()
stats.ResetDeadlineWithLock()
return
}
continue
}

if stats.LastFlush.IsEmpty() {
// first boot, just bail out, and never enter this branch again
stats.LastFlush = stats.LastFlush.Next()
stats.ResetDeadlineWithLock()
continue
}

flushReady := func() bool {
if !table.IsActive() {
count++
if pressure < 0.5 || count < 200 {
flushReady := func() bool {
if !table.IsActive() {
count++
if pressure < 0.5 || count < 200 {
return true
}
return false
}
if stats.FlushDeadline.Before(time.Now()) {
return true
}
if asize+dsize > stats.FlushMemCapacity {
return true
}
if asize < common.Const1MBytes && dsize > 2*common.Const1MBytes+common.Const1MBytes/2 {
return true
}
if asize > common.Const1MBytes && rand.Float64() < pressure {
return true
}
return false
}
if stats.FlushDeadline.Before(time.Now()) {
return true
}
if asize+dsize > stats.FlushMemCapacity {
return true
}
if asize < common.Const1MBytes && dsize > 2*common.Const1MBytes+common.Const1MBytes/2 {
return true
}
if asize > common.Const1MBytes && rand.Float64() < pressure {
return true
}
return false
}

ready := flushReady()
// debug log, delete later
if !stats.LastFlush.IsEmpty() && asize+dsize > 2*1000*1024 {
logutil.Infof("[flushtabletail] %v(%v) %v dels FlushCountDown %v, flushReady %v",
table.GetLastestSchemaLocked().Name,
common.HumanReadableBytes(asize+dsize),
common.HumanReadableBytes(dsize),
time.Until(stats.FlushDeadline),
ready,
)
}
ready := flushReady()
// debug log, delete later
if !stats.LastFlush.IsEmpty() && asize+dsize > 2*1000*1024 {
logutil.Infof("[flushtabletail] %v(%v) %v dels FlushCountDown %v, flushReady %v",
table.GetLastestSchemaLocked().Name,
common.HumanReadableBytes(asize+dsize),
common.HumanReadableBytes(dsize),
time.Until(stats.FlushDeadline),
ready,
)
}

if ready {
if err := r.fireFlushTabletail(table, dirtyTree, endTs); err == nil {
stats.ResetDeadlineWithLock()
if ready {
if err := r.fireFlushTabletail(table, dirtyTree, endTs); err == nil {
stats.ResetDeadlineWithLock()
}
}
}
}()
}
}

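The runner.go change wraps each table's stats handling in an anonymous function so `defer stats.Unlock()` releases the lock at the end of that table's iteration instead of at function return, with `return` inside the closure taking over the role of the old `continue`. A minimal sketch of the pattern, with illustrative names only:

```go
package main

import "sync"

type tableStats struct {
	sync.Mutex
	skip bool
}

// processAll shows why tryCompactTree wraps each iteration in a closure:
// the deferred Unlock now fires once per table rather than holding the lock
// until the whole loop finishes.
func processAll(all []*tableStats) {
	for _, stats := range all {
		func() {
			stats.Lock()
			defer stats.Unlock() // released when the closure returns, i.e. per table

			// Inside the closure, `return` ends this table's work and still
			// runs the deferred unlock, just as `continue` did before.
			if stats.skip {
				return
			}
			// ... flush decision and flush scheduling under the lock ...
		}()
	}
}
```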
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/db.go
@@ -64,6 +64,7 @@ type DB struct {
BGScanner wb.IHeartbeater
BGCheckpointRunner checkpoint.Runner
MergeHandle *MergeTaskBuilder
mergeScheduler *merge.Scheduler

DiskCleaner *gc2.DiskCleaner

88 changes: 71 additions & 17 deletions pkg/vm/engine/tae/db/merge/executor.go
@@ -17,17 +17,17 @@ package merge
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils"
@@ -36,6 +36,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
"go.uber.org/zap"
)

type activeTaskStats map[uint64]struct {
@@ -51,8 +52,8 @@ type MergeExecutor struct {
memAvail int
memSpare int // 10% of total memory or container memory limit
cpuPercent float64
activeMergeBlkCount int32
activeEstimateBytes int64
activeMergeBlkCount atomic.Int32
activeEstimateBytes atomic.Int64
taskConsume struct {
sync.Mutex
o map[objectio.ObjectId]struct{}
@@ -98,22 +99,22 @@ func (e *MergeExecutor) RefreshMemInfo() {
}

func (e *MergeExecutor) PrintStats() {
cnt := atomic.LoadInt32(&e.activeMergeBlkCount)
cnt := e.activeMergeBlkCount.Load()
if cnt == 0 && e.MemAvailBytes() > 512*common.Const1MBytes {
return
}

logutil.Infof(
"Mergeblocks avail mem: %v(%v reserved), active mergeing size: %v, active merging blk cnt: %d",
"[Mergeblocks] avail mem: %v(%v reserved), active mergeing size: %v, active merging blk cnt: %d",
common.HumanReadableBytes(e.memAvail),
common.HumanReadableBytes(e.memSpare),
common.HumanReadableBytes(int(atomic.LoadInt64(&e.activeEstimateBytes))), cnt,
common.HumanReadableBytes(int(e.activeEstimateBytes.Load())), cnt,
)
}

func (e *MergeExecutor) AddActiveTask(taskId uint64, blkn, esize int) {
atomic.AddInt64(&e.activeEstimateBytes, int64(esize))
atomic.AddInt32(&e.activeMergeBlkCount, int32(blkn))
e.activeEstimateBytes.Add(int64(esize))
e.activeMergeBlkCount.Add(int32(blkn))
e.taskConsume.Lock()
if e.taskConsume.m == nil {
e.taskConsume.m = make(activeTaskStats)
@@ -133,11 +134,42 @@ func (e *MergeExecutor) OnExecDone(v any) {
delete(e.taskConsume.m, task.ID())
e.taskConsume.Unlock()

atomic.AddInt32(&e.activeMergeBlkCount, -int32(stat.blk))
atomic.AddInt64(&e.activeEstimateBytes, -int64(stat.estBytes))
e.activeMergeBlkCount.Add(-int32(stat.blk))
e.activeEstimateBytes.Add(-int64(stat.estBytes))
}

func (e *MergeExecutor) ExecuteSingleObjMerge(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema().Name)

if ActiveCNObj.CheckOverlapOnCNActive(mobjs) {
return
}

osize, _, _ := estimateMergeConsume(mobjs)
blkCnt := 0
for _, obj := range mobjs {
blkCnt += obj.BlockCnt()
}

for _, obj := range mobjs {
factory := func(ctx *tasks.Context, txn txnif.AsyncTxn) (tasks.Task, error) {
return jobs.NewMergeObjectsTask(ctx, txn, []*catalog.ObjectEntry{obj}, e.rt, common.DefaultMaxOsizeObjMB*common.Const1MBytes)
}
task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTaskWithObserver(nil, tasks.DataCompactionTask, []common.ID{*obj.AsCommonID()}, factory, e)
if err != nil {
if !errors.Is(err, tasks.ErrScheduleScopeConflict) {
logutil.Info("[Mergeblocks] Schedule error", zap.Error(err))
}
return
}
singleOSize, singleESize, _ := estimateSingleObjMergeConsume(obj)
e.AddActiveTask(task.ID(), obj.BlockCnt(), singleESize)
logSingleObjMergeTask(e.tableName, task.ID(), obj, obj.BlockCnt(), singleOSize, singleESize)
}
entry.Stats.AddMerge(osize, len(mobjs), blkCnt)
}

func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
func (e *MergeExecutor) ExecuteMultiObjMerge(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema().Name)

if ActiveCNObj.CheckOverlapOnCNActive(mobjs) {
@@ -176,7 +208,7 @@ func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, mobjs []*catalog.O
ActiveCNObj.AddActiveCNObj(mobjs)
logMergeTask(e.tableName, math.MaxUint64, mobjs, blkCnt, osize, esize)
} else {
logutil.Warnf("mergeblocks send to cn error: %v", err)
logutil.Error("[Mergeblocks] Send to cn error", zap.Error(err))
return
}
} else {
@@ -191,8 +223,8 @@ func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, mobjs []*catalog.O
}
task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTaskWithObserver(nil, tasks.DataCompactionTask, scopes, factory, e)
if err != nil {
if err != tasks.ErrScheduleScopeConflict {
logutil.Infof("[Mergeblocks] Schedule error info=%v", err)
if !errors.Is(err, tasks.ErrScheduleScopeConflict) {
logutil.Info("[Mergeblocks] Schedule error", zap.Error(err))
}
return
}
@@ -204,7 +236,7 @@ func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, mobjs []*catalog.O
}

func (e *MergeExecutor) MemAvailBytes() int {
merging := int(atomic.LoadInt64(&e.activeEstimateBytes))
merging := int(e.activeEstimateBytes.Load())
avail := e.memAvail - e.memSpare - merging
if avail < 0 {
avail = 0
@@ -216,6 +248,28 @@ func (e *MergeExecutor) CPUPercent() int64 {
return int64(e.cpuPercent)
}

func logSingleObjMergeTask(name string, taskId uint64, obj *catalog.ObjectEntry, blkn, osize, esize int) {
rows := obj.GetRemainingRows()
infoBuf := &bytes.Buffer{}
infoBuf.WriteString(fmt.Sprintf(" %d(%s)", rows, common.ShortObjId(obj.ID)))
platform := fmt.Sprintf("t%d", taskId)
if taskId == math.MaxUint64 {
platform = "CN"
v2.TaskCNMergeScheduledByCounter.Inc()
v2.TaskCNMergedSizeCounter.Add(float64(osize))
} else {
v2.TaskDNMergeScheduledByCounter.Inc()
v2.TaskDNMergedSizeCounter.Add(float64(osize))
}
logutil.Infof(
"[Mergeblocks] Scheduled %v [%v|on1,bn%d|%s,%s], merged(%v): %s", name,
platform, blkn,
common.HumanReadableBytes(osize), common.HumanReadableBytes(esize),
rows,
infoBuf.String(),
)
}

func logMergeTask(name string, taskId uint64, merges []*catalog.ObjectEntry, blkn, osize, esize int) {
rows := 0
infoBuf := &bytes.Buffer{}
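Among the executor changes above, the active-task counters move from raw integer fields updated with atomic.AddInt32/AddInt64 to the typed atomic.Int32/atomic.Int64 wrappers (Go 1.19+). A minimal sketch of that pattern, with illustrative names rather than the real MergeExecutor fields:

```go
package main

import "sync/atomic"

// counters mirrors the typed-atomic style adopted in MergeExecutor.
type counters struct {
	activeBlocks  atomic.Int32
	estimateBytes atomic.Int64
}

func (c *counters) addTask(blocks int32, bytes int64) {
	c.activeBlocks.Add(blocks)
	c.estimateBytes.Add(bytes)
}

func (c *counters) finishTask(blocks int32, bytes int64) {
	c.activeBlocks.Add(-blocks)
	c.estimateBytes.Add(-bytes)
}

func (c *counters) snapshot() (int32, int64) {
	// Typed atomics cannot be read without Load, which is the main safety
	// gain over the old atomic.LoadInt64(&field) calls on plain fields.
	return c.activeBlocks.Load(), c.estimateBytes.Load()
}
```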
10 changes: 8 additions & 2 deletions pkg/vm/engine/tae/db/merge/mod.go
@@ -200,13 +200,19 @@ const (
)

type Policy interface {
OnObject(obj *catalog.ObjectEntry, force bool)
Revise(cpu, mem int64, largeFirst bool) ([]*catalog.ObjectEntry, TaskHostKind)
OnObject(obj *catalog.ObjectEntry)
Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind)
ResetForTable(*catalog.TableEntry)
SetConfig(*catalog.TableEntry, func() txnif.AsyncTxn, any)
GetConfig(*catalog.TableEntry) any
}

type policy interface {
Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind)
OnObject(*catalog.ObjectEntry)
ResetForTable(*catalog.TableEntry)
}

func NewUpdatePolicyReq(c *BasicPolicyConfig) *api.AlterTableReq {
return &api.AlterTableReq{
Kind: api.AlterKind_UpdatePolicy,
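The exported Policy interface loses its force/largeFirst parameters, and a slimmer unexported policy interface is added for internal schedulers. A minimal sketch of a type satisfying the unexported interface; noopPolicy and its do-nothing behavior are hypothetical, only the three method signatures come from the diff above:

```go
package merge

import (
	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
)

// noopPolicy is an illustrative, do-nothing implementation that shows the
// shape of the new unexported policy interface.
type noopPolicy struct{}

func (noopPolicy) OnObject(*catalog.ObjectEntry) {}

func (noopPolicy) Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind) {
	// A real policy would pick mergeable objects within the given cpu/mem budget.
	var kind TaskHostKind
	return nil, kind
}

func (noopPolicy) ResetForTable(*catalog.TableEntry) {}

// Compile-time check that noopPolicy satisfies the unexported interface.
var _ policy = noopPolicy{}
```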