Saturday May 1, 2010 at 09:23 PM | Posted by: | Category: LINQ | Silverlight | WPF

Microsoft StreamInsight CEP is a very powerful platform for developing complex event processing (CEP) systems. There are several development models that we can follow. In this post we will use the IObservable/IObserver model using .NET Reactive extensions (Rx). Since this will be a real-time application, we will also be using F# async workflows to pull stock data.

 

clip_image002[1]

 

 

F# async workflows are the coolest part of using F# in a real-time application. They allow writing concise code that (1) executes in parallel and (2) exposes to another .NET library with ease.

I won’t go into detail about F# except for the async workflow used in this application. There is a three-part series on using design patterns for F# async workflows; I have used Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports the progress through events—a modified version of AsyncWorker<> code is shown below,

    type JobCompletedEventArgs<'T>(job:int, result:'T) =
        inherit EventArgs()
        member x.Job with get() = job
        member x.Result with get() = result
    type AsyncWorker<'T>(jobs: seq<Async<'T>>) =
     
        // This declares an F# event that we can raise
        let allCompleted  = new Event<'T[]>()
        let error         = new Event<System.Exception>()
#if WPF
        let canceled      = new Event<System.OperationCanceledException>()
#else
        let canceled      = new Event<OperationCanceledException>()
#endif
        let jobCompleted  = new Event<JobCompletedEventArgs<'T>>()
        let cancellationCapability = new CancellationTokenSource()
        /// Start an instance of the work
        member x.Start()    =
            // Capture the synchronization context to allow us to raise events back on the GUI thread
            let syncContext = SynchronizationContext.CaptureCurrent()
     
            // Mark up the jobs with numbers
            let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))
            let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args                
            let work = 
                Async.Parallel
                   [ for (job,jobNumber) in jobs ->
                       async { let! result = job
                               syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<'T>(jobNumber, result))
                               return result } ]
     
            Async.StartWithContinuations
                ( work,
                  (fun res -> raiseEventOnGuiThread(allCompleted, res)),
                  (fun exn -> raiseEventOnGuiThread(error, exn)),
                  (fun exn -> raiseEventOnGuiThread(canceled, exn)),
                    cancellationCapability.Token)
        /// Raised when a particular job completes
        [<CLIEvent>]
        member x.JobCompleted = jobCompleted.Publish
        /// Raised when all jobs complete
        [<CLIEvent>]
        member x.AllCompleted = allCompleted.Publish
        /// Raised when the composition is cancelled successfully
        [<CLIEvent>]
        member x.Canceled = canceled.Publish
        /// Raised when the composition exhibits an error
        [<CLIEvent>]
        member x.Error = error.Publish

We have used [<CLIEvent>] attributes to mark these events for exposing to other .NET CLI languages. Since we are using Rx, we need to have an event that inherits from System.EventArgs--JobCompletedEventArgs<T> does that here. The AsyncWorker is now ready to be used as a library for running parallel code.

Stock Quotes Reader

The Stock Quotes Reader defines a wrapper that performs a request to the server (it would be Yahoo finance here) and pulls the stocks.

    type StockAvailableEventArgs(stocks:string[]) =
        inherit EventArgs()
        member x.Stocks with get() = stocks
    type StockQuotesReader(quotes:string) =
        
        let stockAvailableEvent = new Event<StockAvailableEventArgs>()
        let httpLines (uri:string) =
          async { let request = WebRequest.Create uri   
                  use! response = Async.FromBeginEnd(request.BeginGetResponse, request.EndGetResponse)
                  use stream = response.GetResponseStream()
                  use reader = new StreamReader(stream)
                  let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]
                  return lines }
        
        // n - name, s - symbol, x - Stock Exchange, l1 - Last Trade, p2 - change in percent, h - high, l - low, o - open, p - previous close, v - volume
        let yahooUri (quotes:string) = 
            let uri = String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes)
            uri
        member x.GetStocks() =
            let stocks = [httpLines(yahooUri quotes)]
            stocks
//        member x.TestAsync() =
//            let stocks = httpLines(yahooUri quotes)
//            Async.RunSynchronously(stocks)
        member x.PullStocks() =
            let stocks = x.GetStocks()
            let worker = new AsyncWorker<_>(stocks)
            worker.JobCompleted.Add(fun args ->
                stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))
            )
            worker.Start()
        static member GetAsyncReader(quotes) =
            let reader = new StockQuotesReader(quotes)
            let stocks = reader.GetStocks()
            let worker = new AsyncWorker<_>(stocks)
            worker
        
        [<CLIEvent>]
        member x.StockAvailable = stockAvailableEvent.Publish

The above wrapper class does some interesting things:

  • It has an async block code that returns a line of data based on the response stream.
  • PullStocks will create async requests and raise the StockAvailable event whenever the async job is completed.

CEP Client

On the CEP client we will be using the following things:

· Syncfusion WPF GridDataControl – works well with high-speed data changes, keeping CPU usage to a minimum.

· Rx – creates requests and updates the ViewModel bound to the grid.

Application Setup

The WPF application uses a simple MV-VM by defining a StocksViewModel to hold stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.

        <syncfusion:GridDataControl 
            x:Name="grid"                                         
            NotifyPropertyChanges="True"                                          
            AutoPopulateRelations="False"                                        
            Width="Auto"
            VisualStyle="Office14Silver"
            AllowGroup="True"                                    
            ShowGroupDropArea="True"                                    
            AutoFocusCurrentItem="False"     
            ColumnSizer="AutoOnLoad"
            ItemsSource="{Binding Model.Stocks}"                                         
            AllowEdit="False"                                         
            IsGroupsExpanded="True"
            ShowAddNewRow="False">
        </syncfusion:GridDataControl>

Using Rx to create requests

This real-time application requires real-time data that will be pulled over the wire for every 500 milliseconds. We will be making use of IObservable to create a streaming request and repeat that over a time delay.

        private void RealTimeStocks(int delay, string quotes)
        {
            var stockReader = new StockQuotesReader(quotes);
            var stockFeeds = Observable.Defer(() =>
            {
                stockReader.PullStocks();
                var evt = from e in Observable.FromEvent<StockAvailableEventArgs>(stockReader, "StockAvailable")
                          select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };
                var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));
                return delayedEvt;
            }).Repeat();
            var stocks = from s in stockFeeds
                         from t in s
                         select t.Stocks;
            stocks.SubscribeOnDispatcher().Subscribe((stockQuotes) =>
            {
                this.AddOrUpdateModel(stockQuotes);
            });
        }

We now have streaming real-time stock data pulled asynchronously over the Web and shown on the Syncfusion GridDataControl.

 

clip_image004[1]

This also works with Essential Grid Silverlight in out-of-browser (OOB) mode. If you want to get hold of the sample, send me a request.

 

Comments

Add comment



biuquote
  • Comment
  • Preview
Loading

Tag cloud