/*
 * Copyright 2002-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.simp.stomp.StompHeaderAccessor;

import hunt.stomp.Message;
import hunt.stomp.simp.stomp.StompCommand;
import hunt.stomp.simp.SimpMessageHeaderAccessor;
import hunt.stomp.simp.SimpMessageType;
import hunt.stomp.support.MessageHeaderAccessor;

import hunt.util.MimeType;

import hunt.collection;
import hunt.Exceptions;
import hunt.Integer;
import hunt.Nullable;
import hunt.text.Charset;
import hunt.text.Common;
import hunt.text.StringBuilder;
import hunt.text.StringUtils;

import std.conv;
import std.string;


// import hunt.framework.util.ClassUtils;
// import hunt.framework.util.CollectionUtils;
import hunt.util.MimeType;
// import hunt.framework.util.MimeTypeUtils;
// import hunt.framework.util.StringUtils;

alias Charset = string;

/**
 * A {@code MessageHeaderAccessor} to use when creating a {@code Message} from
 * a decoded STOMP frame, or when encoding a {@code Message} to a STOMP frame.
 *
 * <p>When created from STOMP frame content, the actual STOMP headers are
 * stored in the native header sub-map managed by the parent class
 * {@link hunt.stomp.support.NativeMessageHeaderAccessor}
 * while the parent class {@link SimpMessageHeaderAccessor} manages common
 * processing headers some of which are based on STOMP headers
 * (e.g. destination, content-type, etc).
 *
 * <p>An instance of this class can also be created by wrapping an existing
 * {@code Message}. That message may have been created with the more generic
 * {@link hunt.stomp.simp.SimpMessageHeaderAccessor} in
 * which case STOMP headers are created from common processing headers.
 * In this case it is also necessary to invoke either
 * {@link #updateStompCommandAsClientMessage()} or
 * {@link #updateStompCommandAsServerMessage()} if sending a message and
 * depending on whether a message is sent to a client or the message broker.
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
class StompHeaderAccessor : SimpMessageHeaderAccessor {

    // Source of the numeric suffix used for generated "message-id" values;
    // only touched through core.atomic in updateStompCommandAsServerMessage().
    private static shared(long) messageIdCounter = 0;

    // Returned (as a copy) by getHeartbeat() when no usable "heart-beat" header exists.
    private enum long[] DEFAULT_HEARTBEAT = [0, 0];

    // Upper bound, in bytes, on the payload excerpt included in log messages.
    private enum size_t MAX_LOGGED_PAYLOAD_BYTES = 80;


    // STOMP header names

    enum string STOMP_ID_HEADER = "id";

    enum string STOMP_HOST_HEADER = "host";

    enum string STOMP_ACCEPT_VERSION_HEADER = "accept-version";

    enum string STOMP_MESSAGE_ID_HEADER = "message-id";

    enum string STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT

    enum string STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame

    enum string STOMP_SUBSCRIPTION_HEADER = "subscription";

    enum string STOMP_VERSION_HEADER = "version";

    enum string STOMP_MESSAGE_HEADER = "message";

    enum string STOMP_ACK_HEADER = "ack";

    enum string STOMP_NACK_HEADER = "nack";

    enum string STOMP_LOGIN_HEADER = "login";

    enum string STOMP_PASSCODE_HEADER = "passcode";

    enum string STOMP_DESTINATION_HEADER = "destination";

    enum string STOMP_CONTENT_TYPE_HEADER = "content-type";

    enum string STOMP_CONTENT_LENGTH_HEADER = "content-length";

    enum string STOMP_HEARTBEAT_HEADER = "heart-beat";

    // Other header names

    private enum string COMMAND_HEADER = "stompCommand";

    private enum string CREDENTIALS_HEADER = "stompCredentials";


    /**
     * A constructor for creating message headers from a parsed STOMP frame.
     */
    this(StompCommand command, Map!(string, List!(string)) externalSourceHeaders) {
        super(command.getMessageType(), externalSourceHeaders);
        setHeader(COMMAND_HEADER, command);
        updateSimpMessageHeadersFromStompHeaders();
    }

    /**
     * A constructor for accessing and modifying existing message headers.
     * Note that the message headers may not have been created from a STOMP frame
     * but may have rather originated from using the more generic
     * {@link hunt.stomp.simp.SimpMessageHeaderAccessor}.
     */
    this(MessageBase message) {
        super(message);
        updateStompHeadersFromSimpMessageHeaders();
    }

    /**
     * A constructor for heartbeat headers: the message type is
     * {@code SimpMessageType.HEARTBEAT} and there are no native headers.
     */
    this() {
        super(SimpMessageType.HEARTBEAT, null);
    }


    /**
     * Copy the "destination", "content-type" and subscription-related native
     * STOMP headers into the corresponding common processing headers.
     * For a CONNECT command the passcode is additionally masked.
     * Does nothing when there are no native headers.
     */
    void updateSimpMessageHeadersFromStompHeaders() {
        if (getNativeHeaders() is null) {
            return;
        }
        string value = getFirstNativeHeader(STOMP_DESTINATION_HEADER);
        if (value !is null) {
            super.setDestination(value);
        }
        value = getFirstNativeHeader(STOMP_CONTENT_TYPE_HEADER);
        if (value !is null) {
            // super.setContentType(MimeTypeUtils.parseMimeType(value));
            super.setContentType(new MimeType(value));
        }
        Nullable!StompCommand command = getCommand();
        if (StompCommand.MESSAGE == command) {
            // MESSAGE frames carry the subscription id in "subscription".
            value = getFirstNativeHeader(STOMP_SUBSCRIPTION_HEADER);
            if (value !is null) {
                super.setSubscriptionId(value);
            }
        }
        else if (StompCommand.SUBSCRIBE == command || StompCommand.UNSUBSCRIBE == command) {
            // SUBSCRIBE/UNSUBSCRIBE frames carry the subscription id in "id".
            value = getFirstNativeHeader(STOMP_ID_HEADER);
            if (value !is null) {
                super.setSubscriptionId(value);
            }
        }
        else if (StompCommand.CONNECT == command) {
            protectPasscode();
        }
    }

    /**
     * Copy the destination, content type and subscription id held in the
     * common processing headers into the corresponding native STOMP headers.
     */
    void updateStompHeadersFromSimpMessageHeaders() {
        string destination = getDestination();
        if (destination !is null) {
            setNativeHeader(STOMP_DESTINATION_HEADER, destination);
        }
        MimeType contentType = getContentType();
        if (contentType !is null) {
            setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString());
        }
        trySetStompHeaderForSubscriptionId();
    }


    /**
     * Create an accessor of this same type for the given message.
     */
    override
    protected MessageHeaderAccessor createAccessor(MessageBase message) {
        return wrap(message);
    }

    // Redeclared for visibility within simp.stomp
    override
    protected Map!(string, List!(string)) getNativeHeaders() {
        return super.getNativeHeaders();
    }

    /**
     * Ensure the STOMP command is SEND for a message of type MESSAGE,
     * setting it when no command is present yet.
     *
     * @return the resulting STOMP command
     * @throws IllegalStateException if the message type is not MESSAGE or
     * a command other than SEND is already set
     */
    StompCommand updateStompCommandAsClientMessage() {
        Nullable!SimpMessageType messageType = getMessageType();
        if (messageType != SimpMessageType.MESSAGE) {
            throw new IllegalStateException("Unexpected message type " ~ messageType.toString());
        }
        Nullable!StompCommand command = getCommand();
        if (command is null) {
            command = new Nullable!StompCommand(StompCommand.SEND);
            setHeader(COMMAND_HEADER, command);
        }
        else if (command != (StompCommand.SEND)) {
            throw new IllegalStateException("Unexpected STOMP command " ~ command.toString());
        }
        return command;
    }

    /**
     * Ensure the STOMP command is MESSAGE for a message of type MESSAGE
     * (replacing an absent or SEND command), update the subscription header,
     * and generate a "message-id" header when none is present.
     *
     * @throws IllegalStateException if the message type is not MESSAGE or
     * a command other than SEND/MESSAGE is already set
     */
    void updateStompCommandAsServerMessage() {
        Nullable!SimpMessageType messageType = getMessageType();
        if (messageType != SimpMessageType.MESSAGE) {
            throw new IllegalStateException("Unexpected message type " ~ messageType.toString());
        }
        Nullable!StompCommand command = getCommand();
        if ((command is null) || StompCommand.SEND == command) {
            setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
        }
        else if (StompCommand.MESSAGE != command) {
            throw new IllegalStateException("Unexpected STOMP command " ~ command.toString());
        }
        trySetStompHeaderForSubscriptionId();
        if (getMessageId() is null) {
            import core.atomic;
            // atomicOp!"+=" yields the updated value; subtracting one gives the
            // value before the increment, so the suffix sequence starts at 0.
            long c = atomicOp!"+="(messageIdCounter, 1);
            string messageId = getSessionId() ~ "-" ~ (c-1).to!string();
            setNativeHeader(STOMP_MESSAGE_ID_HEADER, messageId);
        }
    }

    /**
     * Return the STOMP command, or {@code null} if not yet set.
     */
    Nullable!StompCommand getCommand() {
        return cast(Nullable!StompCommand) getHeader(COMMAND_HEADER);
    }

    /**
     * Whether the message type is {@code SimpMessageType.HEARTBEAT}.
     */
    bool isHeartbeat() {
        return (SimpMessageType.HEARTBEAT == getMessageType());
    }

    /**
     * Return the two values of the "heart-beat" header.
     *
     * <p>A copy of the default {@code [0, 0]} is returned when the header is
     * absent, empty, or does not split into at least two comma-separated parts.
     * Surrounding whitespace of each part is ignored.
     *
     * @return a two-element array with the first and second header value
     * @throws ConvException if a part is not a valid integer
     */
    long[] getHeartbeat() {
        string rawValue = getFirstNativeHeader(STOMP_HEARTBEAT_HEADER);
        if (rawValue.length == 0) {
            return DEFAULT_HEARTBEAT.dup;
        }
        string[] rawValues = StringUtils.split(rawValue, ",");
        // A length check also covers a null result and, unlike "is null",
        // an empty or single-element array that would make rawValues[1] fail.
        if (rawValues.length < 2) {
            return DEFAULT_HEARTBEAT.dup;
        }
        return [std.string.strip(rawValues[0]).to!long(),
                std.string.strip(rawValues[1]).to!long()];
    }

    /**
     * Set the "accept-version" header.
     */
    void setAcceptVersion(string acceptVersion) {
        setNativeHeader(STOMP_ACCEPT_VERSION_HEADER, acceptVersion);
    }

    /**
     * Return the comma-separated entries of the "accept-version" header.
     */
    string[] getAcceptVersion() {
        string rawValue = getFirstNativeHeader(STOMP_ACCEPT_VERSION_HEADER);
        return split(rawValue, ",");
        // return (rawValue !is null ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.emptySet!string());
    }

    /**
     * Set the "host" header.
     */
    void setHost(string host) {
        setNativeHeader(STOMP_HOST_HEADER, host);
    }

    /**
     * Return the first value of the "host" header.
     */
    string getHost() {
        return getFirstNativeHeader(STOMP_HOST_HEADER);
    }

    /**
     * Set the destination both as a processing header and as the native
     * "destination" header.
     */
    override
    void setDestination(string destination) {
        super.setDestination(destination);
        setNativeHeader(STOMP_DESTINATION_HEADER, destination);
    }

    /**
     * Set the content type both as a processing header and as the native
     * "content-type" header.
     */
    override
    void setContentType(MimeType contentType) {
        super.setContentType(contentType);
        // Do not call toString() on a null content type; a null value is
        // forwarded instead, the same way setDestination() forwards its argument.
        // NOTE(review): assumes setNativeHeader() accepts a null value — confirm.
        setNativeHeader(STOMP_CONTENT_TYPE_HEADER,
                contentType is null ? null : contentType.toString());
    }

    /**
     * Set the subscription id as a processing header and, where applicable,
     * as the matching native STOMP header.
     */
    override
    void setSubscriptionId(string subscriptionId) {
        super.setSubscriptionId(subscriptionId);
        trySetStompHeaderForSubscriptionId();
    }

    // Mirror the subscription id into "subscription" (MESSAGE command) or
    // "id" (SUBSCRIBE/UNSUBSCRIBE message type); otherwise do nothing.
    private void trySetStompHeaderForSubscriptionId() {
        string subscriptionId = getSubscriptionId();
        if (subscriptionId !is null) {
            Nullable!StompCommand command = getCommand();
            if (command !is null && StompCommand.MESSAGE == command) {
                setNativeHeader(STOMP_SUBSCRIPTION_HEADER, subscriptionId);
            }
            else {
                SimpMessageType messageType = getMessageType();
                if (SimpMessageType.SUBSCRIBE == messageType || SimpMessageType.UNSUBSCRIBE == messageType) {
                    setNativeHeader(STOMP_ID_HEADER, subscriptionId);
                }
            }
        }
    }


    /**
     * Return the "content-length" header value, or {@code null} if not set.
     */
    Integer getContentLength() {
        string header = getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER);
        return (header is null ? null : Integer.valueOf(header));
    }

    /**
     * Set the "content-length" header.
     */
    void setContentLength(int contentLength) {
        setNativeHeader(STOMP_CONTENT_LENGTH_HEADER, contentLength.to!string());
    }

    /**
     * Set the "heart-beat" header to {@code "cx,cy"}.
     */
    void setHeartbeat(long cx, long cy) {
        setNativeHeader(STOMP_HEARTBEAT_HEADER, cx.to!string() ~ "," ~ cy.to!string());
    }

    /**
     * Set the "ack" header.
     */
    void setAck(string ack) {
        setNativeHeader(STOMP_ACK_HEADER, ack);
    }

    /**
     * Return the first value of the "ack" header.
     */
    string getAck() {
        return getFirstNativeHeader(STOMP_ACK_HEADER);
    }

    /**
     * Set the "nack" header.
     */
    void setNack(string nack) {
        setNativeHeader(STOMP_NACK_HEADER, nack);
    }

    /**
     * Return the first value of the "nack" header.
     */
    string getNack() {
        return getFirstNativeHeader(STOMP_NACK_HEADER);
    }

    /**
     * Set the "login" header.
     */
    void setLogin(string login) {
        setNativeHeader(STOMP_LOGIN_HEADER, login);
    }

    /**
     * Return the first value of the "login" header.
     */
    string getLogin() {
        return getFirstNativeHeader(STOMP_LOGIN_HEADER);
    }

    /**
     * Set the passcode. The real value is moved into a separate credentials
     * header and the native "passcode" header is replaced by "PROTECTED".
     */
    void setPasscode(string passcode) {
        setNativeHeader(STOMP_PASSCODE_HEADER, passcode);
        protectPasscode();
    }

    // Move a not-yet-masked passcode into the credentials header and mask
    // the native header so the real value is not part of the native headers.
    private void protectPasscode() {
        string value = getFirstNativeHeader(STOMP_PASSCODE_HEADER);
        if (value !is null && "PROTECTED" != value) {
            setHeader(CREDENTIALS_HEADER, new StompPasscode(value));
            setNativeHeader(STOMP_PASSCODE_HEADER, "PROTECTED");
        }
    }

    /**
     * Return the passcode header value, or {@code null} if not set.
     */
    string getPasscode() {
        StompPasscode credentials = cast(StompPasscode) getHeader(CREDENTIALS_HEADER);
        return (credentials !is null ? credentials.passcode : null);
    }

    /**
     * Set the "receipt-id" header.
     */
    void setReceiptId(string receiptId) {
        setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId);
    }

    /**
     * Return the first value of the "receipt-id" header.
     */
    string getReceiptId() {
        return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
    }

    /**
     * Set the "receipt" header.
     */
    void setReceipt(string receiptId) {
        setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
    }

    /**
     * Return the first value of the "receipt" header.
     */
    string getReceipt() {
        return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
    }

    /**
     * Return the first value of the "message" header.
     */
    string getMessage() {
        return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
    }

    /**
     * Set the "message" header.
     */
    void setMessage(string content) {
        setNativeHeader(STOMP_MESSAGE_HEADER, content);
    }

    /**
     * Return the first value of the "message-id" header.
     */
    string getMessageId() {
        return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER);
    }

    /**
     * Set the "message-id" header.
     */
    void setMessageId(string id) {
        setNativeHeader(STOMP_MESSAGE_ID_HEADER, id);
    }

    /**
     * Return the first value of the "version" header.
     */
    string getVersion() {
        return getFirstNativeHeader(STOMP_VERSION_HEADER);
    }

    /**
     * Set the "version" header.
     */
    void setVersion(string ver) {
        setNativeHeader(STOMP_VERSION_HEADER, ver);
    }


    // Logging related

    /**
     * Return a concise, command-specific description of the message for
     * logging. Commands without a dedicated short form fall back to
     * {@link #getDetailedLogMessage(Object)}.
     */
    override
    string getShortLogMessage(Object payload) {
        Nullable!StompCommand command = getCommand();
        if (StompCommand.SUBSCRIBE == command) {
            return "SUBSCRIBE " ~ getDestination() ~ " id=" ~ getSubscriptionId() ~ appendSession();
        }
        else if (StompCommand.UNSUBSCRIBE == command) {
            return "UNSUBSCRIBE id=" ~ getSubscriptionId() ~ appendSession();
        }
        else if (StompCommand.SEND == command) {
            return "SEND " ~ getDestination() ~ appendSession() ~ appendPayload(payload);
        }
        else if (StompCommand.CONNECT == command) {
            // Principal user = getUser();
            // return "CONNECT" ~ (user !is null ? " user=" ~ user.getName() : "") ~ appendSession();
            return "CONNECT" ~ appendSession();
        }
        else if (StompCommand.CONNECTED == command) {
            return "CONNECTED heart-beat=" ~ to!string(getHeartbeat()) ~ appendSession();
        }
        else if (StompCommand.DISCONNECT == command) {
            string receipt = getReceipt();
            return "DISCONNECT" ~ (receipt !is null ? " receipt=" ~ receipt : "") ~ appendSession();
        }
        else {
            return getDetailedLogMessage(payload);
        }
    }

    /**
     * Return a description of the message for logging that includes the
     * command, the native headers, the session and, when a payload is given
     * and the command allows a body, a payload excerpt. Heartbeats get a
     * fixed short form; without a command the parent implementation is used.
     */
    override
    string getDetailedLogMessage(Object payload) {
        if (isHeartbeat()) {
            string sessionId = getSessionId();
            return "heart-beat" ~ (sessionId !is null ? " in session " ~ sessionId : "");
        }
        Nullable!StompCommand command = getCommand();
        if (command is null) {
            return super.getDetailedLogMessage(payload);
        }
        StringBuilder sb = new StringBuilder();
        sb.append(command.name()).append(" ");
        Map!(string, List!(string)) nativeHeaders = getNativeHeaders();
        if (nativeHeaders !is null) {
            sb.append(nativeHeaders.toString());
        }
        sb.append(appendSession());
        // if (getUser() !is null) {
        //     sb.append(", user=").append(getUser().getName());
        // }
        if (payload !is null && command.isBodyAllowed()) {
            sb.append(appendPayload(payload));
        }
        return sb.toString();
    }

    // Log fragment identifying the session.
    private string appendSession() {
        return " session=" ~ getSessionId();
    }

    // Log fragment with the content type and, for readable content types,
    // an excerpt of at most MAX_LOGGED_PAYLOAD_BYTES bytes of the payload.
    // Throws IllegalStateException when the payload is not a Nullable!(byte[]).
    private string appendPayload(Object payload) {
        Nullable!(byte[]) _payload = cast(Nullable!(byte[])) payload;
        if (_payload is null) {
            // typeid() must not be evaluated on a null reference, so a null
            // payload is reported by name instead.
            string actualType = (payload is null) ? "null" : typeid(payload).name;
            throw new IllegalStateException(
                    "Expected byte array payload but got: " ~ actualType);
        }
        byte[] bytes = _payload.value;
        MimeType mimeType = getContentType();
        string contentType = (mimeType !is null ? " " ~ mimeType.toString() : "");
        if (bytes.length == 0 || mimeType is null || !isReadableContentType()) {
            return contentType;
        }
        // Charset charset = mimeType.getCharset();
        // charset = (charset !is null ? charset : StandardCharsets.UTF_8);
        if (bytes.length < MAX_LOGGED_PAYLOAD_BYTES) {
            return contentType ~ " payload=" ~ cast(string)(bytes);
        }

        size_t end = MAX_LOGGED_PAYLOAD_BYTES;
        if (bytes.length > MAX_LOGGED_PAYLOAD_BYTES) {
            // If the first dropped byte is a UTF-8 continuation byte (10xxxxxx),
            // the cut would split a character: back up to its lead byte. A
            // sequence has at most 3 continuation bytes, so the search is
            // bounded; if no lead byte is found the plain cut is kept.
            size_t probe = MAX_LOGGED_PAYLOAD_BYTES;
            while (probe > MAX_LOGGED_PAYLOAD_BYTES - 3 && (bytes[probe] & 0xC0) == 0x80) {
                probe--;
            }
            if ((bytes[probe] & 0xC0) != 0x80) {
                end = probe;
            }
        }
        return contentType ~ " payload=" ~ cast(string)(bytes[0..end]) ~ "...(truncated)";
    }


    // Static factory methods and accessors

    /**
     * Create an instance for the given STOMP command.
     */
    static StompHeaderAccessor create(StompCommand command) {
        return new StompHeaderAccessor(command, null);
    }

    /**
     * Create an instance for the given STOMP command and headers.
     */
    static StompHeaderAccessor create(StompCommand command, Map!(string, List!(string)) headers) {
        return new StompHeaderAccessor(command, headers);
    }

    /**
     * Create headers for a heartbeat. While a STOMP heartbeat frame does not
     * have headers, a session id is needed for processing purposes at a minimum.
     */
    static StompHeaderAccessor createForHeartbeat() {
        return new StompHeaderAccessor();
    }

    /**
     * Create an instance from the payload and headers of the given Message.
     */
    static StompHeaderAccessor wrap(MessageBase message) {
        return new StompHeaderAccessor(message);
    }

    /**
     * Return the STOMP command from the given headers, or {@code null} if not set.
     */
    static Nullable!StompCommand getCommand(Map!(string, Object) headers) {
        return cast(Nullable!StompCommand) headers.get(COMMAND_HEADER);
    }

    /**
     * Return the passcode header value, or {@code null} if not set.
     */
    static string getPasscode(Map!(string, Object) headers) {
        StompPasscode credentials = cast(StompPasscode) headers.get(CREDENTIALS_HEADER);
        return (credentials !is null ? credentials.passcode : null);
    }

    /**
     * Return the "content-length" value from the given native headers, or
     * {@code null} if the map is {@code null} or has no value for the header.
     */
    static Integer getContentLength(Map!(string, List!(string)) nativeHeaders) {
        if (nativeHeaders is null) {
            return null;
        }
        List!(string) values = nativeHeaders.get(STOMP_CONTENT_LENGTH_HEADER);
        bool isEmpty = values is null || values.isEmpty();
        return (isEmpty ? null : Integer.valueOf(values.get(0)));
    }

}


/**
 * Holder for the real passcode value. Its string form is always
 * "[PROTECTED]" so the value does not show up when headers are printed.
 */
private class StompPasscode {

    private string passcode;

    this(string passcode) {
        this.passcode = passcode;
    }

    override
    string toString() {
        return "[PROTECTED]";
    }
}