1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.support.ExecutorSubscribableChannel; 18 19 import hunt.stomp.support.AbstractSubscribableChannel; 20 import hunt.stomp.support.ChannelInterceptor; 21 import hunt.stomp.support.ExecutorChannelInterceptor; 22 23 import hunt.stomp.Message; 24 import hunt.stomp.MessagingException; 25 import hunt.stomp.MessageChannel; 26 27 import hunt.collection.ArrayList; 28 import hunt.collection.List; 29 import hunt.Exceptions; 30 import hunt.util.Common; 31 import hunt.logging; 32 33 import std.conv; 34 35 // import hunt.concurrent.Executor; 36 37 /** 38 * A {@link SubscribableChannel} that sends messages to each of its subscribers. 39 * 40 * @author Phillip Webb 41 * @author Rossen Stoyanchev 42 * @since 4.0 43 */ 44 class ExecutorSubscribableChannel : AbstractSubscribableChannel { 45 46 private Executor executor; 47 48 // private List!(ExecutorChannelInterceptor) executorInterceptors = new ArrayList<>(4); 49 50 51 /** 52 * Create a new {@link ExecutorSubscribableChannel} instance 53 * where messages will be sent in the callers thread. 54 */ 55 this() { 56 this(null); 57 } 58 59 /** 60 * Create a new {@link ExecutorSubscribableChannel} instance 61 * where messages will be sent via the specified executor. 62 * @param executor the executor used to send the message, 63 * or {@code null} to execute in the callers thread. 64 */ 65 this(Executor executor) { 66 this.executor = executor; 67 } 68 69 Executor getExecutor() { 70 return this.executor; 71 } 72 73 override 74 void setInterceptors(ChannelInterceptor[] interceptors) { 75 super.setInterceptors(interceptors); 76 // this.executorInterceptors.clear(); 77 // interceptors.forEach(this::updateExecutorInterceptorsFor); 78 foreach(ChannelInterceptor c; interceptors) { 79 this.updateExecutorInterceptorsFor(c); 80 } 81 } 82 83 override 84 void addInterceptor(ChannelInterceptor interceptor) { 85 super.addInterceptor(interceptor); 86 updateExecutorInterceptorsFor(interceptor); 87 } 88 89 override 90 void addInterceptor(int index, ChannelInterceptor interceptor) { 91 super.addInterceptor(index, interceptor); 92 updateExecutorInterceptorsFor(interceptor); 93 } 94 95 private void updateExecutorInterceptorsFor(ChannelInterceptor interceptor) { 96 auto ec = cast(ExecutorChannelInterceptor) interceptor; 97 if (ec !is null) { 98 // this.executorInterceptors.add(ec); 99 implementationMissing(false); 100 } 101 } 102 103 104 override 105 bool sendInternal(MessageBase message, long timeout) { 106 version(HUNT_DEBUG) { 107 trace("sending message: ", message.to!string()); 108 } 109 foreach (MessageHandler handler ; getSubscribers()) { 110 SendTask sendTask = new SendTask(message, handler); 111 if (this.executor is null) { 112 sendTask.run(); 113 } else { 114 this.executor.execute(sendTask); 115 } 116 } 117 return true; 118 } 119 120 121 /** 122 * Invoke a MessageHandler with ExecutorChannelInterceptors. 123 */ 124 private class SendTask : MessageHandlingRunnable { 125 126 private MessageBase inputMessage; 127 128 private MessageHandler messageHandler; 129 130 private int interceptorIndex = -1; 131 132 this(MessageBase message, MessageHandler handler) { 133 version(HUNT_DEBUG) { 134 tracef("creating SendTask for Message: %s, with handler: %s", 135 typeid(cast(Object)message), typeid(cast(Object)handler)); 136 } 137 this.inputMessage = message; 138 this.messageHandler = handler; 139 } 140 141 override 142 MessageBase getMessage() { 143 return this.inputMessage; 144 } 145 146 override 147 MessageHandler getMessageHandler() { 148 return this.messageHandler; 149 } 150 151 override 152 void run() { 153 MessageBase message = this.inputMessage; 154 try { 155 message = applyBeforeHandle(message); 156 if (message is null) 157 return; 158 this.messageHandler.handleMessage(message); 159 triggerAfterMessageHandled(message, null); 160 } 161 catch (Exception ex) { 162 triggerAfterMessageHandled(message, ex); 163 auto e = cast(MessagingException) ex; 164 if (e !is null) { 165 throw e; 166 } 167 string description = "Failed to handle " ~ message.to!string() ~ 168 " to " ~ this.toString() ~ " in " ~ this.messageHandler.to!string(); 169 throw new MessageDeliveryException(message, description, ex); 170 } 171 catch (Throwable err) { 172 string description = "Failed to handle " ~ message.to!string() ~ 173 " to " ~ this.toString() ~ " in " ~ this.messageHandler.to!string(); 174 MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err); 175 triggerAfterMessageHandled(message, ex2); 176 throw ex2; 177 } 178 } 179 180 181 private MessageBase applyBeforeHandle(MessageBase message) { 182 MessageBase messageToUse = message; 183 // TODO: Tasks pending completion -@zxp at 11/13/2018, 2:17:10 PM 184 // 185 // implementationMissing(false); 186 // foreach (ExecutorChannelInterceptor interceptor ; executorInterceptors) { 187 // messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler); 188 // if (messageToUse is null) { 189 // string name = interceptor.TypeUtils.getSimpleName(typeid(this)); 190 // version(HUNT_DEBUG) { 191 // trace(name ~ " returned null from beforeHandle, i.e. precluding the send."); 192 // } 193 // triggerAfterMessageHandled(message, null); 194 // return null; 195 // } 196 // this.interceptorIndex++; 197 // } 198 return messageToUse; 199 } 200 201 private void triggerAfterMessageHandled(MessageBase message, Exception ex) { 202 // TODO: Tasks pending completion -@zxp at 11/13/2018, 2:17:31 PM 203 // 204 // implementationMissing(false); 205 for (int i = this.interceptorIndex; i >= 0; i--) { 206 // ExecutorChannelInterceptor interceptor = executorInterceptors.get(i); 207 // try { 208 // interceptor.afterMessageHandled(message, this.outer, this.messageHandler, ex); 209 // } 210 // catch (Throwable ex2) { 211 // errorf("Exception from afterMessageHandled in " ~ interceptor.to!string(), ex2); 212 // } 213 } 214 } 215 } 216 217 }