DeltaTable 주요 메서드 정리
1. 테이블 접근
from delta.tables import DeltaTable
# 경로 기반으로 불러오기
delta_table = DeltaTable.forPath(spark, "/mnt/delta/user_metrics")
# 테이블 이름 기반으로 불러오기
delta_table = DeltaTable.forName(spark, "default.user_metrics")
2. MERGE (업서트)
delta_table.alias("t").merge(
df_updates.alias("s"),
"t.user_id = s.user_id AND t.date = s.date"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
3. Update
from pyspark.sql.functions import col, lit
delta_table.update(
condition=col("cnt") > 100,
set={"flag": lit(1)}
)
4. Delete
delta_table.delete(condition=col("date") < "2025-01-01")
5. Optimize & Z-Order
# 작은 파일 합치기 (compaction)
delta_table.optimize().executeCompaction()
# 특정 컬럼 기준으로 정렬 (Z-Order)
delta_table.optimize().executeZOrderBy("date")
6. Vacuum
delta_table.vacuum()
7. Time Travel
# 버전 기준 조회
df_old = spark.read.option("versionAsOf", 5).table("user_metrics")
# 타임스탬프 기준 조회
df_old = spark.read.option("timestampAsOf", "2025-09-01").table("user_metrics")
8. Change Data Feed
stream_df = spark.readStream \
.option("readChangeFeed", "true") \
.table("user_metrics")
간략 정리
| 기능 | 메서드 / 옵션 | 설명 |
| 테이블 불러오기 | forPath(spark, path), forName(spark, tableName) | Delta 테이블을 객체로 불러오기 |
| 업서트(Upsert) | .merge() | 조건 맞으면 업데이트, 없으면 인서트 |
| 조건 기반 수정 | .update(), .updateExpr() | 특정 조건 만족 시 값 변경 |
| 조건 기반 삭제 | .delete() | 특정 조건 만족 시 행 삭제 |
| 성능 최적화 | .optimize().executeCompaction() .optimize().executeZOrderBy(col) |
작은 파일 합치기(Compaction), 특정 컬럼 기준 정렬(Z-Order) |
| 파일 정리(Vacuum) | .vacuum() | 오래된 스냅샷 및 불필요 파일 정리 |
| Time Travel | versionAsOf, timestampAsOf 옵션 | 과거 버전/시점의 데이터 조회 |
| 변경 추적(CDF) | readChangeFeed=true 옵션 | Insert/Update/Delete 변경 이력 추적 |
반응형
'Programming > Python' 카테고리의 다른 글
| PySpark - ETL 코드 흐름 (0) | 2025.09.04 |
|---|