1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.stomp.DefaultStompSession; 18 19 import hunt.stomp.simp.stomp.StompCommand; 20 import hunt.stomp.simp.stomp.StompDecoder; 21 import hunt.stomp.simp.stomp.StompHeaders; 22 import hunt.stomp.simp.stomp.StompSessionHandler; 23 24 import hunt.stomp.exception; 25 import hunt.stomp.Message; 26 import hunt.stomp.MessagingException; 27 import hunt.stomp.converter.MessageConverter; 28 import hunt.stomp.converter.SimpleMessageConverter; 29 // 30 import hunt.stomp.support.MessageBuilder; 31 import hunt.stomp.support.MessageHeaderAccessor; 32 // import hunt.stomp.tcp.TcpConnection; 33 import hunt.stomp.IdGenerator; 34 35 // import java.lang.reflect.Type; 36 // import java.util.ArrayList; 37 // import hunt.collection.Collections; 38 // import java.util.Date; 39 // import java.util.List; 40 // import hunt.collection.Map; 41 // import java.util.concurrent.ConcurrentHashMap; 42 // import java.util.concurrent.ExecutionException; 43 // import java.util.concurrent.ScheduledFuture; 44 // import java.util.concurrent.TimeUnit; 45 // import java.util.concurrent.atomic.AtomicInteger; 46 47 // import hunt.framework.task.TaskScheduler; 48 49 import hunt.collection; 50 import hunt.util.DateTime; 51 import hunt.util.Common; 52 import hunt.Exceptions; 53 import hunt.Nullable; 54 import hunt.logging; 55 56 import std.algorithm; 57 import std.datetime; 58 import std.uuid; 59 60 // import hunt.framework.core.ResolvableType; 61 62 // import hunt.framework.util.AlternativeJdkIdGenerator; 63 // import hunt.framework.util.IdGenerator; 64 // import hunt.framework.util.StringUtils; 65 // import hunt.framework.util.concurrent.ListenableFuture; 66 // import hunt.framework.util.concurrent.ListenableFutureCallback; 67 // import hunt.framework.util.concurrent.SettableListenableFuture; 68 69 /** 70 * Default implementation of {@link ConnectionHandlingStompSession}. 71 * 72 * @author Rossen Stoyanchev 73 * @since 4.2 74 */ 75 // class DefaultStompSession : ConnectionHandlingStompSession { 76 77 // private __gshared IdGenerator idGenerator; // = new AlternativeJdkIdGenerator(); 78 79 // /** 80 // * An empty payload. 81 // */ 82 // enum byte[] EMPTY_PAYLOAD = []; 83 84 // /* STOMP spec: receiver SHOULD take into account an error margin */ 85 // private enum long HEARTBEAT_MULTIPLIER = 3; 86 87 // private __gshared Message!(byte[]) HEARTBEAT; 88 89 // shared static this() { 90 // idGenerator = new class IdGenerator { 91 // UUID generateId() { 92 // return randomUUID(); 93 // } 94 // }; 95 96 // StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat(); 97 // HEARTBEAT = MessageHelper.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders()); 98 // } 99 100 101 // private string sessionId; 102 103 // private StompSessionHandler sessionHandler; 104 105 // private StompHeaders connectHeaders; 106 107 // // private SettableListenableFuture!(StompSession) sessionFuture = new SettableListenableFuture<>(); 108 // // private StompSession sessionFuture; 109 110 // private MessageConverter converter; 111 112 // private TaskScheduler taskScheduler; 113 114 // private long receiptTimeLimit; // = TimeUnit.SECONDS.toMillis(15); 115 116 // private bool autoReceiptEnabled; 117 118 // private TcpConnection!(byte[]) connection; 119 120 121 // private string ver; 122 123 // private shared int subscriptionIndex; // = new AtomicInteger(); 124 125 // private Map!(string, DefaultSubscription) subscriptions; // = new ConcurrentHashMap<>(4); 126 127 // private shared int receiptIndex; // = new AtomicInteger(); 128 129 // private Map!(string, ReceiptHandler) receiptHandlers; // = new ConcurrentHashMap<>(4); 130 131 // /* Whether the client is willfully closing the connection */ 132 // private bool closing = false; 133 134 135 // /** 136 // * Create a new session. 137 // * @param sessionHandler the application handler for the session 138 // * @param connectHeaders headers for the STOMP CONNECT frame 139 // */ 140 // this(StompSessionHandler sessionHandler, StompHeaders connectHeaders) { 141 // assert(sessionHandler, "StompSessionHandler must not be null"); 142 // assert(connectHeaders, "StompHeaders must not be null"); 143 // subscriptions = new HashMap!(string, DefaultSubscription)(4); 144 // receiptHandlers = new HashMap!(string, ReceiptHandler)(4); 145 146 // converter = new SimpleMessageConverter(); 147 // receiptTimeLimit = convert!(TimeUnit.Second, TimeUnit.Millisecond)(15); 148 // this.sessionId = idGenerator.generateId().toString(); 149 // this.sessionHandler = sessionHandler; 150 // this.connectHeaders = connectHeaders; 151 // } 152 153 154 // override 155 // string getSessionId() { 156 // return this.sessionId; 157 // } 158 159 // /** 160 // * Return the configured session handler. 161 // */ 162 // StompSessionHandler getSessionHandler() { 163 // return this.sessionHandler; 164 // } 165 166 // // override 167 // // ListenableFuture!(StompSession) getSessionFuture() { 168 // // return this.sessionFuture; 169 // // } 170 171 // /** 172 // * Set the {@link MessageConverter} to use to convert the payload of incoming 173 // * and outgoing messages to and from {@code byte[]} based on object type, or 174 // * expected object type, and the "content-type" header. 175 // * <p>By default, {@link SimpleMessageConverter} is configured. 176 // * @param messageConverter the message converter to use 177 // */ 178 // void setMessageConverter(MessageConverter messageConverter) { 179 // assert(messageConverter, "MessageConverter must not be null"); 180 // this.converter = messageConverter; 181 // } 182 183 // /** 184 // * Return the configured {@link MessageConverter}. 185 // */ 186 // MessageConverter getMessageConverter() { 187 // return this.converter; 188 // } 189 190 // /** 191 // * Configure the TaskScheduler to use for receipt tracking. 192 // */ 193 // void setTaskScheduler(TaskScheduler taskScheduler) { 194 // this.taskScheduler = taskScheduler; 195 // } 196 197 // /** 198 // * Return the configured TaskScheduler to use for receipt tracking. 199 // */ 200 201 // TaskScheduler getTaskScheduler() { 202 // return this.taskScheduler; 203 // } 204 205 // /** 206 // * Configure the time in milliseconds before a receipt expires. 207 // * <p>By default set to 15,000 (15 seconds). 208 // */ 209 // void setReceiptTimeLimit(long receiptTimeLimit) { 210 // assert(receiptTimeLimit > 0, "Receipt time limit must be larger than zero"); 211 // this.receiptTimeLimit = receiptTimeLimit; 212 // } 213 214 // /** 215 // * Return the configured time limit before a receipt expires. 216 // */ 217 // long getReceiptTimeLimit() { 218 // return this.receiptTimeLimit; 219 // } 220 221 // override 222 // void setAutoReceipt(bool autoReceiptEnabled) { 223 // this.autoReceiptEnabled = autoReceiptEnabled; 224 // } 225 226 // /** 227 // * Whether receipt headers should be automatically added. 228 // */ 229 // bool isAutoReceiptEnabled() { 230 // return this.autoReceiptEnabled; 231 // } 232 233 // override 234 // bool isConnected() { 235 // return (this.connection !is null); 236 // } 237 238 // override 239 // Receiptable send(string destination, Object payload) { 240 // StompHeaders headers = new StompHeaders(); 241 // headers.setDestination(destination); 242 // return send(headers, payload); 243 // } 244 245 // override 246 // Receiptable send(StompHeaders headers, Object payload) { 247 // Assert.hasText(headers.getDestination(), "Destination header is required"); 248 249 // string receiptId = checkOrAddReceipt(headers); 250 // Receiptable receiptable = new ReceiptHandler(receiptId); 251 252 // StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND); 253 // accessor.addNativeHeaders(headers); 254 // Message!(byte[]) message = createMessage(accessor, payload); 255 // execute(message); 256 257 // return receiptable; 258 // } 259 260 261 // private string checkOrAddReceipt(StompHeaders headers) { 262 // string receiptId = headers.getReceipt(); 263 // if (isAutoReceiptEnabled() && receiptId is null) { 264 // receiptId = to!string(this.outer.receiptIndex++); 265 // headers.setReceipt(receiptId); 266 // } 267 // return receiptId; 268 // } 269 270 // private StompHeaderAccessor createHeaderAccessor(StompCommand command) { 271 // StompHeaderAccessor accessor = StompHeaderAccessor.create(command); 272 // accessor.setSessionId(this.sessionId); 273 // accessor.setLeaveMutable(true); 274 // return accessor; 275 // } 276 277 278 // private Message!(byte[]) createMessage(StompHeaderAccessor accessor, Object payload) { 279 // accessor.updateSimpMessageHeadersFromStompHeaders(); 280 // Message!(byte[]) message; 281 // if (payload is null) { 282 // message = MessageHelper.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders()); 283 // } 284 // else { 285 // auto pl = cast(Nullable!(byte[])) payload; 286 // if (pl !is null) { 287 // message = MessageHelper.createMessage(pl.value(), accessor.getMessageHeaders()); 288 // } 289 // else { 290 // message = cast(Message!(byte[])) getMessageConverter().toMessage(payload, accessor.getMessageHeaders()); 291 // accessor.updateStompHeadersFromSimpMessageHeaders(); 292 // if (message is null) { 293 // throw new MessageConversionException("Unable to convert payload with type='" ~ 294 // typeid(payload).name ~ "', contentType='" ~ accessor.getContentType() ~ 295 // "', converter=[" ~ getMessageConverter() ~ "]"); 296 // } 297 // } 298 // } 299 // return message; 300 // } 301 302 // private void execute(Message!(byte[]) message) { 303 // version(HUNT_DEBUG) { 304 // StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor!StompHeaderAccessor(message); 305 // if (accessor !is null) { 306 // trace("Sending " ~ accessor.getDetailedLogMessage(message.getPayload())); 307 // } 308 // } 309 // TcpConnection!(byte[]) conn = this.connection; 310 // assert(conn !is null, "Connection closed"); 311 // try { 312 // conn.send(message).get(); 313 // } 314 // catch (ExecutionException ex) { 315 // throw new MessageDeliveryException(message, ex.getCause()); 316 // } 317 // catch (Throwable ex) { 318 // throw new MessageDeliveryException(message, ex); 319 // } 320 // } 321 322 // override 323 // Subscription subscribe(string destination, StompFrameHandler handler) { 324 // StompHeaders headers = new StompHeaders(); 325 // headers.setDestination(destination); 326 // return subscribe(headers, handler); 327 // } 328 329 // override 330 // Subscription subscribe(StompHeaders headers, StompFrameHandler handler) { 331 // Assert.hasText(headers.getDestination(), "Destination header is required"); 332 // assert(handler, "StompFrameHandler must not be null"); 333 334 // string subscriptionId = headers.getId(); 335 // if (!StringUtils.hasText(subscriptionId)) { 336 // subscriptionId = string.valueOf(this.outer.subscriptionIndex++); 337 // headers.setId(subscriptionId); 338 // } 339 // checkOrAddReceipt(headers); 340 // Subscription subscription = new DefaultSubscription(headers, handler); 341 342 // StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE); 343 // accessor.addNativeHeaders(headers); 344 // Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD); 345 // execute(message); 346 347 // return subscription; 348 // } 349 350 // override 351 // Receiptable acknowledge(string messageId, consumed) { 352 // StompHeaders headers = new StompHeaders(); 353 // if ("1.1".equals(this.ver)) { 354 // headers.setMessageId(messageId); 355 // } 356 // else { 357 // headers.setId(messageId); 358 // } 359 // return acknowledge(headers, consumed); 360 // } 361 362 // override 363 // Receiptable acknowledge(StompHeaders headers, consumed) { 364 // string receiptId = checkOrAddReceipt(headers); 365 // Receiptable receiptable = new ReceiptHandler(receiptId); 366 367 // StompCommand command = (consumed ? StompCommand.ACK : StompCommand.NACK); 368 // StompHeaderAccessor accessor = createHeaderAccessor(command); 369 // accessor.addNativeHeaders(headers); 370 // Message!(byte[]) message = createMessage(accessor, null); 371 // execute(message); 372 373 // return receiptable; 374 // } 375 376 // private void unsubscribe(string id, StompHeaders headers) { 377 // StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE); 378 // if (headers !is null) { 379 // accessor.addNativeHeaders(headers); 380 // } 381 // accessor.setSubscriptionId(id); 382 // Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD); 383 // execute(message); 384 // } 385 386 // override 387 // void disconnect() { 388 // this.closing = true; 389 // try { 390 // StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.DISCONNECT); 391 // Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD); 392 // execute(message); 393 // } 394 // finally { 395 // resetConnection(); 396 // } 397 // } 398 399 400 // // TcpConnectionHandler 401 402 // override 403 // void afterConnected(TcpConnection!(byte[]) connection) { 404 // this.connection = connection; 405 // version(HUNT_DEBUG) { 406 // trace("Connection established in session id=" ~ this.sessionId); 407 // } 408 // StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT); 409 // accessor.addNativeHeaders(this.connectHeaders); 410 // if (this.connectHeaders.getAcceptVersion() is null) { 411 // accessor.setAcceptVersion("1.1,1.2"); 412 // } 413 // Message!(byte[]) message = createMessage(accessor, EMPTY_PAYLOAD); 414 // execute(message); 415 // } 416 417 // override 418 // void afterConnectFailure(Throwable ex) { 419 // version(HUNT_DEBUG) { 420 // trace("Failed to connect session id=" ~ this.sessionId, ex); 421 // } 422 // // this.sessionFuture.setException(ex); 423 // this.sessionHandler.handleTransportError(this, ex); 424 // } 425 426 // override 427 // void handleMessage(Message!(byte[]) message) { 428 // StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor!(StompHeaderAccessor)(message); 429 // assert(accessor !is null, "No StompHeaderAccessor"); 430 431 // accessor.setSessionId(this.sessionId); 432 // StompCommand command = accessor.getCommand(); 433 // Map!(string, List!(string)) nativeHeaders = accessor.getNativeHeaders(); 434 // StompHeaders headers = StompHeaders.readOnlyStompHeaders(nativeHeaders); 435 // isHeartbeat = accessor.isHeartbeat(); 436 // version(HUNT_DEBUG) { 437 // trace("Received " ~ accessor.getDetailedLogMessage(message.getPayload())); 438 // } 439 440 // try { 441 // if (StompCommand.MESSAGE == command) { 442 // DefaultSubscription subscription = this.subscriptions.get(headers.getSubscription()); 443 // if (subscription !is null) { 444 // invokeHandler(subscription.getHandler(), message, headers); 445 // } 446 // else version(HUNT_DEBUG) { 447 // trace("No handler for: " ~ accessor.getDetailedLogMessage(message.getPayload()) ~ 448 // ". Perhaps just unsubscribed?"); 449 // } 450 // } 451 // else { 452 // if (StompCommand.RECEIPT == command) { 453 // string receiptId = headers.getReceiptId(); 454 // ReceiptHandler handler = this.receiptHandlers.get(receiptId); 455 // if (handler !is null) { 456 // handler.handleReceiptReceived(); 457 // } 458 // else version(HUNT_DEBUG) { 459 // trace("No matching receipt: " ~ accessor.getDetailedLogMessage(message.getPayload())); 460 // } 461 // } 462 // else if (StompCommand.CONNECTED == command) { 463 // initHeartbeatTasks(headers); 464 // this.ver = headers.getFirst("version"); 465 // // this.sessionFuture.set(this); 466 // this.sessionHandler.afterConnected(this, headers); 467 // } 468 // else if (StompCommand.ERROR == command) { 469 // invokeHandler(this.sessionHandler, message, headers); 470 // } 471 // else if (!isHeartbeat && logger.isTraceEnabled()) { 472 // trace("Message not handled."); 473 // } 474 // } 475 // } 476 // catch (Throwable ex) { 477 // this.sessionHandler.handleException(this, command, headers, message.getPayload(), ex); 478 // } 479 // } 480 481 // private void invokeHandler(StompFrameHandler handler, 482 // Message!(byte[]) message, StompHeaders headers) { 483 // if (message.getPayload().length == 0) { 484 // handler.handleFrame(headers, null); 485 // return; 486 // } 487 // // Type payloadType = handler.getPayloadType(headers); 488 // // Class<?> resolvedType = ResolvableType.forType(payloadType).resolve(); 489 // // if (resolvedType is null) { 490 // // throw new MessageConversionException("Unresolvable payload type [" ~ payloadType + 491 // // "] from handler type [" ~ handler.getClass() ~ "]"); 492 // // } 493 // // Object object = getMessageConverter().fromMessage(message, resolvedType); 494 // // if (object is null) { 495 // // throw new MessageConversionException("No suitable converter for payload type [" ~ payloadType + 496 // // "] from handler type [" ~ handler.getClass() ~ "]"); 497 // // } 498 // // handler.handleFrame(headers, object); 499 500 // } 501 502 // private void initHeartbeatTasks(StompHeaders connectedHeaders) { 503 // long[] connect = this.connectHeaders.getHeartbeat(); 504 // long[] connected = connectedHeaders.getHeartbeat(); 505 // if (connect is null || connected is null) { 506 // return; 507 // } 508 // // TODO: Tasks pending completion -@zxp at 10/30/2018, 3:10:33 PM 509 // // 510 // // TcpConnection!(byte[]) con = this.connection; 511 // // assert(con !is null, "No TcpConnection available"); 512 // // if (connect[0] > 0 && connected[1] > 0) { 513 // // long interval = max(connect[0], connected[1]); 514 // // con.onWriteInactivity(new WriteInactivityTask(), interval); 515 // // } 516 // // if (connect[1] > 0 && connected[0] > 0) { 517 // // long interval = max(connect[1], connected[0]) * HEARTBEAT_MULTIPLIER; 518 // // con.onReadInactivity(new ReadInactivityTask(), interval); 519 // // } 520 // } 521 522 // override 523 // void handleFailure(Throwable ex) { 524 // try { 525 // // this.sessionFuture.setException(ex); // no-op if already set 526 // this.sessionHandler.handleTransportError(this, ex); 527 // } 528 // catch (Throwable ex2) { 529 // version(HUNT_DEBUG) { 530 // trace("Uncaught failure while handling transport failure", ex2); 531 // } 532 // } 533 // } 534 535 // override 536 // void afterConnectionClosed() { 537 // version(HUNT_DEBUG) { 538 // trace("Connection closed in session id=" ~ this.sessionId); 539 // } 540 // if (!this.closing) { 541 // resetConnection(); 542 // handleFailure(new ConnectionLostException("Connection closed")); 543 // } 544 // } 545 546 // private void resetConnection() { 547 // implementationMissing(false); 548 // // TcpConnection<?> conn = this.connection; 549 // // this.connection = null; 550 // // if (conn !is null) { 551 // // try { 552 // // conn.close(); 553 // // } 554 // // catch (Throwable ex) { 555 // // // ignore 556 // // } 557 // // } 558 // } 559 560 561 // private class ReceiptHandler : Receiptable { 562 563 564 // private string receiptId; 565 566 // private Runnable[] receiptCallbacks; 567 568 // private Runnable[] receiptLostCallbacks; 569 570 // // private ScheduledFuture<?> future; 571 572 // private bool result; 573 574 // this(string receiptId) { 575 // this.receiptId = receiptId; 576 // if (receiptId !is null) { 577 // initReceiptHandling(); 578 // } 579 // } 580 581 // private void initReceiptHandling() { 582 // assert(getTaskScheduler(), "To track receipts, a TaskScheduler must be configured"); 583 // this.outer.receiptHandlers.put(this.receiptId, this); 584 // Date startTime = new Date(DateTimeHelper.currentTimeMillis + getReceiptTimeLimit()); 585 // // this.future = getTaskScheduler().schedule(this::handleReceiptNotReceived, startTime); 586 // } 587 588 // override 589 // string getReceiptId() { 590 // return this.receiptId; 591 // } 592 593 // override 594 // void addReceiptTask(Runnable task) { 595 // addTask(task, true); 596 // } 597 598 // override 599 // void addReceiptLostTask(Runnable task) { 600 // addTask(task, false); 601 // } 602 603 // private void addTask(Runnable task, successTask) { 604 // assert(this.receiptId, 605 // "To track receipts, set autoReceiptEnabled=true or add 'receiptId' header"); 606 // synchronized (this) { 607 // if (this.result !is null && this.result == successTask) { 608 // invoke([task]); 609 // } 610 // else { 611 // if (successTask) { 612 // this.receiptCallbacks ~= task; 613 // } 614 // else { 615 // this.receiptLostCallbacks ~= task; 616 // } 617 // } 618 // } 619 // } 620 621 // private void invoke(Runnable[] callbacks) { 622 // foreach (Runnable runnable ; callbacks) { 623 // try { 624 // runnable.run(); 625 // } 626 // catch (Throwable ex) { 627 // // ignore 628 // } 629 // } 630 // } 631 632 // void handleReceiptReceived() { 633 // handleInternal(true); 634 // } 635 636 // void handleReceiptNotReceived() { 637 // handleInternal(false); 638 // } 639 640 // private void handleInternal(bool result) { 641 // synchronized (this) { 642 // if (this.result !is null) { 643 // return; 644 // } 645 // this.result = result; 646 // invoke(result ? this.receiptCallbacks : this.receiptLostCallbacks); 647 // this.outer.receiptHandlers.remove(this.receiptId); 648 // if (this.future !is null) { 649 // this.future.cancel(true); 650 // } 651 // } 652 // } 653 // } 654 655 656 // private class DefaultSubscription : ReceiptHandler, Subscription { 657 658 // private StompHeaders headers; 659 660 // private StompFrameHandler handler; 661 662 // this(StompHeaders headers, StompFrameHandler handler) { 663 // super(headers.getReceipt()); 664 // assert(headers.getDestination(), "Destination must not be null"); 665 // assert(handler, "StompFrameHandler must not be null"); 666 // this.headers = headers; 667 // this.handler = handler; 668 // this.outer.subscriptions.put(headers.getId(), this); 669 // } 670 671 // override 672 // string getSubscriptionId() { 673 // return this.headers.getId(); 674 // } 675 676 // override 677 // StompHeaders getSubscriptionHeaders() { 678 // return this.headers; 679 // } 680 681 // StompFrameHandler getHandler() { 682 // return this.handler; 683 // } 684 685 // override 686 // void unsubscribe() { 687 // unsubscribe(null); 688 // } 689 690 // override 691 // void unsubscribe(StompHeaders headers) { 692 // string id = this.headers.getId(); 693 // if (id !is null) { 694 // this.outer.subscriptions.remove(id); 695 // this.outer.unsubscribe(id, headers); 696 // } 697 // } 698 699 // override 700 // string toString() { 701 // return "Subscription [id=" ~ getSubscriptionId() + 702 // ", destination='" ~ this.headers.getDestination() + 703 // "', receiptId='" ~ getReceiptId() ~ "', handler=" ~ getHandler() ~ "]"; 704 // } 705 // } 706 707 708 // private class WriteInactivityTask : Runnable { 709 710 // override 711 // void run() { 712 // implementationMissing(false); 713 // // TcpConnection!(byte[]) conn = connection; 714 // // if (conn !is null) { 715 // // conn.send(HEARTBEAT).addCallback( 716 // // new ListenableFutureCallback!(Void)() { 717 // // void onSuccess(Void result) { 718 // // } 719 // // void onFailure(Throwable ex) { 720 // // handleFailure(ex); 721 // // } 722 // // }); 723 // // } 724 // } 725 // } 726 727 728 // private class ReadInactivityTask : Runnable { 729 730 // override 731 // void run() { 732 // closing = true; 733 // string error = "Server has gone quiet. Closing connection in session id=" ~ sessionId ~ "."; 734 // version(HUNT_DEBUG) { 735 // trace(error); 736 // } 737 // resetConnection(); 738 // handleFailure(new IllegalStateException(error)); 739 // } 740 // } 741 742 // }