Reference Implementation
The reference implementation for Stroom Data is a Java-based process that lets you host all of the components from the Stroom specification within a single process. You will need Java 8 in order to run it.
The service is started using java as follows:
java -jar multihost-0.1.jar stroomconfig.json
If you start it without any configuration parameters, it will start up a simple stream service that stores its data in "./streamdata/". You can give it a single configuration file, either as the name of a local file or as a URL if you store the configuration in a service such as Consul or in a stream on a different Stroom node.
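For example, assuming a configuration document is reachable over HTTP (the URL below is purely hypothetical), the service could be started like this:
java -jar multihost-0.1.jar http://config.example.com/stroomconfig.json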
The following snippet shows the default configuration used if you do not supply one. If you do give a configuration at startup but don't include all parameters, these are also the defaults that will be used.
{
"streams":{
"path":"./streamdata/",
"commit_batch_size":32,
"commit_batch_timeout":50
},
"api":{
"port":8080
}
}
You can run a Stroom service without any additional configuration, simply as a host of stream data. But things become a lot more interesting once you set up services that read and process documents as they are added to streams.
The configuration and management of services is handled through a separate API endpoint at /service/. To get the current set of services and their state, send a GET request to /service/. To get the configuration and state of a specific service, send a GET request to /service/:id.
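For example, assuming the API is served on the default port of 8080 and a service with the id my_filter exists (both assumptions for illustration), you could inspect services with curl:
curl http://localhost:8080/service/
curl http://localhost:8080/service/my_filter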
To create a new service, send a POST request to /service/ with the new service configuration as the request body (see the following subsections for configuration samples). To update a service, send a PUT request with the new configuration as the request body to /service/:id. This will stop the old service, create the new service and start it; it will not, however, reset its state. If you also want to reset its state, so that it truncates both its output and state streams, send your PUT request to /service/:id/reset.
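Continuing the same hypothetical setup, creating and then updating a service could look like this with curl, with the configuration stored in a local file my_filter.json:
curl -X POST http://localhost:8080/service/ -H 'Content-Type: application/json' -d @my_filter.json
curl -X PUT http://localhost:8080/service/my_filter -H 'Content-Type: application/json' -d @my_filter.json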
You can manually change the status of a service at any time by sending a POST request to /service/:id/:command. The available commands are stop, start, restart, reset, disable and enable.
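For instance, restarting the hypothetical my_filter service would be a single request:
curl -X POST http://localhost:8080/service/my_filter/restart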
Any services that have not been disabled will automatically be started when Stroom is started. When you disable a service, it is also stopped. When you enable a service, it is also started.
A restart will reload any queries or scripts that the service depends on, but will not reset its state.
Script files referenced by Stroom services must be stored in the Stroom instance. You can list the current set of scripts by sending a GET request to /script/. You can retrieve a script by sending a GET request to /script/:path. You can add or update a script by sending a POST request to /script/:path with the script in the body. Finally, you can delete a script by sending a DELETE request to /script/:path.
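Assuming the same default port, managing a script called myscript.js with curl could look like this:
curl http://localhost:8080/script/
curl http://localhost:8080/script/myscript.js
curl -X POST http://localhost:8080/script/myscript.js --data-binary @myscript.js
curl -X DELETE http://localhost:8080/script/myscript.js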
The basic configuration of any filter has an id, a type, the source stream and the output stream. You can optionally also specify a state stream; if you do not, ".state" will be appended to your output stream and used for that purpose.
{
"id":"my_filter",
"service":"filter",
"type":"",
"input_stream":"",
"output_stream":""
}
Streams in the same process as the filter can be referenced using the following notation to avoid the cost of HTTP:
local://direct/stream/my_stream
The sample filter takes a random sample of the input stream at a given rate. You can use this to create a new stream that contains a statistically predetermined subset of the original. The following sample configuration would take 10% of the input stream and put it on the output stream.
{
"id":"my_filter",
"service":"filter",
"type":"sample",
"sample_rate":0.1
"input_stream":"",
"output_stream":""
}
The http filter will make a POST request for every document in the input stream, with the contents of the document in the request body. If an empty response is returned, it is assumed that nothing should be placed on the output stream for that document; otherwise the JSON returned from the endpoint will be put on the output stream.
{
"id":"my_filter",
"service":"filter",
"type":"http",
"url":"http://myhost/myscript",
"input_stream":"",
"output_stream":""
}
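To get a feel for the contract such an endpoint must honour, the request the filter makes for each document could be exercised by hand with curl along these lines (the endpoint URL comes from the sample configuration above; the document body is hypothetical):
curl -X POST http://myhost/myscript -H 'Content-Type: application/json' -d '{"example":"document"}'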
The javascript filter loads a local JavaScript file into Java's scripting engine. For every document in the input stream, it transfers the document to the scripting context and tries to execute a function called map with the given document as argument (in the form of a native JavaScript object). The output of the function will be put on the output stream unless it is null.
{
"id":"my_filter",
"service":"filter",
"type":"javascript",
"script":"myscript.js",
"input_stream":"",
"output_stream":""
}
The following sample shows what a usable JavaScript file could look like:
function map(obj){
// Do something with obj
return obj
}
The basic configuration of any aggregate function has an id, a type, the source stream and the output stream. You can optionally also specify a state stream; if you do not, ".state" will be appended to your output stream and used for that purpose.
{
"id":"my_aggregate",
"service":"aggregate",
"type":"",
"input_stream":"",
"output_stream":""
}
Streams in the same process as the aggregate can be referenced using the following notation to avoid the cost of HTTP:
local://direct/stream/my_stream
If you are creating a plain aggregate, the latest aggregate can be found as the last document in the output stream.
The http aggregate will make a POST request for every document in the input stream, with the contents of the document and the last aggregate in the request body.
{
"id":"my_aggregate",
"service":"aggregate",
"type":"http",
"url":"http://myhost/myreducescript",
"input_stream":"",
"output_stream":""
}
The format of the objects posted to the endpoint is as follows:
{
"aggregate":{},
"event":{}
}
Be aware that the aggregate value will be null for the very first document in the stream.
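As with the http filter, the request the aggregate makes could be reproduced by hand; this hypothetical curl example mimics what would be sent for the very first document in a stream, where the aggregate is still null:
curl -X POST http://myhost/myreducescript -H 'Content-Type: application/json' -d '{"aggregate":null,"event":{"example":"document"}}'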
The javascript aggregate loads a local JavaScript file into Java's scripting engine. For every document in the input stream, it transfers the document and the last known aggregate to the scripting context and tries to execute a function called reduce with the given document and aggregate as arguments (in the form of native JavaScript objects). The output of the function will be used as the new aggregate.
{
"id":"my_aggregate",
"service":"aggregate",
"type":"javascript",
"script":"myreducer.js",
"input_stream":"",
"output_stream":""
}
The following sample shows what a usable JavaScript file could look like:
function reduce(aggregate,obj){
if(aggregate==null){
// Initialize aggregate
aggregate={}
}
// Do something with obj
return aggregate
}
If you specify a partition key, your aggregates will be partitioned by it. This means that for each unique partition key there will be a separate aggregate. The aggregate function itself - whether it be javascript or over http - will work in exactly the same way.
The partition key is specified as a path into the JSON data of each document processed by the aggregate. The following sample shows a configuration that would use the value of user.id as the partition key (in this case an imaginary user id for the event being processed):
{
"id":"my_aggregate",
"service":"aggregate",
"type":"javascript",
"script":"myreducer.js",
"partition":"user.id",
"input_stream":"",
"output_stream":""
}
While the setup of the partitioned aggregate is simple, the output is a little more complex. The simple aggregate simply puts each aggregate snapshot onto the output stream, which means you can just request the latest document in that stream to get the latest aggregate. For the partitioned aggregate, you need to read the state stream, which contains mappings from partition keys to locations in the output stream. Not only must you read both streams, but you must read all the way through the state stream to get the latest location for the requested aggregate.
It is recommended that you use a query service that reads through the state stream and uses it to build up an in-memory hash index of the last location for each partition key. The reference implementation of Stroom automatically does this for every partitioned aggregate service set up to run in the process. This lets you request the latest aggregate for a partition key by sending a GET request to the following URL: /aggregates/:id/:partition_key
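For example, with the my_aggregate configuration above partitioned on user.id, the latest aggregate for a hypothetical user id of 42 could be fetched like this (again assuming the default port of 8080):
curl http://localhost:8080/aggregates/my_aggregate/42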