/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.core.GenericMessagingTemplate;


import hunt.stomp.Message;
import hunt.stomp.MessageChannel;
import hunt.stomp.MessagingException;
import hunt.stomp.MessageHeaders;
import hunt.stomp.support.MessageBuilder;
import hunt.stomp.support.MessageHeaderAccessor;

// import java.util.concurrent.CountDownLatch;

import hunt.Nullable;
import hunt.Number;
import hunt.logging;

// import hunt.framework.beans.BeansException;
// import hunt.framework.beans.factory.BeanFactory;
// import hunt.framework.beans.factory.BeanFactoryAware;

import std.conv;


/**
 * A messaging template that resolves destinations names to {@link MessageChannel}'s
 * to send and receive messages from.
 *
 * <p>The template parameter {@code T} is the payload type of the
 * {@code Message!(T)} instances this template receives and returns.
 *
 * @author Mark Fisher
 * @author Rossen Stoyanchev
 * @author Gary Russell
 * @since 4.0
 */
class GenericMessagingTemplate(T) : AbstractDestinationResolvingMessagingTemplate!MessageChannel
{ // implements BeanFactoryAware

    /**
     * The default header key used for a send timeout.
     */
    enum string DEFAULT_SEND_TIMEOUT_HEADER = "sendTimeout";

    /**
     * The default header key used for a receive timeout.
     */
    enum string DEFAULT_RECEIVE_TIMEOUT_HEADER = "receiveTimeout";

    // A negative timeout means "no timeout": the receive path below only
    // passes the value on to the channel when it is >= 0.
    private long sendTimeout = -1;

    private long receiveTimeout = -1;

    private string sendTimeoutHeader = DEFAULT_SEND_TIMEOUT_HEADER;

    private string receiveTimeoutHeader = DEFAULT_RECEIVE_TIMEOUT_HEADER;

    private bool throwExceptionOnLateReply = false;


    /**
     * Configure the default timeout value to use for send operations.
     * May be overridden for individual messages.
     * @param sendTimeout the send timeout in milliseconds
     * @see #setSendTimeoutHeader(string)
     */
    void setSendTimeout(long sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    /**
     * Return the configured default send operation timeout value.
     */
    long getSendTimeout() {
        return this.sendTimeout;
    }

    /**
     * Configure the default timeout value to use for receive operations.
     * May be overridden for individual messages when using sendAndReceive
     * operations.
     * @param receiveTimeout the receive timeout in milliseconds
     * @see #setReceiveTimeoutHeader(string)
     */
    void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    /**
     * Return the configured receive operation timeout value.
     */
    long getReceiveTimeout() {
        return this.receiveTimeout;
    }

    /**
     * Set the name of the header used to determine the send timeout (if present).
     * Default {@value #DEFAULT_SEND_TIMEOUT_HEADER}.
     * <p>The header is removed before sending the message to avoid propagation.
     * @since 5.0
     */
    void setSendTimeoutHeader(string sendTimeoutHeader) {
        assert(sendTimeoutHeader, "'sendTimeoutHeader' cannot be null");
        this.sendTimeoutHeader = sendTimeoutHeader;
    }

    /**
     * Return the configured send-timeout header.
     * @since 5.0
     */
    string getSendTimeoutHeader() {
        return this.sendTimeoutHeader;
    }

    /**
     * Set the name of the header used to determine the receive timeout (if present).
     * Default {@value #DEFAULT_RECEIVE_TIMEOUT_HEADER}.
     * <p>The header is removed before sending the message to avoid propagation.
     * @since 5.0
     */
    void setReceiveTimeoutHeader(string receiveTimeoutHeader) {
        assert(receiveTimeoutHeader, "'receiveTimeoutHeader' cannot be null");
        this.receiveTimeoutHeader = receiveTimeoutHeader;
    }

    /**
     * Return the configured receive-timeout header.
     * @since 5.0
     */
    string getReceiveTimeoutHeader() {
        return this.receiveTimeoutHeader;
    }

    /**
     * Whether the thread sending a reply should have an exception raised if the
     * receiving thread isn't going to receive the reply either because it timed out,
     * or because it already received a reply, or because it got an exception while
     * sending the request message.
     * <p>The default value is {@code false} in which case only a WARN message is logged.
     * If set to {@code true} a {@link MessageDeliveryException} is raised in addition
     * to the log message.
     * @param throwExceptionOnLateReply whether to throw an exception or not
     */
    void setThrowExceptionOnLateReply(bool throwExceptionOnLateReply) {
        this.throwExceptionOnLateReply = throwExceptionOnLateReply;
    }

    /**
     * Install a destination resolver backed by the given bean factory.
     *
     * NOTE(review): the {@code BeanFactory} import and the {@code BeanFactoryAware}
     * base are commented out in this module -- confirm that a base class declares
     * this method, otherwise {@code override} has nothing to override.
     */
    override
    void setBeanFactory(BeanFactory beanFactory) {
        setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
    }


    /**
     * Send the message using the timeout taken from the message's send-timeout
     * header, falling back to the configured default send timeout.
     */
    override
    protected final void doSend(MessageChannel channel, MessageBase message) {
        doSend(channel, message, resolveSendTimeout(message));
    }

    /**
     * Send the message to the channel with an explicit timeout.
     *
     * NOTE(review): the dispatch logic below is still commented out from the
     * Java port, so this method currently only validates the channel and
     * returns without sending anything; {@code timeout} is unused.
     *
     * @param channel the target channel (must not be null)
     * @param message the message to send
     * @param timeout the send timeout in milliseconds
     */
    protected final void doSend(MessageChannel channel, MessageBase message, long timeout) {
        assert(channel, "MessageChannel is required");

        // Message!(T) messageToSend = message;
        // MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
        // if (accessor !is null && accessor.isMutable()) {
        //     accessor.removeHeader(this.sendTimeoutHeader);
        //     accessor.removeHeader(this.receiveTimeoutHeader);
        //     accessor.setImmutable();
        // }
        // else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
        //         || message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
        //     messageToSend = MessageBuilder.fromMessage(message)
        //             .setHeader(this.sendTimeoutHeader, null)
        //             .setHeader(this.receiveTimeoutHeader, null)
        //             .build();
        // }

        // bool sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));

        // if (!sent) {
        //     throw new MessageDeliveryException(message,
        //             "Failed to send message to channel '" ~ channel ~ "' within timeout: " ~ timeout);
        // }
    }

    /**
     * Receive a message from the channel using the configured default
     * receive timeout.
     */
    override
    protected final Message!(T) doReceive(MessageChannel channel) {
        return doReceive(channel, this.receiveTimeout);
    }

    /**
     * Receive a message from the channel, which must be a {@code PollableChannel}.
     *
     * @param channel the channel to poll (must not be null)
     * @param timeout the receive timeout in milliseconds; a negative value
     *                calls the channel's no-argument {@code receive()}
     * @return the received message, or {@code null} if none was received
     */
    protected final Message!(T) doReceive(MessageChannel channel, long timeout) {
        assert(channel, "MessageChannel is required");
        PollableChannel pollChannel = cast(PollableChannel) channel;
        assert(pollChannel !is null, "A PollableChannel is required to receive messages");

        MessageBase message = (timeout >= 0 ?
                pollChannel.receive(timeout) : pollChannel.receive());

        if (message is null && logger.isTraceEnabled()) {
            // An interface reference cannot be concatenated to a string directly;
            // go through Object to obtain its textual form.
            trace("Failed to receive message from channel '" ~ (cast(Object) channel).toString()
                    ~ "' within timeout: " ~ timeout.to!string());
        }

        // Explicit downcast: a MessageBase does not implicitly convert to Message!(T).
        // A message of a different payload type yields null here.
        return cast(Message!(T)) message;
    }

    /**
     * Send the request and wait for a reply on a temporary reply channel.
     *
     * <p>The request is rebuilt with the temporary channel as both reply and
     * error channel and with the timeout headers cleared. A non-null reply is
     * rebuilt with the request's original reply/error channel headers restored.
     *
     * NOTE(review): {@code TemporaryReplyChannel} only exists as commented-out
     * code at the end of this module -- confirm it is provided elsewhere.
     *
     * @param channel the channel to send the request to (must not be null)
     * @param requestMessage the request message
     * @return the reply message, or {@code null} if none was received
     */
    override
    protected final Message!(T) doSendAndReceive(MessageChannel channel, Message!(T) requestMessage) {
        assert(channel, "'channel' is required");
        Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
        Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();

        // Read the per-message timeouts before the headers are cleared below.
        long effectiveSendTimeout = resolveSendTimeout(requestMessage);
        long effectiveReceiveTimeout = resolveReceiveTimeout(requestMessage);

        TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(this.throwExceptionOnLateReply);
        requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel)
                .setHeader(this.sendTimeoutHeader, null)
                .setHeader(this.receiveTimeoutHeader, null)
                .setErrorChannel(tempReplyChannel).build();

        try {
            doSend(channel, requestMessage, effectiveSendTimeout);
        }
        catch (RuntimeException ex) {
            // Let the reply channel know the requester is gone before rethrowing.
            tempReplyChannel.setSendFailed(true);
            throw ex;
        }

        Message!(T) replyMessage = this.doReceive(tempReplyChannel, effectiveReceiveTimeout);
        if (replyMessage !is null) {
            replyMessage = MessageBuilder.fromMessage(replyMessage)
                    .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
                    .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
                    .build();
        }

        return replyMessage;
    }

    /**
     * Return the send timeout from the message's send-timeout header, or the
     * configured default when the header holds no usable value.
     *
     * Named differently from the {@code sendTimeout} field because D does not
     * allow a variable and a function with the same name in one scope.
     */
    private long resolveSendTimeout(MessageBase requestMessage) {
        return headerToLong(requestMessage.getHeaders().get(this.sendTimeoutHeader), this.sendTimeout);
    }

    /**
     * Return the receive timeout from the message's receive-timeout header, or
     * the configured default when the header holds no usable value.
     */
    private long resolveReceiveTimeout(MessageBase requestMessage) {
        return headerToLong(requestMessage.getHeaders().get(this.receiveTimeoutHeader), this.receiveTimeout);
    }

    /**
     * Convert a header value to a {@code long}.
     *
     * @param headerValue the raw header value; a {@code Number} or a
     *                    {@code Nullable!string} is converted, anything else
     *                    (including {@code null}) is ignored
     * @param defaultValue the value to return when the header is not convertible
     */
    private long headerToLong(Object headerValue, long defaultValue) {
        Number number = cast(Number) headerValue;
        if (number !is null) {
            return number.longValue();
        }

        auto stringValue = cast(Nullable!string) headerValue;
        if (stringValue !is null) {
            return Long.parseLong(stringValue.value);
        }

        return defaultValue;
    }
}




/**
 * A temporary channel for receiving a single reply message.
 */
// private static final class TemporaryReplyChannel : PollableChannel {

//     private final CountDownLatch replyLatch = new CountDownLatch(1);

//     private final bool throwExceptionOnLateReply;


//     private Message!(T) replyMessage;

//     private bool hasReceived;

//     private bool hasTimedOut;

//     private bool hasSendFailed;

//     this(bool throwExceptionOnLateReply) {
//         this.throwExceptionOnLateReply = throwExceptionOnLateReply;
//     }

//     void setSendFailed(bool hasSendError) {
//         this.hasSendFailed = hasSendError;
//     }

//     override

//     Message!(T) receive() {
//         return this.receive(-1);
//     }

//     override

//     Message!(T) receive(long timeout) {
//         try {
//             if (timeout < 0) {
//                 this.replyLatch.await();
//                 this.hasReceived = true;
//             }
//             else {
//                 if (this.replyLatch.await(timeout, TimeUnit.MILLISECONDS)) {
//                     this.hasReceived = true;
//                 }
//                 else {
//                     this.hasTimedOut = true;
//                 }
//             }
//         }
//         catch (InterruptedException ex) {
//             Thread.currentThread().interrupt();
//         }
//         return this.replyMessage;
//     }

//     override
//     bool send(MessageBase message) {
//         return this.send(message, -1);
//     }

//     override
//     bool send(MessageBase message, long timeout) {
//         this.replyMessage = message;
//         bool alreadyReceivedReply = this.hasReceived;
//         this.replyLatch.countDown();

//         string errorDescription = null;
//         if (this.hasTimedOut) {
//             errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
//         }
//         else if (alreadyReceivedReply) {
//             errorDescription = "Reply message received but the receiving thread has already received a reply";
//         }
//         else if (this.hasSendFailed) {
//             errorDescription = "Reply message received but the receiving thread has exited due to " ~
//                     "an exception while sending the request message";
//         }

//         if (errorDescription !is null) {
//             version(HUNT_DEBUG) {
//                 warningf(errorDescription ~ ":" ~ message);
//             }
//             if (this.throwExceptionOnLateReply) {
//                 throw new MessageDeliveryException(message, errorDescription);
//             }
//         }

//         return true;
//     }
// }