Update FetcherBase to use cache
caiocamatta-stripe committed Feb 21, 2024
1 parent de712ab commit c7cef67
Showing 1 changed file with 163 additions and 68 deletions.
online/src/main/scala/ai/chronon/online/FetcherBase.scala
@@ -23,6 +23,7 @@ import ai.chronon.aggregator.windowing.{FinalBatchIr, SawtoothOnlineAggregator,
import ai.chronon.api.Constants.ChrononMetadataKey
import ai.chronon.api._
import ai.chronon.online.Fetcher.{ColumnSpec, PrefixedRequest, Request, Response}
import ai.chronon.online.FetcherCache.{BatchResponses, CachedBatchResponse, KvStoreBatchResponse}
import ai.chronon.online.KVStore.{GetRequest, GetResponse, TimedValue}
import ai.chronon.online.Metrics.Name
import ai.chronon.api.Extensions.{JoinOps, ThrowableOps, GroupByOps}
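The newly imported FetcherCache.BatchResponses drives most of the changes below. Its actual definition lives in FetcherCache; the following is a plausible shape inferred purely from the call sites in this file (`.response`, `.getBatchBytes`, and the `BatchResponses(...)` constructor used in fetchGroupBys), not the verbatim definition:

    import scala.util.Try
    import ai.chronon.aggregator.windowing.FinalBatchIr
    import ai.chronon.online.KVStore.TimedValue

    // Sketch only: inferred from usage in this diff, not copied from FetcherCache.
    sealed trait BatchResponses {
      def getBatchBytes(batchEndTsMillis: Long): Array[Byte]
    }

    // A raw KV store response: the batch bytes must still be selected and decoded.
    case class KvStoreBatchResponse(response: Try[Seq[TimedValue]]) extends BatchResponses {
      def getBatchBytes(batchEndTsMillis: Long): Array[Byte] =
        response.toOption
          .flatMap(_.sortBy(_.millis).lastOption) // the latest batch value wins
          .filter(_.millis >= batchEndTsMillis)   // discard values older than the watermark
          .map(_.bytes)
          .orNull
    }

    // A cache hit: the IR is already decoded, so there are no raw bytes to return.
    case class CachedBatchResponse(ir: FinalBatchIr) extends BatchResponses {
      def getBatchBytes(batchEndTsMillis: Long): Array[Byte] = null
    }

    object BatchResponses {
      def apply(response: Try[Seq[TimedValue]]): KvStoreBatchResponse =
        KvStoreBatchResponse(response)
    }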
@@ -42,97 +43,126 @@ class FetcherBase(kvStore: KVStore,
metaDataSet: String = ChrononMetadataKey,
timeoutMillis: Long = 10000,
debug: Boolean = false)
extends MetadataStore(kvStore, metaDataSet, timeoutMillis) {
extends MetadataStore(kvStore, metaDataSet, timeoutMillis)
with FetcherCache {
import FetcherBase._

// a groupBy request is split into batchRequest and optionally a streamingRequest
// this method decodes bytes (of the appropriate avro schema) into chronon rows aggregates further if necessary
private def constructGroupByResponse(batchResponsesTry: Try[Seq[TimedValue]],
/**
* A groupBy request is split into batchRequest and optionally a streamingRequest. This method decodes bytes
* (of the appropriate Avro schema) into Chronon rows and aggregates them further if necessary.
*
* @param batchResponses a BatchResponses, which encapsulates either a raw KV store response or a cached batch IR.
* @param streamingResponsesOpt a response from the KV store, if the GroupBy has streaming data.
* @param oldServingInfo the GroupByServingInfo used to fetch the GroupBys.
* @param queryTimeMs the Request timestamp
* @param startTimeMs the time at which we started fetching from the KV store
* @param overallLatency the time it took to get the values from the KV store
* @param context the Metrics.Context to use for recording metrics
* @param totalResponseValueBytes the total size of the response from the KV store
* @param keys the keys used to fetch the GroupBy
* @return a map of output field names to their values, or null if neither batch nor streaming data is present
*/
private def constructGroupByResponse(batchResponses: BatchResponses,
streamingResponsesOpt: Option[Seq[TimedValue]],
oldServingInfo: GroupByServingInfoParsed,
queryTimeMs: Long,
startTimeMs: Long,
overallLatency: Long,
context: Metrics.Context,
totalResponseValueBytes: Int): Map[String, AnyRef] = {
totalResponseValueBytes: Int,
keys: Map[String, Any] // The keys are used only for caching
): Map[String, AnyRef] = {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
val latestBatchValue = batchResponsesTry.map(_.maxBy(_.millis))
val servingInfo =
latestBatchValue.map(timedVal => updateServingInfo(timedVal.millis, oldServingInfo)).getOrElse(oldServingInfo)
batchResponsesTry.map {
reportKvResponse(context.withSuffix("batch"), _, queryTimeMs, overallLatency, totalResponseValueBytes)

val servingInfo = getServingInfo(oldServingInfo, batchResponses)

// Batch metrics
batchResponses match {
case kvStoreResponse: KvStoreBatchResponse =>
kvStoreResponse.response.map(
reportKvResponse(context.withSuffix("batch"), _, queryTimeMs, overallLatency, totalResponseValueBytes)
)
case _: CachedBatchResponse => // no-op
}

// bulk upload didn't remove an older batch value - so we manually discard
val batchBytes: Array[Byte] = batchResponsesTry
.map(_.maxBy(_.millis))
.filter(_.millis >= servingInfo.batchEndTsMillis)
.map(_.bytes)
.getOrElse(null)
// The bulk upload may not have removed older batch values. We manually discard all but the latest one.
val batchBytes: Array[Byte] = batchResponses.getBatchBytes(servingInfo.batchEndTsMillis)

val responseMap: Map[String, AnyRef] = if (servingInfo.groupBy.aggregations == null) { // no-agg
servingInfo.selectedCodec.decodeMap(batchBytes)
getMapResponseFromBatchResponse(batchResponses,
batchBytes,
servingInfo.selectedCodec.decodeMap,
servingInfo,
keys)
} else if (streamingResponsesOpt.isEmpty) { // snapshot accurate
servingInfo.outputCodec.decodeMap(batchBytes)
getMapResponseFromBatchResponse(batchResponses, batchBytes, servingInfo.outputCodec.decodeMap, servingInfo, keys)
} else { // temporal accurate
val streamingResponses = streamingResponsesOpt.get
val mutations: Boolean = servingInfo.groupByOps.dataModel == DataModel.Entities
val aggregator: SawtoothOnlineAggregator = servingInfo.aggregator

// Missing data
if (batchBytes == null && (streamingResponses == null || streamingResponses.isEmpty)) {
if (debug) logger.info("Both batch and streaming data are null")
null
} else {
reportKvResponse(context.withSuffix("streaming"),
streamingResponses,
queryTimeMs,
overallLatency,
totalResponseValueBytes)

val batchIr = toBatchIr(batchBytes, servingInfo)
val output: Array[Any] = if (servingInfo.isTilingEnabled) {
val streamingIrs: Iterator[TiledIr] = streamingResponses.iterator
.filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis)
.map { tVal =>
val (tile, _) = servingInfo.tiledCodec.decodeTileIr(tVal.bytes)
TiledIr(tVal.millis, tile)
}
return null
}

// Streaming metrics
reportKvResponse(context.withSuffix("streaming"),
streamingResponses,
queryTimeMs,
overallLatency,
totalResponseValueBytes)

// If caching is enabled, we try to fetch the batch IR from the cache so we avoid the work of decoding it.
val batchIr: FinalBatchIr =
getBatchIrFromBatchResponse(batchResponses, batchBytes, servingInfo, toBatchIr, keys)

if (debug) {
val gson = new Gson()
logger.info(s"""
val output: Array[Any] = if (servingInfo.isTilingEnabled) {
val streamingIrs: Iterator[TiledIr] = streamingResponses.iterator
.filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis)
.map { tVal =>
val (tile, _) = servingInfo.tiledCodec.decodeTileIr(tVal.bytes)
TiledIr(tVal.millis, tile)
}

if (debug) {
val gson = new Gson()
logger.info(s"""
|batch ir: ${gson.toJson(batchIr)}
|streamingIrs: ${gson.toJson(streamingIrs)}
|batchEnd in millis: ${servingInfo.batchEndTsMillis}
|queryTime in millis: $queryTimeMs
|""".stripMargin)
}
}

aggregator.lambdaAggregateFinalizedTiled(batchIr, streamingIrs, queryTimeMs)
} else {
val selectedCodec = servingInfo.groupByOps.dataModel match {
case DataModel.Events => servingInfo.valueAvroCodec
case DataModel.Entities => servingInfo.mutationValueAvroCodec
}
aggregator.lambdaAggregateFinalizedTiled(batchIr, streamingIrs, queryTimeMs)
} else {
val selectedCodec = servingInfo.groupByOps.dataModel match {
case DataModel.Events => servingInfo.valueAvroCodec
case DataModel.Entities => servingInfo.mutationValueAvroCodec
}

val streamingRows: Array[Row] = streamingResponses.iterator
.filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis)
.map(tVal => selectedCodec.decodeRow(tVal.bytes, tVal.millis, mutations))
.toArray
val streamingRows: Array[Row] = streamingResponses.iterator
.filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis)
.map(tVal => selectedCodec.decodeRow(tVal.bytes, tVal.millis, mutations))
.toArray

if (debug) {
val gson = new Gson()
logger.info(s"""
if (debug) {
val gson = new Gson()
logger.info(s"""
|batch ir: ${gson.toJson(batchIr)}
|streamingRows: ${gson.toJson(streamingRows)}
|batchEnd in millis: ${servingInfo.batchEndTsMillis}
|queryTime in millis: $queryTimeMs
|""".stripMargin)
}

aggregator.lambdaAggregateFinalized(batchIr, streamingRows.iterator, queryTimeMs, mutations)
}
servingInfo.outputCodec.fieldNames.iterator.zip(output.iterator.map(_.asInstanceOf[AnyRef])).toMap

aggregator.lambdaAggregateFinalized(batchIr, streamingRows.iterator, queryTimeMs, mutations)
}
servingInfo.outputCodec.fieldNames.iterator.zip(output.iterator.map(_.asInstanceOf[AnyRef])).toMap
}

context.distribution("group_by.latency.millis", System.currentTimeMillis() - startTimeMs)
responseMap
}
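The temporal-accurate branch above is a lambda-architecture merge: the batch IR covers everything before batchEndTsMillis, and only streaming values at or after that watermark are layered on top at query time. A stripped-down sketch of the watermark filter, using a hypothetical simplified event type for illustration only:

    // Hypothetical event type standing in for KVStore.TimedValue.
    case class Timed(millis: Long, payload: String)

    // Mirrors the `tVal.millis >= servingInfo.batchEndTsMillis` filters above:
    // anything older than the watermark is already folded into the batch IR,
    // so replaying it from the stream would double-count.
    def streamingTail(events: Seq[Timed], batchEndTsMillis: Long): Seq[Timed] =
      events.filter(_.millis >= batchEndTsMillis)

    val events = Seq(Timed(100L, "a"), Timed(200L, "b"), Timed(300L, "c"))
    assert(streamingTail(events, batchEndTsMillis = 200L).map(_.payload) == Seq("b", "c"))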
@@ -155,6 +185,43 @@ class FetcherBase(kvStore: KVStore,
((responseBytes.toDouble / totalResponseBytes.toDouble) * latencyMillis).toLong)
}
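reportKvResponse's last line above attributes a share of the overall multi-get latency to each response in proportion to its size in bytes. A quick worked example, under assumed numbers:

    // A 2,000-byte response out of 8,000 total bytes in a 40 ms multi-get
    // is charged (2000 / 8000) * 40 = 10 ms.
    val attributed = ((2000.toDouble / 8000.toDouble) * 40).toLong
    assert(attributed == 10L)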

/**
* Get the latest serving information based on a batch response.
*
* The underlying metadata store used to store the latest GroupByServingInfoParsed will be updated if needed.
*
* @param oldServingInfo The previous serving information before fetching the latest KV store data.
* @param batchResponses the latest batch responses (either a fresh KV store response or a cached batch IR).
* @return the GroupByServingInfoParsed containing the latest serving information.
*/
private[online] def getServingInfo(oldServingInfo: GroupByServingInfoParsed,
batchResponses: BatchResponses): GroupByServingInfoParsed = {
batchResponses match {
case batchTimedValuesTry: KvStoreBatchResponse => {
val latestBatchValue: Try[TimedValue] = batchTimedValuesTry.response.map(_.maxBy(_.millis))
latestBatchValue.map(timedVal => updateServingInfo(timedVal.millis, oldServingInfo)).getOrElse(oldServingInfo)
}
case _: CachedBatchResponse => {
// If there was cached batch data, there's no point trying to update the serving info; it would be the same.
// However, there's one edge case to handle. If all batch requests are cached and we never hit the KV store,
// we will never try to update the serving info. In that case, if new batch data were to land, we would never
// know about it. So we force a refresh here to ensure that we still periodically, asynchronously hit the
// KV store to update the serving info. (See CHIP-1)
getGroupByServingInfo.refresh(oldServingInfo.groupByOps.metaData.name)

oldServingInfo
}
}
}
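To make the forced refresh concrete, here is a minimal sketch of the semantics it relies on. This assumes TTL-cache-style behavior (refresh re-reads the authoritative value, ideally off the hot path) and is not Chronon's actual implementation:

    import java.util.concurrent.atomic.AtomicLong

    // Sketch: a value that can be re-read on demand. In the real code,
    // getGroupByServingInfo.refresh(name) plays this role.
    class RefreshingWatermark(load: () => Long) {
      private val current = new AtomicLong(load())
      def get: Long = current.get()
      def refresh(): Unit = current.set(load()) // a real impl would refresh asynchronously
    }

    var latestBatchEndTs = 1000L
    val watermark = new RefreshingWatermark(() => latestBatchEndTs)

    latestBatchEndTs = 2000L   // new batch data lands while every fetch is a cache hit
    watermark.refresh()        // without this, the stale 1000L would persist forever
    assert(watermark.get == 2000L)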

/**
* If `batchEndTs` is ahead of `groupByServingInfo.batchEndTsMillis`, update the MetadataStore with the new
* timestamp. In practice, this means that new batch data has landed, so future KV store requests should fetch
* streaming data after the new batchEndTsMillis.
*
* @param batchEndTs the new batchEndTs from the latest batch data
* @param groupByServingInfo the current GroupByServingInfo
*/
private def updateServingInfo(batchEndTs: Long,
groupByServingInfo: GroupByServingInfoParsed): GroupByServingInfoParsed = {
val name = groupByServingInfo.groupBy.metaData.name
@@ -239,15 +306,25 @@ class FetcherBase(kvStore: KVStore,
}
request -> groupByRequestMetaTry
}.toSeq
val allRequests: Seq[GetRequest] = groupByRequestToKvRequest.flatMap {
case (_, Success(GroupByRequestMeta(_, batchRequest, streamingRequestOpt, _, _))) =>
Some(batchRequest) ++ streamingRequestOpt

// If caching is enabled, we check if any of the GetRequests are already cached. If so, we store them in a Map
// and avoid the work of re-fetching them.
val cachedRequests: Map[GetRequest, CachedBatchResponse] = getCachedRequests(groupByRequestToKvRequest)
// Collect cache metrics once per fetchGroupBys call; Caffeine metrics aren't tagged by groupBy
maybeBatchIrCache.foreach(cache =>
LRUCache.collectCaffeineCacheMetrics(caffeineMetricsContext, cache.cache, cache.cacheName))

val allRequestsToFetch: Seq[GetRequest] = groupByRequestToKvRequest.flatMap {
case (_, Success(GroupByRequestMeta(_, batchRequest, streamingRequestOpt, _, _))) => {
// If a request is cached, don't include it in the list of requests to fetch.
if (cachedRequests.contains(batchRequest)) streamingRequestOpt else Some(batchRequest) ++ streamingRequestOpt
}
case _ => Seq.empty
}
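// For example (hypothetical requests): with batch request B cached and a streaming
// request S present, only S is fetched; with no cache hit, both B and S go to the
// KV store; with B cached and no streaming request, nothing is fetched at all.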

val startTimeMs = System.currentTimeMillis()
val kvResponseFuture: Future[Seq[GetResponse]] = if (allRequests.nonEmpty) {
kvStore.multiGet(allRequests)
val kvResponseFuture: Future[Seq[GetResponse]] = if (allRequestsToFetch.nonEmpty) {
kvStore.multiGet(allRequestsToFetch)
} else {
Future(Seq.empty[GetResponse])
}
@@ -264,36 +341,51 @@ class FetcherBase(kvStore: KVStore,
.filter(_.isSuccess)
.flatMap(_.get.map(v => Option(v.bytes).map(_.length).getOrElse(0)))
.sum

val responses: Seq[Response] = groupByRequestToKvRequest.iterator.map {
case (request, requestMetaTry) =>
val responseMapTry = requestMetaTry.map { requestMeta =>
val GroupByRequestMeta(groupByServingInfo, batchRequest, streamingRequestOpt, _, context) = requestMeta
context.count("multi_get.batch.size", allRequests.length)

context.count("multi_get.batch.size", allRequestsToFetch.length)
context.distribution("multi_get.bytes", totalResponseValueBytes)
context.distribution("multi_get.response.length", kvResponses.length)
context.distribution("multi_get.latency.millis", multiGetMillis)

// pick the batch version with highest timestamp
val batchResponseTryAll = responsesMap
.getOrElse(batchRequest,
Failure(
new IllegalStateException(
s"Couldn't find corresponding response for $batchRequest in responseMap")))
val batchResponses: BatchResponses =
// Check if the get request was cached. If so, use the cached response. Otherwise, try to get it from the KV store response.
cachedRequests.get(batchRequest) match {
case None =>
BatchResponses(
responsesMap
.getOrElse(
batchRequest,
// Fail if response is neither in responsesMap nor in cache
Failure(new IllegalStateException(
s"Couldn't find corresponding response for $batchRequest in responseMap or cache"))
))
case Some(cachedResponse: CachedBatchResponse) => cachedResponse
}

val streamingResponsesOpt =
streamingRequestOpt.map(responsesMap.getOrElse(_, Success(Seq.empty)).getOrElse(Seq.empty))
val queryTs = request.atMillis.getOrElse(System.currentTimeMillis())

try {
if (debug)
logger.info(
s"Constructing response for groupBy: ${groupByServingInfo.groupByOps.metaData.getName} " +
s"for keys: ${request.keys}")
constructGroupByResponse(batchResponseTryAll,
constructGroupByResponse(batchResponses,
streamingResponsesOpt,
groupByServingInfo,
queryTs,
startTimeMs,
multiGetMillis,
context,
totalResponseValueBytes)
totalResponseValueBytes,
request.keys)
} catch {
case ex: Exception =>
// not all exceptions are due to stale schema, so we want to control how often we hit kv store
@@ -310,6 +402,9 @@ class FetcherBase(kvStore: KVStore,
}
}
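The IR cache behind maybeBatchIrCache is Caffeine-backed, per the metrics-collection comment in fetchGroupBys. A minimal sketch of such a cache using Caffeine directly; the key shape and size here are assumptions for illustration, not Chronon's actual FetcherCache internals:

    import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
    import ai.chronon.aggregator.windowing.FinalBatchIr

    // Assumed key: some encoding of (groupByName, entity keys, batchEndTsMillis).
    val batchIrCache: Cache[String, FinalBatchIr] =
      Caffeine
        .newBuilder()
        .maximumSize(10000L)  // bounded eviction (Caffeine uses W-TinyLFU, not strict LRU)
        .recordStats()        // enables the hit/miss counters exported as metrics
        .build[String, FinalBatchIr]()

    // Decode batch bytes only on a miss; hits skip the Avro decode entirely.
    def irFor(key: String, decode: String => FinalBatchIr): FinalBatchIr =
      batchIrCache.get(key, k => decode(k))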

/**
* Convert an array of bytes to a FinalBatchIr.
*/
def toBatchIr(bytes: Array[Byte], gbInfo: GroupByServingInfoParsed): FinalBatchIr = {
if (bytes == null) return null
val batchRecord =
