Women in Technology

Women in Tech is a publication to highlight women in STEM, their accomplishments, career lessons, and stories.

How to avoid duplicate data in your Kafka streaming application

Recently, I had to solve an interesting issue at work in our Kafka streaming application: de-duplication.

One part of the pipeline reads messages from Kafka, decodes them, aggregates them, and uploads the result to object storage.

1) Read protobuf messages from Kafka in batches
2) Decode the messages in batches
3) Apply some logic to the messages
4) Write files locally
5) Upload the local files
6) Commit the Kafka consumer offsets
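
To make the ordering of these steps concrete, here is a minimal sketch of such a loop. It is not our production code: `run_pipeline`, `decode`, `process`, and `upload` are hypothetical names, the write and upload steps are collapsed into one callable, and the consumer is only assumed to offer `poll()` and `commit()` the way kafka-python's `KafkaConsumer` does.

```python
from typing import Callable, List


def run_pipeline(
    consumer,
    decode: Callable[[bytes], object],
    process: Callable[[List[object]], bytes],
    upload: Callable[[bytes], None],
    max_batches: int,
) -> int:
    """Run read -> decode -> process -> upload -> commit for up to max_batches polls.

    Returns the number of batches that were committed.
    """
    committed = 0
    for _ in range(max_batches):
        # 1) Read a batch. poll() is assumed to return {partition: [records]}.
        batches = consumer.poll(timeout_ms=1000)
        records = [rec for recs in batches.values() for rec in recs]
        if not records:
            continue
        # 2) Decode each message, then 3) apply the business logic.
        messages = [decode(rec.value) for rec in records]
        payload = process(messages)
        # 4) + 5) Write and upload (hypothetical single callable).
        upload(payload)
        # 6) Commit only after the upload has succeeded.
        consumer.commit()
        committed += 1
    return committed
```

The important design choice is that `commit()` is the last call in the loop body: if anything before it raises, the offsets stay where they were and the batch is read again after a restart.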

Problem:

Our application could not tolerate data loss, so we replaced auto-commit with a manual commit that runs only after the data has been fully processed and uploaded. This means the data is processed at least once. At times when the pipeline was unstable, we saw more data than expected. For example, if the process dies after the upload in step 5 but before the commit in step 6, the restarted process reads the already-uploaded records again, resulting in duplicate data.
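
The failure mode can be reproduced with a small in-memory model. This is only an illustration under simplifying assumptions: the topic is a Python list, the object storage is another list, and `consume` is a hypothetical stand-in for steps 1 to 6.

```python
def consume(log, committed_offset, storage, crash_before_commit=False):
    """Process every record after committed_offset and return the new committed offset."""
    batch = log[committed_offset:]
    storage.extend(batch)  # step 5: the upload succeeds
    if crash_before_commit:
        # The process dies before step 6, so the offset is never advanced.
        return committed_offset
    return committed_offset + len(batch)  # step 6: commit


log = ["r0", "r1", "r2"]
storage = []
offset = 0

# First run: upload succeeds, then the process crashes before the commit.
offset = consume(log, offset, storage, crash_before_commit=True)
# Restart: the consumer resumes from the last committed offset, which is still 0.
offset = consume(log, offset, storage)

print(storage)
```

After the restart every record is in storage twice, even though nothing was lost.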

Cause of the issue: dynamic partition assignment (subscribe)

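from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers='hostname:port',
    group_id='my-consumer-group',  # placeholder name; group-managed assignment needs a group id
    enable_auto_commit=False,      # offsets are committed manually after the upload
)
# list_of_topic_names is a list of topic name strings
consumer.subscribe(topics=list_of_topic_names)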

Initially, we had n consumers running on Kubernetes, connected to the Kafka broker via subscribe. During a Kubernetes upgrade (canary deployment), the pods restarted one by one, which caused Kafka to repeatedly rebalance the partitions across the consumers. This let the topic lag build up and produced a lot of duplicates.
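
To see why one restarting pod disturbs the others, consider a toy model of a rebalance. This is not Kafka's actual assignor, only a simplified round-robin over the members that are currently alive; `round_robin` and `moved_partitions` are hypothetical helpers.

```python
def round_robin(partitions, members):
    """Toy assignor: partition p goes to members[p % len(members)]."""
    members = sorted(members)
    return {p: members[p % len(members)] for p in partitions}


def moved_partitions(before, after):
    """Partitions whose owner differs between two assignments."""
    return {p for p in before if before[p] != after[p]}


partitions = range(6)
before = round_robin(partitions, ["pod-0", "pod-1", "pod-2"])
after = round_robin(partitions, ["pod-0", "pod-2"])  # pod-1 is restarting

moved = moved_partitions(before, after)
# Partitions that changed owner even though their original owner never went down.
collateral = {p for p in moved if before[p] != "pod-1"}

print(sorted(moved), sorted(collateral))
```

In this model, partitions owned by healthy pods change hands too. Any uncommitted work on those partitions is then redone by the new owner, and a rolling upgrade repeats this once per pod.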

Solution:

Part 1: Mitigate topic lag build-up via static partitioning:

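from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='hostname:port',
)
topic = "test-123"
# partitions_for_topic() returns the set of partition ids known for the topic
number_of_partitions = len(consumer.partitions_for_topic(topic))
PARTITIONS = []

# pod_ordinal_number (this pod's index) and number_of_consumers (total pods)
# are not defined in this snippet; they are expected to come from the deployment.
# Each pod takes every number_of_consumers-th partition, starting at its own ordinal.
for partition in range(pod_ordinal_number, number_of_partitions, number_of_consumers):
    PARTITIONS.append(TopicPartition(topic, partition))

# assign() bypasses group-managed assignment, so no rebalance is triggered
consumer.assign(PARTITIONS)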
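
The partition arithmetic in the loop above can be isolated and checked without a broker. The sketch below assumes the pods run as a Kubernetes StatefulSet, so that each pod name ends in its ordinal (for example `consumer-3`); that is an assumption for illustration, and `pod_ordinal` and `partitions_for_pod` are hypothetical helper names.

```python
import re
from typing import List


def pod_ordinal(hostname: str) -> int:
    """Extract the trailing ordinal from a StatefulSet-style pod name."""
    match = re.search(r"-(\d+)$", hostname)
    if match is None:
        raise ValueError(f"no ordinal suffix in hostname {hostname!r}")
    return int(match.group(1))


def partitions_for_pod(
    ordinal: int, number_of_partitions: int, number_of_consumers: int
) -> List[int]:
    """Return the partition ids a pod owns: ordinal, ordinal + n, ordinal + 2n, ..."""
    if not 0 <= ordinal < number_of_consumers:
        raise ValueError("ordinal must be in [0, number_of_consumers)")
    return list(range(ordinal, number_of_partitions, number_of_consumers))


# Example: 10 partitions spread over 3 pods.
for name in ["consumer-0", "consumer-1", "consumer-2"]:
    print(name, partitions_for_pod(pod_ordinal(name), 10, 3))
```

Because the split depends only on the ordinal and the two counts, every pod computes the same disjoint layout on its own, and a restarted pod comes back to exactly the partitions it had before.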

By implementing static partitioning, where each consumer is manually assigned its partitions, we got rid of the lag build-up that frequent rebalancing caused on all partitions. Now, when a consumer goes down, it doesn’t affect the other consumers. Hence, lag doesn’t build up on their partitions, and only the restarted pods can produce duplicates.
This solution got us 70% towards a…
