1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.SimpMessagingTemplate; 18 19 import hunt.stomp.simp.SimpMessageHeaderAccessor; 20 import hunt.stomp.simp.SimpMessageSendingOperations; 21 import hunt.stomp.simp.SimpMessageType; 22 23 import hunt.stomp.Message; 24 import hunt.stomp.MessageChannel; 25 import hunt.stomp.MessageHeaders; 26 import hunt.stomp.MessagingException; 27 import hunt.stomp.core.AbstractMessageSendingTemplate; 28 import hunt.stomp.core.MessagePostProcessor; 29 import hunt.stomp.support.MessageBuilder; 30 import hunt.stomp.support.MessageHeaderAccessor; 31 32 import hunt.stomp.support.NativeMessageHeaderAccessor; 33 // import hunt.framework.util.StringUtils; 34 35 import hunt.collection.Map; 36 import hunt.logging; 37 import std.array; 38 import std.conv; 39 import std.string; 40 41 /** 42 * An implementation of 43 * {@link hunt.stomp.simp.SimpMessageSendingOperations}. 44 * 45 * <p>Also provides methods for sending messages to a user. See 46 * {@link hunt.stomp.simp.user.UserDestinationResolver 47 * UserDestinationResolver} 48 * for more on user destinations. 49 * 50 * @author Rossen Stoyanchev 51 * @since 4.0 52 */ 53 class SimpMessagingTemplate : AbstractMessageSendingTemplate!(string), SimpMessageSendingOperations { 54 55 private MessageChannel messageChannel; 56 57 private string destinationPrefix = "/user/"; 58 59 private long sendTimeout = -1; 60 61 private MessageHeaderInitializer headerInitializer; 62 63 64 /** 65 * Create a new {@link SimpMessagingTemplate} instance. 66 * @param messageChannel the message channel (never {@code null}) 67 */ 68 this(MessageChannel messageChannel) { 69 assert(messageChannel, "MessageChannel must not be null"); 70 this.messageChannel = messageChannel; 71 } 72 73 74 /** 75 * Return the configured message channel. 76 */ 77 MessageChannel getMessageChannel() { 78 return this.messageChannel; 79 } 80 81 /** 82 * Configure the prefix to use for destinations targeting a specific user. 83 * <p>The default value is "/user/". 84 * @see hunt.stomp.simp.user.UserDestinationMessageHandler 85 */ 86 void setUserDestinationPrefix(string prefix) { 87 assert(!prefix.empty, "User destination prefix must not be empty"); 88 this.destinationPrefix = (prefix.endsWith("/") ? prefix : prefix ~ "/"); 89 90 } 91 92 /** 93 * Return the configured user destination prefix. 94 */ 95 string getUserDestinationPrefix() { 96 return this.destinationPrefix; 97 } 98 99 /** 100 * Specify the timeout value to use for send operations (in milliseconds). 101 */ 102 void setSendTimeout(long sendTimeout) { 103 this.sendTimeout = sendTimeout; 104 } 105 106 /** 107 * Return the configured send timeout (in milliseconds). 108 */ 109 long getSendTimeout() { 110 return this.sendTimeout; 111 } 112 113 /** 114 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all 115 * messages created through the {@code SimpMessagingTemplate}. 116 * <p>By default, this property is not set. 117 */ 118 void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { 119 this.headerInitializer = headerInitializer; 120 } 121 122 /** 123 * Return the configured header initializer. 124 */ 125 126 MessageHeaderInitializer getHeaderInitializer() { 127 return this.headerInitializer; 128 } 129 130 131 /** 132 * If the headers of the given message already contain a 133 * {@link hunt.stomp.simp.SimpMessageHeaderAccessor#DESTINATION_HEADER 134 * SimpMessageHeaderAccessor#DESTINATION_HEADER} then the message is sent without 135 * further changes. 136 * <p>If a destination header is not already present ,the message is sent 137 * to the configured {@link #setDefaultDestination(Object) defaultDestination} 138 * or an exception an {@code IllegalStateException} is raised if that isn't 139 * configured. 140 * @param message the message to send (never {@code null}) 141 */ 142 override 143 void send(MessageBase message) { 144 assert(message !is null, "Message is required"); 145 string destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 146 if (destination !is null) { 147 sendInternal(message); 148 return; 149 } 150 doSend(getRequiredDefaultDestination(), message); 151 } 152 153 override 154 protected void doSend(string destination, MessageBase message) { 155 assert(destination, "Destination must not be null"); 156 157 SimpMessageHeaderAccessor simpAccessor = 158 MessageHeaderAccessor.getAccessor!SimpMessageHeaderAccessor(message); 159 160 if (simpAccessor !is null) { 161 if (simpAccessor.isMutable()) { 162 simpAccessor.setDestination(destination); 163 simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); 164 simpAccessor.setImmutable(); 165 sendInternal(message); 166 return; 167 } 168 else { 169 // Try and keep the original accessor type 170 simpAccessor = cast(SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message); 171 initHeaders(simpAccessor); 172 } 173 } 174 else { 175 simpAccessor = SimpMessageHeaderAccessor.wrap(message); 176 initHeaders(simpAccessor); 177 } 178 179 simpAccessor.setDestination(destination); 180 simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); 181 Message!(string) m = cast(Message!(string))message; 182 if(m is null) { 183 warning("Can't handle messsage: ", typeid(cast(Object)message)); 184 } else { 185 message = MessageHelper.createMessage(m.getPayload(), simpAccessor.getMessageHeaders()); 186 sendInternal(message); 187 } 188 } 189 190 private void sendInternal(MessageBase message) { 191 string destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 192 assert(destination, "Destination header required"); 193 194 long timeout = this.sendTimeout; 195 bool sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message)); 196 197 if (!sent) { 198 throw new MessageDeliveryException(message, 199 "Failed to send message to destination '" ~ destination ~ 200 "' within timeout: " ~ timeout.to!string()); 201 } 202 } 203 204 private void initHeaders(SimpMessageHeaderAccessor simpAccessor) { 205 if (getHeaderInitializer() !is null) { 206 getHeaderInitializer().initHeaders(simpAccessor); 207 } 208 } 209 210 211 override 212 void convertAndSendToUser(string user, string destination, Object payload) { 213 convertAndSendToUser(user, destination, payload, cast(MessagePostProcessor) null); 214 } 215 216 override 217 void convertAndSendToUser(string user, string destination, Object payload, 218 Map!(string, Object) headers) { 219 220 convertAndSendToUser(user, destination, payload, headers, null); 221 } 222 223 override 224 void convertAndSendToUser(string user, string destination, Object payload, 225 MessagePostProcessor postProcessor) { 226 227 convertAndSendToUser(user, destination, payload, null, postProcessor); 228 } 229 230 override 231 void convertAndSendToUser(string user, string destination, Object payload, 232 Map!(string, Object) headers, MessagePostProcessor postProcessor) { 233 234 assert(user, "User must not be null"); 235 user = user.replace("/", "%2F"); 236 destination = destination.startsWith("/") ? destination : "/" ~ destination; 237 super.convertAndSend(this.destinationPrefix ~ user ~ destination, payload, headers, postProcessor); 238 } 239 240 241 /** 242 * Creates a new map and puts the given headers under the key 243 * {@link NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS}. 244 * effectively treats the input header map as headers to be sent out to the 245 * destination. 246 * <p>However if the given headers already contain the key 247 * {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is 248 * returned without changes. 249 * <p>Also if the given headers were prepared and obtained with 250 * {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers 251 * instance is also returned without changes. 252 */ 253 override 254 protected Map!(string, Object) processHeadersToSend(Map!(string, Object) headers) { 255 if (headers is null) { 256 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 257 initHeaders(headerAccessor); 258 headerAccessor.setLeaveMutable(true); 259 return headerAccessor.getMessageHeaders(); 260 } 261 262 if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) { 263 return headers; 264 } 265 266 MessageHeaders mheaders = cast(MessageHeaders) headers; 267 if (mheaders !is null) { 268 SimpMessageHeaderAccessor accessor = 269 MessageHeaderAccessor.getAccessor!(SimpMessageHeaderAccessor)(mheaders); 270 if (accessor !is null) { 271 return headers; 272 } 273 } 274 275 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 276 initHeaders(headerAccessor); 277 foreach(string key, Object value; headers) { 278 headerAccessor.setNativeHeader(key, (value !is null ? value.toString() : null)); 279 } 280 return headerAccessor.getMessageHeaders(); 281 } 282 283 }