Apache Kafka で動かすイベント駆動型のビジネスプロセス

レッドハットのソリューションアーキテクトの森です。
Red Hat Process Automation Manager と、Red Hat AMQ Streams (Apache Kafka) を統合することができるようになりました。

ビジネスプロセスは、メッセージイベントを使用して Kafka メッセージを送受信することができます。 KIE Server では、プロセス、ケース、またはタスクが完了したときに Kafka イベントを生成させることができます。

今回は、簡単なサンプルプロセスを作成して、Kafka メッセージ からプロセスを開始する方法と、プロセス終了時に Kafka メッセージ を送信する方法についてご紹介していきます。

サンプル実行環境

今回ご紹介するサンプルは、以下の環境で実施をしています。

  • Red Hat Process Automation Manager 7.10.0 (以下RHPAM)
  • Apache Kafka
  • Java 11
  • Maven
  • Git
  • Docker

Kafka のセットアップ

まずは、ローカル環境に Kafka サービス を構築しましょう。

下記のプロジェクトをクローンして、strimzi-all-in-one フォルダに移動し、docker-composeファイルを使って、 以下のコンテナサービスを起動します。

  • Zookeeper
  • Kafka Broker
  • Strimzi HTTP Bridge
  • Apicurio Service Registry
git clone https://github.com/hguerrero/amq-examples.git  
cd amq-examples/strimzi-all-in-one/ 
docker-compose up

これで、localhostの port 9092 で Kafka サーバーが動作するようになります。

続いて、別のターミナルを立ち上げて、strimzi-all-in-one フォルダに移動します。 以下のコマンドを実行して、今回のサンプルで使用する3つのトピックを作成しておきます。

docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic new-request

docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic request-approved

docker-compose exec kafka bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic request-rejected

RHPAM の設定とサンプルプロジェクトのインポート

RHPAM のプロセスで Kafka メッセージ を送受信するには、EAPの設定ファイルに、システムプロパティ org.kie.kafka.server.ext.disabledfalse の値で追加する必要があります。 RHPAM を起動する際の設定ファイル(/$EAP_HOME/standalone/configuration/standalone-full.xml 等)を開き、以下の設定を追加します。

・・・
<system-properties>
・・・
   <property name="org.kie.kafka.server.ext.disabled" value="false"/>
</system-properties>
・・・

次に、RHPAM を起動して Business Central にログインし、以下のサンプルプロジェクトをインポートします。

https://github.com/kamorisan/pam-kafka-sample.git

f:id:ka_mori:20210420111142p:plain

サンプルのプロセスは、メッセージの情報をインプットとして、リクエストを承認するかどうかをルールで判断します。 自動承認のルールに該当しない場合は、手動で二次判断を行うヒューマンタスクを実施するという、シンプルなものです。 トピックに新しいイベントが発生すると、プロセスの新しいインスタンスが起動します。 リクエストの承認/却下の結果については各々のトピックに送信することで、 承認ステータスに基づいて異なるサービスで対応ができるようになります。

f:id:ka_mori:20210420111534p:plain

プロセスを開始するための Kafka メッセージを 受け取るイベントの作成

先ほどインポートしたプロジェクトの、ビジネスプロセスのアセット (kafka-process-sample) を開きます。 まずは、プロセスの開始イベントを、Kafka メッセージ を使用する開始メッセージイベントに変更をしてみましょう。

開始イベント(New Request)の緑色の丸をクリックし、左下の歯車アイコンを選択して開始メッセージイベントを選択します。

f:id:ka_mori:20210420111604p:plain

次に、この開始メッセージイベントが参照する、Kafka トピック を設定します。

右端の開始メッセージイベントのプロパティパネルから、先ほど作成した Kafka トピック の new-request を、実装/実行メニューの、メッセージの項目に設定をします。

f:id:ka_mori:20210420111621p:plain

次に、Kafka トピック から受け取るメッセージのデータ項目を追加します。 今回のサンプルプロジェクトでは、RequestData というデータオブジェクトを既に作成してありますので、それを使用します。

開始メッセージイベントのプロパティパネルから、データ割り当てメニューの割当の項目をクリックし、以下のデータ出力と割り当てを追加してください。

  • Name: request
  • データタイプ: RequestData
  • ターゲット: request

f:id:ka_mori:20210420111637p:plain

開始イベントの変更と、プロパティへの設定の追加が完了したら、一度プロセスを保存し、プロジェクトをKIE Serverへデプロイをしましょう。

新しいターミナルを開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行します。 $PROJECTS_DIR は、自分のフォルダパスの内容に置き換えてください。

cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one
docker-compose exec kafka bin/kafka-console-producer.sh --topic new-request --bootstrap-server localhost:9092
>

Kafka の Producer サービスは、new-request というトピックにイベントを発行できる状態となっています。 ターミナルに、以下のjsonデータを入力してみましょう。

> {"data" : {"id": 1, "status": "Gold", "score": 200}}

Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。 左のフィルターパネルから Stateメニューの 完了 にチェックを入れます。 先ほど、 new-request で発行したイベントから実施され、完了したプロセスが表示されるはずです。

f:id:ka_mori:20210420111702p:plain

完了済のプロセスインスタンスを選択し、次にダイアグラムタブへ移動します。 メッセージイベントからのリクエストが、自動承認されたことが確認できます。

f:id:ka_mori:20210420111718p:plain

プロセスから Kafka メッセージを 送信するイベントの作成

続いて、プロセス内から Kafka メッセージ を送信するイベントを作成していきます。

事前の準備として、メッセージ送信イベントを作成する際に必要なワークアイテムハンドラーを追加しておきます。

プロジェクトのアセット一覧の画面より、設定 → デプロイメント → ワークアイテムハンドラー タブを選択します。 ワークアイテムハンドラーの追加をクリックし、新しい空の行に以下の値を入力します。

  • Name: Send Task
  • 値: new org.jbpm.bpmn2.handler.SendTaskHandler()
  • リゾルバータイプ: MVEL

f:id:ka_mori:20210420111736p:plain

項目の追加後、保存をしてください。

次に、ビジネスプロセスのアセット (kafka-process-sample) を開きます。

プロセスの終了イベントを、Kafka メッセージ を送信する終了メッセージイベントに変更をしてみましょう。

終了イベント(Approved)の赤色の丸をクリックし、左下の歯車アイコンを選択して終了メッセージイベントを選択します。

f:id:ka_mori:20210420111752p:plain

次に、この終了メッセージイベントが参照する、Kafka トピック を設定します。

f:id:ka_mori:20210420111811p:plain

右端の終了メッセージイベントのプロパティパネルから、冒頭で作成した Kafka トピック の request-approved を、実装/実行メニューの、メッセージの項目に設定をします。

f:id:ka_mori:20210420111830p:plain

次に、Kafka トピック へ送信するメッセージのデータ項目を追加します。 今回は、承認/却下の理由をメッセージとして送信することにします。

終了メッセージイベントのプロパティパネルから、データ割り当てメニューの割当の項目をクリックし、以下のデータ入力と割り当てを追加してください。

  • Name: data
  • データタイプ: String
  • ターゲット: message

同様にして、もう一つの終了イベント(Rejected)についても、終了メッセージイベントへ変更をしておきます。 Kafka トピック は request-rejected を使用し、データ入力と割り当ては上記と同じものを入力してください。

終了イベントの変更と、プロパティへの設定の追加が完了したら、一度プロセスを保存し、プロジェクトをKIE Serverへデプロイをしましょう。

新しいターミナルを2つ開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行します。 $PROJECTS_DIR は、自分のフォルダパスの内容に置き換えてください。

cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one
docker-compose exec kafka bin/kafka-console-consumer.sh --topic request-approved  --bootstrap-server localhost:9092
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one
docker-compose exec kafka bin/kafka-console-consumer.sh --topic request-rejected  --bootstrap-server localhost:9092

これで、request-approvedrequest-rejected の2つのトピックの Consumer サービス が準備できました。

new-request の Producer サービス のターミナルから、以下のjsonデータを入力してみましょう。

> {"data" : {"id": 2, "status": "Gold", "score": 150}}

request-approved の Consumer サービス のターミナルに、終了イベントから送信されたメッセージが表示されるはずです。

{
  "specversion": "1.0",
  "time": "2021-04-19T11:32:25.710+0900",
  "id": "4d7356ad-606b-4768-bbab-41b749e48d86",
  "type": "java.lang.String",
  "source": "/process/pam-kafka-message.process-kafka-sample/2",
  "data": "自動承認します"
}

プロセス、ケース、またはタスクが完了したときに Kafka イベントを生成させる

開始・終了メッセージイベントを通してだけでなく、プロセス・ケースやタスクが完了した時に、イベントに関する Kafka メッセージを生成するように設定できます。 トランザクションをコミットすると、KIE Server はメッセージを送信します。

デフォルトでは、KIE Server は以下のトピックでメッセージを公開します。

  • 完了したプロセスに関するメッセージ: jbpm-processes-events
  • 完了したタスクに関するメッセージ: jbpm-tasks-events
  • 完了したケースに関するメッセージ: jbpm-cases-events

トランザクションのコミット時に、KIE Server からメッセージを送信するには、以下の手順を実行します。 (※ RHPAM 7.10.0 の場合)

  1. Red Hat カスタマーポータルの Software Downloads ページから製品配信可能ファイル rhpam-7.10.0-maven-repository.zip をダウンロードします。

  2. ファイルの内容を展開します。

  3. maven-repository/org/jbpm/jbpm-event-emitters-kafka/7.48.0.Final-redhat-00004/jbpm-event-emitters-kafka-7.48.0.Final-redhat-00004.jar ファイルをアプリケーションサーバーの deployments/kie-server.war/WEB-INF/lib サブディレクトリーにコピーします。

  4. ファイルを指定のフォルダに格納後、EAPを再起動します。

※ maven-repository.zipはファイルサイズが約1.7Gbと大きいため、一般提供版 (GA) リポジトリー: https://maven.repository.redhat.com/ga/ へアクセスし、該当のファイルを直接ダウンロードして格納しても構いません。

新しいターミナルを2つ開き、Kafka サービスの strimzi-all-in-one フォルダへアクセスし、下記のコマンドを実行して、jbpm-processes-eventsjbpm-tasks-events の2つのトピックの Consumer サービス を準備しておきます。 $PROJECTS_DIR は、自分のフォルダパスの内容に置き換えてください。

cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one
docker-compose exec kafka bin/kafka-console-consumer.sh --topic jbpm-processes-events  --bootstrap-server localhost:9092
cd $PROJECTS_DIR/amq-examples/strimzi-all-in-one
docker-compose exec kafka bin/kafka-console-consumer.sh --topic jbpm-tasks-events  --bootstrap-server localhost:9092

再び Business Central に戻ります。もしサンプルプロジェクトが KIE Server にデプロイされていない場合は、もう一度デプロイを実行してください。

new-request の Producer サービス のターミナルから、以下のjsonデータを入力してみましょう。

> {"data" : {"id": 3, "status": "Silver", "score": 150}}

jbpm-tasks-events の Consumer サービス のターミナルに、KIE Serverから送信されたイベントメッセージが表示されるはずです。

{
  "specversion": "1.0",
  "time": "2021-04-19T13:48:38.045+0900",
  "id": "773d7b97-64cf-41d4-8b35-8090f279f41c",
  "type": "task",
  "source": "/process/pam-kafka-message.process-kafka-sample/2",
  "data": {
    "compositeId": "default-kieserver_1",
    "id": 1,
    "priority": 0,
    "name": "手動承認",
    "subject": "",
    "description": "",
    "taskType": null,
    "formName": "Task",
    "status": "Ready",
    "actualOwner": null,
    "createdBy": null,
    "createdOn": "2021-04-19T13:48:37.611+0900",
    "activationTime": "2021-04-19T13:48:37.611+0900",
    "expirationDate": null,
    "skipable": false,
    "workItemId": 2,
    "processInstanceId": 2,
    "parentId": -1,
    "processId": "pam-kafka-message.process-kafka-sample",
    "containerId": "pam-kafka-message_1.0.0-SNAPSHOT",
    "potentialOwners": [
      "kie-server"
    ],
    "excludedOwners": [],
    "businessAdmins": [
      "Administrator",
      "Administrators"
    ],
    "inputData": {
      "request": {
        "id": 3,
        "status": "Silver",
        "score": 150
      },
      "TaskName": "Task",
      "NodeName": "手動承認",
      "approval": false,
      "Skippable": "false",
      "message": "却下します",
      "GroupId": "kie-server"
    },
    "outputData": null
  }
}

Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。 リストに、有効なインスタンスが表示されているので選択し、次にダイアグラムタブへ移動します。

メッセージイベントからのリクエストは自動承認されずに、手動承認のタスクが実行されているのが確認できます。

f:id:ka_mori:20210420111857p:plain

Business Central の上部メニューから、トラック → タスク受信箱 を選択します。

f:id:ka_mori:20210420111910p:plain

リストの手動承認のタスクを選択し、遷移したタスク処理画面の、クレーム開始 の青いボタンをクリックします。

Outputs欄に Approval のチェックボックスと、承認/却下理由の Message のテキストボックスがあるので、今回はそのまま却下(Approvalのチェックは入れない)とし、Message のテキストボックスには何か適当に理由を入力して、完了 の青いボタンをクリックします。

f:id:ka_mori:20210420111929p:plain

Business Central の上部メニューから、管理 → プロセスインスタンス を選択します。 左のフィルターパネルから Stateメニューの 完了 にチェックを入れます。 手動承認のタスクを処理した結果、プロセスが完了しています。 このインスタンスを選択て、ダイアグラムタブを確認します。

f:id:ka_mori:20210420111944p:plain

手動承認の処理の結果、Rejected の終了メッセージイベントが実行され、プロセスが終了しています。

request-rejected の Consumer サービス のターミナルをみてみましょう。 終了イベントから送信されたメッセージが表示されているはずです。

{
  "specversion": "1.0",
  "time": "2021-04-19T14:04:15.790+0900",
  "id": "dfb359b4-8c1c-44f4-9344-450e6381e9ed",
  "type": "java.lang.String",
  "source": "/process/pam-kafka-message.process-kafka-sample/2",
  "data": "審査の結果、リクエストは却下とします。"
}

またタスクの処理後に jbpm-tasks-events の Consumer サービス のターミナルを確認すると、新しく、KIE Serverから送信されたイベントメッセージが表示されています。

タスク処理前と比べて、state が 1→2(実行中→完了)になり、variablesmessage が入力した理由に置き換わっています。

{
  "specversion": "1.0",
  "time": "2021-04-19T14:04:15.831+0900",
  "id": "499f344a-f9b6-4067-819b-2c4fb4afe3d4",
  "type": "process",
  "source": "/process/pam-kafka-message.process-kafka-sample/2",
  "data": {
    "compositeId": "default-kieserver_2",
    "id": 2,
    "processId": "pam-kafka-message.process-kafka-sample",
    "processName": "process-kafka-sample",
    "processVersion": "1.0",
    "state": 2,
    "containerId": "pam-kafka-message_1.0.0-SNAPSHOT",
    "initiator": "unknown",
    "date": "2021-04-19T14:04:15.829+0900",
    "processInstanceDescription": "process-kafka-sample",
    "correlationKey": "2",
    "parentId": -1,
    "variables": {
      "request": {
        "id": 3,
        "status": "Silver",
        "score": 150
      },
      "approval": false,
      "initiator": "unknown",
      "message": "審査の結果、リクエストは却下とします。"
    }
  }
}

jbpm-processes-events の Consumer サービス のターミナルにも、プロセスに関するメッセージが送信されていますので、確認をしてみてください。

おわりに

いかがでしたでしょうか? RHPAM の開始・終了メッセージイベントを使用することで、Red Hat AMQ Streams (Apache Kafka) と統合し、イベント駆動型のマイクロサービスとして、ビジネスプロセスを運用することができるようになりました。 この機能についてのより詳細な情報については、以下のURLのドキュメントを参照して頂ければと思います。

Red Hat Process Automation Manager と Red Hat AMQ Streams の統合

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。