4. Intellij 환경에서 Spring boot를 이용하여 Kafka api 구축 (1)

kafka
출처 : https://kafka.apache.org/

이번 장에서는 매우 간단하게 spring boot로 kafka api를 구축하여 보겠습니다.

필자의 프로젝트 개발 환경입니다.

  • maven project
  • jdk : 1.8
  • os : windows 10
  • ide : intellij 2018 ultimate

[목차]
1. Apache Kafka 소개 및 아키텍쳐
2. Apache Kafka 다운로드 및 실행 ( Windows )
3. Apache Kafka Java를 이용하여 producer 와 consumer 작성
4. Intellij 환경에서 Spring boot를 이용하여 Kafka api 구축 (1)

소스코드 다운로드 : demo

1. project 생성

project 생성
project 생성

intelilj에서 새로운 프로젝트를 생성합니다 ( create new project 클릭 )

 

empty project로 생성합니다.
empty project로 생성합니다.

intelilj spring boot kafka (2)intellij의 project = eclipse의 workspace
intellij의 module = eclipse의 project

따라서 empty project로 생성후에 module을 추가할 것입니다.

 

project name 및 location 지정
project name 및 location 지정

project name을 입력하고, project location을 지정합니다.

sdk 지정
sdk 지정

Project Structure ( Ctrl + Alt + Shift + s ) 에서 SDK를 지정합니다.

 

module 생성
module 생성

intelilj spring boot kafka (5)그 다음에 새로운 모듈 ( New Module ) 을 추가합니다.

 

 

 

spring boot로 지정
spring boot로 지정

intellij ultimate에는 spring boot 환경을 구축할 수 있습니다.

 

spring boot module 설정
spring boot module 설정

Maven Project 기반으로 구축합니다.
gradle로 하여도 딱히 상관없습니다.

 

그리고 다음과 같은 의존성 ( Dependencies ) 을 선택합니다.

depedency 지정 : Devtools
depedency 지정 : Devtools
depedency 지정 : Web
depedency 지정 : Web

 

depedency 지정 : Kafka
depedency 지정 : Kafka
  1. Dev Tools
  2. Web
  3. Kafka
  4. Kafka Streams

 

 

depedency 지정 : Kafka

프로젝트 생성이 완료되었습니다.

 

Depedencies install 완료
Depedencies install 완료

intelilj spring boot kafka위와 같이 library가 추가되어야 프로젝트를 진행할 수 있습니다.

 

2. Zookeeper와 Kafka 실행

1) zookeeper server 실행

kafka를 실행하기 위해선 일단 zookeeper가 실행되고 있어야 합니다.
kafka가 설치된 directory에서 다음과 같은 명령어를 실행합니다.

> bin/windows/zookeeper-server-start.bat config/zookeeper.properties

zookeeper 실행
zookeeper 실행

 

2) Kafka server 실행

zookeeper가 정상적으로 실행되었다면 이제 kafka를 실행합니다.

> bin/windows/kafka-server-start.bat config/server.properties

Kafka 실행
Kafka 실행

 

3) Topic 생성

kafka도 실행이 되었다면, 이제 topic을 생성합니다.

> bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic

Topic 생성
Topic 생성

 

3. api code 작성

1) kafka.properties

/kafkaProject/demo/src/main/resources/kafka.properties

bootstrap.servers=localhost:9092
retries=0
batch.size=4096
linger.ms=1
buffer.memory=40960

kafka 기본 설정을 properties를 작성한다.

 

2) KafkaConfiguration.java

/kafkaProject/demo/src/main/java/kafkaspringtutorial/demo/KafkaConfiguration.java

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@PropertySource("classpath:kafka.properties")
@EnableKafka
public class KafkaConfiguration {

    @Autowired
    private Environment env;

    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        // server host 및 port 지정
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

        // retries 횟수
        props.put(ProducerConfig.RETRIES_CONFIG, env.getProperty(ProducerConfig.RETRIES_CONFIG));

        // batch size 지정
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, env.getProperty(ProducerConfig.BATCH_SIZE_CONFIG));

        // linger.ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, env.getProperty(ProducerConfig.LINGER_MS_CONFIG));

        // buffer memory size 지정
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, env.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG));

        // key serialize 지정
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // value serialize 지정
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }


    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // Bean을 통하여 의존성 주입
        return new KafkaTemplate<String, String>(producerFactory());
    }

}

KafkaTemplate을 작성한다.
후에 Controller에서 Bean을 통하여 주입시킬 것입니다.

 

3) HomeController.java

/kafkaProject/demo/src/main/java/kafkaspringtutorial/demo/HomeController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@RestController
public class HomeController {

    private static final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // KafkaConfiguration에서 작성한 Bean 주입.
    @Autowired
    KafkaTemplate kafkaTemplate;

    /**
     * /get?message=value 형태로 접근할 수 있도록 api 작성
     * @param message
     * @return
     */
    @RequestMapping(value="/get")
    public String getData(@RequestParam(value = "message", required = true, defaultValue = "") String message ){
        // 현재 시간
        LocalDateTime date = LocalDateTime.now();
        String dateStr = date.format(fmt);

        // mytopic에 현재 시간 + message를 produce 한다.
        kafkaTemplate.send("mytopic", dateStr + "   " + message);
        return "kafkaTemplate.send >>  " + message ;
    }
}

이렇게 Controller를 작성 후에 실행하면

http://localhost:8080/get?message=메세지

위와 같은 주소로 접근 했을 때 consumer에서 메세지를 받아옵니다.

 

4) 실행하기

/kafkaProject/demo/src/main/java/kafkaspringtutorial/demo/DemoApplication.java

위의 코드들을 작성했으면 위의 java class에서 run을 해줍니다. 그러면 spring boot가 실행됩니다.
(실제로 배포할 때는 war로 추출하여 tomcat에서 실행시켜야 합니다.)

 

4. 결과 확인

api test 1
api test 1
consumer receive 1
consumer receive 1
api test 2
api test 2
consumer receive 2
consumer receive 2
api test 3
api test 3
consumer receive 3
consumer receive 3
api test 4
api test 4
consumer receive 4
consumer receive 4

 

 

[참고]
https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-kafka
https://m.blog.naver.com/PostView.nhn?blogId=talag&logNo=220930435941&proxyReferer=https%3A%2F%2Fwww.google.co.kr%2F