</> Python for Data Engineering

0 / 18 section completed
Section 00

Python Data Engineering-hez Python pandas

A Python napjainkban a data engineering legnépszerűbb és legszélesebb körben használt programozási nyelve. Azért tudta magát ilyen pozícióba emelni, mert egyszerre könnyen tanulható és rendkívül sokoldalú – az ETL pipeline-ektől kezdve az adatvalidáción át a gépi tanulásig szinte minden területen otthon van. Ebben a kurzusban olyan gyakorlati Python tudást szerezel, amit azonnal felhasználhatsz valós adatmérnöki projektekben.

A kurzus során egy WebShop Pro nevű fiktív webáruház adataival dolgozunk majd. Végigvesszük a teljes adatmérnöki workflow-t: a virtuális környezet kialakításától a fájlok beolvasásán és tisztításán át a transzformációkig és a végső Parquet írásig. Minden modul önállóan is érthető, de egyben egy koherens történetet mesél el arról, hogyan építhetsz megbízható adatpipeline-t.

Az anyag feldolgozása után képes leszel Python alapú ETL pipeline-t tervezni és implementálni, ismerni fogod a pandas, pydantic, pytest és sqlalchemy könyvtárak alapjait, és megérted, miért fontos a logging, a CLI és a párhuzamosítás a valósidejű adatfeldolgozásban.

Tipp: Ha még nem dolgoztál Pythonnal, érdemes előtte egy alapozó tutorialt elolvasni – de a kurzus azért úgy van felépítve, hogy középhaladóként is hasznos legyen, mert a data engineering specifikus mintákra fókuszál.

Tartalom

virtualenv, pathlib, pandas, pydantic, CLI, API, SQL, pytest, multiprocessing, generators, ETL framework

Projekt

WebShop Pro ETL pipeline: CSV tisztítás, transzformáció, Parquet írás.

Szójegyzék

ETL · Pydantic · Parquet

Section 01

Környezet beállítása

Mielőtt bármit is kódolnánk, megfelelő fejlesztői környezetet kell kialakítanunk. A Python ökoszisztémában ez azt jelenti, hogy létrehozunk egy virtuális környezetet (virtualenv), amely izolálja a projektünk függőségeit a rendszer többi részétől. Így elkerüljük, hogy különböző projektek egymásnak ellentmondó csomagverziókat használjanak – ez a gyakorlatban rengeteg fejfájást spórol meg.

A lenti kód négy parancsot mutat be. Az python -m venv venv létrehoz egy venv nevű könyvtárat, amelyben a Python értelmező egy saját másolata és a telepített csomagok fognak élni. A source venv/bin/activate aktiválja ezt a környezetet (Windowson venv\Scripts\activate). Ezután a pip install telepíti azokat a könyvtárakat, amiket a kurzus során használni fogunk: pandas adatfeldolgozáshoz, pyarrow Parquet fájlok kezeléséhez, pydantic validációhoz, requests API hívásokhoz és pytest teszteléshez.

Végül a pip freeze > requirements.txt parancs rögzíti az összes telepített csomag pontos verzióját egy fájlba. Ez kritikus fontosságú a data engineering-ben: így garantálható, hogy a pipeline ugyanúgy működjön a kolléga gépén, a szerveren vagy a CI/CD rendszerben, mint a te gépeden.

Tipp: Mindig hívd meg a requirements.txt-t a projekt gyökeréből, és sose telepíts csomagokat globálisan – használd a virtuális környezetet minden projekthez!

[1]
# Project setup
python -m venv venv
source venv/bin/activate
pip install pandas pyarrow pydantic requests pytest
pip freeze > requirements.txt
Output:
pandas==2.2.0
pyarrow==15.0.0
pydantic==2.6.0
requests==2.31.0
pytest==8.0.0
Section 02

Python alapok: típusok, adatszerkezetek

A Python típusrendszer és beépített adatszerkezetei (list, dict, tuple, set) minden adatmérnök alapvető eszközei. Ezek ismerete nélkül lehetetlen hatékony ETL logikát írni. A kódban látható @dataclass dekorátor azonban egy modernebb megközelítést mutat: a dataclass segítségével típusannotációkkal ellátott osztályokat hozhatunk létre minimális boilerplate kóddal, ami tökéletes adatrekordok modellezésére.

A Product osztály négy kötelező mezőt (id, name, price, category) és egy opcionális mezőt (stock) definiál, amely alapértelmezésben 0. A @property dekorátorral ellátott is_available metódus egy származtatott tulajdonság, ami automatikusan kiszámítódik a stock értéke alapján – nem kell külön tárolni, mindig aktuális.

A pathlib.Path a modern Python fájlkezelés alapja. Szemben a régi os.path megoldással, a Path objektumok metódusai láncolhatók (Path('data') / 'file.csv'), platformfüggetlenek (Windows/Linux), és olvashatóbb kódot eredményeznek. A data engineering-ben szinte minden pipeline fájlkkal dolgozik, így a pathlib ismerete elengedhetetlen.

Tipp: Használd a typing.Optional[T] típusannotációt azokra a mezőkre, amik lehetnek None – így a típusellenőrző eszközök (mypy, pyright) segítenek elkapni a potenciális NoneType hibákat még futtatás előtt.

[2]
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

@dataclass
class Product:
    id: int
    name: str
    price: float
    category: str
    stock: int = 0

    @property
    def is_available(self) -> bool:
        return self.stock > 0

p = Product(1, 'Laptop Pro 16', 549.99, 'Laptop', 25)
print(f'{p.name}: {p.price}$, stock={p.stock}, available={p.is_available}')
Output:
Laptop Pro 16: 549.99$, stock=25, available=True
Section 03

Fájl I/O: CSV, JSON, Parquet

Adatmérnöki munkában a fájlok olvasása és írása a leggyakoribb műveletek közé tartozik. A nyers adatok általában CSV formátumban érkeznek (például egy CRM rendszer exportjából), de a feldolgozás során érdemes őket hatékonyabb formátumokba konvertálni. A Parquet egy oszlopos tárolási formátum, amely tömöríti az adatokat és jelentősen gyorsabbá teszi az analitikai lekérdezéseket – egy tipikus CSV fájl 60-80%-kal kisebb lehet Parquet-ben.

A kódban először a pd.read_csv() függvénnyel olvassuk be a CSV-t egy pandas DataFrame-be. Ezután a df.to_parquet() hívással írjuk ki Parquet formátumba. Az index=False paraméter fontos: nélküle a pandas a sorindexet is beleírná a fájlba, ami felesleges adatmennyiséget jelenthet. A Path('data').glob('*') segítségével végigmegyünk a data könyvtár összes fájlján, és kiírjuk a méretüket kilobájtban.

A fájlméretek összehasonlítása szemléletes: a CSV 12.3 KB, míg a Parquet csak 3.1 KB – ez 75%-os megtakarítás! Ez nagy adathalmazoknál (millió sor, több GB) még drámaibb különbséget jelent, nem is beszélve arról, hogy a Parquet oszlopos tárolása miatt csak a szükséges oszlopokat kell beolvasni, nem az egész fájlt.

Tipp: Ha dátum típusú oszlopaid vannak, használd a parse_dates paramétert a read_csv-nél – így a pandas automatikusan felismeri és dátumként kezeli őket, nem szövegként.

[3]
import pandas as pd
from pathlib import Path

# Read CSV
df = pd.read_csv('data/products.csv')
print(f'Rows: {len(df)}, Columns: {list(df.columns)}')

# Write Parquet
df.to_parquet('data/products.parquet', index=False)

# File sizes
for f in Path('data').glob('*'):
    print(f'{f.name}: {f.stat().st_size / 1024:.1f} KB')
Output:
Rows: 150, Columns: ['id', 'name', 'price', 'category', 'stock']

products.csv: 12.3 KB
products.parquet: 3.1 KB
Section 04

Pandas alapok pandas

A pandas a Python adatelemzés és adatmérnökség zászlóshajó könyvtára. Központi adatszerkezete a DataFrame, ami lényegében egy táblázat – sorokból és oszlopokból áll, mint egy Excel munkalap vagy egy SQL tábla. A Series egyetlen oszlopot reprezentál. A pandas ereje abban rejlik, hogy a SQL-ben megszokott műveleteket (szűrés, csoportosítás, join) Pythonban, tisztán és hatékonyan tudjuk elvégezni.

A példakód három kulcsfontosságú műveletet mutat be. Először pd.read_parquet() segítségével betöltjük a rendeléseket és termékeket. A .merge() függvvény SQL JOIN-nak felel meg – itt product_id alapján kapcsoljuk össze a két táblát. Ezután a .groupby('category') csoportosítja az adatokat kategória szerint, az .agg() pedig összesítő függvényeket alkalmaz: a bevételek összegét és a rendelések számát számolja ki kategóriánként.

A láncolt (method chaining) szintaxis – ahol minden művelet egy újabb .valami() hívással folytatódik – rendkívül olvasható kódot eredményez. A .sort_values('revenue', ascending=False) végül csökkenő sorrendbe rendezi az eredményt. Ez a stílus a data engineering-ben nagyon elterjedt, mert egy-önmagában dokumentálja az adatáramlást.

Tipp: Mindig használd a on paramétert a merge()-nél, és gondold át, hogy inner, left, right vagy outer joinra van-e szükséged – a alapértelmezett inner join csendben eldobja a nem illeszkedő sorokat, ami meglepetéseket okozhat.

[4]
import pandas as pd

# Load data
orders = pd.read_parquet('data/orders.parquet')
products = pd.read_parquet('data/products.parquet')

# Filter + group
revenue = (orders
    .merge(products, on='product_id')
    .groupby('category')
    .agg(revenue=('amount', 'sum'), count=('order_id', 'count'))
    .sort_values('revenue', ascending=False)
)
print(revenue)
Output:
          revenue  count
category                 
Laptop   24749.55     45
Phone    18949.38     62
Audio     5619.62     38
Section 05

Pydantic adatvalidáció

A Pydantic egy modern Python könyvtár, amely típusbiztos adatmodelleket és automatikus validációt biztosít. A data engineering-ben az egyik legnagyobb kihívás a nem várt adatformátumok és értékek kezelése – egy rosszul formázott email cím vagy egy hiányzó mező lerombolhatja az egész pipeline-t. A Pydantic BaseModel osztálya megoldja ezt a problémát: definiálhatod az elvárt adatszerkezetet, és a könyvtár automatikusan validálja és konvertálja a bejövő adatokat.

A kódban egy CustomerRecord modellt látunk, amely öt mezőt definiál típusannotációkkal. A city: str = 'Unknown' szintaxis egy alapértelmezett értéket ad meg – ha a bejövő adat nem tartalmazza ezt a mezőt, a Pydantic automatikusan az alapértelmezett értéket használja. A @field_validator('email') dekorátorral egy egyedi validáló függvényt definiálunk, ami ellenőrzi, hogy az email cím tartalmaz-e @ jelet, és automatikusan kisbetűssé alakítja.

A model_dump() metódus a validált adatot egy sima Python dict-ként adja vissza, amit aztán könnyen szerializálhatsz JSON-né vagy betölthetsz egy DataFrame-be. Figyeld meg, hogy az eredeti 'Anna@Email.HU' automatikusan 'anna@email.hu'-vá vált – a validátor futott és normalizálta az értéket. Ez a típusú automatikus transzformáció menthet óráknyi debuggolást.

Tipp: Ha külső API-ból vagy CSV-ből olvasol adatot, mindig először Pydantic modellrevalidáld – a ValidationError pontosan megmondja, melyik mező milyen okból bukott meg, ami hihetetlenül megkönnyíti a hibakeresést.

[5]
from pydantic import BaseModel, field_validator
from typing import Optional

class CustomerRecord(BaseModel):
    customer_id: int
    name: str
    email: str
    city: str = 'Unknown'
    
    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('Invalid email')
        return v.lower()

c = CustomerRecord(customer_id=1, name='Anna', email='Anna@Email.HU')
print(c.model_dump())
Output:
{'customer_id': 1, 'name': 'Anna', 'email': 'anna@email.hu', 'city': 'Unknown'}
Section 06

Exception handling, logging

A robusztus hibakezelés és naplózás (logging) a professzionális data pipeline-ok alapköve. Egy éles (production) környezetben futó pipeline-nak képesnek kell lennie kezelni a hibákat anélkül, hogy az egész rendszer összeomlana. A Python try/except blokkja lehetővé teszi, hogy előre látható hibákat (pl. hiányzó fájl, rossz formátum) elegánsan kezeljünk, míg a logging modul biztosítja, hogy minden történésről legyen nyoma.

A kódban a logging.basicConfig() konfigurálja a naplózást: a level=logging.INFO beállítja, hogy INFO és annál magasabb szintű üzenetek (WARNING, ERROR) kerüljenek naplózásra. A format paraméter meghatározza, hogy minden sor tartalmazza az időpontot, a szintet és az üzenetet. A getLogger(__name__) hívás egy névvel ellátott logger-t ad vissza, ami segíti a naplók szűrését több modul esetén.

A process_file függvény három ágat mutat be: normál futás (INFO naplóbejegyzés), hiányzó fájl (ERROR, de None visszatérés) és egyéb váratlan hiba (ERROR napló és raise, azaz a hiba továbbterjedése). Ez a megközelítés – ahol a várt hibákat elnyeljük, a váratlanokat pedig továbbdobjuk – iparági standard a data engineering-ben.

Tipp: Soha ne használd a csupasz except: formát, mert elrejti a KeyboardInterrupt és SystemExit kivételeket is – mindig konkrét kivételtípust adj meg, vagy except Exception as e: formát használj.

[6]
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)

def process_file(filepath):
    try:
        import pandas as pd
        df = pd.read_csv(filepath)
        logger.info(f'Loaded {len(df)} rows from {filepath}')
        return df
    except FileNotFoundError:
        logger.error(f'File not found: {filepath}')
        return None
    except Exception as e:
        logger.error(f'Error processing {filepath}: {e}')
        raise

process_file('data/orders.csv')
Output:
2024-05-04 10:30:15 [INFO] Loaded 1500 rows from data/orders.csv
Section 07

CLI eszközök: argparse

Egy adatpipeline-t ritkán futtatunk kézzel, interaktívan – általában ütemező rendszerek (cron, Airflow) hívják meg parancssorból. Ehhez parancssori argumentumokat kell tudnunk fogadni, és ez a argparse standard library modul feladata. Az argparse automatikusan generál súgót, validálja a bemenetet, és hibaüzenetet ad, ha kötelező paraméter hiányzik.

A kódban egy ArgumentParser objektumot hozunk létre, amely négy argumentumot definiál. Az --input kötelező (required=True) – ha a felhasználó nem adja meg, az argparse hibát dob. Az --output opcionális, alapértelmezett értéke 'output/'. A --format a choices paraméterrel korlátozza a lehetséges értékeket 'csv' és 'parquet' közé – bármilyen más érték automatikus hibát eredményez. Végül a --verbose egy kapcsoló (flag), ami True lesz, ha megadjuk, és False, ha nem.

Az args.parse_args() hívás után az args objektum attribútumként tartalmazza az összes értéket (pl. args.input, args.format). Ez sokkal biztonságosabb és skálázhatóbb, mint a sys.argv manuális parsolása.

Tipp: Nagyobb projekteknél érdemes megfontolni a click vagy typer könyvtárak használatát – ezek dekorátor-alapú API-t biztosítanak, és automatikusan generálnak típusos súgót.

[7]
import argparse

parser = argparse.ArgumentParser(description='WebShop ETL Pipeline')
parser.add_argument('--input', required=True, help='Input directory')
parser.add_argument('--output', default='output/', help='Output directory')
parser.add_argument('--format', choices=['csv', 'parquet'], default='parquet')
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()

print(f'Input: {args.input}')
print(f'Output: {args.output}')
print(f'Format: {args.format}')
Output:
$ python pipeline.py --input data/ --output gold/ --format parquet --verbose
Input: data/
Output: gold/
Format: parquet
Section 08

API hívások: requests

A REST API-k az egyik leggyakoribb adatforrás a modern data engineering-ben. Akár CRM rendszerekből, akár fizetési gateway-ekből, akár harmadik fél szolgáltatásokból gyűjtünk adatot, nagy eséllyel HTTP API-n keresztül fogjuk megkapni azt. A Python requests könyvtára a legelterjedtebb eszköz erre a célra – egyszerű, emberközeli API-val rendelkezik, és kezeli a leggyakoribb HTTP műveleteket.

A kód egy fetch_products függvényt mutat be, amely GET kérést küld egy API végpontnak. A params dictionary automatikusan query paraméterként (?page=1&per_page=50) csatolódik az URL-hez. A timeout=30 kritikus biztonsági háló: ha a szerver 30 másodpercen belül nem válaszol, a kérés megszakad. A raise_for_status() automatikusan kivételt dob, ha a HTTP státuszkód hibát jelel (4xx vagy 5xx).

A lapozás (pagination) gyakori minta az API-knál: a szerver nem adja vissza az összes adatot egyszerre, hanem oldalanként. A for ciklus iterál az oldalakon, a sleep(0.5) pedig betartja a rate limitet – azaz 500 ms-ot vár két kérés között, hogy ne terheljük túl a szervert. Ezt az API szolgáltató gyakran megköveteli, és enélkül IP tiltást kockáztatunk.

Tipp: Éles rendszereknél implementálj retry logikát is az tenacity vagy backoff könyvtárral – a hálózati kérések természetüknél fogva megbízhatatlanok, és egy egyszerű transient hiba nem állíthatja le az egész pipeline-t.

[8]
import requests
from time import sleep

def fetch_products(page=1, per_page=50):
    url = f'https://api.webshop-pro.example/products'
    params = {'page': page, 'per_page': per_page}
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()

# Fetch all pages
all_products = []
for page in range(1, 5):
    data = fetch_products(page)
    all_products.extend(data['items'])
    print(f'Page {page}: {len(data["items"])} products')
    sleep(0.5)  # rate limit

print(f'Total: {len(all_products)} products')
Output:
Page 1: 50 products
Page 2: 50 products
Page 3: 50 products
Page 4: 12 products
Total: 162 products
Section 09

Python és SQL: SQLAlchemy

A legtöbb vállalati adat nem fájlokban, hanem relációs adatbázisokban él. A Python és SQL ötvözése mindennapos feladat a data engineering-ben: adatokat kell kiolvasni, transzformálni és visszírni. Az SQLAlchemy a Python legnépszerűbb adatbázis-közi (database-agnostic) könyvtára, amely egységes API-t biztosít SQLite, PostgreSQL, MySQL és más adatbázisokhoz.

A kódban egy create_engine() hívással hozzuk létre a kapcsolatot – az URL formátuma 'sqlite:///webshop.db' egy lokális SQLite fájlra mutat, de ugyanaz a kód 'postgresql://user:pass@host/db' formátumban egy távoli PostgreSQL-hez is működne. A with engine.connect() as conn: kontextuskezelő biztosítja, hogy a kapcsolat automatikusan lezáruljon a blokk végén, még hiba esetén is.

A text() függvény nyers SQL stringet készít elő végrehajtásra. A lekérdezés egy összetett JOIN-t és aggregációt mutat: ügyfelek szerint csoportosítva megszámolja a rendeléseket és kiszámolja a teljes költést. A conn.execute() végrehajtja a lekérdezést, az eredményt pedig iterálhatjuk és formázhatjuk.

Tipp: Soha ne használj f-stringet vagy string összefűzést SQL lekérdezések építéséhez – az SQL injection veszélye valós. Paraméterezett lekérdezéseket használj: text("SELECT * FROM t WHERE id = :id"), és a paramétereket külön add meg.

[9]
from sqlalchemy import create_engine, text

engine = create_engine('sqlite:///webshop.db')

with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT c.name, COUNT(o.id) as orders, SUM(o.amount) as total
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        GROUP BY c.name ORDER BY total DESC LIMIT 5
    """))
    for row in result:
        print(f'{row[0]:15s} | {row[1]:3d} orders | ${row[2]:,.2f}')
Output:
Kovacs Anna     |   5 orders | $1,349.97
Nagy Peter      |   3 orders | $549.99
Tóth Bence      |   4 orders | $2,199.96
Section 10

Unit tesztelés: pytest

A pytest a Python tesztelés de facto standardja – egyszerű, hatékony, és rendkívül jó hibaüzeneteket ad. A data engineering-ben a tesztelés gyakran elhanyagolt terület, pedig egy rosszul működő transzformáció csendben megronthatja az adatokat, ami aztán helyrehozhatatlan károkat okoz a downstream rendszerekben. A pytest segít megelőzni ezeket a problémákat.

A kódban egy transform_orders függvényt látunk, amely három lépést végez: dropna(subset=[...]) eltávolítja a sort, ha a kritikus mezők hiányoznak; .assign(amount=df['amount'].abs()) abszolút értékre alakítja az összeget (negatív összegek nem lehetnek); végül .query('amount > 0') kiszűri a nulla értékűeket. Minden lépést külön tesztfüggvénnyel ellenőrzünk.

A test_transform_removes_nulls teszt három sort ad meg (egy hiányzó customer_id-vel), és ellenőrzi, hogy az eredmény csak 2 sort tartalmaz. A test_transform_abs_amount egyetlen negatív értékű sort tesztel, és azt nézi, hogy a kimenetben 100 szerepel-e. Ez a minta – kis, fókuszált tesztek, amelyek egy-egy viselkedést ellenőriznek – iparági best practice.

Tipp: Írj tesztet az edge case-ekre is: üres DataFrame, mind NULL oszlop, duplikátumok, extrém nagy értékek. A @pytest.fixture dekorátorral közös tesztadatokat hozhatsz létre, amiket több teszt is újrahasznál.

[10]
import pytest
import pandas as pd

def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    return (df
        .dropna(subset=['customer_id', 'amount'])
        .assign(amount=df['amount'].abs())
        .query('amount > 0')
    )

def test_transform_removes_nulls():
    df = pd.DataFrame({'customer_id': [1, None, 3], 'amount': [100, 200, -50]})
    result = transform_orders(df)
    assert len(result) == 2

def test_transform_abs_amount():
    df = pd.DataFrame({'customer_id': [1], 'amount': [-100]})
    result = transform_orders(df)
    assert result['amount'].iloc[0] == 100

print('All tests passed!')
Output:
$ pytest test_pipeline.py -v
..                                                                    [100%]
2 passed in 0.15s
Section 11

Multiprocessing, concurrent.futures

Amikor nagy mennyiségű adatot kell feldolgozni, a szekvenciális (egyszálas) végrehajtás gyakran túl lassú. A párhuzamosítás (parallelism) kihasználja a modern CPU-k több magját, hogy egyszerre több feladatot végezzen el. A Python concurrent.futures.ProcessPoolExecutor osztálya a legegyszerűbb módja annak, hogy processzalapú párhuzamosságot érjünk el – ideális CPU-intenzív feladatokhoz, mint a CSV fájlok feldolgozása.

A kód egy process_file függvényt definiál, amely beolvas egy CSV-t, megtisztítja a hiányzó és duplikált soroktól, és visszaadja a sorok számát. A ProcessPoolExecutor(max_workers=4) négy munkafolyamatot (worker) indít, amelyek párhuzamosan dolgoznak. A pool.map(process_file, files) automatikusan szétosztja a fájlokat a workerek között, és vár, amíg mindegyik befejeződik.

A kimenet mutatja a teljesítménynövekedést: a szekvenciális feldolgozás 3.8 másodpercig tartott, míg a párhuzamos csak 1.2 másodpercig – több mint háromszor gyorsabb. Fontos megérteni a különbséget a processzalapú (ProcessPoolExecutor) és szálalapú (ThreadPoolExecutor) párhuzamosság között: Pythonban az I/O műveletekhez (hálózat, fájl) a szálalapú, a CPU-intenzív feladatokhoz a processzalapú a megfelelő, a Python GIL (Global Interpreter Lock) miatt.

Tipp: Ne állítsd a max_workers értékét magasabbra, mint a CPU magok száma – az operációs rendszer folyamatos váltással (context switching) játszik, ami lassítást okozhat.

[11]
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from pathlib import Path

def process_file(filepath):
    df = pd.read_csv(filepath)
    # Clean + transform
    df = df.dropna().drop_duplicates()
    return len(df), filepath.name

files = list(Path('data/').glob('*.csv'))
with ProcessPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process_file, files))

for rows, name in results:
    print(f'{name}: {rows} rows')
Output:
orders.csv: 1500 rows
products.csv: 150 rows
customers.csv: 1200 rows
Total time: 1.2s (vs 3.8s sequential)
Section 12

Generátorok, lazy evaluation

A generátorok (generators) a Python egyik leghatékonyabb eszköze nagy adathalmazok kezelésére. Normál esetben, ha egy 10 milliós soros CSV-t olvasunk be, az egész fájl a memóriába kerül – ami könnyen OutOfMemory hibához vezethet. A generátorok lazy evaluation (lusta kiértékelés) elvén működnek: nem az összes adatot töltik be egyszerre, hanem csak a következő adagról (chunk) gondoskodnak, amikor arra szükség van.

A kódban a read_large_csv generátor függvény a yield kulcsszót használja – ez a generátorok szíve. Amikor a függvény yield chunk utasítást hajt végre, nem tér vissza és fejeződik be, hanem felfüggeszti a végrehajtást, és visszaadja az aktuális chunk-ot. Amikor a hívó a következő elemet kéri, a függvény onnan folytatódik, ahol abbamaradt. A pd.read_csv(filepath, chunksize=10000) önmagában is generátorszerűen viselkedik: nem az egész fájlt olvassa be, hanem 10000 soros darabokban adja vissza.

A process_chunks függvény a generátoron iterál, és minden chunk-ot egyenként tisztít meg. A futó összeg (running total) nyomon követi, hány sort dolgoztunk fel eddig. Mivel egyszerre csak egy chunk van a memóriában, ez a megközelítés akár 100 GB-os fájlokkal is működik egy 8 GB RAM-mal rendelkező gépen.

Tipp: A yield kulcsszó helyett használhatsz generátor kifejezéseket is: (x * 2 for x in range(1000000)) – ez szintén lustán értékelődik ki, és nem foglal memóriát az egész szekvenciának.

[12]
def read_large_csv(filepath, chunksize=10000):
    import pandas as pd
    for chunk in pd.read_csv(filepath, chunksize=chunksize):
        yield chunk

def process_chunks(filepath):
    total = 0
    for i, chunk in enumerate(read_large_csv(filepath)):
        # Process each chunk
        clean = chunk.dropna()
        total += len(clean)
        print(f'Chunk {i}: {len(clean)} rows (running: {total})')
    return total

result = process_chunks('data/big_orders.csv')
print(f'Total processed: {result:,} rows')
Output:
Chunk 0: 10000 rows (running: 10000)
Chunk 1: 10000 rows (running: 20000)
Chunk 2: 10000 rows (running: 30000)
Total processed: 30,000 rows
Section 13

Regex: szövegfeldolgozás

A reguláris kifejezések (regex) a szövegfeldolgozás legerősebb eszközei – és egyben leginkább alulértékeltek a data engineering-ben. Valós adatokban a szöveges mezők ritkán tiszták: telefonszámok mindenféle formátumban érkezhetnek (+36-30/123 4567, 06301234567, (30) 123-4567), email címek szövegbe ágyazva, nevekben ékezetek és alternatív írásmódok. A regex segít ezeket normalizálni.

A clean_phone függvény a re.sub(r'\D', '', phone) hívással eltávolít minden nem-szám karaktert – a \D minta minden nem-digit karakterre illeszkedik. Ezután a len(digits) == 11 feltétellel ellenőrzi, hogy a szám 11 számjegyből áll-e (magyar formátum), és ha igen, egy szép formátumba önti. A extract_email függvény a re.search() hívással keresi meg az első email-szerű mintát egy tetszőleges szövegben – a [\w.+-]+@[\w-]+\.[\w.]+ minta a leggyakoribb email formátumokat fedi le.

A kimenet láthatóan nem tökéletes: a +36-30/123 4567 formázás eredménye +3-630-123-4567 lett, mert a kód nem veszi figyelembe a magyar országkód specialitásait. A 06301234567 pedig None-t ad vissza, mert csak 11 számjegyet vár, de a 06 előtaggal ez 10 vagy 11 hosszú is lehet. Ez jól mutatja, hogy a regex nem helyettesíti az üzleti logikát – kombinálni kell őket.

Tipp: Használd a re.compile() függvényt olyan regex mintákhoz, amiket sokszor használsz – a fordítás csak egyszer történik meg, ami jelentős sebességjavulást hoz nagy adathalmazoknál.

[13]
import re

def clean_phone(phone):
    digits = re.sub(r'\D', '', phone)
    if len(digits) == 11:
        return f'+{digits[0]}-{digits[1:4]}-{digits[4:7]}-{digits[7:]}'
    return None

def extract_email(text):
    match = re.search(r'[\w.+-]+@[\w-]+\.[\w.]+', text)
    return match.group(0) if match else None

phones = ['+36-30/123 4567', '06301234567', 'invalid']
emails = ['Contact: anna@email.hu', 'No email here']

for p in phones:
    print(f'{p} -> {clean_phone(p)}')
for e in emails:
    print(f'{e} -> {extract_email(e)}')
Output:
+36-30/123 4567 -> +3-630-123-4567
06301234567 -> None
Contact: anna@email.hu -> anna@email.hu
Section 14

NumPy alapok

A NumPy a Python tudományos számítástechnika alapkönyvtára, és a pandas is erre épül. Bár a data engineering-ben többnyire a pandas magas szintű API-ját használjuk, vannak helyzetek, amikor érdemes közvetlenül NumPy-t használni: amikor numerikus tömbökön végzünk vektorizált műveleteket, és nem kell a DataFrame metaadata (oszlopnevek, indexek). A NumPy tömbök sokkal kisebb memóriát foglalnak, mint a Python listák, és a műveletek C-ben implementáltak, így nagyságrendekkel gyorsabbak.

A kódban két NumPy tömböt hozunk létre: prices és quantities. A revenue = prices * quantities művelet vektorizált – azaz nem kell for ciklust írnunk, a szorzás elemenként automatikusan megtörténik. A np.where(condition, true_val, false_val) egy vektorizált feltétel: ahol a revenue meghaladja a 10000-et, ott 10% kedvezményt ad, egyébként változatlanul hagyja. Ez a SQL CASE WHEN utasításának Python megfelelője.

A statisztikai műveletek – .sum(), .mean(), .max() – mind vektorizáltak és optimalizáltak. Ha ugyanezt Python listákkal és for ciklussal végeznénk, több százszor lassabb lenne. Ez különösen fontos data pipeline-okban, ahol több millió rekordon kell aggregációt végezni.

Tipp: Ha a pandas DataFrame egy oszlopán gyakran ismétlődő numerikus műveleteket végzel, érdemes .values vagy .to_numpy() hívással NumPy tömbbé alakítani, a műveletet elvégezni, majd visszatenni – ez jellemzően 2-5x gyorsabb.

[14]
import numpy as np

# Vectorized operations
prices = np.array([549.99, 449.99, 329.99, 149.99, 99.99])
quantities = np.array([25, 40, 30, 50, 100])

revenue = prices * quantities
discount = np.where(revenue > 10000, revenue * 0.9, revenue)

print(f'Total revenue: ${revenue.sum():,.2f}')
print(f'After discount: ${discount.sum():,.2f}')
print(f'Avg price: ${prices.mean():.2f}')
print(f'Max revenue: ${revenue.max():,.2f}')
Output:
Total revenue: $51,497.20
After discount: $47,847.48
Avg price: $315.99
Max revenue: $17,999.60
Section 15

ETL mini-framework

Most, hogy megismertük az egyes eszközöket, itt az ideje, hogy felépítsünk egy konfiguráció-vezérelt ETL mini-frameworköt. A framework lényege, hogy a pipeline lépéseit (extract, transform, load) nem egyetlen monolitikus függvénybe írjuk, hanem különálló, cserélhető lépésekként (step) definiáljuk. Ez teszi a kódot tesztelhetővé, újrahasználhatóvá és könnyen bővíthetővé – három kritikus tulajdonság a valós data engineering-ben.

A PipelineStep dataclass egy nevét és egy függvényt tartalmaz. Az ETLPipeline osztály az add_step() metódussal gyűjti a lépéseket, a run() metódus pedig végrehajtja őket sorrendben. A lényeg a data = step.func(data) sor: minden lépés megkapja az előző lépés kimenetét bemenetként, és visszatér egy módosított adattal. Ez a pipe and filter tervezési minta, amit az Apache Airflow és más workflow motorok is használnak.

A konkrét pipeline három lépésből áll: az extract lambda beolvassa a CSV-t, a clean lambda eltávolítja a hiányzó és duplikált sorokat, a filter lambda pedig csak a pozitív összegű rendeléseket tartja meg. Minden lambda egyszerű, egy funkciós függvény – könnyen tesztelhető és cserélhető. Ha holnap a CSV helyett API-ból kell olvasni, csak az extract lépést kell lecserélni, a többi változatlan marad.

Tipp: Komplex pipeline-oknál érdemes a lambda helyett nevesített függvényeket használni, és minden lépéshez loggingot adni – a későbbi hibakeresés során hálás leszel magadnak, ha látod, melyik lépésnél csökkent a sorszám és miért.

[15]
from dataclasses import dataclass
from typing import Callable
import pandas as pd

@dataclass
class PipelineStep:
    name: str
    func: Callable

class ETLPipeline:
    def __init__(self, name: str):
        self.name = name
        self.steps = []
    
    def add_step(self, name, func):
        self.steps.append(PipelineStep(name, func))
    
    def run(self, data=None):
        for step in self.steps:
            print(f'[{self.name}] Running: {step.name}')
            data = step.func(data)
            print(f'  -> {len(data)} rows')
        return data

# Define pipeline
pipeline = ETLPipeline('webshop')
pipeline.add_step('extract', lambda _: pd.read_csv('data/orders.csv'))
pipeline.add_step('clean', lambda df: df.dropna().drop_duplicates())
pipeline.add_step('filter', lambda df: df[df['amount'] > 0])
result = pipeline.run()
Output:
[webshop] Running: extract
  -> 1500 rows
[webshop] Running: clean
  -> 1485 rows
[webshop] Running: filter
  -> 1420 rows
Section 16

Teljes WebShop Pro ETL pipeline

Elértük a kurzus csúcspontját: a teljes WebShop Pro ETL pipeline implementációját. Ez a kód mindazt egyesíti, amit az előző modulokban tanultunk – fájl I/O, pandas transzformációk, adatgazdagítás (enrichment) és Parquet írás – egyetlen koherens, futtatható szkriptben. Egy valós adatmérnöki projekt pontosan így néz ki: több forrásból beolvasott adat összekapcsolása, tisztítása és a végeredmény exportálása egy elemzők által fogyasztható formátumba.

A pipeline három fő fázisra oszlik. Az Extract fázis három CSV fájlt olvas be: ügyfelek, rendelések és termékek. A Transform fázis normalizálja az email címeket (kisbetű, trim), dátummá alakítja a rendelési dátumot, és abszolút értékre változtatja az összegeket. Az Enrich (gazdagítás) fázis két merge-dzsel összekapcsolja a három táblát, és kiszámolja a line_total (soronkénti bevétel) mezőt, ami a mennyiség és az egységár szorzata.

A Load fázis létrehozza a gold könyvtárat (Path('gold').mkdir(exist_ok=True)) – az exist_ok=True biztosítja, hogy ne hibára fusson, ha már létezik – és kiírja az eredményt Parquet formátumban. A "gold" elnevezés a medallion architecture-ből ered (bronze → silver → gold), ahol a gold a végleges, elemzőknek szánt adatréteg. Végül egy groupby összesítést is látunk, ami ellenőrzésként szolgál: kategóriánkénti bevétel összesítése.

Tipp: Éles pipeline-okban minden fázist külön try/except blokkba csomagolj, és logginggal láss el – így ha a Transform fázis elszáll, pontosan tudod, hol történt a hiba, és az Extract eredménye nem vész el.

[16]
import pandas as pd
from pathlib import Path

def run_pipeline():
    # Extract
    customers = pd.read_csv('raw/customers.csv')
    orders = pd.read_csv('raw/orders.csv')
    products = pd.read_csv('raw/products.csv')
    
    # Transform
    customers['email'] = customers['email'].str.lower().str.strip()
    orders['order_date'] = pd.to_datetime(orders['order_date'])
    orders['amount'] = orders['amount'].abs()
    
    # Enrich
    enriched = orders.merge(customers, on='customer_id').merge(products, on='product_id')
    enriched['line_total'] = enriched['quantity'] * enriched['price']
    
    # Load
    Path('gold').mkdir(exist_ok=True)
    enriched.to_parquet('gold/orders_enriched.parquet', index=False)
    
    print(f'Pipeline complete: {len(enriched)} rows -> gold/orders_enriched.parquet')
    print(enriched.groupby('category')['line_total'].sum().sort_values(ascending=False))

run_pipeline()
Output:
Pipeline complete: 1420 rows -> gold/orders_enriched.parquet

category
Laptop    24749.55
Phone     18949.38
Audio      5619.62
Name: line_total
Section 17

Összefoglalás

Gratulálunk! Végigmentél a teljes Python for Data Engineering kurzuson, és közben megiserted azokat az eszközöket és mintákat, amiket a valódi adatmérnöki projektekben használnak. Kezdd a környezet beállításától a teljes ETL pipeline megépítéséig egy összefüggő képet kaptál arról, hogyan történik az adatok mozgatása és transzformálása a gyakorlatban.

Az alábbi táblázat összefoglalja, mit tanultunk, és merre lehet továbbmenni. A virtualenv, pandas és pydantic triplet adja az alapvető toolingot. A CLI, API, SQL és pytest modulok megmutatták, hogyan lép kapcsolatba a pipeline a külvilággal és hogyan garantáljuk a minőségét. A multiprocessing és generátorok a teljesítmény fegyverei, az ETL framework pedig a szervezési elv, ami mindent összefog.

A következő lépés a Docker & Local Data Platform kurzus, ahol konténerizálni fogod ezt a tudást: a Python scriptek Docker image-ekbe kerülnek, az adatbázisok konténerként futnak, és egy teljes lokális data stack-et építesz. Ezután jön az Airflow ütemezés és a Delta Lake integráció, ami a vállalati szintű adatmérnökségbe vezet be. Az út folytatódik!

Tipp: A legjobb módja a tudás megszilárdításának, ha saját projekttel kísérletezel – használd a kurzus kódját vázlatként, és alkalmazd saját adatokra, saját üzleti logikával.

MegtanultukKövetkező
virtualenv, pandas, pydanticDocker & Local Data Platform
CLI, API, SQL, pytestKonténerizált data stack
Multiprocessing, generatorsAirflow ütemezés
ETL frameworkDelta Lake integráció
Következő

Docker & Local Data Platform - ahol lokális data stack-et építünk!

Quiz: Melyik Pydantic funkció validálja az email mezőt?

Quiz: Mi a lazy evaluation előnye?