1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.stomp.StompBrokerRelayMessageHandler; 18 19 // import hunt.security.Principal; 20 // import hunt.collection; 21 22 // import hunt.stomp.Message; 23 // import hunt.stomp.MessageChannel; 24 // import hunt.stomp.MessageDeliveryException; 25 // 26 // import hunt.stomp.MessageHeaders; 27 // 28 // 29 // import hunt.stomp.simp.SimpMessageHeaderAccessor; 30 // import hunt.stomp.simp.SimpMessageType; 31 // import hunt.stomp.simp.broker.AbstractBrokerMessageHandler; 32 // import hunt.stomp.support.MessageBuilder; 33 // import hunt.stomp.support.MessageHeaderAccessor; 34 // 35 // // import hunt.stomp.tcp.FixedIntervalReconnectStrategy; 36 // // import hunt.stomp.tcp.TcpConnection; 37 // // import hunt.stomp.tcp.TcpConnectionHandler; 38 // // import hunt.stomp.tcp.TcpOperations; 39 // // import hunt.stomp.tcp.reactor.ReactorNettyCodec; 40 // // import hunt.stomp.tcp.reactor.ReactorNettyTcpClient; 41 42 // // import hunt.framework.util.concurrent.ListenableFuture; 43 // // import hunt.framework.util.concurrent.ListenableFutureCallback; 44 // // import hunt.framework.util.concurrent.ListenableFutureTask; 45 46 // /** 47 // * A {@link hunt.stomp.MessageHandler} that handles messages by 48 // * forwarding them to a STOMP broker. 49 // * 50 // * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP 51 // * connection to the broker is opened and used exclusively for all messages from the 52 // * client that originated the CONNECT message. Messages from the same client are 53 // * identified through the session id message header. Reversely, when the STOMP broker 54 // * sends messages back on the TCP connection, those messages are enriched with the 55 // * session id of the client and sent back downstream through the {@link MessageChannel} 56 // * provided to the constructor. 57 // * 58 // * <p>This class also automatically opens a default "system" TCP connection to the 59 // * message broker that is used for sending messages that originate from the server 60 // * application (as opposed to from a client). Such messages are not associated with 61 // * any client and therefore do not have a session id header. The "system" connection 62 // * is effectively shared and cannot be used to receive messages. Several properties 63 // * are provided to configure the "system" connection including: 64 // * <ul> 65 // * <li>{@link #setSystemLogin}</li> 66 // * <li>{@link #setSystemPasscode}</li> 67 // * <li>{@link #setSystemHeartbeatSendInterval}</li> 68 // * <li>{@link #setSystemHeartbeatReceiveInterval}</li> 69 // * </ul> 70 // * 71 // * @author Rossen Stoyanchev 72 // * @author Andy Wilkinson 73 // * @since 4.0 74 // */ 75 class StompBrokerRelayMessageHandler { // : AbstractBrokerMessageHandler 76 77 // /** 78 // * The system session ID. 79 // */ 80 // enum string SYSTEM_SESSION_ID = "_system_"; 81 82 // /** STOMP recommended error of margin for receiving heartbeats. */ 83 // private enum long HEARTBEAT_MULTIPLIER = 3; 84 85 // /** 86 // * Heartbeat starts once CONNECTED frame with heartbeat settings is received. 87 // * If CONNECTED doesn't arrive within a minute, we'll close the connection. 88 // */ 89 // private enum int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; 90 91 // private enum byte[] EMPTY_PAYLOAD = []; 92 93 // // private static final ListenableFutureTask!(Void) EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable()); 94 95 // private __gshared Message!(byte[]) HEARTBEAT_MESSAGE; 96 97 98 // shared static this() { 99 // // EMPTY_TASK.run(); 100 // StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat(); 101 // HEARTBEAT_MESSAGE = MessageHelper.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders()); 102 // } 103 104 105 // private string relayHost = "127.0.0.1"; 106 107 // private int relayPort = 61613; 108 109 // private string clientLogin = "guest"; 110 111 // private string clientPasscode = "guest"; 112 113 // private string systemLogin = "guest"; 114 115 // private string systemPasscode = "guest"; 116 117 // private long systemHeartbeatSendInterval = 10000; 118 119 // private long systemHeartbeatReceiveInterval = 10000; 120 121 // private final Map!(string, MessageHandler) systemSubscriptions = new HashMap<>(4); 122 123 124 // private string virtualHost; 125 126 127 // private TcpOperations!(byte[]) tcpClient; 128 129 130 // private MessageHeaderInitializer headerInitializer; 131 132 // private final Stats stats = new Stats(); 133 134 // private final Map!(string, StompConnectionHandler) connectionHandlers = new ConcurrentHashMap<>(); 135 136 137 // /** 138 // * Create a StompBrokerRelayMessageHandler instance with the given message channels 139 // * and destination prefixes. 140 // * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 141 // * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 142 // * @param brokerChannel the channel for the application to send messages to the broker 143 // * @param destinationPrefixes the broker supported destination prefixes; destinations 144 // * that do not match the given prefix are ignored. 145 // */ 146 // StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 147 // SubscribableChannel brokerChannel, Collection!(string) destinationPrefixes) { 148 149 // super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes); 150 // } 151 152 153 // /** 154 // * Set the STOMP message broker host. 155 // */ 156 // void setRelayHost(string relayHost) { 157 // Assert.hasText(relayHost, "relayHost must not be empty"); 158 // this.relayHost = relayHost; 159 // } 160 161 // /** 162 // * Return the STOMP message broker host. 163 // */ 164 // string getRelayHost() { 165 // return this.relayHost; 166 // } 167 168 // /** 169 // * Set the STOMP message broker port. 170 // */ 171 // void setRelayPort(int relayPort) { 172 // this.relayPort = relayPort; 173 // } 174 175 // /** 176 // * Return the STOMP message broker port. 177 // */ 178 // int getRelayPort() { 179 // return this.relayPort; 180 // } 181 // /** 182 // * Set the login to use when creating connections to the STOMP broker on 183 // * behalf of connected clients. 184 // * <p>By default this is set to "guest". 185 // * @see #setSystemLogin(string) 186 // */ 187 // void setClientLogin(string clientLogin) { 188 // Assert.hasText(clientLogin, "clientLogin must not be empty"); 189 // this.clientLogin = clientLogin; 190 // } 191 192 // /** 193 // * Return the configured login to use for connections to the STOMP broker 194 // * on behalf of connected clients. 195 // * @see #getSystemLogin() 196 // */ 197 // string getClientLogin() { 198 // return this.clientLogin; 199 // } 200 201 // /** 202 // * Set the client passcode to use to create connections to the STOMP broker on 203 // * behalf of connected clients. 204 // * <p>By default this is set to "guest". 205 // * @see #setSystemPasscode 206 // */ 207 // void setClientPasscode(string clientPasscode) { 208 // Assert.hasText(clientPasscode, "clientPasscode must not be empty"); 209 // this.clientPasscode = clientPasscode; 210 // } 211 212 // /** 213 // * Return the configured passcode to use for connections to the STOMP broker on 214 // * behalf of connected clients. 215 // * @see #getSystemPasscode() 216 // */ 217 // string getClientPasscode() { 218 // return this.clientPasscode; 219 // } 220 221 // /** 222 // * Set the login for the shared "system" connection used to send messages to 223 // * the STOMP broker from within the application, i.e. messages not associated 224 // * with a specific client session (e.g. REST/HTTP request handling method). 225 // * <p>By default this is set to "guest". 226 // */ 227 // void setSystemLogin(string systemLogin) { 228 // Assert.hasText(systemLogin, "systemLogin must not be empty"); 229 // this.systemLogin = systemLogin; 230 // } 231 232 // /** 233 // * Return the login used for the shared "system" connection to the STOMP broker. 234 // */ 235 // string getSystemLogin() { 236 // return this.systemLogin; 237 // } 238 239 // /** 240 // * Set the passcode for the shared "system" connection used to send messages to 241 // * the STOMP broker from within the application, i.e. messages not associated 242 // * with a specific client session (e.g. REST/HTTP request handling method). 243 // * <p>By default this is set to "guest". 244 // */ 245 // void setSystemPasscode(string systemPasscode) { 246 // this.systemPasscode = systemPasscode; 247 // } 248 249 // /** 250 // * Return the passcode used for the shared "system" connection to the STOMP broker. 251 // */ 252 // string getSystemPasscode() { 253 // return this.systemPasscode; 254 // } 255 256 257 // /** 258 // * Set the interval, in milliseconds, at which the "system" connection will, in the 259 // * absence of any other data being sent, send a heartbeat to the STOMP broker. A value 260 // * of zero will prevent heartbeats from being sent to the broker. 261 // * <p>The default value is 10000. 262 // * <p>See class-level documentation for more information on the "system" connection. 263 // */ 264 // void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { 265 // this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; 266 // } 267 268 // /** 269 // * Return the interval, in milliseconds, at which the "system" connection will 270 // * send heartbeats to the STOMP broker. 271 // */ 272 // long getSystemHeartbeatSendInterval() { 273 // return this.systemHeartbeatSendInterval; 274 // } 275 276 // /** 277 // * Set the maximum interval, in milliseconds, at which the "system" connection 278 // * expects, in the absence of any other data, to receive a heartbeat from the STOMP 279 // * broker. A value of zero will configure the connection to expect not to receive 280 // * heartbeats from the broker. 281 // * <p>The default value is 10000. 282 // * <p>See class-level documentation for more information on the "system" connection. 283 // */ 284 // void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { 285 // this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; 286 // } 287 288 // /** 289 // * Return the interval, in milliseconds, at which the "system" connection expects 290 // * to receive heartbeats from the STOMP broker. 291 // */ 292 // long getSystemHeartbeatReceiveInterval() { 293 // return this.systemHeartbeatReceiveInterval; 294 // } 295 296 // /** 297 // * Configure one more destinations to subscribe to on the shared "system" 298 // * connection along with MessageHandler's to handle received messages. 299 // * <p>This is for internal use in a multi-application server scenario where 300 // * servers forward messages to each other (e.g. unresolved user destinations). 301 // * @param subscriptions the destinations to subscribe to. 302 // */ 303 // void setSystemSubscriptions(Map!(string, MessageHandler) subscriptions) { 304 // this.systemSubscriptions.clear(); 305 // if (subscriptions !is null) { 306 // this.systemSubscriptions.putAll(subscriptions); 307 // } 308 // } 309 310 // /** 311 // * Return the configured map with subscriptions on the "system" connection. 312 // */ 313 // Map!(string, MessageHandler) getSystemSubscriptions() { 314 // return this.systemSubscriptions; 315 // } 316 317 // /** 318 // * Set the value of the "host" header to use in STOMP CONNECT frames. When this 319 // * property is configured, a "host" header will be added to every STOMP frame sent to 320 // * the STOMP broker. This may be useful for example in a cloud environment where the 321 // * actual host to which the TCP connection is established is different from the host 322 // * providing the cloud-based STOMP service. 323 // * <p>By default this property is not set. 324 // */ 325 // void setVirtualHost(string virtualHost) { 326 // this.virtualHost = virtualHost; 327 // } 328 329 // /** 330 // * Return the configured virtual host value. 331 // */ 332 333 // string getVirtualHost() { 334 // return this.virtualHost; 335 // } 336 337 // /** 338 // * Configure a TCP client for managing TCP connections to the STOMP broker. 339 // * <p>By default {@link ReactorNettyTcpClient} is used. 340 // * <p><strong>Note:</strong> when this property is used, any 341 // * {@link #setRelayHost(string) host} or {@link #setRelayPort(int) port} 342 // * specified are effectively ignored. 343 // */ 344 // void setTcpClient(TcpOperations!(byte[]) tcpClient) { 345 // this.tcpClient = tcpClient; 346 // } 347 348 // /** 349 // * Get the configured TCP client (never {@code null} unless not configured 350 // * invoked and this method is invoked before the handler is started and 351 // * hence a default implementation initialized). 352 // */ 353 354 // TcpOperations!(byte[]) getTcpClient() { 355 // return this.tcpClient; 356 // } 357 358 // /** 359 // * Configure a {@link MessageHeaderInitializer} to apply to the headers of all 360 // * messages created through the {@code StompBrokerRelayMessageHandler} that 361 // * are sent to the client outbound message channel. 362 // * <p>By default this property is not set. 363 // */ 364 // void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { 365 // this.headerInitializer = headerInitializer; 366 // } 367 368 // /** 369 // * Return the configured header initializer. 370 // */ 371 372 // MessageHeaderInitializer getHeaderInitializer() { 373 // return this.headerInitializer; 374 // } 375 376 // /** 377 // * Return a string describing internal state and counters. 378 // */ 379 // string getStatsInfo() { 380 // return this.stats.toString(); 381 // } 382 383 // /** 384 // * Return the current count of TCP connection to the broker. 385 // */ 386 // int getConnectionCount() { 387 // return this.connectionHandlers.size(); 388 // } 389 390 391 // override 392 // protected void startInternal() { 393 // if (this.tcpClient is null) { 394 // this.tcpClient = initTcpClient(); 395 // } 396 397 // version(HUNT_DEBUG) { 398 // info("Starting \"system\" session, " ~ toString()); 399 // } 400 401 // StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT); 402 // accessor.setAcceptVersion("1.1,1.2"); 403 // accessor.setLogin(this.systemLogin); 404 // accessor.setPasscode(this.systemPasscode); 405 // accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval); 406 // string virtualHost = getVirtualHost(); 407 // if (virtualHost !is null) { 408 // accessor.setHost(virtualHost); 409 // } 410 // accessor.setSessionId(SYSTEM_SESSION_ID); 411 // version(HUNT_DEBUG) { 412 // trace("Forwarding " ~ accessor.getShortLogMessage(EMPTY_PAYLOAD)); 413 // } 414 415 // SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor); 416 // this.connectionHandlers.put(handler.getSessionId(), handler); 417 418 // this.stats.incrementConnectCount(); 419 // this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000)); 420 // } 421 422 // private ReactorNettyTcpClient!(byte[]) initTcpClient() { 423 // StompDecoder decoder = new StompDecoder(); 424 // if (this.headerInitializer !is null) { 425 // decoder.setHeaderInitializer(this.headerInitializer); 426 // } 427 // ReactorNettyCodec!(byte[]) codec = new StompReactorNettyCodec(decoder); 428 // ReactorNettyTcpClient!(byte[]) client = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec); 429 // client.setLogger(SimpLogging.forLog(client.getLogger())); 430 // return client; 431 // } 432 433 // override 434 // protected void stopInternal() { 435 // publishBrokerUnavailableEvent(); 436 // if (this.tcpClient !is null) { 437 // try { 438 // this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS); 439 // } 440 // catch (Throwable ex) { 441 // error("Error in shutdown of TCP client", ex); 442 // } 443 // } 444 // } 445 446 // override 447 // protected void handleMessageInternal(MessageBase message) { 448 // string sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders()); 449 450 // if (!isBrokerAvailable()) { 451 // if (sessionId is null || SYSTEM_SESSION_ID.equals(sessionId)) { 452 // throw new MessageDeliveryException("Message broker not active. Consider subscribing to " ~ 453 // "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean."); 454 // } 455 // StompConnectionHandler handler = this.connectionHandlers.get(sessionId); 456 // if (handler !is null) { 457 // handler.sendStompErrorFrameToClient("Broker not available."); 458 // handler.clearConnection(); 459 // } 460 // else { 461 // StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); 462 // if (getHeaderInitializer() !is null) { 463 // getHeaderInitializer().initHeaders(accessor); 464 // } 465 // accessor.setSessionId(sessionId); 466 // Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders()); 467 // if (user !is null) { 468 // accessor.setUser(user); 469 // } 470 // accessor.setMessage("Broker not available."); 471 // MessageHeaders headers = accessor.getMessageHeaders(); 472 // getClientOutboundChannel().send(MessageHelper.createMessage(EMPTY_PAYLOAD, headers)); 473 // } 474 // return; 475 // } 476 477 // StompHeaderAccessor stompAccessor; 478 // StompCommand command; 479 480 // MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); 481 // if (accessor is null) { 482 // throw new IllegalStateException( 483 // "No header accessor (not using the SimpMessagingTemplate?): " ~ message); 484 // } 485 // else if (accessor instanceof StompHeaderAccessor) { 486 // stompAccessor = (StompHeaderAccessor) accessor; 487 // command = stompAccessor.getCommand(); 488 // } 489 // else if (accessor instanceof SimpMessageHeaderAccessor) { 490 // stompAccessor = StompHeaderAccessor.wrap(message); 491 // command = stompAccessor.getCommand(); 492 // if (command is null) { 493 // command = stompAccessor.updateStompCommandAsClientMessage(); 494 // } 495 // } 496 // else { 497 // throw new IllegalStateException( 498 // "Unexpected header accessor type " ~ accessor.getClass() ~ " in " ~ message); 499 // } 500 501 // if (sessionId is null) { 502 // if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) { 503 // version(HUNT_DEBUG) { 504 // error("Only STOMP SEND supported from within the server side. Ignoring " ~ message); 505 // } 506 // return; 507 // } 508 // sessionId = SYSTEM_SESSION_ID; 509 // stompAccessor.setSessionId(sessionId); 510 // } 511 512 // string destination = stompAccessor.getDestination(); 513 // if (command !is null && command.requiresDestination() && !checkDestinationPrefix(destination)) { 514 // return; 515 // } 516 517 // if (StompCommand.CONNECT.equals(command)) { 518 // version(HUNT_DEBUG) { 519 // trace(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD)); 520 // } 521 // stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message)); 522 // stompAccessor.setLogin(this.clientLogin); 523 // stompAccessor.setPasscode(this.clientPasscode); 524 // if (getVirtualHost() !is null) { 525 // stompAccessor.setHost(getVirtualHost()); 526 // } 527 // StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor); 528 // this.connectionHandlers.put(sessionId, handler); 529 // this.stats.incrementConnectCount(); 530 // assert(this.tcpClient !is null, "No TCP client available"); 531 // this.tcpClient.connect(handler); 532 // } 533 // else if (StompCommand.DISCONNECT.equals(command)) { 534 // StompConnectionHandler handler = this.connectionHandlers.get(sessionId); 535 // if (handler is null) { 536 // version(HUNT_DEBUG) { 537 // trace("Ignoring DISCONNECT in session " ~ sessionId ~ ". Connection already cleaned up."); 538 // } 539 // return; 540 // } 541 // this.stats.incrementDisconnectCount(); 542 // handler.forward(message, stompAccessor); 543 // } 544 // else { 545 // StompConnectionHandler handler = this.connectionHandlers.get(sessionId); 546 // if (handler is null) { 547 // version(HUNT_DEBUG) { 548 // trace("No TCP connection for session " ~ sessionId ~ " in " ~ message); 549 // } 550 // return; 551 // } 552 // handler.forward(message, stompAccessor); 553 // } 554 // } 555 556 // override 557 // string toString() { 558 // return "StompBrokerRelay[" ~ getTcpClientInfo() ~ "]"; 559 // } 560 561 // private string getTcpClientInfo() { 562 // return this.tcpClient !is null ? this.tcpClient.toString() : this.relayHost ~ ":" ~ this.relayPort; 563 // } 564 565 566 // private class StompConnectionHandler implements TcpConnectionHandler!(byte[]) { 567 568 // private final string sessionId; 569 570 // private bool isRemoteClientSession; 571 572 // private final StompHeaderAccessor connectHeaders; 573 574 // private final MessageChannel outboundChannel; 575 576 577 // private TcpConnection!(byte[]) tcpConnection; 578 579 // private bool isStompConnected; 580 581 582 // protected StompConnectionHandler(string sessionId, StompHeaderAccessor connectHeaders) { 583 // this(sessionId, connectHeaders, true); 584 // } 585 586 // private StompConnectionHandler(string sessionId, StompHeaderAccessor connectHeaders, isClientSession) { 587 // assert(sessionId, "'sessionId' must not be null"); 588 // assert(connectHeaders, "'connectHeaders' must not be null"); 589 // this.sessionId = sessionId; 590 // this.connectHeaders = connectHeaders; 591 // this.isRemoteClientSession = isClientSession; 592 // this.outboundChannel = getClientOutboundChannelForSession(sessionId); 593 // } 594 595 // string getSessionId() { 596 // return this.sessionId; 597 // } 598 599 600 // protected TcpConnection!(byte[]) getTcpConnection() { 601 // return this.tcpConnection; 602 // } 603 604 // override 605 // void afterConnected(TcpConnection!(byte[]) connection) { 606 // version(HUNT_DEBUG) { 607 // trace("TCP connection opened in session=" ~ getSessionId()); 608 // } 609 // this.tcpConnection = connection; 610 // connection.onReadInactivity(() -> { 611 // if (this.tcpConnection !is null && !this.isStompConnected) { 612 // handleTcpConnectionFailure("No CONNECTED frame received in " ~ 613 // MAX_TIME_TO_CONNECTED_FRAME ~ " ms.", null); 614 // } 615 // }, MAX_TIME_TO_CONNECTED_FRAME); 616 // connection.send(MessageHelper.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders())); 617 // } 618 619 // override 620 // void afterConnectFailure(Throwable ex) { 621 // handleTcpConnectionFailure("Failed to connect: " ~ ex.getMessage(), ex); 622 // } 623 624 // /** 625 // * Invoked when any TCP connectivity issue is detected, i.e. failure to establish 626 // * the TCP connection, failure to send a message, missed heartbeat, etc. 627 // */ 628 // protected void handleTcpConnectionFailure(string error, Throwable ex) { 629 // version(HUNT_DEBUG) { 630 // info("TCP connection failure in session " ~ this.sessionId ~ ": " ~ error, ex); 631 // } 632 // try { 633 // sendStompErrorFrameToClient(error); 634 // } 635 // finally { 636 // try { 637 // clearConnection(); 638 // } 639 // catch (Throwable ex2) { 640 // version(HUNT_DEBUG) { 641 // trace("Failure while clearing TCP connection state in session " ~ this.sessionId, ex2); 642 // } 643 // } 644 // } 645 // } 646 647 // private void sendStompErrorFrameToClient(string errorText) { 648 // if (this.isRemoteClientSession) { 649 // StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); 650 // if (getHeaderInitializer() !is null) { 651 // getHeaderInitializer().initHeaders(accessor); 652 // } 653 // accessor.setSessionId(this.sessionId); 654 // Principal user = this.connectHeaders.getUser(); 655 // if (user !is null) { 656 // accessor.setUser(user); 657 // } 658 // accessor.setMessage(errorText); 659 // accessor.setLeaveMutable(true); 660 // MessageBase errorMessage = MessageHelper.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders()); 661 // handleInboundMessage(errorMessage); 662 // } 663 // } 664 665 // protected void handleInboundMessage(MessageBase message) { 666 // if (this.isRemoteClientSession) { 667 // this.outboundChannel.send(message); 668 // } 669 // } 670 671 // override 672 // void handleMessage(Message!(byte[]) message) { 673 // StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); 674 // assert(accessor !is null, "No StompHeaderAccessor"); 675 // accessor.setSessionId(this.sessionId); 676 // Principal user = this.connectHeaders.getUser(); 677 // if (user !is null) { 678 // accessor.setUser(user); 679 // } 680 681 // StompCommand command = accessor.getCommand(); 682 // if (StompCommand.CONNECTED.equals(command)) { 683 // version(HUNT_DEBUG) { 684 // trace("Received " ~ accessor.getShortLogMessage(EMPTY_PAYLOAD)); 685 // } 686 // afterStompConnected(accessor); 687 // } 688 // else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) { 689 // error("Received " ~ accessor.getShortLogMessage(message.getPayload())); 690 // } 691 // else version(HUNT_DEBUG) { 692 // trace("Received " ~ accessor.getDetailedLogMessage(message.getPayload())); 693 // } 694 695 // handleInboundMessage(message); 696 // } 697 698 // /** 699 // * Invoked after the STOMP CONNECTED frame is received. At this point the 700 // * connection is ready for sending STOMP messages to the broker. 701 // */ 702 // protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { 703 // this.isStompConnected = true; 704 // stats.incrementConnectedCount(); 705 // initHeartbeats(connectedHeaders); 706 // } 707 708 // private void initHeartbeats(StompHeaderAccessor connectedHeaders) { 709 // if (this.isRemoteClientSession) { 710 // return; 711 // } 712 713 // TcpConnection!(byte[]) con = this.tcpConnection; 714 // assert(con !is null, "No TcpConnection available"); 715 716 // long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; 717 // long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; 718 // long serverSendInterval = connectedHeaders.getHeartbeat()[0]; 719 // long serverReceiveInterval = connectedHeaders.getHeartbeat()[1]; 720 721 // if (clientSendInterval > 0 && serverReceiveInterval > 0) { 722 // long interval = Math.max(clientSendInterval, serverReceiveInterval); 723 // con.onWriteInactivity(() -> 724 // con.send(HEARTBEAT_MESSAGE).addCallback( 725 // result -> {}, 726 // ex -> handleTcpConnectionFailure( 727 // "Failed to forward heartbeat: " ~ ex.getMessage(), ex)), interval); 728 // } 729 // if (clientReceiveInterval > 0 && serverSendInterval > 0) { 730 // final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER; 731 // con.onReadInactivity( 732 // () -> handleTcpConnectionFailure("No messages received in " ~ interval ~ " ms.", null), interval); 733 // } 734 // } 735 736 // override 737 // void handleFailure(Throwable ex) { 738 // if (this.tcpConnection !is null) { 739 // handleTcpConnectionFailure("Transport failure: " ~ ex.getMessage(), ex); 740 // } 741 // else version(HUNT_DEBUG) { 742 // error("Transport failure: " ~ ex); 743 // } 744 // } 745 746 // override 747 // void afterConnectionClosed() { 748 // if (this.tcpConnection is null) { 749 // return; 750 // } 751 // try { 752 // version(HUNT_DEBUG) { 753 // trace("TCP connection to broker closed in session " ~ this.sessionId); 754 // } 755 // sendStompErrorFrameToClient("Connection to broker closed."); 756 // } 757 // finally { 758 // try { 759 // // Prevent clearConnection() from trying to close 760 // this.tcpConnection = null; 761 // clearConnection(); 762 // } 763 // catch (Throwable ex) { 764 // // Shouldn't happen with connection reset beforehand 765 // } 766 // } 767 // } 768 769 // /** 770 // * Forward the given message to the STOMP broker. 771 // * <p>The method checks whether we have an active TCP connection and have 772 // * received the STOMP CONNECTED frame. For client messages this should be 773 // * false only if we lose the TCP connection around the same time when a 774 // * client message is being forwarded, so we simply log the ignored message 775 // * at debug level. For messages from within the application being sent on 776 // * the "system" connection an exception is raised so that components sending 777 // * the message have a chance to handle it -- by default the broker message 778 // * channel is synchronous. 779 // * <p>Note that if messages arrive concurrently around the same time a TCP 780 // * connection is lost, there is a brief period of time before the connection 781 // * is reset when one or more messages may sneak through and an attempt made 782 // * to forward them. Rather than synchronizing to guard against that, this 783 // * method simply lets them try and fail. For client sessions that may 784 // * result in an additional STOMP ERROR frame(s) being sent downstream but 785 // * code handling that downstream should be idempotent in such cases. 786 // * @param message the message to send (never {@code null}) 787 // * @return a future to wait for the result 788 // */ 789 790 // ListenableFuture!(Void) forward(final MessageBase message, final StompHeaderAccessor accessor) { 791 // TcpConnection!(byte[]) conn = this.tcpConnection; 792 793 // if (!this.isStompConnected || conn is null) { 794 // if (this.isRemoteClientSession) { 795 // version(HUNT_DEBUG) { 796 // trace("TCP connection closed already, ignoring " ~ 797 // accessor.getShortLogMessage(message.getPayload())); 798 // } 799 // return EMPTY_TASK; 800 // } 801 // else { 802 // throw new IllegalStateException("Cannot forward messages " ~ 803 // (conn !is null ? "before STOMP CONNECTED. " : "while inactive. ") + 804 // "Consider subscribing to receive BrokerAvailabilityEvent's from " ~ 805 // "an ApplicationListener Spring bean. Dropped " ~ 806 // accessor.getShortLogMessage(message.getPayload())); 807 // } 808 // } 809 810 // final MessageBase messageToSend = (accessor.isMutable() && accessor.isModified()) ? 811 // MessageHelper.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message; 812 813 // StompCommand command = accessor.getCommand(); 814 // if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) || 815 // StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) { 816 // trace("Forwarding " ~ accessor.getShortLogMessage(message.getPayload())); 817 // } 818 // else version(HUNT_DEBUG) { 819 // trace("Forwarding " ~ accessor.getDetailedLogMessage(message.getPayload())); 820 // } 821 822 // ListenableFuture!(Void) future = conn.send((Message!(byte[])) messageToSend); 823 // future.addCallback(new ListenableFutureCallback!(Void)() { 824 // override 825 // void onSuccess(Void result) { 826 // if (accessor.getCommand() == StompCommand.DISCONNECT) { 827 // afterDisconnectSent(accessor); 828 // } 829 // } 830 // override 831 // void onFailure(Throwable ex) { 832 // if (tcpConnection !is null) { 833 // handleTcpConnectionFailure("failed to forward " ~ 834 // accessor.getShortLogMessage(message.getPayload()), ex); 835 // } 836 // else version(HUNT_DEBUG) { 837 // error("Failed to forward " ~ accessor.getShortLogMessage(message.getPayload())); 838 // } 839 // } 840 // }); 841 // return future; 842 // } 843 844 // /** 845 // * After a DISCONNECT there should be no more client frames so we can 846 // * close the connection pro-actively. However, if the DISCONNECT has a 847 // * receipt header we leave the connection open and expect the server will 848 // * respond with a RECEIPT and then close the connection. 849 // * @see <a href="http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT"> 850 // * STOMP Specification 1.2 DISCONNECT</a> 851 // */ 852 // private void afterDisconnectSent(StompHeaderAccessor accessor) { 853 // if (accessor.getReceipt() is null) { 854 // try { 855 // clearConnection(); 856 // } 857 // catch (Throwable ex) { 858 // version(HUNT_DEBUG) { 859 // trace("Failure while clearing TCP connection state in session " ~ this.sessionId, ex); 860 // } 861 // } 862 // } 863 // } 864 865 // /** 866 // * Clean up state associated with the connection and close it. 867 // * Any exception arising from closing the connection are propagated. 868 // */ 869 // void clearConnection() { 870 // version(HUNT_DEBUG) { 871 // trace("Cleaning up connection state for session " ~ this.sessionId); 872 // } 873 874 // if (this.isRemoteClientSession) { 875 // StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); 876 // } 877 878 // this.isStompConnected = false; 879 880 // TcpConnection!(byte[]) conn = this.tcpConnection; 881 // this.tcpConnection = null; 882 // if (conn !is null) { 883 // version(HUNT_DEBUG) { 884 // trace("Closing TCP connection in session " ~ this.sessionId); 885 // } 886 // conn.close(); 887 // } 888 // } 889 890 // override 891 // string toString() { 892 // return "StompConnectionHandler[sessionId=" ~ this.sessionId ~ "]"; 893 // } 894 // } 895 896 897 // private class SystemStompConnectionHandler extends StompConnectionHandler { 898 899 // SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { 900 // super(SYSTEM_SESSION_ID, connectHeaders, false); 901 // } 902 903 // override 904 // protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { 905 // version(HUNT_DEBUG) { 906 // info("\"System\" session connected."); 907 // } 908 // super.afterStompConnected(connectedHeaders); 909 // publishBrokerAvailableEvent(); 910 // sendSystemSubscriptions(); 911 // } 912 913 // private void sendSystemSubscriptions() { 914 // int i = 0; 915 // for (string destination : getSystemSubscriptions().keySet()) { 916 // StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); 917 // accessor.setSubscriptionId(string.valueOf(i++)); 918 // accessor.setDestination(destination); 919 // version(HUNT_DEBUG) { 920 // trace("Subscribing to " ~ destination ~ " on \"system\" connection."); 921 // } 922 // TcpConnection!(byte[]) conn = getTcpConnection(); 923 // if (conn !is null) { 924 // MessageHeaders headers = accessor.getMessageHeaders(); 925 // conn.send(MessageHelper.createMessage(EMPTY_PAYLOAD, headers)).addCallback( 926 // result -> {}, 927 // ex -> { 928 // string error = "Failed to subscribe in \"system\" session."; 929 // handleTcpConnectionFailure(error, ex); 930 // }); 931 // } 932 // } 933 // } 934 935 // override 936 // protected void handleInboundMessage(MessageBase message) { 937 // StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); 938 // if (accessor !is null && StompCommand.MESSAGE.equals(accessor.getCommand())) { 939 // string destination = accessor.getDestination(); 940 // if (destination is null) { 941 // version(HUNT_DEBUG) { 942 // trace("Got message on \"system\" connection, with no destination: " ~ 943 // accessor.getDetailedLogMessage(message.getPayload())); 944 // } 945 // return; 946 // } 947 // if (!getSystemSubscriptions().containsKey(destination)) { 948 // version(HUNT_DEBUG) { 949 // trace("Got message on \"system\" connection with no handler: " ~ 950 // accessor.getDetailedLogMessage(message.getPayload())); 951 // } 952 // return; 953 // } 954 // try { 955 // MessageHandler handler = getSystemSubscriptions().get(destination); 956 // handler.handleMessage(message); 957 // } 958 // catch (Throwable ex) { 959 // version(HUNT_DEBUG) { 960 // trace("Error while handling message on \"system\" connection.", ex); 961 // } 962 // } 963 // } 964 // } 965 966 // override 967 // protected void handleTcpConnectionFailure(string errorMessage, Throwable ex) { 968 // super.handleTcpConnectionFailure(errorMessage, ex); 969 // publishBrokerUnavailableEvent(); 970 // } 971 972 // override 973 // void afterConnectionClosed() { 974 // super.afterConnectionClosed(); 975 // publishBrokerUnavailableEvent(); 976 // } 977 978 // override 979 // ListenableFuture!(Void) forward(MessageBase message, StompHeaderAccessor accessor) { 980 // try { 981 // ListenableFuture!(Void) future = super.forward(message, accessor); 982 // if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) is null) { 983 // future.get(); 984 // } 985 // return future; 986 // } 987 // catch (Throwable ex) { 988 // throw new MessageDeliveryException(message, ex); 989 // } 990 // } 991 // } 992 993 994 // private static class VoidCallable implements Callable!(Void) { 995 996 // override 997 // Void call() { 998 // return null; 999 // } 1000 // } 1001 1002 1003 // private class Stats { 1004 1005 // private final AtomicInteger connect = new AtomicInteger(); 1006 1007 // private final AtomicInteger connected = new AtomicInteger(); 1008 1009 // private final AtomicInteger disconnect = new AtomicInteger(); 1010 1011 // void incrementConnectCount() { 1012 // this.connect.incrementAndGet(); 1013 // } 1014 1015 // void incrementConnectedCount() { 1016 // this.connected.incrementAndGet(); 1017 // } 1018 1019 // void incrementDisconnectCount() { 1020 // this.disconnect.incrementAndGet(); 1021 // } 1022 1023 // string toString() { 1024 // return (connectionHandlers.size() ~ " sessions, " ~ getTcpClientInfo() + 1025 // (isBrokerAvailable() ? " (available)" : " (not available)") + 1026 // ", processed CONNECT(" ~ this.connect.get() ~ ")-CONNECTED(" ~ 1027 // this.connected.get() ~ ")-DISCONNECT(" ~ this.disconnect.get() ~ ")"); 1028 // } 1029 // } 1030 1031 }