Kafka is probably one of the most widely used event-driven streaming message platforms in the world today, which says a lot about its stability and performance. Historically, however, Kafka depended on ZooKeeper, a distributed application coordinator. You had to build Kafka and ZooKeeper as separate clusters of at least three nodes each, and, more importantly, Kafka's metadata was managed by an external system (ZooKeeper). That meant latency depending on network performance and occasional metadata inconsistency issues. KRaft mode was introduced to improve this architecture, and since future Kafka releases are said to support only KRaft mode, I decided to try setting it up. For ease of installation and configuration, I use the Confluent Kafka Community edition.
Installation reference: https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html#prerequisites
Configuration reference: https://docs.confluent.io/platform/current/kafka/deployment.html#
KRaft overview: https://docs.confluent.io/platform/current/kafka-metadata/kraft.html#
Server Environment
- OS: Rocky Linux 8.9
- Java: 11
Install Confluent Kafka
# Update packages & install curl
> dnf -y update
> dnf -y install curl
# Download and extract Confluent Kafka
> curl -O https://packages.confluent.io/archive/7.6/confluent-community-7.6.0.tar.gz
> tar xzf confluent-community-7.6.0.tar.gz
> mv confluent-7.6.0 /usr/local/kafka
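As an optional sanity check (not part of the original steps), you can print the version of the bundled CLI tools to confirm the archive was unpacked where expected:
# Optional: confirm the bundled tools are in place by printing the Kafka version
> /usr/local/kafka/bin/kafka-topics --version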
Configure Kafka
# Create directories
> mkdir -p /usr/local/kafka/logs
> mkdir -p /stg/kafka_storage
# Configure server.properties
> vim /usr/local/kafka/etc/kafka/kraft/server.properties
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@172.30.1.180:19093
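# Note (assumption, not part of this single-node setup): for a hypothetical 3-node combined
# broker/controller cluster, every voter is listed as <node.id>@<host>:<controller-port>, e.g.
# controller.quorum.voters=1@172.30.1.181:19093,2@172.30.1.182:19093,3@172.30.1.183:19093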
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 19092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:19092
listeners=PLAINTEXT://:19092,CONTROLLER://:19093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://172.30.1.180:19092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/stg/kafka_storage
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
# Create Cluster ID
> /usr/local/kafka/bin/kafka-storage random-uuid
4fcyFt-PRnC42TmmVEgmZg
> /usr/local/kafka/bin/kafka-storage format -t 4fcyFt-PRnC42TmmVEgmZg -c /usr/local/kafka/etc/kafka/kraft/server.properties
Formatting /stg/kafka_storage with metadata.version 3.6-IV2.
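If the format step succeeded, the cluster ID is recorded in a meta.properties file under the storage directory; a quick optional check, assuming the log.dirs path configured above:
# Optional: inspect the metadata file written by the format step
> cat /stg/kafka_storage/meta.properties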
Configure Systemd & Run
# Systemd
> vim /etc/systemd/system/kafka.service
[Unit]
Description=confluent-kafka
After=network.target network-online.target
[Service]
# kafka-server-start runs in the foreground here, so "simple" is appropriate ("forked" is not a valid Type)
Type=simple
User=root
Group=root
SyslogIdentifier=kafka
WorkingDirectory=/usr/local/kafka
TimeoutStopSec=600
Restart=always
RestartSec=3s
ExecStart=/usr/local/kafka/bin/kafka-server-start /usr/local/kafka/etc/kafka/kraft/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G -XX:MetaspaceSize=96m -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
Environment="LOG_DIR=/usr/local/kafka/logs"
SuccessExitStatus=143
# (file size)
LimitFSIZE=infinity
# (cpu time)
LimitCPU=infinity
# (virtual memory size)
LimitAS=infinity
# (locked-in-memory size)
LimitMEMLOCK=infinity
# (open files)
LimitNOFILE=524288
# (processes/threads)
LimitNPROC=64000
[Install]
WantedBy=multi-user.target
# Run
> systemctl daemon-reload
> systemctl start kafka
> systemctl enable kafka
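Once the unit is enabled, a small smoke test with the bundled CLI can confirm the broker answers on the advertised listener; the topic name here is just an example:
# Check the service and run a quick smoke test (example topic name)
> systemctl status kafka
> /usr/local/kafka/bin/kafka-topics --bootstrap-server 172.30.1.180:19092 --create --topic smoke-test --partitions 1 --replication-factor 1
> /usr/local/kafka/bin/kafka-topics --bootstrap-server 172.30.1.180:19092 --describe --topic smoke-test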
That's a simple single-node setup. The server.properties options and the JVM options should be tuned to the resources of your Kafka server, and in a production environment a cluster of at least three brokers is strongly recommended. In my experience, the most critical factors in operating Kafka have been disk and network. Since data is stored on local disks, the disks should be at least SSD-class, and when a broker goes down the resulting data movement causes a sudden spike in network usage, so network throughput has to keep up as well. (Of course, with a small amount of data, recovery is fast and uses little network.)
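For reference, on a hypothetical three-broker production cluster the internal-topic settings shown earlier would usually be raised so that metadata survives a single broker failure; the values below are assumptions to adjust for your environment:
# Example production-oriented values for a 3-broker cluster (assumed, tune as needed)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
default.replication.factor=3
min.insync.replicas=2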
This time I built a single node, but even with a proper production cluster, operating Kafka will still involve plenty of challenges. It would be ideal to be fluent with every CLI tool Kafka provides, but there is rarely enough time to master them all. To help with this, several open-source management tools for Kafka exist, and if time allows I'd like to cover how to use them in a future post. Thanks for reading.