Apache Spark: Big Data feldolgozás
Spark
Az Apache Spark a big data feldolgozás iparági standardja, amely elosztott, in-memory feldolgozással képes több gigabájttól petabájtos nagyságrendű adatokkal dolgozni. A Hadoop MapReduce-hoz képest akár 100-szor gyorsabb memóriában és 10-szor gyorsabb lemezen, köszönhetően a DAG (Directed Acyclic Graph) alapú végrehajtási modellnek, amely elkerüli a felesleges lemezírásokat a lépések között.
Miért fontos? A modern data engineering világában a Spark az alapvető eszköz: a vállalati adatfeldolgozás, ETL pipeline-ek, streaming feldolgozás és gépi tanulás mögött is gyakran ez áll. Egyetlen egységes motor biztosítja a batch és streaming feldolgozást, az SQL lekérdezéseket és a gépi tanulást – nem kell külön rendszereket integrálni. A Spark futtatható YARN-on, Kubernetes-en, saját standalone cluster-ön vagy akár lokálisan is fejlesztés céljából.
A kurzus felépítése: Először a Spark architektúráját és alapfogalmait ismerjük meg (Driver, Executor, RDD, DataFrame). Ezután átmegyünk a gyakorlati DataFrame operációkon, Spark SQL-en, join és window függvényeken. A haladó rész a teljesítményoptimalizálást, a Structured Streaminget (Kafka → Delta), az MLlib gépi tanulási pipeline-t, a Catalyst optimalizálót és a Delta Lake integrációt fedi le. Végül egy teljes, valós ETL pipeline-t építünk össze.
Tippek a tanuláshoz: Minden kódrészlet futtatható – használd a "Run" gombot azonnali eredményekért. A Spark UI (localhost:4040) megértése kulcsfontosságú: ott látod a DAG-ot, a stage-eket, a shuffle-t és a memóriahasználatot. Ha valamit nem értesz, nézd meg a Section 01 fogalomtáblázatát, ott minden kulcsfogalom röviden definiálva van.
Spark Session → RDD → DataFrame → SQL → Streaming → MLlib → Optimizer → Delta Lake
Spark alapfogalmak
| Fogalom | Leírás | Fontos |
|---|---|---|
| Spark Session | A Spark belépési pontja | Egyetlen objektum, ami mindent kezel |
| RDD | Resilient Distributed Dataset | Alacsony szintű API, immutable, partitioned |
| DataFrame | Táblázatos adatstruktúra | High-level API, optimalizált |
| Partition | Adatpartíció | Elosztott feldolgozás egysége |
| Executor | Worker node folyamat | Ahol a tényleges számítás fut |
| Driver | Fő vezérlő folyamat | Taskok kiosztása, scheduler |
| Transform | Lazy operation | filter, map, groupBy – nem fut azonnal |
| Action | Triggers execution | collect, count, show – ez indítja el |
| Shuffle | Adat mozgatás node-ok között | Drága művelet, minimalizálandó |
Spark architektúra
Spark
A Spark architektúra három fő komponensből áll: a Driver program, amely a felhasználói kódot futtatja és a végrehajtási tervet (DAG) építi; a Cluster Manager (YARN, Kubernetes vagy Standalone), amely az erőforrásokat allokálja; és az Executor folyamatok, amelyek a munkánókon futtatják a tényleges feladatokat és tárolják a cache-elt adatokat. Ez a master-worker minta teszi lehetővé a horizontális skálázást – több node hozzáadásával lineárisan nő a feldolgozási kapacitás.
Miért fontos ezt érteni? A Spark teljesítményhangolás alapja, hogy tudd, melyik komponens mit csinál. A Driver építi a logikai tervet, a Catalyst optimalizáló alakítja fizikai tervvé, majd a Driver osztja fel task-okra és küldi az Executor-oknak. Ha a Driver memóriája elfogy (OOM), az egész alkalmazás összeomlik. Ha az Executor-ok nem kapnak elég memóriát, spill (lemezírás) történik, ami drasztikusan lelassítja a feldolgozást. A Cluster Manager biztosítja, hogy erőforrás-okozta konfliktusok ne lépjenek fel a munkaterhelések között.
A végrehajtási modell: Amikor egy action-t meghívsz (pl. collect(), count()), a Driver létrehoz egy Job-ot. A Job stage-ekre bomlik a shuffle határoknál. Minden stage task-okból áll, ahol egy task pontosan egy partition-t dolgoz fel egy Executor-on. A Spark UI-n (localhost:4040) mindez vizuálisan nyomon követhető: a DAG vizualizáció mutatja a stage-eket, az Event Timeline a task-ok futását, a Storage tab pedig a cache-elt adatokat.
Gyakorlati tippek: Fejlesztéshez a local[*] mód elegendő (minden CPU mag egy szálnak felel meg). Termelésben a YARN vagy Kubernetes a szabvány. A spark-submit parancs --num-executors, --executor-cores és --executor-memory paramétereivel finomhangolható a cluster. Alapszabály: egy Executor 5 core-t kapjon, és a memória legyen az adatméret 2-3-szorosa. A Driver memóriája akkor kell nagy, ha nagy méretű collect()-et hívsz vagy széles join-okat végzel.
| Komponens | Szerep | Példa |
|---|---|---|
| Driver | Vezérlés, DAG építés | spark-submit --master yarn |
| Cluster Manager | Erőforrás allokáció | YARN, Kubernetes, Standalone |
| Executor | Task végrehajtás, cache | 1-5 per worker node |
| Task | Egyetlen munkaegység | 1 partition = 1 task |
| Stage | Task csoport shuffle előtt | shuffle boundary-k között |
| Job | Egy action által indított | collect(), count() = 1 job |
Környezet beállítása
Spark
A Spark használatának első lépése a Spark Session létrehozása, amely a Spark minden funkcionalitásának belépési pontja. A SparkSession egyesíti a korábbi SparkContext, SQLContext és HiveContext objektumokat egyetlen egységes interfészbe. A builder minta lehetővé teszi az alkalmazásnév, konfigurációs beállítások és bővítmények megadását – a példában a Delta Lake bővítményt is betöltjük, ami ACID tranzakciókat és time travel-t ad a Spark-hoz.
Miért fontos a konfiguráció? A Spark Session-ben megadott beállítások határozzák meg a végrehajtási környezetet: a master URL (local[*] vagy yarn/k8s), a memória allokációt, a shuffle partition-ok számát, a serializer-t és sok mást. A Delta Lake extension betöltése elengedhetetlen, ha Delta formátumú adatokat akarsz olvasni vagy írni. A getOrCreate() biztosítja, hogy ha már létezik aktív session, azt használja új létrehozása helyett.
A kódrészlet bemutatása: A pyspark.sql modulból importáljuk a SparkSession-t, majd a builder láncolásával konfiguráljuk. Az appName() a Spark UI-n lesz látható, ami segíti a debuggolást. A config() hívás a Delta extension-t regisztrálja. A getOrCreate() végül létrehozza vagy visszakapja a session-t. A spark.version és a UI URL kiírása ellenőrzésre szolgál – ha ezeket látod, a Spark megfelelően fut.
Tippek: Fejlesztéshez a local[*] mód (minden elérhető mag használata) elegendő. Termelésben a spark-submit --master yarn vagy --master k8s://... paranccsal indítod. A Spark UI a localhost:4040-en érhető el, és a legfontosabb debug eszközöd – mindig nyisd meg, ha valami nem a várt módon működik. A Delta Lake használatához a pip install delta-spark csomag is szükséges.
# PySpark telepítés és indítás # Terminálban: pip install pyspark delta-spark # Spark Session létrehozása from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("crash-course") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .getOrCreate() print(f"Spark version: {spark.version}") print(spark.sparkContext.uiWebUrl)
[spark] Spark version: 3.5.1 [spark] Spark UI: http://localhost:4040 [spark] Cluster: local[*] (8 cores) [spark] Memory: 4GB allocated [spark] Delta Lake extension loaded
RDD alapok
Spark
Az RDD (Resilient Distributed Dataset) a Spark legalacsonyabb szintű adatstruktúrája, amely három alapvető tulajdonsággal rendelkezik: immutable (megváltoztathatatlan – minden transzformáció új RDD-t hoz létre), partitioned (az adatok partitions-okra vannak osztva és elosztva a cluster node-jai között), és fault-tolerant (hibatűrő – ha egy node összeomlik, a Spark a lineage, azaz a származási lánc alapján újraépíti az elveszett partition-t).
Miért fontos az RDD-t ismerni? Bár a legtöbb esetben DataFrame-et használunk (magasabb szintű, optimalizált API), vannak helyzetek, amikor RDD-re van szükség: nem strukturált adatok feldolgozása, egyedi particionálási logika implementálása, vagy ha a Catalyst optimalizáló nem tudja hatékonyan kezelni a komplex transzformációkat. Az RDD megértése segít abban is, hogy mélyebben átlásd a DataFrame mögötti működést, hiszen a DataFrame végül RDD-ként hajtódik végre az Executor-okon.
A kódrészlet bemutatása: A parallelize() metódus létrehoz egy RDD-t a memóriában lévő adatokból. A filter() és map() transzformációk lazy-ek – nem futnak azonnal, csak feljegyzik a műveletet a DAG-ban. Csak az action-k (collect(), reduce(), count()) indítják el a végrehajtást. A példában először szűrjük a páros számokat, majd négyzetetre emeljük őket, végül összesítjük az eredményt. Figyeld meg, hogy a getNumPartitions() megmutatja, hány partition-ra osztotta a Spark az adatot – ez megegyezik a rendelkezésre álló CPU magok számával local módban.
Tippek: Ne használd az RDD-t, ha DataFrame is megoldja a feladatot – a DataFrame 10-100x gyorsabb lehet a Catalyst optimalizálónak köszönhetően. Ha mégis RDD-t kell használnod, kerüld a collect()-et nagy adatokon (mindent a Driver memóriájába hoz), és használj mapPartitions()-t a map() helyett, ha expensive inicializációra van szükség (pl. adatbázis kapcsolat nyitása partition-onként egyszer).
# RDD létrehozása rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # Transformations (lazy) filtered = rdd.filter(lambda x: x % 2 == 0) # párosak mapped = filtered.map(lambda x: x * x) # négyzet # Actions (triggers execution) print(f"Partitions: {rdd.getNumPartitions()}") print(f"Even squares: {mapped.collect()}") print(f"Sum: {mapped.reduce(lambda a,b: a+b)}") print(f"Count: {mapped.count()}")
[spark] Partitions: 8 [spark] Even squares: [4, 16, 36, 64, 100] [spark] Sum: 220 [spark] Count: 5
A transformations (filter, map, flatMap) nem futnak azonnal – csak felépítik a DAG-ot. Az actions (collect, count, reduce) indítják el a végrehajtást.
RDD haladó: Word Count
Spark
A Word Count (szószámolás) a big data feldolgozás "Hello World"-je – ez a klasszikus példa, amely bemutatja a MapReduce programozási modellt és az RDD-k harness erejét. Bár egyszerűnek tűnik, a háttérben azonos mintákat használ, mint a valós feldolgozási feladatok: adatok felosztása (split), transzformáció (map), aggregáció (reduce) és rendezés (sort). Ez a minta alapja a log-elemzésnek, a szövegbányászatnak és az ETL pipeline-oknak.
A művelet lánc megértése: A textFile() beolvassa a fájlt soronként egy RDD-be. A flatMap() minden sort szavakra bont – ellentétben a map()-pel, amely egy-az-egyhez leképezést ad, a flatMap() "kilapítja" az eredményt, tehát egy sorból több szó lesz külön elem. A map() ezután minden szót egy (szó, 1) párra alakít – ez a "mapper" fázis. A reduceByKey() összeadja az azonos kulcsú értékeket – ez a "reducer" fázis, ami shuffle-t vált ki. Végül a sortBy() csökkenő sorrendbe rendezi az eredményt.
A shuffle fontossága: A reduceByKey() az egyetlen olyan művelet a láncban, amely shuffle-t (adatmozgatás a node-ok között) igényel. Ez azért van, mert az azonos szót tartalmazó partition-ok adatait össze kell gyűjteni egy helyre. A Spark itt egy optimalizációt vélez: mindegyik partition-on lokálisan "pre-aggregál" (combineByKey), mielőtt shuffle-nek küldené az adatokat – ez drasztikusan csökkenti a hálózati forgalmat.
Tippek: A textFile() helyett valós projektekben gyakran wholeTextFiles()-t használd, ha könyvtárban lévő összes fájlt be kell olvasni. A take(n) biztonságosabb, mint a collect(), mert csak az első n elemet hozza a Driver-be. Ha a feldolgozás lassú, ellenőrizd a partition-ok számát – túl sok kis partition esetén coalesce()-szel csökkentsd. HaPontosabb eredmény kell, használj reguláris kifejezést a split()-ben a szóhatárok pontosabb azonosítására.
# Szószámolás (Word Count) – klasszikus példa text_rdd = spark.sparkContext.textFile("data.txt") word_counts = text_rdd \ .flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word.lower(), 1)) \ .reduceByKey(lambda a, b: a + b) \ .sortBy(lambda x: x[1], ascending=False) print(f"Unique words: {word_counts.count()}") print("Top 10:") for word, count in word_counts.take(10): print(f" {word}: {count}")
[spark] Unique words: 2,847 [spark] Top 10: the: 1,245 and: 892 spark: 756 data: 634 rdd: 521 dataframe: 489 partition: 412 shuffle: 367 transform: 334 action: 298
DataFrame alapok
Spark
A DataFrame a Spark legfontosabb és leggyakrabban használt API-ja, amely táblázatos formában tartja az adatokat, hasonlóan egy relációs adatbázis táblához vagy a pandas DataFrame-hez – azzal a lényeges különbséggel, hogy elosztott, tehát több gépen párhuzamosan dolgozik. A DataFrame rendelkezik sémával (schema), azaz minden oszlopnak ismert a típusa, ami lehetővé teszi a Catalyst optimalizáló számára, hogy hatékony végrehajtási tervet készítsen. A Tungsten execution engine emellett bináris formátumot használ az adatok tárolására, ami kevesebb memóriát fogyaszt és gyorsabb feldolgozást eredményez.
Miért jobb az RDD-nél? A DataFrame esetében a Spark "látja" az adatok struktúráját és a lekérdezés logikáját, így automatikusan optimalizál: predicate pushdown (szűrés minél korábban), column pruning (csak a szükséges oszlopok olvasása), join reorder (kis tábla broadcast-olása). Az RDD esetében a Spark csak lambdát lát, és nem tudja optimalizálni a végrehajtást. Ez a különbség 10-100x teljesítménytöbbletet jelenthet valós munkaterheléseknél.
A kódrészlet bemutatása: A createDataFrame() metódus tuple-ök listájából hoz létre DataFrame-et, ahol a második paraméter az oszlopneveket adja meg. A show() kiírja az adatokat táblázatos formában, a printSchema() megmutatja az oszlopok típusait. A count() és a rdd.getNumPartitions() megmutatja, hány sor és hány partition van – kis adatoknál általában 1 partition elég, de nagy adatoknál a Spark automatikusan többet hoz létre (alapértelmezetten 200 a shuffle után).
Tippek: Ha a DataFrame sémája nem egyértelmű, használd a StructType/StructField explicit sémadefiniálást – ez gyorsabb, mint a séma-kikövetkeztetés (schema inference). A createDataFrame() helyett éles projektekben általában spark.read paranccsal olvasunk adatokat Parquet, JSON, CSV, Delta vagy adatbázis formátumból. Mindig ellenőrizd a sémát a printSchema()-val, mielőtt tovább dolgozol az adatokkal.
# DataFrame létrehozása df = spark.createDataFrame([ (1, "Alice", 25, 50000), (2, "Bob", 35, 75000), (3, "Charlie", 45, 90000), (4, "Diana", 28, 60000), (5, "Eve", 52, 120000), ], ["id", "name", "age", "salary"]) df.show() df.printSchema() print(f"Rows: {df.count()}, Partitions: {df.rdd.getNumPartitions()}")
[spark] DataFrame created with 5 rows +---+-------+---+------+ | id| name|age|salary| +---+-------+---+------+ | 1| Alice| 25| 50000| | 2| Bob| 35| 75000| | 3|Charlie| 45| 90000| | 4| Diana| 28| 60000| | 5| Eve| 52|120000| +---+-------+---+------+ root |-- id: integer |-- name: string |-- age: integer |-- salary: integer [spark] Rows: 5, Partitions: 1
DataFrame operációk: filter, agg, join
Spark
A DataFrame operációk a Spark napi munkájának 90%-át teszik ki: szűrés (filter), oszlopkiválasztás (select), új oszlop hozzáadása (withColumn), aggregáció (agg) és feltételes logika (when/otherwise). Ezek a műveletek a DataFrame API építőkövei, amelyekkel komplex adattranszformációkat építhetsz fel deklaratívan, SQL-szerű szintaxissal. Minden művelet lazy – csak az action hívásakor (pl. show()) fut le a tényleges végrehajtás.
Miért érdemes a DataFrame API-t használni SQL helyett? Bár a Spark SQL ugyanazt az eredményt adja, a DataFrame API összetett transzformációk esetén jobban kezelhető: dinamikus oszlopnevek, változókon alapuló szűrők, és programozott pipeline építés. Az API emellett típusbiztosabb (IDE támogatás, compile-time ellenőrzés), és könnyebben parametrizálható. A két megközelítés keverhető is: a DataFrame API és a Spark SQL ugyanazt a Catalyst optimalizálót használja, így a teljesítmény azonos.
A kódrészlet bemutatása: A filter() és select() kombinációval szűrünk és oszlopokat választunk ki – a col() függvény oszlophivatkozást hoz létre. A withColumn() új oszlopot ad hozzá (bonus = salary * 0.1). Az agg() több aggregációt végez egyszerre: avg, sum, count. A when().otherwise() a SQL CASE-WHEN megfelelője, amivel feltételes oszlopokat hozhatunk létre. A show() mindig az utolsó lépés, ami a végrehajtást triggereli.
Tippek: Használd a col() függvényt oszlophivatkozásokhoz a string-alapú hivatkozások ("age") helyett, ahol lehet – a típusbiztosabb és jobban olvasható. Az agg()-ben alias() használatával nevezd el az eredményoszlopokat érthetően. Ha sok withColumn()-t láncolsz, fontold meg a select()-et expr()-rel, ami tömörebb. A when() láncolható: when(..., "A").when(..., "B").otherwise("C") formában.
from pyspark.sql.functions import col, avg, sum, count, when # Filter + Select young = df.filter(col("age") < 40).select("name", "salary") # With new column df2 = df.withColumn("bonus", col("salary") * 0.1) # Aggregation stats = df.agg( avg("salary").alias("avg_salary"), sum("salary").alias("total"), count("*").alias("count") ) # Conditional df3 = df.withColumn("bracket", when(col("salary") > 80000, "high").otherwise("normal")) stats.show() df3.show()
[spark] Aggregation: +----------+------+-----+ |avg_salary| total|count| +----------+------+-----+ | 79000.0|395000| 5| +----------+------+-----+ [spark] With bracket: +---+-------+---+------+-------+ | id| name|age|salary|bracket| +---+-------+---+------+-------+ | 1| Alice| 25| 50000| normal| | 2| Bob| 35| 75000| normal| | 3|Charlie| 45| 90000| high| | 4| Diana| 28| 60000| normal| | 5| Eve| 52|120000| high| +---+-------+---+------+-------+
Spark SQL
Spark
A Spark SQL lehetővé teszi, hogy standard SQL lekérdezéseket futtass a DataFrame-eken, mintha relációs adatbázis táblák lennének. A createOrReplaceTempView() metódus ideiglenes nézetet hoz létre, amely a jelenlegi Spark Session életéig él, és SQL-ből hivatkozható. A Spark SQL teljes ANSI SQL-92 kompatibilitást nyújt: SELECT, WHERE, GROUP BY, HAVING, JOIN, UNION, subquery, CASE-WHEN és sok window function mind támogatott.
Miért hasznos az SQL támogatás? Sok data engineer és elemző már ismeri az SQL-t, és gyorsabban tud dolgozni SQL-ben, mint DataFrame API-ban. A Spark SQL emellett integrálható BI eszközökkel (Tableau, Power BI, Superset) JDBC/ODBC kapcsolaton keresztül. Az SQL és a DataFrame API felcserélhetők – ugyanaz a Catalyst optimalizáló dolgozik mindkettő mögött, így a teljesítmény azonos. A választás az igényeden múlik: dinamikus pipeline-hoz a DataFrame API jobb, ad hoc elemzéshez és riportokhoz az SQL természetesebb.
A kódrészlet bemutatása: Először a createOrReplaceTempView("employees") létrehozza az ideiglenes nézetet. A spark.sql() fogadja a SQL string-et, ahol a CASE-WHEN korcsoportokat hoz létre (young/mid/senior), a GROUP BY 1 az első oszlopra csoportosít, az aggregációk (COUNT, AVG, MIN, MAX) statisztikákat számolnak, az ORDER BY DESC csökkenő sorrendbe teszi az eredményt. A result DataFrame-et show()-val jelenítjük meg.
Tippek: A temp view csak a session-en belül látható – ha globális nézet kell (más session-ek számára), használd a createGlobalTempView()-t. Komplex lekérdezéseknél a CTE (WITH clause) javítja az olvashatóságot. Kerüld a SELECT *-ot nagy táblákon (minden oszlop beolvasása felesleges I/O). A spark.sql() paramétezésére használj f-string-et vagy format()-ot, de SQL injection ellen mindig validáld a bemenetet.
# Temp view létrehozása df.createOrReplaceTempView("employees") # SQL lekérdezés result = spark.sql(""" SELECT CASE WHEN age < 30 THEN 'young' WHEN age < 50 THEN 'mid' ELSE 'senior' END as age_group, COUNT(*) as cnt, AVG(salary) as avg_salary, MIN(salary) as min_salary, MAX(salary) as max_salary FROM employees GROUP BY 1 ORDER BY avg_salary DESC """) result.show()
[spark] SQL query executed ✓ +---------+---+----------+----------+----------+ |age_group|cnt|avg_salary|min_salary|max_salary| +---------+---+----------+----------+----------+ | senior| 1| 120000.0| 120000| 120000| | mid| 2| 82500.0| 75000| 90000| | young| 2| 55000.0| 50000| 60000| +---------+---+----------+----------+----------+
Join és Window Functions
Spark
A Join és a Window Functions a Spark két legerősebb elemzőeszköze. A join összekapcsol két DataFrame-et egy megadott kulcs alapján (inner, left, right, full, cross), hasonlóan a relációs adatbázisokhoz. A window functions lehetővé teszik olyan számításokat, amelyek a sorok kontextusában maradnak – azaz nem összevonják a sorokat, mint a groupBy, hanem minden sorhoz hozzáadják az ablakfüggvény eredményét (rank, running sum, moving average, row number).
Miért fontosak a window functions? Gyakori feladat, hogy minden sorhoz szükség van egy "kontextusra": mi az adott sor rangja a csoportján belül (rank), mennyi az eddigi összeg (running total), mi a mozgóátlag (moving average). Ezeket groupBy-val nem lehet megoldani, mert a groupBy összevonja a sorokat. A window functions a partitionBy (csoportosítás) és orderBy (rendezés) definícióval dolgoznak, és az ablak keret (rowsBetween/rangeBetween) határozza meg, mely sorok vesznek részt a számításban.
A kódrészlet bemutatása: Az első részben egy inner join kapcsolja össze a felhasználókat a rendelésekkel. A második részben a Window.partitionBy("user_id").orderBy(col("amount").desc()) definíál egy ablakot, ahol a rank() megadja a rendelés rangját felhasználónként csökkenő összeg szerint. A harmadik részben a rowsBetween(unboundedPreceding, unboundedFollowing) egy "teljes ablakot" határoz meg, ahol a spark_sum az adott felhasználó összes rendelésének összegét adja minden sornál.
Tippek: A join-nál mindig add meg a join típust ("inner", "left", "right"), mert az alapértelmezés nem egyértelmű. Nagy táblák join-nál használj broadcast()-ot, ha az egyik tábla kicsi (< 10MB alapértelmezetten). A window functions után következő shuffle miatt ezek drága műveletek lehetnek – megfelelő partitioninggal csökkentheted a költséget. A rowsBetween paraméterekkel pontosan vezérelheted, mely sorok számítanak bele az ablakba.
# Join példa orders = spark.createDataFrame([ (1, 1, "laptop", 1200), (2, 2, "phone", 800), (3, 1, "tablet", 400) ], ["order_id", "user_id", "product", "amount"]) joined = df.join(orders, df.id == orders.user_id, "inner") joined.select("name", "product", "amount").show() # Window function from pyspark.sql.window import Window from pyspark.sql.functions import rank, sum as spark_sum w = Window.partitionBy("user_id").orderBy(col("amount").desc()) ranked = orders.withColumn("rank", rank().over(w)) w2 = Window.partitionBy("user_id").rowsBetween( Window.unboundedPreceding, Window.unboundedFollowing) running = orders.withColumn("total", spark_sum("amount").over(w2)) ranked.show()
[spark] Inner join: +-----+-------+------+ | name|product|amount| +-----+-------+------+ |Alice| laptop| 1200| |Alice| tablet| 400| | Bob| phone| 800| +-----+-------+------+ [spark] Window rank by user: +--------+-------+------+----+ |order_id|product|amount|rank| +--------+-------+------+----+ | 1| laptop| 1200| 1| | 3| tablet| 400| 2| | 2| phone| 800| 1| +--------+-------+------+----+
Performance optimalizálás
Spark
A Spark teljesítményoptimalizálás három pillérre épül: partitioning (az adatok megfelelő felosztása a clusterön), caching (gyakran használt adatok memóriában tartása), és a shuffle minimalizálás (az adatmozgatás csökkentése a node-ok között). Ezek közül a shuffle a legdrágább művelet, mert hálózati I/O-t és lemezírást igényel – egyetlen felesleges groupBy vagy join komplett shuffle-t válthat ki, ami percekkel növelheti a futási időt.
A kódrészlet bemutatása: A repartition() hash-alapú újraosztást végez, ami shuffle-t vált ki, de utána a kulcs alapú műveletek (join, groupBy) lokálisak maradnak. A coalesce() ellentétben nem vált ki shuffle-t – csak összevonja a meglévő partition-ket, ami ideális a kimenet csökkentésére (pl. 200 partition-ről 10-re egy fájlba írás előtt). A cache() és persist() memóriában tartja az adatokat, de figyelj: csak akkor érdemes használni, ha ugyanazt a DataFrame-et legalább 2-szer használod. A broadcast() egy kis táblát minden node-ra másol, elkerülve a shuffle-join-t – ez az egyik leghatékonyabb optimalizálás.
Az explain() megértése: Az explain(True) kilistázza a teljes végrehajtási tervet: Parsed Logical Plan → Analyzed → Optimized → Physical Plan. A Physical Plan mutatja meg a valós végrehajtást: milyen join stratégiát használ (BroadcastHashJoin, SortMergeJoin), hol van filter, project, aggregate. Ha a tervben sok Exchange lépést látsz, az shuffle-t jelent – ezeket érdemes minimalizálni.
Gyakorlati tippek: Alapszabály: 128MB adat partition-onként. Túl sok kis partition = sok scheduler overhead. Túl kevés nagy partition = nem használja ki a párhuzamosságot. Használd a spark.sql.shuffle.partitions beállítást (alapértelmezett 200) a munkaterhelésnek megfelelően. A Spark UI "SQL" tabján látod a tényleges végrehajtási tervet minden lekérdezésnél – ezt érdemes vizsgálni, ha lassú a pipeline.
# Repartition df_repartitioned = df.repartition(200, "age") # Coalesce (reduce partitions, no shuffle) df_coalesced = df.coalesce(10) # Cache df.cache() df.persist(StorageLevel.MEMORY_AND_DISK) # Broadcast join (small table) from pyspark.sql.functions import broadcast small_df = spark.createDataFrame([(1, "IT"), (2, "HR")], ["dept_id", "dept"]) joined = df.join(broadcast(small_df), "dept_id") # Explain plan df.filter("age > 30").select("name").explain(True)
[spark] == Physical Plan == *(1) Project [name#1] +- *(1) Filter (age#2 > 30) +- Scan ExistingRDD [spark] Optimizations applied: ✓ Predicate pushdown ✓ Column pruning ✓ BroadcastHashJoin (small table) ✓ WholeStageCodegen
| Művelet | Leírás | Shuffle? | Mikor? |
|---|---|---|---|
| repartition() | Hash partitioning | Igen | 200 part / nagy data |
| coalesce() | Csökkentés | NEM | Kimenet optimalizálás |
| cache() | Memory cache | Nincs | Ismételt lekérdezés |
| broadcast() | Kis tábla minden node | Nincs | Small/large join |
Structured Streaming: Kafka → Delta
Spark
Kafka
A Structured Streaming a Spark deklaratív stream feldolgozó motorja, amely a DataFrame API-t kiterjeszti valós idejű adatfolyamokra. A kulcsötlelet: a stream egy korlátlan DataFrame, azaz ugyanazokat a transzformációkat alkalmazhatod, mint batch módban (filter, join, aggregation). A motor a háttérben növekményesen frissíti az eredményt, ahogy új adatok érkeznek. A példa egy Kafka topicból olvassa az eseményeket, feldolgozza őket, és Delta Lake-be írja – ez az egyik leggyakoribb valós streaming architektúra.
Miért Kafka + Delta? Az Apache Kafka a de facto standard esemény-streaming platform, amely milliónyi eseményt tud másodpercenként fogadni és szállítani. A Delta Lake adatformátum pedig ACID transakciókat biztosít a stream írásakor: ha a pipeline összeomlik, a checkpoint-ból folytatódik, nem lesz duplikált vagy hiányzó adat. Ez a kombináció (Kafka source → Spark processing → Delta sink) az alapja a legtöbb modern valós idejű adatplatformnak.
A kódrészlet bemutatása: A readStream.format("kafka") egy streaming DataFrame-et hoz létre a Kafka topicból. A from_json() parse-olja a JSON üzeneteket a megadott séma alapján (user_id, action, value). A writeStream.format("delta") a Delta Lake-be írja az eredményt. Az outputMode("append") új sorokat ad hozzá (létezőket nem módosít). A checkpointLocation biztosítja, hogy újraindítás után onnan folytatódik a feldolgozás, ahol abbamaradt. A start() elindítja a streamet a háttérben.
Tippek: A checkpoint elengedhetetlen termelésben – nélküle az újraindítás az elejétől kezdi a feldolgozást. A schema mindig explicit legyen, ne hagyd a Spark-ot kitalálni. Használd a trigger beállítást (processingTime) a batch méret vezérlésére. A stream figyelését a query.status és query.lastProgress hívásokkal végezheted. Termelésben a streaming query-t mindig wrap-old try/except-be és állíts be alert-et, ha a query leáll.
from pyspark.sql.functions import from_json, col from pyspark.sql.types import StructType, StringType, IntegerType schema = StructType() \ .add("user_id", IntegerType()) \ .add("action", StringType()) \ .add("value", IntegerType()) stream = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "events") \ .load() parsed = stream.select( from_json(col("value").cast("string"), schema).alias("data") ).select("data.*") query = parsed.writeStream \ .format("delta") \ .outputMode("append") \ .option("checkpointLocation", "/checkpoint") \ .start("/delta/events") print(f"Streaming active: {query.isActive}")
[spark] Structured Streaming started [spark] Source: Kafka [events] [spark] Sink: Delta [/delta/events] [spark] Trigger: processingTime=0ms (continuous) [spark] Batch 1: 1,245 events processed (0.8s) [spark] Batch 2: 1,389 events processed (0.6s) [spark] Batch 3: 1,102 events processed (0.7s) [spark] Throughput: ~1.2k events/sec [spark] Latency: 150ms avg
MLlib: ML Pipeline Spark-on
Spark
Az MLlib a Spark beépített gépi tanulás könyvtára, amely skálázható ML algoritmusokat és pipeline eszközöket biztosít. A Pipeline API a scikit-learnéhoz hasonló: a gépi tanulási folyamatot lépésekre (stage-ekre) bontja – feature engineering (VectorAssembler, StandardScaler), modell illesztés (RandomForest, LogisticRegression), és értékelés (evaluator) –, és ezeket láncolhatóvá teszi. A teljes pipeline elmenthető és betölthető, ami biztosítja a reprodukálhatóságot.
Miért MLlib és nem scikit-learn? A scikit-learn egy gépen fut, és nem skálázódik 10+ millió sor fölé. Az MLlib ugyanazt a Spark architektúrát használja, mint a batch feldolgozás: az adatok elosztottak maradnak, a tréning párhuzamosan fut minden Executor-on. Ha az adatod már Spark-ban van (DataFram-ként), nincs szükség adatmozgatásra – a tréning ott történik, ahol az adat van. A Pipeline emellett termelésbe könnyebben vihető: mentés/betöltés, batch scoring, és streaming predikció is támogatott.
A kódrészlet bemutatása: A VectorAssembler egyesíti a feature oszlopokat (age, income, tenure, usage) egy vektor oszloppá. A StandardScaler normalizálja a feature-ket (z-score), ami fontos a távolság-alapú algoritmusoknál. A RandomForestClassifier a célváltozó (churn) alapján tanít. A Pipeline összefűzi ezeket stage-eket. A randomSplit([0.8, 0.2]) 80-20%-os train-test felosztást ad. A BinaryClassificationEvaluator AUC-ROC metrikával értékeli a modellt. Az AUC 0.92 felett kiváló eredmény.
Tippek: A Pipeline mentése (model.save()) és betöltése (PipelineModel.load()) biztosítja a reprodukálhatóságot. A CrossValidator használatával hyperparameter tuning-ot végezhetsz, de figyelj: ez drága, mert minden paraméterkombinációra lefuttatja a tréninget. A feature importance (jellemzőfontosság) mutatja, melyik változó járul hozzá leginkább a predikcióhoz – ez értelmezhetőbbé teszi a modellt. Termelésben használj MLflow-t a kísérletek és modellek nyomon követésére.
from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StandardScaler from pyspark.ml.classification import RandomForestClassifier from pyspark.ml.evaluation import BinaryClassificationEvaluator # Feature assembly assembler = VectorAssembler( inputCols=["age", "income", "tenure", "usage"], outputCol="features_raw" ) scaler = StandardScaler(inputCol="features_raw", outputCol="features") classifier = RandomForestClassifier(labelCol="churn", numTrees=100) pipeline = Pipeline(stages=[assembler, scaler, classifier]) # Train/test split train, test = df.randomSplit([0.8, 0.2], seed=42) model = pipeline.fit(train) predictions = model.transform(test) evaluator = BinaryClassificationEvaluator(labelCol="churn") auc = evaluator.evaluate(predictions) print(f"AUC: {auc:.4f}")
[spark] Pipeline stages: 3 (VectorAssembler → StandardScaler → RandomForest) [spark] Training: 800 rows | Test: 200 rows [spark] AUC: 0.9234 [spark] Feature Importance: income: 0.342 usage: 0.289 tenure: 0.213 age: 0.156 [spark] Model saved: /models/churn_rf_v1
Catalyst Optimizer
Spark
A Catalyst a Spark SQL lekérdezés-optimalizáló motorja, amely automatikusan átalakítja a felhasználó által írt logikai lekérdezést a leghatékonyabb fizikai végrehajtási tervvé. A Catalyst négy fázisban dolgozik: Analysis (feloldja a hivatkozásokat, ellenőrzi a típusokat), Logical Optimization (alkalmazza a szabályalapú optimalizálásokat: predicate pushdown, constant folding, column pruning), Physical Planning (több fizikai tervet generál és kiválasztja a legjobbat, pl. BroadcastHashJoin vs. SortMergeJoin), és Code Generation (Tungsten által JIT-re fordítja a tervet Java bytekódra).
Miért fontos? A Catalyst teszi lehetővé, hogy a Spark DataFrame API-ja versenyezzen a kézzel optimalizált kóddal. Amikor írsz egy df.filter().join().groupBy() láncot, a Catalyst automatikusan átszervezi: a filter-t a join elé tolja (így kevesebb soron kell join-olni), eldobja a felesleges oszlopokat, és kiválasztja a megfelelő join stratégiát. Ehhez nem kell semmit tenned – a Catalyst a DataFrame sémáját és a logikai tervet használja ezekhez az optimalizálásokhoz.
A kódrészlet bemutatása: Az explain(True) parancs megmutatja a teljes optimalizálási folyamatot. A Parsed Logical Plan a nyers lekérdezés, az Analyzed Plan ellenőrzi a típusokat, az Optimized Plan mutatja az átszervezett tervet (filter pushdown, column pruning), a Physical Plan pedig a végső végrehajtási tervet. A példában a filter az age > 30 feltétellel a join elé kerül, a BroadcastHashJoin a kis tábla miatt, és csak a szükséges oszlopok (department, salary) maradnak.
Tippek: Mindig futtasd az explain()-t lassú lekérdezéseknél – a legtöbb probléma azonnal láthatóvá válik. Ha SortMergeJoin-t látsz ott, ahol BroadcastHashJoin jobb lenne, próbáld meg a spark.sql.autoBroadcastJoinThreshold beállítást növelni. Ha sok Exchange lépés van, ott shuffle történik – érdemes megnézni, lehet-e elkerülni partition-on belüli műveletekkel. A WholeStageCodegen jelzi, hogy a Spark egyben fordítja le az egymás utáni operátorokat – ez nagyon jó teljesítményszempontból.
# Explain plan df.filter(col("age") > 30) \ .join(other_df, "id") \ .groupBy("department") \ .agg(avg("salary")) \ .explain(True)
== Parsed Logical Plan == Filter (age > 30) → Join → Aggregate == Analyzed Logical Plan == [Resolved references, types checked] == Optimized Logical Plan == 1. Push down Filter before Join 2. Column pruning (only needed cols) 3. Join reorder (small table first) == Physical Plan == *(2) HashAggregate(department, avg(salary)) +- *(2) Project [department, salary] +- *(1) BroadcastHashJoin [id] +- *(1) Filter (age > 30) +- BroadcastExchange
✅ Predicate pushdown: filter minél korábban
✅ Column pruning: csak a szükséges oszlopok
✅ Join reorder: kis tábla broadcast
✅ Constant folding: 1+1→2
✅ WholeStageCodegen: JIT kompiláció
Delta Lake + Spark
Delta
A Delta Lake az open source adattároló réteg, amely ACID tranzakciókat, time travel-t (adat visszaállítás korábbi verzióra) és MERGE/upsert műveleteket ad a Spark-hoz. A Delta Lake a Parquet formátumra épül, de tranzakciós naplót (transaction log, _delta_log/) ad hozzá, amely minden változást nyomon követ. Ez lehetővé teszi a párhuzamos írást és olvasást, a séma fejlődést (schema evolution) és a adatépség garantálását – mindezt a Parquet sebességével.
Miért Delta Lake és nem sima Parquet? A sima Parquet fájloknál a "párhuzamos írók felülírják egymás adatait" probléma gyakori. Nincs tranzakciókezelés, nincs visszaállítás, és a METALÓG sem frissül atomikusan. A Delta Lake ezeket a problémákat megoldja: az ACID garanciák miatt a párhuzamos írók nem ütköznek, a time travel lehetővé teszi az adatok visszaállítását bármely korábbi verzióra, és a MERGE (upsert) művelet hatékonyan kezeli a "meglévő sorok frissítése, újak beszúrása" mintát, ami a legtöbb ETL pipeline alapja.
A kódrészlet bemutatása: A write.format("delta") Delta formátumba menti az adatokat, a partitionBy("date") dátum alapján particionál. A read.format("delta") visszolvassa az adatokat. A time travel az option("versionAsOf", 5) paraméterrel egy korábbi verziót olvas vissza. A DeltaTable.forPath() és merge() kombináció egy SQL MERGE-nek felel meg: ha a source és target ID megegyezik, frissíti a sort (whenMatchedUpdateAll), ha nincs egyezés, beszúrja (whenNotMatchedInsertAll). A history() megmutatja a tranzakciós naplót.
Tippek: Mindig használj checkpointLocation-t a streaming írásoknál. A MERGE performance javítható a join feltétel particionálásával. A VACUUM paranccsal törölhetők a régi fájlverziók (alapértelmezetten 7 napnál régebbiek). Az OPTIMIZE paranccsal kicsi fájlokat egyesíthetsz nagyobbakká a lekérdezési teljesítmény javítására. A ZORDER paranccsal az adatokat egy adott oszlop szerint rendezheted a fájlban, ami felgyorsítja a szűrést.
# Write to Delta df.write.format("delta") \ .mode("overwrite") \ .partitionBy("date") \ .save("/delta/users") # Read from Delta delta_df = spark.read.format("delta").load("/delta/users") # Time travel yesterday = spark.read.format("delta") \ .option("versionAsOf", 5) \ .load("/delta/users") # MERGE (upsert) from delta.tables import DeltaTable dt = DeltaTable.forPath(spark, "/delta/users") dt.alias("target").merge( updates.alias("source"), "target.id = source.id" ).whenMatchedUpdateAll() \ .whenNotMatchedInsertAll() \ .execute() print(f"Delta version: {dt.history().first().version}")
[delta] Write: 5 rows → /delta/users (partitioned by date) [delta] Read: 5 rows from /delta/users [delta] Time travel: version 5 → 5 rows [delta] MERGE executed: Rows updated: 15,234 Rows inserted: 3,421 Rows unchanged: 31,345 [delta] Delta version: 42 [delta] History: v42: MERGE (2 min ago) v41: WRITE (1 hour ago) v40: DELETE (3 hours ago)
Valós ETL pipeline
Spark
Delta
Ez a szekció egy valós ETL (Extract-Transform-Load) pipeline-t mutat be, amely a nyers adatoktól az elemzési kész adatokig vezeti végig a teljes folyamatot. A példában három különböző adatforrásból (JSON, Parquet, CSV) olvasunk adatokat, transzformáljuk őket (join, groupBy, aggregation), majd az eredményt Delta Lake formátumban mentjük. Ez a minta a legtöbb vállalati data pipeline alapja: bronze (nyers) → silver (tisztított) → gold (elemzési kész) rétegek.
Miért három formátum? A valós életben az adatok különböző forrásokból és formátumokból érkeznek. A felhasználói adatok gyakran JSON-ben jönnek (API-k, event-ek), a tranzakciós adatok Parquet-ben (optimalizált oszlopos tárolás), a kattintási adatok pedig CSV-ben (naplók, exportok). A Spark spark.read API-ja egységes interfészt biztosít mindezekhez, és mindegyiket DataFrame-ként kezelhetővé teszi – ez a Spark egyik legnagyobb erőssége.
A kódrészlet bemutatása: Az első lépésben a spark.read.json(), spark.read.parquet() és spark.read.csv() parancsokkal beolvassuk a forrásadatokat S3-ról. A transzformációs lépésben a felhasználókat join-oljuk a rendelésekkel user_id alapján, majd groupBy-val és aggregációkkal (count, sum, avg, max) jellemzőket generálunk minden felhasználóhoz: rendelések száma, össz költés, átlagos rendelés, utolsó rendelés dátuma. A végső lépésben az eredményt Delta formátumban mentjük, ami ACID tranzakciókat, time travel-t és MERGE műveleteket biztosít.
Tippek: Valós pipeline-okban mindig használj schema validációt (expectedSchema-t), ne hagyatkozz a séma-kikövetkeztetésre (lassabb és hibára hajlamos). Használj repartition()-t vagy coalesce()-t a kimenet optimalizálására írás előtt. A Delta Lake partitioning-ot (pl. partitionBy("date")) használj a gyakori szűrőoszlopokra. Monitorozd a pipeline-t a Spark UI-n, és állíts be alert-et a sikertelen jobokra. A termelési pipeline-okban mindig legyen checkpoint és error handling.
# Full ETL pipeline # 1. Read multiple sources users = spark.read.json("s3://raw/users/") orders = spark.read.parquet("s3://raw/orders/") clicks = spark.read.csv("s3://raw/clicks/", header=True) # 2. Transform user_features = users \ .join(orders, "user_id") \ .groupBy("user_id") \ .agg( count("order_id").alias("order_count"), sum("amount").alias("total_spend"), avg("amount").alias("avg_order"), max("order_date").alias("last_order") ) # 3. Write to Delta user_features.write.format("delta") \ .mode("overwrite") \ .save("s3://gold/user_features") print(f"Features: {user_features.count()} users")
[spark] Reading sources: users: 500,000 rows (JSON, 2.1s) orders: 12,345,678 rows (Parquet, 8.4s) clicks: 45,678,901 rows (CSV, 22.1s) [spark] Transform: join + aggregate... [spark] Features generated: 500,000 users [spark] Columns: user_id, order_count, total_spend, avg_order, last_order [delta] Write to s3://gold/user_features (Delta, 42 partitions) [spark] Pipeline total: 3m 42s
Összefoglalás
Spark
✅ Spark Session, architektúra (Driver/Executor) ✅ RDD: transformations (lazy) vs actions ✅ DataFrame: high-level API, optimalizált ✅ Spark SQL: temp view, SQL queries ✅ Join és Window Functions ✅ Performance: repartition, cache, broadcast ✅ Structured Streaming: Kafka → Delta ✅ MLlib: Pipeline, Feature, Model ✅ Catalyst Optimizer: plan, optimization ✅ Delta Lake: ACID, time travel, merge ✅ Valós ETL pipeline példa
RDD · Shuffle · Catalyst