Create dynamic assets on runtime #16938
It feels like Dagster has been pushing in the direction of software-defined assets (which is the right direction, in my opinion), but I am still struggling to wrap my head around how to do some things, since I am used to an op/job mindset. Right now it's fairly easy to implement the following logic using ops/jobs/graphs:
The code is something along these lines (please keep in mind that parts of it, such as `connect`, are still pseudocode):

```python
import os
import re

import boto3
import pandas as pd
from dagster import DynamicOut, DynamicOutput, job, op

CREDS_DIR = "/creds/"


@op(out=DynamicOut())
def get_client_credentials():
    """
    Creates a downstream op for every client whose credentials are in the creds/ folder.
    """
    for creds_file in os.listdir(CREDS_DIR):
        client_name = os.path.splitext(creds_file)[0]
        with open(os.path.join(CREDS_DIR, creds_file), "r") as fr:
            # Mapping keys may only contain letters, digits and underscores.
            yield DynamicOutput(
                value=(client_name, fr.read()),
                mapping_key=re.sub(r"[^A-Za-z0-9_]", "_", client_name),
            )


@op
def get_invoices(client_creds) -> pd.DataFrame:
    """
    Extracts the credentials to connect to a database and returns a dataframe
    with the result of the invoice query.
    """
    client_name, creds = client_creds
    user, pwd = creds.strip().split(",")  # file content is "user,password"
    connection = connect(user, pwd)  # `connect` is a placeholder for the database client
    return connection.query(f"SELECT * FROM invoices WHERE client_name = '{client_name}'")


@op
def get_max_invoice(context, invoices: pd.DataFrame) -> None:
    """
    Stores the max invoice of the invoices dataframe in S3, under a per-client path.
    """
    invoices_max = invoices.max()
    boto3.client("s3").put_object(
        Bucket="my-bucket",  # placeholder bucket name
        Key=f"{context.get_mapping_key()}/max",
        Body=invoices_max.to_json(),
    )


@job
def put_max_to_s3():
    client_creds = get_client_credentials()
    client_creds.map(get_invoices).map(get_max_invoice)
```

This has several benefits; for example, I can add as many clients as I want to the folder without touching the job. Now I am not sure how to implement this with assets. I have been looking at discussions such as #9559, but I have really struggled to understand the examples and definitions given. I also saw some comments such as
What's the right mindset for approaching this with software-defined assets?

EDIT: I just came across #11045, but I would still be interested in an example closer to the one I posted. If you really need to generate multiple linked assets, the approach shown in #11045 seems a bit complex, and it still leaves some open questions:

- I'm not sure how to access the context inside the asset (should it be passed to the factory?).
- What happens if I want to dynamically create two assets that depend on each other? I can't pass them as arguments to the function, since Python does not allow creating function arguments dynamically.

Similarly, #15087 shows what seems to be an overly complex way to do this, given how easy it is with ops.
@AlejandroUPC I believe this is the issue where we're tracking this functionality: #9559. Does what's described there match what you're looking for?
In general, if you have a large number of objects with the same structure, we recommend using partitions rather than a separate asset for each object. This makes bulk operations such as backfills easier, and it keeps the UI navigable as the number of objects grows.
Here's some content on this: #12061.