</> Apache Spark

0 / 17 section completed
Section 00

Apache Spark: Big Data feldolgozás Spark

Az Apache Spark a big data feldolgozás iparági standardja, amely elosztott, in-memory feldolgozással képes több gigabájttól petabájtos nagyságrendű adatokkal dolgozni. A Hadoop MapReduce-hoz képest akár 100-szor gyorsabb memóriában és 10-szor gyorsabb lemezen, köszönhetően a DAG (Directed Acyclic Graph) alapú végrehajtási modellnek, amely elkerüli a felesleges lemezírásokat a lépések között.

Miért fontos? A modern data engineering világában a Spark az alapvető eszköz: a vállalati adatfeldolgozás, ETL pipeline-ek, streaming feldolgozás és gépi tanulás mögött is gyakran ez áll. Egyetlen egységes motor biztosítja a batch és streaming feldolgozást, az SQL lekérdezéseket és a gépi tanulást – nem kell külön rendszereket integrálni. A Spark futtatható YARN-on, Kubernetes-en, saját standalone cluster-ön vagy akár lokálisan is fejlesztés céljából.

A kurzus felépítése: Először a Spark architektúráját és alapfogalmait ismerjük meg (Driver, Executor, RDD, DataFrame). Ezután átmegyünk a gyakorlati DataFrame operációkon, Spark SQL-en, join és window függvényeken. A haladó rész a teljesítményoptimalizálást, a Structured Streaminget (Kafka → Delta), az MLlib gépi tanulási pipeline-t, a Catalyst optimalizálót és a Delta Lake integrációt fedi le. Végül egy teljes, valós ETL pipeline-t építünk össze.

Tippek a tanuláshoz: Minden kódrészlet futtatható – használd a "Run" gombot azonnali eredményekért. A Spark UI (localhost:4040) megértése kulcsfontosságú: ott látod a DAG-ot, a stage-eket, a shuffle-t és a memóriahasználatot. Ha valamit nem értesz, nézd meg a Section 01 fogalomtáblázatát, ott minden kulcsfogalom röviden definiálva van.

Miről szól?

Spark Session → RDD → DataFrame → SQL → Streaming → MLlib → Optimizer → Delta Lake

Section 01

Spark alapfogalmak

FogalomLeírásFontos
Spark SessionA Spark belépési pontjaEgyetlen objektum, ami mindent kezel
RDDResilient Distributed DatasetAlacsony szintű API, immutable, partitioned
DataFrameTáblázatos adatstruktúraHigh-level API, optimalizált
PartitionAdatpartícióElosztott feldolgozás egysége
ExecutorWorker node folyamatAhol a tényleges számítás fut
DriverFő vezérlő folyamatTaskok kiosztása, scheduler
TransformLazy operationfilter, map, groupBy – nem fut azonnal
ActionTriggers executioncollect, count, show – ez indítja el
ShuffleAdat mozgatás node-ok közöttDrága művelet, minimalizálandó
Section 02

Spark architektúra Spark

A Spark architektúra három fő komponensből áll: a Driver program, amely a felhasználói kódot futtatja és a végrehajtási tervet (DAG) építi; a Cluster Manager (YARN, Kubernetes vagy Standalone), amely az erőforrásokat allokálja; és az Executor folyamatok, amelyek a munkánókon futtatják a tényleges feladatokat és tárolják a cache-elt adatokat. Ez a master-worker minta teszi lehetővé a horizontális skálázást – több node hozzáadásával lineárisan nő a feldolgozási kapacitás.

Miért fontos ezt érteni? A Spark teljesítményhangolás alapja, hogy tudd, melyik komponens mit csinál. A Driver építi a logikai tervet, a Catalyst optimalizáló alakítja fizikai tervvé, majd a Driver osztja fel task-okra és küldi az Executor-oknak. Ha a Driver memóriája elfogy (OOM), az egész alkalmazás összeomlik. Ha az Executor-ok nem kapnak elég memóriát, spill (lemezírás) történik, ami drasztikusan lelassítja a feldolgozást. A Cluster Manager biztosítja, hogy erőforrás-okozta konfliktusok ne lépjenek fel a munkaterhelések között.

A végrehajtási modell: Amikor egy action-t meghívsz (pl. collect(), count()), a Driver létrehoz egy Job-ot. A Job stage-ekre bomlik a shuffle határoknál. Minden stage task-okból áll, ahol egy task pontosan egy partition-t dolgoz fel egy Executor-on. A Spark UI-n (localhost:4040) mindez vizuálisan nyomon követhető: a DAG vizualizáció mutatja a stage-eket, az Event Timeline a task-ok futását, a Storage tab pedig a cache-elt adatokat.

Gyakorlati tippek: Fejlesztéshez a local[*] mód elegendő (minden CPU mag egy szálnak felel meg). Termelésben a YARN vagy Kubernetes a szabvány. A spark-submit parancs --num-executors, --executor-cores és --executor-memory paramétereivel finomhangolható a cluster. Alapszabály: egy Executor 5 core-t kapjon, és a memória legyen az adatméret 2-3-szorosa. A Driver memóriája akkor kell nagy, ha nagy méretű collect()-et hívsz vagy széles join-okat végzel.

Driver
Program, SparkContext
Cluster Manager
YARN/K8s/Standalone
Executor 1
Task 1, Task 2, Cache
Executor 2
Task 3, Task 4
KomponensSzerepPélda
DriverVezérlés, DAG építésspark-submit --master yarn
Cluster ManagerErőforrás allokációYARN, Kubernetes, Standalone
ExecutorTask végrehajtás, cache1-5 per worker node
TaskEgyetlen munkaegység1 partition = 1 task
StageTask csoport shuffle előttshuffle boundary-k között
JobEgy action által indítottcollect(), count() = 1 job
Section 03

Környezet beállítása Spark

A Spark használatának első lépése a Spark Session létrehozása, amely a Spark minden funkcionalitásának belépési pontja. A SparkSession egyesíti a korábbi SparkContext, SQLContext és HiveContext objektumokat egyetlen egységes interfészbe. A builder minta lehetővé teszi az alkalmazásnév, konfigurációs beállítások és bővítmények megadását – a példában a Delta Lake bővítményt is betöltjük, ami ACID tranzakciókat és time travel-t ad a Spark-hoz.

Miért fontos a konfiguráció? A Spark Session-ben megadott beállítások határozzák meg a végrehajtási környezetet: a master URL (local[*] vagy yarn/k8s), a memória allokációt, a shuffle partition-ok számát, a serializer-t és sok mást. A Delta Lake extension betöltése elengedhetetlen, ha Delta formátumú adatokat akarsz olvasni vagy írni. A getOrCreate() biztosítja, hogy ha már létezik aktív session, azt használja új létrehozása helyett.

A kódrészlet bemutatása: A pyspark.sql modulból importáljuk a SparkSession-t, majd a builder láncolásával konfiguráljuk. Az appName() a Spark UI-n lesz látható, ami segíti a debuggolást. A config() hívás a Delta extension-t regisztrálja. A getOrCreate() végül létrehozza vagy visszakapja a session-t. A spark.version és a UI URL kiírása ellenőrzésre szolgál – ha ezeket látod, a Spark megfelelően fut.

Tippek: Fejlesztéshez a local[*] mód (minden elérhető mag használata) elegendő. Termelésben a spark-submit --master yarn vagy --master k8s://... paranccsal indítod. A Spark UI a localhost:4040-en érhető el, és a legfontosabb debug eszközöd – mindig nyisd meg, ha valami nem a várt módon működik. A Delta Lake használatához a pip install delta-spark csomag is szükséges.

[1]
# PySpark telepítés és indítás
# Terminálban: pip install pyspark delta-spark

# Spark Session létrehozása
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("crash-course") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(spark.sparkContext.uiWebUrl)
Output:
[spark] Spark version: 3.5.1
[spark] Spark UI: http://localhost:4040
[spark] Cluster: local[*] (8 cores)
[spark] Memory: 4GB allocated
[spark] Delta Lake extension loaded
Section 04

RDD alapok Spark

Az RDD (Resilient Distributed Dataset) a Spark legalacsonyabb szintű adatstruktúrája, amely három alapvető tulajdonsággal rendelkezik: immutable (megváltoztathatatlan – minden transzformáció új RDD-t hoz létre), partitioned (az adatok partitions-okra vannak osztva és elosztva a cluster node-jai között), és fault-tolerant (hibatűrő – ha egy node összeomlik, a Spark a lineage, azaz a származási lánc alapján újraépíti az elveszett partition-t).

Miért fontos az RDD-t ismerni? Bár a legtöbb esetben DataFrame-et használunk (magasabb szintű, optimalizált API), vannak helyzetek, amikor RDD-re van szükség: nem strukturált adatok feldolgozása, egyedi particionálási logika implementálása, vagy ha a Catalyst optimalizáló nem tudja hatékonyan kezelni a komplex transzformációkat. Az RDD megértése segít abban is, hogy mélyebben átlásd a DataFrame mögötti működést, hiszen a DataFrame végül RDD-ként hajtódik végre az Executor-okon.

A kódrészlet bemutatása: A parallelize() metódus létrehoz egy RDD-t a memóriában lévő adatokból. A filter() és map() transzformációk lazy-ek – nem futnak azonnal, csak feljegyzik a műveletet a DAG-ban. Csak az action-k (collect(), reduce(), count()) indítják el a végrehajtást. A példában először szűrjük a páros számokat, majd négyzetetre emeljük őket, végül összesítjük az eredményt. Figyeld meg, hogy a getNumPartitions() megmutatja, hány partition-ra osztotta a Spark az adatot – ez megegyezik a rendelkezésre álló CPU magok számával local módban.

Tippek: Ne használd az RDD-t, ha DataFrame is megoldja a feladatot – a DataFrame 10-100x gyorsabb lehet a Catalyst optimalizálónak köszönhetően. Ha mégis RDD-t kell használnod, kerüld a collect()-et nagy adatokon (mindent a Driver memóriájába hoz), és használj mapPartitions()-t a map() helyett, ha expensive inicializációra van szükség (pl. adatbázis kapcsolat nyitása partition-onként egyszer).

[2]
# RDD létrehozása
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Transformations (lazy)
filtered = rdd.filter(lambda x: x % 2 == 0)  # párosak
mapped = filtered.map(lambda x: x * x)      # négyzet

# Actions (triggers execution)
print(f"Partitions: {rdd.getNumPartitions()}")
print(f"Even squares: {mapped.collect()}")
print(f"Sum: {mapped.reduce(lambda a,b: a+b)}")
print(f"Count: {mapped.count()}")
Output:
[spark] Partitions: 8
[spark] Even squares: [4, 16, 36, 64, 100]
[spark] Sum: 220
[spark] Count: 5
Lazy Evaluation

A transformations (filter, map, flatMap) nem futnak azonnal – csak felépítik a DAG-ot. Az actions (collect, count, reduce) indítják el a végrehajtást.

Section 05

RDD haladó: Word Count Spark

A Word Count (szószámolás) a big data feldolgozás "Hello World"-je – ez a klasszikus példa, amely bemutatja a MapReduce programozási modellt és az RDD-k harness erejét. Bár egyszerűnek tűnik, a háttérben azonos mintákat használ, mint a valós feldolgozási feladatok: adatok felosztása (split), transzformáció (map), aggregáció (reduce) és rendezés (sort). Ez a minta alapja a log-elemzésnek, a szövegbányászatnak és az ETL pipeline-oknak.

A művelet lánc megértése: A textFile() beolvassa a fájlt soronként egy RDD-be. A flatMap() minden sort szavakra bont – ellentétben a map()-pel, amely egy-az-egyhez leképezést ad, a flatMap() "kilapítja" az eredményt, tehát egy sorból több szó lesz külön elem. A map() ezután minden szót egy (szó, 1) párra alakít – ez a "mapper" fázis. A reduceByKey() összeadja az azonos kulcsú értékeket – ez a "reducer" fázis, ami shuffle-t vált ki. Végül a sortBy() csökkenő sorrendbe rendezi az eredményt.

A shuffle fontossága: A reduceByKey() az egyetlen olyan művelet a láncban, amely shuffle-t (adatmozgatás a node-ok között) igényel. Ez azért van, mert az azonos szót tartalmazó partition-ok adatait össze kell gyűjteni egy helyre. A Spark itt egy optimalizációt vélez: mindegyik partition-on lokálisan "pre-aggregál" (combineByKey), mielőtt shuffle-nek küldené az adatokat – ez drasztikusan csökkenti a hálózati forgalmat.

Tippek: A textFile() helyett valós projektekben gyakran wholeTextFiles()-t használd, ha könyvtárban lévő összes fájlt be kell olvasni. A take(n) biztonságosabb, mint a collect(), mert csak az első n elemet hozza a Driver-be. Ha a feldolgozás lassú, ellenőrizd a partition-ok számát – túl sok kis partition esetén coalesce()-szel csökkentsd. HaPontosabb eredmény kell, használj reguláris kifejezést a split()-ben a szóhatárok pontosabb azonosítására.

[3]
# Szószámolás (Word Count) – klasszikus példa
text_rdd = spark.sparkContext.textFile("data.txt")

word_counts = text_rdd \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)

print(f"Unique words: {word_counts.count()}")
print("Top 10:")
for word, count in word_counts.take(10):
    print(f"  {word}: {count}")
Output:
[spark] Unique words: 2,847
[spark] Top 10:
  the: 1,245
  and: 892
  spark: 756
  data: 634
  rdd: 521
  dataframe: 489
  partition: 412
  shuffle: 367
  transform: 334
  action: 298
Section 06

DataFrame alapok Spark

A DataFrame a Spark legfontosabb és leggyakrabban használt API-ja, amely táblázatos formában tartja az adatokat, hasonlóan egy relációs adatbázis táblához vagy a pandas DataFrame-hez – azzal a lényeges különbséggel, hogy elosztott, tehát több gépen párhuzamosan dolgozik. A DataFrame rendelkezik sémával (schema), azaz minden oszlopnak ismert a típusa, ami lehetővé teszi a Catalyst optimalizáló számára, hogy hatékony végrehajtási tervet készítsen. A Tungsten execution engine emellett bináris formátumot használ az adatok tárolására, ami kevesebb memóriát fogyaszt és gyorsabb feldolgozást eredményez.

Miért jobb az RDD-nél? A DataFrame esetében a Spark "látja" az adatok struktúráját és a lekérdezés logikáját, így automatikusan optimalizál: predicate pushdown (szűrés minél korábban), column pruning (csak a szükséges oszlopok olvasása), join reorder (kis tábla broadcast-olása). Az RDD esetében a Spark csak lambdát lát, és nem tudja optimalizálni a végrehajtást. Ez a különbség 10-100x teljesítménytöbbletet jelenthet valós munkaterheléseknél.

A kódrészlet bemutatása: A createDataFrame() metódus tuple-ök listájából hoz létre DataFrame-et, ahol a második paraméter az oszlopneveket adja meg. A show() kiírja az adatokat táblázatos formában, a printSchema() megmutatja az oszlopok típusait. A count() és a rdd.getNumPartitions() megmutatja, hány sor és hány partition van – kis adatoknál általában 1 partition elég, de nagy adatoknál a Spark automatikusan többet hoz létre (alapértelmezetten 200 a shuffle után).

Tippek: Ha a DataFrame sémája nem egyértelmű, használd a StructType/StructField explicit sémadefiniálást – ez gyorsabb, mint a séma-kikövetkeztetés (schema inference). A createDataFrame() helyett éles projektekben általában spark.read paranccsal olvasunk adatokat Parquet, JSON, CSV, Delta vagy adatbázis formátumból. Mindig ellenőrizd a sémát a printSchema()-val, mielőtt tovább dolgozol az adatokkal.

[4]
# DataFrame létrehozása
df = spark.createDataFrame([
    (1, "Alice", 25, 50000),
    (2, "Bob", 35, 75000),
    (3, "Charlie", 45, 90000),
    (4, "Diana", 28, 60000),
    (5, "Eve", 52, 120000),
], ["id", "name", "age", "salary"])

df.show()
df.printSchema()
print(f"Rows: {df.count()}, Partitions: {df.rdd.getNumPartitions()}")
Output:
[spark] DataFrame created with 5 rows
+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|  Alice| 25| 50000|
|  2|    Bob| 35| 75000|
|  3|Charlie| 45| 90000|
|  4|  Diana| 28| 60000|
|  5|    Eve| 52|120000|
+---+-------+---+------+

root
 |-- id: integer
 |-- name: string
 |-- age: integer
 |-- salary: integer

[spark] Rows: 5, Partitions: 1
Section 07

DataFrame operációk: filter, agg, join Spark

A DataFrame operációk a Spark napi munkájának 90%-át teszik ki: szűrés (filter), oszlopkiválasztás (select), új oszlop hozzáadása (withColumn), aggregáció (agg) és feltételes logika (when/otherwise). Ezek a műveletek a DataFrame API építőkövei, amelyekkel komplex adattranszformációkat építhetsz fel deklaratívan, SQL-szerű szintaxissal. Minden művelet lazy – csak az action hívásakor (pl. show()) fut le a tényleges végrehajtás.

Miért érdemes a DataFrame API-t használni SQL helyett? Bár a Spark SQL ugyanazt az eredményt adja, a DataFrame API összetett transzformációk esetén jobban kezelhető: dinamikus oszlopnevek, változókon alapuló szűrők, és programozott pipeline építés. Az API emellett típusbiztosabb (IDE támogatás, compile-time ellenőrzés), és könnyebben parametrizálható. A két megközelítés keverhető is: a DataFrame API és a Spark SQL ugyanazt a Catalyst optimalizálót használja, így a teljesítmény azonos.

A kódrészlet bemutatása: A filter() és select() kombinációval szűrünk és oszlopokat választunk ki – a col() függvény oszlophivatkozást hoz létre. A withColumn() új oszlopot ad hozzá (bonus = salary * 0.1). Az agg() több aggregációt végez egyszerre: avg, sum, count. A when().otherwise() a SQL CASE-WHEN megfelelője, amivel feltételes oszlopokat hozhatunk létre. A show() mindig az utolsó lépés, ami a végrehajtást triggereli.

Tippek: Használd a col() függvényt oszlophivatkozásokhoz a string-alapú hivatkozások ("age") helyett, ahol lehet – a típusbiztosabb és jobban olvasható. Az agg()-ben alias() használatával nevezd el az eredményoszlopokat érthetően. Ha sok withColumn()-t láncolsz, fontold meg a select()-et expr()-rel, ami tömörebb. A when() láncolható: when(..., "A").when(..., "B").otherwise("C") formában.

[5]
from pyspark.sql.functions import col, avg, sum, count, when

# Filter + Select
young = df.filter(col("age") < 40).select("name", "salary")

# With new column
df2 = df.withColumn("bonus", col("salary") * 0.1)

# Aggregation
stats = df.agg(
    avg("salary").alias("avg_salary"),
    sum("salary").alias("total"),
    count("*").alias("count")
)

# Conditional
df3 = df.withColumn("bracket", when(col("salary") > 80000, "high").otherwise("normal"))

stats.show()
df3.show()
Output:
[spark] Aggregation:
+----------+------+-----+
|avg_salary| total|count|
+----------+------+-----+
|   79000.0|395000|    5|
+----------+------+-----+

[spark] With bracket:
+---+-------+---+------+-------+
| id|   name|age|salary|bracket|
+---+-------+---+------+-------+
|  1|  Alice| 25| 50000| normal|
|  2|    Bob| 35| 75000| normal|
|  3|Charlie| 45| 90000|   high|
|  4|  Diana| 28| 60000| normal|
|  5|    Eve| 52|120000|   high|
+---+-------+---+------+-------+
Section 08

Spark SQL Spark

A Spark SQL lehetővé teszi, hogy standard SQL lekérdezéseket futtass a DataFrame-eken, mintha relációs adatbázis táblák lennének. A createOrReplaceTempView() metódus ideiglenes nézetet hoz létre, amely a jelenlegi Spark Session életéig él, és SQL-ből hivatkozható. A Spark SQL teljes ANSI SQL-92 kompatibilitást nyújt: SELECT, WHERE, GROUP BY, HAVING, JOIN, UNION, subquery, CASE-WHEN és sok window function mind támogatott.

Miért hasznos az SQL támogatás? Sok data engineer és elemző már ismeri az SQL-t, és gyorsabban tud dolgozni SQL-ben, mint DataFrame API-ban. A Spark SQL emellett integrálható BI eszközökkel (Tableau, Power BI, Superset) JDBC/ODBC kapcsolaton keresztül. Az SQL és a DataFrame API felcserélhetők – ugyanaz a Catalyst optimalizáló dolgozik mindkettő mögött, így a teljesítmény azonos. A választás az igényeden múlik: dinamikus pipeline-hoz a DataFrame API jobb, ad hoc elemzéshez és riportokhoz az SQL természetesebb.

A kódrészlet bemutatása: Először a createOrReplaceTempView("employees") létrehozza az ideiglenes nézetet. A spark.sql() fogadja a SQL string-et, ahol a CASE-WHEN korcsoportokat hoz létre (young/mid/senior), a GROUP BY 1 az első oszlopra csoportosít, az aggregációk (COUNT, AVG, MIN, MAX) statisztikákat számolnak, az ORDER BY DESC csökkenő sorrendbe teszi az eredményt. A result DataFrame-et show()-val jelenítjük meg.

Tippek: A temp view csak a session-en belül látható – ha globális nézet kell (más session-ek számára), használd a createGlobalTempView()-t. Komplex lekérdezéseknél a CTE (WITH clause) javítja az olvashatóságot. Kerüld a SELECT *-ot nagy táblákon (minden oszlop beolvasása felesleges I/O). A spark.sql() paramétezésére használj f-string-et vagy format()-ot, de SQL injection ellen mindig validáld a bemenetet.

[6]
# Temp view létrehozása
df.createOrReplaceTempView("employees")

# SQL lekérdezés
result = spark.sql("""
    SELECT
        CASE
            WHEN age < 30 THEN 'young'
            WHEN age < 50 THEN 'mid'
            ELSE 'senior'
        END as age_group,
        COUNT(*) as cnt,
        AVG(salary) as avg_salary,
        MIN(salary) as min_salary,
        MAX(salary) as max_salary
    FROM employees
    GROUP BY 1
    ORDER BY avg_salary DESC
""")
result.show()
Output:
[spark] SQL query executed 
+---------+---+----------+----------+----------+
|age_group|cnt|avg_salary|min_salary|max_salary|
+---------+---+----------+----------+----------+
|   senior|  1|  120000.0|    120000|    120000|
|      mid|  2|   82500.0|     75000|     90000|
|    young|  2|   55000.0|     50000|     60000|
+---------+---+----------+----------+----------+
Section 09

Join és Window Functions Spark

A Join és a Window Functions a Spark két legerősebb elemzőeszköze. A join összekapcsol két DataFrame-et egy megadott kulcs alapján (inner, left, right, full, cross), hasonlóan a relációs adatbázisokhoz. A window functions lehetővé teszik olyan számításokat, amelyek a sorok kontextusában maradnak – azaz nem összevonják a sorokat, mint a groupBy, hanem minden sorhoz hozzáadják az ablakfüggvény eredményét (rank, running sum, moving average, row number).

Miért fontosak a window functions? Gyakori feladat, hogy minden sorhoz szükség van egy "kontextusra": mi az adott sor rangja a csoportján belül (rank), mennyi az eddigi összeg (running total), mi a mozgóátlag (moving average). Ezeket groupBy-val nem lehet megoldani, mert a groupBy összevonja a sorokat. A window functions a partitionBy (csoportosítás) és orderBy (rendezés) definícióval dolgoznak, és az ablak keret (rowsBetween/rangeBetween) határozza meg, mely sorok vesznek részt a számításban.

A kódrészlet bemutatása: Az első részben egy inner join kapcsolja össze a felhasználókat a rendelésekkel. A második részben a Window.partitionBy("user_id").orderBy(col("amount").desc()) definíál egy ablakot, ahol a rank() megadja a rendelés rangját felhasználónként csökkenő összeg szerint. A harmadik részben a rowsBetween(unboundedPreceding, unboundedFollowing) egy "teljes ablakot" határoz meg, ahol a spark_sum az adott felhasználó összes rendelésének összegét adja minden sornál.

Tippek: A join-nál mindig add meg a join típust ("inner", "left", "right"), mert az alapértelmezés nem egyértelmű. Nagy táblák join-nál használj broadcast()-ot, ha az egyik tábla kicsi (< 10MB alapértelmezetten). A window functions után következő shuffle miatt ezek drága műveletek lehetnek – megfelelő partitioninggal csökkentheted a költséget. A rowsBetween paraméterekkel pontosan vezérelheted, mely sorok számítanak bele az ablakba.

[7]
# Join példa
orders = spark.createDataFrame([
    (1, 1, "laptop", 1200),
    (2, 2, "phone", 800),
    (3, 1, "tablet", 400)
], ["order_id", "user_id", "product", "amount"])

joined = df.join(orders, df.id == orders.user_id, "inner")
joined.select("name", "product", "amount").show()

# Window function
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as spark_sum

w = Window.partitionBy("user_id").orderBy(col("amount").desc())
ranked = orders.withColumn("rank", rank().over(w))

w2 = Window.partitionBy("user_id").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
running = orders.withColumn("total", spark_sum("amount").over(w2))

ranked.show()
Output:
[spark] Inner join:
+-----+-------+------+
| name|product|amount|
+-----+-------+------+
|Alice| laptop|  1200|
|Alice| tablet|   400|
|  Bob|  phone|   800|
+-----+-------+------+

[spark] Window rank by user:
+--------+-------+------+----+
|order_id|product|amount|rank|
+--------+-------+------+----+
|       1| laptop|  1200|   1|
|       3| tablet|   400|   2|
|       2|  phone|   800|   1|
+--------+-------+------+----+
Section 10

Performance optimalizálás Spark

A Spark teljesítményoptimalizálás három pillérre épül: partitioning (az adatok megfelelő felosztása a clusterön), caching (gyakran használt adatok memóriában tartása), és a shuffle minimalizálás (az adatmozgatás csökkentése a node-ok között). Ezek közül a shuffle a legdrágább művelet, mert hálózati I/O-t és lemezírást igényel – egyetlen felesleges groupBy vagy join komplett shuffle-t válthat ki, ami percekkel növelheti a futási időt.

A kódrészlet bemutatása: A repartition() hash-alapú újraosztást végez, ami shuffle-t vált ki, de utána a kulcs alapú műveletek (join, groupBy) lokálisak maradnak. A coalesce() ellentétben nem vált ki shuffle-t – csak összevonja a meglévő partition-ket, ami ideális a kimenet csökkentésére (pl. 200 partition-ről 10-re egy fájlba írás előtt). A cache() és persist() memóriában tartja az adatokat, de figyelj: csak akkor érdemes használni, ha ugyanazt a DataFrame-et legalább 2-szer használod. A broadcast() egy kis táblát minden node-ra másol, elkerülve a shuffle-join-t – ez az egyik leghatékonyabb optimalizálás.

Az explain() megértése: Az explain(True) kilistázza a teljes végrehajtási tervet: Parsed Logical Plan → Analyzed → Optimized → Physical Plan. A Physical Plan mutatja meg a valós végrehajtást: milyen join stratégiát használ (BroadcastHashJoin, SortMergeJoin), hol van filter, project, aggregate. Ha a tervben sok Exchange lépést látsz, az shuffle-t jelent – ezeket érdemes minimalizálni.

Gyakorlati tippek: Alapszabály: 128MB adat partition-onként. Túl sok kis partition = sok scheduler overhead. Túl kevés nagy partition = nem használja ki a párhuzamosságot. Használd a spark.sql.shuffle.partitions beállítást (alapértelmezett 200) a munkaterhelésnek megfelelően. A Spark UI "SQL" tabján látod a tényleges végrehajtási tervet minden lekérdezésnél – ezt érdemes vizsgálni, ha lassú a pipeline.

[8]
# Repartition
df_repartitioned = df.repartition(200, "age")

# Coalesce (reduce partitions, no shuffle)
df_coalesced = df.coalesce(10)

# Cache
df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)

# Broadcast join (small table)
from pyspark.sql.functions import broadcast
small_df = spark.createDataFrame([(1, "IT"), (2, "HR")], ["dept_id", "dept"])
joined = df.join(broadcast(small_df), "dept_id")

# Explain plan
df.filter("age > 30").select("name").explain(True)
Output:
[spark] == Physical Plan ==
*(1) Project [name#1]
+- *(1) Filter (age#2 > 30)
   +- Scan ExistingRDD

[spark] Optimizations applied:
   Predicate pushdown
   Column pruning
   BroadcastHashJoin (small table)
   WholeStageCodegen
MűveletLeírásShuffle?Mikor?
repartition()Hash partitioningIgen200 part / nagy data
coalesce()CsökkentésNEMKimenet optimalizálás
cache()Memory cacheNincsIsmételt lekérdezés
broadcast()Kis tábla minden nodeNincsSmall/large join
Section 11

Structured Streaming: Kafka → Delta Spark Kafka

A Structured Streaming a Spark deklaratív stream feldolgozó motorja, amely a DataFrame API-t kiterjeszti valós idejű adatfolyamokra. A kulcsötlelet: a stream egy korlátlan DataFrame, azaz ugyanazokat a transzformációkat alkalmazhatod, mint batch módban (filter, join, aggregation). A motor a háttérben növekményesen frissíti az eredményt, ahogy új adatok érkeznek. A példa egy Kafka topicból olvassa az eseményeket, feldolgozza őket, és Delta Lake-be írja – ez az egyik leggyakoribb valós streaming architektúra.

Miért Kafka + Delta? Az Apache Kafka a de facto standard esemény-streaming platform, amely milliónyi eseményt tud másodpercenként fogadni és szállítani. A Delta Lake adatformátum pedig ACID transakciókat biztosít a stream írásakor: ha a pipeline összeomlik, a checkpoint-ból folytatódik, nem lesz duplikált vagy hiányzó adat. Ez a kombináció (Kafka source → Spark processing → Delta sink) az alapja a legtöbb modern valós idejű adatplatformnak.

A kódrészlet bemutatása: A readStream.format("kafka") egy streaming DataFrame-et hoz létre a Kafka topicból. A from_json() parse-olja a JSON üzeneteket a megadott séma alapján (user_id, action, value). A writeStream.format("delta") a Delta Lake-be írja az eredményt. Az outputMode("append") új sorokat ad hozzá (létezőket nem módosít). A checkpointLocation biztosítja, hogy újraindítás után onnan folytatódik a feldolgozás, ahol abbamaradt. A start() elindítja a streamet a háttérben.

Tippek: A checkpoint elengedhetetlen termelésben – nélküle az újraindítás az elejétől kezdi a feldolgozást. A schema mindig explicit legyen, ne hagyd a Spark-ot kitalálni. Használd a trigger beállítást (processingTime) a batch méret vezérlésére. A stream figyelését a query.status és query.lastProgress hívásokkal végezheted. Termelésben a streaming query-t mindig wrap-old try/except-be és állíts be alert-et, ha a query leáll.

[9]
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType() \
    .add("user_id", IntegerType()) \
    .add("action", StringType()) \
    .add("value", IntegerType())

stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

parsed = stream.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

query = parsed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoint") \
    .start("/delta/events")

print(f"Streaming active: {query.isActive}")
Output:
[spark] Structured Streaming started
[spark] Source: Kafka [events]
[spark] Sink: Delta [/delta/events]
[spark] Trigger: processingTime=0ms (continuous)
[spark] Batch 1: 1,245 events processed (0.8s)
[spark] Batch 2: 1,389 events processed (0.6s)
[spark] Batch 3: 1,102 events processed (0.7s)
[spark] Throughput: ~1.2k events/sec
[spark] Latency: 150ms avg
Section 12

MLlib: ML Pipeline Spark-on Spark

Az MLlib a Spark beépített gépi tanulás könyvtára, amely skálázható ML algoritmusokat és pipeline eszközöket biztosít. A Pipeline API a scikit-learnéhoz hasonló: a gépi tanulási folyamatot lépésekre (stage-ekre) bontja – feature engineering (VectorAssembler, StandardScaler), modell illesztés (RandomForest, LogisticRegression), és értékelés (evaluator) –, és ezeket láncolhatóvá teszi. A teljes pipeline elmenthető és betölthető, ami biztosítja a reprodukálhatóságot.

Miért MLlib és nem scikit-learn? A scikit-learn egy gépen fut, és nem skálázódik 10+ millió sor fölé. Az MLlib ugyanazt a Spark architektúrát használja, mint a batch feldolgozás: az adatok elosztottak maradnak, a tréning párhuzamosan fut minden Executor-on. Ha az adatod már Spark-ban van (DataFram-ként), nincs szükség adatmozgatásra – a tréning ott történik, ahol az adat van. A Pipeline emellett termelésbe könnyebben vihető: mentés/betöltés, batch scoring, és streaming predikció is támogatott.

A kódrészlet bemutatása: A VectorAssembler egyesíti a feature oszlopokat (age, income, tenure, usage) egy vektor oszloppá. A StandardScaler normalizálja a feature-ket (z-score), ami fontos a távolság-alapú algoritmusoknál. A RandomForestClassifier a célváltozó (churn) alapján tanít. A Pipeline összefűzi ezeket stage-eket. A randomSplit([0.8, 0.2]) 80-20%-os train-test felosztást ad. A BinaryClassificationEvaluator AUC-ROC metrikával értékeli a modellt. Az AUC 0.92 felett kiváló eredmény.

Tippek: A Pipeline mentése (model.save()) és betöltése (PipelineModel.load()) biztosítja a reprodukálhatóságot. A CrossValidator használatával hyperparameter tuning-ot végezhetsz, de figyelj: ez drága, mert minden paraméterkombinációra lefuttatja a tréninget. A feature importance (jellemzőfontosság) mutatja, melyik változó járul hozzá leginkább a predikcióhoz – ez értelmezhetőbbé teszi a modellt. Termelésben használj MLflow-t a kísérletek és modellek nyomon követésére.

[10]
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Feature assembly
assembler = VectorAssembler(
    inputCols=["age", "income", "tenure", "usage"],
    outputCol="features_raw"
)
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
classifier = RandomForestClassifier(labelCol="churn", numTrees=100)

pipeline = Pipeline(stages=[assembler, scaler, classifier])

# Train/test split
train, test = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)
predictions = model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="churn")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")
Output:
[spark] Pipeline stages: 3 (VectorAssembler → StandardScaler → RandomForest)
[spark] Training: 800 rows | Test: 200 rows
[spark] AUC: 0.9234
[spark] Feature Importance:
  income: 0.342
  usage:  0.289
  tenure: 0.213
  age:    0.156
[spark] Model saved: /models/churn_rf_v1
Section 13

Catalyst Optimizer Spark

A Catalyst a Spark SQL lekérdezés-optimalizáló motorja, amely automatikusan átalakítja a felhasználó által írt logikai lekérdezést a leghatékonyabb fizikai végrehajtási tervvé. A Catalyst négy fázisban dolgozik: Analysis (feloldja a hivatkozásokat, ellenőrzi a típusokat), Logical Optimization (alkalmazza a szabályalapú optimalizálásokat: predicate pushdown, constant folding, column pruning), Physical Planning (több fizikai tervet generál és kiválasztja a legjobbat, pl. BroadcastHashJoin vs. SortMergeJoin), és Code Generation (Tungsten által JIT-re fordítja a tervet Java bytekódra).

Miért fontos? A Catalyst teszi lehetővé, hogy a Spark DataFrame API-ja versenyezzen a kézzel optimalizált kóddal. Amikor írsz egy df.filter().join().groupBy() láncot, a Catalyst automatikusan átszervezi: a filter-t a join elé tolja (így kevesebb soron kell join-olni), eldobja a felesleges oszlopokat, és kiválasztja a megfelelő join stratégiát. Ehhez nem kell semmit tenned – a Catalyst a DataFrame sémáját és a logikai tervet használja ezekhez az optimalizálásokhoz.

A kódrészlet bemutatása: Az explain(True) parancs megmutatja a teljes optimalizálási folyamatot. A Parsed Logical Plan a nyers lekérdezés, az Analyzed Plan ellenőrzi a típusokat, az Optimized Plan mutatja az átszervezett tervet (filter pushdown, column pruning), a Physical Plan pedig a végső végrehajtási tervet. A példában a filter az age > 30 feltétellel a join elé kerül, a BroadcastHashJoin a kis tábla miatt, és csak a szükséges oszlopok (department, salary) maradnak.

Tippek: Mindig futtasd az explain()-t lassú lekérdezéseknél – a legtöbb probléma azonnal láthatóvá válik. Ha SortMergeJoin-t látsz ott, ahol BroadcastHashJoin jobb lenne, próbáld meg a spark.sql.autoBroadcastJoinThreshold beállítást növelni. Ha sok Exchange lépés van, ott shuffle történik – érdemes megnézni, lehet-e elkerülni partition-on belüli műveletekkel. A WholeStageCodegen jelzi, hogy a Spark egyben fordítja le az egymás utáni operátorokat – ez nagyon jó teljesítményszempontból.

[11]
# Explain plan
df.filter(col("age") > 30) \
  .join(other_df, "id") \
  .groupBy("department") \
  .agg(avg("salary")) \
  .explain(True)
Output:
== Parsed Logical Plan ==
Filter (age > 30) → Join → Aggregate

== Analyzed Logical Plan ==
[Resolved references, types checked]

== Optimized Logical Plan ==
1. Push down Filter before Join
2. Column pruning (only needed cols)
3. Join reorder (small table first)

== Physical Plan ==
*(2) HashAggregate(department, avg(salary))
+- *(2) Project [department, salary]
   +- *(1) BroadcastHashJoin [id]
      +- *(1) Filter (age > 30)
      +- BroadcastExchange
Optimalizálások

✅ Predicate pushdown: filter minél korábban
✅ Column pruning: csak a szükséges oszlopok
✅ Join reorder: kis tábla broadcast
✅ Constant folding: 1+1→2
✅ WholeStageCodegen: JIT kompiláció

Section 14

Delta Lake + Spark Delta

A Delta Lake az open source adattároló réteg, amely ACID tranzakciókat, time travel-t (adat visszaállítás korábbi verzióra) és MERGE/upsert műveleteket ad a Spark-hoz. A Delta Lake a Parquet formátumra épül, de tranzakciós naplót (transaction log, _delta_log/) ad hozzá, amely minden változást nyomon követ. Ez lehetővé teszi a párhuzamos írást és olvasást, a séma fejlődést (schema evolution) és a adatépség garantálását – mindezt a Parquet sebességével.

Miért Delta Lake és nem sima Parquet? A sima Parquet fájloknál a "párhuzamos írók felülírják egymás adatait" probléma gyakori. Nincs tranzakciókezelés, nincs visszaállítás, és a METALÓG sem frissül atomikusan. A Delta Lake ezeket a problémákat megoldja: az ACID garanciák miatt a párhuzamos írók nem ütköznek, a time travel lehetővé teszi az adatok visszaállítását bármely korábbi verzióra, és a MERGE (upsert) művelet hatékonyan kezeli a "meglévő sorok frissítése, újak beszúrása" mintát, ami a legtöbb ETL pipeline alapja.

A kódrészlet bemutatása: A write.format("delta") Delta formátumba menti az adatokat, a partitionBy("date") dátum alapján particionál. A read.format("delta") visszolvassa az adatokat. A time travel az option("versionAsOf", 5) paraméterrel egy korábbi verziót olvas vissza. A DeltaTable.forPath() és merge() kombináció egy SQL MERGE-nek felel meg: ha a source és target ID megegyezik, frissíti a sort (whenMatchedUpdateAll), ha nincs egyezés, beszúrja (whenNotMatchedInsertAll). A history() megmutatja a tranzakciós naplót.

Tippek: Mindig használj checkpointLocation-t a streaming írásoknál. A MERGE performance javítható a join feltétel particionálásával. A VACUUM paranccsal törölhetők a régi fájlverziók (alapértelmezetten 7 napnál régebbiek). Az OPTIMIZE paranccsal kicsi fájlokat egyesíthetsz nagyobbakká a lekérdezési teljesítmény javítására. A ZORDER paranccsal az adatokat egy adott oszlop szerint rendezheted a fájlban, ami felgyorsítja a szűrést.

[12]
# Write to Delta
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("date") \
    .save("/delta/users")

# Read from Delta
delta_df = spark.read.format("delta").load("/delta/users")

# Time travel
yesterday = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/delta/users")

# MERGE (upsert)
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/delta/users")
dt.alias("target").merge(
    updates.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

print(f"Delta version: {dt.history().first().version}")
Output:
[delta] Write: 5 rows → /delta/users (partitioned by date)
[delta] Read: 5 rows from /delta/users
[delta] Time travel: version 55 rows
[delta] MERGE executed:
  Rows updated:   15,234
  Rows inserted:  3,421
  Rows unchanged: 31,345
[delta] Delta version: 42
[delta] History:
  v42: MERGE (2 min ago)
  v41: WRITE (1 hour ago)
  v40: DELETE (3 hours ago)
Section 15

Valós ETL pipeline Spark Delta

Ez a szekció egy valós ETL (Extract-Transform-Load) pipeline-t mutat be, amely a nyers adatoktól az elemzési kész adatokig vezeti végig a teljes folyamatot. A példában három különböző adatforrásból (JSON, Parquet, CSV) olvasunk adatokat, transzformáljuk őket (join, groupBy, aggregation), majd az eredményt Delta Lake formátumban mentjük. Ez a minta a legtöbb vállalati data pipeline alapja: bronze (nyers) → silver (tisztított) → gold (elemzési kész) rétegek.

Miért három formátum? A valós életben az adatok különböző forrásokból és formátumokból érkeznek. A felhasználói adatok gyakran JSON-ben jönnek (API-k, event-ek), a tranzakciós adatok Parquet-ben (optimalizált oszlopos tárolás), a kattintási adatok pedig CSV-ben (naplók, exportok). A Spark spark.read API-ja egységes interfészt biztosít mindezekhez, és mindegyiket DataFrame-ként kezelhetővé teszi – ez a Spark egyik legnagyobb erőssége.

A kódrészlet bemutatása: Az első lépésben a spark.read.json(), spark.read.parquet() és spark.read.csv() parancsokkal beolvassuk a forrásadatokat S3-ról. A transzformációs lépésben a felhasználókat join-oljuk a rendelésekkel user_id alapján, majd groupBy-val és aggregációkkal (count, sum, avg, max) jellemzőket generálunk minden felhasználóhoz: rendelések száma, össz költés, átlagos rendelés, utolsó rendelés dátuma. A végső lépésben az eredményt Delta formátumban mentjük, ami ACID tranzakciókat, time travel-t és MERGE műveleteket biztosít.

Tippek: Valós pipeline-okban mindig használj schema validációt (expectedSchema-t), ne hagyatkozz a séma-kikövetkeztetésre (lassabb és hibára hajlamos). Használj repartition()-t vagy coalesce()-t a kimenet optimalizálására írás előtt. A Delta Lake partitioning-ot (pl. partitionBy("date")) használj a gyakori szűrőoszlopokra. Monitorozd a pipeline-t a Spark UI-n, és állíts be alert-et a sikertelen jobokra. A termelési pipeline-okban mindig legyen checkpoint és error handling.

[13]
# Full ETL pipeline

# 1. Read multiple sources
users = spark.read.json("s3://raw/users/")
orders = spark.read.parquet("s3://raw/orders/")
clicks = spark.read.csv("s3://raw/clicks/", header=True)

# 2. Transform
user_features = users \
    .join(orders, "user_id") \
    .groupBy("user_id") \
    .agg(
        count("order_id").alias("order_count"),
        sum("amount").alias("total_spend"),
        avg("amount").alias("avg_order"),
        max("order_date").alias("last_order")
    )

# 3. Write to Delta
user_features.write.format("delta") \
    .mode("overwrite") \
    .save("s3://gold/user_features")

print(f"Features: {user_features.count()} users")
Output:
[spark] Reading sources:
  users:  500,000 rows  (JSON, 2.1s)
  orders: 12,345,678 rows (Parquet, 8.4s)
  clicks: 45,678,901 rows (CSV, 22.1s)
[spark] Transform: join + aggregate...
[spark] Features generated: 500,000 users
[spark] Columns: user_id, order_count, total_spend, avg_order, last_order
[delta] Write to s3://gold/user_features (Delta, 42 partitions)
[spark] Pipeline total: 3m 42s
Section 16

Összefoglalás Spark

Megtanultad:

✅ Spark Session, architektúra (Driver/Executor) ✅ RDD: transformations (lazy) vs actions ✅ DataFrame: high-level API, optimalizált ✅ Spark SQL: temp view, SQL queries ✅ Join és Window Functions ✅ Performance: repartition, cache, broadcast ✅ Structured Streaming: Kafka → Delta ✅ MLlib: Pipeline, Feature, Model ✅ Catalyst Optimizer: plan, optimization ✅ Delta Lake: ACID, time travel, merge ✅ Valós ETL pipeline példa

Szójegyzék

RDD · Shuffle · Catalyst

Quiz: Mi a DataFrame fő előnye az RDD-vel szemben?

Quiz: Melyik partitioning stratégia a legjobb range query-khez?