forked from filecoin-project/dagstore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
accessor.go
144 lines (123 loc) · 4.01 KB
/
accessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package dagstore
import (
"context"
"fmt"
"io"
"os"
"sync"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-car/v2/index"
"golang.org/x/exp/mmap"
)
// ReadBlockstore is a read-only view of Blockstores. This will be implemented
// by the CARv2 indexed blockstore.
type ReadBlockstore interface {
Has(context.Context, cid.Cid) (bool, error)
Get(context.Context, cid.Cid) (blocks.Block, error)
GetSize(context.Context, cid.Cid) (int, error)
AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
HashOnRead(enabled bool)
}
// ShardAccessor provides various means to access the data contained
// in a shard.
type ShardAccessor struct {
data mount.Reader
idx index.Index
shard *Shard
// mmapr is an optional mmap.ReaderAt. It will be non-nil if the mount
// has been mmapped because the mount.Reader was an underlying *os.File,
// and an mmap-backed accessor was requested (e.g. Blockstore).
lk sync.Mutex
mmapr *mmap.ReaderAt
}
func NewShardAccessor(data mount.Reader, idx index.Index, s *Shard) (*ShardAccessor, error) {
return &ShardAccessor{
data: data,
idx: idx,
shard: s,
}, nil
}
func (sa *ShardAccessor) Shard() shard.Key {
return sa.shard.key
}
// Reader returns an io.Reader that can be used to read the data from the shard.
func (sa *ShardAccessor) Reader() io.Reader {
return &readerAtWrapper{
readerAt: sa.tryMmap(),
accessor: sa,
}
}
func (sa *ShardAccessor) Blockstore() (ReadBlockstore, error) {
r := &readerAtWrapper{
readerAt: sa.tryMmap(),
accessor: sa,
}
bs, err := blockstore.NewReadOnly(r, sa.idx, carv2.ZeroLengthSectionAsEOF(true))
return bs, err
}
// tryMmap attempts to mmap the file if the underlying data is an *os.File. It returns an
// io.ReaderAt which can be used to read the data. If the operation was successful or the file is
// already mapped , it will return the mmap.ReaderAt. If the memory mapping fails, it falls back to
// the original io.ReaderAt implementation from the mount.Reader and logs a warning message.
// The method is safe for concurrent use.
func (sa *ShardAccessor) tryMmap() io.ReaderAt {
sa.lk.Lock()
defer sa.lk.Unlock()
if sa.mmapr != nil {
return sa.mmapr
}
if f, ok := sa.data.(*os.File); ok {
if mmapr, err := mmap.Open(f.Name()); err != nil {
log.Warnf("failed to mmap reader of type %T: %s; using reader as-is", sa.data, err)
} else {
// we don't close the mount.Reader file descriptor because the user
// may have called other non-mmap-backed accessors.
sa.mmapr = mmapr
return mmapr
}
}
return sa.data
}
// Close terminates this shard accessor, releasing any resources associated
// with it, and decrementing internal refcounts.
func (sa *ShardAccessor) Close() error {
if err := sa.data.Close(); err != nil {
log.Warnf("failed to close mount when closing shard accessor: %s", err)
}
sa.lk.Lock()
if sa.mmapr != nil {
if err := sa.mmapr.Close(); err != nil {
log.Warnf("failed to close mmap when closing shard accessor: %s", err)
}
}
sa.lk.Unlock()
tsk := &task{op: OpShardRelease, shard: sa.shard}
return sa.shard.d.queueTask(tsk, sa.shard.d.externalCh)
}
// readerAtWrapper is a wrapper around an io.ReaderAt that implements io.Reader.
type readerAtWrapper struct {
readerAt io.ReaderAt
readOffset int64
// backreference to the accessor to prevent concurrent ReadAt+Close from mmap
accessor *ShardAccessor
}
func (w *readerAtWrapper) Read(p []byte) (n int, err error) {
w.accessor.lk.Lock()
defer w.accessor.lk.Unlock()
n, err = w.readerAt.ReadAt(p, w.readOffset)
w.readOffset += int64(n)
if err != nil && err != io.EOF {
return n, fmt.Errorf("readerAtWrapper: error reading from the underlying ReaderAt: %w", err)
}
return n, err
}
func (w *readerAtWrapper) ReadAt(p []byte, offset int64) (int, error) {
w.accessor.lk.Lock()
defer w.accessor.lk.Unlock()
return w.readerAt.ReadAt(p, offset)
}