Echo system/Airflow

dag parsing & write_dag

박쿠리 2025. 4. 18. 00:32

1. Dag 파일 수정(.py)

2. Scheduler 가 감지 (mtime 기준)

3. queue 에 등록 (DagFileProcessorManager)

4. subprocess 가 파싱 시도 (dagbag.process_file)

5. DAG 객체 생성 (ex) dag=DAG(...))

6. DAG 객체를 DB 에 저장하기 위해 serializedDagModel.write_dag(dag) 호출

7. DAG 내부 구조를 JSON 으로 serialized

8. serialized_dag 테이블에 insert/update

9. web ui 에서 테이블 읽어와서 업데이트

 

Dag 파싱

Airflow 가 .py 파일 읽고 실행 후 DAG 객체를 메모리에 올림

-> dagbag.process_File() 가 처리 (메타 정보로 정리해서 저장)

 

Write_dag

파싱된 Dag 객체를 직렬화해서 DB 테이블에 저장하는 함수

SerializedDagModel.write_dag(dag) 함수

  1. 이 DAG이 직렬화가 필요한지 판단
  2. 필요하면 → JSON으로 직렬화해서 serialized_dag 테이블에 저장
  3. 그 판단 기준은 hash 값이 바뀌었는지 여부

환경

standalone 으로 실행

systemctl restart airflow 으로 모두 재시작

(airflow-scheduler.service, airflow-webserver.service X)

 

문제

  • 기존 dag 가 수정되거나, 새 dag 가 추가돼도 serialized db 에는 반영안됨 (dagfileprocessor 가 생성된 dag 객체를 dagbag.dags 에 등록 X) -> 프로세스가 dag 감지 못함, 내부 스레드 동작 X
  • scheduler log 에는 아무 정보 없음 -> write_dag () 호출하다가 실패? 자원 부족?
  • airflow.service 재시작하면 반영 됨
    • Airflow Scheduler가 실행 중일 때만 DAG을 감지하지 못하고 있을 뿐, 재시작하면 정상 작동하는 것
      → Scheduler의 DagFileProcessorManager가 죽었거나, queue 등록이 안 됐거나, 감지가 멈춘 상태

 

DB에 write가 skip된 상황

  • Airflow는 dag_hash가 같으면 serialized_dag을 다시 쓰지 않음
  • 하지만 새 DAG인데도 안 써졌다면 → 해시 비교가 실패했거나, DB 트랜잭션에서 silent rollback

 

0. airflow scheduler 로그 확인

systemctl status airflow.service

 

1. log - queue 등록됐는지 확인

INFO - DAG file /opt/airflow/dags/my_dag.py added to processing queue

grep -i "Queuing DAG file" ~/airflow/logs/scheduler/latest/*.log

 

2. log - serialized 확인

grep -i "Serialized DAG" ~/airflow/logs/scheduler/latest/*.log | grep my_dag

 

3. 스케쥴러 자원 확인

ps -o pid,ppid,%cpu,%mem,etime,cmd -C python3 | grep scheduler

 

4. write_dag 함수 호출이 실패하는지 디버깅

 

    def write_dag(
        cls,
        dag: DAG, 
        min_update_interval: int | None = None,
        processor_subdir: str | None = None,
        session: Session = NEW_SESSION, 
    ) -> bool:
        """ 
        Serialize a DAG and writes it into database.

        If the record already exists, it checks if the Serialized DAG changed or not. If it is
        changed, it updates the record, ignores otherwise.

        :param dag: a DAG to be written into database
        :param min_update_interval: minimal interval in seconds to update serialized DAG
        :param session: ORM Session

        :returns: Boolean indicating if the DAG was written to the DB
        """
        log.debug("[siwon-write_dag] Called for DAG: %s", dag.dag_id)

        # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
        # If Yes, does nothing 
        # If No or the DAG does not exists, updates / writes Serialized DAG to DB
        if min_update_interval is not None:
            log.debug("[siwon-write_dag] Checking min_update_interval for DAG: %s", dag.dag_id)
            recently_updated = session.scalar(
                select(literal(True)).where(
                    cls.dag_id == dag.dag_id,
                    (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
                )
            )
            if recently_updated:
                log.debug("[siwon-write_dag] Skipped due to min_update_interval for DAG: %s", dag.dag_id)
                return False

        log.debug("Checking if DAG (%s) changed", dag.dag_id)
        new_serialized_dag = cls(dag, processor_subdir)
    
        serialized_dag_db = session.execute(
            select(cls.dag_hash, cls.processor_subdir).where(cls.dag_id == dag.dag_id)
        ).first()
      
        if serialized_dag_db is not None: 
            log.debug("[siwon-write_dag] Found exisiting DAG: %s", dag.dag_id)
            log.debug("[siwon-write_dag] Old hash: %s", serialized_dag_db.dag_hash)
            log.debug("[siwon-write_dag] New hash: %s", new_serialized_dag.dag_hash)
            log.debug("[siwon-write_dag] Subdir: old=%s / new=%s", serialized_dag_db.processor_subdir, new_serialized_dag.processor_subdir)
        if (
            serialized_dag_db is not None
            and serialized_dag_db.dag_hash == new_serialized_dag.dag_hash
            and serialized_dag_db.processor_subdir == new_serialized_dag.processor_subdir
        ):
            log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
            return False

        log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
        session.merge(new_serialized_dag)
        log.debug("DAG: %s written to the DB", dag.dag_id)
        return True

 


 

Airflow scheduler log

-> scheduler | permissionerror:[errno 13] permission denied:'/root/airflow/logs/dag_processor_manager', during handling of the above exception, another exception occurred:, traceback.., file"/usr/lib64/python3.9/multiprocessing/process.py", line 315, in _bootstrap, self.run()...reload_configuration_for_dag_processing(), file"/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py",line 1302, in reload_con> 

 

Airflow 스케줄러는 내부적으로 여러 서브 프로세스를 띄워서 DAG을 파싱
그중 핵심은 DagFileProcessor, 그리고 그걸 관리하는 DagFileProcessorManager

 

스케줄러가 DAG 파서 로그 디렉토리 /root/airflow/logs/dag_processor_manager에

로그 파일을 생성하거나 접근하려 했는데 권한이 없어서 실패한 상황

 

즉, DAG을 파싱할 프로세스가 실행조차 못 하고 죽었고, 그래서 DAG 가 파싱 X

 

임시로 /root 안에 디렉토리 생성 및 gpadmin 권한 부여

 


 

환경변수 확인

반응형

'Echo system > Airflow' 카테고리의 다른 글

airflow 설정  (0) 2025.04.20
Airflow configuration  (0) 2025.04.20
Airflow received sigterm. terminating subprocesses  (1) 2025.01.18
Airflow - Custom provider 생성  (1) 2024.10.09
Airflow - Custom 패키지 만들기  (0) 2024.09.23