1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.stomp.StompDecoder; 18 19 import hunt.stomp.simp.stomp.StompHeaderAccessor; 20 import hunt.stomp.exception; 21 import hunt.stomp.Message; 22 import hunt.stomp.simp.stomp.StompCommand; 23 import hunt.stomp.simp.SimpMessageType; 24 import hunt.stomp.support.MessageHeaderAccessor; 25 import hunt.stomp.support.MessageBuilder; 26 import hunt.stomp.support.NativeMessageHeaderAccessor; 27 28 import hunt.collection; 29 import hunt.io.ByteArrayOutputStream; 30 import hunt.logging; 31 import hunt.Exceptions; 32 import hunt.Integer; 33 import hunt.Nullable; 34 import hunt.text.Common; 35 import hunt.text.StringBuilder; 36 import hunt.util.TypeUtils; 37 38 import std.conv; 39 import std.string; 40 41 42 /** 43 * Decodes one or more STOMP frames contained in a {@link ByteBuffer}. 44 * 45 * <p>An attempt is made to read all complete STOMP frames from the buffer, which 46 * could be zero, one, or more. If there is any left-over content, i.e. an incomplete 47 * STOMP frame, at the end the buffer is reset to point to the beginning of the 48 * partial content. The caller is then responsible for dealing with that 49 * incomplete content by buffering until there is more input available. 50 * 51 * @author Andy Wilkinson 52 * @author Rossen Stoyanchev 53 * @since 4.0 54 */ 55 class StompDecoder { 56 57 alias ByteMessage = Message!(byte[]); 58 59 enum byte[] HEARTBEAT_PAYLOAD = ['\n']; 60 61 private MessageHeaderInitializer headerInitializer; 62 63 64 /** 65 * Configure a {@link MessageHeaderInitializer} to apply to the headers of 66 * {@link Message Messages} from decoded STOMP frames. 67 */ 68 void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { 69 this.headerInitializer = headerInitializer; 70 } 71 72 /** 73 * Return the configured {@code MessageHeaderInitializer}, if any. 74 */ 75 76 MessageHeaderInitializer getHeaderInitializer() { 77 return this.headerInitializer; 78 } 79 80 81 /** 82 * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a 83 * list of {@link Message Messages}. If the input buffer contains partial STOMP frame 84 * content, or additional content with a partial STOMP frame, the buffer is 85 * reset and {@code null} is returned. 86 * @param byteBuffer the buffer to decode the STOMP frame from 87 * @return the decoded messages, or an empty list if none 88 * @throws StompConversionException raised in case of decoding issues 89 */ 90 List!(ByteMessage) decode(ByteBuffer byteBuffer) { 91 return decode(byteBuffer, null); 92 } 93 94 /** 95 * Decodes one or more STOMP frames from the given {@code buffer} and returns 96 * a list of {@link Message Messages}. 97 * <p>If the given ByteBuffer contains only partial STOMP frame content and no 98 * complete STOMP frames, an empty list is returned, and the buffer is reset to 99 * to where it was. 100 * <p>If the buffer contains one ore more STOMP frames, those are returned and 101 * the buffer reset to point to the beginning of the unused partial content. 102 * <p>The output partialMessageHeaders map is used to store successfully parsed 103 * headers in case of partial content. The caller can then check if a 104 * "content-length" header was read, which helps to determine how much more 105 * content is needed before the next attempt to decode. 106 * @param byteBuffer the buffer to decode the STOMP frame from 107 * @param partialMessageHeaders an empty output map that will store the last 108 * successfully parsed partialMessageHeaders in case of partial message content 109 * in cases where the partial buffer ended with a partial STOMP frame 110 * @return the decoded messages, or an empty list if none 111 * @throws StompConversionException raised in case of decoding issues 112 */ 113 List!(ByteMessage) decode(ByteBuffer byteBuffer, 114 MultiStringsMap partialMessageHeaders) { 115 116 version(HUNT_DEBUG) tracef("Decoding %s...", byteBuffer.toString()); 117 118 List!(ByteMessage) messages = new ArrayList!(ByteMessage)(); 119 while (byteBuffer.hasRemaining()) { 120 ByteMessage message = decodeMessage(byteBuffer, partialMessageHeaders); 121 if (message !is null) { 122 version(HUNT_DEBUG) tracef("messages: %s", messages.toString()); 123 messages.add(message); 124 } else { 125 break; 126 } 127 } 128 version(HUNT_DEBUG) tracef("Decoding done. Messages size: %d", messages.size()); 129 130 return messages; 131 } 132 133 /** 134 * Decode a single STOMP frame from the given {@code buffer} into a {@link Message}. 135 */ 136 137 private ByteMessage decodeMessage(ByteBuffer byteBuffer, MultiStringsMap headers) { 138 139 version(HUNT_DEBUG) tracef("decoding buffer %s...", byteBuffer.toString()); 140 141 ByteMessage decodedMessage = null; 142 skipLeadingEol(byteBuffer); 143 144 // Explicit mark/reset access via Buffer base type for compatibility 145 // with covariant return type on JDK 9's ByteBuffer... 146 Buffer buffer = byteBuffer; 147 buffer.mark(); 148 149 string command = readCommand(byteBuffer); 150 version(HUNT_DEBUG) infof("command: %s", command); 151 if (command.length > 0) { 152 StompHeaderAccessor headerAccessor = null; 153 Pair!(bool, byte[]) payload = makePair(false, cast(byte[])null); 154 if (byteBuffer.remaining() > 0) { 155 StompCommand stompCommand = StompCommand.valueOf(command); 156 headerAccessor = StompHeaderAccessor.create(stompCommand); 157 initHeaders(headerAccessor); 158 readHeaders(byteBuffer, headerAccessor); 159 payload = readPayload(byteBuffer, headerAccessor); 160 version(HUNT_DEBUG) tracef("payload size(bytes): %d", payload.second.length); 161 } 162 163 if (payload.first) { 164 byte[] payloadBuffer = payload.second; 165 if (payloadBuffer.length > 0) { 166 Nullable!StompCommand stompCommand = headerAccessor.getCommand(); 167 if (stompCommand !is null) { 168 StompCommand cmd = stompCommand.value; 169 if(!cmd.isBodyAllowed()) { 170 string hs = "null"; 171 if(headers !is null) hs = headers.toString(); 172 throw new StompConversionException(stompCommand.toString() ~ 173 " shouldn't have a payload: length=" ~ 174 to!string(payloadBuffer.length) ~ ", headers=" ~ hs); 175 } 176 } 177 } 178 179 // if(headerAccessor !is null) { 180 headerAccessor.updateSimpMessageHeadersFromStompHeaders(); 181 headerAccessor.setLeaveMutable(true); 182 decodedMessage = cast(ByteMessage)MessageHelper.createMessage(payloadBuffer, headerAccessor.getMessageHeaders()); 183 version(HUNT_DEBUG) { 184 trace("Decoded " ~ headerAccessor.getDetailedLogMessage(new Nullable!(byte[])(payloadBuffer))); 185 } 186 // } else { 187 // version(HUNT_DEBUG) warning("Incomplete frame, resetting input buffer..."); 188 // buffer.reset(); 189 // } 190 } else { 191 version(HUNT_DEBUG) warning("Incomplete frame, resetting input buffer..."); 192 if (headers !is null && headerAccessor !is null) { 193 string name = NativeMessageHeaderAccessor.NATIVE_HEADERS; 194 195 MultiStringsMap map = cast(MultiStringsMap) headerAccessor.getHeader(name); 196 if (map !is null) { 197 headers.putAll(map); 198 } 199 } 200 buffer.reset(); 201 } 202 } else { 203 StompHeaderAccessor headerAccessor = StompHeaderAccessor.createForHeartbeat(); 204 initHeaders(headerAccessor); 205 headerAccessor.setLeaveMutable(true); 206 decodedMessage = MessageHelper.createMessage(HEARTBEAT_PAYLOAD, headerAccessor.getMessageHeaders()); 207 version(HUNT_DEBUG) { 208 trace("Decoded " ~ headerAccessor.getDetailedLogMessage(null)); 209 } 210 } 211 212 return decodedMessage; 213 } 214 215 private void initHeaders(StompHeaderAccessor headerAccessor) { 216 MessageHeaderInitializer initializer = getHeaderInitializer(); 217 if (initializer !is null) { 218 initializer.initHeaders(headerAccessor); 219 } 220 } 221 222 /** 223 * Skip one ore more EOL characters at the start of the given ByteBuffer. 224 * Those are STOMP heartbeat frames. 225 */ 226 protected void skipLeadingEol(ByteBuffer byteBuffer) { 227 while (true) { 228 if (!tryConsumeEndOfLine(byteBuffer)) { 229 break; 230 } 231 } 232 } 233 234 private string readCommand(ByteBuffer byteBuffer) { 235 ByteArrayOutputStream command = new ByteArrayOutputStream(256); 236 while (byteBuffer.remaining() > 0 && !tryConsumeEndOfLine(byteBuffer)) { 237 command.write(byteBuffer.get()); 238 } 239 return cast(string) (command.toByteArray()); 240 } 241 242 private void readHeaders(ByteBuffer byteBuffer, StompHeaderAccessor headerAccessor) { 243 while (true) { 244 ByteArrayOutputStream headerStream = new ByteArrayOutputStream(256); 245 bool headerComplete = false; 246 while (byteBuffer.hasRemaining()) { 247 if (tryConsumeEndOfLine(byteBuffer)) { 248 headerComplete = true; 249 break; 250 } 251 headerStream.write(byteBuffer.get()); 252 } 253 if (headerStream.size() > 0 && headerComplete) { 254 string header = cast(string)(headerStream.toByteArray()); 255 int colonIndex = cast(int)header.indexOf(":"); 256 if (colonIndex <= 0) { 257 if (byteBuffer.remaining() > 0) { 258 throw new StompConversionException("Illegal header: '" ~ header ~ 259 "'. A header must be of the form <name>:[<value>]."); 260 } 261 } 262 else { 263 string headerName = unescape(header.substring(0, colonIndex)); 264 string headerValue = unescape(header.substring(colonIndex + 1)); 265 version(HUNT_DEBUG) tracef("header: name=%s, value=%s", headerName, headerValue); 266 try { 267 headerAccessor.addNativeHeader(headerName, headerValue); 268 } 269 catch (InvalidMimeTypeException ex) { 270 if (byteBuffer.remaining() > 0) { 271 throw ex; 272 } 273 } 274 } 275 } 276 else { 277 break; 278 } 279 } 280 } 281 282 /** 283 * See STOMP Spec 1.2: 284 * <a href="http://stomp.github.io/stomp-specification-1.2.html#Value_Encoding">"Value Encoding"</a>. 285 */ 286 private string unescape(string inString) { 287 StringBuilder sb = new StringBuilder(inString.length); 288 int pos = 0; // position in the old string 289 int index = cast(int)inString.indexOf("\\"); 290 291 while (index >= 0) { 292 sb.append(inString.substring(pos, index)); 293 if (index + 1 >= inString.length) { 294 throw new StompConversionException("Illegal escape sequence at index " ~ 295 index.to!string() ~ ": " ~ inString); 296 } 297 char c = inString[index + 1]; 298 if (c == 'r') { 299 sb.append('\r'); 300 } 301 else if (c == 'n') { 302 sb.append('\n'); 303 } 304 else if (c == 'c') { 305 sb.append(':'); 306 } 307 else if (c == '\\') { 308 sb.append('\\'); 309 } 310 else { 311 // should never happen 312 throw new StompConversionException("Illegal escape sequence at index " ~ 313 index.to!string() ~ ": " ~ inString); 314 } 315 pos = index + 2; 316 index = cast(int)inString.indexOf("\\", pos); 317 } 318 319 sb.append(inString.substring(pos)); 320 return sb.toString(); 321 } 322 323 324 private Pair!(bool, byte[]) readPayload(ByteBuffer byteBuffer, StompHeaderAccessor headerAccessor) { 325 Integer contentLength; 326 try { 327 contentLength = headerAccessor.getContentLength(); 328 } 329 catch (NumberFormatException ex) { 330 version(HUNT_DEBUG) { 331 trace("Ignoring invalid content-length: '" ~ headerAccessor.toString()); 332 } 333 contentLength = null; 334 } 335 336 if (contentLength !is null && contentLength >= 0) { 337 if (byteBuffer.remaining() > contentLength) { 338 byte[] payload = new byte[contentLength.value]; 339 byteBuffer.get(payload); 340 if (byteBuffer.get() != 0) { 341 throw new StompConversionException("Frame must be terminated with a null octet"); 342 } 343 return makePair(true, payload); 344 } 345 else { 346 return makePair(false, cast(byte[])null); // null; 347 } 348 } else { 349 ByteArrayOutputStream payload = new ByteArrayOutputStream(256); 350 while (byteBuffer.remaining() > 0) { 351 byte b = byteBuffer.get(); 352 if (b == 0) { 353 return makePair(true, payload.toByteArray()); // payload.toByteArray(); 354 } 355 else { 356 payload.write(b); 357 } 358 } 359 } 360 return makePair(false, cast(byte[])null); // null; 361 } 362 363 /** 364 * Try to read an EOL incrementing the buffer position if successful. 365 * @return whether an EOL was consumed 366 */ 367 private bool tryConsumeEndOfLine(ByteBuffer byteBuffer) { 368 if (byteBuffer.remaining() > 0) { 369 byte b = byteBuffer.get(); 370 if (b == '\n') { 371 return true; 372 } 373 else if (b == '\r') { 374 if (byteBuffer.remaining() > 0 && byteBuffer.get() == '\n') { 375 return true; 376 } 377 else { 378 throw new StompConversionException("'\\r' must be followed by '\\n'"); 379 } 380 } 381 // Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer 382 (cast(Buffer) byteBuffer).position(byteBuffer.position() - 1); 383 } 384 return false; 385 } 386 387 }