Python Data Engineering-hez
Python
pandas
A Python napjainkban a data engineering legnépszerűbb és legszélesebb körben használt programozási nyelve. Azért tudta magát ilyen pozícióba emelni, mert egyszerre könnyen tanulható és rendkívül sokoldalú – az ETL pipeline-ektől kezdve az adatvalidáción át a gépi tanulásig szinte minden területen otthon van. Ebben a kurzusban olyan gyakorlati Python tudást szerezel, amit azonnal felhasználhatsz valós adatmérnöki projektekben.
A kurzus során egy WebShop Pro nevű fiktív webáruház adataival dolgozunk majd. Végigvesszük a teljes adatmérnöki workflow-t: a virtuális környezet kialakításától a fájlok beolvasásán és tisztításán át a transzformációkig és a végső Parquet írásig. Minden modul önállóan is érthető, de egyben egy koherens történetet mesél el arról, hogyan építhetsz megbízható adatpipeline-t.
Az anyag feldolgozása után képes leszel Python alapú ETL pipeline-t tervezni és implementálni, ismerni fogod a pandas, pydantic, pytest és sqlalchemy könyvtárak alapjait, és megérted, miért fontos a logging, a CLI és a párhuzamosítás a valósidejű adatfeldolgozásban.
Tipp: Ha még nem dolgoztál Pythonnal, érdemes előtte egy alapozó tutorialt elolvasni – de a kurzus azért úgy van felépítve, hogy középhaladóként is hasznos legyen, mert a data engineering specifikus mintákra fókuszál.
virtualenv, pathlib, pandas, pydantic, CLI, API, SQL, pytest, multiprocessing, generators, ETL framework
WebShop Pro ETL pipeline: CSV tisztítás, transzformáció, Parquet írás.
ETL · Pydantic · Parquet
Környezet beállítása
Mielőtt bármit is kódolnánk, megfelelő fejlesztői környezetet kell kialakítanunk. A Python ökoszisztémában ez azt jelenti, hogy létrehozunk egy virtuális környezetet (virtualenv), amely izolálja a projektünk függőségeit a rendszer többi részétől. Így elkerüljük, hogy különböző projektek egymásnak ellentmondó csomagverziókat használjanak – ez a gyakorlatban rengeteg fejfájást spórol meg.
A lenti kód négy parancsot mutat be. Az python -m venv venv létrehoz egy venv nevű könyvtárat, amelyben a Python értelmező egy saját másolata és a telepített csomagok fognak élni. A source venv/bin/activate aktiválja ezt a környezetet (Windowson venv\Scripts\activate). Ezután a pip install telepíti azokat a könyvtárakat, amiket a kurzus során használni fogunk: pandas adatfeldolgozáshoz, pyarrow Parquet fájlok kezeléséhez, pydantic validációhoz, requests API hívásokhoz és pytest teszteléshez.
Végül a pip freeze > requirements.txt parancs rögzíti az összes telepített csomag pontos verzióját egy fájlba. Ez kritikus fontosságú a data engineering-ben: így garantálható, hogy a pipeline ugyanúgy működjön a kolléga gépén, a szerveren vagy a CI/CD rendszerben, mint a te gépeden.
Tipp: Mindig hívd meg a requirements.txt-t a projekt gyökeréből, és sose telepíts csomagokat globálisan – használd a virtuális környezetet minden projekthez!
# Project setup python -m venv venv source venv/bin/activate pip install pandas pyarrow pydantic requests pytest pip freeze > requirements.txt
pandas==2.2.0 pyarrow==15.0.0 pydantic==2.6.0 requests==2.31.0 pytest==8.0.0
Python alapok: típusok, adatszerkezetek
A Python típusrendszer és beépített adatszerkezetei (list, dict, tuple, set) minden adatmérnök alapvető eszközei. Ezek ismerete nélkül lehetetlen hatékony ETL logikát írni. A kódban látható @dataclass dekorátor azonban egy modernebb megközelítést mutat: a dataclass segítségével típusannotációkkal ellátott osztályokat hozhatunk létre minimális boilerplate kóddal, ami tökéletes adatrekordok modellezésére.
A Product osztály négy kötelező mezőt (id, name, price, category) és egy opcionális mezőt (stock) definiál, amely alapértelmezésben 0. A @property dekorátorral ellátott is_available metódus egy származtatott tulajdonság, ami automatikusan kiszámítódik a stock értéke alapján – nem kell külön tárolni, mindig aktuális.
A pathlib.Path a modern Python fájlkezelés alapja. Szemben a régi os.path megoldással, a Path objektumok metódusai láncolhatók (Path('data') / 'file.csv'), platformfüggetlenek (Windows/Linux), és olvashatóbb kódot eredményeznek. A data engineering-ben szinte minden pipeline fájlkkal dolgozik, így a pathlib ismerete elengedhetetlen.
Tipp: Használd a typing.Optional[T] típusannotációt azokra a mezőkre, amik lehetnek None – így a típusellenőrző eszközök (mypy, pyright) segítenek elkapni a potenciális NoneType hibákat még futtatás előtt.
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
@dataclass
class Product:
id: int
name: str
price: float
category: str
stock: int = 0
@property
def is_available(self) -> bool:
return self.stock > 0
p = Product(1, 'Laptop Pro 16', 549.99, 'Laptop', 25)
print(f'{p.name}: {p.price}$, stock={p.stock}, available={p.is_available}')Laptop Pro 16: 549.99$, stock=25, available=True
Fájl I/O: CSV, JSON, Parquet
Adatmérnöki munkában a fájlok olvasása és írása a leggyakoribb műveletek közé tartozik. A nyers adatok általában CSV formátumban érkeznek (például egy CRM rendszer exportjából), de a feldolgozás során érdemes őket hatékonyabb formátumokba konvertálni. A Parquet egy oszlopos tárolási formátum, amely tömöríti az adatokat és jelentősen gyorsabbá teszi az analitikai lekérdezéseket – egy tipikus CSV fájl 60-80%-kal kisebb lehet Parquet-ben.
A kódban először a pd.read_csv() függvénnyel olvassuk be a CSV-t egy pandas DataFrame-be. Ezután a df.to_parquet() hívással írjuk ki Parquet formátumba. Az index=False paraméter fontos: nélküle a pandas a sorindexet is beleírná a fájlba, ami felesleges adatmennyiséget jelenthet. A Path('data').glob('*') segítségével végigmegyünk a data könyvtár összes fájlján, és kiírjuk a méretüket kilobájtban.
A fájlméretek összehasonlítása szemléletes: a CSV 12.3 KB, míg a Parquet csak 3.1 KB – ez 75%-os megtakarítás! Ez nagy adathalmazoknál (millió sor, több GB) még drámaibb különbséget jelent, nem is beszélve arról, hogy a Parquet oszlopos tárolása miatt csak a szükséges oszlopokat kell beolvasni, nem az egész fájlt.
Tipp: Ha dátum típusú oszlopaid vannak, használd a parse_dates paramétert a read_csv-nél – így a pandas automatikusan felismeri és dátumként kezeli őket, nem szövegként.
import pandas as pd
from pathlib import Path
# Read CSV
df = pd.read_csv('data/products.csv')
print(f'Rows: {len(df)}, Columns: {list(df.columns)}')
# Write Parquet
df.to_parquet('data/products.parquet', index=False)
# File sizes
for f in Path('data').glob('*'):
print(f'{f.name}: {f.stat().st_size / 1024:.1f} KB')Rows: 150, Columns: ['id', 'name', 'price', 'category', 'stock'] products.csv: 12.3 KB products.parquet: 3.1 KB
Pandas alapok
pandas
A pandas a Python adatelemzés és adatmérnökség zászlóshajó könyvtára. Központi adatszerkezete a DataFrame, ami lényegében egy táblázat – sorokból és oszlopokból áll, mint egy Excel munkalap vagy egy SQL tábla. A Series egyetlen oszlopot reprezentál. A pandas ereje abban rejlik, hogy a SQL-ben megszokott műveleteket (szűrés, csoportosítás, join) Pythonban, tisztán és hatékonyan tudjuk elvégezni.
A példakód három kulcsfontosságú műveletet mutat be. Először pd.read_parquet() segítségével betöltjük a rendeléseket és termékeket. A .merge() függvvény SQL JOIN-nak felel meg – itt product_id alapján kapcsoljuk össze a két táblát. Ezután a .groupby('category') csoportosítja az adatokat kategória szerint, az .agg() pedig összesítő függvényeket alkalmaz: a bevételek összegét és a rendelések számát számolja ki kategóriánként.
A láncolt (method chaining) szintaxis – ahol minden művelet egy újabb .valami() hívással folytatódik – rendkívül olvasható kódot eredményez. A .sort_values('revenue', ascending=False) végül csökkenő sorrendbe rendezi az eredményt. Ez a stílus a data engineering-ben nagyon elterjedt, mert egy-önmagában dokumentálja az adatáramlást.
Tipp: Mindig használd a on paramétert a merge()-nél, és gondold át, hogy inner, left, right vagy outer joinra van-e szükséged – a alapértelmezett inner join csendben eldobja a nem illeszkedő sorokat, ami meglepetéseket okozhat.
import pandas as pd
# Load data
orders = pd.read_parquet('data/orders.parquet')
products = pd.read_parquet('data/products.parquet')
# Filter + group
revenue = (orders
.merge(products, on='product_id')
.groupby('category')
.agg(revenue=('amount', 'sum'), count=('order_id', 'count'))
.sort_values('revenue', ascending=False)
)
print(revenue)revenue count category Laptop 24749.55 45 Phone 18949.38 62 Audio 5619.62 38
Pydantic adatvalidáció
A Pydantic egy modern Python könyvtár, amely típusbiztos adatmodelleket és automatikus validációt biztosít. A data engineering-ben az egyik legnagyobb kihívás a nem várt adatformátumok és értékek kezelése – egy rosszul formázott email cím vagy egy hiányzó mező lerombolhatja az egész pipeline-t. A Pydantic BaseModel osztálya megoldja ezt a problémát: definiálhatod az elvárt adatszerkezetet, és a könyvtár automatikusan validálja és konvertálja a bejövő adatokat.
A kódban egy CustomerRecord modellt látunk, amely öt mezőt definiál típusannotációkkal. A city: str = 'Unknown' szintaxis egy alapértelmezett értéket ad meg – ha a bejövő adat nem tartalmazza ezt a mezőt, a Pydantic automatikusan az alapértelmezett értéket használja. A @field_validator('email') dekorátorral egy egyedi validáló függvényt definiálunk, ami ellenőrzi, hogy az email cím tartalmaz-e @ jelet, és automatikusan kisbetűssé alakítja.
A model_dump() metódus a validált adatot egy sima Python dict-ként adja vissza, amit aztán könnyen szerializálhatsz JSON-né vagy betölthetsz egy DataFrame-be. Figyeld meg, hogy az eredeti 'Anna@Email.HU' automatikusan 'anna@email.hu'-vá vált – a validátor futott és normalizálta az értéket. Ez a típusú automatikus transzformáció menthet óráknyi debuggolást.
Tipp: Ha külső API-ból vagy CSV-ből olvasol adatot, mindig először Pydantic modellrevalidáld – a ValidationError pontosan megmondja, melyik mező milyen okból bukott meg, ami hihetetlenül megkönnyíti a hibakeresést.
from pydantic import BaseModel, field_validator
from typing import Optional
class CustomerRecord(BaseModel):
customer_id: int
name: str
email: str
city: str = 'Unknown'
@field_validator('email')
@classmethod
def validate_email(cls, v):
if '@' not in v:
raise ValueError('Invalid email')
return v.lower()
c = CustomerRecord(customer_id=1, name='Anna', email='Anna@Email.HU')
print(c.model_dump()){'customer_id': 1, 'name': 'Anna', 'email': 'anna@email.hu', 'city': 'Unknown'}Exception handling, logging
A robusztus hibakezelés és naplózás (logging) a professzionális data pipeline-ok alapköve. Egy éles (production) környezetben futó pipeline-nak képesnek kell lennie kezelni a hibákat anélkül, hogy az egész rendszer összeomlana. A Python try/except blokkja lehetővé teszi, hogy előre látható hibákat (pl. hiányzó fájl, rossz formátum) elegánsan kezeljünk, míg a logging modul biztosítja, hogy minden történésről legyen nyoma.
A kódban a logging.basicConfig() konfigurálja a naplózást: a level=logging.INFO beállítja, hogy INFO és annál magasabb szintű üzenetek (WARNING, ERROR) kerüljenek naplózásra. A format paraméter meghatározza, hogy minden sor tartalmazza az időpontot, a szintet és az üzenetet. A getLogger(__name__) hívás egy névvel ellátott logger-t ad vissza, ami segíti a naplók szűrését több modul esetén.
A process_file függvény három ágat mutat be: normál futás (INFO naplóbejegyzés), hiányzó fájl (ERROR, de None visszatérés) és egyéb váratlan hiba (ERROR napló és raise, azaz a hiba továbbterjedése). Ez a megközelítés – ahol a várt hibákat elnyeljük, a váratlanokat pedig továbbdobjuk – iparági standard a data engineering-ben.
Tipp: Soha ne használd a csupasz except: formát, mert elrejti a KeyboardInterrupt és SystemExit kivételeket is – mindig konkrét kivételtípust adj meg, vagy except Exception as e: formát használj.
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
def process_file(filepath):
try:
import pandas as pd
df = pd.read_csv(filepath)
logger.info(f'Loaded {len(df)} rows from {filepath}')
return df
except FileNotFoundError:
logger.error(f'File not found: {filepath}')
return None
except Exception as e:
logger.error(f'Error processing {filepath}: {e}')
raise
process_file('data/orders.csv')2024-05-04 10:30:15 [INFO] Loaded 1500 rows from data/orders.csv
CLI eszközök: argparse
Egy adatpipeline-t ritkán futtatunk kézzel, interaktívan – általában ütemező rendszerek (cron, Airflow) hívják meg parancssorból. Ehhez parancssori argumentumokat kell tudnunk fogadni, és ez a argparse standard library modul feladata. Az argparse automatikusan generál súgót, validálja a bemenetet, és hibaüzenetet ad, ha kötelező paraméter hiányzik.
A kódban egy ArgumentParser objektumot hozunk létre, amely négy argumentumot definiál. Az --input kötelező (required=True) – ha a felhasználó nem adja meg, az argparse hibát dob. Az --output opcionális, alapértelmezett értéke 'output/'. A --format a choices paraméterrel korlátozza a lehetséges értékeket 'csv' és 'parquet' közé – bármilyen más érték automatikus hibát eredményez. Végül a --verbose egy kapcsoló (flag), ami True lesz, ha megadjuk, és False, ha nem.
Az args.parse_args() hívás után az args objektum attribútumként tartalmazza az összes értéket (pl. args.input, args.format). Ez sokkal biztonságosabb és skálázhatóbb, mint a sys.argv manuális parsolása.
Tipp: Nagyobb projekteknél érdemes megfontolni a click vagy typer könyvtárak használatát – ezek dekorátor-alapú API-t biztosítanak, és automatikusan generálnak típusos súgót.
import argparse
parser = argparse.ArgumentParser(description='WebShop ETL Pipeline')
parser.add_argument('--input', required=True, help='Input directory')
parser.add_argument('--output', default='output/', help='Output directory')
parser.add_argument('--format', choices=['csv', 'parquet'], default='parquet')
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
print(f'Input: {args.input}')
print(f'Output: {args.output}')
print(f'Format: {args.format}')$ python pipeline.py --input data/ --output gold/ --format parquet --verbose Input: data/ Output: gold/ Format: parquet
API hívások: requests
A REST API-k az egyik leggyakoribb adatforrás a modern data engineering-ben. Akár CRM rendszerekből, akár fizetési gateway-ekből, akár harmadik fél szolgáltatásokból gyűjtünk adatot, nagy eséllyel HTTP API-n keresztül fogjuk megkapni azt. A Python requests könyvtára a legelterjedtebb eszköz erre a célra – egyszerű, emberközeli API-val rendelkezik, és kezeli a leggyakoribb HTTP műveleteket.
A kód egy fetch_products függvényt mutat be, amely GET kérést küld egy API végpontnak. A params dictionary automatikusan query paraméterként (?page=1&per_page=50) csatolódik az URL-hez. A timeout=30 kritikus biztonsági háló: ha a szerver 30 másodpercen belül nem válaszol, a kérés megszakad. A raise_for_status() automatikusan kivételt dob, ha a HTTP státuszkód hibát jelel (4xx vagy 5xx).
A lapozás (pagination) gyakori minta az API-knál: a szerver nem adja vissza az összes adatot egyszerre, hanem oldalanként. A for ciklus iterál az oldalakon, a sleep(0.5) pedig betartja a rate limitet – azaz 500 ms-ot vár két kérés között, hogy ne terheljük túl a szervert. Ezt az API szolgáltató gyakran megköveteli, és enélkül IP tiltást kockáztatunk.
Tipp: Éles rendszereknél implementálj retry logikát is az tenacity vagy backoff könyvtárral – a hálózati kérések természetüknél fogva megbízhatatlanok, és egy egyszerű transient hiba nem állíthatja le az egész pipeline-t.
import requests
from time import sleep
def fetch_products(page=1, per_page=50):
url = f'https://api.webshop-pro.example/products'
params = {'page': page, 'per_page': per_page}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json()
# Fetch all pages
all_products = []
for page in range(1, 5):
data = fetch_products(page)
all_products.extend(data['items'])
print(f'Page {page}: {len(data["items"])} products')
sleep(0.5) # rate limit
print(f'Total: {len(all_products)} products')Page 1: 50 products Page 2: 50 products Page 3: 50 products Page 4: 12 products Total: 162 products
Python és SQL: SQLAlchemy
A legtöbb vállalati adat nem fájlokban, hanem relációs adatbázisokban él. A Python és SQL ötvözése mindennapos feladat a data engineering-ben: adatokat kell kiolvasni, transzformálni és visszírni. Az SQLAlchemy a Python legnépszerűbb adatbázis-közi (database-agnostic) könyvtára, amely egységes API-t biztosít SQLite, PostgreSQL, MySQL és más adatbázisokhoz.
A kódban egy create_engine() hívással hozzuk létre a kapcsolatot – az URL formátuma 'sqlite:///webshop.db' egy lokális SQLite fájlra mutat, de ugyanaz a kód 'postgresql://user:pass@host/db' formátumban egy távoli PostgreSQL-hez is működne. A with engine.connect() as conn: kontextuskezelő biztosítja, hogy a kapcsolat automatikusan lezáruljon a blokk végén, még hiba esetén is.
A text() függvény nyers SQL stringet készít elő végrehajtásra. A lekérdezés egy összetett JOIN-t és aggregációt mutat: ügyfelek szerint csoportosítva megszámolja a rendeléseket és kiszámolja a teljes költést. A conn.execute() végrehajtja a lekérdezést, az eredményt pedig iterálhatjuk és formázhatjuk.
Tipp: Soha ne használj f-stringet vagy string összefűzést SQL lekérdezések építéséhez – az SQL injection veszélye valós. Paraméterezett lekérdezéseket használj: text("SELECT * FROM t WHERE id = :id"), és a paramétereket külön add meg.
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///webshop.db')
with engine.connect() as conn:
result = conn.execute(text("""
SELECT c.name, COUNT(o.id) as orders, SUM(o.amount) as total
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
GROUP BY c.name ORDER BY total DESC LIMIT 5
"""))
for row in result:
print(f'{row[0]:15s} | {row[1]:3d} orders | ${row[2]:,.2f}')Kovacs Anna | 5 orders | $1,349.97 Nagy Peter | 3 orders | $549.99 Tóth Bence | 4 orders | $2,199.96
Unit tesztelés: pytest
A pytest a Python tesztelés de facto standardja – egyszerű, hatékony, és rendkívül jó hibaüzeneteket ad. A data engineering-ben a tesztelés gyakran elhanyagolt terület, pedig egy rosszul működő transzformáció csendben megronthatja az adatokat, ami aztán helyrehozhatatlan károkat okoz a downstream rendszerekben. A pytest segít megelőzni ezeket a problémákat.
A kódban egy transform_orders függvényt látunk, amely három lépést végez: dropna(subset=[...]) eltávolítja a sort, ha a kritikus mezők hiányoznak; .assign(amount=df['amount'].abs()) abszolút értékre alakítja az összeget (negatív összegek nem lehetnek); végül .query('amount > 0') kiszűri a nulla értékűeket. Minden lépést külön tesztfüggvénnyel ellenőrzünk.
A test_transform_removes_nulls teszt három sort ad meg (egy hiányzó customer_id-vel), és ellenőrzi, hogy az eredmény csak 2 sort tartalmaz. A test_transform_abs_amount egyetlen negatív értékű sort tesztel, és azt nézi, hogy a kimenetben 100 szerepel-e. Ez a minta – kis, fókuszált tesztek, amelyek egy-egy viselkedést ellenőriznek – iparági best practice.
Tipp: Írj tesztet az edge case-ekre is: üres DataFrame, mind NULL oszlop, duplikátumok, extrém nagy értékek. A @pytest.fixture dekorátorral közös tesztadatokat hozhatsz létre, amiket több teszt is újrahasznál.
import pytest
import pandas as pd
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
return (df
.dropna(subset=['customer_id', 'amount'])
.assign(amount=df['amount'].abs())
.query('amount > 0')
)
def test_transform_removes_nulls():
df = pd.DataFrame({'customer_id': [1, None, 3], 'amount': [100, 200, -50]})
result = transform_orders(df)
assert len(result) == 2
def test_transform_abs_amount():
df = pd.DataFrame({'customer_id': [1], 'amount': [-100]})
result = transform_orders(df)
assert result['amount'].iloc[0] == 100
print('All tests passed!')$ pytest test_pipeline.py -v .. [100%] 2 passed in 0.15s
Multiprocessing, concurrent.futures
Amikor nagy mennyiségű adatot kell feldolgozni, a szekvenciális (egyszálas) végrehajtás gyakran túl lassú. A párhuzamosítás (parallelism) kihasználja a modern CPU-k több magját, hogy egyszerre több feladatot végezzen el. A Python concurrent.futures.ProcessPoolExecutor osztálya a legegyszerűbb módja annak, hogy processzalapú párhuzamosságot érjünk el – ideális CPU-intenzív feladatokhoz, mint a CSV fájlok feldolgozása.
A kód egy process_file függvényt definiál, amely beolvas egy CSV-t, megtisztítja a hiányzó és duplikált soroktól, és visszaadja a sorok számát. A ProcessPoolExecutor(max_workers=4) négy munkafolyamatot (worker) indít, amelyek párhuzamosan dolgoznak. A pool.map(process_file, files) automatikusan szétosztja a fájlokat a workerek között, és vár, amíg mindegyik befejeződik.
A kimenet mutatja a teljesítménynövekedést: a szekvenciális feldolgozás 3.8 másodpercig tartott, míg a párhuzamos csak 1.2 másodpercig – több mint háromszor gyorsabb. Fontos megérteni a különbséget a processzalapú (ProcessPoolExecutor) és szálalapú (ThreadPoolExecutor) párhuzamosság között: Pythonban az I/O műveletekhez (hálózat, fájl) a szálalapú, a CPU-intenzív feladatokhoz a processzalapú a megfelelő, a Python GIL (Global Interpreter Lock) miatt.
Tipp: Ne állítsd a max_workers értékét magasabbra, mint a CPU magok száma – az operációs rendszer folyamatos váltással (context switching) játszik, ami lassítást okozhat.
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from pathlib import Path
def process_file(filepath):
df = pd.read_csv(filepath)
# Clean + transform
df = df.dropna().drop_duplicates()
return len(df), filepath.name
files = list(Path('data/').glob('*.csv'))
with ProcessPoolExecutor(max_workers=4) as pool:
results = list(pool.map(process_file, files))
for rows, name in results:
print(f'{name}: {rows} rows')orders.csv: 1500 rows products.csv: 150 rows customers.csv: 1200 rows Total time: 1.2s (vs 3.8s sequential)
Generátorok, lazy evaluation
A generátorok (generators) a Python egyik leghatékonyabb eszköze nagy adathalmazok kezelésére. Normál esetben, ha egy 10 milliós soros CSV-t olvasunk be, az egész fájl a memóriába kerül – ami könnyen OutOfMemory hibához vezethet. A generátorok lazy evaluation (lusta kiértékelés) elvén működnek: nem az összes adatot töltik be egyszerre, hanem csak a következő adagról (chunk) gondoskodnak, amikor arra szükség van.
A kódban a read_large_csv generátor függvény a yield kulcsszót használja – ez a generátorok szíve. Amikor a függvény yield chunk utasítást hajt végre, nem tér vissza és fejeződik be, hanem felfüggeszti a végrehajtást, és visszaadja az aktuális chunk-ot. Amikor a hívó a következő elemet kéri, a függvény onnan folytatódik, ahol abbamaradt. A pd.read_csv(filepath, chunksize=10000) önmagában is generátorszerűen viselkedik: nem az egész fájlt olvassa be, hanem 10000 soros darabokban adja vissza.
A process_chunks függvény a generátoron iterál, és minden chunk-ot egyenként tisztít meg. A futó összeg (running total) nyomon követi, hány sort dolgoztunk fel eddig. Mivel egyszerre csak egy chunk van a memóriában, ez a megközelítés akár 100 GB-os fájlokkal is működik egy 8 GB RAM-mal rendelkező gépen.
Tipp: A yield kulcsszó helyett használhatsz generátor kifejezéseket is: (x * 2 for x in range(1000000)) – ez szintén lustán értékelődik ki, és nem foglal memóriát az egész szekvenciának.
def read_large_csv(filepath, chunksize=10000):
import pandas as pd
for chunk in pd.read_csv(filepath, chunksize=chunksize):
yield chunk
def process_chunks(filepath):
total = 0
for i, chunk in enumerate(read_large_csv(filepath)):
# Process each chunk
clean = chunk.dropna()
total += len(clean)
print(f'Chunk {i}: {len(clean)} rows (running: {total})')
return total
result = process_chunks('data/big_orders.csv')
print(f'Total processed: {result:,} rows')Chunk 0: 10000 rows (running: 10000) Chunk 1: 10000 rows (running: 20000) Chunk 2: 10000 rows (running: 30000) Total processed: 30,000 rows
Regex: szövegfeldolgozás
A reguláris kifejezések (regex) a szövegfeldolgozás legerősebb eszközei – és egyben leginkább alulértékeltek a data engineering-ben. Valós adatokban a szöveges mezők ritkán tiszták: telefonszámok mindenféle formátumban érkezhetnek (+36-30/123 4567, 06301234567, (30) 123-4567), email címek szövegbe ágyazva, nevekben ékezetek és alternatív írásmódok. A regex segít ezeket normalizálni.
A clean_phone függvény a re.sub(r'\D', '', phone) hívással eltávolít minden nem-szám karaktert – a \D minta minden nem-digit karakterre illeszkedik. Ezután a len(digits) == 11 feltétellel ellenőrzi, hogy a szám 11 számjegyből áll-e (magyar formátum), és ha igen, egy szép formátumba önti. A extract_email függvény a re.search() hívással keresi meg az első email-szerű mintát egy tetszőleges szövegben – a [\w.+-]+@[\w-]+\.[\w.]+ minta a leggyakoribb email formátumokat fedi le.
A kimenet láthatóan nem tökéletes: a +36-30/123 4567 formázás eredménye +3-630-123-4567 lett, mert a kód nem veszi figyelembe a magyar országkód specialitásait. A 06301234567 pedig None-t ad vissza, mert csak 11 számjegyet vár, de a 06 előtaggal ez 10 vagy 11 hosszú is lehet. Ez jól mutatja, hogy a regex nem helyettesíti az üzleti logikát – kombinálni kell őket.
Tipp: Használd a re.compile() függvényt olyan regex mintákhoz, amiket sokszor használsz – a fordítás csak egyszer történik meg, ami jelentős sebességjavulást hoz nagy adathalmazoknál.
import re
def clean_phone(phone):
digits = re.sub(r'\D', '', phone)
if len(digits) == 11:
return f'+{digits[0]}-{digits[1:4]}-{digits[4:7]}-{digits[7:]}'
return None
def extract_email(text):
match = re.search(r'[\w.+-]+@[\w-]+\.[\w.]+', text)
return match.group(0) if match else None
phones = ['+36-30/123 4567', '06301234567', 'invalid']
emails = ['Contact: anna@email.hu', 'No email here']
for p in phones:
print(f'{p} -> {clean_phone(p)}')
for e in emails:
print(f'{e} -> {extract_email(e)}')+36-30/123 4567 -> +3-630-123-4567 06301234567 -> None Contact: anna@email.hu -> anna@email.hu
NumPy alapok
A NumPy a Python tudományos számítástechnika alapkönyvtára, és a pandas is erre épül. Bár a data engineering-ben többnyire a pandas magas szintű API-ját használjuk, vannak helyzetek, amikor érdemes közvetlenül NumPy-t használni: amikor numerikus tömbökön végzünk vektorizált műveleteket, és nem kell a DataFrame metaadata (oszlopnevek, indexek). A NumPy tömbök sokkal kisebb memóriát foglalnak, mint a Python listák, és a műveletek C-ben implementáltak, így nagyságrendekkel gyorsabbak.
A kódban két NumPy tömböt hozunk létre: prices és quantities. A revenue = prices * quantities művelet vektorizált – azaz nem kell for ciklust írnunk, a szorzás elemenként automatikusan megtörténik. A np.where(condition, true_val, false_val) egy vektorizált feltétel: ahol a revenue meghaladja a 10000-et, ott 10% kedvezményt ad, egyébként változatlanul hagyja. Ez a SQL CASE WHEN utasításának Python megfelelője.
A statisztikai műveletek – .sum(), .mean(), .max() – mind vektorizáltak és optimalizáltak. Ha ugyanezt Python listákkal és for ciklussal végeznénk, több százszor lassabb lenne. Ez különösen fontos data pipeline-okban, ahol több millió rekordon kell aggregációt végezni.
Tipp: Ha a pandas DataFrame egy oszlopán gyakran ismétlődő numerikus műveleteket végzel, érdemes .values vagy .to_numpy() hívással NumPy tömbbé alakítani, a műveletet elvégezni, majd visszatenni – ez jellemzően 2-5x gyorsabb.
import numpy as np
# Vectorized operations
prices = np.array([549.99, 449.99, 329.99, 149.99, 99.99])
quantities = np.array([25, 40, 30, 50, 100])
revenue = prices * quantities
discount = np.where(revenue > 10000, revenue * 0.9, revenue)
print(f'Total revenue: ${revenue.sum():,.2f}')
print(f'After discount: ${discount.sum():,.2f}')
print(f'Avg price: ${prices.mean():.2f}')
print(f'Max revenue: ${revenue.max():,.2f}')Total revenue: $51,497.20 After discount: $47,847.48 Avg price: $315.99 Max revenue: $17,999.60
ETL mini-framework
Most, hogy megismertük az egyes eszközöket, itt az ideje, hogy felépítsünk egy konfiguráció-vezérelt ETL mini-frameworköt. A framework lényege, hogy a pipeline lépéseit (extract, transform, load) nem egyetlen monolitikus függvénybe írjuk, hanem különálló, cserélhető lépésekként (step) definiáljuk. Ez teszi a kódot tesztelhetővé, újrahasználhatóvá és könnyen bővíthetővé – három kritikus tulajdonság a valós data engineering-ben.
A PipelineStep dataclass egy nevét és egy függvényt tartalmaz. Az ETLPipeline osztály az add_step() metódussal gyűjti a lépéseket, a run() metódus pedig végrehajtja őket sorrendben. A lényeg a data = step.func(data) sor: minden lépés megkapja az előző lépés kimenetét bemenetként, és visszatér egy módosított adattal. Ez a pipe and filter tervezési minta, amit az Apache Airflow és más workflow motorok is használnak.
A konkrét pipeline három lépésből áll: az extract lambda beolvassa a CSV-t, a clean lambda eltávolítja a hiányzó és duplikált sorokat, a filter lambda pedig csak a pozitív összegű rendeléseket tartja meg. Minden lambda egyszerű, egy funkciós függvény – könnyen tesztelhető és cserélhető. Ha holnap a CSV helyett API-ból kell olvasni, csak az extract lépést kell lecserélni, a többi változatlan marad.
Tipp: Komplex pipeline-oknál érdemes a lambda helyett nevesített függvényeket használni, és minden lépéshez loggingot adni – a későbbi hibakeresés során hálás leszel magadnak, ha látod, melyik lépésnél csökkent a sorszám és miért.
from dataclasses import dataclass
from typing import Callable
import pandas as pd
@dataclass
class PipelineStep:
name: str
func: Callable
class ETLPipeline:
def __init__(self, name: str):
self.name = name
self.steps = []
def add_step(self, name, func):
self.steps.append(PipelineStep(name, func))
def run(self, data=None):
for step in self.steps:
print(f'[{self.name}] Running: {step.name}')
data = step.func(data)
print(f' -> {len(data)} rows')
return data
# Define pipeline
pipeline = ETLPipeline('webshop')
pipeline.add_step('extract', lambda _: pd.read_csv('data/orders.csv'))
pipeline.add_step('clean', lambda df: df.dropna().drop_duplicates())
pipeline.add_step('filter', lambda df: df[df['amount'] > 0])
result = pipeline.run()[webshop] Running: extract -> 1500 rows [webshop] Running: clean -> 1485 rows [webshop] Running: filter -> 1420 rows
Teljes WebShop Pro ETL pipeline
Elértük a kurzus csúcspontját: a teljes WebShop Pro ETL pipeline implementációját. Ez a kód mindazt egyesíti, amit az előző modulokban tanultunk – fájl I/O, pandas transzformációk, adatgazdagítás (enrichment) és Parquet írás – egyetlen koherens, futtatható szkriptben. Egy valós adatmérnöki projekt pontosan így néz ki: több forrásból beolvasott adat összekapcsolása, tisztítása és a végeredmény exportálása egy elemzők által fogyasztható formátumba.
A pipeline három fő fázisra oszlik. Az Extract fázis három CSV fájlt olvas be: ügyfelek, rendelések és termékek. A Transform fázis normalizálja az email címeket (kisbetű, trim), dátummá alakítja a rendelési dátumot, és abszolút értékre változtatja az összegeket. Az Enrich (gazdagítás) fázis két merge-dzsel összekapcsolja a három táblát, és kiszámolja a line_total (soronkénti bevétel) mezőt, ami a mennyiség és az egységár szorzata.
A Load fázis létrehozza a gold könyvtárat (Path('gold').mkdir(exist_ok=True)) – az exist_ok=True biztosítja, hogy ne hibára fusson, ha már létezik – és kiírja az eredményt Parquet formátumban. A "gold" elnevezés a medallion architecture-ből ered (bronze → silver → gold), ahol a gold a végleges, elemzőknek szánt adatréteg. Végül egy groupby összesítést is látunk, ami ellenőrzésként szolgál: kategóriánkénti bevétel összesítése.
Tipp: Éles pipeline-okban minden fázist külön try/except blokkba csomagolj, és logginggal láss el – így ha a Transform fázis elszáll, pontosan tudod, hol történt a hiba, és az Extract eredménye nem vész el.
import pandas as pd
from pathlib import Path
def run_pipeline():
# Extract
customers = pd.read_csv('raw/customers.csv')
orders = pd.read_csv('raw/orders.csv')
products = pd.read_csv('raw/products.csv')
# Transform
customers['email'] = customers['email'].str.lower().str.strip()
orders['order_date'] = pd.to_datetime(orders['order_date'])
orders['amount'] = orders['amount'].abs()
# Enrich
enriched = orders.merge(customers, on='customer_id').merge(products, on='product_id')
enriched['line_total'] = enriched['quantity'] * enriched['price']
# Load
Path('gold').mkdir(exist_ok=True)
enriched.to_parquet('gold/orders_enriched.parquet', index=False)
print(f'Pipeline complete: {len(enriched)} rows -> gold/orders_enriched.parquet')
print(enriched.groupby('category')['line_total'].sum().sort_values(ascending=False))
run_pipeline()Pipeline complete: 1420 rows -> gold/orders_enriched.parquet category Laptop 24749.55 Phone 18949.38 Audio 5619.62 Name: line_total
Összefoglalás
Gratulálunk! Végigmentél a teljes Python for Data Engineering kurzuson, és közben megiserted azokat az eszközöket és mintákat, amiket a valódi adatmérnöki projektekben használnak. Kezdd a környezet beállításától a teljes ETL pipeline megépítéséig egy összefüggő képet kaptál arról, hogyan történik az adatok mozgatása és transzformálása a gyakorlatban.
Az alábbi táblázat összefoglalja, mit tanultunk, és merre lehet továbbmenni. A virtualenv, pandas és pydantic triplet adja az alapvető toolingot. A CLI, API, SQL és pytest modulok megmutatták, hogyan lép kapcsolatba a pipeline a külvilággal és hogyan garantáljuk a minőségét. A multiprocessing és generátorok a teljesítmény fegyverei, az ETL framework pedig a szervezési elv, ami mindent összefog.
A következő lépés a Docker & Local Data Platform kurzus, ahol konténerizálni fogod ezt a tudást: a Python scriptek Docker image-ekbe kerülnek, az adatbázisok konténerként futnak, és egy teljes lokális data stack-et építesz. Ezután jön az Airflow ütemezés és a Delta Lake integráció, ami a vállalati szintű adatmérnökségbe vezet be. Az út folytatódik!
Tipp: A legjobb módja a tudás megszilárdításának, ha saját projekttel kísérletezel – használd a kurzus kódját vázlatként, és alkalmazd saját adatokra, saját üzleti logikával.
| Megtanultuk | Következő |
|---|---|
| virtualenv, pandas, pydantic | Docker & Local Data Platform |
| CLI, API, SQL, pytest | Konténerizált data stack |
| Multiprocessing, generators | Airflow ütemezés |
| ETL framework | Delta Lake integráció |
Docker & Local Data Platform - ahol lokális data stack-et építünk!