1. Dag 파일 수정(.py)
2. Scheduler 가 감지 (mtime 기준)
3. queue 에 등록 (DagFileProcessorManager)
4. subprocess 가 파싱 시도 (dagbag.process_file)
5. DAG 객체 생성 (ex) dag=DAG(...))
6. DAG 객체를 DB 에 저장하기 위해 serializedDagModel.write_dag(dag) 호출
7. DAG 내부 구조를 JSON 으로 serialized
8. serialized_dag 테이블에 insert/update
9. web ui 에서 테이블 읽어와서 업데이트
Dag 파싱
Airflow 가 .py 파일 읽고 실행 후 DAG 객체를 메모리에 올림
-> dagbag.process_File() 가 처리 (메타 정보로 정리해서 저장)
Write_dag
파싱된 Dag 객체를 직렬화해서 DB 테이블에 저장하는 함수
SerializedDagModel.write_dag(dag) 함수
- 이 DAG이 직렬화가 필요한지 판단
- 필요하면 → JSON으로 직렬화해서 serialized_dag 테이블에 저장
- 그 판단 기준은 hash 값이 바뀌었는지 여부
환경
standalone 으로 실행
systemctl restart airflow 으로 모두 재시작
(airflow-scheduler.service, airflow-webserver.service X)
문제
- 기존 dag 가 수정되거나, 새 dag 가 추가돼도 serialized db 에는 반영안됨 (dagfileprocessor 가 생성된 dag 객체를 dagbag.dags 에 등록 X) -> 프로세스가 dag 감지 못함, 내부 스레드 동작 X
- scheduler log 에는 아무 정보 없음 -> write_dag () 호출하다가 실패? 자원 부족?
- airflow.service 재시작하면 반영 됨
- Airflow Scheduler가 실행 중일 때만 DAG을 감지하지 못하고 있을 뿐, 재시작하면 정상 작동하는 것
→ Scheduler의 DagFileProcessorManager가 죽었거나, queue 등록이 안 됐거나, 감지가 멈춘 상태
- Airflow Scheduler가 실행 중일 때만 DAG을 감지하지 못하고 있을 뿐, 재시작하면 정상 작동하는 것
DB에 write가 skip된 상황
- Airflow는 dag_hash가 같으면 serialized_dag을 다시 쓰지 않음
- 하지만 새 DAG인데도 안 써졌다면 → 해시 비교가 실패했거나, DB 트랜잭션에서 silent rollback
0. airflow scheduler 로그 확인
systemctl status airflow.service
1. log - queue 등록됐는지 확인
INFO - DAG file /opt/airflow/dags/my_dag.py added to processing queue
grep -i "Queuing DAG file" ~/airflow/logs/scheduler/latest/*.log
2. log - serialized 확인
grep -i "Serialized DAG" ~/airflow/logs/scheduler/latest/*.log | grep my_dag
3. 스케쥴러 자원 확인
ps -o pid,ppid,%cpu,%mem,etime,cmd -C python3 | grep scheduler
4. write_dag 함수 호출이 실패하는지 디버깅
def write_dag(
cls,
dag: DAG,
min_update_interval: int | None = None,
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
) -> bool:
"""
Serialize a DAG and writes it into database.
If the record already exists, it checks if the Serialized DAG changed or not. If it is
changed, it updates the record, ignores otherwise.
:param dag: a DAG to be written into database
:param min_update_interval: minimal interval in seconds to update serialized DAG
:param session: ORM Session
:returns: Boolean indicating if the DAG was written to the DB
"""
log.debug("[siwon-write_dag] Called for DAG: %s", dag.dag_id)
# Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
# If Yes, does nothing
# If No or the DAG does not exists, updates / writes Serialized DAG to DB
if min_update_interval is not None:
log.debug("[siwon-write_dag] Checking min_update_interval for DAG: %s", dag.dag_id)
recently_updated = session.scalar(
select(literal(True)).where(
cls.dag_id == dag.dag_id,
(timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
)
)
if recently_updated:
log.debug("[siwon-write_dag] Skipped due to min_update_interval for DAG: %s", dag.dag_id)
return False
log.debug("Checking if DAG (%s) changed", dag.dag_id)
new_serialized_dag = cls(dag, processor_subdir)
serialized_dag_db = session.execute(
select(cls.dag_hash, cls.processor_subdir).where(cls.dag_id == dag.dag_id)
).first()
if serialized_dag_db is not None:
log.debug("[siwon-write_dag] Found exisiting DAG: %s", dag.dag_id)
log.debug("[siwon-write_dag] Old hash: %s", serialized_dag_db.dag_hash)
log.debug("[siwon-write_dag] New hash: %s", new_serialized_dag.dag_hash)
log.debug("[siwon-write_dag] Subdir: old=%s / new=%s", serialized_dag_db.processor_subdir, new_serialized_dag.processor_subdir)
if (
serialized_dag_db is not None
and serialized_dag_db.dag_hash == new_serialized_dag.dag_hash
and serialized_dag_db.processor_subdir == new_serialized_dag.processor_subdir
):
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
session.merge(new_serialized_dag)
log.debug("DAG: %s written to the DB", dag.dag_id)
return True
Airflow scheduler log
-> scheduler | permissionerror:[errno 13] permission denied:'/root/airflow/logs/dag_processor_manager', during handling of the above exception, another exception occurred:, traceback.., file"/usr/lib64/python3.9/multiprocessing/process.py", line 315, in _bootstrap, self.run()...reload_configuration_for_dag_processing(), file"/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py",line 1302, in reload_con>
Airflow 스케줄러는 내부적으로 여러 서브 프로세스를 띄워서 DAG을 파싱
그중 핵심은 DagFileProcessor, 그리고 그걸 관리하는 DagFileProcessorManager
스케줄러가 DAG 파서 로그 디렉토리 /root/airflow/logs/dag_processor_manager에
로그 파일을 생성하거나 접근하려 했는데 권한이 없어서 실패한 상황
즉, DAG을 파싱할 프로세스가 실행조차 못 하고 죽었고, 그래서 DAG 가 파싱 X
임시로 /root 안에 디렉토리 생성 및 gpadmin 권한 부여
환경변수 확인
'Echo system > Airflow' 카테고리의 다른 글
airflow 설정 (0) | 2025.04.20 |
---|---|
Airflow configuration (0) | 2025.04.20 |
Airflow received sigterm. terminating subprocesses (1) | 2025.01.18 |
Airflow - Custom provider 생성 (1) | 2024.10.09 |
Airflow - Custom 패키지 만들기 (0) | 2024.09.23 |