forked from saubury/wildlife-watch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
02_ksql.ksql
36 lines (33 loc) · 1.33 KB
/
02_ksql.ksql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-- Source stream over the 'objects' Kafka topic, whose values are JSON.
-- All three fields are declared as text; objects_count carries a JSON
-- document that the animals stream picks apart with EXTRACTJSONFIELD.
CREATE STREAM objects (
    camera_name   VARCHAR,
    objects_found VARCHAR,
    objects_count VARCHAR
) WITH (
    KAFKA_TOPIC = 'objects',
    VALUE_FORMAT = 'json'
);
-- Derived stream: reads the JSON text in objects_count and exposes the count
-- for each object label as its own BIGINT column, alongside the camera name
-- and the untouched objects_count text.
-- NOTE(review): a label missing from the JSON presumably yields NULL here --
-- confirm against the EXTRACTJSONFIELD documentation.
CREATE STREAM animals AS
SELECT
    camera_name,
    objects_count,
    CAST(EXTRACTJSONFIELD(objects_count, '$.person') AS BIGINT)    AS person,
    CAST(EXTRACTJSONFIELD(objects_count, '$.bird') AS BIGINT)      AS bird,
    CAST(EXTRACTJSONFIELD(objects_count, '$.horse') AS BIGINT)     AS horse,
    CAST(EXTRACTJSONFIELD(objects_count, '$.sheep') AS BIGINT)     AS sheep,
    CAST(EXTRACTJSONFIELD(objects_count, '$.elephant') AS BIGINT)  AS elephant,
    CAST(EXTRACTJSONFIELD(objects_count, '$.bear') AS BIGINT)      AS bear,
    CAST(EXTRACTJSONFIELD(objects_count, '$.zebra') AS BIGINT)     AS zebra,
    CAST(EXTRACTJSONFIELD(objects_count, '$.giraffe') AS BIGINT)   AS giraffe,
    CAST(EXTRACTJSONFIELD(objects_count, '$.teddybear') AS BIGINT) AS teddybear
FROM objects;
-- Windowed table: for each camera, the largest count seen for every object
-- label within each 30-second tumbling window of the animals stream.
CREATE TABLE zoo AS
SELECT
    camera_name,
    MAX(person)    AS max_person,
    MAX(bird)      AS max_bird,
    MAX(horse)     AS max_horse,
    MAX(sheep)     AS max_sheep,
    MAX(elephant)  AS max_elephant,
    MAX(bear)      AS max_bear,
    MAX(zebra)     AS max_zebra,
    MAX(giraffe)   AS max_giraffe,
    MAX(teddybear) AS max_teddybear
FROM animals
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY camera_name
EMIT CHANGES;
-- Alert stream: emits one fixed notification message for every animals row
-- whose teddybear count is greater than zero.
CREATE STREAM TEDDYTOPIC AS
SELECT
    '📢 Just saw a 🧸 TEDDY BEAR in the garden' AS message
FROM animals
WHERE teddybear > 0;