I'm seeing some strange behavior.
My Confluent Kafka Connect S3 sink connector processes around 8K messages/sec. Every few days, at seemingly random times, the following happens:
- The Fetcher class tries to fetch an offset (e.g. 158473047).
- It gets "Fetch offset 158473047 is out of range for partition", logs it, and resets the offset to ZERO.
- When I search for that offset in __consumer_offsets, it is there, so this is not a case of my offset having been removed from the __consumer_offsets topic.
- As a result, my Kafka connector reprocesses a few hundred million messages.
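To make the sequence above concrete, here is a minimal sketch of how I understand the consumer's decision. It is a simplified model, not the actual Fetcher code: the function name, its parameters, and the sample log end offset are made up for illustration. As far as I know, Kafka Connect sink tasks default the consumer's auto.offset.reset to "earliest", which would explain the jump to offset 0.

```python
def resolve_fetch_offset(fetch_offset, log_start_offset, log_end_offset,
                         auto_offset_reset="earliest"):
    """Simplified model: return the offset the consumer continues from."""
    # The fetch offset is valid if it lies inside the range the broker
    # currently reports for the partition.
    if log_start_offset <= fetch_offset <= log_end_offset:
        return fetch_offset
    # Otherwise the broker answers "offset out of range" and the consumer
    # applies its reset policy.
    if auto_offset_reset == "earliest":
        return log_start_offset
    if auto_offset_reset == "latest":
        return log_end_offset
    raise ValueError("no reset policy configured for out-of-range offset")


# Hypothetical broker state: the partition leader reports a log end offset
# (158473000, an assumed value) that is lower than the offset being fetched.
print(resolve_fetch_offset(158473047, 0, 158473000))  # 0
# Normal case: the fetch offset is inside the reported range.
print(resolve_fetch_offset(158473047, 0, 160000000))  # 158473047
```

The point of the sketch is that the committed offset in __consumer_offsets never enters the decision. What matters is the offset range the partition leader reports at the moment of the fetch.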
Kafka Connect S3 log:
[2020-05-11 05:18:41,372] INFO WorkerSinkTask{id=datalake-m3-to-s3-TEST-bulk-0} Committing offsets asynchronously using sequence number 16703: {M3.TEST-BULK-0=OffsetAndMetadata{offset=158473001, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:345)
[2020-05-11 05:18:42,020] INFO Opening record writer for: M3BULK/M3.TEST-BULK/year=2017/month=05/day=15/hour=18/M3.TEST-BULK+0+0158473001.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider:69)
[2020-05-11 05:18:42,033] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Fetch offset 158473047 is out of range for partition M3.TEST-BULK-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1256)
[2020-05-11 05:18:42,035] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Resetting offset for partition M3.TEST-BULK-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:385)
[2020-05-11 05:18:42,174] INFO Opening record writer for: M3BULK/M3.TEST-BULK/year=1970/month=01/day=01/hour=00/M3.TEST-BULK+0+0000000000.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider:69)
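To find every occurrence in the Connect worker log, something like the following could help. It is a rough sketch that assumes the two message formats shown in the log above; the function name find_offset_resets is my own.

```python
import re

# Patterns taken from the Fetcher and SubscriptionState lines in the log.
OUT_OF_RANGE = re.compile(
    r"Fetch offset (\d+) is out of range for partition (\S+), resetting offset")
RESET_TO = re.compile(r"Resetting offset for partition (\S+) to offset (\d+)\.")


def find_offset_resets(lines):
    """Return (partition, out_of_range_offset, reset_offset) tuples."""
    pending = {}  # partition -> offset reported as out of range
    events = []
    for line in lines:
        match = OUT_OF_RANGE.search(line)
        if match:
            pending[match.group(2)] = int(match.group(1))
            continue
        match = RESET_TO.search(line)
        if match and match.group(1) in pending:
            partition = match.group(1)
            events.append((partition, pending.pop(partition), int(match.group(2))))
    return events


sample = [
    "[2020-05-11 05:18:42,033] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Fetch offset 158473047 is out of range for partition M3.TEST-BULK-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1256)",
    "[2020-05-11 05:18:42,035] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Resetting offset for partition M3.TEST-BULK-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:385)",
]
print(find_offset_resets(sample))  # [('M3.TEST-BULK-0', 158473047, 0)]
```

Comparing the timestamps of the matches against the broker logs (leader changes, segment deletions) might show what the partition leader was doing when each reset happened.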
Cluster Components:
- Kafka Build: confluent-community-2.12
- Installation Guide: https://docs.confluent.io/5.4.2/installation/installing_cp/rhel-centos.html
- Kafka node count: 3 (CentOS 7, 4 Cores, 16 GB RAM, 250 GB SSD each)
Kafka Broker server.properties content:
auto.create.topics.enable=true
broker.id=1
compression.type=producer
confluent.support.metrics.enable=false
controlled.shutdown.enable=true
delete.topic.enable=true
group.initial.rebalance.delay.ms=3000
default.replication.factor=2
group.max.session.timeout.ms=900000
log.retention.check.interval.ms=60000
log.retention.hours=24
log.segment.bytes=67108864
log.segment.delete.delay.ms=1000
num.io.threads=8
num.network.threads=6
num.partitions=1
num.recovery.threads.per.data.dir=4
offsets.topic.replication.factor=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000
log.dirs=/data/kafka-data
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://KAFKA01.CONFLUENT.LOCAL:9092
zookeeper.connect=KAFKA01.CONFLUENT.LOCAL:2181,KAFKA02.CONFLUENT.LOCAL:2181,KAFKA03.CONFLUENT.LOCAL:2181
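For a sense of scale, here is a back-of-the-envelope calculation using log.retention.hours=24 from the config above and the roughly 8K messages/sec mentioned earlier. It assumes producers write at about the same rate the connector consumes and that the rate is steady, neither of which I have verified. The helper names are only for illustration.

```python
def retention_window_messages(rate_per_sec, retention_hours):
    """Messages that fit into the retention window at a steady rate."""
    return rate_per_sec * retention_hours * 3600


def hours_to_replay(message_count, rate_per_sec):
    """Hours needed to re-read message_count messages at rate_per_sec."""
    return message_count / rate_per_sec / 3600


# Upper bound on what a reset to the earliest offset can replay.
print(retention_window_messages(8000, 24))            # 691200000
# Re-reading from offset 0 up to 158473047 at 8K messages/sec.
print(round(hours_to_replay(158473047, 8000), 1))     # 5.5
```

So each reset costs hours of duplicate processing, which matches the "few hundred million messages" I see being reprocessed.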
Important Points:
- Cluster is NOT idle.
- Server time is correct.
- The offsets.retention.minutes property uses the default value, which is 10080 minutes (7 days).
I really appreciate your help on this matter. Thank you!