1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.stomp.StompEncoder;
18 
19 
20 import hunt.stomp.exception;
21 import hunt.stomp.Message;
22 import hunt.stomp.simp.stomp.StompCommand;
23 import hunt.stomp.simp.stomp.StompDecoder;
24 import hunt.stomp.simp.stomp.StompHeaderAccessor;
25 import hunt.stomp.simp.SimpMessageHeaderAccessor;
26 import hunt.stomp.simp.SimpMessageType;
27 import hunt.stomp.support.NativeMessageHeaderAccessor;
28 
29 import hunt.io.ByteArrayOutputStream;
30 import hunt.io.BufferedOutputStream;
31 // import java.util.concurrent.ConcurrentHashMap;
32 
33 import hunt.collection;
34 import hunt.logging;
35 import hunt.Exceptions;
36 import hunt.text.Common;
37 import hunt.text.StringBuilder;
38 
39 import std.conv;
40 
41 
/// Map from a string key to raw bytes; StompEncoder uses it for its header-key caches.
alias ByteArrayMap = Map!(string, byte[]);

/// Name kept from the Java original; here it is simply a BufferedOutputStream.
alias DataOutputStream = BufferedOutputStream;
44 
45 /**
46  * An encoder for STOMP frames.
47  *
48  * @author Andy Wilkinson
49  * @author Rossen Stoyanchev
50  * @since 4.0
51  * @see StompDecoder
52  */
class StompEncoder  {

	/// Line feed byte written after the command line, after each header line
	/// and once more to separate headers from the body.
	private enum byte LF = '\n';

	/// Separator byte written between a header key and its value.
	private enum byte COLON = ':';

	/// Initial capacity of both header-key caches and the size above which the
	/// eldest entry of the update cache is evicted.
	private enum int HEADER_KEY_CACHE_LIMIT = 32;


	/// Encoded header keys, consulted first on every lookup.
	/// This is a plain HashMap (the Java original used a ConcurrentHashMap), so
	/// every access must happen while holding the lock on headerKeyUpdateCache.
	private ByteArrayMap headerKeyAccessCache; // = new ConcurrentHashMap<>(HEADER_KEY_CACHE_LIMIT);

	/// Access-ordered map that decides which key is evicted once the limit is
	/// exceeded; it also serves as the lock object for both caches.
	private ByteArrayMap headerKeyUpdateCache;
	
	/**
	 * Creates an encoder with empty header-key caches.
	 */
	this() {
		headerKeyAccessCache = new HashMap!(string, byte[])(HEADER_KEY_CACHE_LIMIT);

		headerKeyUpdateCache =
			new class LinkedHashMap!(string, byte[]) {
				
				this() {
					// third argument 'true' requests access ordering rather than insertion ordering
					super(HEADER_KEY_CACHE_LIMIT, 0.75f, true);
				}

				override
				protected bool removeEldestEntry(MapEntry!(string, byte[]) eldest) {
					if (size() > HEADER_KEY_CACHE_LIMIT) {
						// keep the access cache in step with the entry being evicted here
						headerKeyAccessCache.remove(eldest.getKey());
						return true;
					}
					else {
						return false;
					}
				}
			};
	}

	/**
	 * Encodes the given STOMP {@code message} into a {@code byte[]}.
	 * @param message the message to encode
	 * @return the encoded message
	 */
	byte[] encode(Message!(byte[]) message) {
		return encode(message.getHeaders(), message.getPayload());
	}

	/**
	 * Encodes the given payload and headers into a {@code byte[]}.
	 *
	 * A heartbeat message is encoded as {@code StompDecoder.HEARTBEAT_PAYLOAD} only.
	 * Any other message is written as: command, LF, header lines, LF, body, and a
	 * terminating zero byte.
	 *
	 * @param headers the headers; must not be null
	 * @param payload the payload; a null array is treated as an empty body
	 * @return the encoded message
	 * @throws IllegalStateException if the headers carry no STOMP command
	 * @throws StompConversionException if writing to the underlying stream fails
	 */
	byte[] encode(Map!(string, Object) headers, byte[] payload) {
		assert(headers !is null, "'headers' is required");
		// assert(payload !is null, "'payload' is required");

		try {
			// 128 bytes of headroom for the command and header lines on top of the body
			ByteArrayOutputStream baos = new ByteArrayOutputStream(128 + cast(int)payload.length);
			DataOutputStream output = new DataOutputStream(baos);

			if (SimpMessageType.HEARTBEAT == SimpMessageHeaderAccessor.getMessageType(headers)) {
				trace("Encoding heartbeat");
				output.write(StompDecoder.HEARTBEAT_PAYLOAD);
			} else {
				StompCommand command = StompHeaderAccessor.getCommand(headers);
				if (command == StompCommand.Null) {
					throw new IllegalStateException("Missing STOMP command: " ~ (cast(Object)headers).toString());
				}

				output.write( cast(byte[]) command.toString());
				output.write(LF);
				writeHeaders(command, headers, payload, output);
				output.write(LF);
				writeBody(payload, output);
				// frame terminator
				output.write(0);
			}
			// push buffered bytes into baos before taking the snapshot
			output.flush();
			return baos.toByteArray();
		}
		catch (IOException ex) {
			throw new StompConversionException(
					"Failed to encode STOMP frame, headers=" ~ headers.toString(), ex);
		}
	}

	/**
	 * Writes one "key:value" line per native header value, followed by a
	 * computed content-length line when the command requires one.
	 *
	 * If the headers contain no native-headers map, nothing is written at all,
	 * not even the content-length line.
	 *
	 * @param command the STOMP command of the frame being encoded
	 * @param headers the message headers holding the native-headers map
	 * @param payload the frame body, used only for its length
	 * @param output the stream to write to
	 */
	private void writeHeaders(StompCommand command, Map!(string, Object) headers, byte[] payload,
			DataOutputStream output) {
		
		Map!(string,List!(string)) nativeHeaders =
				cast(Map!(string, List!(string))) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);

		version(HUNT_DEBUG) {
			// nativeHeaders may be null here, so it must not be dereferenced unguarded
			trace("Encoding STOMP " ~ command.toString() ~ 
				", headers=" ~ (nativeHeaders is null ? "null" : nativeHeaders.toString()));
		}

		if (nativeHeaders is null) {
			return;
		}

		// CONNECT and CONNECTED frames are written without escaping
		bool shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);

		foreach (string key, List!(string) values ; nativeHeaders) {
			// a caller-supplied content-length is dropped; the real one is appended below
			if (command.requiresContentLength() && "content-length" == key) {
				continue;
			}

			// For CONNECT the passcode value comes from StompHeaderAccessor.getPasscode
			// instead of the native header value.
			// NOTE(review): presumably the native header holds a masked value - confirm.
			if (StompCommand.CONNECT == command &&
					StompHeaderAccessor.STOMP_PASSCODE_HEADER == key) {
				values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
			}

			byte[] encodedKey = encodeHeaderKey(key, shouldEscape);
			foreach (string value ; values) {
				output.write(encodedKey);
				output.write(COLON);
				output.write(encodeHeaderValue(value, shouldEscape));
				output.write(LF);
			}
		}

		if (command.requiresContentLength()) {
			size_t contentLength = payload.length;
			output.write("content-length:");
			output.write(contentLength.to!string());
			output.write(LF);
		}
	}

	/**
	 * Returns the bytes of a header key, escaped if requested, reusing a cached
	 * array when the same key was encoded before.
	 *
	 * Both caches are read and written under the lock on headerKeyUpdateCache,
	 * because headerKeyAccessCache is a plain HashMap and is modified by this
	 * method as well as by the eviction hook of headerKeyUpdateCache.
	 *
	 * @param input the header key
	 * @param canEscape whether STOMP escaping is applied to the key
	 * @return the encoded key; the array is shared with the cache and must not be modified
	 */
	private byte[] encodeHeaderKey(string input, bool canEscape) {
		string inputToUse = (canEscape ? escape(input) : input);
		synchronized (this.headerKeyUpdateCache) {
			// Fast path: a hit here deliberately does not touch the access order
			// of headerKeyUpdateCache, matching the previous eviction behaviour.
			if (this.headerKeyAccessCache.containsKey(inputToUse)) {
				return this.headerKeyAccessCache.get(inputToUse);
			}
			byte[] bytes = this.headerKeyUpdateCache.get(inputToUse);
			if (bytes is null) {
				bytes = cast(byte[])inputToUse.dup;
				this.headerKeyAccessCache.put(inputToUse, bytes);
				// may trigger removeEldestEntry, which also prunes headerKeyAccessCache
				this.headerKeyUpdateCache.put(inputToUse, bytes);
			}
			return bytes;
		}
	}

	/**
	 * Returns a fresh byte array for a header value, escaped if requested.
	 *
	 * @param input the header value
	 * @param canEscape whether STOMP escaping is applied to the value
	 * @return a newly allocated array holding the encoded value
	 */
	private byte[] encodeHeaderValue(string input, bool canEscape) {
		string inputToUse = (canEscape ? escape(input) : input);
		return cast(byte[])inputToUse.dup;
	}

	/**
	 * Replaces backslash, colon, line feed and carriage return with the
	 * two-character sequences {@code \\}, {@code \c}, {@code \n} and {@code \r}.
	 * The input string itself is returned when it contains none of them.
	 *
	 * See STOMP Spec 1.2:
	 * <a href="http://stomp.github.io/stomp-specification-1.2.html#Value_Encoding">"Value Encoding"</a>.
	 */
	private string escape(string inString) {
		// created lazily on the first character that needs escaping
		StringBuilder sb = null;
		for (int i = 0; i < inString.length; i++) {
			char c = inString.charAt(i);
			if (c == '\\') {
				sb = getStringBuilder(sb, inString, i);
				sb.append("\\\\");
			}
			else if (c == ':') {
				sb = getStringBuilder(sb, inString, i);
				sb.append("\\c");
			}
			else if (c == '\n') {
				sb = getStringBuilder(sb, inString, i);
				sb.append("\\n");
			}
			else if (c == '\r') {
				sb = getStringBuilder(sb, inString, i);
				sb.append("\\r");
			}
			else if (sb !is null){
				// only copy ordinary characters once a builder exists
				sb.append(c);
			}
		}
		return (sb !is null ? sb.toString() : inString);
	}

	/**
	 * Returns {@code sb} if it already exists; otherwise creates a builder
	 * pre-filled with the first {@code i} characters of {@code inString}.
	 */
	private StringBuilder getStringBuilder(StringBuilder sb, string inString, int i) {
		if (sb is null) {
			sb = new StringBuilder(inString.length);
			sb.append(inString.substring(0, i));
		}
		return sb;
	}

	/**
	 * Writes the payload bytes to the output unchanged.
	 */
	private void writeBody(byte[] payload, DataOutputStream output) {
		output.write(payload);
	}

}