Echo system/Airflow

Airflow - Apache Spark 연동

박쿠리 2024. 9. 7. 19:42

Apache Spark 

대규모 데이터 처리 및 분석을 위한 오픈 소스 분산 컴퓨팅 프레임워크. 대용량 데이터를 빠르게 처리할 수 있도록 설계되었으며, 특히 메모리 기반 처리(In-memory processing)를 통해 Hadoop의 MapReduce보다 훨씬 빠른 성능을 제공한다.

YARN (Yet Another Resource Negotiator)

Apache Hadoop의 자원 관리 프레임워크로, 클러스터의 리소스를 효율적으로 관리하고 분배하는 역할을 한다. YARN은 클러스터 내에서 애플리케이션(작업)의 실행을 관리하며, 자원 할당 및 작업 스케줄링을 담당한다.

Apache Spark - YARN 관계성

Apache Spark 는 YARN 위에서 실행될 수 있으며, Spark 는 YARN을 자원 관리자로 사용해 분산된 클러스터에서 작업을 실행할 수 있다. Spark는 YARN의 자원 관리 기능을 활용해 클러스터 내 자원을 할당받고, 작업을 병렬로 처리한다.

Spark는 YARN의 ResourceManager와 NodeManager를 통해 자원 요청을 처리하며, 작업을 스케줄링하고 실행한다.

Spark와 YARN의 동작 흐름

  1. Spark 애플리케이션 제출: 사용자가 Spark 애플리케이션을 YARN 클러스터에 제출한다.
  2. YARN의 자원 할당: YARN의 ResourceManager는 Spark 애플리케이션을 실행할 자원을 할당한다.
  3. ApplicationMaster 생성: Spark는 YARN 위에서 실행되며, YARN은 애플리케이션 관리를 위한 ApplicationMaster를 생성한다.
  4. 컨테이너 생성 및 실행: YARN의 NodeManager는 컨테이너를 생성하고, 각 컨테이너에서 Spark 작업이 병렬로 실행된다.
  5. 결과 반환 및 애플리케이션 종료: 작업이 완료되면 Spark는 결과를 반환하고, YARN은 애플리케이션의 자원을 회수하여 다른 애플리케이션에 할당한다.

Airflow - Apache Spark 연동

Admin- connections 에서 클러스터를 연결

Spark 작업을 YARN 에서 실행하려면 yarn resource manager의 URL 을 설정해준다.

Connection Type

Airflow가 Spark 작업을 실행할 수 있도록 정의한다.

 

Host

Airflow는 Spark 작업을 YARN 클러스터에 제출할 때, YARN ResourceManager 를 통해 클러스터의 리소스를 관리한다. 이 때 호스트 정보가 필요하다.

 

Deploy mode

Spark 작업의 Driver 프로그램이 어디서 실행했는지를 나타낸다. 

  • Cluster mode
    • Remote - Driver 가 Yarn 클러스터의 노드 중 하나에서 실행된다. 즉 driver 가 클러스터 내부에서 원격(리모트)으로 실행되며, airflow 서버는 driver 를 직접 실행하지 않고 yarn 에 작업을 제출 = airflow 서버는 원격 클러스터에만 접근
  • Client mode
    • Local - Driver 프로그램이 Airflow 서버 또는 작업을 제출하는 노드(로컬)에서 실행된다. 즉 driver 가 airflow 서버에서 실행되고, yarn 은 executor만 클러스터에서 실행한다.

 

 

Spark 애플리케이션 파일

yarn 에서 실행할 spark.py 을 airflow 서버 디렉토리에 생성한다.

from pyspark.sql import SparkSession

# Spark 세션 생성
spark = SparkSession.builder \
    .appName("SparkSQLTest") \
    .getOrCreate()

# DataFrame 생성 (데이터가 간단할 때는 직접 데이터를 생성할 수 있음)
data = [("Alice", 27), ("Bob", 35), ("Charlie", 45), ("Diana", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("people")

# SQL 쿼리 실행
result = spark.sql("SELECT Name, Age FROM people WHERE Age > 30")

# 결과 출력
result.show()

# Spark 세션 종료
spark.stop()

 

Airflow Dag 를 생성

import pendulum
from airflow.decorators import dag, task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag(
    dag_id = 'chap_sw_spark',
    schedule="@once",
    start_date=pendulum.datetime(2024,9,7,19, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["spark"]
)

def spark():
    spark_task = SparkSubmitOperator(
        task_id='spark_job',
        application='/opt/airflow/data/spark_sw.py', #yarn 에서 실행할 파일
        conn_id='dd-spark',
        verbose=True, #Spark 작업을 제출할 때 발생하는 상세 실행 로그를 Airflow 에서 볼 수 있다.
    )
    
    spark_task
spark()

 

 

*Error

An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.: java.lang.ExceptionInInitializerError

conn_id 값을 잘못 써서 없는 id 값을 불러오고있었음 ㅎㅎ

 

Airflow Logs 일부

Airflow UI - Logs 에서 Yarn - Application 이 생성된 것을 확인할 수 있다. 그 중 tracking URL 을 참고하면 yarn 에서 내가 실행한 결과의 로그를 확인할 수 있다.

INFO - 24/09/08 15:32:22 INFO Client: Application report for application_1725281056228_0035 (state: RUNNING)
INFO - Identified spark application id: application_1725281056228_0035
INFO - 24/09/08 15:32:23 INFO Client: Application report for application_1725281056228_0035 (state: FINISHED)
INFO - 24/09/08 15:32:23 INFO Client:
INFO - client token: N/A
INFO - diagnostics: N/A
INFO - ApplicationMaster host: hdw2.datalake.net
INFO - ApplicationMaster RPC port: 40603
INFO - queue: default
INFO - start time: 1725777121223
INFO - final status: SUCCEEDED
INFO - Identified spark application id: application_1725281056228_0035
INFO - tracking URL: http://hdm3.datalake.net:8088/proxy/application_1725281056228_0035/
INFO - user: airflow

 

Yarn log 일부

실행한 결과의 로그를 확인할 수 있다.

24/09/08 02:32:22 INFO DAGScheduler: Job 1 finished: showString at NativeMethodAccessorImpl.java:0, took 1.785546 s
24/09/08 02:32:22 INFO CodeGenerator: Code generated in 16.017245 ms
+-------+---+
|   Name|Age|
+-------+---+
|    Bob| 35|
|Charlie| 45|
+-------+---+

 


Airflow - K8S 위에 Apache Spark 연동

SparkKubernetesOperator 를 사용하면 Kubernetes 클러스터 위에서 Spark 작업(Spark job)을 구동할 수 있다.

이 오퍼레이터는 Apache Airflow에서 Spark 작업을 Kubernetes 클러스터에 배포하고, 해당 작업을 실행할 수 있도록 지원하는 기능을 제공한다.

 

패키지 설치

우선 spark 를 사용하기 위한 provider 를 설치해준다.

pip install apache-airflow-providers-cncf-kubernetes

 

k8s 클러스터에 spark operator 를 설치

설치 방법

  1. Helm 사용
    • Helm : k8s 애플리케이션을 배포하기 위한 패키지 관리 도구.
  2. kubectl 을 이용해 수동으로 yaml 파일 적용

 

Helm 사용

spark operator 가 설치될 네임스페이스를 지정한다. 여기에선 spark-operator 라고 설정한다.

spark 작업이 실행될 기본 네임스페이스를 설정한다. (default 네임스페이스 사용)

webhook 을 활성화해서 작업 제출 시 추가 기능을 사용할 수 있도록 설정한다.

#Spark Operator 차트를 사용하는 리포지토리를 추가
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator

#Spark operator 를 k8s 클러스터에 설치
helm install spark-operator spark-operator/spark-operator \
  --namespace spark-operator \
  --create-namespace \
  --set sparkJobNamespace=default \
  --set webhook.enable=true
  
#Spark operator 설치됐는지 확인
kubectl get pods -n spark-operator

 

Kubectl 을 사용해 직접 yaml 파일로 설치

#spark operator 의 yaml 파일을 다운로드한다.
kubectl apply -f https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/releases/download/v1beta2-1.3.3-3.1.1/spark-operator.yaml

#spark operator 를 실행할 네임스페이스를 생성한다.
kubectl create namespace spark-operator

#다운로드한 yaml 파일을 적용하여 spark operator 를 설치한다.
kubectl apply -f spark-operator.yaml -n spark-operator

#spark operator pod 가 정상적으로 실행되고 있는지 확인한다.
kubectl get pods -n spark-operator

 

spark operator 조회

*작업 환경에서는 이미 설치가 되어있으므로 조회만 해본다.

sparkapplications.sparkoperator.k8s.io 와 같은 CRD 가 출력되면 spark operator 가 설치 된 것이다.

kubectl get crds | grep sparkapplications
#CRD : Custom Resource Definition

#실행 결과
scheduledsparkapplications.sparkoperator.k8s.io       2024-08-29T13:00:12Z
sparkapplications.sparkoperator.k8s.io                2024-08-29T13:00:12Z

 

 

특정 Namespace 의 service account 생성

service account 는 k8s 에서 pod 나 애플리케이션이 API 서버에 접근할 때 인증을 처리하는 방법이다.

Airflow 에서 k8s 클러스터에 spark 작업을 실행해야하므로 필요한 작업이다.

k8s 클러스터에 있는 spark 작업이 필요한 권한을 가지고 실행될 수 있도록 서비스 어카운트를 설정한다.

  • k8s 에서 실행되는 pod 가 클러스터 내에서 API 를 호출하거나 다른 리소스에 접근할 때 권한이 필요

*참고사항 : Airflow 에서 k8s 에 간단한 DAG 만 실행하는 경우 ( KubernetesPodOperator 사용 )

  • k8s 는 각 네임스페이스에 기본적으로 default 서비스 어카운트를 제공한다. 해당 작업이 추가적인 권한이 필요 없다면 기본 serviceaccount 로 충분하다.
#serviceaccount 생성
kubectl create serviceaccount spark-operator-spark -n <namespace>

#클러스터 전체에서 리소스에 접근할 수 있는 권한이 필요하다면 ClusterRole 과 ClusterRoleBinding 을 사용한다.
kubectl create clusterrolebinding spark-operator-role \
  --clusterrole=edit \
  --serviceaccount=spark-operator:spark-operator-spark \
  --namespace=spark-operator

#ClusterRoleBinding 을 생성하는 명령어로, 특정 ClusterRole 을 serviceaccount 와 바인딩하여, 클러스터 내에서 권한을 할당한다.
#edit ClusterRole 은 주로 Pods, Services, Deployments 등 대부분의 리소스에 대해 생성, 수정, 삭제할 수 있는 권한을 부여한다.
#spark-operator 네임스페이스에 있는 spark-operator-spark 서비스 어카운트에 권한 부여
#serviceaccount 가 속한 네임스페이스를 지정한다.

 

해당 namespace 의 serviceaccount 를 조회해본다.

[siwon_park@linux ~]$ kubectl get serviceaccounts -n spark-operator
NAME                   SECRETS   AGE
default                0         9d
spark-operator         0         9d
spark-operator-spark   0         9d

#secrets 는 민감한 데이터(비번, 토큰, API 키 등) 을 관리하고 애플리케이션이 접근할 수 있도록 하는 k8s 리소스다.
#Base64 로 인코딩되어 k8s 클러스터 내부에서 저장되고, 필요한 Pod 가 이 값을 참조하거나 환경 변수 또는 볼륨으로 마운트하여 사용한다.

#age 는 해당 리소스가 생성된 이후로 경과된 시간을 의미한다.

 

SparkApplication Yaml 작성하기

 

Yaml 파일 작성 이유 

  • k8s 위에 spark 작업을 실행하려면 Spark Operator 와 함께 사용될 sparkapplication yaml 파일을 k8s 클러스터에 작성해야한다. 
  • Spark 작업을 k8s 에서 어떻게 실행할지 정의하고, spark driver, executor 의 설정, 이미지 정보, 리소스 할당, spark 애플리케이션 파라미터 등을 포함한다. 
  • Spark 는 분산 시스템이기 때문에 작업이 여러 노드에서 병렬로 수행된다. 자원을 관리하기 위해 yaml 파일을 작성한다.

Airflow 에서 SparkKubernetesOperator 을 사용할 때

  • 이 오퍼레이터는 spark 작업을 k8s 클러스터에 제출하기 위해 sparkapplication yaml 파일을 사용한다.
  • yaml 파일에 정의된 대로 Spark Driver 와 Executor Pod 가 k8s 에서 생성되고, 해당 spark 작업이 실행된다.

SparkApplicaiton.yaml

apiVersion: sparkoperator.k8s.io/v1beta2 #k8s에서 SparkOperator 의 API 버전을 지정
kind: SparkApplication #K8S 의 리소스 종류를 나타냄
metadata:
  name: spark-pi #Spark 작업의 이름을 지정
  namespace: spark-operator
spec:
  type: Scala # Scala 애플리케이션 실행 (JAR 파일로 실행)
  mode: cluster
  image: spark:3.5.0
  imagePullPolicy: IfNotPresent #이미지가 노드에 없으면 레지스트리에서 가져온다.
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar
  sparkVersion: 3.5.0
  driver:
    labels:  # labels 필드 추가
      version: 3.5.0  # 예시 레이블
    cores: 1
    coreLimit: 1000m
    memory: 512m
    serviceAccount: spark-operator-spark # 서비스 어카운트
  executor:
    labels:  # labels 필드 추가
      version: 3.5.0  # 예시 레이블  
    instances: 2 # executor의 수
    cores: 1 # cpu requests
    coreLimit: 1000m # cpu limit
    memory: 512m

docker hub 에서 사용할 image, 실행할 jar 파일, driver 와 executor spec, serivceAccount 를 입력해준다.

 

SparkKubernetesOperator 사용하기

import pendulum
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

@dag(
    dag_id = 'chap_sw_spark',
    schedule="@once",
    start_date=pendulum.datetime(2024,9,7,19, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["spark"]
)

def sparkOnk8s():
    spark_task = SparkKubernetesOperator(
        task_id='spark_job',
        config_file='/opt/airflow/.kube/config', #k8s 과 airflow 를 연동하기 위한 config 파일
        in_cluster=False, #airflow 는 k8s 외부에서 실행되므로 false 를 줘야한다.
        namespace='spark-operator', #k8s 클러스터의 네임스페이스
        application_file='spark-sw.yaml', #spark 작업을 k8s 에서 어떻게 실행할지.. spark 관련 상세 정보를 포함한 yaml 파일 / Dag와 같은 경로에 있다면 상대 경로
    )
    spark_task
sparkOnk8s()

k8s 와 airflow 를 연동하기 위한 config 파일 경로, in_cluster, namespace, yaml 파일 정보를 입력해준다.

 

pod 가 성공적으로 도는 것을 확인할 수 있다.

[siwon_park@linux .kube]$ k get pods -n spark-operator
NAME                              READY   STATUS    RESTARTS   AGE
spark-job-jcvmtx3r-driver         1/1     Running   0          2s
spark-operator-7dcb44748b-ch4n5   1/1     Running   0          9d

 

 

Error

 

pod 정보를 확인해보면, configmap 을 못찾는다고 뜬다.

Events:
  Type     Reason       Age   From               Message
  ----     ------       ----  ----               -------
  Normal   Scheduled    16s   default-scheduler  Successfully assigned spark-operator/spark-job-735fqojx-driver to kw3.dd.io
  Warning  FailedMount  16s   kubelet            MountVolume.SetUp failed for volume "spark-conf-volume-driver" : configmap "spark-drv-2fe59291d17fd35d-conf-map" not found
  Normal   Pulling      15s   kubelet            Pulling image "bitnami/spark:3.5.2"
  Normal   Pulled       13s   kubelet            Successfully pulled image "bitnami/spark:3.5.2" in 1.619s (1.619s including waiting)
  Normal   Created      13s   kubelet            Created container spark-kubernetes-driver
  Normal   Started      13s   kubelet            Started container spark-kubernetes-driver
  
  
  
  [siwon_park@linux .kube]$ k get pods -n spark-operator
NAME                              READY   STATUS    RESTARTS   AGE
spark-job-735fqojx-driver         0/1     Error     0          51s
spark-operator-7dcb44748b-ch4n5   1/1     Running   0          9d

 

이미지 2가지 ( yaml - bitnami/spark:3.5.2, spark:3.5.0 ) 를 사용했을 때, 둘 다 해당 에러가 떴지만 첫번째는 계속 남아있다가 실패로 떨어졌고, 두번째는 성공으로 돌았다. ????

 

docker hub 들어가서

 

image - bitnami/spark:3.5.2

class - org.apache.spark.examples.SparkPi

ApplicationFile - local:///opt/bitnami/spark/examples/jars/spark-examples_2.12-3.5.2.jar

 

있는 것도 확인했는데 왜 안된 건지 모르겠음.

 

 

이미지에 대한 개념을 아직 이해 못해서 그런가

비트나미가 패키지 설치 라이브러리라 그런가? 

 

*ConfigMap 개념

  • k8s 에서 애플리케이션의 설정 데이터를 저장하고 관리하기 위한 객체다. ConfigMap을 사용하면 애플리케이션의 설정 파일이나 환경 변수를 별도로 저장하고, 이를 컨테이너에서 참조하여 사용할 수 있다.
  • Spark 작업이 자동으로 생성하고 참조하는 일시적인 리소스로 생성됐다가 작업이 끝나면 삭제되는 것을 볼 수 있다.
kubectl get configmap -n <namespace>

 

*image 

컨테이너 실행에 필요한 파일, 코드, 라이브러리, 환경 설정 등을 포함한 패키지이다.

이미지는 컨테이너화된 애플리케이션을 실행하는 데 필수적인 요소로, k8s 는 이미지를 사용해서 pod 내에 컨테이너를 생성하고 실행한다.

  • image tag
    • 특정 버전을 구분하기 위해 사용한다. ex) nignx:latest, nignx:1.18.0

 

 

반응형