</> AI Data Engineer

0 / 18 section completed
Section 00

AI Data Engineering: adatok ML-hez Kafka dbt

Ez a kurzus az AI-specifikus adat-pipeline-ok világába vezet be: megtanulod, hogyan kell adatokat előkészíteni gépi tanulási modellekhez, hogyan építs feature store-t a tanítási és kiszolgálási konzisztencia érdekében, és hogyan biztosítsd az adatminőséget az egész folyamaton át.

A hagyományos data engineering és az AI data engineering között az a fő különbség, hogy míg előbbi inkább adattárházakat és BI reportingot szolgál, utóbbi közvetlenül az ML modellek inputját állítja elő. Ez azt jelenti, hogy az adatoknak nem csak helyesnek kell lenniük, hanem konzisztensnek is a tanítási és a kiszolgálási (serving) környezetben -- erről szól a feature store.

A kurzus során megismerkedhetsz a streaming adatfeldolgozással (Kafka), a SQL transzformációk version control-lal történő kezelésével (dbt), a adatminőség automatikus ellenőrzésével (Great Expectations), és a Delta Lake által nyújtott ACID tranzakciókkal. Mindez egybefoglalva az a tudás, amellyel megbízható ML adat-pipelineokat építhetsz.

Tartalom

Pipeline → Feature Eng → Feature Store → Streaming → Quality → Delta Lake → Catalog

Section 01

AI Data Engineering fogalmak

FogalomLeírásFontos
Feature StoreFeature tárolóTrain/serve konzisztencia
Feature EngNyers→modell bemenetJó feature=jó modell
StreamingValós idejű feldolgozásOnline feature frissítés
Batch PipelineÜtemezett feldolgozásTanítási adatok
Data QualityAdatminőségRossz adat=rossz modell
Data CatalogMetaadatDiscoverability
CDCChange Data CaptureDB változások real-time
MedallionBronze/Silver/GoldAdat érése
Section 02

Medallion: Bronze→Silver→Gold

A Medallion architektúra a modern adat-pipeline-ok iparági standardja, amely három rétegbe szervezi az adatokat aszerint, hogy mennyire lettek feldolgozva. A cél, hogy az adatok fokozatosan "érjenek": a nyers forrásokból (Bronze) megtisztított adatok (Silver) jönnek létre, amelyekből végül az ML modellek számára optimalizált feature táblák (Gold) készülnek.

Az AI adat-pipeline esetében ez az út a következő: az adatforrások (API-k, adatbázisok, log fájlok) betöltődnek a Bronze rétegbe nyers formátumban. A Silver rétegben megtörténik a tisztítás, validáció és átformázás (pl. Parquet formátum). A Gold réteg már tartalmazza az aggregált, feature engineering által előkészített táblákat, amelyek közvetlenül betölthetők a Feature Store-ba.

Source
Forrás
Ingest
Betöltés
Bronze
Raw
Silver
Clean
Gold
Feature
Store
Feature Store
Serve
ML
RétegFormátumEszköz
BronzeJSON, CSV, logsS3, Delta
SilverParquet, validáltSpark, dbt
GoldFeature táblákFeast, Tecton
ServingOnline storeRedis, DynamoDB
Section 03

Környezet

[s3]
pip install feast kafka-python delta-spark dbt-core great-expectations
Output:
[env] feast-0.37 kafka-2.0 delta-3.2 dbt-1.8 gx-0.10
Section 04

Feature Engineering

A feature engineering az a folyamat, amely során a nyers adatokat a gépi tanulási modellek számára megfelelő bemeneti formátummá (feature-ökké) alakítjuk. Ez az ML munkafolyamat legfontosabb lépése: egy jól felépített feature-vel egy egyszerű modell is jobb eredményt érhet el, mint egy rosszul felépített feature-vel a legbonyolultabb modell.

A példában egy felhasználói adathalmazzal dolgozunk, és három új feature-t hozunk létre: (1) income_per_purchase -- a jövedelem és a vásárlások száma közötti arány, amely a vásárlási erőt jelzi, (2) is_active -- bináris flag, amely megmutatja, hogy a felhasználó az elmúlt 30 napban aktív volt-e, és (3) age_bucket -- a kor kategóriákba sorolása (young, mid, senior), mert sok ML algoritmus jobban kezeli a kategóriákat, mint a folytonos értékeket.

A feature engineering során mindig a modell szempontjából kell gondolkodni: mi az az információ, ami segít a modellnek jobb döntést hozni? A nyers adatokban rejlő összefüggéseket (pl. jövedelem/vásárlás arány) gyakran külön feature-ként kell kinyerni, mert a modellek nem feltétlenül találják meg ezeket automatikusan.

[s4]
import pandas as pd
df = pd.DataFrame({"age":[25,35,45],"income":[50000,75000,90000],"purchases":[5,12,3],"last_days":[5,45,2]})
df["income_per_purchase"] = df.income / df.purchases
df["is_active"] = (df.last_days < 30).astype(int)
df["age_bucket"] = pd.cut(df.age, bins=[0,30,40,100], labels=["young","mid","senior"])
print(df)
Output:
ageincomepurchaseslast_daysincome_per_purchaseis_activeage_bucket
25500005510000.01young
357500012456250.00mid
45900003230000.01senior
Section 05

Feature típusok

TípusPéldaTranszformáció
Numerikusage, incomeStandard scaling
Kategorikuscountry, typeOne-hot, embedding
Idősoroslast_7d_revenueLag, rolling mean
Aggregálttotal_spendGroupBy + sum
Interakcióage × incomeFeature cross
SzövegesdescriptionBERT embedding
Fontos!

A feature engineering 80%-a az ML munkának. A jó feature-ök fontosabbak, mint a modell választás.

Section 06

Feature Store: Feast Feast

A Feast (Feature Store) egy nyílt forráskódú feature store keretrendszer, amelynek fő célja a tanítási és kiszolgálási (train/serve) konzisztencia biztosítása. Ez azt jelenti, hogy ugyanazok a feature-ök, ugyanazzal a számítási logikával állnak rendelkezésre mind a modell tanításakor (offline), mind a modell kiszolgálásakor (online, valós időben).

A Feast-ben a feature-öket FeatureView objektumokként definiálod, amelyek meghatározzák a feature nevét, típusát, a hozzá tartozó entitást (pl. user_id), és az érvényességi időt (TTL). A példában egy user_stats feature view-t látunk, amely három feature-t tartalmaz: age, income és purchase_count_30d. A TTL 24 órára van állítva, ami azt jelenti, hogy ha egy feature értéke 24 óránál régebbi, a Feast frissíteni fogja.

A Feast két tárolóréteget használ: az offline store (pl. Delta Lake, BigQuery) nagy mennyiségű történelmi adatot tárol a modell tanításához, míg az online store (pl. Redis, DynamoDB) alacsony késleltetésű (10ms alatt) hozzáférést biztosít a szolgáltatás (serving) idején. Ez a kettős architektúra oldja fel a train/serve skew problémát.

[s6]
# Feast feature definitions
user = Entity(name="user_id", value_type=ValueType.INT64)
user_features = FeatureView(
    name="user_stats",
    entities=["user_id"],
    features=[
        Feature("age", Float),
        Feature("income", Float),
        Feature("purchase_count_30d", Int64),
    ],
    online=True,
    ttl=timedelta(hours=24),
)
Output:
Registered: user_stats
  Offline: Delta Lake
  Online: Redis
  TTL: 24h
Section 07

Offline vs Online Store

[s7]
# Offline (tanítás)
offline = store.get_historical_features(
    entity_df=train_df,
    features=["user_stats:age","user_stats:income"]
).to_df()

# Online (serving, <10ms)
online = store.get_online_features(
    entity_rows=[{"user_id":42}],
    features=["user_stats:age"]
).to_dict()
Output:
Offline: (50000, 5)
Online: {user_id:[42], age:[35.0]}
OfflineOnline
CélTanítás (batch)Serving (real-time)
LatenciaPerc<10ms
MéretTBGB
ForrásDelta/BigQueryRedis/DynamoDB
Section 08

Streaming: Apache Kafka Kafka

Az Apache Kafka egy elosztott esemény-streaming platform, amely valós időben képes milliónyi eseményt másodpercenként feldolgozni. Az ML rendszerekben a Kafka elsődleges szerepe az online feature-ök valós idejű frissítése: amikor egy felhasználó vásárol, a vásárlási esemény azonnal bekerül a Kafka topic-ba, és a consumer frissíti a feature store-ban a felhasználó statisztikáit.

A Kafka alapvető építőkövei: a Producer (aki eseményeket küld), a Consumer (aki eseményeket fogad), és a Topic (a kategória, amelybe az események kerülnek). A példában egy egyszerű producert látunk, amely egy vásárlási eseményt küld a "purchases" topic-ba. Az esemény JSON formátumú, és tartalmazza a felhasználó azonosítót és az összeget.

A Kafka ereje abban rejlik, hogy az események megőrződnek (retention), így a consumerek bármikor újraképesek olvasni azokat. Ez biztosítja, hogy ha egy consumer leáll, újraindítás után onnan folytathatja, ahol abbahagyta. Éles ML pipeline-okban a Kafka gyakran egy CDC (Change Data Capture) forrással párosul, amely adatbázis-változásokat streamel a feature pipeline-ba.

[s8]
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
event = {"user_id":42, "amount":1250}
producer.send("purchases", json.dumps(event).encode())
print(f"Sent: {event}")
Output:
[kafka] Sent: {user_id:42, amount:1250}
[kafka] Topic: purchases | Offset: 15420
Section 09

Kafka Consumer: feature frissítés

[s9]
from kafka import KafkaConsumer
consumer = KafkaConsumer("purchases",bootstrap_servers=["localhost:9092"])
for msg in consumer:
    event = json.loads(msg.value)
    update_feature(event["user_id"],{"last":event["amount"]})
    print(f"Updated user {event['user_id']}")
Output:
[kafka] Updated user 42
[kafka] Updated user 17
[kafka] 1.2k events/sec, 8ms avg
Section 10

Data Quality GE

A Great Expectations (GE) egy Python könyvtár adatminőség-elennőrzésre, amely "expectation-ök" (várakozások) formájában definiálja, hogy az adatoknak hogyan kell kinézniük. Az ML világban a "rossz adat = rossz modell" elv érvényesül: ha a feature-ök között null értékek, out-of-range adatok vagy duplikátumok vannak, a modell hibás predikciókat fog adni.

A példában egy ExpectationSuite-ot hozunk létre három ellenőrzéssel: (1) az age oszlop értékei 0 és 120 között kell legyenek, (2) a user_id oszlopban nem lehet null érték, (3) a user_id értékeknek egyedieknek kell lenniük. Ezek az alapelvárások minden felhasználói feature táblánál szükségesek.

Éles környezetben a GE-t a pipeline minden lépésébe integrálják: a Bronze→Silver átalakításnál validálják a nyers adatokat, a Silver→Gold lépésnél az aggregált feature-öket, és a Feature Store-ba történő publikálás előtt a végleges táblákat. Ha egy expectation elbukik, a pipeline leállítja a futást, és riasztást küld, mielőtt a hibás adatok eljutnának az ML modellig.

[s10]
import great_expectations as gx
suite = gx.ExpectationSuite("user_features")
suite.add_expectation(ExpectColumnValuesToBeBetween(column="age",min_value=0,max_value=120))
suite.add_expectation(ExpectColumnValuesToNotBeNull(column="user_id"))
suite.add_expectation(ExpectColumnValuesToBeUnique(column="user_id"))
print(f"Suite: {len(suite)} checks")
Output:
✅ age 0-120: 100%
✅ user_id not null: 100%
✅ user_id unique: 100%
Overall: PASSED ✓
Section 11

dbt: SQL transzformációk dbt

A dbt (data build tool) lehetővé teszi, hogy az adat-transzformációkat SQLben írd meg, version control alatt tartsd (Git), és automatikusan teszteld. A dbt filozófiája: az analitikus mérnökök (analytics engineers) SQL-t írnak, a dbt pedig lefuttatja, dokumentálja és teszteli a kódot. Az ML világban a dbt a Silver→Gold réteg transzformációit végzi.

A példában egy stg_purchases staging modellt látunk, amely a nyers vásárlási adatokból aggregál: megszámolja a vásárlásokat, kiszámolja az átlagos rendelési értéket és az összköltést felhasználónként. Ez a fajta aggregáció tipikus Gold-réteg feature engineering feladat, amelynek eredménye közvetlenül betölthető a Feature Store-ba.

A dbt ereje a modularitásban és az újrafelhasználhatóságban rejlik: a modellek hivatkozhatnak egymásra a ref() függvénnyel, a forrásokat a source() függvénnyel lehet definiálni, és a lineage (származási lánc) automatikusan generálódik. Ha módosítod egy upstream modellt, a dbt tudja, mely downstream modelleket kell újra futtatni.

[s11]
# models/staging/stg_purchases.sql
WITH purchases AS (
    SELECT * FROM {{ source('raw','purchases') }}
    WHERE amount > 0
)
SELECT user_id,
    COUNT(*) AS purchase_count,
    AVG(amount) AS avg_order,
    SUM(amount) AS total_spend
FROM purchases GROUP BY user_id
Output:
[dbt] $ dbt run
[dbt] OK stg_purchases [12k rows in 3.2s]
[dbt] OK int_user_features [50k in 5.1s]
[dbt] OK fct_user_stats [50k in 2.8s]
Section 12

End-to-end Pipeline Airflow

A teljes end-to-end adat-pipeline az adatforrástól a Feature Store-ig tart, és minden korábbi szekcióban megismert lépést egyetlen ütemezett munkafolyamatba (DAG) foglal össze. Az Airflow a legelterjedtebb workflow-orchestrátor, amely lehetővé teszi a lépések definiálását, azok közötti függőségek beállítását és a napi szintű ütemezést.

A pipeline lépései: (1) Extract -- adatok kinyerése API-kból, adatbázisokból vagy fájlokból, (2) Load -- nyers adatok betöltése a Bronze rétegbe, (3) Transform -- dbt vagy Spark segítségével tisztítás és aggregálás (Silver), (4) Validate -- Great Expectations ellenőrzés futtatása, (5) Publish -- validált feature-ök publikálása a Gold rétegbe, (6) Store -- feature-ök betöltése a Feature Store online és offline tárolójába.

A pipeline naponta fut, és a futás eredménye (sikeres vagy sikertelen, feldolgozott sorok száma, futásidő) automatikusan naplózásra kerül. Ha bármelyik lépés elbukik, a Airflow riasztást küld, és a downstream lépések nem futnak le -- így a hibás adatok nem jutnak el a Feature Store-ba.

Extract
API/DB
Load
Bronze
Transform
Silver
Validate
Quality
Publish
Gold
Store
Feature Store
[s12]
# Airflow DAG
with DAG("feature_pipeline",schedule="@daily") as dag:
    extract >> transform >> validate >> publish
print("DAG: feature_pipeline")
Output:
[airflow] DAG: feature_pipeline @daily
[airflow] Last run: SUCCESS 8m32s
[airflow] Rows: 1.2M
Section 13

Delta Lake Delta

A Delta Lake egy nyílt forráskódú tárolóréteg, amely ACID tranzakciókat, time travel képességet és schema evolution-t ad hozzá a data lake-hez. Az ML adat-pipeline-okban a Delta Lake a Bronze és Silver rétegek elsődleges tárolóformátuma, mert garantálja az adatok konzisztenciáját még párhuzamos írás és olvasás esetén is.

Az ACID tranzakciók biztosítják, hogy az adatműveletek (insert, update, delete) atomiak, konzisztensek, izoláltak és tartósak. A time travel lehetővé teszi, hogy bármely korábbi verzióhoz visszatérj -- például megvizsgálhasd a feature-ök állapotát egy korábbi tanítási futás idején. A schema evolution pedig lehetővé teszi, hogy új oszlopokat adj hozzá a táblához a meglévő adatok érintése nélkül.

A példában három Delta műveletet látható: (1) adatok appendálása a táblához, (2) time travel a 5-ös verzióhoz, és (3) merge (upsert) művelet, amely frissíti a meglévő sorokat és beszúrja az újakat. Az upsert különösen fontos a feature pipeline-okban, ahol a felhasználói feature-öket naponta kell frissíteni.

[s13]
# Delta operations
df.write.format("delta").mode("append").save("/data/features")

# Time travel
df_v5 = spark.read.format("delta").option("versionAsOf",5).load("/data/features")

# Upsert (merge)
(delta_table.alias("t")
  .merge(updates.alias("s"), "t.id=s.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute())
Output:
[delta] Version: 42 | Rows: 1.2M | Size: 8.3GB
[delta] Time travel to v5: OK
[delta] Merge: 50k updated, 2k inserted
Section 14

Data Catalog

MetaLeírásEszköz
Table RegistrySémák nyilvántartásaUnity Catalog, Glue
LineageAdat eredetOpenLineage, Marquez
TagsPII, sensitiveAutomatikus PII detection
SearchKereshető katalógusDataHub, Amundsen
AccessKi fér hozzáRBAC
Section 15

CDC: Change Data Capture Debezium

A CDC (Change Data Capture) egy technika, amely az adatbázisok változásait (insert, update, delete) valós időben érzékeli és továbbítja a streaming pipeline-ba. A Debezium a legnépszerűbb nyílt forráskódú CDC megoldás, amely a PostgreSQL, MySQL, MongoDB és más adatbázisok transaction logját (WAL) olvassa, és a változásokat Kafka topic-okba küldi.

Az ML rendszerekben a CDC azért kritikus, mert lehetővé teszi, hogy a feature-ök mindig naprakészek legyenek. Amikor egy felhasználó frissíti a profilját, vagy lead egy új rendelést, a Debezium azonnal érzékeli a változást, és egy eseményt küld a Kafka-ba. A consumer feldolgozza az eseményt, és frissíti a feature store-ban az érintett feature-öket -- mindez másodperceken belül megtörténik.

A példában egy Debezium konfigurációt látunk, amely a PostgreSQL "shop" adatbázis "users" és "purchases" tábláit figyeli. A CDC→Kafka→Spark→Delta→FeatureStore láncolat egy teljes valós idejű ML adat-pipeline-ot alkot, ahol az adatbázis-változásoktól a feature frissítésig mindössze másodpercek telnek el.

[s15]
# Debezium CDC: PostgreSQL → Kafka
config = {
    "connector.class": "PostgresConnector",
    "database.dbname": "shop",
    "table.include.list": "users,purchases",
}
print("CDC: postgres→kafka")
Output:
[kafka] CDC: postgres→kafka
[kafka] Events: 500/sec, Lag: <1s
[kafka] Stream: CDC→Kafka→Spark→Delta→FeatureStore
Section 16

Train/Serve Skew

SkewPéldaMegoldás
FeatureTanítás: bruttó. Serving: nettóFeature Store
LabelTanítás: click. Serving: purchase.Egyértelmű definíció
DistributionTanítás: 2024. Serving: 2025.Monitoring + retraining
LatencyBatch vs online aggregationUgyanaz a kód és store
Megoldás

A Feature Store csökkenti a skew kockázatát: tanítás és serving ugyanabból a definiált feature-forrásból dolgozik.

Section 17

Összefoglalás

Megtanultad:

✅ Feature Engineering, típusok
✅ Feature Store (Feast): offline+online
✅ Streaming: Kafka producer/consumer
✅ Data Quality: Great Expectations
✅ dbt: SQL transzformációk
✅ Delta Lake: ACID, time travel, merge
✅ Data Catalog, CDC
✅ Train/Serve Skew megoldása

Szójegyzék

Feature Store · Data Lineage · Feast

Quiz: Mi a feature store célja?

Quiz: Melyik a data quality legfontosabb dimenziója?