Echo system/Airflow

Airflow - Custom hook, operator, sensor 생성하기

박쿠리 2024. 9. 23. 19:20

airflow 에서 Custom Hook 을 생성하려면 BaseHook 을 사용한다.

 

Hook : 외부 시스템과 상호작용하는 커넥터 생성, 공통적으로 사용하는 연결 로직이나 작업을 재사용할 수 있도록 캡슐화한다.

Operator : Dag 내에서 실행되는 실제 작업을 정의한다. 다양한 Hook 을 사용해 외부 시스템과 상호작용하거나 데이터를 처리한다.

 

Custom Hook

from airflow.hooks.base import BaseHook
from airflow.exceptions import AirflowException
import psycopg2

# hook -> 외부 시스템과 상호작용하는 커넥터 만들기

# Basehook 상속 / 모든 Hook 클래스가 상속받는 기본 클래스, Airflow 의 연결 설정을 활용할 수 있는 기능들이 포함되어 있다.
# Connection 설정 
# 에러 처리

class CustomPostgresHook(BaseHook):

    def __init__(self, conn_id: str):
        super().__init__() 
        self.conn_id = conn_id
    #클래스를 상속받아 초기화할 때 부모 클래스의 메서드를 포함하고, 자식 클래스에서 추가적으로 설정이 필요한 부분을 정의할 때 사용된다.

    def get_conn(self): #get_connection 을 사용하여 연결 정보를 가져오고, psycopg2 로 PostgresSQL 에 연결한다.
        try:
            connection = self.get_connection(self.conn_id)  # Airflow에서 설정된 connection 가져오기
            conn = psycopg2.connect(
                host=connection.host,
                port=connection.port,
                user=connection.login,
                password=connection.password,
                dbname=connection.schema
            )
            return conn
        except Exception as e:
            raise AirflowException(f"Fail {str(e)}")

 

def __init__(self, conn_id: str)

  • 생성자 메서드로, 객체가 생성될 때 자동으로 호출된다. 객체의 초기 상태를 설정하는데 사용된다. conn_id 라는 매개변수를 입력받으면 str 타입이여야 하는 것을 명시한다.

super().__init__()

  • Basehook 의 부모 클래스를 호출해서 __init__ 메서드를 실행한다.

self.conn_id = conn_id

  • 입력받은 conn_id 값을 클래스 인스턴스의 conn_id 속성에 저장한다. 후속 메서드에서 conn_id 를 사용할 수 있다.

 

클래스를 상속받아 초기화할 때 부모 클래스의 메서드를 포함하고, 자식 클래스에서 추가적으로 설정이 필요한 부분을 정의할 때 사용된다.

 

 

Custom Operator

from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from typing import Any, Optional
from airflow_sw_provider.custom_hooks.hook import CustomPostgresHook  # CustomPostgresHook 가져오기

class CustomPostgresOperator(BaseOperator):

    def __init__(self, query: str, conn_id: str, params: Optional[Any] = None, *args, **kwargs):
        super().__init__(*args, **kwargs) #초기화된 인자들을 인스턴스 변수로 저장
        self.query = query
        self.conn_id = conn_id
        self.params = params #sql 쿼리에 전달할 매개변수 (optional)

    def execute(self, context):
        from include.custom_hooks.sw_hook import CustomPostgresHook  # CustomPostgresHook 가져오기
        
        hook = CustomPostgresHook(conn_id=self.conn_id)
        conn = hook.get_conn() #PostgreSQL DB 에 대한 연결 객체 가져오기
        cursor = conn.cursor() #DB 와 상호 작용하기 위해 커서 생성
        
        try:
            cursor.execute(self.query, self.params)  # SQL 쿼리 실행
            result = cursor.fetchall()  # 결과 가져오기
            conn.commit()
            return result
        except Exception as e:
            conn.rollback()
            raise AirflowException(f"Query execution failed: {str(e)}")
        finally:
            cursor.close()
            conn.close()  # 연결 닫기

 

Operator 에서는 Query 매개변수를 담기위해 params: Optional[Any] = None 을 추가해주었다.

 

Custom Operator 를 실행한 Dag 

import pendulum
from datetime import timedelta
from airflow.decorators import dag, task
from include.custom_operators.sw_op import CustomPostgresOperator

@dag(
    dag_id='chap_sw_op',
    schedule="@once",
    start_date=pendulum.datetime(2024, 9, 23, 18, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["operator"]
)
def sw_operator():
    query = "SELECT * FROM test.airflow_test;"

    # CustomPostgresOperator 실행
    run_query_task = CustomPostgresOperator(
        task_id='run_custom_query',
        conn_id='dd_sw_test_postgre',
        query=query
    )

sw_operator()

 

여기서 주의할 점은 Hook 과 Operator 를 생성한 경로를 알아야 가져와서 사용할 수 있다.

 

Custom Sensor

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from include.custom_sw_provider.provider.custom_hooks.hook import CustomPostgresHook
from airflow.exceptions import AirflowException
from typing import Any, Optional

class CustomPostgresSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self, query: str, conn_id: str, params: Optional[Any] = None, poke_interval: int = 60, *args, **kwargs):
        super().__init__(poke_interval=poke_interval, *args, **kwargs)
        self.query = query
        self.conn_id = conn_id
        self.params = params

    def poke(self, context):
        hook = CustomPostgresHook(conn_id=self.conn_id)
        conn = hook.get_conn()
        cursor = conn.cursor()

        try:
            cursor.execute(self.query, self.params)
            result = cursor.fetchone()  # 결과 가져옴
            if result is not None:
                return True  # 결과가 존재하면 센서 종료
            return False  # 결과가 없으면 다시 감시
        except Exception as e:
            raise AirflowException(f"Sensor execution failed: {str(e)}")
        finally:
            cursor.close()
            conn.close()

 

쿼리 결과 유무로 poke 를 생성

반응형

'Echo system > Airflow' 카테고리의 다른 글

Airflow - Custom provider 생성  (1) 2024.10.09
Airflow - Custom 패키지 만들기  (0) 2024.09.23
Airflow - minio 연동  (0) 2024.09.07
Airflow - Apache Spark 연동  (2) 2024.09.07
Airflow - Kubernetes(k8s) 연동  (0) 2024.09.07