5 Treasure Data からのデータ取得

ここからはTreasure Data (TD) コンソールではなく, pytdtdclient を利用して Python 上で操作する前提で説明します. よって, データを機械学習モデルに使えるように整形する処理は主に Python を使い, TD (HiveQL) クエリの操作は最低限としています. しかし, データの量によっては, 分散処理の可能なTDでの対処が必要な場合もあります.

pytdtdclinent の違いは, 前者が後者に依存しており, 後者はより低レイヤの制御を行っているということです. つまり単にクエリで集計するだけならば pytd の方が簡単に書けますが, 複雑なオプションやデータベースの操作をしたい場合は tdclient を使うことになります. 今回のインターンでは, (Python で主要な処理をする限り) ほとんどの操作は pytd で可能です.

import yaml
from pathlib import Path
import pytd
import tdclient
import pandas as pd
with Path().home().joinpath('conf/conf.yml').open('r') as f:
  doc = yaml.load(f, Loader=yaml.SafeLoader)
  tdcl = pytd.Client(database=doc['td']['db'], default_engine='hive',
                     apikey=doc['td']['apiKey'])
  td_client = tdclient.Client(doc['td']['apiKey'])
  del doc

Pythonに直接インポートすることもできますが, 万が一クラッシュした際のリカバリにかかる時間が惜しいため, 作業用DBにテーブルを保存します. 今回はインターン用に work_<名前> という作業用DBを用意しており, 準備編のとおりに設定しているのなら tdcl.database にテーブル名が代入されているはずです. 先ほどのクエリをもう一度 pytd を使って実行し, 結果をテーブルとして保存します. クライアントオブジェクトの設定では hive をデフォルトのエンジンに指定したため, presto を使用したい場合は明示的に指定する必要があります.

q = """
INSERT OVERWRITE TABLE {}.test_table
SELECT TD_TIME_FORMAT(MAX(time), 'yyyy-MM-dd HH:mm:ss', 'JST') AS max_dt,
TD_TIME_FORMAT(MIN(time), 'yyyy-MM-dd HH:mm:ss', 'JST') AS min_dt,
TD_TIME_FORMAT(0, 'yyyy-MM-dd HH:mm:ss', 'JST') AS unix_zero
FROM <HOGEHOGEDB>.table
WHERE TD_TIME_RANGE(time, '2020-06-01', '2020-06-02', 'JST')
""".format(tdcl.database)

_ = tdcl.query(
  "DROP TABLE IF EXISTS {}.test_table".format(tdcl.database), engine='presto')
_ = td_client.create_log_table(tdcl.database, 'test_table')
res = tdcl.query(q, engine_version='stable')
res

この場合, tdcl.query() が返すのはテーブルではなくクエリジョブのステータスです. Presto でテーブルを作成する場合は構文が少し異なります.

q = """
CREATE TABLE {}.test_table AS
SELECT TD_TIME_FORMAT(MAX(time), 'yyyy-MM-dd HH:mm:ss', 'JST') AS max_dt,
TD_TIME_FORMAT(MIN(time), 'yyyy-MM-dd HH:mm:ss', 'JST') AS min_dt,
TD_TIME_FORMAT(0, 'yyyy-MM-dd HH:mm:ss', 'JST') AS unix_zero
FROM <HOGEHOGEDB>.table
WHERE TD_TIME_RANGE(time, '2020-06-01', '2020-06-02', 'JST')
""".format(tdcl.database)

tdcl.query("DROP TABLE IF EXISTS {}.test_table".format(tdcl.database),
           engine='presto')
_ = tdcl.query(q, engine='presto')
pd.DataFrame(
  **tdcl.query('SELECT * FROM {}.test_table'.format(tdcl.database),
               engine='presto'))

実際に使用するデータを取得する例です. 30日程度のログを一度に取得する場合, Presto ではメモリが足りないので Hive を使用します. 今回は, 5-6月のログを使ってください.

このコードは公開版ではご利用になれません

pytd の仕様ではテーブルのメタデータを作ることができないため, クエリ実行前に tdclient を使う必要があります. それが

tdcl.query("DROP TABLE IF EXISTS {}.data_temp".format(tdcl.database),
           engine='presto')
td_client.create_log_table(tdcl.database, 'data_temp')

の部分で, (1) すでに同名のテーブルがある場合は削除, (2) テーブルのメタデータを新規作成, という処理をしています.

ただし, 今回は処理時間待ちをなくすため, こちらでこのクエリの結果を作業用DBに保存しており, 実行は不要です.