レッドハットのソリューションアーキテクトの森です。
Red Hat Process Automation Manager と、Red Hat AMQ Streams (Apache Kafka) を統合することができるようになりました。
ビジネスプロセスは、メッセージイベントを使用して Kafka メッセージを送受信することができます。 KIE Server では、プロセス、ケース、またはタスクが完了したときに Kafka イベントを生成させることができます。
今回は、簡単なサンプルプロセスを作成して、Kafka メッセージ からプロセスを開始する方法と、プロセス終了時に Kafka メッセージ を送信する方法についてご紹介していきます。
サンプル実行環境
今回ご紹介するサンプルは、以下の環境で実施をしています。
- Red Hat Process Automation Manager 7.10.0 (以下RHPAM)
- Apache Kafka
- Java 11
- Maven
- Git
- Docker
Kafka のセットアップ
まずは、ローカル環境に Kafka サービス を構築しましょう。
下記のプロジェクトをクローンして、strimzi-all-in-one フォルダに移動し、docker-composeファイルを使って、 以下のコンテナサービスを起動します。
- Zookeeper
- Kafka Broker
- Strimzi HTTP Bridge
- Apicurio Service Registry
git clone https://github.com/hguerrero/amq-examples.git cd amq-examples/strimzi-all-in-one/ docker-compose up
これで、localhostの port 9092 で Kafka サーバーが動作するようになります。
続いて、別のターミナルを立ち上げて、strimzi-all-in-one フォルダに移動します。 以下のコマンドを実行して、今回のサンプルで使用する3つのトピックを作成しておきます。
docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic new-request docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic request-approved docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic request-rejected
RHPAM の設定とサンプルプロジェクトのインポート
RHPAM のプロセスで Kafka メッセージ を送受信するには、EAPの設定ファイルに、システムプロパティ org.kie.kafka.server.ext.disabled を false の値で追加する必要があります。
RHPAM を起動する際の設定ファイル(/$EAP_HOME/standalone/configuration/standalone-full.xml 等)を開き、以下の設定を追加します。
・・・ <system-properties> ・・・ <property name="org.kie.kafka.server.ext.disabled" value="false"/> </system-properties> ・・・
次に、RHPAM を起動して Business Central にログインし、以下のサンプルプロジェクトをインポートします。
https://github.com/kamorisan/pam-kafka-sample.git

サンプルのプロセスは、メッセージの情報をインプットとして、リクエストを承認するかどうかをルールで判断します。 自動承認のルールに該当しない場合は、手動で二次判断を行うヒューマンタスクを実施するという、シンプルなものです。 トピックに新しいイベントが発生すると、プロセスの新しいインスタンスが起動します。 リクエストの承認/却下の結果については各々のトピックに送信することで、 承認ステータスに基づいて異なるサービスで対応ができるようになります。

プロセスを開始するための Kafka メッセージを 受け取るイベントの作成
先ほどインポートしたプロジェクトの、ビジネスプロセスのアセット (kafka-process-sample) を開きます。 まずは、プロセスの開始イベントを、Kafka メッセージ を使用する開始メッセージイベントに変更をしてみましょう。
開始イベント(New Request)の緑色の丸をクリックし、左下の歯車アイコンを選択して開始メッセージイベントを選択します。

次に、この開始メッセージイベントが参照する、Kafka トピック を設定します。
右端の開始メッセージイベントのプロパティパネルから、先ほど作成した Kafka トピック の new-request を、実装/実行メニューの、メッセージの項目に設定をします。

次に、Kafka トピック から受け取るメッセージのデータ項目を追加します。 今回のサンプルプロジェクトでは、RequestData というデータオブジェクトを既に作成してありますので、それを使用します。
開始メッセージイベントのプロパティパネルから、データ割り当てメニューの割当の項目をクリックし、以下のデータ出力と割り当てを追加してください。
- Name: request
- データタイプ: RequestData
- ターゲット: request

開始イベントの変更と、プロパティへの設定の追加が完了したら、一度プロセスを保存し、プロジェクトをKIE Serverへデプロイをしましょう。
新しいターミナルを開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行します。
$PROJECTS_DIR は、自分のフォルダパスの内容に置き換えてください。
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-producer.sh --topic new-request --bootstrap-server localhost:9092 >
Kafka の Producer サービスは、new-request というトピックにイベントを発行できる状態となっています。
ターミナルに、以下のjsonデータを入力してみましょう。
> {"data" : {"id": 1, "status": "Gold", "score": 200}}
Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。
左のフィルターパネルから Stateメニューの 完了 にチェックを入れます。
先ほど、 new-request で発行したイベントから実施され、完了したプロセスが表示されるはずです。

完了済のプロセスインスタンスを選択し、次にダイアグラムタブへ移動します。 メッセージイベントからのリクエストが、自動承認されたことが確認できます。

プロセスから Kafka メッセージを 送信するイベントの作成
続いて、プロセス内から Kafka メッセージ を送信するイベントを作成していきます。
事前の準備として、メッセージ送信イベントを作成する際に必要なワークアイテムハンドラーを追加しておきます。
プロジェクトのアセット一覧の画面より、設定 → デプロイメント → ワークアイテムハンドラー タブを選択します。 ワークアイテムハンドラーの追加をクリックし、新しい空の行に以下の値を入力します。
- Name: Send Task
- 値: new org.jbpm.bpmn2.handler.SendTaskHandler()
- リゾルバータイプ: MVEL

項目の追加後、保存をしてください。
次に、ビジネスプロセスのアセット (kafka-process-sample) を開きます。
プロセスの終了イベントを、Kafka メッセージ を送信する終了メッセージイベントに変更をしてみましょう。
終了イベント(Approved)の赤色の丸をクリックし、左下の歯車アイコンを選択して終了メッセージイベントを選択します。

次に、この終了メッセージイベントが参照する、Kafka トピック を設定します。

右端の終了メッセージイベントのプロパティパネルから、冒頭で作成した Kafka トピック の request-approved を、実装/実行メニューの、メッセージの項目に設定をします。

次に、Kafka トピック へ送信するメッセージのデータ項目を追加します。 今回は、承認/却下の理由をメッセージとして送信することにします。
終了メッセージイベントのプロパティパネルから、データ割り当てメニューの割当の項目をクリックし、以下のデータ入力と割り当てを追加してください。
- Name: data
- データタイプ: String
- ターゲット: message
同様にして、もう一つの終了イベント(Rejected)についても、終了メッセージイベントへ変更をしておきます。 Kafka トピック は request-rejected を使用し、データ入力と割り当ては上記と同じものを入力してください。
終了イベントの変更と、プロパティへの設定の追加が完了したら、一度プロセスを保存し、プロジェクトをKIE Serverへデプロイをしましょう。
新しいターミナルを2つ開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行します。
$PROJECTS_DIR は、自分のフォルダパスの内容に置き換えてください。
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic request-approved --bootstrap-server localhost:9092
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic request-rejected --bootstrap-server localhost:9092
これで、request-approved と request-rejected の2つのトピックの Consumer サービス が準備できました。
new-request の Producer サービス のターミナルから、以下のjsonデータを入力してみましょう。
> {"data" : {"id": 2, "status": "Gold", "score": 150}}
request-approved の Consumer サービス のターミナルに、終了イベントから送信されたメッセージが表示されるはずです。
{
"specversion": "1.0",
"time": "2021-04-19T11:32:25.710+0900",
"id": "4d7356ad-606b-4768-bbab-41b749e48d86",
"type": "java.lang.String",
"source": "/process/pam-kafka-message.process-kafka-sample/2",
"data": "自動承認します"
}
プロセス、ケース、またはタスクが完了したときに Kafka イベントを生成させる
開始・終了メッセージイベントを通してだけでなく、プロセス・ケースやタスクが完了した時に、イベントに関する Kafka メッセージを生成するように設定できます。 トランザクションをコミットすると、KIE Server はメッセージを送信します。
デフォルトでは、KIE Server は以下のトピックでメッセージを公開します。
- 完了したプロセスに関するメッセージ:
jbpm-processes-events - 完了したタスクに関するメッセージ:
jbpm-tasks-events - 完了したケースに関するメッセージ:
jbpm-cases-events
トランザクションのコミット時に、KIE Server からメッセージを送信するには、以下の手順を実行します。 (※ RHPAM 7.10.0 の場合)
Red Hat カスタマーポータルの Software Downloads ページから製品配信可能ファイル rhpam-7.10.0-maven-repository.zip をダウンロードします。
ファイルの内容を展開します。
maven-repository/org/jbpm/jbpm-event-emitters-kafka/7.48.0.Final-redhat-00004/jbpm-event-emitters-kafka-7.48.0.Final-redhat-00004.jar ファイルをアプリケーションサーバーの deployments/kie-server.war/WEB-INF/lib サブディレクトリーにコピーします。
ファイルを指定のフォルダに格納後、EAPを再起動します。
※ maven-repository.zipはファイルサイズが約1.7Gbと大きいため、一般提供版 (GA) リポジトリー: https://maven.repository.redhat.com/ga/ へアクセスし、該当のファイルを直接ダウンロードして格納しても構いません。
新しいターミナルを2つ開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行して、jbpm-processes-events と jbpm-tasks-events の2つのトピックの Consumer サービス を準備しておきます。
$PROJECTS_DIR は、自分のフォルダパスの内容に置き換えてください。
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic jbpm-processes-events --bootstrap-server localhost:9092
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic jbpm-tasks-events --bootstrap-server localhost:9092
再び Business Central に戻ります。もしサンプルプロジェクトが KIE Server にデプロイされていない場合は、もう一度デプロイを実行してください。
new-request の Producer サービス のターミナルから、以下のjsonデータを入力してみましょう。
> {"data" : {"id": 3, "status": "Silver", "score": 150}}
jbpm-tasks-events の Consumer サービス のターミナルに、KIE Serverから送信されたイベントメッセージが表示されるはずです。
{
"specversion": "1.0",
"time": "2021-04-19T13:48:38.045+0900",
"id": "773d7b97-64cf-41d4-8b35-8090f279f41c",
"type": "task",
"source": "/process/pam-kafka-message.process-kafka-sample/2",
"data": {
"compositeId": "default-kieserver_1",
"id": 1,
"priority": 0,
"name": "手動承認",
"subject": "",
"description": "",
"taskType": null,
"formName": "Task",
"status": "Ready",
"actualOwner": null,
"createdBy": null,
"createdOn": "2021-04-19T13:48:37.611+0900",
"activationTime": "2021-04-19T13:48:37.611+0900",
"expirationDate": null,
"skipable": false,
"workItemId": 2,
"processInstanceId": 2,
"parentId": -1,
"processId": "pam-kafka-message.process-kafka-sample",
"containerId": "pam-kafka-message_1.0.0-SNAPSHOT",
"potentialOwners": [
"kie-server"
],
"excludedOwners": [],
"businessAdmins": [
"Administrator",
"Administrators"
],
"inputData": {
"request": {
"id": 3,
"status": "Silver",
"score": 150
},
"TaskName": "Task",
"NodeName": "手動承認",
"approval": false,
"Skippable": "false",
"message": "却下します",
"GroupId": "kie-server"
},
"outputData": null
}
}
Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。 リストに、有効なインスタンスが表示されているので選択し、次にダイアグラムタブへ移動します。
メッセージイベントからのリクエストは自動承認されずに、手動承認のタスクが実行されているのが確認できます。

Business Central の上部メニューから、トラック → タスク受信箱 を選択します。

リストの手動承認のタスクを選択し、遷移したタスク処理画面の、クレーム → 開始 の青いボタンをクリックします。
Outputs欄に Approval のチェックボックスと、承認/却下理由の Message のテキストボックスがあるので、今回はそのまま却下(Approvalのチェックは入れない)とし、Message のテキストボックスには何か適当に理由を入力して、完了 の青いボタンをクリックします。

Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。
左のフィルターパネルから Stateメニューの 完了 にチェックを入れます。
手動承認のタスクを処理した結果、プロセスが完了しています。
このインスタンスを選択て、ダイアグラムタブを確認します。

手動承認の処理の結果、Rejected の終了メッセージイベントが実行され、プロセスが終了しています。
request-rejected の Consumer サービス のターミナルをみてみましょう。
終了イベントから送信されたメッセージが表示されているはずです。
{
"specversion": "1.0",
"time": "2021-04-19T14:04:15.790+0900",
"id": "dfb359b4-8c1c-44f4-9344-450e6381e9ed",
"type": "java.lang.String",
"source": "/process/pam-kafka-message.process-kafka-sample/2",
"data": "審査の結果、リクエストは却下とします。"
}
またタスクの処理後に jbpm-tasks-events の Consumer サービス のターミナルを確認すると、新しく、KIE Serverから送信されたイベントメッセージが表示されています。
タスク処理前と比べて、state が 1→2(実行中→完了)になり、variables の message が入力した理由に置き換わっています。
{
"specversion": "1.0",
"time": "2021-04-19T14:04:15.831+0900",
"id": "499f344a-f9b6-4067-819b-2c4fb4afe3d4",
"type": "process",
"source": "/process/pam-kafka-message.process-kafka-sample/2",
"data": {
"compositeId": "default-kieserver_2",
"id": 2,
"processId": "pam-kafka-message.process-kafka-sample",
"processName": "process-kafka-sample",
"processVersion": "1.0",
"state": 2,
"containerId": "pam-kafka-message_1.0.0-SNAPSHOT",
"initiator": "unknown",
"date": "2021-04-19T14:04:15.829+0900",
"processInstanceDescription": "process-kafka-sample",
"correlationKey": "2",
"parentId": -1,
"variables": {
"request": {
"id": 3,
"status": "Silver",
"score": 150
},
"approval": false,
"initiator": "unknown",
"message": "審査の結果、リクエストは却下とします。"
}
}
}
jbpm-processes-events の Consumer サービス のターミナルにも、プロセスに関するメッセージが送信されていますので、確認をしてみてください。
おわりに
いかがでしたでしょうか? RHPAM の開始・終了メッセージイベントを使用することで、Red Hat AMQ Streams (Apache Kafka) と統合し、イベント駆動型のマイクロサービスとして、ビジネスプロセスを運用することができるようになりました。 この機能についてのより詳細な情報については、以下のURLのドキュメントを参照して頂ければと思います。
Red Hat Process Automation Manager と Red Hat AMQ Streams の統合