响应式
响应式系统(Reactive System)
具有以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)响应式系统更加灵活,松耦合,可伸缩
- 即时响应性
只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动 - 回弹性
系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理 - 弹性
系统在不断变化的工作负载之下依然保持即时响应性。 响应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 响应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。 - 消息驱动
消息驱动:响应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。
响应式编程
响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式
变化传递
在命令式编程下,式子 a=b+c,代表a的值是由b和c计算出来的。如果b或c后续有变化,不会影响到a的值。
在响应式编程下,式子 a:=b+c,代表着a的值是由b和c计算出来的。但如果b或者c的值后续有变化,会影响到a的值。
响应式流
Reactive Streams致力于提供一套非阻塞背压的异步流处理标准规范
通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等
响应式流定义 org.reactivestreams
Reactor
Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)
Flux
Flux
Mono
Mono
Scheduler
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。 Scheduler 是一个拥有广泛实现类的抽象接口。Schedulers 类提供的静态方法:
- 当前线程(Schedulers.immediate())
- 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()。
- 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源。
- 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同。
- 自定义调度器,Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler。
发布订阅
- publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)。
- subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。
在订阅(subscribe)前,只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦被订阅了,整个流程就开始工作了。
异常
- onErrorReturn 返回静态缺省值
- onErrorResume 捕获并执行一个异常处理方法,动态计算一个候补值,将异常包装后再次抛出
- onErrorMap 捕获,包装一个新异常后再次抛出
- doOnError 捕获异常,并继续抛出
- retry 重试
Spring WebFlux
SpringWebFlux开发
// 处理请求参数
Mono<String> string = request.body(BodyExtractors.toMono(String.class));
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class));
Mono<MultiValueMap<String, String>> map = request.formData();
// 封装响应参数
Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person, Person.class);
public class PersonHandler {
private final PersonRepository repository;
public PersonHandler(PersonRepository repository) {
this.repository = repository;
}
public Mono<ServerResponse> listPeople(ServerRequest request) {
Flux<Person> people = repository.allPeople();
return ok().contentType(APPLICATION_JSON).body(people, Person.class);
}
public Mono<ServerResponse> createPerson(ServerRequest request) {
Mono<Person> person = request.bodyToMono(Person.class);
return ok().build(repository.savePerson(person));
}
public Mono<ServerResponse> getPerson(ServerRequest request) {
int personId = Integer.valueOf(request.pathVariable("id"));
return repository.getPerson(personId)
.flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);
RouterFunction<ServerResponse> otherRoute = ...
//接口路由
RouterFunction<ServerResponse> route = route()
.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson)
.GET("/person", accept(APPLICATION_JSON), handler::listPeople)
.POST("/person", handler::createPerson)
.add(otherRoute)
.build();
Spring WebFlux 也可以完全复用SpringMVC的注解
@RestController
@RequestMapping("/users")
public class MyRestController {
private final UserRepository userRepository;
private final CustomerRepository customerRepository;
public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
this.userRepository = userRepository;
this.customerRepository = customerRepository;
}
@GetMapping("/{userId}")
public Mono<User> getUser(@PathVariable Long userId) { return this.userRepository.findById(userId);}
@GetMapping("/{userId}/customers")
public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
}
@DeleteMapping("/{userId}")
public Mono<Void> deleteUser(@PathVariable Long userId) { return this.userRepository.deleteById(userId);}
}
SpringWebFlux 使用feign进行微服务调用
<dependency>
<groupId>com.playtika.reactivefeign</groupId>
<artifactId>feign-reactor-spring-cloud-starter</artifactId>
<version>3.1.5</version>
<type>pom</type>
</dependency>
@EnableReactiveFeignClients
@ReactiveFeignClient(value = "platform-domain-gateway-server")
public interface DomainGateWayClient {
@PostMapping("/domain/service")
Mono<ResponseEntity<Object>> createDomainService(JsonLoadParam param);
}
参考文章
https://www.reactiveprinciples.org/
https://www.reactivemanifesto.org/zh-CN
http://www.reactive-streams.org/
https://www.tutorialspoint.com/concurrency_in_python/concurrency_in_python_reactive_programming.htm
https://link.zhihu.com/?target=https%3A//gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://projectreactor.io/
https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html
原文链接:https://www.cnblogs.com/walkAlwaysInCode/p/17323392.html
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:响应式编程初探 - Python技术站