본문 바로가기
Back-end/벡엔드

리액티브 프로그래밍 Spring WebFlux를 이용한 Non Blocking 애플리케이션 구현

by javapp 자바앱 2024. 8. 3.
728x90

 

리액티브 시스템
리액티브 스트림즈
Blocking I/O 와 Non-Blocking I/O
함수 디스크립터 (Function Descriptor)
Reactor
Hot Sequence & Cold Sequence
Backpressure
Scheduler
Context

리액티브 시스템과 리액티브 프로그래밍

 

 

 

리액티브 시스템

 

리액티브 시스템이란?

리액티브라는 용어는 어떤 이벤트나 상황이 발생했을 때, 그에 따라 적절하게 행동하는 것을 의미합니다.

그래서 리액티브 시스템은 반응을 잘하는 시스템,
클라이언트의 요청에 즉각적으로 응답함으로써 지연 시간을 최소화합니다.

 

 

 

리액티브 선언문

MEANS: 비동기 메시지 기반의 통신 -> 느슨한 결합, 격리성, 위치 투명성 보장

FORM: 리액티브 시스템이 비동기 메시지 통신 기반하에 탄력성과 회복성을 가지는 시스템이어야 함

- 탄력성: 시스템의 작업량이 변화하더라도 일정한 응답을 유지하는 것

- 회복성: 시스템에 장애가 발생하더라도 응답성을 유지하는 것

VALUE: 비동기 메시지 기반 통신 바탕으로 한 회복성과 예측 가능한 규모 확장 알고리즘을 통해 시스템의 처리량을 자동으로 확장하고 축소하는 탄력성을 확보 -> 즉각적으로 응답 가능한 시스템 구축

 

 

 

리액티브 프로그래밍

 

리액티브 프로그래밍?

리액티브 시스템의 설계 원칙에 잘 부합하는 비동기 Non-Blocking 통신을 위한 프로그래밍 모델

 

 

 

특징

  • 선언형 프로그래밍
  • Data Streams: 데이터가 지속적으로 발생
  • The Propagation of Change: 지속적으로 데이터가 발생할 때마다 이것을 변화하는 이벤트로 보고, 이벤트 발생시키면서 데이터를 계속적으로 전달하는 것
  • 함수형 프로그래밍 방식의 코드 형태의 파라미터를 가진다.

 

 

선언형 프로그래밍 

  • 동작을 구체적으로 명시하지 않고 목표만 선언
  • 메서드 체인을 형성해서 한 문장으로 된 코드로 구성 
  • 함수형 프로그래밍으로 구성 (화살표 함수)

 

 

리액티브 프로그래밍 코드 구성

  • Publisher
    • 입력으로 들어오는 데이터를 제공하는 역할
  • Subscriber
    • Publisher 로 부터 전달받은 데이터를 사용하는 역할
  • Data Source
    • 리액티브 프로그래밍 에서는 Data Stream
    • 입력으로 전달되는 데이터 
  • Operator
    • Publisher 와 Subscriber 사이에서 적절한 가공 처리 담당

 

 


 

 

리액티브 스트림즈

 

리액티브 스트림즈 이란?

데이터 스트림을 Non-Blocking 이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리표준 사양

 

 

리액티브 스트림즈 구성요소 

Publisher, Subscriber, Subscription, Processor

 

 

컴포넌트

Publisher 

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Kafka와 달리 subscribe 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이루어집니다.

 

 

관련 용어

Signal

Publisher와 Subscriber 간에 주고받는 상호작용

 

Demand

Publisher 가 아직 Subscriber 에게 전달하지 않은 Subscriber가 요청한 데이터

 

Emit

Publisher가 Subscriber에게 데이터를 전달하는 것 

 

Sequence

다양한 Operator로 데이터의 연속적인 흐름을 정의한 것

Operator 체인 형태로 정의됨 

 

Operator

just filter map 같은 메서드

 

 

구현체

RxJava

Rx(Reactive Extensions) 

.NET 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅하여 만든 JVM 기반의 대표적인 리액티브 확장 라이브러리

 

Project Reactor

Spring Framework 에 의해 개발된 리액티브 스트림즈의 구현체

 

Akka Streams

 

Java Flow API

 

 


 

 

Blocking I/O 와 Non-Blocking I/O

 

Blocking I/O

파일 I/O, DB I/O, Network I/O

하나의 스레드가 I/O에 의해서 차단되어 대기하는 것

- 해당 방식의 문제점을 보완하기 위해 멀티스레딩 기법으로 추가 스레드를 할당하여 사용 가능

BUT 컨텍스트 스위칭으로 인해 스레드 전환 비용이 발생

(프로세스  P1 과 P2 의 상태 값이 번갈아 가면서 PCB에 저장되고 PCB로부터 reload 되는 과정)

--> 컨텍스트 스위칭이 많으면 많을수록 CPU의 전체 대기 시간은 길어지기 때문에 성능 저하

- 과다한 메모리 사용으로 오버헤드가 발생 가능

- 스레드 풀에서 응답 지연이 발생 가능 (유휴 스레드가 없을 경우)

 

 

Non-Blocking I/O

작업 스레드의 종료 여부와 관계없이 요청한 스레드는 차단되지 않습니다.

단점

- CPU를 많이 사용하는 작업이 포함된 경우에는 성능에 악영향

 

 

 

스프링에서의 Blocking I/O 와 Non-Blocking I/O

Spring MVC 기반의 웹 애플리케이션이 바로 Blocking I/O 방식을 사용

대량의 요청 트래픽이 발생할 때 생기는 문제점을 극복하기 위해서 Spring WebFlux가 나옴

 

Spring WebFlux  는 Netty 같은 비동기 Non-Blocking I/O 기반의 서버 엔진을 사용 --> 적은 수의 스레드로 많은 수의 요청을 처리하기 때문에, CPU 와 메모리를 효율적으로 사용

 

  • Spring MVC에서는 RestTemplate
  • Spring WebFlux에서는 WebClient

 

 

Non-Blocking I/O 적합한 시스템 

  • Blocking I/O 방식으로 처리하는 데 한계가 있는 대량의 요청 트래픽이 발생하는 시스템
  • 마이크로 서비스 기반 시스템
  • 스트리밍 시스템 또는 실시간 시스템

 

 


 

 

함수 디스크립터 (Function Descriptor)

함수 서술자, 함수 설명자

일반화된 람다 표현식을 통해서 이 함수형 인터페이스가 어떤 파라미터를 가지고, 어떤 값을 리턴하는지 설명해주는 역할

 

 

자주 사용하는 함수형 인터페이스와 함수 디스크립터

함수형 인터페이스 함수 디스크립터
Predicate<T> T -> boolean
Consumer<T> T -> void
Function<T, R> T -> R
Supplier<T> () -> T
BiPredicate<L, R> L, R -> boolean
BiConsumer<T, U> T, U -> void
BiFunction<T, U, R> T,U -> R

 

 

 


 

 

 

Reactor

리액티브 스트림즈의 구현체

리액티브 프로그래밍을 위한 라이브러리

public class FluxMain {
    public static void main(String[] args) {
        // Flux 는 Reactor에서 Publisher 역할을 함
        // just 에서 파라미터로 들어온 값이 입력으로 들어온 데이터: 데이터 소스
        // just는 데이터를 생성해서 제공하는 역할
        // map은 전달받은 데이터를 가공하는 역할
        Flux<String> sequence = Flux.just("Hello", "Reactor");
        sequence.map(data -> data.toLowerCase())
                .subscribe(data -> System.out.println(data));
    }
}

 

 

 

 

Mono

단 하나의 데이터를 emit 하는 Publisher

단 하나의 데이터만 표현

public class FluxMain {
    public static void main(String[] args) {
        Mono.empty()
                .subscribe(none -> System.out.println("# emitted onNext signal"),
                        error -> {},
                        () -> System.out.println("# emitted onComplete signal"));
    }
}

결과

# emitted onComplete signal

 

 

empty() 를 사용하면 내부적으로 emit 할 데이터가 없는 것으로 간주

-> onComplete Signal을 전송

즉, 작업이 끝났음을 알리고 이에 따른 후처리를 하고 싶을 때 사용

 

 

 

fromArray

Flux.fromArray(new Integer[]{3,6,7,9}) ..

데이터 소스로 제공되는 배열 데이터를 처리하기 위함 

 

 

 

concatWith

        Flux<String> flux =
                Mono.justOrEmpty("Steve")
                        .concatWith(Mono.justOrEmpty("Jobs"));
        flux.subscribe(System.out::print);

결과:

SteveJobs

 

justOrEmpty() : null 허용ㅇ 

 

 

collectList()

        Flux.concat(
                Flux.just("a1", "a2"),
                Flux.just("b1", "b2"))
                        .collectList()
                        .subscribe(System.out::println);

[a1, a2, b1, b2]

 

concat 은 Flux (0 또는 1개 이상 emit)

collectList 는 Mono (0 또는 1개만 emit)

 


 

 

Hot Sequence & Cold Sequence

리액티브 프로그래밍에서 Hot, Cold 의 의미

Hot은 무언가 처음부터 다시 시작하지 않고, 같은 작업이 반복되지 않는 느낌

Cold는 처음부터 새로 시작해야 하고, 새로 시작하기 때문에 같은 작업이 반복될 것임

Hot 은 무언가를 새로 시작하지 않는다, Cold 는 무언가를 새로 시작한다.

 

 

Cold Sequence

Subscriber의 구독 시점이 달라도 구독할 때마다

Publisher가 데이터를 emit 하는 과정을 처음부터 다시 시작하는 데이터 흐름

s1: 1 3 5 7

      s2: 1 3 5 7

-------시점------>

 

 

Hot Sequence

구독이 발생한 시점 이전에 Publisher로 부터 emit 된 데이터는 구독자가 전달받지 못하고

구독이 발생한 시점 이후에 emit 된 데이터만 전달 받을 수 있음

s1: 1 3 5 7

       s2:5 7

-------시점------>

 

Flux<T> share()

원본 Flux를 멀티캐스트(또는 공유) 한다

여러 구독자가 하나의 원본 Flux를 공유한다.

최초 구독이 발생했을 때 데이터를 emit 하는 Warm up의 의미를 가지는 Hot Sequence

 

 

Http 요청과 응답에서 Cold Sequence 와 Hot Sequence

Http 요청/응답 에서는 RestTemplate 대신에 Non-Blocking 통신을 지원하는 

WebClient 사용

-> Cold

 

Mono<T> cache()

Mono 를 Hot Sequence로 변경해주고 emit 된 데이터를 캐시한 뒤,

구독이 발생할 때마다 캐시된 데이터를 전달

 

 


 

 

Backpressure

Publisher 가 끊임없이 emit 하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 것

 

Reactor 에서의 Backpressure 처리 방식

1. 데이터 개수 제어

Backpressure 방식을 사용하여 데이터 요청 개수를 적절하게 제어 

 

2. Backpressure 전략 사용

전략 설명
IGNORE Subscriber가 처리하지 못한 데이터는 버려짐
ERROR Exception 발생
DROP 가장 오래된 데이터부터 버림
LATEST 가장 최신 데이터만 유지하고 나머지는 버림
BUFFER 버퍼 크기를 늘려 더 많은 데이터를 저장 

 

 

 

 

Sinks

멀티스레드 방식으로 Signal 을 전송해도 스레드 안전성을 보장하기 때문에 예기치 않은 동작으로 이어지는 것을 방지해줌

Sinks 는 Publisher 와 Subscriber 의 기능을 모두 지닌 Processor의 향상된 기능을 제공

 

* 스레드 안전성이란?

함수나 변수같은 공유 자원에 동시 접근할 경우에도 프로그램의 실행에 문제가 없음을 의미 

즉, 데드락에 빠지게 되면 스레드 안전성이 깨지게 됨

-- Sinks 의 경우에는 동시 접근을 감지하고, 동시 접근하는 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장

 

Sinks.One

  • 단일 값을 emit 하는 Sink
  • Sinks.one(value) 메서드로 생성
  • 값을 한 번만 emit 할 수 있음
  • 스레드 안전성 보장

Sinks.Many

  • 여러 개의 값을 emit 할 수 있는 Sink
  • Sinks.many() 메서드로 생성
  • 값을 여러 번 emit 할 수있음
  • 스레드 안전성 보장 
  • UnicastSpec 은 단 하나의 구독자에게만 데이터를 emit
  • MulticastSpec은 하나 이상의 구독자에게 데이터를 emit
  • MulticastReplaySpect은 emit 된 데이터 중에서 특정 시점으로 되돌린 데이터부터 emit

 


 

 

Scheduler

Reactor Sequence 에서 사용되는 스레드를 관리해주는 관리자 

 

물리적 스레드와 논리적 스레드

CPU 코어는 물리적인 스레드, 물리적인 스레드는 논리적인 코어라고도 부름

 

논리적인 스레드는 소프트웨어적으로 생성되는 스레드

프로그램이라고 부르는 프로세스 내에서 실행되는 세부작업의 단위가 됨

논리적인 스레드는 이론적으로는 메모리가 허용하는 범위 내에서 얼마든지 만들 수 있지만 물리적인 스레드의 가용 범위 내에서 실행될 수 있음

 

물리적인 스레드는 병렬성과 관련이 있으며, 논리적인 스레드는 동시성과 관련이 있음

- 병렬성은 물리적인 스레드가 실제로 동시에 실행, 여러 작업을 동시에 처리함

- 동시성은 무수히 많은 논리적 스레드가 (네 개의) 물리적인 스레드를 아주 빠른 속도로 번갈아 가며 사용하기에
마치 동시에 실행되는 것 처럼 보이는 동시성을 가지게 됨

 

 

Operator

subscribeOn()

- 구독이 발생한 직후 실행될 스레드를 지정하는 명령어

- 원본 publisher 의 동작을 수행하기 위한 스레드

.subscribeOn(Schedulers.boundedElastic())

 

publishOn()

Publisher: Reactor Sequence 에서 발생하는 Signal을 다운스트림으로 전송하는 주체

- 다운스트림으로 Signal 을 전송할 때 실행되는 스레드를 제어하는 역할

- .publishOn()을 기준으로 아래쪽인 다운스트림의 실행 스레드를 변경

 

.publishOn(Schedulers.parallel()) // 스레드가 바뀜

.subscribe() ...

 

 

parallel()

병렬성을 가지는 물리적인 스레드

라운드 로빈 방식으로 CPU 코어 개수만큼 스레드를 병렬로 실행

(논리프로세서 8스레드의 CPU가 총 8개의 스레드를 병렬로 실행)

 

 

Scheduler 종류

- Schdulers.immediate() 별도의 스레드를 추가 생성하지 않고, 현재 스레드에서 작업을 처리

- Schdulers.single() 은 스레드 하나만 생성, Schduler 가 제거되기 전까지 재사용

- Schdulers.boundedElastic() 은 ExecutorService 기반의 스레드 풀 생성

  Blocking I/O 작업에 최적화

- Schdulers.parallel() 은 Non-Blocking I/O 에 최적화, CPU 코어 수만큼 스레드 생성

- Schdulers.newXXX 는 새로은 Schduler 인스턴스 생성할 수 있다.

 

 


 

 

 

Context

어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보 

ex) ServletContext, ApplicationContext, SecurityContextHolder

 

Reactor API 문서에서는 Context를 이렇게 정의한다.

 

A key/value store that is propagated between components such as operators via the context protocol.

전파: 다운스트림에서 업스트림으로 Context 가 전파되어 Operator 체인상의 각 Operator 가 해당 Context 의 정보를 동일하게 이용할 수 있음을 의미

 

구독이 발생할 때마다 해당 구독과 연결된 하나의 Context 생긴다고 보면 됨

 

Reactor 에서의 Context

  • Operator 체인에 전파되는 키와 값 형태의 저장소
  • Subscriber 와 매핑되어 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context 가 생김
  • contextWriter() 를 사용해서 Context에 데이터 쓰기 작업을 할 수 있다.
  • deferContextual() 를 사용해서 원본 데이터 소스 레벨에서 Context 의 데이터를 읽을 수 있다.
  • transformDeferredContextual() 를 사용해서 Operator 체인의 중간에서 데이터를 읽을 수 있다.

 

Context에 데이터 읽기/쓰기

  • 데이터를 쓰기 위해서는 Context API 를 사용
  • 데이터를 읽기 위해서는 ContextView API 사용
public static void main(String[] args) throws InterruptedException {
    final String key1= "company";

    Mono<String> mono = Mono.deferContextual(contextView ->
        Mono.just("Company: " + " " + contextView.get(key1))
    )
    .publishOn(Schedulers.parallel());

    mono.contextWrite(context -> context.put(key1, "SAMSUNG"))
                    .subscribe(data -> log.info("구독 onNext: {}", data));
    mono.contextWrite(context -> context.put(key1, "APPLE"))
            .subscribe(data -> log.info("구독 onNext: {}", data));

    Thread.sleep(100L);
}

 

실행결과

15:06:18.414 [parallel-2] INFO org.example.webflux.FluxMain -- 구독 onNext: Company:  APPLE
15:06:18.414 [parallel-1] INFO org.example.webflux.FluxMain -- 구독 onNext: Company:  SAMSUNG

 

: 구독이 발생할 때마다 해당하는 하나의 Context가 하나의 구독에 연결됨

  • Context 는 Operator  체인의 아래에서 위로 전파
  • 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 ContextWriter() 이 저장한 값으로 덮어 쓴다. -> 가장 위에 ContextWriter() 한 것이 가장 나중에 출력 
  • 모든 Operator에서 Context 에 저장된 데이터를 읽을 수 있도록 ContextWriter()을 Operartor 체인의 맨 마지막에 둠
  • Context 는 인증정보 같은 직교성(독립성)을 가지는 정보를 전송하는 데 적합함

 

 

 

 

 

 

 

 

 

참고: 스프링으로 시작하는 리액티브 프로그래밍 - 황정식 지음 

댓글