본문 바로가기

Prefect

[Prefect] Blocks for Infrastructure & RemoteFileSystem

728x90

Overview

  • 블록은 구성 저장을 활성화하고 외부 시스템과 상호 작용하기 위한 인터페이스를 제공하는 Prefect 내의 기본 요소입니다.
  • Docker-Container 기반의 Infrastructure Block을 코드로 작성합니다.
    • Infrastructure는 Deployments에 의해 생성된 Flow를 실행시키는 환경입니다.
      • Process, Docker, KubernetesJob 등등이 있습니다.
    • Docker 기반의 Infrastructure Block은 Local Storage 사용이 불가능하다.
      • 추가적으로 Worker는 RemoteFileSystem을 이용한 동작이 불가능하므로 Agent를 이용합니다.
    • Docker 이미지를 정의하지 않으면 Prefect에서 제공하는 기본 이미지를 사용합니다.
    • Private Docker-Registry 를 사용하여 Infrastructure를 구성할 수 있습니다.
  • RemoteFileSystem Storage Block을 코드를 작성합니다.
    • Flow 코드가 저장되어 있는 스토리지로부터 Code 호출을 위해 사용됩니다.
    • RemoteFileSystem으로는 MinIO 오브젝트 스토리지를 사용합니다.
  • Deployment for Flow 코드를 작성합니다.

참고

1. deployments.py 코드 작성

# deployments.py

# 작성한 Flow 코드를 import 합니다.
from flows.docker_insert_data_flow import docker_insert_data_flow
from flows.docker_retcode_alert_flow import docker_retcode_alert_flow

from prefect import variables
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer
from prefect.filesystems import RemoteFileSystem
from prefect.server.schemas.schedules import CronSchedule

# Create RemoteFileSystem with MinIO
minio_server = variables.get('minio_server')
minio_port = variables.get('minio_port')
minio_bucket = variables.get('minio_bucket')
minio_block = RemoteFileSystem(
    basepath="s3://" + minio_bucket,
    settings={
        "key": "username",
        "secret": "password",
        "client_kwargs": {"endpoint_url": "http://" + minio_server + ":" + minio_port},
    },
)
uuid = minio_block.save("prefect-storage-block", overwrite=True)
print(uuid)

# Create Infrastructure with Docker-Container
infra_structure_block = DockerContainer(
    type='docker-container',
    name='prefect-docker-container',
    # volumes=["/mnt/s3fs:/mnt/s3fs", "/etc/hosts:/etc/hosts"],
    image='prefect-container',
    image_pull_policy='IF_NOT_PRESENT',
    env={"EXTRA_PIP_PACKAGES": 'psycopg2-binary s3fs pymysql pandas'},
    auto_remove=True,
    mem_limit='2g',
    privileged=True)
uuid = infra_structure_block.save("prefect-infra-structure", overwrite=True)
print(uuid)

infra_overrides = {"env": {
    "PREFECT_LOGGING_LEVEL": "DEBUG",
    "EXTRA_PIP_PACKAGES": "psycopg2-binary s3fs pymysql pandas"}}

deployment = Deployment.build_from_flow(
    flow=docker_retcode_alert_flow,
    name="argos_deployment",
    parameters={"host": "my_host", "port": "15432", "user": "username", "password": "password", "database": "tsdb"},
    infra_overrides=infra_overrides,
    infrastructure=infra_structure_block,
    work_queue_name="MY_QUEUE",
    work_pool_name="MY_POOL",
    storage=minio_block,
    apply=True,
    schedule=(CronSchedule(cron="*/5 * * * *", timezone="Asia/Seoul")))

print(deployment.apply())

deployment = Deployment.build_from_flow(
    flow=docker_insert_data_flow,
    name="argos_deployment",
    parameters={"host": "My_host", "port": "15432", "user": "postgres", "password": "postgres", "database": "tsdb"},
    infra_overrides=infra_overrides,
    infrastructure=infra_structure_block,
    work_queue_name="PREFECT_QUEUE",
    work_pool_name="ARGOS_POOL",
    storage=minio_block,
    apply=True,
    schedule=(CronSchedule(cron="* * * * *", timezone="Asia/Seoul")))

# if __name__ == "__main__":
print(deployment.apply())

 

2. Deployments 적용

> python deployments.py
> prefect deployment build flows/docker_retcode_alert_flow.py:docker_retcode_alert_flow -n argos_deployment -q MY_QUEUE -p MY_POOL --infra docker-container -sb remote-file-system/prefect-storage-block --apply
> prefect deployment build flows/docker_insert_data_flow.py:docker_insert_data_flow -n argos_deployment -q MY_QUEUE -p MY_POOL --infra docker-container -sb remote-file-system/prefect-storage-block --apply

 

3. Agent 실행

> prefect agent start -p 'MY_POOL'
728x90

'Prefect' 카테고리의 다른 글

[Prefect] Server 로깅 설정  (0) 2023.12.24
[Prefect] SlackWebhook 사용법  (1) 2023.12.20
[Prefect] Configuration  (0) 2023.05.22
[Prefect] Installation & Run  (0) 2023.05.22
[Prefect] Docker-Compose  (0) 2023.02.01