1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.stomp.BufferingStompDecoder;
18 
19 import hunt.stomp.simp.stomp.StompDecoder;
20 import hunt.stomp.simp.stomp.StompHeaderAccessor;
21 
22 
23 import hunt.stomp.exception;
24 import hunt.stomp.Message;
25 
26 import hunt.collection;
27 import hunt.Integer;
28 
29 import std.algorithm;
30 import std.array;
31 import std.conv;
32 import std.container.dlist;
33 // import java.nio.ByteBuffer;
34 // import java.util.List;
35 // import java.util.Queue;
36 // import java.util.concurrent.LinkedBlockingQueue;
37 
38 
39 // import hunt.framework.util.LinkedMultiValueMap;
40 // import hunt.framework.util.MultiValueMap;
41 
42 /**
43  * An extension of {@link hunt.stomp.simp.stomp.StompDecoder}
44  * that buffers content remaining in the input ByteBuffer after the parent
45  * class has read all (complete) STOMP frames from it. The remaining content
46  * represents an incomplete STOMP frame. When called repeatedly with additional
47  * data, the decode method returns one or more messages or, if there is not
48  * enough data still, continues to buffer.
49  *
50  * <p>A single instance of this decoder can be invoked repeatedly to read all
51  * messages from a single stream (e.g. WebSocket session) as long as decoding
52  * does not fail. If there is an exception, StompDecoder instance should not
53  * be used any more as its internal state is not guaranteed to be consistent.
54  * It is expected that the underlying session is closed at that point.
55  *
56  * @author Rossen Stoyanchev
57  * @since 4.0.3
58  * @see StompDecoder
59  */
60 class BufferingStompDecoder {
61 
62 	private StompDecoder stompDecoder;
63 
64 	private int bufferSizeLimit;
65 
66 	// private Queue!(ByteBuffer) chunks = new LinkedBlockingQueue<>();
67 	private DList!(ByteBuffer) chunks;
68 	
69 	private Integer expectedContentLength;
70 
71 
72 	/**
73 	 * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
74 	 * @param stompDecoder the target decoder to wrap
75 	 * @param bufferSizeLimit the buffer size limit
76 	 */
77 	this(StompDecoder stompDecoder, int bufferSizeLimit) {
78 		// assert(false, "Test");
79 		assert(stompDecoder, "StompDecoder is required");
80 		assert(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
81 		this.stompDecoder = stompDecoder;
82 		this.bufferSizeLimit = bufferSizeLimit;
83 	}
84 
85 
86 	/**
87 	 * Return the wrapped {@link StompDecoder}.
88 	 */
89 	final StompDecoder getStompDecoder() {
90 		return this.stompDecoder;
91 	}
92 
93 	/**
94 	 * Return the configured buffer size limit.
95 	 */
96 	final int getBufferSizeLimit() {
97 		return this.bufferSizeLimit;
98 	}
99 
100 
101 	/**
102 	 * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
103 	 * list of {@link Message Messages}.
104 	 * <p>If there was enough data to parse a "content-length" header, then the
105 	 * value is used to determine how much more data is needed before a new
106 	 * attempt to decode is made.
107 	 * <p>If there was not enough data to parse the "content-length", or if there
108 	 * is "content-length" header, every subsequent call to decode attempts to
109 	 * parse again with all available data. Therefore the presence of a "content-length"
110 	 * header helps to optimize the decoding of large messages.
111 	 * @param newBuffer a buffer containing new data to decode
112 	 * @return decoded messages or an empty list
113 	 * @throws StompConversionException raised in case of decoding issues
114 	 */
115 	List!(Message!(byte[])) decode(ByteBuffer newBuffer) {
116 		this.chunks.insertBack(newBuffer);
117 		checkBufferLimits();
118 
119 		Integer contentLength = this.expectedContentLength;
120 		if (contentLength !is null && getBufferSize() < contentLength) {
121 			return Collections.emptyList!(Message!(byte[]))();
122 		}
123 
124 		ByteBuffer bufferToDecode = assembleChunksAndReset();
125 		MultiValueMap!(string, string) headers = new LinkedMultiValueMap!(string, string)();
126 		List!(Message!(byte[])) messages = this.stompDecoder.decode(bufferToDecode, headers);
127 
128 		if (bufferToDecode.hasRemaining()) {
129 			this.chunks.insertBack(bufferToDecode);
130 			this.expectedContentLength = StompHeaderAccessor.getContentLength(headers);
131 		}
132 
133 		return messages;
134 	}
135 
136 	private ByteBuffer assembleChunksAndReset() {
137 		ByteBuffer result;
138 		ByteBuffer[] cs = this.chunks.array();
139 		if (cs.length == 1) {
140 			result = this.chunks.front();
141 		}
142 		else {
143 			result = BufferUtils.allocate(getBufferSize());
144 			foreach (ByteBuffer partial ; this.chunks) {
145 				result.put(partial);
146 			}
147 			result.flip();
148 		}
149 		this.chunks.clear();
150 		this.expectedContentLength = null;
151 		return result;
152 	}
153 
154 	private void checkBufferLimits() {
155 		Integer contentLength = this.expectedContentLength;
156 		if (contentLength !is null && contentLength > this.bufferSizeLimit) {
157 			throw new StompConversionException(
158 					"STOMP 'content-length' header value " ~ this.expectedContentLength.toString() ~
159 					"  exceeds configured buffer size limit " ~ this.bufferSizeLimit.to!string());
160 		}
161 		if (getBufferSize() > this.bufferSizeLimit) {
162 			throw new StompConversionException("The configured STOMP buffer size limit of " ~
163 					this.bufferSizeLimit.to!string() ~ " bytes has been exceeded");
164 		}
165 	}
166 
167 	/**
168 	 * Calculate the current buffer size.
169 	 */
170 	int getBufferSize() {
171 		int size = 0;
172 		foreach (ByteBuffer buffer ; this.chunks) {
173 			size = size + buffer.remaining();
174 		}
175 		return size;
176 	}
177 
178 	/**
179 	 * Get the expected content length of the currently buffered, incomplete STOMP frame.
180 	 */
181 	
182 	Integer getExpectedContentLength() {
183 		return this.expectedContentLength;
184 	}
185 
186 }