Skip to content

Commit

Permalink
feat(bigquery): adds sql query for VIEW and MATERIALIZED VIEW in asse…
Browse files Browse the repository at this point in the history
…t for BQ (#488)

* feat: group comamnds

* feat(bigquery): adds sql query for VIEW and MATERIALIZED VIEW in asset for BQ
  • Loading branch information
ravisuhag committed May 29, 2024
1 parent d564772 commit 0ff71e6
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,44 +415,47 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
}
}

table, err := anypb.New(&v1beta2.Table{
Columns: e.buildColumns(ctx, md.Schema, md),
PreviewFields: previewFields,
PreviewRows: previewRows,
Profile: tableProfile,
Attributes: utils.TryParseMapToProto(map[string]interface{}{
"full_qualified_name": tableFQN,
"dataset": t.DatasetID,
"project": t.ProjectID,
"type": string(md.Type),
"partition_data": partitionData,
"clustering_fields": clusteringFields,
}),
CreateTime: timestamppb.New(md.CreationTime),
UpdateTime: timestamppb.New(md.LastModifiedTime),
})
if err != nil {
e.logger.Warn("error creating Any struct", "error", err)
}

asset := &v1beta2.Asset{
Urn: tableURN,
Name: t.TableID,
Type: "table",
Description: md.Description,
Service: "bigquery",
Data: table,
Labels: md.Labels,
// Data: table,
Labels: md.Labels,
}
attributesData := map[string]interface{}{
"full_qualified_name": tableFQN,
"dataset": t.DatasetID,
"project": t.ProjectID,
"type": string(md.Type),
"partition_data": partitionData,
"clustering_fields": clusteringFields,
}

if e.config.BuildViewLineage && (md.Type == bigquery.ViewTable || md.Type == bigquery.MaterializedView) {
if md.Type == bigquery.ViewTable || md.Type == bigquery.MaterializedView {
query := getViewQuery(md)
upstreamResources := getUpstreamResources(query)
asset.Lineage = &v1beta2.Lineage{
Upstreams: upstreamResources,
attributesData["sql"] = query
if e.config.BuildViewLineage {
upstreamResources := getUpstreamResources(query)
asset.Lineage = &v1beta2.Lineage{
Upstreams: upstreamResources,
}
}
}

table, err := anypb.New(&v1beta2.Table{
Columns: e.buildColumns(ctx, md.Schema, md),
PreviewFields: previewFields,
PreviewRows: previewRows,
Profile: tableProfile,
Attributes: utils.TryParseMapToProto(attributesData),
CreateTime: timestamppb.New(md.CreationTime),
UpdateTime: timestamppb.New(md.LastModifiedTime),
})
if err != nil {
e.logger.Warn("error creating Any struct", "error", err)
}
asset.Data = table
return asset, nil
}

Expand Down

0 comments on commit 0ff71e6

Please sign in to comment.