Skip to content
shmurthy62 edited this page Feb 13, 2015 · 1 revision

Esper Processor is a specialized Jetstream processor implementation that encapsulates Esper. All events received by this processor are queued and then submitted to the Esper engine. The processed events are sent to a listener, which delivers them to an appropriate sink. New EPL statements can be hot deployed: they are compiled at run time and submitted to the engine without disrupting the event stream. An exception listener can be plugged in to receive exceptions along with the event that failed to be processed. If an advisory listener is plugged in, all exception events are sent to the AdvisoryListener along with advice. The advisory listener can then forward the events to a persistent queue for later replay.

Specifying the Esper Processor Bean

<!-- Esper processor bean: references the event listener, the engine
     configuration, the EPL to run, and the sinks for outgoing events. -->
<bean id="EsperProcessor"
      class="com.ebay.jetstream.event.processor.esper.EsperProcessor">
    <property name="esperEventListener" ref="EsperEventListener"/>
    <property name="configuration" ref="EsperConfiguration"/>
    <property name="epl" ref="EPL"/>
    <!-- Sinks to which processed events are delivered. The ConsoleLogger
         bean is not defined in this snippet. -->
    <property name="eventSinks">
        <list>
            <ref bean="ConsoleLogger"/>
        </list>
    </property>
</bean>
    

Specifying the Esper Event Listener

<!-- Listener bean referenced by the processor's esperEventListener property;
     no properties are configured on it here. -->
<bean id="EsperEventListener"
      class="com.ebay.jetstream.event.processor.esper.EsperEventListener"/>

Specifying the Esper Configuration Bean

<!-- Engine configuration bean referenced by the processor's "configuration"
     property. Properties are grouped below by name only; consult the
     EsperConfiguration class for their exact semantics. -->
<bean id="EsperConfiguration"
      class="com.ebay.jetstream.event.processor.esper.EsperConfiguration">
    <!-- Timer / time source -->
    <property name="internalTimerEnabled" value="true"/>
    <property name="msecResolution" value="1"/>
    <property name="timeSourceNano" value="false"/>

    <!-- Event types declared to the engine -->
    <property name="declaredEvents" ref="EventDefinition"/>

    <!-- Listener dispatch -->
    <property name="listenerDispatchTimeout" value="1000"/>
    <property name="listenerDispatchPreserveOrder" value="false"/>

    <!-- Insert-into dispatch -->
    <property name="insertIntoDispatchTimeout" value="100"/>
    <property name="insertIntoDispatchPreserveOrder" value="false"/>

    <!-- Thread pool and queue sizing -->
    <property name="threadPoolSize" value="2"/>
    <property name="queueSizeLimit" value="30000000"/>

    <!-- Logging switches -->
    <property name="executionLogging" value="true"/>
    <property name="timerLogging" value="true"/>
</bean>

Specifying the Event Definition

When EPL statements access fields of an event, the event, the fields being accessed, and the types of those fields must be declared to the engine. This declaration must be submitted along with the EPL.

<!-- Declares to the engine the event types, and their typed fields, that the
     EPL statements reference. -->
<bean id="EventDefinition"
      class="com.ebay.jetstream.event.processor.esper.EsperDeclaredEvents">
    <property name="eventTypes">
        <list>
            <!-- Map-based event type exposed to EPL under the alias "RawEvent";
                 each entry maps a field name to its Java type. -->
            <bean class="com.ebay.jetstream.event.processor.esper.MapEventType">
                <property name="eventAlias" value="RawEvent"/>
                <property name="eventFields">
                    <map>
                        <entry key="D1" value="java.lang.String"/>
                        <entry key="D2" value="java.lang.String"/>
                        <entry key="D3" value="java.lang.String"/>
                        <entry key="D4" value="java.lang.Integer"/>
                        <entry key="D5" value="java.lang.String"/>
                        <entry key="D6" value="java.lang.Long"/>
                        <entry key="D7" value="java.lang.Long"/>
                        <entry key="D8" value="java.lang.Boolean"/>
                        <entry key="D9" value="java.lang.Integer"/>
                    </map>
                </property>
            </bean>
        </list>
    </property>
</bean>

Specifying EPL

<!-- EPL statement block submitted to the engine by the processor (referenced
     through its "epl" property). The statements are wrapped in CDATA so that
     EPL text needs no XML escaping. String literals and annotation values
     must use plain ASCII quotes (' or "); typographic quotes pasted from a
     word processor or wiki are not valid EPL delimiters. -->
<bean id="EPL" class="com.ebay.jetstream.event.processor.esper.EPL">
    <property name="statementBlock">
        <value>
        <![CDATA[
            create context MCContext start @now end after 10 seconds;

            context MCContext
            insert into MetricAggregate
            select count(*) as count, D1, D2, 'M1' as metricName
            from RawEvent(D1 is not null and D2 is not null)
            group by D1, D2
            output snapshot when terminated;

            @OutputTo("OMC")
            @PublishOn(topics="ChanB/AggrEvent")
            @ClusterAffinityTag(dimension=@CreateDimension(name="grpdim", dimensionspan="D1, D2, M1"))
            select * from MetricAggregate;
        ]]>
        </value>
    </property>
</bean>