CEP Client with WPF GridDataControl

Microsoft StreamInsight CEP is a powerful platform for developing complex event processing (CEP) systems. There are several development models that we can follow. In this post we will use the IObservable/IObserver model with the .NET Reactive Extensions (Rx). Since this will be a real-time application, we will also use F# async workflows to pull stock data.

 


F# async workflows are the coolest part of using F# in a real-time application. They allow writing concise code that (1) executes in parallel and (2) can be exposed to other .NET libraries with ease.

I won’t go into detail about F# except for the async workflow used in this application. There is a three-part series on design patterns for F# async workflows; I have used Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports its progress through events. A modified version of the AsyncWorker<> code is shown below:

open System
open System.Threading

type JobCompletedEventArgs<'T>(job:int, result:'T) =
    inherit EventArgs()
    member x.Job with get() = job
    member x.Result with get() = result

type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

    // This declares an F# event that we can raise
    let allCompleted  = new Event<'T[]>()
    let error         = new Event<Exception>()
#if WPF
    let canceled      = new Event<OperationCanceledException>()
#else
    let canceled      = new Event<OperationCanceledException>()
#endif
    let jobCompleted  = new Event<JobCompletedEventArgs<'T>>()
    let cancellationCapability = new CancellationTokenSource()

    /// Start an instance of the work
    member x.Start()    =
        // Capture the synchronization context to allow us to raise events back on the GUI thread.
        // CaptureCurrent and RaiseEvent are extension members on SynchronizationContext (not shown here).
        let syncContext = SynchronizationContext.CaptureCurrent()

        // Mark up the jobs with numbers
        let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))
        let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args

        // Run all jobs in parallel, raising JobCompleted as each one finishes
        let work =
            Async.Parallel
               [ for (job,jobNumber) in jobs ->
                   async { let! result = job
                           syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<'T>(jobNumber, result))
                           return result } ]

        Async.StartWithContinuations
            ( work,
              (fun res -> raiseEventOnGuiThread(allCompleted, res)),
              (fun exn -> raiseEventOnGuiThread(error, exn)),
              (fun exn -> raiseEventOnGuiThread(canceled, exn)),
              cancellationCapability.Token)

    /// Raised when a particular job completes
    [<CLIEvent>]
    member x.JobCompleted = jobCompleted.Publish

    /// Raised when all jobs complete
    [<CLIEvent>]
    member x.AllCompleted = allCompleted.Publish

    /// Raised when the composition is cancelled successfully
    [<CLIEvent>]
    member x.Canceled = canceled.Publish

    /// Raised when the composition exhibits an error
    [<CLIEvent>]
    member x.Error = error.Publish

We have used [<CLIEvent>] attributes to mark these events so that they are exposed as standard .NET events to other CLI languages. Since we are using Rx, the event argument type needs to inherit from System.EventArgs; JobCompletedEventArgs does that here. The AsyncWorker is now ready to be used as a library for running parallel code.
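To make the per-job event reporting concrete, here is a minimal, self-contained sketch of the same idea. It is not the AsyncWorker itself: it triggers the event directly instead of posting through a SynchronizationContext, and the names DemoJobCompletedEventArgs, seen, and the sleeping stand-in jobs are assumptions made for illustration only.

```fsharp
open System
open System.Collections.Concurrent

// Hypothetical event payload, mirroring the shape of JobCompletedEventArgs.
type DemoJobCompletedEventArgs(job: int, result: int) =
    inherit EventArgs()
    member x.Job = job
    member x.Result = result

// A single F# event; Publish exposes it to subscribers.
let jobCompleted = new Event<DemoJobCompletedEventArgs>()

// Jobs may finish on different threads, so collect notifications in a thread-safe bag.
let seen = ConcurrentBag<int * int>()
jobCompleted.Publish.Add(fun args -> seen.Add((args.Job, args.Result)))

// Three stand-in jobs; a real job would perform a web request instead of sleeping.
let jobs =
    [ for n in 1 .. 3 ->
        async {
            do! Async.Sleep 10
            return n * n } ]

// Wrap each job so that it raises the event when it finishes, then run all in parallel.
let work =
    jobs
    |> List.mapi (fun i job ->
        async {
            let! result = job
            jobCompleted.Trigger(DemoJobCompletedEventArgs(i + 1, result))
            return result })
    |> Async.Parallel

let results = Async.RunSynchronously work
```

Async.Parallel returns the results in the order of the input jobs, while the order in which the individual events fire is not guaranteed. That is why the job number travels with each event.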

Stock Quotes Reader

The Stock Quotes Reader defines a wrapper that sends a request to the server (Yahoo Finance in this case) and pulls the stock quotes.

open System
open System.IO
open System.Net

type StockAvailableEventArgs(stocks:string[]) =
    inherit EventArgs()
    member x.Stocks with get() = stocks

type StockQuotesReader(quotes:string) =

    let stockAvailableEvent = new Event<StockAvailableEventArgs>()

    // Asynchronously request the URI and return the response body as a list of lines
    let httpLines (uri:string) =
      async { let request = WebRequest.Create uri
              use! response = Async.FromBeginEnd(request.BeginGetResponse, request.EndGetResponse)
              use stream = response.GetResponseStream()
              use reader = new StreamReader(stream)
              let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]
              return lines }

    // n - name, s - symbol, x - Stock Exchange, l1 - Last Trade, p2 - change in percent, h - high, l - low, o - open, p - previous close, v - volume
    // (p2 is listed for reference only; it is not part of the f= parameter below)
    let yahooUri (quotes:string) =
        let uri = String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes)
        uri

    member x.GetStocks() =
        let stocks = [httpLines(yahooUri quotes)]
        stocks

//    member x.TestAsync() =
//        let stocks = httpLines(yahooUri quotes)
//        Async.RunSynchronously(stocks)

    // Start the request and raise StockAvailable when its job completes
    member x.PullStocks() =
        let stocks = x.GetStocks()
        let worker = new AsyncWorker<_>(stocks)
        worker.JobCompleted.Add(fun args ->
            stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))
        )
        worker.Start()

    static member GetAsyncReader(quotes) =
        let reader = new StockQuotesReader(quotes)
        let stocks = reader.GetStocks()
        let worker = new AsyncWorker<_>(stocks)
        worker

    [<CLIEvent>]
    member x.StockAvailable = stockAvailableEvent.Publish

The above wrapper class does some interesting things:

  • httpLines is an async block that reads the response stream and returns its contents as a list of lines.
  • PullStocks creates the async requests and raises the StockAvailable event whenever an async job completes.
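Each line returned by the reader is one CSV record in the field order given by the f=nsxl1hlopv parameter. The post does not show how those lines become typed quotes, so the following is only a sketch of what such a step might look like. The StockQuote record, splitCsvLine, and tryParseQuote are hypothetical names, and the sample line in the usage note is made up.

```fsharp
open System
open System.Globalization
open System.Text

// Hypothetical record; field order follows the f=nsxl1hlopv format string.
type StockQuote =
    { Name: string; Symbol: string; Exchange: string
      LastTrade: decimal; High: decimal; Low: decimal
      Open: decimal; PreviousClose: decimal; Volume: int64 }

// Split one CSV line on commas, treating double-quoted sections as single fields.
// Simplification: escaped quotes inside a quoted field are not preserved.
let splitCsvLine (line: string) =
    let fields = ResizeArray<string>()
    let current = StringBuilder()
    let mutable inQuotes = false
    for c in line do
        match c with
        | '"' -> inQuotes <- not inQuotes
        | ',' when not inQuotes ->
            fields.Add(current.ToString())
            current.Clear() |> ignore
        | _ -> current.Append(c) |> ignore
    fields.Add(current.ToString())
    List.ofSeq fields

// Parse one line into a StockQuote; returns None when the line does not have
// exactly nine fields or a numeric field cannot be parsed (for example "N/A").
let tryParseQuote (line: string) =
    let dec (s: string) =
        Decimal.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture)
    match splitCsvLine line with
    | [ name; symbol; exch; l1; h; l; o; p; v ] ->
        let vol = Int64.TryParse(v, NumberStyles.Integer, CultureInfo.InvariantCulture)
        match dec l1, dec h, dec l, dec o, dec p, vol with
        | (true, last), (true, high), (true, low), (true, op), (true, prev), (true, volume) ->
            Some { Name = name; Symbol = symbol; Exchange = exch
                   LastTrade = last; High = high; Low = low
                   Open = op; PreviousClose = prev; Volume = volume }
        | _ -> None
    | _ -> None
```

For example, tryParseQuote "\"Contoso, Inc.\",CTSO,\"NasdaqNM\",25.81,26.00,25.55,25.90,25.79,48123456" yields a quote whose Name keeps the embedded comma, while a malformed line yields None instead of throwing.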

CEP Client

On the CEP client we will be using the following:

  • Syncfusion WPF GridDataControl – works well with high-speed data changes while keeping CPU usage to a minimum.
  • Rx – creates requests and updates the ViewModel bound to the grid.

Application Setup

The WPF application uses a simple MVVM pattern, defining a StocksViewModel to hold the stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.

  

        

Using Rx to create requests

This real-time application requires real-time data, which will be pulled over the wire every 500 milliseconds. We will use IObservable to create a streaming request and repeat it after a time delay.

private void RealTimeStocks(int delay, string quotes)
{
    var stockReader = new StockQuotesReader(quotes);

    // Each subscription starts a pull and, after the delay, yields the stream of StockAvailable events
    var stockFeeds = Observable.Defer(() =>
    {
        stockReader.PullStocks();
        var evt = from e in Observable.FromEvent<StockAvailableEventArgs>(stockReader, "StockAvailable")
                  select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };
        var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));
        return delayedEvt;
    }).Repeat(); // re-run the deferred block each time the delayed sequence completes

    // Flatten the nested observables into a single stream of stock quotes
    var stocks = from s in stockFeeds
                 from t in s
                 select t.Stocks;

    stocks.SubscribeOnDispatcher().Subscribe((stockQuotes) =>
    {
        this.AddOrUpdateModel(stockQuotes);
    });
}
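The body of AddOrUpdateModel is not shown in this post. As a rough sketch of the add-or-update idea, assuming rows are keyed by stock symbol, a view model could keep a dictionary alongside the bound collection. StockRow, StocksViewModelSketch, and AddOrUpdate are hypothetical names, and a real grid row would also need to implement INotifyPropertyChanged so that in-place updates reach the UI.

```fsharp
open System.Collections.Generic
open System.Collections.ObjectModel

// Hypothetical row type holding only the fields needed for the sketch.
type StockRow(symbol: string, lastTrade: decimal) =
    member val Symbol = symbol
    member val LastTrade = lastTrade with get, set

type StocksViewModelSketch() =
    // The collection a grid would bind to.
    let stocks = ObservableCollection<StockRow>()
    // Index by symbol so that updates do not have to scan the collection.
    let bySymbol = Dictionary<string, StockRow>()

    member x.Stocks = stocks

    // Update the row for a known symbol in place; append a new row otherwise.
    member x.AddOrUpdate(quotes: seq<string * decimal>) =
        for (symbol, lastTrade) in quotes do
            match bySymbol.TryGetValue symbol with
            | true, row -> row.LastTrade <- lastTrade
            | _ ->
                let row = StockRow(symbol, lastTrade)
                bySymbol.[symbol] <- row
                stocks.Add row

let vm = StocksViewModelSketch()
vm.AddOrUpdate [ ("MSFT", 25.81m); ("GOOG", 600.00m) ]
vm.AddOrUpdate [ ("MSFT", 26.00m) ]
```

Updating existing rows instead of clearing and refilling the collection keeps the number of collection-changed notifications low, which matters when new data arrives every few hundred milliseconds.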

We now have streaming real-time stock data pulled asynchronously over the Web and shown on the Syncfusion GridDataControl.

 

This also works with Essential Grid Silverlight in out-of-browser (OOB) mode. If you want to get hold of the sample, send me a request.

 
