Acking (Partially) Parallelly Processed Elements

Event sourcing with stream processing of events is great. It makes it possible to express your processing pipeline in a really natural way; it makes it easy to understand what processing steps are performed and in what order; and it allows for easily scaling up your processing pipeline, when properly done. Moreover, event streams allow for resiliency, and can be resumed in case of a crash or application restart.

That last point is the key topic of this post: making partially sequential processing scale up while preserving some ordering properties and keeping things resumable.

But what’s the problem exactly?

At Free2Move, we use stream processing in several places, one of which is the processing of user events. Events are stored in an event store, and then read and processed to build the current user state. The processing steps have requirements on the order in which some events are processed, and it is also crucial for the infrastructure to scale so that unrelated events can be processed in parallel. We can hence define a partial order on the user events with the following constraint: all events related to one specific user must be processed in the order they occurred.

In the remainder we will use three different drawings to demonstrate the solution. For illustration purposes, let’s assume we have three users, each on their vehicle: Carlo, Rolanda, and Bikey, each of whom uses Free2Move to connect to and use their favorite mobility providers. By doing so, events are created for each of these users, so that registration and connection to the different services are tracked and handled correctly. These flows require several steps to be performed, some of which require previous steps to have been processed. In the illustrations, the different steps are represented as several parts of the drawings being assembled.

The Carlo user is made of 4 parts:

Event       | Offset
----------- | -----------
Front wheel | front-wheel
Back wheel  | back-wheel
Car body    | car
Carlo       | carlo

On the other hand, the Rolanda user is made of 3 parts:

Event   | Offset
------- | -------
Roller  | roller
Rolanda | rolanda
Helmet  | helmet

Finally, the Bikey user is made of 2 parts:

Event | Offset
----- | -----
Bike  | bike
Bikey | bikey

All these events must be processed in the specified order to build up the final user. Now let’s see what ways we have to solve this problem.

Let’s talk solution

Technologies and simple solution

Before we dive into the technical details, let me introduce the technology stack used here. This part of the Free2Move infrastructure uses Akka, in particular Akka Persistence (with its persistence query interface) and Akka Streams.

Events are stored in MongoDB and are read and processed using the Akka Persistence Query interface to store the current state in a PostgreSQL database. In this post we will focus on the read side, which is the interesting part in our case.

Each event then takes part in building the current state of the associated user. Let’s assume that we have a function named processOne that takes an event, processes it, and returns the event offset, all of this asynchronously. Processing consists of assembling events together to build the current user state and persisting it in the database.
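In Scala, such a function could have the following shape (a sketch: the name processOne comes from above, but the exact signature is an assumption):

```scala
import scala.concurrent.Future
import akka.persistence.query.{EventEnvelope, Offset}

// Hypothetical signature: process one event asynchronously and return its
// offset once the resulting user state has been persisted.
def processOne(envelope: EventEnvelope): Future[Offset] = ???
```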

The bikey event is about to be processed.
A simple solution is to connect to the event stream and process all the events as they are stored.
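A minimal sketch of such a stream, assuming a readJournal obtained from Akka Persistence Query, a hypothetical tag user-events, a storeOffset function persisting the last processed offset, and a startingOffset read back at startup:

```scala
import akka.stream.scaladsl.Sink

// Sequential pipeline (assumes an implicit Materializer or ActorSystem):
// read, process, and commit offsets strictly one at a time.
readJournal
  .eventsByTag("user-events", startingOffset)
  .mapAsync(parallelism = 1)(processOne)  // process events sequentially
  .mapAsync(parallelism = 1)(storeOffset) // persist the offset for resuming
  .runWith(Sink.ignore)
```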

By setting the parallelism of mapAsync to 1, we ensure that all events are processed sequentially, and thus the constraint is respected. Processing is also resumable: we store the event offset once the event is processed, and restart the stream from that point after a crash or restart.

The bikey event has been processed, Bikey is on his bike now.
Of course the simple sequential flow works: all events will eventually be processed and result in an up-to-date state. But it does not really scale, for a number of reasons:

  • when the number of users increases, so does the number of events, and the propagation time of an event then depends heavily on its order of arrival;
  • events depending on external resources (external service, database, …) may take some time to be processed, slowing down the entire event flow;
  • a de facto dependency is created between all events, which is completely artificial.

Luckily, Akka provides us with a large variety of ways to perform processing in parallel without many changes.

Parallel processing of unrelated events

The first obvious way to parallelize the work is to increase the parallelism parameter of mapAsync. However, this does not work, as we lose an important property: all events related to a given user must be treated in order. That is where the Partition stage comes into play.

This stage makes it possible to dispatch each element to one of many consumers, based on some decider function, the partitioner. By using a consistent partitioner, one can route all events of a given user to the same consumer. Assuming that every consumer treats the events in the order they arrive, we achieve the parallelism we want for unrelated events while preserving order for the events dispatched to a single consumer.

For instance, let’s say we have two partitions, and assume the dispatch is based on the hash code of the persistenceId.
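A sketch of this graph, reusing the hypothetical processOne from above (the dispatch function and partition count are assumptions):

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, Offset}
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

// Two partitions; all events of one persistenceId go to the same partition.
val partitionedProcessing: Flow[EventEnvelope, Offset, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val partitions = 2
    val partition = builder.add(Partition[EventEnvelope](
      partitions, env => Math.floorMod(env.persistenceId.hashCode, partitions)))
    val merge = builder.add(Merge[Offset](partitions))

    for (p <- 0 until partitions)
      partition.out(p) ~> Flow[EventEnvelope].mapAsync(1)(processOne) ~> merge.in(p)

    FlowShape(partition.in, merge.out)
  })
```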

Events are dispatched based on their entity identifier (the color in the illustration). Carlo and Bikey events are dispatched to the top pipe, Rolanda events to the bottom one, offsets are then merged.

This is a bit more code, but now we have parallelism for events that are unrelated to each other, while preserving ordering within each partition. However, our second constraint was that the stream must be resumable. Each EventEnvelope is associated with an abstract offset from the database, which is returned and emitted downstream. When the stream is opened against the event store, one can provide such an offset to start the stream from that point on. Our processing pipeline can persist the last processed offset whenever we are sure processing succeeded, so that the stream can be resumed after a crash or upon service restart. However, with the parallelism we introduced, the order between different partitions is no longer ensured, and because the offsets are abstract opaque values, they cannot be compared and ordered.

For instance, say we first receive event front-wheel and then event roller, and both are processed in separate partitions in parallel. Let’s assume that the roller event is processed and emitted downstream, and then the processing of front-wheel fails, crashing the stream. If we were to restart the stream with the roller offset (the last one seen from the successfully processed events), the front-wheel event, which comes before it in the input stream, would not be re-processed, and its effects would be lost. This is not desirable, and leads to the need for some re-synchronization after processing of events.

Acking processed events

As we saw before, we are halfway through: the only missing part is the ability to reliably resume the stream of events at some point without skipping any of them. Events have a certain order in the input stream, but this order is not preserved in the output, so when an event offset is output, we cannot be sure that all the events that came before it in the input stream have been processed as well. We thus need a Merge operator with some kind of memory of processed events, one that only emits an offset if we can ensure that all previous events have been treated as well. The offsets being opaque values, we cannot compare them, so we need to associate every event with its index in the input stream. This can easily be achieved with the zipWithIndex method from Akka Streams before the stream is partitioned. Then every partition emits the offset along with the incoming index, making it possible to order the offsets. Basically, this looks like the following:
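As a sketch, with the same hypothetical names as before:

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, Offset}
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
import scala.concurrent.ExecutionContext.Implicits.global

// Pair each event with its index in the input stream, dispatch by user,
// and emit the offset together with the index of the processed event.
val indexedProcessing: Flow[EventEnvelope, (Offset, Long), NotUsed] =
  Flow[EventEnvelope].zipWithIndex.via(
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val partitions = 2
      val partition = builder.add(Partition[(EventEnvelope, Long)](
        partitions, { case (env, _) => Math.floorMod(env.persistenceId.hashCode, partitions) }))
      val merge = builder.add(Merge[(Offset, Long)](partitions))

      for (p <- 0 until partitions)
        partition.out(p) ~> Flow[(EventEnvelope, Long)].mapAsync(1) {
          // the processing function itself never sees the index
          case (env, index) => processOne(env).map(offset => (offset, index))
        } ~> merge.in(p)

      FlowShape(partition.in, merge.out)
    }))
```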

Each event is associated with its index in the input stream before dispatch.

That’s it for the pre- and post-processing parts, where events are associated with their index, and output offsets are associated back to that index. Note that the processing function itself has no knowledge of the index and works independently from it, so the business logic does not need to be adapted.

The output of this flow is a pair of an offset and its index; now we can reorder everything and emit offsets in the correct order. To do so, we will implement our own GraphStage whose role is to emit offsets that can be safely committed downstream.

Let’s start by defining the skeleton of the graph stage, with all the needed plumbing. Our stage is simply a flow that takes pairs of (Offset, Long) as input (the offsets together with their index in the original stream), and outputs Offsets in the correct order.
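A sketch of this skeleton (the stage and port names are ours):

```scala
import akka.persistence.query.Offset
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Flow-shaped stage: (Offset, Long) pairs in, committable Offsets out.
class CommittableOffsets extends GraphStage[FlowShape[(Offset, Long), Offset]] {

  val in: Inlet[(Offset, Long)] = Inlet("CommittableOffsets.in")
  val out: Outlet[Offset] = Outlet("CommittableOffsets.out")

  override val shape: FlowShape[(Offset, Long), Offset] = FlowShape(in, out)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // the actual logic is filled in below
      override def onPush(): Unit = ???
      override def onPull(): Unit = ???
      setHandlers(in, out, this)
    }
}
```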

The interesting part is the logic. The basic idea of this stage is to accumulate offsets with their indices until it gets a contiguous range of offsets that have been treated. Receiving such a contiguous range ensures that we can safely commit the last one, as all previous events have been processed. If we store the highest emitted index in the stage, we can trigger the next offset emission whenever the lowest index of the contiguous range is the one directly after the last emitted one.

So let’s start by storing the last emitted index in the stage state, together with a priority queue of the received offset and index pairs. The order used by the priority queue is lowest index first. Whenever we receive a new pair, we store it in the priority queue.
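Continuing the sketch, inside the GraphStageLogic defined above:

```scala
// State of the stage: the index of the last offset emitted downstream, and
// the pending (offset, index) pairs ordered by lowest index first.
private var lastEmitted = -1L
private val queue = scala.collection.mutable.PriorityQueue
  .empty[(Offset, Long)](Ordering.by[(Offset, Long), Long](_._2).reverse)

override def onPush(): Unit = {
  queue.enqueue(grab(in)) // store the new pair
  pull(in)                // and eagerly ask upstream for the next one
}
```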

Then, when downstream asks for an element, we can deliver the highest offset of the starting contiguous range of offsets, if that range starts right after the last emitted index.
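A first version of onPull, still inside the same logic:

```scala
override def onPull(): Unit =
  if (queue.nonEmpty && queue.head._2 == lastEmitted + 1)
    deliver() // a contiguous range starts right after the last emitted index
  else if (!hasBeenPulled(in))
    pull(in)  // nothing deliverable yet, ask upstream for more
```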

The deliver method starts with the lowest index and iterates through the priority queue to find the highest offset of the contiguous range. Then it stores the chosen index, and pushes the offset downstream.
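A sketch of that method:

```scala
private def deliver(): Unit = {
  // start with the lowest pending index...
  var (offset, index) = queue.dequeue()
  // ...and walk the contiguous range to find the offset of its highest index
  while (queue.nonEmpty && queue.head._2 == index + 1) {
    val next = queue.dequeue()
    offset = next._1
    index = next._2
  }
  lastEmitted = index // remember the chosen index
  push(out, offset)   // the final version below uses emit instead
}
```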

It seems nice, but there are still cases that are not handled correctly here and would make the stream hang. For instance, what happens if:

  • a new element is pushed on the input and none has been asked downstream?
  • upstream finishes but a new element could be emitted before ending?

To handle these cases, we can:

  • use the isAvailable method to determine whether output has been pulled by downstream, and emit directly when possible;
  • override the onUpstreamFinish method to emit one last element before ending the stream.

The final version of the stage is then:
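Assembled from the fragments above, still as a sketch with our own naming:

```scala
import akka.persistence.query.Offset
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.mutable

class CommittableOffsets extends GraphStage[FlowShape[(Offset, Long), Offset]] {

  val in: Inlet[(Offset, Long)] = Inlet("CommittableOffsets.in")
  val out: Outlet[Offset] = Outlet("CommittableOffsets.out")

  override val shape: FlowShape[(Offset, Long), Offset] = FlowShape(in, out)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // index of the last offset emitted downstream
      private var lastEmitted = -1L
      // pending (offset, index) pairs, lowest index first
      private val queue = mutable.PriorityQueue
        .empty[(Offset, Long)](Ordering.by[(Offset, Long), Long](_._2).reverse)

      private def canDeliver: Boolean =
        queue.nonEmpty && queue.head._2 == lastEmitted + 1

      private def deliver(): Unit = {
        // find the offset of the highest index in the contiguous range
        var (offset, index) = queue.dequeue()
        while (queue.nonEmpty && queue.head._2 == index + 1) {
          val next = queue.dequeue()
          offset = next._1
          index = next._2
        }
        lastEmitted = index
        // emit instead of push: also safe when out has not been pulled yet
        emit(out, offset)
      }

      override def onPush(): Unit = {
        queue.enqueue(grab(in))
        // emit directly if downstream already asked for an element
        if (isAvailable(out) && canDeliver) deliver()
        pull(in)
      }

      override def onPull(): Unit = {
        if (canDeliver) deliver()
        if (!isClosed(in) && !hasBeenPulled(in)) pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        if (canDeliver) deliver() // one last element before completing
        complete(out)
      }

      setHandlers(in, out, this)
    }
}
```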

The use of emit instead of push in the deliver method ensures that the element will be emitted on the next downstream pull.

This all works fine, and we can then connect our previous stream to that stage.
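For instance, with the same hypothetical names as in the earlier sketches:

```scala
import akka.stream.scaladsl.Sink

// Full pipeline: indexed parallel processing, reordering, offset commit.
readJournal
  .eventsByTag("user-events", startingOffset)
  .via(indexedProcessing)      // zipWithIndex + partitioned processing
  .via(new CommittableOffsets) // emit only offsets that are safe to commit
  .mapAsync(parallelism = 1)(storeOffset)
  .runWith(Sink.ignore)
```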

The resulting stream now emits the highest offset that can be committed at any point in time. The offset can be persisted and used to resume the stream at that given point.

Only the offset with the highest index is emitted after merging.

This implementation works well under several assumptions:

  • Each offset is unique; in our case this is ensured by the event store (MongoDB) when opening the stream of events.
  • Processing of events must be idempotent and tolerate that already processed events may be reprocessed. This can happen when a stream fails because of one event: some later events might already have been processed by other partitions, but not committed, because the offset of the failing event was not delivered yet. When restarting from the last saved offset, these events are replayed as well.
  • The processing function should always terminate, either with a success or an error. If some Future never completes, the entire stream will hang waiting for the offset of the processed event, and the queue might grow indefinitely, which no one wants.

If your processing step respects these assumptions, then this solution might work for you.

Possible improvements

This implementation works but may be improved in several ways, depending on your use case.

For instance, the queue could be implemented in a smarter way using interval trees. Indeed, we do not need to store all offsets, but only the highest one of each contiguous range. Using a kind of map whose keys are intervals and whose values are the highest offsets, we can avoid storing a lot of useless data.
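A sketch of the idea (Scala 2.13; all names are ours): one entry per contiguous index range, merging neighboring ranges on insertion.

```scala
import scala.collection.immutable.TreeMap
import akka.persistence.query.Offset

// Keyed by range start; the value holds the range end and the offset of
// the highest index in the range.
final class RangeMap(private val ranges: TreeMap[Long, (Long, Offset)]) {

  def insert(index: Long, offset: Offset): RangeMap = {
    // range ending exactly at index - 1, if any
    val left = ranges.maxBefore(index).filter { case (_, (end, _)) => end == index - 1 }
    // range starting exactly at index + 1, if any
    val right = ranges.get(index + 1).map(v => (index + 1, v))

    val merged = (left, right) match {
      case (Some((ls, _)), Some((rs, (re, ro)))) => ranges - ls - rs + (ls -> (re, ro))
      case (Some((ls, _)), None)                 => ranges - ls + (ls -> (index, offset))
      case (None, Some((rs, (re, ro))))          => ranges - rs + (index -> (re, ro))
      case (None, None)                          => ranges + (index -> (index, offset))
    }
    new RangeMap(merged)
  }

  // offset that can be committed if the first range starts right after
  // the last emitted index
  def committable(lastEmitted: Long): Option[Offset] =
    ranges.headOption.collect { case (start, (_, offset)) if start == lastEmitted + 1 => offset }
}
```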

In some cases, the last step can behave as the identity, but you can always throttle the output so that you do not constantly persist offsets. This might be a good idea if your system doesn’t mind delaying the commit of treated events.
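For instance, one could conflate offsets, keeping only the latest one, and persist it at most every few seconds (a sketch; the interval is arbitrary and the surrounding names are the same assumptions as before):

```scala
import scala.concurrent.duration._
import akka.stream.scaladsl.Sink

readJournal
  .eventsByTag("user-events", startingOffset)
  .via(indexedProcessing)
  .via(new CommittableOffsets)
  .conflate((_, latest) => latest) // keep only the most recent offset
  .throttle(1, 10.seconds)         // commit at most one offset per 10 seconds
  .mapAsync(parallelism = 1)(storeOffset)
  .runWith(Sink.ignore)
```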

When event processing fails in any partition, the entire stream fails immediately. As with the onUpstreamFinish method, we could override the onUpstreamFailure method and emit one last element before failing when possible. However, the solution is not as straightforward, as the failStage method seems to fail immediately, without waiting for the last push to complete. We could get the desired behavior by adding a flag indicating that one last pull is allowed before failing, but the overhead did not seem worth it in our case.

Acknowledgments

I would like to give special thanks to Sheyda Sabetian for the nice illustrations and animations in this article.
