-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add tiled implementation of the Flink app #627
Conversation
79eeecf
to
920cc4e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Largely looks good, few minor comments
extends RichFlatMapFunction[Map[String, Any], PutRequest] { | ||
@transient lazy val logger = LoggerFactory.getLogger(getClass) | ||
// This utility contains common code for AvroCodecFn and TiledAvroCodecFn | ||
sealed trait AvroCodecFnUtility { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about a name like BaseAvroCodecFn
? I tend to associate utility with singletons which isn't what you're getting at here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to make things explicit, we could make this an abstract class that extends from RichFlatMapFunction (so then subclasses know their contract is to provide groupByServingInfoParsed and fill out the open / flatMap methods)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed - BaseAvroCodecFn
is a better name and making the contracts clearer is a good idea.
case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) | ||
extends RichFlatMapFunction[Map[String, Any], PutRequest] { | ||
@transient lazy val logger = LoggerFactory.getLogger(getClass) | ||
// This utility contains common code for AvroCodecFn and TiledAvroCodecFn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets also highlight what the subclasses need to override / specialize
* Spark expression eval - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input data | ||
* Avro conversion - Converts the Spark expr eval output to a form that can be written out to the KV store (PutRequest object) | ||
* KV store writer - Writes the PutRequest objects to the KV store using the AsyncDataStream API | ||
* Kafka source -> Spark expression eval -> Avro conversion -> KV store writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this changes now with the new tiling operators right? Should we add those too or maybe we move the operator breakdown to the run*
methods
@@ -125,11 +88,76 @@ class FlinkJobIntegrationTest { | |||
// capture the datastream of the 'created' timestamps of all the written out events | |||
val writeEventCreatedDS = CollectSink.values.asScala | |||
|
|||
println(writeEventCreatedDS.size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
* Find the smallest tail window resolution in a GroupBy. Returns None if the GroupBy does not define any windows. | ||
* The window resolutions are: 5 min for a GroupBy a window < 12 hrs, 1 hr for < 12 days, 1 day for > 12 days. | ||
* */ | ||
def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Option[Long] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could live in Extensions
- we already have helper functions like MaxWindow
etc on GroupBy as implicits: https://github.com/airbnb/chronon/blob/master/api/src/main/scala/ai/chronon/api/Extensions.scala#L416
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extensions
does seem like a good place for this, but it would require api
to depend on aggregator
so it can use FiveMinuteResolution.calculateTailHop
. Or duplicating calculateTailHop
in Extensions
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aggregator should already be able to access api/Extensions.scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@caiocamatta-stripe if IntelliJ doesn't auto-complete or detect the functions in Aggregator you might need to install a scala extension, because the implicits need a little help to work in IDE I think. But it should still compile from sbt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I may be doing something silly..
- error message:
object aggregator is not a member of package ai.chronon
- code diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to move resolution & 5minResolution into api if you want to achieve this. But not necessary in this PR IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleting my last comment, I see that you did create some tests. Will continue reviewing shortyl!
// We use Flink "Side Outputs" to track any late events that aren't computed. | ||
val tilingLateEventsTag = OutputTag[Map[String, Any]]("tiling-late-events") | ||
|
||
// The tiling operator works the following way: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice comments!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything looks pretty good to me. Might be good to get a stamp from @nikhilsimha too for this one.
List("id1") -> List(4.0), // Add up the double_val of the two 'id1' events | ||
List("id2") -> List(10.0) | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not necessarily need to be part of this PR, but eventually having a test case with events that fall outside of the tail of the window might be interesting to make sure that we tail-exclude properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! cc @piyushn-stripe - one of us can include it when we get a chance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah +1 to that. Would be a good follow up (for both the tiled & untiled)
hey @nikhilsimha can I get your review too pls? |
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
Outdated
Show resolved
Hide resolved
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
Outdated
Show resolved
Hide resolved
@@ -75,6 +75,8 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String) | |||
// The context used for the future callbacks | |||
implicit lazy val executor: ExecutionContext = AsyncKVStoreWriter.ExecutionContextInstance | |||
|
|||
// One may want to use different KV stores depending on whether tiling is on. | |||
// The untiled version of Chronon works on "append" store semantics, and the tiled version works on "overwrite". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
* @tparam IN The input data type which contains the data to be avro-converted to bytes. | ||
* @tparam OUT The output data type (generally a PutRequest). | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the naming itself makes a lot of what is written here obvious.
I am personally feel auto generated doc strings reduce readability of code, but it is subjective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it hurts here, but noted!
I'll leave it for now but don't mind removing it if anyone has strong opinions.
@@ -47,7 +58,25 @@ class FlinkJob[T](eventSrc: FlinkSource[T], | |||
// The source of our Flink application is a Kafka topic | |||
val kafkaTopic: String = groupByServingInfoParsed.groupBy.streamingSource.get.topic | |||
|
|||
/** | |||
* The "untiled" version of the Flink app. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this style of comments - over the autogen doc strings
if (debug) { | ||
logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not logger.debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was to copy what's done in BaseFetcher, but I do prefer logger.debug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed to logger.debug
, I prefer it that way too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Solid PR!
Can you also add a Readme to the chronon/flink dir with a rough control flow diagram?
@nikhilsimha, I can add a control flow in a follow-up PR. For now we have the doc on the Tiled Architecture (which I'm merging along with this PR). |
Summary
Adds a tiled implementation of the Flink app.
The tiled version of the Flink job adds a window / tiling operator. It has roughly the following DAG:
GroupBy
and projects and filters the input dataPutRequest
object)PutRequest
objects to the KV store using theAsyncDataStream
APIIn a subsequent PR I will add documentation on tiling in general -- probably in a markdown file. This documentation will be similar to the "Flink and Tiling" presentation I gave at the Stripe <> AirBnB summit we had.
Why / Goal
In our experience at Stripe, the tiled version of Chronon (Flink + Fetcher) has much lower latency and significantly better scalability than the untiled/regular version.
Test Plan
I added
Checklist
Reviewers
@piyushn-stripe @nikhilsimha @cristianfr