edwardcapriolo edited this page Nov 21, 2013 · 4 revisions

A Feed in Teknek is an abstraction over an input source. Feeds normally have partitions.

A Feed could represent an entity such as a message queue. For most message queues, offset storage is impossible because messages can only be consumed once.

Another possible source for a Feed is new entries in a relational database table ordered by time. Because the events are persistent, it is possible to store a position in the feed. This position can be used to resume processing from the same place after an outage. It can also be used to replay data.

Let's look at part of the Feed API. (As always, there may be subtle changes over time, but the core shown here should remain.)

public abstract class Feed {
  private String name;
  protected Map<String,Object> properties;
  public abstract List<FeedPartition> getFeedPartitions();
}

Let's also look at the FeedPartition.

public abstract class FeedPartition {
  public abstract boolean next(ITuple t);
  public abstract boolean supportsOffsetManagement();  
  public abstract String getOffset();
  public abstract void setOffset(String offset);
}

If a FeedPartition returns true from supportsOffsetManagement(), it means that getOffset() and setOffset(String) are implemented. These methods are called by OffsetStorage instances.
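As a rough illustration of that contract, here is a minimal sketch of a partition over an in-memory list that supports offsets. ITuple, MapTuple, and ListFeedPartition are hypothetical stand-ins written for this example, not Teknek's real classes, and the sketch assumes next() returns true when it has filled the tuple; the real contract may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Teknek's ITuple; the real interface may differ.
interface ITuple {
  void setField(String name, Object value);
  Object getField(String name);
}

// Hypothetical tuple backed by a map, for illustration only.
class MapTuple implements ITuple {
  private final Map<String, Object> fields = new HashMap<>();
  public void setField(String name, Object value) { fields.put(name, value); }
  public Object getField(String name) { return fields.get(name); }
}

// Mirrors the abstract methods from the listing above.
abstract class FeedPartition {
  public abstract boolean next(ITuple t);
  public abstract boolean supportsOffsetManagement();
  public abstract String getOffset();
  public abstract void setOffset(String offset);
}

// Hypothetical partition over a fixed list; the offset is the index of the next row.
class ListFeedPartition extends FeedPartition {
  private final List<String> rows;
  private int position = 0;

  ListFeedPartition(List<String> rows) {
    this.rows = new ArrayList<>(rows);
  }

  @Override
  public boolean next(ITuple t) {
    if (position >= rows.size()) {
      return false; // nothing left to read
    }
    t.setField("value", rows.get(position));
    position++;
    return true;
  }

  @Override
  public boolean supportsOffsetManagement() {
    return true; // the data is persistent, so a position is meaningful
  }

  @Override
  public String getOffset() {
    return String.valueOf(position);
  }

  @Override
  public void setOffset(String offset) {
    position = Integer.parseInt(offset);
  }
}
```

Saving getOffset() after reading two rows and passing that string to setOffset() on a fresh partition would resume reading at the third row.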

public abstract class OffsetStorage {
  protected FeedPartition feedPartiton;
  protected Plan plan;
  protected Map<String,String> properties;
  
  ....

  /** write offset to whatever the underlying storage is **/
  public abstract void persistOffset(Offset o);

  /** get the current offset of the feed */
  public abstract Offset getCurrentOffset();

  /** get the last persisted offset, from which processing can resume */
  public abstract Offset findLatestPersistedOffset();
}
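To make the three methods concrete, the following is a minimal sketch of an in-memory implementation. Everything here is an assumption for illustration: Offset is modeled as a simple string wrapper, FeedPartition and OffsetStorage are trimmed stand-ins for the classes listed above, and InMemoryOffsetStorage and SimplePartition are hypothetical names. Teknek's real types may look different.

```java
import java.util.Map;

// Hypothetical stand-in: models an offset as an opaque string position.
class Offset {
  private final String value;
  Offset(String value) { this.value = value; }
  String getValue() { return value; }
}

// Trimmed stand-in keeping only the offset-related methods.
abstract class FeedPartition {
  public abstract String getOffset();
  public abstract void setOffset(String offset);
}

// Trimmed stand-in for the abstract class listed above.
abstract class OffsetStorage {
  protected FeedPartition feedPartiton; // field name spelled as in the listing above
  public abstract void persistOffset(Offset o);
  public abstract Offset getCurrentOffset();
  public abstract Offset findLatestPersistedOffset();
}

// Hypothetical storage that keeps offsets in a shared map instead of an external system.
class InMemoryOffsetStorage extends OffsetStorage {
  private final Map<String, String> store;
  private final String key;

  InMemoryOffsetStorage(FeedPartition partition, Map<String, String> store, String key) {
    this.feedPartiton = partition;
    this.store = store;
    this.key = key;
  }

  @Override
  public void persistOffset(Offset o) {
    store.put(key, o.getValue());
  }

  @Override
  public Offset getCurrentOffset() {
    // Ask the partition where it currently is.
    return new Offset(feedPartiton.getOffset());
  }

  @Override
  public Offset findLatestPersistedOffset() {
    String saved = store.get(key);
    return saved == null ? null : new Offset(saved); // null means nothing was persisted yet
  }
}

// Hypothetical partition that only tracks a position string.
class SimplePartition extends FeedPartition {
  private String offset = "0";
  @Override public String getOffset() { return offset; }
  @Override public void setOffset(String offset) { this.offset = offset; }
}
```

In this sketch, a periodic call to persistOffset(getCurrentOffset()) records progress, and after a restart the value from findLatestPersistedOffset() can be handed back to the partition through setOffset(String).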

ZooKeeper Offset Manager

The offset manager (an OffsetStorage implementation) is pluggable per plan. Currently, a ZooKeeper-backed implementation is supplied out of the box.

It stores each offset in a node whose path has the form /teknek/offset/plan_name-feed_name-feed_partition_id

The content of the node is the data returned from the Feed: a string representing the position, usually a number such as "5" or "500034234234".
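The node naming described above can be sketched as a small helper. OffsetPaths and nodePath are hypothetical names for this example; how Teknek actually assembles the path internally is not shown here.

```java
// Hypothetical helper that builds a node path following the layout described above.
class OffsetPaths {
  static final String BASE = "/teknek/offset";

  static String nodePath(String planName, String feedName, String partitionId) {
    // One node per (plan, feed, partition) combination.
    return BASE + "/" + planName + "-" + feedName + "-" + partitionId;
  }
}
```

For example, nodePath("myplan", "myfeed", "0") yields /teknek/offset/myplan-myfeed-0, and the data stored at that node would be the offset string.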

Enabling the Offset Manager

The Plan has a top-level offset storage setting, configured with .withOffsetStorageDesc(...). Set its operator class to the appropriate OffsetStorage implementation.

// Connection parameters handed to the ZooKeeper offset storage
Map zkOffset = MapBuilder.makeMap(ZookeeperOffsetStorage.ZK_CONNECT,
    "localhost:2181");
// Attach the offset storage description to the plan
Plan p = new Plan()
  .withOffsetStorageDesc(new OffsetStorageDesc()
    .withOperatorClass(ZookeeperOffsetStorage.class.getName())
    .withParameters(zkOffset));

Benefits of per-plan pluggability

Because the interface is easily pluggable, users can choose the storage system that is right for them. For example, you might have a feed with 2000 partitions and want to update the offsets every second. In that case, a high-performance Cassandra or sharded memcache offset manager might be better than putting a crushing load on ZooKeeper. (ZooKeeper still performs reasonably with periodic offset commits every 30 seconds.)
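A back-of-the-envelope calculation shows why the commit interval matters. The sketch below assumes one offset write per partition per commit interval, which is a simplification; OffsetWriteRate is a hypothetical name used only for this example.

```java
// Rough estimate of offset write load, assuming one write per partition per commit.
class OffsetWriteRate {
  static double writesPerSecond(int partitions, double commitIntervalSeconds) {
    return partitions / commitIntervalSeconds;
  }
}
```

Under that assumption, 2000 partitions committing every second produce about 2000 writes per second, while the same partitions committing every 30 seconds produce roughly 67 writes per second.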