Scalable concurrency, a design pattern in the Apama EPL
Posted by The Progress Guys
This is the final installment in my series devoted to a specific example in the Apama EPL. I began by describing the basic design pattern of a consumer/producer. Further enhancements enabled multiple consumers and, as a result, introduced the instance idiom. Finally, below, I enhance this consumer/producer once more to show how one can leverage multi-core processors for massive scalability and parallelism.
As I have mentioned before, instances (or 'sub-monitors', as they're often called in the Apama EPL) define a discrete unit of work. That unit of work represents a set of business logic, however large (a complete application scenario) or small (a simple analytic). Instances are created on demand using the language's spawn operator. Each scenario instance is invoked with a unique set of input parameters that represent that occurrence. Each instance then maintains its own reference data, timers and event streams: in effect, its own state. In general programming terms this resembles the factory pattern, but we've extended it to include an execution model.
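A minimal sketch of the spawn idiom might look like the following. The event types CreateCounter and Tick, the monitor CounterFactory and its counter action are hypothetical names invented for illustration. They are not part of the Item example. Each CreateCounter event spawns a new instance. That instance keeps its own copy of the monitor's variables (myId, ticks) and sets up its own listener. The onload listener is not copied and stays in the original monitor instance.

```epl
package com.apamax.sample;

// Hypothetical events, for illustration only
event CreateCounter {
    integer id;
}

event Tick {
    integer id;
}

monitor CounterFactory {
    // Per-instance state: each spawned instance gets its own copy
    integer myId;
    integer ticks;

    action onload() {
        CreateCounter cc;
        // This listener lives only in the original (factory) instance
        on all CreateCounter():cc {
            spawn counter(cc.id);
        }
    }

    action counter(integer id) {
        myId := id;
        ticks := 0;
        // Each instance listens only for Ticks carrying its own id
        on all Tick(id = myId) {
            ticks := ticks + 1;
            log "counter " + myId.toString() + " ticks=" + ticks.toString();
        }
    }
}
```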
To leverage multi-core processors, the Apama EPL provides a syntax and a simple semantic that allow those instances to execute in parallel. We do this with a language feature called contexts. Contexts are silos of execution that take the factory model to the next level: a context is a logical container that holds and executes instances of a scenario (of the same or differing types). The EPL provides a semantic for inter-context communication. As a result there is no need for mutexes, semaphores or other locking schemes, which avoids the deadlock-prone code patterns typical of imperative languages such as Java. Each context in effect has its own logical input queue, to which events are streamed from external sources or from other contexts. Behind the scenes, our CEP engine runs contexts on operating-system threads so as to make maximum use of multi-core processors.
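As a rough illustration of inter-context communication, the sketch below creates a private context and runs an action in it with spawn ... to. That action then replies to the caller's context with enqueue ... to, the same constructs the Item Producer uses. The SquareResult event and the ContextDemo monitor are hypothetical, and the squaring stands in for real work. No lock appears anywhere: the result simply arrives on the main context's input queue.

```epl
package com.apamax.sample;

// Hypothetical event, for illustration only
event SquareResult {
    integer n;
    integer square;
}

monitor ContextDemo {
    action onload() {
        // Listen for results in this (the main) context first
        SquareResult r;
        on all SquareResult():r {
            log "received " + r.toString();
        }

        // A private context: 'false' means it does not receive events
        // from external sources, only events sent to it explicitly
        context worker := context("SquareWorker", false);

        // Run computeSquare in the worker context, passing the main
        // context so the worker knows where to reply
        spawn computeSquare(7, context.current()) to worker;
    }

    action computeSquare(integer n, context replyTo) {
        // Runs in parallel with the main context; the result is placed
        // on replyTo's input queue rather than shared via locked memory
        enqueue SquareResult(n, n * n) to replyTo;
    }
}
```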
A single CEP engine can create multiple contexts (for example, the context pool in the code below). These contexts hold and execute multiple scenario instances, and those instances can in turn create sub-contexts for additional parallelism. Suppose, for example, that these instances form an application for pricing options and require a compute-intensive calculation such as Black-Scholes. Additional contexts can then be created for those calculations. Furthermore, sub-contexts can be designed as shared compute services, used by multiple scenario instances running in different (parallel) contexts.
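The shared compute service described above could be sketched as follows. The sketch assumes that each request event carries the caller's context, so the service knows where to reply. All names here (PriceRequest, PriceResponse, PricingDemo, serve, client, compute) are hypothetical. The compute action is only a placeholder that returns a call option's intrinsic value. It is not a Black-Scholes implementation.

```epl
package com.apamax.sample;

// Hypothetical request/response events, for illustration only
event PriceRequest {
    integer requestId;
    float spot;
    float strike;
    context replyTo;    // context that should receive the answer
}

event PriceResponse {
    integer requestId;
    float price;
}

monitor PricingDemo {
    action onload() {
        // One shared compute service running in its own context; its spawn
        // is queued before any client exists, so it is listening first
        context svc := context("PricingService", false);
        spawn serve() to svc;

        // Two client instances, each in its own parallel context,
        // share the one service by holding a reference to its context
        context clientA := context("ClientA", false);
        context clientB := context("ClientB", false);
        spawn client(1, 101.5, svc) to clientA;
        spawn client(2, 98.0, svc) to clientB;
    }

    action serve() {
        PriceRequest req;
        on all PriceRequest():req {
            enqueue PriceResponse(req.requestId, compute(req.spot, req.strike)) to req.replyTo;
        }
    }

    action client(integer id, float spot, context svc) {
        PriceResponse resp;
        on PriceResponse(requestId = id):resp {
            log "request " + id.toString() + " priced at " + resp.price.toString();
        }
        enqueue PriceRequest(id, spot, 100.0, context.current()) to svc;
    }

    // Placeholder for a compute-intensive model such as Black-Scholes:
    // returns only the call option's intrinsic value, max(spot - strike, 0)
    action compute(float spot, float strike) returns float {
        if spot > strike then {
            return spot - strike;
        }
        return 0.0;
    }
}
```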
Contexts thus extend the factory model with a parallel execution model, using only a few simple EPL keywords (context, spawn ... to and enqueue ... to), as the code below shows.
The enhancements to the Item consumer/producer consist of a Context Pool, listed below, and an enhanced Item Producer that uses it. The interface is unchanged except for one new event, and the Consumer (client) has only a minor revision. This adheres to my belief, which I blogged about at the start of this series, that an EPL should follow the structured-programming principles of modularity and encapsulation. The complete example for this revision is available here and requires Apama version 4.1 (or later, of course).
The Context Pool.
package com.apamax.sample;

event ContextPool {
    integer numContexts;
    sequence<context> contexts;
    integer idx;    // index of the next context to hand out

    // Create a pool of nc private contexts, all named 'name'
    action create(integer nc, string name) {
        self.numContexts := nc;
        while(nc > 0) {
            contexts.append(context(name, false));
            nc := nc - 1;
        }
    }

    // Return the next context in round-robin order, wrapping to the start
    action getContext() returns context {
        context c := contexts[idx];
        idx := idx + 1;
        if(idx = numContexts) then {
            idx := 0;
        }
        return c;
    }
}
The ContextPool as implemented here is a general-purpose utility. Its create method (i.e. action) builds a pool of contexts. Each call to its getContext action returns the next context in the pool in simple round-robin order, which distributes the workload across them.
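Here is a hypothetical usage sketch. The PoolDemo monitor and its work action are illustrative names and are not part of the example download. The sketch spreads six instances over a pool of three contexts. With the round-robin getContext above, instances 0 and 3 land in the first context, 1 and 4 in the second, and 2 and 5 in the third. Each instance logs its context's numeric ID via context.getId().

```epl
package com.apamax.sample;

// Hypothetical monitor; assumes the ContextPool event above is injected first
monitor PoolDemo {
    action onload() {
        ContextPool pool := new ContextPool;
        pool.create(3, "Worker");

        // Six instances over three contexts, assigned round-robin
        integer i := 0;
        while(i < 6) {
            context c := pool.getContext();
            spawn work(i) to c;
            i := i + 1;
        }
    }

    action work(integer n) {
        log "instance " + n.toString() + " in context " + context.current().getId().toString();
    }
}
```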
The (parallel) Item Producer.
package com.apamax.sample;

monitor ItemService {

    event ClearUserID {
        integer id;
    }

    integer count := 0;
    float price := 0.0;

    action onload {
        // Pool of 4 contexts across which subscriber instances are spread
        ContextPool cf := new ContextPool;
        cf.create(4, "ClientService");

        // list of subscriber (user) identifiers
        sequence<integer> ids := new sequence<integer>;

        SubscribeToItems s;
        on all SubscribeToItems():s {
            if ids.indexOf(s.subscriberId) = -1 then {
                // Assign this new subscriber the next context in the pool
                context c := cf.getContext();
                ids.append(s.subscriberId);
                // Tell the client which context its subscription lives in
                route SubscriptionResponse(s.subscriberId, c);
                // Once the routed response has been processed, spawn the
                // subscriber's instance into its assigned context
                on completed SubscriptionResponse() {
                    spawn startSubscriptions(s.subscriberId, s.item_name, context.current()) to c;
                }
            }
        }

        // Forget a subscriber once its instance reports an unsubscribe
        ClearUserID c;
        on all ClearUserID():c {
            log "in " + c.toString();
            integer index := ids.indexOf(c.id);
            if index != -1 then {
                ids.remove(index);
            }
        }
    }

    // Runs in a pool context: publish an Item every 0.1 seconds until unsubscribed
    action startSubscriptions(integer this_subscriberId, string name, context mainContext) {
        log "in startSubscriptions";
        on all wait(0.1) and not UnsubscribeFromItems(subscriberId = this_subscriberId) {
            route Item(this_subscriberId, name, count, price);
            count := count + 1;
            price := price + 0.1;
        }
        on UnsubscribeFromItems(subscriberId = this_subscriberId) {
            // Notify the main context so it can clear this subscriber's ID
            enqueue ClearUserID(this_subscriberId) to mainContext;
        }
    }
}
To get a general sense of what the multi-instance Item Producer code is intended to do, I suggest a quick scan of my last installment. This revision does not change that basic foundation; it only parallelizes it. It is worth pointing out how little the code and design have changed, yet this implementation can scale to tens of thousands of instances across multiple processor cores. Clearly this is a simple example that does very little real work (producing Item events). Structurally, however, it models how one would design such a scalable service in the Apama EPL.
Louie