/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.simp.stomp.BufferingStompDecoder;

import hunt.stomp.simp.stomp.StompDecoder;
import hunt.stomp.simp.stomp.StompHeaderAccessor;


import hunt.stomp.exception;
import hunt.stomp.Message;

import hunt.collection;
import hunt.Integer;

import std.algorithm;
import std.array;
import std.conv;
import std.container.dlist;
// import java.nio.ByteBuffer;
// import java.util.List;
// import java.util.Queue;
// import java.util.concurrent.LinkedBlockingQueue;


// import hunt.framework.util.LinkedMultiValueMap;
// import hunt.framework.util.MultiValueMap;

/**
 * An extension of {@link hunt.stomp.simp.stomp.StompDecoder}
 * that buffers content remaining in the input ByteBuffer after the parent
 * class has read all (complete) STOMP frames from it. The remaining content
 * represents an incomplete STOMP frame. When called repeatedly with additional
 * data, the decode method returns one or more messages or, if there is not
 * enough data still, continues to buffer.
 *
 * <p>A single instance of this decoder can be invoked repeatedly to read all
 * messages from a single stream (e.g. WebSocket session) as long as decoding
 * does not fail. If there is an exception, this decoder instance should not
 * be used any more as its internal state is not guaranteed to be consistent.
 * It is expected that the underlying session is closed at that point.
 *
 * @author Rossen Stoyanchev
 * @since 4.0.3
 * @see StompDecoder
 */
class BufferingStompDecoder {

    private StompDecoder stompDecoder;

    private int bufferSizeLimit;

    // The Java original kept the chunks in a LinkedBlockingQueue:
    //   private Queue!(ByteBuffer) chunks = new LinkedBlockingQueue<>();
    // NOTE(review): DList adds no synchronization of its own, so decode()
    // presumably must not be called concurrently - confirm against callers.
    private DList!(ByteBuffer) chunks;

    // null while no "content-length" value is known for the buffered frame.
    private Integer expectedContentLength;


    /**
     * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
     * <p>Both arguments are checked with {@code assert}, so the checks are
     * compiled out in {@code -release} builds.
     * @param stompDecoder the target decoder to wrap, must not be null
     * @param bufferSizeLimit the buffer size limit, must be greater than 0
     */
    this(StompDecoder stompDecoder, int bufferSizeLimit) {
        assert(stompDecoder, "StompDecoder is required");
        assert(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
        this.stompDecoder = stompDecoder;
        this.bufferSizeLimit = bufferSizeLimit;
    }


    /**
     * Return the wrapped {@link StompDecoder}.
     */
    final StompDecoder getStompDecoder() {
        return this.stompDecoder;
    }

    /**
     * Return the configured buffer size limit.
     */
    final int getBufferSizeLimit() {
        return this.bufferSizeLimit;
    }


    /**
     * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
     * list of {@link Message Messages}.
     * <p>If there was enough data to parse a "content-length" header, then the
     * value is used to determine how much more data is needed before a new
     * attempt to decode is made.
     * <p>If there was not enough data to parse the "content-length", or if there
     * is no "content-length" header, every subsequent call to decode attempts to
     * parse again with all available data. Therefore the presence of a "content-length"
     * header helps to optimize the decoding of large messages.
     * @param newBuffer a buffer containing new data to decode
     * @return decoded messages or an empty list
     * @throws StompConversionException raised in case of decoding issues
     */
    List!(Message!(byte[])) decode(ByteBuffer newBuffer) {
        this.chunks.insertBack(newBuffer);
        checkBufferLimits();

        // A known content length that has not been reached yet means the frame
        // cannot be complete: keep buffering without trying to decode.
        Integer contentLength = this.expectedContentLength;
        if (contentLength !is null && getBufferSize() < contentLength) {
            return Collections.emptyList!(Message!(byte[]))();
        }

        ByteBuffer bufferToDecode = assembleChunksAndReset();
        MultiValueMap!(string, string) headers = new LinkedMultiValueMap!(string, string)();
        List!(Message!(byte[])) messages = this.stompDecoder.decode(bufferToDecode, headers);

        // Whatever the wrapped decoder left unread is kept as the first chunk
        // for the next call, together with the content length taken from the
        // headers map handed to the wrapped decoder.
        if (bufferToDecode.hasRemaining()) {
            this.chunks.insertBack(bufferToDecode);
            this.expectedContentLength = StompHeaderAccessor.getContentLength(headers);
        }

        return messages;
    }

    /**
     * Merge all buffered chunks into a single buffer, then clear the chunk
     * list and the expected content length.
     * @return the only buffered chunk when exactly one is present, otherwise a
     * newly allocated, flipped buffer holding the content of all chunks in order
     */
    private ByteBuffer assembleChunksAndReset() {
        ByteBuffer result;

        // Look at no more than the first two chunks rather than copying the
        // whole list into an array just to learn whether it holds one element.
        auto pending = this.chunks[];
        ByteBuffer first = null;
        bool singleChunk = false;
        if (!pending.empty) {
            first = pending.front;
            pending.popFront();
            singleChunk = pending.empty;
        }

        if (singleChunk) {
            // Nothing to merge: hand the chunk back as is.
            result = first;
        }
        else {
            result = BufferUtils.allocate(getBufferSize());
            foreach (ByteBuffer partial; this.chunks[]) {
                result.put(partial);
            }
            result.flip();
        }
        this.chunks.clear();
        this.expectedContentLength = null;
        return result;
    }

    /**
     * Verify that neither the expected content length nor the amount of
     * buffered data exceeds the configured buffer size limit.
     * @throws StompConversionException if either value is above the limit
     */
    private void checkBufferLimits() {
        Integer contentLength = this.expectedContentLength;
        if (contentLength !is null && contentLength > this.bufferSizeLimit) {
            throw new StompConversionException(
                    "STOMP 'content-length' header value " ~ contentLength.toString() ~
                    " exceeds configured buffer size limit " ~ this.bufferSizeLimit.to!string());
        }
        if (getBufferSize() > this.bufferSizeLimit) {
            throw new StompConversionException("The configured STOMP buffer size limit of " ~
                    this.bufferSizeLimit.to!string() ~ " bytes has been exceeded");
        }
    }

    /**
     * Calculate the current buffer size, i.e. the sum of the remaining bytes
     * of all buffered chunks.
     */
    int getBufferSize() {
        int size = 0;
        foreach (ByteBuffer buffer; this.chunks[]) {
            size += buffer.remaining();
        }
        return size;
    }

    /**
     * Get the expected content length of the currently buffered, incomplete STOMP frame.
     * @return the expected content length, or null if none is currently known
     */
    Integer getExpectedContentLength() {
        return this.expectedContentLength;
    }

}