1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.support.AbstractMonoToListenableFutureAdapter; 18 19 // import java.time.Duration; 20 // import java.util.concurrent.ExecutionException; 21 // import java.util.concurrent.TimeUnit; 22 // import java.util.concurrent.TimeoutException; 23 24 // import reactor.core.publisher.Mono; 25 // import reactor.core.publisher.MonoProcessor; 26 27 28 // 29 // import hunt.framework.util.concurrent.FailureCallback; 30 // import hunt.framework.util.concurrent.ListenableFuture; 31 // import hunt.framework.util.concurrent.ListenableFutureCallback; 32 // import hunt.framework.util.concurrent.ListenableFutureCallbackRegistry; 33 // import hunt.framework.util.concurrent.SuccessCallback; 34 35 // /** 36 // * Adapts {@link Mono} to {@link ListenableFuture} optionally converting the 37 // * result Object type {@code <S>} to the expected target type {@code <T>}. 
38 // * 39 // * @author Rossen Stoyanchev 40 // * @since 5.0 41 // * @param <S> the type of object expected from the {@link Mono} 42 // * @param <T> the type of object expected from the {@link ListenableFuture} 43 // */ 44 // abstract class AbstractMonoToListenableFutureAdapter!(S, T) implements ListenableFuture!(T) { 45 46 // private final MonoProcessor!(S) monoProcessor; 47 48 // private final ListenableFutureCallbackRegistry!(T) registry = new ListenableFutureCallbackRegistry<>(); 49 50 51 // protected AbstractMonoToListenableFutureAdapter(Mono!(S) mono) { 52 // assert(mono, "Mono must not be null"); 53 // this.monoProcessor = mono 54 // .doOnSuccess(result -> { 55 // T adapted; 56 // try { 57 // adapted = adapt(result); 58 // } 59 // catch (Throwable ex) { 60 // this.registry.failure(ex); 61 // return; 62 // } 63 // this.registry.success(adapted); 64 // }) 65 // .doOnError(this.registry::failure) 66 // .toProcessor(); 67 // } 68 69 70 // override 71 72 // public T get() throws InterruptedException { 73 // S result = this.monoProcessor.block(); 74 // return adapt(result); 75 // } 76 77 // override 78 79 // public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 80 // assert(unit, "TimeUnit must not be null"); 81 // Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit)); 82 // S result = this.monoProcessor.block(duration); 83 // return adapt(result); 84 // } 85 86 // override 87 // bool cancel( mayInterruptIfRunning) { 88 // if (isCancelled()) { 89 // return false; 90 // } 91 // this.monoProcessor.cancel(); 92 // return true; 93 // } 94 95 // override 96 // bool isCancelled() { 97 // return this.monoProcessor.isCancelled(); 98 // } 99 100 // override 101 // bool isDone() { 102 // return this.monoProcessor.isTerminated(); 103 // } 104 105 // override 106 // public void addCallback(ListenableFutureCallback<? 
super T> callback) { 107 // this.registry.addCallback(callback); 108 // } 109 110 // override 111 // public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { 112 // this.registry.addSuccessCallback(successCallback); 113 // this.registry.addFailureCallback(failureCallback); 114 // } 115 116 117 118 // protected abstract T adapt(S result); 119 120 // }