みなさんこんにちは。エンジニアの佐藤です。今回はIoTの話題をご紹介させていただきたいと思います。
重複データの蠢き
先日、とあるIoTシステムのログデータをETL(Extract, transform, load)処理によりDWHに収めました。このシステムは運用暦約1年で、利用量は右肩上がりで上昇してきました。蓄積データの活用が企画され、数億件のデータがDWHに集積されました。多数のデバイスが毎日送信して積み上げてきたものです。
ですが、およそコンピュータと名の付く物、何事も疑いの目で見なければなりません。筆者がまず気になったのは、「まともなデータなのか」という点です。数億レコードともなると、もはや個別確認はできません。どんなゴミが混入しているのかわかったものではありません。
「まともなデータ」の基準はいろいろあると思いますが、筆者が一番重要だと思うのは、一意性です。RDBMSで言えば「テーブルのキー項目は何か」ということになります。DWHにユニーク制約はありませんから、スキーマが一意性を担保することはありません。まずこの点から確認する必要があります。
このシステムの場合、送信データにはデバイスのID、対象事象発生時のデバイス時刻、シーケンス番号が含まれています。シーケンス番号は電源OFFでリセットされますから、一意ではありませんが、デバイスのIDは重複がないことが保証されています。時刻は外部の信頼できる時計に自動補正ですから、一旦正確だと前提しましょう。詳細は省きますが、 デバイスID、事象発生時のデバイス時刻、ログ到着時刻の全てが重複することは、仕様上無いはずです。 DWHにはBigQueryを使っていますので、以下のようにクエリしてこの点を確認してみました。
#standardSQL SELECT xxxxx AS dt_log, xxxxx AS device_id, xxxxx AS device_time, COUNT(*) count FROM table02 WHERE _PARTITIONTIME = TIMESTAMP('2018-06-10') GROUP BY dt_log, device_id, device_time HAVING count > 1 ※ 一部は伏字または仮の値にしています。 +-------------------------+-----------+---------------------+-------+ | dt_log | device_id | device_time | count | +-------------------------+-----------+---------------------+-------+ | 2018-06-10 00:00:00.045 | 4641 | 2018-06-09 23:59:56 | 2 | | 2018-06-10 00:00:00.572 | 9458 | 2018-06-09 23:59:57 | 2 | | 2018-06-10 00:00:00.014 | 9499 | 2018-06-09 23:59:57 | 2 | | 2018-06-10 00:00:00.235 | 1686 | 2018-06-09 23:59:58 | 2 | | 2018-06-10 00:00:00.195 | 1712 | 2018-06-09 23:59:58 | 2 | +-------------------------+-----------+---------------------+-------+
それ見たことか。ダブりデータです。 日付が変わった直後から約1秒間の間で発生しているように見えます。念には念を入れて受信した生データも比較してみましたが、完全に重複しています。 これはいわばデータの「バグ」。原因を特定せねばなりません。
何と何がダブっているのか
同じデータが2回DWHに登録された理由は何か?この原因追求のために役立つのが、私が「経路情報」と呼んでいる、レコードの出自に関する記録です。ETL処理の節目ごとにこの経路情報を追記していき、全てのレコードについて元ネタが特定できるように仕掛けておくのです。
今回のダブりデータについて経路情報(log_file_pos)を表示すると、以下のようになりました。簡単に解説しますと、「2018-06-09/1/343159(00717206)/00」は、2018年06月09日のログファイルに記録された、受信ホスト1番が出力したログファイルの第343159番目のデータで、ファイルの第717206行目に記録されている、という意味です。(末尾の「/00」は説明を省略します。)
+-------------------------+-----------+---------------------+----------------------------------+ | dt_log | device_id | device_time | log_file_pos | +-------------------------+-----------+---------------------+----------------------------------+ | 2018-06-10 00:00:00.235 | 1686 | 2018-06-09 23:59:58 | 2018-06-09/1/343159(00717206)/00 | | 2018-06-10 00:00:00.235 | 1686 | 2018-06-09 23:59:58 | 2018-06-10/1/003(00000006)/00 | | 2018-06-10 00:00:00.195 | 1712 | 2018-06-09 23:59:58 | 2018-06-09/1/343158(00717205)/00 | | 2018-06-10 00:00:00.195 | 1712 | 2018-06-09 23:59:58 | 2018-06-10/1/002(00000005)/00 | | 2018-06-10 00:00:00.045 | 4641 | 2018-06-09 23:59:56 | 2018-06-09/1/343157(00717202)/00 | | 2018-06-10 00:00:00.045 | 4641 | 2018-06-09 23:59:56 | 2018-06-10/1/001(00000002)/00 | | 2018-06-10 00:00:00.572 | 9458 | 2018-06-09 23:59:57 | 2018-06-09/1/343160(00717207)/00 | | 2018-06-10 00:00:00.572 | 9458 | 2018-06-09 23:59:57 | 2018-06-10/1/004(00000007)/00 | | 2018-06-10 00:00:00.014 | 9499 | 2018-06-09 23:59:57 | 2018-06-09/1/343156(00717201)/00 | | 2018-06-10 00:00:00.014 | 9499 | 2018-06-09 23:59:57 | 2018-06-10/1/000(00000001)/00 | +-------------------------+-----------+---------------------+----------------------------------+
この経路情報をダブりデータで比較してみると、連続する2つの日付の、ちょうど境界あたりに分布していることがわかります。ここまでくると、ダブりの原因は、日付ごとにログファイルを分ける処理にある可能性が高いと考えるのが妥当でしょう。
原因は AWS API CloudWatch Logs CreateExportTask の仕様誤認+だった
今回ETL処理のインプットとなったログファイルは、AWS CloudWatch Logsに出力されたログを定期的にエクスポートしてS3へ保存させたものです。このエクスポート処理の設定は定時ジョブで実行しており、以下のようになっています。
client = boto3.client('logs') response = client.create_export_task( logGroupName = log_group_name, fromTime = from_ts * 1000, to = to_ts * 1000, destination = s3_bucket_name, destinationPrefix = s3_prefix )
from_ts と to_ts はちょうど24時間(=86400秒)離れています。一見良さそうですが、この種の境界設定については、常に「inclusive(指定値を含む)」なのか「exclusive(指定値は含まない)」かを注意する必要があります。以下のURLからAPIの仕様を確認してみましょう。
https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateExportTask.html
Request Parameters
- from
- The start time of the range for the request, expressed as the number of milliseconds after Jan 1, 1970 00:00:00 UTC. Events with a time stamp earlier than this time are not exported.
- to
- The end time of the range for the request, expressed as the number of milliseconds after Jan 1, 1970 00:00:00 UTC. Events with a time stamp later than this time are not exported.
否定形で書かれていますが、本質的には inclusiveです。 つまり、ちょうど24時間分(=86400000ミリ秒)離れた2つの値をtoとfromに指定した場合、1ミリ秒分はダブる ということです。この点については、不注意だったと言えます。
原因がわかりました!と言いたいところですが、あれ?何かおかしいですよね。ダブりデータは「2018-06-10 00:00:00.045」から「2018-06-10 00:00:00.572」に分布していたのです。1ミリ秒ではなく、1秒ですね。結局AWSのこのAPI、ミリ秒指定と言っておきながら、ミリ秒部分は評価していないようです。
上記のAPIリファレンスの文章に当てはめてみましょう。2018-06-09のログファイルについて、
from: Events with a time stamp earlier than 2018-06-09 00:00:00.000 are not included.
to: Events with a time stamp later than 2018-06-10 00:00:00.000 are not included.
となっているわけですが、実際には 2018-06-10 00:00:00.000 から、少なくとも2018-06-10 00:00:00.572 までのログが、入ってしまっているわけです。 ミリ秒単位で指定させておきながら、これはひどいよな、と思います。
ExclusiveにすればOK、なのか?
AWSのAPI仕様はわかりました。では、以下のようにエクスポート設定コードを改めればOKでしょうか?
client = boto3.client('logs') response = client.create_export_task( logGroupName = log_group_name, fromTime = from_ts * 1000, to = (to_ts * 1000) - 1, destination = s3_bucket_name, destinationPrefix = s3_prefix )
つまり、2018-06-09のログファイルについては以下のような指定になるわけです。
from: Events with a time stamp earlier than 2018-06-09 00:00:00.000 are not included.
to: Events with a time stamp later than 2018-06-09 23:59:59.999 are not included.
筆者は、これは危険な設定だと思います。今度はエクスポートの欠損が起こる可能性があるからです。 例えば、CloudWatch Logsにログが 2018-06-09 23:59:59.9995 に到着したら、取りこぼされてしまいます。
では、本当に23:59:59.9995などというタイミングでログが到着したという事象が発生するのでしょうか?
以下のCloudWatch Logsログ取得APIの仕様によると、
https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html
https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_OutputLogEvent.html
ログの収録時刻(ingestionTime)は「ミリ秒単位の整数」となっていますので、23:59:59.9995は、23:59:59.999か、翌日00:00:00.000に丸められるでしょうから、現時点では大丈夫そうではあります。
しかし現在巷で使用されているハードウェアクロックの時間分解能は1ナノ秒となっており、AWS CloudWatch Logsがミリ秒よりさらに細かい到着時刻記録をサポートした場合、取りこぼしの心配が生じる可能性があります。(あくまで仕様上の話ですが。。。)
筆者の結論としては、このエクスポート設定コードは、このままにしておくことにしました。 データの受信記録が永久的に失われるというのは非常に恐ろしい出来事ですので、潜在的な危険はできるだけ排除したい。それなら多少の手間をかけ、後から重複を除去した方がマシ、だと思ったのです。
これで終わり、ではなかった!
これでダブりデータの原因は特定されました。安心して眠れそうです。。。しかし、本当にそうなのでしょうか…?
疑い深い筆者は、今度は「00:00:00.000 ~ 00:00:00.999 以外の時間帯のダブりデータ」を探してみました。そして。。。見つけてしまったのです。
+-------------------------+-----------+---------------------+---------------------------------+ | dt_log | device_id | device_time | log_file_pos | +-------------------------+-----------+---------------------+---------------------------------+ | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:36:54 | 2018-06-12/2/65245(00158574)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:37:00 | 2018-06-12/2/65258(00158587)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:37:00 | 2018-06-12/2/65244(00158573)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:37:03 | 2018-06-12/2/65246(00158575)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:37:03 | 2018-06-12/2/65250(00158579)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:36:54 | 2018-06-12/2/65248(00158577)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:36:57 | 2018-06-12/2/65257(00158586)/00 | | 2018-06-12 14:37:31.449 | 4896 | 2018-06-12 14:36:57 | 2018-06-12/2/65247(00158576)/00 | | 2018-06-13 00:44:05.565 | 2164 | 2018-06-13 00:43:57 | 2018-06-13/2/61588(00078056)/00 | | 2018-06-13 00:44:05.565 | 2164 | 2018-06-13 00:43:57 | 2018-06-13/2/61587(00078055)/00 | +-------------------------+-----------+---------------------+---------------------------------+
このダブりはこれまで特定した原因だけでは説明できません。一体何が起こったのか、筆者は元ログを調査しました。その結果、同時刻に同一データの到着が2回記録されていることを発見してしまいました。
158575 2018-06-12 23:37:31.449 : trackingProc len:36 data:4dxxxx... 158579 2018-06-12 23:37:31.449 : trackingProc len:36 data:4dxxxx... ※ ここでは時刻はデバイスからのデータを受信するインスタンスのシステム時刻で、JSTで書かれている。
そんなばかなと思ってログファイルのエクスポート元のAWS CloudWatch Logsもクエリしてみましたが、上記の2行のログは、全く同時刻に到着しています。
$ aws logs get-log-events --cli-input-json "`cat query_aws_logs.json`" --profile=xxxx | jq -r '.events[]|select(.message | contains("4dxxxx..."))|.ingestionTime' 1528814254774 1528814254774
次はネットワーク通信ログを、と言いたいところですが、残念ながらここまでは記録していませんでした。どこかで想定外の重複動作があったのでしょうが、真実はやぶの中です。原因が特定できないので対策方針が立つわけもなく、データクレンジングで対処するしかありません。今回の場合は同一デバイス・同一時刻に発生する物理事象はひとつに決まっていますから、ダブりデータのうちひとつだけを採用すれば良いでしょう。
この類の現象にどんな姿勢で臨めば良いのか
ここまでくると、「もう何を信じて良いやら」という気持ちになってきました。結局ログの記録、転送のそれぞれの段階で、ぴったり完璧とは行かない例外事象が起こってしまっているのです。そしてマニュアルも厳密でなく、オーナーシップがあるわけでもないので仕様は変わる。こういう状態でデータ分析をやるときは、いかなるポリシーを定めて事に取りかかればいいのでしょうか?
筆者の結論は以下のようなものです。
- まず原理的に、分散環境でデータ転送する時は、「消えてもいいから最大1回(at most once)」「消えないがダブる可能性あり(at least once)」の2択。
- そしてデータが途中で消えるのは困るので、ダブりを許容するしかない。
- ダブりは、最後に洗い落とすしかない。
プログラムコードのデバッグにも似た対策措置が、データに対しても必要なようです。
最後までお読みいただき、ありがとうございました。