Echo system/Airflow

Airflow - Custom provider 생성

박쿠리 2024. 10. 9. 18:42

Provider 생성은 특정 외부 시스템이나 서비스와의 통합을 위한 기능을 제공하는 패키지를 만드는 과정이다. Provider는 Airflow가 외부 시스템과 연결할 수 있도록 Hook, Operator, Sensor 등의 구성 요소를 제공한다.

 

provider 를 만들기 위해 필요한 파일은 다음과 같다.

  • __init__.py 
  • hook.py
  • operator.py
  • setup.py 
  • pyproject.toml

 

프로젝트 생성

 

poetry 를 사용해 프로젝트를 생성한다.

poetry new airflow-sw-provider

*참고 : toml 파일을 직접 생성해서 사용하니 인식을 못하는 것 같아서 poetry 로 다시 생성했음

*참고 : provider 이름을 custom_sw_provider 로 생성하고 패키지 설치한 후 패키지 조회 시 아래와 같은 에러 발생.

패키지 명은 언더로 쓰면 안되는듯 (?)

ValueError: The package 'custom-sw-provider' from setuptools and custom_sw_provider do not match. Please make sure they are aligned
  •  

 

생성한 프로젝트 구조는 다음과 같다.

├── airflow-sw-provider
│   ├── airflow_sw_provider
│   │   ├── custom_hooks
│   │   │   ├── hook.py
│   │   │   └── __init__.py
│   │   ├── custom_operators
│   │   │   ├── __init__.py
│   │   │   └── operator.py
│   │   ├── custom_sensors
│   │   │   ├── __init__.py
│   │   │   └── sensor.py
│   │   └── __init__.py
│   ├── pyproject.toml
│   ├── README.md
│   └── tests
│       └── __init__.py

 

airflow_sw_provider/__init__.py

 

해당 함수는 airflow 에 provider 에 대한 정보를 제공한다.

def get_provider_info():
    return {
        "package-name": "airflow-sw-provider",
        "name": "sw_provider",
        "description": "Airflow provider for siwon",
        "versions": ["0.0.1"],
        "hook-class-names": ["airflow_sw_provider.custom_hooks.hook.CustomPostgresHook"],
        "connection-types": [
            {
                "hook-class-name": "airflow_sw_provider.custom_hooks.hook.CustomPostgresHook",
                "connection-type": "siwon",
            }
        ],
    }

 

pyproject.toml

[tool.poetry]
name = "airflow-sw-provider"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"
packages = [{include = "airflow_sw_provider"}] # 패키지에 포함될 Python 모듈

[tool.poetry.dependencies]
# 프로젝트의 런타임 의존성을 정의하는 섹션
python = ">=3.8,<3.13"  # 지원하는 Python 버전 범위
apache-airflow = "^2.0" # apache-airflow 의존성 추가

[build-system]
# 빌드 시스템 요구사항을 지정하는 섹션
requires = ["poetry-core"]  # 필요한 빌드 시스템 (poetry-core 사용)
build-backend = "poetry.core.masonry.api"  # 사용할 빌드 백엔드

[tool.poetry.plugins."airflow_sw_provider"]
# Airflow provider 플러그인 설정
provider_info = "airflow_sw_provider.__init__:get_provider_info" # provider_info 함수의 위치

 

Provider 패키징

airflow-sw-provider 경로에서 wheel 파일을 생성한 후, whl 파일을 설치한다.

[airflow@airflow airflow-sw-provider]$ poetry build
[airflow@airflow dist]$ pip install dist/airflow_sw_provider-0.1.0-py3-none-any.whl

 

 

Airflow 를 재시작한다.

[root@airflow ~]# sudo systemctl restart airflow

 

provider 가 정상적으로 설치됐는지 확인한다.

 

[airflow@airflow include]$ airflow providers list | grep sw
airflow-sw-provider                      | Airflow provider for siwon                                                                      | 0.1.0

 


 

poetry build 해도 provider 인식 안되는 문제

 

toml 파일에서 airflow.providers 를 apache_airflow_provider 로 변경했더니 인식됨 

# 에러
[tool.poetry.plugins."airflow.providers"]
# Airflow provider 플러그인 설정
provider_info = "airflow_sw2_provider.__init__:get_provider_info" # provider_info 함수의 위치

# 정상
[tool.poetry.plugins."apache_airflow_provider"]
# Airflow provider 플러그인 설정
provider_info = "airflow_sw2_provider.__init__:get_provider_info" # provider_info 함수의 위치

 

 

Airflow 플러그인의 로딩 규칙

 

Airflow 는 entry point 시스템을 사용해서 플러그인 로드하고, providers_manager.py 를 통해 설치된 provider 를 탐색한다.

 

ProvidersManager

해당 클래스 통해 provider 를 로드한다. 플러그인을 등록하고 관리하는 역할이고 플러그인에 대한 정보를 수집하는데 사용된다.

entry_point.load() 를 통해 각 provider 를 로드하며, airflow 가 플러그인을 인식하고 관리한다.

클래스가 어떤 네임스페이스를 사용할지 결정하며, 기본적으로 apache_airflow_provider 를 사용한다. 다른 네임스페이스는 자동으로 인식되지 않을 수 있음 (?)

entry point name 은  airflow/providers_manager.py 파일에서 확인 가능

 

[airflow@airflow site-packages]$ cat /opt/airflow/.local/lib/python3.8/site-packages/airflow/providers_manager.py | grep apache
#   http://www.apache.org/licenses/LICENSE-2.0
    "apache-airflow-providers-celery": "2.1.0",
    if provider_package.startswith("apache-airflow"):
        provider_path = provider_package[len("apache-") :].replace("-", ".")
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]
    For apache-airflow providers - it checks if it starts with appropriate package. For all providers
        The list of providers should be returned via the 'apache_airflow_provider'
        for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
                    package_name = "apache-airflow-providers" + folder[len(root_path) :].replace(os.sep, "-")

 

반응형