Skip to content

Commit

Permalink
compactor: hold lock for a shorter amount of time (#7265)
Browse files Browse the repository at this point in the history
If we are constantly running compactor in a loop then we shouldn't pay
the price of constantly holding the lock in the garbage collection
function. What the lock holding means in practice that we have to wait
two or sometimes even three times the amount it takes to sync metas.
That doesn't make sense since we are running the compactor in a loop and
the compacted blocks are properly taken care of.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed May 29, 2024
1 parent dfa7dd5 commit a252b24
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
8 changes: 5 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,6 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
metrics.SyncFailures.Inc()
}
}()
metrics.Syncs.Inc()
metrics.ResetTx()

// Run this in thread safe run group.
// TODO(bwplotka): Consider custom singleflight with ttl.
Expand Down Expand Up @@ -617,7 +615,6 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
}

metrics.Synced.WithLabelValues(LoadedMeta).Set(float64(len(metas)))
metrics.Submit()

if len(resp.metaErrs) > 0 {
return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view")
Expand Down Expand Up @@ -650,6 +647,9 @@ type MetaFetcher struct {
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
f.metrics.Syncs.Inc()
f.metrics.ResetTx()

metas, partial, err = f.wrapped.fetch(ctx, f.metrics, f.filters)
if f.listener != nil {
blocks := make([]metadata.Meta, 0, len(metas))
Expand All @@ -658,6 +658,8 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.
}
f.listener(blocks, err)
}

f.metrics.Submit()
return metas, partial, err
}

Expand Down
25 changes: 17 additions & 8 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/groupcache/singleflight"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -60,6 +61,8 @@ type Syncer struct {
metrics *SyncerMetrics
duplicateBlocksFilter block.DeduplicateFilter
ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter

g singleflight.Group
}

// SyncerMetrics holds metrics tracked by the syncer. This struct and its fields are exported
Expand Down Expand Up @@ -135,15 +138,22 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {

// SyncMetas synchronizes local state of block metas with what we have in the bucket.
func (s *Syncer) SyncMetas(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()
type metasContainer struct {
metas map[ulid.ULID]*metadata.Meta
partial map[ulid.ULID]error
}

metas, partial, err := s.fetcher.Fetch(ctx)
container, err := s.g.Do("", func() (interface{}, error) {
metas, partial, err := s.fetcher.Fetch(ctx)
return metasContainer{metas, partial}, err
})
if err != nil {
return retry(err)
}
s.blocks = metas
s.partial = partial
s.mtx.Lock()
s.blocks = container.(metasContainer).metas
s.partial = container.(metasContainer).partial
s.mtx.Unlock()
return nil
}

Expand Down Expand Up @@ -172,9 +182,6 @@ func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta {
// block with a higher compaction level.
// Call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter.
func (s *Syncer) GarbageCollect(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

begin := time.Now()

// Ignore filter exists before deduplicate filter.
Expand Down Expand Up @@ -209,7 +216,9 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {

// Immediately update our in-memory state so no further call to SyncMetas is needed
// after running garbage collection.
s.mtx.Lock()
delete(s.blocks, id)
s.mtx.Unlock()
s.metrics.GarbageCollectedBlocks.Inc()
}
s.metrics.GarbageCollections.Inc()
Expand Down

0 comments on commit a252b24

Please sign in to comment.