1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.stomp.DefaultStompSession;
18 
19 import hunt.stomp.simp.stomp.StompCommand;
20 import hunt.stomp.simp.stomp.StompDecoder;
21 import hunt.stomp.simp.stomp.StompHeaders;
22 import hunt.stomp.simp.stomp.StompSessionHandler;
23 
24 import hunt.stomp.exception;
25 import hunt.stomp.Message;
26 import hunt.stomp.MessagingException;
27 import hunt.stomp.converter.MessageConverter;
28 import hunt.stomp.converter.SimpleMessageConverter;
29 // 
30 import hunt.stomp.support.MessageBuilder;
31 import hunt.stomp.support.MessageHeaderAccessor;
32 // import hunt.stomp.tcp.TcpConnection;
33 import hunt.stomp.IdGenerator;
34 
35 // import java.lang.reflect.Type;
36 // import java.util.ArrayList;
37 // import hunt.collection.Collections;
38 // import java.util.Date;
39 // import java.util.List;
40 // import hunt.collection.Map;
41 // import java.util.concurrent.ConcurrentHashMap;
42 // import java.util.concurrent.ExecutionException;
43 // import java.util.concurrent.ScheduledFuture;
44 // import java.util.concurrent.TimeUnit;
45 // import java.util.concurrent.atomic.AtomicInteger;
46 
47 // import hunt.framework.task.TaskScheduler;
48 
49 import hunt.collection;
50 import hunt.util.DateTime;
51 import hunt.util.Common;
52 import hunt.Exceptions;
53 import hunt.Nullable;
54 import hunt.logging;
55 
56 import std.algorithm;
57 import std.datetime;
58 import std.uuid;
59 
60 // import hunt.framework.core.ResolvableType;
61 
62 // import hunt.framework.util.AlternativeJdkIdGenerator;
63 // import hunt.framework.util.IdGenerator;
64 // import hunt.framework.util.StringUtils;
65 // import hunt.framework.util.concurrent.ListenableFuture;
66 // import hunt.framework.util.concurrent.ListenableFutureCallback;
67 // import hunt.framework.util.concurrent.SettableListenableFuture;
68 
69 /**
70  * Default implementation of {@link ConnectionHandlingStompSession}.
71  *
72  * @author Rossen Stoyanchev
73  * @since 4.2
74  */
75 // class DefaultStompSession : ConnectionHandlingStompSession {
76 
77 // 	private __gshared IdGenerator idGenerator; //  = new AlternativeJdkIdGenerator();
78 
79 // 	/**
80 // 	 * An empty payload.
81 // 	 */
82 // 	enum byte[] EMPTY_PAYLOAD = [];
83 
84 // 	/* STOMP spec: receiver SHOULD take into account an error margin */
85 // 	private enum long HEARTBEAT_MULTIPLIER = 3;
86 
87 // 	private __gshared Message!(byte[]) HEARTBEAT;
88 
89 // 	shared static this() {
90 // 		idGenerator = new class IdGenerator {
91 // 			UUID generateId() {
92 // 				return randomUUID();
93 // 			}
94 // 		};
95 
96 // 		StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
97 // 		HEARTBEAT = MessageHelper.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
98 // 	}
99 
100 
101 // 	private string sessionId;
102 
103 // 	private StompSessionHandler sessionHandler;
104 
105 // 	private StompHeaders connectHeaders;
106 
107 // 	// private SettableListenableFuture!(StompSession) sessionFuture = new SettableListenableFuture<>();
108 // 	// private StompSession sessionFuture;
109 
110 // 	private MessageConverter converter;
111 	
112 // 	private TaskScheduler taskScheduler;
113 
114 // 	private long receiptTimeLimit; // = TimeUnit.SECONDS.toMillis(15);
115 
116 // 	private bool autoReceiptEnabled;
117 	
118 // 	private TcpConnection!(byte[]) connection;
119 
120 	
121 // 	private string ver;
122 
123 // 	private shared int subscriptionIndex; // = new AtomicInteger();
124 
125 // 	private Map!(string, DefaultSubscription) subscriptions; // = new ConcurrentHashMap<>(4);
126 
127 // 	private shared int receiptIndex; // = new AtomicInteger();
128 
129 // 	private Map!(string, ReceiptHandler) receiptHandlers; // = new ConcurrentHashMap<>(4);
130 
131 // 	/* Whether the client is willfully closing the connection */
132 // 	private bool closing = false;
133 
134 
135 // 	/**
136 // 	 * Create a new session.
137 // 	 * @param sessionHandler the application handler for the session
138 // 	 * @param connectHeaders headers for the STOMP CONNECT frame
139 // 	 */
140 // 	this(StompSessionHandler sessionHandler, StompHeaders connectHeaders) {
141 // 		assert(sessionHandler, "StompSessionHandler must not be null");
142 // 		assert(connectHeaders, "StompHeaders must not be null");
143 // 		subscriptions = new HashMap!(string, DefaultSubscription)(4);
144 // 		receiptHandlers = new HashMap!(string, ReceiptHandler)(4);
145 
146 // 		converter = new SimpleMessageConverter();
147 // 		receiptTimeLimit = convert!(TimeUnit.Second, TimeUnit.Millisecond)(15);
148 // 		this.sessionId = idGenerator.generateId().toString();
149 // 		this.sessionHandler = sessionHandler;
150 // 		this.connectHeaders = connectHeaders;
151 // 	}
152 
153 
154 // 	override
155 // 	string getSessionId() {
156 // 		return this.sessionId;
157 // 	}
158 
159 // 	/**
160 // 	 * Return the configured session handler.
161 // 	 */
162 // 	StompSessionHandler getSessionHandler() {
163 // 		return this.sessionHandler;
164 // 	}
165 
166 // 	// override
167 // 	// ListenableFuture!(StompSession) getSessionFuture() {
168 // 	// 	return this.sessionFuture;
169 // 	// }
170 
171 // 	/**
172 // 	 * Set the {@link MessageConverter} to use to convert the payload of incoming
173 // 	 * and outgoing messages to and from {@code byte[]} based on object type, or
174 // 	 * expected object type, and the "content-type" header.
175 // 	 * <p>By default, {@link SimpleMessageConverter} is configured.
176 // 	 * @param messageConverter the message converter to use
177 // 	 */
178 // 	void setMessageConverter(MessageConverter messageConverter) {
179 // 		assert(messageConverter, "MessageConverter must not be null");
180 // 		this.converter = messageConverter;
181 // 	}
182 
183 // 	/**
184 // 	 * Return the configured {@link MessageConverter}.
185 // 	 */
186 // 	MessageConverter getMessageConverter() {
187 // 		return this.converter;
188 // 	}
189 
190 // 	/**
191 // 	 * Configure the TaskScheduler to use for receipt tracking.
192 // 	 */
193 // 	void setTaskScheduler(TaskScheduler taskScheduler) {
194 // 		this.taskScheduler = taskScheduler;
195 // 	}
196 
197 // 	/**
198 // 	 * Return the configured TaskScheduler to use for receipt tracking.
199 // 	 */
200 	
201 // 	TaskScheduler getTaskScheduler() {
202 // 		return this.taskScheduler;
203 // 	}
204 
205 // 	/**
206 // 	 * Configure the time in milliseconds before a receipt expires.
207 // 	 * <p>By default set to 15,000 (15 seconds).
208 // 	 */
209 // 	void setReceiptTimeLimit(long receiptTimeLimit) {
210 // 		assert(receiptTimeLimit > 0, "Receipt time limit must be larger than zero");
211 // 		this.receiptTimeLimit = receiptTimeLimit;
212 // 	}
213 
214 // 	/**
215 // 	 * Return the configured time limit before a receipt expires.
216 // 	 */
217 // 	long getReceiptTimeLimit() {
218 // 		return this.receiptTimeLimit;
219 // 	}
220 
221 // 	override
222 // 	void setAutoReceipt(bool autoReceiptEnabled) {
223 // 		this.autoReceiptEnabled = autoReceiptEnabled;
224 // 	}
225 
226 // 	/**
227 // 	 * Whether receipt headers should be automatically added.
228 // 	 */
229 // 	bool isAutoReceiptEnabled() {
230 // 		return this.autoReceiptEnabled;
231 // 	}
232 
233 // 	override
234 // 	bool isConnected() {
235 // 		return (this.connection !is null);
236 // 	}
237 
238 // 	override
239 // 	Receiptable send(string destination, Object payload) {
240 // 		StompHeaders headers = new StompHeaders();
241 // 		headers.setDestination(destination);
242 // 		return send(headers, payload);
243 // 	}
244 
245 // 	override
246 // 	Receiptable send(StompHeaders headers, Object payload) {
247 // 		Assert.hasText(headers.getDestination(), "Destination header is required");
248 
249 // 		string receiptId = checkOrAddReceipt(headers);
250 // 		Receiptable receiptable = new ReceiptHandler(receiptId);
251 
252 // 		StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND);
253 // 		accessor.addNativeHeaders(headers);
254 // 		Message!(byte[]) message = createMessage(accessor, payload);
255 // 		execute(message);
256 
257 // 		return receiptable;
258 // 	}
259 
260 	
261 // 	private string checkOrAddReceipt(StompHeaders headers) {
262 // 		string receiptId = headers.getReceipt();
263 // 		if (isAutoReceiptEnabled() && receiptId is null) {
264 // 			receiptId = to!string(this.outer.receiptIndex++);
265 // 			headers.setReceipt(receiptId);
266 // 		}
267 // 		return receiptId;
268 // 	}
269 
270 // 	private StompHeaderAccessor createHeaderAccessor(StompCommand command) {
271 // 		StompHeaderAccessor accessor = StompHeaderAccessor.create(command);
272 // 		accessor.setSessionId(this.sessionId);
273 // 		accessor.setLeaveMutable(true);
274 // 		return accessor;
275 // 	}
276 
277 	
278 // 	private Message!(byte[]) createMessage(StompHeaderAccessor accessor, Object payload) {
279 // 		accessor.updateSimpMessageHeadersFromStompHeaders();
280 // 		Message!(byte[]) message;
281 // 		if (payload is null) {
282 // 			message = MessageHelper.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
283 // 		}
284 // 		else {
285 // 			auto pl = cast(Nullable!(byte[])) payload;
286 // 			if (pl !is null) {
287 // 				message = MessageHelper.createMessage(pl.value(), accessor.getMessageHeaders());
288 // 			}
289 // 			else {
290 // 				message = cast(Message!(byte[])) getMessageConverter().toMessage(payload, accessor.getMessageHeaders());
291 // 				accessor.updateStompHeadersFromSimpMessageHeaders();
292 // 				if (message is null) {
293 // 					throw new MessageConversionException("Unable to convert payload with type='" ~
294 // 							typeid(payload).name ~ "', contentType='" ~ accessor.getContentType() ~
295 // 							"', converter=[" ~ getMessageConverter() ~ "]");
296 // 				}
297 // 			}
298 // 		} 
299 // 		return message;
300 // 	}
301 
302 // 	private void execute(Message!(byte[]) message) {
303 // 		version(HUNT_DEBUG) {
304 // 			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor!StompHeaderAccessor(message);
305 // 			if (accessor !is null) {
306 // 				trace("Sending " ~ accessor.getDetailedLogMessage(message.getPayload()));
307 // 			}
308 // 		}
309 // 		TcpConnection!(byte[]) conn = this.connection;
310 // 		assert(conn !is null, "Connection closed");
311 // 		try {
312 // 			conn.send(message).get();
313 // 		}
314 // 		catch (ExecutionException ex) {
315 // 			throw new MessageDeliveryException(message, ex.getCause());
316 // 		}
317 // 		catch (Throwable ex) {
318 // 			throw new MessageDeliveryException(message, ex);
319 // 		}
320 // 	}
321 
322 // 	override
323 // 	Subscription subscribe(string destination, StompFrameHandler handler) {
324 // 		StompHeaders headers = new StompHeaders();
325 // 		headers.setDestination(destination);
326 // 		return subscribe(headers, handler);
327 // 	}
328 
329 // 	override
330 // 	Subscription subscribe(StompHeaders headers, StompFrameHandler handler) {
331 // 		Assert.hasText(headers.getDestination(), "Destination header is required");
332 // 		assert(handler, "StompFrameHandler must not be null");
333 
334 // 		string subscriptionId = headers.getId();
335 // 		if (!StringUtils.hasText(subscriptionId)) {
336 // 			subscriptionId = string.valueOf(this.outer.subscriptionIndex++);
337 // 			headers.setId(subscriptionId);
338 // 		}
339 // 		checkOrAddReceipt(headers);
340 // 		Subscription subscription = new DefaultSubscription(headers, handler);
341 
342 // 		StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
343 // 		accessor.addNativeHeaders(headers);
344 // 		Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD);
345 // 		execute(message);
346 
347 // 		return subscription;
348 // 	}
349 
350 // 	override
351 // 	Receiptable acknowledge(string messageId,  consumed) {
352 // 		StompHeaders headers = new StompHeaders();
353 // 		if ("1.1".equals(this.ver)) {
354 // 			headers.setMessageId(messageId);
355 // 		}
356 // 		else {
357 // 			headers.setId(messageId);
358 // 		}
359 // 		return acknowledge(headers, consumed);
360 // 	}
361 
362 // 	override
363 // 	Receiptable acknowledge(StompHeaders headers,  consumed) {
364 // 		string receiptId = checkOrAddReceipt(headers);
365 // 		Receiptable receiptable = new ReceiptHandler(receiptId);
366 
367 // 		StompCommand command = (consumed ? StompCommand.ACK : StompCommand.NACK);
368 // 		StompHeaderAccessor accessor = createHeaderAccessor(command);
369 // 		accessor.addNativeHeaders(headers);
370 // 		Message!(byte[]) message = createMessage(accessor, null);
371 // 		execute(message);
372 
373 // 		return receiptable;
374 // 	}
375 
376 // 	private void unsubscribe(string id, StompHeaders headers) {
377 // 		StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
378 // 		if (headers !is null) {
379 // 			accessor.addNativeHeaders(headers);
380 // 		}
381 // 		accessor.setSubscriptionId(id);
382 // 		Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD);
383 // 		execute(message);
384 // 	}
385 
386 // 	override
387 // 	void disconnect() {
388 // 		this.closing = true;
389 // 		try {
390 // 			StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.DISCONNECT);
391 // 			Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD);
392 // 			execute(message);
393 // 		}
394 // 		finally {
395 // 			resetConnection();
396 // 		}
397 // 	}
398 
399 
400 // 	// TcpConnectionHandler
401 
402 // 	override
403 // 	void afterConnected(TcpConnection!(byte[]) connection) {
404 // 		this.connection = connection;
405 // 		version(HUNT_DEBUG) {
406 // 			trace("Connection established in session id=" ~ this.sessionId);
407 // 		}
408 // 		StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
409 // 		accessor.addNativeHeaders(this.connectHeaders);
410 // 		if (this.connectHeaders.getAcceptVersion() is null) {
411 // 			accessor.setAcceptVersion("1.1,1.2");
412 // 		}
413 // 		Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD);
414 // 		execute(message);
415 // 	}
416 
417 // 	override
418 // 	void afterConnectFailure(Throwable ex) {
419 // 		version(HUNT_DEBUG) {
420 // 			trace("Failed to connect session id=" ~ this.sessionId, ex);
421 // 		}
422 // 		// this.sessionFuture.setException(ex);
423 // 		this.sessionHandler.handleTransportError(this, ex);
424 // 	}
425 
426 // 	override
427 // 	void handleMessage(Message!(byte[]) message) {
428 // 		StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor!(StompHeaderAccessor)(message);
429 // 		assert(accessor !is null, "No StompHeaderAccessor");
430 
431 // 		accessor.setSessionId(this.sessionId);
432 // 		StompCommand command = accessor.getCommand();
433 // 		Map!(string, List!(string)) nativeHeaders = accessor.getNativeHeaders();
434 // 		StompHeaders headers = StompHeaders.readOnlyStompHeaders(nativeHeaders);
435 // 		 isHeartbeat = accessor.isHeartbeat();
436 // 		version(HUNT_DEBUG) {
437 // 			trace("Received " ~ accessor.getDetailedLogMessage(message.getPayload()));
438 // 		}
439 
440 // 		try {
441 // 			if (StompCommand.MESSAGE == command) {
442 // 				DefaultSubscription subscription = this.subscriptions.get(headers.getSubscription());
443 // 				if (subscription !is null) {
444 // 					invokeHandler(subscription.getHandler(), message, headers);
445 // 				}
446 // 				else version(HUNT_DEBUG) {
447 // 					trace("No handler for: " ~ accessor.getDetailedLogMessage(message.getPayload()) ~
448 // 							". Perhaps just unsubscribed?");
449 // 				}
450 // 			}
451 // 			else {
452 // 				if (StompCommand.RECEIPT == command) {
453 // 					string receiptId = headers.getReceiptId();
454 // 					ReceiptHandler handler = this.receiptHandlers.get(receiptId);
455 // 					if (handler !is null) {
456 // 						handler.handleReceiptReceived();
457 // 					}
458 // 					else version(HUNT_DEBUG) {
459 // 						trace("No matching receipt: " ~ accessor.getDetailedLogMessage(message.getPayload()));
460 // 					}
461 // 				}
462 // 				else if (StompCommand.CONNECTED == command) {
463 // 					initHeartbeatTasks(headers);
464 // 					this.ver = headers.getFirst("version");
465 // 					// this.sessionFuture.set(this);
466 // 					this.sessionHandler.afterConnected(this, headers);
467 // 				}
468 // 				else if (StompCommand.ERROR == command) {
469 // 					invokeHandler(this.sessionHandler, message, headers);
470 // 				}
471 // 				else if (!isHeartbeat && logger.isTraceEnabled()) {
472 // 					trace("Message not handled.");
473 // 				}
474 // 			}
475 // 		}
476 // 		catch (Throwable ex) {
477 // 			this.sessionHandler.handleException(this, command, headers, message.getPayload(), ex);
478 // 		}
479 // 	}
480 
481 // 	private void invokeHandler(StompFrameHandler handler, 
482 // 		Message!(byte[]) message, StompHeaders headers) {
483 // 		if (message.getPayload().length == 0) {
484 // 			handler.handleFrame(headers, null);
485 // 			return;
486 // 		}
487 // 		// Type payloadType = handler.getPayloadType(headers);
488 // 		// Class<?> resolvedType = ResolvableType.forType(payloadType).resolve();
489 // 		// if (resolvedType is null) {
490 // 		// 	throw new MessageConversionException("Unresolvable payload type [" ~ payloadType +
491 // 		// 			"] from handler type [" ~ handler.getClass() ~ "]");
492 // 		// }
493 // 		// Object object = getMessageConverter().fromMessage(message, resolvedType);
494 // 		// if (object is null) {
495 // 		// 	throw new MessageConversionException("No suitable converter for payload type [" ~ payloadType +
496 // 		// 			"] from handler type [" ~ handler.getClass() ~ "]");
497 // 		// }
498 // 		// handler.handleFrame(headers, object);
499 		
500 // 	}
501 
502 // 	private void initHeartbeatTasks(StompHeaders connectedHeaders) {
503 // 		long[] connect = this.connectHeaders.getHeartbeat();
504 // 		long[] connected = connectedHeaders.getHeartbeat();
505 // 		if (connect is null || connected is null) {
506 // 			return;
507 // 		}
508 // 		// TODO: Tasks pending completion -@zxp at 10/30/2018, 3:10:33 PM
509 // 		// 
510 // 		// TcpConnection!(byte[]) con = this.connection;
511 // 		// assert(con !is null, "No TcpConnection available");
512 // 		// if (connect[0] > 0 && connected[1] > 0) {
513 // 		// 	long interval = max(connect[0],  connected[1]);
514 // 		// 	con.onWriteInactivity(new WriteInactivityTask(), interval);
515 // 		// }
516 // 		// if (connect[1] > 0 && connected[0] > 0) {
517 // 		// 	long interval = max(connect[1], connected[0]) * HEARTBEAT_MULTIPLIER;
518 // 		// 	con.onReadInactivity(new ReadInactivityTask(), interval);
519 // 		// }
520 // 	}
521 
522 // 	override
523 // 	void handleFailure(Throwable ex) {
524 // 		try {
525 // 			// this.sessionFuture.setException(ex);  // no-op if already set
526 // 			this.sessionHandler.handleTransportError(this, ex);
527 // 		}
528 // 		catch (Throwable ex2) {
529 // 			version(HUNT_DEBUG) {
530 // 				trace("Uncaught failure while handling transport failure", ex2);
531 // 			}
532 // 		}
533 // 	}
534 
535 // 	override
536 // 	void afterConnectionClosed() {
537 // 		version(HUNT_DEBUG) {
538 // 			trace("Connection closed in session id=" ~ this.sessionId);
539 // 		}
540 // 		if (!this.closing) {
541 // 			resetConnection();
542 // 			handleFailure(new ConnectionLostException("Connection closed"));
543 // 		}
544 // 	}
545 
546 // 	private void resetConnection() {
547 // 		implementationMissing(false);
548 // 		// TcpConnection<?> conn = this.connection;
549 // 		// this.connection = null;
550 // 		// if (conn !is null) {
551 // 		// 	try {
552 // 		// 		conn.close();
553 // 		// 	}
554 // 		// 	catch (Throwable ex) {
555 // 		// 		// ignore
556 // 		// 	}
557 // 		// }
558 // 	}
559 
560 
561 // 	private class ReceiptHandler : Receiptable {
562 
563 		
564 // 		private string receiptId;
565 
566 // 		private Runnable[] receiptCallbacks;
567 
568 // 		private Runnable[] receiptLostCallbacks;
569 		
570 // 		// private ScheduledFuture<?> future;
571 		
572 // 		private bool result;
573 
574 // 		this(string receiptId) {
575 // 			this.receiptId = receiptId;
576 // 			if (receiptId !is null) {
577 // 				initReceiptHandling();
578 // 			}
579 // 		}
580 
581 // 		private void initReceiptHandling() {
582 // 			assert(getTaskScheduler(), "To track receipts, a TaskScheduler must be configured");
583 // 			this.outer.receiptHandlers.put(this.receiptId, this);
584 // 			Date startTime = new Date(DateTimeHelper.currentTimeMillis + getReceiptTimeLimit());
585 // 			// this.future = getTaskScheduler().schedule(this::handleReceiptNotReceived, startTime);
586 // 		}
587 
588 // 		override
589 // 		string getReceiptId() {
590 // 			return this.receiptId;
591 // 		}
592 
593 // 		override
594 // 		void addReceiptTask(Runnable task) {
595 // 			addTask(task, true);
596 // 		}
597 
598 // 		override
599 // 		void addReceiptLostTask(Runnable task) {
600 // 			addTask(task, false);
601 // 		}
602 
603 // 		private void addTask(Runnable task,  successTask) {
604 // 			assert(this.receiptId,
605 // 					"To track receipts, set autoReceiptEnabled=true or add 'receiptId' header");
606 // 			synchronized (this) {
607 // 				if (this.result !is null && this.result == successTask) {
608 // 					invoke([task]);
609 // 				}
610 // 				else {
611 // 					if (successTask) {
612 // 						this.receiptCallbacks ~= task;
613 // 					}
614 // 					else {
615 // 						this.receiptLostCallbacks ~= task;
616 // 					}
617 // 				}
618 // 			}
619 // 		}
620 
621 // 		private void invoke(Runnable[] callbacks) {
622 // 			foreach (Runnable runnable ; callbacks) {
623 // 				try {
624 // 					runnable.run();
625 // 				}
626 // 				catch (Throwable ex) {
627 // 					// ignore
628 // 				}
629 // 			}
630 // 		}
631 
632 // 		void handleReceiptReceived() {
633 // 			handleInternal(true);
634 // 		}
635 
636 // 		void handleReceiptNotReceived() {
637 // 			handleInternal(false);
638 // 		}
639 
640 // 		private void handleInternal(bool result) {
641 // 			synchronized (this) {
642 // 				if (this.result !is null) {
643 // 					return;
644 // 				}
645 // 				this.result = result;
646 // 				invoke(result ? this.receiptCallbacks : this.receiptLostCallbacks);
647 // 				this.outer.receiptHandlers.remove(this.receiptId);
648 // 				if (this.future !is null) {
649 // 					this.future.cancel(true);
650 // 				}
651 // 			}
652 // 		}
653 // 	}
654 
655 
656 // 	private class DefaultSubscription : ReceiptHandler, Subscription {
657 
658 // 		private StompHeaders headers;
659 
660 // 		private StompFrameHandler handler;
661 
662 // 		this(StompHeaders headers, StompFrameHandler handler) {
663 // 			super(headers.getReceipt());
664 // 			assert(headers.getDestination(), "Destination must not be null");
665 // 			assert(handler, "StompFrameHandler must not be null");
666 // 			this.headers = headers;
667 // 			this.handler = handler;
668 // 			this.outer.subscriptions.put(headers.getId(), this);
669 // 		}
670 
671 // 		override
672 // 		string getSubscriptionId() {
673 // 			return this.headers.getId();
674 // 		}
675 
676 // 		override
677 // 		StompHeaders getSubscriptionHeaders() {
678 // 			return this.headers;
679 // 		}
680 
681 // 		StompFrameHandler getHandler() {
682 // 			return this.handler;
683 // 		}
684 
685 // 		override
686 // 		void unsubscribe() {
687 // 			unsubscribe(null);
688 // 		}
689 
690 // 		override
691 // 		void unsubscribe(StompHeaders headers) {
692 // 			string id = this.headers.getId();
693 // 			if (id !is null) {
694 // 				this.outer.subscriptions.remove(id);
695 // 				this.outer.unsubscribe(id, headers);
696 // 			}
697 // 		}
698 
699 // 		override
700 // 		string toString() {
701 // 			return "Subscription [id=" ~ getSubscriptionId() +
702 // 					", destination='" ~ this.headers.getDestination() +
703 // 					"', receiptId='" ~ getReceiptId() ~ "', handler=" ~ getHandler() ~ "]";
704 // 		}
705 // 	}
706 
707 
708 // 	private class WriteInactivityTask : Runnable {
709 
710 // 		override
711 // 		void run() {
712 // 			implementationMissing(false);
713 // 			// TcpConnection!(byte[]) conn = connection;
714 // 			// if (conn !is null) {
715 // 			// 	conn.send(HEARTBEAT).addCallback(
716 // 			// 			new ListenableFutureCallback!(Void)() {
717 // 			// 				void onSuccess(Void result) {
718 // 			// 				}
719 // 			// 				void onFailure(Throwable ex) {
720 // 			// 					handleFailure(ex);
721 // 			// 				}
722 // 			// 			});
723 // 			// }
724 // 		}
725 // 	}
726 
727 
728 // 	private class ReadInactivityTask : Runnable {
729 
730 // 		override
731 // 		void run() {
732 // 			closing = true;
733 // 			string error = "Server has gone quiet. Closing connection in session id=" ~ sessionId ~ ".";
734 // 			version(HUNT_DEBUG) {
735 // 				trace(error);
736 // 			}
737 // 			resetConnection();
738 // 			handleFailure(new IllegalStateException(error));
739 // 		}
740 // 	}
741 
742 // }