Skip to content

Commit

Permalink
add txn table delegate for sharding (#17341)
Browse files Browse the repository at this point in the history
Add txn table delegate for sharding.

Approved by: @triump2020
  • Loading branch information
zhangxu19830126 committed Jul 5, 2024
1 parent 94b6f88 commit 6d5b09a
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 213 deletions.
70 changes: 70 additions & 0 deletions pkg/shardservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (s *service) Close() error {
return s.remote.client.Close()
}

func (s *service) Config() Config {
return s.cfg
}

func (s *service) Create(
ctx context.Context,
table uint64,
Expand Down Expand Up @@ -212,6 +216,56 @@ func (s *service) Delete(
return nil
}

func (s *service) HasLocalReplica(
tableID, shardID uint64,
) bool {
has := false
s.getReadCache().selectShards(
tableID,
func(
metadata pb.ShardsMetadata,
shard pb.TableShard,
) bool {
if shard.ShardID != shardID {
return true
}

for _, replica := range shard.Replicas {
has = s.isLocalReplica(replica)
if has {
break
}
}
return !has
},
)
return has
}

func (s *service) HasAllLocalReplicas(
tableID uint64,
) bool {
total := 0
local := 0
s.getReadCache().selectShards(
tableID,
func(
metadata pb.ShardsMetadata,
shard pb.TableShard,
) bool {
total = int(metadata.ShardsCount)
for _, replica := range shard.Replicas {
s.isLocalReplica(replica)
local++
break
}

return true
},
)
return total > 0 && total == local
}

func (s *service) GetShardInfo(
table uint64,
) (uint64, pb.Policy, bool, error) {
Expand Down Expand Up @@ -795,6 +849,22 @@ func (c *readCache) selectReplicas(
sc.selectReplicas(apply)
}

func (c *readCache) selectShards(
tableID uint64,
apply func(pb.ShardsMetadata, pb.TableShard) bool,
) {
sc, ok := c.shards[tableID]
if !ok {
panic("shards is empty")
}

for _, shard := range sc.shards {
if !apply(sc.metadata, shard) {
return
}
}
}

func (c *readCache) hasTableCache(
tableID uint64,
) bool {
Expand Down
15 changes: 13 additions & 2 deletions pkg/shardservice/storage_mo.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,16 @@ func (s *storage) Get(
return err
}

// For partition table, the origin table id is the shard table id, and
// the partition table id is the shard id.
//
// For normal table, the origin table id is the shard table id, and the
// shard id is a int value that calculated by the shard policy.
//
// Metadata is not found by the special table id means the table is not
// sharding table or is a partition table. We need use the table id as
// shard id to get sharding metadata.
if metadata.IsEmpty() {
// maybe the shard table is partition, and the table id is the
// shard id.
v, err := getTableIDByShardID(
table,
pb.Policy_Partition.String(),
Expand Down Expand Up @@ -217,6 +224,10 @@ func (s *storage) Create(
func(
txn executor.TxnExecutor,
) error {
// Currently we only support partition policy.
// If the current table is a non partition
// table, we should not create sharding metadata
// for the current table.
partitions, err := readPartitionIDs(
table,
txn,
Expand Down
7 changes: 6 additions & 1 deletion pkg/shardservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,14 @@ type ShardServer interface {
// ShardService is sharding service. Each CN node holds an instance of the
// ShardService.
type ShardService interface {
// GetConfig returns the configuration of the shard service.
Config() Config
// Read read data from shards.
Read(ctx context.Context, req ReadRequest, opts ReadOptions) error

// HasLocalReplica returns whether the shard has a local replica.
HasLocalReplica(tableID, shardID uint64) bool
// HasAllLocalReplicas returns whether all shards of the table have local replicas.
HasAllLocalReplicas(tableID uint64) bool
// GetShardInfo returns the metadata of the shards corresponding to the table.
GetShardInfo(table uint64) (uint64, pb.Policy, bool, error)
// Create creates table shards metadata in current txn. And create shard
Expand Down
23 changes: 17 additions & 6 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,12 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
blockInfo.Attrs = blockInfo.Attrs[lenVecs-2:]
blockInfo.SetRowCount(blockInfo.Vecs[0].Length())

table := tbl.(*txnTable)
var table *txnTable
if v, ok := tbl.(*txnTableDelegate); ok {
table = v.origin
} else {
table = tbl.(*txnTable)
}
fileName := objectio.DecodeBlockInfo(
blockInfo.Vecs[0].GetBytesAt(0)).
MetaLocation().Name().String()
Expand Down Expand Up @@ -974,7 +979,13 @@ func (txn *Transaction) forEachTableHasDeletesLocked(f func(tbl *txnTable) error
if err != nil {
return err
}
tables[e.tableId] = rel.(*txnTable)

if v, ok := rel.(*txnTableDelegate); ok {
tables[e.tableId] = v.origin
} else {
tables[e.tableId] = rel.(*txnTable)
}

}
for _, tbl := range tables {
if err := f(tbl); err != nil {
Expand Down Expand Up @@ -1005,10 +1016,10 @@ func (txn *Transaction) forEachTableWrites(databaseId uint64, tableId uint64, of
func (txn *Transaction) getCachedTable(
ctx context.Context,
k tableKey,
) *txnTable {
var tbl *txnTable
) *txnTableDelegate {
var tbl *txnTableDelegate
if v, ok := txn.tableCache.tableMap.Load(k); ok {
tbl = v.(*txnTable)
tbl = v.(*txnTableDelegate)

tblKey := cache.TableKey{
AccountId: k.accountId,
Expand All @@ -1029,7 +1040,7 @@ func (txn *Transaction) getCachedTable(
}
val := catache.GetSchemaVersion(tblKey)
if val != nil {
if val.Ts.Greater(tbl.lastTS) && val.Version != tbl.version {
if val.Ts.Greater(tbl.origin.lastTS) && val.Version != tbl.origin.version {
txn.tableCache.tableMap.Delete(genTableKey(k.accountId, k.name, k.databaseId))
return nil
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/vm/engine/disttae/txn_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/logutil"
txn2 "github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/shardservice"
"github.com/matrixorigin/matrixone/pkg/txn/client"

"github.com/matrixorigin/matrixone/pkg/catalog"
Expand Down Expand Up @@ -181,7 +182,7 @@ func (db *txnDatabase) Relation(ctx context.Context, name string, proc any) (eng

rel := db.getTxn().getCachedTable(ctx, key)
if rel != nil {
rel.proc.Store(p)
rel.origin.proc.Store(p)
return rel, nil
}

Expand Down Expand Up @@ -219,11 +220,15 @@ func (db *txnDatabase) Relation(ctx context.Context, name string, proc any) (eng
return nil, err
}

tbl := newTxnTableWithItem(
tbl, err := newTxnTable(
db,
item,
p,
shardservice.GetService(),
)
if err != nil {
return nil, err
}

db.getTxn().tableCache.tableMap.Store(key, tbl)
return tbl, nil
Expand Down Expand Up @@ -261,7 +266,7 @@ func (db *txnDatabase) Delete(ctx context.Context, name string) error {
a table t1 there after commit.
*/
} else if v, ok := db.getTxn().tableCache.tableMap.Load(k); ok {
table := v.(*txnTable)
table := v.(*txnTableDelegate).origin
id = table.tableId
db.getTxn().tableCache.tableMap.Delete(k)
rowid = table.rowid
Expand Down Expand Up @@ -336,6 +341,9 @@ func (db *txnDatabase) Truncate(ctx context.Context, name string) (uint64, error
v, ok = db.getTxn().createMap.Load(k)
if !ok {
v, ok = db.getTxn().tableCache.tableMap.Load(k)
if ok {
v = v.(*txnTableDelegate).origin
}
}

if ok {
Expand Down
15 changes: 9 additions & 6 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ func (tbl *txnTable) TableRenameInTxn(ctx context.Context, constraint [][]byte)
panic("The table object in createMap should be the current table object")
}
} else if value, ok := tbl.db.getTxn().tableCache.tableMap.Load(key); ok {
table := value.(*txnTable)
table := value.(*txnTableDelegate).origin
id = table.tableId
rowid = table.rowid
rowids = table.rowids
Expand Down Expand Up @@ -1665,15 +1665,18 @@ func (tbl *txnTable) NewReader(
orderedScan bool,
txnOffset int,
) ([]engine.Reader, error) {
if plan2.IsFalseExpr(expr) {
return []engine.Reader{new(emptyReader)}, nil
}

proc := tbl.proc.Load()
txn := tbl.getTxn()
ts := txn.op.SnapshotTS()
state, err := tbl.getPartitionState(ctx)
if err != nil {
return nil, err
}

proc := tbl.proc.Load()

baseFilter := newBasePKFilter(
expr,
tbl.tableDef,
Expand All @@ -1694,7 +1697,7 @@ func (tbl *txnTable) NewReader(
)

blkArray := objectio.BlockInfoSlice(ranges)
if !memFilter.isValid || plan2.IsFalseExpr(expr) {
if !memFilter.isValid {
return []engine.Reader{new(emptyReader)}, nil
}
if blkArray.Len() == 0 {
Expand Down Expand Up @@ -1727,7 +1730,7 @@ func (tbl *txnTable) NewReader(
}

if len(cleanBlks) > 0 {
rds0, err = tbl.newBlockReader(ctx, num, expr, blockFilter, cleanBlks, tbl.proc.Load(), orderedScan)
rds0, err = tbl.newBlockReader(ctx, num, expr, blockFilter, cleanBlks, proc, orderedScan)
if err != nil {
return nil, err
}
Expand All @@ -1745,7 +1748,7 @@ func (tbl *txnTable) NewReader(
for i := 0; i < blkArray.Len(); i++ {
blkInfos = append(blkInfos, blkArray.Get(i))
}
return tbl.newBlockReader(ctx, num, expr, blockFilter, blkInfos, tbl.proc.Load(), orderedScan)
return tbl.newBlockReader(ctx, num, expr, blockFilter, blkInfos, proc, orderedScan)
}

func (tbl *txnTable) newMergeReader(
Expand Down
Loading

0 comments on commit 6d5b09a

Please sign in to comment.