패키지를 만들기 위해 필요한 파일은 다음과 같다.
- __init__.py #내용 없이 파일만 생성해주었다.
- Python 패키지를 만들 때 특정 디렉토리를 패키지로 인식시키기 위해 필요한 파일이다.
- 파일 존재 여부에 따라 해당 디렉토리가 python 패키지로 인식되며, 이로 인해 모듈 및 서브 패키지를 임포트할 수 있게 된다.
- *Python 3.3 이후로는 __ini__.py 이 없어도 디렉토리가 패키지로 인식되지만, 여전히 명시적으로 패키지임을 나타내기 위해 사용하는 것이 일반적이라고 한다..
- hook.py
- operator.py
- setup.py
해당 파일들은 다음과 같은 트리 구조로 만들어준다.
참고로 src 경로 안에 있는 폴더 이름은 패키징할 이름과 같아야한다.
/opt/airflow/dags/include/custom_sw
├── setup.py
└── src
└── sw_pk
├── hook.py
├── __init__.py
└── operator.py
굳이 src 경로 안에 안넣고 바로 패키지 디렉토리로 써도된다.
hook 과 operator 파일은 아래 링크에서 참고하면 된다.
Airflow - Custom hook, operator 생성하기 (tistory.com)
setup.py
from setuptools import setup, find_packages
setup(
name="sw_pk", # 패키지 이름을 sw_pk로 변경
version="0.1.0",
description="Hooks, operators for siwon",
packages=find_packages(where="src"), # src 경로에서 패키지를 찾도록 지정
package_dir={"": "src"}, # src 폴더를 루트로 설정
install_requires=[
"apache-airflow",
"requests",
],
)
가장 중요한 부분은 setuptools.setup 호출이며, 패키지에 대한 메타데이터를 전달한다.
setup.py 파일에서 중요한 값은 다음과 같다.
- name : 패키지 이름 정의
- version : 패키지 버전 번호
- install_requires : 패키지에 필요한 종속 라이브러리 목록
- package/package_dir : 설치 시 포함되어야 할 패키지와 패키지 위치를 setuptools 에 전달한다. 이 때 파이썬 패키지에 src 디렉터리 레이아웃을 사용한다.
선택적으로 추가할 값은 다음과 같다. (위 파일에는 굳이 넣지 않은 내용도 있다.)
- author : 패키지 생성자
- author_email : 생성자 정보
- description : 패키지 관련 내용
- url : 온라인에서 패키지 찾을 수 있는 위치
- license : 패키지 코드를 배포할 때 적용하는 라이선스
패키지 설치
setup.py 과 같은 경로로 이동한 후, setup.py 를 참고하여 패키지를 설치한다.
python -m pip install .
출력 결과를 보면 성공적으로 설치됨을 확인할 수 있다.
출력 결과
Defaulting to user installation because normal site-packages is not writeable Processing /home/airflow/dags/include/custom_sw Preparing metadata (setup.py) ... done Requirement already satisfied: apache-airflow in /home/airflow/.local/lib/python3.8/site-packages (from sw_pk==0.1.0) (2.10.0) Requirement already satisfied: requests in /home/airflow/.local/lib/python3.8/site-packages (from sw_pk==0.1.0) (2.31.0) Requirement already satisfied: alembic<2.0,>=1.13.1 in /home/airflow/.local/lib/python3.8/site-packages (from apache-airflow->sw .... Building wheels for collected packages: sw_pk Building wheel for sw_pk (setup.py) ... done Created wheel for sw_pk: filename=sw_pk-0.1.0-py3-none-any.whl size=3059 sha256=89696219579f5a867bee5a5ad7565e4d0e63c41e1fd5dc3c3d493eb4dd629015 Stored in directory: /tmp/pip-ephem-wheel-cache-8mbzdljp/wheels/bb/79/85/8e84c51d6dedc952bbb8a0c47fe377355bec17e89858c8bca1 Successfully built sw_pk Installing collected packages: sw_pk Attempting uninstall: sw_pk Found existing installation: sw_pk 0.1.0 Uninstalling sw_pk-0.1.0: Successfully uninstalled sw_pk-0.1.0 Successfully installed sw_pk-0.1.0 |
Airflow 에서 패키지를 사용하려면, 설치한 후에 Airflow 웹 서버 및 스케줄러를 재시작해야 변경 사항이 반영된다.
[root@airflow ~]# sudo systemctl restart airflow
패키지 제대로 설치됐는지 확인
[airflow@airflow custom_sw]$ python
Python 3.8.13 (default, Aug 16 2022, 12:16:29)
[GCC 9.3.1 20200408 (Red Hat 9.3.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from sw_pk import hook, operator
>>> hook
<module 'sw_pk.hook' from '/opt/airflow/.local/lib/python3.8/site-packages/sw_pk/hook.py'>
문제 없이 설치된 것을 확인할 수 있다.
참고로 패키지를 설치하면 파일 구조는 다음과 같이 나타난다.
[airflow@airflow custom_sw]$ tree /opt/airflow/dags/include/custom_sw
/opt/airflow/dags/include/custom_sw
├── build
│ ├── bdist.linux-x86_64
│ └── lib
│ └── sw_pk
│ ├── hook.py
│ ├── __init__.py
│ └── operator.py
├── setup.py
└── src
├── sw_pk
│ ├── hook.py
│ ├── __init__.py
│ └── operator.py
└── sw_pk.egg-info
├── dependency_links.txt
├── PKG-INFO
├── requires.txt
├── SOURCES.txt
└── top_level.txt
8 directories, 17 files
DAG 실행
기존에는 custom operator 경로를 직접 설정해야 사용할 수 있었지만 패키징을 한 후에는 패키지로부터 operator 를 부르면 된다.
from include.custom_operators.sw_op -> from sw_pk.operator
DAG
import pendulum
from datetime import timedelta
from airflow.decorators import dag, task
from sw_pk.operator import CustomPostgresOperator
@dag(
dag_id='chap_sw_op',
schedule="@once",
start_date=pendulum.datetime(2024, 9, 23, 18, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["operator"]
)
def sw_operator():
query = "SELECT * FROM test.airflow_test;"
# CustomPostgresOperator 실행
run_query_task = CustomPostgresOperator(
task_id='run_custom_query',
conn_id='dd_sw_test_postgre',
query=query
)
sw_operator()
Airflow Web UI 로 들어가 확인해보면 Operator 를 정상적으로 실행한 것을 확인할 수 있다.
Log
INFO - Retrieving connection 'dd_sw_test_postgre' ▼ Post task execution logs INFO - Marking task as SUCCESS. dag_id=chap_sw_op, task_id=run_custom_query, run_id=scheduled__2024-09-23T09:00:00+00:00, execution_date=20240923T090000, start_date=20240923T111253, end_date=20240923T111254 INFO - Task instance in success state INFO - Previous state of the Task instance: running INFO - Dag name:chap_sw_op queued_at:2024-09-23 11:12:51.351252+00:00 INFO - Task hostname:airflow.datalake.net operator:CustomPostgresOperator |
반응형
'Echo system > Airflow' 카테고리의 다른 글
Airflow received sigterm. terminating subprocesses (1) | 2025.01.18 |
---|---|
Airflow - Custom provider 생성 (1) | 2024.10.09 |
Airflow - Custom hook, operator, sensor 생성하기 (1) | 2024.09.23 |
Airflow - minio 연동 (0) | 2024.09.07 |
Airflow - Apache Spark 연동 (2) | 2024.09.07 |