Delta table lokálisan, Databricks és Synapse nélkül
Delta Lake
Ez a notebook azt mutatja meg, hogy egy Delta table nem feltétlenül Databricksben vagy Synapse-ban jön létre. A Delta Lake egy nyílt tárolási formátum: ha van hozzá kompatibilis writer, akkor lokális fájlrendszerre is tudsz Delta táblát írni.
A notebook fő gondolata:
Delta table = Parquet adatfájlok + _delta_log tranzakciós napló
Metastore/Catalog = névvel ellátott bejegyzés, ami rámutat a Delta table locationre
Itt nem használunk Databrickset, Synapse-t vagy Azure-t. Minden lokálisan történik.
A notebook későbbi részei azt is megmutatják, hogyan kapcsolódik ehhez az ETL/ELT, a bronze-silver-gold rétegezés, valamint a fact/dimension modellezés.
Fogalmi térkép: Data Lake, Data Warehouse, Lakehouse, Metastore
Mielőtt létrehozunk egy lokális Delta table-t, érdemes elhelyezni a példát a nagyobb data engineering térképen.
| Fogalom | Röviden | Adat | Erősség | Tipikus használat |
|---|---|---|---|---|
| Data Warehouse | Üzleti riporting és konzisztens SQL elemzés | Tisztított, modellezett, strukturált | Gyors BI, erős SQL, governance | Pénzügyi riport, vezetői dashboard, data mart |
| Data Lake | Nyers és sokféle adat olcsó, skálázható tárolása | Fájlok: JSON, CSV, Parquet, képek, logok | Rugalmasság, nagy skála, alacsony storage költség | Raw landing zone, archívum, data science előkészítés |
| Lakehouse | Lake rugalmasság warehouse-szerű táblakezeléssel | Nyílt table formatok objektumtáron | ACID, time travel, batch + streaming, ML + BI | Bronze-silver-gold platform, közös analitikai adatbázis |
| Metastore | Táblák, sémák és lokációk nyilvántartása | Metaadat, nem maga az üzleti adat | Név, schema, permission, discovery | Spark SQL táblák, catalog, jogosultságkezelés |
Data Lake rész: lokális fájlok és mappák
Lakehouse rész: Delta table = Parquet + _delta_log
Metastore rész: most nincs tényleges metastore, csak bemutatjuk, mit adna hozzá
Data Warehouse rész: fact/dimension és gold réteg, vagyis riportbarát modell
A lényeg: a data lake főleg tárolási minta, a warehouse főleg riportolási és modellezési minta, a lakehouse a kettő közötti híd, a metastore pedig a név- és metadata-réteg.
Architektúra-térkép
A architektúra-térkép azt mutatja meg, hogyan áll össze egy tipikus data engineering platform a forrásrendszerektől a fogyasztásig. A hatszakaszos folyamat (forrás, beérkeztetés, tárolás, feldolgozás, metaadat, fogyasztás) szinte minden modern adatplatformon megtalálható valamilyen formában, függetlenül attól, hogy Databricks, Snowflake, vagy saját építésű megoldásról van szó.
A diagram vizuálisan is megmutatja az adatáramlás irányát: balról jobbra halad a nyers adat a forrásrendszerektől a feldolgozott, riportkész adatokig. Minden szakasznak megvannak a saját tipikus eszközei és technológiái, de a Delta Lake a "tárolás" sávban tölt be kulcsszerepet, mint a nyílt táblázatformátum, amely összeköti a feldolgozó rendszereket a tároló réteggel.
| Sáv | Tipikus komponensek | Mit jelent ebben a notebookban? |
|---|---|---|
| Forrás | OLTP adatbázis, API, fájl export, event stream, IoT | Kézzel létrehozott pandas DataFrame-ek és CSV fájlok szimulálják a forrásadatot |
| Beérkeztetés | Batch ingest, CDC, Kafka, Event Hub, landing zone | CSV-k beolvasása batch módban, plusz micro-batch streaming szimuláció |
| Tárolás | Object storage, data lake, Delta table, Iceberg table | Lokális mappák, Parquet fájlok és Delta táblákat tartó könyvtárak |
| Feldolgozás | Spark, SQL engine, dbt, stream processing | pandas + deltalake transzformációk: bronze → silver → gold |
| Metaadat | Metastore, data catalog, schema registry, lineage | A notebook elmagyarázza a metastore szerepet |
| Fogyasztás | BI dashboard, data mart, feature store | Gold KPI tábla, fact/dimension modell és riportbarát aggregátumok |
Fájl export / forrásadat
→ landing/orders/*.csv
→ bronze Delta table
→ silver Delta table
→ gold KPI és fact/dimension Delta táblák
→ riport / BI / SQL elemzésre kész adat
Notebook bootstrap: virtuális környezet és csomagok
Python
Ez a notebook teljesen bootstrapelt: a projekt gyökerében lévő bootstrap.ps1 vagy bootstrap.sh létrehozza a .venv virtuális környezetet, telepíti a requirements.txt csomagjait, és regisztrálja a Jupyter kernelt. Ez biztosítja, hogy mindenki ugyanabban a környezetben futtatja a kódot, függetlenül attól, hogy milyen operációs rendszert használ.
A data engineering projektekben a reproduckálható környezet kiemelkedően fontos: ha a csapattagok különböző csomagverziókkal dolgoznak, akkor könnyen előfordulhat, hogy az egyik gépen működő pipeline a másikon hibát dob. A virtuális környezet (venv) ezt a kockázatot szünteti meg úgy, hogy minden függőséget a projekt könyvtárában tart, a globális Python telepítés érintetlenül hagyása mellett.
A kódban a REQUIRED_IMPORTS lista határozza meg, mely csomagoknak feltétlenül elérhetőnek kell lenniük a futás elkezdése előtt. Az os.name ágon a szkript megkülönbözteti a Windows és Linux/macOS környezeteket, így a megfelelő bootstrap szkriptet hívja meg. A subprocess.check_call() biztosítja, hogy ha a bootstrap bármely lépése sikertelen, a notebook azonnal jelezze a hibát.
Tip: Ha a bootstrap lefutott, a .venv mappa tartalmazza a teljes Python környezetet -- ezt érdemes hozzáadni a .gitignore-hoz, mert a gépen generálódik, nem pedig verziózandó.
from pathlib import Path import os, subprocess, sys REQUIRED_IMPORTS = ["deltalake", "pandas", "pyarrow"] KERNEL_NAME = "de-delta-demo" project_dir = Path.cwd() venv_dir = Path(".venv") if os.name == "nt": venv_python = venv_dir / "Scripts" / "python.exe" bootstrap_cmd = ["powershell", "-NoProfile", "-File", "bootstrap.ps1"] else: venv_python = venv_dir / "bin" / "python" bootstrap_cmd = ["bash", "bootstrap.sh"] subprocess.check_call(bootstrap_cmd, cwd=project_dir) for module_name in REQUIRED_IMPORTS: __import__(module_name) print(f"Virtuális környezet kész: {venv_python}") print(f"Kernel regisztrálva: Python (.venv - Delta demo)")
[delta] Virtuális környezet kész: /home/user/engineering_crash_courses/.venv/bin/python [delta] Kernel regisztrálva: Python (.venv - Delta demo) [delta] Aktuális futó Python: /home/user/engineering_crash_courses/.venv/bin/python
Környezet ellenőrzése
deltalake
pandas
pyarrow
A példához a deltalake Python csomagot használjuk. Ez a delta-rs projekt Python bindingja, és Spark nélkül is tud Delta táblát írni/olvasni. A delta-rs egy Rust nyelven írt, nagy teljesítményű Delta Lake implementáció, amely lehetővé teszi, hogy nem csak JVM-alapú rendszerekben (pl. Spark) használjuk a Delta formátumot, hanem tiszta Python környezetben is.
A data engineering gyakorlatban ez azért jelentős, mert sok ETL pipeline nem igényel elosztott feldolgozást -- egy egyszerű Python scripttel is létrehozhatsz production-grade Delta táblákat, amelyek aztán bármely Delta-kompatibilis eszközzel (Spark, Databricks, Trino, Presto) olvashatók. A pyarrow biztosítja a hatékony memórián belüli adatkezelést, a pandas pedig a kényelmes DataFrame API-t.
A kódcella meghívja a sys.executable útvonalat, valamint a három fő csomag verziószámát, hogy ellenőrizhesd: a megfelelő virtuális környezet aktív, és a verziók kompatibilisek egymással.
Tip: A deltalake csomag verziója határozza meg, mely Delta protokollverziókat tudja írni és olvasni -- régebbi verziók nem támogatják például a column mapping vagy a deletion vectors funkciókat.
import sys import deltalake import pandas as pd import pyarrow as pa print(f"Python: {sys.executable}") print(f"deltalake: {deltalake.__version__}") print(f"pandas: {pd.__version__}") print(f"pyarrow: {pa.__version__}")
[delta] Python: /home/user/engineering_crash_courses/.venv/bin/python [delta] deltalake: 0.25.0 [delta] pandas: 2.2.3 [delta] pyarrow: 18.1.0
Importok és lokális célmappa
A Delta table fizikailag egy mappa lesz a gépeden. Ebben lesznek a Parquet adatfájlok és a _delta_log könyvtár. Ez a kettősség a Delta Lake alapja: az adatok maguk Parquet formátumban tárolódnak (amely már önmagában is oszlopos és tömörített), de a tranzakciós napló adja hozzá az ACID garanciákat, a verziózást és a sémakezelést.
A kódban a base_dir egy gyűjtőmappa, amely alá az összes demo Delta tábla kerülni fog. A shutil.rmtree() hívás biztosítja, hogy minden futtatásnál tiszta lappal induljunk -- ez a notebook kontextusában praktikus, de production környezetben természetesen nem törölnénk az adatokat.
A DeltaTable import a meglévő Delta táblák olvasásához kell, a write_deltalake pedig az íráshoz. A Path osztály a pathlib-ből platformfüggetlen útvonalkezelést biztosít, így Windows és Linux alatt is ugyanaz a kód működik.
Tip: A base_dir.mkdir(parents=True, exist_ok=True) létrehozza a szülőkönyvtárakat is, ha még nem léteznek -- hasznos, ha mélyebb könyvtárszerkezetet szeretnél felépíteni.
from pathlib import Path import json, shutil import pandas as pd from deltalake import DeltaTable from deltalake.writer import write_deltalake base_dir = Path("delta_demo_output") table_path = base_dir / "sales_delta" if base_dir.exists(): shutil.rmtree(base_dir) base_dir.mkdir(parents=True, exist_ok=True) table_path
[delta] Célmappa beállítva: PosixPath('delta_demo_output/sales_delta')
Forrásadat: egy sima Pandas DataFrame
pandas
Ez lehetne CSV-ből, API-ból vagy adatbázisból jövő adat is. Most kézzel készítünk egy kis táblát, amely rendelési adatokat tartalmaz. A DataFrame négy rendelést mutat be, mindegyikhez rendelésszám (order_id), vásárló azonosító (customer_id), összeg (amount), dátum (order_date) és forrásrendszer (source_system) tartozik.
A valós adatfolyamatokban az ilyen forrásadat általában egy OLTP adatbázisból, egy REST API válaszából vagy egy beérkező CSV fájlból származik. A pandas DataFrame egy univerzális közbenső formátum: szinte bármilyen adatforrást be tudunk tölteni bele, majd onnan a write_deltalake segítségével Delta táblaként kiírni. A pd.to_datetime() biztosítja, hogy a dátum oszlop valóban dátumtípusként legyen tárolva, nem pedig szövegként.
A source_system oszlop jelzi, hogy a rendelés honnan érkezett -- ez később a bronze/silver rétegekben fontos lesz, amikor forrásalapú szűréseket vagy statisztikákat készítünk.
Tip: Éles adatfolyamatokban a forrásadatot soha nem módosítjuk azonnal -- először a bronze rétegbe mentjük eredeti formájában, hogy vissza lehessen nyomozni bármilyen adatminőségi problémát.
sales_df = pd.DataFrame({
"order_id": [1001, 1002, 1003, 1004],
"customer_id": ["C-001", "C-002", "C-001", "C-003"],
"amount": [12900, 4790, 8350, 21990],
"order_date": pd.to_datetime(["2026-05-01", "2026-05-01", "2026-05-02", "2026-05-03"]),
"source_system": ["webshop", "webshop", "pos", "webshop"],
})
sales_df[delta] Forrásadat DataFrame betöltve – 4 sor
| order_id | customer_id | amount | order_date | source_system | |
|---|---|---|---|---|---|
| 0 | 1001 | C-001 | 12900 | 2026-05-01 | webshop |
| 1 | 1002 | C-002 | 4790 | 2026-05-01 | webshop |
| 2 | 1003 | C-001 | 8350 | 2026-05-02 | pos |
| 3 | 1004 | C-003 | 21990 | 2026-05-03 | webshop |
Delta table létrehozása lokálisan
Delta Lake
Itt történik a lényeg. A write_deltalake(...) létrehozza a Delta table-t a megadott mappában. Ez az a pont, amikor egy egyszerű pandas DataFrame-ből egy verziózott, ACID tranzakciókat támogató táblává válik az adat. A függvény a háttérben Parquet fájlokat hoz létre az adatoknak, és egy _delta_log könyvtárat a metaadatoknak.
Nem kell hozzá: Databricks, Synapse, Azure, metastore, cluster. Ez az egyik legfontosabb tanulság: a Delta Lake egy nyílt formátum, amelyhez csak egy kompatibilis writer kell. A deltalake Python csomag éppen ilyen writer, és bármilyen fájlrendszerre tud írni -- legyen az lokális mappa, S3 bucket vagy ADLS container.
A write_deltalake() első paramétere a célmappa útvonala stringként, a második pedig az adat (pandas DataFrame, PyArrow Table vagy RecordBatchReader). Alapértelmezetten mode="overwrite" módban fut, ami azt jelenti, hogy ha már létezik tábla a mappában, felülírja. Később látni fogjuk az append módot is.
Tip: Ha a célmappa már tartalmaz egy Delta táblát és más sémával írsz rá, a write_deltalake alapértelmezetten hibát dob -- ez véletlen sématörés ellen véd. Ha szándékosan módosítani akarod a sémát, a schema_mode="overwrite" paramétert kell használnod.
write_deltalake(str(table_path), sales_df) print(f"Delta table létrehozva itt: {table_path.resolve()}")
[delta] Delta table létrehozva itt: /home/user/engineering_crash_courses/delta_demo_output/sales_delta
Nézzük meg, mi jött létre a fájlrendszerben
Parquet
A Delta table mappájában Parquet fájlokat és egy _delta_log mappát kell látnod. A kódcella a rglob("*") segítségével rekurzívan listázza ki a tartalmat, és megjelöli, hogy mi könyvtár ([DIR]) és mi fájl ([FILE]).
A Parquet fájl tartalmazza magát az adatot -- ebben az esetben a négy rendelési sort oszlopos, Snappy tömörítéssel. A fájl neve tartalmaz egy egyedi azonosítót (UUID), ami garantálja, hogy akkor sem lesz névütközés, ha több writer párhuzamosan ír ugyanabba a táblába. A _delta_log könyvtárban pedig a tranzakciós napló található, amely a következő sectionben részletesen bemutatásra kerül.
Fontos megérteni, hogy ez a kettősség (adatfájlok + napló) az, ami megkülönbözteti a Delta táblát egy sima Parquet fájlokra írt mappától. Ha csak Parquet fájlok lennének, nem lenne sémakezelés, nem lenne verziózás, és nem lennének ACID tranzakciók sem.
Tip: Soha ne manuálisan módosítsd vagy töröld a Parquet fájlokat a Delta tábla mappájában -- mindig a Delta API-n keresztül kezeldd az adatokat, különben a tranzakciós napló és az adatfájlok inkonzisztenssé válnak.
for path in sorted(table_path.rglob("*")): relative = path.relative_to(table_path) marker = "[DIR] " if path.is_dir() else "[FILE]" print(marker, relative)
[delta] Fájlrendszer tartalom: [DIR] _delta_log [FILE] _delta_log/00000000000000000000.json [FILE] part-00000-a1b2c3d4-e5f6-7890-abcd-ef1234567890-c000.snappy.parquet
Nem azért, mert van neve egy catalogban, hanem mert a mappa Delta formátumú: adatfájlok + tranzakciós napló.
A _delta_log tartalma
JSON
A _delta_log JSON fájlok írják le a tábla tranzakcióit. Itt van például a schema és az, hogy mely Parquet fájlok tartoznak a táblához. Minden Delta log fájl egy-egy verziót (commitot) képvisel: a 00000000000000000000.json a 0. verzió, a 00000000000000000001.json az 1. verzió, és így tovább.
A log fájl nem egyetlen JSON objektum, hanem JSON Lines formátumú: minden sor egy önálló JSON objektum, egy-egy "action"-t ír le. Az első kódcella kilistázza a log fájlokat, a második pedig beolvassa és formázottan kiírja az első log tartalmát. A json.loads() hívás soronként dolgozza fel a fájlt, a json.dumps(..., indent=2) pedig ember által olvashatóra formázza a kimenetet.
A log tartalmában négyféle bejegyzést fogsz látni: protocol (Delta protokoll verzió), metaData (tábla azonosító, séma, formátum), add (hozzáadott Parquet fájl elérési útja és mérete), valamint commitInfo (milyen művelet történt, pl. CREATE_TABLE). Ez a felépítés teszi lehetővé a time travel-t és az ACID garanciákat.
Tip: A Delta log fájlok kis méretűek, de nagyon sok apró fájl keletkezhet sok írás esetén -- éles rendszerekben az OPTIMIZE és a log compaction segít ezt kezelni.
log_files = sorted((table_path / "_delta_log").glob("*.json")) log_files
[delta] Log fájl(ok): [PosixPath('delta_demo_output/sales_delta/_delta_log/00000000000000000000.json')]
first_log = log_files[0] with first_log.open("r", encoding="utf-8") as f: log_lines = [json.loads(line) for line in f] for entry in log_lines: print(json.dumps(entry, indent=2)[:1200]) print("-" * 80)
[delta] Delta log tartalom (00000000000000000000.json): { "protocol": { "minReaderVersion": 1, "minWriterVersion": 2 } } ──────────────────────────────────────── { "metaData": { "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "format": {"provider": "parquet", "options": {}}, "schemaString": "{"type":"struct","fields":[...]}", "createdTime": 1746297600000 } } ──────────────────────────────────────── { "add": { "path": "part-00000-a1b2c3d4-c000.snappy.parquet", "partitionValues": {}, "size": 2456, "modificationTime": 1746297600000, "dataChange": true } } ──────────────────────────────────────── { "commitInfo": { "timestamp": 1746297600000, "operation": "CREATE_TABLE", "operationParameters": {"mode": "Overwrite"} } }
protocol – milyen Delta protokollverzió kell az olvasáshoz/íráshoz
metaData – tábla metaadatai, schema, formátum
add – új Parquet fájl hozzáadása a táblaverzióhoz
commitInfo – ki/milyen művelettel hozta létre a verziót
Ezért mondjuk, hogy a Delta table nem csak adatfájlok halmaza, hanem tranzakciós táblaréteg.
Delta table olvasása path alapján
Most nincs metastore. Nem tudjuk azt mondani, hogy SELECT * FROM silver.sales, mert nincs ilyen regisztrált táblanév. De path alapján tökéletesen olvasható a Delta table. A DeltaTable(path) konstruktor megnyitja a táblát, a .to_pandas() pedig az egész tábla tartalmát egy pandas DataFrame-ként adja vissza.
A path-alapú olvasás a legegyszerűbb módja a Delta táblák elérésének, és teljesen elegendő ETL jobok, ad hoc elemzések és notebookok esetén. A DeltaTable objektum nem tölti be azonnal az összes adatot a memóriába -- csak a metaadatokat olvassa be, és az adatok csak a .to_pandas() vagy .to_pyarrow() hívásakor kerülnek betöltésre.
Ha lenne metastore (pl. Databricks Unity Catalog vagy Hive Metastore), akkor a táblanév alapján is elérnéd ugyanezt az adatot, mert a metastore leképezi a logikai nevet a fizikai útvonalra. De a tábla önmagában, a fizikai fájlokkal, metastore nélkül is teljes értékűen létezik és olvasható.
Tip: Nagy táblák esetén a .to_pyarrow() jobb választás lehet a .to_pandas() helyett, mert a PyArrow kevesebb memóriát használ és lazy evaluációt támogat.
dt = DeltaTable(str(table_path))
dt.to_pandas()[delta] Delta table beolvasva path alapján – 4 sor
| order_id | customer_id | amount | order_date | source_system | |
|---|---|---|---|---|---|
| 0 | 1001 | C-001 | 12900 | 2026-05-01 | webshop |
| 1 | 1002 | C-002 | 4790 | 2026-05-01 | webshop |
| 2 | 1003 | C-001 | 8350 | 2026-05-02 | pos |
| 3 | 1004 | C-003 | 21990 | 2026-05-03 | webshop |
Schema lekérdezése
A schema nem a storage accountból jön. A schema a Delta metadata része lett, amikor megírtuk a táblát. A dt.schema().json() hívás a tranzakciós napló metaData bejegyzéséből olvassa ki a séma információt, nem pedig magukból a Parquet fájlokból (bár azok is tartalmaznak sémát).
Ez azért fontos, mert így a Delta Lake garantálja a séma konzisztenciáját: mindenki, aki olvassa a táblát, ugyanazt a sémát kapja, függetlenül attól, hogy melyik eszközt használja. A séma tartalmazza az oszlopok nevét, típusát, nullability-jét és egyéb metaadatokat. A timestamp típus például mikrosecundumos pontossággal és UTC időzónával van tárolva, ahogy az a kimenetben is látszik.
A kimenetből jól látható, hogy a order_id integer, a customer_id string, a order_date pedig egy strukturált timestamp típus. Ez a típusinformáció a Delta logban van rögzítve, és minden új adatíráskor validálva lesz a séma ellen.
Tip: A Delta Lake támogatja a séma evolúciót is (schema_mode="merge"), ami új oszlopok hozzáadását teszi lehetővé anélkül, hogy a meglévő adatokat újra kellene írni.
print(dt.schema().json())[delta] Schema lekérdezve: {"type": "struct", "fields": [ {"name": "order_id", "type": "integer", "nullable": false, "metadata": {}}, {"name": "customer_id", "type": "string", "nullable": false, "metadata": {}}, {"name": "amount", "type": "integer", "nullable": false, "metadata": {}}, {"name": "order_date", "type": {"type": "timestamp", "unit": "MICROS", "timezone": "UTC"}, "nullable": false, "metadata": {}}, {"name": "source_system", "type": "string", "nullable": false, "metadata": {}} ]}
Lokális mappa vagy ADLS path
↓
Parquet adatfájlok + _delta_log/metaData
↓
Delta table schema
Új adatok hozzáírása (append)
Most appendelünk két új sort. Ettől új Parquet fájl és új Delta log verzió jön létre. A write_deltalake(..., mode="append") nem írja felül a meglévő adatokat, hanem hozzáadja az új adatokat a táblához, létrehozva egy új verziót a tranzakciós naplóban.
A kódban először létrehozunk egy új DataFrame-et (new_sales_df) két rendeléssel, majd az append módú írással hozzáadjuk a meglévő táblához. Utána újra megnyitjuk a DeltaTable objektumot, hogy lássuk a frissített tartalmat. Az eredménytáblában már 6 sort fogsz látni -- az eredeti 4-et és az újonnan hozzáadott 2-t, az utóbbiakat a kimenetben zöld színnel kiemelve.
Az append művelet a Delta Lake-ben atomi (atomic): vagy teljes egészében sikeres, vagy egyáltalán nem történik meg. Ez az ACID "A" (Atomicity) garancia, ami azt jelenti, hogy soha nem fogod látni félig írt adatot, ami production rendszerekben kritikus fontosságú.
Tip: A DeltaTable(path) objektumot újra kell hozzárendelni az append után, mert a régi objektum még a régi verzióra hivatkozik -- a Delta tábla nem dinamikusan frissíti magát a memóriában.
new_sales_df = pd.DataFrame({
"order_id": [1005, 1006],
"customer_id": ["C-004", "C-002"],
"amount": [15900, 6990],
"order_date": pd.to_datetime(["2026-05-04", "2026-05-04"]),
"source_system": ["pos", "webshop"],
})
write_deltalake(str(table_path), new_sales_df, mode="append")
dt = DeltaTable(str(table_path))
dt.to_pandas().sort_values("order_id")[delta] Append sikeres – most már 6 sor
| order_id | customer_id | amount | order_date | source_system | |
|---|---|---|---|---|---|
| 0 | 1001 | C-001 | 12900 | 2026-05-01 | webshop |
| 1 | 1002 | C-002 | 4790 | 2026-05-01 | webshop |
| 2 | 1003 | C-001 | 8350 | 2026-05-02 | pos |
| 3 | 1004 | C-003 | 21990 | 2026-05-03 | webshop |
| 4 | 1005 | C-004 | 15900 | 2026-05-04 | pos |
| 5 | 1006 | C-002 | 6990 | 2026-05-04 | webshop |
Verziók és time travel
Delta Lake
A Delta log miatt a tábla verziózott. Az első írás volt a 0. verzió, az append után létrejött az 1. verzió. A kód első cellája kilistázza a _delta_log mappában lévő JSON fájlokat, a második cella pedig bemutatja a time travel funkciót: a DeltaTable(path, version=N) paraméterrel bármelyik korábbi verziót be tudod tölteni.
A time travel az egyik legerősebb Delta Lake funkció, mert lehetővé teszi az adatok bármely korábbi állapotának lekérdezését. Ez auditálásra, hibakeresésre, ML kísérletek reprodukálására és adatvisszaállításra is használható. A 0. verzió 4 sort tartalmaz (az eredeti írás), az 1. verzió pedig már 6-ot (az append után).
A színfalak mögött a Delta Lake úgy oldja meg a time travel-t, hogy minden verzió log fájlja feljegyzi, mely Parquet fájlok tartoznak az adott verzióhoz. Amikor egy régi verziót kérsz, a reader összeállítja a megfelelő fájllistát a log fájlokból, és csak azokat olvassa be. A régi adatfájlok mindaddig megmaradnak, amíg a VACUUM parancs törli őket.
Tip: Alapértelmezetten a VACUUM 7 napnál régebbi fájlokat töröl -- ha hosszabb time travel ablakra van szükséged, állítsd be a delta.logRetentionDuration táblatulajdonságot.
for log_file in sorted((table_path / "_delta_log").glob("*.json")): print(log_file.name)
[delta] Delta log verziók: 00000000000000000000.json 00000000000000000001.json
version_0 = DeltaTable(str(table_path), version=0) version_1 = DeltaTable(str(table_path), version=1) print("0. verzió sorok száma:", len(version_0.to_pandas())) print("1. verzió sorok száma:", len(version_1.to_pandas()))
[delta] Time travel: 0. verzió sorok száma: 4 [delta] Time travel: 1. verzió sorok száma: 6
Nem csak a legfrissebb állapotot tudod olvasni, hanem korábbi verziókat is, amíg a régi fájlokat nem törölted retention/vacuum miatt.
Hol van itt a metastore? + Path vs Metastore + Azure
Hive
Azure
Databricks
Ebben a notebookban nincs metastore. Csak egy Delta table van egy lokális path-on. A metastore akkor jön képbe, amikor ehhez a path-hoz táblanevet akarsz rendelni. A metastore (metaadat-tároló) lényegében egy névkönyv: egy logikai nevet (pl. silver.sales) rendel egy fizikai útvonalhoz (pl. /path/to/sales_delta), és tárolja a sémát, a jogosultságokat és egyéb metaadatokat.
Például Databricksben vagy Spark SQL-ben a CREATE TABLE paranccsal regisztrálod a táblát a metastore-ban, utána pedig egyszerű SQL-ből hivatkozhatsz rá név alapján, anélkül hogy ismerned kellene a fizikai elérési utat. Ez kényelmesebbé teszi a lekérdezést, és lehetővé teszi a jogosultságkezelést, a lineage követést és a tábla-felfedezhetőséget (discovery).
CREATE TABLE silver.sales
USING DELTA
LOCATION '/path/to/delta_demo_output/sales_delta';
| Megközelítés | Mit használsz? | Példa | Mire jó? |
|---|---|---|---|
| Path-alapú Delta table | Fizikai útvonal | DeltaTable("delta_demo_output/sales_delta") | Lokális demo, egyszerű job |
| Metastore-ba regisztrált | Logikai név | silver.sales | SQL, jogosultság, discovery, BI |
Azure-ban ugyanez
Lokális path helyett ADLS path lenne. A fizikai szerkezet ugyanaz marad: Parquet fájlok + _delta_log. A különbség csak az, hogy a fájlok nem a helyi lemezen, hanem egy Azure Data Lake Storage (ADLS) Gen2 containerben vannak, és az elérési útvonal abfss:// sémát használ.
A Databricks Python API-ban a df.write.format("delta") hívás ugyanúgy működik, csak az útvonal más. A CREATE TABLE SQL parancs is megegyezik -- a metastore regisztrálja az ADLS útvonalat, és onnantól a silver.sales táblanévvel hivatkozhatsz rá. Ez az egyik legszebb tulajdonsága a nyílt formátumoknak: amit lokálisan megtanulsz, azt egy az egyben átültetheted a felhőbe.
# ADLS path
abfss://silver@stcompanydata.dfs.core.windows.net/sales_delta
# Databricks Python
df.write.format("delta").mode("overwrite").save(
"abfss://silver@stcompanydata.dfs.core.windows.net/sales_delta"
)
# SQL regisztráció
CREATE TABLE silver.sales
USING DELTA
LOCATION 'abfss://silver@stcompanydata.dfs.core.windows.net/sales_delta';
A fizikai szerkezet ugyanaz: Parquet fájlok + _delta_log. A Delta Lake formátum nem kötődik egyetlen felhőszolgáltatóhoz sem -- ugyanaz a tábla működik Azure-on, AWS-en (S3) és GCP-n (GCS) is.
ETL és ELT ugyanazzal a lokális Delta gondolkodással
Spark
dbt
Az ETL és az ELT nem fájlformátum, nem Databricks feature. Inkább arról szól, hogy mikor történik a transzformáció. Az ETL (Extract-Transform-Load) esetén az adatot már a betöltés előtt megtisztítod, típusozod és üzleti logikával látod el, és csak a kész eredményt töltöd be a célrendszerbe. Az ELT (Extract-Load-Transform) esetén viszont a nyers adatot gyorsan betöltöd a célrendszerbe, és csak utána transzformálod.
ETL = Extract → Transform → Load
ELT = Extract → Load → Transform
A lakehouse architektúrában az ELT a domináns megközelítés, mert a nyers adat megőrzése (bronze réteg) biztosítja az újrafeldolgozhatóságot, az auditálhatóságot és a rugalmasságot. Ha az üzleti logika változik, egyszerűen újrafuttathatod a bronze → silver transzformációt, anélkül hogy vissza kellene menned a forrásrendszerhez. A táblázat összehasonlítja a két megközelítés jellemzőit.
| Minta | Mikor tisztítod? | Hova kerül először? | Tipikus lakehouse réteg |
|---|---|---|---|
| ETL | Betöltés előtt | Már tisztított tábla | silver/gold jellegű output |
| ELT | Betöltés után | Először raw/bronze tábla | bronze → silver → gold |
ETL demo: előbb transzformálunk, utána töltünk Delta táblába
ETL esetén először Pythonban megtisztítjuk és üzleti oszlopokat képzünk, és csak a kész adatot írjuk ki. Ez a klasszikus ETL (Extract-Transform-Load) minta: az adat már a Delta táblába íráskor tisztított, típusozott és üzletileg felhasználható formátumú.
A demo három cellából áll: az első létrehozza a nyers forrásadatot (ahogy egy OLTP rendszerből érkezhetne, szöveges mezőkkel, formázatlan összegekkel), a második elvégzi a transzformációt (szóközök eltávolítása, típuskonverzió, nettó összeg számítása, logikai oszlop képzése), a harmadik pedig kiírja az eredményt egy Delta táblába.
A transzformációs lépésben a str.replace(" ", "") eltávolítja a számokból az ezres elválasztó szóközöket, az astype("int64") számmá konvertál, a nettó összeg képzése pedig a bruttó összeg és a kedvezmény százalék alapján történik. Az is_online oszlop egy logikai jelző, amely egyszerűbb szűréseket tesz lehetővé.
Tip: Az ETL minta hátránya, hogy ha a transzformációs logika változik, a teljes folyamatot újra kell futtatni -- a nyers adat már nincs sehol tárolva ebben a megközelítésben.
etl_source_df = pd.DataFrame({
"order_id": ["2001", "2002", "2003", "2004"],
"customer_id": ["C-010", "C-011", "C-010", "C-012"],
"gross_amount_huf": ["12 900", "4 790", "25 000", "8 350"],
"discount_pct": ["0", "10", "20", "0"],
"order_date": ["2026-05-01", "2026-05-01", "2026-05-02", "2026-05-03"],
"channel": ["webshop", "webshop", "partner", "pos"],
})
etl_source_df[spark] ETL forrásadat betöltve – 4 sor (nyers)
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | |
|---|---|---|---|---|---|---|
| 0 | "2001" | C-010 | "12 900" | "0" | 2026-05-01 | webshop |
| 1 | "2002" | C-011 | "4 790" | "10" | 2026-05-01 | webshop |
| 2 | "2003" | C-010 | "25 000" | "20" | 2026-05-02 | partner |
| 3 | "2004" | C-012 | "8 350" | "0" | 2026-05-03 | pos |
# Transform lépés: tisztítás és üzleti logika etl_clean_df = etl_source_df.copy() etl_clean_df["gross_amount_huf"] = etl_clean_df["gross_amount_huf"].str.replace(" ", "").astype("int64") etl_clean_df["net_amount_huf"] = (etl_clean_df["gross_amount_huf"] * (1 - etl_clean_df["discount_pct"].astype(float) / 100)).round().astype("int64") etl_clean_df["is_online"] = etl_clean_df["channel"].eq("webshop") etl_clean_df
[spark] ETL Transform lépés kész – tisztított + üzleti oszlopok
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | net_amount_huf | is_online | |
|---|---|---|---|---|---|---|---|---|
| 0 | 2001 | C-010 | 12900 | 0 | 2026-05-01 | webshop | 12900 | True |
| 1 | 2002 | C-011 | 4790 | 10 | 2026-05-01 | webshop | 4311 | True |
| 2 | 2003 | C-010 | 25000 | 20 | 2026-05-02 | partner | 20000 | False |
| 3 | 2004 | C-012 | 8350 | 0 | 2026-05-03 | pos | 8350 | False |
etl_table_path = base_dir / "sales_etl_curated_delta" write_deltalake(str(etl_table_path), etl_clean_df, mode="overwrite") print(f"ETL output Delta table: {etl_table_path.resolve()}") DeltaTable(str(etl_table_path)).to_pandas().sort_values("order_id")
[delta] ETL output Delta table: /home/user/.../delta_demo_output/sales_etl_curated_delta
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | net_amount_huf | is_online | |
|---|---|---|---|---|---|---|---|---|
| 0 | 2001 | C-010 | 12900 | 0.0 | 2026-05-01 | webshop | 12900 | True |
| 1 | 2002 | C-011 | 4790 | 10.0 | 2026-05-01 | webshop | 4311 | True |
| 2 | 2003 | C-010 | 25000 | 20.0 | 2026-05-02 | partner | 20000 | False |
| 3 | 2004 | C-012 | 8350 | 0.0 | 2026-05-03 | pos | 8350 | False |
nyers DataFrame → tisztítás és üzleti logika Pythonban → kész/kurált Delta table
ELT demo: előbb betöltjük raw Delta táblába, utána transzformálunk
ELT esetén a nyers adatot gyorsan eltesszük egy bronze táblába, majd abból készül a silver tábla. Ez a megközelítés a modern lakehouse architektúrák domináns mintája, mert a nyers adat megőrzése lehetővé teszi a transzformációs logika későbbi módosítását és az adatok újrafeldolgozását.
A demo két fő lépést mutat be: először a nyers adatot (etl_source_df, amelyet az előző ETL sectionben készítettünk) azonnal beírjuk a bronze Delta táblába, változatlan formátumban, szöveges mezőkkel együtt. Ezután a bronze táblából olvasva elvégezzük a tisztítást (típuskonverzió, üzleti számítások) és az eredményt egy silver Delta táblába írjuk.
A harmadik cella összehasonlítja a két megközelítést: láthatod, hogy az ETL egyetlen Delta táblát hozott létre (a kuráltat), míg az ELT kettőt (bronze/raw és silver/curated). A különbség a flexibilitásban rejlik: az ELT esetén bármikor újragenerálhatod a silver réteget más logikával, anélkül hogy hozzányúlnál a forrásadathoz.
Tip: A bronze tábla megőrzi az eredetihez közeli állapotot. Ha változik a logika, a silver újragenerálható -- ez a lakehouse egyik legnagyobb előnye a hagyományos ETL megoldásokkal szemben.
# Load: nyers adat → bronze Delta tábla elt_raw_table_path = base_dir / "sales_elt_bronze_raw_delta" write_deltalake(str(elt_raw_table_path), etl_source_df, mode="overwrite") print(f"ELT bronze/raw Delta table: {elt_raw_table_path.resolve()}") DeltaTable(str(elt_raw_table_path)).to_pandas()
[delta] ELT bronze/raw Delta table: .../delta_demo_output/sales_elt_bronze_raw_delta
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | |
|---|---|---|---|---|---|---|
| 0 | "2001" | C-010 | "12 900" | "0" | 2026-05-01 | webshop |
| 1 | "2002" | C-011 | "4 790" | "10" | 2026-05-01 | webshop |
| 2 | "2003" | C-010 | "25 000" | "20" | 2026-05-02 | partner |
| 3 | "2004" | C-012 | "8 350" | "0" | 2026-05-03 | pos |
# Transform: bronze → silver elt_silver_table_path = base_dir / "sales_elt_silver_delta" bronze_df = DeltaTable(str(elt_raw_table_path)).to_pandas() silver_df = bronze_df.copy() silver_df["gross_amount_huf"] = silver_df["gross_amount_huf"].str.replace(" ","").astype("int64") silver_df["net_amount_huf"] = (silver_df["gross_amount_huf"] * (1 - silver_df["discount_pct"].astype(float)/100)).round().astype("int64") write_deltalake(str(elt_silver_table_path), silver_df, mode="overwrite") DeltaTable(str(elt_silver_table_path)).to_pandas()
[spark] ELT Transform: bronze → silver kész
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | net_amount_huf | |
|---|---|---|---|---|---|---|---|
| 0 | 2001 | C-010 | 12900 | 0 | 2026-05-01 | webshop | 12900 |
| 1 | 2002 | C-011 | 4790 | 10 | 2026-05-01 | webshop | 4311 |
| 2 | 2003 | C-010 | 25000 | 20 | 2026-05-02 | partner | 20000 |
| 3 | 2004 | C-012 | 8350 | 0 | 2026-05-03 | pos | 8350 |
nyers DataFrame → bronze/raw Delta table → transzformáció → silver/curated Delta table
A bronze tábla megőrzi az eredetihez közeli állapotot. Ha változik a logika, a silver újragenerálható.
# ETL vs ELT összehasonlítás demo_tables = { "ETL curated": etl_table_path, "ELT bronze/raw": elt_raw_table_path, "ELT silver/curated": elt_silver_table_path, } for label, path in demo_tables.items(): parquet_files = sorted(path.glob("*.parquet")) log_files = sorted((path / "_delta_log").glob("*.json")) print(f"{label}") print(f" path: {path}") print(f" parquet fájlok: {len(parquet_files)}") print(f" delta log verziók: {len(log_files)}")
[delta] ETL vs ELT összehasonlítás: [spark] ETL curated path: delta_demo_output/sales_etl_curated_delta parquet fájlok: 1 delta log verziók: 1 [spark] ELT bronze/raw path: delta_demo_output/sales_elt_bronze_raw_delta parquet fájlok: 1 delta log verziók: 1 [spark] ELT silver/curated path: delta_demo_output/sales_elt_silver_delta parquet fájlok: 1 delta log verziók: 1
Landing, Bronze, Silver, Gold rétegek
A lakehouse rétegezés arra való, hogy az adat útja ne egyetlen nagy feldolgozás legyen, hanem jól elkülönített állomások sorozata. Minden réteg más-más feldolgozottsági szintet képvisel: a Landing a teljesen nyers fájlokat, a Bronze a nyers de már Delta formátumú adatot, a Silver a tisztított és típusozott adatot, a Gold pedig az aggregált, riportkész KPI-kat tartalmazza.
| Réteg | Mit tartalmaz? | Mennyire nyers? | Tipikus cél |
|---|---|---|---|
| Landing | Beérkezett fájlok majdnem érintetlenül | Teljesen nyers | Forrásból érkező adat megőrzése |
| Bronze | Raw adat Delta táblában, ingestion metaadatokkal | Nagyon nyers | Auditálható, újraolvasható alapréteg |
| Silver | Tisztított, típusozott, deduplikált adat | Üzletileg használható | Elemzés és további modellezés |
| Gold | Aggregált, üzleti célra előkészített adat | Kurált | Dashboard, riport, KPI |
Landing → Bronze → Silver → Gold demo
CSV
Delta
# Landing: két nyers CSV fájl létrehozása landing_dir = base_dir / "landing" / "orders" landing_batch_1 = pd.DataFrame({ "order_id": ["3001", "3002", "3003"], "customer_id": ["C-020", "C-021", "C-020"], "gross_amount_huf": ["9900", "14900", "2490"], "discount_pct": ["0", "15", "0"], "order_date": ["2026-05-05", "2026-05-05", "2026-05-06"], "channel": ["webshop", "partner", "pos"], }) landing_batch_2 = pd.DataFrame({ "order_id": ["3004", "3005"], "customer_id": ["C-022", "C-023"], "gross_amount_huf": ["19990", "5990"], "discount_pct": ["10", "0"], "order_date": ["2026-05-06", "2026-05-07"], "channel": ["webshop", "webshop"], }) landing_batch_1.to_csv(landing_dir / "orders_2026_05_05.csv", index=False) landing_batch_2.to_csv(landing_dir / "orders_2026_05_06.csv", index=False)
[delta] Landing CSV fájlok létrehozva: delta_demo_output/landing/orders/orders_2026_05_05.csv delta_demo_output/landing/orders/orders_2026_05_06.csv
# Landing → Bronze: CSV beolvasás + ingestion meta bronze_frames = [] for source_file in sorted(landing_dir.glob("*.csv")): frame = pd.read_csv(source_file, dtype=str) frame["_source_file"] = source_file.name frame["_ingestion_mode"] = "batch" frame["_ingested_at"] = pd.Timestamp.now("UTC").isoformat() bronze_frames.append(frame) bronze_orders_df = pd.concat(bronze_frames, ignore_index=True) write_deltalake(str(bronze_orders_path), bronze_orders_df, mode="overwrite") DeltaTable(str(bronze_orders_path)).to_pandas().sort_values("order_id")
[delta] Landing → Bronze: 5 sor betöltve ingestion metaadatokkal
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | _source_file | _ingestion_mode | |
|---|---|---|---|---|---|---|---|---|
| 0 | 3001 | C-020 | 9900 | 0 | 2026-05-05 | webshop | orders_2026_05_05.csv | batch |
| 1 | 3002 | C-021 | 14900 | 15 | 2026-05-05 | partner | orders_2026_05_05.csv | batch |
| 2 | 3003 | C-020 | 2490 | 0 | 2026-05-06 | pos | orders_2026_05_05.csv | batch |
| 3 | 3004 | C-022 | 19990 | 10 | 2026-05-06 | webshop | orders_2026_05_06.csv | batch |
| 4 | 3005 | C-023 | 5990 | 0 | 2026-05-07 | webshop | orders_2026_05_06.csv | batch |
# Bronze → Silver: típusozás, tisztítás, deduplikáció bronze_orders_read_df = DeltaTable(str(bronze_orders_path)).to_pandas() silver_orders_df = bronze_orders_read_df.copy() silver_orders_df["gross_amount_huf"] = silver_orders_df["gross_amount_huf"].astype("int64") silver_orders_df["net_amount_huf"] = (silver_orders_df["gross_amount_huf"] * (1 - silver_orders_df["discount_pct"].astype(float)/100)).round().astype("int64") silver_orders_df["is_online"] = silver_orders_df["channel"].eq("webshop") silver_orders_df = silver_orders_df.drop_duplicates(subset=["order_id"]) write_deltalake(str(silver_orders_path), silver_orders_df, mode="overwrite") DeltaTable(str(silver_orders_path)).to_pandas().sort_values("order_id")
[delta] Bronze → Silver: tisztított, típusozott, deduplikált – 5 sor
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | net_amount_huf | is_online | |
|---|---|---|---|---|---|---|---|---|
| 0 | 3001 | C-020 | 9900 | 0.0 | 2026-05-05 | webshop | 9900 | True |
| 1 | 3002 | C-021 | 14900 | 15.0 | 2026-05-05 | partner | 12665 | False |
| 2 | 3003 | C-020 | 2490 | 0.0 | 2026-05-06 | pos | 2490 | False |
| 3 | 3004 | C-022 | 19990 | 10.0 | 2026-05-06 | webshop | 17991 | True |
| 4 | 3005 | C-023 | 5990 | 0.0 | 2026-05-07 | webshop | 5990 | True |
# Silver → Gold: napi csatorna szerinti KPI gold_daily_sales_df = ( silver_orders_df .groupby(["order_date", "channel"], as_index=False) .agg(order_count=("order_id", "count"), gross_amount_huf=("gross_amount_huf", "sum"), net_amount_huf=("net_amount_huf", "sum")) .sort_values(["order_date", "channel"]) ) write_deltalake(str(gold_daily_sales_path), gold_daily_sales_df, mode="overwrite") DeltaTable(str(gold_daily_sales_path)).to_pandas()
[delta] Silver → Gold: napi csatorna KPI – 5 sor
| order_date | channel | order_count | gross_amount_huf | net_amount_huf | |
|---|---|---|---|---|---|
| 0 | 2026-05-05 | partner | 1 | 14900 | 12665 |
| 1 | 2026-05-05 | webshop | 1 | 9900 | 9900 |
| 2 | 2026-05-06 | pos | 1 | 2490 | 2490 |
| 3 | 2026-05-06 | webshop | 1 | 19990 | 17991 |
| 4 | 2026-05-07 | webshop | 1 | 5990 | 5990 |
landing/orders/*.csv
→ bronze/orders_delta # raw, auditálható Delta
→ silver/orders_delta # tisztított rendelési tényadat
→ gold/daily_channel_sales # napi csatorna szerinti KPI
Fact és Dimension táblák (Star Schema)
A fact és dimension fogalmak a klasszikus adattárház-modellezésből jönnek. A fact tábla mérhető eseményeket tartalmaz (rendelések, tranzakciók, kattintások), a dimension táblák pedig leíró entitásokat (vásárlók, termékek, dátumok, csatornák). A kettő együtt alkotja a Star Schema-t, amely a BI riportok és dashboardok mögötti leggyakoribb adatmodell.
Fact = mérhető események (rendelések, összegek)
Dimension = leíró entitások (vásárló, csatorna, dátum)
A Star Schema előnye, hogy a riportkészítő eszközök (Power BI, Tableau, Superset) natívan támogatják: a fact tábla külső kulcsaival kapcsolódik a dimension táblákhoz, és a felhasználó egyszerű drag-and-drop műveletekkel tud riportokat készíteni. A dimension táblák "kiszélesítik" a fact táblát az ember által olvasható leírókkal, így a végfelhasználó nem azonosítókat lát, hanem neveket, címkéket és kategóriákat.
A következő cellák bemutatják, hogyan épül fel a dim_customer, dim_channel, dim_date és fact_sales tábla a silver réteg adataiból, majd hogyan joinolva létrejön egy riportbarát Sales Mart nézet.
customer_names = {"C-020": "Kovacs Anna", "C-021": "Nagy Bela", "C-022": "Toth Csilla", "C-023": "Szabo David"}
dim_customer_df = silver_for_model_df[["customer_id"]].drop_duplicates().sort_values("customer_id")
dim_customer_df["customer_name"] = dim_customer_df["customer_id"].map(customer_names)
dim_customer_df[spark] dim_customer dimension tábla – 4 sor
| customer_id | customer_name | |
|---|---|---|
| 0 | C-020 | Kovacs Anna |
| 1 | C-021 | Nagy Bela |
| 2 | C-022 | Toth Csilla |
| 3 | C-023 | Szabo David |
channel_labels = {"webshop": "Online webshop", "partner": "Partner sales", "pos": "Physical store"}
dim_channel_df = silver_for_model_df[["channel"]].drop_duplicates().sort_values("channel")
dim_channel_df["channel_name"] = dim_channel_df["channel"].map(channel_labels)
dim_channel_df[spark] dim_channel dimension tábla – 3 sor
| channel | channel_name | |
|---|---|---|
| 0 | partner | Partner sales |
| 1 | pos | Physical store |
| 2 | webshop | Online webshop |
dim_date_df = silver_for_model_df[["order_date"]].drop_duplicates().sort_values("order_date") dim_date_df["date_id"] = dim_date_df["order_date"].dt.strftime("%Y%m%d").astype("int64") dim_date_df["year"] = dim_date_df["order_date"].dt.year dim_date_df["month"] = dim_date_df["order_date"].dt.month dim_date_df["day"] = dim_date_df["order_date"].dt.day dim_date_df[["date_id", "order_date", "year", "month", "day"]]
[spark] dim_date dimension tábla – 3 sor
| date_id | order_date | year | month | day | |
|---|---|---|---|---|---|
| 0 | 20260505 | 2026-05-05 | 2026 | 5 | 5 |
| 1 | 20260506 | 2026-05-06 | 2026 | 5 | 6 |
| 2 | 20260507 | 2026-05-07 | 2026 | 5 | 7 |
fact_sales_df = silver_for_model_df.merge(dim_date_df[["date_id","order_date"]], on="order_date", how="left") fact_sales_df = fact_sales_df[["order_id","customer_id","channel","date_id","gross_amount_huf","discount_pct","net_amount_huf","is_online"]].sort_values("order_id") write_deltalake(str(dim_customer_path), dim_customer_df, mode="overwrite") write_deltalake(str(dim_channel_path), dim_channel_df, mode="overwrite") write_deltalake(str(dim_date_path), dim_date_df, mode="overwrite") write_deltalake(str(fact_sales_path), fact_sales_df, mode="overwrite") fact_sales_df
[delta] fact_sales + dimenziók Delta táblákba írva
| order_id | customer_id | channel | date_id | gross_amount_huf | discount_pct | net_amount_huf | is_online | |
|---|---|---|---|---|---|---|---|---|
| 0 | 3001 | C-020 | webshop | 20260505 | 9900 | 0.0 | 9900 | True |
| 1 | 3002 | C-021 | partner | 20260505 | 14900 | 15.0 | 12665 | False |
| 2 | 3003 | C-020 | pos | 20260506 | 2490 | 0.0 | 2490 | False |
| 3 | 3004 | C-022 | webshop | 20260506 | 19990 | 10.0 | 17991 | True |
| 4 | 3005 | C-023 | webshop | 20260507 | 5990 | 0.0 | 5990 | True |
sales_mart_df = (fact_sales_df
.merge(dim_customer_df, on="customer_id", how="left")
.merge(dim_channel_df, on="channel", how="left")
.merge(dim_date_df, on="date_id", how="left"))
sales_mart_df[["order_id", "customer_name", "channel_name", "order_date", "net_amount_huf"]][spark] Sales Mart (joined dim + fact) – 5 sor
| order_id | customer_name | channel_name | order_date | net_amount_huf | |
|---|---|---|---|---|---|
| 0 | 3001 | Kovacs Anna | Online webshop | 2026-05-05 | 9900 |
| 1 | 3002 | Nagy Bela | Partner sales | 2026-05-05 | 12665 |
| 2 | 3003 | Kovacs Anna | Physical store | 2026-05-06 | 2490 |
| 3 | 3004 | Toth Csilla | Online webshop | 2026-05-06 | 17991 |
| 4 | 3005 | Szabo David | Online webshop | 2026-05-07 | 5990 |
customer_kpi_df = (sales_mart_df
.groupby("customer_name", as_index=False)
.agg(order_count=("order_id", "count"), total_net_amount_huf=("net_amount_huf", "sum"))
.sort_values("total_net_amount_huf", ascending=False))
customer_kpi_df[spark] Customer KPI aggregátum – 4 sor
| customer_name | order_count | total_net_amount_huf | |
|---|---|---|---|
| 0 | Toth Csilla | 1 | 17991 |
| 1 | Nagy Bela | 1 | 12665 |
| 2 | Kovacs Anna | 2 | 12390 |
| 3 | Szabo David | 1 | 5990 |
Batch feldolgozás
Batch feldolgozásnál egyszerre egy véges adatcsomagot dolgozol fel. Például: minden éjjel 01:00-kor beolvasod az előző napi CSV-ket, frissíted a bronze, silver és gold rétegeket. Ez a legelterjedtebb feldolgozási mód a data engineeringben, mert egyszerűen megtervezhető, könnyen újrafuttatható, és a hibakeresés is viszonylag egyértelmű.
A kódcella egy összefoglaló táblázatban mutatja be a batch pipeline lépéseit: az extract fázisban CSV fájlokat olvasunk be, a load bronze-ba írjuk a nyers adatot, a transform silver-ben tisztítjuk, végül az aggregate gold-ban aggregáljuk a KPI-kat. Minden lépés batch jellegű, azaz egyszerre dolgozza fel a teljes adatcsomagot.
A batch feldolgozás fő előnye az egyszerűség: ha valami hiba történik, újra lehet futtatni az egész folyamatot (idempotens pipeline), és a végeredmény garantáltan konzisztens lesz. A hátránya, hogy az adat frissessége a batch ütemezésétől függ -- ha naponta fut, akkor a riportok legfeljebb egy napos adatot mutatnak.
Tip: A batch pipelineok tervezésénél érdemes az idempotenciára törekedni: azaz a többszori futtatás ugyanazt az eredményt adja, nem duplikál adatot, és nem okoz mellékhatást.
batch_summary = pd.DataFrame({
"step": ["extract", "load bronze", "transform silver", "aggregate gold"],
"input": ["landing CSV files", "landing DataFrame", "bronze Delta", "silver Delta"],
"output": ["pandas DataFrame", "bronze Delta", "silver Delta", "gold Delta"],
"execution_style": ["batch", "batch", "batch", "batch"],
})
batch_summary[spark] Batch feldolgozás lépései
| step | input | output | execution_style | |
|---|---|---|---|---|
| 0 | extract | landing CSV files | pandas DataFrame | batch |
| 1 | load bronze | landing DataFrame | bronze Delta | batch |
| 2 | transform silver | bronze Delta | silver Delta | batch |
| 3 | aggregate gold | silver Delta | gold Delta | batch |
Streaming és micro-batch gondolkodás
Kafka
Flink
Spark Streaming
Valódi streaminghez Spark Structured Streaming, Flink, Kafka kell. Itt micro-batch szimulációt csinálunk, amely bemutatja a streaming gondolkodásmód alapjait anélkül, hogy valódi streaming infrastruktúrára lenne szükség. A micro-batch lényegében kicsi, gyakran ismétlődő batch-ek sorozata -- a legtöbb "streaming" rendszer (beleértve a Spark Structured Streaming-et is) valójában így működik a színfalak mögött.
A kódcella két batch-et szimulál, mintha azok időben egymás után érkeznének (például 30 másodpercenként). Minden batch esetén a teljes pipeline lefut: a nyers adat a bronze táblába kerül (az első batch overwrite, a többi append), majd a bronze-ból újraolvassuk az összes eddigi adatot, transzformáljuk silver-re, aggregáljuk gold-ra, és felülírjuk a silver/gold táblákat. Így minden batch után a silver és gold táblák a teljes eddigi történetet tükrözik.
A különbség az egyszerű batch és a micro-batch között az, hogy itt az adat folyamatosan érkezik és feldolgozásra kerül, nem pedig egyetlen nagy futásban. A _batch_id oszlop a bronze rétegben nyomon követi, melyik adat melyik batch-ből származik, ami segíti az auditálást és a hibakeresést.
Tip: Éles streaming rendszerekben a checkpointing és a watermarks biztosítják, hogy a rendszer összeomlás esetén pontosan onnan folytatódhassa a feldolgozást, ahol abbamaradt -- ez a szimulációban nem jelenik meg, de production-ben elengedhetetlen.
stream_bronze_path = base_dir / "bronze/orders_stream_delta" stream_silver_path = base_dir / "silver/orders_stream_delta" stream_gold_path = base_dir / "gold/orders_stream_kpi_delta" stream_batches = [ pd.DataFrame({"order_id":["4001","4002"], "customer_id":["C-030","C-031"], "gross_amount_huf":["12900","3490"], "discount_pct":["0","0"], "order_date":["2026-05-08","2026-05-08"], "channel":["webshop","pos"]}), pd.DataFrame({"order_id":["4003","4004"], "customer_id":["C-032","C-030"], "gross_amount_huf":["8490","22990"], "discount_pct":["5","20"], "order_date":["2026-05-08","2026-05-09"], "channel":["partner","webshop"]}), ] for batch_id, batch_df in enumerate(stream_batches, start=1): bronze_batch_df = batch_df.copy() bronze_batch_df["_batch_id"] = batch_id bronze_mode = "overwrite" if batch_id == 1 else "append" write_deltalake(str(stream_bronze_path), bronze_batch_df, mode=bronze_mode) all_bronze = DeltaTable(str(stream_bronze_path)).to_pandas() all_silver = transform_orders(all_bronze) all_gold = aggregate_daily_channel_sales(all_silver) write_deltalake(str(stream_silver_path), all_silver, mode="overwrite") write_deltalake(str(stream_gold_path), all_gold, mode="overwrite") print(f"micro-batch {batch_id} feldolgozva | bronze: {len(all_bronze)} | silver: {len(all_silver)} | gold: {len(all_gold)}")
[delta] micro-batch 1 feldolgozva | bronze: 2 | silver: 2 | gold: 2 [delta] micro-batch 2 feldolgozva | bronze: 4 | silver: 4 | gold: 3
print("Bronze stream tábla:") DeltaTable(str(stream_bronze_path)).to_pandas().sort_values("order_id")
[delta] Bronze stream tábla – 4 sor
| order_id | customer_id | gross_amount_huf | discount_pct | order_date | channel | _batch_id | |
|---|---|---|---|---|---|---|---|
| 0 | 4001 | C-030 | 12900 | 0 | 2026-05-08 | webshop | 1 |
| 1 | 4002 | C-031 | 3490 | 0 | 2026-05-08 | pos | 1 |
| 2 | 4003 | C-032 | 8490 | 5 | 2026-05-08 | partner | 2 |
| 3 | 4004 | C-030 | 22990 | 20 | 2026-05-09 | webshop | 2 |
print("Gold stream KPI tábla:") DeltaTable(str(stream_gold_path)).to_pandas()
[delta] Gold stream KPI tábla – 4 sor
| order_date | channel | order_count | gross_amount_huf | net_amount_huf | |
|---|---|---|---|---|---|
| 0 | 2026-05-08 | partner | 1 | 8490 | 8066 |
| 1 | 2026-05-08 | pos | 1 | 3490 | 3490 |
| 2 | 2026-05-08 | webshop | 1 | 12900 | 12900 |
| 3 | 2026-05-09 | webshop | 1 | 22990 | 18392 |
Batch vs Streaming röviden
| Szempont | Batch | Streaming / micro-batch |
|---|---|---|
| Adat érkezése | Véges csomagokban | Folyamatosan vagy kis adagokban |
| Tipikus gyakoriság | Óránként, naponta, hetente | Másodpercenként, percenként |
| Egyszerűség | Egyszerűbb fejleszteni és újrafuttatni | Több állapotkezelés, checkpoint |
| Jó választás, ha | Nem kell azonnali frissesség | Közel valós idejű adat kell |
| Példa | Napi sales riport | Élő rendelésmonitoring, fraud detection |
Sok modern streaming rendszer valójában micro-batch módon működik: a feldolgozó engine kis, ismétlődő batch-ekre bontja a folyamatos adatfolyamot.
Open source metastore/catalog toolok kipróbáláshoz
Hive
Unity Catalog
Iceberg
Nessie
Polaris
Gravitino
| Tool | Mit ad hozzá? | Formátum fókusz | Mire jó? |
|---|---|---|---|
| Hive Metastore | Klasszikus Spark SQL metastore: database/table nevek, schema, location | Delta, Parquet, Hive | Megérteni az alap metastore modellt |
| Unity Catalog OSS | Databricks-szerű catalog: catalog.schema.table, jogosultságok | Delta, Iceberg, Parquet | Unity Catalog Databricks nélkül |
| Apache Gravitino | Federált metadata réteg, catalog of catalogs | Több engine és catalog | Több adattároló közös metadata réteg alá |
| Project Nessie | Git-szerű catalog branch/tag/commit | Apache Iceberg | Táblák verziózása catalog szinten |
| Apache Polaris | Iceberg REST catalog | Apache Iceberg | Modern Iceberg REST Catalog minta |
Ha a metastore alapfogalmat akarod megérteni: Hive Metastore
Ha a Databricks Unity Catalog modelljét: Unity Catalog OSS
Ha Iceberges modern catalogot: Polaris vagy Nessie
Ha több catalog és rendszer fölé: Apache Gravitino
Összefoglalás
✅ Delta table-t létre tudsz hozni Databricks és Synapse nélkül is.
✅ Ehhez kell egy Delta-kompatibilis writer, például deltalake vagy delta-spark.
✅ A Delta table fizikailag adatfájlokból és _delta_log naplóból áll.
✅ A schema a Delta metadata része.
✅ A metastore csak akkor kell, ha névvel, SQL-ből, catalogként akarod használni.
✅ Azure-ban ugyanez ADLS/Storage Account path-on történik.
✅ A landing/bronze/silver/gold rétegek az adat feldolgozottsági állapotát írják le.
✅ A batch és streaming közti fő különbség: véges csomagok vs folyamatosan érkező adat.
✅ A fact táblák mérhető eseményeket, a dimension táblák leíró entitásokat tartalmaznak.
ACID · Time Travel · OPTIMIZE