ETL 定義と変換マップを使用して、Kafka イベントを処理します。ETL 定義と変換マップは、スケジュール設定済みインポートを通じてデータを取得するときに使用する変換ロジックを指定します。同じ ETL 定義と変換マップを使用して、Kafka を介して受信したイベントを変換できます。
手順
-
ETL および変換マップのコンシューマーメッセージを消費するトピックを作成します。
-
次のコマンドを使用して、Hermes にトピックを作成します。
この例では、トピック名は topic2 です。
bin/kafka-topics.sh --create --command-config config/bootcamp.properties --bootstrap-server <instance name>.service-now.com:4000,<instance name>.service-now.com:4001,<instance name>.service-now.com:4002, <instance name>.service-now.com:4003 --topic snc.<instance name>.topic2
必ず
<instance name> をインスタンス名で置き換えてください。
-
トピックを表示するには、インスタンスにログインし、次の場所に移動します。 .
-
トピックからデータを消費する ETL コンシューマーを作成します。
-
移動先 .
-
[新規] を選択します。
-
Kafka コンシューマーフォームで、[名前] フィールドに名前を入力します。
この例では、Member Import という名前を使用します。
-
[強力なインポートセットトランスフォーマー] フィールドで、既存の強力なインポートセットトランスフォーマーを選択します。
強力なインポートセットトランスフォーマーの場合は、データを単一列に格納するオプションを使用していないものを選択します。メッセージを表示できるようにするには、強力なインポートセットトランスフォーマーで [詳細 ] オプションが有効になっていることを確認してください。
-
[単一列のデータ] オプションの選択を解除します。
このオプションは、単一列のデータを含むインポートセットテーブルに対してのみ選択する必要があります。
-
[列マッピング] を [ラベル] に設定します。
これは、入力メッセージの JSON キーがインポートセットテーブルの列ラベルにマッピングされることを意味します。
-
フォームを保存します。
-
コンシューマー用に Kafka ストリームの作成 を行います。
-
[Kafka ストリーム] フォームで、次のフィールド値を設定します。
-
[関連リンク] の下で [アクティブ化] リンクを選択します。
アクティブ化されたサブスクリプションを確認できるはずです。
-
topic2 にメッセージを公開します。
-
次のコマンドを実行して、topic2 にメッセージを公開します。
bin/kafka-console-producer.sh --topic snc.<instance name>.topic2 --producer.config config/bootcamp.properties --bootstrap-server <instance name>.service-now.com:4000,<instance name>.service-now.com:4001,<instance name>.service-now.com:4002,<instance name>.service-now.com:4003
必ず <instance name> をインスタンス名で置き換えてください。
このコマンドはメッセージの入力を促すものです。
-
次の JSON メッセージを送信します。
{"city":"San Diego","name":"Jhon","id":"SN001","state":"California"}
このメッセージは任意の値で送信できますが、JSON メッセージキーはインポートセットテーブルのラベルと一致する必要があります。
-
作成した ETL コンシューマーに戻ります。
約 1 分後、インポートセットを介してデータがインポートされているはずです。
-
インポートセットに関する詳細情報を表示するには、インポートセット番号を選択します。
-
インポートセット行に関する詳細情報を表示するには、インポートセット行番号を選択します。
-
データを消費する変換マップコンシューマーを作成します。
-
移動先 .
-
[新規] を選択します。
-
[Kafka 変換マップコンシューマー] フォームで、[名前] フィールドに名前を入力します。
-
[変換マップ] フィールドで、既存の変換マップを選択します。
-
フォームを保存します。
-
コンシューマー用に Kafka ストリームの作成 を行います。
-
[Kafka ストリーム] フォームで、次のフィールド値を設定します。
-
[関連リンク] の下で [アクティブ化] リンクを選択します。
アクティブ化されたサブスクリプションを確認できるはずです。
-
topic2 にメッセージを公開します。
-
次のコマンドを実行して、topic2 にメッセージを公開します。
bin/kafka-console-producer.sh --topic snc.<instance name>.topic2 --producer.config config/bootcamp.properties --bootstrap-server <instance name>.service-now.com:4000,<instance name>.service-now.com:4001,<instance name>.service-now.com:4002,<instance name>.service-now.com:4003
必ず <instance name> をインスタンス名で置き換えてください。
このコマンドはメッセージの入力を促すものです。
-
次の JSON メッセージを送信します。
{"city":"San Diego","name":"Jhon","id":"SN001","state":"California"}
このメッセージは任意の値で送信できますが、JSON メッセージキーはインポートセットテーブルのラベルと一致する必要があります。
-
作成した変換マップコンシューマーに移動します。
約 1 分後、インポートセットを介してデータがインポートされているはずです。
-
インポートセットに関する詳細情報を表示するには、上記ステップ 3d と同様にインポートセット番号を選択します。