Publish/Subscribe (aka pub/sub) is a messaging pattern that helps decouple[1] the senders and receivers of messages. Generally, this pattern is used across networks, where it makes a system easier to scale and lets the network topology change dynamically.
The publish/subscribe pattern can also be used within a single application, as an alternative to the more traditional Observable/Observer pattern. It offers some distinct advantages there, which I will address in a future article.
In the next few articles we'll look at some implementations using Scala and Akka, such as this: Publish/Subscribe using Scala and Akka EventBus
TL;DR
Just give me the code: GitHub
EventStream
Let’s build our first example using the Akka main event bus: EventStream[2].
EventStream.scala
import akka.actor.{Actor, Props, ActorSystem}

sealed class Subscriber(f: (String, Any) => Unit) extends Actor {
  override def receive = { case (topic: String, payload: Any) => f(topic, payload) }
}

object EventStream {
  // ActorSystem is a heavy object: create only one per application
  // http://doc.example.io/docs/example/snapshot/scala/actors.html
  val system = ActorSystem("actorsystem")

  def subscribe(f: (String, Any) => Option[Unit], name: String) = {
    val props = Props(classOf[Subscriber], f)
    val subscriber = system.actorOf(props, name = name)
    system.eventStream.subscribe(subscriber, classOf[(String, Any)])
  }

  // Publish the topic and payload as a single tuple event
  def publish(topic: String, payload: Any): Unit = {
    system.eventStream.publish((topic, payload))
  }
}
Congratulations, you have now created your entire publish/subscribe infrastructure!
Please explain.
First we create our Subscriber, which will listen for messages on the event stream bus. This Actor will act on any message matching the class (String, Any), which is simply a tuple of String and Any[3]; however, you can create any class of message you desire[4]. Within our Subscriber we override the receive function from Actor to tell our subscriber what to do when it receives a message matching (String, Any). Upon construction of our subscriber we pass in a function f that takes the topic and the payload; receive calls it each time a matching message arrives and ignores whatever it returns. The sealed keyword means that this class can only be extended directly within the file it is declared in, in this case EventStream.scala; it does not stop other files from referring to the class. One last thing worth mentioning here is that in the Scala language, object declares a singleton.
sealed class Subscriber(f: (String, Any) => Unit) extends Actor {
override def receive = { case (topic: String, payload: Any) => f(topic, payload) }
}
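As a rough illustration of the "any class of message you desire" remark, here is a minimal sketch that swaps the tuple for a dedicated message class. The names Message, MessageSubscriber and TypedEventStream are hypothetical and not part of the original example, and the sketch assumes the classic akka-actor library is on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical message type (an assumption for this sketch only)
final case class Message(topic: String, payload: Any)

// Hypothetical subscriber that reacts to Message events instead of tuples
class MessageSubscriber(f: Message => Unit) extends Actor {
  override def receive: Receive = { case m: Message => f(m) }
}

object TypedEventStream {
  // Still one ActorSystem per application
  val system = ActorSystem("typedsystem")

  // Registers a new subscriber actor for events of class Message
  def subscribe(f: Message => Unit, name: String): Boolean = {
    val subscriber = system.actorOf(Props(classOf[MessageSubscriber], f), name)
    system.eventStream.subscribe(subscriber, classOf[Message])
  }

  // Publishes a Message to every subscriber registered for that class
  def publish(message: Message): Unit = system.eventStream.publish(message)
}
```

With this variant, a subscriber is only handed Message events, rather than every tuple that happens to be published on the stream.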
Next, we create an ActorSystem, which is used to supervise top-level actors. As the comment says, an ActorSystem is a heavy object, so you should create only one per application.
// ActorSystem is a heavy object: create only one per application
// http://doc.example.io/docs/example/snapshot/scala/actors.html
val system = ActorSystem("actorsystem")
Now we define our subscribe function, which takes a function f: (String, Any) => Option[Unit] and a name to represent the entity creating the subscription. We first create props, a Props object, which is merely a configuration class for the creation of Actors. Props takes the Subscriber class we defined earlier plus the function argument f, which is passed to the Subscriber constructor when system.actorOf creates our subscriber. Finally, we register this subscriber with system.eventStream for messages of class (String, Any), and it starts listening.
def subscribe(f: (String, Any) => Option[Unit], name: String) = {
  val props = Props(classOf[Subscriber], f)
  val subscriber = system.actorOf(props, name = name)
  system.eventStream.subscribe(subscriber, classOf[(String, Any)])
}
Our work here is done.
Now to demo the system, we’ll create a couple of simple subscribers.
Foo.scala
object Foo {
  val onEvent = (topic: String, payload: Any) => Some(topic) collect {
    case "topic A" => println("Foo received: topic = " + topic + ", payload = " + payload)
  }
}
Bar.scala
object Bar {
  val onEvent = (topic: String, payload: Any) => Some(topic) collect {
    case "topic B" =>
      println("Bar received: topic = " + topic + " payload = " + payload)
      EventStream.publish("topic C", "payload C")
  }
}
Please explain.
In Foo.scala and Bar.scala above, we’ve declared a function of type (String, Any) => Option[Unit] and assigned it to a val[5]. The function body wraps the topic in Some(topic), which represents a value that is present (i.e. is not None), and then calls collect with a pattern match. collect only runs the code for a case that matches topic: if one matches, the result is Some of whatever that case returned, otherwise the result is None. Note: Some(value) and None are the two possible values of the Option monad.
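To make the behaviour of Some(topic) collect { ... } easier to see, here is a small sketch with the same shape as Foo.onEvent. CollectDemo is a hypothetical name, and each case returns a String instead of printing, purely so that the result can be inspected.

```scala
object CollectDemo {
  // Same shape as Foo.onEvent, but the case returns a String instead of
  // printing, so the effect of collect is visible in the result.
  val onEvent: (String, Any) => Option[String] =
    (topic, payload) => Some(topic) collect {
      case "topic A" => "handled " + topic + " with " + payload
    }

  def main(args: Array[String]): Unit = {
    println(onEvent("topic A", "payload A")) // the case matches, so a Some is returned
    println(onEvent("topic B", "payload B")) // no case matches, so None is returned
  }
}
```

Because unmatched topics simply become None, a subscriber written this way needs no explicit default case.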
Lastly, we’ll finish off with a subscriber that requires extra parameters for its onEvent function.
Logger.scala
import java.io._

object Logger {
  // Returns an event handler that closes over the given PrintStream.
  // Option[Unit] { ... } calls Option.apply on the block, so the handler returns Some(()).
  def onEvent(ps: PrintStream) = (topic: String, payload: Any) => Option[Unit] {
    try {
      println("Logger received: topic = " + topic + ", payload = " + payload)
      ps.println("Logger [" + topic + "] [" + payload + "]")
    }
    catch {
      case ioe: IOException => println("IOException: " + ioe.toString)
      case e: Exception => println("Exception: " + e.toString)
    }
  }

  def stop(ps: PrintStream) = ps.close()
}
To do this, we define a function that takes the extra parameter(s), in this case (ps: PrintStream), and returns a function with signature (topic: String, payload: Any) => Option[Unit] that closes over the ps parameter.
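The closure idea can be tried without Akka. The sketch below is a simplified stand-in for Logger.onEvent, using the hypothetical name ClosureDemo: it builds a handler around an in-memory PrintStream and then reads back what the handler wrote.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

object ClosureDemo {
  // Takes the extra parameter and returns a handler that closes over it
  def onEvent(ps: PrintStream): (String, Any) => Option[Unit] =
    (topic, payload) => Option(ps.println("[" + topic + "] [" + payload + "]"))

  // Writes one event through the handler and returns the captured text
  def demo(): String = {
    val buffer = new ByteArrayOutputStream()
    val ps = new PrintStream(buffer, true)
    val handler = onEvent(ps) // ps is captured here, not passed per event
    handler("topic A", "payload A")
    ps.close()
    buffer.toString.trim
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The caller of handler never sees the PrintStream; it only sees the (String, Any) => Option[Unit] signature that EventStream.subscribe expects.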
All that’s left to do is write a test program to demonstrate our publish/subscribe system.
import java.io.{File, FileOutputStream, PrintStream}

object Main {
  val ps = new PrintStream(new FileOutputStream(new File("output.txt")))

  def main(args: Array[String]): Unit = {
    // setup subscriptions
    EventStream.subscribe(Foo.onEvent, "foosubscriber")
    EventStream.subscribe(Bar.onEvent, "barsubscriber")
    EventStream.subscribe(Logger.onEvent(ps), "loggersubscriber")
    run()
  }

  def run(): Unit = {
    EventStream.publish("topic A", "payload A")
    EventStream.publish("topic B", "payload B")
    // Note: events are delivered to subscribers asynchronously, so closing
    // the stream right away can race with the Logger subscriber
    stop()
  }

  def stop(): Unit = Logger.stop(ps)
}
That’s all for now.
In an upcoming article, I will demonstrate the publish/subscribe pattern using the Akka EventBus and LookupClassification. Edit: the article in question is now up: Publish/Subscribe using Scala and Akka EventBus
Notes
1. Decoupling as far as space and time is concerned. Publish/Subscribe introduces a different type of coupling, namely semantic coupling.
2. EventStream is NOT a distributed solution and is only intended to be used within a single application.
3. Scala type Any is roughly equivalent to Java Object, that is, the root type that all others derive from.
4. e.g. Subscriber(f: Message => Unit) where class Message(topic: String, payload: Any). A dedicated message class like this is usually the clearer choice, since a subscription to a tuple class receives every tuple published on the stream.
5. We could also design our system to do full pattern matching, however each onEvent type function would need to return a specific type (i.e. Unit) and explicitly deal with the default case where no pattern could be matched.