博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka-与SpringBoot的集成
阅读量:4304 次
发布时间:2019-05-27

本文共 2791 字,大约阅读时间需要 9 分钟。

首先在pom文件中引用kafka依赖

org.springframework.kafka
spring-kafka

一:创建一个消息发送者,发送2条消息,发送完关闭

public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 如果不想发送String类型的内容,也可以发送自定义的类型 // 但是需要为该类型写一个org.apache.kafka.common.serialization.Deserializer
的实现 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 第1个string泛型表示topic的类型,基本都是string,还真没见过不是string的topic // 第2个string泛型表示发送的类型,string方便一些,如果想发送对象,则需要为该对象实现一个序列化类 Producer
producer = new KafkaProducer<>(props); for (int i = 0; i < 2; i++) {
Future
future = producer.send(new ProducerRecord
("你的topic", "消息内容")); } // 注意此处,似乎主线程结束了send方法就不会在发送消息, // 所以不要让主线程结束,比如你可以尝试把close方法删掉试试 producer.close(); }

二:创建一个spring boot项目,作为消息接收者,接到消息之后打印到控制台

2.1首先在spring boot启动类中写如下代码(为了演示,对象都直接写到启动类里)

@SpringBootApplicationpublic class DemoApplication {	public static void main(String[] args) {		SpringApplication.run(DemoApplication.class, args);	}	@Bean	public KafkaListenerContainerFactory
> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } public ConsumerFactory
consumerFactory() { Map
propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 此处是消费组名字 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "消费组"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<>(propsMap); }}

2.2写一个消费者类,也叫做监听类,发送过来的消息都会进入该类

三.写一个消费者监听类@Componentpublic class Listener {
// @KafkaListener(topics = {"你的topic"},id = "consumerId") @KafkaListener(topics = {
"你的topic" }) public void processMessage(String content) {
// 此处打印发送过来的两条消息 System.out.println(content); }}

转载地址:http://kthws.baihongyu.com/

你可能感兴趣的文章
判断物体是否在视角内(根据摄像机判断)
查看>>
传智播客资料
查看>>
Ubuntu 14.04 配置VNC服务 配置Xfce4桌面
查看>>
Mysql学习(一)
查看>>
n皇后2种解题思路与代码-Java与C++实现
查看>>
python 基础之简单购物车小程序实现
查看>>
超声波
查看>>
C++的四种cast
查看>>
GStreamer pipeline的basetime是如何计算出来的?
查看>>
用gdb如何查看指定地址的内存内容?
查看>>
java实现多线程断点续传,上传下载
查看>>
CSS实现垂直居中
查看>>
freeMarker
查看>>
将老集合中重复的元素删除并添加到新集合中
查看>>
how to install dynalite on centos7?
查看>>
HDU2196(SummerTrainingDay13-D tree dp)
查看>>
C++类的构造函数
查看>>
JavaScript基础——理解变量作用域
查看>>
第三周总结
查看>>
HTML5视频字幕与WebVTT
查看>>