Introduction

Baratine streams, built on the ResultStream interface, are used in two distinct patterns:

  • Publish/Subscribe streams that expect long-lived updates, such as a chat-room client.
  • Map/Reduce streams to query one or more services in a Pod, a partitioned service cluster.

Map/Reduce Query Streams

Hello Example

Service code:

@Service("/hello")
public class HelloImpl
{
  public void myStream(ResultStream<String> result)
  {
    result.accept("hello");
    result.accept("world");
    result.complete();
  }
}

Proxy API:

import io.baratine.stream.*;

public interface HelloSync
{
  ResultStreamBuilder<String> myStream();
}

Client code:

HelloSync hello = ...;

hello.myStream()
     .reduce((x,y)-> x + "!" + y)
     .result(x->System.out.println(x));

Output:

hello!world

Stream/lambda interfaces (following JDK-8)

Services can create streamable methods using the ResultStreamBuilder and ResultStream arguments. The caller can build custom queries using the ResultStreamBuilder and caller lambda expressions.

The client API and the service implementation differ: the client API returns a ResultStreamBuilder, and the service implementation takes a corresponding ResultStream argument.

The service implementation passes every object in the stream to the ResultStream using accept() and finishes by calling complete(). The call to complete() is required to finish the stream. An implementation might look like the following:

void myStream(String myArg, ResultStream<String> result)
{
  ArrayList<String> list = getList(myArg);

  for (String value : list) {
    result.accept(value);
  }

  result.complete();
}
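
The accept()/complete() contract can be tried without a Baratine runtime. The following is a minimal sketch, assuming a hypothetical stand-in interface MiniResultStream (not part of Baratine) that models only the two calls used in this section. The class names and sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AcceptCompleteSketch {
  // Hypothetical stand-in for ResultStream; only accept() and complete() are modeled.
  interface MiniResultStream<T> {
    void accept(T value);
    void complete();
  }

  // Collects accepted values and records whether complete() was called.
  static final class CollectingStream<T> implements MiniResultStream<T> {
    final List<T> values = new ArrayList<>();
    boolean completed;

    @Override
    public void accept(T value) {
      values.add(value);
    }

    @Override
    public void complete() {
      completed = true;
    }
  }

  // Same shape as the service method above: emit each value, then complete.
  static void myStream(List<String> source, MiniResultStream<String> result) {
    for (String value : source) {
      result.accept(value);
    }
    result.complete();
  }

  // Runs the method against the collecting stand-in and describes what it saw.
  static String describe(List<String> source) {
    CollectingStream<String> sink = new CollectingStream<>();
    myStream(source, sink);
    return sink.values + " completed=" + sink.completed;
  }

  public static void main(String[] args) {
    // prints "[hello, world] completed=true"
    System.out.println(describe(Arrays.asList("hello", "world")));
  }
}
```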

The client can process the stream as it sees fit. Lambda expressions in the stream execute in the target method’s thread, not the caller’s thread.

The following executes the println lambda for each stream value, in MyServiceImpl’s service thread:

myService.myStream("arg1")
         .forEach(x->System.out.println("arg: " + x))
         .exec();

The following selects strings that start with “myprefix-” and then concatenates them. The filter and the concatenation run in the MyServiceImpl thread, while the result println runs in the caller’s thread, because it’s a normal Result.

myService.myStream("arg1")
         .filter(x->x.startsWith("myprefix-"))
         .reduce((x,y)->x + "::" + y)
         .result(x->System.out.println("Result: " + x));
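
Because the builder operations follow JDK-8, the filter/reduce semantics can be checked locally with java.util.stream. This sketch uses only the JDK and made-up sample values. It does not model the hand-off between the service thread and the caller’s thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class FilterReduceSketch {
  // Keeps values starting with "myprefix-" and joins them with "::".
  // Returns an empty Optional when nothing passes the filter.
  static Optional<String> joinPrefixed(List<String> values) {
    return values.stream()
                 .filter(x -> x.startsWith("myprefix-"))
                 .reduce((x, y) -> x + "::" + y);
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList("myprefix-a", "other", "myprefix-b");

    // prints "Result: myprefix-a::myprefix-b"
    joinPrefixed(values).ifPresent(x -> System.out.println("Result: " + x));
  }
}
```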

Stream calls across a pod

Stream calls across a pod become map/reduce calls if the pod has multiple nodes. A “pair” pod will run the stream call on both node-0 and node-1 and then combine the results before returning to the caller. The calling code is identical to the non-pod call:

myService = manager.lookup("pod://my-pod/my-service");
myService.myStream("arg1")
         .filter(x->x.startsWith("my-prefix"))
         .reduce((x,y)->x + "::" + y)
         .result(x->System.out.println("Result: " + x));
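
The “run on each node, then combine” step can be pictured with plain JDK streams. This is a sketch under assumptions: each node is assumed to filter and reduce its own partition, and the partial results are assumed to be combined with the same reducer in node order. How Baratine actually combines and orders them is not stated here. The partition contents are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

final class PodMapReduceSketch {
  // "Map" step: one node filters and reduces only its own partition.
  static Optional<String> reduceOnNode(List<String> partition,
                                       Predicate<String> filter,
                                       BinaryOperator<String> reducer) {
    return partition.stream().filter(filter).reduce(reducer);
  }

  // "Reduce" step: combine the non-empty per-node results with the same reducer.
  static Optional<String> reduceAcrossNodes(List<List<String>> partitions,
                                            Predicate<String> filter,
                                            BinaryOperator<String> reducer) {
    return partitions.stream()
                     .map(p -> reduceOnNode(p, filter, reducer))
                     .filter(Optional::isPresent)
                     .map(Optional::get)
                     .reduce(reducer);
  }

  static String demo() {
    List<String> node0 = Arrays.asList("my-prefix-a", "skip");
    List<String> node1 = Arrays.asList("my-prefix-b", "my-prefix-c");

    return reduceAcrossNodes(Arrays.asList(node0, node1),
                             x -> x.startsWith("my-prefix"),
                             (x, y) -> x + "::" + y).orElse("");
  }

  public static void main(String[] args) {
    // prints "Result: my-prefix-a::my-prefix-b::my-prefix-c"
    System.out.println("Result: " + demo());
  }
}
```

Since the values are grouped per node before the final combine, a reducer used this way should be associative. Otherwise the result could differ from a single-node run.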

Note that the lambda must be accessible to the remote service, because the JDK’s lambda object does not include the code itself, but merely a pointer to the code. To use new lambda code in the service, the caller can point to a jar file:

myService.myStream("arg1")
         .filter(...)
         .reduce(...)

Baratine will deploy myjar.jar across the cluster by publishing it to BFS, and will use it when the service executes the stream code.

io.baratine.stream

The interfaces in io.baratine.stream duplicate equivalent interfaces in JDK-8 java.util.function with two important differences:

  • They implement Serializable, which is required to serialize lambdas for remote calls.
  • They include both -Sync and -Async versions of the interface.
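
The Serializable requirement can be seen with plain JDK serialization. This is a minimal sketch, assuming a hypothetical interface SerPredicate that is not the io.baratine.stream type. It round-trips a lambda through a byte array inside one JVM. Reading the bytes back needs the class that defined the lambda, which is why the remote service must be able to load the caller’s code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializableLambdaSketch {
  // Hypothetical interface for illustration; extending Serializable makes
  // lambdas that target it serializable.
  interface SerPredicate<T> extends Serializable {
    boolean test(T value);
  }

  static byte[] toBytes(Object value) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @SuppressWarnings("unchecked")
  static <T> T fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (T) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      // The class that defined the lambda is not loadable on this side.
      throw new IllegalStateException(e);
    }
  }

  // The captured prefix is serialized along with the reference to the lambda code.
  static SerPredicate<String> prefixFilter(String prefix) {
    return x -> x.startsWith(prefix);
  }

  // Serializes the lambda, reads it back, and applies the copy.
  static boolean roundTripTest(String prefix, String candidate) {
    SerPredicate<String> copy = fromBytes(toBytes(prefixFilter(prefix)));
    return copy.test(candidate);
  }

  public static void main(String[] args) {
    System.out.println(roundTripTest("myprefix-", "myprefix-a")); // prints "true"
    System.out.println(roundTripTest("myprefix-", "other"));      // prints "false"
  }
}
```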

The -Async versions are important because a blocking lambda expression will block the entire service thread, potentially freezing all requests. If a stream lambda needs to wait for a remote result, the -Async version allows it to release the thread.
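
The difference can be sketched with a callback-style interface. The MapperSync and MapperAsync shapes below are hypothetical and are not the io.baratine.stream signatures. A CompletableFuture stands in for a pending remote result. The async lambda returns immediately and delivers its value only when the future completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AsyncLambdaSketch {
  // Hypothetical sync shape: the caller's thread waits for the return value.
  interface MapperSync<T, R> {
    R apply(T value);
  }

  // Hypothetical async shape: the value is delivered later through a callback.
  interface MapperAsync<T, R> {
    void apply(T value, Consumer<R> result);
  }

  static List<String> demo() {
    List<String> log = new ArrayList<>();

    // Stands in for a remote result that has not arrived yet.
    CompletableFuture<String> remote = new CompletableFuture<>();

    // A sync mapper would have to call remote.join() here and block the thread.
    // The async mapper only registers a continuation and returns.
    MapperAsync<String, String> mapper =
        (value, result) -> remote.thenAccept(r -> result.accept(value + "=" + r));

    mapper.apply("key", r -> log.add("result " + r));
    log.add("thread released");   // reached before the remote result exists

    remote.complete("42");        // the remote result arrives; the callback runs now
    return log;
  }

  public static void main(String[] args) {
    // prints "[thread released, result key=42]"
    System.out.println(demo());
  }
}
```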

Publish/Subscribe Streams

Streams can also be used for long-lived pub/sub applications, such as a chat room. Clients can either use the ResultStream directly or use a ResultStreamBuilder for more complicated queries.

Service

In this example, we send the results and complete the stream immediately. A longer-lived service would instead save the ResultStream as a registration and keep calling accept() as new messages arrive.

@Service("/hello")
public class HelloImpl
{
  public void myStream(ResultStream<String> result)
  {
    result.accept("hello");
    result.accept("world");
    result.complete();
  }
}
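
A registration-style service could look roughly like the following. This is only a sketch: MiniResultStream is a hypothetical stand-in for ResultStream, and ChatRoom with its subscribe, publish, and close methods is made up for illustration. The plain ArrayList assumes all calls arrive on a single service thread.

```java
import java.util.ArrayList;
import java.util.List;

final class PubSubRegistrationSketch {
  // Hypothetical stand-in for ResultStream; only accept() and complete() are modeled.
  interface MiniResultStream<T> {
    void accept(T value);
    void complete();
  }

  // Hypothetical long-lived service that keeps each subscriber's stream open.
  static final class ChatRoom {
    private final List<MiniResultStream<String>> subscribers = new ArrayList<>();

    // Saves the stream as a registration instead of completing it right away.
    void subscribe(MiniResultStream<String> result) {
      subscribers.add(result);
    }

    // Pushes a new message to every registered stream.
    void publish(String message) {
      for (MiniResultStream<String> subscriber : subscribers) {
        subscriber.accept(message);
      }
    }

    // Completes and drops all registrations.
    void close() {
      for (MiniResultStream<String> subscriber : subscribers) {
        subscriber.complete();
      }
      subscribers.clear();
    }
  }

  static List<String> demo() {
    List<String> received = new ArrayList<>();
    ChatRoom room = new ChatRoom();

    room.subscribe(new MiniResultStream<String>() {
      @Override
      public void accept(String value) {
        received.add(value);
      }

      @Override
      public void complete() {
        received.add("<complete>");
      }
    });

    room.publish("hello");
    room.publish("world");
    room.close();
    return received;
  }

  public static void main(String[] args) {
    // prints "[hello, world, <complete>]"
    System.out.println(demo());
  }
}
```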

Client

The simple client prints the stream’s results as they arrive:

{
  ServiceManager manager = ServiceManager.lookup();

  Hello hello = manager.lookup("/hello").as(Hello.class);

  hello.myStream(msg->System.out.println(msg));
}

A stream-based client uses the builder’s forEach:

{
  ServiceManager manager = ServiceManager.lookup();

  HelloSync hello = manager.lookup("/hello")
                           .as(HelloSync.class);

  hello.myStream()
       .forEach(msg->System.out.println(msg))
       .exec();
}