
Asset Versioning with partitioned assets does not detect out-of-sync assets after incrementing code_version #22704

Open
sam-goodwin opened this issue Jun 25, 2024 · 11 comments
Labels
area: asset Related to Software-Defined Assets area: partitions Related to Partitions type: bug Something isn't working

Comments

@sam-goodwin

sam-goodwin commented Jun 25, 2024

Dagster version

dagster, version 1.7.10

What's the issue?

Asset versioning doesn't properly detect when a partitioned asset is out of sync after incrementing its version or an upstream asset's version. It always says "everything is up to date".

If I remove the partitions_def, everything works as expected. The problem seems to be isolated to partitioned assets.

What did you expect to happen?

Every time I increment code_version on the second asset, I expect it to be displayed as unsynced (it is not).

[screenshot]

I expected clicking "Materialize unsynced" to give me a list containing the "second" asset but not the "first" asset. Instead, it tells me all assets are up to date:

[screenshot]

How to reproduce?

This is the simple repro I am using to test partitions + asset versioning.

from dagster import StaticPartitionsDefinition, asset

test_partitions = StaticPartitionsDefinition(["first", "second"])


@asset(
    code_version="v1",
    partitions_def=test_partitions,
)
def first():
    return "first"


@asset(
    code_version="v4",  # I keep incrementing this; sometimes it is detected as unsynced, sometimes not
    partitions_def=test_partitions,
)
def second(first):
    return first + " second"

Deployment type

None

Deployment details

I am just running dagster dev with the following config:

storage:
  sqlite:
    base_dir:
      env: DAGSTER_DATA_BASE_DIR

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@sam-goodwin sam-goodwin added the type: bug Something isn't working label Jun 25, 2024
@sam-goodwin
Author

I just tried updating first to v2, and it still says everything is up to date:

@asset(
    code_version="v2",
    partitions_def=test_partitions,
)
def first():
    return "first"

[Screenshot 2024-06-25 at 11 04 45 AM]

@sam-goodwin
Author

I also get this error:

[screenshot]

It appears when I select "Only backfill missing or failed assets" and click "Preview":

[screenshot]

@sam-goodwin
Author

sam-goodwin commented Jun 25, 2024

I can confirm that everything behaves as expected in a non-partitioned job. However, I don't see this limitation documented anywhere in the docs.

@sam-goodwin sam-goodwin changed the title Asset Versioning with Partitioned Assets not correctly computing unsynced dependencies Asset Versioning and Partitioning does not detect out-of-sync assets after incrementing code_version Jun 25, 2024
@sam-goodwin sam-goodwin changed the title Asset Versioning and Partitioning does not detect out-of-sync assets after incrementing code_version Asset Versioning with partitioned assets does not detect out-of-sync assets after incrementing code_version Jun 25, 2024
@garethbrickman garethbrickman added area: partitions Related to Partitions area: asset Related to Software-Defined Assets labels Jun 26, 2024
@garethbrickman
Contributor

Could be related: #22553

@ClaytonSmith

Out-of-sync partitions have been a huuuge issue for me. Not showing downstream partitions as out of sync has been deeply impactful (in a bad way), and not just when incrementing the version but also when parent partitions are simply updated.

@ClaytonSmith

I just found out Dagster IS aware of out-of-sync partitions:

[screenshot]

Can someone please let me know what needs to happen to get a fix for this? These green dots should be yellow. The ability to rebuild outdated partitions is soooooo valuable.

@sam-goodwin
Author

Is it just the UI that's broken or is there a fundamental problem in the backend?

@sam-goodwin
Author

This query returns the correct information, which indicates that Dagster does know which assets are stale for each partition.

query AssetsByGroup($groupName: String!) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName:"__repository__",
    repositoryLocationName:"your_pkg.defs"
  }) {
    id
    assetKey {
      path
    }
    staleStatusByPartition(partitions:[
      "first",
      "second"
    ])
  }
}

What I am unsure about is how to launch a materialization for many partitions and have each run include only the unsynced assets for that partition.
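A minimal Python sketch of consuming the staleness query from a script. It assumes the webserver started by dagster dev serves GraphQL at http://localhost:3000/graphql, and that staleStatusByPartition returns one status (FRESH, STALE or MISSING) per requested partition, in request order. The query variant, constant and helper names are hypothetical, and anything other than FRESH is treated as needing materialization.

```python
import json
import urllib.request

# Variant of the query above with the code location passed as a variable (hypothetical).
STALE_STATUS_QUERY = """
query AssetsByGroup($groupName: String!, $location: String!, $partitions: [String!]!) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName: "__repository__",
    repositoryLocationName: $location
  }) {
    assetKey { path }
    staleStatusByPartition(partitions: $partitions)
  }
}
"""

# Assumed default address of the webserver started by `dagster dev`.
DEFAULT_URL = "http://localhost:3000/graphql"


def build_payload(group_name: str, location: str, partitions: list[str]) -> bytes:
    """Encode the GraphQL request body as JSON bytes."""
    body = {
        "query": STALE_STATUS_QUERY,
        "variables": {
            "groupName": group_name,
            "location": location,
            "partitions": partitions,
        },
    }
    return json.dumps(body).encode("utf-8")


def stale_pairs(response: dict, partitions: list[str]) -> list[tuple[str, str]]:
    """Return (asset_key, partition) pairs whose status is anything but FRESH.

    Assumes staleStatusByPartition holds one status per requested partition,
    in the same order as `partitions`.
    """
    pairs = []
    for node in response["data"]["assetNodes"]:
        key = "/".join(node["assetKey"]["path"])
        for partition, status in zip(partitions, node["staleStatusByPartition"]):
            if status != "FRESH":  # STALE or MISSING
                pairs.append((key, partition))
    return pairs


def fetch_stale_pairs(group_name: str, location: str, partitions: list[str],
                      url: str = DEFAULT_URL) -> list[tuple[str, str]]:
    """POST the query to a running webserver and return the non-fresh pairs."""
    request = urllib.request.Request(
        url,
        data=build_payload(group_name, location, partitions),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as resp:
        return stale_pairs(json.load(resp), partitions)
```

With the repro above, something like fetch_stale_pairs("default", "your_pkg.defs", ["first", "second"]) would be expected to list ("second", "first") and ("second", "second") after bumping second's code_version, if the backend reports staleness as the query suggests.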

@sam-goodwin
Author

Looks like the GraphQL schema is built with a strong coupling to time-based partitions (which does not align with my system):

input PartitionsByAssetSelector {
  assetKey: AssetKeyInput!
  partitions: PartitionsSelector
}

input PartitionsSelector {
  range: PartitionRangeSelector!
}

When launching a backfill, you can't specify a list of partitions per asset; you can only specify a range. I am seeing this overfitting to time-based partitions a lot in Dagster's design.

@sam-goodwin
Author

sam-goodwin commented Jul 2, 2024

Oh actually, it looks like I can use assetSelection and partitionNames along with batching to achieve this behavior.

Here's a prototype that materializes stale partitions of assets in a group: https://gist.github.com/sam-goodwin/d8dd76ad58a241cdb14deba9cb53c2bf

Note

It assumes that every asset in a group uses the same partitioning scheme (this may not be true for you).
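The batching idea can be sketched as follows (this is not the code from the gist). A backfill launched with assetSelection and partitionNames covers every selected asset for every listed partition, so grouping partitions whose set of stale assets is identical means each backfill touches only stale asset/partition pairs. The mutation text and the LaunchBackfillParams field usage are assumptions based on the fields named above; check them against your webserver's schema. Like the gist, it assumes all assets share the same partitions.

```python
from collections import defaultdict

# Assumed mutation shape; verify against the GraphQL schema of your Dagster version.
LAUNCH_BACKFILL = """
mutation LaunchBackfill($params: LaunchBackfillParams!) {
  launchPartitionBackfill(backfillParams: $params) {
    __typename
  }
}
"""


def plan_backfills(stale_pairs):
    """Group (asset_key, partition) pairs into (asset_keys, partition_names) batches.

    Partitions whose stale-asset sets are identical share one batch, so the
    cross product asset_keys x partition_names contains only stale pairs.
    """
    assets_by_partition = defaultdict(set)
    for asset_key, partition in stale_pairs:
        assets_by_partition[partition].add(asset_key)

    partitions_by_assets = defaultdict(list)
    for partition, assets in assets_by_partition.items():
        partitions_by_assets[frozenset(assets)].append(partition)

    # Sort for a deterministic launch order.
    return sorted(
        (sorted(assets), sorted(partitions))
        for assets, partitions in partitions_by_assets.items()
    )


def backfill_params(asset_keys, partition_names):
    """Build hypothetical LaunchBackfillParams variables for one batch."""
    return {
        "assetSelection": [{"path": key.split("/")} for key in asset_keys],
        "partitionNames": partition_names,
    }
```

Each batch from plan_backfills would then be sent as {"params": backfill_params(...)} with LAUNCH_BACKFILL, one request per batch.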

@sam-goodwin
Author

I just discovered that the following GraphQL query is extremely slow and can't be executed in parallel, because doing so crashes Dagster's SQL database:

query AssetStaleStatus(
    $groupName: String!,
    $assetKey: AssetKeyInput!,
    $partitionKeys: [String!]!,
    $repositoryLocation: String!
) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName:"__repository__",
    repositoryLocationName: $repositoryLocation
  }, assetKeys: [$assetKey]) {
    id
    staleStatusByPartition(partitions: $partitionKeys)
  }
}
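If the crashes come from concurrent requests hitting the SQLite-backed storage (an assumption; the thread doesn't establish the cause), one workaround is to issue the query serially over small chunks of partition keys. Here fetch is a hypothetical callable wrapping the AssetStaleStatus query for one asset, and chunk_size and pause_seconds are illustrative parameters. This does not make each request faster; it only limits concurrent load.

```python
import time
from typing import Callable, Iterator


def chunks(items: list[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def stale_status_serially(
    fetch: Callable[[list[str]], list[str]],
    partition_keys: list[str],
    chunk_size: int = 50,
    pause_seconds: float = 0.0,
) -> dict[str, str]:
    """Query statuses one chunk at a time instead of firing requests in parallel.

    `fetch` (hypothetical) runs the AssetStaleStatus query for one chunk of
    partition keys and returns statuses in the same order as the chunk.
    """
    statuses: dict[str, str] = {}
    for chunk in chunks(partition_keys, chunk_size):
        statuses.update(zip(chunk, fetch(chunk)))
        if pause_seconds:
            time.sleep(pause_seconds)  # optional breathing room between requests
    return statuses
```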
