1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.SimpMessagingTemplate;
18 
19 import hunt.stomp.simp.SimpMessageHeaderAccessor;
20 import hunt.stomp.simp.SimpMessageSendingOperations;
21 import hunt.stomp.simp.SimpMessageType;
22 
23 import hunt.stomp.Message;
24 import hunt.stomp.MessageChannel;
25 import hunt.stomp.MessageHeaders;
26 import hunt.stomp.MessagingException;
27 import hunt.stomp.core.AbstractMessageSendingTemplate;
28 import hunt.stomp.core.MessagePostProcessor;
29 import hunt.stomp.support.MessageBuilder;
30 import hunt.stomp.support.MessageHeaderAccessor;
31 
32 import hunt.stomp.support.NativeMessageHeaderAccessor;
33 // import hunt.framework.util.StringUtils;
34 
35 import hunt.collection.Map;
36 import hunt.logging;
37 import std.array;
38 import std.conv;
39 import std.string;
40 
41 /**
42  * An implementation of
43  * {@link hunt.stomp.simp.SimpMessageSendingOperations}.
44  *
45  * <p>Also provides methods for sending messages to a user. See
46  * {@link hunt.stomp.simp.user.UserDestinationResolver
47  * UserDestinationResolver}
48  * for more on user destinations.
49  *
50  * @author Rossen Stoyanchev
51  * @since 4.0
52  */
53 class SimpMessagingTemplate : AbstractMessageSendingTemplate!(string), SimpMessageSendingOperations { 
54 
55 	private MessageChannel messageChannel;
56 
57 	private string destinationPrefix = "/user/";
58 
59 	private long sendTimeout = -1;
60 	
61 	private MessageHeaderInitializer headerInitializer;
62 
63 
64 	/**
65 	 * Create a new {@link SimpMessagingTemplate} instance.
66 	 * @param messageChannel the message channel (never {@code null})
67 	 */
68 	this(MessageChannel messageChannel) {
69 		assert(messageChannel, "MessageChannel must not be null");
70 		this.messageChannel = messageChannel;
71 	}
72 
73 
74 	/**
75 	 * Return the configured message channel.
76 	 */
77 	MessageChannel getMessageChannel() {
78 		return this.messageChannel;
79 	}
80 
81 	/**
82 	 * Configure the prefix to use for destinations targeting a specific user.
83 	 * <p>The default value is "/user/".
84 	 * @see hunt.stomp.simp.user.UserDestinationMessageHandler
85 	 */
86 	void setUserDestinationPrefix(string prefix) {
87 		assert(!prefix.empty, "User destination prefix must not be empty");
88 		this.destinationPrefix = (prefix.endsWith("/") ? prefix : prefix ~ "/");
89 
90 	}
91 
92 	/**
93 	 * Return the configured user destination prefix.
94 	 */
95 	string getUserDestinationPrefix() {
96 		return this.destinationPrefix;
97 	}
98 
99 	/**
100 	 * Specify the timeout value to use for send operations (in milliseconds).
101 	 */
102 	void setSendTimeout(long sendTimeout) {
103 		this.sendTimeout = sendTimeout;
104 	}
105 
106 	/**
107 	 * Return the configured send timeout (in milliseconds).
108 	 */
109 	long getSendTimeout() {
110 		return this.sendTimeout;
111 	}
112 
113 	/**
114 	 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
115 	 * messages created through the {@code SimpMessagingTemplate}.
116 	 * <p>By default, this property is not set.
117 	 */
118 	void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
119 		this.headerInitializer = headerInitializer;
120 	}
121 
122 	/**
123 	 * Return the configured header initializer.
124 	 */
125 	
126 	MessageHeaderInitializer getHeaderInitializer() {
127 		return this.headerInitializer;
128 	}
129 
130 
131 	/**
132 	 * If the headers of the given message already contain a
133 	 * {@link hunt.stomp.simp.SimpMessageHeaderAccessor#DESTINATION_HEADER
134 	 * SimpMessageHeaderAccessor#DESTINATION_HEADER} then the message is sent without
135 	 * further changes.
136 	 * <p>If a destination header is not already present ,the message is sent
137 	 * to the configured {@link #setDefaultDestination(Object) defaultDestination}
138 	 * or an exception an {@code IllegalStateException} is raised if that isn't
139 	 * configured.
140 	 * @param message the message to send (never {@code null})
141 	 */
142 	override
143 	void send(MessageBase message) {
144 		assert(message !is null, "Message is required");
145 		string destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
146 		if (destination !is null) {
147 			sendInternal(message);
148 			return;
149 		}
150 		doSend(getRequiredDefaultDestination(), message);
151 	}
152 
153 	override
154 	protected void doSend(string destination, MessageBase message) {
155 		assert(destination, "Destination must not be null");
156 
157 		SimpMessageHeaderAccessor simpAccessor =
158 				MessageHeaderAccessor.getAccessor!SimpMessageHeaderAccessor(message);
159 
160 		if (simpAccessor !is null) {
161 			if (simpAccessor.isMutable()) {
162 				simpAccessor.setDestination(destination);
163 				simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
164 				simpAccessor.setImmutable();
165 				sendInternal(message);
166 				return;
167 			}
168 			else {
169 				// Try and keep the original accessor type
170 				simpAccessor = cast(SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);
171 				initHeaders(simpAccessor);
172 			}
173 		}
174 		else {
175 			simpAccessor = SimpMessageHeaderAccessor.wrap(message);
176 			initHeaders(simpAccessor);
177 		}
178 
179 		simpAccessor.setDestination(destination);
180 		simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
181         Message!(string) m = cast(Message!(string))message;
182         if(m is null) {
183             warning("Can't handle messsage: ", typeid(cast(Object)message));
184         } else {
185             message = MessageHelper.createMessage(m.getPayload(), simpAccessor.getMessageHeaders());
186             sendInternal(message);
187         }
188 	}
189 
190 	private void sendInternal(MessageBase message) {
191 		string destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
192 		assert(destination, "Destination header required");
193 
194 		long timeout = this.sendTimeout;
195 		bool sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
196 
197 		if (!sent) {
198 			throw new MessageDeliveryException(message,
199 					"Failed to send message to destination '" ~ destination ~ 
200                     "' within timeout: " ~ timeout.to!string());
201 		}
202 	}
203 
204 	private void initHeaders(SimpMessageHeaderAccessor simpAccessor) {
205 		if (getHeaderInitializer() !is null) {
206 			getHeaderInitializer().initHeaders(simpAccessor);
207 		}
208 	}
209 
210 
211 	override
212 	void convertAndSendToUser(string user, string destination, Object payload) {
213 		convertAndSendToUser(user, destination, payload, cast(MessagePostProcessor) null);
214 	}
215 
216 	override
217 	void convertAndSendToUser(string user, string destination, Object payload,
218 			Map!(string, Object) headers) {
219 
220 		convertAndSendToUser(user, destination, payload, headers, null);
221 	}
222 
223 	override
224 	void convertAndSendToUser(string user, string destination, Object payload,
225 			MessagePostProcessor postProcessor) {
226 
227 		convertAndSendToUser(user, destination, payload, null, postProcessor);
228 	}
229 
230 	override
231 	void convertAndSendToUser(string user, string destination, Object payload,
232 			Map!(string, Object) headers, MessagePostProcessor postProcessor) {
233 
234 		assert(user, "User must not be null");
235 		user = user.replace("/", "%2F");
236 		destination = destination.startsWith("/") ? destination : "/" ~ destination;
237 		super.convertAndSend(this.destinationPrefix ~ user ~ destination, payload, headers, postProcessor);
238 	}
239 
240 
241 	/**
242 	 * Creates a new map and puts the given headers under the key
243 	 * {@link NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS}.
244 	 * effectively treats the input header map as headers to be sent out to the
245 	 * destination.
246 	 * <p>However if the given headers already contain the key
247 	 * {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is
248 	 * returned without changes.
249 	 * <p>Also if the given headers were prepared and obtained with
250 	 * {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers
251 	 * instance is also returned without changes.
252 	 */
253 	override
254 	protected Map!(string, Object) processHeadersToSend(Map!(string, Object) headers) {
255 		if (headers is null) {
256 			SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
257 			initHeaders(headerAccessor);
258 			headerAccessor.setLeaveMutable(true);
259 			return headerAccessor.getMessageHeaders();
260 		}
261 
262 		if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) {
263 			return headers;
264 		}
265 
266         MessageHeaders mheaders = cast(MessageHeaders) headers;
267 		if (mheaders !is null) {
268 			SimpMessageHeaderAccessor accessor =
269 					MessageHeaderAccessor.getAccessor!(SimpMessageHeaderAccessor)(mheaders);
270 			if (accessor !is null) {
271 				return headers;
272 			}
273 		}
274 
275 		SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
276 		initHeaders(headerAccessor);
277         foreach(string key, Object value; headers) {
278             headerAccessor.setNativeHeader(key, (value !is null ? value.toString() : null));
279         }
280 		return headerAccessor.getMessageHeaders();
281 	}
282 
283 }