Spring Cloud Stream入门
Spring Cloud Stream是一个用于构建消息驱动微服务的框架,该框架在Spring Boot的基础上整合了Spring Integration来连接消息代理中间件。它支持多个消息中间件的自定义配置,同时吸收了这些消息中间件的部分概念,例如,持久化订阅、消费者分组和分区等概念。
是用Stream框架,我们不必关心如何连接各个消息代理中间件,也不必关心如何进行消息的发送与接收,只需简单地进行配置就可以实现这些功能。
Stream框架的组成部分
Spring Cloud Stream主要简化了消息应用的开发,该框架主要包括以下内容:
- Stream框架自己的应用模型
- 绑定器抽象层,可以与消息代理中间件进行绑定。通过绑定器的API,可实现插件式的绑定器
- 持久化订阅的支持
- 消费者组的支持
- Topic分区的支持
Spring Cloud Stream在生产者和消费者之间加入了一个类似代理的角色,它直接与消息代理进行交互,消息生产者与消息消费者不再需要直接调用各个消息代理框架的API,它们甚至感觉不到消息代理的存在。
Stream整合Spring
我们在微服务客户端实现消息生产者和消费者。先建立一个Maven项目,pom依赖为:
1 | <dependencyManagement> |
然后在新建三个module,分别为:
- stream-spring-server:Eureka服务器,端口为8761
- stream-spring-producer:消息生产者,Eureka客户端,注册到Eureka,端口为8081
- stream-spring-consumer:消息消费者,Eureka客户端,注册到Eureka,端口为8082
结构如下图所示:
stream-spring-server
pom依赖为:
1 | <dependencies> |
配置文件为:
1 | server: |
启动类为:
1 |
|
stream-spring-producer
pom依赖为:
1 | <dependencies> |
新增一个ProduceService接口,添加produceOrder方法,该方法使用@Output注解进行修饰。使用该注解表示会创建myInput的消息通道。调用该方法收,会向myInput通道投递消息。如果@Output注解不提供参数,则使用方法名作为通道名称。接下来,需要让Spring容器开启绑定的功能,在ProducerApplication类中加入@EnableBinding注解。
ProduceService接口
1 | public interface ProduceService { |
启动类
1 |
|
在@@EnableBinding注解中,以ProduceService.class作为参数,Spring容器在启动时,会自动绑定ProduceService接口中定义的通道。
编写控制器,调用ProduceService的发送方法,往服务器发送消息。
1 |
|
stream-spring-consumer
依赖和producer一致。
接受消息的通道接口如下:
1 | public interface ConsumerService { |
在ConsumerService中定义了一个myInput的消息输入通道,与生产者一样,在启动类中绑定消息通道。
启动类
1 |
|
在启动类中使用了@EnableBinding来开启绑定,并声明了通道的接口类。新建了一个consume方法,使用@StreamListener注解进行修饰,声明了订阅myInput通道的消息。
在浏览器中输入:http://localhost:8081/send,能得到成功的响应。
然后再看下consumer的日志,能看到成功接收到了producer发送的日志:
更换绑定器
如果想使用Kafka,可以通过更换Maven依赖来实现,将spring-cloud-starter-stream-rabbit替换为spring-cloud-starter-stream-kafka即可。
Spring Cloud提供了绑定器的API,目前实现了RabbitMQ与Kafka的绑定器。