Customize monthly partition #21401
-
I have a situation where I need to customize a monthly partitioned asset as follows:
Another way to describe this: I'm looking at events (rows) that occurred during a month, plus any related follow-up events (rows) that happened during that month or up to three days after its end. I would therefore want to run this asset on the fourth of the month, and it would look at data whose "start_date" falls between the first and last day of the month, while the "end_date" could (and likely would) extend up to the third of the following month. Is this feasible in Dagster? Thanks!
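To pin the window down, here is a minimal plain-Python sketch (no Dagster involved) of which rows one monthly partition would pick up. It assumes a hypothetical row shape in which a follow-up event carries a parent_id pointing at the event it follows up on, and partition keys in the "YYYY-MM-DD" form that monthly partitions use by default; Event, partition_bounds, select_events and grace_days are illustrative names, not an existing API.

```python
from datetime import date, timedelta
from typing import List, NamedTuple, Optional, Tuple


class Event(NamedTuple):
    # Hypothetical row shape: a follow-up event points at the event it follows up on
    event_id: int
    parent_id: Optional[int]  # None for an originating event
    event_date: date


def partition_bounds(partition_key: str, grace_days: int = 3) -> Tuple[date, date, date]:
    """Return (month_start, month_end, followup_end) for a key like "2024-02-01".

    Both end dates are exclusive: month_end is the 1st of the next month and
    followup_end is grace_days later (the 4th), so follow-ups on the 3rd still count.
    """
    month_start = date.fromisoformat(partition_key)
    if month_start.month == 12:
        month_end = date(month_start.year + 1, 1, 1)
    else:
        month_end = date(month_start.year, month_start.month + 1, 1)
    return month_start, month_end, month_end + timedelta(days=grace_days)


def select_events(events: List[Event], partition_key: str, grace_days: int = 3) -> List[Event]:
    """Originating events inside the month, plus their follow-ups up to the grace cutoff."""
    month_start, month_end, followup_end = partition_bounds(partition_key, grace_days)
    # Originating events must fall strictly inside the calendar month
    originating = {
        e.event_id
        for e in events
        if e.parent_id is None and month_start <= e.event_date < month_end
    }
    # Keep those events, plus follow-ups of them that land before the cutoff
    return [
        e
        for e in events
        if e.event_id in originating
        or (e.parent_id in originating and month_start <= e.event_date < followup_end)
    ]
```

For the February 2024 partition this keeps an event from 10 February and its follow-up on 3 March, but drops a follow-up on 4 March and anything tied to an event that itself started in March.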
-
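You can use the MonthlyPartitionsDefinition class provided by Dagster, and then customize the asset computation to filter data based on the specific time bounds you need.

How you might set this up:

1. Define your monthly partitions with a MonthlyPartitionsDefinition that starts on the first of the month.
2. Schedule the asset to run on the fourth of the month. This can be done using a sensor or a schedule that triggers the materialization of the asset.
3. Within the asset computation, use context.partition_time_window to get the start and end bounds of the partition. The end bound is exclusive (it is the first of the following month), so adding three days to it makes the window cover everything up to the end of the third.

Here's an example of how you might define such an asset:

from dagster import asset, MonthlyPartitionsDefinition, AssetExecutionContext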
import pandas as pd
from datetime import timedelta

# Define your monthly partitions starting from a specific date
monthly_partition_def = MonthlyPartitionsDefinition(start_date="2023-01-01")


@asset(partitions_def=monthly_partition_def)
def my_monthly_asset(context: AssetExecutionContext) -> pd.DataFrame:
    # Get the time window for the current partition, e.g. [2024-02-01, 2024-03-01)
    bounds = context.partition_time_window
    start_date = bounds.start
    # bounds.end is the (exclusive) first of the next month; adding three days
    # extends the window through the end of the third of the next month
    end_date = bounds.end + timedelta(days=3)
    # Now you can use start_date and end_date to filter your data accordingly.
    # For example, if you're querying a database, you would use these dates in your query.
    data = fetch_data(start_date, end_date)
    return data


def fetch_data(start_date, end_date) -> pd.DataFrame:
    # Placeholder function to represent fetching data between start_date and end_date.
    # Replace this with your actual data fetching logic.
    return pd.DataFrame()

To ensure that this asset runs on the fourth of the month, you would set up a schedule or sensor that triggers the materialization of the asset on that day. The schedule needs a job that selects the asset, and because the asset is partitioned, each run should request a specific partition key: on the fourth, that is the month that just ended. Here's an example of how you might define a schedule:

from dagster import RunRequest, ScheduleEvaluationContext, define_asset_job, schedule

# Job that materializes the asset; its partitions are inferred from the selected asset
my_monthly_asset_job = define_asset_job("my_monthly_asset_job", selection=[my_monthly_asset])


@schedule(cron_schedule="0 0 4 * *", job=my_monthly_asset_job, execution_timezone="UTC")
def my_monthly_asset_schedule(context: ScheduleEvaluationContext):
    # Called on the fourth of every month at midnight UTC; request the previous month's partition
    tick = context.scheduled_execution_time
    prev_month_start = (tick.replace(day=1) - timedelta(days=1)).replace(day=1)
    return RunRequest(partition_key=prev_month_start.strftime("%Y-%m-%d"))
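If you go the sensor route instead (or want a schedule that tolerates late or early ticks), the key question is which month's follow-up window has fully closed at a given moment. The plain-Python sketch below answers that, assuming the same three-day grace period and "YYYY-MM-DD" keys; latest_closed_partition, next_month_start and GRACE_DAYS are hypothetical names for illustration, not Dagster APIs.

```python
from datetime import date, datetime, timedelta, timezone

GRACE_DAYS = 3  # hypothetical: days of follow-up events allowed after month end


def next_month_start(d: date) -> date:
    """First day of the month after d's month."""
    return date(d.year + 1, 1, 1) if d.month == 12 else date(d.year, d.month + 1, 1)


def latest_closed_partition(now: datetime, grace_days: int = GRACE_DAYS) -> str:
    """Key of the most recent month whose follow-up window has closed at `now`."""
    # Candidate: the month immediately before now's month
    candidate = (now.date().replace(day=1) - timedelta(days=1)).replace(day=1)
    # Its window closes at midnight on day (grace_days + 1) of the following month
    closes = datetime.combine(
        next_month_start(candidate), datetime.min.time(), tzinfo=now.tzinfo
    ) + timedelta(days=grace_days)
    if now < closes:
        # Still inside the grace period, so fall back one more month
        candidate = (candidate - timedelta(days=1)).replace(day=1)
    return candidate.isoformat()
```

At 00:00 UTC on 4 March 2024 this returns "2024-02-01", while one minute earlier it still returns "2024-01-01". A sensor could evaluate this on each tick and request the returned key if that partition has not been materialized yet; treat that as a design suggestion rather than a tested recipe.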