响应式编程初探

响应式

响应式系统(Reactive System)

具有以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)响应式系统更加灵活,松耦合,可伸缩

  • 即时响应性
    只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动
  • 回弹性
    系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理
  • 弹性
    系统在不断变化的工作负载之下依然保持即时响应性。 响应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 响应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
  • 消息驱动
    消息驱动:响应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

响应式编程初探

响应式编程

响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式

变化传递
在命令式编程下,式子 a=b+c,代表a的值是由b和c计算出来的。如果b或c后续有变化,不会影响到a的值。
在响应式编程下,式子 a:=b+c,代表着a的值是由b和c计算出来的。但如果b或者c的值后续有变化,会影响到a的值。

响应式流

Reactive Streams致力于提供一套非阻塞背压的异步流处理标准规范
通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等

响应式流定义 org.reactivestreams
响应式编程初探

Reactor

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)

响应式编程初探

Flux

Flux 是一个能够发出 0 到 N 个元素的标准的 Publisher,它会被一个“错误(error)” 或“完成(completion)”信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNext,onComplete和onError方法。
响应式编程初探

Mono

Mono 是一种特殊的 Publisher, 它最多发出一个元素,然后终止于一个 onComplete 信号或一个 onError 信号。
响应式编程初探

Scheduler

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。 Scheduler 是一个拥有广泛实现类的抽象接口。Schedulers 类提供的静态方法:

  • 当前线程(Schedulers.immediate())
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()。
  • 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源。
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同。
  • 自定义调度器,Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler。

发布订阅

  • publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)。
  • subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。
    在订阅(subscribe)前,只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦被订阅了,整个流程就开始工作了。

异常

  • onErrorReturn 返回静态缺省值
  • onErrorResume 捕获并执行一个异常处理方法,动态计算一个候补值,将异常包装后再次抛出
  • onErrorMap 捕获,包装一个新异常后再次抛出
  • doOnError 捕获异常,并继续抛出
  • retry 重试

Spring WebFlux

响应式编程初探

响应式编程初探

SpringWebFlux开发

// 处理请求参数
Mono<String> string = request.body(BodyExtractors.toMono(String.class));
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class));
Mono<MultiValueMap<String, String>> map = request.formData(); 
// 封装响应参数
Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person, Person.class);


public class PersonHandler {
    private final PersonRepository repository;
    public PersonHandler(PersonRepository repository) {
        this.repository = repository;
    }
    public Mono<ServerResponse> listPeople(ServerRequest request) { 
        Flux<Person> people = repository.allPeople();
        return ok().contentType(APPLICATION_JSON).body(people, Person.class);
    }
    public Mono<ServerResponse> createPerson(ServerRequest request) { 
        Mono<Person> person = request.bodyToMono(Person.class);
        return ok().build(repository.savePerson(person));
    }
    public Mono<ServerResponse> getPerson(ServerRequest request) { 
        int personId = Integer.valueOf(request.pathVariable("id"));
        return repository.getPerson(personId)
            .flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> otherRoute = ...
//接口路由
RouterFunction<ServerResponse> route = route()
    .GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson) 
    .GET("/person", accept(APPLICATION_JSON), handler::listPeople) 
    .POST("/person", handler::createPerson) 
    .add(otherRoute) 
    .build();


Spring WebFlux 也可以完全复用SpringMVC的注解

@RestController
@RequestMapping("/users")
public class MyRestController {
    private final UserRepository userRepository;
    private final CustomerRepository customerRepository;
    public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
        this.userRepository = userRepository;
        this.customerRepository = customerRepository;
    }
    @GetMapping("/{userId}")
    public Mono<User> getUser(@PathVariable Long userId) { return this.userRepository.findById(userId);}

    @GetMapping("/{userId}/customers")
    public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
	return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
    }

    @DeleteMapping("/{userId}")
    public Mono<Void> deleteUser(@PathVariable Long userId) { return this.userRepository.deleteById(userId);}
}

SpringWebFlux 使用feign进行微服务调用

<dependency>
            <groupId>com.playtika.reactivefeign</groupId>
            <artifactId>feign-reactor-spring-cloud-starter</artifactId>
            <version>3.1.5</version>
            <type>pom</type>
</dependency>

@EnableReactiveFeignClients


@ReactiveFeignClient(value = "platform-domain-gateway-server")
public interface DomainGateWayClient {

    @PostMapping("/domain/service")
    Mono<ResponseEntity<Object>> createDomainService(JsonLoadParam param);

}

参考文章
https://www.reactiveprinciples.org/
https://www.reactivemanifesto.org/zh-CN
http://www.reactive-streams.org/
https://www.tutorialspoint.com/concurrency_in_python/concurrency_in_python_reactive_programming.htm
https://link.zhihu.com/?target=https%3A//gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://projectreactor.io/
https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

原文链接:https://www.cnblogs.com/walkAlwaysInCode/p/17323392.html

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:响应式编程初探 - Python技术站

(0)
上一篇 2023年4月17日
下一篇 2023年4月17日

相关文章

  • 浅谈Java读写注册表的方式Preferences与jRegistry

    浅谈Java读写注册表的方式Preferences与jRegistry 在Windows操作系统中,注册表是用来存储系统和应用程序相关设置的数据库。Java提供了两种方式读写注册表的数据:Preferences和jRegistry。 使用Preferences读写注册表 Preferences是Java 1.4及以上版本中提供的读写注册表数据的API。它可以…

    Java 2023年5月19日
    00
  • Java中的异常处理如何提高程序安全性?

    Java中的异常处理机制是提高程序安全性和稳定性的重要手段之一。它可以让我们在程序运行时捕获和处理可能发生的异常情况,以避免程序的崩溃或者无效输出。 以下是使用Java中的异常处理机制来提高程序安全性的一些攻略: 异常分类 在Java中异常是分为可检查异常和非可检查异常两种: 可检查异常(checked exception):指在编译阶段就可以预测并处理的异…

    Java 2023年4月27日
    00
  • JDBC使用Statement修改数据库

    JDBC是Java Database Connectivity的简称,是Java专门用于访问数据库的标准API。它提供了一种标准的访问关系型数据库的方法,可以通过它访问MySQL、Oracle、SQL Server等数据库。Statement是JDBC中用于执行SQL语句的接口,包含了执行SQL查询、更新等操作的方法。 下面是使用Statement修改数据库…

    Java 2023年5月20日
    00
  • 常见的Java反射应用场景有哪些?

    常见的Java反射应用场景主要包括以下几个方面: 动态代理 取得类的方法、属性等信息 调用私有方法,破解封装性 注解解析 以下是两个具体的示例: 动态代理 动态代理是Java反射的一大应用,主要用于在运行时动态地创建一个代理类。这个代理类实现了一组给定接口,它的方法调用会被转发到一个调用处理器上。在代理对象的实现中,我们可以在方法执行前后加入任意的操作,比如…

    Java 2023年5月11日
    00
  • Struts2修改上传文件大小限制方法解析

    当我们使用Struts2框架进行文件上传时,有时候会遇到上传的文件大小超过了限制的问题。默认情况下,Struts2上传文件大小限制为2M,如果需要修改文件上传大小限制,则需要进行如下操作: 步骤1:添加struts.xml配置 在struts.xml配置文件中添加以下配置,其中10485760代表文件大小限制为10M。 <interceptors&gt…

    Java 2023年5月19日
    00
  • Spring mvc实现Restful返回xml格式数据实例详解

    下面是关于“Spring MVC实现Restful返回XML格式数据实例详解”的完整攻略,包含两个示例说明。 Spring MVC实现Restful返回XML格式数据实例详解 在Java Web开发中,Spring MVC是一个非常流行的框架。在本文中,我们将介绍如何使用Spring MVC实现Restful返回XML格式数据。 步骤1:添加依赖 首先,我们…

    Java 2023年5月17日
    00
  • Bootstrap的fileinput插件实现多文件上传的方法

    下面我来介绍一下Bootstrap的fileinput插件实现多文件上传的方法。 1. 插件介绍 Bootstrap的fileinput插件是一个强大的文件上传插件,支持多文件上传、图片预览等功能,而且使用起来也非常方便,只需要简单的配置和调用就可以了。 2. 安装插件 你可以通过多种方法来安装Bootstrap的fileinput插件,比如使用CDN、下载…

    Java 2023年6月15日
    00
  • JavaScript创建对象方式总结【工厂模式、构造函数模式、原型模式等】

    JavaScript创建对象方式总结 在JavaScript中,我们可以使用多种方式来创建对象,包括工厂模式、构造函数模式、原型模式等。下面将针对每种方式进行详细讲解。 工厂模式 工厂模式是一种基本的对象创建方式,通过工厂函数来创建对象。这种方式可以避免重复代码,提高了代码的可复用性。 实现一个创建人物的工厂,示例代码如下: function createP…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部