A photograph of a road and rail line between Hapuku and Mangamaunu, South Island, New Zealand

This post is the second in a series on implementing publish/subscribe with Akka and Scala. The first post, Publish/Subscribe using Scala and Akka EventStream, covers some facets of Scala and Akka that'll be skipped over in this article, so it may be worth a look if things aren't clear.

In Akka-based publish/subscribe systems, publishers post messages to an event bus, and subscribers register subscriptions with that event bus. The bus takes care of filtering messages and delivering each message to the subscribers that have registered an interest in the channel it was published to. Subscribers can register or deregister their subscriptions for any particular channel(s) at any time.

In this article we'll implement an event bus using lookup classifiers with subchannel classification.

An image depicting Publish/Subscribe using Scala and Akka EventBus



TL;DR

Just give me the code: GitHub

Subchannel Classification

The Akka docs describe subchannel classification as follows: “If classifiers form a hierarchy and it is desired that subscription be possible not only at the leaf nodes, this classification may be just the right one.”

It’s probably easiest explained this way: /event/42 is a subchannel of /event. A subscriber to /event/42 receives only events published to /event/42 (or to channels beneath it), whereas a subscriber to /event receives every event published to /event or to any of its subchannels, including /event/42. Note: I use the terms events, payloads and messages interchangeably throughout this article.
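To make the delivery rule concrete, here is a minimal sketch in plain Scala with no Akka involved. It is only a model of the behaviour, not how Akka implements it: the subscription table and the receivers helper are hypothetical names made up for illustration, and matching is done with a simple prefix test.

```scala
object PrefixMatchSketch {
  // Hypothetical subscription table: subscriber name -> subscribed channel.
  val subscriptions: Map[String, String] = Map(
    "rootSubscriber"  -> "/",
    "eventSubscriber" -> "/event",
    "itemSubscriber"  -> "/event/42"
  )

  // A subscriber receives an event when the channel the event is published to
  // starts with the channel the subscriber registered for.
  def receivers(channel: String): Set[String] =
    subscriptions.filter { case (_, subscribed) => channel.startsWith(subscribed) }.keySet
}
```

Under this model, PrefixMatchSketch.receivers("/event/42") contains all three subscribers, while PrefixMatchSketch.receivers("/event") contains only rootSubscriber and eventSubscriber.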

Enough of that. Let’s build our subchannel classification event bus using Akka EventBus.

SCEventBus.scala

import akka.util.Subclassification
import akka.actor.ActorRef
import akka.event.{EventBus, SubchannelClassification}

object SCEventBus extends EventBus with SubchannelClassification {
  type Event = (String, Any, ActorRef)
  type Classifier = String
  type Subscriber = ActorRef

  override protected def classify(event: Event): Classifier = event._1

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = x == y
    def isSubclass(x: Classifier, y: Classifier) = x.startsWith(y)
  }

  override protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber.tell(event._2, event._3)
}


Please explain.

Here we’ve created our subchannel classifying event bus by mixing in the EventBus trait and the SubchannelClassification trait. Next (lines 6, 7, 8) we must define the abstract types:

  • Event is the type of all events published on that bus
  • Subscriber is the type of subscribers allowed to register on that event bus
  • Classifier defines the classifier to be used in selecting subscribers for dispatching events

There’s a lot of stuff going on in the background which requires this type aliasing. If you’re interested in what’s happening behind the scenes, take a look at the Akka EventBus code on GitHub.

On line 6 we are defining our event, which in this case is a tuple with three items consisting of:

  • String representing the channel the event is to be published to (e.g. /event/42)
  • Any representing the actual payload to be published
  • ActorRef representing the sender of the event (EventBus does not preserve the publisher of events)

On line 10 we define how events are classified: we return the first item of the event tuple, event._1, which as explained above is the channel.

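On line 12 we define subclassification, which tells the bus how classifiers relate to one another: isEqual decides whether two channels are the same, and isSubclass decides whether channel x is a subchannel of channel y (here, whether x starts with y).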
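One thing worth knowing about a plain startsWith test is that it compares characters, not path segments, so a hypothetical channel such as /eventual would count as a subchannel of /event. Whether that matters depends on how you name your channels. If it does, a stricter check could look something like the sketch below. This is an assumption on my part rather than anything from the Akka docs, and SegmentAwareSketch is a made-up name; the body of isSubclass could be dropped into the Subclassification instance in place of x.startsWith(y).

```scala
object SegmentAwareSketch {
  // x is a subchannel of y when they are equal, when y is the root channel,
  // or when x continues y with a further "/"-separated segment.
  def isSubclass(x: String, y: String): Boolean =
    x == y || y == "/" || x.startsWith(y + "/")
}
```

Equal channels still count as subchannels of each other here, which is what the startsWith version does too.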

Finally, on line 17 we have the publish function.

override protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber.tell(event._2, event._3)

publish takes our (String, Any, ActorRef) tuple as the Event and a Subscriber (the ActorRef that is subscribed to the channel the event is being published to) [1].

Now we simply send the event payload and sender (the 2nd and 3rd items of the tuple) to the subscriber using the tell [2] function, which follows a “fire and forget” strategy: it sends the message asynchronously and returns immediately [3].

The last thing to mention here is that our choice to use tell rather than ! is a deliberate one [2].

Now, let’s create a file containing some helper functions to build our actors.

Actors.scala

import akka.actor.{Actor, ActorRef, Props, ActorSystem}

sealed class Subscription(f: (Any, Subscription, ActorRef) => Unit) extends Actor {
  override def receive = { case (payload: Any) => f(payload, this, sender()) }
}

object Actors {

  val system = ActorSystem()

  def create(actorType: Class[_], name: String, args: AnyRef): ActorRef = {
    val props = Props(actorType, args)
    val actor = system.actorOf(props, name = name)
    actor
  }

  // receive handlers
  def onReceive = (payload: Any, receiver: Any, sender: ActorRef) => {
    println(s"$sender -> $receiver: $payload")
  }

  def onReceive(log: (String) => Unit) =
    (payload: Any, receiver: Any, sender: ActorRef) => {
      log(s"$sender -> $receiver: $payload")
      println(s"$sender -> $receiver: $payload")
    }
}


Please explain.

On line 3 we define our Subscription actor, which will be able to subscribe to and publish events on the event bus. Subscription takes a single constructor argument, a function f: (Any, Subscription, ActorRef) => Unit, i.e. a function of three parameters that returns Unit (the equivalent of Java's void). Within Subscription we override the receive function from Actor so that, whenever a message matching (payload: Any) arrives, the actor executes the function passed in at construction time.

Next, on line 9, we create an ActorSystem, which hosts and supervises our top-level actors. An ActorSystem is a heavyweight structure, so an application normally creates only one.

On line 11 we define a function to create our actors. The first parameter, of type Class[_], is the class that implements the actor we wish to construct. This function is deliberately generic and has potential uses outside this example, which is why we use the placeholder _ in Class[_] rather than hard-coding a particular class into the function definition. In our case we will be passing the Subscription type, i.e. classOf[Subscription], to this function. The next parameter, name, is simply the name of the actor/subscriber, and args contains the argument needed by the constructor of the actor class we create (e.g. the constructor argument needed by Subscription, a function with the signature f: (Any, Subscription, ActorRef) => Unit). The function then goes about constructing the actor in the usual way.

A final thing of note here is line 14. In Scala the return keyword is redundant: the value of the last expression in a function body is what the function returns.
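As a quick illustration that has nothing to do with Akka, here is a small function with no return keyword. The clamp helper is a made-up example; its result is simply the value of the last expression in the body.

```scala
object LastExpressionSketch {
  // No `return` needed: the final expression is the result of the function.
  def clamp(value: Int, low: Int, high: Int): Int = {
    val atLeastLow = math.max(value, low)
    math.min(atLeastLow, high)
  }
}
```

So LastExpressionSketch.clamp(15, 0, 10) evaluates to 10, in the same way that create evaluates to the actor on its last line.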

Finally, on lines 18 and 22, we define our various receive functions that’ll be used by subscribers (remember, we inject these into Subscription upon creation, which we will see an example of in Main.scala).

Oh, you’ll also notice string interpolation: in s"$sender -> $receiver: $payload", each $name is replaced with the value of that variable.
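If you haven't met the s prefix before, the sketch below shows the two forms you're most likely to need. The function names are made up for the example: $name substitutes a variable, and ${...} substitutes the result of an arbitrary expression.

```scala
object InterpolationSketch {
  // $name inserts the value of a variable (its toString for non-strings).
  def format(sender: String, receiver: String, payload: Any): String =
    s"$sender -> $receiver: $payload"

  // ${...} inserts the result of an expression.
  def withLength(payload: String): String =
    s"payload has ${payload.length} characters"
}
```

For example, InterpolationSketch.format("pub", "sub", "payload A") gives "pub -> sub: payload A".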

Now we’ll create a simple logger to complement our demo.

Logger.scala

import java.io.{PrintStream, IOException}

object Logger {

  val log = (ps: PrintStream, msg: String) => {
    try {
      ps.println(msg)
    }
    catch {
      case ioe: IOException => println("IOException: " + ioe.toString)
      case e: Exception => println("Exception: " + e.toString)
    }
  }

  def stop(ps: PrintStream) = ps.close()
}



All that’s left to do is write a test program to demonstrate our subchannel classification publish/subscribe system.

Main.scala

import java.io.{File, FileOutputStream, PrintStream}

object Main extends App {

  // set up logger
  val ps = new PrintStream(new FileOutputStream(new File("output.txt")))
  val log = Logger.log(ps, _: String)

  // create subscribers
  val rootSubscriber = Actors.create(
    classOf[Subscription], "rootSubscriber", Actors.onReceive)

  val eventSubscriber = Actors.create(
    classOf[Subscription], "eventSubscriber", Actors.onReceive)

  val itemSubscriber = Actors.create(
    classOf[Subscription], "itemSubscriber", Actors.onReceive(log))

  // set up subscriptions
  SCEventBus.subscribe(rootSubscriber, "/")
  SCEventBus.subscribe(eventSubscriber, "/event")
  SCEventBus.subscribe(itemSubscriber, "/event/42")

  // create event publisher
  val eventPublisher = Actors.create(
    classOf[Subscription], "eventPublisher", Actors.onReceive)

  // generate some events
  SCEventBus.publish(("/", "payload A", eventPublisher))
  SCEventBus.publish(("/event", "payload B", eventPublisher))
  SCEventBus.publish(("/event/42", "payload C", eventPublisher))

  // clean up
  Logger.stop(ps)
}

Please explain.

Not much to tell here other than a few cursory notes.

On line 7 you’ll notice a call to Logger.log(ps, _: String). We have passed a placeholder _ as the second argument, creating what’s called a “partially applied function”: the arguments we supplied are fixed, and the result is a new function that takes only the arguments we omitted. The log val now holds a function that takes just a String.
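Here is the same trick in isolation, as a minimal sketch. The tag function and the info val are hypothetical stand-ins for Logger.log and log, and they return a String instead of writing to a PrintStream so the effect is easy to see.

```scala
object PartialApplicationSketch {
  // A two-argument function value, in the same shape as Logger.log.
  val tag = (prefix: String, msg: String) => s"[$prefix] $msg"

  // Fix the first argument and leave the second open with a typed placeholder.
  // The result is a function that takes only the omitted String.
  val info: String => String = tag("info", _: String)
}
```

Calling PartialApplicationSketch.info("started") gives "[info] started", just as log(...) in Main.scala only needs the message because the PrintStream was already supplied.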

Lines 29, 30 and 31 are calls to publish, each taking a (channel, payload, sender) tuple as its only parameter [1].

That’s it for now. Bye.


Notes

  1. Note: The Subscriber parameter is supplied to the publish function in the background (meaning that you don’t supply it in your call to publish), which will become apparent a bit later on in the article. Again, if you’re interested in what’s happening, take a look at the Akka EventBus code on GitHub.

  2. You’ll often see tell represented in an alternative form, namely !, e.g. subscriber ! event._2. Note: Normally, when called from inside an actor, ! implicitly passes along the ActorRef of the actor sending the message, so subscriber ! event._2 is the same as subscriber.tell(event._2, self). However, EventBus does not preserve the sender of published messages. For this reason we explicitly wrap the sender in our event and then use the explicit tell call. Note++: If no sender is available, e.g. a message is sent from something other than an actor, the receiver will see the deadLetters reference as the sender.

  3. Another function call, ask (rather than tell), sends a message asynchronously and returns a Future representing a possible reply. ask can also be written as ?, e.g. subscriber ? event._2




