1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.stomp.StompDecoder;
18 
19 import hunt.stomp.simp.stomp.StompHeaderAccessor;
20 import hunt.stomp.exception;
21 import hunt.stomp.Message;
22 import hunt.stomp.simp.stomp.StompCommand;
23 import hunt.stomp.simp.SimpMessageType;
24 import hunt.stomp.support.MessageHeaderAccessor;
25 import hunt.stomp.support.MessageBuilder;
26 import hunt.stomp.support.NativeMessageHeaderAccessor;
27 
28 import hunt.collection;
29 import hunt.io.ByteArrayOutputStream;
30 import hunt.logging;
31 import hunt.Exceptions;
32 import hunt.Integer;
33 import hunt.Nullable;
34 import hunt.text.Common;
35 import hunt.text.StringBuilder;
36 import hunt.util.TypeUtils;
37 
38 import std.conv;
39 import std.string;
40 
41 
42 /**
43  * Decodes one or more STOMP frames contained in a {@link ByteBuffer}.
44  *
45  * <p>An attempt is made to read all complete STOMP frames from the buffer, which
46  * could be zero, one, or more. If there is any left-over content, i.e. an incomplete
47  * STOMP frame, at the end the buffer is reset to point to the beginning of the
48  * partial content. The caller is then responsible for dealing with that
49  * incomplete content by buffering until there is more input available.
50  *
51  * @author Andy Wilkinson
52  * @author Rossen Stoyanchev
53  * @since 4.0
54  */
55 class StompDecoder {
56 
	// Shorthand for a message whose payload is a raw byte array.
	alias ByteMessage = Message!(byte[]);

	// Payload given to the message created for a heartbeat: a single newline byte.
	enum byte[] HEARTBEAT_PAYLOAD = ['\n'];

	// Optional initializer applied to the headers of every decoded message;
	// initHeaders() skips it when it is null.
	private MessageHeaderInitializer headerInitializer;
62 
63 
64 	/**
65 	 * Configure a {@link MessageHeaderInitializer} to apply to the headers of
66 	 * {@link Message Messages} from decoded STOMP frames.
67 	 */
	void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
		// Stored as given; initHeaders() does nothing when the stored value is null.
		this.headerInitializer = headerInitializer;
	}
71 
72 	/**
73 	 * Return the configured {@code MessageHeaderInitializer}, if any.
74 	 */
75 	
76 	MessageHeaderInitializer getHeaderInitializer() {
77 		return this.headerInitializer;
78 	}
79 
80 
81 	/**
	 * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
	 * list of {@link Message Messages}. If the input buffer ends with partial STOMP
	 * frame content, the buffer is reset to the beginning of that partial content
	 * and only the complete frames decoded before it (possibly none) are returned.
86 	 * @param byteBuffer the buffer to decode the STOMP frame from
87 	 * @return the decoded messages, or an empty list if none
88 	 * @throws StompConversionException raised in case of decoding issues
89 	 */
90 	List!(ByteMessage) decode(ByteBuffer byteBuffer) {
91 		return decode(byteBuffer, null);
92 	}
93 
94 	/**
95 	 * Decodes one or more STOMP frames from the given {@code buffer} and returns
96 	 * a list of {@link Message Messages}.
	 * <p>If the given ByteBuffer contains only partial STOMP frame content and no
	 * complete STOMP frames, an empty list is returned, and the buffer is reset
	 * to where it was.
	 * <p>If the buffer contains one or more STOMP frames, those are returned and
	 * the buffer is reset to point to the beginning of the unused partial content.
102 	 * <p>The output partialMessageHeaders map is used to store successfully parsed
103 	 * headers in case of partial content. The caller can then check if a
104 	 * "content-length" header was read, which helps to determine how much more
105 	 * content is needed before the next attempt to decode.
106 	 * @param byteBuffer the buffer to decode the STOMP frame from
	 * @param partialMessageHeaders an empty output map (may be {@code null}) that
	 * receives the headers successfully parsed from a trailing partial STOMP frame,
	 * if the buffer ended with one
110 	 * @return the decoded messages, or an empty list if none
111 	 * @throws StompConversionException raised in case of decoding issues
112 	 */
113 	List!(ByteMessage) decode(ByteBuffer byteBuffer,
114 			MultiStringsMap partialMessageHeaders) {
115 		
116 		version(HUNT_DEBUG) tracef("Decoding %s...", byteBuffer.toString());
117 
118 		List!(ByteMessage) messages = new ArrayList!(ByteMessage)();
119 		while (byteBuffer.hasRemaining()) {
120 			ByteMessage message = decodeMessage(byteBuffer, partialMessageHeaders);
121 			if (message !is null) {
122 				version(HUNT_DEBUG) tracef("messages: %s", messages.toString());
123 				messages.add(message);
124 			} else {
125 				break;
126 			}
127 		}
128 		version(HUNT_DEBUG) tracef("Decoding done. Messages size: %d", messages.size());
129 
130 		return messages;
131 	}
132 
133 	/**
134 	 * Decode a single STOMP frame from the given {@code buffer} into a {@link Message}.
135 	 */
136 	
	private ByteMessage decodeMessage(ByteBuffer byteBuffer, MultiStringsMap headers) {
		// Decodes at most one frame starting at the buffer's current position.
		// Returns the decoded message, or null when the frame is incomplete; in
		// that case the buffer is reset to the position marked below and, if
		// `headers` is non-null, the native headers parsed so far are copied into it.

		version(HUNT_DEBUG) tracef("decoding buffer %s...", byteBuffer.toString());
			
		ByteMessage decodedMessage = null;
		skipLeadingEol(byteBuffer);

		// Mark the position after the leading EOLs so an incomplete frame can be
		// rewound to it. mark()/reset() are called through the Buffer base type
		// (carried over from the Java original, where this avoided the covariant
		// return type on JDK 9's ByteBuffer).
		Buffer buffer = byteBuffer;
		buffer.mark();

		string command = readCommand(byteBuffer);
		version(HUNT_DEBUG) infof("command: %s", command);
		if (command.length > 0) {
			StompHeaderAccessor headerAccessor = null;
			// payload.first: whether a complete payload was read; payload.second: its bytes.
			Pair!(bool, byte[]) payload = makePair(false, cast(byte[])null);
			if (byteBuffer.remaining() > 0) {
				StompCommand stompCommand = StompCommand.valueOf(command);
				headerAccessor = StompHeaderAccessor.create(stompCommand);
				initHeaders(headerAccessor);
				readHeaders(byteBuffer, headerAccessor);
				payload = readPayload(byteBuffer, headerAccessor);
				version(HUNT_DEBUG) tracef("payload size(bytes): %d", payload.second.length);
			}

			if (payload.first) {
				byte[] payloadBuffer = payload.second;
				if (payloadBuffer.length > 0) {
					// A non-empty body is rejected for commands that do not allow one.
					Nullable!StompCommand stompCommand = headerAccessor.getCommand();
					if (stompCommand !is null) {
						StompCommand cmd = stompCommand.value;
						if(!cmd.isBodyAllowed()) {
							// NOTE(review): the message reports the caller-supplied `headers`
							// map (often null), not the headers parsed for this frame --
							// confirm that is intended.
							string hs = "null";
							if(headers !is null) hs = headers.toString();
							throw new StompConversionException(stompCommand.toString() ~
									" shouldn't have a payload: length=" ~ 
									to!string(payloadBuffer.length) ~ ", headers=" ~ hs);
						}
					}
				}

				// headerAccessor cannot be null here: payload.first is only set by
				// readPayload(), which runs after the accessor has been created above.
				// That is why the null guard below is commented out.
				// if(headerAccessor !is null) {
					headerAccessor.updateSimpMessageHeadersFromStompHeaders();
					headerAccessor.setLeaveMutable(true);
					decodedMessage = cast(ByteMessage)MessageHelper.createMessage(payloadBuffer, headerAccessor.getMessageHeaders());
					version(HUNT_DEBUG) {
						trace("Decoded " ~ headerAccessor.getDetailedLogMessage(new Nullable!(byte[])(payloadBuffer)));
					}
				// } else {
				// 	version(HUNT_DEBUG) warning("Incomplete frame, resetting input buffer...");
				// 	buffer.reset();
				// }
			} else {
				// Incomplete frame: hand back whatever headers were parsed (if the
				// caller asked for them) and rewind to the marked position.
				version(HUNT_DEBUG) warning("Incomplete frame, resetting input buffer...");
				if (headers !is null && headerAccessor !is null) {
					string name = NativeMessageHeaderAccessor.NATIVE_HEADERS;
					
					MultiStringsMap map = cast(MultiStringsMap) headerAccessor.getHeader(name);
					if (map !is null) {
						headers.putAll(map);
					}
				}
				buffer.reset();
			}
		} else {
			// No command bytes were read (after skipping leading EOLs), so produce a
			// heartbeat message with the fixed HEARTBEAT_PAYLOAD.
			StompHeaderAccessor headerAccessor = StompHeaderAccessor.createForHeartbeat();
			initHeaders(headerAccessor);
			headerAccessor.setLeaveMutable(true);
			decodedMessage = MessageHelper.createMessage(HEARTBEAT_PAYLOAD, headerAccessor.getMessageHeaders());
			version(HUNT_DEBUG) {
				trace("Decoded " ~ headerAccessor.getDetailedLogMessage(null));
			}
		}

		return decodedMessage;
	}
214 
215 	private void initHeaders(StompHeaderAccessor headerAccessor) {
216 		MessageHeaderInitializer initializer = getHeaderInitializer();
217 		if (initializer !is null) {
218 			initializer.initHeaders(headerAccessor);
219 		}
220 	}
221 
222 	/**
	 * Skip one or more EOL characters at the start of the given ByteBuffer.
224 	 * Those are STOMP heartbeat frames.
225 	 */
226 	protected void skipLeadingEol(ByteBuffer byteBuffer) {
227 		while (true) {
228 			if (!tryConsumeEndOfLine(byteBuffer)) {
229 				break;
230 			}
231 		}
232 	}
233 
234 	private string readCommand(ByteBuffer byteBuffer) {
235 		ByteArrayOutputStream command = new ByteArrayOutputStream(256);
236 		while (byteBuffer.remaining() > 0 && !tryConsumeEndOfLine(byteBuffer)) {
237 			command.write(byteBuffer.get());
238 		}
239 		return cast(string) (command.toByteArray());
240 	}
241 
242 	private void readHeaders(ByteBuffer byteBuffer, StompHeaderAccessor headerAccessor) {
243 		while (true) {
244 			ByteArrayOutputStream headerStream = new ByteArrayOutputStream(256);
245 			bool headerComplete = false;
246 			while (byteBuffer.hasRemaining()) {
247 				if (tryConsumeEndOfLine(byteBuffer)) {
248 					headerComplete = true;
249 					break;
250 				}
251 				headerStream.write(byteBuffer.get());
252 			}
253 			if (headerStream.size() > 0 && headerComplete) {
254 				string header = cast(string)(headerStream.toByteArray());
255 				int colonIndex = cast(int)header.indexOf(":");
256 				if (colonIndex <= 0) {
257 					if (byteBuffer.remaining() > 0) {
258 						throw new StompConversionException("Illegal header: '" ~ header ~
259 								"'. A header must be of the form <name>:[<value>].");
260 					}
261 				}
262 				else {
263 					string headerName = unescape(header.substring(0, colonIndex));
264 					string headerValue = unescape(header.substring(colonIndex + 1));
265 					version(HUNT_DEBUG) tracef("header: name=%s, value=%s", headerName, headerValue);
266 					try {
267 						headerAccessor.addNativeHeader(headerName, headerValue);
268 					}
269 					catch (InvalidMimeTypeException ex) {
270 						if (byteBuffer.remaining() > 0) {
271 							throw ex;
272 						}
273 					}
274 				}
275 			}
276 			else {
277 				break;
278 			}
279 		}
280 	}
281 
282 	/**
283 	 * See STOMP Spec 1.2:
284 	 * <a href="http://stomp.github.io/stomp-specification-1.2.html#Value_Encoding">"Value Encoding"</a>.
285 	 */
286 	private string unescape(string inString) {
287 		StringBuilder sb = new StringBuilder(inString.length);
288 		int pos = 0;  // position in the old string
289 		int index = cast(int)inString.indexOf("\\");
290 
291 		while (index >= 0) {
292 			sb.append(inString.substring(pos, index));
293 			if (index + 1 >= inString.length) {
294 				throw new StompConversionException("Illegal escape sequence at index " ~ 
295 					index.to!string() ~ ": " ~ inString);
296 			}
297 			char c = inString[index + 1];
298 			if (c == 'r') {
299 				sb.append('\r');
300 			}
301 			else if (c == 'n') {
302 				sb.append('\n');
303 			}
304 			else if (c == 'c') {
305 				sb.append(':');
306 			}
307 			else if (c == '\\') {
308 				sb.append('\\');
309 			}
310 			else {
311 				// should never happen
312 				throw new StompConversionException("Illegal escape sequence at index " ~ 
313 					index.to!string() ~ ": " ~ inString);
314 			}
315 			pos = index + 2;
316 			index = cast(int)inString.indexOf("\\", pos);
317 		}
318 
319 		sb.append(inString.substring(pos));
320 		return sb.toString();
321 	}
322 
323 	
324 	private Pair!(bool, byte[]) readPayload(ByteBuffer byteBuffer, StompHeaderAccessor headerAccessor) {
325 		Integer contentLength;
326 		try {
327 			contentLength = headerAccessor.getContentLength();
328 		}
329 		catch (NumberFormatException ex) {
330 			version(HUNT_DEBUG) {
331 				trace("Ignoring invalid content-length: '" ~ headerAccessor.toString());
332 			}
333 			contentLength = null;
334 		}
335 
336 		if (contentLength !is null && contentLength >= 0) {
337 			if (byteBuffer.remaining() > contentLength) {
338 				byte[] payload = new byte[contentLength.value];
339 				byteBuffer.get(payload);
340 				if (byteBuffer.get() != 0) {
341 					throw new StompConversionException("Frame must be terminated with a null octet");
342 				}
343 				return makePair(true, payload);
344 			}
345 			else {
346 				return makePair(false, cast(byte[])null); // null;
347 			}
348 		} else {
349 			ByteArrayOutputStream payload = new ByteArrayOutputStream(256);
350 			while (byteBuffer.remaining() > 0) {
351 				byte b = byteBuffer.get();
352 				if (b == 0) {
353 					return makePair(true, payload.toByteArray()); // payload.toByteArray();
354 				}
355 				else {
356 					payload.write(b);
357 				}
358 			}
359 		}
360 		return makePair(false, cast(byte[])null); // null;
361 	}
362 
363 	/**
364 	 * Try to read an EOL incrementing the buffer position if successful.
365 	 * @return whether an EOL was consumed
366 	 */
367 	private bool tryConsumeEndOfLine(ByteBuffer byteBuffer) {
368 		if (byteBuffer.remaining() > 0) {
369 			byte b = byteBuffer.get();
370 			if (b == '\n') {
371 				return true;
372 			}
373 			else if (b == '\r') {
374 				if (byteBuffer.remaining() > 0 && byteBuffer.get() == '\n') {
375 					return true;
376 				}
377 				else {
378 					throw new StompConversionException("'\\r' must be followed by '\\n'");
379 				}
380 			}
381 			// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
382 			(cast(Buffer) byteBuffer).position(byteBuffer.position() - 1);
383 		}
384 		return false;
385 	}
386 
387 }