
はじめに
こんにちは、アプリケーションチームの川北です。
LANSCOPE エンドポイントマネージャー クラウド版(以下、クラウド版)では、PC 操作ログの受信基盤を AWS 環境に構築しています。ログの取り込み、および ETL 処理は Amazon Kinesis Data Streams と Amazon Managed Service for Apache Flink を活用しています。
ETL 処理では「サーバー受信日時」を扱う処理があります。ログ受信基盤ではサーバー受信日時として、データを ETL 処理で扱う時刻ではなく、ApproximateArrivalTimestamp(Amazon Kinesis Data Streams がレコードを受信した日時)を用いています。これは処理遅延の影響を小さくするためです。例えば ETL 処理がデータ流量に追いついていない場合、データが送信された時刻と ETL 処理でデータを取り扱う時刻に大きな差が出てしまいます。
今回は、ストリーム処理における時刻の考え方と、Amazon Managed Service for Apache Flink で ApproximateArrivalTimestamp を使うための実装方法についてご紹介します。
ストリーム処理と時刻
ストリーム処理は大きく以下の 3 つの要素から構成されます。
- Data Source
- データの発生元(例:WEBサイトのクリックストリームデータ、IoTのセンサーデータ)
- Stream storage
- バッファリング用のストレージ(例:Amazon Kinesis Data Streams)
- Consumer
- データを取り出して処理する部分(例:Amazon Managed Service for Apache Flink)
したがって、ひとえに「時刻」と言ってもいくつかの種類があります。例えば Event Time、Ingestion Time、Processing Time の 3 つが挙げられます。
Event Time はデータの「生成時刻」を指し、Data Source でデータが生成されたタイミングとなります。例えばクリックストリームデータの場合、ページの表示やリンクをクリックしたタイミングです。
Ingestion Time はデータの「取り込み時刻」を指し、Data Source から Stream Storage に取り込まれたタイミングとなります。例えばクリックストリームデータを Amazon Kinesis Data Streams が受信したタイミングです。
Processing Time はデータの「処理時刻」を指し、Consumer が Stream Storage 内のイベントを取り出すタイミングとなります。例えば Amazon Managed Service for Apache Flink が Amazon Kinesis Data Streams からデータを取り出したタイミングです。
それぞれの時刻の特性を考慮し、使い分ける必要があります。

クラウド版では ETL 処理の一部で Ingestion Time を使っており、具体的には ApproximateArrivalTimestamp を採用しています。 以下、Amazon Managed Service for Apache Flink で ApproximateArrivalTimestamp を取得する方法について紹介します。
実装
前提として、Amazon Managed Service for Apache Flink で Amazon Kinesis Data Streams のレコードを読み取るにはコネクタを用います。具体的には、AWS が OSS として提供している flink-connector-aws-kinesis-streams を用います。
結論、以下のように実装することで ApproximateArrivalTimestamp を取り扱うことができます。次の節からもう少し掘り下げて説明をします。
KinesisDeserializationSchemaを継承したスキーマを作成するdeserializeメソッドに渡されるRecordからapproximateArrivalTimestampを取得する
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema import org.apache.flink.util.Collector import software.amazon.awssdk.services.kinesis.model.Record import java.nio.charset.StandardCharsets // KinesisDeserializationSchema を継承 class TimestampSchema extends KinesisDeserializationSchema[String] { override def deserialize(record: Record, stream: String, shardId: String, output: Collector[String]): Unit = { // ApproximateArrivalTimestamp を取得 val timestamp = record.approximateArrivalTimestamp().getEpochSecond val data = new String(record.data().asByteArray(), StandardCharsets.UTF_8) if (data != null) { output.collect(s"$timestamp, $data") } } override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO }
実装の詳細: KinesisDeserializationSchema
KinesisDeserializationSchema は flink-connector-aws-kinesis-streams で定義されている、Amazon Kinesis DataStreams 専用の deserialization インタフェースです。データを受信すると deserialize メソッドでデータが変換され、次の処理に渡されるような仕組みです。
引数の Record は Amazon Kinesis DataStreams のレコードを表します。レコードのデータ・メタデータを保持しており、ApproximateArrivalTimestamp も取得できます。また、Collector<T> は変換したデータを収集し次の処理に渡す役割を持ちます。
したがって KinesisDeserializationSchema を継承したクラスを実装し、deserialize メソッドは以下のように実装すれば完成です。
- ApproximateArrivalTimestamp を取得し、変換後のデータに含める
- 変換したデータを
Collectorに渡す
なお、ApproximateArrivalTimestamp は Record クラスの approximateArrivalTimestamp メソッドで取得できます。また、Collector にデータを渡す時は collect メソッドを使います。
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { default void open(DeserializationSchema.InitializationContext context) throws Exception {} void deserialize(Record record, String stream, String shardId, Collector<T> output) throws IOException; static <T> KinesisDeserializationSchema<T> of(DeserializationSchema<T> deserializationSchema) { return new KinesisDeserializationSchemaWrapper<>(deserializationSchema); } }
補足
KinesisDeserializationSchema の of メソッドについて補足となります。これは Amazon Kinesis DataStreams に限らず使える汎用的な deserialization インタフェースから KinesisDeserializationSchema を作成するメソッドです。
例えばAWS Big Data Blog: Introducing the new Amazon Kinesis source connector for Apache Flink で紹介されている以下のサンプルコードでは、deserialization に SimpleStringSchema が使われています。
KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setDeserializationSchema(new SimpleStringSchema()) .build();
SimpleStringSchema 単体では Record クラスを扱うような実装にはなっていませんが、setDeserializationSchema で of メソッドが呼ばれるために、Data Source が Amazon Kinesis DataStreams の場合でも使うことができます。
public KinesisStreamsSourceBuilder<T> setDeserializationSchema( DeserializationSchema<T> deserializationSchema) { this.deserializationSchema = KinesisDeserializationSchema.of(deserializationSchema); return this; }
なお、of メソッドで生成される KinesisDeserializationSchemaWrapper は KinesisDeserializationSchema を継承した以下のクラスです。Record のデータだけを取り出し、ラップしているスキーマに渡します。
このようにレコードのデータ部分だけを取り扱う場合は、既存のスキーマを使うのが便利かと思います。
class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> { private static final long serialVersionUID = 9143148962928375886L; private final DeserializationSchema<T> deserializationSchema; KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) { this.deserializationSchema = deserializationSchema; } @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { this.deserializationSchema.open(context); } @Override public void deserialize(Record record, String stream, String shardId, Collector<T> output) throws IOException { deserializationSchema.deserialize(record.data().asByteArray(), output); } @Override public TypeInformation<T> getProducedType() { return deserializationSchema.getProducedType(); } }
おわりに
本記事では、Amazon Managed Service for Apache Flink で Amazon Kinesis Data Streams の ApproximateArrivalTimestamp を取得する方法を紹介しました。ストリーム処理では時刻の扱いが重要な要素となるため、要件に応じて適切な時刻を選択することが大切です。
また、ライブラリのドキュメントや実装を確認することの重要さを改めて認識することができました。
ここまでお読みいただきありがとうございました。
本内容がお役に立てれば幸いです。