Kafka로 비트코인 데이터 수집 파이프라인을 구축해보자

2025-04-02

소스코드는 깃허브를 통해 자세히 확인 가능합니다!

What?

  • Kafka를 이론적으로 학습하는 것만으로는 이해하기 어려웠습니다.

    → 직접 코드를 구현하며 Kafka의 동작을 익히고자 합니다!

  • 관심 있는 암호화폐 거래 시스템을 주제로 선정했습니다.

    • 빗썸 거래소의 비트코인 최근 체결 내역을 수집하는 프로젝트를 진행해볼게요!

How?


주요 개념

개념설명
Topic메시지를 전달하는 채널 역할. 여기서는 btc-trades라는 Topic을 사용할 예정
Producer데이터를 Kafka에 발행하는 역할
API 요청을 메시지로 만들어 Kafka에 저장
ConsumerKafka에서 메시지를 읽어 API를 호출하고 데이터를 처리하는 역할
PartitionTopic을 여러 개의 파티션으로 나눠 병렬 처리할 수 있도록 함

고려할 점

✅ 총 10개의 코인 데이터를 1초에 1개씩 수집하려고 한다.

  • 기존 방식: 단일 Consumer초당 1개 메시지 처리10개 요청 처리에 10초 소요
  • 해결책: Kafka의 병렬 Consumer 활용
    • 10개 파티션 생성 → 각 코인의 API 요청을 각각의 파티션에 저장
    • 10개 Consumer 병렬 실행각 Consumer가 1초에 하나씩만 처리

Kafka를 활용한 데이터 흐름

  1. Kafka Producer각 코인의 체결 데이터 조회 요청을 Kafka Topic에 발행
  2. Kafka Topic10개의 파티션으로 분배하여 저장
  3. 10개의 Kafka Consumer가 병렬로 동작하며 1초에 1개씩 API 호출
  4. Consumer가 API에서 체결 데이터를 가져와 저장
  5. 완료된 데이터는 로그로 기록

Topic 설정

@Configuration
public class KafkaTopicConfig {

    @Value("${crypto.kafka.topics.coin-trades}")
    private String topic;

    @Bean
    public NewTopic minuteCandleTopic() {
        return TopicBuilder.name(topic)
                .configs(
                        Map.of(
                                TopicConfig.RETENTION_MS_CONFIG, "3600000" // 1h
                        )
                )
                .partitions(10)
                .replicas(1)
                .build();
    }
}
  • 토픽의 보관 기간의 기본값은 7일이지만, 이번 실습은 로그로 찍어보기만 할 것이기 때문에 굳이 7일까지 가지고있을 필요가 없다. 따라서 토픽 보관 기간을 1시간으로 설정했다.
  • 위에 설명한대로 파티션은 10개로 나누도록 설정했다.

Producer 설정

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(properties);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.springframework.kafka.support.serializer.JsonSerializer");

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

Kafka 브로커와의 연결을 설정하고, 메시지를 Kafka 토픽으로 발행할 수 있도록 설정했다.

  1. bootstrapServers 변수
    • Kafka 브로커들이 실행 중인 서버 주소를 나타냄
  2. kafkaAdmin Bean
    • Kafka 클러스터와의 연결을 관리하는 KafkaAdmin 객체를 생성
    • Kafka 클러스터의 메타데이터를 관리하고, 토픽의 생성 및 삭제 같은 관리 작업을 처리하는 데 사용
    • 클러스터와의 연결을 설정하는 데 필요한 bootstrapServers 정보를 properties 맵에 담아 KafkaAdmin 객체를 생성하고 반환
  3. producerFactory Bean
    • Kafka Producer가 어떻게 메시지를 전송할지에 대한 세부 설정을 제공
    • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
      • Kafka 서버 주소를 설정
    • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
      • 메시지의 Key를 직렬화할 방식을 설정
      • StringSerializer를 사용하여 String 타입을 직렬화
    • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
      • 메시지의 Value를 직렬화할 방식을 설정
      • JsonSerializer를 사용하여 객체를 JSON 형식으로 직렬화
  4. kafkaTemplate Bean
    • Kafka Producer를 이용해 메시지를 전송할 수 있도록 도와주는 KafkaTemplate 객체를 생성

Consumer 설정

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory);
        containerFactory.setConcurrency(10);

        containerFactory.getContainerProperties().setIdleBetweenPolls(1000);
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return containerFactory;
    }
}
  1. ConsumerFactory 설정 (consumerFactory)
    • Kafka에서 메시지를 가져오는 Consumer 설정을 정의
      • BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커 주소를 설정 (1개의 브로커 사용)
      • KEY_DESERIALIZER_CLASS_CONFIG: 메시지 Key를 문자열(String)로 변환
      • VALUE_DESERIALIZER_CLASS_CONFIG: 메시지 Value를 JSON 형태로 변환
      • AUTO_OFFSET_RESET_CONFIG: 컨슈머가 가장 오래된 메시지부터 읽도록 설정
      • MAX_POLL_RECORDS_CONFIG: 한 번의 poll() 요청에서 최대 10개의 메시지를 가져옴
  2. Kafka 리스너 컨테이너 팩토리 설정 (kafkaListenerContainerFactory)
    • Kafka 메시지를 처리하는 컨슈머 리스너를 설정
      • setConsumerFactory(consumerFactory): 위에서 설정한 ConsumerFactory를 적용
      • setConcurrency(10): 10개의 Consumer가 동시에 실행되도록 설정 → 병렬 처리 지원
      • setIdleBetweenPolls(1000): 각 poll() 요청 간 1초 대기 → 부하 방지
      • setAckMode(AckMode.MANUAL_IMMEDIATE): 메시지를 처리한 즉시 ACK → 빠르고 안정적인 처리 가능

Producer 서비스 로직

@Service
@Slf4j
public class KafkaProducerService {

    @Value("${crypto.kafka.topics.coin-trades}")
    private String topic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    private final List<String> coins = List.of("BTC", "ETH", "XRP", "ADA", "DOGE", "SOL", "DOT", "BCH", "ETC", "LINK");

    @Scheduled(fixedRate = 1000) // 1초마다 실행
    public void sendCoinSymbols() {
        for (String coin : coins) {
            CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, coin);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    log.info("[PRODUCER] Sent message: {}", coin);
                } else {
                    log.error("[PRODUCER] Failed to send message: {} due to {}", coin, ex.getMessage());
                }
            });
        }
    }
}
  • 1초마다 10개의 코인을 차례로 Kafka Topic에 전송하는 Scheduler을 만들었다.

Consumer 서비스 로직

@Service
@Slf4j
public class KafkaConsumerService {
    @Autowired
    private ObjectMapper objectMapper;

    @Value("${crypto.kafka.api}")
    private String BITHUMB_API_URL;

    private final RestTemplate restTemplate = new RestTemplate();

    @KafkaListener(topics = "${crypto.kafka.topics.coin-trades}", groupId = "myGroup1")
    public void consume(String coin) {
        String url = UriComponentsBuilder.fromHttpUrl(BITHUMB_API_URL)
                .queryParam("market", "KRW-" + coin)
                .queryParam("count", 1)
                .toUriString();

        try {
            String response = restTemplate.getForObject(url, String.class);
            List<Map<String, Object>> tradeData = objectMapper.readValue(response, new TypeReference<>() {});

            log.info("[CONSUMER] Trade Data: {}", tradeData);
        } catch (Exception e) {
            log.error("[CONSUMER] API 요청 실패: {}", e.getMessage());
        }
    }
}
  • Kafka로부터 어떤 코인 정보를 호출할지에 대해 수신한 후, Bithumb API를 호출하여 최신 거래 데이터를 가져오는 역할을 합니다.
  • @KafkaListener(topics = "...", groupId = "...")
    • crypto.kafka.topics.coin-trades 토픽을 구독(consume) 하는 역할
    • groupId = "myGroup1"
      • 같은 그룹 ID를 가진 Consumer들이 메시지를 분산 처리
      • 같은 그룹 내에서는 각 메시지를 한 Consumer만 처리
  • consume(String coin)
    • Producer가 전송한 코인 심볼(BTC, ETH, XRP 등)을 매개변수로 받음
    • 예: Kafka에서 "BTC" 메시지를 받으면, coin = "BTC"

결과

%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA_2025-04-03_%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE_12.42.08.png

  • 초당 Producer가 어떤 코인을 어떤 Topic에 전송했는지 로그로 확인할 수 있다!

%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA_2025-04-03_%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE_12.41.24.png

  • Consumer가 비동기적으로 수신한 코인 데이터를 처리한 후, 해당 코인의 최근 체결 내역을 조회하고 로그로 출력하는 걸 확인할 수 있다.
  • 이를 확장해 데이터베이스나 인메모리 저장소에 저장하는 로직을 추가할 수도 있을 것이다.

Kafka 활용 효과

  • API 요청량 조절초당 10회로 제한하면서도 최적의 성능 유지
  • 병렬 처리로 지연 문제 해결1초당 10개 요청을 병렬로 수행 가능
  • 확장성 확보추가 코인 데이터 수집 시 Consumer 수만 늘리면 OK!
    • 이 부분이 가장 크게 와닿았다. 확장성이 좋다는 점!!

마무리하며

이번 글에서는 Kafka를 활용해 트래픽을 분산하는 방법을 살펴보았다. Kafka를 도입함으로써 대량의 요청을 효과적으로 분산 처리할 수 있었고, 특정 API의 호출 빈도 제한과 실제 데이터 조회 로직을 분리해 유연하게 운영할 수 있다는 점이 인상적이었다.

특히, 새로운 데이터가 필요할 경우 간단히 메시지를 추가 발행하면 되고, Consumer 측에서는 일정 속도로 처리함으로써 시스템 안정성을 유지할 수 있다는 점이 강점이었다.

Kafka를 실습을 통해 알아보며 안정적인 트래픽 관리란 무엇인지, 0.00001프로 정도 가닥을 잡을 수 있었다. 대용량 데이터 처리란 정말 매력적이다!!!!! 🤓


참고