Echo system/Airflow

Airflow - Custom 패키지 만들기

박쿠리 2024. 9. 23. 20:17

패키지를 만들기 위해 필요한 파일은 다음과 같다.

 

  • __init__.py  #내용 없이 파일만 생성해주었다.
    • Python 패키지를 만들 때 특정 디렉토리를 패키지로 인식시키기 위해 필요한 파일이다. 
    • 파일 존재 여부에 따라 해당 디렉토리가 python 패키지로 인식되며, 이로 인해 모듈 및 서브 패키지를 임포트할 수 있게 된다.
    • *Python 3.3 이후로는 __ini__.py 이 없어도 디렉토리가 패키지로 인식되지만, 여전히 명시적으로 패키지임을 나타내기 위해 사용하는 것이 일반적이라고 한다..
  • hook.py
  • operator.py
  • setup.py 

 

해당 파일들은 다음과 같은 트리 구조로 만들어준다.

참고로 src 경로 안에 있는 폴더 이름은 패키징할 이름과 같아야한다.

/opt/airflow/dags/include/custom_sw
├── setup.py
└── src
    └── sw_pk
	├── hook.py
	├── __init__.py
   	└── operator.py

굳이 src 경로 안에 안넣고 바로 패키지 디렉토리로 써도된다.

hook 과 operator 파일은 아래 링크에서 참고하면 된다.

Airflow - Custom hook, operator 생성하기 (tistory.com)

 

setup.py

from setuptools import setup, find_packages

setup(
    name="sw_pk",  # 패키지 이름을 sw_pk로 변경
    version="0.1.0",
    description="Hooks, operators for siwon",
    packages=find_packages(where="src"),  # src 경로에서 패키지를 찾도록 지정
    package_dir={"": "src"},  # src 폴더를 루트로 설정
    install_requires=[
        "apache-airflow",
        "requests",
    ],
)

 

가장 중요한 부분은 setuptools.setup 호출이며, 패키지에 대한 메타데이터를 전달한다.

 

setup.py 파일에서 중요한 값은 다음과 같다.

 

  • name : 패키지 이름 정의
  • version : 패키지 버전 번호
  • install_requires : 패키지에 필요한 종속 라이브러리 목록
  • package/package_dir : 설치 시 포함되어야 할 패키지와 패키지 위치를 setuptools 에 전달한다. 이 때 파이썬 패키지에 src 디렉터리 레이아웃을 사용한다.

 

선택적으로 추가할 값은 다음과 같다. (위 파일에는 굳이 넣지 않은 내용도 있다.)

  • author : 패키지 생성자
  • author_email : 생성자 정보
  • description : 패키지 관련 내용
  • url : 온라인에서 패키지 찾을 수 있는 위치
  • license : 패키지 코드를 배포할 때 적용하는 라이선스

 

패키지 설치

setup.py 과 같은 경로로 이동한 후, setup.py 를 참고하여 패키지를 설치한다.

python -m pip install .

 

출력 결과를 보면 성공적으로 설치됨을 확인할 수 있다.

 

출력 결과

Defaulting to user installation because normal site-packages is not writeable
Processing /home/airflow/dags/include/custom_sw
  Preparing metadata (setup.py) ... done
Requirement already satisfied: apache-airflow in /home/airflow/.local/lib/python3.8/site-packages (from sw_pk==0.1.0) (2.10.0)
Requirement already satisfied: requests in /home/airflow/.local/lib/python3.8/site-packages (from sw_pk==0.1.0) (2.31.0)
Requirement already satisfied: alembic<2.0,>=1.13.1 in /home/airflow/.local/lib/python3.8/site-packages (from apache-airflow->sw

....

Building wheels for collected packages: sw_pk
  Building wheel for sw_pk (setup.py) ... done
  Created wheel for sw_pk: filename=sw_pk-0.1.0-py3-none-any.whl size=3059 sha256=89696219579f5a867bee5a5ad7565e4d0e63c41e1fd5dc3c3d493eb4dd629015
  Stored in directory: /tmp/pip-ephem-wheel-cache-8mbzdljp/wheels/bb/79/85/8e84c51d6dedc952bbb8a0c47fe377355bec17e89858c8bca1
Successfully built sw_pk
Installing collected packages: sw_pk
  Attempting uninstall: sw_pk
    Found existing installation: sw_pk 0.1.0
    Uninstalling sw_pk-0.1.0:
      Successfully uninstalled sw_pk-0.1.0
Successfully installed sw_pk-0.1.0

 

Airflow 에서 패키지를 사용하려면, 설치한 후에 Airflow 웹 서버 및 스케줄러를 재시작해야 변경 사항이 반영된다.

[root@airflow ~]# sudo systemctl restart airflow

 

패키지 제대로 설치됐는지 확인

[airflow@airflow custom_sw]$ python
Python 3.8.13 (default, Aug 16 2022, 12:16:29)
[GCC 9.3.1 20200408 (Red Hat 9.3.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from sw_pk import hook, operator
>>> hook
<module 'sw_pk.hook' from '/opt/airflow/.local/lib/python3.8/site-packages/sw_pk/hook.py'>

 

문제 없이 설치된 것을 확인할 수 있다.

 

참고로 패키지를 설치하면 파일 구조는 다음과 같이 나타난다.

[airflow@airflow custom_sw]$ tree /opt/airflow/dags/include/custom_sw
/opt/airflow/dags/include/custom_sw
├── build
│   ├── bdist.linux-x86_64
│   └── lib
│       └── sw_pk
│           ├── hook.py
│           ├── __init__.py
│           └── operator.py
├── setup.py
└── src
    ├── sw_pk
    │   ├── hook.py
    │   ├── __init__.py
    │   └── operator.py
    └── sw_pk.egg-info
        ├── dependency_links.txt
        ├── PKG-INFO
        ├── requires.txt
        ├── SOURCES.txt
        └── top_level.txt

8 directories, 17 files

 

DAG 실행

기존에는 custom operator 경로를 직접 설정해야 사용할 수 있었지만 패키징을 한 후에는 패키지로부터 operator 를 부르면 된다.

 

from include.custom_operators.sw_op -> from sw_pk.operator

 

DAG

import pendulum
from datetime import timedelta
from airflow.decorators import dag, task
from sw_pk.operator import CustomPostgresOperator

@dag(
    dag_id='chap_sw_op',
    schedule="@once",
    start_date=pendulum.datetime(2024, 9, 23, 18, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["operator"]
)
def sw_operator():
    query = "SELECT * FROM test.airflow_test;"

    # CustomPostgresOperator 실행
    run_query_task = CustomPostgresOperator(
        task_id='run_custom_query',
        conn_id='dd_sw_test_postgre',
        query=query
    )

sw_operator()

 

Airflow Web UI 로 들어가 확인해보면 Operator 를 정상적으로 실행한 것을 확인할 수 있다.

 

Log

INFO - Retrieving connection 'dd_sw_test_postgre'
▼ Post task execution logs
INFO - Marking task as SUCCESS. dag_id=chap_sw_op, task_id=run_custom_query, run_id=scheduled__2024-09-23T09:00:00+00:00, execution_date=20240923T090000, start_date=20240923T111253, end_date=20240923T111254
INFO - Task instance in success state
INFO -  Previous state of the Task instance: running
INFO - Dag name:chap_sw_op queued_at:2024-09-23 11:12:51.351252+00:00
INFO - Task hostname:airflow.datalake.net operator:CustomPostgresOperator

 

 

반응형