Dagster basiert auf dem Konzept der Software-defined Assets. Statt Tasks beschreiben Sie Daten-Assets und Dagster leitet automatisch die Pipeline ab.
Warum Dagster¶
Dagster konzentriert sich auf Assets (was erstellt werden soll), nicht auf Operationen (was ausgeführt werden soll).
Software-Defined Assets¶
from dagster import asset
import pandas as pd
@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
return pd.read_sql("SELECT * FROM orders", conn)
@asset(group_name="staging")
def clean_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
df = raw_orders.copy()
df['total_czk'] = df['total_eur'] * 25.2
return df.dropna(subset=['customer_id'])
@asset(group_name="marts")
def daily_revenue(clean_orders: pd.DataFrame) -> pd.DataFrame:
return clean_orders.groupby('order_date').agg(
revenue=('total_czk', 'sum'), orders=('order_id', 'count')
).reset_index()
Asset Checks¶
from dagster import asset_check, AssetCheckResult
@asset_check(asset=clean_orders)
def no_negative_amounts(clean_orders):
neg = clean_orders[clean_orders['total_czk'] < 0]
return AssetCheckResult(passed=len(neg) == 0)
Zusammenfassung¶
Dagster ist ideal für asset-orientierte Datenplattformen. Software-defined Assets, integrierte Tests und Partitionierung.
dagsterOrchestrierungdata assetspipeline