CEP Client with WPF GridDataControl

Microsoft StreamInsight CEP is a very powerful platform for developing complex event processing (CEP) systems. There are several development models that we can follow. In this post we will use the IObservable/IObserver model using .NET Reactive extensions (Rx). Since this will be a real-time application, we will also be using F# async workflows to pull stock data.





F# async workflows are the coolest part of using F# in a real-time application. They allow writing concise code that (1) executes in parallel and (2) exposes to another .NET library with ease.

I won’t go into detail about F# except for the async workflow used in this application. There is a three-part series on using design patterns for F# async workflows; I have used Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports the progress through events—a modified version of AsyncWorker<> code is shown below,

type JobCompletedEventArgs<'T>(job:int, result:'T) =

        inherit EventArgs()

        member x.Job with get() = job

        member x.Result with get() = result

    type AsyncWorker<'T>(jobs: seq>) =


        // This declares an F# event that we can raise

        let allCompleted  = new Event<'T[]>()

        let error         = new Event()

#if WPF

        let canceled      = new Event()


        let canceled      = new Event()


        let jobCompleted  = new Event>()

        let cancellationCapability = new CancellationTokenSource()

        /// Start an instance of the work

        member x.Start()    =

            // Capture the synchronization context to allow us to raise events back on the GUI thread

            let syncContext = SynchronizationContext.CaptureCurrent()


            // Mark up the jobs with numbers

            let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))

            let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args                

            let work = 


                   [ for (job,jobNumber) in jobs ->

                       async { let! result = job

                               syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<'T>(jobNumber, result))

                               return result } ]



                ( work,

                  (fun res -> raiseEventOnGuiThread(allCompleted, res)),

                  (fun exn -> raiseEventOnGuiThread(error, exn)),

                  (fun exn -> raiseEventOnGuiThread(canceled, exn)),


        /// Raised when a particular job completes


        member x.JobCompleted = jobCompleted.Publish

        /// Raised when all jobs complete


        member x.AllCompleted = allCompleted.Publish

        /// Raised when the composition is cancelled successfully


        member x.Canceled = canceled.Publish

        /// Raised when the composition exhibits an error


        member x.Error = error.Publish

We have used [] attributes to mark these events for exposing to other .NET CLI languages. Since we are using Rx, we need to have an event that inherits from System.EventArgs--JobCompletedEventArgs does that here. The AsyncWorker is now ready to be used as a library for running parallel code.

Stock Quotes Reader

The Stock Quotes Reader defines a wrapper that performs a request to the server (it would be Yahoo finance here) and pulls the stocks.

type StockAvailableEventArgs(stocks:string[]) =

        inherit EventArgs()

        member x.Stocks with get() = stocks

    type StockQuotesReader(quotes:string) =


        let stockAvailableEvent = new Event()

        let httpLines (uri:string) =

          async { let request = WebRequest.Create uri   

                  use! response = Async.FromBeginEnd(request.BeginGetResponse, request.EndGetResponse)

                  use stream = response.GetResponseStream()

                  use reader = new StreamReader(stream)

                  let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]

                  return lines }        

        // n - name, s - symbol, x - Stock Exchange, l1 - Last Trade, p2 - change in percent, h - high, l - low, o - open, p - previous close, v - volume

        let yahooUri (quotes:string) = 

            let uri = String.Format("{0}&f=nsxl1hlopv", quotes)


        member x.GetStocks() =

            let stocks = [httpLines(yahooUri quotes)]


//        member x.TestAsync() =

//            let stocks = httpLines(yahooUri quotes)

//            Async.RunSynchronously(stocks)

        member x.PullStocks() =

            let stocks = x.GetStocks()

            let worker = new AsyncWorker<_>(stocks)

            worker.JobCompleted.Add(fun args ->

                stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))



        static member GetAsyncReader(quotes) =

            let reader = new StockQuotesReader(quotes)

            let stocks = reader.GetStocks()

            let worker = new AsyncWorker<_>(stocks)



        member x.StockAvailable = stockAvailableEvent.Publish

The above wrapper class does some interesting things:

  • It has an async block code that returns a line of data based on the response stream.
  • PullStocks will create async requests and raise the StockAvailable event whenever the async job is completed.

CEP Client

On the CEP client we will be using the following things:

· Syncfusion WPF GridDataControl – works well with high-speed data changes, keeping CPU usage to a minimum.

· Rx – creates requests and updates the ViewModel bound to the grid.

Application Setup

The WPF application uses a simple MV-VM by defining a StocksViewModel to hold stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.



Using Rx to create requests

This real-time application requires real-time data that will be pulled over the wire for every 500 milliseconds. We will be making use of IObservable to create a streaming request and repeat that over a time delay.

 private void RealTimeStocks(int delay, string quotes)


            var stockReader = new StockQuotesReader(quotes);

            var stockFeeds = Observable.Defer(() =>



                var evt = from e in Observable.FromEvent(stockReader, "StockAvailable")

                          select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };

                var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));

                return delayedEvt;


            var stocks = from s in stockFeeds

                         from t in s

                         select t.Stocks;

            stocks.SubscribeOnDispatcher().Subscribe((stockQuotes) =>



We now have streaming real-time stock data pulled asynchronously over the Web and shown on the Syncfusion GridDataControl.



This also works with Essential Grid Silverlight in out-of-browser (OOB) mode. If you want to get hold of the sample, send me a request.


Leave a comment