3. Apache Kafka – Java를 이용하여 producer 와 consumer 작성

kafka
출처 : https://kafka.apache.org/

이번 장에서는 spring boot로 api를 구축하기 이전에 간단하게 Java를 이용하여 카프카 클러스터 ( kafka cluster ) 를 구축하여 보도록 하겠습니다.

필자의 프로젝트 개발 환경입니다.

  • maven project
  • jdk : 1.8
  • os : windows 10

[목차]
1. Apache Kafka 소개 및 아키텍쳐
2. Apache Kafka 다운로드 및 실행 ( Windows )
3. Apache Kafka Java를 이용하여 producer 와 consumer 작성
4. Intellij 환경에서 Spring boot를 이용하여 Kafka api 구축 (1)

java project 생성

 

소스코드 다운로드 : kafka-tutorial

 

1. 다음 스냅샷과 같이 프로젝트를 생성합니다.

maven project
1. Maven Project를 생성합니다.
maven archetypes
2. maven-archetype-quicstart ( 기본 ) 선택
project naming
3. groupid 및 artifact id 지정

 

2. dependencies (의존성) 추가

kafka dependency
kafka dependency 추가
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

 

3. consumer 작성

package kafkaTutorial.tutorial;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {
        Properties configs = new Properties();
        // 환경 변수 설정
        configs.put("bootstrap.servers", "localhost:9092");     // kafka server host 및 port
        configs.put("session.timeout.ms", "10000");             // session 설정
        configs.put("group.id", "test20180604");                // topic 설정
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성
        consumer.subscribe(Arrays.asList("test20180604"));      // topic 설정
        while (true) {  // 계속 loop를 돌면서 producer의 message를 띄운다.
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                String s = record.topic();
                if ("test20180604".equals(s)) {
                    System.out.println(record.value());
                } else {
                    throw new IllegalStateException("get message on topic " + record.topic());
                }
            }
        }   
    }
    
}

consumer를 먼저 실행해놓는다.

그 후에 producer에서 message를 보낼 때 consumer의 콘솔창에 message가 나타난다.

 

4. producer 작성

package kafkaTutorial.tutorial;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) throws IOException {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092"); // kafka host 및 server 설정
        configs.put("acks", "all");                         // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
        configs.put("block.on.buffer.full", "true");        // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

        // producer 생성
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        
        // message 전달
        for (int i = 0; i < 5; i++) {
            String v = "hello"+i;
            producer.send(new ProducerRecord<String, String>("test20180604", v));
        }
        
        // 종료
        producer.flush();
        producer.close();
    }

}

 

5. 결과 확인

java kafka 결과
java kafka producer 와 consumer 결과 확인

 

 

[참조]

  1. 전체적인 내용 : http://epicdevs.com/21?category=460351
  2. acks : https://www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-producer-acks/
  3. block.on.buffer.full : https://free-strings.blogspot.com/2016/04/producer.html
  4. 공식문서 : https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example