AI Data Engineering: adatok ML-hez
Kafka
dbt
Ez a kurzus az AI-specifikus adat-pipeline-ok világába vezet be: megtanulod, hogyan kell adatokat előkészíteni gépi tanulási modellekhez, hogyan építs feature store-t a tanítási és kiszolgálási konzisztencia érdekében, és hogyan biztosítsd az adatminőséget az egész folyamaton át.
A hagyományos data engineering és az AI data engineering között az a fő különbség, hogy míg előbbi inkább adattárházakat és BI reportingot szolgál, utóbbi közvetlenül az ML modellek inputját állítja elő. Ez azt jelenti, hogy az adatoknak nem csak helyesnek kell lenniük, hanem konzisztensnek is a tanítási és a kiszolgálási (serving) környezetben -- erről szól a feature store.
A kurzus során megismerkedhetsz a streaming adatfeldolgozással (Kafka), a SQL transzformációk version control-lal történő kezelésével (dbt), a adatminőség automatikus ellenőrzésével (Great Expectations), és a Delta Lake által nyújtott ACID tranzakciókkal. Mindez egybefoglalva az a tudás, amellyel megbízható ML adat-pipelineokat építhetsz.
Pipeline → Feature Eng → Feature Store → Streaming → Quality → Delta Lake → Catalog
AI Data Engineering fogalmak
| Fogalom | Leírás | Fontos |
|---|---|---|
| Feature Store | Feature tároló | Train/serve konzisztencia |
| Feature Eng | Nyers→modell bemenet | Jó feature=jó modell |
| Streaming | Valós idejű feldolgozás | Online feature frissítés |
| Batch Pipeline | Ütemezett feldolgozás | Tanítási adatok |
| Data Quality | Adatminőség | Rossz adat=rossz modell |
| Data Catalog | Metaadat | Discoverability |
| CDC | Change Data Capture | DB változások real-time |
| Medallion | Bronze/Silver/Gold | Adat érése |
Medallion: Bronze→Silver→Gold
A Medallion architektúra a modern adat-pipeline-ok iparági standardja, amely három rétegbe szervezi az adatokat aszerint, hogy mennyire lettek feldolgozva. A cél, hogy az adatok fokozatosan "érjenek": a nyers forrásokból (Bronze) megtisztított adatok (Silver) jönnek létre, amelyekből végül az ML modellek számára optimalizált feature táblák (Gold) készülnek.
Az AI adat-pipeline esetében ez az út a következő: az adatforrások (API-k, adatbázisok, log fájlok) betöltődnek a Bronze rétegbe nyers formátumban. A Silver rétegben megtörténik a tisztítás, validáció és átformázás (pl. Parquet formátum). A Gold réteg már tartalmazza az aggregált, feature engineering által előkészített táblákat, amelyek közvetlenül betölthetők a Feature Store-ba.
| Réteg | Formátum | Eszköz |
|---|---|---|
| Bronze | JSON, CSV, logs | S3, Delta |
| Silver | Parquet, validált | Spark, dbt |
| Gold | Feature táblák | Feast, Tecton |
| Serving | Online store | Redis, DynamoDB |
Környezet
pip install feast kafka-python delta-spark dbt-core great-expectations
[env] feast-0.37 kafka-2.0 delta-3.2 dbt-1.8 gx-0.10
Feature Engineering
A feature engineering az a folyamat, amely során a nyers adatokat a gépi tanulási modellek számára megfelelő bemeneti formátummá (feature-ökké) alakítjuk. Ez az ML munkafolyamat legfontosabb lépése: egy jól felépített feature-vel egy egyszerű modell is jobb eredményt érhet el, mint egy rosszul felépített feature-vel a legbonyolultabb modell.
A példában egy felhasználói adathalmazzal dolgozunk, és három új feature-t hozunk létre: (1) income_per_purchase -- a jövedelem és a vásárlások száma közötti arány, amely a vásárlási erőt jelzi, (2) is_active -- bináris flag, amely megmutatja, hogy a felhasználó az elmúlt 30 napban aktív volt-e, és (3) age_bucket -- a kor kategóriákba sorolása (young, mid, senior), mert sok ML algoritmus jobban kezeli a kategóriákat, mint a folytonos értékeket.
A feature engineering során mindig a modell szempontjából kell gondolkodni: mi az az információ, ami segít a modellnek jobb döntést hozni? A nyers adatokban rejlő összefüggéseket (pl. jövedelem/vásárlás arány) gyakran külön feature-ként kell kinyerni, mert a modellek nem feltétlenül találják meg ezeket automatikusan.
import pandas as pd df = pd.DataFrame({"age":[25,35,45],"income":[50000,75000,90000],"purchases":[5,12,3],"last_days":[5,45,2]}) df["income_per_purchase"] = df.income / df.purchases df["is_active"] = (df.last_days < 30).astype(int) df["age_bucket"] = pd.cut(df.age, bins=[0,30,40,100], labels=["young","mid","senior"]) print(df)
| age | income | purchases | last_days | income_per_purchase | is_active | age_bucket |
|---|---|---|---|---|---|---|
| 25 | 50000 | 5 | 5 | 10000.0 | 1 | young |
| 35 | 75000 | 12 | 45 | 6250.0 | 0 | mid |
| 45 | 90000 | 3 | 2 | 30000.0 | 1 | senior |
Feature típusok
| Típus | Példa | Transzformáció |
|---|---|---|
| Numerikus | age, income | Standard scaling |
| Kategorikus | country, type | One-hot, embedding |
| Idősoros | last_7d_revenue | Lag, rolling mean |
| Aggregált | total_spend | GroupBy + sum |
| Interakció | age × income | Feature cross |
| Szöveges | description | BERT embedding |
A feature engineering 80%-a az ML munkának. A jó feature-ök fontosabbak, mint a modell választás.
Feature Store: Feast
Feast
A Feast (Feature Store) egy nyílt forráskódú feature store keretrendszer, amelynek fő célja a tanítási és kiszolgálási (train/serve) konzisztencia biztosítása. Ez azt jelenti, hogy ugyanazok a feature-ök, ugyanazzal a számítási logikával állnak rendelkezésre mind a modell tanításakor (offline), mind a modell kiszolgálásakor (online, valós időben).
A Feast-ben a feature-öket FeatureView objektumokként definiálod, amelyek meghatározzák a feature nevét, típusát, a hozzá tartozó entitást (pl. user_id), és az érvényességi időt (TTL). A példában egy user_stats feature view-t látunk, amely három feature-t tartalmaz: age, income és purchase_count_30d. A TTL 24 órára van állítva, ami azt jelenti, hogy ha egy feature értéke 24 óránál régebbi, a Feast frissíteni fogja.
A Feast két tárolóréteget használ: az offline store (pl. Delta Lake, BigQuery) nagy mennyiségű történelmi adatot tárol a modell tanításához, míg az online store (pl. Redis, DynamoDB) alacsony késleltetésű (10ms alatt) hozzáférést biztosít a szolgáltatás (serving) idején. Ez a kettős architektúra oldja fel a train/serve skew problémát.
# Feast feature definitions user = Entity(name="user_id", value_type=ValueType.INT64) user_features = FeatureView( name="user_stats", entities=["user_id"], features=[ Feature("age", Float), Feature("income", Float), Feature("purchase_count_30d", Int64), ], online=True, ttl=timedelta(hours=24), )
Registered: user_stats Offline: Delta Lake Online: Redis TTL: 24h
Offline vs Online Store
# Offline (tanítás) offline = store.get_historical_features( entity_df=train_df, features=["user_stats:age","user_stats:income"] ).to_df() # Online (serving, <10ms) online = store.get_online_features( entity_rows=[{"user_id":42}], features=["user_stats:age"] ).to_dict()
Offline: (50000, 5) Online: {user_id:[42], age:[35.0]}
| Offline | Online | |
|---|---|---|
| Cél | Tanítás (batch) | Serving (real-time) |
| Latencia | Perc | <10ms |
| Méret | TB | GB |
| Forrás | Delta/BigQuery | Redis/DynamoDB |
Streaming: Apache Kafka
Kafka
Az Apache Kafka egy elosztott esemény-streaming platform, amely valós időben képes milliónyi eseményt másodpercenként feldolgozni. Az ML rendszerekben a Kafka elsődleges szerepe az online feature-ök valós idejű frissítése: amikor egy felhasználó vásárol, a vásárlási esemény azonnal bekerül a Kafka topic-ba, és a consumer frissíti a feature store-ban a felhasználó statisztikáit.
A Kafka alapvető építőkövei: a Producer (aki eseményeket küld), a Consumer (aki eseményeket fogad), és a Topic (a kategória, amelybe az események kerülnek). A példában egy egyszerű producert látunk, amely egy vásárlási eseményt küld a "purchases" topic-ba. Az esemény JSON formátumú, és tartalmazza a felhasználó azonosítót és az összeget.
A Kafka ereje abban rejlik, hogy az események megőrződnek (retention), így a consumerek bármikor újraképesek olvasni azokat. Ez biztosítja, hogy ha egy consumer leáll, újraindítás után onnan folytathatja, ahol abbahagyta. Éles ML pipeline-okban a Kafka gyakran egy CDC (Change Data Capture) forrással párosul, amely adatbázis-változásokat streamel a feature pipeline-ba.
from kafka import KafkaProducer import json producer = KafkaProducer(bootstrap_servers=["localhost:9092"]) event = {"user_id":42, "amount":1250} producer.send("purchases", json.dumps(event).encode()) print(f"Sent: {event}")
[kafka] Sent: {user_id:42, amount:1250} [kafka] Topic: purchases | Offset: 15420
Kafka Consumer: feature frissítés
from kafka import KafkaConsumer consumer = KafkaConsumer("purchases",bootstrap_servers=["localhost:9092"]) for msg in consumer: event = json.loads(msg.value) update_feature(event["user_id"],{"last":event["amount"]}) print(f"Updated user {event['user_id']}")
[kafka] Updated user 42 [kafka] Updated user 17 [kafka] 1.2k events/sec, 8ms avg
Data Quality
GE
A Great Expectations (GE) egy Python könyvtár adatminőség-elennőrzésre, amely "expectation-ök" (várakozások) formájában definiálja, hogy az adatoknak hogyan kell kinézniük. Az ML világban a "rossz adat = rossz modell" elv érvényesül: ha a feature-ök között null értékek, out-of-range adatok vagy duplikátumok vannak, a modell hibás predikciókat fog adni.
A példában egy ExpectationSuite-ot hozunk létre három ellenőrzéssel: (1) az age oszlop értékei 0 és 120 között kell legyenek, (2) a user_id oszlopban nem lehet null érték, (3) a user_id értékeknek egyedieknek kell lenniük. Ezek az alapelvárások minden felhasználói feature táblánál szükségesek.
Éles környezetben a GE-t a pipeline minden lépésébe integrálják: a Bronze→Silver átalakításnál validálják a nyers adatokat, a Silver→Gold lépésnél az aggregált feature-öket, és a Feature Store-ba történő publikálás előtt a végleges táblákat. Ha egy expectation elbukik, a pipeline leállítja a futást, és riasztást küld, mielőtt a hibás adatok eljutnának az ML modellig.
import great_expectations as gx suite = gx.ExpectationSuite("user_features") suite.add_expectation(ExpectColumnValuesToBeBetween(column="age",min_value=0,max_value=120)) suite.add_expectation(ExpectColumnValuesToNotBeNull(column="user_id")) suite.add_expectation(ExpectColumnValuesToBeUnique(column="user_id")) print(f"Suite: {len(suite)} checks")
✅ age 0-120: 100% ✅ user_id not null: 100% ✅ user_id unique: 100% Overall: PASSED ✓
dbt: SQL transzformációk
dbt
A dbt (data build tool) lehetővé teszi, hogy az adat-transzformációkat SQLben írd meg, version control alatt tartsd (Git), és automatikusan teszteld. A dbt filozófiája: az analitikus mérnökök (analytics engineers) SQL-t írnak, a dbt pedig lefuttatja, dokumentálja és teszteli a kódot. Az ML világban a dbt a Silver→Gold réteg transzformációit végzi.
A példában egy stg_purchases staging modellt látunk, amely a nyers vásárlási adatokból aggregál: megszámolja a vásárlásokat, kiszámolja az átlagos rendelési értéket és az összköltést felhasználónként. Ez a fajta aggregáció tipikus Gold-réteg feature engineering feladat, amelynek eredménye közvetlenül betölthető a Feature Store-ba.
A dbt ereje a modularitásban és az újrafelhasználhatóságban rejlik: a modellek hivatkozhatnak egymásra a ref() függvénnyel, a forrásokat a source() függvénnyel lehet definiálni, és a lineage (származási lánc) automatikusan generálódik. Ha módosítod egy upstream modellt, a dbt tudja, mely downstream modelleket kell újra futtatni.
# models/staging/stg_purchases.sql WITH purchases AS ( SELECT * FROM {{ source('raw','purchases') }} WHERE amount > 0 ) SELECT user_id, COUNT(*) AS purchase_count, AVG(amount) AS avg_order, SUM(amount) AS total_spend FROM purchases GROUP BY user_id
[dbt] $ dbt run [dbt] OK stg_purchases [12k rows in 3.2s] [dbt] OK int_user_features [50k in 5.1s] [dbt] OK fct_user_stats [50k in 2.8s]
End-to-end Pipeline
Airflow
A teljes end-to-end adat-pipeline az adatforrástól a Feature Store-ig tart, és minden korábbi szekcióban megismert lépést egyetlen ütemezett munkafolyamatba (DAG) foglal össze. Az Airflow a legelterjedtebb workflow-orchestrátor, amely lehetővé teszi a lépések definiálását, azok közötti függőségek beállítását és a napi szintű ütemezést.
A pipeline lépései: (1) Extract -- adatok kinyerése API-kból, adatbázisokból vagy fájlokból, (2) Load -- nyers adatok betöltése a Bronze rétegbe, (3) Transform -- dbt vagy Spark segítségével tisztítás és aggregálás (Silver), (4) Validate -- Great Expectations ellenőrzés futtatása, (5) Publish -- validált feature-ök publikálása a Gold rétegbe, (6) Store -- feature-ök betöltése a Feature Store online és offline tárolójába.
A pipeline naponta fut, és a futás eredménye (sikeres vagy sikertelen, feldolgozott sorok száma, futásidő) automatikusan naplózásra kerül. Ha bármelyik lépés elbukik, a Airflow riasztást küld, és a downstream lépések nem futnak le -- így a hibás adatok nem jutnak el a Feature Store-ba.
# Airflow DAG with DAG("feature_pipeline",schedule="@daily") as dag: extract >> transform >> validate >> publish print("DAG: feature_pipeline")
[airflow] DAG: feature_pipeline @daily [airflow] Last run: SUCCESS 8m32s [airflow] Rows: 1.2M
Delta Lake
Delta
A Delta Lake egy nyílt forráskódú tárolóréteg, amely ACID tranzakciókat, time travel képességet és schema evolution-t ad hozzá a data lake-hez. Az ML adat-pipeline-okban a Delta Lake a Bronze és Silver rétegek elsődleges tárolóformátuma, mert garantálja az adatok konzisztenciáját még párhuzamos írás és olvasás esetén is.
Az ACID tranzakciók biztosítják, hogy az adatműveletek (insert, update, delete) atomiak, konzisztensek, izoláltak és tartósak. A time travel lehetővé teszi, hogy bármely korábbi verzióhoz visszatérj -- például megvizsgálhasd a feature-ök állapotát egy korábbi tanítási futás idején. A schema evolution pedig lehetővé teszi, hogy új oszlopokat adj hozzá a táblához a meglévő adatok érintése nélkül.
A példában három Delta műveletet látható: (1) adatok appendálása a táblához, (2) time travel a 5-ös verzióhoz, és (3) merge (upsert) művelet, amely frissíti a meglévő sorokat és beszúrja az újakat. Az upsert különösen fontos a feature pipeline-okban, ahol a felhasználói feature-öket naponta kell frissíteni.
# Delta operations df.write.format("delta").mode("append").save("/data/features") # Time travel df_v5 = spark.read.format("delta").option("versionAsOf",5).load("/data/features") # Upsert (merge) (delta_table.alias("t") .merge(updates.alias("s"), "t.id=s.id") .whenMatchedUpdateAll() .whenNotMatchedInsertAll() .execute())
[delta] Version: 42 | Rows: 1.2M | Size: 8.3GB [delta] Time travel to v5: OK [delta] Merge: 50k updated, 2k inserted
Data Catalog
| Meta | Leírás | Eszköz |
|---|---|---|
| Table Registry | Sémák nyilvántartása | Unity Catalog, Glue |
| Lineage | Adat eredet | OpenLineage, Marquez |
| Tags | PII, sensitive | Automatikus PII detection |
| Search | Kereshető katalógus | DataHub, Amundsen |
| Access | Ki fér hozzá | RBAC |
CDC: Change Data Capture
Debezium
A CDC (Change Data Capture) egy technika, amely az adatbázisok változásait (insert, update, delete) valós időben érzékeli és továbbítja a streaming pipeline-ba. A Debezium a legnépszerűbb nyílt forráskódú CDC megoldás, amely a PostgreSQL, MySQL, MongoDB és más adatbázisok transaction logját (WAL) olvassa, és a változásokat Kafka topic-okba küldi.
Az ML rendszerekben a CDC azért kritikus, mert lehetővé teszi, hogy a feature-ök mindig naprakészek legyenek. Amikor egy felhasználó frissíti a profilját, vagy lead egy új rendelést, a Debezium azonnal érzékeli a változást, és egy eseményt küld a Kafka-ba. A consumer feldolgozza az eseményt, és frissíti a feature store-ban az érintett feature-öket -- mindez másodperceken belül megtörténik.
A példában egy Debezium konfigurációt látunk, amely a PostgreSQL "shop" adatbázis "users" és "purchases" tábláit figyeli. A CDC→Kafka→Spark→Delta→FeatureStore láncolat egy teljes valós idejű ML adat-pipeline-ot alkot, ahol az adatbázis-változásoktól a feature frissítésig mindössze másodpercek telnek el.
# Debezium CDC: PostgreSQL → Kafka config = { "connector.class": "PostgresConnector", "database.dbname": "shop", "table.include.list": "users,purchases", } print("CDC: postgres→kafka")
[kafka] CDC: postgres→kafka [kafka] Events: 500/sec, Lag: <1s [kafka] Stream: CDC→Kafka→Spark→Delta→FeatureStore
Train/Serve Skew
| Skew | Példa | Megoldás |
|---|---|---|
| Feature | Tanítás: bruttó. Serving: nettó | Feature Store |
| Label | Tanítás: click. Serving: purchase. | Egyértelmű definíció |
| Distribution | Tanítás: 2024. Serving: 2025. | Monitoring + retraining |
| Latency | Batch vs online aggregation | Ugyanaz a kód és store |
A Feature Store csökkenti a skew kockázatát: tanítás és serving ugyanabból a definiált feature-forrásból dolgozik.
Összefoglalás
✅ Feature Engineering, típusok
✅ Feature Store (Feast): offline+online
✅ Streaming: Kafka producer/consumer
✅ Data Quality: Great Expectations
✅ dbt: SQL transzformációk
✅ Delta Lake: ACID, time travel, merge
✅ Data Catalog, CDC
✅ Train/Serve Skew megoldása
Feature Store · Data Lineage · Feast