PySpark - ETL 코드 흐름
# 데이터 읽기
### 데이터 읽기 extract
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETLExample").getOrCreate()
# CSV 읽기
df_csv = spark.read.option("header", True).csv("/mnt/data/input/sample.csv")
# Parquet 읽기
df_parquet = spark.read.parquet("/mnt/data/input/sample.parquet")
# Delta 읽기
df_delta = spark.read.format("delta").load("/mnt/delta/sample_delta")
# 데이터 변환
### 데이터 변환 transform
from pyspark.sql.functions import col, when, to_date, count, avg
df_transformed = (
df_csv
.withColumn("date", to_date(col("timestamp"))) #날짜 컬럼 변환
.withColumn("status", when(col("status") == "OK", 1).otherwise(0)) #상태값 가공
.groupBy("user_id", "date") #집계
.agg(
count("*").alias("cnt"),
avg("value").alias("avg_value")
)
)
# 데이터 적재
### 데이터 적재 Load
# databricks 에선 Delta Lake 로 저장하는 패턴을 많이 쓴다.
from delta.tables import DeltaTable
# Delta 테이블이 이미 있을 경우 Merge(Upsert)
target_path = "/mnt/delta/user_metrics"
if DeltaTable.isDeltaTable(spark, target_path): # target_path 경로에 Delta 테이블 있는지 확인, 있으면 upsert 없으면 새로 저장 write
delta_table = DeltaTable.forPath(spark, target_path) # 이미 있는 Delta 테이블을 가져와서 조작할 수 있는 객체 생성
(
delta_table.alias("t").merge(
df_transformed.alias("s"),
"t.user_id = s.user_id AND t.date = s.date"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
else:
df_transformed.write.format("delta").mode("overwrite").save(target_path)반응형
'Programming > Python' 카테고리의 다른 글
| PySpark - DeltaTable 주요 method 정리 (0) | 2025.09.04 |
|---|