简介 跟 Spring Data Redis、Spring Data MongoDB、Spring Data JPA 等项目类似,Spring Kafka 提供了在 Spring 应用中通过简单配置从而访问 Kafka 集群的途径。
本文主要介绍在 Spring 应用中消息生产者如何向 Kafka 集群发送消息 、消息消费者如何消费消息 、如何批量消费消息 以及多消费者组同时消费消息 等等。
使用 Spring Kafka 的最新特性,以下测试代码采用了 Spring Boot 2.0.0 构建
Spring Kafka 的基本用法 在 pom.xml 中添加依赖: 1 2 3 4 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
基本配置 springBoot properties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #kafka,更多配置:org.springframework.boot.autoconfigure.kafka.KafkaProperties #指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 #指定默认topic id spring.kafka.template.default-topic=topic-test #指定listener 容器中的线程数,用于提高并发量 spring.kafka.listener.concurrency=3 #=============== provider ======================= #生产者重试次数 spring.kafka.producer.retries=0 #每次批量发送消息的数量 16K spring.kafka.producer.batch-size=16384 # 32M spring.kafka.producer.buffer-memory=33554432 #指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= #指定默认消费者group id spring.kafka.consumer.group-id=myGroup1 #若设置为earliest,那么会从头开始读partition spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
普通 Maven 构建项目,或者想要自定义更多配置,可以采用 JavaConfig 配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Bean public Map<String, Object> producerConfigs () { Map<String, Object> props = Maps.newHashMap(); props.put(ProducerConfig.ACKS_CONFIG, "0" ); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, 1 ); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory () { return new DefaultKafkaProducerFactory <>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate () { return new KafkaTemplate <>(producerFactory()); } @Bean public Map<String, Object> consumerConfigs () { Map<String, Object> propsMap = new HashMap <>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100" ); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000" ); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50 ); return propsMap; } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory () { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory <>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4 ); factory.setBatchListener(true ); factory.getContainerProperties().setPollTimeout(3000 ); return factory; } }
详细含义参考官方文档 : https://kafka.apache.org/documentation/#producerapi
第一个消息生产 / 消费的实例 SimpleProducer :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Component public class SimpleProducer { private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class); @Resource private KafkaTemplate<Object, Object> kafkaTemplate; public void send (String topic, String msg) { try { kafkaTemplate.send(topic, msg); logger.info("推送数据成功!" ); } catch (Exception e) { logger.error(MessageFormat.format("推送数据出错,topic:{},data:{}" ,topic,msg)); } } public void send (String topic, List<String> msgs) { msgs.forEach(msg -> kafkaTemplate.send(topic, msg)); } }
SimpleConsumer :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component public class SimpleConsumer { private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class); @Resource private KafkaTemplate<String, String> kafkaTemplate; @KafkaListener(id = "test", topics = {"topicName"}) public void listen (String data) { System.out.println("SimpleConsumer收到消息:" + data); logger.info(MessageFormat.format("SimpleConsumer收到消息:{}" , data)); } }
批量消费消息 如果生产者写入消息的速度比消费者读取的速度快的情况下,随着时间增长,消息堆积会越来越严重。 对于这种场景,我们需要增加多个消费者来进行水平扩展。 Kafka 消费者是消费组 的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息 。
假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:
如果我们增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息,如下所示:
如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。 换句话说,每个应用都可以读到全量的消息。 为了使得每个应用都能读到全量消息,应用需要有不同的消费组。 对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。
最后,总结起来就是: 如果应用需要读取全量消息,那么请为该应用设置一个消费组; 如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
监听指定Topic的partition0,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @KafkaListener(id = "id0", topicPartitions = {@TopicPartition(topic = TOPIC, partitions = {"0"})}) public void listenPartition0 (List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={},topic={}" , message,topic); } } }
Demo解析 地址: http://git.blz.netease.com/june.wang/springKafka-demo.git
如何创建消费者 读取 Kafka 消息只需要创建一个 kafkaConsumer,创建过程与 KafkaProducer 非常相像。 我们需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer 和 group.id。 其中,bootstrap.servers 与创建 KafkaProducer 的含义一样; key.deserializer 和 value.deserializer 是用来做反序列化的,也就是将字节数组转换成对象; group.id 不是严格必须的,但通常都会指定,这个参数是消费者的消费组。
1 2 3 4 5 6 Properties props = new Properties (); props.put("bootstrap.servers" , "broker1:9092,broker2:9092" ); props.put("group.id" , "CountryCounter" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <String,String>(props);
上面的例子中只设置了几个最基本的消费者参数,bootstrap.servers,group.id,key.deserializer 和 value.deserializer,其他的参数可以看Kafka文档 。 虽然我们很多情况下只是使用默认设置就行,但了解一些比较重要的参数还是很有帮助的。 一些比较重要的参数:
订阅主题 1 consumer.subscribe(Collections.singletonList("topicName" ));
循环拉取 消费数据的 API 和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka 对外暴露了一个非常简洁的 poll 方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。
需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。
提交(commit)与位移(offset) 当我们调用 poll () 时,该方法会返回我们没有消费的消息。 当消息从 broker 返回消费者时,broker 并不跟踪这些消息是否被消费者接收到; Kafka 让消费者自身来管理消费的位移(offset),并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。
在正常情况下,消费者会发送分区的提交信息到 Kafka,Kafka 进行记录。 当消费者宕机或者新消费者加入时,Kafka 会进行重平衡,这会导致消费者负责之前并不属于它的分区。 重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。
假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费。
假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后 Kafka 进行重平衡,新的消费者负责此分区并读取提交位移,此时会 “丢失”消息
因此,提交位移的方式会对应用有比较大的影响
自动确认offset :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class AutoCommitConsumerDemo { public static void main (String[] args) { Properties props = new Properties (); props.put("bootstrap.servers" , "192.168.106.203:9092" ); props.put("group.id" , "autoCommitConsumers_group" ); props.put("enable.auto.commit" , "true" ); props.put("auto.commit.interval.ms" , "1000" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); @SuppressWarnings("resource") KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Collections.singletonList("test" )); while (true ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n" , record.offset(), record.key(), record.value()); } } } }
自动提交 offset 的方式非常简单,但多数情况下,我们不会使用自动提交的方式。 因为不论从 Kafka 集群中拉取的数据是否被处理成功,offset 都会被更新,也就是如果处理过程中出现错误可能会出现数据丢失的情况。所以多数情况下我们会选择手动提交方式
手动提交offset:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ManualCommitConsumerDemo { public static void main (String[] args) { Properties props = new Properties (); props.put("bootstrap.servers" , "192.168.106.203:9092" ); props.put("group.id" , "manualCommitConsumers_group" ); props.put("enable.auto.commit" , "false" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Collections.singletonList("topic02" )); final int minBatchSize = 200 ; List<ConsumerRecord<String, String>> buffer = new ArrayList <>(); while (true ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitAsync((offsets, e) -> { if (e != null ) { log.error("Commit failed for offsets {}" , offsets, e); } }); buffer.clear(); } } } private static void insertIntoDb (List<ConsumerRecord<String, String>> buffer) { System.out.println(buffer); } }
KafkaConsumer从指定位移(offset)开始消费。: 1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void seek (TopicPartition partition, long offset) { if (offset < 0 ) throw new IllegalArgumentException ("seek offset must not be a negative number" ); acquireAndEnsureOpen(); try { log.debug("Seeking to offset {} for partition {}" , offset, partition); this .subscriptions.seek(partition, offset); } finally { release(); } }
另外注意的是,seek () 只是指定了 poll () 拉取的开始位移,这并不影响在 Kafka 中保存的提交位移(当然我们可以在 seek 和 poll 之后提交位移覆盖)。
优雅退出 在一般情况下,我们会在一个主线程中循环 poll 消息并进行处理。 当需要退出 poll 循环时,我们可以使用另一个线程调用 consumer.wakeup (),调用此方法会使得 poll () 抛出 WakeupException。如果调用 wakup 时,主线程正在处理消息,那么在下一次主线程调用 poll 时会抛出异常WakeUpException。 主线程在抛出 WakeUpException 后,需要调用 consumer.close (),此方法会提交位移,同时发送一个退出消费组的消息到 Kafka 的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 try { while (true ) { ConsumerRecords<String, String> records = consumer.poll(1000 ); System.out.println(System.currentTimeMillis() + "-- waiting for data..." ); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n" ,record.offset(), record.key(), record.value()); } for (TopicPartition tp: consumer.assignment()) System.out.println("Committing offset at position:" + consumer.position(tp)); consumer.commitSync(); } } catch (WakeupException e) { log.error("" , e); } finally { consumer.close(); System.out.println("Consumer Closed" ); }