ActorSystem
The ActorSystem provides local actor-based concurrency with supervision and lifecycle management.
Creating Actors
Define actors by extending the Actor trait:
case class EchoActor() extends Actor {
override def onMsg: PartialFunction[Any, Any] = {
case msg => println(s"Echo: $msg")
}
}
case class CounterActor(actorSystem: ActorSystem) extends Actor {
private var count = 0
override def onMsg: PartialFunction[Any, Any] = {
case n: Int =>
count += n
actorSystem.tell[EchoActor]("echo", s"Count is now $count")
case "get" => count
case "print" => println(s"Count is $count")
}
}
Setting Up the ActorSystem
// Create a new ActorSystem
val system = ActorSystem()
// Register actor types with their constructor arguments
system.registerProp(ActorProps.props[EchoActor]())
system.registerProp(ActorProps.props[CounterActor](system))
Sending Messages
// Send messages to named actor instances
system.tell[CounterActor]("counter1", 5)
system.tell[CounterActor]("counter1", "get")
system.tell[EchoActor]("echo1", "Hello!")
// Helper for repeated messages to same actor
val counter = system.tell[CounterActor]("counter1", _)
counter(1)
counter(2)
counter("print")
TypedActor - Type-Safe Message Handling
The TypedActor trait provides compile-time type safety for actor message handling. Instead of handling messages of type Any, typed actors constrain their messages to a specific type, reducing runtime errors and improving code clarity.
Basic TypedActor Usage
Define a sealed trait for your message types and extend TypedActor[M]:
sealed trait CounterMessage
case object Increment extends CounterMessage
case object Decrement extends CounterMessage
case class AddValue(value: Int) extends CounterMessage
case class TypedCounterActor() extends TypedActor[CounterMessage] {
private var count = 0
override def typedOnMsg: PartialFunction[CounterMessage, Any] = {
case Increment =>
count += 1
count
case Decrement =>
count -= 1
count
case AddValue(n) =>
count += n
count
}
}
Register and use typed actors just like regular actors:
val system = ActorSystem()
system.registerProp(ActorProps.props[TypedCounterActor]())
system.tell[TypedCounterActor]("counter", Increment)
system.tell[TypedCounterActor]("counter", AddValue(5))
system.tell[TypedCounterActor]("counter", Decrement)
Messages that don't match the expected type will be automatically sent to dead letters:
// This message won't be processed and goes to dead letters
system.tell[TypedCounterActor]("counter", "invalid message")
// Check dead letters to debug
val deadLetters = system.getDeadLetters(100)
TypedActor with Complex Message Hierarchies
TypedActor works well with rich message type hierarchies:
sealed trait DatabaseMessage
sealed trait QueryMessage extends DatabaseMessage
sealed trait CommandMessage extends DatabaseMessage
case class SelectQuery(table: String, conditions: Map[String, Any]) extends QueryMessage
case class InsertCommand(table: String, data: Map[String, Any]) extends CommandMessage
case class DeleteCommand(table: String, id: String) extends CommandMessage
case class DatabaseActor() extends TypedActor[DatabaseMessage] {
override def typedOnMsg: PartialFunction[DatabaseMessage, Any] = {
case SelectQuery(table, conditions) =>
// Execute select query
performSelect(table, conditions)
case InsertCommand(table, data) =>
// Execute insert
performInsert(table, data)
case DeleteCommand(table, id) =>
// Execute delete
performDelete(table, id)
}
}
StatefulActor - Immutable State Management
The StatefulActor trait extends TypedActor to provide built-in immutable state management. Instead of managing mutable state yourself, StatefulActor handles state updates through a functional approach.
Basic StatefulActor Usage
Define your state type, message types, and extend StatefulActor[State, Msg]:
case class CounterState(count: Int, history: List[String])
sealed trait CounterMessage
case object Increment extends CounterMessage
case object Decrement extends CounterMessage
case object Reset extends CounterMessage
case class AddValue(n: Int) extends CounterMessage
case class StatefulCounterActor() extends StatefulActor[CounterState, CounterMessage] {
override def initialState: CounterState =
CounterState(count = 0, history = List.empty)
override def statefulOnMsg: PartialFunction[CounterMessage, CounterState] = {
case Increment =>
state.copy(
count = state.count + 1,
history = state.history :+ "increment"
)
case Decrement =>
state.copy(
count = state.count - 1,
history = state.history :+ "decrement"
)
case AddValue(n) =>
state.copy(
count = state.count + n,
history = state.history :+ s"add $n"
)
case Reset =>
initialState
}
}
The statefulOnMsg method returns a new state for each message. Access the current state using the protected state method.
State Initialization and Updates
Key concepts for StatefulActor:
- initialState: Called when the actor starts and after restarts
- state: Protected accessor for reading current state
- statefulOnMsg: Returns new state; this becomes the current state for the next message
case class ShoppingCartState(
items: Map[String, Int],
total: Double,
discountApplied: Boolean
)
sealed trait CartMessage
case class AddItem(item: String, price: Double) extends CartMessage
case class RemoveItem(item: String) extends CartMessage
case class ApplyDiscount(percent: Double) extends CartMessage
case class ShoppingCartActor() extends StatefulActor[ShoppingCartState, CartMessage] {
override def initialState: ShoppingCartState =
ShoppingCartState(Map.empty, 0.0, discountApplied = false)
override def statefulOnMsg: PartialFunction[CartMessage, ShoppingCartState] = {
case AddItem(item, price) =>
val newItems = state.items.updatedWith(item) {
case Some(count) => Some(count + 1)
case None => Some(1)
}
state.copy(
items = newItems,
total = state.total + price
)
case RemoveItem(item) =>
state.items.get(item).fold(state) { count =>
if (count > 1) {
state.copy(items = state.items.updated(item, count - 1))
} else {
state.copy(items = state.items.removed(item))
}
}
case ApplyDiscount(percent) if !state.discountApplied =>
state.copy(
total = state.total * (1.0 - percent / 100.0),
discountApplied = true
)
case ApplyDiscount(_) =>
state // Already applied, no change
}
}
Request-Response Pattern (Ask)
The ask method enables request-response communication with actors, returning a Future with the response:
import scala.concurrent.duration.*
import scala.concurrent.Await
case class QueryActor() extends Actor {
override def onMsg: PartialFunction[Any, Any] = {
case ask: Ask[?] =>
ask.message match {
case "count" => ask.complete(42)
case query: String => ask.complete(s"Result for: $query")
case _ => ask.fail(new IllegalArgumentException("Unknown message"))
}
}
}
// Register the actor
system.registerProp(ActorProps.props[QueryActor]())
// Ask for a response with a timeout
val future = system.ask[QueryActor, Int]("query1", "count", 5.seconds)
val result = Await.result(future, 5.seconds) // result: Int = 42
// Handle responses asynchronously
system.ask[QueryActor, String]("query1", "search", 5.seconds).foreach { result =>
println(s"Got result: $result")
}
Actors should pattern match on Ask[?] to handle request-response messages. Use the complete method to send a successful response or fail to send an error.
Actor Hierarchy and Context
Each actor has an ActorContext that provides access to its identity, hierarchy, and lifecycle management:
case class SupervisorActor() extends Actor {
override def onMsg: PartialFunction[Any, Any] = {
case "create-workers" =>
// Create child actors under this supervisor
val worker1 = context.actorOf[WorkerActor]("worker1")
val worker2 = context.actorOf[WorkerActor]("worker2")
println(s"Created workers: ${context.children}")
case "stop-worker" =>
// Stop a specific child
context.children.headOption.foreach(child => context.stop(child))
case msg =>
// Send message to all children
context.children.foreach(child => context.system.tell(child, msg))
}
}
case class WorkerActor() extends Actor {
override def onMsg: PartialFunction[Any, Any] = {
case "parent" =>
println(s"My parent is: ${context.parent}")
case "self" =>
println(s"I am: ${context.self}")
case msg =>
println(s"Worker ${context.self.path.name} processing: $msg")
}
}
The ActorContext provides:
self- Reference to this actor's identityparent- Reference to the parent actor (if any)children- List of all child actorsactorOf[A](name)- Create a child actorstop(child)- Stop a child actorsystem- Access to the ActorSystem
Actor Paths
Actors are organized in a hierarchical path structure similar to file systems. All user actors live under the /user root:
// Top-level actors
val actor1 = system.actorOf[MyActor](ActorPath.user("myActor"))
println(actor1.path) // /user/myActor
// Child actors
val supervisor = system.actorOf[SupervisorActor](ActorPath.user("supervisor"))
// Inside SupervisorActor:
// context.actorOf[WorkerActor]("worker1") creates /user/supervisor/worker1
// Path operations
val path = ActorPath.user("parent") / "child" / "grandchild"
println(path) // /user/parent/child/grandchild
println(path.name) // "grandchild"
println(path.parent) // Some(/user/parent/child)
println(path.isChildOf(ActorPath.user("parent"))) // false
println(path.isDescendantOf(ActorPath.user("parent"))) // true
Lifecycle Hooks
Actors can override lifecycle hooks for initialization and cleanup:
case class DatabaseActor() extends Actor {
private var connection: Connection = null
override def preStart(): Unit = {
// Called when actor starts (before processing any messages)
connection = openDatabaseConnection()
println("Database connection opened")
}
override def postStop(): Unit = {
// Called when actor stops (after PoisonPill or system shutdown)
if (connection != null) connection.close()
println("Database connection closed")
}
override def preRestart(reason: Throwable): Unit = {
// Called before actor restarts (after failure)
println(s"Actor restarting due to: ${reason.getMessage}")
// Save any state that should survive the restart
}
override def postRestart(reason: Throwable): Unit = {
// Called after actor restarts (before processing new messages)
println("Actor restarted, reinitializing...")
// Restore state or reinitialize resources
}
override def onMsg: PartialFunction[Any, Any] = {
case query: String => connection.executeQuery(query)
}
}
Next Steps
- Learn about Advanced Features for supervision strategies and mailbox types
- Explore the EventBus for simpler pub/sub patterns
- Return to Keanu Overview