소스코드는 깃허브를 통해 자세히 확인 가능합니다!
What?
-
Kafka를 이론적으로 학습하는 것만으로는 이해하기 어려웠습니다.
→ 직접 코드를 구현하며 Kafka의 동작을 익히고자 합니다!
-
관심 있는 암호화폐 거래 시스템을 주제로 선정했습니다.
- 빗썸 거래소의 비트코인 최근 체결 내역을 수집하는 프로젝트를 진행해볼게요!
How?
- 빗썸 API를 이용해 비트코인의 최근 체결 내역을 조회
- https://apidocs.bithumb.com/reference/최근-체결-내역
- API 요청에는 초당 150회 호출 제한이 존재
- Kafka를 활용해 트래픽을 분산 처리하여 API 요청량을 제어할 예정
주요 개념
| 개념 | 설명 |
|---|---|
| Topic | 메시지를 전달하는 채널 역할. 여기서는 btc-trades라는 Topic을 사용할 예정 |
| Producer | 데이터를 Kafka에 발행하는 역할 |
| API 요청을 메시지로 만들어 Kafka에 저장 | |
| Consumer | Kafka에서 메시지를 읽어 API를 호출하고 데이터를 처리하는 역할 |
| Partition | Topic을 여러 개의 파티션으로 나눠 병렬 처리할 수 있도록 함 |
고려할 점
✅ 총 10개의 코인 데이터를 1초에 1개씩 수집하려고 한다.
- 기존 방식: 단일 Consumer → 초당 1개 메시지 처리 → 10개 요청 처리에 10초 소요
- 해결책: Kafka의 병렬 Consumer 활용
- 10개 파티션 생성 → 각 코인의 API 요청을 각각의 파티션에 저장
- 10개 Consumer 병렬 실행 → 각 Consumer가 1초에 하나씩만 처리
Kafka를 활용한 데이터 흐름
- Kafka Producer가 각 코인의 체결 데이터 조회 요청을 Kafka Topic에 발행
- Kafka Topic이 10개의 파티션으로 분배하여 저장
- 10개의 Kafka Consumer가 병렬로 동작하며 1초에 1개씩 API 호출
- Consumer가 API에서 체결 데이터를 가져와 저장
- 완료된 데이터는 로그로 기록
Topic 설정
@Configuration
public class KafkaTopicConfig {
@Value("${crypto.kafka.topics.coin-trades}")
private String topic;
@Bean
public NewTopic minuteCandleTopic() {
return TopicBuilder.name(topic)
.configs(
Map.of(
TopicConfig.RETENTION_MS_CONFIG, "3600000" // 1h
)
)
.partitions(10)
.replicas(1)
.build();
}
}
- 토픽의 보관 기간의 기본값은 7일이지만, 이번 실습은 로그로 찍어보기만 할 것이기 때문에 굳이 7일까지 가지고있을 필요가 없다. 따라서 토픽 보관 기간을 1시간으로 설정했다.
- 위에 설명한대로 파티션은 10개로 나누도록 설정했다.
Producer 설정
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> properties = new HashMap<>();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(properties);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.springframework.kafka.support.serializer.JsonSerializer");
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
Kafka 브로커와의 연결을 설정하고, 메시지를 Kafka 토픽으로 발행할 수 있도록 설정했다.
bootstrapServers변수- Kafka 브로커들이 실행 중인 서버 주소를 나타냄
kafkaAdminBean- Kafka 클러스터와의 연결을 관리하는
KafkaAdmin객체를 생성 - Kafka 클러스터의 메타데이터를 관리하고, 토픽의 생성 및 삭제 같은 관리 작업을 처리하는 데 사용
- 클러스터와의 연결을 설정하는 데 필요한
bootstrapServers정보를properties맵에 담아KafkaAdmin객체를 생성하고 반환
- Kafka 클러스터와의 연결을 관리하는
producerFactoryBean- Kafka Producer가 어떻게 메시지를 전송할지에 대한 세부 설정을 제공
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG- Kafka 서버 주소를 설정
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG- 메시지의
Key를 직렬화할 방식을 설정 StringSerializer를 사용하여String타입을 직렬화
- 메시지의
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG- 메시지의
Value를 직렬화할 방식을 설정 JsonSerializer를 사용하여 객체를 JSON 형식으로 직렬화
- 메시지의
kafkaTemplateBean- Kafka Producer를 이용해 메시지를 전송할 수 있도록 도와주는
KafkaTemplate객체를 생성
- Kafka Producer를 이용해 메시지를 전송할 수 있도록 도와주는
Consumer 설정
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.setConcurrency(10);
containerFactory.getContainerProperties().setIdleBetweenPolls(1000);
containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return containerFactory;
}
}
- ConsumerFactory 설정 (
consumerFactory)- Kafka에서 메시지를 가져오는 Consumer 설정을 정의
BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커 주소를 설정 (1개의 브로커 사용)KEY_DESERIALIZER_CLASS_CONFIG: 메시지 Key를 문자열(String)로 변환VALUE_DESERIALIZER_CLASS_CONFIG: 메시지 Value를 JSON 형태로 변환AUTO_OFFSET_RESET_CONFIG: 컨슈머가 가장 오래된 메시지부터 읽도록 설정MAX_POLL_RECORDS_CONFIG: 한 번의poll()요청에서 최대 10개의 메시지를 가져옴
- Kafka에서 메시지를 가져오는 Consumer 설정을 정의
- Kafka 리스너 컨테이너 팩토리 설정 (
kafkaListenerContainerFactory)- Kafka 메시지를 처리하는 컨슈머 리스너를 설정
setConsumerFactory(consumerFactory): 위에서 설정한 ConsumerFactory를 적용setConcurrency(10): 10개의 Consumer가 동시에 실행되도록 설정 → 병렬 처리 지원setIdleBetweenPolls(1000): 각poll()요청 간 1초 대기 → 부하 방지setAckMode(AckMode.MANUAL_IMMEDIATE): 메시지를 처리한 즉시 ACK → 빠르고 안정적인 처리 가능
- Kafka 메시지를 처리하는 컨슈머 리스너를 설정
Producer 서비스 로직
@Service
@Slf4j
public class KafkaProducerService {
@Value("${crypto.kafka.topics.coin-trades}")
private String topic;
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
private final List<String> coins = List.of("BTC", "ETH", "XRP", "ADA", "DOGE", "SOL", "DOT", "BCH", "ETC", "LINK");
@Scheduled(fixedRate = 1000) // 1초마다 실행
public void sendCoinSymbols() {
for (String coin : coins) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, coin);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("[PRODUCER] Sent message: {}", coin);
} else {
log.error("[PRODUCER] Failed to send message: {} due to {}", coin, ex.getMessage());
}
});
}
}
}
- 1초마다 10개의 코인을 차례로 Kafka Topic에 전송하는 Scheduler을 만들었다.
Consumer 서비스 로직
@Service
@Slf4j
public class KafkaConsumerService {
@Autowired
private ObjectMapper objectMapper;
@Value("${crypto.kafka.api}")
private String BITHUMB_API_URL;
private final RestTemplate restTemplate = new RestTemplate();
@KafkaListener(topics = "${crypto.kafka.topics.coin-trades}", groupId = "myGroup1")
public void consume(String coin) {
String url = UriComponentsBuilder.fromHttpUrl(BITHUMB_API_URL)
.queryParam("market", "KRW-" + coin)
.queryParam("count", 1)
.toUriString();
try {
String response = restTemplate.getForObject(url, String.class);
List<Map<String, Object>> tradeData = objectMapper.readValue(response, new TypeReference<>() {});
log.info("[CONSUMER] Trade Data: {}", tradeData);
} catch (Exception e) {
log.error("[CONSUMER] API 요청 실패: {}", e.getMessage());
}
}
}
- Kafka로부터 어떤 코인 정보를 호출할지에 대해 수신한 후, Bithumb API를 호출하여 최신 거래 데이터를 가져오는 역할을 합니다.
@KafkaListener(topics = "...", groupId = "...")crypto.kafka.topics.coin-trades토픽을 구독(consume) 하는 역할groupId = "myGroup1"- 같은 그룹 ID를 가진 Consumer들이 메시지를 분산 처리
- 같은 그룹 내에서는 각 메시지를 한 Consumer만 처리
consume(String coin)- Producer가 전송한 코인 심볼(BTC, ETH, XRP 등)을 매개변수로 받음
- 예: Kafka에서
"BTC"메시지를 받으면,coin = "BTC"
결과

- 초당 Producer가 어떤 코인을 어떤 Topic에 전송했는지 로그로 확인할 수 있다!

- Consumer가 비동기적으로 수신한 코인 데이터를 처리한 후, 해당 코인의 최근 체결 내역을 조회하고 로그로 출력하는 걸 확인할 수 있다.
- 이를 확장해 데이터베이스나 인메모리 저장소에 저장하는 로직을 추가할 수도 있을 것이다.
Kafka 활용 효과
- API 요청량 조절 → 초당 10회로 제한하면서도 최적의 성능 유지
- 병렬 처리로 지연 문제 해결 → 1초당 10개 요청을 병렬로 수행 가능
- 확장성 확보 → 추가 코인 데이터 수집 시 Consumer 수만 늘리면 OK!
- 이 부분이 가장 크게 와닿았다. 확장성이 좋다는 점!!
마무리하며
이번 글에서는 Kafka를 활용해 트래픽을 분산하는 방법을 살펴보았다. Kafka를 도입함으로써 대량의 요청을 효과적으로 분산 처리할 수 있었고, 특정 API의 호출 빈도 제한과 실제 데이터 조회 로직을 분리해 유연하게 운영할 수 있다는 점이 인상적이었다.
특히, 새로운 데이터가 필요할 경우 간단히 메시지를 추가 발행하면 되고, Consumer 측에서는 일정 속도로 처리함으로써 시스템 안정성을 유지할 수 있다는 점이 강점이었다.
Kafka를 실습을 통해 알아보며 안정적인 트래픽 관리란 무엇인지, 0.00001프로 정도 가닥을 잡을 수 있었다. 대용량 데이터 처리란 정말 매력적이다!!!!! 🤓