[CEP] Aggregating cache data from OCEP in CQL

時として、Oracle CEPのアプリケーションが、Coherenceキャッシュ中のデータなどの外部データと、ストリームのデータをJoinしなければならないユースケースがあります。Oracle CEPのストリーミング言語であるCQLは、Coherenceのデータ(より複雑なクエリは、将来のリリースでサポートされる予定)とストリームデータの単純なキャッシュキーベースのJoinをサポートしていますが、ストリームからの入力データに基づいて、Coherence内のデータを集約しなければならない場合があります。このエントリではそういう場合の例を説明いたします。

この例では、クレジットカード詐欺の検知を簡略化して考えます。このサンプルアプリケーションへの入力は、クレジットカードのトランザクションデータのストリームです。入力ストリームには、クレジットカード番号、取引の時間や取引金額などの情報が含まれています。このアプリケーションの目的は、疑わしい取引を検出し、警告イベントを送信することです。単純化のために、1000ドルを超えるすべての取引が不審であると仮定します。取引履歴はCoherenceの分散キャッシュにあり、検出されたすべての疑わしい取引について、次の図に示すように、過去30日間の最大取引金額、取引金額の合計と取引件数と共に警告イベントを送信する必要があります。

アプリケーションの入力
EPNへの入力ストリームには、CCTransactionEvent型のイベントを含んでいます。

CCTransactionEventクラス
http://blogs.oracle.com/CEP/resource/aggregating-cache-data/CCTransactionEvent.java

この入力データはキャッシュ中の全クレジットカード取引情報とJoinする必要があります。キャッシュはEPN内で以下のように構成されています。
<wlevs:caching-system id="CohCacheSystem" provider="coherence"/>
    <wlevs:cache id="CCTransactionsCache" value-type="CCTransactionEvent" 
                 key-properties="cardID, transactionTime"
                 caching-system="CohCacheSystem">
    </wlevs:cache>
アプリケーションからの出力
アプリケーションからの出力は不正警告イベントで、以下のようなSpringファイル内で設定されています。cardHistoryプロパティのソースはこちらで確認できます。

CardHistoryDataクラス
http://blogs.oracle.com/CEP/resource/aggregating-cache-data/CardHistoryData.java
<wlevs:event-type type-name="FraudWarningEvent">
          <wlevs:properties type="tuple">
              <wlevs:property name="cardID" type="CHAR"/>
              <wlevs:property name="transactionTime" type="BIGINT"/>
              <wlevs:property name="transactionAmount" type="DOUBLE"/>
              <wlevs:property name="cardHistory" type="OBJECT"/>
          </wlevs:properties>
      </wlevs:event-type>
Javaカートリッジを使ったキャッシュデータの集約
出力警告イベントで、cardHistoryプロパティには過去30日の情報を集約したキャッシュからの情報が含まれています。この情報を取得するため、Javaカートリッジを使います。これは、CoherenceのクエリAPIをクレジットカード取引のキャッシュに対して実行し、必要な情報を取得するというものです。Javaカートリッジにはキャッシュへの参照が必要ゆえ、Springコンテキストファイルに次のような設定をする必要があります。
<bean class="com.oracle.cep.ccfraud.CCTransactionsAggregator">
        <property name="cache" ref="CCTransactionsCache"/>
    </bean>
これは静的プロパティを設定するためにJavaクラスで使用します。
public void setCache(Map cache)
    {
        s_cache = (NamedCache) cache;
    }
以下のコードスニペットは、過去30日の全ての取引金額の合計の計算方法を示しています。CardHistoryオブジェクトが必要とする情報の残りも同様に計算できます。このクラスの完全なソースは以下からどうぞ。

CCTransactionsAffregatorクラス
http://blogs.oracle.com/CEP/resource/aggregating-cache-data/CCTransactionsAggregator.java

CoherenceのAPIを使ってキャッシュへのクエリを発行する方法についてはCoherenceの開発者ガイドをご覧下さい。
public static CreditHistoryData(String cardID)
{
…       
     Filter filter = QueryHelper.createFilter("cardID = :cardID and transactionTime > :transactionTime", map);
        CardHistoryData history = new CardHistoryData();
        Double sum = (Double) s_cache.aggregate(filter, new DoubleSum("getTransactionAmount"));
        history.setTotalAmount(sum);
…
    return history;
}
JavaカートリッジはCQLから以下のように利用します。
select cardID,
          transactionTime,
          transactionAmount,
          CCTransactionsAggregator.execute(cardID) as cardHistory
from inputChannel
where transactionAmount>1000
こうして、1000ドルを上回るのすべてのクレジットカード取引の履歴データとの警告のイベントが発生します。

これで終わりです。サンプルアプリケーションの完全なソースは、コンフィグレーションファイルと一緒に、こちらから入手可能です。

サンプルアプリケーションのソース
http://blogs.oracle.com/CEP/resource/aggregating-cache-data/ccfraud.zip

サンプルでは、簡単なJava Beanを使用して、取引履歴データの初期値をキャッシュにロードしています。入力アダプターは、入力ストリームのトランザクションイベントを作成、送信するために使用しています。


原文はこちら。
http://blogs.oracle.com/CEP/entry/aggregating_cache_data_from_ocep

0 件のコメント:

コメントを投稿