Prestoを使えばKafkaに入っているメッセージにSQLでクエリが投げれるみたいなのでやってみました。
KafkaからHBaseやらNoSQLに入れ直すのはめんどうなのでPrestoを使ってみた感じです。
ちなみにPrestoは複数のデータソースに対して分散SQLクエリを発行するエンジンです。Kafkaは接続出来るデータソースの1つにすぎません。
環境
CentOS7に全部まとめてインストールして動作を確認します。
- CentOS 7
- Java 1.8
- Presto 0.180
- Kafka 0.11.0.0
kafkaインストール
まず、kafkaのインストールから。
root$ yum update -y |
Kafkaのサポートバージョンはpresto 0.180のドキュメントを見るとKafkaは0.8.xでテストしていると書かれています。
Prestoのチケットを見るとkafka 0.10まではサポートしているみたいです。0.11.0は動くかわからないですが、試しに0.11.0を使ってみることにしました。
ダウンロードして起動する。$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
$ tar zxfv kafka_2.11-0.11.0.0.tgz
$ cd kafka_2.11-0.11.0.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
Prestoインストール
Singleで動かす場合も若干設定が必要なのでココを参考に設定します。
ダウンロードします。
$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.180/presto-server-0.180.tar.gz |
最小構成の設定をコピペします。
$ cd presto-server-0.180 |
VMに割り当てているメモリが少なかったらしく、jvmに割り当てるメモリを少し減らしました。
起動します。
$ bin/launcher start |
クエリを投げるためにpresto-cliをインストールします。
$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.180/presto-cli-0.180-executable.jar |
Kafkaにデータを入れる
Kafka Connector Tutorialではtpchのデータを使うのでダウンロードしてデータをロードします。
$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh |
いくつかERRORが出ますが、データが格納できているようなので大丈夫そうです。
念のため確認したところちゃんとロードできているようです。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list |
kafkaにクエリを投げてみる
tpchのテーブルが読み込めているはずです。
$ ./presto --catalog kafka --schema tpch |
customerの定義を見ます。
presto:tpch> DESCRIBE customer; |
countしてみます。presto:tpch> SELECT count(*) FROM customer;
_col0
-------
1500
(1 row)
Query 20170714_182346_00004_9aq43, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:01 [1.5K rows, 411KB] [1.28K rows/s, 352KB/s]
selectします。presto:tpch> SELECT _message FROM customer LIMIT 5;
--------------------------------------------------------------------------------
{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeR
{"rowNumber":2,"customerKey":2,"name":"Customer#000000002","address":"XSTf4,NCw
{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2W
{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAG
{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCpl
(5 rows)
Query 20170714_182355_00005_9aq43, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:04 [232 rows, 63.1KB] [58 rows/s, 15.9KB/s]
JSONが返ってきているので、これをテーブルとして取り込むためににはPresto側でSchemeの定義が必要みたいです。
Schemeを設定します。$ mkdir etc/kafka
$ cat etc/kafka/tpch.customer.json
{
"tableName": "customer",
"schemaName": "tpch",
"topicName": "tpch.customer",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "row_number",
"mapping": "rowNumber",
"type": "BIGINT"
},
{
"name": "customer_key",
"mapping": "customerKey",
"type": "BIGINT"
},
{
"name": "name",
"mapping": "name",
"type": "VARCHAR"
},
{
"name": "address",
"mapping": "address",
"type": "VARCHAR"
},
{
"name": "nation_key",
"mapping": "nationKey",
"type": "BIGINT"
},
{
"name": "phone",
"mapping": "phone",
"type": "VARCHAR"
},
{
"name": "account_balance",
"mapping": "accountBalance",
"type": "DOUBLE"
},
{
"name": "market_segment",
"mapping": "marketSegment",
"type": "VARCHAR"
},
{
"name": "comment",
"mapping": "comment",
"type": "VARCHAR"
}
]
}
}
Scheme定義することでJSONだった部分がテーブルとして読み込むことができます。
$ bin/launcher restart |
Schemeの定義が必要なのは面倒ですが、いいかんじですね。
おわり。