Kafka メッセージトリガーを使用したフローの作成

  • リリースバージョン: Zurich
  • 更新日 2025年07月31日
  • 所要時間:6分
  • Kafka ストリームからのイベントを処理するフローをビルドします。指定されたトピックでイベントが利用可能になったときにフローを開始します。

    始める前に

    必要なロール:flow_designer または admin

    このトリガーには、ストリームコネクト サブスクリプションが必要です。詳細については、「https://www.servicenow.com/now-platform/workflow-data-fabric.html」を参照してください。

    このトリガーには、 ServiceNow Stream Connect Installer [com.glide.hub.stream_connect.installer] プラグインが必要です。

    手順

    1. 移動先 すべて > プロセス自動化 > フローデザイナー.
    2. クリック 新規 > フロー.
    3. フロー プロパティを定義します。
      詳細については、「ワークフロースタジオ でのフローの作成」を参照してください。
    4. [トリガー] セクションで、[ トリガーの追加 ] をクリックし、 アプリケーション > Kafka メッセージ.
    5. [Kafka メッセージ] フォームで、フィールドに入力します。
      フィールド 説明
      トピック メッセージを監視するトピックへの参照。
      シリアル化形式 メッセージのシリアル化形式。次のいずれかを選択します。
      • プレーンテキスト:プレーンテキストメッセージの場合は、このオプションを選択します。これがデフォルトの形式です。
      • エンコード済みApache Avro 形式のメッセージには、このオプションを選択します。プレーンテキストメッセージを Avro 形式に変換するには、スキーマが必要です。[スキーマレジストリ] フィールドでスキーマレジストリを選択します。スキーマの詳細については、「Schema management in Stream Connect」を参照してください。
      スキーマレジストリ
      選択したスキーマのレジストリ。次のいずれかを選択します。
      • スタンドアロンのスキーマレジストリ
      • Confluent Schema Registry

      このフィールドは、[シリアル化形式][エンコード済み] に設定されている場合にのみ表示されます。

      Confluent Schema Registry では、受信したメッセージのスキーマ ID がスキーマテーブルにない場合、構成された REST 接続を使用してスキーマが動的にインポートされます。

      詳細オプション
      処理を開始する順 (Start processing from) 日付順に整理されたキューの先頭または末尾からメッセージの処理を開始するオプション。次のいずれかを選択します。
      • キュー内の最も古いメッセージ (The earliest message in the queue):キュー内の最も古いメッセージから処理を開始します。
      • キューの最後 (The end of the queue):キュー内の最も新しいメッセージから処理を開始します。
      1 回の実行で処理するメッセージの数 (Number of messages to process per run) 実行ごとに処理されるメッセージの数を指定するか、システムがメッセージの数を決定するオプション。次のいずれかを選択します。
      • 自動的に最適化 (Automatically optimize):システムが 1 回の実行で処理するメッセージの数を決定します。
      • 手動で上書き (高度) (Manually override (advanced)):実行ごとに処理するメッセージの数を指定します。
      メッセージの数を入力してください (システム生成値が小さい場合は無視) (Enter the number of messages (ignored if system-generated value is lower)) 1 回の実行で処理するメッセージの数。このフィールドは、[1 回の実行で処理するメッセージの数(Number of messages to process per run)][手動で上書き (高度) (Manually override (advanced))]に設定されている場合にのみ表示されます。
      • タイプ:整数
      • デフォルト値: 100
      • 最小値:1
      • 最大値:100,000

      選択した数がシステムによって計算された数より大きい場合、その計算された数でフローが実行されます。

      メッセージのサイズによっては、一度の実行で処理されるメッセージの数が指定された値よりも少なくなる場合があります。

    6. [完了] をクリックします。
    7. オプション: [最大同時実行] フィールドと [相対的重み付け (Relative weight)] フィールドを設定します。
      [最大同時実行] フィールドは、作成するパーティショングループの数と使用する並列プロセッサーの数を決定します。[相対的重み付け (Relative weight)] フィールドを使用すると、他のコンシューマーよりも多くの処理時間を当該コンシューマーに割り当てることができます。
      1. 移動先 すべて > プロセス自動化 > フロー管理 > 設定.
      2. [フロー/サブフロー/アクション] フィールドで、フローの情報アイコン (情報アイコン) を選択し、[レコードを開く] を選択します。
      3. [他のアクション] アイコン ( [他のアクション] アイコン) を選択し、 表示 > Kafka.
        [最大同時実行] フィールドがフォームに表示されます。
      4. [相対的重み付け (Relative weight)] フィールドを表示するには、[詳細]を選択します。
      5. [最大同時実行][相対的重み付け (Relative weight)] の値を設定します。
        フィールド 説明
        最大同時実行

        メッセージを消費するために使用する並列プロセッサーの最大数と作成するパーティショングループの数。

        デフォルト値:1

        相対的重み付け

        各サイクルについて、他のコンシューマーと比較して、メッセージを処理するためにコンシューマーに割り当てられた最大時間。たとえば、相対的な重み付けが別のコンシューマーの 2 倍であるコンシューマーは、2 倍の時間を取得します。相対的な重み付けが同じコンシューマーは、同じ時間になります。

        最小値は 5、最大値は 2000 です。デフォルト値は、最大同時実行数に glide.ih.kafka.consumer.max_seconds_per_partition_group プロパティの値を掛けた値です。glide.ih.kafka.consumer.max_seconds_per_partition_group プロパティは、各パーティショングループに割り当てられる最大時間 (秒) を指定します。デフォルト値は 10 です。

        相対重み付けを使用して、サブスクリプションのパーティショングループタイムアウトが計算されます。パーティショングループタイムアウトは、特定のサブスクリプションの各パーティショングループに割り当てられる最大時間 (ミリ秒) を指定します。

        このフィールドは、[詳細] が選択されている場合にのみ表示されます。
      6. [更新] を選択します。
      7. ワークフロースタジオ で作業していたフローに戻ります。
    8. アクション、サブフロー、およびフローロジックをフローに追加します。
    9. フローをテストするには、[テスト] ボタンをクリックします。
      [テスト] ボタンをクリックすると [フローのテスト (Test flow)] ダイアログが開き、ここでフローに送信するメッセージを作成できます。Kafka トピックからのメッセージではなく、この新しく作成されたメッセージを使用して、フローがテストされます。フローは、有効になるまではトピックからのメッセージの受信を開始しません。詳細については、「フローをテストする」を参照してください。
    10. フローを有効にするには、[有効化] ボタンをクリックします。
      フローを有効にすると、そのトピック内でメッセージの検索が開始されます。メッセージを受信するには、フローを有効にする必要があります。詳細については、「フローをアクティブ化する」を参照してください。

    タスクの結果

    Kafka トピックにメッセージがあると、フローがトリガーされてアクションが実行されます。