Echo system/Airflow

Airflow - Kubernetes(k8s) 연동

박쿠리 2024. 9. 7. 18:26

Kubernetes

컨테이너화된 애플리케이션의 자동 배포, 확장, 관리 등을 지원하는 오픈 소스 플랫폼이다. 구글에서 개발되었으며, 현재는 CNCF(Cloud Native Computing Foundation)에서 관리한다. 주로 Docker와 같은 컨테이너 런타임을 기반으로 애플리케이션을 실행한다.

*참고 링크 : Docker 에서 Airflow 실행 (tistory.com)

 

Kubernetes - 주요 기능

  • 컨테이너 오케스트레이션: 다수의 컨테이너를 클러스터 환경에서 자동으로 배포하고 관리할 수 있다.
  • 자동 스케일링: 트래픽 또는 리소스 사용량에 따라 컨테이너 수를 자동으로 늘리거나 줄인다.
  • 자가 복구: 장애가 발생한 컨테이너를 자동으로 다시 시작하거나 교체한다.
  • 로드 밸런싱: 트래픽을 여러 컨테이너로 분산시켜 고가용성을 보장한다.
  • 롤링 업데이트 및 롤백: 애플리케이션을 중단 없이 업데이트하고, 문제가 생기면 이전 버전으로 쉽게 롤백할 수 있다.

Kubernetes - 구성 요소

  • Pod: 하나 이상의 컨테이너가 포함된 기본 배포 단위.
  • Node: 컨테이너가 실행되는 물리적 또는 가상 서버.
  • Cluster: 여러 노드로 구성된 컨테이너 실행 환경.
  • Deployment: 애플리케이션의 배포와 관리를 담당하는 객체.
  • Service: Pod에 대한 네트워크 접근을 제공하며, 로드 밸런싱 역할을 한다.

Airflow - Kubernetes 연동

Config 파일 설정

Kubernetes cluster 의 config 파일을 airflow 서버로 복사한다. 

  • Airflow 가 k8s API 서버와 통신하기 위해 필요한 인증 정보와 클러스터 설정을 가져와야 하기 때문이다. 이를 통해 Airflow는 Kubernetes 클러스터에 작업을 제출하고, 배포 및 실행을 관리할 수 있다.
  • ~/.kube/config 파일을 복사한다.

*참고 : 운영에서는 k8s 서버에 직접 접근 못하고, Bastion 서버에서만 쓸 수 있게 해놓는다. 

*Bastion 서버 : 외부에서 프라이빗 서브넷에 있는 Kubernetes 클러스터에 접근할 수 있는 유일한 방법이다. 프라이빗 클러스터는 외부 네트워크와 연결이 차단되어 있기 때문에, Bastion 서버를 통해서만 SSH나 kubectl 등의 명령을 실행할 수 있다.

 

~/.kube/config -> Airflow 서버로 복사

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: ...
    server: https://IP:6443
  name: kubernetes
contexts:
- context:
    cluster: kubernetes
    user: kubernetes-admin
  name: kubernetes-admin@kubernetes
current-context: kubernetes-admin@kubernetes
kind: Config
preferences: {}
users:
- name: kubernetes-admin
  user:
    client-certificate-data: ...
    client-key-data: ...

패키지 설치

pip install apache-airflow-providers-cncf-kubernetes

 

Airflow DAG 실행

import pendulum
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

@dag(
    dag_id = 'chap_sw_k8s',
    schedule="@once",
    start_date=pendulum.datetime(2024,9,7,19, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["k8s"]
)

def k8s():
    k8s = KubernetesPodOperator(
        task_id="k8s_op",
        image="busybox", #간단한 유틸리티 및 명령어를 포함
        #컨테이너화된 애플리케이션 또는 서비스를 포함하는 이미지. 주로 Docker 이미지 형태로 제공된다. 
        #이 이미지는 k8s 클러스터 내에서 실행된 Pod 안에서 컨테이너로 배포된다.
        namespace="airflow", #pod를 실행할 namespace 지정
        cmds=["echo", "Hello world"], #실행할 명령어
        config_file="/opt/airflow/.kube/config", #default 는 '~/.kube/config' 이다.
        in_cluster=False, #Airflow 가 k8s 클러스터 외부에서 실행 중이라면 False 로 설정해줘야한다. 내부 실행일 경우 default 는 True 이므로 설정안해줘도 된다.
        cluster_context=None, 
        #k8s 클러스터의 컨텍스트를 지정한다. k8s 의 kubeconfig 파일에는 여러 클러스터 정보가 포함될 수 있으며, 설정해주면 airflow 가 사용할 클러스터를 선택하게 된다.
        #None 일 경우, kubeconfig (/opt/airflow/.kube/config) 에서 설정된 current-context 를 사용한다.
        get_logs=True, #로그 출력 설정
    )

    k8s
k8s()

 

코드 설명

함수 k8s()

  • KubernetesPodOperator 를 사용해서 k8s 에서 Pod 를 생성하고 그 안에 Busybox 이미지를 사용한 컨테이너를 실행하도록 지시한다.
  • 사용할 image, namespace, config_file, 클러스터 옵션을 설정해준다.
  • 간단하게 'Hello world' 를 출력했다.
  • in_cluster=False 는 Airflow 가 k8s 클러스터 외부에서 실행 중일 때 설정해줘야 한다.
  • cluster_context=None 일 경우 config 에 설정된 current-context 를 사용한다.
  • get_logs=True 로 줬기 때문에, Airflow UI 의 로그에서 Pod 가 실행한 결과, 명령어 출력, 에러 메시지 등을 확인할 수 있다. false 로 줄 경우, KubernetesPodOperator는 Pod의 로그를 수집하지 않는다.

Log

k8s 의 pod name 과 busybox 이미지를 사용해 출력한 결과를 확인할 수 있다.

INFO - Building pod k8s-op-pflbyp2p with labels: {'dag_id': 'chap_sw_k8s', 'task_id': 'k8s_op', 'run_id': 'scheduled__2024-09-07T1000000000-7f8a7c1fa', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
INFO - Retrieving connection 'kubernetes_default'
INFO - Found matching pod k8s-op-pflbyp2p with labels {'airflow_kpo_in_cluster': 'False', 'airflow_version': '2.10.0', 'dag_id': 'chap_sw_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-09-07T1000000000-7f8a7c1fa', 'task_id': 'k8s_op', 'try_number': '1'}
INFO - `try_number` of task_instance: 1
INFO - `try_number` of pod: 1
WARNING - Pod not yet started: k8s-op-pflbyp2p
INFO - [base] Hello world
INFO - Deleting pod: k8s-op-pflbyp2p

 

DAG 실행 시, k8s cluster pod 를 조회해보면 다음과 같이 뜬다.

[siwon_park@linux ~]$ k get pod -n airflow
NAME                                READY   STATUS                  RESTARTS         AGE
k8s-op-pflbyp2p                 0/1          ContainerCreating   0                         2s

 

Airflow - k8s Pod 설명

  • Pod name 
    • Airflow 에서 KubernetesPodOperator를 사용하여 Kubernetes에서 실행된 작업은, 기본적으로 자동으로 생성된 Pod 이름을 가지게 된다. 이 Pod 이름은 주로 **Airflow DAG의 task_id**와 연관되며, 작업을 실행할 때 task_id를 기반으로 Pod 이름이 생성된다.
  • Deleting Pod 
    • 로그에서 Deleting pod: k8s-op-pflbyp2p 를 확인할 수 있다. KubernetesPodOperator 는 기본적으로 작업이 완료된 후 Pod 를 삭제하도록 설정되어 있다. 작업이 끝난 후 불필요한 리소스를 해제하고, Kubernetes 클러스터에서 자원을 효율적으로 관리하기 위한 일반적인 동작이다. Pod 가 삭제돼도 작업 로그는 Airflow UI 에서 확인할 수 있다. 
    • 만약 Pod 를 삭제하지 않고 남겨두고 싶다면 KubernetesPodOperator 에서 is_delete_operator_pod=False 를 설정해주면 된다.
  • Pod 로그 확인 명령어
    • 'kubectl logs <pod_name> -n <namespace>' 를 사용한다. 
    • 이미 제거된 Pod 의 로그는 기본적으로 확인할 수 없으며, k8s 에서 로그 보관 기능을 사용하면, 삭제된 Pod 의 로그를 조회하거나 보관할 수 있다.
반응형