Echo system/Airflow
Airflow DAG 개념 및 구조
박쿠리
2024. 8. 10. 11:46
DAG 구조
- 전체 - 각 오퍼레이터는 하나의 태스크를 수행하고, 여러 개의 오퍼레이터가 airflow 의 워크플로 또는 dag을 구성한다.
- DAG - 오퍼레이터 집합에 대한 실행을 오케트레이션(조정) 하는 역할을 한다.
- Task - 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 wrapper 또는 manager 라고 본다.
- 오퍼레이터 : 단일 작업 수행 역할을 한다.
- ex) Bashoperator, pythonoperator, emailoperator, simpleHTTPoperator ...
예시
DAG 객체 인스턴스를 생성한다.
dag = DAG( #DAG 클래스는 두 개의 인수가 필요
dag_id = "download_rocket_launches", #Airflow UI 에 표시되는 DAG 이름
start_date=airflow.utils.dates.days_ago(14), #워크플로가 처음 실행되는 날짜/시간
schedule_interval=None,
)
Bash 커맨드를 실행하기 위해 BashOperator 객체 인스턴스 생성한다.
download_launches=BashOperator(
task_id="download_launches", #태스크 이름
bash_command="curl -o /tmp/launches.json
'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", #실행할 배시 커맨드
dag=dag, #DAG 변수에 대한 참조
)
PythonOperator 를 사용한 파이썬 함수를 실행한다.
def _get_pictures():
#디렉터리 확인
pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok = True) # 경로 없으면 디렉터리 생성
#Download all pictures in launches.json
with open("/tmp/launches.json") as f: # 이전 단계의 태스크 결과 확인
launches=
...
get_pictures=PythonOperator( #파이썬 함수 호출을 위해 Pythonoperator 구체화
task_id="get_pictures",
python_callable=_get_pictures, #실행할 파이썬 함수를 지정
dag=dag,
)
오퍼레이터 자신(get_pictures)을 정의해야한다.
python_callable 은 인수에 호출이 가능한 일반 함수(_get_pictures)를 가리킨다.
태스크 실행 순서 정의
- 아래 순서로 태스크가 실행된다.
download_launches >> get_pictures >> notify
반응형