Echo system/Airflow

Airflow DAG 개념 및 구조

박쿠리 2024. 8. 10. 11:46

DAG 구조

  • 전체 - 각 오퍼레이터는 하나의 태스크를 수행하고, 여러 개의 오퍼레이터가 airflow 의 워크플로 또는 dag을 구성한다.
  • DAG - 오퍼레이터 집합에 대한 실행을 오케트레이션(조정) 하는 역할을 한다. 
  • Task - 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 wrapper 또는 manager 라고 본다.
  • 오퍼레이터 : 단일 작업 수행 역할을 한다.
    • ex) Bashoperator, pythonoperator, emailoperator, simpleHTTPoperator ...

 

 

예시

DAG 객체 인스턴스를 생성한다.

dag = DAG( #DAG 클래스는 두 개의 인수가 필요
		dag_id = "download_rocket_launches", #Airflow UI 에 표시되는 DAG 이름
        start_date=airflow.utils.dates.days_ago(14), #워크플로가 처음 실행되는 날짜/시간
        schedule_interval=None,
)

 

Bash 커맨드를 실행하기 위해 BashOperator 객체 인스턴스 생성한다.

download_launches=BashOperator(
	task_id="download_launches", #태스크 이름
    bash_command="curl -o /tmp/launches.json
    	'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", #실행할 배시 커맨드
    dag=dag, #DAG 변수에 대한 참조
)

 

PythonOperator 를 사용한 파이썬 함수를 실행한다.

def _get_pictures():
	#디렉터리 확인
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok = True) # 경로 없으면 디렉터리 생성
    #Download all pictures in launches.json
    with open("/tmp/launches.json") as f: # 이전 단계의 태스크 결과 확인
    	launches=
        
        ...
        
        
        
    get_pictures=PythonOperator( #파이썬 함수 호출을 위해 Pythonoperator 구체화
    	task_id="get_pictures",
        python_callable=_get_pictures, #실행할 파이썬 함수를 지정
        dag=dag,
	)

오퍼레이터 자신(get_pictures)을 정의해야한다.

python_callable 은 인수에 호출이 가능한 일반 함수(_get_pictures)를 가리킨다.

 

태스크 실행 순서 정의

  • 아래 순서로 태스크가 실행된다.
download_launches >> get_pictures >> notify

 

반응형