Karavan Designerの紹介(その2)

みなさんこんにちは、レッドハットでソリューションアーキテクトをしている暮林といいます。

前回はKaravan Designerがどんなものかということを紹介いたしました。

rheb.hatenablog.com

今回の記事では、前回の予告通りノーコードでapache KafkaのPoCができるような使い方の紹介をしたいとおもいます。 Karavan DesignerにAtlas Mapを組み合わせていきます。

AtlasMapとは

AtlasMapはデータマッピングソリューションです。

www.atlasmap.io

https://www.atlasmap.io/images/datamapper.png

データマッピングというのは、あるサービスと別のサービス(またはデータベースなど)を接続するときに、どの項目をどの項目に移送するのか、または編集を施すのか、といった作業のことです。

このときに複雑な変換を行う必要がある場合と、よくある編集操作(たとえばTrimや文字数でカットなど)で済む場合があります。AtrasMapは後者で済んでしまうような場合に効果を発揮します。もちろんこの手の編集は通常のプログラミングでも行うことはできますが、Boring Jobになること間違いなしです。

使ってみる

AtlasMapは実行可能Jarとして配布されていますが、VSCodeプラグインとしても起動できます。とはいえVSCode内で実行可能Jarを起動しているだけっぽいので、Javaが実行できる環境は必要です。

プラグインインストール

VSCodeのマーケットプレースから「AtlasMap」を検索して、「AtlasMap Data Transformation editor by Red Hat」をインストールしてください。

f:id:Tatsuyak9i:20220125090457p:plain

空ファイル作成

AtlasMapプラグインは「*.adm」という拡張子に反応します。ここではためしに、「order-mapping.adm」というファイルをVSCodeのファイルメニューから作成して、ダブルクリックしてみましょう。

f:id:Tatsuyak9i:20220125153025p:plain

AtlasMapのエディターが開きましたね!

早速マッピングしてみる

早速マッピングをしてみましょう。JsonやXMLのスキーマ、もしくはインスタンスをインポートしましょう。

f:id:Tatsuyak9i:20220125153232p:plain

少し長いですが、今回はこのJsonをつかってみましょう。適当な名前で保存してから、上記のメニューからインポートしてください。インスタンスかスキーマかをきれるので、インスタンスを選んでください。

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "OrderId"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "OrderType"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "OrderItemName"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "Quantity"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "Price"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "ShipmentAddress"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "ZipCode"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "OrderUser"
                    }
                ],
                "optional": true,
                "name": "user1.earth.dbo.Orders.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "OrderId"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "OrderType"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "OrderItemName"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "Quantity"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "Price"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "ShipmentAddress"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "ZipCode"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "OrderUser"
                    }
                ],
                "optional": true,
                "name": "user1.earth.dbo.Orders.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "change_lsn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "event_serial_no"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.sqlserver.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "user1.earth.dbo.Orders.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "OrderId": 1,
            "OrderType": "E",
            "OrderItemName": "Lime",
            "Quantity": 100,
            "Price": "3.69",
            "ShipmentAddress": "541-428 Nulla Avenue",
            "ZipCode": "4286",
            "OrderUser": "user1"
        },
        "source": {
            "version": "1.1.2.Final-redhat-00001",
            "connector": "sqlserver",
            "name": "user1.earth",
            "ts_ms": 1636711635163,
            "snapshot": "true",
            "db": "InternationalDB",
            "schema": "dbo",
            "table": "Orders",
            "change_lsn": null,
            "commit_lsn": "00000049:00007a08:0003",
            "event_serial_no": null
        },
        "op": "r",
        "ts_ms": 1636711635166,
        "transaction": null
    }
}

このJsonはDebeziumというChange Data Captureソリューションが出力するJsonの例です。アプリとしては前半部分のスキーマが不要で、payloadの中のafterの中身だけが欲しいと想定します。今度はTarget側でインポートしましょう。(これも一旦適当なファイルに保存してください)

 {
    "OrderId": 1,
    "OrderType": "E",
    "OrderItemName": "Lime",
    "Quantity": 100,
    "Price": "3.69",
    "ShipmentAddress": "541-428 Nulla Avenue",
    "ZipCode": "4286",
    "OrderUser": "user1"
}

こんな感じになったはずです。(Payload→Afterと開いてあります)

f:id:Tatsuyak9i:20220125154324p:plain

つなげてみる

勘のいい方はここからの操作は想像がついているかもしれません。左側の項目を右側の項目のマッピングしたい項目にドラッグアンドドロップしてみてください。

f:id:Tatsuyak9i:20220125154603p:plain

線が引けましたね。あとは他の項目もどんどん線をひいていきましょう。

f:id:Tatsuyak9i:20220125154713p:plain

あとはファイルをセーブしてマッピングの作成は完了です!admファイルはそのまま利用します。

Karavan Designerから利用する

では当初の目的に戻ってKaravan DesignerからできあがったAtlasMapを利用するDSLを書いてみましょう。 まず最初のステップはKafka TopicNot Secured Sourceからです。検索ボックスに「Kafka」といれてSourceタブを選んでください。

f:id:Tatsuyak9i:20220125155214p:plain

f:id:Tatsuyak9i:20220125155307p:plain

ひとまずパラメータの設定はおいておきましょう。次がAtlasMapのステップです。検索ボックスにatlasといれてProducerタブを選んでください。

f:id:Tatsuyak9i:20220125155359p:plain

f:id:Tatsuyak9i:20220125155421p:plain

最後はまたKafkaですね。検索ボックスに「Kafka」といれてSinkタブを選んでください。

f:id:Tatsuyak9i:20220125155650p:plain

f:id:Tatsuyak9i:20220125155713p:plain

こんな感じになったと思います。この例では色気を出して途中でログを出力しています。(Log Sinkをつかっています) 現状のYAML DSLはこんなかんじです。

apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kafkatokafka.yaml
spec:
  flows:
    - from:
        uri: 'kamelet:kafka-not-secured-source'
        steps:
          - to:
              uri: atlasmap
          - kamelet:
              name: log-sink
          - kamelet:
              name: kafka-not-secured-sink

あとはそれぞれのステップをクリックしてパラメータをうめましょう。

Kafka not Secured SourceについてはBrokerとTopic Namesだけいれれば十分ですが、Consumer GroupまでいれておくとKafka側でどこまで読んだか覚えてくれるようになります。(Consumer GroupはConsumerをスケールアウトしたときにきちんとそれぞれのプロセス割り振るためのパラメータですが、Consumerが1多重でもどこまで読んだかをKafka側に記憶してもらうために使うことができます)

次はAtlasMapのパラメーターです。ここはちょっと覚えゲー的なところがありますが、Resource Uriにこのようにいれてください。

file:/etc/camel/resources/order-mapping.adm

f:id:Tatsuyak9i:20220125160552p:plain

パラメータを入れ終わったらファイルメニューからセーブしておきましょう。今回の例では最終的にこうなりました。

apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kafkatokafka.yaml
spec:
  flows:
    - from:
        uri: 'kamelet:kafka-not-secured-source'
        steps:
          - to:
              uri: 'atlasmap:file:/etc/camel/resources/order-mapping.adm'
          - kamelet:
              name: log-sink
          - kamelet:
              name: kafka-not-secured-sink
              parameters:
                brokers: my-cluster.svc
                topic: outcoming-topic
        parameters:
          brokers: 'my-cluster.svc:9092'
          topic: incoming-topic
          consumerGroup: group1

これでKafka Topicを読み出して、項目を編集、ログを出して、別のKafka Topicに出力するDSLがかけました! おいおい、これじゃAtlasMapのadmファイルローカルにあるままだぞ?感じですが、Camel-Kを実行するコマンドで --resource オプションとして指定してあげるとadmファイルがクラスパスの通っている場所にアップロードされる仕組みになっています。(下記のコマンドはadmファイルをmapディレクトリに置いている前提のコマンドです)

 kamel run kafkatokafka.yaml --resource file:map/order-mapping.adm

まとめ

今回はAtlasMapでデータマッピングを作成して、Karavan Designerのステップに組み込む方法をおつたえいたしました。 例として前後をKafkaとましたが、Camel-KのSourceとSinkには様々なサービスが使えることがチラチラと見えたと思いますので、同じようなやりかたで別々のサービスをつなげることができます。

そうはいってもREST APIで普通受けるよね?それはどうやるの?と思った方は下記のブログを読んでみてください。OpenAPI Specを使ってhttpの受け口を自動生成する方法です。

developers.redhat.com

developers.redhat.com

Karavan Designerについて2回の連載を行いましたがいかがでしたでしょううか?エンジニアを駆り出さなくてもサービスとサービスの接続ができそうな気がして来た!という方がいらっしゃれば幸いです。

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。