Provider 생성은 특정 외부 시스템이나 서비스와의 통합을 위한 기능을 제공하는 패키지를 만드는 과정이다. Provider는 Airflow가 외부 시스템과 연결할 수 있도록 Hook, Operator, Sensor 등의 구성 요소를 제공한다.
provider 를 만들기 위해 필요한 파일은 다음과 같다.
- __init__.py
- hook.py
- operator.py
- setup.py
- pyproject.toml
프로젝트 생성
poetry 를 사용해 프로젝트를 생성한다.
poetry new airflow-sw-provider
*참고 : toml 파일을 직접 생성해서 사용하니 인식을 못하는 것 같아서 poetry 로 다시 생성했음
*참고 : provider 이름을 custom_sw_provider 로 생성하고 패키지 설치한 후 패키지 조회 시 아래와 같은 에러 발생.
패키지 명은 언더로 쓰면 안되는듯 (?)
ValueError: The package 'custom-sw-provider' from setuptools and custom_sw_provider do not match. Please make sure they are aligned |
생성한 프로젝트 구조는 다음과 같다.
├── airflow-sw-provider
│ ├── airflow_sw_provider
│ │ ├── custom_hooks
│ │ │ ├── hook.py
│ │ │ └── __init__.py
│ │ ├── custom_operators
│ │ │ ├── __init__.py
│ │ │ └── operator.py
│ │ ├── custom_sensors
│ │ │ ├── __init__.py
│ │ │ └── sensor.py
│ │ └── __init__.py
│ ├── pyproject.toml
│ ├── README.md
│ └── tests
│ └── __init__.py
airflow_sw_provider/__init__.py
해당 함수는 airflow 에 provider 에 대한 정보를 제공한다.
def get_provider_info():
return {
"package-name": "airflow-sw-provider",
"name": "sw_provider",
"description": "Airflow provider for siwon",
"versions": ["0.0.1"],
"hook-class-names": ["airflow_sw_provider.custom_hooks.hook.CustomPostgresHook"],
"connection-types": [
{
"hook-class-name": "airflow_sw_provider.custom_hooks.hook.CustomPostgresHook",
"connection-type": "siwon",
}
],
}
pyproject.toml
[tool.poetry]
name = "airflow-sw-provider"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"
packages = [{include = "airflow_sw_provider"}] # 패키지에 포함될 Python 모듈
[tool.poetry.dependencies]
# 프로젝트의 런타임 의존성을 정의하는 섹션
python = ">=3.8,<3.13" # 지원하는 Python 버전 범위
apache-airflow = "^2.0" # apache-airflow 의존성 추가
[build-system]
# 빌드 시스템 요구사항을 지정하는 섹션
requires = ["poetry-core"] # 필요한 빌드 시스템 (poetry-core 사용)
build-backend = "poetry.core.masonry.api" # 사용할 빌드 백엔드
[tool.poetry.plugins."airflow_sw_provider"]
# Airflow provider 플러그인 설정
provider_info = "airflow_sw_provider.__init__:get_provider_info" # provider_info 함수의 위치
Provider 패키징
airflow-sw-provider 경로에서 wheel 파일을 생성한 후, whl 파일을 설치한다.
[airflow@airflow airflow-sw-provider]$ poetry build
[airflow@airflow dist]$ pip install dist/airflow_sw_provider-0.1.0-py3-none-any.whl
Airflow 를 재시작한다.
[root@airflow ~]# sudo systemctl restart airflow
provider 가 정상적으로 설치됐는지 확인한다.
[airflow@airflow include]$ airflow providers list | grep sw
airflow-sw-provider | Airflow provider for siwon | 0.1.0
poetry build 해도 provider 인식 안되는 문제
toml 파일에서 airflow.providers 를 apache_airflow_provider 로 변경했더니 인식됨
# 에러
[tool.poetry.plugins."airflow.providers"]
# Airflow provider 플러그인 설정
provider_info = "airflow_sw2_provider.__init__:get_provider_info" # provider_info 함수의 위치
# 정상
[tool.poetry.plugins."apache_airflow_provider"]
# Airflow provider 플러그인 설정
provider_info = "airflow_sw2_provider.__init__:get_provider_info" # provider_info 함수의 위치
Airflow 플러그인의 로딩 규칙
Airflow 는 entry point 시스템을 사용해서 플러그인 로드하고, providers_manager.py 를 통해 설치된 provider 를 탐색한다.
ProvidersManager
해당 클래스 통해 provider 를 로드한다. 플러그인을 등록하고 관리하는 역할이고 플러그인에 대한 정보를 수집하는데 사용된다.
entry_point.load() 를 통해 각 provider 를 로드하며, airflow 가 플러그인을 인식하고 관리한다.
클래스가 어떤 네임스페이스를 사용할지 결정하며, 기본적으로 apache_airflow_provider 를 사용한다. 다른 네임스페이스는 자동으로 인식되지 않을 수 있음 (?)
entry point name 은 airflow/providers_manager.py 파일에서 확인 가능
[airflow@airflow site-packages]$ cat /opt/airflow/.local/lib/python3.8/site-packages/airflow/providers_manager.py | grep apache
# http://www.apache.org/licenses/LICENSE-2.0
"apache-airflow-providers-celery": "2.1.0",
if provider_package.startswith("apache-airflow"):
provider_path = provider_package[len("apache-") :].replace("-", ".")
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]
For apache-airflow providers - it checks if it starts with appropriate package. For all providers
The list of providers should be returned via the 'apache_airflow_provider'
for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
package_name = "apache-airflow-providers" + folder[len(root_path) :].replace(os.sep, "-")
'Echo system > Airflow' 카테고리의 다른 글
dag parsing & write_dag (0) | 2025.04.18 |
---|---|
Airflow received sigterm. terminating subprocesses (1) | 2025.01.18 |
Airflow - Custom 패키지 만들기 (0) | 2024.09.23 |
Airflow - Custom hook, operator, sensor 생성하기 (1) | 2024.09.23 |
Airflow - minio 연동 (0) | 2024.09.07 |