レッドハットのソリューションアーキテクトの森です。
Red Hat Process Automation Manager と、Red Hat AMQ Streams (Apache Kafka) を統合することができるようになりました。
ビジネスプロセスは、メッセージイベントを使用して Kafka メッセージを送受信することができます。 KIE Server では、プロセス、ケース、またはタスクが完了したときに Kafka イベントを生成させることができます。
今回は、簡単なサンプルプロセスを作成して、Kafka メッセージ からプロセスを開始する方法と、プロセス終了時に Kafka メッセージ を送信する方法についてご紹介していきます。
サンプル実行環境
今回ご紹介するサンプルは、以下の環境で実施をしています。
- Red Hat Process Automation Manager 7.10.0 (以下RHPAM)
- Apache Kafka
- Java 11
- Maven
- Git
- Docker
Kafka のセットアップ
まずは、ローカル環境に Kafka サービス を構築しましょう。
下記のプロジェクトをクローンして、strimzi-all-in-one フォルダに移動し、docker-composeファイルを使って、 以下のコンテナサービスを起動します。
- Zookeeper
- Kafka Broker
- Strimzi HTTP Bridge
- Apicurio Service Registry
git clone https://github.com/hguerrero/amq-examples.git cd amq-examples/strimzi-all-in-one/ docker-compose up
これで、localhostの port 9092 で Kafka サーバーが動作するようになります。
続いて、別のターミナルを立ち上げて、strimzi-all-in-one フォルダに移動します。 以下のコマンドを実行して、今回のサンプルで使用する3つのトピックを作成しておきます。
docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic new-request docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic request-approved docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic request-rejected
RHPAM の設定とサンプルプロジェクトのインポート
RHPAM のプロセスで Kafka メッセージ を送受信するには、EAPの設定ファイルに、システムプロパティ org.kie.kafka.server.ext.disabled
を false
の値で追加する必要があります。
RHPAM を起動する際の設定ファイル(/$EAP_HOME/standalone/configuration/standalone-full.xml 等)を開き、以下の設定を追加します。
・・・ <system-properties> ・・・ <property name="org.kie.kafka.server.ext.disabled" value="false"/> </system-properties> ・・・
次に、RHPAM を起動して Business Central にログインし、以下のサンプルプロジェクトをインポートします。
https://github.com/kamorisan/pam-kafka-sample.git
サンプルのプロセスは、メッセージの情報をインプットとして、リクエストを承認するかどうかをルールで判断します。 自動承認のルールに該当しない場合は、手動で二次判断を行うヒューマンタスクを実施するという、シンプルなものです。 トピックに新しいイベントが発生すると、プロセスの新しいインスタンスが起動します。 リクエストの承認/却下の結果については各々のトピックに送信することで、 承認ステータスに基づいて異なるサービスで対応ができるようになります。
プロセスを開始するための Kafka メッセージを 受け取るイベントの作成
先ほどインポートしたプロジェクトの、ビジネスプロセスのアセット (kafka-process-sample) を開きます。 まずは、プロセスの開始イベントを、Kafka メッセージ を使用する開始メッセージイベントに変更をしてみましょう。
開始イベント(New Request)の緑色の丸をクリックし、左下の歯車アイコンを選択して開始メッセージイベントを選択します。
次に、この開始メッセージイベントが参照する、Kafka トピック を設定します。
右端の開始メッセージイベントのプロパティパネルから、先ほど作成した Kafka トピック の new-request を、実装/実行メニューの、メッセージの項目に設定をします。
次に、Kafka トピック から受け取るメッセージのデータ項目を追加します。 今回のサンプルプロジェクトでは、RequestData というデータオブジェクトを既に作成してありますので、それを使用します。
開始メッセージイベントのプロパティパネルから、データ割り当てメニューの割当の項目をクリックし、以下のデータ出力と割り当てを追加してください。
- Name: request
- データタイプ: RequestData
- ターゲット: request
開始イベントの変更と、プロパティへの設定の追加が完了したら、一度プロセスを保存し、プロジェクトをKIE Serverへデプロイをしましょう。
新しいターミナルを開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行します。
$PROJECTS_DIR
は、自分のフォルダパスの内容に置き換えてください。
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-producer.sh --topic new-request --bootstrap-server localhost:9092 >
Kafka の Producer サービスは、new-request
というトピックにイベントを発行できる状態となっています。
ターミナルに、以下のjsonデータを入力してみましょう。
> {"data" : {"id": 1, "status": "Gold", "score": 200}}
Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。
左のフィルターパネルから Stateメニューの 完了
にチェックを入れます。
先ほど、 new-request
で発行したイベントから実施され、完了したプロセスが表示されるはずです。
完了済のプロセスインスタンスを選択し、次にダイアグラムタブへ移動します。 メッセージイベントからのリクエストが、自動承認されたことが確認できます。
プロセスから Kafka メッセージを 送信するイベントの作成
続いて、プロセス内から Kafka メッセージ を送信するイベントを作成していきます。
事前の準備として、メッセージ送信イベントを作成する際に必要なワークアイテムハンドラーを追加しておきます。
プロジェクトのアセット一覧の画面より、設定 → デプロイメント → ワークアイテムハンドラー タブを選択します。 ワークアイテムハンドラーの追加をクリックし、新しい空の行に以下の値を入力します。
- Name: Send Task
- 値: new org.jbpm.bpmn2.handler.SendTaskHandler()
- リゾルバータイプ: MVEL
項目の追加後、保存をしてください。
次に、ビジネスプロセスのアセット (kafka-process-sample) を開きます。
プロセスの終了イベントを、Kafka メッセージ を送信する終了メッセージイベントに変更をしてみましょう。
終了イベント(Approved)の赤色の丸をクリックし、左下の歯車アイコンを選択して終了メッセージイベントを選択します。
次に、この終了メッセージイベントが参照する、Kafka トピック を設定します。
右端の終了メッセージイベントのプロパティパネルから、冒頭で作成した Kafka トピック の request-approved を、実装/実行メニューの、メッセージの項目に設定をします。
次に、Kafka トピック へ送信するメッセージのデータ項目を追加します。 今回は、承認/却下の理由をメッセージとして送信することにします。
終了メッセージイベントのプロパティパネルから、データ割り当てメニューの割当の項目をクリックし、以下のデータ入力と割り当てを追加してください。
- Name: data
- データタイプ: String
- ターゲット: message
同様にして、もう一つの終了イベント(Rejected)についても、終了メッセージイベントへ変更をしておきます。 Kafka トピック は request-rejected を使用し、データ入力と割り当ては上記と同じものを入力してください。
終了イベントの変更と、プロパティへの設定の追加が完了したら、一度プロセスを保存し、プロジェクトをKIE Serverへデプロイをしましょう。
新しいターミナルを2つ開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行します。
$PROJECTS_DIR
は、自分のフォルダパスの内容に置き換えてください。
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic request-approved --bootstrap-server localhost:9092
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic request-rejected --bootstrap-server localhost:9092
これで、request-approved
と request-rejected
の2つのトピックの Consumer サービス が準備できました。
new-request
の Producer サービス のターミナルから、以下のjsonデータを入力してみましょう。
> {"data" : {"id": 2, "status": "Gold", "score": 150}}
request-approved
の Consumer サービス のターミナルに、終了イベントから送信されたメッセージが表示されるはずです。
{ "specversion": "1.0", "time": "2021-04-19T11:32:25.710+0900", "id": "4d7356ad-606b-4768-bbab-41b749e48d86", "type": "java.lang.String", "source": "/process/pam-kafka-message.process-kafka-sample/2", "data": "自動承認します" }
プロセス、ケース、またはタスクが完了したときに Kafka イベントを生成させる
開始・終了メッセージイベントを通してだけでなく、プロセス・ケースやタスクが完了した時に、イベントに関する Kafka メッセージを生成するように設定できます。 トランザクションをコミットすると、KIE Server はメッセージを送信します。
デフォルトでは、KIE Server は以下のトピックでメッセージを公開します。
- 完了したプロセスに関するメッセージ:
jbpm-processes-events
- 完了したタスクに関するメッセージ:
jbpm-tasks-events
- 完了したケースに関するメッセージ:
jbpm-cases-events
トランザクションのコミット時に、KIE Server からメッセージを送信するには、以下の手順を実行します。 (※ RHPAM 7.10.0 の場合)
Red Hat カスタマーポータルの Software Downloads ページから製品配信可能ファイル rhpam-7.10.0-maven-repository.zip をダウンロードします。
ファイルの内容を展開します。
maven-repository/org/jbpm/jbpm-event-emitters-kafka/7.48.0.Final-redhat-00004/jbpm-event-emitters-kafka-7.48.0.Final-redhat-00004.jar ファイルをアプリケーションサーバーの deployments/kie-server.war/WEB-INF/lib サブディレクトリーにコピーします。
ファイルを指定のフォルダに格納後、EAPを再起動します。
※ maven-repository.zipはファイルサイズが約1.7Gbと大きいため、一般提供版 (GA) リポジトリー: https://maven.repository.redhat.com/ga/ へアクセスし、該当のファイルを直接ダウンロードして格納しても構いません。
新しいターミナルを2つ開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行して、jbpm-processes-events
と jbpm-tasks-events
の2つのトピックの Consumer サービス を準備しておきます。
$PROJECTS_DIR
は、自分のフォルダパスの内容に置き換えてください。
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic jbpm-processes-events --bootstrap-server localhost:9092
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one docker-compose exec kafka bin/kafka-console-consumer.sh --topic jbpm-tasks-events --bootstrap-server localhost:9092
再び Business Central に戻ります。もしサンプルプロジェクトが KIE Server にデプロイされていない場合は、もう一度デプロイを実行してください。
new-request
の Producer サービス のターミナルから、以下のjsonデータを入力してみましょう。
> {"data" : {"id": 3, "status": "Silver", "score": 150}}
jbpm-tasks-events
の Consumer サービス のターミナルに、KIE Serverから送信されたイベントメッセージが表示されるはずです。
{ "specversion": "1.0", "time": "2021-04-19T13:48:38.045+0900", "id": "773d7b97-64cf-41d4-8b35-8090f279f41c", "type": "task", "source": "/process/pam-kafka-message.process-kafka-sample/2", "data": { "compositeId": "default-kieserver_1", "id": 1, "priority": 0, "name": "手動承認", "subject": "", "description": "", "taskType": null, "formName": "Task", "status": "Ready", "actualOwner": null, "createdBy": null, "createdOn": "2021-04-19T13:48:37.611+0900", "activationTime": "2021-04-19T13:48:37.611+0900", "expirationDate": null, "skipable": false, "workItemId": 2, "processInstanceId": 2, "parentId": -1, "processId": "pam-kafka-message.process-kafka-sample", "containerId": "pam-kafka-message_1.0.0-SNAPSHOT", "potentialOwners": [ "kie-server" ], "excludedOwners": [], "businessAdmins": [ "Administrator", "Administrators" ], "inputData": { "request": { "id": 3, "status": "Silver", "score": 150 }, "TaskName": "Task", "NodeName": "手動承認", "approval": false, "Skippable": "false", "message": "却下します", "GroupId": "kie-server" }, "outputData": null } }
Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。 リストに、有効なインスタンスが表示されているので選択し、次にダイアグラムタブへ移動します。
メッセージイベントからのリクエストは自動承認されずに、手動承認のタスクが実行されているのが確認できます。
Business Central の上部メニューから、トラック → タスク受信箱 を選択します。
リストの手動承認のタスクを選択し、遷移したタスク処理画面の、クレーム
→ 開始
の青いボタンをクリックします。
Outputs欄に Approval
のチェックボックスと、承認/却下理由の Message
のテキストボックスがあるので、今回はそのまま却下(Approvalのチェックは入れない)とし、Message
のテキストボックスには何か適当に理由を入力して、完了
の青いボタンをクリックします。
Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。
左のフィルターパネルから Stateメニューの 完了
にチェックを入れます。
手動承認のタスクを処理した結果、プロセスが完了しています。
このインスタンスを選択て、ダイアグラムタブを確認します。
手動承認の処理の結果、Rejected の終了メッセージイベントが実行され、プロセスが終了しています。
request-rejected
の Consumer サービス のターミナルをみてみましょう。
終了イベントから送信されたメッセージが表示されているはずです。
{ "specversion": "1.0", "time": "2021-04-19T14:04:15.790+0900", "id": "dfb359b4-8c1c-44f4-9344-450e6381e9ed", "type": "java.lang.String", "source": "/process/pam-kafka-message.process-kafka-sample/2", "data": "審査の結果、リクエストは却下とします。" }
またタスクの処理後に jbpm-tasks-events
の Consumer サービス のターミナルを確認すると、新しく、KIE Serverから送信されたイベントメッセージが表示されています。
タスク処理前と比べて、state
が 1→2(実行中→完了)になり、variables
の message
が入力した理由に置き換わっています。
{ "specversion": "1.0", "time": "2021-04-19T14:04:15.831+0900", "id": "499f344a-f9b6-4067-819b-2c4fb4afe3d4", "type": "process", "source": "/process/pam-kafka-message.process-kafka-sample/2", "data": { "compositeId": "default-kieserver_2", "id": 2, "processId": "pam-kafka-message.process-kafka-sample", "processName": "process-kafka-sample", "processVersion": "1.0", "state": 2, "containerId": "pam-kafka-message_1.0.0-SNAPSHOT", "initiator": "unknown", "date": "2021-04-19T14:04:15.829+0900", "processInstanceDescription": "process-kafka-sample", "correlationKey": "2", "parentId": -1, "variables": { "request": { "id": 3, "status": "Silver", "score": 150 }, "approval": false, "initiator": "unknown", "message": "審査の結果、リクエストは却下とします。" } } }
jbpm-processes-events
の Consumer サービス のターミナルにも、プロセスに関するメッセージが送信されていますので、確認をしてみてください。
おわりに
いかがでしたでしょうか? RHPAM の開始・終了メッセージイベントを使用することで、Red Hat AMQ Streams (Apache Kafka) と統合し、イベント駆動型のマイクロサービスとして、ビジネスプロセスを運用することができるようになりました。 この機能についてのより詳細な情報については、以下のURLのドキュメントを参照して頂ければと思います。
Red Hat Process Automation Manager と Red Hat AMQ Streams の統合