1、pom依赖

<dependencies>
    <!-- No versions are given for the first three dependencies; presumably they are
         managed by a Spring Boot parent/BOM that is not shown here. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <!-- Upgraded from 1.2.28: old 1.2.x releases are affected by publicly documented
             autoType deserialization (remote code execution) vulnerabilities.
             1.2.83 is the patched 1.x release and keeps the same JSON API. -->
        <version>1.2.83</version>
    </dependency>
</dependencies>

2、yml配置

# Port the embedded web server listens on
server:
  port: 8001

spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: 172.16.0.17:9092 # Kafka broker address; for a cluster, list several addresses separated by commas
    producer:
      # Record keys and values are both written as plain strings
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-consumer-group # Consumer group ID; it can be looked up and changed in config/consumer.properties
      # Offsets are committed automatically at the interval configured below
      enable-auto-commit: true
      auto-commit-interval: 1000
      # Record keys and values are both read back as plain strings
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、数据模型User

/**
 * Data model for a user carried in registration messages.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both the
 * no-argument and all-argument constructors are generated by the Lombok
 * annotations on this class.
 *
 * <p>Created 2022/8/18 11:23.
 *
 * @author zyf
 * @version 1.0
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

    /** Fixed serialization version identifier for this class. */
    private static final long serialVersionUID = -2606024559927595351L;

    /** Identifier of the user. */
    private String id;

    /** Name of the user. */
    private String name;

    /** Age of the user; may be {@code null} because it is a boxed {@link Integer}. */
    private Integer age;
}

4、测试接口推送消息

/**
 * REST controller that accepts user registration requests and publishes them
 * to Kafka as JSON strings.
 *
 * <p>Created 2022/8/18 11:23.
 *
 * @author zyf
 * @version 1.0
 */
@RestController
@RequestMapping("/demo/kafka")
public class ProducerController {

    /** Name of the Kafka topic that registration messages are published to. */
    private static final String REGISTER_TOPIC = "register";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Serializes the posted user to JSON, prints it to standard output and
     * sends it to the registration topic.
     *
     * <p>The value returned by {@code send} is ignored, so this method answers
     * {@code "OK"} without checking whether the publish succeeded.
     *
     * @param user the user parsed from the request body
     * @return the fixed string {@code "OK"}
     */
    @PostMapping("/register")
    public String register(@RequestBody User user) {
        final String payload = JSON.toJSONString(user);
        System.out.println("接收到用户信息:" + payload);
        kafkaTemplate.send(REGISTER_TOPIC, payload);
        return "OK";
    }
}

5、监听Topic消费消息

/**
 * Kafka listener that consumes user registration messages from the
 * {@code register} topic.
 *
 * <p>Created 2022/8/18 11:23.
 *
 * @author zyf
 * @version 1.0
 */
@Configuration
public class Consumer {

    /**
     * Handles one record from the {@code register} topic: prints the raw
     * payload, parses it as a JSON-encoded {@link User} and prints the
     * registration progress messages.
     *
     * <p>A payload that parses to {@code null} is reported and skipped rather
     * than failing with a {@link NullPointerException}.
     *
     * @param message the record value, expected to be a JSON-encoded {@link User}
     */
    @KafkaListener(topics = "register")
    public void consume(String message) {
        System.out.println("接收到用户消息:" + message);
        User user = JSON.parseObject(message, User.class);
        if (user == null) {
            // parseObject yields null for an empty or literal "null" payload;
            // skip it instead of dereferencing a null user below.
            System.out.println("消息内容为空,已忽略");
            return;
        }
        System.out.println("正在为 " + user.getName() + " 办理注册业务...");
        System.out.println("注册成功");
    }
}


6、启动测试

调用接口推送消息

1660807717993.jpg

查看控制台
image.png

此时推送消息和消费消息都成功了,但是控制台每次都会打印多余的kafka相关的日志,可以选择屏蔽掉

7、配置logback.xml 屏蔽Kafka的 '多余' 日志信息

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds">

    <!-- Include Spring Boot's default logback configuration files -->
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>


    <contextName>app-server</contextName>
    <!-- Directory that the rolling log files are written to -->
    <property name="LOG_HOME_PATH" value="../logs"/>

    <!-- Line pattern used by the file appender -->
    <property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level ${PID} --- [%thread] %logger{50} - %msg %n"/>

    <!-- Console output -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Use Spring Boot's default colored console log pattern -->
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>

    <!-- File output: one log file per day under LOG_HOME_PATH, keeping 15 periods of history -->
    <appender name="FILE_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME_PATH}/%d{yyyy-MM-dd}.log</FileNamePattern>
            <MaxHistory>15</MaxHistory>
        </rollingPolicy>

        <encoder>
            <charset>UTF-8</charset>
            <pattern>${PATTERN}</pattern>
        </encoder>
    </appender>


    <!-- Everything at INFO and above goes to both the console and the rolling file -->
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE_APPENDER"/>
    </root>

    <!-- Suppress the redundant Kafka log output by turning these two config loggers off -->
    <logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="off" />
    <logger name="org.apache.kafka.clients.producer.ProducerConfig" level="off" />

</configuration>

重启查看效果:
image.png