深夜の怠惰な技術ブログ

日々の技術的な話題などを徒然なるままに

TerraformによるKinesis Data Firehoseの動的パーティショニングの設定

はじめに

Kinesis Data Firehoseは、Amazon Simple Storage Service(Amazon S3)、Amazon Redshiftなどやサードパーティサービスプロバイダが所有するHTTPエンドポイントなどの送信先へストリーミングデータのリアルタイム配信を実現するマネージドサービスです。Kinesis Data Firehoseのストリーミングデータ受信元としてAWSサービスでは、Kinesis Data StreamsやKinesis Data Analyticsなどがあります。 今回は、このKinesis Data Firehoseの動的パーティショニングをTerraformで設定する方法を記載します。 動的パーティショニングを使用すると、データ内のキーを利用してKinesis Data FIrehoseでストリーミングデータを継続的にパーティショニングし、これらのキーでグループ化されたデータを、対応するS3プレフィックスに配信できます。

Terraformコード

以下はKinesis Data Firehoseの動的パーティショニングを設定するTerraformのサンプルコードです。 ここで注意するのは、動的パーティショニングはKinesis Data Firehoseの初回作成時のみ設定可能ということです。 また、バッファサイズの設定は少なくとも64MBでないとならない点もあります。

resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = "<kinesis_firehose_delivery_stream_name>"
  destination = "extended_s3"

  extended_s3_configuration {
    bucket_arn          = "<destination_S3_bucket_arn>"
    role_arn            = "<iam_role_arn>"
    buffer_size         = 64
    buffer_interval     = 60
    prefix              = "!{partitionKeyFromQuery:Id}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/"
    error_output_prefix = "errors/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/!{firehose:error-output-type}/"

    dynamic_partitioning_configuration {
      enabled = true
    }

    processing_configuration {
      enabled = true

      processors {
        type = "AppendDelimiterToRecord"
        parameters {
          parameter_name  = "Delimiter"
          parameter_value = "\\n"
        }
      }

      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{Id:.Id}"
        }
      }
    }
  }
}

動的パーティショニングの設定

動的パーティショニングの設定自体は以下の部分になります。

dynamic_partitioning_configuration {
  enabled = true
}

ただし、今回はS3バケットへのデータ配信のPrefixとして受信したJSONデータ内のIdを利用する場合には、processing_configurationがの設定も必要となります。

さいごに

今回は備忘のためにサンプルコードのみを記載しました。 お役立ちできれば幸いです。