KTables are the way to go when working with state in Kafka Streams. A KTable is essentially a materialized view of a topic in which every message is an upsert of the previous record with the same key. (Read about the duality of streams and tables.)
A KTable eventually contains every change published to the underlying topic.
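The upsert semantics can be illustrated without Kafka at all. The following is a minimal sketch in plain Java, not the Kafka Streams API: the class name `UpsertTableSketch` and the sample records are made up for illustration, and the changelog is modelled as a simple list of key/value pairs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UpsertTableSketch {
    // Replays a changelog in order; a later record replaces an earlier one with the same key.
    static Map<String, String> materialize(List<String[]> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : changelog) {
            table.put(record[0], record[1]); // upsert: insert a new key or overwrite an existing one
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> changelog = List.of(
            new String[] {"alice", "Berlin"},
            new String[] {"bob", "Paris"},
            new String[] {"alice", "Ghent"}); // overwrites alice's earlier value
        System.out.println(materialize(changelog)); // {alice=Ghent, bob=Paris}
    }
}
```

The stream holds three records, but the table holds only two entries: one per key, with the latest value.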
Normal KTables only contain the data of the partitions consumed by the local Kafka Streams application instance. If you run N instances of your application, each KTable will contain roughly (total entries / N) entries. Every instance consumes a different set of Kafka partitions, resulting in different KTable content.
If you’d like to have the same state in every instance, you’ll have to use a GlobalKTable. Its API is very similar to KTable but it contains the full state. So every instance of your application will read all partitions of the underlying topic and thus eventually have the same state.
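The difference in content can be sketched with a small simulation. This is plain Java rather than Kafka Streams code, and it rests on a few assumptions: `String.hashCode()` stands in for Kafka's default partitioner (which hashes the serialized key with murmur2), partition `p` is assigned to instance `p % numInstances`, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TableDistributionSketch {
    // Stand-in for the real partitioner; only the "same key, same partition" property matters here.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Keys visible to one instance of a partitioned table: only those in its assigned partitions.
    static Set<String> localKeys(List<String> keys, int numPartitions, int numInstances, int instance) {
        Set<String> local = new HashSet<>();
        for (String key : keys) {
            // Assumed assignment: partition p belongs to instance p % numInstances.
            if (partitionOf(key, numPartitions) % numInstances == instance) {
                local.add(key);
            }
        }
        return local;
    }

    // A global table ignores the assignment: every instance reads every partition.
    static Set<String> globalKeys(List<String> keys) {
        return new HashSet<>(keys);
    }

    static List<String> sampleKeys(int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            keys.add("user-" + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = sampleKeys(1000);
        for (int instance = 0; instance < 3; instance++) {
            System.out.println("instance " + instance
                + ": partitioned=" + localKeys(keys, 6, 3, instance).size()
                + ", global=" + globalKeys(keys).size());
        }
    }
}
```

Each instance sees only a share of the keys in the partitioned case, and the shares add up to the full key set. In the global case every instance sees all of them.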
Scenarios in which to consider a GlobalKTable over a normal KTable:
- to avoid co-partitioning when you have to join a stream with multiple tables or two tables with each other. (See this example)
- the content of the table is limited in size or does not change very often
- the table key is not a good candidate for evenly distributed partitioning. When joining this table, some instances will have a much bigger workload than others. This makes it hard to scale the application.
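To make the co-partitioning point concrete: a join against a partitioned KTable only works if a key lands in the same partition number in both topics, because a task reads partition i of the stream topic together with partition i of the table topic. The sketch below is plain Java with hypothetical names, again using `String.hashCode()` as a stand-in partitioner. It counts the keys that would end up in different partition numbers when the two topics have different partition counts.

```java
import java.util.ArrayList;
import java.util.List;

final class CoPartitioningSketch {
    // Stand-in partitioner; the real default partitioner uses murmur2 on the serialized key.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts keys whose stream record and table record land in different partition numbers.
    static long misplacedKeys(List<String> keys, int streamPartitions, int tablePartitions) {
        return keys.stream()
            .filter(k -> partitionOf(k, streamPartitions) != partitionOf(k, tablePartitions))
            .count();
    }

    static List<String> sampleKeys(int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            keys.add("k" + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = sampleKeys(100);
        System.out.println("same partition count:      " + misplacedKeys(keys, 4, 4));
        System.out.println("different partition count: " + misplacedKeys(keys, 4, 3));
    }
}
```

With equal partition counts no key is misplaced. With unequal counts some keys are, and a local lookup for them would miss. A GlobalKTable sidesteps the problem because every instance holds all partitions of the table.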
Some remarks when you work with GlobalKTables:
- GlobalKTables are fully populated before actual data processing starts (details). If your underlying topic is big, this might take a while. It makes sense to configure log compaction on that topic.
- Think twice about whether you can lose messages because of the topic's retention policy, or lose a non-durable state store after an application crash. The state of a GlobalKTable is assembled from one of the following two sources, so make sure the relevant data is available:
  - all the messages of the underlying topic, or
  - the state store plus the unread messages of the topic (the lag)
- As of release 0.11, global tables checkpoint their offsets. This improves restart time if you use a durable state store. A durable store is the default, but it doesn’t help if you run your application inside a container without a mounted volume.
- GlobalKTables are populated in a separate thread (`client_id-GlobalStreamThread`) by a consumer without a group. (See source code) So each consumer will read all messages on the topic.
- When you join a KStream with a GlobalKTable, messages with a `null` key or value are ignored and do not trigger a join. Make sure your input stream contains a key. If not, first map your stream and then join. (Read more)
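```java
stream
    // re-key each record by the id taken from its value, so the join sees a non-null key
    .map((k, v) -> new KeyValue<>(v.getId(), v))
    .leftJoin(...)
```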
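The effect of re-keying before the join can be modelled in plain Java. This is only a sketch of the behaviour described above, not the Kafka Streams implementation. The names are hypothetical, the table is a simple `Map`, and values are assumed to have the made-up form `id:payload` so that an id can be extracted from them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class NullKeyJoinSketch {
    // Helper that allows null keys and values, unlike Map.entry(...).
    static Map.Entry<String, String> record(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Left join against a table; records with a null key or value are skipped, as described above.
    static List<String> leftJoin(List<Map.Entry<String, String>> stream, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> r : stream) {
            if (r.getKey() == null || r.getValue() == null) {
                continue; // ignored: does not trigger a join
            }
            out.add(r.getValue() + "+" + table.get(r.getKey())); // table side is null when there is no match
        }
        return out;
    }

    // Re-keys each record by the id prefix of its value (assumed format "id:payload").
    static List<Map.Entry<String, String>> rekey(List<Map.Entry<String, String>> stream) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : stream) {
            String value = r.getValue();
            if (value == null || value.indexOf(':') < 0) {
                out.add(r); // nothing to extract, keep the record unchanged
            } else {
                out.add(record(value.substring(0, value.indexOf(':')), value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> customers = Map.of("42", "alice");
        List<Map.Entry<String, String>> orders = List.of(record(null, "42:order-a"));
        System.out.println(leftJoin(orders, customers));        // []
        System.out.println(leftJoin(rekey(orders), customers)); // [42:order-a+alice]
    }
}
```

Without the re-keying step the record is dropped silently, which is easy to miss because no error is raised.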