本文共 2791 字,大约阅读时间需要 9 分钟。
首先在pom文件中引用kafka依赖
org.springframework.kafka spring-kafka
一:创建一个消息发送者,发送2条消息,发送完关闭
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 如果不想发送String类型的内容,也可以发送自定义的类型 // 但是需要为该类型写一个org.apache.kafka.common.serialization.Deserializer的实现 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 第1个string泛型表示topic的类型,基本都是string,还真没见过不是string的topic // 第2个string泛型表示发送的类型,string方便一些,如果想发送对象,则需要为该对象实现一个序列化类 Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 2; i++) { Future future = producer.send(new ProducerRecord ("你的topic", "消息内容")); } // 注意此处,似乎主线程结束了send方法就不会在发送消息, // 所以不要让主线程结束,比如你可以尝试把close方法删掉试试 producer.close(); }
二:创建一个spring boot项目,作为消息接收者,接到消息之后打印到控制台
2.1首先在spring boot启动类中写如下代码(为了演示,对象都直接写到启动类里)@SpringBootApplicationpublic class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } public ConsumerFactory consumerFactory() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 此处是消费组名字 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "消费组"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<>(propsMap); }}
2.2写一个消费者类,也叫做监听类,发送过来的消息都会进入该类
三.写一个消费者监听类@Componentpublic class Listener { // @KafkaListener(topics = {"你的topic"},id = "consumerId") @KafkaListener(topics = { "你的topic" }) public void processMessage(String content) { // 此处打印发送过来的两条消息 System.out.println(content); }}
转载地址:http://kthws.baihongyu.com/