随便写写……
昨天晚上没犯那么严重的强迫症……
但是也看了小白好几遍……慢慢调整……LP的30KM威慑力还是挺强大的……
昨天到今天正式开始做XX部推送的需求了,坑比我想象中好填一些,但是他们的文档太官方了,看了N遍愣是看不太懂,在看看吧,在不行劳资要打电话控诉了……
有一个坑可以mark下——
现在我手里的新项目已经基本上都用springboot重新搭建了,目前的项目有个需求是从kafka中读取数据,并打包zip推送。由于采用springboot,就不太想用之前的原生kafka代码来写了,所以google了下,发现有个spring-kafka组件可以实现功能,好,那咱走起:
参考官方文档:
application.yml
1 2 3 4 5 6 7
| spring: application: name: xxx-push-xxx kafka: bootstrap-servers: xxxx:9092,xxxx:9092,xxxx:9092 consumer: group-id: xxx-xxx-push
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Component public class PushReceiver {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired private BatchMessage batchMessage;
@KafkaListener(topics = {"xxxx.1"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
batchMessage.handleMessage((String) message); }
} }
|
pom不知道引用那个版本,索性不写版本了,交给springboot自己来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.netease.datacube</groupId> <artifactId>police-push-newscomment</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging>
<name>police-push-newscomment</name> <description>Demo project for Spring Boot</description>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/> </parent>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
</dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
|
完事已启动。。。。什么情况,怎么跟想象的不一样?怎么没有任何消息打印。。。难道kafka流里真的没消息。。。进控制台手动查看了下有消息啊,什么鬼。。。。。
后来我机智的问了句,这个kafka是什么版本的?(实际上是特么过了N久时间)
对方答曰:0.9…
哦,莫非不兼容,赶忙回头看文档:
https://projects.spring.io/spring-kafka/
Spring for Apache Kafka Version |
Spring Integration for Apache Kafka Version |
kafka-clients Version |
2.2.x |
3.1.x |
1.1.x |
2.1.x |
3.0.x |
1.0.x, 1.1.x |
2.0.x |
3.0.x |
0.11.0.x, 1.0.x |
1.3.x |
2.3.x |
0.11.0.x, 1.0.x |
1.2.x |
2.2.x |
0.10.2.x |
1.1.x |
2.1.x |
0.10.0.x, 0.10.1.x |
1.0.x |
2.0.x |
0.9.x.x |
N/A* |
1.3.x |
0.8.2.2 |
哦。。。soga,我得用1.0.x版本的啊。(我用的是最新的springboot,不写spring-kafka版本的话,会依赖最新2.2.x版本)
那就调低,pom改下,加上版本声明,1.0.6.RELEASE
,这应该可以了吧,启动后倒是有反应了,不过在抛异常,找不到某个类。
大哥了,我多余的依赖都没指定,都是你自己处理的,还能找不到类。。。
后来我对了pom文件看了又看,莫非是springboot2的事情?好,那咱们降一个。
将springboot降级为1.5.12.RELEASE,果断好用了。
后记
你以为这坑就这么顺利的完了?nonono,图样。
我们这要求5000个消息打成一个zip包,@KafkaListener
是接收到一个消息就进一次方法,难道木有批量接收的方法吗,我记得原生kafka代码有啊,赶紧找找,发现spring-kafka是支持的,但是要到1.1.x版本(黑人问号?)。但是我是升不了了。。。。这肿么办,后来想了个版本,参考了网上一个大牛的代码,后续有机会再进行展开。
简单总结下就是——
官方文档很重要哟!