Streaming Engineering Crash Course
Kafka
Spark
A batch feldolgozás nem elég, ha valós idejű adatokkal dolgozol. A streaming engineering lehetővé teszi, hogy adatokat milliszekundumok alatt dolgozz fel, ahogy érkeznek.
WebShop Pro kontextus: Képzeljük el, hogy a webshopunk minden kattintását, kosár-eseményét, rendelését és készlet-frissítését valós időben akarjuk feldolgozni. Nem várhatunk éjfélig a batch ETL-re!
Event streaming · Kafka · Structured Streaming
Kafka dokumentáció 📖 · Spark Streaming 📖 · Kafka in 6 minutes 🎬
Kafka · Topic/partition · Event schema · Producer/Consumer · Exactly-once · Dead letter queue · Spark Streaming · Watermarking · Stream-to-Delta · WebShop Pro pipeline
WebShop Pro real-time clickstream + rendelés feldolgozó pipeline
Event-driven architektúra
A hagyományos batch architektúrában az adatforrás → ETL → adattárház lánc fut napi/heti rendszerességgel. Az event-driven architektúrában minden művelet eseményt generál, amit azonnal fel lehet dolgozni.
WebShop Pro események:
- Page view — felhasználó megnyitott egy terméoldalt
- Add to cart — terméket tett a kosárba
- Purchase — sikeres vásárlás
- Inventory update — készlet változás
- Return — visszaküldés
| Batch vs Streaming | Batch | Streaming |
|---|---|---|
| Késleltetés | Órák / napok | Milliszekundumok / másodpercek |
| Adat | Teljes adathalmaz | Eseményenként / micro-batch |
| Komplexitás | Alacsonyabb | Magasabb (state, ordering, late data) |
| Use case | Reporting, BI | Real-time dashboard, alert, personalization |
Kafka alapok
Kafka
Az Apache Kafka egy distributed event streaming platform. Három szerepe van:
- Publish — producerek eseményeket küldenek
- Store — Kafka tárolja az eseményeket (retention alapján)
- Subscribe — consumerek feliratkoznak és feldolgozzák
Broker · Zookeeper/KRaft
# Kafka indítása Docker-rel (WebShop Pro dev környezet)
version: '3'
services:
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,HOST://0.0.0.0:9092
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
# Indítás: docker compose up -dKafka elindítható: docker compose up -d
Topic, Partition, Consumer Group
Kafka
A Kafka három alapfogalma a topic, a partition és a consumer group. Ezek együtt határozzák meg, hogyan áramlanak az események a rendszerben, és hogyan skálázható a feldolgozás.
- Topic — kategória, amire a producerek küldenek és a consumerek olvasnak (pl.
webshop.orders). Gondolj rá úgy, mint egy adatbázis táblára, de stream-ként: az üzenetek sorrendben érkeznek, és a topic konfigurálható retenciós szabály alapján őrzi meg őket. - Partition — a topic vízszintes skálázási egysége. Minden partition egy rendezett, append-only log. A particionálás kulcs alapján történik: azonos kulcsú üzenetek mindig ugyanarra a partitionra kerülnek, így garantált a sorrend. Minél több partition, annál több consumer dolgozhat párhuzamosan.
- Consumer Group — consumerek csoportja, ahol minden partitiont pontosan egy consumer olvas. Ha egy consumer kiesik, a Kafka automatikusan újraosztja a partitionöket (rebalance). A group.id határozza meg, mely consumerek tartoznak össze.
WebShop Pro: A webshop.purchases topic 6 partitionnel rendelkezik, így akár 6 consumer dolgozhat párhuzamosan. A rendeléseket user_id alapján particionáljuk, így egy adott felhasználó rendelései mindig sorrendben érkeznek.
Partition key · Rebalance
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
# WebShop Pro topic-ok létrehozása
topics = [
NewTopic(name="webshop.page_views", num_partitions=3, replication_factor=1),
NewTopic(name="webshop.cart_events", num_partitions=3, replication_factor=1),
NewTopic(name="webshop.purchases", num_partitions=6, replication_factor=1),
NewTopic(name="webshop.inventory", num_partitions=3, replication_factor=1),
]
admin.create_topics(topics)
print(f"Topics created: {[t.name for t in topics]}")Topics created: ['webshop.page_views', 'webshop.cart_events', 'webshop.purchases', 'webshop.inventory']
Event schema
Kafka
A Kafka maga nem kényszerít sémát — bármilyen byte-sorozat lehet az üzenet. De production-ben séma validáció kell. Két megközelítés:
- JSON Schema — egyszerű, jó debugra
- Avro / Protobuf — kompakt bináris, sémaváltozás támogatással
Schema Registry · Schema Registry 📖
import json
from datetime import datetime
# WebShop Pro event schema (JSON)
purchase_event = {
"event_type": "purchase",
"event_id": "evt-abc123",
"timestamp": datetime.now().isoformat(),
"user_id": "usr-456",
"session_id": "ses-789",
"data": {
"order_id": "ORD-2024-001",
"items": [
{"product_id": "P-100", "name": "Mechanikus billentyűzet", "qty": 1, "price": 25000},
{"product_id": "P-200", "name": "USB-C hub", "qty": 2, "price": 8500},
],
"total_amount": 42000,
"currency": "HUF",
"payment_method": "card"
}
}
print(json.dumps(purchase_event, indent=2, ensure_ascii=False)){
"event_type": "purchase",
"event_id": "evt-abc123",
"timestamp": "2024-01-15T14:30:00.123456",
"user_id": "usr-456",
"data": {
"order_id": "ORD-2024-001",
"items": [
{"product_id": "P-100", "name": "Mechanikus billentyűzet", "qty": 1, "price": 25000},
{"product_id": "P-200", "name": "USB-C hub", "qty": 2, "price": 8500}
],
"total_amount": 42000
}
}
Kafka Producer
Kafka
Python
A Kafka Producer felel az események Kafka topicokba küldéséért. Bár egy egyszerű send() hívás is elegendő az üzenet elküldéséhez, production környezetben több kulcsfontosságú beállítást is meg kell adni a megbízhatóság és a teljesítmény érdekében.
- acks=all — minden in-sync replica broker visszaigazolja az írást, mielőtt a producer megkapja a megerősítést. Ez biztosítja a maximális tartósságot: ha a broker összeomlik, az adat nem vész el. Az
acks=1gyorsabb, de kockázatosabb. - key — az üzenet kulcsa meghatározza, melyik partitionra kerül (hash-alapú partitioner). Azonos kulcs = azonos partition, ami garantálja a sorrendet az adott kulcshoz. WebShop Pro-ban a user_id-át használjuk key-ként.
- compression — gzip, snappy vagy lz4 tömörítés csökkenti a hálózati forgalmat és a tárolási igényt. A snappy a leggyorsabb, a gzip a leghatékonyabb tömörítés szempontjából.
WebShop Pro forgatókönyv: Amikor egy felhasználó kosárba tesz egy terméket, a frontend AJAX hívást indít, ami egy producer endpointot hí meg. A producer user_id alapján key-ként küldi az eseményt, így az azonos felhasználó összes kosár-eseménye ugyanarra a partitionra kerül, garantálva a cronológiai sorrendet.
Tip: Mindig használj acks=all-t pénzügyi tranzakcióknál, és fontold meg a enable.idempotence=true beállítást, ami megakadályozza az üzenet duplikációt hálózati hiba esetén.
from kafka import KafkaProducer
import json
from datetime import datetime
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
acks="all",
compression_type="gzip",
)
# WebShop Pro: kosár esemény küldése
events = [
("usr-456", {"event_type": "add_to_cart", "product_id": "P-100", "qty": 1}),
("usr-456", {"event_type": "add_to_cart", "product_id": "P-200", "qty": 2}),
("usr-789", {"event_type": "add_to_cart", "product_id": "P-300", "qty": 1}),
]
for key, event in events:
event["timestamp"] = datetime.now().isoformat()
future = producer.send("webshop.cart_events", key=key, value=event)
record = future.get(timeout=10)
print(f"Sent to partition {record.partition} at offset {record.offset}")Sent to partition 1 at offset 0 Sent to partition 1 at offset 1 Sent to partition 0 at offset 0
Kafka Consumer
Kafka
Python
A Kafka Consumer olvassa a topic üzeneteket, és azokat továbbítja az üzleti logikának. A consumer működése több fontos koncepcióra épül, amelyek megértése elengedhetetlen egy megbízható streaming pipeline felépítéséhez.
- Consumer Group — azonos
group.id-val rendelkező consumerek megosztják a partitionöket egymás között. Minden partitiont pontosan egy consumer olvas a csoporton belül. Ha több consumer van, mint partition, a felesleges consumerek tétlenek maradnak (standby). Ha kevesebb, akkor egy consumer több partitiont is olvashat. - Offset — egy monoton növekvő szám, ami megmutatja, hol tart a consumer az adott partition olvasásában. A Kafka tárolja az offsetet (committed offset), így ha a consumer újraindul, onnan folytatja, ahol abbahagyta — nem kell újraolvasni a már feldolgozott üzeneteket.
- auto.offset.reset — amikor egy teljesen új consumer (vagy egy olyan, aminek nincs mentett offsetje) elkezdi az olvasást, ez a beállítás határozza meg, hol kezdjen:
earliest(a legrégebbi üzenettől) vagylatest(csak az új üzenetek).
WebShop Pro példa: Az analytics consumer group (webshop-analytics) a webshop.cart_events topicot olvassa. Ha a consumer leáll és újraindul, a mentett offset alapján folytatja, így egyetlen kosár-esemény sem marad ki. A manual commit (enable.auto.commit=false) biztosítja, hogy csak a sikeresen feldolgozott üzenetek offsetje kerül mentésre.
Tip: Production-ben mindig használd a enable.auto.commit=false beállítást és manuálisan commitolj csak azokat az offseteket, amelyekhez tartozó üzeneteket sikeresen feldolgoztad. Ez megakadályozza az adatvesztést.
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"webshop.cart_events",
bootstrap_servers="localhost:9092",
group_id="webshop-analytics",
auto_offset_reset="earliest",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
key_deserializer=lambda m: m.decode("utf-8") if m else None,
enable_auto_commit=False, # manual offset commit!
)
# Process a few messages
for msg in consumer:
print(f"[{msg.partition}:{msg.offset}] key={msg.key} value={msg.value}")
# Manual commit after successful processing
consumer.commit()
break # demo: just one message[1:1] key=usr-456 value={'event_type': 'add_to_cart', 'product_id': 'P-200', 'qty': 2, 'timestamp': '2024-01-15T14:30:01'}
Exactly-once vs at-least-once
Kafka
A delivery guarantee (kézbesítési garancia) az egyik legfontosabb architekturális döntés streaming rendszerek tervezésekor. A három fő szint között kell választani, és a döntés hatással van a rendszer megbízhatóságára, teljesítményére és komplexitására.
- At-most-once — a producer elküldi az üzenetet és nem vár visszaigazolást. Ha valami hiba történik (hálózati probléma, broker összeomlás), az üzenet elveszhet. Ez a leggyorsabb megközelítés, de a legkevésbé megbízható. Olyan esetekben megfelelő, ahol egy-egy elveszett üzenet nem kritikus.
- At-least-once — a producer újrapróbálkozik, amíg meg nem kapja a visszaigazolást. Ha hiba történik, az üzenet újraküldésre kerül. Ez garantálja, hogy egyetlen üzenet sem vész el, de duplikátumok előfordulhatnak. A consumer oldalon idempotens feldolgozással (pl. deduplikáció event_id alapján) kezelhető a probléma.
- Exactly-once — az üzenet pontosan egyszer kerül kézbesítésre, sem el nem veszik, sem nem duplikálódik. Ez a legerősebb garancia, de a legdrágább: tranzakciós protokollt és kételyes koordinációt igényel a producer, a broker és a consumer között. A Kafka
enable.idempotence=trueés tranzakciós API-ja biztosítja ezt.
WebShop Pro stratégia: A rendelések (webshop.purchases) exactly-once garanciát kapnak, mert pénzügyi tranzakciók — egy duplikált rendelés komoly ügyféligénylési problémát okozna. A page view események at-least-once garanciával működnek, mert az aggregált metrikák idempotensek (egy extra page view minimális torzítást okoz).
| Garancia | Elveszhet? | Duplikálódhat? | Teljesítmény | Használat |
|---|---|---|---|---|
| At-most-once | Igen | Nem | Legjobb | Metrikák, logok |
| At-least-once | Nem | Igen | Jó | Idempotens feldolgozás |
| Exactly-once | Nem | Nem | Lassabb | Pénzügyi tranzakciók |
Rendelések: exactly-once (pénzügyi). Page views: at-least-once (idempotens aggregálás).
Dead Letter Queue (DLQ)
Kafka
Mi van, ha egy üzenetet nem sikerül feldolgozni? Nem dobhatjuk el, de nem is akadhatunk rajta végtelenül. A DLQ a megoldás:
- Próbálkozz N alkalommal (retry)
- Ha továbbra is hibás → küld DLQ topicra
- DLQ-t monitorozzuk és manuálisan/vagy automatikusan újra feldolgozzuk
from kafka import KafkaProducer, KafkaConsumer
import json
DLQ_TOPIC = "webshop.cart_events.dlq"
MAX_RETRIES = 3
dlq_producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
def process_with_retry(msg, attempt=1):
try:
event = msg.value
# Simulate processing that might fail
if event.get("product_id") is None:
raise ValueError("Missing product_id")
print(f"Processed: {event['event_type']} product={event['product_id']}")
except Exception as e:
if attempt < MAX_RETRIES:
print(f"Retry {attempt}/{MAX_RETRIES}: {e}")
process_with_retry(msg, attempt + 1)
else:
# Send to DLQ after max retries
dlq_event = {
"original": msg.value,
"error": str(e),
"topic": msg.topic,
"partition": msg.partition,
"offset": msg.offset,
"retry_count": attempt
}
dlq_producer.send(DLQ_TOPIC, value=dlq_event)
print(f"Sent to DLQ: {e}")Processed: add_to_cart product=P-100 Sent to DLQ: Missing product_id
Spark Structured Streaming
Spark
A Spark Structured Streaming az Apache Spark deklaratív streaming API-ja, amely a batch DataFrame API-t terjeszti ki folyamatos adatfeldolgozásra. A lényeg: nem kell külön streaming koncepciókat megtanulnod — ugyanazok a transzformációk (select, filter, groupBy, join) működnek, mint batch módban, de a motor a háttérben folyamatosan futtatja őket.
A motor micro-batch módban működik alapértelmezetten: a bejövő adatfolyamot kis, fix időközönként (pl. 500ms vagy 5s) batch-ekre bontja, majd mindegyiket egy-egy Spark job-ként dolgozza fel. Létezik continuous processing mód is, ami milliszekendumos késleltetést biztosít, de korlátozottabb operátorokkal.
Miért Spark és nem sima Python consumer? Mert a Spark automatikusan kezeli a fault tolerance-t (checkpointing), a state-kezelést (aggregációk, window-ok), és skálázható több node-ra. Egy sima Python consumer nem tud amortizálni egy rebalance-t, nem tud nagy state-et tartani memóriában, és nem tud pontosan egyszer szematikusan dolgozni.
Micro-batch · Spark Streaming docs 📖
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType
spark = SparkSession.builder \
.appName("WebShop-Streaming") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()
# Schema definiálása a Kafka üzenethez
cart_schema = StructType() \
.add("event_type", StringType()) \
.add("product_id", StringType()) \
.add("qty", IntegerType()) \
.add("timestamp", StringType())
# Kafka stream olvasása
cart_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "webshop.cart_events") \
.option("startingOffsets", "latest") \
.load()
# JSON parse
parsed = cart_stream.select(
from_json(col("value").cast("string"), cart_schema).alias("data")
).select("data.*")
print("Streaming query definiálva (micro-batch, 5s trigger)")Streaming query definiálva (micro-batch, 5s trigger)
Watermarking
Spark
A watermark a Spark Structured Streaming egyik legfontosabb mechanizmusa, amely meghatározza, meddig várjon a motor a későn érkező (late) eseményekre, mielőtt lezár egy időablakot. Megadja a maximális késleltetést, amit az engine tolerál — ezen túl érkező adatokat eldobja vagy külön kezeli.
Hogyan működik? A Spark folyamatosan nyomon követi a legfrissebb esemény-időt minden partitionön. Ha egy új esemény érkezik, az esemény időpontja alapján frissíti a watermarket: watermark = max(event_time) - maxDelay. Bármely ablak, amelynek a vége a watermark előtt van, lezárul és kimenetre kerül. A későn érkező, de még a watermark ablakon belüli események frissítik a korábbi ablak eredményét.
WebShop Pro: Ha egy felhasználó mobilon kosárba tesz valamit, de a hálózat miatt 2 perc múlva érkezik meg a Kafka-hoz, a 2 perces watermark ezt még kezeli. Ha viszont 10 percet késik, a Spark eldobja, mert a watermark már túlhaladt az adott ablakon. Ez egy ésszerű kompromisszum a valós idejű feldolgozás pontossága és teljesítménye között.
Tip: A watermark beállítása kritikus egyensúly: túl kis érték → sok late data elvész; túl nagy érték → az ablakok lassan zárulnak, nő a memória-használat és a késleltetés. Kezdd a várható hálózati késleltetés 2-3-szorosával, és monitorozd a late data arányt.
from pyspark.sql.functions import window, to_timestamp, current_timestamp
# Watermark alkalmazása: 2 perc max delay
windowed = parsed \
.withColumn("event_time", to_timestamp("timestamp")) \
.withWatermark("event_time", "2 minutes") \
.groupBy(
window("event_time", "5 minutes"), # 5 perces ablakok
"product_id"
).count()
# Output: 5 percenként aggregált kosár-események per termék
query = windowed.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime="10 seconds") \
.start()
print("Windowed aggregation: 5 min windows, 2 min watermark")Windowed aggregation: 5 min windows, 2 min watermark
Late arriving data
Spark
A late data (későn érkező adat) olyan esemény, amelynek az esemény-időpontja már a watermark mögött van, amikor megérkezik a feldolgozó rendszerbe. Ez gyakran előfordozik mobil hálózati késleltetés, óraszinkronizációs problémák vagy upstream rendszer leállás miatt.
A Spark Structured Streaming három stratégiát kínál a late data kezelésére:
- Drop — figyelmen kívül hagyjuk a későn érkező eseményeket (alapértelmezett viselkedés). A legegyszerűbb és leggyorsabb megoldás, de adatvesztéssel jár. Olyan metrikáknál megfelelő, ahol egy-két elveszett esemény nem torzítja jelentősen az eredményt.
- Update — frissítjük a korábbi ablak eredményét az új adattal. A kimenetben a korábbi sor felülíródik a frissített értékkel. Ez alkalmas real-time dashboardokhoz, ahol a pontosság fontos, de a state mérete nőhet, mert a Spark-nak memóriában kell tartania a korábbi ablakok állapotát.
- Append — új sorként hozzáadjuk az eredményhez, a korábbi ablak eredménye nem módosul. Ritkábban használt, de hasznos lehet audit célú naplózásnál.
WebShop Pro: A rendeléseknél Update módot használunk, mert a rendelési statisztikáknak pontosnak kell lenniük. A page view-knál Drop módot, mert egy-két elveszett page view nem kritikus az aggregált metrikák szempontjából.
| Stratégia | Output mode | Használat | Költség |
|---|---|---|---|
| Drop late | Append | Metrikák ahol nem kritikus | Ingyenes |
| Update result | Update | Real-time dashboard | Közepes (state storage) |
| Reprocess | Complete | Pontos aggregáció | Magas (teljes state) |
Rendelések: Update mode (pontos kell). Page views: Drop late (nem kritikus ha egy view elveszik).
Stream-to-Delta
Spark
Delta Lake
A streaming adatok Delta táblába írása a leggyakoribb lakehouse minta, amely ötvözi a streaming feldolgozás valós idejűségét a Delta Lake ACID tranzakcióinak megbízhatóságával. A Spark Structured Streaming natívan támogatja a Delta sink-et, így egyetlen API hívással írhatunk stream-ből Delta táblába.
Hogyan működik? A Spark minden micro-batch-ben új Parquet fájlokat ír a Delta tábla könyvtárába, és atomi művelettel frissíti a _delta_log/ tranzakciós naplót. A checkpointLocation könyvtár tartalmazza a streaming progress-t (offsetek, state), így ha a query újraindul, onnan folytatja, ahol abbahagyta — ez a fault tolerance alapja.
Output módok: Az append mód csak új sorokat ad hozzá (későn érkező adatok nem módosítják a korábbiakat). Az update mód frissíti a meglévő sorokat (pl. aggregációknál). A complete mód minden micro-batch-ben kiírja a teljes eredményt (ritkán használt, nagy state-nél drága).
WebShop Pro: A kosár-események és készlet-frissítések is Delta táblákba íródnak. A kosár-események append módban, mert csak új események jönnek. A készlet-frissítések update módban, mert a termékkészlet értéke felülíródik a legfrissebb állapottal.
# Kosár események stream → Delta tábla
delta_query = parsed \
.withColumn("event_time", to_timestamp("timestamp")) \
.withWatermark("event_time", "2 minutes") \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "./checkpoint/cart_events") \
.partitionBy("product_id") \
.trigger(processingTime="30 seconds") \
.start("./data/delta/cart_events")
# Készlet-frissítések stream → Delta tábla
inventory_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "webshop.inventory") \
.load()
inventory_query = inventory_stream \
.select(from_json(col("value").cast("string"), inv_schema).alias("d")) \
.select("d.*") \
.writeStream \
.format("delta") \
.outputMode("update") \
.option("checkpointLocation", "./checkpoint/inventory") \
.start("./data/delta/inventory")
print("2 streaming query fut: cart_events → Delta, inventory → Delta")2 streaming query fut: cart_events → Delta, inventory → Delta
Real-time feature update
Kafka
Delta Lake
Az ML modellek valós idejű frissítéséhez real-time feature-ökre van szükség. A batch-alapú feature engineering (pl. napi aggregáció) túl lassú az olyan alkalmazásokhoz, mint a valós idejű termékajánlás vagy a csalásdetektálás. A streaming pipeline képes folyamatosan frissíteni a feature-öket, ahogy az adatok érkeznek.
Mik azok a real-time feature-ök? Olyan aggregált jellemzők, amelyek egy csúszó időablakban (sliding window) számítódnak. Például: "a felhasználó az elmúlt 10 percben hány terméket nézett meg", "az elmúlt 1 órában mennyit költött", vagy "az adott IP címről hány sikertelen bejelentkezési kísérlet történt az elmúlt 5 percben".
A Spark Structured Streaming groupBy(window(...), key) operátorral natívan támogatja a csúszó ablakokat. A complete output módot használjuk, mert minden trigger-ben a teljes, frissített aggregációt ki akarjuk írni a feature store-ba. A Delta Lake-ben tárolt feature-öket az ML modell közvetlenül olvashatja a legfrissebb állapotban.
WebShop Pro: A kosár-események stream-ből 10 perces csúszó ablakkal (1 perces slide) számítjuk a termékenkénti eseményszámot és mennyiséget. Ezek a feature-ök bemenetként szolgálnak a termékajánló modellnek.
from pyspark.sql.functions import count, sum as _sum, avg
# Real-time feature kalkuláció: user activity az elmúlt 10 percben
user_features = parsed \
.withColumn("event_time", to_timestamp("timestamp")) \
.withWatermark("event_time", "1 minute") \
.groupBy(
window("event_time", "10 minutes", "1 minute"), # sliding window
col("product_id")
).agg(
count("*").alias("event_count"),
_sum("qty").alias("total_qty"),
)
# Feature store-ba írás Delta-ként
feature_query = user_features.writeStream \
.format("delta") \
.outputMode("complete") \
.option("checkpointLocation", "./checkpoint/features") \
.trigger(processingTime="1 minute") \
.start("./data/delta/features/user_activity")
print("Real-time feature update: 10 min sliding window, 1 min slide, 1 min trigger")Real-time feature update: 10 min sliding window, 1 min slide, 1 min trigger
WebShop Pro streaming pipeline
Kafka
Spark
Delta Lake
A teljes WebShop Pro streaming architektúra egy end-to-end pipeline, amely a frontend eseményektől a Delta Lake táblákig terjed. Ez a szakasz összefoglalja a kurzus során tanultakat egy koherens rendszerarchitektúrává.
Architektúra áttekintés: A webshop frontend minden felhasználói interakciót Kafka topicokra küld (page views, kosár-események, rendelések, készlet-frissítések). A Spark Structured Streaming feliratkozik ezekre a topicokra, parse-olja a JSON üzeneteket, alkalmazza a watermarking-ot és az ablakos aggregációkat, majd az eredményt Delta Lake táblákba írja. A hibás üzenetek Dead Letter Queue-ba kerülnek, a valós idejű feature-ök pedig egy külön feature store Delta táblát táplálnak.
Topic tervezés: A webshop.purchases topic 12 partitionnel rendelkezik (a legnagyobb forgalmú) és végtelen retencióval (a rendeléseket sosem töröljük). A webshop.page_views 1 napos retencióval működik (a régi page view-kat a Kafka automatikusan törli). A particionálás száma a várható forgalom és a párhuzamos consumerek száma alapján került meghatározásra.
Tip: Éles környezetben a topic-ok számát, a partition-ök számát és a retenciót a tényleges adatmennyiség és SLA alapján kell meghatározni. Kezdd konzervatív értékekkel, és monitorozd a consumer lag-et (hány üzenet van lemaradásban) a skálázás döntéshez.
# WebShop Pro Streaming Pipeline Architecture
#
# [Web/Frontend] → Kafka Topics → Spark Structured Streaming → Delta Lake
#
# Topics:
# webshop.page_views → parsed → Delta: delta/page_views
# webshop.cart_events → parsed → Delta: delta/cart_events
# webshop.purchases → parsed → Delta: delta/purchases
# webshop.inventory → parsed → Delta: delta/inventory
#
# + Dead Letter Queue: webshop.*.dlq (hibás üzenetek)
# + Real-time features: 10 min sliding window → Delta: delta/features
# + Alerting: purchase amount > 500000 HUF → Slack notification
pipeline_topics = {
"webshop.page_views": {"partitions": 6, "retention_ms": 86400000},
"webshop.cart_events": {"partitions": 6, "retention_ms": 604800000},
"webshop.purchases": {"partitions": 12, "retention_ms": -1}, # forever
"webshop.inventory": {"partitions": 3, "retention_ms": 604800000},
}
for topic, cfg in pipeline_topics.items():
print(f" {topic}: {cfg['partitions']} partitions, retention={cfg['retention_ms']}ms")webshop.page_views: 6 partitions, retention=86400000ms webshop.cart_events: 6 partitions, retention=604800000ms webshop.purchases: 12 partitions, retention=-1ms webshop.inventory: 3 partitions, retention=604800000ms
Összefoglalás
Gratulálunk! Ebben a kurzusban megismerted a streaming engineering alapjait, az event-driven architektúrától a teljes production pipeline felépítéséig. A streaming nem csak egy technológia — egy teljesen más gondolkodásmód az adatfeldolgozásról.
Főbb tanulságok: Megtanultad, hogy a Kafka topic/partition/consumer group modellje hogyan teszi lehetővé a skálázható és megbízható esemény-feldolgozást. Megértetted a delivery guarantee-k (at-most-once, at-least-once, exactly-once) közötti különbségeket és azt, hogy mikor melyiket érdemes választani. A Spark Structured Streaming micro-batch modellje, a watermarking és a late data kezelés mind olyan eszközök, amelyek a valós idejű feldolgozás kihívásait oldják meg.
WebShop Pro teljes kép: A kurzus végére felépítettél egy teljes streaming pipeline-t, amely a webshop eseményeit valós időben dolgozza fel, Delta Lake táblákba írja az eredményeket, és valós idejű feature-öket biztosít az ML modelleknek. Ez az architektúra iparági standard, és a tanultakat bármilyen streaming use case-re alkalmazhatod.
Tip: A következő lépés a RAG Evaluation & AI Safety kurzus, ahol megtanulod, hogyan értékeld, hogy egy AI rendszer mennyire megbízható — ami kritikus kiegészítője a streaming pipeline-ból származó real-time adatok felhasználásának.
| Megtanultuk | Következő |
|---|---|
| Event-driven architektúra | RAG Evaluation & AI Safety |
| Kafka producer/consumer | LLM eval pipeline |
| Exactly-once, DLQ | Hallucination detection |
| Spark Structured Streaming | Guardrails & safety |
| Watermarking, late data | Production eval pipeline |
| Stream-to-Delta | Real-time AI features |
RAG Evaluation & AI Safety — ahol megtanuljuk, honnan tudjuk, hogy az AI jól válaszol!