[Integration] Processing high volumes of updates to a single row using OSA

原文はこちら。
https://blogs.oracle.com/integration/entry/processing_high_volume_updates_using

今日の世界では、指数関数的にデータが生成されています。このような情報をすべて収集して処理して興味深い傾向を発見するための新しいビッグデータ・テクノロジーが利用できますが、即時対応する必要があったり、特定の項目の現在の状況を知りたいという場合もあります。このような要求を実現するために、すべての着信データを保存することもできますが、トレードオフとして、不要なデータを大量に保持し、ユーザーが最初に最新のレコードを決定するクエリを作成する必要があります。別の手法として、データベースの更新を行うこともできますが、デバイスが頻繁にステータスメッセージを送信する可能性があるIoT領域のような、単一の行が頻繁に更新されるシナリオでは、データベースの競合が発生する可能性があります。
この問題を解決する方法の一つとして、あまり知られていませんが、Oracle Stream Analytics(OSA)の利用があります。OSAは、SQLに類似しSQL-99に準拠しつつも、インメモリでの操作と問題を非常に効率的な解決のために、追加の構文を備えた、Continuous Query Language(以下CQL)という非常に洗練された言語を利用することができます。不要なデータベースの更新を避けるという課題は、OSAが十分に、しかもたった1個のクエリでとても簡単に解決します。
はじめにOSAランタイムの基本情報を説明します。非常に使い安いOSAのユーザーインターフェースを使い、わずかな時間でアプリケーション全体を開発することができるだけでなく、JDeveloperを使用した従来の方法でもアプリケーションを開発することができます。JDeveloperを使用して開発する場合、「イベント処理ネットワーク(Event Processing Network、以下EDN)」と呼ばれるアプリケーション・モデルを理解する必要があります。
EPN
EPNは、アプリケーションからのデータフローを処理します。EPNには実行させたいCQLを保持するCQLプロセッサノードが含まれます。今回のケースでは、数十行のJavaコードを使って書き込むという比較的複雑なタスクを実行するためのCQLを、下図のような非常にシンプルな文を使い、非常に簡単かつ効率的にメモリ内で実行します。
CQL
このクエリでは、必要な属性(SELECT * FROM InputChannelも使用できます)のみを選択し、CQL構文を使用してストリームをデバイスIDで"ROWS 1"を使ってパーティション化します("ROWS 1"は、各デバイスID毎に1個の行だけという意味です)。このデバイスIDごとの1行を"RANGE 2 SECONDS"で指定した2秒間保持し、結果を2秒毎(SLIDE 2 SECONDS)に出力します。ここで、"RANGE"と"SLIDE"で指定した値が同じ時間間隔の場合、本質的には2秒ごとに(クエリに状態を持たずに)最初からやり直すことになります。

[注意]
2秒が長いという場合には、1秒もしくはミリ秒単位で時間を指定してください。

このテストでは、データはランダムに生成されますが、データと結果が正確かどうかを確認します。今回は、Oracle BAM 12cにデータを送信します。JMS Mapメッセージとしてデータを自動的に書き出すJMSアダプタを使用してOSAアプリケーションからデータを出力するようにします(設定はイベント・タイプ名をOSA JMS構成に指定するだけです。標準のJMSアダプタはコンバータクラスを使わずにJMSメッセージを作成します)。今回こうした理由は、以下の点で便利だからです。
  • BAMのEnterprise Message Sources(以下EMS)をBAM 12c管理者ページで簡単に構成でき、Mapメッセージ属性をデータオブジェクトへ割り当てることができる。
  • OSAおよびBAMで全て完結し、Javaコードを一切各必要がない


このテストのために、空のデータオブジェクトから始め、EMSメトリックをリセットすることをお忘れなく。

私たちのBAM EMSは "Upsert" で構成されています。これは、レコードが存在しない場合は、(deviceIDとして定義されているキーに基づいて)レコードを挿入し、当該デバイスIDを持つ行がデータオブジェクト(つまりデータベース表)に既に存在する場合には、更新することを意味します。

まず、管理した状態で少量のデータセットを送信し、期待どおりに動作することを確認しましょう。たった10個の一意のデバイスIDに対応する30個のイベントを送信し、各イベント間には意図的にわずかな遅延を挟むことにします。


10個の異なるデバイスIDがサンプル中にあり、10個のメッセージが永続化されているため、10個のメッセージのみが送信されたというBAM EMSのメトリックが正しいことがわかります。

デバイスID毎にdeviceCodeとcodeValueを調査すると、これらのメッセージがBAMダッシュボードでユーザーに見てもらいたい直近10メッセージ(各デバイスから送信された最新のメッセージ)であることがわかります。

”Upsert”の操作のためのキーフィールドが2個ある場合、次のようにコンマで区切られたpartition by句の両方にそれらを並べることができます。

データオブジェクトにはもっと多くの行の組み合わせがあるかもしれませんが、Device ID、Device Codeの組み合わせの1エントリだけにしておきます。

是非ご自身で試してください。そうすれば、デバイスの最新の状態を非常に迅速かつ簡単に提供するという目標を達成していることを目の当たりにされることでしょう。同じ行を何度も更新するためにデータベースの競合を潜在的に発生させる可能性がある、不要なJMSメッセージを生成し、ネットワークを介して送信し、Queueに配置した後に、BAMのEMSがQueueから取り出し、処理する必要はなくなります。ユーザーにデバイスの最新状態のメッセージを配信するという目的を満足するためのソリューションの効率を非常に簡単に、大幅に向上しました。そして、追加ボーナスとして、迅速に上書きされるメッセージを生成しないために他の目的にCPU利用率を節約できたので、運用効率も向上しました。

0 件のコメント:

コメントを投稿