</> Airflow & Orchestration

0 / 16 section completed
Section 00

Airflow & Orchestration Airflow Delta Lake

Ez a kurzus az Apache Airflow alapjait és a data pipeline orchestráció legfontosabb fogalmait vezeti be, a WebShop Pro e-kereskedelmi projekt köré építve. Megtanulod, hogyan ütemezd, monitorozd és kezeld a hibákat az adatpipeline-okban úgy, hogy azok megbízhatóan és reprodukálhatóan fussanak nap mint nap.

Miért fontos az orchestráció? Egy modern adatplatformban tucatnyi feladat fut párhuzamosan: adatok betöltése, tisztítása, transzformálása, minőség-ellenőrzése és riportolása. Ha ezek közül bármelyik elromlik vagy késik, az teljes láncolatotakadhat meg. Az Airflow pontosan ezt a problémát oldja meg: központilag vezérli a feladatok végrehajtási sorrendjét, kezeli az újrapróbálkozásokat, és értesít, ha valami balul sül el.

A kurzus menete: először megismered az Airflow architektúráját és a DAG (Directed Acyclic Graph) fogalmát, majd lépésről lépésre felépítünk egy teljes medallion architektúrájú ETL pipeline-t – bronze (nyers adatok), silver (tisztított) és gold (analitikai) rétegekkel. Minden section egy-egy új Airflow képességet ad a eszköztáradhoz: operátorok, XCom, sensorok, retry-logika, backfill és a modern TaskFlow API.

Tipp: A legjobb módja a tanulásnak, ha minden kódrészletet magad is lefuttatod egy saját Airflow környezetben. Használd az airflow standalone parancsot egy gyors helyi tesztkörnyezet felépítéséhez, és kövesd nyomon a DAG-ok futását a webes felületen (localhost:8080).

Tartalom

DAG, operator, sensor, schedule, retry, backfill, XCom, connections

Projekt

Napi webshop ETL DAG bronze/silver/gold rétegekkel

Szójegyzék

DAG · Sensor · Backfill

Section 01

Airflow architektúra

Az Apache Airflow négy fő komponensből áll, amelyek együtt biztosítják a DAG-ok értelmezését, ütemezését és végrehajtását. A Scheduler folyamatosan elemzi a DAG mappában lévő Python fájlokat, felismeri a feladatfüggőségeket, és elindítja a task-okat, ha az ütemezési feltételek teljesülnek. A Webserver egy Flask-alapú webes felület, ahol vizuálisan monitorozhatod a DAG-ok állapotát, kezelheted a kapcsolatokat és változókat, valamint újraindíthatod a sikertelen task-okat.

Az Executor határozza meg, hol és hogyan futnak a task-ok. A legegyszerűbb a SequentialExecutor (egy task egyszerre, fejlesztésre), de éles környezetben a CeleryExecutor vagy a KubernetesExecutor a szabvány, amelyek párhuzamosítást és skálázhatóságot biztosítanak. A Metadata DB (általában PostgreSQL) tárolja a DAG-ok, task-példányok, kapcsolatok, változók és futási előzmények összes metaadatát – ez az Airflow „emlékezete".

A fenti kódrészlet az airflow standalone parancsot mutatja, amely egyetlen paranccsal elindítja az összes komponenst: schedulert, webservert és egy helyi executort. Ez a leggyorsabb módja a fejlesztési környezet felépítésének, de élesben érdemes Docker Compose vagy Kubernetes segítségével külön erőforrásokat fenntartani az egyes komponenseknek.

Tipp: Az Airflow webes felülete alapértelmezetten a 8080-as porton fut. Ha az airflow standalone parancsot használod, a naplóban látni fogod a pontos URL-t és az automatikusan generált admin jelszót. Mindig ellenőrizd a konzol kimenetét!

KomponensFeladat
SchedulerDAG-ok elemzése, taskok ütemezése
WebserverWeb UI a monitorozáshoz
ExecutorTaskok futtatása
Metadata DBÁllapot tárolás (Postgres)
WorkerA tényleges futtatás
[1]
# Start Airflow standalone
airflow standalone

# Access Web UI
# http://localhost:8081
# Default: admin / admin
Output:
Airflow webserver running at 0.0.0.0:8080
Airflow scheduler started
Section 02

Első DAG

A DAG (Directed Acyclic Graph) az Airflow központi fogalma: egy Python fájl, amely meghatározza a feladatok (task-ok) halmazát és azok végrehajtási sorrendjét. A „direktett" szó azt jelenti, hogy a task-ok közötti élek irányítottak (A → B), az „aciklikus" pedig azt, hogy nem alakulhat ki kör (nem térhetsz vissza egy már végrehajtott task-hoz). Ez a modell garantálja, hogy a pipeline végrehajtása mindig megállapodik egy terminális állapotban.

A fenti kódban a default_args szótár határozza meg az alapértelmezett beállításokat minden task számára: ki a felelős (owner), hányszor próbálkozzon újra hiba esetén (retries), és mennyi idő teljen el az újrapróbálkozások között (retry_delay). A with DAG(...) kontextuskezelő létrehozza a DAG objektumot, és minden benne definiált operátor automatikusan ehhez a DAG-hoz tartozik.

Három PythonOperator-t látunk: extract, transform és load. Mindegyik egy Python függvényt hajt végre (itt egyszerű lambda kifejezéseket). A >> operátorral láncoljuk őket: először kinyerjük az adatokat, majd transzformáljuk, végén betöltjük a célrendszerbe. A catchup=False beállítás fontos: megakadályozza, hogy az Airflow visszamenőleg lefuttassa az összes korábbi ütemezést a start_date óta.

Tipp: A schedule='@daily' cron-jelölés naponta éjfélkor futtatja a DAG-ot. Ha csak manuálisan szeretnéd tesztelni, állítsd schedule=None értékre – így a DAG csak kattintásra indul el a webes felületen.

[2]
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineer',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='webshop_daily_etl',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    
    extract = PythonOperator(
        task_id='extract',
        python_callable=lambda: print('Extracting data...')
    )
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=lambda: print('Transforming data...')
    )
    
    load = PythonOperator(
        task_id='load',
        python_callable=lambda: print('Loading data...')
    )
    
    extract >> transform >> load
Output:
DAG: webshop_daily_etl
Tasks: extract -> transform -> load
Schedule: @daily
Section 03

Operátorok: PythonOperator, BashOperator

Az operátorok (operators) az Airflow építőkövei: minden task egy operátor példány, amely meghatározza, hogy mi történjen a végrehajtás során. A leggyakrabban használtak a PythonOperator (Python függvény futtatása) és a BashOperator (shell parancs végrehajtása), de az Airflow gazdag ökoszisztémája több száz provider operátort kínál: SparkSubmitOperator, PostgresOperator, S3KeySensor, EmailOperator és még sok más.

A fenti példa két operátort mutat be. A BashOperator a bash_command paraméterben megadott shell parancsot futtatja le – itt a /tmp/webshop/ könyvtár tartalmát töröljük. A PythonOperator egy Python függvényt hív meg (python_callable=validate_schema), ami rugalmasabb, mint a Bash, mert tetszőleges logikát implementálhatsz: séma validálást, adatminőség-ellenőrzést, API hívásokat stb.

A választás operátor között attól függ, hogy mit szeretnél elérni. Ha egy egyszerű parancssori műveletre van szükséged (fájl törlés, curl hívás, dbt run), a BashOperator a leggyorsabb megoldás. Ha viszont komplex Python logikára van szükséged – adatok feldolgozása, transzformáció, feltételes vezérlés –, akkor a PythonOperator a megfelelő választás. Éles projektekben a kettőt gyakran kombinálják egy DAG-on belül.

Tipp: Mindig igyekezz a python_callable paraméterben egy külön modulban definiált függvényre hivatkozni, ne inline lambda-t használj. Így a kód könnyebben tesztelhető, olvashatóbb és újrafelhasználható más DAG-okban is.

OperátorFeladat
PythonOperatorPython függvény futtatása
BashOperatorShell parancs
SparkSubmitOperatorSpark job indítás
EmailOperatorEmail küldés
SensorValamire várakozás
[3]
from airflow.operators.bash import BashOperator

clean = BashOperator(
    task_id='clean_temp_files',
    bash_command='rm -rf /tmp/webshop/*'
)

validate = PythonOperator(
    task_id='validate_data',
    python_callable=validate_schema
)
Output:
Task: clean_temp_files (BashOperator)
Task: validate_data (PythonOperator)
Section 04

Task dependencies

A task-függőségek (dependencies) határozzák meg a DAG-ok végrehajtási sorrendjét. Az Airflow a >> (downstream) és << (upstream) operátorokkal teszi lehetővé a task-ok közötti relációk definálását. Ezek a bite-shifting operátorok túlterheltek az Airflow-ban, és intuitív szintaxist adnak: A >> B azt jelenti, hogy „A fusson le először, utána B".

A példa három mintát mutat be. Az első egy egyszerű lineáris láncolás: extract >> transform >> load. A második párhuzamos ágakat definiál: az extract_orders és extract_products task-ok egyszerre indulnak el, mindkettőnek be kell fejeződnie a transform előtt, ami után a load_bronze és load_silver ismét párhuzamosan fut. A harmadik egy összetettebb minta, ahol az extract után a validate és profile párhuzamosan futnak, majd a transform, load és végén egy notify következik.

A párhuzamosítás kulcsfontosságú a teljesítmény optimalizálásához. Ha két task között nincs logikai függőség (pl. két különböző tábla adatainak betöltése), érdemes őket párhuzamosan futtatni. Figyelj azonban az erőforrás-korlátokra: ha túl sok task fut egyszerre, az memory vagy CPU szűk keresztmetszetet okozhat.

Tipp: A listás szintaxis ([A, B] >> C) a felbontás (fan-out/fan-in) mintát valósítja meg. A fenti második példában az Airflow automatikusan létrehozza az összes szükséges élt: extract_orders→transform, extract_products→transform, transform→load_bronze és transform→load_silver.

[4]
# Linear chain
extract >> transform >> load

# Parallel branches
[extract_orders, extract_products] >> transform >> [load_bronze, load_silver]

# Complex
extract >> [validate, profile] >> transform >> load >> notify
Output:
extract_orders (parallel) ──┐
                            ├─> transform -> load
extract_products (parallel) ─┘
Section 05

XCom: Taskok közötti kommunikáció

Az XCom (Cross-Communication) az Airflow beépített mechanizmusa a task-ok közötti adatátadásra. Alapértelmezetten minden task elszigetelt környezetben fut, és nem fér hozzá más task memóriájához. Az XCom hidat képez: egy task adatot „pushol" a metadata adatbázisba, egy másik task pedig „pullolja" onnan. Ez lehetővé teszi, hogy pl. az extract task átadja a sorok számát a validate task-nak.

A fenti kódban az extract függvény beolvassa a CSV fájlt egy pandas DataFrame-be, majd xcom_push-szal elküldi a sorok számát (row_count) és az oszlopneveket (columns). A return érték automatikusan 'return_value' kulccsal kerül az XCom-ba. A validate függvény xcom_pull-lal kikéri a sorok számát, és ellenőrzi, hogy nagyobb-e nullánál.

Fontos korlát: az XCom értékek a metadata adatbázisban tárolódnak, ezért csak kis méretű adatokat érdemes átadni (néhány kilobájtnyi JSON, szám, string). Nagy DataFrame-eket soha ne küldj XCom-on keresztül – ehelyett írd ki fájlba (Parquet, CSV) vagy objektárhelyre (S3, GCS), és csak az elérési utat add át XCom-ban. Az Airflow 2.0-tól a TaskFlow API automatikus XCom-kezelést kínál, ami sokkal elegánsabb megoldás.

Tipp: A **context paraméter mindig kell a PythonOperator-höz, ha XCom-ot használsz. A context['ti'] a TaskInstance objektum, amely tartalmazza az xcom_push és xcom_pull metódusokat. Ha TaskFlow API-t használsz (későbbi section), ez mind automatikussá válik.

[5]
def extract(**context):
    import pandas as pd
    df = pd.read_csv('data/orders.csv')
    context['ti'].xcom_push(key='row_count', value=len(df))
    context['ti'].xcom_push(key='columns', value=list(df.columns))
    return len(df)  # Also pushed as 'return_value'

def validate(**context):
    row_count = context['ti'].xcom_pull(task_ids='extract', key='row_count')
    print(f'Validating {row_count} rows...')
    assert row_count > 0, 'No data extracted!'
Output:
XCom pushed: row_count=1500, columns=['id','customer_id',...]
XCom pulled: row_count=1500
Validating 1500 rows...
Section 06

Sensor: várakozás feltételre

A Sensor-ok az Airflow speciális operátorai, amelyek egy adott feltétel teljesülésére várnak, mielőtt a következő task elindulna. Gondolj rájuk úgy, mint egy „ismétlődő kérdésre": a sensor rendszeres időközönként (poke) ellenőrzi, hogy a feltétel teljesült-e, és ha igen, sikeresen befejeződik; ha nem, várakozik a következő ellenőrzésig. Ha a feltétel a megadott időn belül (timeout) nem teljesül, a task megbukik.

A fenti kód egy FileSensor-t mutat be, amely a /data/orders_{{ ds }}.csv fájl létezésére vár. A {{ ds }} Jinja template változó automatikusan behelyettesítődik a futtatás dátumával (pl. 2024-05-04), így minden nap a megfelelő fájlra várakozik. A poke_interval=300 azt jelenti, hogy 5 percenként ellenőriz, a timeout=3600 pedig maximum 1 órát vár.

A sensor-oknak két módja van: mode='poke' (alapértelmezett) és mode='reschedule'. Poke módban a sensor folyamatosan lefoglal egy worker slot-ot, ami erőforrás-pazarlás lehet, ha sokáig várakozik. Reschedule módban a sensor két ellenőrzés között felszabadítja a slot-ot, így más task-ok futhatnak – ez javasolt hosszú várakozás esetén.

Tipp: Használd a mode='reschedule' beállítást, ha a várakozás hosszabb lehet néhány percnél. Ez különösen fontos a CeleryExecutor és KubernetesExecutor esetén, ahol a worker erőforrások korlátozottak, és a foglalt slot-ok blokkolhatják más DAG-ok futását.

[6]
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor

# Wait for file to exist
wait_for_file = FileSensor(
    task_id='wait_for_orders',
    filepath='/data/orders_{{ ds }}.csv',
    poke_interval=300,  # check every 5 min
    timeout=3600,       # max 1 hour
    mode='poke'
)
Output:
Sensor: wait_for_orders
Poking for /data/orders_2024-05-04.csv
File found after 2 attempts
Section 07

Schedule és cron

A DAG ütemezését a schedule paraméter vezérli, amely cron kifejezéseket és beépített aliasokat egyaránt támogat. A cron szintaxis öt mezőből áll: perc óra nap hónap hétnapja, és rendkívül rugalmas időzítést tesz lehetővé. A beépített aliasok (pl. @daily, @hourly, @weekly) a leggyakoribb eseteket fedik le tömören.

A fenti kódban a schedule='0 6 * * *' minden nap reggel 6:00-kor (UTC) indítja a DAG-ot. A catchup=False kritikus beállítás: ha True lenne, az Airflow visszamenőleg lefuttatná az összes ütemezést a start_date óta, ami egy régi start_date esetén több száz futtatást eredményezhet. A max_active_runs=1 biztosítja, hogy egyszerre csak egy DAG futás活跃 lehet, megelőzve az erőforrás-versenyt.

A tags paraméter címkéket rendel a DAG-hoz, amelyek segítségével a webes felületen szűrheted a DAG-okat. Egy éles környezetben könnyen 50-100 DAG futtatható, így a címkék (pl. 'webshop', 'etl', 'critical') elengedhetetlenek a szervezettséghez.

Tipp: Mindig UTC időzónát használj a cron kifejezésekben, és a start_date megadásánál is. A lokális időzóna használata gyakori hibaforrás, mert a nyári/téli időszámítás váltás megzavarhatja az ütemezést. Ha magyar idő szerint szeretnél futtatni, számold ki az UTC offset-et ( télen +1, nyáron +2).

KifejezésJelentés
@dailyNaponta éjfélkor
@hourlyÓránként
0 6 * * 1-5Hétköznap reggel 6-kor
0 0 1 * *Hónap első napján
*/15 * * * *15 percenként
[7]
with DAG(
    dag_id='webshop_daily_etl',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't run past dates
    max_active_runs=1,
    tags=['webshop', 'etl'],
) as dag:
    ...
Output:
DAG scheduled: 0 6 * * * (daily at 06:00 UTC)
Next run: 2024-05-05T06:00:00
Section 08

Retry és hibakezelés

Egy robusztus adatpipeline három fő hibakezelési mechanizmust használ: retry (automatikus újrapróbálkozás), timeout (maximális futásidő) és callback (egyéni hibajelzés). Ezek együtt biztosítják, hogy az átmeneti hibák (hálózati probléma, adatbázis timeout) automatikusan kezelve legyenek, míg a tartós hibák azonnali riasztást generáljanak.

A fenti kódban a risky_task háromszor próbálkozik újra (retries=3), 10 perces szünettel az próbálkozások között (retry_delay=timedelta(minutes=10)). Az execution_timeout=timedelta(hours=2) biztosítja, hogy ha a task két óránál tovább fut, automatikusan megszakadjon – ez megelőzi a „végtelen futás" problémáját, ami különösen fontos nagy fájlok feldolgozásakor.

Az on_failure_callback egy Python függvény, amely akkor hívódik meg, ha a task az összes újrapróbálkozás után is megbukik. A context szótár tartalmazza a DAG azonosítót, a task ID-t és egyéb metaadatokat, amelyeket felhasználhatsz a riasztás testreszabásához: Slack üzenet küldése, PagerDuty riasztás, vagy egy hibanapló írása egy központi rendszerbe.

Tipp: A retries és retry_delay beállítható a default_args szótárban is, így minden task automatikusan örökli az értékeket. Csak azoknál a task-oknál adj meg egyedi beállítást, amelyek eltérnek az alapértelmezetttől. Az exponential retry delay is beállítható, ami fokozatosan növeli a várakozási időt az újrapróbálkozások között.

[8]
from airflow.operators.python import PythonOperator
from datetime import timedelta

def on_failure_callback(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    print(f'ALERT: {dag_id}.{task_id} failed!')

risky_task = PythonOperator(
    task_id='process_large_file',
    python_callable=process_file,
    retries=3,
    retry_delay=timedelta(minutes=10),
    execution_timeout=timedelta(hours=2),
    on_failure_callback=on_failure_callback,
)
Output:
Task failed on attempt 1, retrying in 10min...
Task succeeded on attempt 2!
Section 09

Connections és Hooks

Az Airflow Connections (kapcsolatok) a külső rendszerek – adatbázisok, API-k, felhőszolgáltatások – elérési adatainak központi kezelését biztosítják. A jelszavak, API kulcsok és hitelesítő adatok nem a DAG kódban tárolódnak, hanem az Airflow metadata adatbázisában (titkosítva), így a DAG-ok biztonságosan megoszthatók verziókezelőben anélkül, hogy érzékeny adatok kikerülnének.

A kapcsolatok létrehozhatók a webes felületen (Admin → Connections), CLI-vel (airflow connections add), vagy környezeti változókon keresztül. A fenti példa egy webshop_postgres nevű PostgreSQL kapcsolatot mutat be, amely tartalmazza a host, port, felhasználónév, jelszó és schema adatokat. A DAG kódban a PostgresHook ezen kapcsolat azonosító alapján automatikusan lekéri a hitelesítő adatokat és létrehozza az adatbázis kapcsolatot.

A Hook-ok a kapcsolatok feletti kényelmi réteget biztosítják: a PostgresHook például nemcsak kapcsolódik, hanem insert_rows, get_records, run metódusokat is kínál. Hasonló hook-ok léteznek más rendszerekhez is: S3Hook, GCSHook, SparkHook, HttpHook stb. Ezek jelentősen csökkentik a boilerplate kódot a DAG-okban.

Tipp: Soha ne hard-kódolj jelszavakat a DAG fájlokban! Használd az Airflow Connections rendszert, vagy Secret Backend-et (AWS Secrets Manager, HashiCorp Vault) éles környezetben. Ez nemcsak biztonsági okokból fontos, hanem megkönnyíti a környezetek közötti váltást (dev/staging/prod) is – csak a kapcsolatot kell átírni, a DAG kód változatlan marad.

[9]
# Terminálban vagy az Airflow UI-ban hozd létre:
# airflow connections add 'webshop_postgres' \
#     --conn-type 'postgres' \
#     --conn-host 'postgres' \
#     --conn-port 5432 \
#     --conn-login 'admin' \
#     --conn-password 'secret' \
#     --conn-schema 'webshop_pro'

# DAG-kódban így használod:
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_to_postgres(**context):
    hook = PostgresHook(postgres_conn_id='webshop_postgres')
    df = context['ti'].xcom_pull(task_ids='transform')
    hook.insert_rows('orders', df.values.tolist())
Output:
Connection: webshop_postgres (postgres://admin@postgres:5432/webshop_pro)
Loaded 1420 rows to orders table
Section 10

Bronze/Silver/Gold DAG

Ez a WebShop Pro projekt teljes ETL pipeline-ját egyetlen DAG-ban valósítja meg, a medallion architektúra (bronze → silver → gold) mintáját követve. A bronze réteg a nyers adatokat tárolja (orders, products), a silver réteg tisztítja és normalizálja őket, a gold réteg pedig aggregált analytics táblákat (data marts) épít. A végén egy minőség-ellenőrző lépés biztosítja az adatok korrektségét.

A függőségi gráf három szintet köt össze. Az ingest_orders és ingest_products párhuzamosan futnak (bronze), majd mindkettőnek befejeződnie kell, mielőtt a clean_orders és clean_products elindulhat (silver). A [bronze_tasks] >> [silver_tasks] szintaxis automatikusan létrehozza az összes kombinációt: minden bronze task a két silver task előfeltétele lesz.

Az utolsó lépés a quality_check, amely futtat adatminőségi ellenőrzéseket a gold rétegen: hiányzó értékek, referenciális integritás, statisztikai anomáliák stb. Ha ez a task megbukik, a pipeline jelez, de a bronze és silver adatok már biztonságosan tárolva vannak, így a hiba kijavítása után csak a gold réteget kell újraépíteni.

Tipp: Éles környezetben érdemes a bronze/silver/gold rétegeket külön DAG-okra bontani, és az XCom vagy külső fájlrendszer segítségével összekapcsolni őket. Így a bronze futása nem blokkolja a silver újraépítését, és a hibák izoláltabban kezelhetők. A medallion minta rugalmassága pont ebben rejlik: minden réteg önállóan újraépíthető.

[10]
# WebShop Pro Medallion DAG
with DAG('webshop_medallion', schedule='@daily') as dag:
    
    # Bronze layer
    ingest_orders = PythonOperator(task_id='bronze_orders', python_callable=ingest_orders)
    ingest_products = PythonOperator(task_id='bronze_products', python_callable=ingest_products)
    
    # Silver layer
    clean_orders = PythonOperator(task_id='silver_orders', python_callable=clean_orders)
    clean_products = PythonOperator(task_id='silver_products', python_callable=clean_products)
    
    # Gold layer
    build_marts = PythonOperator(task_id='gold_marts', python_callable=build_marts)
    
    # Quality check
    quality_check = PythonOperator(task_id='quality_check', python_callable=run_quality_checks)
    
    # Dependencies
    [ingest_orders, ingest_products] >> [clean_orders, clean_products] >> build_marts >> quality_check
Output:
bronze_orders ──┐              ┌──> silver_orders ──┐
                 ├─> clean ──> │                    ├─> gold_marts -> quality_check
bronze_products ─┘              └──> silver_products ─┘
Section 11

Backfill és catchup

A backfill (visszamenőleges feltöltés) az Airflow egyik leghatékonyabb funkciója: lehetővé teszi, hogy múltbeli időpontokra is lefuttasd a DAG-ot, mintha azok az adott időpontban futottak volna. Ez akkor hasznos, ha egy új DAG-ot állítasz élesbe (és be akarod tölteni az elmúlt hónapok adatait), vagy ha egy hiba javítása után újra kell építeni egy adott időszak adatait.

A CLI parancs airflow dags backfill végigmegy a megadott dátumtartományon (start-date és end-date között), és minden egyes napra lefuttatja a DAG-ot. A --reset-dagruns zászló törli a korábbi futási kísérleteket, így tiszta lappal indul a backfill. A catchup=True DAG-szintű beállítás automatikus backfill-t indít, amint a DAG elérhetővé válik: az Airflow a start_date-től kezdve az összes kihagyott ütemezést lefuttatja.

Vigyázat: a catchup=True veszélyes lehet, ha a start_date régi (pl. egy évvel ezelőtti), mert az Airflow megpróbál 365 futtatást indítani! Mindig ellenőrizd a start_date-t, és ha nem szeretnél backfill-t, állítsd catchup=False-ra. A max_active_runs paraméterrel korlátozhatod a párhuzamos futtatásokat.

Tipp: Backfill előtt mindig teszteld a DAG-ot egyetlen napon (airflow dags test webshop_medallion 2024-01-01), mielőtt az egész tartományt indítanád. Ha a DAG hibás, a backfill több száz sikertelen futtatást eredményezhet, ami tisztítást igényel a metadata adatbázisban.

[11]
# Backfill specific date range
airflow dags backfill webshop_medallion \
    --start-date 2024-01-01 \
    --end-date 2024-01-31 \
    --reset-dagruns

# Or enable catchup in DAG definition
with DAG(
    dag_id='webshop_medallion',
    catchup=True,  # Run all missed schedules
    start_date=datetime(2024, 1, 1),
) as dag:
    ...
Output:
Backfilling 31 days:
2024-01-01: SUCCESS
2024-01-02: SUCCESS
...
2024-01-31: SUCCESS
31/31 completed
Section 12

Dynamic task mapping

A dinamikus task mapping (Airflow 2.3+) lehetővé teszi, hogy futásidőben határozd meg, hány task fusson le, és milyen paraméterekkel. Ez különösen hasznos, ha nem előre tudod a feldolgozandó elemek számát – például egy változó számú beérkező CSV fájlt kell konvertálni Parquet formátumba. Hagyományos módon csak egy fix számú task-ot definiálhatnál a DAG-ban, de a dynamic mapping elegánsan megoldja ezt a problémát.

A fenti kód két lépésből áll. Először a list_files task összegyűjti a feldolgozandó CSV fájlokat a /data/incoming könyvtárból, és a visszatérési értéke (fájllista) XCom-ban tárolódik. Ezután a process task a .expand() metódussal leképezi a listát: minden egyes fájlelemre létrehoz egy külön task-példányt. Ha 5 fájl van, 5 task indul; ha 100, akkor 100.

A PythonOperator.partial() létrehozza a task „sablonját", amelyet az .expand() konkretizál a megadott paraméterlistával. Ez sokkal hatékonyabb, mint egy ciklussal 100 PythonOperator-t definiálni a DAG kódban, mert a DAG fájl mérete kicsi marad, és a Scheduler nem terheli túl.

Tipp: A dinamikus mapping korlátja, hogy maximum 1024 task-példányt hozhat létre egyetlen mapping művelettel (ez a max_map_length beállítással módosítható). Ha ennél több elemet kell feldolgoznod, érdemes batch-ekre bontani a munkát, vagy egyetlen task-ban ciklussal feldolgozni az elemeket.

[12]
from airflow.operators.python import PythonOperator

def get_files(**context):
    from pathlib import Path
    return [str(f) for f in Path('/data/incoming').glob('*.csv')]

def process_file(filepath, **context):
    import pandas as pd
    df = pd.read_csv(filepath)
    df.to_parquet(filepath.replace('.csv', '.parquet'))
    return len(df)

list_files = PythonOperator(task_id='list_files', python_callable=get_files)

process = PythonOperator.partial(task_id='process').expand(
    filepath=list_files.output.map(lambda x: x)
)

list_files >> process
Output:
Dynamic mapping: processing 5 files
file_1.csv: 500 rows -> file_1.parquet
file_2.csv: 300 rows -> file_2.parquet
...
Section 13

TaskFlow API

A TaskFlow API (Airflow 2.0+) a modern módja az Airflow DAG-ok írásának: Python dekorátorokkal (@dag, @task) helyettesíti a hagyományos operátor-alapú definíciót. A kód tisztább, rövidebb és Pythonikusabb lesz – nincs szükség explicit PythonOperator példányosításra, és az XCom adatátadás automatikusan történik a függvények visszatérési értékei alapján.

A fenti példában az @dag dekorátor definiálja a DAG metaadatait (schedule, start_date, catchup), az @task dekorátor pedig minden függvényt Airflow task-ká alakít. Az extract() beolvassa az adatokat, a transform(df) tisztítja őket, a load(df) pedig kiírja Parquet formátumban. A függvények közötti adatátadás (DataFrame) automatikusan XCom-on keresztül történik – nem kell kézzel xcom_push/xcom_pull-t írnod.

A függőségek is automatikusan levezethetők: az extract() visszatérési értéke bemenete a transform()-nak, tehát az Airflow „tudja", hogy az extract-nek előbb le kell futnia. A df = extract() és clean = transform(df) sorok valójában nem futtatják a kódot azonnal – csak a DAG gráfot építik fel. A tényleges végrehajtás az Airflow Scheduler irányításával történik.

Tipp: A TaskFlow API és a hagyományos operátor-approach keverhető is egy DAG-on belül. Ha egy feladathoz szükség van egy speciális operátorra (pl. BashOperator, SparkSubmitOperator), azt hagyományos módon definiálhatod, és a TaskFlow task-okkal összekapcsolhatod a >> operátorral.

[13]
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def webshop_etl_modern():
    
    @task
    def extract():
        import pandas as pd
        return pd.read_csv('data/orders.csv')
    
    @task
    def transform(df):
        return df.dropna().drop_duplicates()
    
    @task
    def load(df):
        df.to_parquet('gold/orders.parquet')
        return len(df)
    
    df = extract()
    clean = transform(df)
    load(clean)

webshop_etl_modern()
Output:
DAG: webshop_etl_modern
extract -> transform -> load
Clean TaskFlow API with automatic XCom
Section 14

Monitoring és alerting

A monitoring és alerting egy éles Airflow környezet kritikus része: tudnod kell, ha egy pipeline késik, ha túllépi a várt futásidőt (SLA), vagy ha teljesen megbukik. Az SLA (Service Level Agreement) egy maximálisan megengedett futásidő, és ha a DAG vagy egy task ennél tovább tart, a rendszer automatikusan riasztást küld.

A fenti kód egy sla_miss_callback függvényt definiál, amely akkor aktiválódik, ha a DAG túllépi az SLA-t. A default_args között az email és email_on_failure beállítások automatikus email riasztást küldenek a megadott címre, ha bármelyik task megbukik. Ez a legegyszerűbb riasztási mechanizmus, de gyakran kiegészítik Slack, PagerDuty vagy Microsoft Teams integrációval.

A monitoring további fontos elemei: a task-ok futási idejének trendje (lassulnak-e?), a sikertelenségi ráta (növekszik-e?), és az erőforrás-használat (memory, CPU). Ezeket az Airflow webes felületén követheted, de nagy rendszerekben érdemes külső monitorozó eszközt is használni (Prometheus + Grafana, Datadog), amelyek metrikákat gyűjtenek az Airflow Schedulerből és Executorból.

Tipp: A Slack integráció beállításához használd a SlackAPIOperator-t vagy a SlackHook-ot az on_failure_callback-ben. Sok csapat egy közös #data-alerts csatornát használ, ahol minden pipeline hiba automatikusan megjelenik színes, formázott üzenetben – tartalmazva a DAG nevét, a sikertelen task-ot és a hibaüzenetet.

[14]
# SLA miss callback
def sla_miss_callback(dag, task_list, blocking_tis, *args):
    print(f'SLA MISSED for {dag.dag_id}!')

with DAG(
    dag_id='webshop_sla',
    sla_miss_callback=sla_miss_callback,
    default_args={'email': ['data-team@webshop.hu'], 'email_on_failure': True}
) as dag:
    ...
Output:
SLA monitoring enabled
Email alerts: data-team@webshop.hu
Slack integration: #data-alerts channel
Section 15

Összefoglalás

Gratulálunk! Ezzel a kurzussal megismerkedtél az Apache Airflow legfontosabb fogalmaival és képességeivel – a DAG alapoktól a modern TaskFlow API-ig. Megtanultad, hogyan definiálj feladatfüggőségeket, hogyan kommunikáltass task-okat XCom-on keresztül, hogyan kezelj hibákat retry és callback mechanizmusokkal, és hogyan építs fel egy teljes medallion architektúrájú ETL pipeline-t a WebShop Pro projekthez.

Amit elsajátítottál: az Airflow architektúra (Scheduler, Webserver, Executor, Metadata DB) megértése; DAG-ok írása operátorokkal (Python, Bash) és a TaskFlow dekorátoros approach-sal; cron ütemezés, sensor-ok, dinamikus task mapping, backfill, connections és monitoring. Ezek az alapok elegendőek ahhoz, hogy éles környezetben is magabiztosan használjad az Airflow-t.

Mit vigyél tovább? A következő lépés a dbt Analytics Engineering kurzus, ahol SQL transzformációkat építesz a gold rétegen belül. A dbt és az Airflow kiegészítik egymást: az Airflow vezérli a pipeline végrehajtást, a dbt pedig a SQL modelleket és adatminőséget kezeli. Ezt követően a Databricks Lakehouse kurzus bemutatja, hogyan skálázd a teljes rendszert felhőben.

Tipp: A legjobb módja az elmélyítésnek, ha a WebShop Pro projektet saját környezetedben felépíted: állíts be egy Airflow instance-t, hozz létre PostgreSQL kapcsolatot, és futtasd le a teljes medallion DAG-ot. Ha bárhol elakadsz, az Airflow hivatalos dokumentációja (airflow.apache.org) kiváló referenciaként szolgál.

MegtanultukKövetkező
DAG, operátorok, scheduledbt Analytics Engineering
XCom, sensor, retrySQL transzformációk
Bronze/Silver/Gold DAGDatabricks Lakehouse
TaskFlow APIModern pipeline-ok
Következő

dbt Analytics Engineering - ahol SQL transzformációkat építünk!

Quiz: Mi az XCom?

Quiz: Melyik dekorátorral definiálunk DAG-ot a TaskFlow API-ban?