Built-in Services

BFS (Bartender File Service)

BFS is a distributed, partitioned file system that lies at the heart of Baratine.

As in Linux, the root of the BFS file system is a set of logical mount points. Services’ configuration, state, and data are mounted as files that can be inspected with cat or grep for easy debugging.

BFS is built on top of Baratine’s Bartender Cluster Heartbeats system. The heartbeat system provides a decentralized and robust means to keep servers within a cluster in sync.

BFS is distributed and partitioned for big-data processing. BFS is co-located alongside Baratine services to provide data locality and ownership (a service would usually only write to its local BFS partition).
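
For example, a service might keep its files under a BFS directory named after itself, so that writes stay on the partition it owns. The sketch below is illustrative (the service name, path, and save() method are hypothetical); it uses the lookup and openWrite calls described later in this section:

@Service("public:///my-service")
public class MyDataService
{
  // a directory named after the service lives on this service's own partition
  @Inject @Lookup("bfs:///my-service") BfsFile _dir;

  public void save(String name, String data)
  {
    // writes land on the local BFS partition owned by this service
    _dir.lookup(name).openWrite(out -> {
      try {
        out.write(data.getBytes());
      }
      finally {
        out.close();
      }
    });
  }
}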

Obtaining BFS

Using BFS is similar to using any other Baratine service. The scheme for BFS is bfs:

ServiceManager manager = ServiceManager.current();

BfsFile file = manager.lookup("bfs:///my-file.txt")
                      .as(BfsFile.class);

or using CDI (Contexts and Dependency Injection):

@Inject @Lookup("bfs:///") root;
@Inject @Lookup("bfs:///proc") procDir;

Looking Up Files

The following paths are equivalent:

// 1. look up relative to the BFS root
ServiceManager manager = ServiceManager.current();
BfsFile root = manager.lookup("bfs:///").as(BfsFile.class);
BfsFile path = root.lookup("foo/bar");

// 2. look up the absolute path directly
ServiceManager manager = ServiceManager.current();
BfsFile path = manager.lookup("bfs:///foo/bar").as(BfsFile.class);

// 3. inject the path
@Inject @Lookup("bfs:///foo/bar") BfsFile path;

All three have the same address and point to the same underlying file in BFS.

Reading a File

BfsFile file = ...;

file.openRead(inputStream -> {
    try {
      myReadFile(inputStream);
    }
    finally {
      inputStream.close();
    }
});

Writing to a File

BfsFile file = ...;

file.openWrite(outputStream -> {
    try {
      myWriteFile(outputStream);
    }
    finally {
      outputStream.close();
    }
});

Deleting a File

BfsFile file = ...;

file.delete(isSuccessful -> {
    System.out.println("deleted file result: " + isSuccessful);
});

Listing a Directory

BfsFile file = ...;

file.list(childrenStrArray -> {
    for (String child : childrenStrArray) {
      BfsFile childFile = file.lookup(child);

      ...
    }
});

Watching a File for Changes

BfsFile file = ...;

Watch watch = updatedPathStr -> {
    doSomething(updatedPathStr);
};

CancelHandle cancel;

// watch a directory for new/deleted files, or a file for modifications
cancel = file.watch(watch);

// then when done, unregister watch
cancel.cancel();

Using BFS with JDK7 Files/Path API

BFS integrates with JDK7’s Files/Path API. The scheme for BFS is “bfs”.

FileSystem bfs = FileSystems.newFileSystem(new URI("bfs:///"), new HashMap<String,Object>());

Path root = bfs.getPath("/");

try {
  Path file = root.resolve("foo/test.txt");
  Files.write(file, "hello world".getBytes());

  DirectoryStream<Path> dirStream = Files.newDirectoryStream(root);

  for (Path p : dirStream) {
    System.out.println(p.toUri());
  }
}
catch (IOException e) {
  e.printStackTrace();
}

And you can also use java.nio.file.WatchService:

FileSystem bfs = FileSystems.newFileSystem(new URI("bfs:///"), new HashMap<String,Object>());

Path root = bfs.getPath("/");

try {
  WatchService watchService = bfs.newWatchService();
  root.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);

  Path file = root.resolve("foo/test.txt");
  Files.write(file, "hello world".getBytes());

  // take() blocks until a registered path has pending events
  WatchKey key = watchService.take();

  for (WatchEvent<?> event : key.pollEvents()) {
    System.out.println("event: " + event.context());
  }
}
catch (IOException | InterruptedException e) {
  e.printStackTrace();
}

Store Service

The store service provides a simple key-value store. The scheme for the store service is store.

Obtaining the Store Service

ServiceManager manager = ServiceManager.current();
Store<V> store = manager.lookup("store:///").as(Store.class);

or using injection:

@Inject @Lookup("store:///") Store<V> storeRoot;
@Inject @Lookup("store:///myStore") Store<V> store;

where <V> is your type parameter.

Looking Up Your Store

Generally, you don’t want to store everything at the root “store:///”. Instead, use the store owned by your service, so that the service and its store instance reside on the same machine.

In other words, a service named “myService” should use “store:///myService”:

@Service("public:///myService")
public class MyService
{
  @Inject @Lookup("store:///myService") store;

  ...
}

Storing an Object

ArrayList<String> list = new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");

store.put("myList", list);

Getting an Object

store.get(
  "myList",
  list -> { System.out.println(list); }
);

Removing an Object

store.remove("myList");

Database Service

The database service gives access to the internal Kelp/Kraken database, which is also used for the Store Service and for BFS (Bartender File Service). The database is a fast key-value store with a log-structured backend for reliability.

The database uses a query syntax similar to SQL with some restrictions like no joins and a requirement that tables have a key, and some extensions like object columns.

Create database:

@Inject @Lookup("bardb:///")
DatabaseServiceSync _db;

void create()
{
   String sql = "create table test ("
                + "  id integer primary key,"
                + "  name string,"
                + "  data string"
                + ")";

   System.out.println("Create: " + _db.exec(sql));
   System.out.println("Table: " + _db.exec("show table test"));
}

Insert row:

void insert()
{
  String sql = "insert into test (id, name, data) values (?,?,?)";

  _db.exec(sql, Result.ignore(), 17, "test", "test-data");
}

Note that the insert is non-blocking: the call completes when the insert message has been sent to Kelp/Kraken, not when the insert itself completes.
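
If the caller needs to know when the insert has actually been applied, a completion callback can be passed in place of Result.ignore(). The sketch below assumes the callback may be written as a lambda, in the same style as the findOne() example later in this section:

void insertWithCallback()
{
  String sql = "insert into test (id, name, data) values (?,?,?)";

  // the lambda runs once Kelp/Kraken reports the insert has completed
  _db.exec(sql,
           value -> System.out.println("insert done: " + value),
           17, "test", "test-data");
}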

Select entry:

void select()
{
  String sql = "select id, name, data from test where id=?";

  Cursor cursor = _db.findOne(sql, 17);

  if (cursor != null) {
    System.out.println("  id: " + cursor.getInt(1));
    System.out.println("  name: " + cursor.getString(2));
    System.out.println("  data: " + cursor.getString(3));
  }
}

Asynchronous Calls

Since the Kelp/Kraken database is a collection of Baratine services, the DatabaseService API supports asynchronous calls that are preferable for high-performance services.

Async select:

void select()
{
  String sql = "select id, name from test where id=?";

  _db.findOne(sql,
              c->{ System.out.println("Name: " + c.getString(2)); },
              17);

}

CREATE TABLE statement

The create syntax resembles SQL table create:

CREATE TABLE [pod .] name (
  column,+
)

Where column looks like:

name type [(size)] [[PRIMARY] KEY]

At least one of the columns must be a key. For tables with multiple keys, the order of the keys is important because tables are sorted. An intelligent choice of keys and key order can improve query performance.
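
For example, a table keyed on two columns might be declared as below, following the column grammar above where any column may be marked KEY; the table and column names are illustrative:

void createItems()
{
  // rows sort by (owner_id, item_id), so queries that filter on owner_id
  // scan a contiguous key range
  String sql = "create table items ("
               + "  owner_id int64 primary key,"
               + "  item_id int64 key,"
               + "  data object"
               + ")";

  _db.exec(sql);
}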

Column Types

Internally, the column types are divided into fixed-size and blob types. Each internal row is fixed-length with pointers to variable-sized data.

Fixed data:

* int32 (int, integer) - 32-bit integer
* int64 (bigint) - 64-bit long
* double - 64-bit double
* bytes - fixed length byte data

Variable (blob) data:

* blob - variable binary data
* string - variable 16-bit unicode string (java)
* object - serialized object (hessian)

Internally, small blob data is stored inline in the node block containing the row. Larger blobs are stored in their own pages. This architecture supports large blobs, including the multi-megabyte files saved in the Baratine filesystem.

The current set of column types is limited to those required internally to Baratine. The types may be expanded in future versions.

Pods and Table Names

By default, a created table belongs to the current pod for distribution. The rows are partitioned according to their keys across all servers in the pod. The pod can be overridden. The special pod local can be used to create a table that is local to the current server, not distributed.
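
For instance, using the [pod .] name form from the CREATE TABLE grammar above, a sketch of a server-local table might look like the following (the table name is illustrative):

void createLocalTable()
{
  // the "local" pod keeps this table on the current server, not distributed
  String sql = "create table local.server_cache ("
               + "  id string primary key,"
               + "  value object"
               + ")";

  _db.exec(sql);
}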

Object Query Extensions

Object columns can use a path extension in the WHERE clause to query on an object’s internal fields. This kind of query is used by the resource service query system to find objects by internal fields.

A query might look like:

SELECT id,value FROM test WHERE value._myField='my-value'

Where the values are Java objects like:

class MyBean {
  int _id;
  String _myField;
}
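
Putting the pieces together, such a query can be issued through the findOne() call shown earlier. This sketch reuses the example query above and assumes findOne() accepts a query with no bound parameters:

void selectByField()
{
  String sql = "select id, value from test where value._myField='my-value'";

  // the callback receives a cursor positioned on the matching row
  _db.findOne(sql,
              c -> System.out.println("id: " + c.getInt(1)));
}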

Event Service

The event service is an API-based publish/subscribe broker; its scheme is event. Publishers send events using application APIs. Subscribers receive events with the same API.

The address for an event node is application-defined. By convention, the classname of the API can be used as the node name.

Event API

The event API is a Java interface. While Baratine doesn’t restrict the interface, using a JDK 8 @FunctionalInterface to allow lambda subscribers is a useful pattern to follow.

MyEvent.java:

@FunctionalInterface
public interface MyEvent
{
  void onEvent(String msg);
}

Publisher

Since an event publisher is a proxy to the event scheme, it can be created and used in any context.

MyPub.java:

void execute()
{
  ServiceManager manager = ServiceManager.current();

  MyEvent pubEvent = manager.lookup("event:///" + MyEvent.class.getName())
                            .as(MyEvent.class);

  pubEvent.onEvent("test message");
}

Subscriber

Subscribers are typically Baratine services, often declared @Startup so that their subscriptions are registered when the pod starts.

The callback for a message runs in the service context that called subscribe(). Because a service expects its callbacks to run in its own context, it should subscribe in an @OnInit method.

MySub.java:

@Service("/my-sub")
@Startup
public class MySub
{
  @OnInit
  private void onInit()
  {
    ServiceManager manager = ServiceManager.current();

    ServiceRef eventRef = manager.lookup("event:///" + MyEvent.class.getName());

    eventRef.subscribe(msg->onMessage(msg));
  }

  private void onMessage(String msg)
  {
    System.out.println("Message: " + msg);
  }
}

Timer Service

The timer service allows you to schedule tasks to be run once or multiple times on a schedule. The scheme for the timer service is timer.

Obtaining the Timer Service

TimerService timer = ServiceManager.current().lookup("timer:").as(TimerService.class);

or using CDI (Contexts and Dependency Injection):

@Inject @Lookup("timer:") TimerService timer;

Running a task once

Runnable task = ()->{
  System.out.println("ran on: " + System.currentTimeMillis());
};

timer.runAfter(task, 5, TimeUnit.SECONDS);

Running a task with cron-syntax scheduling

Runnable runnable = () -> System.out.println("ran on: " + System.currentTimeMillis());

// run the task every two minutes on the 6th minute of the 7th hour everyday
timer.cron(runnable, "*/2 6 7 *");

Running a task with a custom scheduler that stops after 5 runs

timer.schedule(task, new TimerScheduler() {
    int count = 0;

    public long nextRunTime(long now) {
      if (count++ >= 5) {
        return -1; // negative value to cancel
      }
      else {
        return now + 2000; // run again 2 seconds from now
      }
    }
});

Cancelling your task

CancelHandle cancel;

cancel = timer.runAfter(myTask, 5, TimeUnit.SECONDS);

cancel.cancel();

Getting run times of your task

Runnable runnable = () -> System.out.println("ran on: " + System.currentTimeMillis());

timer.cron(runnable, "*/2 6 7 *");

timer.getTask(runnable,
              taskInfo -> System.out.println(taskInfo.getLastRunTime()));