Kafka Streams Reduce vs Suppress

While reading the suppress() documentation, I saw that the time window will not advance unless records are being published to the topic, because suppression is driven by event time (stream time). Right now my code outputs the final value for each key because traffic on the topic is constant, but there are downtimes when that upstream system is brought down, which leaves the existing records in the state store "frozen". I was wondering what the difference is between using reduce() alone and reduce().suppress(). Does reduce() act like suppress() in that both are event-time driven? My understanding is that both are doing the same thing: aggregating the keys within a certain time window.

My topology is the following:

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
    final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
    keySpecificAvroSerde.configure(serdeConfig, true);
    final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
    valueSpecificAvroSerde.configure(serdeConfig, false);

    // KStream<EligibilityKey, Eligibility>
    KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
            Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    // KStream<EligibilityKey, String>
    KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
            .mapValues((key, value) -> Processor.process(key, value));

    // WindowBytesStoreSupplier
    WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
            Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);

    // Materialized
    // Note: reassigning with Materialized.with(...) would discard the
    // windowBytesStoreSupplier configured above, so the serdes are set
    // on the same Materialized instance instead.
    Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
            .<EligibilityKey, String>as(windowBytesStoreSupplier)
            .withKeySerde(keySpecificAvroSerde)
            .withValueSerde(Serdes.String());

    // TimeWindows
    TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
            .grace(Duration.ofSeconds(afterWindowEnd));

    // KTable<Windowed<EligibilityKey>, String>
    KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
            .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
            .reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

    // KStream<Windowed<EligibilityKey>, String>
    KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();
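For comparison, here is a minimal sketch (reusing the stream, serdes, and windows defined above; the `foreach` print is only illustrative) of the same aggregation without `suppress()`. The key difference: `reduce()` alone emits an updated, non-final result downstream for every incoming record (subject to record-cache flushing), while `reduce().suppress(untilWindowCloses(...))` buffers those updates and emits exactly one final result per window and key, and only after stream time, which advances solely via new incoming records, passes the window end plus the grace period. Neither is driven by wall-clock time, which is why results appear "frozen" when the input topic goes quiet.

```java
// Hypothetical comparison: the same windowed reduce WITHOUT suppress().
// Every incoming record produces an updated (non-final) result downstream,
// flushed per the record cache -- not one result per closed window.
KTable<Windowed<EligibilityKey>, String> kTableEager = kStreamMapValues
        .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String()))
        .windowedBy(timeWindows)
        .reduce((a, b) -> b); // keep the latest value per key per window

// May emit several updates for the same window key as records arrive.
kTableEager.toStream().foreach((windowedKey, value) ->
        System.out.println(windowedKey + " -> " + value));
```

So `reduce()` and `suppress()` are not interchangeable: `reduce()` defines what is aggregated per window, while `suppress()` only controls when the result is forwarded downstream.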


Read more here: https://stackoverflow.com/questions/64896979/kafka-streams-reduce-vs-suppress

Content Attribution

This content was originally published by mikeayonguyen at Recent Questions - Stack Overflow, and is syndicated here via their RSS feed. You can read the original post over there.
