1 /*
2  * Copyright 2002-2017 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.stomp.StompHeaderAccessor;
18 
19 import hunt.stomp.Message;
20 import hunt.stomp.simp.stomp.StompCommand;
21 import hunt.stomp.simp.SimpMessageHeaderAccessor;
22 import hunt.stomp.simp.SimpMessageType;
23 import hunt.stomp.support.MessageHeaderAccessor;
24 
25 import hunt.util.MimeType;
26 
27 import hunt.collection;
28 import hunt.Exceptions;
29 import hunt.Integer;
30 import hunt.Nullable;
31 import hunt.text.Charset;
32 import hunt.text.Common;
33 import hunt.text.StringBuilder;
34 import hunt.text.StringUtils;
35 
36 import std.conv;
37 import std.string;
38 
39 
40 // import hunt.framework.util.ClassUtils;
41 // import hunt.framework.util.CollectionUtils;
42 import hunt.util.MimeType;
43 // import hunt.framework.util.MimeTypeUtils;
44 // import hunt.framework.util.StringUtils;
45 
46 alias Charset = string;
47 
/**
 * A {@code MessageHeaderAccessor} to use when creating a {@code Message} from
 * a decoded STOMP frame, or when encoding a {@code Message} to a STOMP frame.
 *
 * <p>When created from STOMP frame content, the actual STOMP headers are
 * stored in the native header sub-map managed by the parent class
 * {@link hunt.stomp.support.NativeMessageHeaderAccessor}
 * while the parent class {@link SimpMessageHeaderAccessor} manages common
 * processing headers some of which are based on STOMP headers
 * (e.g. destination, content-type, etc).
 *
 * <p>An instance of this class can also be created by wrapping an existing
 * {@code Message}. That message may have been created with the more generic
 * {@link hunt.stomp.simp.SimpMessageHeaderAccessor} in
 * which case STOMP headers are created from common processing headers.
 * In this case it is also necessary to invoke either
 * {@link #updateStompCommandAsClientMessage()} or
 * {@link #updateStompCommandAsServerMessage()} if sending a message and
 * depending on whether a message is sent to a client or the message broker.
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
class StompHeaderAccessor : SimpMessageHeaderAccessor {

	/** Shared counter used to build "message-id" values in {@link #updateStompCommandAsServerMessage()}. */
	private static shared(long) messageIdCounter = 0;

	/** Heartbeat returned by {@link #getHeartbeat()} when no usable "heart-beat" header is present. */
	private enum long[] DEFAULT_HEARTBEAT = [0, 0];


	// STOMP header names

	enum string STOMP_ID_HEADER = "id";

	enum string STOMP_HOST_HEADER = "host";

	enum string STOMP_ACCEPT_VERSION_HEADER = "accept-version";

	enum string STOMP_MESSAGE_ID_HEADER = "message-id";

	enum string STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT

	enum string STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame

	enum string STOMP_SUBSCRIPTION_HEADER = "subscription";

	enum string STOMP_VERSION_HEADER = "version";

	enum string STOMP_MESSAGE_HEADER = "message";

	enum string STOMP_ACK_HEADER = "ack";

	enum string STOMP_NACK_HEADER = "nack";

	enum string STOMP_LOGIN_HEADER = "login";

	enum string STOMP_PASSCODE_HEADER = "passcode";

	enum string STOMP_DESTINATION_HEADER = "destination";

	enum string STOMP_CONTENT_TYPE_HEADER = "content-type";

	enum string STOMP_CONTENT_LENGTH_HEADER = "content-length";

	enum string STOMP_HEARTBEAT_HEADER = "heart-beat";

	// Other header names

	/** Message header under which the STOMP command is stored. */
	private enum string COMMAND_HEADER = "stompCommand";

	/** Message header under which the real passcode is stored once masked. */
	private enum string CREDENTIALS_HEADER = "stompCredentials";


	/**
	 * A constructor for creating message headers from a parsed STOMP frame.
	 * Stores the command under the command header and then derives the simp
	 * message headers from the given STOMP headers.
	 */
	this(StompCommand command, Map!(string, List!(string)) externalSourceHeaders) {
		super(command.getMessageType(), externalSourceHeaders);
		setHeader(COMMAND_HEADER, command);
		updateSimpMessageHeadersFromStompHeaders();
	}

	/**
	 * A constructor for accessing and modifying existing message headers.
	 * Note that the message headers may not have been created from a STOMP frame
	 * but may have rather originated from using the more generic
	 * {@link hunt.stomp.simp.SimpMessageHeaderAccessor}.
	 */
	this(MessageBase message) {
		super(message);
		updateStompHeadersFromSimpMessageHeaders();
	}

	/**
	 * A constructor for heartbeat headers: message type HEARTBEAT and no
	 * native headers.
	 */
	this() {
		super(SimpMessageType.HEARTBEAT, null);
	}


	/**
	 * Copy values from the STOMP native headers into the simp message headers:
	 * destination, content type, and the subscription id (taken from
	 * "subscription" for MESSAGE and from "id" for SUBSCRIBE/UNSUBSCRIBE).
	 * For CONNECT the passcode is masked instead. Does nothing when there are
	 * no native headers.
	 */
	void updateSimpMessageHeadersFromStompHeaders() {
		if (getNativeHeaders() is null) {
			return;
		}
		string value = getFirstNativeHeader(STOMP_DESTINATION_HEADER);
		if (value !is null) {
			super.setDestination(value);
		}
		value = getFirstNativeHeader(STOMP_CONTENT_TYPE_HEADER);
		if (value !is null) {
			// super.setContentType(MimeTypeUtils.parseMimeType(value));
			super.setContentType(new MimeType(value));
		}
		Nullable!StompCommand command = getCommand();
		if (StompCommand.MESSAGE == command) {
			value = getFirstNativeHeader(STOMP_SUBSCRIPTION_HEADER);
			if (value !is null) {
				super.setSubscriptionId(value);
			}
		}
		else if (StompCommand.SUBSCRIBE == command || StompCommand.UNSUBSCRIBE == command) {
			value = getFirstNativeHeader(STOMP_ID_HEADER);
			if (value !is null) {
				super.setSubscriptionId(value);
			}
		}
		else if (StompCommand.CONNECT == command) {
			protectPasscode();
		}
	}

	/**
	 * Copy values from the simp message headers into the STOMP native headers:
	 * destination, content type and (where applicable) the subscription id.
	 */
	void updateStompHeadersFromSimpMessageHeaders() {
		string destination = getDestination();
		if (destination !is null) {
			setNativeHeader(STOMP_DESTINATION_HEADER, destination);
		}
		MimeType contentType = getContentType();
		if (contentType !is null) {
			setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString());
		}
		trySetStompHeaderForSubscriptionId();
	}


	/**
	 * Create an accessor of this type for the given message by delegating
	 * to {@link #wrap(MessageBase)}.
	 */
	override
	protected MessageHeaderAccessor createAccessor(MessageBase message) {
		return wrap(message);
	}

	// Redeclared for visibility within simp.stomp
	override
	protected Map!(string, List!(string)) getNativeHeaders() {
		return super.getNativeHeaders();
	}

	/**
	 * Make sure the STOMP command is SEND, setting it when no command is
	 * present yet.
	 *
	 * @return the resulting command
	 * @throws IllegalStateException if the message type is not MESSAGE, or a
	 * command other than SEND is already set
	 */
	StompCommand updateStompCommandAsClientMessage() {
		Nullable!SimpMessageType messageType = getMessageType();
		if (messageType != SimpMessageType.MESSAGE) {
			throw new IllegalStateException("Unexpected message type " ~ messageType.toString());
		}
		Nullable!StompCommand command = getCommand();
		if (command is null) {
			command = new Nullable!StompCommand(StompCommand.SEND);
			setHeader(COMMAND_HEADER, command);
		}
		else if (command != (StompCommand.SEND)) {
			throw new IllegalStateException("Unexpected STOMP command " ~ command.toString());
		}
		return command;
	}

	/**
	 * Make sure the STOMP command is MESSAGE (replacing an absent command or
	 * SEND), update the subscription header, and generate a "message-id" of
	 * the form {@code <sessionId>-<counter>} when none is set.
	 *
	 * @throws IllegalStateException if the message type is not MESSAGE, or a
	 * command other than SEND or MESSAGE is already set
	 */
	void updateStompCommandAsServerMessage() {
		Nullable!SimpMessageType messageType = getMessageType();
		if (messageType != SimpMessageType.MESSAGE) {
			throw new IllegalStateException("Unexpected message type " ~ messageType.toString());
		}
		Nullable!StompCommand command = getCommand();
		if ((command is null) || StompCommand.SEND == command) {
			setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
		}
		else if (StompCommand.MESSAGE != command) {
			throw new IllegalStateException("Unexpected STOMP command " ~ command.toString());
		}
		trySetStompHeaderForSubscriptionId();
		if (getMessageId() is null) {
			import core.atomic;
			// atomicOp!"+=" yields the value after the increment; subtracting
			// one gives the pre-increment value used in the id.
			long c = atomicOp!"+="(messageIdCounter, 1);
			string messageId = getSessionId() ~ "-" ~ (c-1).to!string();
			setNativeHeader(STOMP_MESSAGE_ID_HEADER, messageId);
		}
	}

	/**
	 * Return the STOMP command, or {@code null} if not yet set.
	 */
	Nullable!StompCommand getCommand() {
		return cast(Nullable!StompCommand) getHeader(COMMAND_HEADER);
	}

	/**
	 * Whether the message type of these headers is HEARTBEAT.
	 */
	bool isHeartbeat() {
		return (SimpMessageType.HEARTBEAT == getMessageType());
	}

	/**
	 * Return the two values of the "heart-beat" header.
	 *
	 * <p>A fresh copy of the default {@code [0, 0]} is returned when the
	 * header does not split into at least two comma-separated components
	 * (including when it is absent). Components that are not valid integers
	 * still cause the conversion to throw.
	 */
	long[] getHeartbeat() {
		string rawValue = getFirstNativeHeader(STOMP_HEARTBEAT_HEADER);
		string[] rawValues = StringUtils.split(rawValue, ",");
		// A null array has length 0, so this single check covers both "no
		// header" and "header without a ',' separator". Without it the
		// rawValues[1] access below would be out of bounds.
		if (rawValues.length < 2) {
			return DEFAULT_HEARTBEAT.dup;
		}
		return [rawValues[0].to!long(), rawValues[1].to!long()];
	}

	/** Set the "accept-version" native header. */
	void setAcceptVersion(string acceptVersion) {
		setNativeHeader(STOMP_ACCEPT_VERSION_HEADER, acceptVersion);
	}

	/**
	 * Return the "accept-version" native header split on commas.
	 */
	string[] getAcceptVersion() {
		string rawValue = getFirstNativeHeader(STOMP_ACCEPT_VERSION_HEADER);
		return split(rawValue, ",");
		// return (rawValue !is null ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.emptySet!string());
	}

	/** Set the "host" native header. */
	void setHost(string host) {
		setNativeHeader(STOMP_HOST_HEADER, host);
	}

	/** Return the first value of the "host" native header. */
	string getHost() {
		return getFirstNativeHeader(STOMP_HOST_HEADER);
	}

	/**
	 * Set the destination on both the simp message headers and the
	 * "destination" native header.
	 */
	override
	void setDestination(string destination) {
		super.setDestination(destination);
		setNativeHeader(STOMP_DESTINATION_HEADER, destination);
	}

	/**
	 * Set the content type on both the simp message headers and the
	 * "content-type" native header.
	 *
	 * <p>A {@code null} content type is passed on as a {@code null} native
	 * header value rather than being dereferenced.
	 */
	override
	void setContentType(MimeType contentType) {
		super.setContentType(contentType);
		// NOTE(review): assumes setNativeHeader accepts a null value, as
		// setDestination already relies on - confirm against the parent class.
		string headerValue = (contentType is null ? null : contentType.toString());
		setNativeHeader(STOMP_CONTENT_TYPE_HEADER, headerValue);
	}

	/**
	 * Set the subscription id on the simp message headers and, where
	 * applicable, on the matching STOMP native header.
	 */
	override
	void setSubscriptionId(string subscriptionId) {
		super.setSubscriptionId(subscriptionId);
		trySetStompHeaderForSubscriptionId();
	}

	/**
	 * Mirror the current subscription id into a native header: "subscription"
	 * when the command is MESSAGE, otherwise "id" when the message type is
	 * SUBSCRIBE or UNSUBSCRIBE. Does nothing when there is no subscription id.
	 */
	private void trySetStompHeaderForSubscriptionId() {
		string subscriptionId = getSubscriptionId();
		if (subscriptionId !is null) {
			Nullable!StompCommand command = getCommand();
			if (command !is null && StompCommand.MESSAGE == command) {
				setNativeHeader(STOMP_SUBSCRIPTION_HEADER, subscriptionId);
			}
			else {
				SimpMessageType messageType = getMessageType();
				if (SimpMessageType.SUBSCRIBE == messageType || SimpMessageType.UNSUBSCRIBE == messageType) {
					setNativeHeader(STOMP_ID_HEADER, subscriptionId);
				}
			}
		}
	}

	/**
	 * Return the "content-length" native header as an {@code Integer},
	 * or {@code null} if the header is not set.
	 */
	Integer getContentLength() {
		string header = getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER);
		return (header is null ? null : Integer.valueOf(header));
	}

	/** Set the "content-length" native header. */
	void setContentLength(int contentLength) {
		setNativeHeader(STOMP_CONTENT_LENGTH_HEADER, contentLength.to!string());
	}

	/** Set the "heart-beat" native header to {@code "<cx>,<cy>"}. */
	void setHeartbeat(long cx, long cy) {
		setNativeHeader(STOMP_HEARTBEAT_HEADER, cx.to!string() ~ "," ~ cy.to!string());
	}

	/** Set the "ack" native header. */
	void setAck(string ack) {
		setNativeHeader(STOMP_ACK_HEADER, ack);
	}

	/** Return the first value of the "ack" native header. */
	string getAck() {
		return getFirstNativeHeader(STOMP_ACK_HEADER);
	}

	/** Set the "nack" native header. */
	void setNack(string nack) {
		setNativeHeader(STOMP_NACK_HEADER, nack);
	}

	/** Return the first value of the "nack" native header. */
	string getNack() {
		return getFirstNativeHeader(STOMP_NACK_HEADER);
	}

	/** Set the "login" native header. */
	void setLogin(string login) {
		setNativeHeader(STOMP_LOGIN_HEADER, login);
	}

	/** Return the first value of the "login" native header. */
	string getLogin() {
		return getFirstNativeHeader(STOMP_LOGIN_HEADER);
	}

	/**
	 * Set the "passcode" native header and immediately mask it
	 * (see {@link #getPasscode()} for reading the real value back).
	 */
	void setPasscode(string passcode) {
		setNativeHeader(STOMP_PASSCODE_HEADER, passcode);
		protectPasscode();
	}

	/**
	 * Move the real passcode out of the native headers: store it under the
	 * credentials header and replace the native value with "PROTECTED".
	 * Does nothing when no passcode is set or it is already masked.
	 */
	private void protectPasscode() {
		string value = getFirstNativeHeader(STOMP_PASSCODE_HEADER);
		if (value !is null && "PROTECTED" != value) {
			setHeader(CREDENTIALS_HEADER, new StompPasscode(value));
			setNativeHeader(STOMP_PASSCODE_HEADER, "PROTECTED");
		}
	}

	/**
	 * Return the passcode header value, or {@code null} if not set.
	 */
	string getPasscode() {
		StompPasscode credentials = cast(StompPasscode) getHeader(CREDENTIALS_HEADER);
		return (credentials !is null ? credentials.passcode : null);
	}

	/** Set the "receipt-id" native header. */
	void setReceiptId(string receiptId) {
		setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId);
	}

	/** Return the first value of the "receipt-id" native header. */
	string getReceiptId() {
		return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
	}

	/** Set the "receipt" native header. */
	void setReceipt(string receiptId) {
		setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
	}

	/** Return the first value of the "receipt" native header. */
	string getReceipt() {
		return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
	}

	/** Return the first value of the "message" native header. */
	string getMessage() {
		return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
	}

	/** Set the "message" native header. */
	void setMessage(string content) {
		setNativeHeader(STOMP_MESSAGE_HEADER, content);
	}

	/** Return the first value of the "message-id" native header. */
	string getMessageId() {
		return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER);
	}

	/** Set the "message-id" native header. */
	void setMessageId(string id) {
		setNativeHeader(STOMP_MESSAGE_ID_HEADER, id);
	}

	/** Return the first value of the "version" native header. */
	string getVersion() {
		return getFirstNativeHeader(STOMP_VERSION_HEADER);
	}

	/** Set the "version" native header. */
	void setVersion(string ver) {
		setNativeHeader(STOMP_VERSION_HEADER, ver);
	}


	// Logging related

	/**
	 * Build a one-line summary for the SUBSCRIBE, UNSUBSCRIBE, SEND, CONNECT,
	 * CONNECTED and DISCONNECT commands; any other case falls back to
	 * {@link #getDetailedLogMessage(Object)}.
	 */
	override
	string getShortLogMessage(Object payload) {
		Nullable!StompCommand command = getCommand();
		if (StompCommand.SUBSCRIBE == command) {
			return "SUBSCRIBE " ~ getDestination() ~ " id=" ~ getSubscriptionId() ~ appendSession();
		}
		else if (StompCommand.UNSUBSCRIBE == command) {
			return "UNSUBSCRIBE id=" ~ getSubscriptionId() ~ appendSession();
		}
		else if (StompCommand.SEND == command) {
			return "SEND " ~ getDestination() ~ appendSession() ~ appendPayload(payload);
		}
		else if (StompCommand.CONNECT == command) {
			// Principal user = getUser();
			// return "CONNECT" ~ (user !is null ? " user=" ~ user.getName() : "") ~ appendSession();
			return "CONNECT" ~ appendSession();
		}
		else if (StompCommand.CONNECTED == command) {
			return "CONNECTED heart-beat=" ~ to!string(getHeartbeat()) ~ appendSession();
		}
		else if (StompCommand.DISCONNECT == command) {
			string receipt = getReceipt();
			return "DISCONNECT" ~ (receipt !is null ? " receipt=" ~ receipt : "") ~ appendSession();
		}
		else {
			return getDetailedLogMessage(payload);
		}
	}

	/**
	 * Build a log message made of the command name, the native headers, the
	 * session and, when a payload is given and the command allows a body, the
	 * payload. Heartbeats get a fixed "heart-beat" message; when no command is
	 * set the parent implementation is used.
	 */
	override
	string getDetailedLogMessage(Object payload) {
		if (isHeartbeat()) {
			string sessionId = getSessionId();
			return "heart-beat" ~ (sessionId !is null ? " in session " ~ sessionId : "");
		}
		Nullable!StompCommand command = getCommand();
		if (command is null) {
			return super.getDetailedLogMessage(payload);
		}
		StringBuilder sb = new StringBuilder();
		sb.append(command.name()).append(" ");
		Map!(string, List!(string)) nativeHeaders = getNativeHeaders();
		if (nativeHeaders !is null) {
			sb.append(nativeHeaders.toString());
		}
		sb.append(appendSession());
		// if (getUser() !is null) {
		// 	sb.append(", user=").append(getUser().getName());
		// }
		if (payload !is null && command.isBodyAllowed()) {
			sb.append(appendPayload(payload));
		}
		return sb.toString();
	}

	/** Return the {@code " session=<id>"} fragment used in log messages. */
	private string appendSession() {
		return " session=" ~ getSessionId();
	}

	/**
	 * Return the log fragment for a payload: the content type (if any)
	 * followed, for non-empty payloads with a readable content type, by up to
	 * the first 80 bytes of the payload.
	 *
	 * @throws IllegalStateException if the payload is {@code null} or is not
	 * a {@code Nullable!(byte[])}
	 */
	private string appendPayload(Object payload) {
		Nullable!(byte[]) _payload = cast(Nullable!(byte[])) payload;
		if (_payload is null) {
			// typeid() on a null reference would crash before the exception
			// could be raised, so the null case is named explicitly.
			string actualType = (payload is null ? "null" : typeid(payload).name);
			throw new IllegalStateException(
					"Expected byte array payload but got: " ~ actualType);
		}
		byte[] bytes = _payload.value;
		MimeType mimeType = getContentType();
		string contentType = (mimeType !is null ? " " ~ mimeType.toString() : "");
		if (bytes.length == 0 || mimeType is null || !isReadableContentType()) {
			return contentType;
		}
		// Charset charset = mimeType.getCharset();
		// charset = (charset !is null ? charset : StandardCharsets.UTF_8);
		// The bytes are reinterpreted as text without decoding or validation.
		return (bytes.length < 80) ?
				contentType ~ " payload=" ~ cast(string)(bytes) :
				contentType ~ " payload=" ~ cast(string)(bytes[0..80]) ~ "...(truncated)";
	}


	// Static factory methods and accessors

	/**
	 * Create an instance for the given STOMP command.
	 */
	static StompHeaderAccessor create(StompCommand command) {
		return new StompHeaderAccessor(command, null);
	}

	/**
	 * Create an instance for the given STOMP command and headers.
	 */
	static StompHeaderAccessor create(StompCommand command, Map!(string, List!(string)) headers) {
		return new StompHeaderAccessor(command, headers);
	}

	/**
	 * Create headers for a heartbeat. While a STOMP heartbeat frame does not
	 * have headers, a session id is needed for processing purposes at a minimum.
	 */
	static StompHeaderAccessor createForHeartbeat() {
		return new StompHeaderAccessor();
	}

	/**
	 * Create an instance from the payload and headers of the given Message.
	 */
	static StompHeaderAccessor wrap(MessageBase message) {
		return new StompHeaderAccessor(message);
	}

	/**
	 * Return the STOMP command from the given headers, or {@code null} if not set.
	 */
	static Nullable!StompCommand getCommand(Map!(string, Object) headers) {
		return cast(Nullable!StompCommand) headers.get(COMMAND_HEADER);
	}

	/**
	 * Return the passcode header value, or {@code null} if not set.
	 */
	static string getPasscode(Map!(string, Object) headers) {
		StompPasscode credentials = cast(StompPasscode) headers.get(CREDENTIALS_HEADER);
		return (credentials !is null ? credentials.passcode : null);
	}

	/**
	 * Return the first "content-length" value from the given native headers
	 * as an {@code Integer}, or {@code null} if the header is missing or empty.
	 */
	static Integer getContentLength(Map!(string, List!(string)) nativeHeaders) {
		List!(string) values = nativeHeaders.get(STOMP_CONTENT_LENGTH_HEADER);
		bool isEmpty = values is null || values.isEmpty();
		return (isEmpty ? null : Integer.valueOf(values.get(0)));
	}

}
557 
558 
/**
 * Wrapper that keeps a passcode out of string output: the value is held in
 * a private field and {@code toString()} always yields a fixed placeholder.
 */
private class StompPasscode {

	/** Never reveals the wrapped passcode. */
	override string toString() {
		return "[PROTECTED]";
	}

	/** Wrap the given passcode. */
	this(string passcode) {
		this.passcode = passcode;
	}

	/** The wrapped passcode value. */
	private string passcode;
}