Spring/Webflux

Scheduler의 종류

blockbuddy93 2024. 2. 29. 22:31

1. Scheduler의 종류: Schedulers 클래스의 정적 메서드로 제공 됨

1.1 Schedulers.immediate()

  • 별도의 쓰레드를 추가 할당하지 않고, 현재 쓰레드에서 실행된다.
Flux.fromArray(new Integer[] {1, 3, 5, 7})
  .publishOn(Schedulers.parallel())
  .filter(data -> data > 3)
  .doOnNext(Logger::doOnNext)
  .publishOn(Schedulers.immediate())
  .map(data -> data * 10)
  .doOnNext(Logger::doOnNext)
  .subscribe(Logger::onNext);

1.2 Schedulers.single()

  • 하나의 쓰레드를 재사용한다.
  • 쓰레드 하나만 생성해서, 스케줄러가 제거되기 전까지 재사용하는 방식
  • 저지연 일회성 실행에 최적화 되어있다.
public void executeDoTask() {
  doTask("task1").subscribe(Logger::onNext);
  doTask("task2").subscribe(Logger::onNext);
}

private Flux<Integer> doTask(String taskName) {
  return Flux.fromArray(new Integer[] {1, 3, 5, 7})
    .publishOn(Schedulers.single())
    .filter(data -> data > 3)
    .doOnNext(Logger::doOnNext)
    .map(data -> data * 10)
    .doOnNext(Logger::doOnNext);
}

1.3 Schedulers.boundedElastic()

  • 쓰레드 풀을 생성하여 생성된 쓰레드를 재사용한다.
  • 생성된 스레드풀 안에서 정해진 수만큼 스레드만큼 처리하고 작업이 종료되면 해당 스레드를 반납해서 재사용하는 방식
  • 생성할 수 있는 쓰레드 수에 제한이 있다. (Default, CPU 코어수 x 10)
  • 긴 실행 시간을 가질 수 있는 Blocking I/O 작업에 최적화 되어 있다.
  • 우리가 사용하는 subscribe 스케줄러 전용 오퍼레이터에서 사용함.
  • 데이터베이스나, Http request 대량의 데이터 I/O를 사용하는 경우 많이 사용된다.

 

 

1.4 Schedulers.parallel()

  • 여러개의 쓰레드를 할당해서 동시에 작업을 수행할 수 있다.
  • Non-Blocking I/O 작업에 최적화 되어있다.
  • CPU 코어 수만큼 스레드를 생성한다.
Flux.fromArray(new Integer[] {1, 3, 5, 7})
  .publishOn(Schedulers.parallel())
  .filter(data -> data > 3)
  .doOnNext(Logger::doOnNext)
  .map(data -> data * 10)
  .doOnNext(Logger::doOnNext)
  .subscribe(Logger::onNext);

1.5 Schedulers.fromExecutorService()

  • 기존의 ExecutorService를 사용해서 쓰레드를 생성한다.
  • 의미있는 식별자를 제공하기 떄문에 Metric에서 주로 사용된다.

 

1.6 Schedulers.newXXXX()

  • 다양한 유형의 새로운 Schedulers를 생성할 수 있다. (newSingle(), newParallel(), newboundedElastic())
  • Scheduler의 이름을 직접 지정할 수 있다.
  • 생성할때마다 새로운 스케줄러를 생성한다.
public void executeDoTask() {
  doTask("task1").subscribe(Logger::onNext);
  doTask("task2").subscribe(Logger::onNext);
}

private Flux<Integer> doTask(String taskName) {
  return Flux.fromArray(new Integer[] {1, 3, 5, 7})
    .publishOn(Schedulers.newSingle("new-single", true))
    .filter(data -> data > 3)
    .doOnNext(Logger::doOnNext)
    .map(data -> data * 10)
    .doOnNext(Logger::doOnNext);
}

 

 

보충필요.