(Spring) WebClient 예제
retrieve()
retrieve() vs exchange()
Unlike retrieve(), when using exchange(), it is the responsibility of the application to consume any response content regardless of the scenario (success, error, unexpected data, etc). Not doing so can cause a memory leak.
See ClientResponse for a list of all the available options for consuming the body. Generally prefer using retrieve() unless you have a good reason to use exchange() which does allow to check the response status and headers before deciding how or if to consume the response.
- 요약하면 exchange()는 memory leak 발생할 수 있으니, 웬만하면 retrieve()를 사용하는 것이 좋다.
- 하지만
exchangeToMono / Flux()
는 memory leak 문제가 없으니, 이를 사용하는 것은 문제가 없다.
retrieve() 예제
- retrieve가 아니라 exchange를 사용하는 대부분의 케이스는 status code나 header에 접근하기 위함일텐데, 이는
toEntity
사용해서ResponseEntity
로 만들어도 접근 가능하다. - 예제 및 javadoc
exchange()
exchange()의 문제점 : memory leak
body를 컨슘 하지 않으면 이를 release 하지 않고 계속 가지고 있다. Heap을 5G 정도 소모하고 있는 모습.
- https://github.com/reactor/reactor-netty/issues/1401#issuecomment-736393872
- memory leak을 방지하기 위해서는? 항상 response가 consume 되는지 신경써주어야 한다.
- releaseBody()
- toBodilessEntity()
- bodyToMono(Void.class) - 이러면 connection을 닫는다.
exchange () -> exchangeToMono() 예제
Deprecated. since 5.3 due to the possibility to leak memory and/or connections; please, use exchangeToMono(Function), exchangeToFlux(Function); consider also using retrieve() which provides access to the response status and headers via ResponseEntity along with error status handling.
- 5.3 부터 memory leak 이슈로 exchange는 deprecated 되었고 단점을 해결한
exchangeToMono / Flux
사용 - response body가 consume 되지 않았다면 자동으로 release 해준다. => leak 방지
기타 예제
기본적인 POST 요청 예제
https://medium.com/@odysseymoon/spring-webclient-%EC%82%AC%EC%9A%A9%EB%B2%95-5f92d295edc0
1
2
3
4
5
6
7
8
9
10
11
12
13
Mono<ThreeDSecureResponse> mono = webClient.post()
.uri(url)
.contentType(APPLICATION\_JSON)
.bodyValue(new ThreeDSecureRequest(userId, uuid))
.retrieve()
.bodyToMono(ThreeDSecureResponse.class)
.doOnSuccess(response -> response.validate())
.doOnError(e -> log.warn(e.getMessage(), e));
// switchIfEmpty(Mono.error(new Exception...)) 도 유용하다
mono.subscribeOn(Schedulers.elastic())
.subscribe();
flatMap은 flatten하는데 사용되기도 하지만, Reactive에서는 Mono 안에 있는 것을 꺼내서 다음 체인으로 넘기는 용도로 더 많이 사용한다.
Reactor map, flatMap method는 언제 써야할까?
1
2
3
4
5
6
Mono mono = webClient.post()
.uri(url)
.contentType(APPLICATION\_JSON)
.bodyValue(new ThreeDSecureRequest(userId, uuid))
.exchangeToMono()
.doOnSuccess(response -> checkResponseSucceed(response))
- retrieve와 달리 HTTP status code 500이든 400이든 요청이 성공했다면 다 doOnSuccess로 들어간다
- 그래서 여기서 request가 정말로 성공했느냐를 체크 해야 함.
- doOnSuccess에서 Exception 던지면 doOnError 또는 onErrorResume으로 간다
- Connection reset 같은, 요청이 아예 실패한 경우에는 doOnSuccess로 가지 않는다.
Spring reactor에서 얘기하는 blocking call wrapping하기
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
여러 call을 묶어서 async way로 한번에 resolve 하기
- www.baeldung.com/spring-webclient-simultaneous-calls
- stackoverflow.com/questions/43269275/parallelflux-vs-flatmap-for-a-blocking-i-o-task
- 타입이 다른 여러 Mono 들을 한번에 resolve 하기 위해서는 Mono.zip을 써야 하는데… 반환 값이 tuple이라. 순서 기반 매칭을 써야만 한다는 것이 약간 어글리 한 점? 근데 이 정도는 납득할만하다. 스레드 쓰는 것 보단 나아보임.
- Flux는 Mono.zip에 집어넣을 때 collectMap 등을 사용해서 Mono로 바꿔주면 된다.
1
2
3
4
5
6
fun example(): List<Ticker> =
Flux.fromIterable(coinClient.getAllCoinList())
.flatMap { coinClient.getTicker(it) }
.filter { it.volume > it.yesterdayVolume }
.collectSortedList { o1, o2 -> o1.currency.compareTo(o2.currency) }
.block()!!
Reactor 3 Reference Guide : Appendix A: 이 상황에선 어떤 오퍼레이터를 사용해야 하나?
projectreactor.io/docs/core/release/reference/#which-operator
A.3. Peeking into a Sequence
- 한개의 시퀀스가 전달 될 때마다 doOnNext 이벤트 발생
- 모든 데이터가 전달 완료되면 Flux#doOnComplete, Mono#doOnSuccess 이벤트 발생
- 전달 과정에서 오류가 발생하면 doOnError 이벤트발생
- 등등!
- doOnComplete는 Peeking이라서, 결과 값을 이용하려면 collectList()로 만든 다음에 써야 할 듯?
반복 : repeat, retry 처리하기
- stackoverflow.com/questions/55923326/conditional-repeat-or-retry-on-mono-with-webclient-from-spring-webflux
- https://www.baeldung.com/spring-webflux-retry
- retry의 경우 재시도가 모두 실패하면 RetryExhaustedException 를 던지게 되어 있다.
1
2
3
4
5
6
7
8
9
getOrders(market, listOf("wait", "watch"))
.flatMap { Mono.justOrEmpty(cancelOrder(it.uuid)) } // 각각에 대해서 취소 요청을 보내고
.collectList() // 모든 취소 요청을 보낼 때 까지 대기. Mono<List>로
.repeatWhen(
Repeat.onlyIf { _: RepeatContext<Any> -> getPendingOrders(market).isNotEmpty() }
.exponentialBackoff(Duration.ofMillis(200), Duration.ofSeconds(1))
.repeatMax(5)
)
.delaySubscription(Duration.ofSeconds(10))
다양한 예제 코드 : baeldung
github.com/eugenp/tutorials/tree/master/spring-5-reactive-client
javadoc 참고. 다른 것 보다 javadoc을 먼저 찾아볼 것.
특정 HttpStatus에서 Exception이 아니라 empty 반환하기
1
2
3
4
5
6
7
8
9
10
11
webClient.get()
.uri("https://abc.com/account/123")
.retrieve()
.bodyToMono(Account.class)
.doOnError(WebClientResponseException::class.java) {
log().error("### ${it.responseBodyAsString}") // 단순 로깅이 필요한 경우 doOnError에서 처리
// WebClientResponseException 필드라서 타입 명시되어야 사용 가능
}
.onErrorResume(WebClientResponseException::class.java) {
if (it.statusCode == HttpStatus.NOT_FOUND) Mono.empty() else Mono.error(it)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Mono<Object> entityMono = client.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToMono(Person.class);
}
else if (response.statusCode().is4xxClientError()) {
return response.bodyToMono(ErrorContainer.class);
}
else {
return response.createException().flatMap(Mono::error);
}
})
.doOnError(WebClientResponseException::class.java, t -> {
URI uri = e.getRequest() != null ? e.getRequest().getURI() : null;
log.error("### [WebClientResponseException] uri=" + uri + ", Status=" + e.getRawStatusCode() + ", Body=" + e.getResponseBodyAsString());
// 비정상 응답 로깅이 필요한 경우 doOnError에서 처리
// 실패 시 body 로깅이 필요한 경우 WebClientResponseException.responseBodyAsString를 참조
// WebClientResponseException 타입이어야 하므로 타입 명시 필수
});
WebClient 컨벤션
4xx, 5xx 응답 코드로 판단하는 경우와, 200 응답 내에서 ResponseCode로 판단해야 하는 케이스의 컨벤션 등
Rate Limit, 요소의 delay 소모
1
2
3
4
5
Flux.fromIterable(Market.values().map { it.getJsonName() })
.delayElements(Duration.ofMillis(150))
.flatMap { do(it) }
.collectMap { it[0].name }
.block()!!
Cache 관련
- https://dreamchaser3.tistory.com/17#잘 정리되어 있다.
- .cache()와 @Cachable의 가장 큰 차이는, 전자는 “같은 Mono/Flux를 여러번 소모했을 때의 cache” 라는 것이다.
- 그래서 .cache()를 쓰려면, private backing field에 .cache() 반환값을 저장해 두고, 이 녀석을 계속 리턴해주어야 함. (매번 새로운 Mono를 반환하는 것이 아니라)
- 파라미터가 없는 경우 쓸만하지만, 파라미터가 있다면 파라미터도 같이 private backing field에 저장하고, 요청에 맞는 backing field를 찾아 반환해주어야 한다.
- 이를 직접 구현하기가 좀 까다로우니, 대신 해주는 것이 Reactor addon이나 @Cachable이다.
Mono<T>
는 notnull 타입만 적어주는 것이 좋다.
1
2
3
4
X | Mono<WalletAddress?>.awaitSingle()
-> Err (No value received via onNext for awaitSingle)
O | Mono<WalletAddress>.awaitSingleOrNull()