
FAQ:Aggregations

Andy Coates edited this page Oct 23, 2019 · 2 revisions

Why does KSQL output two events for each input event when aggregating a table?

Inspired by this SO question

Example

-- Given:
-- Create Trades table:
CREATE TABLE Trades (TradeId STRING, AccountId STRING, Amount DOUBLE) WITH (KAFKA_TOPIC='TradeHistory', VALUE_FORMAT='JSON', PARTITIONS=1, KEY='TradeId');

-- Run this in a different console to see the results of the aggregation:
-- Prior to version 5.4:
SELECT AccountId, COUNT(*) AS Count, SUM(Amount) AS Total FROM Trades GROUP BY AccountId;
-- From version 5.4 onwards:
SELECT AccountId, COUNT(*) AS Count, SUM(Amount) AS Total FROM Trades GROUP BY AccountId EMIT CHANGES;

-- When we:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t1', 'acc1', 106.0);

-- Then the above select window outputs:
-- AccountId | Count | Total
   acc1 | 1 | 106.0

-- When we:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t1', 'acc1', 107.0);

-- Then the above select window may output:
-- AccountId | Count | Total
   acc1 | 0 | 0.0
   acc1 | 1 | 107.0

If you run this example locally, you may see the two output rows shown above when you insert the update to trade t1, or you may see only the last row. The question is: why does the second insert potentially result in two output events rather than one?

Answer

When KSQL sees an update to an existing row in a table, it internally emits a CDC event that contains both the old and new values. Aggregations handle such events by first undoing the old value and then applying the new value.

So, in the example above, the second insert actually updates an existing row in the Trades table. KSQL first undoes the old value: the COUNT drops by 1 to 0, and the SUM drops by the old value of 106.0 to 0.0. KSQL then applies the new row value: the COUNT goes up by 1 to 1, and the SUM goes up by the new value of 107.0 to 107.0.

If a third insert were done for the same TradeId and AccountId, the same pattern would be seen: first the old value would be removed, taking the count and sum to zero, before the new row value is applied.
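The undo-then-apply behaviour can be sketched in Python. This is a simplified model for illustration only, not KSQL's actual internals; the class and variable names are invented for this sketch:

```python
# Simplified model of a table aggregation that handles updates by
# first undoing the old row's contribution, then applying the new one.
class TradeAggregator:
    def __init__(self):
        self.table = {}      # TradeId -> (AccountId, Amount): latest row per key
        self.aggregate = {}  # AccountId -> (count, total)
        self.emitted = []    # every (AccountId, count, total) result, in order

    def _apply(self, account_id, count_delta, amount_delta):
        count, total = self.aggregate.get(account_id, (0, 0.0))
        self.aggregate[account_id] = (count + count_delta, total + amount_delta)
        self.emitted.append((account_id, *self.aggregate[account_id]))

    def insert(self, trade_id, account_id, amount):
        old = self.table.get(trade_id)
        if old is not None:
            old_account, old_amount = old
            self._apply(old_account, -1, -old_amount)  # undo the old value
        self.table[trade_id] = (account_id, amount)
        self._apply(account_id, +1, amount)            # apply the new value

agg = TradeAggregator()
agg.insert('t1', 'acc1', 106.0)  # emits ('acc1', 1, 106.0)
agg.insert('t1', 'acc1', 107.0)  # emits ('acc1', 0, 0.0), then ('acc1', 1, 107.0)
print(agg.emitted)
```

The second `insert` produces the two rows from the example above: the undo takes acc1's count and sum to zero, and the apply brings them back up with the new amount.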

Why does KSQL do this? Well, to help understand what might at first look like strange behaviour, let's consider what happens when we add a few more rows:

-- When we insert with different tradeId, but same AccountId:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t2', 'acc1', 10.0);

-- Then the above select window outputs:
-- A single row, as the above is an insert of a new row, so there is no old value to undo
-- COUNT is now 2, as 2 source table rows contribute to the aggregate
-- SUM is now 117.0, the sum of the two source trades' Amounts
-- AccountId | Count | Total
   acc1 | 2 | 117.0

-- When we update the new trade to reference a different AccountId:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t2', 'acc3', 10.0);

-- Then the above select window outputs:
-- First KSQL undoes the old value for TradeId t2:
--   This drops the count of trades against acc1 to a single trade
--   And drops the sum of Amount for acc1 to the amount of that single trade
-- AccountId | Count | Total
   acc1 | 1 | 107.0
-- Then it applies the new value under the new AccountId:
--   This outputs a new row with AccountId acc3
--   With a single trade contributing to the aggregate, so a COUNT of 1 and a SUM of 10.0
-- AccountId | Count | Total
   acc3 | 1 | 10.0

Hopefully, the above example makes the behaviour seem a little less strange. Undoing the old value is important to ensure the aggregate values are correct. The behaviour only seems strange when the old and new values both affect the same aggregate row.

By default, KSQL buffers results for up to 2 seconds, or 10MB of data, before flushing them to Kafka. This is why you may see a slight delay in the output when inserting values in this example. If both output rows are buffered together, KSQL suppresses the first result, which is why you often do not see the intermediate row being output. The configurations commit.interval.ms and cache.max.bytes.buffering, which default to 2 seconds and 10MB respectively, can be used to tune this behaviour. Setting either of these to zero causes KSQL to always output all intermediate results.
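For example, in the KSQL CLI you can override both properties for the current session with `SET` before running the query (these are standard Kafka Streams configuration names; check your version's documentation for whether a `ksql.streams.` prefix is required):

```sql
-- Flush immediately and disable record caching, so every
-- intermediate result is emitted:
SET 'commit.interval.ms' = '0';
SET 'cache.max.bytes.buffering' = '0';
```

With both set to zero you should reliably see both the undo row and the new row for every update.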

We have a GitHub issue to enhance KSQL to make use of Kafka Streams' suppression functionality, which would give users better control over how results are materialized, and avoid the intermediate output when the old and new values affect the same aggregate row.