Post

(Spring) WebClient

왜 WebClient ?: RestTemplate은 deprecated 예정.

docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/client/RestTemplate.html

As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the org.springframework.web.reactive.client.WebClient which has a more modern API and supports sync, async, and streaming scenarios.

  • RestTemplate은 synchronous, blocking call 밖에 할 수 없으나, WebClient는 여기에 추가로 async, nonblocking IO를 지원한다.
  • 그래서 Spring MVC를 쓰든, WebFlux를 쓰든 WebClient를 사용하는 것이 좋다.

WebClient vs. RestTemplate

  • 요 것만 보면 이해됨.www.baeldung.com/spring-webclient-resttemplate
  • “Mono나 Flux를 리턴하는 컨트롤러는 바로 리턴하며 끝나버리고, 나중에 Mono가 준비되면 이를 컨슘하며 클라이언트 쪽으로 내려준다.”
    • 이 것을 Framework 단에서 지원해주어야 하는 것이고, 그래서 NIO 지원이 중요하다고 하는 것
  • Kotlin Coroutine을 사용하면 실행 흐름이 또 다름! [Kotlin/Spring] Kotlin Coroutines
1
2
3
4
5
6
7
8
9
10
11
12
13
Starting BLOCKING Controller!
Tweet(text=RestTemplate rules, username=@user1)
Tweet(text=WebClient is better, username=@user2)
Tweet(text=OK, both are useful, username=@user1)
Exiting BLOCKING Controller!
--------------------------------------
// WebClient는 non-blocking으로 호출되자마자 바로 Flux<Tweet>를 리턴하면서 지나감.
// 따라서 컨트롤러도 바로 Exit.
Starting NON-BLOCKING Controller!
Exiting NON-BLOCKING Controller!
Tweet(text=RestTemplate rules, username=@user1)
Tweet(text=WebClient is better, username=@user2)
Tweet(text=OK, both are useful, username=@user1)

WebClient 객체 생성 및 설정

생성 - WebClient.create()로 만드는 방법
  • 이렇게 직접 만들면, 당연히 Spring auto config 적용 받지 않는다.
  • 따라서 application.properties 설정도 당연히 적용되지 않는다.
생성 - WebClient.Builder DI 받아서 만드는 방법 (추천)
  • Spring WebClientAutoConfiguration 이 제공하는 WebClient.Builder를 DI 받아 WebClient를 생성하는 방법.
  • Spring Boot는 WebClient를 기본 Bean으로 제공하지 않는다. 대신 WebClient.Builder를 기본 Bean으로 제공하고 있다.
    • WebClient 자체가 아니라 Builder를 기본 Bean으로 제공하는 것은 안정성과 확장성 때문 인 듯.
    • 기존 RestTemplate은 빌더가 아니라 RestTemplate 자체를 Bean으로 제공하고 있어서, 클라이언트가 기본 설정에 설정을 덧씌우고 싶은 경우 RestTemplate 자체에 설정 메서드를 호출해서 상태를 변경했다.
    • 이처럼 RestTemplate은 런타임에 설정 수정이 가능했는데, 상당히 fragile하다. (같은 bean을 공유해서 사용하고 있는데 어디선가 갑자기 설정을 바꿔버리면?)
    • 반면 Builder를 DI하는 방식은, 클라이언트가 추가 설정을 덧씌우고 싶은 경우 build 하기 전에 얼마든지 설정 가능하다. 설정이 끝나고 build 한 다음 부터는 해당 인스턴스의 설정이 수정되지 않도록 설계 할 수 있어 안정성도 좋아진다.
  • auto config를 통해 생성된 Bean이므로 기본 설정이 적용되어 있다.
  • 내부적으로는 Spring 전역 ObjectMapper 사용하게 된다.

docs.spring.io/spring-boot/docs/2.1.18.RELEASE/reference/html/boot-features-webclient.html

Spring Boot creates and pre-configures a WebClient.Builder for you; it is strongly advised to inject it in your components and use it to create WebClient instances. Spring Boot is configuring that builder to share HTTP resources, reflect codecs setup in the same fashion as the server ones (see WebFlux HTTP codecs auto-configuration), and more.

Spring Default ObjectMapper 말고 다른 ObjectMapper 사용하기

[Java] Jackson 변수명 snake_case <-> camelCase 변환

logging : SpringBoot maintainer Brian Clozel의 답변.

1
2
3
4
5
6
7
8
9
10
11
spring.http.log-request-details=true   // Boot 버전 2.1.0 이상일 때
 
// 그 이하라면.
@Configuration
static class LoggingCodecConfig {
  @Bean
  @Order(0)
  public CodecCustomizer loggingCodecCustomizer() {
    return (configurer) -> configurer.defaultCodecs() .enableLoggingRequestDetails(true);
  }
}

logging 비교 : 전역 로깅과 별도로, 필요한 응답만 INFO 로깅하려는 경우

e.g., 전역 로깅은 DEBUG로, 모든 요청/응답 헤더바디를 로깅하고 있다. 따라서 INFO 로깅에서는 결과매핑객체를 로깅하려 한다. Mono의 소비 방법(block, awaitSingle…)은 외부에게 맡기고, 내부에서는 로깅만 강제하고 싶은 상황

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1.
.log(this.javaClass.simpleName, Level.INFO)  
// => Rx step이 모조리 로깅되기 때문에 권장하지 않음.
 
2.
.doOnSuccess { log().info("$it") }
// => Mono에는 사용 가능. 그러나 Flux에서는 사용 불가.
// Flux는 리스트도 맵도 아니기 때문에 Flux를 로깅한다는게 애매하다. 
// Mono는 doOnSuccess 시점에는 T타입이 되는데, Flux는 비슷하게 끝나는 시점에 ?<T> 타입이 된다.(e.g., List<T>, Map<T>) 
// 근데 Flux가 반환 타입이면 ?<T>를 결정 할 수가 없고... 그래서 파라미터로 받는 타입을 결정할 수 없어 로깅 불가.
 
2-1. Flux<T> Mono<Collection<T>>으로 바꿔서 리턴하도록 하면 doOnSuccess 가능.
// 단점은... Mono<Collection<T>> 타입이므로 코드가 아래와 같이 바뀌어야 해서 nested depth가 생긴다.
As-is : Flux
  .filter { it.symbol.endsWith("US") }
  .map { it.symbol }
 
To-be1 : Mono
  .map {
    it.filter { it.symbol.endsWith("US") }
      .map { it.symbol }
  }
To-be2 : Mono
  .flatMapIterable { it }
  .filter { it.symbol.endsWith("US") }
  .map { it.symbol }
 
3.
아예 외부에서 별도로 logging 하는 방법
// 로깅 누락 가능성 있음.
  • 사실 생각해보면,Flux라는건 Rx에서 스트림 형태의 데이터가 순차적으로 준비될 때, 준비되는 대로 그때그때 받는 것에 의미 가 있는건데 API call 응답으로 list를 수신해서 한 방에 모든 Flux element가 준비되는 상황에서는, 굳이 Flux를 쓸 이유가 없다.
  • 방법 2. & 2-1. 을 사용하고 To-be1, To-be2는 편한대로 상황에 맞게 사용하면 그나마 깔끔하게 처리 가능하며 데이터의 성격도 잘 대변할 수 있다.
에러 로깅?
1
2
.doOnError(WebClientResponseException::class.java) {
    logger.error("### [WebClientResponseException] uri=${t.request?.uri}, Status=${t.rawStatusCode}, Body=${t.responseBodyAsString}")}

Usage, 예제

retrieve() vs exchange()

docs.spring.io/spring-framework/docs/5.3.0-SNAPSHOT/spring-framework-reference/web-reactive.html#webflux-client-exchange

Unlike retrieve(), when using exchange(), it is the responsibility of the application to consume any response content regardless of the scenario (success, error, unexpected data, etc). Not doing so can cause a memory leak.
See ClientResponse for a list of all the available options for consuming the body. Generally prefer using retrieve() unless you have a good reason to use exchange() which does allow to check the response status and headers before deciding how or if to consume the response.

  • 요약하면 exchange()는 memory leak 발생할 수 있으니, 웬만하면 retrieve()를 사용하는 것이 좋다.
  • 하지만 exchangeToMono|Flux()는 memory leak 문제가 없으니, 이를 사용하는 것은 문제가 없다.

retrieve() 예제

  • retrieve가 아니라 exchange를 사용하는 대부분의 케이스는 status code나 header에 접근하기 위함일텐데, 이는 toEntity 사용해서 ResponseEntity 로 만들어도 접근 가능하다.
  • 예제 및 javadoc

exchange()의 문제점 : memory leak

body를 컨슘 하지 않으면 이를 release 하지 않고 계속 가지고 있다. Heap을 5G 정도 소모하고 있는 모습.

exchange () -> exchangeToMono() 예제

docs.spring.io/spring-framework/docs/5.3.0/javadoc-api/org/springframework/web/reactive/function/client/WebClient.RequestHeadersSpec.html#exchange–

Deprecated. since 5.3 due to the possibility to leak memory and/or connections; please, use exchangeToMono(Function), exchangeToFlux(Function); consider also using retrieve() which provides access to the response status and headers via ResponseEntity along with error status handling.

  • 5.3 부터 memory leak 이슈로 exchange는 deprecated 되었고 단점을 해결한 exchangeToMonoFlux 사용
  • response body가 consume 되지 않았다면 자동으로 release 해준다. => leak 방지

기본적인 POST 요청 예제

https://medium.com/@odysseymoon/spring-webclient-%EC%82%AC%EC%9A%A9%EB%B2%95-5f92d295edc0

1
2
3
4
5
6
7
8
9
10
11
12
13
Mono<ThreeDSecureResponse> mono = webClient.post()
   .uri(url)
   .contentType(APPLICATION\_JSON)
   .bodyValue(new ThreeDSecureRequest(userId, uuid))
   .retrieve()
   .bodyToMono(ThreeDSecureResponse.class)
   .doOnSuccess(response -> response.validate())
.doOnError(e -> log.warn(e.getMessage(), e));

// switchIfEmpty(Mono.error(new Exception...)) 도 유용하다

mono.subscribeOn(Schedulers.elastic())
.subscribe();

*** flatMap은 flatten하는데 사용되기도 하지만, Reactive에서는 Mono 안에 있는 것을 꺼내서 다음 체인으로 넘기는 용도로 더 많이 사용한다. Reactor map, flatMap method는 언제 써야할까?

1
2
3
4
5
6
7
8
9
10
11
12
Mono mono = webClient.post()
   .uri(url)
   .contentType(APPLICATION\_JSON)
   .bodyValue(new ThreeDSecureRequest(userId, uuid))
   .exchangeToMono()
.doOnSuccess(response -> checkResponseSucceed(response))


- retrieve와 달리 HTTP status code 500이든 400이든 요청이 성공했다면  doOnSuccess로 들어간다
- 그래서 여기서 request가 정말로 성공했느냐를 체크 해야 .
- doOnSuccess에서 Exception 던지면 doOnError 또는 onErrorResume으로 간다
- Connection reset 같은, 요청이 아예 실패한 경우에는 doOnSuccess로 가지 않는다.

Spring reactor에서 얘기하는 blocking call wrapping하기

https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

여러 call을 묶어서 async way로 한번에 resolve 하기

1
2
3
4
5
6
fun example(): List<Ticker> =
  Flux.fromIterable(coinClient.getAllCoinList())
    .flatMap { coinClient.getTicker(it) }
    .filter { it.volume > it.yesterdayVolume }
    .collectSortedList { o1, o2 -> o1.currency.compareTo(o2.currency) }
    .block()!!

Reactor 3 Reference Guide : Appendix A: 이 상황에선 어떤 오퍼레이터를 사용해야 하나?

projectreactor.io/docs/core/release/reference/#which-operator

A.3. Peeking into a Sequence
  • 한개의 시퀀스가 전달 될 때마다 doOnNext 이벤트 발생
  • 모든 데이터가 전달 완료되면 Flux#doOnComplete, Mono#doOnSuccess 이벤트 발생
  • 전달 과정에서 오류가 발생하면 doOnError 이벤트발생
  • 등등!
  • doOnComplete는 Peeking이라서, 결과 값을 이용하려면 collectList()로 만든 다음에 써야 할 듯?

반복 : repeat, retry 처리하기

1
2
3
4
5
6
7
8
9
getOrders(market, listOf("wait", "watch"))
  .flatMap { Mono.justOrEmpty(cancelOrder(it.uuid)) } // 각각에 대해서 취소 요청을 보내고
  .collectList() // 모든 취소 요청을 보낼 때 까지 대기. Mono<List>로
  .repeatWhen(
    Repeat.onlyIf { _: RepeatContext<Any> -> getPendingOrders(market).isNotEmpty() }
    .exponentialBackoff(Duration.ofMillis(200), Duration.ofSeconds(1))
    .repeatMax(5)
  )
  .delaySubscription(Duration.ofSeconds(10))

다양한 예제 코드 : baeldung

github.com/eugenp/tutorials/tree/master/spring-5-reactive-client

javadoc 참고. 다른 것 보다 javadoc을 먼저 찾아볼 것.

특정 HttpStatus에서 Exception이 아니라 empty 반환하기

1
2
3
4
5
6
7
8
9
10
11
webClient.get()
  .uri("https://abc.com/account/123")
  .retrieve()
  .bodyToMono(Account.class)
  .doOnError(WebClientResponseException::class.java) {
    log().error("### ${it.responseBodyAsString}") // 단순 로깅이 필요한 경우 doOnError에서 처리
    // WebClientResponseException 필드라서 타입 명시되어야 사용 가능
  }
  .onErrorResume(WebClientResponseException::class.java) {
    if (it.statusCode == HttpStatus.NOT_FOUND) Mono.empty() else Mono.error(it)
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Mono<Object> entityMono = client.get()
  .uri("/persons/1")
  .accept(MediaType.APPLICATION_JSON)
  .exchangeToMono(response -> {
    if (response.statusCode().equals(HttpStatus.OK)) {
      return response.bodyToMono(Person.class);
    }
    else if (response.statusCode().is4xxClientError()) {
      return response.bodyToMono(ErrorContainer.class);
    }
    else {
      return response.createException().flatMap(Mono::error);
    }
  })
  .doOnError(WebClientResponseException::class.java, t -> {
    URI uri = e.getRequest() != null ? e.getRequest().getURI() : null;
    log.error("### [WebClientResponseException] uri=" + uri + ", Status=" + e.getRawStatusCode() + ", Body=" + e.getResponseBodyAsString());
    // 비정상 응답 로깅이 필요한 경우 doOnError에서 처리
    // 실패 시 body 로깅이 필요한 경우 WebClientResponseException.responseBodyAsString를 참조
    // WebClientResponseException 타입이어야 하므로 타입 명시 필수
  });

WebClient 컨벤션

4xx, 5xx 응답 코드로 판단하는 경우와, 200 응답 내에서 ResponseCode로 판단해야 하는 케이스의 컨벤션 등

Rate Limit, 요소의 delay 소모

1
2
3
4
5
6
Flux.fromIterable(Market.values().map { it.getJsonName() })
    .delayElements(Duration.ofMillis(150))
    .flatMap { do(it) }
    .collectMap { it[0].name }
    .block()!!

Cache 관련

  • https://dreamchaser3.tistory.com/17#잘 정리되어 있다.
  • .cache()와 @Cachable의 가장 큰 차이는, 전자는 “같은 Mono/Flux를 여러번 소모했을 때의 cache” 라는 것이다.
  • 그래서 .cache()를 쓰려면, private backing field에 .cache() 반환값을 저장해 두고, 이 녀석을 계속 리턴해주어야 함. (매번 새로운 Mono를 반환하는 것이 아니라)
  • 파라미터가 없는 경우 쓸만하지만, 파라미터가 있다면 파라미터도 같이 private backing field에 저장하고, 요청에 맞는 backing field를 찾아 반환해주어야 한다.
    • 이를 직접 구현하기가 좀 까다로우니, 대신 해주는 것이 Reactor addon이나 @Cachable이다.
This post is licensed under CC BY 4.0 by the author.