Reaqtive.Core
Provides a set of base classes and utilities used for in-memory reactive event processing.
Layering of the assemblies
This library implements various interfaces that are provided in Reaqtive.Interfaces. It is used by a variety of components in Reaqtive, including Reaqtive.Linq but also query engine implementations. The separation of the Core assembly from the Linq assembly on top is an intentional design goal to reduce coupling between a query engine and the operator library, because alternative implementations (or subsets) can be provided.
Subscribables
The SubscribableBase<TResult>
class provides an abstract base class to implement ISubscribable<T>
with a few asserting behaviors enabled in DEBUG builds. It merely provides a SubscribeCore
abstract method, as shown below:
protected abstract ISubscription SubscribeCore(IObserver<TResult> observer);
Using this base class is recommended over manual implementation of the interface.
Subscriptions
Various ISubscription
implementations are provided by this library, in a manner similar to Rx's IDisposable
algebra. These include:
- CompositeSubscription to wrap a dynamic number of ISubscription objects, where the number of them can vary widely (e.g. dependent on event counts).
- StableCompositeSubscription to wrap a dynamic number of ISubscription objects, whose total count is relatively stable (e.g. dependent on static properties of an operator).
- StaticCompositeSubscription to wrap a static number of ISubscription objects (e.g. when an operator always has the same number of child subscriptions).
- SingleAssignmentSubscription to wrap a single ISubscription object that can get assigned at a later point in time.
- SerialSubscription to wrap a single ISubscription object that replaces an existing ISubscription object upon reassignment.
- NopSubscription as a no-op implementation of ISubscription.
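To make the "replace upon reassignment" idea concrete, here is a minimal sketch. It does not use the Reaqtive types: SerialSlot and Tracked are hypothetical stand-ins built on IDisposable, and the sketch assumes SerialSubscription follows the convention of Rx's SerialDisposable, where the replaced resource is disposed when a new one is assigned.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in (not the Reaqtive SerialSubscription). Not thread-safe.
public sealed class SerialSlot : IDisposable
{
    private IDisposable _current;
    private bool _disposed;

    public IDisposable Current
    {
        get { return _current; }
        set
        {
            if (_disposed)
            {
                // Assigning after disposal releases the newcomer right away.
                value?.Dispose();
                return;
            }

            IDisposable old = _current;
            _current = value;
            old?.Dispose(); // the replaced resource is released
        }
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _current?.Dispose();
        _current = null;
    }
}

// Hypothetical helper that records the order in which resources are disposed.
public sealed class Tracked : IDisposable
{
    private readonly string _name;
    private readonly List<string> _log;

    public Tracked(string name, List<string> log) { _name = name; _log = log; }

    public void Dispose() { _log.Add(_name); }
}

public static class SerialDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var slot = new SerialSlot();
        slot.Current = new Tracked("first", log);
        slot.Current = new Tracked("second", log); // disposes "first"
        slot.Dispose();                            // disposes "second"
        return log;
    }
}
```

A single-assignment variant would differ only in the setter: under the same assumptions, a second assignment would be treated as an error instead of a replacement.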
Besides the ISubscription
implementations, the library also provides facilities to visit subscription object trees. The base class is a SubscriptionVisitor
that implements ISubscriptionVisitor
and discovers children of IOperator
nodes by using the IOperator.Inputs
property. Derived classes can override Visit
(to control behavior of visiting children) and VisitCore
methods (to simply handle visiting each node). For example:
class MyVisitor : SubscriptionVisitor
{
protected override void VisitCore(IOperator node)
{
// Handle operator node
}
}
Auxiliary visitor types are provided to perform simple type checks on nodes prior to calling a virtual method. This is useful to visit nodes that implement a certain interface, e.g. IUnloadableOperator
. To do so, one can instantiate a SubscriptionVisitor<T>
, giving it an Action<T>
to handle nodes that pass the if (node is T)
check. An easier way is to use a factory method that uses a builder pattern:
public class SubscriptionVisitor : ISubscriptionVisitor
{
public static SubscriptionVisitorBuilder<T> Do<T>(Action<T> visit)
where T : class, IOperator;
// Omitted other members
}
The builder allows specifying more Do<T>
actions for different types, so you can build a visitor that handles nodes of different types. When the builder is configured, one can use Apply(ISubscription)
to apply the logic to a given subscription object. For example:
SubscriptionVisitor.Do<IUnloadableOperator>(u => u.Unload()).Apply(subscription);
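The type-filtering behavior of the builder can be sketched without the library. Everything below is a hypothetical stand-in (INode, IUnloadable, TypedVisitor, and the node classes are not Reaqtive types); the sketch only shows how a Do<T> registration turns into a "node is T" check during a depth-first walk over the inputs of each node.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins; the real IOperator has more members than Inputs.
public interface INode { IEnumerable<INode> Inputs { get; } }
public interface IUnloadable : INode { void Unload(); }

public sealed class TypedVisitor
{
    private readonly List<Action<INode>> _handlers = new List<Action<INode>>();

    // Registers an action that only runs for nodes passing the "node is T" check.
    public TypedVisitor Do<T>(Action<T> visit) where T : class, INode
    {
        _handlers.Add(node => { if (node is T typed) visit(typed); });
        return this;
    }

    // Depth-first walk: handle the node itself, then recurse into its inputs.
    public void Apply(INode root)
    {
        foreach (var handler in _handlers) handler(root);
        foreach (var child in root.Inputs) Apply(child);
    }
}

public class PlainNode : INode
{
    private readonly INode[] _inputs;
    public PlainNode(params INode[] inputs) { _inputs = inputs; }
    public IEnumerable<INode> Inputs { get { return _inputs; } }
}

public sealed class UnloadableNode : PlainNode, IUnloadable
{
    public bool Unloaded { get; private set; }
    public UnloadableNode(params INode[] inputs) : base(inputs) { }
    public void Unload() { Unloaded = true; }
}

public static class VisitorDemo
{
    // Returns the number of nodes that matched the IUnloadable filter.
    public static int Run()
    {
        var root = new PlainNode(new PlainNode(new UnloadableNode()), new UnloadableNode());
        int count = 0;
        new TypedVisitor()
            .Do<IUnloadable>(u => { u.Unload(); count++; })
            .Apply(root);
        return count;
    }
}
```

Chaining several Do<T> calls before Apply mirrors the way the builder accumulates actions for different node types.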
A few standard visitors are provided as well, which operate on the basic lifecycle operations provided by IOperator
and IStatefulOperator
. These visitors are SubscriptionInitializeVisitor
and SubscriptionStateVisitor
. Let's have a look at the former one first, focusing on a few commonly used methods:
public class SubscriptionInitializeVisitor
{
public static void Initialize(ISubscription subscription, IOperatorContext context);
public static void Subscribe(ISubscription subscription);
public static void SetContext(ISubscription subscription, IOperatorContext context);
public static void Start(ISubscription subscription);
}
Note: Besides static methods, instance variants are provided as well, which operate on an instance of the visitor that's given an
ISubscription
. We omit this detail for clarity.
In here, Initialize
calls Subscribe
, SetContext
, and Start
on all of the nodes in the given subscription. This combination is useful when node state needs to be loaded (e.g. when recovering a subscription). If state loading should take place prior to calling Start
, one can call SetContext
first, then use the SubscriptionStateVisitor
to handle state loading, and finally call Start
.
Note: A query engine does perform these operations in separate stages of recovery. For example, it may first provide context to all reactive entities in the system after reinstantiating these from persisted expression trees. Next, it may restore state to stateful artifacts, possibly in a concurrent fashion. Finally, it can kick off the event flow using
Start
, often phasing this on an artifact-by-artifact basis (e.g. subscriptions first, subjects last).
The SubscriptionStateVisitor
provides ways to visit IStatefulOperator
nodes to perform state loading and saving, and related operations. The essence is shown below:
public class SubscriptionStateVisitor
{
public static bool HasStateChanged(ISubscription subscription);
public static void SaveState(ISubscription subscription, IOperatorStateWriterFactory factory);
public static void LoadState(ISubscription subscription, IOperatorStateReaderFactory factory);
public static void OnStateSaved(ISubscription subscription);
}
Note: Besides static methods, instance variants are provided as well, which operate on an instance of the visitor that's given an
ISubscription
. We omit this detail for clarity.
These methods should be pretty self-explanatory. For state recovery, the LoadState
method is used, between calls to SetContext
and Start
. For state checkpointing, three phases are used:
- Use HasStateChanged to figure out if any state in a subscription has changed. This is optional if one wants to perform a full checkpoint, but is used to support differential checkpoints.
- Use SaveState to write the state of a subscription to a state writer.
- Use OnStateSaved to unmark dirty bits after a successful commit of the transaction used to save the state.
Note: A query engine performs all of these steps after pausing the scheduler, and often does this in a highly concurrent manner. State is often saved to in-memory streams that only later get committed to a persistent store. This is done to ensure that the first two steps take as little time as possible, in order to avoid pausing the scheduler for too long.
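The three checkpointing phases can be sketched as follows. This is an illustration under stated assumptions, not the engine's implementation: CounterNode is a hypothetical stateful node, a dictionary plays the role of both the in-memory staging area and the persistent store, and scheduler pausing is left out entirely.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a stateful node; not the Reaqtive IStatefulOperator.
public sealed class CounterNode
{
    public string Id { get; }
    public int Count { get; private set; }
    public bool StateChanged { get; private set; }

    public CounterNode(string id) { Id = id; }

    public void OnNext() { Count++; StateChanged = true; }          // mutation marks state dirty
    public void SaveState(IDictionary<string, int> writer) { writer[Id] = Count; }
    public void OnStateSaved() { StateChanged = false; }            // clear the dirty bit
}

public static class CheckpointDemo
{
    // Differential checkpoint; returns the ids of the nodes that were persisted.
    public static List<string> Checkpoint(IEnumerable<CounterNode> nodes, IDictionary<string, int> store)
    {
        var staged = new Dictionary<string, int>(); // in-memory staging area
        var saved = new List<CounterNode>();

        foreach (var node in nodes)
        {
            if (!node.StateChanged) continue; // phase 1: skip nodes without changes
            node.SaveState(staged);           // phase 2: write state to the staging area
            saved.Add(node);
        }

        // Commit the staged state to the store (a real store would use a transaction).
        foreach (var pair in staged) store[pair.Key] = pair.Value;

        var ids = new List<string>();
        foreach (var node in saved)
        {
            node.OnStateSaved();              // phase 3: only after the commit succeeded
            ids.Add(node.Id);
        }
        return ids;
    }
}
```

Clearing the dirty bits only after the commit matters: if the commit fails, the flags stay set and the next checkpoint attempt picks up the same nodes again.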
Operators
In order to make implementing query operators easier, this library provides a number of base classes that can be used to reduce the amount of plumbing needed. These are:
- Operator<TParam, TResult> as the most general purpose base class.
  - VersionedOperator<TParam, TResult> adds support for versioning.
    - StatefulVersionedOperator<TParam, TResult> adds support to save and load state.
- UnaryOperator<TParam, TResult> as a specialization for operators with a single source.
  - VersionedUnaryOperator<TParam, TResult> adds support for versioning.
    - StatefulVersionedUnaryOperator<TParam, TResult> adds support to save and load state.
First, note the commonality in generic parameters:
- TParam is used to specify a piece of state that represents the operator's parameters, which are not persisted as part of operator state. These typically flow down from operator factories in the ISubscribable<T> space, e.g. the count parameter of a Take operator. Upon recovery of the expression that represents the composition of operators (the "query" expression), the operators get reinstantiated, and thus the (immutable) parameters will be retained.
- TResult represents the result produced by the operator, which is fed to the downstream observer through an IObserver<TResult> that's made available to derived classes using the Output property.
Operator base classes implement some logic directly, e.g. Start
, and provide additional virtual methods that can be overridden to implement custom behaviors. For example, OnSubscribe
is called when the operator is connected to one or more sources, and the derived class is asked to return a sequence of ISubscription
objects that will be exposed through the Inputs
property that's used by visitors to traverse the tree:
// Operator<TParam, TResult>
protected virtual IEnumerable<ISubscription> OnSubscribe();
For unary operators, this method is required to return a single ISubscription
instead:
// UnaryOperator<TParam, TResult>
protected virtual ISubscription OnSubscribe();
Other virtual methods include OnStart
and OnDispose
to deal with the basic lifecycle events.
protected virtual void OnStart();
protected virtual void OnDispose();
Finally, SetContext
provides a means to consult the context, or store it in a field. By default, this method doesn't do anything. Common uses are for operators to look up settings from the context, or for sources and sinks to access host-level facilities that are provided by the query engine that hosts the operator instance (e.g. an ingress and egress manager that is used to receive and send events to other machines or services).
public virtual void SetContext(IOperatorContext context);
The Versioned*
variants implement IVersioned
and require the derived class to implement Name
and Version
:
public abstract string Name { get; }
public abstract string Version { get; }
Finally, the Stateful*
variants derive from the Versioned*
variants and add IStatefulOperator
support by providing a few members:
public virtual bool StateChanged { get; protected set; }
public void LoadState(IOperatorStateReader reader, Version version);
protected virtual void LoadStateCore(IOperatorStateReader reader);
protected virtual void LoadStateCore(IOperatorStateReader reader, Version version);
public void SaveState(IOperatorStateWriter writer, Version version);
protected virtual void SaveStateCore(IOperatorStateWriter writer);
protected virtual void SaveStateCore(IOperatorStateWriter writer, Version version);
public virtual void OnStateSaved();
First, the StateChanged
property is used to detect whether state is dirty. Operators use the setter to mark the state as dirty by assigning true
. For example, this is done after receiving an event in an Average
operator which has caused the sum
and count
fields to get updated. On the other hand, receiving an event in DistinctUntilChanged
that's the same as the previous event will not toggle the dirty flag. Every operator has different behavior when it comes to marking state as dirty.
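The DistinctUntilChanged case can be sketched in isolation. The class below is a hypothetical stand-in that does not derive from the Reaqtive base classes; it only models when the dirty flag is set and when it is left alone.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in; models only the dirty-flag decision.
public sealed class DistinctUntilChangedSketch<T>
{
    private readonly IEqualityComparer<T> _comparer = EqualityComparer<T>.Default;
    private readonly Action<T> _output;
    private bool _hasValue;
    private T _last;

    public DistinctUntilChangedSketch(Action<T> output) { _output = output; }

    public bool StateChanged { get; private set; }

    public void OnNext(T value)
    {
        if (_hasValue && _comparer.Equals(_last, value))
        {
            // Duplicate: the remembered value is unchanged, so the flag is not touched.
            return;
        }

        _hasValue = true;
        _last = value;
        StateChanged = true; // the remembered value changed and needs to be checkpointed
        _output(value);
    }

    // Called after a successful checkpoint to clear the dirty flag.
    public void OnStateSaved() { StateChanged = false; }
}
```

An Average-style operator, by contrast, would set the flag on every event, because each event updates its running sum and count.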
Next, LoadState
and SaveState
are implemented in terms of *Core
methods. Variants without a Version
parameter are called when the version is the same as the one returned by the Version
property, thus reflecting state persistence using the latest version. If the version ever changes, the overloads with Version
parameters become important to deal with state that has a different version, e.g. when recovering state that was written using an older version of the operator, and thus may need to deal with missing values or conversions of state representation.
Finally, OnStateSaved
is typically not overridden and deals with the StateChanged
dirty flag logic internally, but derived classes can override it as a way to get notified when a checkpoint was successfully committed. An example of such an override is an ingress operator that receives events from an external system, which may use the OnStateSaved
call to send an acknowledgement to the external system allowing it to prune events up to a given sequence number.
Context-switch operators
The ContextSwitchOperator<TParam, TResult>
operator deserves its own section. This type acts as a base class for stateful operators that receive events from external sources and need to produce On*
invocations on downstream observers running within a query engine. One of the requirements for events flowing through query operators hosted in a query engine is that they originate on the right IScheduler
in order to allow the processing of events to be paused when a checkpoint is made. To achieve this, we need to "context switch" events from the outside world into the query engine world, i.e. on the IScheduler
.
From a top-level point of view, a context switch operator exposes the IObserver<TResult>
interface which is used to send notifications from any thread external to the engine. Think of this interface as sticking out of the boundaries of the hosting query engine, into the external world. Rather than flowing these events directly to downstream observers (for example, query operators to process the events), the context switch operator maintains an internal queue that gets drained on the IScheduler
that's obtained from the IOperatorContext
passed to SetContext
. This is part of the IOperator
implementation, which is used within the query engine's world. The context switch operator connects both worlds.
Internally, the context switch operator is implemented as a stateful operator (where the state is comprised of the events in the queue that have not yet been sent downstream) and an IYieldableItemProcessor
, which is a scheduler construct that processes the items in a queue and gets scheduled periodically to continue to drain the queue. The yield support is required to support prompt bail out from processing events when a checkpoint is requested.
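The hand-off between the two worlds can be sketched as a queue with two sides. This is not the ContextSwitchOperator implementation: ContextSwitchSketch is a hypothetical stand-in, the downstream observer is reduced to an Action<T>, and the caller of Drain is assumed to be running on the engine's scheduler.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in; the real operator also handles errors, completion, and state.
public sealed class ContextSwitchSketch<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly Action<T> _downstream;

    public ContextSwitchSketch(Action<T> downstream) { _downstream = downstream; }

    // External side: may be called from any thread; never calls downstream directly.
    public void OnNext(T value) { _queue.Enqueue(value); }

    // Events that were received but not yet sent downstream (the operator's state).
    public int PendingCount { get { return _queue.Count; } }

    // Engine side: assumed to be invoked on the scheduler; processes at most one batch.
    public int Drain(int batchSize)
    {
        int processed = 0;
        while (processed < batchSize && _queue.TryDequeue(out T item))
        {
            _downstream(item);
            processed++;
        }
        return processed;
    }
}
```

Because downstream calls only happen inside Drain, pausing the scheduler is enough to stop event flow, while external producers can keep enqueueing.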
Observers
This library also provides a number of built-in observers and base classes to build observers. Some of the ready-to-use observers include:
- NopObserver<T>, an observer that ignores all On* notifications.
- FaultObserver<T>, an observer that throws exceptions when handling On* notifications.
- Observer.Create<T>, a factory method to create observers given delegates for the three On* methods.
The Observer<T>
, VersionedObserver<T>
, and StatefulObserver<T>
are similar to the base classes used to implement operators; they provide a set of virtual and/or abstract methods to control the behavior of an observer. Stateful observers implement IStatefulOperator
and can be used within ISubscription
-based operator trees (for example in the Do
operator and as the input to the top-level Subscribe
method) and are hence reachable by visitors. Examples of stateful observers include egress sinks that generate sequence numbers for events sent to an external service; their state can contain the latest known sequence number that was used when emitting an event (i.e. a high watermark).
Subjects
Besides subscribables ("observables") and observers ("subscribers"), this library supports subjects based on the IMultiSubject
interface. Base classes are provided in the form of MultiSubjectBase
and related classes, which all implement IMultiSubject
. The two abstract methods to override are:
protected abstract IObserver<T> GetObserverCore<T>();
protected abstract ISubscribable<T> GetObservableCore<T>();
to handle both the consuming (observable) and producing (observer) sides of a subject. Implementations of subjects may be in-memory, akin to the different Rx subjects, or be backed by an external streaming service where the observers publish events and the observables receive events. State can be maintained by inheriting from StatefulMultiSubjectBase
which derives from VersionedMultiSubjectBase
, similar to operators and observers.
Note: There are many valid approaches to model external streams. One is to have a subject that acts as a proxy to the external stream both for publication and subscription. Another is to have observable/observer pairs that are defined as distinct artifacts. Yet another approach is to have generic observable and observer implementations for a specific streaming service, accepting the external stream name (or a more elaborate connection string) as a parameter (e.g.
EventHubObserver<T>(string)
).
Tasks
Scheduling work through the IScheduler
interface requires the use of the ISchedulerTask
interface. This reduces allocations compared to the delegate-based approach used in Rx. For example, an operator can implement the task interface itself and use it to register for callbacks from the scheduler at a specified time. However, it's often handy to allocate simple tasks through common implementations of ISchedulerTask
. Therefore, this library provides a few such implementations, including:
- ActionTask which takes an Action to invoke on the scheduler.
- ActionTask<T> which takes an Action<T> and a T, representing some state to avoid a closure, to invoke on the scheduler.
All action-based tasks are single-shot and implement the Execute
method by returning true
to indicate that the task has completed after invoking the action once.
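The single-shot contract and the closure-free state parameter can be sketched as follows. ITaskSketch is a hypothetical, simplified stand-in for the task interface (the parameter list of the real Execute method is not shown in this document, so it is left out here), and RunToCompletion is a toy loop, not a scheduler.

```csharp
using System;

// Hypothetical, simplified stand-in for the scheduler task contract.
public interface ITaskSketch
{
    bool Execute(); // true means "completed, do not invoke again"
}

public sealed class ActionTaskSketch<T> : ITaskSketch
{
    private readonly Action<T> _action;
    private readonly T _state;

    public ActionTaskSketch(Action<T> action, T state)
    {
        _action = action;
        _state = state;
    }

    public bool Execute()
    {
        _action(_state); // state is passed in, so the delegate need not capture it
        return true;     // single-shot: done after one invocation
    }
}

public static class TaskDemo
{
    // Toy run loop standing in for a scheduler: invoke until the task reports completion.
    public static int RunToCompletion(ITaskSketch task)
    {
        int invocations = 0;
        bool done;
        do
        {
            invocations++;
            done = task.Execute();
        } while (!done);
        return invocations;
    }
}
```

Passing the state explicitly lets the delegate be a non-capturing lambda, which the compiler can cache instead of allocating a closure object per task.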
Besides regular tasks, there's also support for IYieldableSchedulerTask
which is recommended if a task goes in a busy loop and should be able to yield back to the scheduler promptly, e.g. in case a checkpoint is initiated. Yieldable tasks are given a YieldToken
that should be observed for yield requests. The implementation of the Execute
method is represented through a Func<YieldToken, bool>
, thus supporting periodically invoked tasks that return a false
value to indicate they are not yet complete and have to be invoked again. Because yielding is interrupting running work, it's natural for a yieldable task to return in a non-completed status. Two yieldable tasks are provided:
- YieldableActionTask which takes a Func<YieldToken, bool> to invoke on the scheduler.
- YieldableActionTask<T> which takes a Func<T, YieldToken, bool> and a T, representing some state to avoid a closure, to invoke on the scheduler.
Finally, the library introduces the notion of an IItemProcessor
which looks like this:
public interface IItemProcessor
{
int ItemCount { get; }
void Process(int batchSize);
}
Item processors are runnable whenever ItemCount
is larger than 0
. When invoked, the Process
method is requested to process a batch of items, causing ItemCount
to decrease (though, obviously, other items may get enqueued at the same time). This is useful for constructs such as ContextSwitchOperator
which maintain an internal queue of events that need to get processed on a scheduler, while periodically yielding after processing a batch, in order to allow other work to make progress as well. The IYieldableItemProcessor
variant supports yielding while processing a batch:
public interface IYieldableItemProcessor : IItemProcessor
{
void Process(int batchSize, YieldToken yieldToken);
}
Two implementations of these interfaces are provided in the library:
- ItemProcessingTask which wraps an IItemProcessor implementation to make it an ISchedulerTask.
- YieldableItemProcessingTask which wraps an IYieldableItemProcessor implementation to make it an IYieldableSchedulerTask.
It is quite common though to implement the interfaces directly, e.g. on an operator whose state contains a queue of events to dispatch.
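A direct implementation of the two interfaces might look like the sketch below. To keep it self-contained, the interfaces are redeclared with a Sketch suffix, and YieldTokenSketch is a hypothetical stand-in for YieldToken; in particular, the IsYieldRequested property name is an assumption of this sketch. The queue is not thread-safe, which a real operator would have to address.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the library's YieldToken; the property name is assumed.
public struct YieldTokenSketch
{
    private readonly Func<bool> _isRequested;
    public YieldTokenSketch(Func<bool> isRequested) { _isRequested = isRequested; }
    public bool IsYieldRequested { get { return _isRequested != null && _isRequested(); } }
}

// Same shape as the interfaces shown above, using the stand-in token type.
public interface IItemProcessorSketch
{
    int ItemCount { get; }
    void Process(int batchSize);
}

public interface IYieldableItemProcessorSketch : IItemProcessorSketch
{
    void Process(int batchSize, YieldTokenSketch yieldToken);
}

public sealed class QueueProcessor : IYieldableItemProcessorSketch
{
    private readonly Queue<int> _items = new Queue<int>();

    public int Sum { get; private set; }

    public void Enqueue(int item) { _items.Enqueue(item); }

    // The processor is runnable whenever this is larger than 0.
    public int ItemCount { get { return _items.Count; } }

    // Non-yielding variant: a default token never requests a yield.
    public void Process(int batchSize) { Process(batchSize, default(YieldTokenSketch)); }

    public void Process(int batchSize, YieldTokenSketch yieldToken)
    {
        for (int i = 0; i < batchSize && _items.Count > 0; i++)
        {
            if (yieldToken.IsYieldRequested) return; // bail out; remaining items stay queued
            Sum += _items.Dequeue();
        }
    }
}
```

Checking the token before each item keeps the bail-out latency at roughly the cost of processing one item, at the price of one extra check per item.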
Utilities
Finally, this library provides a number of utilities, including:
- The ToSubscribable conversion for IObservable<T> implementations.
- An implementation of IOperatorContext provided by the OperatorContext class.
- Various static methods on StateManager to deal with loading and saving operator state.
- A StateChangedManager struct which can be embedded in the implementation of stateful operators and manages the StateChanged transitions.