Messages duplicated after container restart #809

@yakhyadabo

Subject of the issue

I deploy Brooklin with Kubernetes to migrate data from one Kafka cluster to another.
For some reason, the container dies and restarts during the migration. At the end of the migration, I noticed that there are more messages in the target cluster than in the source cluster. Most of the messages are duplicated.

I’d like to know how I can ensure that no message is duplicated within the same datastream, even when the pod restarts after a failure.

The datastream:

${BROOKLIN_HOME}/bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n test-mirroring-stream -s "kafkassl://kafka.kafka-non-prod:9092/.*" -c kafkaMirroringC -t kafkaTP -m '{"owner":"test-user","system.reuseExistingDestination":"false"}'

Config:

 ############################# Server Basics #############################
        brooklin.server.coordinator.cluster=brooklin-cluster
        brooklin.server.coordinator.zkAddress=zookeeper:2181/brooklin
        brooklin.server.httpPort=${HTTP_PORT}
        brooklin.server.connectorNames=kafkaC,kafkaMirroringC
        brooklin.server.transportProviderNames=kafkaTP
        brooklin.server.csvMetricsDir=/tmp/brooklin-example/

 ########################### Kafka connector Configs ######################
        brooklin.server.connector.kafkaC.factoryClassName=com.linkedin.datastream.connectors.kafka.KafkaConnectorFactory
        brooklin.server.connector.kafkaC.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
        brooklin.server.connector.kafkaC.consummer.enable.auto.commit=false
        brooklin.server.connector.kafkaC.consummer.auto.offset.reset=latest
        brooklin.server.connector.kafkaC.retryCount=2147483647
        brooklin.server.connector.kafkaC.retrySleepDurationMs=2500

 ########################### Kafka Mirroring connector Configs ###################### (Source properties)
        brooklin.server.connector.kafkaMirroringC.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
        brooklin.server.connector.kafkaMirroringC.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
        brooklin.server.connector.kafkaMirroringC.isFlushlessModeEnabled=true
        brooklin.server.connector.kafkaMirroringC.flowControlEnabled=true
        brooklin.server.connector.kafkaMirroringC.maxInFlightMessagesThreshold=1
        brooklin.server.connector.kafkaMirroringC.minInFlightMessagesThreshold=1

 ########################### Kafka transport provider configs ###################### (Target properties)
        brooklin.server.transportProvider.kafkaTP.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory
        brooklin.server.transportProvider.kafkaTP.bootstrap.servers=kafka-headless.da-saas-rss-env-region.svc.cluster.local:9092
        brooklin.server.transportProvider.kafkaTP.zookeeper.connect=zookeeper:2181/kafka
        brooklin.server.transportProvider.kafkaTP.client.id=datastream-producer
        brooklin.server.transportProvider.kafkaTP.retries=2147483647
        brooklin.server.transportProvider.kafkaTP.acks=all
...

Your environment

  • Operating System: Alpine
  • Brooklin version: 1.0.2
  • Java version: openjdk:8u212
  • Kafka version: 2.5.0
  • ZooKeeper version: 3.6.1

Steps to reproduce

  1. Create a datastream in a pod.
  2. Restart the container in the pod.

Expected behaviour

The datastream should resume without duplicating any message that has already been copied.

Actual behaviour

The datastream doesn’t resume; most of the messages are duplicated.
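
To put a number on "most of the messages are duplicated", the comparison between the two clusters could be sketched roughly as below. This is a minimal illustration, not part of Brooklin: it assumes each record read back from the target cluster can be traced to a source topic, partition, and offset, and the `SourceId` type, the `count_duplicates` helper, and the sample data are all hypothetical. How that identifier is obtained (a header, a key, or the payload itself) is left open.

```python
from collections import Counter
from typing import Dict, Iterable, NamedTuple


class SourceId(NamedTuple):
    """Hypothetical identifier of a record in the source cluster."""
    topic: str
    partition: int
    offset: int


def count_duplicates(target_records: Iterable[SourceId]) -> Dict[str, int]:
    """Summarise how many target records repeat an already-copied source record."""
    counts = Counter(target_records)
    total = sum(counts.values())   # every record found in the target
    unique = len(counts)           # distinct source records among them
    return {"total": total, "unique": unique, "duplicates": total - unique}


# Made-up sample: offsets 0 and 1 are copied a second time, as could
# happen if mirroring restarted from an older position.
sample = [
    SourceId("orders", 0, 0),
    SourceId("orders", 0, 1),
    SourceId("orders", 0, 0),
    SourceId("orders", 0, 1),
    SourceId("orders", 0, 2),
]
summary = count_duplicates(sample)
print(summary)
```

If `unique` matches the source message count while `total` exceeds it, the surplus consists entirely of re-sent records rather than corrupted or extra data.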
