Posting

Machbase의 최신 소식을 지금 만나보세요

[MACHBASE 연동] Logstash – Elasticsearch 연동 방법

개요

image2020-12-21 11_53_17.png

Machbase 에 입력된 데이터를 Logstash 를 통해 Elasticsearch 에도 데이터를 입력하고자 하는 경우에 대한 연동 방법에 대해 기술합니다.

Machbase는 JDBC 환경을 지원하며,

Logstash 에서 제공하는 여러 Input plugin 중에 Jdbc input plugin 을 통해 연동하고자 합니다.

테스트 환경

Machbase, Logstash, Elasticsearch 는 설치가 되어 있다는 가정 하에 진행하며,

Machbase의 테이블은 일반적으로 많이 사용하시는 TAG 테이블로 진행합니다.

Logstash, Elasticsearch 설치는 아래 링크를 참고하시면 되겠습니다.

항목Version
OSCentOS 7.7
MachbaseFog 6.1.11
Logstash7.10.1
Elasticsearch7.10.1
etccurl 7.29.0

테스트 순서

1) Machbase Jar 파일 설정

  • machbase.jar 파일 copy 및 권한 변경sudo cp /home/machbase/machbase_home/lib/machbase.jar /usr/share/logstash/lib sudo chown logstash:logstash /usr/share/logstash/lib/machbase.jar

2) Machbase 테이블 생성

  • TAG 테이블 생성create tagdata table tag (name varchar(20) primary key, time datetime basetime, value double summarized);

3) Logstash jdbc input plugin conf 파일 설정

  • /etc/logstash/conf.d/logstash-jdbc-es.conf 파일(생성 필요)input { jdbc { jdbc_driver_library => "/usr/share/logstash/lib/machbase.jar" jdbc_driver_class => "com.machbase.jdbc.driver" jdbc_connection_string => "jdbc:machbase://localhost:5656/mhdb" jdbc_user => "sys" jdbc_password => "manager" jdbc_paging_enabled => true jdbc_page_size => 100000 tracking_column => "time" tracking_column_type => "timestamp" use_column_value => true schedule => "*/5 * * * * *" statement => "SELECT name, time, value FROM tag where time BETWEEN TO_DATE(:sql_last_value, 'YYYY-MM-DD HH24:MI:SS.mmmuuu') AND NOW" last_run_metadata_path => "/last_run/logstash-jdbc-es" } } filter { ruby { code => "event.set('time', event.get('time').time.localtime('+09:00').strftime('%Y-%m-%d %H:%M:%S.%3N'))" } mutate { add_field => { "pkid" => "%{name}_%{time}" } remove_field => "@version" } } output { # stdout { codec => "rubydebug"} elasticsearch { hosts => ["localhost:9200"] index => "tag" document_id => "%{pkid}" } }
  • last_run_metadata_path 설정sudo mkdir /last_run sudo chown logstash:logstash /last_run
  • sql_last_value 초기 값 설정
    sql_last_value 의 초기 값은 0 또는 ‘1970-01-01T00:00:00 Z’ 이다
    하지만,  위 TO_DATE(:sql_last_value, ‘YYYY-MM-DD HH24:MI:SS.mmmuuu’) 쿼리문에서 machbase의 TO_DATE 는 ‘1970-01-01 09:00:00’ 보다 큰 값일 경우에 정상 처리된다. 
    하여, /last_run/logstash-jdbc-es 파일에 아래와 같이 입력하도록 한다.--- 1970-01-01 09:00:01.000000000 Z
  • logstash 재구동 및 로그 확인systemctl restart logstash 한 후, tail -f /var/log/logstash/logstash-plain.log 로 실행 로그 확인 또는, /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-jdbc-es.conf 로 바로 확인 가능

4) Machbase 데이터 입력

  • 데이터 입력 및 확인원래대로 라면, 대량의 입력 Append 함수를 통해 (Bulk)입력을 해야 하지만 테스트이므로 5개의 데이터만 입력함이 후, Elasticsearch 에서 아래 5개의 데이터가 입력되는지 확인해보겠습니다.-- 데이터 입력 쿼리 insert into tag values ('EQ1^TAG1', to_date('2020-12-18 12:00:01 111:222:333'), 1); insert into tag values ('EQ1^TAG1', to_date('2020-12-18 13:00:01 222:333:444'), 2); insert into tag values ('EQ2^TAG1', to_date('2020-12-18 14:05:01 333:444:555'), 1); insert into tag values ('EQ2^TAG1', to_date('2020-12-18 15:06:01 444:555:666'), 2); insert into tag values ('EQ2^TAG1', to_date('2020-12-18 17:06:01 555:666:777'), 3); -- 입력된 데이터 조회 결과 Mach> select * from tag; NAME TIME VALUE -------------------------------------------------------------------------------------- EQ1^TAG1 2020-12-18 12:00:01 111:222:333 1 EQ1^TAG1 2020-12-18 13:00:01 222:333:444 2 EQ2^TAG1 2020-12-18 14:05:01 333:444:555 1 EQ2^TAG1 2020-12-18 15:06:01 444:555:666 2 EQ2^TAG1 2020-12-18 17:06:01 555:666:777 3 [5] row(s) selected. Elapsed time: 0.001 Mach>

5) Elasticsearch 로 입력된 데이터 확인

  • 모든 데이터 조회5개의 모든 데이터가 입력되었음을 확인할 수 있습니다.curl -XGET http://localhost:9200/tag/_search?pretty -H 'Content-Type: application/json' -d '{ "query": { "match_all": {} } }' { "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 5, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "tag", "_type" : "_doc", "_id" : "EQ1^TAG1_2020-12-18 12:00:01.111", "_score" : 1.0, "_source" : { "name" : "EQ1^TAG1", "@timestamp" : "2020-12-18T09:18:05.315Z", "time" : "2020-12-18 12:00:01.111", "pkid" : "EQ1^TAG1_2020-12-18 12:00:01.111", "value" : 1.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ1^TAG1_2020-12-18 13:00:01.222", "_score" : 1.0, "_source" : { "name" : "EQ1^TAG1", "@timestamp" : "2020-12-18T09:18:05.316Z", "time" : "2020-12-18 13:00:01.222", "pkid" : "EQ1^TAG1_2020-12-18 13:00:01.222", "value" : 2.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ2^TAG1_2020-12-18 14:05:01.333", "_score" : 1.0, "_source" : { "name" : "EQ2^TAG1", "@timestamp" : "2020-12-18T09:18:05.316Z", "time" : "2020-12-18 14:05:01.333", "pkid" : "EQ2^TAG1_2020-12-18 14:05:01.333", "value" : 1.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ2^TAG1_2020-12-18 15:06:01.444", "_score" : 1.0, "_source" : { "name" : "EQ2^TAG1", "@timestamp" : "2020-12-18T09:18:05.317Z", "time" : "2020-12-18 15:06:01.444", "pkid" : "EQ2^TAG1_2020-12-18 15:06:01.444", "value" : 2.0 } }, { "_index" : "tag", "_type" : "_doc", "_id" : "EQ2^TAG1_2020-12-18 17:06:01.555", "_score" : 1.0, "_source" : { "name" : "EQ2^TAG1", "@timestamp" : "2020-12-18T09:18:05.317Z", "time" : "2020-12-18 17:06:01.555", "pkid" : "EQ2^TAG1_2020-12-18 17:06:01.555", "value" : 3.0 } } ] } }

아쉬웠던 점

  • UNIX_TIMESTAMP() 함수 호환 문제
    • 날짜범위를 지정하기 위해 일반적으로 jdbc conf 설정 파일에서 아래와 같이 tracking_column_type 과 query statement 를 사용한다.
    • 하지만, UNIX_TIMESTAMP(time) as unix_ts_in_secs 로 tracking_column 을 지정하면 오류가 발생한다. (machsql 상에서는 정상 쿼리 동작한다.) -- 쿼리 실행 오류 발생(unknown SQL type : 108) input { jdbc { ... ... tracking_column => "unix_ts_in_secs" tracking_column_type => "numeric" ... statement => "SELECT name, value, UNIX_TIMESTAMP(time) as unix_ts_in_secs FROM tag where UNIX_TIMESTAMP(time) > :sql_last_value AND time < NOW ORDER BY time" ... } }
  • 날짜 microseconds / nanoseconds 단위 표현 문제
    • 이건 제가 잘 모르는 것일 수도 있으나, milliseconds 단위까지만 표현된다.
    • 실제 아래 statement 가 실행되면 날짜 표현은 microseconds까지 되어있으나, 실제로는 milliseconds 까지만 인식되고 microseconds 는 000 처리된다.-- logstash jdbc input plugin conf 파일의 statement 해당 부분 statement => "SELECT name, time, value FROM tag where time BETWEEN TO_DATE(:sql_last_value, 'YYYY-MM-DD HH24:MI:SS.mmmuuu') AND NOW" -- statement 실행 결과 로그 [INFO ] 2020-12-18 08:50:45.035 [Ruby-0-Thread-17: :1] jdbc - (0.000260s) SELECT * FROM (SELECT name, time, value FROM tag where time BETWEEN TO_DATE('2020-12-18 08:06:01.555000', 'YYYY-MM-DD HH24:MI:SS.mmmuuu') AND NOW) AS "T1" LIMIT 100000 OFFSET 0
    • 2020-12-18 08:06:01.555000(UTC) 는 sql_last_value 값으로 최근 읽어온 마지막 time 값이다.(실제 입력도 milliseconds 까지만 입력되었다.)
    • 2020-12-18 08:06:01.555000(UTC) 의 실제 machbase에 입력된 time 값은 2020-12-18 17:06:01 555:666:777 로 2020-12-18 17:06:01 555:666 까지는 읽어 왔어야 한다.
    • 하지만, 2020-12-18 17:06:01 555 까지만 읽어온 셈이다. 결국, milliseconds 까지만 인식된다.

맺음말

  • Logstash 의 jdbc input plugin 은 주로 RDBMS(mysql, mariaDB) 에 대해 사용되나, Machbase 에서 제공하는 jdbc 를 통해서도 연동이 가능함을 확인하였습니다.
  • 연동 테스트를 진행하며, 몇 가지 아쉬웠던 점이 있었으나 사용함에 있어서는  크게 문제는 없을 것으로 보이며, 
  • 다시 한번, 저희 Machbase 의 다양한 SDK 지원으로 마크베이스의 유연성을 확인할 수 있었습니다.

참고 자료



아래 링크를 통해 세계 1위 마크베이스 TSDB 무료 다운로드 받을 수 있습니다.

 

연관 포스트

C언어로 Binary data를 Machbase에 넣기

1.개요 Data를 다루다 보면 numeric, varchar 형 데이터뿐만 아니라 JPG, PNG, MP3와 같은 Binary data도 database에 저장해야 할 때가 존재한다. 그러나 일반 data들과는 달리 Binary

[MACHBASE 연동] Android studio에서 JDBC 연결하기

마크베이스 기술지원본부 이현민 1. 개요 수많은 데이터들이 많은 환경에서 생성되고 있는 오늘날, 우리 현대인들의 동반자인 스마트폰 또한 데이터생성의 주체로써 또는 전달자로서 알게 모르게 그 구실을