Integrating Kafka with Spring Boot
1. POM dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
    </dependency>
</dependencies>
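The versions of spring-boot-starter-web and spring-kafka are omitted above, so they are expected to be managed by a Spring Boot parent (or the spring-boot-dependencies BOM). A minimal sketch of that parent declaration; the version shown is only an assumption, use whatever Spring Boot version your project is on:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <!-- version is an assumption for illustration; use your project's Spring Boot version -->
    <version>2.7.0</version>
    <relativePath/>
</parent>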
2. application.yml configuration
server:
  port: 8001

spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: 172.16.0.17:9092 # Kafka broker address; for a cluster, list multiple addresses separated by commas
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-consumer-group # consumer group ID; you can view and modify it in config/consumer.properties on the Kafka server
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
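The examples below assume the register topic already exists on the broker. If it does not, and broker-side auto topic creation is disabled, you can declare the topic as a bean and let Spring Boot's auto-configured KafkaAdmin create it on startup. A minimal sketch, assuming spring-kafka 2.3+ (for TopicBuilder); the partition and replica counts are placeholder values:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Spring Boot's auto-configured KafkaAdmin picks up NewTopic beans and creates missing topics on startup
    @Bean
    public NewTopic registerTopic() {
        return TopicBuilder.name("register")
                .partitions(1)   // placeholder: single partition
                .replicas(1)     // placeholder: replication factor 1, suitable only for a single-broker setup
                .build();
    }
}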
3. The User data model
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * @Author zyf
 * @Date 2022/8/18 11:23
 * @Version 1.0
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

    private static final long serialVersionUID = -2606024559927595351L;

    private String id;
    private String name;
    private Integer age;
}
4. A test endpoint that pushes messages
import com.alibaba.fastjson.JSON;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Author zyf
 * @Date 2022/8/18 11:23
 * @Version 1.0
 */
@RestController
@RequestMapping("/demo/kafka")
public class ProducerController {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/register")
    public String register(@RequestBody User user) {
        String message = JSON.toJSONString(user);
        System.out.println("Received user info: " + message);
        // send the serialized user to the "register" topic
        kafkaTemplate.send("register", message);
        return "OK";
    }
}
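Note that send() is asynchronous: the controller returns "OK" without knowing whether the message actually reached the broker. A minimal sketch of attaching a callback inside register(), assuming spring-kafka 2.x where send() returns a ListenableFuture (in spring-kafka 3.x it returns a CompletableFuture instead):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// inside register(): keep the future instead of ignoring the result of send()
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("register", message);
future.addCallback(
        result -> System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                + ", offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("Send failed: " + ex.getMessage())
);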
5. Listening on the topic and consuming messages
import com.alibaba.fastjson.JSON;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Author zyf
 * @Date 2022/8/18 11:23
 * @Version 1.0
 */
@Component
public class Consumer {

    @KafkaListener(topics = "register")
    public void consume(String message) {
        System.out.println("Received user message: " + message);
        User user = JSON.parseObject(message, User.class);
        System.out.println("Processing registration for " + user.getName() + "...");
        System.out.println("Registration successful");
    }
}
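If you also need the record's metadata (key, partition, offset) rather than just the value, the listener method can take a ConsumerRecord parameter. A minimal sketch of that variant; use it instead of the listener above rather than alongside it, since two listeners in the same consumer group would split the topic's partitions between them:

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerWithMetadata {

    @KafkaListener(topics = "register")
    public void consume(ConsumerRecord<String, String> record) {
        // partition/offset come from the record metadata; the JSON payload is in record.value()
        System.out.println("partition=" + record.partition() + ", offset=" + record.offset());
        User user = JSON.parseObject(record.value(), User.class);
        System.out.println("Processing registration for " + user.getName() + "...");
    }
}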
6. Start up and test
Call the endpoint to push a message.
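For example, with the service running on port 8001 you can call it like this (the field values below are just sample data):

curl -X POST http://localhost:8001/demo/kafka/register \
     -H "Content-Type: application/json" \
     -d '{"id": "1", "name": "zhangsan", "age": 18}'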
Check the console.
At this point both producing and consuming work, but the console also prints a lot of extra Kafka-related log output on every run; you can optionally suppress it.
7. Configure logback.xml to suppress the 'extra' Kafka log output
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds">
    <!-- Include Spring Boot's default logback configuration -->
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <contextName>app-server</contextName>
    <property name="LOG_HOME_PATH" value="../logs"/>
    <property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level ${PID} --- [%thread] %logger{50} - %msg %n"/>

    <!-- Console output -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Use Spring Boot's default colored console log pattern -->
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>

    <appender name="FILE_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME_PATH}/%d{yyyy-MM-dd}.log</FileNamePattern>
            <MaxHistory>15</MaxHistory>
        </rollingPolicy>
        <encoder>
            <charset>UTF-8</charset>
            <pattern>${PATTERN}</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE_APPENDER"/>
    </root>

    <!-- Suppress the extra Kafka log output -->
    <logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="off"/>
    <logger name="org.apache.kafka.clients.producer.ProducerConfig" level="off"/>
</configuration>
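Alternatively, if you'd rather not maintain a custom logback.xml, the same two loggers can be silenced directly from application.yml via Spring Boot's logging.level properties, for example:

logging:
  level:
    org.apache.kafka.clients.consumer.ConsumerConfig: off
    org.apache.kafka.clients.producer.ProducerConfig: off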
Restart the application and check the console again: the extra Kafka configuration dumps should no longer appear.