송민준의 개발노트

[local] kafka 세팅 본문

인프라/kafka

[local] kafka 세팅

송민준 2023. 4. 25. 03:33

https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

1. 카프카 공식 사이트 들어가서 원하는 버전의 kafka 다운

 

2. 다운 받은 tgz 파일을 원하는 경로에 zip 풀기

- 세팅 관련 파일들은 config 폴더

- 실행 파일들은 bin 폴더, window는 bin/windows

 

3. 아래는 기본적인 실행 명령어 예시

cd C:\Work\kafka_2.12-3.4.0\bin\windows

// 주키퍼(ZooKeeper) 서버
zookeeper-server-start.bat ../../config/zookeeper.properties

// 카프카 서버(프로듀서)
kafka-server-start.bat ../../config/server.properties

// 카프카 토픽 리스트 확인
kafka-topics.bat --bootstrap-server localhost:9092 --list

// 카프카 토픽 상세 확인
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic quickstart-events

// 카프카 토픽 생성(quickstart-events 이름)
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1

// 프로듀서 실행
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic quickstart-events

// 컨슈머 실행
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

 

 

* docker-compose.yml 예시

version: '2'
services:
  # ZooKeeper — cluster coordination for the (ZooKeeper-mode) Kafka broker.
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      # Port the broker uses to connect to ZooKeeper.
      ZOOKEEPER_CLIENT_PORT: 2181
      # Basic time unit (ms) for heartbeats / session timeouts.
      ZOOKEEPER_TICK_TIME: 2000

  # Kafka broker (Confluent Server).
  # - 29092: internal listener for other containers on the compose network
  # - 9092:  host-mapped listener for clients on localhost
  # - 9101:  JMX metrics
  broker:
    image: confluentinc/cp-server:7.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Quoted: values contain ':' and ',' — keep them unambiguous plain strings.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      # Single-broker setup: all internal topics use replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      # Quoted so the consumer receives the string "true", not a YAML boolean.
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  # Schema Registry — stores message schemas (Avro/JSON/Protobuf) for topics.
  schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Talks to the broker over the internal (29092) listener.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  # Kafka Connect worker (image ships with the datagen connector plugin).
  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      # Host directory containing the JDBC driver jar(s); required for JDBC
      # connectors. Quoted because the placeholder path contains spaces —
      # replace the left-hand side with your actual local directory.
      - './jdbc jar가 들어있는 로컬 디렉토리:/etc/kafka-connect/jars'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      # Internal topics for connector configs/offsets/status (single broker → RF 1).
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.2.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  # Confluent Control Center — web UI (http://localhost:9021) for the cluster,
  # Connect, and ksqlDB.
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.2.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      # Internal URL for server-side calls; advertised URL is what the browser uses.
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  # ksqlDB server — SQL-on-Kafka engine, REST API on 8088.
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.2.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      # 0 disables result buffering so query results are emitted immediately.
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  # ksqlDB CLI — attach with: docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.2.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    # Keep an idle shell running so the container stays up for interactive use.
    entrypoint: /bin/sh
    tty: true

  # ksql-datagen — sample-data generator; waits for broker and Schema Registry
  # to be ready, then idles (tail -f) until used.
  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.2.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  # REST Proxy — HTTP interface for producing/consuming Kafka messages.
  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.2.1
    depends_on:
      - broker
      - schema-registry
    ports:
      # Quoted: port mappings should always be strings in Compose YAML to
      # avoid YAML's colon-separated (sexagesimal) integer parsing trap.
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

 

 

kafka connect 설치

 

1. 패키지 다운

curl -O https://packages.confluent.io/archive/7.3/confluent-community-7.3.3.tar.gz

tar xvf confluent-community-7.3.3.tar.gz

 

2. jdbc connector 다운로드

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html#install-the-connector-using-c-hub

 

3. mysql-driver 다운로드

- zip으로 풀어서 jar파일 챙김

https://downloads.mysql.com/archives/c-j/

 

 

4. connect-distributed.properties에 plugin 추가

confluent-7.3.3 밑에 confluent-7.3.3\etc\kafka 경로에 connect-distributed.properties에 마지막 줄 추가(jdbc 라이브러리)

plugin.path=C:\\Work\\confluentinc-kafka-connect-jdbc-10.6.3\\lib

 

5. C:\Work\confluent-7.3.3\share\java\kafka 경로에 mysql driver추가

mysql-connector-j-8.0.32.jar

 

6. connect-distributed.bat 에 log 세팅 수정

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/connect-log4j.properties
)

7. kafka-run-class.bat에 Classpath addition for LSB style path 수정

rem Classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
	call:concat %BASE_DIR%\share\java\kafka\*
)

8. 명령어 실행으로 확인

cd C:\Work\confluent-7.3.3
.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties

 

* kafka Source Connect *

 

9. kafka Source Connect 추가(기본 포트가 8083인듯)

echo '

{

    "name" : "my-source-connect",

    "config" : {

        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",

        "connection.url":"jdbc:mysql://localhost:3306/msa",

        "connection.user":"msa",

        "connection.password":"1234",

        "mode": "incrementing",

        "incrementing.column.name" : "id",

        "table.whitelist":"users",

        "topic.prefix" : "my_topic_",

        "tasks.max" : "1"

    }

}

' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json"
### kafka connect 리스트
GET http://localhost:8083/connectors

### kafka connect 리스트 상세
GET http://localhost:8083/connectors/my-source-connect


### kafka connect 생성
POST http://localhost:8083/connectors
Content-Type: application/json

{

    "name" : "my-source-connect",

    "config" : {

        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",

        "connection.url":"jdbc:mysql://localhost:3306/msa",

        "connection.user":"msa",

        "connection.password":"1234",

        "mode": "incrementing",

        "incrementing.column.name" : "id",

        "table.whitelist":"users",

        "topic.prefix" : "my_topic_",

        "tasks.max" : "1"

    }

}

### kafka connect connect2
POST http://localhost:8083/connectors
Content-Type: application/json

{

  "name" : "my-source-connect-2",

  "config" : {

    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",

    "connection.url":"jdbc:mysql://localhost:3306/msa",

    "connection.user":"msa",

    "connection.password":"1234",

    "mode": "timestamp+incrementing",

    "timestamp.column.name" : "modified_at",

    "incrementing.column.name" : "id",

    "validate.non.null": false,

    "table.whitelist":"users",

    "topic.prefix" : "my_topic_2_",

    "tasks.max" : "1"

  }

}

 

10. 9)번 연결 작업 후 연결된 db의 로그를 보고싶다면 해당 토픽을 기준으로 컨슈머 실행

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

 

 

* kafka Sink Connect *

 

11. kafka sink connect 추가

### kafka sink connect
POST http://localhost:8083/connectors
Content-Type: application/json

{

  "name":"my-sink-connect",

  "config":{

      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",

      "connection.url":"jdbc:mysql://localhost:3306/msa",

      "connection.user":"msa",

      "connection.password":"1234",

      "auto.create":"true",

      "auto.evolve":"true",

      "delete.enabled":"false",

      "tasks.max":"1",

      "topics":"my_topic_users"

  }

}

 

* 윈도우에서 '현재 연결은 원격 호스트에 의해 강제로 끊겼습니다' 에러 시 *

 

* sink tasks status failed 일 때 질문 남긴 것 *

 

참고

https://inflearn.com/questions/616977