Skip to content

Commit

Permalink
perf(dump): replace indexed batch with prefixed keys
Browse files Browse the repository at this point in the history
  • Loading branch information
adrienaury committed Mar 27, 2024
1 parent ba2a17c commit 6bfdb1d
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions internal/infra/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"github.com/rs/zerolog/log"
)

var (
prefixPulled = []byte{0} //nolint:gochecknoglobals
prefixNormal = []byte{1} //nolint:gochecknoglobals
)

func decode(value []byte) ([]silo.DataNode, error) {
var set map[silo.DataNode]any

Expand Down Expand Up @@ -68,11 +73,11 @@ func encode(items []silo.DataNode) ([]byte, error) {
}

type Snapshot struct {
db *pebble.Batch
db *pebble.DB
}

func (s Snapshot) Next() (silo.DataNode, bool, error) {
iter, err := s.db.NewIter(&pebble.IterOptions{}) //nolint:exhaustruct
iter, err := s.db.NewIter(&pebble.IterOptions{LowerBound: prefixNormal}) //nolint:exhaustruct
if errors.Is(err, pebble.ErrNotFound) {
return silo.DataNode{Key: "", Data: ""}, false, nil
} else if err != nil {
Expand All @@ -85,7 +90,7 @@ func (s Snapshot) Next() (silo.DataNode, bool, error) {
return silo.DataNode{Key: "", Data: ""}, false, nil
}

key, err := silo.DecodeDataNode(iter.Key())
key, err := silo.DecodeDataNode(iter.Key()[1:])
if err != nil {
return silo.DataNode{Key: "", Data: ""}, false, fmt.Errorf("%w", err)
}
Expand All @@ -99,7 +104,10 @@ func (s Snapshot) PullAll(node silo.DataNode) ([]silo.DataNode, error) {
return nil, fmt.Errorf("%w", err)
}

item, closer, err := s.db.Get(key)
keyNormal := append(prefixNormal, key...) //nolint:gocritic
keyPulled := append(prefixPulled, key...) //nolint:gocritic

item, closer, err := s.db.Get(keyNormal)
if errors.Is(err, pebble.ErrNotFound) {
return []silo.DataNode{}, nil
} else if err != nil {
Expand All @@ -112,18 +120,39 @@ func (s Snapshot) PullAll(node silo.DataNode) ([]silo.DataNode, error) {
return nil, fmt.Errorf("%w", err)
}

if err := s.db.Delete(key, pebble.NoSync); err != nil {
if err := s.db.Set(keyPulled, item, pebble.NoSync); err != nil {
return nil, fmt.Errorf("%w", err)
}

if err := s.db.Delete(keyNormal, pebble.NoSync); err != nil {
return nil, fmt.Errorf("%w", err)
}

return set, nil
}

func (s Snapshot) Close() error {
if err := s.db.Close(); err != nil {
iter, err := s.db.NewIter(&pebble.IterOptions{UpperBound: prefixNormal}) //nolint:exhaustruct
if err != nil && !errors.Is(err, pebble.ErrNotFound) {
return fmt.Errorf("%w", err)
}

defer iter.Close()

for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()[1:]
keyNormal := append(prefixNormal, key...) //nolint:gocritic
keyPulled := append(prefixPulled, key...) //nolint:gocritic

if err := s.db.Set(keyNormal, iter.Value(), pebble.NoSync); err != nil {
return fmt.Errorf("%w", err)
}

if err := s.db.Delete(keyPulled, pebble.NoSync); err != nil {
return fmt.Errorf("%w", err)
}
}

return nil
}

Expand All @@ -137,7 +166,7 @@ func (b Backend) Get(node silo.DataNode) ([]silo.DataNode, error) {
return nil, fmt.Errorf("%w", err)
}

item, closer, err := b.db.Get(key)
item, closer, err := b.db.Get(append(prefixNormal, key...))
if errors.Is(err, pebble.ErrNotFound) {
return []silo.DataNode{}, nil
} else if err != nil {
Expand Down Expand Up @@ -167,15 +196,15 @@ func (b Backend) Store(key silo.DataNode, value silo.DataNode) error {
return fmt.Errorf("%w", err)
}

if err := b.db.Set(rawKey, rawNodes, pebble.NoSync); err != nil {
if err := b.db.Set(append(prefixNormal, rawKey...), rawNodes, pebble.NoSync); err != nil {
return fmt.Errorf("%w", err)
}

return nil
}

func (b Backend) Snapshot() silo.Snapshot { //nolint:ireturn
return Snapshot{b.db.NewIndexedBatch()}
return Snapshot(b)
}

func (b Backend) Close() error {
Expand Down

0 comments on commit 6bfdb1d

Please sign in to comment.