Baratine on GitHub

Pipes

Pipes Overview

Pipes are unidirectional queues between two services, a publisher and a consumer. Pipes are created with a subscription and torn down when complete.

Pipes are a super high-performance version of the inbox. Unlike the inbox, Pipes support flow-control. Flow-control allows the publisher to coordinate with the consumer to know when the consumer is ready to accept new messages. This allows the publisher to send messages as fast as the consumer can handle without blocking. If there is no flow control, whenever the consumer is overwhelmed and behind in processing messages, then the publisher would have to block and wait for the consumer to be ready again (which is detrimental in a non-blocking system).

With Pipes, you send messages, which can be any Java object. This contrasts with the inbox where you call methods.

Subscribing

A subscriber sends its callback to the PipeBroker at a particular address to start receiving messages. The callback executes in the subscriber’s service thread.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@Startup
@Service
public void MyServiceSubscriber
{
  @Inject @Service("pipe:///auction-bid")
  PipeBroker<AuctionBid> _pipeBroker;

  @OnInit
  void onInit()
  {
    _pipeBroker.subscribe(PipeSub.of(this::onBid));
  }

  private void onBid(AuctionBid bid)
  {
    ...
  }
}

This subscriber uses the default subscriber flow control model: prefetch. If the subscriber gets behind, the queue will fill to the prefetch size and tell the publisher to stop sending more messages.

Publishing

A publisher sends its callback to the PipeBroker at a particular address to start publishing messages. The callback executes in the publisher’s service thread.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Startup
public void MyPublisher
{
  @Inject @Service("pipe:///auction-bid")
  PipeBroker<AuctionBid> _pipeBroker;

  PipePub<AuctionBid> _pipeOut;

  @OnInit
  void onInit()
  {
    _pipeBroker.publish((pub, ex) -> { _pipeOut = pub; });
  }

  // called to send the bid update
  private void bid(AuctionBid bid)
  {
    _pipeOut.next(bid);
  }
}

Note

A full pipe will block at pipe.next() call because this example doesn’t use flow control.

Pipe Flow Control

Flow control is a main reason to use pipes because subscribers may fall behind or freeze entirely.

With flow control, the publisher checks if the subscriber can handle more data. If the subscriber such as a frozen TCP client cannot accept more data, the publisher will pause itself. When space is available, the client notifies the publisher that it is ready to accept more data.

For example, a websocket chat service is a pub/sub broadcast service. One of the chat clients might freeze, but that frozen connection must not freeze other connections.

With flow control the publisher never sends more messages than the subscriber can handle, counted by credits. To do this the publisher follows two steps:

  • registers a callback to be notified when credits are available
  • checks if credits are available before sending messages

The credits() method on the Pipes class tells how many available credits there are, and the onAvailable() method registers the callback.

Some sample code might look like the following:

1
2
3
4
5
6
7
8
9
void sendStuff(Pipe<MyMessage> pipe)
{
  MyMessage msg;

  while (pipe.credits().available() > 0
         && (msg = myNextMessage()) != null) {
    pipe.next(msg);
  }
}

When credits are available, the publisher can be notified using an Credits.OnAvailable callback.

When the publisher is a client, it sends the flow with its registration call.

When the publisher is a service, it returns the flow with its result completion.

Example for a consumer service:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@OnInit
void sendStuff(Result<Void> result)
{
  PipeBroker<T> pipeBroker = ...;

  pipeBroker.publish(result.of(this::onPipeRegister));
}

private Void onPipeRegister(Pipe<T> pipe)
{
  _pipe = pipe;
  pipe.credits().onAvailable(this::onAvailable);
}

private void onAvailable()
{
  ... // send data if pending
}

Example for a publisher service:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public void subscribe(PipeSub<Void> result)
{
  Pipe<T> pipe = result.pipe();

  _pipe = pipe;
  pipe.credits().onAvailable(this::onAvailable);
  result.ok(null);
}

private void onAvailable()
{
  ... // send data if pending
}

Pipe Credit Chaining

When a service is in the middle, it acts as a subscriber and as a publisher. A websocket service that subscribes to a message broker is a subscriber to the message broker and a publisher to the websocket. For flow control to work, the credits from the websocket needs to be passed upstream to the publisher. This is pipe chaining.

Pipe chaining works even when the service is a filter that drops some of the messages, such as a news feed that filters selected topics.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Override
public void init(WebSocket<String> webSocket)
{
  Pipes<String> pipes = ...;

  pipes.subscribe(Pipe.in(new MyHandler(webSocket))
                      .chain(webSocket.credits()));
}

private class MyHandler implements Pipe<String>
{
  public void next(String data)
  {
    ...
  }
  ...
}

Pipe

Pipe is used by publishers to send data and by subscribers as a callback to receive data.

next(data) is Pipe’s main method. It sends or receives the next item in the pipe.

If the pipe is full and the publisher has enabled flow-control by registering an onAvailable callback, next will throw an exception because flow-control requires the publisher to check availability before sending.

If the publisher has not enabled flow-control, the default, a full pipe will block the publisher for a short time before throwing an exception. This blocking is used for gateway services between external libraries like a MQ broker that don’t support the pipe credits.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public interface Pipe<T>
{
  void next(T value);
  void fail(Throwable exn);
  void close();
  boolean isClosed();

  // flow control
  default Credits credits();
  default void credits(Credits credits);

  // convenience registration methods
}

Credits

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public interface Credits extends Cancel
{
  // credit sequence
  long get();
  void set(long credits);
  void add(int newCredits);

  // credits available for the publisher
  int available();

  // subscriber cancels the pipe
  void cancel();
  void fail(Throwable exn);

  // publisher callback when credits is available
  void onAvailable(OnAvailable onAvailable);

  // publisher delay when pipe is full
  void offerTimeout(long timeout, TimeUnit unit);
}

Credits.OnAvailable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public interface OnAvailable
{
  // credits may be available in the pipe
  void available();

  // exception from the subscriber
  default void fail(Throwable exn);

  // cancel pipe from the subscriber
  default void cancel();
}

PipeSub

A subscription method uses PipeSub to register its callback with a new pipe and send the pipe to the publisher.

The subscriber implements the pipe() method for the callback, or it can use PipeSub.in(pipe) to generated the PipeSub for convenience.

The receiving publisher calls pipe() to receive the completed pipe. The publisher can then call flow(FlowOut) on the pipe to register its flow control.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@FunctionalInterface
public interface PipeSub<T> extends ResultChain<Void>
{
  default Pipe<T> pipe();

  void handle(T next, Throwable fail, boolean ok);

  // pipe initialization
  default int capacity();
  default int prefetch();
  default int credits();
}

Client code might look like the following:

1
2
3
4
5
6
public void onInit()
{
  Pipes<MyMessage> pipes = ...;

  pipes.subscribe(Pipes.in(new MyPipe()));
}

Or using JDK8 lambdas:

1
2
3
4
5
...
pipes.subscribe(Pipes.in(this::onMyMessage)
                     .prefetch(64)
                     .close(this::onMyClose)
                     .fail(this::onMyFail);

PipePub

1
2
3
4
5
6
7
public interface PipePub<T> extends Result<Pipe<T>>
{
  // pipe initialization
  PipePub<T> capacity(int capacity);
  PipePub<T> prefetch(int prefetch);
  PipePub<T> credits(long credits);
}

PipeBroker

The PipeBroker service is a broker available to any Baratine service at an address like “pipe:///my-channel”. The pipes service allows for multiple publishers and subscribers in a channel.

1
2
3
4
5
6
7
@Service("pipe://{param[0]}
public interface PipeBroker<T>
{
  void publish(PipePub<T> result);
  void subscribe(PipeSub<T> result);
  void consume(PipeSub<T> result);
}

A Pipes consumer is a queue service. Only one consumer will process a message, while every subscriber will receive the message. In other words, Pipes supports both queue and topic (pub/sub) models with one API.