WebFlux에서 Mono, Flux에 Map 또는 flatMap을 사용할 때 null을 리턴하는 경우
web/Spring

WebFlux에서 Mono, Flux에 Map 또는 flatMap을 사용할 때 null을 리턴하는 경우

반응형

WebFlux로 구성되어 있는 프로젝트에서 Mono, Flux stream에서 map이나 flatMap을 사용해서 특정 데이터를 매핑하는 과정에서 특정한 경우에 대해서 null을 리턴하고 다음 파이프라인에서 filter로 Objects의 nonNull을 사용해서 컨텐츠를 필터링 하려고 했다.

 

작성하려고 한 코드의 일부 예시를 만들어서 작성해봤다.

만약 resultData를 통해 전달받은 데이터가 예상 대로라면 map과정에서 500보다 큰 5189값만 정상 반환하고 나머지는 null을 반환한 후 filter를 통해 정상적으로 하나의 데이터만 남을 것이라고 예상했다.

@Test
@DisplayName("map 과정에서 반환된 null이 정상적으로 필터링 되었는지 확인하는 테스트")
void mono_null_filter_test() {
    StepVerifier.create(resultData()
        .map(d -> {
            if (Long.parseLong(d) > 500L) {
                return d;
            }
            return null;
        })
        .filter(Objects::nonNull)
    )
    .assertNext(d -> {
	   assertThat(d).isNotNull();
       assertThat(d).isEqualTo("5189");
    }).verifyComplete();
}


// Flux String stream 값을 반환하는 메소드 resultData
Flux<String> resultData() {
    List<String> data = new ArrayList<>();

    data.add("123");
    data.add("456");
    data.add("5189");

    return Flux.fromIterable(
        data
    );
}

 

하지만 실제 테스트 결과 NullPointerException가 발생하고 테스트가 죽어버렸다.

expectation "assertNext" failed (expected: onNext(); actual: onError(java.lang.NullPointerException: The mapper returned a null value.))
java.lang.AssertionError: expectation "assertNext" failed (expected: onNext(); actual: onError(java.lang.NullPointerException: The mapper returned a null value.))
	at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
	at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
	at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
	at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
	at reactor.test.DefaultStepVerifierBuilder.lambda$consumeNextWith$1(DefaultStepVerifierBuilder.java:268)
	at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2213)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1485)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1433)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1092)
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:151)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:252)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:205)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
	at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:335)
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:222)
	at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:102)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:281)
	at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSubscribe(DefaultStepVerifierBuilder.java:1124)
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:79)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:185)
	at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:64)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4213)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:868)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:824)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:816)
	at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:683)
	at com.example.demo.DemoApplicationTests.mono_null_filter_test(DemoApplicationTests.java:31)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
	at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
	at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.NullPointerException: The mapper returned a null value.
		at java.base/java.util.Objects.requireNonNull(Objects.java:246)
		at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:199)
		... 106 more

 

예상과 다른 결과에 Reactor 코드에 Mono에서 사용되는 Map 코드를 보았더니 다음과 같이 되어있었다.

데이터가 subscribe return 되는 과정에서 mapper(Function)에 apply 되었을 때 반환되는 값이 null이게 되면 NPE를 내놓게 되어있었다. 

그래서 MonoMap에서 mapper에 result값이 null인 경우에 subscribe 하는 과정에서 NPE가 되는 것이었다.

 

이것은 map이 아닌 flatMap이어도 동일하다. 

resultData()
    .flatMap(d -> {
        if (Long.parseLong(d) > 500L) {
            return Mono.just(d);
        }
        return null;
    })

 

 

이를 해결 하는 방법은 2가지 있다.


1. flatMap 사용

flatmap의 경우 inner Mono를 unwrap 하는 과정에서 비어있는 Mono를 필터링 하기 때문에 다음과 같이 해결할 수 있다.

@Test
void mono_null_filter_test() {
    StepVerifier.create(resultData()
        .flatMap(d -> {
            if (Long.parseLong(d) > 500L) {
                return Mono.just(d);
            }
            return Mono.empty();
        })
    )
        .assertNext(d -> {
            assertThat(d).isNotNull();
            assertThat(d).isEqualTo("5189");
        }).verifyComplete();
}

 

 

 

2. Optional 사용

null 값을 사용할 수 없기 때문에 Map에서 Null을 반환할 객체를 Null 대신 Optional로 감싸서 반환하고 filter에서 실제 값이 있는 데이터만 filter해서 보여줄 수 있는 방법이 있다.

@Test
void mono_null_filter_test() {
    StepVerifier.create(resultData()
        .map(d -> {
            if (Long.parseLong(d) > 500L) {
                return Optional.of(d);
            }
            return Optional.empty();
        })
        .filter(Optional::isPresent)
        .map(Optional::get)
    )
        .assertNext(d -> {
            assertThat(d).isNotNull();
            assertThat(d).isEqualTo("5189");
        }).verifyComplete();
}

 

 

 

 

출처 : 

tech.kakao.com/2018/05/29/reactor-programming/

 

사용하면서 알게 된 Reactor, 예제 코드로 살펴보기

Reactor는 Pivotal의 오픈소스 프로젝트로, JVM 위에서 동작하는 논블럭킹 애플리케이션을 만들기 위한 리액티브 라이브러리입니다. Reactor는 RxJava 2와 함께 Reactive Stream의 구현체이기도 하고, Spring Fra

tech.kakao.com

stackoverflow.com/questions/58215810/returning-a-null-in-a-map-versus-flatmap-in-reactor

 

Returning a null in a .map() versus .flatMap() in Reactor

The following piece of code works : // emitting employees... .flatMap(employee -> { boolean isAlive = employee.isAlive(); return Mono.just(isAlive) .flatMap(myBoolean -&g...

stackoverflow.com

luvstudy.tistory.com/95

 

Reactor map, flatMap method는 언제 써야할까?

webflux로 서비스를 만들어보면서 map과 flatMap을 언제 써야 할지 헷갈릴 때가 있어 공부한 내용을 정리함. map과 flatMap은 둘 다 스트림의 중간에 값을 변환해주는 역할을 한다. map은 1 : 1로 반환을 보

luvstudy.tistory.com

 

반응형