How Reaqtor combines reactivity with reliable data processing
One of the big features that distinguishes Reaqtor from classic Rx is the ability to run long-running, stateful queries. The Reaqtor framework supports this by persisting query state through checkpointing, which allows these queries to survive restarts, outages, and even moves between different machines.
But what does this mean?
Let’s use the Take operator as an example.
Firstly, using normal Rx, let's say we used this operator to take the first 8 items in an observable stream as follows:
var taken = observable.Take(8);
We will assume that the observable stream acts like a queue, where items are processed once and then removed from the queue.
Using the code above, we would only take the first 8 items in the stream. However, if the process was restarted then the Take operator would be reapplied and we would again take 8 items.
This would mean that the record of the items already processed would be lost, and that a further 8 items, starting from part way through the stream, would be processed.
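To make the problem concrete, here is a minimal sketch in plain C# (it does not use Rx or Reaqtor). It assumes the queue-like source described above and imitates a restart by cutting the first run short. The names RestartWithoutStateDemo, RunOnce and crashAfter are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RestartWithoutStateDemo
{
    // Simulates one process lifetime: a fresh "take count" pulls items from a shared queue.
    // crashAfter cuts the run short to imitate the process stopping part way through.
    public static List<int> RunOnce(Queue<int> source, int count, int crashAfter)
    {
        var taken = new List<int>();
        int remaining = count; // held in memory only, so it is lost when the run ends

        while (remaining > 0 && source.Count > 0 && taken.Count < crashAfter)
        {
            taken.Add(source.Dequeue()); // each item is consumed exactly once
            remaining--;
        }

        return taken;
    }

    public static void Main()
    {
        var source = new Queue<int>(Enumerable.Range(1, 20));

        // First run stops after 5 items; the counter is thrown away.
        var firstRun = RunOnce(source, count: 8, crashAfter: 5);

        // Second run starts a brand new counter and takes 8 more.
        var secondRun = RunOnce(source, count: 8, crashAfter: int.MaxValue);

        Console.WriteLine(string.Join(",", firstRun));  // 1,2,3,4,5
        Console.WriteLine(string.Join(",", secondRun)); // 6,7,8,9,10,11,12,13
    }
}
```

In this sketch 13 items are consumed in total, even though the intent was to take only 8.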
When using Reaqtor on the other hand, a key feature is that operator state is persistent.
In Reaqtor
The way this works is that the reactive engine generates checkpoints regularly. These checkpoints record where in the stream you have reached, and the operator state at that point.
This means that should the engine be restarted, you can resume processing from where you left off. Not only that, but because operator state is preserved, operators (such as Take) carry on from their saved state rather than being applied afresh.
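The idea can be sketched in a few lines of plain C#. This is a conceptual illustration only and not how Reaqtor implements checkpointing: the CheckpointedTake class, its methods, and the dictionary standing in for durable storage are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an operator whose state can be checkpointed.
public sealed class CheckpointedTake
{
    public int Remaining { get; private set; }

    public CheckpointedTake(int count) => Remaining = count;

    // Returns true if the current item should be forwarded downstream.
    public bool TryTake()
    {
        if (Remaining == 0) return false;
        Remaining--;
        return true;
    }

    // Records the operator state under a key in the store.
    public void SaveTo(IDictionary<string, int> store, string key) => store[key] = Remaining;

    // Restores saved state if there is any, otherwise starts from the initial count.
    public static CheckpointedTake LoadFrom(IDictionary<string, int> store, string key, int initialCount) =>
        new CheckpointedTake(store.TryGetValue(key, out var saved) ? saved : initialCount);
}

public static class CheckpointDemo
{
    public static void Main()
    {
        var store = new Dictionary<string, int>(); // stands in for durable storage
        const string key = "sub1/take";            // hypothetical key

        var take = CheckpointedTake.LoadFrom(store, key, 8);
        int before = 0;
        for (int i = 0; i < 5; i++) if (take.TryTake()) before++;
        take.SaveTo(store, key); // checkpoint

        // Simulated restart: the operator is rebuilt from the store.
        var recovered = CheckpointedTake.LoadFrom(store, key, 8);
        int after = 0;
        for (int i = 0; i < 10; i++) if (recovered.TryTake()) after++;

        Console.WriteLine($"{before} before, {after} after"); // 5 before, 3 after
    }
}
```

Because the remaining count is read back from the store, the recovered operator forwards only 3 more items instead of starting again at 8.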
Let's run through a demo to highlight what this looks like. The main method of the demo looks like this:
static async Task Main(string[] args)
{
    await PayNoAttentionToTheManBehindTheCurtain();
    switch (args[0])
    {
        case "1":
            await Part1();
            break;
        case "2":
            await Part2();
            break;
    }
    ps.Dispose();
}
Where we can either run Part1 or Part2 each time we start up the process.
Part 1
When we start up Reaqtor's query engine we run the following code:
Uri queryEngineIdentifier = new Uri("demo://test/engine1");
var engine = new QueryEngine(queryEngineIdentifier, scheduler, store, ingressEgressManager);
await engine.RecoverAsync(store.GetReader());
var ctx = new ReactorContext(engine);
First, we recover the query engine from any existing state (at this point no subscriptions have been started in the engine, so when it starts nothing will happen).
Once we have instantiated our query engine, we then get a reference to the definitions of the timer observable, and the consoleOutput observer which will allow us to produce output to the console:
var timer = ctx.GetObservable<TimeSpan, DateTimeOffset>(new Uri("demo://observables/timer"));
var consoleOutput = ctx.GetObserver<double>(new Uri("demo://observers/cout"));
We are able to retrieve these from their definitions because they have been registered on the reactive context:
await ctx.DefineObserverAsync(new Uri("demo://observers/cout"),
    ctx.Provider.CreateQbserver<T>(Expression.New(typeof(ConsoleObserver<T>))),
    null,
    CancellationToken.None);
await ctx.DefineObservableAsync<TimeSpan, DateTimeOffset>(new Uri("demo://observables/timer"),
    t => new TimerObservable(t).AsAsyncQbservable(),
    null,
    CancellationToken.None);
In this case the observer/observable definitions are a part of the demo project, e.g.:
internal sealed class ConsoleObserver<T> : IObserver<T>
{
    public void OnCompleted() => Console.WriteLine("OnCompleted()");
    public void OnError(Exception error) => Console.WriteLine($"OnError({error})");
    public void OnNext(T value) => Console.WriteLine($"OnNext({value})");
}
However, there are many of these types defined as part of Reaqtor, with more being added as we speak!
Once we have retrieved our timer and consoleOutput definitions, we can then use them to create a subscription:
var currentSpeedObservable =
    timer(TimeSpan.FromSeconds(1))
        .Select(_ => GetSpeed())
        .Take(8);
This takes the timer observable definition, tells it to call the GetSpeed method every second, and takes the first 8 of those values. This is only the definition of an asynchronous process, and no work will start until something subscribes.
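If the idea of "defining without running" is unfamiliar, ordinary LINQ offers a rough analogy. The sketch below is plain C# and does not use Reaqtor; LINQ sequences are pull-based rather than push-based, so treat this purely as an illustration of deferred execution. The DeferredDefinitionDemo class and its Calls counter are hypothetical, and GetSpeed here is a fixed-value stand-in for the demo's method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredDefinitionDemo
{
    public static int Calls; // counts how often GetSpeed actually runs

    // Stand-in for the demo's GetSpeed; returns a fixed value to keep the sketch simple.
    public static double GetSpeed()
    {
        Calls++;
        return 30.0;
    }

    // Only describes the pipeline; nothing is evaluated here.
    public static IEnumerable<double> Define() =>
        Enumerable.Range(0, 100).Select(_ => GetSpeed()).Take(8);

    public static void Main()
    {
        var query = Define();
        Console.WriteLine(Calls); // 0, because nothing has consumed the query yet

        var values = query.ToList(); // consuming the query is what triggers the work
        Console.WriteLine($"{Calls} calls, {values.Count} values"); // 8 calls, 8 values
    }
}
```

In the same spirit, the query above does nothing until an observer is subscribed to it.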
The GetSpeed method here just returns a random number:
private static double GetSpeed()
{
    return Math.Round((30 + (new Random().NextDouble() * 5)), 0);
}
But this is where the real data that you are processing using Reaqtor would be produced. This data could be anything - current speed, temperature, failure events, location data, etc. Reaqtor has the potential to be used in a huge variety of different scenarios - many of which we are yet to think of!
So, we will now subscribe our consoleOutput observer:
await currentSpeedObservable.SubscribeAsync(consoleOutput,
    new Uri("demo://subscriptions/sub1"),
    null,
    CancellationToken.None);
And wait just over 4 seconds (4.3, to be precise) before checkpointing and then unloading the engine:
await Task.Delay(TimeSpan.FromSeconds(4.3));
await engine.CheckpointAsync(store.GetWriter());
await engine.UnloadAsync();
When we run the code, we will see the following output:
OnNext(34)
OnNext(33)
OnNext(33)
OnNext(32)
OnNext(30)
Engine was unloaded, saving state...
State saved.
We can see that five items are passed to the OnNext method of our output observer. The engine is then unloaded and the state is saved.
Back in the code we can see that we are writing this state out to a file on disk:
Console.WriteLine("Engine was unloaded, saving state...");
string json = store.AsJson();
File.WriteAllText("State.json", json);
Console.WriteLine("State saved.");
After saving out this state, we reach the end of the Part1 method and the program exits.
Part 2
If we now restart the program, this time running Part2, we again start by reloading the engine from the stored state:
string json = File.ReadAllText("State.json");
store = InMemoryKeyValueStore.FromJson(json);
var engine = new QueryEngine(QueryEngineIdentifier, scheduler, store, ingressEgressManager);
await engine.RecoverAsync(store.GetReader());
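The pattern here is a round trip through a file: state is serialised to JSON before the process exits and parsed back in when it starts again. The following sketch shows that pattern in isolation using System.Text.Json and a plain dictionary. It makes no claim about the format the demo's store actually writes; the StateFileRoundTrip class, the file name and the key are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

public static class StateFileRoundTrip
{
    // Writes the state dictionary to disk as JSON.
    public static void Save(string path, Dictionary<string, int> state) =>
        File.WriteAllText(path, JsonSerializer.Serialize(state));

    // Reads the state back, or returns an empty dictionary if no file exists yet.
    public static Dictionary<string, int> Load(string path) =>
        File.Exists(path)
            ? JsonSerializer.Deserialize<Dictionary<string, int>>(File.ReadAllText(path))
                ?? new Dictionary<string, int>()
            : new Dictionary<string, int>();

    public static void Main()
    {
        string path = Path.Combine(Path.GetTempPath(), "State.sketch.json"); // hypothetical file
        const string key = "sub1/take.remaining";                            // hypothetical key

        // "Part 1": save the state before the process exits.
        Save(path, new Dictionary<string, int> { [key] = 3 });

        // "Part 2": a later run loads it back.
        var restored = Load(path);
        Console.WriteLine(restored[key]); // 3

        File.Delete(path); // tidy up the temporary file
    }
}
```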
As soon as the engine recovers, it finds that there are unprocessed items which need to be passed to our consoleOutput observer, and the following output will be produced:
Loading state...
Recovering from stored state
Recovery complete
OnNext(31)
OnNext(30)
OnNext(33)
We can see that the remaining items from the observable are now passed to our observer. Unlike in regular Rx, the state of the Take operator has been saved and instead of restarting and taking the next 8 items, it remembers that it has already taken 5 and just produces the remaining 3.
Therefore, processing continues as if the restart had never happened.
The fact that queries can continue to run throughout engine restarts means that you have the ability to support extremely long running operations. These queries can move between separate processes and machines, with the current state of the process being written out to persistent storage.
Reaqtor also gives reliability assurances: no state will be lost across these restarts, so you can run Rx-like queries with predictable and repeatable results.
The upshot is that you can create queries which can run for hours, days, months, or even years, without worrying about engine restarts losing state or producing unpredictable and incorrect results. And it is this ability to execute these incredibly long-running queries that enables support for services like Bing, Cortana and M365.
This exciting technology brings the world of live, reactive, Rx-based queries together with the long-term persistence of information!
Here's a link to a video recording of this demo.
And if you want to know more about Reaqtor, head over to reaqtive.net for more detail about how it works, the kind of scenarios it can support, and the history of the project!