개요

Machbase 에 입력된 데이터를 Logstash 를 통해 Elasticsearch 에도 데이터를 입력하고자 하는 경우에 대한 연동 방법에 대해 기술합니다.
Machbase는 JDBC 환경을 지원하며,
Logstash 에서 제공하는 여러 Input plugin 중에 Jdbc input plugin 을 통해 연동하고자 합니다.
테스트 환경
Machbase, Logstash, Elasticsearch 는 설치가 되어 있다는 가정 하에 진행하며,
Machbase의 테이블은 일반적으로 많이 사용하시는 TAG 테이블로 진행합니다.
Logstash, Elasticsearch 설치는 아래 링크를 참고하시면 되겠습니다.
항목 | Version |
OS | CentOS 7.7 |
Machbase | Fog 6.1.11 |
Logstash | 7.10.1 |
Elasticsearch | 7.10.1 |
etc | curl 7.29.0 |
테스트 순서
1) Machbase Jar 파일 설정
- machbase.jar 파일 copy 및 권한 변경
sudo cp /home/machbase/machbase_home/lib/machbase.jar /usr/share/logstash/lib sudo chown logstash:logstash /usr/share/logstash/lib/machbase.jar
2) Machbase 테이블 생성
- TAG 테이블 생성
create tagdata table tag (name varchar(20) primary key, time datetime basetime, value double summarized);
3) Logstash jdbc input plugin conf 파일 설정
- /etc/logstash/conf.d/logstash-jdbc-es.conf 파일(생성 필요)
input { jdbc { jdbc_driver_library => "/usr/share/logstash/lib/machbase.jar" jdbc_driver_class => "com.machbase.jdbc.driver" jdbc_connection_string => "jdbc:machbase://localhost:5656/mhdb" jdbc_user => "sys" jdbc_password => "manager" jdbc_paging_enabled => true jdbc_page_size => 100000 tracking_column => "time" tracking_column_type => "timestamp" use_column_value => true schedule => "*/5 * * * * *" statement => "SELECT name, time, value FROM tag where time BETWEEN TO_DATE(:sql_last_value, 'YYYY-MM-DD HH24:MI:SS.mmmuuu') AND NOW" last_run_metadata_path => "/last_run/logstash-jdbc-es" } } filter { ruby { code => "event.set('time', event.get('time').time.localtime('+09:00').strftime('%Y-%m-%d %H:%M:%S.%3N'))" } mutate { add_field => { "pkid" => "%{name}_%{time}" } remove_field => "@version" } } output { # stdout { codec => "rubydebug"} elasticsearch { hosts => ["localhost:9200"] index => "tag" document_id => "%{pkid}" } }
- last_run_metadata_path 설정
sudo mkdir /last_run sudo chown logstash:logstash /last_run
- sql_last_value 초기 값 설정
sql_last_value 의 초기 값은 0 또는 ‘1970-01-01T00:00:00 Z’ 이다
하지만, 위 TO_DATE(:sql_last_value, ‘YYYY-MM-DD HH24:MI:SS.mmmuuu’) 쿼리문에서 machbase의 TO_DATE 는 ‘1970-01-01 09:00:00’ 보다 큰 값일 경우에 정상 처리된다.
하여, /last_run/logstash-jdbc-es 파일에 아래와 같이 입력하도록 한다.--- 1970-01-01 09:00:01.000000000 Z
- logstash 재구동 및 로그 확인
systemctl restart logstash 한 후, tail -f /var/log/logstash/logstash-plain.log 로 실행 로그 확인 또는, /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-jdbc-es.conf 로 바로 확인 가능
4) Machbase 데이터 입력
- 데이터 입력 및 확인원래대로 라면, 대량의 입력 Append 함수를 통해 (Bulk)입력을 해야 하지만 테스트이므로 5개의 데이터만 입력함이 후, Elasticsearch 에서 아래 5개의 데이터가 입력되는지 확인해보겠습니다.
-- 데이터 입력 쿼리 insert into tag values ('EQ1^TAG1', to_date('2020-12-18 12:00:01 111:222:333'), 1); insert into tag values ('EQ1^TAG1', to_date('2020-12-18 13:00:01 222:333:444'), 2); insert into tag values ('EQ2^TAG1', to_date('2020-12-18 14:05:01 333:444:555'), 1); insert into tag values ('EQ2^TAG1', to_date('2020-12-18 15:06:01 444:555:666'), 2); insert into tag values ('EQ2^TAG1', to_date('2020-12-18 17:06:01 555:666:777'), 3); -- 입력된 데이터 조회 결과 Mach> select * from tag; NAME TIME VALUE -------------------------------------------------------------------------------------- EQ1^TAG1 2020-12-18 12:00:01 111:222:333 1 EQ1^TAG1 2020-12-18 13:00:01 222:333:444 2 EQ2^TAG1 2020-12-18 14:05:01 333:444:555 1 EQ2^TAG1 2020-12-18 15:06:01 444:555:666 2 EQ2^TAG1 2020-12-18 17:06:01 555:666:777 3 [5] row(s) selected. Elapsed time: 0.001 Mach>
5) Elasticsearch 로 입력된 데이터 확인
- 모든 데이터 조회5개의 모든 데이터가 입력되었음을 확인할 수 있습니다.
curl -XGET http://localhost:9200/tag/_search?pretty -H 'Content-Type: application/json' -d '{ "query": { "match_all": {} } }' { "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 5, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "tag", "_type" : "_doc", "_id" : "EQ1^TAG1_2020-12-18 12:00:01.111", "_score" : 1.0, "_source" : { "name" : "EQ1^TAG1", "@timestamp" : "2020-12-18T09:18:05.315Z", "time" : "2020-12-18 12:00:01.111", "pkid" : "EQ1^TAG1_2020-12-18 12:00:01.111", "value" : 1.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ1^TAG1_2020-12-18 13:00:01.222", "_score" : 1.0, "_source" : { "name" : "EQ1^TAG1", "@timestamp" : "2020-12-18T09:18:05.316Z", "time" : "2020-12-18 13:00:01.222", "pkid" : "EQ1^TAG1_2020-12-18 13:00:01.222", "value" : 2.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ2^TAG1_2020-12-18 14:05:01.333", "_score" : 1.0, "_source" : { "name" : "EQ2^TAG1", "@timestamp" : "2020-12-18T09:18:05.316Z", "time" : "2020-12-18 14:05:01.333", "pkid" : "EQ2^TAG1_2020-12-18 14:05:01.333", "value" : 1.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ2^TAG1_2020-12-18 15:06:01.444", "_score" : 1.0, "_source" : { "name" : "EQ2^TAG1", "@timestamp" : "2020-12-18T09:18:05.317Z", "time" : "2020-12-18 15:06:01.444", "pkid" : "EQ2^TAG1_2020-12-18 15:06:01.444", "value" : 2.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ2^TAG1_2020-12-18 17:06:01.555", "_score" : 1.0, "_source" : { "name" : "EQ2^TAG1", "@timestamp" : "2020-12-18T09:18:05.317Z", "time" : "2020-12-18 17:06:01.555", "pkid" : "EQ2^TAG1_2020-12-18 17:06:01.555", "value" : 3.0 } } ] } }
아쉬웠던 점
- UNIX_TIMESTAMP() 함수 호환 문제
- 날짜범위를 지정하기 위해 일반적으로 jdbc conf 설정 파일에서 아래와 같이 tracking_column_type 과 query statement 를 사용한다.
- 하지만, UNIX_TIMESTAMP(time) as unix_ts_in_secs 로 tracking_column 을 지정하면 오류가 발생한다. (machsql 상에서는 정상 쿼리 동작한다.)
-- 쿼리 실행 오류 발생(unknown SQL type : 108) input { jdbc { ... ... tracking_column => "unix_ts_in_secs" tracking_column_type => "numeric" ... statement => "SELECT name, value, UNIX_TIMESTAMP(time) as unix_ts_in_secs FROM tag where UNIX_TIMESTAMP(time) > :sql_last_value AND time < NOW ORDER BY time" ... } }
- 날짜 microseconds / nanoseconds 단위 표현 문제
- 이건 제가 잘 모르는 것일 수도 있으나, milliseconds 단위까지만 표현된다.
- 실제 아래 statement 가 실행되면 날짜 표현은 microseconds까지 되어있으나, 실제로는 milliseconds 까지만 인식되고 microseconds 는 000 처리된다.
-- logstash jdbc input plugin conf 파일의 statement 해당 부분 statement => "SELECT name, time, value FROM tag where time BETWEEN TO_DATE(:sql_last_value, 'YYYY-MM-DD HH24:MI:SS.mmmuuu') AND NOW" -- statement 실행 결과 로그 [INFO ] 2020-12-18 08:50:45.035 [Ruby-0-Thread-17: :1] jdbc - (0.000260s) SELECT * FROM (SELECT name, time, value FROM tag where time BETWEEN TO_DATE('2020-12-18 08:06:01.555000', 'YYYY-MM-DD HH24:MI:SS.mmmuuu') AND NOW) AS "T1" LIMIT 100000 OFFSET 0
- 2020-12-18 08:06:01.555000(UTC) 는 sql_last_value 값으로 최근 읽어온 마지막 time 값이다.(실제 입력도 milliseconds 까지만 입력되었다.)
- 2020-12-18 08:06:01.555000(UTC) 의 실제 machbase에 입력된 time 값은 2020-12-18 17:06:01 555:666:777 로 2020-12-18 17:06:01 555:666 까지는 읽어 왔어야 한다.
- 하지만, 2020-12-18 17:06:01 555 까지만 읽어온 셈이다. 결국, milliseconds 까지만 인식된다.
맺음말
- Logstash 의 jdbc input plugin 은 주로 RDBMS(mysql, mariaDB) 에 대해 사용되나, Machbase 에서 제공하는 jdbc 를 통해서도 연동이 가능함을 확인하였습니다.
- 연동 테스트를 진행하며, 몇 가지 아쉬웠던 점이 있었으나 사용함에 있어서는 크게 문제는 없을 것으로 보이며,
- 다시 한번, 저희 Machbase 의 다양한 SDK 지원으로 마크베이스의 유연성을 확인할 수 있었습니다.
참고 자료
- ELK 설치 관련 :
- Logstash JDBC 관련 :
아래 링크를 통해 세계 1위 마크베이스 TSDB 무료 다운로드 받을 수 있습니다.
— MACHBASE (@machbase) August 4, 2021
조회수
436