728x90
Overview
- 블록은 구성 저장을 활성화하고 외부 시스템과 상호 작용하기 위한 인터페이스를 제공하는 Prefect 내의 기본 요소입니다.
- Docker-Container 기반의 Infrastructure Block을 코드로 작성합니다.
- Infrastructure는 Deployments에 의해 생성된 Flow를 실행시키는 환경입니다.
- Process, Docker, KubernetesJob 등등이 있습니다.
- Docker 기반의 Infrastructure Block은 Local Storage 사용이 불가능하다.
- 추가적으로 Worker는 RemoteFileSystem을 이용한 동작이 불가능하므로 Agent를 이용합니다.
- Docker 이미지를 정의하지 않으면 Prefect에서 제공하는 기본 이미지를 사용합니다.
- Private Docker-Registry 를 사용하여 Infrastructure를 구성할 수 있습니다.
- Infrastructure는 Deployments에 의해 생성된 Flow를 실행시키는 환경입니다.
- RemoteFileSystem Storage Block을 코드를 작성합니다.
- Flow 코드가 저장되어 있는 스토리지로부터 Code 호출을 위해 사용됩니다.
- RemoteFileSystem으로는 MinIO 오브젝트 스토리지를 사용합니다.
- Deployment for Flow 코드를 작성합니다.
참고
- Infrastructure - Prefect Docs
- https://min.io/docs/minio/linux/operations/install-deploy-manage/deploy-minio-single-node-single-drive.html#minio-snsd
1. deployments.py 코드 작성
# deployments.py
# 작성한 Flow 코드를 import 합니다.
from flows.docker_insert_data_flow import docker_insert_data_flow
from flows.docker_retcode_alert_flow import docker_retcode_alert_flow
from prefect import variables
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer
from prefect.filesystems import RemoteFileSystem
from prefect.server.schemas.schedules import CronSchedule
# Create RemoteFileSystem with MinIO
minio_server = variables.get('minio_server')
minio_port = variables.get('minio_port')
minio_bucket = variables.get('minio_bucket')
minio_block = RemoteFileSystem(
basepath="s3://" + minio_bucket,
settings={
"key": "username",
"secret": "password",
"client_kwargs": {"endpoint_url": "http://" + minio_server + ":" + minio_port},
},
)
uuid = minio_block.save("prefect-storage-block", overwrite=True)
print(uuid)
# Create Infrastructure with Docker-Container
infra_structure_block = DockerContainer(
type='docker-container',
name='prefect-docker-container',
# volumes=["/mnt/s3fs:/mnt/s3fs", "/etc/hosts:/etc/hosts"],
image='prefect-container',
image_pull_policy='IF_NOT_PRESENT',
env={"EXTRA_PIP_PACKAGES": 'psycopg2-binary s3fs pymysql pandas'},
auto_remove=True,
mem_limit='2g',
privileged=True)
uuid = infra_structure_block.save("prefect-infra-structure", overwrite=True)
print(uuid)
infra_overrides = {"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"EXTRA_PIP_PACKAGES": "psycopg2-binary s3fs pymysql pandas"}}
deployment = Deployment.build_from_flow(
flow=docker_retcode_alert_flow,
name="argos_deployment",
parameters={"host": "my_host", "port": "15432", "user": "username", "password": "password", "database": "tsdb"},
infra_overrides=infra_overrides,
infrastructure=infra_structure_block,
work_queue_name="MY_QUEUE",
work_pool_name="MY_POOL",
storage=minio_block,
apply=True,
schedule=(CronSchedule(cron="*/5 * * * *", timezone="Asia/Seoul")))
print(deployment.apply())
deployment = Deployment.build_from_flow(
flow=docker_insert_data_flow,
name="argos_deployment",
parameters={"host": "My_host", "port": "15432", "user": "postgres", "password": "postgres", "database": "tsdb"},
infra_overrides=infra_overrides,
infrastructure=infra_structure_block,
work_queue_name="PREFECT_QUEUE",
work_pool_name="ARGOS_POOL",
storage=minio_block,
apply=True,
schedule=(CronSchedule(cron="* * * * *", timezone="Asia/Seoul")))
# if __name__ == "__main__":
print(deployment.apply())
2. Deployments 적용
> python deployments.py
> prefect deployment build flows/docker_retcode_alert_flow.py:docker_retcode_alert_flow -n argos_deployment -q MY_QUEUE -p MY_POOL --infra docker-container -sb remote-file-system/prefect-storage-block --apply
> prefect deployment build flows/docker_insert_data_flow.py:docker_insert_data_flow -n argos_deployment -q MY_QUEUE -p MY_POOL --infra docker-container -sb remote-file-system/prefect-storage-block --apply
3. Agent 실행
> prefect agent start -p 'MY_POOL'
728x90
'Prefect' 카테고리의 다른 글
[Prefect] Server 로깅 설정 (0) | 2023.12.24 |
---|---|
[Prefect] SlackWebhook 사용법 (1) | 2023.12.20 |
[Prefect] Configuration (0) | 2023.05.22 |
[Prefect] Installation & Run (0) | 2023.05.22 |
[Prefect] Docker-Compose (0) | 2023.02.01 |