Posting

Machbase의 최신 소식을 지금 만나보세요

[MACHBASE 연동] Apache Kafka

마크베이스 V5 와 Apache Kafka 연동

이 문서는, Apache Kafka 에서 생성되는 스트림 데이터를 마크베이스 V5 에 데이터를 넣기 위한 과정을 설명한다. 마크베이스는 폭증하는 실시간 데이터 저장과 조회를 목적으로 하기 때문에, 실시간 스트리밍 이벤트 데이터에 대한 수요 역시 존재한다. 대표적인 스트리밍 플랫폼인 Apache Kafka 를 통해 입력받을 수 있는 방법이 없는지 문의하는 고객 또한 많으므로, 이번 기회를 통해 단계별로 사용 방법을 설명해 보고자 한다.

아래 그림은 Apache Kafka 와 그 사이에 있는 Producer / Consumer 프로그램, 그리고 Consumer 가 수신받은 데이터를 입력받을 마크베이스 서버의 그림을 도식화한 것이다. 이 문서에서는, 마크베이스는 이미 설치되어 있다고 가정하고 Apache Kafka 의 설치를 간략하게 언급한 다음, 데모 환경을 위해 이벤트 데이터를 만들어 넣는 방법과 함께 C 언어로 된 Consumer 코드를 작성해 보도록 한다.

Apache Kafka 설치와 테스트

Apache Kafka 설치는 이 링크를 통해 진행할 수 있다. 단계별로 진행할 수 있으므로 쉽게 서비스 구동까지 따라할 수 있다.

  1. Kafka 를 다운로드 받아 압축을 푼다.
  2. 곧바로 서버를 구동한다. (여기서는 Apache Zookeeper 이다)

이후의 단계들은 마크베이스 데모를 위해, 이 문서에서 개별 설명하도록 한다.

토픽 (Topic) 생성

먼저, 토픽 (Topic) 을 생성한다. Kafka 에서의 토픽은, 하나의 이벤트 스트림을 의미한다.

tag 라고 이름 붙인 토픽을 아래와 같이 생성한다.
여기서 Zookeeper 서버는 localhost:2181 이라고 가정한다.

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tag

아래 명령을 통해 토픽 생성이 잘 되었는지 확인할 수 있다.

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

메시지 생성

kafka-console-producer.sh 를 통해, 토픽에 이벤트 데이터 (=메시지) 를 넣어 볼 수 있다. 아무런 메시지나 입력해 보도록 하자.
여기서 Kafka Broker 서버는 localhost:9092 이라고 가정한다.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tag
> This is a message
> This is another message

메시지 수신

kafka-console-consumer.sh 를 통해 메시지 수신이 잘 이뤄지는지부터 확인해 보자.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tag --from-beginning

이렇게 입력하면, 곧바로 Producer Console 에서 입력한 내용 그대로 수신되는 것을 확인할 수 있다.

This is a message
This is another message

마크베이스와 연결

마크베이스 서버는 이미 설치되어 있고 구동 중이라고 가정한다. 마크베이스 서버의 접속 정보는 다음과 같이 기본값이라고 가정한다.

  • HOST : localhost
  • PORT : 5656

예제 프로젝트 준비

Consumer 프로그램 작성을 위해서는 librdkafka 라는 라이브러리가 필요하다.

librdkafka 는 Producer/Consumer 를 모두 지원하는 Apache Kafka protocol 을 구현한 C 라이브러리이다. 메시지 전송 안정성을 지향하면서도 고성능의 라이브러리이므로, Producer 에서는 초당 1백만건의 메시지를, Consumer 에서는 초당 3백만건의 메시지를 전달받을 수 있도록 설계되었다고 한다.

하지만 librdkafka 를 참고해서 샘플 코드를 직접 작성하려면 굉장히 힘들므로, 마크베이스 Github 에 있는 Kafka Example 프로젝트 를 내려받는다.

git checkout https://github.com/MACHBASE/kafka_example
cd kafka_example
make all

make 가 잘 된다면, kafka_to_machbase 바이너리가 생성된다.

마크베이스 : 테이블 준비

먼저, Consumer 에서 입력받을 테이블을 준비한다.

마크베이스 V5 부터는 태그 테이블 (Tagdata Table) 을 지원하므로, 여기에 입력해 보도록 한다. 
해당 쿼리 구문은, repository 에서 query/create.sql 에 위치해 있다.

CREATE TAGDATA TABLE tag (name VARCHAR(20) PRIMARY KEY, time DATETIME BASETIME, value DOUBLE SUMMARIZED);

태그 데이터 생성

repository 디렉토리에 있는 tag/gen.py 를 사용해서 tag.csv 를 만든다.

python gen.py ${COUNT} > /path/to/kafka/install/tag.csv

COUNT 값은, 생성하고자 하는 태그 데이터 개수를 입력한다. (예제에서는 10000 을 입력했다.)
tag.csv 는, Kafka 설치 디렉토리에 위치할 수 있도록 한다.

Consumer 프로그램 구동

Consumer 프로그램을 먼저 구동해야 한다. Repository 디렉토리에 위치한 다음, 다음과 같이 구동한다.

$ ./kafka_to_machbase tag

Producer : CSV 입력

Producer 는 이전 예제와 같이 kafka-console-producer.sh 를 사용한다.
이전 예제와는 달리, 실제 태그 데이터 (=센서 데이터) 가 입력된다고 여길 수 있는 데이터를 입력하는 것이다.

Kafka 설치 디렉토리에 위치한 다음, 아까 생성한 tag.csv 파일을 가지고 다음과 같이 입력한다.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tag < tag.csv

마크베이스 : 입력 확인

다음과 같이 접속해서, 확인해 보자.

SELECT *        FROM tag WHERE name = 'TEST^TAG001' LIMIT 10;
SELECT count(*) FROM tag WHERE name = 'TEST^TAG010';

Mach> SELECT *        FROM tag WHERE name = 'TEST^TAG001' LIMIT 5;
NAME                  TIME                            VALUE
--------------------------------------------------------------------------------------
TEST^TAG001           2019-01-01 00:00:00 000:000:000 94.405
TEST^TAG001           2019-01-01 00:00:00 100:000:000 11.514
TEST^TAG001           2019-01-01 00:00:00 200:000:000 78.049
TEST^TAG001           2019-01-01 00:00:00 300:000:000 79.428
TEST^TAG001           2019-01-01 00:00:00 400:000:000 59.853
[10] row(s) selected.
Elapsed time: 0.002

Mach> SELECT count(*) FROM tag WHERE name = 'TEST^TAG010';
count(*)
-----------------------
100
[1] row(s) selected.

마치며

지금까지 Apache Kafka 에서 생성되는 이벤트 데이터를 Machbase 서버에 입력하는 과정을 단계별로 진행해 보았다. 실시간 데이터 유형 중 하나인 이벤트 데이터를 Pub/Sub 방식으로 제공할 수 있는 플랫폼인 Kafka 에서 초고속 입력을 가능한 방법을 소개하는 것은, 큰 확장성을 부여하는 것이라고 믿는다.

실제로 Kafka 가 스트림 데이터를 일정 기간동안 보관할 수 있고 다른 Consumer 를 통해 데이터 재분배를 할 수 있기 때문에 클러스터 안정성을 올려줄 수 있다면, Machbase 서버는 Kafka 데이터를 지연 없이 빠른 시간에 입력받아 영구적으로 보관할 수 있는 방법을 제공하기 때문에 서로 공존할 수 있는 그림을 충분히 그려볼 수 있을 것이다.

연관 포스트

C언어로 Binary data를 Machbase에 넣기

1.개요 Data를 다루다 보면 numeric, varchar 형 데이터뿐만 아니라 JPG, PNG, MP3와 같은 Binary data도 database에 저장해야 할 때가 존재한다. 그러나 일반 data들과는 달리 Binary

[MACHBASE 연동] Android studio에서 JDBC 연결하기

마크베이스 기술지원본부 이현민 1. 개요 수많은 데이터들이 많은 환경에서 생성되고 있는 오늘날, 우리 현대인들의 동반자인 스마트폰 또한 데이터생성의 주체로써 또는 전달자로서 알게 모르게 그 구실을