1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.core.GenericMessagingTemplate;
18 
19 
20 import hunt.stomp.Message;
21 import hunt.stomp.MessageChannel;
22 import hunt.stomp.MessagingException;
23 import hunt.stomp.MessageHeaders;
24 import hunt.stomp.support.MessageBuilder;
25 import hunt.stomp.support.MessageHeaderAccessor;
26 
27 // import java.util.concurrent.CountDownLatch;
28 
29 import hunt.Nullable;
30 import hunt.Number;
31 import hunt.logging;
32 
33 // import hunt.framework.beans.BeansException;
34 // import hunt.framework.beans.factory.BeanFactory;
35 // import hunt.framework.beans.factory.BeanFactoryAware;
36 
37 import std.conv;
38 
39 
40 /**
41  * A messaging template that resolves destinations names to {@link MessageChannel}'s
42  * to send and receive messages from.
43  *
44  * @author Mark Fisher
45  * @author Rossen Stoyanchev
46  * @author Gary Russell
47  * @since 4.0
48  */
49 class GenericMessagingTemplate(T) : AbstractDestinationResolvingMessagingTemplate!MessageChannel
50 		{ // implements BeanFactoryAware
51 
52 	/**
53 	 * The default header key used for a send timeout.
54 	 */
55 	enum string DEFAULT_SEND_TIMEOUT_HEADER = "sendTimeout";
56 
57 	/**
58 	 * The default header key used for a receive timeout.
59 	 */
60 	enum string DEFAULT_RECEIVE_TIMEOUT_HEADER = "receiveTimeout";
61 
62 	private long sendTimeout = -1;
63 
64 	private long receiveTimeout = -1;
65 
66 	private string sendTimeoutHeader = DEFAULT_SEND_TIMEOUT_HEADER;
67 
68 	private string receiveTimeoutHeader = DEFAULT_RECEIVE_TIMEOUT_HEADER;
69 
70 	private bool throwExceptionOnLateReply = false;
71 
72 
73 	/**
74 	 * Configure the default timeout value to use for send operations.
75 	 * May be overridden for individual messages.
76 	 * @param sendTimeout the send timeout in milliseconds
77 	 * @see #setSendTimeoutHeader(string)
78 	 */
79 	void setSendTimeout(long sendTimeout) {
80 		this.sendTimeout = sendTimeout;
81 	}
82 
83 	/**
84 	 * Return the configured default send operation timeout value.
85 	 */
86 	long getSendTimeout() {
87 		return this.sendTimeout;
88 	}
89 
90 	/**
91 	 * Configure the default timeout value to use for receive operations.
92 	 * May be overridden for individual messages when using sendAndReceive
93 	 * operations.
94 	 * @param receiveTimeout the receive timeout in milliseconds
95 	 * @see #setReceiveTimeoutHeader(string)
96 	 */
97 	void setReceiveTimeout(long receiveTimeout) {
98 		this.receiveTimeout = receiveTimeout;
99 	}
100 
101 	/**
102 	 * Return the configured receive operation timeout value.
103 	 */
104 	long getReceiveTimeout() {
105 		return this.receiveTimeout;
106 	}
107 
108 	/**
109 	 * Set the name of the header used to determine the send timeout (if present).
110 	 * Default {@value #DEFAULT_SEND_TIMEOUT_HEADER}.
111 	 * <p>The header is removed before sending the message to avoid propagation.
112 	 * @since 5.0
113 	 */
114 	void setSendTimeoutHeader(string sendTimeoutHeader) {
115 		assert(sendTimeoutHeader, "'sendTimeoutHeader' cannot be null");
116 		this.sendTimeoutHeader = sendTimeoutHeader;
117 	}
118 
119 	/**
120 	 * Return the configured send-timeout header.
121 	 * @since 5.0
122 	 */
123 	string getSendTimeoutHeader() {
124 		return this.sendTimeoutHeader;
125 	}
126 
127 	/**
128 	 * Set the name of the header used to determine the send timeout (if present).
129 	 * Default {@value #DEFAULT_RECEIVE_TIMEOUT_HEADER}.
130 	 * The header is removed before sending the message to avoid propagation.
131 	 * @since 5.0
132 	 */
133 	void setReceiveTimeoutHeader(string receiveTimeoutHeader) {
134 		assert(receiveTimeoutHeader, "'receiveTimeoutHeader' cannot be null");
135 		this.receiveTimeoutHeader = receiveTimeoutHeader;
136 	}
137 
138 	/**
139 	 * Return the configured receive-timeout header.
140 	 * @since 5.0
141 	 */
142 	string getReceiveTimeoutHeader() {
143 		return this.receiveTimeoutHeader;
144 	}
145 
146 	/**
147 	 * Whether the thread sending a reply should have an exception raised if the
148 	 * receiving thread isn't going to receive the reply either because it timed out,
149 	 * or because it already received a reply, or because it got an exception while
150 	 * sending the request message.
151 	 * <p>The default value is {@code false} in which case only a WARN message is logged.
152 	 * If set to {@code true} a {@link MessageDeliveryException} is raised in addition
153 	 * to the log message.
154 	 * @param throwExceptionOnLateReply whether to throw an exception or not
155 	 */
156 	void setThrowExceptionOnLateReply(bool throwExceptionOnLateReply) {
157 		this.throwExceptionOnLateReply = throwExceptionOnLateReply;
158 	}
159 
160 	override
161 	void setBeanFactory(BeanFactory beanFactory) {
162 		setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
163 	}
164 
165 
166 	override
167 	protected final void doSend(MessageChannel channel, MessageBase message) {
168 		doSend(channel, message, sendTimeout(message));
169 	}
170 
171 	protected final void doSend(MessageChannel channel, MessageBase message, long timeout) {
172 		assert(channel, "MessageChannel is required");
173 
174 		// Message!(T) messageToSend = message;
175 		// MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
176 		// if (accessor !is null && accessor.isMutable()) {
177 		// 	accessor.removeHeader(this.sendTimeoutHeader);
178 		// 	accessor.removeHeader(this.receiveTimeoutHeader);
179 		// 	accessor.setImmutable();
180 		// }
181 		// else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
182 		// 		|| message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
183 		// 	messageToSend = MessageBuilder.fromMessage(message)
184 		// 			.setHeader(this.sendTimeoutHeader, null)
185 		// 			.setHeader(this.receiveTimeoutHeader, null)
186 		// 			.build();
187 		// }
188 
189 		// bool sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));
190 
191 		// if (!sent) {
192 		// 	throw new MessageDeliveryException(message,
193 		// 			"Failed to send message to channel '" ~ channel ~ "' within timeout: " ~ timeout);
194 		// }
195 	}
196 
197 	override
198 	
199 	protected final Message!(T) doReceive(MessageChannel channel) {
200 		return doReceive(channel, this.receiveTimeout);
201 	}
202 
203 	
204 	protected final Message!(T) doReceive(MessageChannel channel, long timeout) {
205 		assert(channel, "MessageChannel is required");
206 		PollableChannel pollChannel = cast(PollableChannel) channel;
207 		assert(pollChannel !is null, "A PollableChannel is required to receive messages");
208 
209 		MessageBase message = (timeout >= 0 ?
210 				pollChannel.receive(timeout) : pollChannel.receive());
211 
212 		if (message is null && logger.isTraceEnabled()) {
213 			trace("Failed to receive message from channel '" ~ channel ~ "' within timeout: " ~ timeout.to!string());
214 		}
215 
216 		return message;
217 	}
218 
219 	override
220 	
221 	protected final Message!(T) doSendAndReceive(MessageChannel channel, Message!(T) requestMessage) {
222 		assert(channel, "'channel' is required");
223 		Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
224 		Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
225 
226 		long sendTimeout = sendTimeout(requestMessage);
227 		long receiveTimeout = receiveTimeout(requestMessage);
228 
229 		TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(this.throwExceptionOnLateReply);
230 		requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel)
231 				.setHeader(this.sendTimeoutHeader, null)
232 				.setHeader(this.receiveTimeoutHeader, null)
233 				.setErrorChannel(tempReplyChannel).build();
234 
235 		try {
236 			doSend(channel, requestMessage, sendTimeout);
237 		}
238 		catch (RuntimeException ex) {
239 			tempReplyChannel.setSendFailed(true);
240 			throw ex;
241 		}
242 
243 		Message!(T) replyMessage = this.doReceive(tempReplyChannel, receiveTimeout);
244 		if (replyMessage !is null) {
245 			replyMessage = MessageBuilder.fromMessage(replyMessage)
246 					.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
247 					.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
248 					.build();
249 		}
250 
251 		return replyMessage;
252 	}
253 
254 	private long sendTimeout(Message!(T) requestMessage) {
255 		Long sendTimeout = headerToLong(requestMessage.getHeaders().get(this.sendTimeoutHeader));
256 		return (sendTimeout !is null ? sendTimeout : this.sendTimeout);
257 	}
258 
259 	private long receiveTimeout(Message!(T) requestMessage) {
260 		Long receiveTimeout = headerToLong(requestMessage.getHeaders().get(this.receiveTimeoutHeader));
261 		return (receiveTimeout !is null ? receiveTimeout : this.receiveTimeout);
262 	}
263 
264 	
265 	private Long headerToLong(Object headerValue) {
266 		Number number = cast(Number)headerValue;
267 		if (number !is null) {
268 			return number.longValue();
269 		}
270 		else {
271 			auto stringValue = cast(Nullable!string)headerValue;
272 			if (stringValue !is null) {
273 				return Long.parseLong(stringValue.value);
274 			} else {
275 				return null;
276 			}
277 
278 		} 
279 	}
280 }
281 
282 
283 
284 
285 /**
286  * A temporary channel for receiving a single reply message.
287  */
288 // private static final class TemporaryReplyChannel : PollableChannel {
289 
290 //     private final CountDownLatch replyLatch = new CountDownLatch(1);
291 
292 //     private final bool throwExceptionOnLateReply;
293 
294     
295 //     private Message!(T) replyMessage;
296 
297 //     private bool hasReceived;
298 
299 //     private bool hasTimedOut;
300 
301 //     private bool hasSendFailed;
302 
303 //     this(bool throwExceptionOnLateReply) {
304 //         this.throwExceptionOnLateReply = throwExceptionOnLateReply;
305 //     }
306 
307 //     void setSendFailed(bool hasSendError) {
308 //         this.hasSendFailed = hasSendError;
309 //     }
310 
311 //     override
312     
313 //     Message!(T) receive() {
314 //         return this.receive(-1);
315 //     }
316 
317 //     override
318     
319 //     Message!(T) receive(long timeout) {
320 //         try {
321 //             if (timeout < 0) {
322 //                 this.replyLatch.await();
323 //                 this.hasReceived = true;
324 //             }
325 //             else {
326 //                 if (this.replyLatch.await(timeout, TimeUnit.MILLISECONDS)) {
327 //                     this.hasReceived = true;
328 //                 }
329 //                 else {
330 //                     this.hasTimedOut = true;
331 //                 }
332 //             }
333 //         }
334 //         catch (InterruptedException ex) {
335 //             Thread.currentThread().interrupt();
336 //         }
337 //         return this.replyMessage;
338 //     }
339 
340 //     override
341 //     bool send(MessageBase message) {
342 //         return this.send(message, -1);
343 //     }
344 
345 //     override
346 //     bool send(MessageBase message, long timeout) {
347 //         this.replyMessage = message;
348 //         bool alreadyReceivedReply = this.hasReceived;
349 //         this.replyLatch.countDown();
350 
351 //         string errorDescription = null;
352 //         if (this.hasTimedOut) {
353 //             errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
354 //         }
355 //         else if (alreadyReceivedReply) {
356 //             errorDescription = "Reply message received but the receiving thread has already received a reply";
357 //         }
358 //         else if (this.hasSendFailed) {
359 //             errorDescription = "Reply message received but the receiving thread has exited due to " ~
360 //                     "an exception while sending the request message";
361 //         }
362 
363 //         if (errorDescription !is null) {
364 //             version(HUNT_DEBUG) {
365 //                 warningf(errorDescription ~ ":" ~ message);
366 //             }
367 //             if (this.throwExceptionOnLateReply) {
368 //                 throw new MessageDeliveryException(message, errorDescription);
369 //             }
370 //         }
371 
372 //         return true;
373 //     }
374 // }