
Dagster: modern orchestration with an asset-based approach

16. 03. 2024 · 1 min read · intermediate

Dagster is built around software-defined assets. Instead of describing tasks, you declare the data assets that should exist, and Dagster derives the pipeline (the dependency graph between the assets) from those declarations.

Why Dagster

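Dagster focuses on assets (what should be produced) rather than operations (what should be executed). Each asset declares which assets it depends on, so the execution order follows from the data dependencies instead of being wired up by hand.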
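To make "deriving the pipeline from assets" concrete, here is a toy model in plain Python. It is not Dagster's implementation, only an illustration of the idea: each asset is a function, its upstream dependencies are read from its parameter names (the same way the Dagster example in the next section connects `clean_orders` to `raw_orders`), and a topological sort yields the execution order. The helper `derive_graph` is hypothetical.

```python
import inspect
from graphlib import TopologicalSorter

# Undecorated stand-ins for the assets below; bodies are irrelevant here.
def raw_orders(): ...
def clean_orders(raw_orders): ...
def daily_revenue(clean_orders): ...

def derive_graph(*funcs):
    """Hypothetical helper: map each function name to the names of its parameters,
    treating every parameter as an upstream asset."""
    return {f.__name__: set(inspect.signature(f).parameters) for f in funcs}

# Declaration order does not matter; the graph determines the run order.
graph = derive_graph(daily_revenue, clean_orders, raw_orders)
order = list(TopologicalSorter(graph).static_order())

print(graph)
print(order)  # ['raw_orders', 'clean_orders', 'daily_revenue']
```

The point of the sketch is that nobody writes "run `raw_orders`, then `clean_orders`": the order falls out of the declared inputs.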

Software-Defined Assets

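from contextlib import closing
import sqlite3

import pandas as pd
from dagster import asset

DB_PATH = "warehouse.db"  # hypothetical source database; replace with your own
EUR_TO_CZK = 25.2         # fixed EUR -> CZK rate, for illustration only

@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
    # Load the source table; closing() ensures the connection is released.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        return pd.read_sql("SELECT * FROM orders", conn)

@asset(group_name="staging")
def clean_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    # The parameter name makes Dagster place raw_orders upstream of this asset.
    df = raw_orders.copy()
    df['total_czk'] = df['total_eur'] * EUR_TO_CZK
    # Drop orders that cannot be attributed to a customer.
    return df.dropna(subset=['customer_id'])

@asset(group_name="marts")
def daily_revenue(clean_orders: pd.DataFrame) -> pd.DataFrame:
    # One row per day: total revenue in CZK and number of orders.
    return clean_orders.groupby('order_date').agg(
        revenue=('total_czk', 'sum'), orders=('order_id', 'count')
    ).reset_index()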
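Because the asset bodies are ordinary pandas code, their logic can be sanity-checked on a small in-memory sample before anything runs in Dagster. The sketch below copies the bodies of `clean_orders` and `daily_revenue` into undecorated functions; the sample rows are made up for illustration, and `EUR_TO_CZK` is the same illustrative fixed rate as above.

```python
import pandas as pd

EUR_TO_CZK = 25.2  # illustrative fixed rate, as in the assets above

def clean_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    df = raw_orders.copy()
    df["total_czk"] = df["total_eur"] * EUR_TO_CZK
    return df.dropna(subset=["customer_id"])

def daily_revenue(clean_orders: pd.DataFrame) -> pd.DataFrame:
    return clean_orders.groupby("order_date").agg(
        revenue=("total_czk", "sum"), orders=("order_id", "count")
    ).reset_index()

# Hypothetical sample: order 3 has no customer_id, so clean_orders drops it.
raw = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "order_date": ["2024-03-01", "2024-03-01", "2024-03-02", "2024-03-02"],
    "customer_id": [10, 11, None, 12],
    "total_eur": [10.0, 20.0, 5.0, 4.0],
})

daily = daily_revenue(clean_orders(raw))
print(daily)
```

With this sample, 2024-03-01 gets two orders worth 30 EUR in total (756 CZK), while 2024-03-02 counts only order 4, because order 3 was removed for its missing `customer_id`.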

Asset checks

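import pandas as pd
from dagster import asset_check, AssetCheckResult

# Defined in the same module as the clean_orders asset above.
@asset_check(asset=clean_orders)
def no_negative_amounts(clean_orders: pd.DataFrame) -> AssetCheckResult:
    # The parameter named after the asset receives the asset's stored value.
    neg = clean_orders[clean_orders['total_czk'] < 0]
    # Pass only if no row has a negative amount; report the count as metadata.
    return AssetCheckResult(passed=len(neg) == 0, metadata={"negative_rows": len(neg)})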
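The check itself is a single pandas filter, so its behaviour can be verified without Dagster. In this sketch, `negative_amounts_result` is a hypothetical plain-Python stand-in that applies the same filter and returns a dict instead of an `AssetCheckResult`; the two sample frames are invented.

```python
import pandas as pd

def negative_amounts_result(clean_orders: pd.DataFrame) -> dict:
    """Hypothetical stand-in for the asset check: same filter, plain dict result."""
    neg = clean_orders[clean_orders["total_czk"] < 0]
    return {"passed": len(neg) == 0, "negative_rows": len(neg)}

ok = pd.DataFrame({"total_czk": [252.0, 504.0]})
bad = pd.DataFrame({"total_czk": [252.0, -25.2]})  # e.g. a refund booked as an order

print(negative_amounts_result(ok))   # {'passed': True, 'negative_rows': 0}
print(negative_amounts_result(bad))  # {'passed': False, 'negative_rows': 1}
```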

Summary

Dagster is well suited to asset-oriented data platforms: it offers software-defined assets, built-in data quality checks (asset checks), and partitioning.
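Partitioning is mentioned but not shown above. The idea is that one asset is split into slices, for example one per day, so a run can (re)materialize a single day instead of the whole history. In Dagster this is declared by passing `partitions_def=DailyPartitionsDefinition(start_date=...)` to `@asset`, and the asset reads the current slice from `context.partition_key`. The stdlib sketch below models only the concept; `daily_partition_keys` and `revenue_for_partition` are hypothetical helpers and the rows are invented.

```python
from datetime import date, timedelta

def daily_partition_keys(start: date, end: date) -> list[str]:
    """One key per day in the half-open range [start, end), as YYYY-MM-DD."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]

def revenue_for_partition(orders: list[dict], partition_key: str) -> float:
    """'Materialize' one partition: use only the rows belonging to that day."""
    return sum(o["total_czk"] for o in orders if o["order_date"] == partition_key)

orders = [
    {"order_date": "2024-03-01", "total_czk": 252.0},
    {"order_date": "2024-03-01", "total_czk": 504.0},
    {"order_date": "2024-03-02", "total_czk": 100.8},
]

keys = daily_partition_keys(date(2024, 3, 1), date(2024, 3, 3))
print(keys)  # ['2024-03-01', '2024-03-02']
print({k: revenue_for_partition(orders, k) for k in keys})
```

Each partition is computed independently, which is what makes backfilling or re-running a single day cheap.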


CORE SYSTEMS team

We build core systems and AI agents that keep operations running. 15 years of experience in enterprise IT.