Spring Cloud Stream入门

Spring Cloud Stream入门

Spring Cloud Stream是一个用于构建消息驱动微服务的框架,该框架在Spring Boot的基础上整合了Spring Integration来连接消息代理中间件。它支持多个消息中间件的自定义配置,同时吸收了这些消息中间件的部分概念,例如,持久化订阅、消费者分组和分区等概念。

是用Stream框架,我们不必关心如何连接各个消息代理中间件,也不必关心如何进行消息的发送与接收,只需简单地进行配置就可以实现这些功能。

Stream框架的组成部分

Spring Cloud Stream主要简化了消息应用的开发,该框架主要包括以下内容:

  • Stream框架自己的应用模型
  • 绑定器抽象层,可以与消息代理中间件进行绑定。通过绑定器的API,可实现插件式的绑定器
  • 持久化订阅的支持
  • 消费者组的支持
  • Topic分区的支持

Spring Cloud Stream在生产者和消费者之间加入了一个类似代理的角色,它直接与消息代理进行交互,消息生产者与消息消费者不再需要直接调用各个消息代理框架的API,它们甚至感觉不到消息代理的存在。

Stream整合Spring

我们在微服务客户端实现消息生产者和消费者。先建立一个Maven项目,pom依赖为:

1
2
3
4
5
6
7
8
9
10
11
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

然后在新建三个module,分别为:

  • stream-spring-server:Eureka服务器,端口为8761
  • stream-spring-producer:消息生产者,Eureka客户端,注册到Eureka,端口为8081
  • stream-spring-consumer:消息消费者,Eureka客户端,注册到Eureka,端口为8082

结构如下图所示:

stream-spring-server

pom依赖为:

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
</dependencies>

配置文件为:

1
2
3
4
5
6
7
8
9
10
11
server:
port: 8761
eureka:
client:
registerWithEureka: false
fetchRegistry: false
server:
enable-self-preservation: false
logging:
level:
com.netflix: INFO

启动类为:

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableEurekaServer
public class ServerApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(ServerApplication.class).run(args);
}

}

stream-spring-producer

pom依赖为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>

新增一个ProduceService接口,添加produceOrder方法,该方法使用@Output注解进行修饰。使用该注解表示会创建myInput的消息通道。调用该方法收,会向myInput通道投递消息。如果@Output注解不提供参数,则使用方法名作为通道名称。接下来,需要让Spring容器开启绑定的功能,在ProducerApplication类中加入@EnableBinding注解。

ProduceService接口

1
2
3
4
5
6
public interface ProduceService {

@Output("myInput")
SubscribableChannel produceOrder();
}

启动类

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(ProduceService.class)
public class ProducerApplication {

public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

}

在@@EnableBinding注解中,以ProduceService.class作为参数,Spring容器在启动时,会自动绑定ProduceService接口中定义的通道。

编写控制器,调用ProduceService的发送方法,往服务器发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class ProducerController {

@Autowired
ProduceService produceService;

@RequestMapping(value = "/send", method = RequestMethod.GET)
public String sendMessage() {
// 创建消息
Message message = MessageBuilder.withPayload("Hello World".getBytes()).build();
produceService.produceOrder().send(message);
return "Success";
}
}

stream-spring-consumer

依赖和producer一致。

接受消息的通道接口如下:

1
2
3
4
5
public interface ConsumerService {
@Input("myInput")
SubscribableChannel myInput();
}

在ConsumerService中定义了一个myInput的消息输入通道,与生产者一样,在启动类中绑定消息通道。

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(ConsumerService.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

@StreamListener("myInput")
public void consume(byte[] message) {
System.out.println("收到的消息:" + new String(message));
}
}

在启动类中使用了@EnableBinding来开启绑定,并声明了通道的接口类。新建了一个consume方法,使用@StreamListener注解进行修饰,声明了订阅myInput通道的消息。

在浏览器中输入:http://localhost:8081/send,能得到成功的响应。

然后再看下consumer的日志,能看到成功接收到了producer发送的日志:

更换绑定器

如果想使用Kafka,可以通过更换Maven依赖来实现,将spring-cloud-starter-stream-rabbit替换为spring-cloud-starter-stream-kafka即可。

Spring Cloud提供了绑定器的API,目前实现了RabbitMQ与Kafka的绑定器。

参考资料

疯狂Spring Cloud微服务架构实战

0%