BackEnd/Kafka

[Kafka] Kafka Connect on Docker

샤아이인 2023. 12. 6. 21:26

 

1. 문제의 상황

MSA 토이프로젝트를 진행하던 도중 Local에서만 사용하던 Kafka와 Kafka Connector를 Container로 만들어 사용해야 하는 일이 발생하였다. 따라서 모든 서비스에 Dockerfile을 만들어 커테이너화 시켜주었으며, 추가적으로 DB와 Kafka또한 컨테이너화 시켜야 했다.

 

우선 MSA 토이 프로젝트의 구조는 다음과 같다!!

 

간단한 쇼핑몰 서비스 이며, 오늘 글에서 다룰 부분은 Order-Service와 Kafka 부분이다.

 

그냥 Kafka 자체만 필요하면 제공되는 Docker 이미지 사용하면 끝이라 쉽지만, 그 외 Connector를 함께 사용하려 하니 준비해야 할 점이 추가적으로 있었다. 또한 DB에 저장하기 위해 최종적으로 JDBC 라이브러리 또한 필요하여 추가적으로 준비해줄 부분이 있었다.

 

내가 이를 해결하고자 검색해본 결과 Docker로 Connector를 사용하는 관련 내용의 한국어 블로그가 없는것으로 생각되어 이렇게 글로 남겨보고자 한다!

 

2. 해결해보기

2-1. Kafka Connector 다운로드하기

우선 사용할 Kafka Connector와 JDBC plugins을 다운로드하여야 한다.

 

MySQL Connector Driver


Confluent JDBC plugins

 

mysql-connector-java-8.0.22.tar.gz와 confluentinc-kafka-connect-jdbc-10.0-2.1.zip의 압축을 모두 풉니다. 

jars 디렉터리를 하나 생성하고, mysql-connector-java-8.0.22.jaronfluentinc-kafka-connect-jdbc-10.0-2.1/lib/ 디렉터리에 있는 모든 .jar 파일을 이전에 생성한 jars 디렉터리로 이동시켜 줍니다!

 

저는 kafka-connect-docker라는 폴더를 하나 만든 후, 해당 폴더 안에 jars를 만들어주었습니다.

 

2-2. docker-compose.yml 작성하기

이제 필요한 파일은 전부 준비하였으니, Docker-Compose파일을 다음과 같이 만들어 봅시다!

 

설치된 jar 파일 또한 docker volume으로 잡아둔 디렉터리로 연동하여 사용하게 된다!

volumes:
  - $PWD/jars:/etc/kafka-connect/jars

 

전체 파일은 다음과 같다.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      my-network:
        ipv4_address: 172.18.0.100 #ip 할당 직접했습니다

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      my-network:
        ipv4_address: 172.18.0.101 #ip 할당 직접했습니다

  kafka-connector-mariadb:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - 8083:8083
    links:
      - kafka
      - zookeeper
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars #jars파일들 volume을 통하여 사용
    networks:
      my-network:
        ipv4_address: 172.18.0.102 #ip 할당 직접했습니다

networks:
  my-network:
    name: ecommerce-network

 

이후 정상적으로 실행되었는지 확인해 봅시다!

3개다 원하던 데로 실행되고 있음을 확인할 수 있었습니다!

 

2-3. Source Connector 등록하기

2-3-1) Connector 생성

우리는 PostMan을 통하여 Source Connector를 등록할 것이다!

다음과 같이 POST 메서드를 사용하여 connectors 리소스에 자원을 생성해 주자!

POST : http://localhost:8083/connectors/ 

 

넘겨줄 JSON Body는 다음과 같다. "order-sink-connect" 라는 이름으로 connector를 생성하겠다는 의미이다.

{
    "name":"order-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mariadb://ip:3306/데이터베이스이름",
        "connection.user":"root",
        "connection.password":"비밀번호",
        "auto.create":"true", // 첫 요청시 자동 생성 활성화
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"dms-order-topic"
    }
}

 

정상적으로 생성되었다면 다음 API call을 통하여 Connectors의 생성을 확인할 수 있다.

GET : http://127.0.0.1:8083/connectors

 

2-3-2) Connector 상태 확인

또한 다음과 같이 status를 통하여 상태를 확인할 수 있다.

GET : http://127.0.0.1:8083/connectors/order-sink-connect/status

 

2-3-3) Connector 삭제

마지막으로 삭제 또한 가능하다!

DELETE : http://127.0.0.1:8083/connectors/order-sink-connect

 

3. 최종 테스트

토이 프로젝트를 통하여 정상적으로 데이터가 들어가는지 확인해 보자!

 

우선 MSA상의 대략적 구조는 다음과 같다.

위 구조처럼 필요한 모든 컨테이너가 기동 중인지 Eureka Server를 통하여 확인해 보자.

 

유레카 서버에서 확인 시 모든 서버가 성공적으로 동작중임을 확인할 수 있었다.

 

다음과 같이 2개의 Order-Service가 동작중이며, 데이터 동기화를 위하여 중간에 Kafka를 사용하게 된다.

 

각각의 Order-Service에 Spring으로 구현해 둔 Controller는 다음과 같이 동작한다.

 

주문 생성 시 

  1. Catalog-Service에 주문 정보가 전달되어 제고량이 감소하고
  2. Order-Service에 주문이 전달되어 주문이 생성된다. 

 

POST man을 통하여 확인해 보자.

 

남은 부분은 발표용으로 준비한 PPT에 정리해둔 부분이다!!

 

 

 

4. 참고

https://dev.to/cosmostail/mysql-8-kafka-connect-tutorial-on-docker-479p

 

MySQL 8 Kafka Connect Tutorial on Docker

In this tutorial, we will use docker-compose, MySQL 8 as examples to demonstrate Kafka Connector by u...

dev.to