-
Notifications
You must be signed in to change notification settings - Fork 16
Esper Processor
Esper Processor is a specialized Jetstream processor implementation that encapsulates Esper. All events received by this processor are queued and then submitted to the Esper engine. Processed events are sent to a listener, which delivers them to the appropriate sink. New EPL statements can be hot deployed: they are compiled at run time and submitted to the engine without disrupting the event stream. An exception listener can be plugged in to receive exceptions along with the event that failed to be processed. If an advisory listener is plugged in, all exception events are sent to the AdvisoryListener along with an advice. The advisory listener can then forward the events to a persistent queue for later replay.
<!-- Esper processor bean: wires together the event listener, the engine
     configuration, the EPL statement block, and the downstream event sinks. -->
<bean id="EsperProcessor"
      class="com.ebay.jetstream.event.processor.esper.EsperProcessor">
  <property name="esperEventListener" ref="EsperEventListener"/>
  <property name="configuration" ref="EsperConfiguration"/>
  <property name="epl" ref="EPL"/>
  <!-- Sinks attached to this processor; only ConsoleLogger is listed here. -->
  <property name="eventSinks">
    <list>
      <ref bean="ConsoleLogger"/>
    </list>
  </property>
</bean>
<!-- Listener bean referenced by EsperProcessor's esperEventListener property;
     no properties are set on it. -->
<bean id="EsperEventListener"
      class="com.ebay.jetstream.event.processor.esper.EsperEventListener"/>
<!-- Engine configuration bean referenced by EsperProcessor's configuration property.
     The groupings below follow the property names only.
     NOTE(review): units and exact semantics of each value are not shown here;
     confirm against the EsperConfiguration class. -->
<bean id="EsperConfiguration"
      class="com.ebay.jetstream.event.processor.esper.EsperConfiguration">

  <!-- Timer / time-source settings -->
  <property name="internalTimerEnabled" value="true"/>
  <property name="msecResolution" value="1"/>
  <property name="timeSourceNano" value="false"/>

  <!-- Event types declared to the engine (see the EventDefinition bean) -->
  <property name="declaredEvents" ref="EventDefinition"/>

  <!-- Dispatch settings for listeners and insert-into -->
  <property name="listenerDispatchTimeout" value="1000"/>
  <property name="listenerDispatchPreserveOrder" value="false"/>
  <property name="insertIntoDispatchTimeout" value="100"/>
  <property name="insertIntoDispatchPreserveOrder" value="false"/>

  <!-- Thread pool and queue sizing -->
  <property name="threadPoolSize" value="2"/>
  <property name="queueSizeLimit" value="30000000"/>

  <!-- Logging switches -->
  <property name="executionLogging" value="true"/>
  <property name="timerLogging" value="true"/>
</bean>
When EPL statements use fields in the events, each event, the fields being accessed, and the types of those fields must be declared to the engine. These declarations need to be submitted along with the EPL.
<!-- Declares the event types, with field names and Java types, that the EPL
     statements may reference. Referenced by EsperConfiguration's declaredEvents
     property. -->
<bean id="EventDefinition"
      class="com.ebay.jetstream.event.processor.esper.EsperDeclaredEvents">
  <property name="eventTypes">
    <list>
      <!-- Map-backed event type exposed to EPL under the alias "RawEvent". -->
      <bean class="com.ebay.jetstream.event.processor.esper.MapEventType">
        <property name="eventAlias" value="RawEvent"/>
        <!-- key = field name, value = fully qualified Java type of the field -->
        <property name="eventFields">
          <map>
            <entry key="D1" value="java.lang.String"/>
            <entry key="D2" value="java.lang.String"/>
            <entry key="D3" value="java.lang.String"/>
            <entry key="D4" value="java.lang.Integer"/>
            <entry key="D5" value="java.lang.String"/>
            <entry key="D6" value="java.lang.Long"/>
            <entry key="D7" value="java.lang.Long"/>
            <entry key="D8" value="java.lang.Boolean"/>
            <entry key="D9" value="java.lang.Integer"/>
          </map>
        </property>
      </bean>
    </list>
  </property>
</bean>
<!-- EPL statement block referenced by EsperProcessor's epl property.
     The statements:
       1. create context MCContext, starting @now and ending after 10 seconds;
       2. within that context, count RawEvent rows that have non-null D1 and D2,
          grouped by D1 and D2, and insert a snapshot into MetricAggregate when
          the context terminates;
       3. select everything from MetricAggregate, annotated with @OutputTo,
          @PublishOn and @ClusterAffinityTag.
     String literals use plain ASCII quotes. Typographic (curly) quotes are not
     string delimiters and leave the literal unterminated.
     The EPL text is kept in CDATA and left unindented so the statement text
     itself is unchanged apart from the quote characters. -->
<bean id="EPL" class="com.ebay.jetstream.event.processor.esper.EPL">
  <property name="statementBlock">
    <value>
<![CDATA[
create context MCContext start @now end after 10 seconds;
context MCContext
insert into MetricAggregate select count(*) as count, D1, D2, 'M1' as metricName from
RawEvent(D1 is not null and D2 is not null) group by D1, D2 output snapshot when terminated;
@OutputTo("OMC")
@PublishOn(topics="ChanB/AggrEvent")
@ClusterAffinityTag(dimension=@CreateDimension(name="grpdim", dimensionspan="D1, D2, M1"))
select * from MetricAggregate;
]]>
    </value>
  </property>
</bean>