Kafka メッセージトリガーを使用したフローの作成
Kafka ストリームからのイベントを処理するフローをビルドします。指定されたトピックでイベントが利用可能になったときにフローを開始します。
始める前に
必要なロール:flow_designer または admin
このトリガーには、ストリームコネクト サブスクリプションが必要です。詳細については、「https://www.servicenow.com/now-platform/workflow-data-fabric.html」を参照してください。
このトリガーには、 ServiceNow Stream Connect Installer [com.glide.hub.stream_connect.installer] プラグインが必要です。
手順
- 移動先 すべて > プロセス自動化 > フローデザイナー.
- クリック 新規 > フロー.
-
フロー プロパティを定義します。
詳細については、「ワークフロースタジオ でのフローの作成」を参照してください。
- [トリガー] セクションで、[ トリガーの追加 ] をクリックし、 アプリケーション > Kafka メッセージ.
-
[Kafka メッセージ] フォームで、フィールドに入力します。
フィールド 説明 トピック メッセージを監視するトピックへの参照。 シリアル化形式 メッセージのシリアル化形式。次のいずれかを選択します。 - プレーンテキスト:プレーンテキストメッセージの場合は、このオプションを選択します。これがデフォルトの形式です。
- エンコード済み:Apache Avro 形式のメッセージには、このオプションを選択します。プレーンテキストメッセージを Avro 形式に変換するには、スキーマが必要です。[スキーマレジストリ] フィールドでスキーマレジストリを選択します。スキーマの詳細については、「Schema management in Stream Connect」を参照してください。
スキーマレジストリ 選択したスキーマのレジストリ。次のいずれかを選択します。- スタンドアロンのスキーマレジストリ
- Confluent Schema Registry
このフィールドは、[シリアル化形式] が [エンコード済み] に設定されている場合にのみ表示されます。
Confluent Schema Registry では、受信したメッセージのスキーマ ID がスキーマテーブルにない場合、構成された REST 接続を使用してスキーマが動的にインポートされます。
詳細オプション 処理を開始する順 (Start processing from) 日付順に整理されたキューの先頭または末尾からメッセージの処理を開始するオプション。次のいずれかを選択します。 - キュー内の最も古いメッセージ (The earliest message in the queue):キュー内の最も古いメッセージから処理を開始します。
- キューの最後 (The end of the queue):キュー内の最も新しいメッセージから処理を開始します。
1 回の実行で処理するメッセージの数 (Number of messages to process per run) 実行ごとに処理されるメッセージの数を指定するか、システムがメッセージの数を決定するオプション。次のいずれかを選択します。 - 自動的に最適化 (Automatically optimize):システムが 1 回の実行で処理するメッセージの数を決定します。
- 手動で上書き (高度) (Manually override (advanced)):実行ごとに処理するメッセージの数を指定します。
メッセージの数を入力してください (システム生成値が小さい場合は無視) (Enter the number of messages (ignored if system-generated value is lower)) 1 回の実行で処理するメッセージの数。このフィールドは、[1 回の実行で処理するメッセージの数(Number of messages to process per run)] が [手動で上書き (高度) (Manually override (advanced))]に設定されている場合にのみ表示されます。 - タイプ:整数
- デフォルト値: 100
- 最小値:1
- 最大値:100,000
選択した数がシステムによって計算された数より大きい場合、その計算された数でフローが実行されます。
メッセージのサイズによっては、一度の実行で処理されるメッセージの数が指定された値よりも少なくなる場合があります。
- [完了] をクリックします。
- オプション:
[最大同時実行] フィールドと [相対的重み付け (Relative weight)] フィールドを設定します。
[最大同時実行] フィールドは、作成するパーティショングループの数と使用する並列プロセッサーの数を決定します。[相対的重み付け (Relative weight)] フィールドを使用すると、他のコンシューマーよりも多くの処理時間を当該コンシューマーに割り当てることができます。
- 移動先 すべて > プロセス自動化 > フロー管理 > 設定.
-
[フロー/サブフロー/アクション] フィールドで、フローの情報アイコン (
) を選択し、[レコードを開く] を選択します。
-
[他のアクション] アイコン (
) を選択し、 表示 > Kafka.
[最大同時実行] フィールドがフォームに表示されます。 - [相対的重み付け (Relative weight)] フィールドを表示するには、[詳細]を選択します。
-
[最大同時実行] と [相対的重み付け (Relative weight)] の値を設定します。
フィールド 説明 最大同時実行 メッセージを消費するために使用する並列プロセッサーの最大数と作成するパーティショングループの数。
デフォルト値:1
相対的重み付け 各サイクルについて、他のコンシューマーと比較して、メッセージを処理するためにコンシューマーに割り当てられた最大時間。たとえば、相対的な重み付けが別のコンシューマーの 2 倍であるコンシューマーは、2 倍の時間を取得します。相対的な重み付けが同じコンシューマーは、同じ時間になります。
最小値は 5、最大値は 2000 です。デフォルト値は、最大同時実行数に glide.ih.kafka.consumer.max_seconds_per_partition_group プロパティの値を掛けた値です。glide.ih.kafka.consumer.max_seconds_per_partition_group プロパティは、各パーティショングループに割り当てられる最大時間 (秒) を指定します。デフォルト値は 10 です。
相対重み付けを使用して、サブスクリプションのパーティショングループタイムアウトが計算されます。パーティショングループタイムアウトは、特定のサブスクリプションの各パーティショングループに割り当てられる最大時間 (ミリ秒) を指定します。
このフィールドは、[詳細] が選択されている場合にのみ表示されます。 - [更新] を選択します。
- ワークフロースタジオ で作業していたフローに戻ります。
- アクション、サブフロー、およびフローロジックをフローに追加します。
-
フローをテストするには、[テスト] ボタンをクリックします。
[テスト] ボタンをクリックすると [フローのテスト (Test flow)] ダイアログが開き、ここでフローに送信するメッセージを作成できます。Kafka トピックからのメッセージではなく、この新しく作成されたメッセージを使用して、フローがテストされます。フローは、有効になるまではトピックからのメッセージの受信を開始しません。詳細については、「フローをテストする」を参照してください。
-
フローを有効にするには、[有効化] ボタンをクリックします。
フローを有効にすると、そのトピック内でメッセージの検索が開始されます。メッセージを受信するには、フローを有効にする必要があります。詳細については、「フローをアクティブ化する」を参照してください。
タスクの結果
Kafka トピックにメッセージがあると、フローがトリガーされてアクションが実行されます。