Programming/Python

PySpark - ETL 코드 흐름

박쿠리 2025. 9. 4. 15:13

PySpark - ETL 코드 흐름

 

# 데이터 읽기

### 데이터 읽기 extract 

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# CSV 읽기
df_csv = spark.read.option("header", True).csv("/mnt/data/input/sample.csv")

# Parquet 읽기
df_parquet = spark.read.parquet("/mnt/data/input/sample.parquet")

# Delta 읽기
df_delta = spark.read.format("delta").load("/mnt/delta/sample_delta")

 

# 데이터 변환

### 데이터 변환 transform

from pyspark.sql.functions import col, when, to_date, count, avg

df_transformed = (
    df_csv
    .withColumn("date", to_date(col("timestamp"))) #날짜 컬럼 변환
    .withColumn("status", when(col("status") == "OK", 1).otherwise(0)) #상태값 가공
    .groupBy("user_id", "date") #집계
    .agg(
        count("*").alias("cnt"),
        avg("value").alias("avg_value")
    )
)

 

# 데이터 적재

### 데이터 적재 Load

# databricks 에선 Delta Lake 로 저장하는 패턴을 많이 쓴다.

from delta.tables import DeltaTable

# Delta 테이블이 이미 있을 경우 Merge(Upsert)
target_path = "/mnt/delta/user_metrics"

if DeltaTable.isDeltaTable(spark, target_path): # target_path 경로에 Delta 테이블 있는지 확인, 있으면 upsert 없으면 새로 저장 write 
    delta_table = DeltaTable.forPath(spark, target_path)  # 이미 있는 Delta 테이블을 가져와서 조작할 수 있는 객체 생성
    (
        delta_table.alias("t").merge(
            df_transformed.alias("s"),
            "t.user_id = s.user_id AND t.date = s.date"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    df_transformed.write.format("delta").mode("overwrite").save(target_path)
반응형

'Programming > Python' 카테고리의 다른 글

PySpark - DeltaTable 주요 method 정리  (0) 2025.09.04