airflow 에서 Custom Hook 을 생성하려면 BaseHook 을 사용한다.
Hook : 외부 시스템과 상호작용하는 커넥터 생성, 공통적으로 사용하는 연결 로직이나 작업을 재사용할 수 있도록 캡슐화한다.
Operator : Dag 내에서 실행되는 실제 작업을 정의한다. 다양한 Hook 을 사용해 외부 시스템과 상호작용하거나 데이터를 처리한다.
Custom Hook
from airflow.hooks.base import BaseHook
from airflow.exceptions import AirflowException
import psycopg2
# hook -> 외부 시스템과 상호작용하는 커넥터 만들기
# Basehook 상속 / 모든 Hook 클래스가 상속받는 기본 클래스, Airflow 의 연결 설정을 활용할 수 있는 기능들이 포함되어 있다.
# Connection 설정
# 에러 처리
class CustomPostgresHook(BaseHook):
def __init__(self, conn_id: str):
super().__init__()
self.conn_id = conn_id
#클래스를 상속받아 초기화할 때 부모 클래스의 메서드를 포함하고, 자식 클래스에서 추가적으로 설정이 필요한 부분을 정의할 때 사용된다.
def get_conn(self): #get_connection 을 사용하여 연결 정보를 가져오고, psycopg2 로 PostgresSQL 에 연결한다.
try:
connection = self.get_connection(self.conn_id) # Airflow에서 설정된 connection 가져오기
conn = psycopg2.connect(
host=connection.host,
port=connection.port,
user=connection.login,
password=connection.password,
dbname=connection.schema
)
return conn
except Exception as e:
raise AirflowException(f"Fail {str(e)}")
def __init__(self, conn_id: str)
- 생성자 메서드로, 객체가 생성될 때 자동으로 호출된다. 객체의 초기 상태를 설정하는데 사용된다. conn_id 라는 매개변수를 입력받으면 str 타입이여야 하는 것을 명시한다.
super().__init__()
- Basehook 의 부모 클래스를 호출해서 __init__ 메서드를 실행한다.
self.conn_id = conn_id
- 입력받은 conn_id 값을 클래스 인스턴스의 conn_id 속성에 저장한다. 후속 메서드에서 conn_id 를 사용할 수 있다.
클래스를 상속받아 초기화할 때 부모 클래스의 메서드를 포함하고, 자식 클래스에서 추가적으로 설정이 필요한 부분을 정의할 때 사용된다.
Custom Operator
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from typing import Any, Optional
from airflow_sw_provider.custom_hooks.hook import CustomPostgresHook # CustomPostgresHook 가져오기
class CustomPostgresOperator(BaseOperator):
def __init__(self, query: str, conn_id: str, params: Optional[Any] = None, *args, **kwargs):
super().__init__(*args, **kwargs) #초기화된 인자들을 인스턴스 변수로 저장
self.query = query
self.conn_id = conn_id
self.params = params #sql 쿼리에 전달할 매개변수 (optional)
def execute(self, context):
from include.custom_hooks.sw_hook import CustomPostgresHook # CustomPostgresHook 가져오기
hook = CustomPostgresHook(conn_id=self.conn_id)
conn = hook.get_conn() #PostgreSQL DB 에 대한 연결 객체 가져오기
cursor = conn.cursor() #DB 와 상호 작용하기 위해 커서 생성
try:
cursor.execute(self.query, self.params) # SQL 쿼리 실행
result = cursor.fetchall() # 결과 가져오기
conn.commit()
return result
except Exception as e:
conn.rollback()
raise AirflowException(f"Query execution failed: {str(e)}")
finally:
cursor.close()
conn.close() # 연결 닫기
Operator 에서는 Query 매개변수를 담기위해 params: Optional[Any] = None 을 추가해주었다.
Custom Operator 를 실행한 Dag
import pendulum
from datetime import timedelta
from airflow.decorators import dag, task
from include.custom_operators.sw_op import CustomPostgresOperator
@dag(
dag_id='chap_sw_op',
schedule="@once",
start_date=pendulum.datetime(2024, 9, 23, 18, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["operator"]
)
def sw_operator():
query = "SELECT * FROM test.airflow_test;"
# CustomPostgresOperator 실행
run_query_task = CustomPostgresOperator(
task_id='run_custom_query',
conn_id='dd_sw_test_postgre',
query=query
)
sw_operator()
여기서 주의할 점은 Hook 과 Operator 를 생성한 경로를 알아야 가져와서 사용할 수 있다.
Custom Sensor
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from include.custom_sw_provider.provider.custom_hooks.hook import CustomPostgresHook
from airflow.exceptions import AirflowException
from typing import Any, Optional
class CustomPostgresSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, query: str, conn_id: str, params: Optional[Any] = None, poke_interval: int = 60, *args, **kwargs):
super().__init__(poke_interval=poke_interval, *args, **kwargs)
self.query = query
self.conn_id = conn_id
self.params = params
def poke(self, context):
hook = CustomPostgresHook(conn_id=self.conn_id)
conn = hook.get_conn()
cursor = conn.cursor()
try:
cursor.execute(self.query, self.params)
result = cursor.fetchone() # 결과 가져옴
if result is not None:
return True # 결과가 존재하면 센서 종료
return False # 결과가 없으면 다시 감시
except Exception as e:
raise AirflowException(f"Sensor execution failed: {str(e)}")
finally:
cursor.close()
conn.close()
쿼리 결과 유무로 poke 를 생성
반응형
'Echo system > Airflow' 카테고리의 다른 글
Airflow - Custom provider 생성 (1) | 2024.10.09 |
---|---|
Airflow - Custom 패키지 만들기 (0) | 2024.09.23 |
Airflow - minio 연동 (0) | 2024.09.07 |
Airflow - Apache Spark 연동 (2) | 2024.09.07 |
Airflow - Kubernetes(k8s) 연동 (0) | 2024.09.07 |