ETL および変換マップコンシューマーを使用したデータのインポート

  • リリースバージョン: Washingtondc
  • 更新日 2024年02月01日
  • 読む7読むのに数分
  • ETL 定義と変換マップを使用して、Kafka イベントを処理します。ETL 定義と変換マップは、スケジュール設定済みインポートを通じてデータを取得するときに使用する変換ロジックを指定します。同じ ETL 定義と変換マップを使用して、Kafka を介して受信したイベントを変換できます。

    始める前に

    手順

    1. ETL および変換マップのコンシューマーメッセージを消費するトピックを作成します。
      1. 次のコマンドを使用して、Hermes にトピックを作成します。
        この例では、トピック名は topic2 です。
        bin/kafka-topics.sh --create --command-config config/bootcamp.properties  --bootstrap-server <instance name>.service-now.com:4000,<instance name>.service-now.com:4001,<instance name>.service-now.com:4002, <instance name>.service-now.com:4003 --topic snc.<instance name>.topic2
        必ず <instance name> をインスタンス名で置き換えてください。
      2. トピックを表示するには、インスタンスにログインし、次の場所に移動します。 すべて > 統合ハブ > ストリームコネクト > トピック.
        注:
        トピックの作成には約 10 分かかります。
    2. トピックからデータを消費する ETL コンシューマーを作成します。
      1. 移動先 すべて > 統合ハブ > コンシューマー > ETL コンシューマー.
      2. [新規] を選択します。
      3. Kafka コンシューマーフォームで、[名前] フィールドに名前を入力します。
        この例では、Member Import という名前を使用します。
      4. [強力なインポートセットトランスフォーマー] フィールドで、既存の強力なインポートセットトランスフォーマーを選択します。
        強力なインポートセットトランスフォーマーの場合は、データを単一列に格納するオプションを使用していないものを選択します。メッセージを表示できるようにするには、強力なインポートセットトランスフォーマーで [詳細 ] オプションが有効になっていることを確認してください。
      5. [単一列のデータ] オプションの選択を解除します。
        このオプションは、単一列のデータを含むインポートセットテーブルに対してのみ選択する必要があります。
      6. [列マッピング][ラベル] に設定します。

        これは、入力メッセージの JSON キーがインポートセットテーブルの列ラベルにマッピングされることを意味します。

        [Kafka ETL コンシューマー] フォーム。
      7. フォームを保存します。
      8. コンシューマー用に Kafka ストリームの作成 を行います。
      9. [Kafka ストリーム] フォームで、次のフィールド値を設定します。
        • [名前] を「Topic2 Stream」を設定します。
        • [トピック] を前に作成したトピック topic2 に設定します。
        • [最大同時実行] を「1」に設定します。
        • [消費を開始][最初の有効化後に受信したメッセージ (Messages received after the first activation)] に設定します。
        • [メッセージ処理 (Message handling)][自動的に最適化 (Automatically optimize)] に設定します。
      10. [関連リンク] の下で [アクティブ化] リンクを選択します。

        アクティブ化されたサブスクリプションを確認できるはずです。

        topic2 の Kafka ストリームフォームの例。
    3. topic2 にメッセージを公開します。
      1. 次のコマンドを実行して、topic2 にメッセージを公開します。
        bin/kafka-console-producer.sh --topic snc.<instance name>.topic2  --producer.config  config/bootcamp.properties  --bootstrap-server <instance name>.service-now.com:4000,<instance name>.service-now.com:4001,<instance name>.service-now.com:4002,<instance name>.service-now.com:4003

        必ず <instance name> をインスタンス名で置き換えてください。

        このコマンドはメッセージの入力を促すものです。

      2. 次の JSON メッセージを送信します。
        {"city":"San Diego","name":"Jhon","id":"SN001","state":"California"}
        このメッセージは任意の値で送信できますが、JSON メッセージキーはインポートセットテーブルのラベルと一致する必要があります。
      3. 作成した ETL コンシューマーに戻ります。

        約 1 分後、インポートセットを介してデータがインポートされているはずです。

        [インポートセット] タブが入力された Kafka コンシューマーフォーム。
      4. インポートセットに関する詳細情報を表示するには、インポートセット番号を選択します。
        インポートセットの詳細ビュー。
      5. インポートセット行に関する詳細情報を表示するには、インポートセット行番号を選択します。
        インポートセット行の詳細ビュー
    4. データを消費する変換マップコンシューマーを作成します。
      1. 移動先 すべて > 統合ハブ > コンシューマー > 変換マップコンシューマー.
      2. [新規] を選択します。
      3. [Kafka 変換マップコンシューマー] フォームで、[名前] フィールドに名前を入力します。
      4. [変換マップ] フィールドで、既存の変換マップを選択します。
        [変換マップコンシューマー] フォーム。
      5. フォームを保存します。
      6. コンシューマー用に Kafka ストリームの作成 を行います。
      7. [Kafka ストリーム] フォームで、次のフィールド値を設定します。
        • [名前] を「変換マップストリーム」に設定します。
        • [トピック] を前に作成したトピック topic2 に設定します。
        • [最大同時実行] を「1」に設定します。
        • [消費を開始][最初の有効化後に受信したメッセージ (Messages received after the first activation)] に設定します。
        • [メッセージ処理 (Message handling)][自動的に最適化 (Automatically optimize)] に設定します。
      8. [関連リンク] の下で [アクティブ化] リンクを選択します。

        アクティブ化されたサブスクリプションを確認できるはずです。

        サブスクリプション情報を含む変換マップコンシューマーの Kafka ストリームフォーム。
    5. topic2 にメッセージを公開します。
      1. 次のコマンドを実行して、topic2 にメッセージを公開します。
        bin/kafka-console-producer.sh --topic snc.<instance name>.topic2  --producer.config  config/bootcamp.properties  --bootstrap-server <instance name>.service-now.com:4000,<instance name>.service-now.com:4001,<instance name>.service-now.com:4002,<instance name>.service-now.com:4003

        必ず <instance name> をインスタンス名で置き換えてください。

        このコマンドはメッセージの入力を促すものです。

      2. 次の JSON メッセージを送信します。
        {"city":"San Diego","name":"Jhon","id":"SN001","state":"California"}
        このメッセージは任意の値で送信できますが、JSON メッセージキーはインポートセットテーブルのラベルと一致する必要があります。
      3. 作成した変換マップコンシューマーに移動します。
        約 1 分後、インポートセットを介してデータがインポートされているはずです。
      4. インポートセットに関する詳細情報を表示するには、上記ステップ 3d と同様にインポートセット番号を選択します。

    次のタスク

    Kafka プロデューサーステップと ProducerV2 API を使用したメッセージの公開