Programming/Python

PySpark - DeltaTable 주요 method 정리

박쿠리 2025. 9. 4. 16:11

DeltaTable 주요 메서드 정리

 

1. 테이블 접근

from delta.tables import DeltaTable

# 경로 기반으로 불러오기
delta_table = DeltaTable.forPath(spark, "/mnt/delta/user_metrics")

# 테이블 이름 기반으로 불러오기
delta_table = DeltaTable.forName(spark, "default.user_metrics")

 

2. MERGE (업서트)

delta_table.alias("t").merge(
    df_updates.alias("s"),
    "t.user_id = s.user_id AND t.date = s.date"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

 

3. Update

from pyspark.sql.functions import col, lit

delta_table.update(
    condition=col("cnt") > 100,
    set={"flag": lit(1)}
)

 

4. Delete

delta_table.delete(condition=col("date") < "2025-01-01")

 

5. Optimize & Z-Order

# 작은 파일 합치기 (compaction)
delta_table.optimize().executeCompaction()

# 특정 컬럼 기준으로 정렬 (Z-Order)
delta_table.optimize().executeZOrderBy("date")

 

6. Vacuum

delta_table.vacuum()

 

7. Time Travel

# 버전 기준 조회
df_old = spark.read.option("versionAsOf", 5).table("user_metrics")

# 타임스탬프 기준 조회
df_old = spark.read.option("timestampAsOf", "2025-09-01").table("user_metrics")

 

8. Change Data Feed

stream_df = spark.readStream \
    .option("readChangeFeed", "true") \
    .table("user_metrics")

 

 

간략 정리

 

기능 메서드 / 옵션 설명
테이블 불러오기 forPath(spark, path), forName(spark, tableName) Delta 테이블을 객체로 불러오기
업서트(Upsert) .merge() 조건 맞으면 업데이트, 없으면 인서트
조건 기반 수정 .update(), .updateExpr() 특정 조건 만족 시 값 변경
조건 기반 삭제 .delete() 특정 조건 만족 시 행 삭제
성능 최적화 .optimize().executeCompaction()
.optimize().executeZOrderBy(col)
작은 파일 합치기(Compaction), 특정 컬럼 기준 정렬(Z-Order)
파일 정리(Vacuum) .vacuum() 오래된 스냅샷 및 불필요 파일 정리
Time Travel versionAsOf, timestampAsOf 옵션 과거 버전/시점의 데이터 조회
변경 추적(CDF) readChangeFeed=true 옵션 Insert/Update/Delete 변경 이력 추적
반응형

'Programming > Python' 카테고리의 다른 글

PySpark - ETL 코드 흐름  (0) 2025.09.04