Skip to content

Commit

Permalink
Fix date range sent to Slack from reingestion workflows (#4093)
Browse files Browse the repository at this point in the history
Co-authored-by: Madison Swain-Bowden <[email protected]>
  • Loading branch information
BaileyMcKelway and AetherUnbound committed Apr 18, 2024
1 parent b3520a7 commit c78bac3
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 11 deletions.
3 changes: 3 additions & 0 deletions catalog/dags/common/loader/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def report_completion(
dated: bool = False,
date_range_start: str | None = None,
date_range_end: str | None = None,
is_reingestion_workflow: bool = False,
) -> str:
"""
Send a Slack notification when the load_data task has completed.
Expand Down Expand Up @@ -168,6 +169,8 @@ def report_completion(
date_range = "_all_"
if dated:
date_range = f"{date_range_start} -> {date_range_end}"
if is_reingestion_workflow:
date_range = "_multi-time period spread_"

# Collect data into a single message
message = f"""
Expand Down
29 changes: 18 additions & 11 deletions catalog/dags/providers/provider_dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,20 +343,27 @@ def create_report_load_completion(
ingestion_metrics,
dated,
):
is_reingestion_workflow = "reingestion" in dag_id

op_kwargs = {
"dag_id": dag_id,
"media_types": media_types,
"duration": ingestion_metrics["duration"],
"record_counts_by_media_type": ingestion_metrics["record_counts_by_media_type"],
"dated": dated,
"is_reingestion_workflow": is_reingestion_workflow,
}

if not is_reingestion_workflow:
op_kwargs = op_kwargs | {
"date_range_start": "{{ data_interval_start | ds }}",
"date_range_end": "{{ data_interval_end | ds }}",
}

return PythonOperator(
task_id="report_load_completion",
python_callable=reporting.report_completion,
op_kwargs={
"dag_id": dag_id,
"media_types": media_types,
"duration": ingestion_metrics["duration"],
"record_counts_by_media_type": ingestion_metrics[
"record_counts_by_media_type"
],
"dated": dated,
"date_range_start": "{{ data_interval_start | ds }}",
"date_range_end": "{{ data_interval_end | ds }}",
},
op_kwargs=op_kwargs,
trigger_rule=TriggerRule.ALL_DONE,
)

Expand Down
34 changes: 34 additions & 0 deletions catalog/tests/dags/providers/test_provider_dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,37 @@ def test_get_overrides_for_task(task_id, expected_overrides):
task_id, overrides
)
assert actual_task_overrides == expected_overrides


def test_create_report_load_completion_for_reingestion_dag():
    """
    Check the report task built for a DAG whose id contains "reingestion".

    ``reporting.report_completion`` is patched so the operator's callable can
    be compared against the mock. The resulting task must carry the
    ``is_reingestion_workflow`` flag set to ``True`` and must not carry the
    ``date_range_start`` / ``date_range_end`` kwargs.
    """
    reingestion_dag_id = "test_provider_reingestion_workflow"
    image_only = ["image"]
    metrics = {
        "duration": "1:00:00",
        "record_counts_by_media_type": {"image": 100},
    }
    dated_flag = False

    # The exact kwargs the operator is expected to hand to the callable.
    expected_op_kwargs = {
        "dag_id": reingestion_dag_id,
        "media_types": image_only,
        "duration": metrics["duration"],
        "record_counts_by_media_type": metrics["record_counts_by_media_type"],
        "dated": dated_flag,
        "is_reingestion_workflow": True,
    }

    patch_target = "providers.provider_dag_factory.reporting.report_completion"
    with mock.patch(patch_target) as patched_callable:
        task = provider_dag_factory.create_report_load_completion(
            reingestion_dag_id, image_only, metrics, dated_flag
        )

        assert task.task_id == "report_load_completion"
        assert task.python_callable == patched_callable

        assert task.op_kwargs == expected_op_kwargs

        # Explicit guard on the two date range keys, on top of the equality
        # check above, so a regression names the offending key directly.
        for omitted_key in ("date_range_start", "date_range_end"):
            assert omitted_key not in task.op_kwargs

0 comments on commit c78bac3

Please sign in to comment.