响应式编程初探

响应式

响应式系统(Reactive System)

具有以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)响应式系统更加灵活,松耦合,可伸缩

  • 即时响应性
    只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动
  • 回弹性
    系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理
  • 弹性
    系统在不断变化的工作负载之下依然保持即时响应性。 响应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 响应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
  • 消息驱动
    消息驱动:响应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

响应式编程初探

响应式编程

响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式

变化传递
在命令式编程下,式子 a=b+c,代表a的值是由b和c计算出来的。如果b或c后续有变化,不会影响到a的值。
在响应式编程下,式子 a:=b+c,代表着a的值是由b和c计算出来的。但如果b或者c的值后续有变化,会影响到a的值。

响应式流

Reactive Streams致力于提供一套非阻塞背压的异步流处理标准规范
通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等

响应式流定义 org.reactivestreams
响应式编程初探

Reactor

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)

响应式编程初探

Flux

Flux 是一个能够发出 0 到 N 个元素的标准的 Publisher,它会被一个“错误(error)” 或“完成(completion)”信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNext,onComplete和onError方法。
响应式编程初探

Mono

Mono 是一种特殊的 Publisher, 它最多发出一个元素,然后终止于一个 onComplete 信号或一个 onError 信号。
响应式编程初探

Scheduler

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。 Scheduler 是一个拥有广泛实现类的抽象接口。Schedulers 类提供的静态方法:

  • 当前线程(Schedulers.immediate())
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()。
  • 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源。
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同。
  • 自定义调度器,Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler。

发布订阅

  • publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)。
  • subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。
    在订阅(subscribe)前,只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦被订阅了,整个流程就开始工作了。

异常

  • onErrorReturn 返回静态缺省值
  • onErrorResume 捕获并执行一个异常处理方法,动态计算一个候补值,将异常包装后再次抛出
  • onErrorMap 捕获,包装一个新异常后再次抛出
  • doOnError 捕获异常,并继续抛出
  • retry 重试

Spring WebFlux

响应式编程初探

响应式编程初探

SpringWebFlux开发

// 处理请求参数
Mono<String> string = request.body(BodyExtractors.toMono(String.class));
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class));
Mono<MultiValueMap<String, String>> map = request.formData(); 
// 封装响应参数
Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person, Person.class);


public class PersonHandler {
    private final PersonRepository repository;
    public PersonHandler(PersonRepository repository) {
        this.repository = repository;
    }
    public Mono<ServerResponse> listPeople(ServerRequest request) { 
        Flux<Person> people = repository.allPeople();
        return ok().contentType(APPLICATION_JSON).body(people, Person.class);
    }
    public Mono<ServerResponse> createPerson(ServerRequest request) { 
        Mono<Person> person = request.bodyToMono(Person.class);
        return ok().build(repository.savePerson(person));
    }
    public Mono<ServerResponse> getPerson(ServerRequest request) { 
        int personId = Integer.valueOf(request.pathVariable("id"));
        return repository.getPerson(personId)
            .flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> otherRoute = ...
//接口路由
RouterFunction<ServerResponse> route = route()
    .GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson) 
    .GET("/person", accept(APPLICATION_JSON), handler::listPeople) 
    .POST("/person", handler::createPerson) 
    .add(otherRoute) 
    .build();


Spring WebFlux 也可以完全复用SpringMVC的注解

@RestController
@RequestMapping("/users")
public class MyRestController {
    private final UserRepository userRepository;
    private final CustomerRepository customerRepository;
    public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
        this.userRepository = userRepository;
        this.customerRepository = customerRepository;
    }
    @GetMapping("/{userId}")
    public Mono<User> getUser(@PathVariable Long userId) { return this.userRepository.findById(userId);}

    @GetMapping("/{userId}/customers")
    public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
	return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
    }

    @DeleteMapping("/{userId}")
    public Mono<Void> deleteUser(@PathVariable Long userId) { return this.userRepository.deleteById(userId);}
}

SpringWebFlux 使用feign进行微服务调用

<dependency>
            <groupId>com.playtika.reactivefeign</groupId>
            <artifactId>feign-reactor-spring-cloud-starter</artifactId>
            <version>3.1.5</version>
            <type>pom</type>
</dependency>

@EnableReactiveFeignClients


@ReactiveFeignClient(value = "platform-domain-gateway-server")
public interface DomainGateWayClient {

    @PostMapping("/domain/service")
    Mono<ResponseEntity<Object>> createDomainService(JsonLoadParam param);

}

参考文章
https://www.reactiveprinciples.org/
https://www.reactivemanifesto.org/zh-CN
http://www.reactive-streams.org/
https://www.tutorialspoint.com/concurrency_in_python/concurrency_in_python_reactive_programming.htm
https://link.zhihu.com/?target=https%3A//gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://projectreactor.io/
https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

原文链接:https://www.cnblogs.com/walkAlwaysInCode/p/17323392.html

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:响应式编程初探 - Python技术站

(0)
上一篇 2023年4月17日
下一篇 2023年4月17日

相关文章

  • 解决Asp.net Mvc返回JsonResult中DateTime类型数据格式问题的方法

    下面我来详细讲解“解决Asp.net Mvc返回JsonResult中DateTime类型数据格式问题的方法”的完整攻略。 问题概述 在使用Asp.net Mvc框架返回JsonResult时,我们经常会遇到DateTime类型的数据无法正确序列化的问题。原因在于Json序列化默认使用了UTC时间,而DateTime类型的数据默认是本机时间。为了解决这个问题…

    Java 2023年5月26日
    00
  • mybatis xml 中 大于、小于、等于 写法

    在 *.xml 中使用常规的 < > = <= >= 会与xml的语法存在冲突 方法一:使用xml 原生转义的方式进行转义 字符名称 sql符号 转义字符 大于号 > &gt; 小于号 < &lt; 不等于 <> &lt;&gt; 大于等于号 >= &gt;= 小于…

    Java 2023年4月25日
    00
  • IntelliJ idea 如何生成动态的JSON字符串(步骤详解)

    下面是详细的攻略,包括两个示例说明。 IntelliJ idea 如何生成动态的JSON字符串(步骤详解) 一、使用Gson库生成JSON字符串 在IntelliJ Idea中创建一个Java项目,然后在项目中导入Gson库的jar包。 创建一个Java类,在类中定义一个类成员,用于存储需要生成的JSON数据。 “`java import com.goog…

    Java 2023年5月26日
    00
  • java的Array,List和byte[],String相互转换的方法你了解嘛

    当需要在Java中进行数组和列表(List)数据类型之间的相互转换时,以下是Java中可用的几种方法: 数组转List 方法一:使用Arrays.asList()方法 可以使用Arrays.asList()方法将数组转换为List。以下是示例代码: String[] array = {"一", "二", "三…

    Java 2023年5月26日
    00
  • Mybatis集成Spring的实例代码_动力节点Java 学院整理

    下面是Mybatis集成Spring的实例代码攻略: 概述 Mybatis是一款流行的持久层框架,Spring则是业界广泛使用的框架之一,在使用Mybatis时,我们可以将其集成到Spring中以便更好地管理和使用。 本攻略将对如何将Mybatis集成到Spring中进行详细讲解,同时提供相应的代码示例,以方便读者理解和实践。 步骤 第一步:添加依赖 首先需…

    Java 2023年6月3日
    00
  • Java面试题冲刺第二十七天–JVM2

    Java面试题冲刺第二十七天–JVM2 1. 内存模型 Java内存模型主要分为两种: 堆内存:存放我们new出来的对象以及数组等,这部分内存可以动态申请或释放。一般情况下,堆内存比较大。 栈内存:存放基本类型的变量以及对象的引用变量(指针),这些变量会随着程序的运行而申请或释放。栈的空间比较小,一般情况下,栈的大小是在程序启动的时候就固定下来。 2. J…

    Java 2023年5月19日
    00
  • Java实现按行读取大文件

    对于Java实现按行读取大文件,其主要思路是使用BufferedReader类和FileReader类来实现。具体步骤如下: 1.使用FileReader读取大文件 FileReader类可以一次读取大量文件内容,将其存入缓存区中。我们可以通过创建FileReader对象并将文件对象传递到它的构造函数中,来实现读取大文件的目的。 FileReader fil…

    Java 2023年5月20日
    00
  • SpringBoot统一处理功能实现的全过程

    SpringBoot是一种轻量级的Java框架,提供了一种快速开发的方式,这是因为它提供了大量的自动化配置。SpringBoot为Java开发人员提供了快速开发微服务应用程序所需的各种组件。其中包含了很多与Web应用程序相关的组件,包括MVC(Model-View-Controller)框架。本文将讲解如何实现一个SpringBoot应用程序的统一处理功能,…

    Java 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部