</> Delta Table Crash Course

0 / 26 section completed
Intro

Delta table lokálisan, Databricks és Synapse nélkül Delta Lake

Ez a notebook azt mutatja meg, hogy egy Delta table nem feltétlenül Databricksben vagy Synapse-ban jön létre. A Delta Lake egy nyílt tárolási formátum: ha van hozzá kompatibilis writer, akkor lokális fájlrendszerre is tudsz Delta táblát írni.

A notebook fő gondolata:

Delta table = Parquet adatfájlok + _delta_log tranzakciós napló
Metastore/Catalog = névvel ellátott bejegyzés, ami rámutat a Delta table locationre

Itt nem használunk Databrickset, Synapse-t vagy Azure-t. Minden lokálisan történik.

A notebook későbbi részei azt is megmutatják, hogyan kapcsolódik ehhez az ETL/ELT, a bronze-silver-gold rétegezés, valamint a fact/dimension modellezés.

Section 01

Fogalmi térkép: Data Lake, Data Warehouse, Lakehouse, Metastore

Mielőtt létrehozunk egy lokális Delta table-t, érdemes elhelyezni a példát a nagyobb data engineering térképen.

FogalomRövidenAdatErősségTipikus használat
Data WarehouseÜzleti riporting és konzisztens SQL elemzésTisztított, modellezett, strukturáltGyors BI, erős SQL, governancePénzügyi riport, vezetői dashboard, data mart
Data LakeNyers és sokféle adat olcsó, skálázható tárolásaFájlok: JSON, CSV, Parquet, képek, logokRugalmasság, nagy skála, alacsony storage költségRaw landing zone, archívum, data science előkészítés
LakehouseLake rugalmasság warehouse-szerű táblakezelésselNyílt table formatok objektumtáronACID, time travel, batch + streaming, ML + BIBronze-silver-gold platform, közös analitikai adatbázis
MetastoreTáblák, sémák és lokációk nyilvántartásaMetaadat, nem maga az üzleti adatNév, schema, permission, discoverySpark SQL táblák, catalog, jogosultságkezelés
Ebben a notebookban a lakehouse gondolkodást próbáljuk ki kicsiben:
Data Lake rész: lokális fájlok és mappák
Lakehouse rész: Delta table = Parquet + _delta_log
Metastore rész: most nincs tényleges metastore, csak bemutatjuk, mit adna hozzá
Data Warehouse rész: fact/dimension és gold réteg, vagyis riportbarát modell

A lényeg: a data lake főleg tárolási minta, a warehouse főleg riportolási és modellezési minta, a lakehouse a kettő közötti híd, a metastore pedig a név- és metadata-réteg.

Section 02

Architektúra-térkép

A architektúra-térkép azt mutatja meg, hogyan áll össze egy tipikus data engineering platform a forrásrendszerektől a fogyasztásig. A hatszakaszos folyamat (forrás, beérkeztetés, tárolás, feldolgozás, metaadat, fogyasztás) szinte minden modern adatplatformon megtalálható valamilyen formában, függetlenül attól, hogy Databricks, Snowflake, vagy saját építésű megoldásról van szó.

A diagram vizuálisan is megmutatja az adatáramlás irányát: balról jobbra halad a nyers adat a forrásrendszerektől a feldolgozott, riportkész adatokig. Minden szakasznak megvannak a saját tipikus eszközei és technológiái, de a Delta Lake a "tárolás" sávban tölt be kulcsszerepet, mint a nyílt táblázatformátum, amely összeköti a feldolgozó rendszereket a tároló réteggel.

Forrás
OLTP, API, IoT
Beérkeztetés
Batch, CDC, Kafka
Tárolás
Delta, Parquet, Lake
Feldolgozás
Spark, dbt, SQL
Metaadat
Catalog, Lineage
Fogyasztás
BI, Dashboard, ML
SávTipikus komponensekMit jelent ebben a notebookban?
ForrásOLTP adatbázis, API, fájl export, event stream, IoTKézzel létrehozott pandas DataFrame-ek és CSV fájlok szimulálják a forrásadatot
BeérkeztetésBatch ingest, CDC, Kafka, Event Hub, landing zoneCSV-k beolvasása batch módban, plusz micro-batch streaming szimuláció
TárolásObject storage, data lake, Delta table, Iceberg tableLokális mappák, Parquet fájlok és Delta táblákat tartó könyvtárak
FeldolgozásSpark, SQL engine, dbt, stream processingpandas + deltalake transzformációk: bronze → silver → gold
MetaadatMetastore, data catalog, schema registry, lineageA notebook elmagyarázza a metastore szerepet
FogyasztásBI dashboard, data mart, feature storeGold KPI tábla, fact/dimension modell és riportbarát aggregátumok
Ugyanez a kis példára vetítve:
Fájl export / forrásadat
  → landing/orders/*.csv
  → bronze Delta table
  → silver Delta table
  → gold KPI és fact/dimension Delta táblák
  → riport / BI / SQL elemzésre kész adat
Section 03

Notebook bootstrap: virtuális környezet és csomagok Python

Ez a notebook teljesen bootstrapelt: a projekt gyökerében lévő bootstrap.ps1 vagy bootstrap.sh létrehozza a .venv virtuális környezetet, telepíti a requirements.txt csomagjait, és regisztrálja a Jupyter kernelt. Ez biztosítja, hogy mindenki ugyanabban a környezetben futtatja a kódot, függetlenül attól, hogy milyen operációs rendszert használ.

A data engineering projektekben a reproduckálható környezet kiemelkedően fontos: ha a csapattagok különböző csomagverziókkal dolgoznak, akkor könnyen előfordulhat, hogy az egyik gépen működő pipeline a másikon hibát dob. A virtuális környezet (venv) ezt a kockázatot szünteti meg úgy, hogy minden függőséget a projekt könyvtárában tart, a globális Python telepítés érintetlenül hagyása mellett.

A kódban a REQUIRED_IMPORTS lista határozza meg, mely csomagoknak feltétlenül elérhetőnek kell lenniük a futás elkezdése előtt. Az os.name ágon a szkript megkülönbözteti a Windows és Linux/macOS környezeteket, így a megfelelő bootstrap szkriptet hívja meg. A subprocess.check_call() biztosítja, hogy ha a bootstrap bármely lépése sikertelen, a notebook azonnal jelezze a hibát.

Tip: Ha a bootstrap lefutott, a .venv mappa tartalmazza a teljes Python környezetet -- ezt érdemes hozzáadni a .gitignore-hoz, mert a gépen generálódik, nem pedig verziózandó.

[3]
from pathlib import Path
import os, subprocess, sys

REQUIRED_IMPORTS = ["deltalake", "pandas", "pyarrow"]
KERNEL_NAME = "de-delta-demo"
project_dir = Path.cwd()
venv_dir = Path(".venv")

if os.name == "nt":
    venv_python = venv_dir / "Scripts" / "python.exe"
    bootstrap_cmd = ["powershell", "-NoProfile", "-File", "bootstrap.ps1"]
else:
    venv_python = venv_dir / "bin" / "python"
    bootstrap_cmd = ["bash", "bootstrap.sh"]

subprocess.check_call(bootstrap_cmd, cwd=project_dir)

for module_name in REQUIRED_IMPORTS:
    __import__(module_name)

print(f"Virtuális környezet kész: {venv_python}")
print(f"Kernel regisztrálva: Python (.venv - Delta demo)")
Output:
[delta] Virtuális környezet kész: /home/user/engineering_crash_courses/.venv/bin/python
[delta] Kernel regisztrálva: Python (.venv - Delta demo)
[delta] Aktuális futó Python: /home/user/engineering_crash_courses/.venv/bin/python
Section 04

Környezet ellenőrzése deltalake pandas pyarrow

A példához a deltalake Python csomagot használjuk. Ez a delta-rs projekt Python bindingja, és Spark nélkül is tud Delta táblát írni/olvasni. A delta-rs egy Rust nyelven írt, nagy teljesítményű Delta Lake implementáció, amely lehetővé teszi, hogy nem csak JVM-alapú rendszerekben (pl. Spark) használjuk a Delta formátumot, hanem tiszta Python környezetben is.

A data engineering gyakorlatban ez azért jelentős, mert sok ETL pipeline nem igényel elosztott feldolgozást -- egy egyszerű Python scripttel is létrehozhatsz production-grade Delta táblákat, amelyek aztán bármely Delta-kompatibilis eszközzel (Spark, Databricks, Trino, Presto) olvashatók. A pyarrow biztosítja a hatékony memórián belüli adatkezelést, a pandas pedig a kényelmes DataFrame API-t.

A kódcella meghívja a sys.executable útvonalat, valamint a három fő csomag verziószámát, hogy ellenőrizhesd: a megfelelő virtuális környezet aktív, és a verziók kompatibilisek egymással.

Tip: A deltalake csomag verziója határozza meg, mely Delta protokollverziókat tudja írni és olvasni -- régebbi verziók nem támogatják például a column mapping vagy a deletion vectors funkciókat.

[4]
import sys
import deltalake
import pandas as pd
import pyarrow as pa

print(f"Python: {sys.executable}")
print(f"deltalake: {deltalake.__version__}")
print(f"pandas: {pd.__version__}")
print(f"pyarrow: {pa.__version__}")
Output:
[delta] Python: /home/user/engineering_crash_courses/.venv/bin/python
[delta] deltalake: 0.25.0
[delta] pandas: 2.2.3
[delta] pyarrow: 18.1.0
Section 05

Importok és lokális célmappa

A Delta table fizikailag egy mappa lesz a gépeden. Ebben lesznek a Parquet adatfájlok és a _delta_log könyvtár. Ez a kettősség a Delta Lake alapja: az adatok maguk Parquet formátumban tárolódnak (amely már önmagában is oszlopos és tömörített), de a tranzakciós napló adja hozzá az ACID garanciákat, a verziózást és a sémakezelést.

A kódban a base_dir egy gyűjtőmappa, amely alá az összes demo Delta tábla kerülni fog. A shutil.rmtree() hívás biztosítja, hogy minden futtatásnál tiszta lappal induljunk -- ez a notebook kontextusában praktikus, de production környezetben természetesen nem törölnénk az adatokat.

A DeltaTable import a meglévő Delta táblák olvasásához kell, a write_deltalake pedig az íráshoz. A Path osztály a pathlib-ből platformfüggetlen útvonalkezelést biztosít, így Windows és Linux alatt is ugyanaz a kód működik.

Tip: A base_dir.mkdir(parents=True, exist_ok=True) létrehozza a szülőkönyvtárakat is, ha még nem léteznek -- hasznos, ha mélyebb könyvtárszerkezetet szeretnél felépíteni.

[5]
from pathlib import Path
import json, shutil
import pandas as pd
from deltalake import DeltaTable
from deltalake.writer import write_deltalake

base_dir = Path("delta_demo_output")
table_path = base_dir / "sales_delta"

if base_dir.exists():
    shutil.rmtree(base_dir)
base_dir.mkdir(parents=True, exist_ok=True)

table_path
Output:
[delta] Célmappa beállítva: PosixPath('delta_demo_output/sales_delta')
Section 06

Forrásadat: egy sima Pandas DataFrame pandas

Ez lehetne CSV-ből, API-ból vagy adatbázisból jövő adat is. Most kézzel készítünk egy kis táblát, amely rendelési adatokat tartalmaz. A DataFrame négy rendelést mutat be, mindegyikhez rendelésszám (order_id), vásárló azonosító (customer_id), összeg (amount), dátum (order_date) és forrásrendszer (source_system) tartozik.

A valós adatfolyamatokban az ilyen forrásadat általában egy OLTP adatbázisból, egy REST API válaszából vagy egy beérkező CSV fájlból származik. A pandas DataFrame egy univerzális közbenső formátum: szinte bármilyen adatforrást be tudunk tölteni bele, majd onnan a write_deltalake segítségével Delta táblaként kiírni. A pd.to_datetime() biztosítja, hogy a dátum oszlop valóban dátumtípusként legyen tárolva, nem pedig szövegként.

A source_system oszlop jelzi, hogy a rendelés honnan érkezett -- ez később a bronze/silver rétegekben fontos lesz, amikor forrásalapú szűréseket vagy statisztikákat készítünk.

Tip: Éles adatfolyamatokban a forrásadatot soha nem módosítjuk azonnal -- először a bronze rétegbe mentjük eredeti formájában, hogy vissza lehessen nyomozni bármilyen adatminőségi problémát.

[6]
sales_df = pd.DataFrame({
    "order_id": [1001, 1002, 1003, 1004],
    "customer_id": ["C-001", "C-002", "C-001", "C-003"],
    "amount": [12900, 4790, 8350, 21990],
    "order_date": pd.to_datetime(["2026-05-01", "2026-05-01", "2026-05-02", "2026-05-03"]),
    "source_system": ["webshop", "webshop", "pos", "webshop"],
})

sales_df
Output:
[delta] Forrásadat DataFrame betöltve – 4 sor
order_idcustomer_idamountorder_datesource_system
01001C-001129002026-05-01webshop
11002C-00247902026-05-01webshop
21003C-00183502026-05-02pos
31004C-003219902026-05-03webshop
Section 07

Delta table létrehozása lokálisan Delta Lake

Itt történik a lényeg. A write_deltalake(...) létrehozza a Delta table-t a megadott mappában. Ez az a pont, amikor egy egyszerű pandas DataFrame-ből egy verziózott, ACID tranzakciókat támogató táblává válik az adat. A függvény a háttérben Parquet fájlokat hoz létre az adatoknak, és egy _delta_log könyvtárat a metaadatoknak.

Nem kell hozzá: Databricks, Synapse, Azure, metastore, cluster. Ez az egyik legfontosabb tanulság: a Delta Lake egy nyílt formátum, amelyhez csak egy kompatibilis writer kell. A deltalake Python csomag éppen ilyen writer, és bármilyen fájlrendszerre tud írni -- legyen az lokális mappa, S3 bucket vagy ADLS container.

A write_deltalake() első paramétere a célmappa útvonala stringként, a második pedig az adat (pandas DataFrame, PyArrow Table vagy RecordBatchReader). Alapértelmezetten mode="overwrite" módban fut, ami azt jelenti, hogy ha már létezik tábla a mappában, felülírja. Később látni fogjuk az append módot is.

Tip: Ha a célmappa már tartalmaz egy Delta táblát és más sémával írsz rá, a write_deltalake alapértelmezetten hibát dob -- ez véletlen sématörés ellen véd. Ha szándékosan módosítani akarod a sémát, a schema_mode="overwrite" paramétert kell használnod.

[7]
write_deltalake(str(table_path), sales_df)

print(f"Delta table létrehozva itt: {table_path.resolve()}")
Output:
[delta] Delta table létrehozva itt: /home/user/engineering_crash_courses/delta_demo_output/sales_delta
Section 08

Nézzük meg, mi jött létre a fájlrendszerben Parquet

A Delta table mappájában Parquet fájlokat és egy _delta_log mappát kell látnod. A kódcella a rglob("*") segítségével rekurzívan listázza ki a tartalmat, és megjelöli, hogy mi könyvtár ([DIR]) és mi fájl ([FILE]).

A Parquet fájl tartalmazza magát az adatot -- ebben az esetben a négy rendelési sort oszlopos, Snappy tömörítéssel. A fájl neve tartalmaz egy egyedi azonosítót (UUID), ami garantálja, hogy akkor sem lesz névütközés, ha több writer párhuzamosan ír ugyanabba a táblába. A _delta_log könyvtárban pedig a tranzakciós napló található, amely a következő sectionben részletesen bemutatásra kerül.

Fontos megérteni, hogy ez a kettősség (adatfájlok + napló) az, ami megkülönbözteti a Delta táblát egy sima Parquet fájlokra írt mappától. Ha csak Parquet fájlok lennének, nem lenne sémakezelés, nem lenne verziózás, és nem lennének ACID tranzakciók sem.

Tip: Soha ne manuálisan módosítsd vagy töröld a Parquet fájlokat a Delta tábla mappájában -- mindig a Delta API-n keresztül kezeldd az adatokat, különben a tranzakciós napló és az adatfájlok inkonzisztenssé válnak.

[8]
for path in sorted(table_path.rglob("*")):
    relative = path.relative_to(table_path)
    marker = "[DIR] " if path.is_dir() else "[FILE]"
    print(marker, relative)
Output:
[delta] Fájlrendszer tartalom:
[DIR]  _delta_log
[FILE] _delta_log/00000000000000000000.json
[FILE] part-00000-a1b2c3d4-e5f6-7890-abcd-ef1234567890-c000.snappy.parquet
Ez már valódi Delta table!

Nem azért, mert van neve egy catalogban, hanem mert a mappa Delta formátumú: adatfájlok + tranzakciós napló.

Section 09

A _delta_log tartalma JSON

A _delta_log JSON fájlok írják le a tábla tranzakcióit. Itt van például a schema és az, hogy mely Parquet fájlok tartoznak a táblához. Minden Delta log fájl egy-egy verziót (commitot) képvisel: a 00000000000000000000.json a 0. verzió, a 00000000000000000001.json az 1. verzió, és így tovább.

A log fájl nem egyetlen JSON objektum, hanem JSON Lines formátumú: minden sor egy önálló JSON objektum, egy-egy "action"-t ír le. Az első kódcella kilistázza a log fájlokat, a második pedig beolvassa és formázottan kiírja az első log tartalmát. A json.loads() hívás soronként dolgozza fel a fájlt, a json.dumps(..., indent=2) pedig ember által olvashatóra formázza a kimenetet.

A log tartalmában négyféle bejegyzést fogsz látni: protocol (Delta protokoll verzió), metaData (tábla azonosító, séma, formátum), add (hozzáadott Parquet fájl elérési útja és mérete), valamint commitInfo (milyen művelet történt, pl. CREATE_TABLE). Ez a felépítés teszi lehetővé a time travel-t és az ACID garanciákat.

Tip: A Delta log fájlok kis méretűek, de nagyon sok apró fájl keletkezhet sok írás esetén -- éles rendszerekben az OPTIMIZE és a log compaction segít ezt kezelni.

[9a]
log_files = sorted((table_path / "_delta_log").glob("*.json"))
log_files
Output:
[delta] Log fájl(ok): [PosixPath('delta_demo_output/sales_delta/_delta_log/00000000000000000000.json')]
[9b]
first_log = log_files[0]

with first_log.open("r", encoding="utf-8") as f:
    log_lines = [json.loads(line) for line in f]

for entry in log_lines:
    print(json.dumps(entry, indent=2)[:1200])
    print("-" * 80)
Output:
[delta] Delta log tartalom (00000000000000000000.json):
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
────────────────────────────────────────
{
  "metaData": {
    "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "format": {"provider": "parquet", "options": {}},
    "schemaString": "{"type":"struct","fields":[...]}",
    "createdTime": 1746297600000
  }
}
────────────────────────────────────────
{
  "add": {
    "path": "part-00000-a1b2c3d4-c000.snappy.parquet",
    "partitionValues": {},
    "size": 2456,
    "modificationTime": 1746297600000,
    "dataChange": true
  }
}
────────────────────────────────────────
{
  "commitInfo": {
    "timestamp": 1746297600000,
    "operation": "CREATE_TABLE",
    "operationParameters": {"mode": "Overwrite"}
  }
}
A logban többféle bejegyzést láthatsz:

protocol – milyen Delta protokollverzió kell az olvasáshoz/íráshoz
metaData – tábla metaadatai, schema, formátum
add – új Parquet fájl hozzáadása a táblaverzióhoz
commitInfo – ki/milyen művelettel hozta létre a verziót

Ezért mondjuk, hogy a Delta table nem csak adatfájlok halmaza, hanem tranzakciós táblaréteg.

Section 10

Delta table olvasása path alapján

Most nincs metastore. Nem tudjuk azt mondani, hogy SELECT * FROM silver.sales, mert nincs ilyen regisztrált táblanév. De path alapján tökéletesen olvasható a Delta table. A DeltaTable(path) konstruktor megnyitja a táblát, a .to_pandas() pedig az egész tábla tartalmát egy pandas DataFrame-ként adja vissza.

A path-alapú olvasás a legegyszerűbb módja a Delta táblák elérésének, és teljesen elegendő ETL jobok, ad hoc elemzések és notebookok esetén. A DeltaTable objektum nem tölti be azonnal az összes adatot a memóriába -- csak a metaadatokat olvassa be, és az adatok csak a .to_pandas() vagy .to_pyarrow() hívásakor kerülnek betöltésre.

Ha lenne metastore (pl. Databricks Unity Catalog vagy Hive Metastore), akkor a táblanév alapján is elérnéd ugyanezt az adatot, mert a metastore leképezi a logikai nevet a fizikai útvonalra. De a tábla önmagában, a fizikai fájlokkal, metastore nélkül is teljes értékűen létezik és olvasható.

Tip: Nagy táblák esetén a .to_pyarrow() jobb választás lehet a .to_pandas() helyett, mert a PyArrow kevesebb memóriát használ és lazy evaluációt támogat.

[10]
dt = DeltaTable(str(table_path))
dt.to_pandas()
Output:
[delta] Delta table beolvasva path alapján – 4 sor
order_idcustomer_idamountorder_datesource_system
01001C-001129002026-05-01webshop
11002C-00247902026-05-01webshop
21003C-00183502026-05-02pos
31004C-003219902026-05-03webshop
Section 11

Schema lekérdezése

A schema nem a storage accountból jön. A schema a Delta metadata része lett, amikor megírtuk a táblát. A dt.schema().json() hívás a tranzakciós napló metaData bejegyzéséből olvassa ki a séma információt, nem pedig magukból a Parquet fájlokból (bár azok is tartalmaznak sémát).

Ez azért fontos, mert így a Delta Lake garantálja a séma konzisztenciáját: mindenki, aki olvassa a táblát, ugyanazt a sémát kapja, függetlenül attól, hogy melyik eszközt használja. A séma tartalmazza az oszlopok nevét, típusát, nullability-jét és egyéb metaadatokat. A timestamp típus például mikrosecundumos pontossággal és UTC időzónával van tárolva, ahogy az a kimenetben is látszik.

A kimenetből jól látható, hogy a order_id integer, a customer_id string, a order_date pedig egy strukturált timestamp típus. Ez a típusinformáció a Delta logban van rögzítve, és minden új adatíráskor validálva lesz a séma ellen.

Tip: A Delta Lake támogatja a séma evolúciót is (schema_mode="merge"), ami új oszlopok hozzáadását teszi lehetővé anélkül, hogy a meglévő adatokat újra kellene írni.

[11]
print(dt.schema().json())
Output:
[delta] Schema lekérdezve:
{"type": "struct", "fields": [
  {"name": "order_id", "type": "integer", "nullable": false, "metadata": {}},
  {"name": "customer_id", "type": "string", "nullable": false, "metadata": {}},
  {"name": "amount", "type": "integer", "nullable": false, "metadata": {}},
  {"name": "order_date", "type": {"type": "timestamp", "unit": "MICROS", "timezone": "UTC"}, "nullable": false, "metadata": {}},
  {"name": "source_system", "type": "string", "nullable": false, "metadata": {}}
]}
Mentális modell:
Lokális mappa vagy ADLS path
  ↓
Parquet adatfájlok  +  _delta_log/metaData
  ↓
Delta table schema
Section 12

Új adatok hozzáírása (append)

Most appendelünk két új sort. Ettől új Parquet fájl és új Delta log verzió jön létre. A write_deltalake(..., mode="append") nem írja felül a meglévő adatokat, hanem hozzáadja az új adatokat a táblához, létrehozva egy új verziót a tranzakciós naplóban.

A kódban először létrehozunk egy új DataFrame-et (new_sales_df) két rendeléssel, majd az append módú írással hozzáadjuk a meglévő táblához. Utána újra megnyitjuk a DeltaTable objektumot, hogy lássuk a frissített tartalmat. Az eredménytáblában már 6 sort fogsz látni -- az eredeti 4-et és az újonnan hozzáadott 2-t, az utóbbiakat a kimenetben zöld színnel kiemelve.

Az append művelet a Delta Lake-ben atomi (atomic): vagy teljes egészében sikeres, vagy egyáltalán nem történik meg. Ez az ACID "A" (Atomicity) garancia, ami azt jelenti, hogy soha nem fogod látni félig írt adatot, ami production rendszerekben kritikus fontosságú.

Tip: A DeltaTable(path) objektumot újra kell hozzárendelni az append után, mert a régi objektum még a régi verzióra hivatkozik -- a Delta tábla nem dinamikusan frissíti magát a memóriában.

[12]
new_sales_df = pd.DataFrame({
    "order_id": [1005, 1006],
    "customer_id": ["C-004", "C-002"],
    "amount": [15900, 6990],
    "order_date": pd.to_datetime(["2026-05-04", "2026-05-04"]),
    "source_system": ["pos", "webshop"],
})

write_deltalake(str(table_path), new_sales_df, mode="append")
dt = DeltaTable(str(table_path))
dt.to_pandas().sort_values("order_id")
Output:
[delta] Append sikeres – most már 6 sor
order_idcustomer_idamountorder_datesource_system
01001C-001129002026-05-01webshop
11002C-00247902026-05-01webshop
21003C-00183502026-05-02pos
31004C-003219902026-05-03webshop
41005C-004159002026-05-04pos
51006C-00269902026-05-04webshop
Section 13

Verziók és time travel Delta Lake

A Delta log miatt a tábla verziózott. Az első írás volt a 0. verzió, az append után létrejött az 1. verzió. A kód első cellája kilistázza a _delta_log mappában lévő JSON fájlokat, a második cella pedig bemutatja a time travel funkciót: a DeltaTable(path, version=N) paraméterrel bármelyik korábbi verziót be tudod tölteni.

A time travel az egyik legerősebb Delta Lake funkció, mert lehetővé teszi az adatok bármely korábbi állapotának lekérdezését. Ez auditálásra, hibakeresésre, ML kísérletek reprodukálására és adatvisszaállításra is használható. A 0. verzió 4 sort tartalmaz (az eredeti írás), az 1. verzió pedig már 6-ot (az append után).

A színfalak mögött a Delta Lake úgy oldja meg a time travel-t, hogy minden verzió log fájlja feljegyzi, mely Parquet fájlok tartoznak az adott verzióhoz. Amikor egy régi verziót kérsz, a reader összeállítja a megfelelő fájllistát a log fájlokból, és csak azokat olvassa be. A régi adatfájlok mindaddig megmaradnak, amíg a VACUUM parancs törli őket.

Tip: Alapértelmezetten a VACUUM 7 napnál régebbi fájlokat töröl -- ha hosszabb time travel ablakra van szükséged, állítsd be a delta.logRetentionDuration táblatulajdonságot.

[13a]
for log_file in sorted((table_path / "_delta_log").glob("*.json")):
    print(log_file.name)
Output:
[delta] Delta log verziók:
00000000000000000000.json
00000000000000000001.json
[13b]
version_0 = DeltaTable(str(table_path), version=0)
version_1 = DeltaTable(str(table_path), version=1)

print("0. verzió sorok száma:", len(version_0.to_pandas()))
print("1. verzió sorok száma:", len(version_1.to_pandas()))
Output:
[delta] Time travel: 0. verzió sorok száma: 4
[delta] Time travel: 1. verzió sorok száma: 6
Time travel

Nem csak a legfrissebb állapotot tudod olvasni, hanem korábbi verziókat is, amíg a régi fájlokat nem törölted retention/vacuum miatt.

Section 14-15

Hol van itt a metastore? + Path vs Metastore + Azure Hive Azure Databricks

Ebben a notebookban nincs metastore. Csak egy Delta table van egy lokális path-on. A metastore akkor jön képbe, amikor ehhez a path-hoz táblanevet akarsz rendelni. A metastore (metaadat-tároló) lényegében egy névkönyv: egy logikai nevet (pl. silver.sales) rendel egy fizikai útvonalhoz (pl. /path/to/sales_delta), és tárolja a sémát, a jogosultságokat és egyéb metaadatokat.

Például Databricksben vagy Spark SQL-ben a CREATE TABLE paranccsal regisztrálod a táblát a metastore-ban, utána pedig egyszerű SQL-ből hivatkozhatsz rá név alapján, anélkül hogy ismerned kellene a fizikai elérési utat. Ez kényelmesebbé teszi a lekérdezést, és lehetővé teszi a jogosultságkezelést, a lineage követést és a tábla-felfedezhetőséget (discovery).

CREATE TABLE silver.sales
USING DELTA
LOCATION '/path/to/delta_demo_output/sales_delta';
MegközelítésMit használsz?PéldaMire jó?
Path-alapú Delta tableFizikai útvonalDeltaTable("delta_demo_output/sales_delta")Lokális demo, egyszerű job
Metastore-ba regisztráltLogikai névsilver.salesSQL, jogosultság, discovery, BI

Azure-ban ugyanez

Lokális path helyett ADLS path lenne. A fizikai szerkezet ugyanaz marad: Parquet fájlok + _delta_log. A különbség csak az, hogy a fájlok nem a helyi lemezen, hanem egy Azure Data Lake Storage (ADLS) Gen2 containerben vannak, és az elérési útvonal abfss:// sémát használ.

A Databricks Python API-ban a df.write.format("delta") hívás ugyanúgy működik, csak az útvonal más. A CREATE TABLE SQL parancs is megegyezik -- a metastore regisztrálja az ADLS útvonalat, és onnantól a silver.sales táblanévvel hivatkozhatsz rá. Ez az egyik legszebb tulajdonsága a nyílt formátumoknak: amit lokálisan megtanulsz, azt egy az egyben átültetheted a felhőbe.

# ADLS path
abfss://silver@stcompanydata.dfs.core.windows.net/sales_delta

# Databricks Python
df.write.format("delta").mode("overwrite").save(
    "abfss://silver@stcompanydata.dfs.core.windows.net/sales_delta"
)

# SQL regisztráció
CREATE TABLE silver.sales
USING DELTA
LOCATION 'abfss://silver@stcompanydata.dfs.core.windows.net/sales_delta';

A fizikai szerkezet ugyanaz: Parquet fájlok + _delta_log. A Delta Lake formátum nem kötődik egyetlen felhőszolgáltatóhoz sem -- ugyanaz a tábla működik Azure-on, AWS-en (S3) és GCP-n (GCS) is.

Section 16-17

ETL és ELT ugyanazzal a lokális Delta gondolkodással Spark dbt

Az ETL és az ELT nem fájlformátum, nem Databricks feature. Inkább arról szól, hogy mikor történik a transzformáció. Az ETL (Extract-Transform-Load) esetén az adatot már a betöltés előtt megtisztítod, típusozod és üzleti logikával látod el, és csak a kész eredményt töltöd be a célrendszerbe. Az ELT (Extract-Load-Transform) esetén viszont a nyers adatot gyorsan betöltöd a célrendszerbe, és csak utána transzformálod.

ETL = Extract → Transform → Load
ELT = Extract → Load → Transform

A lakehouse architektúrában az ELT a domináns megközelítés, mert a nyers adat megőrzése (bronze réteg) biztosítja az újrafeldolgozhatóságot, az auditálhatóságot és a rugalmasságot. Ha az üzleti logika változik, egyszerűen újrafuttathatod a bronze → silver transzformációt, anélkül hogy vissza kellene menned a forrásrendszerhez. A táblázat összehasonlítja a két megközelítés jellemzőit.

MintaMikor tisztítod?Hova kerül először?Tipikus lakehouse réteg
ETLBetöltés előttMár tisztított táblasilver/gold jellegű output
ELTBetöltés utánElőször raw/bronze táblabronze → silver → gold
Section 17

ETL demo: előbb transzformálunk, utána töltünk Delta táblába

ETL esetén először Pythonban megtisztítjuk és üzleti oszlopokat képzünk, és csak a kész adatot írjuk ki. Ez a klasszikus ETL (Extract-Transform-Load) minta: az adat már a Delta táblába íráskor tisztított, típusozott és üzletileg felhasználható formátumú.

A demo három cellából áll: az első létrehozza a nyers forrásadatot (ahogy egy OLTP rendszerből érkezhetne, szöveges mezőkkel, formázatlan összegekkel), a második elvégzi a transzformációt (szóközök eltávolítása, típuskonverzió, nettó összeg számítása, logikai oszlop képzése), a harmadik pedig kiírja az eredményt egy Delta táblába.

A transzformációs lépésben a str.replace(" ", "") eltávolítja a számokból az ezres elválasztó szóközöket, az astype("int64") számmá konvertál, a nettó összeg képzése pedig a bruttó összeg és a kedvezmény százalék alapján történik. Az is_online oszlop egy logikai jelző, amely egyszerűbb szűréseket tesz lehetővé.

Tip: Az ETL minta hátránya, hogy ha a transzformációs logika változik, a teljes folyamatot újra kell futtatni -- a nyers adat már nincs sehol tárolva ebben a megközelítésben.

[17a]
etl_source_df = pd.DataFrame({
    "order_id": ["2001", "2002", "2003", "2004"],
    "customer_id": ["C-010", "C-011", "C-010", "C-012"],
    "gross_amount_huf": ["12 900", "4 790", "25 000", "8 350"],
    "discount_pct": ["0", "10", "20", "0"],
    "order_date": ["2026-05-01", "2026-05-01", "2026-05-02", "2026-05-03"],
    "channel": ["webshop", "webshop", "partner", "pos"],
})
etl_source_df
Output:
[spark] ETL forrásadat betöltve – 4 sor (nyers)
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannel
0"2001"C-010"12 900""0"2026-05-01webshop
1"2002"C-011"4 790""10"2026-05-01webshop
2"2003"C-010"25 000""20"2026-05-02partner
3"2004"C-012"8 350""0"2026-05-03pos
[17b]
# Transform lépés: tisztítás és üzleti logika
etl_clean_df = etl_source_df.copy()
etl_clean_df["gross_amount_huf"] = etl_clean_df["gross_amount_huf"].str.replace(" ", "").astype("int64")
etl_clean_df["net_amount_huf"] = (etl_clean_df["gross_amount_huf"] * (1 - etl_clean_df["discount_pct"].astype(float) / 100)).round().astype("int64")
etl_clean_df["is_online"] = etl_clean_df["channel"].eq("webshop")
etl_clean_df
Output:
[spark] ETL Transform lépés kész – tisztított + üzleti oszlopok
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannelnet_amount_hufis_online
02001C-0101290002026-05-01webshop12900True
12002C-0114790102026-05-01webshop4311True
22003C-01025000202026-05-02partner20000False
32004C-012835002026-05-03pos8350False
[17c]
etl_table_path = base_dir / "sales_etl_curated_delta"
write_deltalake(str(etl_table_path), etl_clean_df, mode="overwrite")
print(f"ETL output Delta table: {etl_table_path.resolve()}")
DeltaTable(str(etl_table_path)).to_pandas().sort_values("order_id")
Output:
[delta] ETL output Delta table: /home/user/.../delta_demo_output/sales_etl_curated_delta
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannelnet_amount_hufis_online
02001C-010129000.02026-05-01webshop12900True
12002C-011479010.02026-05-01webshop4311True
22003C-0102500020.02026-05-02partner20000False
32004C-01283500.02026-05-03pos8350False
ETL minta
nyers DataFrame → tisztítás és üzleti logika Pythonban → kész/kurált Delta table
Section 18-19

ELT demo: előbb betöltjük raw Delta táblába, utána transzformálunk

ELT esetén a nyers adatot gyorsan eltesszük egy bronze táblába, majd abból készül a silver tábla. Ez a megközelítés a modern lakehouse architektúrák domináns mintája, mert a nyers adat megőrzése lehetővé teszi a transzformációs logika későbbi módosítását és az adatok újrafeldolgozását.

A demo két fő lépést mutat be: először a nyers adatot (etl_source_df, amelyet az előző ETL sectionben készítettünk) azonnal beírjuk a bronze Delta táblába, változatlan formátumban, szöveges mezőkkel együtt. Ezután a bronze táblából olvasva elvégezzük a tisztítást (típuskonverzió, üzleti számítások) és az eredményt egy silver Delta táblába írjuk.

A harmadik cella összehasonlítja a két megközelítést: láthatod, hogy az ETL egyetlen Delta táblát hozott létre (a kuráltat), míg az ELT kettőt (bronze/raw és silver/curated). A különbség a flexibilitásban rejlik: az ELT esetén bármikor újragenerálhatod a silver réteget más logikával, anélkül hogy hozzányúlnál a forrásadathoz.

Tip: A bronze tábla megőrzi az eredetihez közeli állapotot. Ha változik a logika, a silver újragenerálható -- ez a lakehouse egyik legnagyobb előnye a hagyományos ETL megoldásokkal szemben.

[18a]
# Load: nyers adat → bronze Delta tábla
elt_raw_table_path = base_dir / "sales_elt_bronze_raw_delta"
write_deltalake(str(elt_raw_table_path), etl_source_df, mode="overwrite")
print(f"ELT bronze/raw Delta table: {elt_raw_table_path.resolve()}")
DeltaTable(str(elt_raw_table_path)).to_pandas()
Output:
[delta] ELT bronze/raw Delta table: .../delta_demo_output/sales_elt_bronze_raw_delta
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannel
0"2001"C-010"12 900""0"2026-05-01webshop
1"2002"C-011"4 790""10"2026-05-01webshop
2"2003"C-010"25 000""20"2026-05-02partner
3"2004"C-012"8 350""0"2026-05-03pos
[18b]
# Transform: bronze → silver
elt_silver_table_path = base_dir / "sales_elt_silver_delta"
bronze_df = DeltaTable(str(elt_raw_table_path)).to_pandas()
silver_df = bronze_df.copy()
silver_df["gross_amount_huf"] = silver_df["gross_amount_huf"].str.replace(" ","").astype("int64")
silver_df["net_amount_huf"] = (silver_df["gross_amount_huf"] * (1 - silver_df["discount_pct"].astype(float)/100)).round().astype("int64")
write_deltalake(str(elt_silver_table_path), silver_df, mode="overwrite")
DeltaTable(str(elt_silver_table_path)).to_pandas()
Output:
[spark] ELT Transform: bronze → silver kész
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannelnet_amount_huf
02001C-0101290002026-05-01webshop12900
12002C-0114790102026-05-01webshop4311
22003C-01025000202026-05-02partner20000
32004C-012835002026-05-03pos8350
ELT minta
nyers DataFrame → bronze/raw Delta table → transzformáció → silver/curated Delta table

A bronze tábla megőrzi az eredetihez közeli állapotot. Ha változik a logika, a silver újragenerálható.

[19]
# ETL vs ELT összehasonlítás
demo_tables = {
    "ETL curated": etl_table_path,
    "ELT bronze/raw": elt_raw_table_path,
    "ELT silver/curated": elt_silver_table_path,
}
for label, path in demo_tables.items():
    parquet_files = sorted(path.glob("*.parquet"))
    log_files = sorted((path / "_delta_log").glob("*.json"))
    print(f"{label}")
    print(f"  path: {path}")
    print(f"  parquet fájlok: {len(parquet_files)}")
    print(f"  delta log verziók: {len(log_files)}")
Output:
[delta] ETL vs ELT összehasonlítás:
[spark] ETL curated
  path: delta_demo_output/sales_etl_curated_delta
  parquet fájlok: 1
  delta log verziók: 1
[spark] ELT bronze/raw
  path: delta_demo_output/sales_elt_bronze_raw_delta
  parquet fájlok: 1
  delta log verziók: 1
[spark] ELT silver/curated
  path: delta_demo_output/sales_elt_silver_delta
  parquet fájlok: 1
  delta log verziók: 1
Section 20

Landing, Bronze, Silver, Gold rétegek

A lakehouse rétegezés arra való, hogy az adat útja ne egyetlen nagy feldolgozás legyen, hanem jól elkülönített állomások sorozata. Minden réteg más-más feldolgozottsági szintet képvisel: a Landing a teljesen nyers fájlokat, a Bronze a nyers de már Delta formátumú adatot, a Silver a tisztított és típusozott adatot, a Gold pedig az aggregált, riportkész KPI-kat tartalmazza.

Landing
Nyers CSV fájlok
Bronze
Raw Delta tábla
Silver
Tisztított Delta
Gold
KPI aggregátum
RétegMit tartalmaz?Mennyire nyers?Tipikus cél
LandingBeérkezett fájlok majdnem érintetlenülTeljesen nyersForrásból érkező adat megőrzése
BronzeRaw adat Delta táblában, ingestion metaadatokkalNagyon nyersAuditálható, újraolvasható alapréteg
SilverTisztított, típusozott, deduplikált adatÜzletileg használhatóElemzés és további modellezés
GoldAggregált, üzleti célra előkészített adatKuráltDashboard, riport, KPI
Section 21

Landing → Bronze → Silver → Gold demo CSV Delta

[21a]
# Landing: két nyers CSV fájl létrehozása
landing_dir = base_dir / "landing" / "orders"
landing_batch_1 = pd.DataFrame({
    "order_id": ["3001", "3002", "3003"],
    "customer_id": ["C-020", "C-021", "C-020"],
    "gross_amount_huf": ["9900", "14900", "2490"],
    "discount_pct": ["0", "15", "0"],
    "order_date": ["2026-05-05", "2026-05-05", "2026-05-06"],
    "channel": ["webshop", "partner", "pos"],
})
landing_batch_2 = pd.DataFrame({
    "order_id": ["3004", "3005"],
    "customer_id": ["C-022", "C-023"],
    "gross_amount_huf": ["19990", "5990"],
    "discount_pct": ["10", "0"],
    "order_date": ["2026-05-06", "2026-05-07"],
    "channel": ["webshop", "webshop"],
})
landing_batch_1.to_csv(landing_dir / "orders_2026_05_05.csv", index=False)
landing_batch_2.to_csv(landing_dir / "orders_2026_05_06.csv", index=False)
Output:
[delta] Landing CSV fájlok létrehozva:
delta_demo_output/landing/orders/orders_2026_05_05.csv
delta_demo_output/landing/orders/orders_2026_05_06.csv
[21b] Bronze
# Landing → Bronze: CSV beolvasás + ingestion meta
bronze_frames = []
for source_file in sorted(landing_dir.glob("*.csv")):
    frame = pd.read_csv(source_file, dtype=str)
    frame["_source_file"] = source_file.name
    frame["_ingestion_mode"] = "batch"
    frame["_ingested_at"] = pd.Timestamp.now("UTC").isoformat()
    bronze_frames.append(frame)
bronze_orders_df = pd.concat(bronze_frames, ignore_index=True)
write_deltalake(str(bronze_orders_path), bronze_orders_df, mode="overwrite")
DeltaTable(str(bronze_orders_path)).to_pandas().sort_values("order_id")
Output:
[delta] Landing → Bronze: 5 sor betöltve ingestion metaadatokkal
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannel_source_file_ingestion_mode
03001C-020990002026-05-05webshoporders_2026_05_05.csvbatch
13002C-02114900152026-05-05partnerorders_2026_05_05.csvbatch
23003C-020249002026-05-06posorders_2026_05_05.csvbatch
33004C-02219990102026-05-06webshoporders_2026_05_06.csvbatch
43005C-023599002026-05-07webshoporders_2026_05_06.csvbatch
[21c] Silver
# Bronze → Silver: típusozás, tisztítás, deduplikáció
bronze_orders_read_df = DeltaTable(str(bronze_orders_path)).to_pandas()
silver_orders_df = bronze_orders_read_df.copy()
silver_orders_df["gross_amount_huf"] = silver_orders_df["gross_amount_huf"].astype("int64")
silver_orders_df["net_amount_huf"] = (silver_orders_df["gross_amount_huf"] * (1 - silver_orders_df["discount_pct"].astype(float)/100)).round().astype("int64")
silver_orders_df["is_online"] = silver_orders_df["channel"].eq("webshop")
silver_orders_df = silver_orders_df.drop_duplicates(subset=["order_id"])
write_deltalake(str(silver_orders_path), silver_orders_df, mode="overwrite")
DeltaTable(str(silver_orders_path)).to_pandas().sort_values("order_id")
Output:
[delta] Bronze → Silver: tisztított, típusozott, deduplikált – 5 sor
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannelnet_amount_hufis_online
03001C-02099000.02026-05-05webshop9900True
13002C-0211490015.02026-05-05partner12665False
23003C-02024900.02026-05-06pos2490False
33004C-0221999010.02026-05-06webshop17991True
43005C-02359900.02026-05-07webshop5990True
[21d] Gold
# Silver → Gold: napi csatorna szerinti KPI
gold_daily_sales_df = (
    silver_orders_df
    .groupby(["order_date", "channel"], as_index=False)
    .agg(order_count=("order_id", "count"), gross_amount_huf=("gross_amount_huf", "sum"), net_amount_huf=("net_amount_huf", "sum"))
    .sort_values(["order_date", "channel"])
)
write_deltalake(str(gold_daily_sales_path), gold_daily_sales_df, mode="overwrite")
DeltaTable(str(gold_daily_sales_path)).to_pandas()
Output:
[delta] Silver → Gold: napi csatorna KPI – 5 sor
order_datechannelorder_countgross_amount_hufnet_amount_huf
02026-05-05partner11490012665
12026-05-05webshop199009900
22026-05-06pos124902490
32026-05-06webshop11999017991
42026-05-07webshop159905990
Mentális modell
landing/orders/*.csv
  → bronze/orders_delta        # raw, auditálható Delta
  → silver/orders_delta        # tisztított rendelési tényadat
  → gold/daily_channel_sales   # napi csatorna szerinti KPI
Section 22-24

Fact és Dimension táblák (Star Schema)

A fact és dimension fogalmak a klasszikus adattárház-modellezésből jönnek. A fact tábla mérhető eseményeket tartalmaz (rendelések, tranzakciók, kattintások), a dimension táblák pedig leíró entitásokat (vásárlók, termékek, dátumok, csatornák). A kettő együtt alkotja a Star Schema-t, amely a BI riportok és dashboardok mögötti leggyakoribb adatmodell.

Fact = mérhető események (rendelések, összegek)
Dimension = leíró entitások (vásárló, csatorna, dátum)

A Star Schema előnye, hogy a riportkészítő eszközök (Power BI, Tableau, Superset) natívan támogatják: a fact tábla külső kulcsaival kapcsolódik a dimension táblákhoz, és a felhasználó egyszerű drag-and-drop műveletekkel tud riportokat készíteni. A dimension táblák "kiszélesítik" a fact táblát az ember által olvasható leírókkal, így a végfelhasználó nem azonosítókat lát, hanem neveket, címkéket és kategóriákat.

A következő cellák bemutatják, hogyan épül fel a dim_customer, dim_channel, dim_date és fact_sales tábla a silver réteg adataiból, majd hogyan joinolva létrejön egy riportbarát Sales Mart nézet.

dim_customer
customer_id, customer_name
fact_sales
order_id, customer_id, channel, date_id, net_amount_huf
dim_channel
channel, channel_name
dim_date
date_id, year, month, day
[23a] dim_customer
customer_names = {"C-020": "Kovacs Anna", "C-021": "Nagy Bela", "C-022": "Toth Csilla", "C-023": "Szabo David"}
dim_customer_df = silver_for_model_df[["customer_id"]].drop_duplicates().sort_values("customer_id")
dim_customer_df["customer_name"] = dim_customer_df["customer_id"].map(customer_names)
dim_customer_df
Output:
[spark] dim_customer dimension tábla – 4 sor
customer_idcustomer_name
0C-020Kovacs Anna
1C-021Nagy Bela
2C-022Toth Csilla
3C-023Szabo David
[23b] dim_channel
channel_labels = {"webshop": "Online webshop", "partner": "Partner sales", "pos": "Physical store"}
dim_channel_df = silver_for_model_df[["channel"]].drop_duplicates().sort_values("channel")
dim_channel_df["channel_name"] = dim_channel_df["channel"].map(channel_labels)
dim_channel_df
Output:
[spark] dim_channel dimension tábla – 3 sor
channelchannel_name
0partnerPartner sales
1posPhysical store
2webshopOnline webshop
[23c] dim_date
dim_date_df = silver_for_model_df[["order_date"]].drop_duplicates().sort_values("order_date")
dim_date_df["date_id"] = dim_date_df["order_date"].dt.strftime("%Y%m%d").astype("int64")
dim_date_df["year"] = dim_date_df["order_date"].dt.year
dim_date_df["month"] = dim_date_df["order_date"].dt.month
dim_date_df["day"] = dim_date_df["order_date"].dt.day
dim_date_df[["date_id", "order_date", "year", "month", "day"]]
Output:
[spark] dim_date dimension tábla – 3 sor
date_idorder_dateyearmonthday
0202605052026-05-05202655
1202605062026-05-06202656
2202605072026-05-07202657
[23d] fact_sales
fact_sales_df = silver_for_model_df.merge(dim_date_df[["date_id","order_date"]], on="order_date", how="left")
fact_sales_df = fact_sales_df[["order_id","customer_id","channel","date_id","gross_amount_huf","discount_pct","net_amount_huf","is_online"]].sort_values("order_id")
write_deltalake(str(dim_customer_path), dim_customer_df, mode="overwrite")
write_deltalake(str(dim_channel_path), dim_channel_df, mode="overwrite")
write_deltalake(str(dim_date_path), dim_date_df, mode="overwrite")
write_deltalake(str(fact_sales_path), fact_sales_df, mode="overwrite")
fact_sales_df
Output:
[delta] fact_sales + dimenziók Delta táblákba írva
order_idcustomer_idchanneldate_idgross_amount_hufdiscount_pctnet_amount_hufis_online
03001C-020webshop2026050599000.09900True
13002C-021partner202605051490015.012665False
23003C-020pos2026050624900.02490False
33004C-022webshop202605061999010.017991True
43005C-023webshop2026050759900.05990True
[23e] Sales Mart (joined)
sales_mart_df = (fact_sales_df
    .merge(dim_customer_df, on="customer_id", how="left")
    .merge(dim_channel_df, on="channel", how="left")
    .merge(dim_date_df, on="date_id", how="left"))
sales_mart_df[["order_id", "customer_name", "channel_name", "order_date", "net_amount_huf"]]
Output:
[spark] Sales Mart (joined dim + fact) – 5 sor
order_idcustomer_namechannel_nameorder_datenet_amount_huf
03001Kovacs AnnaOnline webshop2026-05-059900
13002Nagy BelaPartner sales2026-05-0512665
23003Kovacs AnnaPhysical store2026-05-062490
33004Toth CsillaOnline webshop2026-05-0617991
43005Szabo DavidOnline webshop2026-05-075990
[23f] Customer KPI
customer_kpi_df = (sales_mart_df
    .groupby("customer_name", as_index=False)
    .agg(order_count=("order_id", "count"), total_net_amount_huf=("net_amount_huf", "sum"))
    .sort_values("total_net_amount_huf", ascending=False))
customer_kpi_df
Output:
[spark] Customer KPI aggregátum – 4 sor
customer_nameorder_counttotal_net_amount_huf
0Toth Csilla117991
1Nagy Bela112665
2Kovacs Anna212390
3Szabo David15990
Section 24

Batch feldolgozás

Batch feldolgozásnál egyszerre egy véges adatcsomagot dolgozol fel. Például: minden éjjel 01:00-kor beolvasod az előző napi CSV-ket, frissíted a bronze, silver és gold rétegeket. Ez a legelterjedtebb feldolgozási mód a data engineeringben, mert egyszerűen megtervezhető, könnyen újrafuttatható, és a hibakeresés is viszonylag egyértelmű.

A kódcella egy összefoglaló táblázatban mutatja be a batch pipeline lépéseit: az extract fázisban CSV fájlokat olvasunk be, a load bronze-ba írjuk a nyers adatot, a transform silver-ben tisztítjuk, végül az aggregate gold-ban aggregáljuk a KPI-kat. Minden lépés batch jellegű, azaz egyszerre dolgozza fel a teljes adatcsomagot.

A batch feldolgozás fő előnye az egyszerűség: ha valami hiba történik, újra lehet futtatni az egész folyamatot (idempotens pipeline), és a végeredmény garantáltan konzisztens lesz. A hátránya, hogy az adat frissessége a batch ütemezésétől függ -- ha naponta fut, akkor a riportok legfeljebb egy napos adatot mutatnak.

Tip: A batch pipelineok tervezésénél érdemes az idempotenciára törekedni: azaz a többszori futtatás ugyanazt az eredményt adja, nem duplikál adatot, és nem okoz mellékhatást.

[24]
batch_summary = pd.DataFrame({
    "step": ["extract", "load bronze", "transform silver", "aggregate gold"],
    "input": ["landing CSV files", "landing DataFrame", "bronze Delta", "silver Delta"],
    "output": ["pandas DataFrame", "bronze Delta", "silver Delta", "gold Delta"],
    "execution_style": ["batch", "batch", "batch", "batch"],
})
batch_summary
Output:
[spark] Batch feldolgozás lépései
stepinputoutputexecution_style
0extractlanding CSV filespandas DataFramebatch
1load bronzelanding DataFramebronze Deltabatch
2transform silverbronze Deltasilver Deltabatch
3aggregate goldsilver Deltagold Deltabatch
Section 25

Streaming és micro-batch gondolkodás Kafka Flink Spark Streaming

Valódi streaminghez Spark Structured Streaming, Flink, Kafka kell. Itt micro-batch szimulációt csinálunk, amely bemutatja a streaming gondolkodásmód alapjait anélkül, hogy valódi streaming infrastruktúrára lenne szükség. A micro-batch lényegében kicsi, gyakran ismétlődő batch-ek sorozata -- a legtöbb "streaming" rendszer (beleértve a Spark Structured Streaming-et is) valójában így működik a színfalak mögött.

A kódcella két batch-et szimulál, mintha azok időben egymás után érkeznének (például 30 másodpercenként). Minden batch esetén a teljes pipeline lefut: a nyers adat a bronze táblába kerül (az első batch overwrite, a többi append), majd a bronze-ból újraolvassuk az összes eddigi adatot, transzformáljuk silver-re, aggregáljuk gold-ra, és felülírjuk a silver/gold táblákat. Így minden batch után a silver és gold táblák a teljes eddigi történetet tükrözik.

A különbség az egyszerű batch és a micro-batch között az, hogy itt az adat folyamatosan érkezik és feldolgozásra kerül, nem pedig egyetlen nagy futásban. A _batch_id oszlop a bronze rétegben nyomon követi, melyik adat melyik batch-ből származik, ami segíti az auditálást és a hibakeresést.

Tip: Éles streaming rendszerekben a checkpointing és a watermarks biztosítják, hogy a rendszer összeomlás esetén pontosan onnan folytatódhassa a feldolgozást, ahol abbamaradt -- ez a szimulációban nem jelenik meg, de production-ben elengedhetetlen.

[25a]
stream_bronze_path = base_dir / "bronze/orders_stream_delta"
stream_silver_path = base_dir / "silver/orders_stream_delta"
stream_gold_path = base_dir / "gold/orders_stream_kpi_delta"

stream_batches = [
    pd.DataFrame({"order_id":["4001","4002"], "customer_id":["C-030","C-031"], "gross_amount_huf":["12900","3490"], "discount_pct":["0","0"], "order_date":["2026-05-08","2026-05-08"], "channel":["webshop","pos"]}),
    pd.DataFrame({"order_id":["4003","4004"], "customer_id":["C-032","C-030"], "gross_amount_huf":["8490","22990"], "discount_pct":["5","20"], "order_date":["2026-05-08","2026-05-09"], "channel":["partner","webshop"]}),
]

for batch_id, batch_df in enumerate(stream_batches, start=1):
    bronze_batch_df = batch_df.copy()
    bronze_batch_df["_batch_id"] = batch_id
    bronze_mode = "overwrite" if batch_id == 1 else "append"
    write_deltalake(str(stream_bronze_path), bronze_batch_df, mode=bronze_mode)
    all_bronze = DeltaTable(str(stream_bronze_path)).to_pandas()
    all_silver = transform_orders(all_bronze)
    all_gold = aggregate_daily_channel_sales(all_silver)
    write_deltalake(str(stream_silver_path), all_silver, mode="overwrite")
    write_deltalake(str(stream_gold_path), all_gold, mode="overwrite")
    print(f"micro-batch {batch_id} feldolgozva | bronze: {len(all_bronze)} | silver: {len(all_silver)} | gold: {len(all_gold)}")
Output:
[delta] micro-batch 1 feldolgozva | bronze: 2 | silver: 2 | gold: 2
[delta] micro-batch 2 feldolgozva | bronze: 4 | silver: 4 | gold: 3
[25b]
print("Bronze stream tábla:")
DeltaTable(str(stream_bronze_path)).to_pandas().sort_values("order_id")
Output:
[delta] Bronze stream tábla – 4 sor
order_idcustomer_idgross_amount_hufdiscount_pctorder_datechannel_batch_id
04001C-0301290002026-05-08webshop1
14002C-031349002026-05-08pos1
24003C-032849052026-05-08partner2
34004C-03022990202026-05-09webshop2
[25c]
print("Gold stream KPI tábla:")
DeltaTable(str(stream_gold_path)).to_pandas()
Output:
[delta] Gold stream KPI tábla – 4 sor
order_datechannelorder_countgross_amount_hufnet_amount_huf
02026-05-08partner184908066
12026-05-08pos134903490
22026-05-08webshop11290012900
32026-05-09webshop12299018392
Section 26

Batch vs Streaming röviden

SzempontBatchStreaming / micro-batch
Adat érkezéseVéges csomagokbanFolyamatosan vagy kis adagokban
Tipikus gyakoriságÓránként, naponta, hetenteMásodpercenként, percenként
EgyszerűségEgyszerűbb fejleszteni és újrafuttatniTöbb állapotkezelés, checkpoint
Jó választás, haNem kell azonnali frissességKözel valós idejű adat kell
PéldaNapi sales riportÉlő rendelésmonitoring, fraud detection
Fontos

Sok modern streaming rendszer valójában micro-batch módon működik: a feldolgozó engine kis, ismétlődő batch-ekre bontja a folyamatos adatfolyamot.

Section 27

Open source metastore/catalog toolok kipróbáláshoz Hive Unity Catalog Iceberg Nessie Polaris Gravitino

ToolMit ad hozzá?Formátum fókuszMire jó?
Hive MetastoreKlasszikus Spark SQL metastore: database/table nevek, schema, locationDelta, Parquet, HiveMegérteni az alap metastore modellt
Unity Catalog OSSDatabricks-szerű catalog: catalog.schema.table, jogosultságokDelta, Iceberg, ParquetUnity Catalog Databricks nélkül
Apache GravitinoFederált metadata réteg, catalog of catalogsTöbb engine és catalogTöbb adattároló közös metadata réteg alá
Project NessieGit-szerű catalog branch/tag/commitApache IcebergTáblák verziózása catalog szinten
Apache PolarisIceberg REST catalogApache IcebergModern Iceberg REST Catalog minta
Gyakorlati választás

Ha a metastore alapfogalmat akarod megérteni: Hive Metastore
Ha a Databricks Unity Catalog modelljét: Unity Catalog OSS
Ha Iceberges modern catalogot: Polaris vagy Nessie
Ha több catalog és rendszer fölé: Apache Gravitino

Section 28

Összefoglalás

A legfontosabb mondatok:

✅ Delta table-t létre tudsz hozni Databricks és Synapse nélkül is.
✅ Ehhez kell egy Delta-kompatibilis writer, például deltalake vagy delta-spark.
✅ A Delta table fizikailag adatfájlokból és _delta_log naplóból áll.
✅ A schema a Delta metadata része.
✅ A metastore csak akkor kell, ha névvel, SQL-ből, catalogként akarod használni.
✅ Azure-ban ugyanez ADLS/Storage Account path-on történik.
✅ A landing/bronze/silver/gold rétegek az adat feldolgozottsági állapotát írják le.
✅ A batch és streaming közti fő különbség: véges csomagok vs folyamatosan érkező adat.
✅ A fact táblák mérhető eseményeket, a dimension táblák leíró entitásokat tartalmaznak.

Szójegyzék

ACID · Time Travel · OPTIMIZE

Quiz: Mi a Delta Lake legfőbb előnye Parquet felett?

Quiz: Melyik réteg a nyers adatoké?