Fault Tolerance in Akka
The official documentation on Fault Tolerance gives a partial introduction to supervisor strategies and a worked example of supervision. If you want the complete lowdown on supervision then you need to check out the official documentation on Supervision and Monitoring and the source code.
I struggled to extract what I consider to be the salient facts about supervision from the official documentation. That probably reflects more poorly on me than on the authors of the documentation. Nevertheless, I wanted something written down for my own reference.
Key Points
- Each actor supervises its children using a supervision strategy.
- Top-level actors (created with system.actorOf) are supervised by the user guardian actor, /user.
- The message that caused an actor to crash is not reprocessed.
That last one is a real gotcha; more on that later.
Supervisor Strategies
When you create an actor using context.actorOf, the resulting actor is supervised by the invoking actor. By default, every actor uses the supervisorStrategy defined in Actor.scala:
```scala
def supervisorStrategy: SupervisorStrategy =
  SupervisorStrategy.defaultStrategy
```
SupervisorStrategy.defaultStrategy is defined in FaultHandling.scala:
```scala
final val defaultStrategy: SupervisorStrategy = {
  OneForOneStrategy()(defaultDecider)
}
```
The OneForOneStrategy applies the Decider only to the faulty child actor. For reference, the only other built-in strategy is the AllForOneStrategy, which applies the Decider's directive to all child actors regardless of which one is at fault. For the most part, it is the Decider that you will want to change.
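To make the difference concrete, here is a minimal sketch of both built-in strategies sharing the default decider. The ExampleStrategies name is hypothetical, and the maxNrOfRetries and withinTimeRange parameters go beyond what the text above covers: they cap how many restarts are tolerated within a time window before the child (or, for AllForOneStrategy, every child) is stopped instead.

```scala
import akka.actor.{AllForOneStrategy, OneForOneStrategy}
import akka.actor.SupervisorStrategy.defaultDecider
import scala.concurrent.duration._

// Hypothetical holder object, for illustration only.
object ExampleStrategies {
  // Applies the decider's directive only to the child that failed. If that child
  // needs more than 3 restarts within one minute, it is stopped instead.
  val oneForOne: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute)(defaultDecider)

  // Same decider and limits, but the directive is applied to every child of the
  // supervisor, not just the one that failed.
  val allForOne: AllForOneStrategy =
    AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute)(defaultDecider)
}
```

Either value can be returned from an actor's supervisorStrategy override; only the scope of the directive differs.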
The defaultDecider is also defined in FaultHandling.scala:
```scala
final val defaultDecider: Decider = {
  case _: ActorInitializationException => Stop
  case _: ActorKilledException         => Stop
  case _: DeathPactException           => Stop
  case _: Exception                    => Restart
}
```
So basically, unless you override it, any Exception other than those first three will result in the child actor being restarted. Restarting an actor replaces the failed instance with a fresh one, so its internal state is reset; the restart is invisible to other actors, which keep using the same ActorRef.
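A quick way to see this for yourself is a small actor that counts messages. The Counter actor and its string messages are hypothetical, made up for illustration; preRestart and postRestart are the standard Actor lifecycle hooks, and the printlns are only there so you can watch the restart happen.

```scala
import akka.actor.Actor

// Hypothetical actor whose only state is an in-memory counter.
class Counter extends Actor {
  // Lives in the actor instance, so it is lost when the instance is replaced on restart.
  private var count = 0

  override def receive: Receive = {
    case "increment" => count += 1
    case "get"       => sender() ! count
    case "fail"      => throw new IllegalStateException("simulated failure")
  }

  // Runs on the old instance just before it is discarded; `message` is the message
  // that was being processed when the failure happened, if there was one.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting after '${reason.getMessage}' while processing $message")
    super.preRestart(reason, message)
  }

  // Runs on the fresh instance once it has been created.
  override def postRestart(reason: Throwable): Unit = {
    println(s"Restarted with count = $count")
    super.postRestart(reason)
  }
}
```

Created with system.actorOf and left with the guardian's default strategy, sending it "increment" twice, then "fail", then "get" should yield 0 rather than 2: the fresh instance starts from scratch, and the "fail" message is not processed again.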
Generally speaking, restarting an actor only makes sense if you believe its state has been corrupted. As a result, there are times when you really do not want that behaviour. Imagine, for example, a child actor that is responsible for running some stateful processing over several windows of data. You probably would not want to lose the accumulated state because of one bad window of data.
Fortunately, defining a custom Decider is straightforward, and the directives Resume and Escalate allow you to either resume processing whilst maintaining the child actor's state or propagate the failure upwards to the supervisor's own supervisor:
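```scala
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, DeathPactException, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy._

class Supervisor extends Actor {
  override def receive: Receive = {
    case _ => // message handling elided
  }

  // Apply the custom decider only to the child that failed
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy()(decider)

  // Same as defaultDecider, except that ordinary exceptions resume the child
  // (keeping its state) instead of restarting it
  private def decider: Decider = {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case _: Exception                    => Resume
  }
}
```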
You can find out more about each directive (Resume, Restart, Stop and Escalate) in FaultHandling.scala.
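If you only care about one or two exception types, you don't have to copy the whole defaultDecider. Since a Decider is just a PartialFunction[Throwable, Directive], a sketch like the one below handles the interesting cases and falls back to SupervisorStrategy.defaultDecider with orElse. BadWindowException, CorruptStateException, WindowDeciders and WindowSupervisor are hypothetical names for the windowed-processing example above.

```scala
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy._

// Hypothetical exception types for the windowed-processing example.
class BadWindowException(msg: String) extends Exception(msg)
class CorruptStateException(msg: String) extends Exception(msg)

object WindowDeciders {
  // Only the cases we want to treat differently...
  private val custom: Decider = {
    case _: BadWindowException    => Resume   // skip the bad window, keep the accumulated state
    case _: CorruptStateException => Escalate // let our own supervisor decide
  }

  // ...then fall back to Akka's defaultDecider for everything else.
  val decider: Decider = custom orElse SupervisorStrategy.defaultDecider
}

class WindowSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy()(WindowDeciders.decider)

  override def receive: Receive = {
    case _ => // message handling elided
  }
}
```

Order matters here: both hypothetical exceptions are Exceptions, so the custom cases must come before the fallback, otherwise defaultDecider's catch-all Exception case would restart the child.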
Guardian Actor
When you create an actor using system.actorOf, the resulting actor is supervised by the user guardian actor. The guardian's supervisor strategy is configured in reference.conf:
```hocon
akka.actor {
  guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"
}
```
That class is an implementation of SupervisorStrategyConfigurator, defined in FaultHandling.scala:
```scala
final class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator {
  override def create(): SupervisorStrategy =
    SupervisorStrategy.defaultStrategy
}
```
You can override it in your application.conf, for example to stop failing top-level actors instead of restarting them:
```hocon
akka.actor {
  guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
}
```
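Beyond the two configurators Akka ships with, you can plug in your own SupervisorStrategyConfigurator. Below is a minimal sketch, assuming a hypothetical LimitedRestartGuardianStrategy class. Akka creates it by reflection, so it needs a public no-argument constructor, and the configuration value must be its fully qualified class name (com.example is only a placeholder).

```scala
import akka.actor.{OneForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import akka.actor.SupervisorStrategy.defaultDecider
import scala.concurrent.duration._

// Hypothetical configurator. Enable it in application.conf with (placeholder package):
//   akka.actor.guardian-supervisor-strategy = "com.example.LimitedRestartGuardianStrategy"
final class LimitedRestartGuardianStrategy extends SupervisorStrategyConfigurator {
  // Restart failing top-level actors as the default does, but stop one that
  // needs more than 5 restarts within a minute.
  override def create(): SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute)(defaultDecider)
}
```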
Reprocessing Messages
There are a lot of ways to reprocess failed messages, each with their own advantages and disadvantages. Once you decide on an approach, you should really try to be consistent throughout the project. You can take just about any actor, such as this one, which models an unreliable network:
```scala
import akka.actor.{Actor, ActorRef}

class UnreliableNetwork extends Actor {
  import Protocol._

  override def receive: Receive = {
    case packet: Packet =>
      if (shouldDrop()) {
        // Crash, remembering the packet and who sent it
        throw new PacketException(packet, sender())
      } else {
        sender() ! PacketSent(packet)
      }
  }

  // Drops roughly half of all packets
  private def shouldDrop(): Boolean = {
    import scala.util.Random
    Random.nextInt(2) == 0
  }
}

object Protocol {
  sealed trait Command
  case class Packet(id: Int) extends Command

  sealed trait Event
  case class PacketSent(packet: Packet) extends Event

  case class PacketException(packet: Packet, sender: ActorRef)
    extends Exception
}
```
And make it reprocess messages using the postRestart hook:
```scala
import akka.actor.Actor

class ReliableNetwork extends Actor {
  import Protocol._

  // Called on the fresh instance after a restart. Re-send the failed packet to
  // ourselves, preserving the original sender, so that it is processed again.
  override def postRestart(reason: Throwable): Unit = {
    reason match {
      case PacketException(packet, originalSender) => self.tell(packet, originalSender)
      case _ =>
    }
    super.postRestart(reason)
  }

  override def receive: Receive = {
    case packet: Packet =>
      if (shouldDrop()) {
        throw new PacketException(packet, sender())
      } else {
        sender() ! PacketSent(packet)
      }
  }

  private def shouldDrop(): Boolean = {
    import scala.util.Random
    Random.nextInt(2) == 0
  }
}
```
There are a few issues with this approach, however. First, there is an implicit assumption about the parent actor's SupervisorStrategy: the packet is only re-sent if the actor is restarted. Fortunately, the default strategy, which restarts on any Exception, would work. However, there is no way to enforce this assumption. Second, unless the sender waits for a PacketSent before sending the next packet, the packets are no longer sent in order, because the retried packet goes to the back of the mailbox. Third, and this is really a philosophical point, you have changed the responsibility and intent of the actor. It no longer models an unreliable network.
Instead of throwing an exception, you could reply to the sender with a CommandFailed message:
```scala
import akka.actor.Actor

class UnreliableNetwork extends Actor {
  import Protocol._
  import scala.util.Random

  // Drops roughly half of all packets
  def shouldDrop(): Boolean = Random.nextInt(2) == 0

  override def receive: Receive = {
    case packet: Packet =>
      if (shouldDrop()) {
        sender() ! CommandFailed(packet) // report the failure instead of crashing
      } else {
        sender() ! PacketSent(packet)
      }
  }
}

object Protocol {
  sealed trait Command
  case class Packet(id: Int) extends Command

  sealed trait Event
  case class PacketSent(packet: Packet) extends Event
  case class CommandFailed(command: Command) extends Event
}
```
This is the pattern used by most of Akka’s IO extensions, and it works really well if the sender is in the same JVM as you (i.e. deploy = local). Providing reliability then becomes the responsibility of another actor:
```scala
import akka.actor.{Actor, ActorRef, ActorRefFactory}
import scala.collection.mutable

class ReliableNetwork(unreliableNetworkFactory: ActorRefFactory => ActorRef) extends Actor {
  import Protocol._

  // Packets waiting to be sent, each paired with the actor that asked us to send it
  val queue = mutable.Queue.empty[(Packet, ActorRef)]
  val unreliableNetwork = unreliableNetworkFactory(context)

  override def receive: Receive = {
    case packet: Packet =>
      queue.enqueue(packet -> sender())
      // Only one packet is in flight at a time, which preserves ordering
      if (queue.size == 1) unreliableNetwork ! packet
    case sent: PacketSent =>
      val (_, commander) = queue.dequeue()
      commander ! sent
      if (queue.nonEmpty) {
        val (packet, _) = queue.head
        unreliableNetwork ! packet
      }
    case CommandFailed(packet) =>
      // Retry the packet that just failed
      unreliableNetwork ! packet
  }
}
```
This implementation ensures that packets are sent in order and that the PacketSent events are sent back to the original sender. However, the mutable state means that, again, you make an implicit assumption about the parent actor's SupervisorStrategy: a restart would silently discard any queued packets. Perhaps the biggest problem with this approach is that it does not scale across JVM boundaries, because the CommandFailed event (or, for that matter, the PacketSent event) may be lost between JVMs. The only solution to this is to use timeouts. A more detailed discussion can be found in the official documentation on Message Delivery Reliability.
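To give a feel for what a timeout-based version could look like, here is a minimal sketch rather than a production-ready design. It assumes a hypothetical TimeoutProtocol in which the network silently loses packets or acknowledgements (so there is no CommandFailed to rely on), uses Akka's Timers trait to retry the packet at the head of the queue, and assumes packets are distinguishable (for example by unique ids) so that a late, duplicate PacketSent can be ignored. TimeoutReliableNetwork and retryAfter are likewise hypothetical names.

```scala
import akka.actor.{Actor, ActorRef, ActorRefFactory, Timers}
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical protocol: packets (or their acknowledgements) may simply vanish.
object TimeoutProtocol {
  final case class Packet(id: Int)
  final case class PacketSent(packet: Packet)
  case object Retry
}

class TimeoutReliableNetwork(networkFactory: ActorRefFactory => ActorRef, retryAfter: FiniteDuration)
    extends Actor with Timers {
  import TimeoutProtocol._

  // Packets waiting to be confirmed, each paired with the actor that asked us to send it
  private val queue = mutable.Queue.empty[(Packet, ActorRef)]
  private val network = networkFactory(context)

  // Send the packet at the head of the queue and (re)arm the retry timer.
  // Starting a timer with an existing key replaces the previous timer.
  private def sendHead(): Unit =
    queue.headOption.foreach { case (packet, _) =>
      network ! packet
      timers.startSingleTimer(Retry, Retry, retryAfter)
    }

  override def receive: Receive = {
    case packet: Packet =>
      queue.enqueue(packet -> sender())
      if (queue.size == 1) sendHead()
    case sent @ PacketSent(packet) if queue.headOption.exists(_._1 == packet) =>
      timers.cancel(Retry)
      val (_, commander) = queue.dequeue()
      commander ! sent
      sendHead()
    case PacketSent(_) =>
      // Acknowledgement for a packet that was already confirmed (we retried
      // before the first acknowledgement arrived): ignore it.
    case Retry =>
      sendHead()
  }
}
```

Note the trade-off: because a packet is re-sent whenever its acknowledgement is late, the network may see the same packet more than once. Timeouts give you at-least-once delivery, not exactly-once.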