/*
 * Copyright 2002-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.simp.SimpMessageHeaderAccessor;

import hunt.stomp.simp.SimpMessageType;

import hunt.stomp.Message;
import hunt.stomp.support.IdTimestampMessageHeaderInitializer;
import hunt.stomp.support.MessageHeaderAccessor;
import hunt.stomp.support.NativeMessageHeaderAccessor;

import hunt.collection.List;
import hunt.collection.Map;
import hunt.Nullable;
import hunt.text.Common;
import hunt.text.StringBuilder;

version(Have_hunt_security) {
    import hunt.security.Principal;
}

import std.conv;

/**
 * A base class for working with message headers in simple messaging protocols that
 * support basic messaging patterns. Provides uniform access to specific values common
 * across protocols such as a destination, message type (e.g. publish, subscribe, etc),
 * session id, and others.
 *
 * <p>Use one of the static factory methods in this class, then call getters and setters,
 * and at the end if necessary call {@link #toMap()} to obtain the updated headers.
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
class SimpMessageHeaderAccessor : NativeMessageHeaderAccessor {

    /** Initializer shared by every accessor instance; applied in both constructors. */
    private __gshared IdTimestampMessageHeaderInitializer headerInitializer;

    /**
     * Builds the shared header initializer once, with id generation disabled
     * and timestamps switched off.
     */
    shared static this() {
        headerInitializer = new IdTimestampMessageHeaderInitializer();
        headerInitializer.setDisableIdGeneration();
        headerInitializer.setEnableTimestamp(false);
    }

    // SIMP header names

    enum string DESTINATION_HEADER = "simpDestination";

    enum string MESSAGE_TYPE_HEADER = "simpMessageType";

    enum string SESSION_ID_HEADER = "simpSessionId";

    enum string SESSION_ATTRIBUTES = "simpSessionAttributes";

    enum string SUBSCRIPTION_ID_HEADER = "simpSubscriptionId";

    enum string USER_HEADER = "simpUser";

    enum string CONNECT_MESSAGE_HEADER = "simpConnectMessage";

    enum string DISCONNECT_MESSAGE_HEADER = "simpDisconnectMessage";

    enum string HEART_BEAT_HEADER = "simpHeartbeat";


    /**
     * A header for internal use with "user" destinations where we need to
     * restore the destination prior to sending messages to clients.
     */
    enum string ORIGINAL_DESTINATION = "simpOrigDestination";

    /**
     * A header that indicates to the broker that the sender will ignore errors.
     * The header is simply checked for presence or absence.
     */
    enum string IGNORE_ERROR = "simpIgnoreError";


    /**
     * A constructor for creating new message headers.
     * This constructor is protected. See factory methods in this and sub-classes.
     *
     * Params:
     *   messageType = value stored under {@link #MESSAGE_TYPE_HEADER}
     *   externalSourceHeaders = native headers handed to the base class; the
     *       factory methods in this class pass {@code null}
     */
    protected this(SimpMessageType messageType,
            Map!(string, List!string) externalSourceHeaders) {

        super(externalSourceHeaders);
        setHeader(MESSAGE_TYPE_HEADER, new Nullable!SimpMessageType(messageType));
        headerInitializer.initHeaders(this);
    }

    /**
     * A constructor for accessing and modifying existing message headers. This
     * constructor is protected. See factory methods in this and sub-classes.
     *
     * Params:
     *   message = the message whose headers are handed to the base class
     */
    protected this(MessageBase message) {
        super(message);
        headerInitializer.initHeaders(this);
    }

    /**
     * Creates the accessor for the given message by delegating to {@link #wrap}.
     */
    override
    protected MessageHeaderAccessor createAccessor(MessageBase message) {
        return wrap(message);
    }

    /**
     * Store the given message type only when no message type header is present yet.
     */
    void setMessageTypeIfNotSet(SimpMessageType messageType) {
        if (getMessageType() is null) {
            setHeader(MESSAGE_TYPE_HEADER, new Nullable!SimpMessageType(messageType));
        }
    }

    /**
     * Return the message type header, or {@code null} when it is absent or is
     * not a {@code Nullable!SimpMessageType}.
     */
    Nullable!SimpMessageType getMessageType() {
        return cast(Nullable!SimpMessageType) getHeader(MESSAGE_TYPE_HEADER);
    }

    /**
     * Set the destination header.
     */
    void setDestination(string destination) {
        setHeader(DESTINATION_HEADER, destination);
    }

    /**
     * Return the destination header as a string.
     */
    string getDestination() {
        return getHeaderAs!(string)(DESTINATION_HEADER);
    }

    /**
     * Set the subscription id header.
     */
    void setSubscriptionId(string subscriptionId) {
        setHeader(SUBSCRIPTION_ID_HEADER, subscriptionId);
    }

    /**
     * Return the subscription id header as a string.
     */
    string getSubscriptionId() {
        return getHeaderAs!(string)(SUBSCRIPTION_ID_HEADER);
    }

    /**
     * Set the session id header from an integer; the value is stored in its
     * string form.
     */
    void setSessionId(int sessionId) {
        setHeader(SESSION_ID_HEADER, sessionId.to!string());
    }

    /**
     * Set the session id header.
     */
    void setSessionId(string sessionId) {
        setHeader(SESSION_ID_HEADER, sessionId);
    }

    /**
     * Return the id of the current session.
     */
    string getSessionId() {
        return getHeaderAs!(string)(SESSION_ID_HEADER);
    }

    /**
     * Set the attributes associated with the current session.
     */
    void setSessionAttributes(Map!(string, Object) attributes) {
        setHeader(SESSION_ATTRIBUTES, attributes);
    }

    /**
     * Return the attributes associated with the current session, or
     * {@code null} when the header is absent or is not a string-keyed map.
     */
    Map!(string, Object) getSessionAttributes() {
        return cast(Map!(string, Object)) getHeader(SESSION_ATTRIBUTES);
    }

    // Principal support is currently disabled; the original code is retained
    // below for reference.
    //
    // void setUser(Principal principal) {
    //     setHeader(USER_HEADER, principal);
    // }
    //
    // Return the user associated with the current session.
    //
    // Principal getUser() {
    //     return (Principal) getHeader(USER_HEADER);
    // }

    /**
     * Build a compact log line: the base description, the number of session
     * attributes (when there are any) and the short payload description.
     */
    override
    string getShortLogMessage(Object payload) {
        if (getMessageType() is null) {
            // NOTE(review): this fallback uses the *detailed* base-class message
            // even though this is the short variant; confirm that is intended.
            return super.getDetailedLogMessage(payload);
        }
        StringBuilder sb = getBaseLogMessage();
        Map!(string, Object) map = getSessionAttributes();
        if (map !is null && !map.isEmpty()) {
            sb.append(" attributes[").append(map.size()).append("]");
        }
        sb.append(getShortPayloadLogMessage(payload));
        return sb.toString();
    }

    /**
     * Build a verbose log line: the base description, the session attributes
     * and native headers (each only when non-empty) and the detailed payload
     * description.
     */
    override
    string getDetailedLogMessage(Object payload) {
        if (getMessageType() is null) {
            return super.getDetailedLogMessage(payload);
        }
        StringBuilder sb = getBaseLogMessage();
        auto map = getSessionAttributes();
        if (map !is null && !map.isEmpty()) {
            sb.append(" attributes=").append(map.toString());
        }

        auto m = cast(Map!(string, List!string)) getHeader(NATIVE_HEADERS);
        if (m !is null && !m.isEmpty()) {
            sb.append(" nativeHeaders=").append(m.toString());
        }
        sb.append(getDetailedPayloadLogMessage(payload));
        return sb.toString();
    }

    /**
     * Start a log line with the message type (falling back to {@code OTHER}),
     * the destination and subscription id when present, and the session id.
     */
    private StringBuilder getBaseLogMessage() {
        StringBuilder sb = new StringBuilder();
        auto messageType = getMessageType();
        sb.append(messageType !is null ? messageType.to!string() : to!string(SimpMessageType.OTHER));
        string destination = getDestination();
        if (destination !is null) {
            sb.append(" destination=").append(destination);
        }
        string subscriptionId = getSubscriptionId();
        if (subscriptionId !is null) {
            sb.append(" subscriptionId=").append(subscriptionId);
        }
        sb.append(" session=").append(getSessionId());

        // Principal user = getUser();
        // if (user !is null) {
        //     sb.append(" user=").append(user.getName());
        // }

        return sb;
    }


    // Static factory methods and accessors

    /**
     * Create an instance with
     * {@link hunt.stomp.simp.SimpMessageType} {@code MESSAGE}.
     */
    static SimpMessageHeaderAccessor create() {
        return new SimpMessageHeaderAccessor(SimpMessageType.MESSAGE, null);
    }

    /**
     * Create an instance with the given
     * {@link hunt.stomp.simp.SimpMessageType}.
     */
    static SimpMessageHeaderAccessor create(SimpMessageType messageType) {
        return new SimpMessageHeaderAccessor(messageType, null);
    }

    /**
     * Create an instance from the payload and headers of the given Message.
     */
    static SimpMessageHeaderAccessor wrap(MessageBase message) {
        return new SimpMessageHeaderAccessor(message);
    }

    /**
     * Shared lookup for the static string-valued header getters.
     *
     * Returns {@code null} when the map itself is {@code null}, when the entry
     * is missing, or when the stored value is not a {@code Nullable!string}.
     */
    private static string simpStringHeader(Map!(string, Object) headers, string name) {
        if (headers is null) {
            return null;
        }
        auto h = cast(Nullable!string) headers.get(name);
        if (h is null) {
            return null;
        }
        return cast(string) h;
    }

    /**
     * Read the message type from a raw header map.
     *
     * Returns: the stored value, or {@code null} when the map is {@code null},
     * the entry is missing, or it is not a {@code Nullable!SimpMessageType}.
     */
    static Nullable!SimpMessageType getMessageType(Map!(string, Object) headers) {
        if (headers is null) {
            return null;
        }
        return cast(Nullable!SimpMessageType) headers.get(MESSAGE_TYPE_HEADER);
    }

    /**
     * Read the destination from a raw header map; {@code null} when unavailable.
     */
    static string getDestination(Map!(string, Object) headers) {
        return simpStringHeader(headers, DESTINATION_HEADER);
    }

    /**
     * Read the subscription id from a raw header map; {@code null} when unavailable.
     */
    static string getSubscriptionId(Map!(string, Object) headers) {
        return simpStringHeader(headers, SUBSCRIPTION_ID_HEADER);
    }

    /**
     * Read the session id from a raw header map; {@code null} when unavailable.
     */
    static string getSessionId(Map!(string, Object) headers) {
        return simpStringHeader(headers, SESSION_ID_HEADER);
    }

    /**
     * A static alternative for access to the session attributes header.
     *
     * Returns: the stored map, or {@code null} when {@code headers} is
     * {@code null}, the entry is missing, or it is not a string-keyed map.
     */
    static Map!(string, Object) getSessionAttributes(Map!(string, Object) headers) {
        if (headers is null) {
            return null;
        }
        return cast(Map!(string, Object)) headers.get(SESSION_ATTRIBUTES);
    }


    // static Principal getUser(Map!(string, Object) headers) {
    //     return cast(Principal) headers.get(USER_HEADER);
    // }


    /**
     * Read the heartbeat header from a raw header map.
     *
     * Returns: the stored array, or {@code null} when {@code headers} is
     * {@code null}, the entry is missing, or it is not a {@code Nullable!(long[])}.
     */
    static long[] getHeartbeat(Map!(string, Object) headers) {
        if (headers is null) {
            return null;
        }
        auto h = cast(Nullable!(long[])) headers.get(HEART_BEAT_HEADER);
        if (h is null) {
            return null;
        }
        return cast(long[]) h;
    }

}