Kafka: Handling Poison Pills

Kafka is a distributed event log: messages are appended and processed in the order they were added (within a partition). A poison pill is a message that our application is unable to process. It can cause our consumer to fail over and over while new messages keep arriving, so consumer lag grows and the throughput we expect suffers.

There are cases where a corrupt message stopping further consumption may be a good thing, for example when the order of the messages is important to the consumer. In that case we can tolerate the disruption.

On the other hand, if ordering does not matter, there’s no reason for one bad message that our app can’t process to hold up other messages that it can process; high throughput is likely why you chose Kafka in the first place.

In this article we’ll go over how to achieve this by using retry and dead letter topics to prevent congestion. We’ll use spring-kafka, as it provides built-in support for these features.

Use case

Say we have an app that receives messages on the topic customer-ratings. Two services consume from this one topic: one updates the rating of the given product, and the other updates customer loyalty, where we keep track of customers’ reviews so we can reward them.

@Bean
public NewTopic topic() {
  return TopicBuilder.name("customer-ratings")
      .partitions(10)
      .replicas(1)
      .build();
}

Hands-on example

I have created a simple Spring application that uses spring-kafka to receive messages and update the ratings of a given product.
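For context, here is a minimal sketch of the shape of that application. This is my reconstruction rather than the original source: the class name matches the DlqDemoApplication that appears in the logs further down, and the broker address is assumed to come from the usual spring.kafka.bootstrap-servers property.

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// A sketch of the application class the following snippets live in.
@SpringBootApplication
public class DlqDemoApplication {

	// the snippets below call log.info(...), so we need a logger field
	private static final Logger log = LoggerFactory.getLogger(DlqDemoApplication.class);

	public static void main(String[] args) {
		SpringApplication.run(DlqDemoApplication.class, args);
	}

	// the topic bean, listeners and ApplicationRunner from the following
	// sections are declared as members of this class
}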

To make our app resilient we’ll allow retries, configuring which topics to use for retries and where to send messages that still fail.

@RetryableTopic(retryTopicSuffix = "-product-ratings-app-retry", dltTopicSuffix = "-product-ratings-app-dlt", attempts = "3")
@KafkaListener(id = "product-ratings-app", topics = "customer-ratings")
public void productRatingsApp(String in) {
	log.info("product-ratings-app: {}", in);
}

@RetryableTopic(retryTopicSuffix = "-customer-loyalty-app-retry", dltTopicSuffix = "-customer-loyalty-app-dlt", attempts = "3")
@KafkaListener(id = "customer-loyalty-app", topics = "customer-ratings")
public void customerLoyaltyApp(String in) {
	log.info("customer-loyalty-app: {}", in);
}

With the above, we have set up two consumers for the customer-ratings topic. In each, we define the suffixes to use for the retry and dead letter topics, adding the app name so the topic names are more meaningful. Finally, we set the maximum number of attempts to 3, which is also the default. The number of attempts determines how many retry topics are created: with 3 attempts we expect 2 additional retry topics per app (the first attempt reads the message from customer-ratings, and the 2 retries read from the retry topics).

If we run the app, we see the new topics created: alongside customer-ratings, each consumer now gets a retry topic per retry attempt plus a dead letter topic. For customer-loyalty-app these are customer-ratings-customer-loyalty-app-retry-0, customer-ratings-customer-loyalty-app-retry-1 and customer-ratings-customer-loyalty-app-dlt, with the equivalents for product-ratings-app.
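As an aside, spring-kafka waits a short fixed delay between attempts by default. If we wanted to space the retries out, @RetryableTopic also takes a backoff attribute; the snippet below is only a sketch of that option and is not part of the demo:

// A sketch only (not used in the demo): exponential backoff between retries,
// so the first retry waits roughly 1s and the second roughly 2s.
// @Backoff comes from spring-retry (org.springframework.retry.annotation.Backoff).
@RetryableTopic(
		retryTopicSuffix = "-customer-loyalty-app-retry",
		dltTopicSuffix = "-customer-loyalty-app-dlt",
		attempts = "3",
		backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(id = "customer-loyalty-app", topics = "customer-ratings")
public void customerLoyaltyApp(String in) {
	log.info("customer-loyalty-app: {}", in);
}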

Now let’s say we have a bug in our customer-loyalty-app. We expect only the failed message to be retried, and if it still cannot be processed, it should be sent to the dead letter topic. We’ll update the consumer as shown below to trigger an error.

@RetryableTopic(retryTopicSuffix = "-customer-loyalty-app-retry", dltTopicSuffix = "-customer-loyalty-app-dlt", attempts = "3")
@KafkaListener(id = "customer-loyalty-app", topics = "customer-ratings")
public void customerLoyaltyApp(String in) {
	log.info("customer-loyalty-app: {}", in);
	if (Objects.equals(in, "test2")) {
		throw new IllegalStateException("don't feel like processing today");
	}
}

Then we’ll add an ApplicationRunner bean to send a few messages to our topic.

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
	return args -> {
		template.send("customer-ratings", "test1");
		template.send("customer-ratings", "test2");
		template.send("customer-ratings", "test3");
	};
}

If we run the app now, the logs show customer-loyalty-app attempting test2 three times: once from customer-ratings and twice more from the retry topics.

INFO 17316 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 17316 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.6.1
INFO 17316 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 5e3c2b738d253ff5
INFO 17316 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1708629872653
INFO 17316 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: Cot0DhGjTwWvzlPfvxmNtA
INFO 17316 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 8015 with epoch 0
INFO 17316 --- [tings-app-0-C-1] com.example.demo.DlqDemoApplication      : product-ratings-app: test1
INFO 17316 --- [yalty-app-0-C-1] com.example.demo.DlqDemoApplication      : customer-loyalty-app: test1
INFO 17316 --- [yalty-app-0-C-1] com.example.demo.DlqDemoApplication      : customer-loyalty-app: test2
INFO 17316 --- [tings-app-0-C-1] com.example.demo.DlqDemoApplication      : product-ratings-app: test2
INFO 17316 --- [tings-app-0-C-1] com.example.demo.DlqDemoApplication      : product-ratings-app: test3
WARN 17316 --- [yalty-app-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer    : Destination resolver returned non-existent partition customer-ratings-customer-loyalty-app-retry-0-3, KafkaProducer will determine partition to use for this topic
INFO 17316 --- [yalty-app-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-customer-loyalty-app-7, groupId=customer-loyalty-app] Seeking to offset 2 for partition customer-ratings-3
INFO 17316 --- [p-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-customer-loyalty-app-customer-loyalty-app-retry-0-4, groupId=customer-loyalty-app-customer-loyalty-app-retry-0] Seeking to offset 2 for partition customer-ratings-customer-loyalty-app-retry-0-0
INFO 17316 --- [yalty-app-0-C-1] com.example.demo.DlqDemoApplication      : customer-loyalty-app: test3
INFO 17316 --- [p-retry-0-0-C-1] com.example.demo.DlqDemoApplication      : customer-loyalty-app: test2
INFO 17316 --- [p-retry-1-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-customer-loyalty-app-customer-loyalty-app-retry-1-6, groupId=customer-loyalty-app-customer-loyalty-app-retry-1] Seeking to offset 2 for partition customer-ratings-customer-loyalty-app-retry-1-0
INFO 17316 --- [p-retry-1-0-C-1] com.example.demo.DlqDemoApplication      : customer-loyalty-app: test2
ERROR 17316 --- [p-retry-1-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = customer-ratings-customer-loyalty-app-retry-1, partition = 0, offset = 2, main topic = customer-ratings threw an error at topic customer-ratings-customer-loyalty-app-retry-1 and won't be retried. Sending to DLT with name customer-ratings-customer-loyalty-app-dlt.

The last line tells us the retries have been exhausted and the message is now being sent to the dead letter topic: "customer-ratings threw an error at topic customer-ratings-customer-loyalty-app-retry-1 and won't be retried. Sending to DLT with name customer-ratings-customer-loyalty-app-dlt."
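We don’t consume from the dead letter topic in this demo, but if we wanted to see or act on what lands there, spring-kafka lets us add a @DltHandler method to the same class as the @RetryableTopic listener. A minimal sketch, assuming the same String payloads:

// A sketch only: invoked for records that end up in this listener's DLT.
// @DltHandler is org.springframework.kafka.annotation.DltHandler, @Header comes
// from spring-messaging and KafkaHeaders from org.springframework.kafka.support.
@DltHandler
public void handleDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
	log.error("dead-lettered message '{}' from {}", in, topic);
}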

Then, if we fix the bug and want to reprocess the failed messages, we would not send them back to customer-ratings, because we don’t want every app to consume the same message again; only the one that failed, i.e. customer-loyalty-app, should consume it.

Instead, we can create a reprocess topic for each consumer and send the message to the specific reprocess topic we want, as shown below.

@Bean
public NewTopic productRatingReprocess() {
	return TopicBuilder.name("customer-ratings.product-ratings-app.reprocess")
			.partitions(10)
			.replicas(1)
			.build();
}

@Bean
public NewTopic customerLoyaltyReprocess() {
	return TopicBuilder.name("customer-ratings.customer-loyalty-app.reprocess")
			.partitions(10)
			.replicas(1)
			.build();
}
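
The reprocess topics above aren’t wired to a consumer yet, so here is a sketch of what the customer-loyalty side could look like (the listener id and method name are my own): a listener on its reprocess topic that reuses the existing processing logic.

// A sketch, not part of the original app: only the customer-loyalty consumer
// reads from its reprocess topic, so re-sent messages don't reach product-ratings-app.
@KafkaListener(id = "customer-loyalty-app-reprocess",
		topics = "customer-ratings.customer-loyalty-app.reprocess")
public void customerLoyaltyReprocess(String in) {
	// delegate to the same handling as the main listener
	customerLoyaltyApp(in);
}

Re-sending the fixed message is then just a template.send("customer-ratings.customer-loyalty-app.reprocess", "test2") with the same KafkaTemplate we used in the runner.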

That’s it. We can now keep a single bad message from crashing our app or holding up the rest of the stream. Thanks for reading.