Member stream consumer failed, removing it from the group

I have a Streams application that throws the following exception:

Exception in thread "<StreamsApp>-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=topic1, partition=0, offset=1
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {...} timestamp 1530812658459) to topic topic2 due to Failed to update metadata after 60000 ms.
You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.

In the Streams app I have the following configs:

props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 5);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 300000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 300000);

I checked the Kafka broker logs (I have a single broker) and see the following entries related to this:

INFO [GroupCoordinator 1001]: Member <StreamsApp>-StreamThread-1-consumer-49d0a5b3-be2a-4b5c-a4ab-ced7a2484a02 in group <StreamsApp> has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-03 14:39:23,893] INFO [GroupCoordinator 1001]: Preparing to rebalance group <StreamsApp> with old generation 1 (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)
[2018-07-03 14:39:23,893] INFO [GroupCoordinator 1001]: Group <StreamsApp> with generation 2 is now empty (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)

I read somewhere that this is related to the consumer not calling poll() for a long time, so the group coordinator kicks it out; newer consumers use a background heartbeat as the failure-detection protocol instead. I am not sure this can be the reason, since I am using Kafka version 1.1.0 and Streams version 1.1.0 as well.
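If the missed-poll theory is right, I suppose I could raise the consumer-side timeouts the same way I prefix the producer configs above. A sketch of what I have in mind (the values here are guesses on my part, not tested recommendations):

```java
// Consumer-side overrides via StreamsConfig.consumerPrefix (values are guesses):
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000); // allow up to 10 min between poll() calls
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);    // heartbeat-based failure detection window
```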

How can I avoid this failure scenario? For now I have to restart the Streams application every time this happens.

UPDATE-1:

I am trying to handle this StreamsException by enclosing main in a try-catch block, but I can't catch the exception. What can be the reason for this, and how can I catch it and exit the application? Currently the Streams app is in a halted state and not doing anything after this exception.

UPDATE-2:

Following this and this I have updated my code to this:

final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    log.error("EXITING");
    log.error(throwable.getMessage());
    streams.close(5, TimeUnit.SECONDS);
    latch.countDown();
    System.exit(-1);
});

Now the exception is handled and logged. However, the Streams app does not exit (it is still hanging in the terminal). Ctrl+C doesn't kill it; I have to find the pid of the process and call kill on it.
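One guess: the handler runs on the dying StreamThread itself, and streams.close() waits for the stream threads to finish, so it may be blocking on the very thread that is executing the handler. Moving the shutdown onto a fresh thread might avoid that; a sketch of what I plan to try (same fields as above):

```java
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    log.error("Stream thread " + thread.getName() + " died", throwable);
    // Run the shutdown off the dying StreamThread so close() cannot
    // end up waiting on the thread that is executing this handler.
    new Thread(() -> {
        streams.close(5, TimeUnit.SECONDS); // bounded wait
        latch.countDown();
        System.exit(-1);
    }).start();
});
```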



Read more here: https://stackoverflow.com/questions/51239141/member-stream-consumer-failed-removing-it-from-group

Content Attribution

This content was originally published by el323 at Recent Questions - Stack Overflow, and is syndicated here via their RSS feed. You can read the original post over there.
