Queue with Application API


Queue Service

Introduction

A user of a queue service submits a request and continues processing without waiting for a result. Baratine’s services fit this fire-and-forget pattern naturally, because they are built around an inbox queue.

Benefits:

  • frees the calling thread
  • CPU affinity
  • synchronization
  • can improve performance under load
  • can batch under load
  • can return an asynchronous Result

The queue pattern can be used for small, dynamic services like a TCP writer or a file log, or for larger services like PDF generation or an email queue. The dynamic services are created programmatically. A WebSocket chat implementation might create a Baratine queue for each new connection.

  • TCP protocol writer
  • WebSocket
  • Printer
  • Database batching writer
  • Email service
  • Report generation

Internally, Baratine uses the service queue in several places. Its logging uses a service queue to avoid synchronization. JAMP WebSocket connections use a service queue for synchronization and performance. Each database table writes updates using a page writing queue, and the database fsync uses a service queue for performance and to batch fsync data.

ReportQueue.java:

public interface ReportQueue
{
  void requestReport(String reportName);
}

The implementation is similar to the singleton service but does not return a result. Queue services may use blocking resources: although Baratine is designed for async development, it also supports blocking services. For example, a WebSocket writer might block if the TCP connection is overloaded, and a report writer might use blocking database calls.

ReportQueueImpl.java:

public class ReportQueueImpl implements ReportQueue
{
  private MyReportModule _myReportModule;

  public ReportQueueImpl(MyReportModule myReport)
  {
    _myReportModule = myReport;
  }

  public void requestReport(String reportName)
  {
    _myReportModule.requestReport(reportName);
  }
}

Dynamic Queue Deployment

A dynamic queue is created for each new resource, such as a WebSocket connection, a database connection, or a log file. These dynamic queues don’t need fixed addresses.

Dynamic Queue:

{
  MyReportModule myReport = ...;

  ServiceManager manager = ServiceManager.current();

  ReportQueue queue;

  queue = manager.newService()
                 .service(new ReportQueueImpl(myReport))
                 .build()
                 .as(ReportQueue.class);
}

The ReportQueue proxy can now be used to submit report requests. The calling thread will submit its report request and continue. If the queue is idle, a new service thread will start processing the request. If the service thread is busy, the request will wait in the queue until the previous report is done. Under heavy load, the service thread gains efficiency because it can process requests with the CPU caches already filled.
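For example, a caller can submit a report and return to its own work immediately (the report name below is just an illustrative placeholder):

queue.requestReport("daily-sales");
// the calling thread continues; the report is generated on the queue's service thread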

Named Queue Deployment

A named queue has a fixed address, which gives clients more flexibility and loosens their dependency on the implementation.

Bound Queue with URL address:

ServiceManager manager = ServiceManager.current();

manager.newService()
       .address("/hello-queue")
       .service(new HelloImpl())
       .build();

HelloQueue queue = manager.lookup("/hello-queue")
                          .as(HelloQueue.class);

queue.hello("hello");
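The HelloQueue interface itself isn’t listed above; a minimal sketch consistent with the calls in this example would be:

HelloQueue.java (sketch):

public interface HelloQueue
{
  void hello(String arg);
}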

Queues Supporting Async Results

While queues are naturally fire-and-forget, a result can be useful to tell when the operation is complete and whether it completed successfully. The Result interface returns an asynchronous value, or an exception if the method fails.

HelloQueueAsync.java:

public interface HelloQueueAsync extends HelloQueue
{
  void hello(String arg, Result<Void> result);
}

A client can receive the result using four basic patterns:

  • Ignore the result using Result.ignore()
  • Simple callback with a lambda expression
  • Value and exception handling using Result.from()
  • Chaining results using result.from()

How the result callback is called depends on the caller context.

  • If the caller is another Baratine service, the result is dropped into the caller’s inbox. Results are processed on the service’s own thread.
  • If the caller is in the JVM context (or servlet), a thread is allocated for the callback.
  • If the caller is in an embedded framework like Vert.x, the result can be placed in the framework’s own processing stack.

In this example, since we’re assuming a servlet context, the result callback would run on an allocated thread. This means the servlet application needs to deal with synchronization. If you want to avoid synchronization issues, use a Baratine service to handle the callbacks (see the sketch after the example below).

{
  ServiceManager manager = ServiceManager.current();

  HelloQueueAsync queueAsync = manager.lookup("/hello-queue")
                                      .as(HelloQueueAsync.class);

  // ignore the result
  queueAsync.hello("hello", Result.ignore());

  // simple callback with a lambda expression
  queueAsync.hello("hello", x->System.out.println("Complete"));

  // value and exception handling
  queueAsync.hello("hello", Result.from(x->System.out.println("Complete"),
                                        exn->exn.printStackTrace()));
}
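To handle the callbacks without application-level synchronization, the call can be issued from inside a Baratine service, so the result is delivered to that service’s inbox and processed on its single thread. A minimal sketch (the HelloCallerImpl class and its wiring are hypothetical, not part of the original example):

public class HelloCallerImpl
{
  private HelloQueueAsync _queue;

  public HelloCallerImpl(HelloQueueAsync queue)
  {
    _queue = queue;
  }

  public void sendHello(String msg)
  {
    // the Result callback is queued to this service's inbox and runs on
    // this service's own thread, so no extra locking is needed
    _queue.hello(msg, Result.from(x -> System.out.println("hello complete"),
                                  exn -> exn.printStackTrace()));
  }
}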

Batching of Queue Items

One of Baratine’s core features is its automatic, implicit batching. Baratine will automatically batch requests and responses, with the batch size depending on load: a lightly-loaded system will usually have a batch size of 1, whereas a heavily-loaded system’s batch size depends on the load and back pressure at that particular point in time.

Batching is implicit, meaning that your service still deals with one method call at a time. However, you may hook your service into Baratine’s batching life-cycle events to do an expensive operation once per batch instead of on every method call. Logging is a good candidate for batching because it often becomes a bottleneck by itself.

GitHub Repository

The code for this example is on GitHub.

The Batching Logging Service

Baratine provides a variety of lifecycle annotation hooks on methods. For now, we’ll introduce @OnInit, @OnDestroy, @BeforeBatch, and @AfterBatch.

LogServiceImpl.java

package tutorial.batching;

import io.baratine.core.AfterBatch;
import io.baratine.core.BeforeBatch;
import io.baratine.core.OnDestroy;
import io.baratine.core.OnInit;
import io.baratine.core.Result;
import io.baratine.core.Service;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

@Service("public:///log")
public class LogServiceImpl
{
  private PrintStream _out;

  public void log(String msg, Result<Void> result)
  {
    _out.println(msg);

    result.complete(null);
  }

  @OnInit
  public void onInit(Result<Void> result)
  {
    try {
      boolean isAutoFlush = false;

      _out = new PrintStream(new FileOutputStream("/tmp/tutorial_log.txt", true), isAutoFlush);

      result.complete(null);
    }
    catch (IOException e) {
      // catching exceptions is not required because Baratine will automatically
      // call fail() for uncaught exceptions
      result.fail(e);
    }
  }

  @BeforeBatch
  public void beforeBatch()
  {
    // nothing to do here
  }

  @AfterBatch
  public void afterBatch()
  {
    _out.flush();
  }

  @OnDestroy
  public void onShutdown(Result<Void> result)
  {
    _out.close();
    _out = null;

    result.complete(null);
  }
}

We’re using @OnInit to open the stream without automatic line flushing. This minimizes the chances that our log() method will block when calling PrintStream.println(). @OnDestroy closes the stream. @AfterBatch flushes the stream to disk, which is an expensive and slow operation; we do it once per batch to reduce load on the disk, reduce service pauses, and improve overall service responsiveness.

A life-cycle method can accept a Result argument. You may complete it at a later time with a value of any type, as long as it is eventually completed to signal to Baratine that you are done.
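For example, an @OnInit method might defer completing its Result until an asynchronous dependency is ready. A minimal sketch (the _configService dependency and its load() method are hypothetical):

@OnInit
public void onInit(Result<Void> result)
{
  // complete the init Result only once the hypothetical dependency is ready
  _configService.load(Result.from(cfg -> { _config = cfg; result.complete(null); },
                                  exn -> result.fail(exn)));
}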

LogService.java

package tutorial.batching;

import io.baratine.core.Result;

public interface LogService
{
  public void log(String msg, Result<Void> result);
}

The above is the interface that Java clients will use to call our service. If you want log() to be fire-and-forget, in other words you do not care about the response, then just remove Result from the interface method:

public void log(String msg);

Baratine will not bother sending the response back to the client.

Baratine Deployment

Build the service jar; you can then deploy it to a running Baratine instance with:

$ bin/baratine.sh deploy batching.jar

Programmatic deployment

Services can also be programmatically deployed into an existing Baratine ServiceManager. The code to deploy the log service looks like the following:

ServiceManager manager = ServiceManager.current();

manager.newService()
       .address("public:///log")
       .service(new LogServiceImpl())
       .build();

To deploy to a JVM that is not already running Baratine, you’ll need to create the ServiceManager yourself, but the rest is the same:

ServiceManager manager = ServiceManager.newManager().build();

manager.newService()
       .address("public:///log")
       .service(new LogServiceImpl())
       .build();

Then you can look up your service with:

LogService service = manager.lookup("/log")
                            .as(LogService.class);
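The looked-up proxy can then be called like any other Baratine service; for example (a minimal usage sketch, the log message is just an example):

service.log("service started", Result.ignore());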

Java Client

BaratineClient client = new BaratineClient("http://127.0.0.1:8085/s/pod");

LogService service = client.lookup("remote:///log").as(LogService.class);

// fire-and-forget
service.log("logme timbers");

Conclusion

You have just learned how to use Baratine’s life-cycle annotations to implement batching. Batching is a great way to dramatically reduce the number of expensive operations in applications.