Publish/Subscribe

The publish/subscribe pattern is built into Baratine’s core. Its value is that publishing services do not need to know the specific subscribers. Publishers and subscribers do need to agree on an interface, but are otherwise decoupled. As with the rest of Baratine, the interface for pub/sub is a Java interface. A message is published by calling a method on that interface, which results in the same method call on each subscriber.

A publish/subscribe system consists of the pub/sub service that acts as a broker, services that publish, and services that subscribe.

The Baratine server includes built-in brokers. The event: scheme broker is a basic memory-based broker. If no subscribers or consumers exist, it drops the sent messages.
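The drop-on-no-subscriber behavior can be illustrated with a minimal plain-Java sketch. This is not the Baratine implementation: the class MemoryBrokerSketch and its methods are hypothetical names chosen for illustration, and messages are reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a memory-based broker; not Baratine code.
class MemoryBrokerSketch {
  private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

  void subscribe(Consumer<String> subscriber) {
    subscribers.add(subscriber);
  }

  // Delivers msg to every current subscriber and returns the delivery count.
  // Nothing is queued, so a message sent with no subscribers is lost.
  int publish(String msg) {
    int delivered = 0;
    for (Consumer<String> subscriber : subscribers) {
      subscriber.accept(msg);
      delivered++;
    }
    return delivered;
  }

  public static void main(String[] args) {
    MemoryBrokerSketch broker = new MemoryBrokerSketch();
    List<String> received = new ArrayList<>();

    broker.publish("early");         // no subscribers yet: dropped
    broker.subscribe(received::add);
    broker.publish("late");          // delivered

    System.out.println(received);    // prints [late]
  }
}
```

A subscriber that registers after a message was sent never sees that message, which is why the subscriber in the example below registers at startup.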

Subscription Example

The example uses event:///hello as the broker address. The subscribing service subscribes to the broker, and the publishing service publishes to the same address. The broker sends a copy of each call to every subscriber.

The MyHello interface documents the methods the publisher will send. As with the rest of Baratine, the subscriber does not need to explicitly implement the interface. The interface looks like:

public interface MyHello
{
  void hello(String msg);
}

The subscriber will register itself with the event:///hello broker at startup and receive messages.

Subscriber:

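import io.baratine.core.OnInit;
import io.baratine.core.Service;
import io.baratine.core.ServiceManager;
import io.baratine.core.ServiceRef;
import io.baratine.core.Startup;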

@Service
@Startup
public class MySubscriber
{
  @OnInit
  public void onInit()
  {
    ServiceManager manager = ServiceManager.current();
    ServiceRef eventRef = manager.lookup("event:///hello");

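    // The lambda is typed as MyHello, so each published hello() call
    // is forwarded to this service's hello() method.
    eventRef.subscribe((MyHello) msg->hello(msg));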
  }

  public void hello(String msg)
  {
    System.out.println("Hello: " + msg);
  }
}

The subscribe() method registers the service with the event:///hello broker. Once subscribed, the service receives the messages that are published to the broker.

Implicitly, the subscriber agrees on an interface and a broker address with any publisher, but does not need to know any details of the publisher itself.

Because the service registers its subscription in @OnInit, which runs when the service starts, it is annotated with @Startup to ensure it’s started automatically.

Publisher:

@Service("public:///my-publisher")
public class MyPublisher
{
  @Inject @Lookup("event:///hello")
  MyHello _hello;

  public void publish()
  {
    _hello.hello("my-publish");
  }
}

The publisher can be any Baratine service. While it needs to know the agreed broker address, event:///hello, and the API MyHello, it doesn’t need to know explicitly that it’s sending to a pub/sub broker. As far as the publisher is concerned, it’s just sending to a service with the given name.
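One way to picture how a publisher can hold a plain MyHello reference without knowing a broker sits behind it is a dynamic proxy that replays each call on every registered target. The sketch below uses only java.lang.reflect.Proxy from the JDK. It is an illustration of the idea, not a description of how Baratine builds its proxies, and the names ProxyFanOutSketch and fanOut are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxyFanOutSketch {
  // Mirrors the MyHello interface from the example above.
  interface MyHello {
    void hello(String msg);
  }

  // Returns a MyHello whose calls are replayed on every target in the list.
  static MyHello fanOut(List<MyHello> targets) {
    InvocationHandler handler = (proxy, method, args) -> {
      // Route toString/equals/hashCode to the list instead of fanning out.
      if (method.getDeclaringClass() == Object.class) {
        return method.invoke(targets, args);
      }
      for (MyHello target : targets) {
        method.invoke(target, args);
      }
      return null; // valid because every MyHello method returns void
    };
    return (MyHello) Proxy.newProxyInstance(
        MyHello.class.getClassLoader(),
        new Class<?>[] { MyHello.class },
        handler);
  }

  public static void main(String[] args) {
    List<MyHello> subscribers = new ArrayList<>();
    subscribers.add(msg -> System.out.println("A got " + msg));
    subscribers.add(msg -> System.out.println("B got " + msg));

    // The caller sees only the MyHello type, as MyPublisher does.
    MyHello hello = fanOut(subscribers);
    hello.hello("my-publish");
  }
}
```

The calling code depends only on the interface, so swapping the proxy for a single direct implementation would not change it.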

Consume and Subscribe

Subscription comes in two flavors: consume() and subscribe(). A broker sends a copy of a published message to every subscriber, but to only one consumer. Brokers typically use round-robin to distribute messages among consumers, although Baratine itself does not enforce round-robin as a requirement.

Consumer services are identical to subscriber services. The only difference is how the broker service distributes messages. Publishers have no control over the destination of the message.
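The difference in distribution can be sketched in plain Java. The model below is hypothetical (DistributionSketch is not a Baratine class) and assumes round-robin for consumers, which is only one possible policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical broker model; round-robin is an assumption, not a guarantee.
class DistributionSketch {
  private final List<Consumer<String>> subscribers = new ArrayList<>();
  private final List<Consumer<String>> consumers = new ArrayList<>();
  private int next; // index of the consumer that receives the next message

  void subscribe(Consumer<String> s) { subscribers.add(s); }

  void consume(Consumer<String> c) { consumers.add(c); }

  void publish(String msg) {
    // Every subscriber receives its own copy.
    for (Consumer<String> s : subscribers) {
      s.accept(msg);
    }
    // Exactly one consumer receives the message, chosen in rotation.
    if (!consumers.isEmpty()) {
      consumers.get(next % consumers.size()).accept(msg);
      next = (next + 1) % consumers.size();
    }
  }

  public static void main(String[] args) {
    DistributionSketch broker = new DistributionSketch();
    broker.subscribe(m -> System.out.println("subscriber-1: " + m));
    broker.subscribe(m -> System.out.println("subscriber-2: " + m));
    broker.consume(m -> System.out.println("consumer-1: " + m));
    broker.consume(m -> System.out.println("consumer-2: " + m));

    broker.publish("a"); // both subscribers, then consumer-1
    broker.publish("b"); // both subscribers, then consumer-2
  }
}
```

The publisher calls publish() the same way in both cases. Which services receive the message is decided entirely by how they registered.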

A sample consumer:

@Service
@Startup
public class MyConsumer
{
  @OnInit
  public void onInit()
  {
    ServiceManager manager = ServiceManager.current();
    ServiceRef eventRef = manager.lookup("event:///foo");

    eventRef.consume((MyFoo) msg->hello(msg));
  }

  public void hello(String msg) { ... }
}

A sample subscriber:

@Service
@Startup
public class MySubscriber
{
  @OnInit
  public void onInit()
  {
    ServiceManager manager = ServiceManager.current();
    ServiceRef eventRef = manager.lookup("event:///foo");

    eventRef.subscribe((MyFoo) msg->hello(msg));
  }

  public void hello(String msg) { ... }
}

Programmatic API for subscribe/consume

The ServiceRef API is available for programmatic registration of subscribers, when it’s more convenient to create and register services in code.

Subscribing and unsubscribing:

@Inject ServiceManager _manager;
...

ServiceRef myBroker = _manager.lookup("event:///test");

ServiceRef mySub = _manager.service(new MyService());

myBroker.subscribe(mySub);
...
myBroker.unsubscribe(mySub);

Consuming and unsubscribing:

@Inject ServiceManager _manager;
...

ServiceRef myBroker = _manager.lookup("event:///test");

ServiceRef mySub = _manager.service(new MyService());

myBroker.consume(mySub);
...
myBroker.unsubscribe(mySub);
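Both snippets above end a registration with unsubscribe(), passing the same reference that was registered. A minimal plain-Java sketch of that bookkeeping follows. RegistrationSketch and its methods are hypothetical names, and the sketch assumes that references are matched by equality, which for lambdas means the same object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical registry model; not the Baratine ServiceRef implementation.
class RegistrationSketch {
  private final List<Consumer<String>> subscribers = new ArrayList<>();
  private final List<Consumer<String>> consumers = new ArrayList<>();

  void subscribe(Consumer<String> ref) { subscribers.add(ref); }

  void consume(Consumer<String> ref) { consumers.add(ref); }

  // Removes the reference regardless of how it was registered.
  // Returns true if a registration was found and removed.
  boolean unsubscribe(Consumer<String> ref) {
    boolean removedSubscriber = subscribers.remove(ref);
    boolean removedConsumer = consumers.remove(ref);
    return removedSubscriber || removedConsumer;
  }

  int registrations() { return subscribers.size() + consumers.size(); }

  public static void main(String[] args) {
    RegistrationSketch broker = new RegistrationSketch();

    // Keep the reference: it is needed again to unsubscribe.
    Consumer<String> mySub = msg -> System.out.println("got " + msg);

    broker.consume(mySub);
    System.out.println(broker.registrations()); // prints 1
    broker.unsubscribe(mySub);
    System.out.println(broker.registrations()); // prints 0
  }
}
```

In this model a caller that registers an inline lambda without keeping a reference to it has nothing to pass to unsubscribe() later, which is one reason to hold on to the reference as the snippets above do with mySub.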