Airflow & Orchestration
Airflow
Delta Lake
Ez a kurzus az Apache Airflow alapjait és a data pipeline orchestráció legfontosabb fogalmait vezeti be, a WebShop Pro e-kereskedelmi projekt köré építve. Megtanulod, hogyan ütemezd, monitorozd és kezeld a hibákat az adatpipeline-okban úgy, hogy azok megbízhatóan és reprodukálhatóan fussanak nap mint nap.
Miért fontos az orchestráció? Egy modern adatplatformban tucatnyi feladat fut párhuzamosan: adatok betöltése, tisztítása, transzformálása, minőség-ellenőrzése és riportolása. Ha ezek közül bármelyik elromlik vagy késik, az teljes láncolatotakadhat meg. Az Airflow pontosan ezt a problémát oldja meg: központilag vezérli a feladatok végrehajtási sorrendjét, kezeli az újrapróbálkozásokat, és értesít, ha valami balul sül el.
A kurzus menete: először megismered az Airflow architektúráját és a DAG (Directed Acyclic Graph) fogalmát, majd lépésről lépésre felépítünk egy teljes medallion architektúrájú ETL pipeline-t – bronze (nyers adatok), silver (tisztított) és gold (analitikai) rétegekkel. Minden section egy-egy új Airflow képességet ad a eszköztáradhoz: operátorok, XCom, sensorok, retry-logika, backfill és a modern TaskFlow API.
Tipp: A legjobb módja a tanulásnak, ha minden kódrészletet magad is lefuttatod egy saját Airflow környezetben. Használd az airflow standalone parancsot egy gyors helyi tesztkörnyezet felépítéséhez, és kövesd nyomon a DAG-ok futását a webes felületen (localhost:8080).
DAG, operator, sensor, schedule, retry, backfill, XCom, connections
Napi webshop ETL DAG bronze/silver/gold rétegekkel
DAG · Sensor · Backfill
Airflow architektúra
Az Apache Airflow négy fő komponensből áll, amelyek együtt biztosítják a DAG-ok értelmezését, ütemezését és végrehajtását. A Scheduler folyamatosan elemzi a DAG mappában lévő Python fájlokat, felismeri a feladatfüggőségeket, és elindítja a task-okat, ha az ütemezési feltételek teljesülnek. A Webserver egy Flask-alapú webes felület, ahol vizuálisan monitorozhatod a DAG-ok állapotát, kezelheted a kapcsolatokat és változókat, valamint újraindíthatod a sikertelen task-okat.
Az Executor határozza meg, hol és hogyan futnak a task-ok. A legegyszerűbb a SequentialExecutor (egy task egyszerre, fejlesztésre), de éles környezetben a CeleryExecutor vagy a KubernetesExecutor a szabvány, amelyek párhuzamosítást és skálázhatóságot biztosítanak. A Metadata DB (általában PostgreSQL) tárolja a DAG-ok, task-példányok, kapcsolatok, változók és futási előzmények összes metaadatát – ez az Airflow „emlékezete".
A fenti kódrészlet az airflow standalone parancsot mutatja, amely egyetlen paranccsal elindítja az összes komponenst: schedulert, webservert és egy helyi executort. Ez a leggyorsabb módja a fejlesztési környezet felépítésének, de élesben érdemes Docker Compose vagy Kubernetes segítségével külön erőforrásokat fenntartani az egyes komponenseknek.
Tipp: Az Airflow webes felülete alapértelmezetten a 8080-as porton fut. Ha az airflow standalone parancsot használod, a naplóban látni fogod a pontos URL-t és az automatikusan generált admin jelszót. Mindig ellenőrizd a konzol kimenetét!
| Komponens | Feladat |
|---|---|
| Scheduler | DAG-ok elemzése, taskok ütemezése |
| Webserver | Web UI a monitorozáshoz |
| Executor | Taskok futtatása |
| Metadata DB | Állapot tárolás (Postgres) |
| Worker | A tényleges futtatás |
# Start Airflow standalone airflow standalone # Access Web UI # http://localhost:8081 # Default: admin / admin
Airflow webserver running at 0.0.0.0:8080 Airflow scheduler started
Első DAG
A DAG (Directed Acyclic Graph) az Airflow központi fogalma: egy Python fájl, amely meghatározza a feladatok (task-ok) halmazát és azok végrehajtási sorrendjét. A „direktett" szó azt jelenti, hogy a task-ok közötti élek irányítottak (A → B), az „aciklikus" pedig azt, hogy nem alakulhat ki kör (nem térhetsz vissza egy már végrehajtott task-hoz). Ez a modell garantálja, hogy a pipeline végrehajtása mindig megállapodik egy terminális állapotban.
A fenti kódban a default_args szótár határozza meg az alapértelmezett beállításokat minden task számára: ki a felelős (owner), hányszor próbálkozzon újra hiba esetén (retries), és mennyi idő teljen el az újrapróbálkozások között (retry_delay). A with DAG(...) kontextuskezelő létrehozza a DAG objektumot, és minden benne definiált operátor automatikusan ehhez a DAG-hoz tartozik.
Három PythonOperator-t látunk: extract, transform és load. Mindegyik egy Python függvényt hajt végre (itt egyszerű lambda kifejezéseket). A >> operátorral láncoljuk őket: először kinyerjük az adatokat, majd transzformáljuk, végén betöltjük a célrendszerbe. A catchup=False beállítás fontos: megakadályozza, hogy az Airflow visszamenőleg lefuttassa az összes korábbi ütemezést a start_date óta.
Tipp: A schedule='@daily' cron-jelölés naponta éjfélkor futtatja a DAG-ot. Ha csak manuálisan szeretnéd tesztelni, állítsd schedule=None értékre – így a DAG csak kattintásra indul el a webes felületen.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'data-engineer',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='webshop_daily_etl',
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=lambda: print('Extracting data...')
)
transform = PythonOperator(
task_id='transform',
python_callable=lambda: print('Transforming data...')
)
load = PythonOperator(
task_id='load',
python_callable=lambda: print('Loading data...')
)
extract >> transform >> loadDAG: webshop_daily_etl Tasks: extract -> transform -> load Schedule: @daily
Operátorok: PythonOperator, BashOperator
Az operátorok (operators) az Airflow építőkövei: minden task egy operátor példány, amely meghatározza, hogy mi történjen a végrehajtás során. A leggyakrabban használtak a PythonOperator (Python függvény futtatása) és a BashOperator (shell parancs végrehajtása), de az Airflow gazdag ökoszisztémája több száz provider operátort kínál: SparkSubmitOperator, PostgresOperator, S3KeySensor, EmailOperator és még sok más.
A fenti példa két operátort mutat be. A BashOperator a bash_command paraméterben megadott shell parancsot futtatja le – itt a /tmp/webshop/ könyvtár tartalmát töröljük. A PythonOperator egy Python függvényt hív meg (python_callable=validate_schema), ami rugalmasabb, mint a Bash, mert tetszőleges logikát implementálhatsz: séma validálást, adatminőség-ellenőrzést, API hívásokat stb.
A választás operátor között attól függ, hogy mit szeretnél elérni. Ha egy egyszerű parancssori műveletre van szükséged (fájl törlés, curl hívás, dbt run), a BashOperator a leggyorsabb megoldás. Ha viszont komplex Python logikára van szükséged – adatok feldolgozása, transzformáció, feltételes vezérlés –, akkor a PythonOperator a megfelelő választás. Éles projektekben a kettőt gyakran kombinálják egy DAG-on belül.
Tipp: Mindig igyekezz a python_callable paraméterben egy külön modulban definiált függvényre hivatkozni, ne inline lambda-t használj. Így a kód könnyebben tesztelhető, olvashatóbb és újrafelhasználható más DAG-okban is.
| Operátor | Feladat |
|---|---|
| PythonOperator | Python függvény futtatása |
| BashOperator | Shell parancs |
| SparkSubmitOperator | Spark job indítás |
| EmailOperator | Email küldés |
| Sensor | Valamire várakozás |
from airflow.operators.bash import BashOperator
clean = BashOperator(
task_id='clean_temp_files',
bash_command='rm -rf /tmp/webshop/*'
)
validate = PythonOperator(
task_id='validate_data',
python_callable=validate_schema
)Task: clean_temp_files (BashOperator) Task: validate_data (PythonOperator)
Task dependencies
A task-függőségek (dependencies) határozzák meg a DAG-ok végrehajtási sorrendjét. Az Airflow a >> (downstream) és << (upstream) operátorokkal teszi lehetővé a task-ok közötti relációk definálását. Ezek a bite-shifting operátorok túlterheltek az Airflow-ban, és intuitív szintaxist adnak: A >> B azt jelenti, hogy „A fusson le először, utána B".
A példa három mintát mutat be. Az első egy egyszerű lineáris láncolás: extract >> transform >> load. A második párhuzamos ágakat definiál: az extract_orders és extract_products task-ok egyszerre indulnak el, mindkettőnek be kell fejeződnie a transform előtt, ami után a load_bronze és load_silver ismét párhuzamosan fut. A harmadik egy összetettebb minta, ahol az extract után a validate és profile párhuzamosan futnak, majd a transform, load és végén egy notify következik.
A párhuzamosítás kulcsfontosságú a teljesítmény optimalizálásához. Ha két task között nincs logikai függőség (pl. két különböző tábla adatainak betöltése), érdemes őket párhuzamosan futtatni. Figyelj azonban az erőforrás-korlátokra: ha túl sok task fut egyszerre, az memory vagy CPU szűk keresztmetszetet okozhat.
Tipp: A listás szintaxis ([A, B] >> C) a felbontás (fan-out/fan-in) mintát valósítja meg. A fenti második példában az Airflow automatikusan létrehozza az összes szükséges élt: extract_orders→transform, extract_products→transform, transform→load_bronze és transform→load_silver.
# Linear chain extract >> transform >> load # Parallel branches [extract_orders, extract_products] >> transform >> [load_bronze, load_silver] # Complex extract >> [validate, profile] >> transform >> load >> notify
extract_orders (parallel) ──┐
├─> transform -> load
extract_products (parallel) ─┘XCom: Taskok közötti kommunikáció
Az XCom (Cross-Communication) az Airflow beépített mechanizmusa a task-ok közötti adatátadásra. Alapértelmezetten minden task elszigetelt környezetben fut, és nem fér hozzá más task memóriájához. Az XCom hidat képez: egy task adatot „pushol" a metadata adatbázisba, egy másik task pedig „pullolja" onnan. Ez lehetővé teszi, hogy pl. az extract task átadja a sorok számát a validate task-nak.
A fenti kódban az extract függvény beolvassa a CSV fájlt egy pandas DataFrame-be, majd xcom_push-szal elküldi a sorok számát (row_count) és az oszlopneveket (columns). A return érték automatikusan 'return_value' kulccsal kerül az XCom-ba. A validate függvény xcom_pull-lal kikéri a sorok számát, és ellenőrzi, hogy nagyobb-e nullánál.
Fontos korlát: az XCom értékek a metadata adatbázisban tárolódnak, ezért csak kis méretű adatokat érdemes átadni (néhány kilobájtnyi JSON, szám, string). Nagy DataFrame-eket soha ne küldj XCom-on keresztül – ehelyett írd ki fájlba (Parquet, CSV) vagy objektárhelyre (S3, GCS), és csak az elérési utat add át XCom-ban. Az Airflow 2.0-tól a TaskFlow API automatikus XCom-kezelést kínál, ami sokkal elegánsabb megoldás.
Tipp: A **context paraméter mindig kell a PythonOperator-höz, ha XCom-ot használsz. A context['ti'] a TaskInstance objektum, amely tartalmazza az xcom_push és xcom_pull metódusokat. Ha TaskFlow API-t használsz (későbbi section), ez mind automatikussá válik.
def extract(**context):
import pandas as pd
df = pd.read_csv('data/orders.csv')
context['ti'].xcom_push(key='row_count', value=len(df))
context['ti'].xcom_push(key='columns', value=list(df.columns))
return len(df) # Also pushed as 'return_value'
def validate(**context):
row_count = context['ti'].xcom_pull(task_ids='extract', key='row_count')
print(f'Validating {row_count} rows...')
assert row_count > 0, 'No data extracted!'XCom pushed: row_count=1500, columns=['id','customer_id',...] XCom pulled: row_count=1500 Validating 1500 rows...
Sensor: várakozás feltételre
A Sensor-ok az Airflow speciális operátorai, amelyek egy adott feltétel teljesülésére várnak, mielőtt a következő task elindulna. Gondolj rájuk úgy, mint egy „ismétlődő kérdésre": a sensor rendszeres időközönként (poke) ellenőrzi, hogy a feltétel teljesült-e, és ha igen, sikeresen befejeződik; ha nem, várakozik a következő ellenőrzésig. Ha a feltétel a megadott időn belül (timeout) nem teljesül, a task megbukik.
A fenti kód egy FileSensor-t mutat be, amely a /data/orders_{{ ds }}.csv fájl létezésére vár. A {{ ds }} Jinja template változó automatikusan behelyettesítődik a futtatás dátumával (pl. 2024-05-04), így minden nap a megfelelő fájlra várakozik. A poke_interval=300 azt jelenti, hogy 5 percenként ellenőriz, a timeout=3600 pedig maximum 1 órát vár.
A sensor-oknak két módja van: mode='poke' (alapértelmezett) és mode='reschedule'. Poke módban a sensor folyamatosan lefoglal egy worker slot-ot, ami erőforrás-pazarlás lehet, ha sokáig várakozik. Reschedule módban a sensor két ellenőrzés között felszabadítja a slot-ot, így más task-ok futhatnak – ez javasolt hosszú várakozás esetén.
Tipp: Használd a mode='reschedule' beállítást, ha a várakozás hosszabb lehet néhány percnél. Ez különösen fontos a CeleryExecutor és KubernetesExecutor esetén, ahol a worker erőforrások korlátozottak, és a foglalt slot-ok blokkolhatják más DAG-ok futását.
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor
# Wait for file to exist
wait_for_file = FileSensor(
task_id='wait_for_orders',
filepath='/data/orders_{{ ds }}.csv',
poke_interval=300, # check every 5 min
timeout=3600, # max 1 hour
mode='poke'
)Sensor: wait_for_orders Poking for /data/orders_2024-05-04.csv File found after 2 attempts
Schedule és cron
A DAG ütemezését a schedule paraméter vezérli, amely cron kifejezéseket és beépített aliasokat egyaránt támogat. A cron szintaxis öt mezőből áll: perc óra nap hónap hétnapja, és rendkívül rugalmas időzítést tesz lehetővé. A beépített aliasok (pl. @daily, @hourly, @weekly) a leggyakoribb eseteket fedik le tömören.
A fenti kódban a schedule='0 6 * * *' minden nap reggel 6:00-kor (UTC) indítja a DAG-ot. A catchup=False kritikus beállítás: ha True lenne, az Airflow visszamenőleg lefuttatná az összes ütemezést a start_date óta, ami egy régi start_date esetén több száz futtatást eredményezhet. A max_active_runs=1 biztosítja, hogy egyszerre csak egy DAG futás活跃 lehet, megelőzve az erőforrás-versenyt.
A tags paraméter címkéket rendel a DAG-hoz, amelyek segítségével a webes felületen szűrheted a DAG-okat. Egy éles környezetben könnyen 50-100 DAG futtatható, így a címkék (pl. 'webshop', 'etl', 'critical') elengedhetetlenek a szervezettséghez.
Tipp: Mindig UTC időzónát használj a cron kifejezésekben, és a start_date megadásánál is. A lokális időzóna használata gyakori hibaforrás, mert a nyári/téli időszámítás váltás megzavarhatja az ütemezést. Ha magyar idő szerint szeretnél futtatni, számold ki az UTC offset-et ( télen +1, nyáron +2).
| Kifejezés | Jelentés |
|---|---|
| @daily | Naponta éjfélkor |
| @hourly | Óránként |
| 0 6 * * 1-5 | Hétköznap reggel 6-kor |
| 0 0 1 * * | Hónap első napján |
| */15 * * * * | 15 percenként |
with DAG(
dag_id='webshop_daily_etl',
schedule='0 6 * * *', # Daily at 6 AM
start_date=datetime(2024, 1, 1),
catchup=False, # Don't run past dates
max_active_runs=1,
tags=['webshop', 'etl'],
) as dag:
...DAG scheduled: 0 6 * * * (daily at 06:00 UTC) Next run: 2024-05-05T06:00:00
Retry és hibakezelés
Egy robusztus adatpipeline három fő hibakezelési mechanizmust használ: retry (automatikus újrapróbálkozás), timeout (maximális futásidő) és callback (egyéni hibajelzés). Ezek együtt biztosítják, hogy az átmeneti hibák (hálózati probléma, adatbázis timeout) automatikusan kezelve legyenek, míg a tartós hibák azonnali riasztást generáljanak.
A fenti kódban a risky_task háromszor próbálkozik újra (retries=3), 10 perces szünettel az próbálkozások között (retry_delay=timedelta(minutes=10)). Az execution_timeout=timedelta(hours=2) biztosítja, hogy ha a task két óránál tovább fut, automatikusan megszakadjon – ez megelőzi a „végtelen futás" problémáját, ami különösen fontos nagy fájlok feldolgozásakor.
Az on_failure_callback egy Python függvény, amely akkor hívódik meg, ha a task az összes újrapróbálkozás után is megbukik. A context szótár tartalmazza a DAG azonosítót, a task ID-t és egyéb metaadatokat, amelyeket felhasználhatsz a riasztás testreszabásához: Slack üzenet küldése, PagerDuty riasztás, vagy egy hibanapló írása egy központi rendszerbe.
Tipp: A retries és retry_delay beállítható a default_args szótárban is, így minden task automatikusan örökli az értékeket. Csak azoknál a task-oknál adj meg egyedi beállítást, amelyek eltérnek az alapértelmezetttől. Az exponential retry delay is beállítható, ami fokozatosan növeli a várakozási időt az újrapróbálkozások között.
from airflow.operators.python import PythonOperator
from datetime import timedelta
def on_failure_callback(context):
dag_id = context['dag'].dag_id
task_id = context['task_instance'].task_id
print(f'ALERT: {dag_id}.{task_id} failed!')
risky_task = PythonOperator(
task_id='process_large_file',
python_callable=process_file,
retries=3,
retry_delay=timedelta(minutes=10),
execution_timeout=timedelta(hours=2),
on_failure_callback=on_failure_callback,
)Task failed on attempt 1, retrying in 10min... Task succeeded on attempt 2!
Connections és Hooks
Az Airflow Connections (kapcsolatok) a külső rendszerek – adatbázisok, API-k, felhőszolgáltatások – elérési adatainak központi kezelését biztosítják. A jelszavak, API kulcsok és hitelesítő adatok nem a DAG kódban tárolódnak, hanem az Airflow metadata adatbázisában (titkosítva), így a DAG-ok biztonságosan megoszthatók verziókezelőben anélkül, hogy érzékeny adatok kikerülnének.
A kapcsolatok létrehozhatók a webes felületen (Admin → Connections), CLI-vel (airflow connections add), vagy környezeti változókon keresztül. A fenti példa egy webshop_postgres nevű PostgreSQL kapcsolatot mutat be, amely tartalmazza a host, port, felhasználónév, jelszó és schema adatokat. A DAG kódban a PostgresHook ezen kapcsolat azonosító alapján automatikusan lekéri a hitelesítő adatokat és létrehozza az adatbázis kapcsolatot.
A Hook-ok a kapcsolatok feletti kényelmi réteget biztosítják: a PostgresHook például nemcsak kapcsolódik, hanem insert_rows, get_records, run metódusokat is kínál. Hasonló hook-ok léteznek más rendszerekhez is: S3Hook, GCSHook, SparkHook, HttpHook stb. Ezek jelentősen csökkentik a boilerplate kódot a DAG-okban.
Tipp: Soha ne hard-kódolj jelszavakat a DAG fájlokban! Használd az Airflow Connections rendszert, vagy Secret Backend-et (AWS Secrets Manager, HashiCorp Vault) éles környezetben. Ez nemcsak biztonsági okokból fontos, hanem megkönnyíti a környezetek közötti váltást (dev/staging/prod) is – csak a kapcsolatot kell átírni, a DAG kód változatlan marad.
# Terminálban vagy az Airflow UI-ban hozd létre:
# airflow connections add 'webshop_postgres' \
# --conn-type 'postgres' \
# --conn-host 'postgres' \
# --conn-port 5432 \
# --conn-login 'admin' \
# --conn-password 'secret' \
# --conn-schema 'webshop_pro'
# DAG-kódban így használod:
from airflow.providers.postgres.hooks.postgres import PostgresHook
def load_to_postgres(**context):
hook = PostgresHook(postgres_conn_id='webshop_postgres')
df = context['ti'].xcom_pull(task_ids='transform')
hook.insert_rows('orders', df.values.tolist())Connection: webshop_postgres (postgres://admin@postgres:5432/webshop_pro) Loaded 1420 rows to orders table
Bronze/Silver/Gold DAG
Ez a WebShop Pro projekt teljes ETL pipeline-ját egyetlen DAG-ban valósítja meg, a medallion architektúra (bronze → silver → gold) mintáját követve. A bronze réteg a nyers adatokat tárolja (orders, products), a silver réteg tisztítja és normalizálja őket, a gold réteg pedig aggregált analytics táblákat (data marts) épít. A végén egy minőség-ellenőrző lépés biztosítja az adatok korrektségét.
A függőségi gráf három szintet köt össze. Az ingest_orders és ingest_products párhuzamosan futnak (bronze), majd mindkettőnek befejeződnie kell, mielőtt a clean_orders és clean_products elindulhat (silver). A [bronze_tasks] >> [silver_tasks] szintaxis automatikusan létrehozza az összes kombinációt: minden bronze task a két silver task előfeltétele lesz.
Az utolsó lépés a quality_check, amely futtat adatminőségi ellenőrzéseket a gold rétegen: hiányzó értékek, referenciális integritás, statisztikai anomáliák stb. Ha ez a task megbukik, a pipeline jelez, de a bronze és silver adatok már biztonságosan tárolva vannak, így a hiba kijavítása után csak a gold réteget kell újraépíteni.
Tipp: Éles környezetben érdemes a bronze/silver/gold rétegeket külön DAG-okra bontani, és az XCom vagy külső fájlrendszer segítségével összekapcsolni őket. Így a bronze futása nem blokkolja a silver újraépítését, és a hibák izoláltabban kezelhetők. A medallion minta rugalmassága pont ebben rejlik: minden réteg önállóan újraépíthető.
# WebShop Pro Medallion DAG
with DAG('webshop_medallion', schedule='@daily') as dag:
# Bronze layer
ingest_orders = PythonOperator(task_id='bronze_orders', python_callable=ingest_orders)
ingest_products = PythonOperator(task_id='bronze_products', python_callable=ingest_products)
# Silver layer
clean_orders = PythonOperator(task_id='silver_orders', python_callable=clean_orders)
clean_products = PythonOperator(task_id='silver_products', python_callable=clean_products)
# Gold layer
build_marts = PythonOperator(task_id='gold_marts', python_callable=build_marts)
# Quality check
quality_check = PythonOperator(task_id='quality_check', python_callable=run_quality_checks)
# Dependencies
[ingest_orders, ingest_products] >> [clean_orders, clean_products] >> build_marts >> quality_checkbronze_orders ──┐ ┌──> silver_orders ──┐
├─> clean ──> │ ├─> gold_marts -> quality_check
bronze_products ─┘ └──> silver_products ─┘Backfill és catchup
A backfill (visszamenőleges feltöltés) az Airflow egyik leghatékonyabb funkciója: lehetővé teszi, hogy múltbeli időpontokra is lefuttasd a DAG-ot, mintha azok az adott időpontban futottak volna. Ez akkor hasznos, ha egy új DAG-ot állítasz élesbe (és be akarod tölteni az elmúlt hónapok adatait), vagy ha egy hiba javítása után újra kell építeni egy adott időszak adatait.
A CLI parancs airflow dags backfill végigmegy a megadott dátumtartományon (start-date és end-date között), és minden egyes napra lefuttatja a DAG-ot. A --reset-dagruns zászló törli a korábbi futási kísérleteket, így tiszta lappal indul a backfill. A catchup=True DAG-szintű beállítás automatikus backfill-t indít, amint a DAG elérhetővé válik: az Airflow a start_date-től kezdve az összes kihagyott ütemezést lefuttatja.
Vigyázat: a catchup=True veszélyes lehet, ha a start_date régi (pl. egy évvel ezelőtti), mert az Airflow megpróbál 365 futtatást indítani! Mindig ellenőrizd a start_date-t, és ha nem szeretnél backfill-t, állítsd catchup=False-ra. A max_active_runs paraméterrel korlátozhatod a párhuzamos futtatásokat.
Tipp: Backfill előtt mindig teszteld a DAG-ot egyetlen napon (airflow dags test webshop_medallion 2024-01-01), mielőtt az egész tartományt indítanád. Ha a DAG hibás, a backfill több száz sikertelen futtatást eredményezhet, ami tisztítást igényel a metadata adatbázisban.
# Backfill specific date range
airflow dags backfill webshop_medallion \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--reset-dagruns
# Or enable catchup in DAG definition
with DAG(
dag_id='webshop_medallion',
catchup=True, # Run all missed schedules
start_date=datetime(2024, 1, 1),
) as dag:
...Backfilling 31 days: 2024-01-01: SUCCESS 2024-01-02: SUCCESS ... 2024-01-31: SUCCESS 31/31 completed
Dynamic task mapping
A dinamikus task mapping (Airflow 2.3+) lehetővé teszi, hogy futásidőben határozd meg, hány task fusson le, és milyen paraméterekkel. Ez különösen hasznos, ha nem előre tudod a feldolgozandó elemek számát – például egy változó számú beérkező CSV fájlt kell konvertálni Parquet formátumba. Hagyományos módon csak egy fix számú task-ot definiálhatnál a DAG-ban, de a dynamic mapping elegánsan megoldja ezt a problémát.
A fenti kód két lépésből áll. Először a list_files task összegyűjti a feldolgozandó CSV fájlokat a /data/incoming könyvtárból, és a visszatérési értéke (fájllista) XCom-ban tárolódik. Ezután a process task a .expand() metódussal leképezi a listát: minden egyes fájlelemre létrehoz egy külön task-példányt. Ha 5 fájl van, 5 task indul; ha 100, akkor 100.
A PythonOperator.partial() létrehozza a task „sablonját", amelyet az .expand() konkretizál a megadott paraméterlistával. Ez sokkal hatékonyabb, mint egy ciklussal 100 PythonOperator-t definiálni a DAG kódban, mert a DAG fájl mérete kicsi marad, és a Scheduler nem terheli túl.
Tipp: A dinamikus mapping korlátja, hogy maximum 1024 task-példányt hozhat létre egyetlen mapping művelettel (ez a max_map_length beállítással módosítható). Ha ennél több elemet kell feldolgoznod, érdemes batch-ekre bontani a munkát, vagy egyetlen task-ban ciklussal feldolgozni az elemeket.
from airflow.operators.python import PythonOperator
def get_files(**context):
from pathlib import Path
return [str(f) for f in Path('/data/incoming').glob('*.csv')]
def process_file(filepath, **context):
import pandas as pd
df = pd.read_csv(filepath)
df.to_parquet(filepath.replace('.csv', '.parquet'))
return len(df)
list_files = PythonOperator(task_id='list_files', python_callable=get_files)
process = PythonOperator.partial(task_id='process').expand(
filepath=list_files.output.map(lambda x: x)
)
list_files >> processDynamic mapping: processing 5 files file_1.csv: 500 rows -> file_1.parquet file_2.csv: 300 rows -> file_2.parquet ...
TaskFlow API
A TaskFlow API (Airflow 2.0+) a modern módja az Airflow DAG-ok írásának: Python dekorátorokkal (@dag, @task) helyettesíti a hagyományos operátor-alapú definíciót. A kód tisztább, rövidebb és Pythonikusabb lesz – nincs szükség explicit PythonOperator példányosításra, és az XCom adatátadás automatikusan történik a függvények visszatérési értékei alapján.
A fenti példában az @dag dekorátor definiálja a DAG metaadatait (schedule, start_date, catchup), az @task dekorátor pedig minden függvényt Airflow task-ká alakít. Az extract() beolvassa az adatokat, a transform(df) tisztítja őket, a load(df) pedig kiírja Parquet formátumban. A függvények közötti adatátadás (DataFrame) automatikusan XCom-on keresztül történik – nem kell kézzel xcom_push/xcom_pull-t írnod.
A függőségek is automatikusan levezethetők: az extract() visszatérési értéke bemenete a transform()-nak, tehát az Airflow „tudja", hogy az extract-nek előbb le kell futnia. A df = extract() és clean = transform(df) sorok valójában nem futtatják a kódot azonnal – csak a DAG gráfot építik fel. A tényleges végrehajtás az Airflow Scheduler irányításával történik.
Tipp: A TaskFlow API és a hagyományos operátor-approach keverhető is egy DAG-on belül. Ha egy feladathoz szükség van egy speciális operátorra (pl. BashOperator, SparkSubmitOperator), azt hagyományos módon definiálhatod, és a TaskFlow task-okkal összekapcsolhatod a >> operátorral.
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def webshop_etl_modern():
@task
def extract():
import pandas as pd
return pd.read_csv('data/orders.csv')
@task
def transform(df):
return df.dropna().drop_duplicates()
@task
def load(df):
df.to_parquet('gold/orders.parquet')
return len(df)
df = extract()
clean = transform(df)
load(clean)
webshop_etl_modern()DAG: webshop_etl_modern extract -> transform -> load Clean TaskFlow API with automatic XCom
Monitoring és alerting
A monitoring és alerting egy éles Airflow környezet kritikus része: tudnod kell, ha egy pipeline késik, ha túllépi a várt futásidőt (SLA), vagy ha teljesen megbukik. Az SLA (Service Level Agreement) egy maximálisan megengedett futásidő, és ha a DAG vagy egy task ennél tovább tart, a rendszer automatikusan riasztást küld.
A fenti kód egy sla_miss_callback függvényt definiál, amely akkor aktiválódik, ha a DAG túllépi az SLA-t. A default_args között az email és email_on_failure beállítások automatikus email riasztást küldenek a megadott címre, ha bármelyik task megbukik. Ez a legegyszerűbb riasztási mechanizmus, de gyakran kiegészítik Slack, PagerDuty vagy Microsoft Teams integrációval.
A monitoring további fontos elemei: a task-ok futási idejének trendje (lassulnak-e?), a sikertelenségi ráta (növekszik-e?), és az erőforrás-használat (memory, CPU). Ezeket az Airflow webes felületén követheted, de nagy rendszerekben érdemes külső monitorozó eszközt is használni (Prometheus + Grafana, Datadog), amelyek metrikákat gyűjtenek az Airflow Schedulerből és Executorból.
Tipp: A Slack integráció beállításához használd a SlackAPIOperator-t vagy a SlackHook-ot az on_failure_callback-ben. Sok csapat egy közös #data-alerts csatornát használ, ahol minden pipeline hiba automatikusan megjelenik színes, formázott üzenetben – tartalmazva a DAG nevét, a sikertelen task-ot és a hibaüzenetet.
# SLA miss callback
def sla_miss_callback(dag, task_list, blocking_tis, *args):
print(f'SLA MISSED for {dag.dag_id}!')
with DAG(
dag_id='webshop_sla',
sla_miss_callback=sla_miss_callback,
default_args={'email': ['data-team@webshop.hu'], 'email_on_failure': True}
) as dag:
...SLA monitoring enabled Email alerts: data-team@webshop.hu Slack integration: #data-alerts channel
Összefoglalás
Gratulálunk! Ezzel a kurzussal megismerkedtél az Apache Airflow legfontosabb fogalmaival és képességeivel – a DAG alapoktól a modern TaskFlow API-ig. Megtanultad, hogyan definiálj feladatfüggőségeket, hogyan kommunikáltass task-okat XCom-on keresztül, hogyan kezelj hibákat retry és callback mechanizmusokkal, és hogyan építs fel egy teljes medallion architektúrájú ETL pipeline-t a WebShop Pro projekthez.
Amit elsajátítottál: az Airflow architektúra (Scheduler, Webserver, Executor, Metadata DB) megértése; DAG-ok írása operátorokkal (Python, Bash) és a TaskFlow dekorátoros approach-sal; cron ütemezés, sensor-ok, dinamikus task mapping, backfill, connections és monitoring. Ezek az alapok elegendőek ahhoz, hogy éles környezetben is magabiztosan használjad az Airflow-t.
Mit vigyél tovább? A következő lépés a dbt Analytics Engineering kurzus, ahol SQL transzformációkat építesz a gold rétegen belül. A dbt és az Airflow kiegészítik egymást: az Airflow vezérli a pipeline végrehajtást, a dbt pedig a SQL modelleket és adatminőséget kezeli. Ezt követően a Databricks Lakehouse kurzus bemutatja, hogyan skálázd a teljes rendszert felhőben.
Tipp: A legjobb módja az elmélyítésnek, ha a WebShop Pro projektet saját környezetedben felépíted: állíts be egy Airflow instance-t, hozz létre PostgreSQL kapcsolatot, és futtasd le a teljes medallion DAG-ot. Ha bárhol elakadsz, az Airflow hivatalos dokumentációja (airflow.apache.org) kiváló referenciaként szolgál.
| Megtanultuk | Következő |
|---|---|
| DAG, operátorok, schedule | dbt Analytics Engineering |
| XCom, sensor, retry | SQL transzformációk |
| Bronze/Silver/Gold DAG | Databricks Lakehouse |
| TaskFlow API | Modern pipeline-ok |
dbt Analytics Engineering - ahol SQL transzformációkat építünk!