1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.stomp.StompBrokerRelayMessageHandler;
18 
19 // import hunt.security.Principal;
20 // import hunt.collection;
21 
22 // import hunt.stomp.Message;
23 // import hunt.stomp.MessageChannel;
24 // import hunt.stomp.MessageDeliveryException;
25 // 
26 // import hunt.stomp.MessageHeaders;
27 // 
28 // 
29 // import hunt.stomp.simp.SimpMessageHeaderAccessor;
30 // import hunt.stomp.simp.SimpMessageType;
31 // import hunt.stomp.simp.broker.AbstractBrokerMessageHandler;
32 // import hunt.stomp.support.MessageBuilder;
33 // import hunt.stomp.support.MessageHeaderAccessor;
34 // 
35 // // import hunt.stomp.tcp.FixedIntervalReconnectStrategy;
36 // // import hunt.stomp.tcp.TcpConnection;
37 // // import hunt.stomp.tcp.TcpConnectionHandler;
38 // // import hunt.stomp.tcp.TcpOperations;
39 // // import hunt.stomp.tcp.reactor.ReactorNettyCodec;
40 // // import hunt.stomp.tcp.reactor.ReactorNettyTcpClient;
41 
42 // // import hunt.framework.util.concurrent.ListenableFuture;
43 // // import hunt.framework.util.concurrent.ListenableFutureCallback;
44 // // import hunt.framework.util.concurrent.ListenableFutureTask;
45 
46 // /**
47 //  * A {@link hunt.stomp.MessageHandler} that handles messages by
48 //  * forwarding them to a STOMP broker.
49 //  *
50 //  * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
51 //  * connection to the broker is opened and used exclusively for all messages from the
52 //  * client that originated the CONNECT message. Messages from the same client are
53 //  * identified through the session id message header. Reversely, when the STOMP broker
54 //  * sends messages back on the TCP connection, those messages are enriched with the
55 //  * session id of the client and sent back downstream through the {@link MessageChannel}
56 //  * provided to the constructor.
57 //  *
58 //  * <p>This class also automatically opens a default "system" TCP connection to the
59 //  * message broker that is used for sending messages that originate from the server
60 //  * application (as opposed to from a client). Such messages are not associated with
61 //  * any client and therefore do not have a session id header. The "system" connection
62 //  * is effectively shared and cannot be used to receive messages. Several properties
63 //  * are provided to configure the "system" connection including:
64 //  * <ul>
65 //  * <li>{@link #setSystemLogin}</li>
66 //  * <li>{@link #setSystemPasscode}</li>
67 //  * <li>{@link #setSystemHeartbeatSendInterval}</li>
68 //  * <li>{@link #setSystemHeartbeatReceiveInterval}</li>
69 //  * </ul>
70 //  *
71 //  * @author Rossen Stoyanchev
72 //  * @author Andy Wilkinson
73 //  * @since 4.0
74 //  */
75 class StompBrokerRelayMessageHandler  { // : AbstractBrokerMessageHandler
76 
77 // 	/**
78 // 	 * The system session ID.
79 // 	 */
80 // 	enum string SYSTEM_SESSION_ID = "_system_";
81 
82 // 	/** STOMP recommended error of margin for receiving heartbeats. */
83 // 	private enum long HEARTBEAT_MULTIPLIER = 3;
84 
85 // 	/**
86 // 	 * Heartbeat starts once CONNECTED frame with heartbeat settings is received.
87 // 	 * If CONNECTED doesn't arrive within a minute, we'll close the connection.
88 // 	 */
89 // 	private enum int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
90 
91 // 	private enum byte[] EMPTY_PAYLOAD = [];
92 
93 // 	// private static final ListenableFutureTask!(Void) EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
94 
95 // 	private __gshared Message!(byte[]) HEARTBEAT_MESSAGE;
96 
97 
98 // 	shared static this() {
99 // 		// EMPTY_TASK.run();
100 // 		StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
101 // 		HEARTBEAT_MESSAGE = MessageHelper.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
102 // 	}
103 
104 
105 // 	private string relayHost = "127.0.0.1";
106 
107 // 	private int relayPort = 61613;
108 
109 // 	private string clientLogin = "guest";
110 
111 // 	private string clientPasscode = "guest";
112 
113 // 	private string systemLogin = "guest";
114 
115 // 	private string systemPasscode = "guest";
116 
117 // 	private long systemHeartbeatSendInterval = 10000;
118 
119 // 	private long systemHeartbeatReceiveInterval = 10000;
120 
121 // 	private final Map!(string, MessageHandler) systemSubscriptions = new HashMap<>(4);
122 
123 	
124 // 	private string virtualHost;
125 
126 	
127 // 	private TcpOperations!(byte[]) tcpClient;
128 
129 	
130 // 	private MessageHeaderInitializer headerInitializer;
131 
132 // 	private final Stats stats = new Stats();
133 
134 // 	private final Map!(string, StompConnectionHandler) connectionHandlers = new ConcurrentHashMap<>();
135 
136 
137 // 	/**
138 // 	 * Create a StompBrokerRelayMessageHandler instance with the given message channels
139 // 	 * and destination prefixes.
140 // 	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
141 // 	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
142 // 	 * @param brokerChannel the channel for the application to send messages to the broker
143 // 	 * @param destinationPrefixes the broker supported destination prefixes; destinations
144 // 	 * that do not match the given prefix are ignored.
145 // 	 */
146 // 	StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
147 // 			SubscribableChannel brokerChannel, Collection!(string) destinationPrefixes) {
148 
149 // 		super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes);
150 // 	}
151 
152 
153 // 	/**
154 // 	 * Set the STOMP message broker host.
155 // 	 */
156 // 	void setRelayHost(string relayHost) {
157 // 		Assert.hasText(relayHost, "relayHost must not be empty");
158 // 		this.relayHost = relayHost;
159 // 	}
160 
161 // 	/**
162 // 	 * Return the STOMP message broker host.
163 // 	 */
164 // 	string getRelayHost() {
165 // 		return this.relayHost;
166 // 	}
167 
168 // 	/**
169 // 	 * Set the STOMP message broker port.
170 // 	 */
171 // 	void setRelayPort(int relayPort) {
172 // 		this.relayPort = relayPort;
173 // 	}
174 
175 // 	/**
176 // 	 * Return the STOMP message broker port.
177 // 	 */
178 // 	int getRelayPort() {
179 // 		return this.relayPort;
180 // 	}
181 // 	/**
182 // 	 * Set the login to use when creating connections to the STOMP broker on
183 // 	 * behalf of connected clients.
184 // 	 * <p>By default this is set to "guest".
185 // 	 * @see #setSystemLogin(string)
186 // 	 */
187 // 	void setClientLogin(string clientLogin) {
188 // 		Assert.hasText(clientLogin, "clientLogin must not be empty");
189 // 		this.clientLogin = clientLogin;
190 // 	}
191 
192 // 	/**
193 // 	 * Return the configured login to use for connections to the STOMP broker
194 // 	 * on behalf of connected clients.
195 // 	 * @see #getSystemLogin()
196 // 	 */
197 // 	string getClientLogin() {
198 // 		return this.clientLogin;
199 // 	}
200 
201 // 	/**
202 // 	 * Set the client passcode to use to create connections to the STOMP broker on
203 // 	 * behalf of connected clients.
204 // 	 * <p>By default this is set to "guest".
205 // 	 * @see #setSystemPasscode
206 // 	 */
207 // 	void setClientPasscode(string clientPasscode) {
208 // 		Assert.hasText(clientPasscode, "clientPasscode must not be empty");
209 // 		this.clientPasscode = clientPasscode;
210 // 	}
211 
212 // 	/**
213 // 	 * Return the configured passcode to use for connections to the STOMP broker on
214 // 	 * behalf of connected clients.
215 // 	 * @see #getSystemPasscode()
216 // 	 */
217 // 	string getClientPasscode() {
218 // 		return this.clientPasscode;
219 // 	}
220 
221 // 	/**
222 // 	 * Set the login for the shared "system" connection used to send messages to
223 // 	 * the STOMP broker from within the application, i.e. messages not associated
224 // 	 * with a specific client session (e.g. REST/HTTP request handling method).
225 // 	 * <p>By default this is set to "guest".
226 // 	 */
227 // 	void setSystemLogin(string systemLogin) {
228 // 		Assert.hasText(systemLogin, "systemLogin must not be empty");
229 // 		this.systemLogin = systemLogin;
230 // 	}
231 
232 // 	/**
233 // 	 * Return the login used for the shared "system" connection to the STOMP broker.
234 // 	 */
235 // 	string getSystemLogin() {
236 // 		return this.systemLogin;
237 // 	}
238 
239 // 	/**
240 // 	 * Set the passcode for the shared "system" connection used to send messages to
241 // 	 * the STOMP broker from within the application, i.e. messages not associated
242 // 	 * with a specific client session (e.g. REST/HTTP request handling method).
243 // 	 * <p>By default this is set to "guest".
244 // 	 */
245 // 	void setSystemPasscode(string systemPasscode) {
246 // 		this.systemPasscode = systemPasscode;
247 // 	}
248 
249 // 	/**
250 // 	 * Return the passcode used for the shared "system" connection to the STOMP broker.
251 // 	 */
252 // 	string getSystemPasscode() {
253 // 		return this.systemPasscode;
254 // 	}
255 
256 
257 // 	/**
258 // 	 * Set the interval, in milliseconds, at which the "system" connection will, in the
259 // 	 * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
260 // 	 * of zero will prevent heartbeats from being sent to the broker.
261 // 	 * <p>The default value is 10000.
262 // 	 * <p>See class-level documentation for more information on the "system" connection.
263 // 	 */
264 // 	void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
265 // 		this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
266 // 	}
267 
268 // 	/**
269 // 	 * Return the interval, in milliseconds, at which the "system" connection will
270 // 	 * send heartbeats to the STOMP broker.
271 // 	 */
272 // 	long getSystemHeartbeatSendInterval() {
273 // 		return this.systemHeartbeatSendInterval;
274 // 	}
275 
276 // 	/**
277 // 	 * Set the maximum interval, in milliseconds, at which the "system" connection
278 // 	 * expects, in the absence of any other data, to receive a heartbeat from the STOMP
279 // 	 * broker. A value of zero will configure the connection to expect not to receive
280 // 	 * heartbeats from the broker.
281 // 	 * <p>The default value is 10000.
282 // 	 * <p>See class-level documentation for more information on the "system" connection.
283 // 	 */
284 // 	void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
285 // 		this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
286 // 	}
287 
288 // 	/**
289 // 	 * Return the interval, in milliseconds, at which the "system" connection expects
290 // 	 * to receive heartbeats from the STOMP broker.
291 // 	 */
292 // 	long getSystemHeartbeatReceiveInterval() {
293 // 		return this.systemHeartbeatReceiveInterval;
294 // 	}
295 
296 // 	/**
297 // 	 * Configure one more destinations to subscribe to on the shared "system"
298 // 	 * connection along with MessageHandler's to handle received messages.
299 // 	 * <p>This is for internal use in a multi-application server scenario where
300 // 	 * servers forward messages to each other (e.g. unresolved user destinations).
301 // 	 * @param subscriptions the destinations to subscribe to.
302 // 	 */
303 // 	void setSystemSubscriptions(Map!(string, MessageHandler) subscriptions) {
304 // 		this.systemSubscriptions.clear();
305 // 		if (subscriptions !is null) {
306 // 			this.systemSubscriptions.putAll(subscriptions);
307 // 		}
308 // 	}
309 
310 // 	/**
311 // 	 * Return the configured map with subscriptions on the "system" connection.
312 // 	 */
313 // 	Map!(string, MessageHandler) getSystemSubscriptions() {
314 // 		return this.systemSubscriptions;
315 // 	}
316 
317 // 	/**
318 // 	 * Set the value of the "host" header to use in STOMP CONNECT frames. When this
319 // 	 * property is configured, a "host" header will be added to every STOMP frame sent to
320 // 	 * the STOMP broker. This may be useful for example in a cloud environment where the
321 // 	 * actual host to which the TCP connection is established is different from the host
322 // 	 * providing the cloud-based STOMP service.
323 // 	 * <p>By default this property is not set.
324 // 	 */
325 // 	void setVirtualHost(string virtualHost) {
326 // 		this.virtualHost = virtualHost;
327 // 	}
328 
329 // 	/**
330 // 	 * Return the configured virtual host value.
331 // 	 */
332 	
333 // 	string getVirtualHost() {
334 // 		return this.virtualHost;
335 // 	}
336 
337 // 	/**
338 // 	 * Configure a TCP client for managing TCP connections to the STOMP broker.
339 // 	 * <p>By default {@link ReactorNettyTcpClient} is used.
340 // 	 * <p><strong>Note:</strong> when this property is used, any
341 // 	 * {@link #setRelayHost(string) host} or {@link #setRelayPort(int) port}
342 // 	 * specified are effectively ignored.
343 // 	 */
344 // 	void setTcpClient(TcpOperations!(byte[]) tcpClient) {
345 // 		this.tcpClient = tcpClient;
346 // 	}
347 
348 // 	/**
349 // 	 * Get the configured TCP client (never {@code null} unless not configured
350 // 	 * invoked and this method is invoked before the handler is started and
351 // 	 * hence a default implementation initialized).
352 // 	 */
353 	
354 // 	TcpOperations!(byte[]) getTcpClient() {
355 // 		return this.tcpClient;
356 // 	}
357 
358 // 	/**
359 // 	 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
360 // 	 * messages created through the {@code StompBrokerRelayMessageHandler} that
361 // 	 * are sent to the client outbound message channel.
362 // 	 * <p>By default this property is not set.
363 // 	 */
364 // 	void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
365 // 		this.headerInitializer = headerInitializer;
366 // 	}
367 
368 // 	/**
369 // 	 * Return the configured header initializer.
370 // 	 */
371 	
372 // 	MessageHeaderInitializer getHeaderInitializer() {
373 // 		return this.headerInitializer;
374 // 	}
375 
376 // 	/**
377 // 	 * Return a string describing internal state and counters.
378 // 	 */
379 // 	string getStatsInfo() {
380 // 		return this.stats.toString();
381 // 	}
382 
383 // 	/**
384 // 	 * Return the current count of TCP connection to the broker.
385 // 	 */
386 // 	int getConnectionCount() {
387 // 		return this.connectionHandlers.size();
388 // 	}
389 
390 
391 // 	override
392 // 	protected void startInternal() {
393 // 		if (this.tcpClient is null) {
394 // 			this.tcpClient = initTcpClient();
395 // 		}
396 
397 // 		version(HUNT_DEBUG) {
398 // 			info("Starting \"system\" session, " ~ toString());
399 // 		}
400 
401 // 		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
402 // 		accessor.setAcceptVersion("1.1,1.2");
403 // 		accessor.setLogin(this.systemLogin);
404 // 		accessor.setPasscode(this.systemPasscode);
405 // 		accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
406 // 		string virtualHost = getVirtualHost();
407 // 		if (virtualHost !is null) {
408 // 			accessor.setHost(virtualHost);
409 // 		}
410 // 		accessor.setSessionId(SYSTEM_SESSION_ID);
411 // 		version(HUNT_DEBUG) {
412 // 			trace("Forwarding " ~ accessor.getShortLogMessage(EMPTY_PAYLOAD));
413 // 		}
414 
415 // 		SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
416 // 		this.connectionHandlers.put(handler.getSessionId(), handler);
417 
418 // 		this.stats.incrementConnectCount();
419 // 		this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
420 // 	}
421 
422 // 	private ReactorNettyTcpClient!(byte[]) initTcpClient() {
423 // 		StompDecoder decoder = new StompDecoder();
424 // 		if (this.headerInitializer !is null) {
425 // 			decoder.setHeaderInitializer(this.headerInitializer);
426 // 		}
427 // 		ReactorNettyCodec!(byte[]) codec = new StompReactorNettyCodec(decoder);
428 // 		ReactorNettyTcpClient!(byte[]) client = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec);
429 // 		client.setLogger(SimpLogging.forLog(client.getLogger()));
430 // 		return client;
431 // 	}
432 
433 // 	override
434 // 	protected void stopInternal() {
435 // 		publishBrokerUnavailableEvent();
436 // 		if (this.tcpClient !is null) {
437 // 			try {
438 // 				this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
439 // 			}
440 // 			catch (Throwable ex) {
441 // 				error("Error in shutdown of TCP client", ex);
442 // 			}
443 // 		}
444 // 	}
445 
446 // 	override
447 // 	protected void handleMessageInternal(MessageBase message) {
448 // 		string sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
449 
450 // 		if (!isBrokerAvailable()) {
451 // 			if (sessionId is null || SYSTEM_SESSION_ID.equals(sessionId)) {
452 // 				throw new MessageDeliveryException("Message broker not active. Consider subscribing to " ~
453 // 						"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
454 // 			}
455 // 			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
456 // 			if (handler !is null) {
457 // 				handler.sendStompErrorFrameToClient("Broker not available.");
458 // 				handler.clearConnection();
459 // 			}
460 // 			else {
461 // 				StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
462 // 				if (getHeaderInitializer() !is null) {
463 // 					getHeaderInitializer().initHeaders(accessor);
464 // 				}
465 // 				accessor.setSessionId(sessionId);
466 // 				Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
467 // 				if (user !is null) {
468 // 					accessor.setUser(user);
469 // 				}
470 // 				accessor.setMessage("Broker not available.");
471 // 				MessageHeaders headers = accessor.getMessageHeaders();
472 // 				getClientOutboundChannel().send(MessageHelper.createMessage(EMPTY_PAYLOAD, headers));
473 // 			}
474 // 			return;
475 // 		}
476 
477 // 		StompHeaderAccessor stompAccessor;
478 // 		StompCommand command;
479 
480 // 		MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
481 // 		if (accessor is null) {
482 // 			throw new IllegalStateException(
483 // 					"No header accessor (not using the SimpMessagingTemplate?): " ~ message);
484 // 		}
485 // 		else if (accessor instanceof StompHeaderAccessor) {
486 // 			stompAccessor = (StompHeaderAccessor) accessor;
487 // 			command = stompAccessor.getCommand();
488 // 		}
489 // 		else if (accessor instanceof SimpMessageHeaderAccessor) {
490 // 			stompAccessor = StompHeaderAccessor.wrap(message);
491 // 			command = stompAccessor.getCommand();
492 // 			if (command is null) {
493 // 				command = stompAccessor.updateStompCommandAsClientMessage();
494 // 			}
495 // 		}
496 // 		else {
497 // 			throw new IllegalStateException(
498 // 					"Unexpected header accessor type " ~ accessor.getClass() ~ " in " ~ message);
499 // 		}
500 
501 // 		if (sessionId is null) {
502 // 			if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
503 // 				version(HUNT_DEBUG) {
504 // 					error("Only STOMP SEND supported from within the server side. Ignoring " ~ message);
505 // 				}
506 // 				return;
507 // 			}
508 // 			sessionId = SYSTEM_SESSION_ID;
509 // 			stompAccessor.setSessionId(sessionId);
510 // 		}
511 
512 // 		string destination = stompAccessor.getDestination();
513 // 		if (command !is null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
514 // 			return;
515 // 		}
516 
517 // 		if (StompCommand.CONNECT.equals(command)) {
518 // 			version(HUNT_DEBUG) {
519 // 				trace(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
520 // 			}
521 // 			stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
522 // 			stompAccessor.setLogin(this.clientLogin);
523 // 			stompAccessor.setPasscode(this.clientPasscode);
524 // 			if (getVirtualHost() !is null) {
525 // 				stompAccessor.setHost(getVirtualHost());
526 // 			}
527 // 			StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
528 // 			this.connectionHandlers.put(sessionId, handler);
529 // 			this.stats.incrementConnectCount();
530 // 			assert(this.tcpClient !is null, "No TCP client available");
531 // 			this.tcpClient.connect(handler);
532 // 		}
533 // 		else if (StompCommand.DISCONNECT.equals(command)) {
534 // 			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
535 // 			if (handler is null) {
536 // 				version(HUNT_DEBUG) {
537 // 					trace("Ignoring DISCONNECT in session " ~ sessionId ~ ". Connection already cleaned up.");
538 // 				}
539 // 				return;
540 // 			}
541 // 			this.stats.incrementDisconnectCount();
542 // 			handler.forward(message, stompAccessor);
543 // 		}
544 // 		else {
545 // 			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
546 // 			if (handler is null) {
547 // 				version(HUNT_DEBUG) {
548 // 					trace("No TCP connection for session " ~ sessionId ~ " in " ~ message);
549 // 				}
550 // 				return;
551 // 			}
552 // 			handler.forward(message, stompAccessor);
553 // 		}
554 // 	}
555 
556 // 	override
557 // 	string toString() {
558 // 		return "StompBrokerRelay[" ~ getTcpClientInfo() ~ "]";
559 // 	}
560 
561 // 	private string getTcpClientInfo() {
562 // 		return this.tcpClient !is null ? this.tcpClient.toString() : this.relayHost ~ ":" ~ this.relayPort;
563 // 	}
564 
565 
566 // 	private class StompConnectionHandler implements TcpConnectionHandler!(byte[]) {
567 
568 // 		private final string sessionId;
569 
570 // 		private bool isRemoteClientSession;
571 
572 // 		private final StompHeaderAccessor connectHeaders;
573 
574 // 		private final MessageChannel outboundChannel;
575 
576 		
577 // 		private TcpConnection!(byte[]) tcpConnection;
578 
579 // 		private bool isStompConnected;
580 
581 
582 // 		protected StompConnectionHandler(string sessionId, StompHeaderAccessor connectHeaders) {
583 // 			this(sessionId, connectHeaders, true);
584 // 		}
585 
586 // 		private StompConnectionHandler(string sessionId, StompHeaderAccessor connectHeaders,  isClientSession) {
587 // 			assert(sessionId, "'sessionId' must not be null");
588 // 			assert(connectHeaders, "'connectHeaders' must not be null");
589 // 			this.sessionId = sessionId;
590 // 			this.connectHeaders = connectHeaders;
591 // 			this.isRemoteClientSession = isClientSession;
592 // 			this.outboundChannel = getClientOutboundChannelForSession(sessionId);
593 // 		}
594 
595 // 		string getSessionId() {
596 // 			return this.sessionId;
597 // 		}
598 
599 		
600 // 		protected TcpConnection!(byte[]) getTcpConnection() {
601 // 			return this.tcpConnection;
602 // 		}
603 
604 // 		override
605 // 		void afterConnected(TcpConnection!(byte[]) connection) {
606 // 			version(HUNT_DEBUG) {
607 // 				trace("TCP connection opened in session=" ~ getSessionId());
608 // 			}
609 // 			this.tcpConnection = connection;
610 // 			connection.onReadInactivity(() -> {
611 // 				if (this.tcpConnection !is null && !this.isStompConnected) {
612 // 					handleTcpConnectionFailure("No CONNECTED frame received in " ~
613 // 							MAX_TIME_TO_CONNECTED_FRAME ~ " ms.", null);
614 // 				}
615 // 			}, MAX_TIME_TO_CONNECTED_FRAME);
616 // 			connection.send(MessageHelper.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
617 // 		}
618 
619 // 		override
620 // 		void afterConnectFailure(Throwable ex) {
621 // 			handleTcpConnectionFailure("Failed to connect: " ~ ex.getMessage(), ex);
622 // 		}
623 
624 // 		/**
625 // 		 * Invoked when any TCP connectivity issue is detected, i.e. failure to establish
626 // 		 * the TCP connection, failure to send a message, missed heartbeat, etc.
627 // 		 */
628 // 		protected void handleTcpConnectionFailure(string error, Throwable ex) {
629 // 			version(HUNT_DEBUG) {
630 // 				info("TCP connection failure in session " ~ this.sessionId ~ ": " ~ error, ex);
631 // 			}
632 // 			try {
633 // 				sendStompErrorFrameToClient(error);
634 // 			}
635 // 			finally {
636 // 				try {
637 // 					clearConnection();
638 // 				}
639 // 				catch (Throwable ex2) {
640 // 					version(HUNT_DEBUG) {
641 // 						trace("Failure while clearing TCP connection state in session " ~ this.sessionId, ex2);
642 // 					}
643 // 				}
644 // 			}
645 // 		}
646 
647 // 		private void sendStompErrorFrameToClient(string errorText) {
648 // 			if (this.isRemoteClientSession) {
649 // 				StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
650 // 				if (getHeaderInitializer() !is null) {
651 // 					getHeaderInitializer().initHeaders(accessor);
652 // 				}
653 // 				accessor.setSessionId(this.sessionId);
654 // 				Principal user = this.connectHeaders.getUser();
655 // 				if (user !is null) {
656 // 					accessor.setUser(user);
657 // 				}
658 // 				accessor.setMessage(errorText);
659 // 				accessor.setLeaveMutable(true);
660 // 				MessageBase errorMessage = MessageHelper.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
661 // 				handleInboundMessage(errorMessage);
662 // 			}
663 // 		}
664 
665 // 		protected void handleInboundMessage(MessageBase message) {
666 // 			if (this.isRemoteClientSession) {
667 // 				this.outboundChannel.send(message);
668 // 			}
669 // 		}
670 
671 // 		override
672 // 		void handleMessage(Message!(byte[]) message) {
673 // 			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
674 // 			assert(accessor !is null, "No StompHeaderAccessor");
675 // 			accessor.setSessionId(this.sessionId);
676 // 			Principal user = this.connectHeaders.getUser();
677 // 			if (user !is null) {
678 // 				accessor.setUser(user);
679 // 			}
680 
681 // 			StompCommand command = accessor.getCommand();
682 // 			if (StompCommand.CONNECTED.equals(command)) {
683 // 				version(HUNT_DEBUG) {
684 // 					trace("Received " ~ accessor.getShortLogMessage(EMPTY_PAYLOAD));
685 // 				}
686 // 				afterStompConnected(accessor);
687 // 			}
688 // 			else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
689 // 				error("Received " ~ accessor.getShortLogMessage(message.getPayload()));
690 // 			}
691 // 			else version(HUNT_DEBUG) {
692 // 				trace("Received " ~ accessor.getDetailedLogMessage(message.getPayload()));
693 // 			}
694 
695 // 			handleInboundMessage(message);
696 // 		}
697 
698 // 		/**
699 // 		 * Invoked after the STOMP CONNECTED frame is received. At this point the
700 // 		 * connection is ready for sending STOMP messages to the broker.
701 // 		 */
702 // 		protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
703 // 			this.isStompConnected = true;
704 // 			stats.incrementConnectedCount();
705 // 			initHeartbeats(connectedHeaders);
706 // 		}
707 
708 // 		private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
709 // 			if (this.isRemoteClientSession) {
710 // 				return;
711 // 			}
712 
713 // 			TcpConnection!(byte[]) con = this.tcpConnection;
714 // 			assert(con !is null, "No TcpConnection available");
715 
716 // 			long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
717 // 			long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
718 // 			long serverSendInterval = connectedHeaders.getHeartbeat()[0];
719 // 			long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
720 
721 // 			if (clientSendInterval > 0 && serverReceiveInterval > 0) {
722 // 				long interval = Math.max(clientSendInterval, serverReceiveInterval);
723 // 				con.onWriteInactivity(() ->
724 // 						con.send(HEARTBEAT_MESSAGE).addCallback(
725 // 								result -> {},
726 // 								ex -> handleTcpConnectionFailure(
727 // 										"Failed to forward heartbeat: " ~ ex.getMessage(), ex)), interval);
728 // 			}
729 // 			if (clientReceiveInterval > 0 && serverSendInterval > 0) {
730 // 				final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
731 // 				con.onReadInactivity(
732 // 						() -> handleTcpConnectionFailure("No messages received in " ~ interval ~ " ms.", null), interval);
733 // 			}
734 // 		}
735 
736 // 		override
737 // 		void handleFailure(Throwable ex) {
738 // 			if (this.tcpConnection !is null) {
739 // 				handleTcpConnectionFailure("Transport failure: " ~ ex.getMessage(), ex);
740 // 			}
741 // 			else version(HUNT_DEBUG) {
742 // 				error("Transport failure: " ~ ex);
743 // 			}
744 // 		}
745 
746 // 		override
747 // 		void afterConnectionClosed() {
748 // 			if (this.tcpConnection is null) {
749 // 				return;
750 // 			}
751 // 			try {
752 // 				version(HUNT_DEBUG) {
753 // 					trace("TCP connection to broker closed in session " ~ this.sessionId);
754 // 				}
755 // 				sendStompErrorFrameToClient("Connection to broker closed.");
756 // 			}
757 // 			finally {
758 // 				try {
759 // 					// Prevent clearConnection() from trying to close
760 // 					this.tcpConnection = null;
761 // 					clearConnection();
762 // 				}
763 // 				catch (Throwable ex) {
764 // 					// Shouldn't happen with connection reset beforehand
765 // 				}
766 // 			}
767 // 		}
768 
769 // 		/**
770 // 		 * Forward the given message to the STOMP broker.
771 // 		 * <p>The method checks whether we have an active TCP connection and have
772 // 		 * received the STOMP CONNECTED frame. For client messages this should be
773 // 		 * false only if we lose the TCP connection around the same time when a
774 // 		 * client message is being forwarded, so we simply log the ignored message
775 // 		 * at debug level. For messages from within the application being sent on
776 // 		 * the "system" connection an exception is raised so that components sending
777 // 		 * the message have a chance to handle it -- by default the broker message
778 // 		 * channel is synchronous.
779 // 		 * <p>Note that if messages arrive concurrently around the same time a TCP
780 // 		 * connection is lost, there is a brief period of time before the connection
781 // 		 * is reset when one or more messages may sneak through and an attempt made
782 // 		 * to forward them. Rather than synchronizing to guard against that, this
783 // 		 * method simply lets them try and fail. For client sessions that may
784 // 		 * result in an additional STOMP ERROR frame(s) being sent downstream but
785 // 		 * code handling that downstream should be idempotent in such cases.
786 // 		 * @param message the message to send (never {@code null})
787 // 		 * @return a future to wait for the result
788 // 		 */
789 		
790 // 		ListenableFuture!(Void) forward(final MessageBase message, final StompHeaderAccessor accessor) {
791 // 			TcpConnection!(byte[]) conn = this.tcpConnection;
792 
793 // 			if (!this.isStompConnected || conn is null) {
794 // 				if (this.isRemoteClientSession) {
795 // 					version(HUNT_DEBUG) {
796 // 						trace("TCP connection closed already, ignoring " ~
797 // 								accessor.getShortLogMessage(message.getPayload()));
798 // 					}
799 // 					return EMPTY_TASK;
800 // 				}
801 // 				else {
802 // 					throw new IllegalStateException("Cannot forward messages " ~
803 // 							(conn !is null ? "before STOMP CONNECTED. " : "while inactive. ") +
804 // 							"Consider subscribing to receive BrokerAvailabilityEvent's from " ~
805 // 							"an ApplicationListener Spring bean. Dropped " ~
806 // 							accessor.getShortLogMessage(message.getPayload()));
807 // 				}
808 // 			}
809 
810 // 			final MessageBase messageToSend = (accessor.isMutable() && accessor.isModified()) ?
811 // 					MessageHelper.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
812 
813 // 			StompCommand command = accessor.getCommand();
814 // 			if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
815 // 					StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
816 // 				trace("Forwarding " ~ accessor.getShortLogMessage(message.getPayload()));
817 // 			}
818 // 			else version(HUNT_DEBUG) {
819 // 				trace("Forwarding " ~ accessor.getDetailedLogMessage(message.getPayload()));
820 // 			}
821 
822 // 			ListenableFuture!(Void) future = conn.send((Message!(byte[])) messageToSend);
823 // 			future.addCallback(new ListenableFutureCallback!(Void)() {
824 // 				override
825 // 				void onSuccess(Void result) {
826 // 					if (accessor.getCommand() == StompCommand.DISCONNECT) {
827 // 						afterDisconnectSent(accessor);
828 // 					}
829 // 				}
830 // 				override
831 // 				void onFailure(Throwable ex) {
832 // 					if (tcpConnection !is null) {
833 // 						handleTcpConnectionFailure("failed to forward " ~
834 // 								accessor.getShortLogMessage(message.getPayload()), ex);
835 // 					}
836 // 					else version(HUNT_DEBUG) {
837 // 						error("Failed to forward " ~ accessor.getShortLogMessage(message.getPayload()));
838 // 					}
839 // 				}
840 // 			});
841 // 			return future;
842 // 		}
843 
844 // 		/**
845 // 		 * After a DISCONNECT there should be no more client frames so we can
846 // 		 * close the connection pro-actively. However, if the DISCONNECT has a
847 // 		 * receipt header we leave the connection open and expect the server will
848 // 		 * respond with a RECEIPT and then close the connection.
849 // 		 * @see <a href="http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT">
850 // 		 *     STOMP Specification 1.2 DISCONNECT</a>
851 // 		 */
852 // 		private void afterDisconnectSent(StompHeaderAccessor accessor) {
853 // 			if (accessor.getReceipt() is null) {
854 // 				try {
855 // 					clearConnection();
856 // 				}
857 // 				catch (Throwable ex) {
858 // 					version(HUNT_DEBUG) {
859 // 						trace("Failure while clearing TCP connection state in session " ~ this.sessionId, ex);
860 // 					}
861 // 				}
862 // 			}
863 // 		}
864 
865 // 		/**
866 // 		 * Clean up state associated with the connection and close it.
867 // 		 * Any exception arising from closing the connection are propagated.
868 // 		 */
869 // 		void clearConnection() {
870 // 			version(HUNT_DEBUG) {
871 // 				trace("Cleaning up connection state for session " ~ this.sessionId);
872 // 			}
873 
874 // 			if (this.isRemoteClientSession) {
875 // 				StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
876 // 			}
877 
878 // 			this.isStompConnected = false;
879 
880 // 			TcpConnection!(byte[]) conn = this.tcpConnection;
881 // 			this.tcpConnection = null;
882 // 			if (conn !is null) {
883 // 				version(HUNT_DEBUG) {
884 // 					trace("Closing TCP connection in session " ~ this.sessionId);
885 // 				}
886 // 				conn.close();
887 // 			}
888 // 		}
889 
890 // 		override
891 // 		string toString() {
892 // 			return "StompConnectionHandler[sessionId=" ~ this.sessionId ~ "]";
893 // 		}
894 // 	}
895 
896 
897 // 	private class SystemStompConnectionHandler extends StompConnectionHandler {
898 
899 // 		SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
900 // 			super(SYSTEM_SESSION_ID, connectHeaders, false);
901 // 		}
902 
903 // 		override
904 // 		protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
905 // 			version(HUNT_DEBUG) {
906 // 				info("\"System\" session connected.");
907 // 			}
908 // 			super.afterStompConnected(connectedHeaders);
909 // 			publishBrokerAvailableEvent();
910 // 			sendSystemSubscriptions();
911 // 		}
912 
913 // 		private void sendSystemSubscriptions() {
914 // 			int i = 0;
915 // 			for (string destination : getSystemSubscriptions().keySet()) {
916 // 				StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
917 // 				accessor.setSubscriptionId(string.valueOf(i++));
918 // 				accessor.setDestination(destination);
919 // 				version(HUNT_DEBUG) {
920 // 					trace("Subscribing to " ~ destination ~ " on \"system\" connection.");
921 // 				}
922 // 				TcpConnection!(byte[]) conn = getTcpConnection();
923 // 				if (conn !is null) {
924 // 					MessageHeaders headers = accessor.getMessageHeaders();
925 // 					conn.send(MessageHelper.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
926 // 							result -> {},
927 // 							ex -> {
928 // 								string error = "Failed to subscribe in \"system\" session.";
929 // 								handleTcpConnectionFailure(error, ex);
930 // 							});
931 // 				}
932 // 			}
933 // 		}
934 
935 // 		override
936 // 		protected void handleInboundMessage(MessageBase message) {
937 // 			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
938 // 			if (accessor !is null && StompCommand.MESSAGE.equals(accessor.getCommand())) {
939 // 				string destination = accessor.getDestination();
940 // 				if (destination is null) {
941 // 					version(HUNT_DEBUG) {
942 // 						trace("Got message on \"system\" connection, with no destination: " ~
943 // 								accessor.getDetailedLogMessage(message.getPayload()));
944 // 					}
945 // 					return;
946 // 				}
947 // 				if (!getSystemSubscriptions().containsKey(destination)) {
948 // 					version(HUNT_DEBUG) {
949 // 						trace("Got message on \"system\" connection with no handler: " ~
950 // 								accessor.getDetailedLogMessage(message.getPayload()));
951 // 					}
952 // 					return;
953 // 				}
954 // 				try {
955 // 					MessageHandler handler = getSystemSubscriptions().get(destination);
956 // 					handler.handleMessage(message);
957 // 				}
958 // 				catch (Throwable ex) {
959 // 					version(HUNT_DEBUG) {
960 // 						trace("Error while handling message on \"system\" connection.", ex);
961 // 					}
962 // 				}
963 // 			}
964 // 		}
965 
966 // 		override
967 // 		protected void handleTcpConnectionFailure(string errorMessage, Throwable ex) {
968 // 			super.handleTcpConnectionFailure(errorMessage, ex);
969 // 			publishBrokerUnavailableEvent();
970 // 		}
971 
972 // 		override
973 // 		void afterConnectionClosed() {
974 // 			super.afterConnectionClosed();
975 // 			publishBrokerUnavailableEvent();
976 // 		}
977 
978 // 		override
979 // 		ListenableFuture!(Void) forward(MessageBase message, StompHeaderAccessor accessor) {
980 // 			try {
981 // 				ListenableFuture!(Void) future = super.forward(message, accessor);
982 // 				if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) is null) {
983 // 					future.get();
984 // 				}
985 // 				return future;
986 // 			}
987 // 			catch (Throwable ex) {
988 // 				throw new MessageDeliveryException(message, ex);
989 // 			}
990 // 		}
991 // 	}
992 
993 
994 // 	private static class VoidCallable implements Callable!(Void) {
995 
996 // 		override
997 // 		Void call() {
998 // 			return null;
999 // 		}
1000 // 	}
1001 
1002 
1003 // 	private class Stats {
1004 
1005 // 		private final AtomicInteger connect = new AtomicInteger();
1006 
1007 // 		private final AtomicInteger connected = new AtomicInteger();
1008 
1009 // 		private final AtomicInteger disconnect = new AtomicInteger();
1010 
1011 // 		void incrementConnectCount() {
1012 // 			this.connect.incrementAndGet();
1013 // 		}
1014 
1015 // 		void incrementConnectedCount() {
1016 // 			this.connected.incrementAndGet();
1017 // 		}
1018 
1019 // 		void incrementDisconnectCount() {
1020 // 			this.disconnect.incrementAndGet();
1021 // 		}
1022 
1023 // 		string toString() {
1024 // 			return (connectionHandlers.size() ~ " sessions, " ~ getTcpClientInfo() +
1025 // 					(isBrokerAvailable() ? " (available)" : " (not available)") +
1026 // 					", processed CONNECT(" ~ this.connect.get() ~ ")-CONNECTED(" ~
1027 // 					this.connected.get() ~ ")-DISCONNECT(" ~ this.disconnect.get() ~ ")");
1028 // 		}
1029 // 	}
1030 
1031 }