/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.simp.stomp.StompEncoder;


import hunt.stomp.exception;
import hunt.stomp.Message;
import hunt.stomp.simp.stomp.StompCommand;
import hunt.stomp.simp.stomp.StompDecoder;
import hunt.stomp.simp.stomp.StompHeaderAccessor;
import hunt.stomp.simp.SimpMessageHeaderAccessor;
import hunt.stomp.simp.SimpMessageType;
import hunt.stomp.support.NativeMessageHeaderAccessor;

import hunt.io.ByteArrayOutputStream;
import hunt.io.BufferedOutputStream;
// import java.util.concurrent.ConcurrentHashMap;

import hunt.collection;
import hunt.logging;
import hunt.Exceptions;
import hunt.text.Common;
import hunt.text.StringBuilder;

import std.conv;


alias DataOutputStream = BufferedOutputStream;
alias ByteArrayMap = Map!(string, byte[]);

/**
 * An encoder for STOMP frames.
 *
 * <p>A frame is written as the command name, a line feed, the header lines
 * ({@code key:value} followed by a line feed each), an empty line, the raw
 * payload and a terminating zero byte. Heartbeats are written as
 * {@code StompDecoder.HEARTBEAT_PAYLOAD} only.
 *
 * <p>Encoded header keys are cached in two maps that are both read and
 * written while holding the monitor of {@code headerKeyUpdateCache}.
 *
 * @author Andy Wilkinson
 * @author Rossen Stoyanchev
 * @since 4.0
 * @see StompDecoder
 */
class StompEncoder {

    private enum byte LF = '\n';

    private enum byte COLON = ':';

    /** Maximum number of encoded header keys kept in the caches. */
    private enum int HEADER_KEY_CACHE_LIMIT = 32;


    /**
     * Lookup cache of encoded header keys. This is a plain {@code HashMap}
     * (the port replaced a concurrent map), so it must only be touched while
     * holding the monitor of {@code headerKeyUpdateCache}.
     */
    private ByteArrayMap headerKeyAccessCache; // = new ConcurrentHashMap<>(HEADER_KEY_CACHE_LIMIT);

    /**
     * Bounded companion map that decides which key to evict; its monitor
     * guards both caches.
     */
    private ByteArrayMap headerKeyUpdateCache;

    /**
     * Creates the encoder and its two header-key caches.
     *
     * <p>The update cache is a {@code LinkedHashMap} subclass whose
     * {@code removeEldestEntry} hook, once the size exceeds
     * {@code HEADER_KEY_CACHE_LIMIT}, removes the eldest key from the access
     * cache as well, keeping both maps in step.
     */
    this() {
        headerKeyAccessCache = new HashMap!(string, byte[])(HEADER_KEY_CACHE_LIMIT);

        headerKeyUpdateCache =
            new class LinkedHashMap!(string, byte[]) {

                this() {
                    // NOTE(review): the trailing `true` presumably selects access
                    // ordering as in java.util.LinkedHashMap -- confirm against hunt.collection.
                    super(HEADER_KEY_CACHE_LIMIT, 0.75f, true);
                }

                override
                protected bool removeEldestEntry(MapEntry!(string, byte[]) eldest) {
                    if (size() > HEADER_KEY_CACHE_LIMIT) {
                        headerKeyAccessCache.remove(eldest.getKey());
                        return true;
                    }
                    else {
                        return false;
                    }
                }
            };
    }

    /**
     * Encodes the given STOMP {@code message} into a {@code byte[]}.
     * @param message the message to encode
     * @return the encoded message
     */
    byte[] encode(Message!(byte[]) message) {
        return encode(message.getHeaders(), message.getPayload());
    }

    /**
     * Encodes the given payload and headers into a {@code byte[]}.
     *
     * <p>If the headers describe a heartbeat, only the heartbeat payload is
     * written. Otherwise the STOMP command is required.
     *
     * @param headers the headers
     * @param payload the payload (a null/empty array is written as an empty body)
     * @return the encoded message
     * @throws IllegalStateException if the headers carry no STOMP command
     * @throws StompConversionException if writing to the in-memory stream fails
     */
    byte[] encode(Map!(string, Object) headers, byte[] payload) {
        assert(headers !is null, "'headers' is required");
        // assert(payload !is null, "'payload' is required");

        try {
            // Pre-size the buffer: payload plus some room for command and headers.
            ByteArrayOutputStream baos = new ByteArrayOutputStream(128 + cast(int)payload.length);
            DataOutputStream output = new DataOutputStream(baos);

            if (SimpMessageType.HEARTBEAT == SimpMessageHeaderAccessor.getMessageType(headers)) {
                trace("Encoding heartbeat");
                output.write(StompDecoder.HEARTBEAT_PAYLOAD);
            } else {
                StompCommand command = StompHeaderAccessor.getCommand(headers);
                if (command == StompCommand.Null) {
                    throw new IllegalStateException("Missing STOMP command: " ~ (cast(Object)headers).toString());
                }

                output.write( cast(byte[]) command.toString());
                output.write(LF);
                writeHeaders(command, headers, payload, output);
                output.write(LF);
                writeBody(payload, output);
                // Frame terminator (NUL octet).
                output.write(0);
            }
            // Push buffered bytes into baos before taking the snapshot.
            output.flush();
            return baos.toByteArray();
        }
        catch (IOException ex) {
            throw new StompConversionException(
                    "Failed to encode STOMP frame, headers=" ~ headers.toString(), ex);
        }
    }

    /**
     * Writes the native headers of the frame, one {@code key:value} line per value.
     *
     * <p>Nothing is written when the headers map has no native-headers entry.
     * For commands where {@code requiresContentLength()} is true, a
     * caller-supplied {@code content-length} header is skipped and one computed
     * from {@code payload.length} is appended instead. For CONNECT, the
     * passcode value is taken from {@code StompHeaderAccessor.getPasscode}.
     * Keys and values are escaped for every command except CONNECT and CONNECTED.
     *
     * @param command the STOMP command of the frame
     * @param headers the message headers holding the native headers
     * @param payload the frame body, used only for its length
     * @param output the stream to write to
     */
    private void writeHeaders(StompCommand command, Map!(string, Object) headers, byte[] payload,
            DataOutputStream output) {

        Map!(string,List!(string)) nativeHeaders =
                cast(Map!(string, List!(string))) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);

        version(HUNT_DEBUG) {
            // The cast above yields null when the entry is absent; guard the
            // toString() call so debug builds do not dereference null.
            trace("Encoding STOMP " ~ command.toString() ~
                    ", headers=" ~ (nativeHeaders is null ? "null" : nativeHeaders.toString()));
        }

        if (nativeHeaders is null) {
            return;
        }

        bool shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);

        foreach (string key, List!(string) values ; nativeHeaders) {
            // The content-length is recomputed below; drop any supplied one.
            if (command.requiresContentLength() && "content-length" == key) {
                continue;
            }

            if (StompCommand.CONNECT == command &&
                    StompHeaderAccessor.STOMP_PASSCODE_HEADER == key) {
                values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
            }

            byte[] encodedKey = encodeHeaderKey(key, shouldEscape);
            foreach (string value ; values) {
                output.write(encodedKey);
                output.write(COLON);
                output.write(encodeHeaderValue(value, shouldEscape));
                output.write(LF);
            }
        }

        if (command.requiresContentLength()) {
            size_t contentLength = payload.length;
            output.write("content-length:");
            output.write(contentLength.to!string());
            output.write(LF);
        }
    }

    /**
     * Returns the bytes of a header key, optionally escaped, using the caches.
     *
     * <p>Both caches are consulted and updated while holding the monitor of
     * {@code headerKeyUpdateCache}: the access cache is a plain {@code HashMap}
     * that is also modified (put / eviction) under that monitor, so reading it
     * without the lock would race with those writes.
     *
     * @param input the header key
     * @param canEscape whether the key should be escaped first
     * @return the encoded key bytes (shared with the cache; callers must not modify them)
     */
    private byte[] encodeHeaderKey(string input, bool canEscape) {
        string inputToUse = (canEscape ? escape(input) : input);
        synchronized (this.headerKeyUpdateCache) {
            if (this.headerKeyAccessCache.containsKey(inputToUse)) {
                return this.headerKeyAccessCache.get(inputToUse);
            }
            byte[] bytes = this.headerKeyUpdateCache.get(inputToUse);
            if (bytes is null) {
                bytes = cast(byte[])inputToUse.dup;
                this.headerKeyAccessCache.put(inputToUse, bytes);
                // May evict the eldest entry from both caches via removeEldestEntry.
                this.headerKeyUpdateCache.put(inputToUse, bytes);
            }
            return bytes;
        }
    }

    /**
     * Returns a fresh byte array holding the header value, optionally escaped.
     *
     * @param input the header value
     * @param canEscape whether the value should be escaped first
     * @return a newly allocated copy of the (escaped) value
     */
    private byte[] encodeHeaderValue(string input, bool canEscape) {
        string inputToUse = (canEscape ? escape(input) : input);
        return cast(byte[])inputToUse.dup;
    }

    /**
     * Escapes backslash, colon, line feed and carriage return as
     * {@code \\}, {@code \c}, {@code \n} and {@code \r} respectively.
     * The input string itself is returned when nothing needs escaping.
     *
     * See STOMP Spec 1.2:
     * <a href="http://stomp.github.io/stomp-specification-1.2.html#Value_Encoding">"Value Encoding"</a>.
     *
     * @param inString the text to escape
     * @return the escaped text, or {@code inString} if it contains no special character
     */
    private string escape(string inString) {
        // Allocated lazily on the first character that needs escaping.
        StringBuilder sb = null;
        for (size_t i = 0; i < inString.length; i++) {
            char c = inString[i];
            string replacement;
            switch (c) {
                case '\\': replacement = "\\\\"; break;
                case ':':  replacement = "\\c";  break;
                case '\n': replacement = "\\n";  break;
                case '\r': replacement = "\\r";  break;
                default:   replacement = null;   break;
            }

            if (replacement !is null) {
                sb = getStringBuilder(sb, inString, i);
                sb.append(replacement);
            }
            else if (sb !is null) {
                sb.append(c);
            }
        }
        return (sb !is null ? sb.toString() : inString);
    }

    /**
     * Returns {@code sb}, creating it on first use and seeding it with the
     * first {@code i} characters of {@code inString} (the part already scanned
     * that needed no escaping).
     */
    private StringBuilder getStringBuilder(StringBuilder sb, string inString, size_t i) {
        if (sb is null) {
            sb = new StringBuilder(inString.length);
            sb.append(inString[0 .. i]);
        }
        return sb;
    }

    /** Writes the payload bytes unchanged as the frame body. */
    private void writeBody(byte[] payload, DataOutputStream output) {
        output.write(payload);
    }

}