1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.broker.SimpleBrokerMessageHandler; 18 19 import hunt.stomp.simp.broker.AbstractBrokerMessageHandler; 20 import hunt.stomp.simp.broker.DefaultSubscriptionRegistry; 21 import hunt.stomp.simp.broker.SubscriptionRegistry; 22 23 import hunt.stomp.Message; 24 import hunt.stomp.MessageChannel; 25 import hunt.stomp.MessageHeaders; 26 import hunt.stomp.simp.SimpMessageHeaderAccessor; 27 import hunt.stomp.simp.SimpMessageType; 28 import hunt.stomp.support.GenericMessage; 29 import hunt.stomp.support.MessageBuilder; 30 import hunt.stomp.support.MessageHeaderAccessor; 31 32 // import hunt.framework.task.TaskScheduler; 33 34 import hunt.collection; 35 import hunt.util.DateTime; 36 import hunt.util.Common; 37 import hunt.Exceptions; 38 import hunt.logging; 39 import hunt.text.PathMatcher; 40 41 // dfmt off 42 version(Have_hunt_security) { 43 import hunt.security.Principal; 44 } 45 // dfmt on 46 47 import std.algorithm; 48 import std.conv; 49 50 /** 51 * A "simple" message broker that recognizes the message types defined in 52 * {@link SimpMessageType}, keeps track of subscriptions with the help of a 53 * {@link SubscriptionRegistry} and sends messages to subscribers. 54 * 55 * @author Rossen Stoyanchev 56 * @author Juergen Hoeller 57 * @since 4.0 58 */ 59 class SimpleBrokerMessageHandler : AbstractBrokerMessageHandler { 60 61 private enum byte[] EMPTY_PAYLOAD = []; 62 63 private PathMatcher pathMatcher; 64 65 private int cacheLimit; 66 67 private string selectorHeaderName = "selector"; 68 69 // private TaskScheduler taskScheduler; 70 71 private long[] heartbeatValue; 72 73 private MessageHeaderInitializer headerInitializer; 74 75 private SubscriptionRegistry subscriptionRegistry; 76 77 private Map!(string, SessionInfo) sessions; 78 79 80 // private ScheduledFuture<?> heartbeatFuture; 81 82 83 /** 84 * Create a SimpleBrokerMessageHandler instance with the given message channels 85 * and destination prefixes. 86 * @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 87 * @param clientOutboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 88 * @param brokerChannel the channel for the application to send messages to the broker 89 * @param destinationPrefixes prefixes to use to filter out messages 90 */ 91 this(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel, 92 SubscribableChannel brokerChannel, string[] destinationPrefixes) { 93 94 super(clientInboundChannel, clientOutboundChannel, brokerChannel, destinationPrefixes); 95 sessions = new HashMap!(string, SessionInfo)(); 96 this.subscriptionRegistry = new DefaultSubscriptionRegistry(); 97 } 98 99 100 /** 101 * Configure a custom SubscriptionRegistry to use for storing subscriptions. 102 * <p><strong>Note</strong> that when a custom PathMatcher is configured via 103 * {@link #setPathMatcher}, if the custom registry is not an instance of 104 * {@link DefaultSubscriptionRegistry}, the provided PathMatcher is not used 105 * and must be configured directly on the custom registry. 106 */ 107 void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) { 108 assert(subscriptionRegistry, "SubscriptionRegistry must not be null"); 109 this.subscriptionRegistry = subscriptionRegistry; 110 initPathMatcherToUse(); 111 initCacheLimitToUse(); 112 initSelectorHeaderNameToUse(); 113 } 114 115 SubscriptionRegistry getSubscriptionRegistry() { 116 return this.subscriptionRegistry; 117 } 118 119 /** 120 * When configured, the given PathMatcher is passed down to the underlying 121 * SubscriptionRegistry to use for matching destination to subscriptions. 122 * <p>Default is a standard {@link hunt.framework.util.AntPathMatcher}. 123 * @since 4.1 124 * @see #setSubscriptionRegistry 125 * @see DefaultSubscriptionRegistry#setPathMatcher 126 * @see hunt.framework.util.AntPathMatcher 127 */ 128 void setPathMatcher(PathMatcher pathMatcher) { 129 this.pathMatcher = pathMatcher; 130 initPathMatcherToUse(); 131 } 132 133 private void initPathMatcherToUse() { 134 auto s = cast(DefaultSubscriptionRegistry) this.subscriptionRegistry; 135 if (this.pathMatcher !is null && s !is null) { 136 s.setPathMatcher(this.pathMatcher); 137 } 138 } 139 140 /** 141 * When configured, the specified cache limit is passed down to the 142 * underlying SubscriptionRegistry, overriding any default there. 143 * <p>With a standard {@link DefaultSubscriptionRegistry}, the default 144 * cache limit is 1024. 145 * @since 4.3.2 146 * @see #setSubscriptionRegistry 147 * @see DefaultSubscriptionRegistry#setCacheLimit 148 * @see DefaultSubscriptionRegistry#DEFAULT_CACHE_LIMIT 149 */ 150 void setCacheLimit(int cacheLimit) { 151 this.cacheLimit = cacheLimit; 152 initCacheLimitToUse(); 153 } 154 155 private void initCacheLimitToUse() { 156 auto s = cast(DefaultSubscriptionRegistry) this.subscriptionRegistry; 157 if (s !is null) { 158 s.setCacheLimit(this.cacheLimit); 159 } 160 } 161 162 /** 163 * Configure the name of a header that a subscription message can have for 164 * the purpose of filtering messages matched to the subscription. The header 165 * value is expected to be a Spring EL expression to be applied to 166 * the headers of messages matched to the subscription. 167 * <p>For example: 168 * <pre> 169 * headers.foo == 'bar' 170 * </pre> 171 * <p>By default this is set to "selector". You can set it to a different 172 * name, or to {@code null} to turn off support for a selector header. 173 * @param selectorHeaderName the name to use for a selector header 174 * @since 4.3.17 175 * @see #setSubscriptionRegistry 176 * @see DefaultSubscriptionRegistry#setSelectorHeaderName(string) 177 */ 178 void setSelectorHeaderName(string selectorHeaderName) { 179 this.selectorHeaderName = selectorHeaderName; 180 initSelectorHeaderNameToUse(); 181 } 182 183 private void initSelectorHeaderNameToUse() { 184 auto s = cast(DefaultSubscriptionRegistry) this.subscriptionRegistry; 185 if (s !is null) { 186 s.setSelectorHeaderName(this.selectorHeaderName); 187 } 188 } 189 190 /** 191 * Configure the {@link hunt.framework.scheduling.TaskScheduler} to 192 * use for providing heartbeat support. Setting this property also sets the 193 * {@link #setHeartbeatValue heartbeatValue} to "10000, 10000". 194 * <p>By default this is not set. 195 * @since 4.2 196 */ 197 // void setTaskScheduler(TaskScheduler taskScheduler) { 198 // this.taskScheduler = taskScheduler; 199 // if (taskScheduler !is null && this.heartbeatValue is null) { 200 // this.heartbeatValue = [10000, 10000]; 201 // } 202 // } 203 204 /** 205 * Return the configured TaskScheduler. 206 * @since 4.2 207 */ 208 209 // TaskScheduler getTaskScheduler() { 210 // return this.taskScheduler; 211 // } 212 213 /** 214 * Configure the value for the heart-beat settings. The first number 215 * represents how often the server will write or send a heartbeat. 216 * The second is how often the client should write. 0 means no heartbeats. 217 * <p>By default this is set to "0, 0" unless the {@link #setTaskScheduler 218 * taskScheduler} in which case the default becomes "10000,10000" 219 * (in milliseconds). 220 * @since 4.2 221 */ 222 void setHeartbeatValue(long[] heartbeat) { 223 if (heartbeat !is null && (heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0)) { 224 throw new IllegalArgumentException("Invalid heart-beat: " ~ heartbeat.to!string()); 225 } 226 this.heartbeatValue = heartbeat; 227 } 228 229 /** 230 * The configured value for the heart-beat settings. 231 * @since 4.2 232 */ 233 234 long[] getHeartbeatValue() { 235 return this.heartbeatValue; 236 } 237 238 /** 239 * Configure a {@link MessageHeaderInitializer} to apply to the headers 240 * of all messages sent to the client outbound channel. 241 * <p>By default this property is not set. 242 * @since 4.1 243 */ 244 void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { 245 this.headerInitializer = headerInitializer; 246 } 247 248 /** 249 * Return the configured header initializer. 250 * @since 4.1 251 */ 252 253 MessageHeaderInitializer getHeaderInitializer() { 254 return this.headerInitializer; 255 } 256 257 258 override 259 void startInternal() { 260 publishBrokerAvailableEvent(); 261 // implementationMissing(false); 262 // TODO: Tasks pending completion -@zxp at 11/13/2018, 3:18:33 PM 263 // 264 // if (this.taskScheduler !is null) { 265 // long interval = initHeartbeatTaskDelay(); 266 // if (interval > 0) { 267 // // this.heartbeatFuture = this.taskScheduler.scheduleWithFixedDelay(new HeartbeatTask(), interval); 268 // implementationMissing(false); 269 // } 270 // } 271 // else { 272 // assert(getHeartbeatValue() is null || 273 // (getHeartbeatValue()[0] == 0 && getHeartbeatValue()[1] == 0), 274 // "Heartbeat values configured but no TaskScheduler provided"); 275 // } 276 } 277 278 private long initHeartbeatTaskDelay() { 279 if (getHeartbeatValue() is null) { 280 return 0; 281 } 282 else if (getHeartbeatValue()[0] > 0 && getHeartbeatValue()[1] > 0) { 283 return min(getHeartbeatValue()[0], getHeartbeatValue()[1]); 284 } 285 else { 286 return (getHeartbeatValue()[0] > 0 ? getHeartbeatValue()[0] : getHeartbeatValue()[1]); 287 } 288 } 289 290 override 291 void stopInternal() { 292 publishBrokerUnavailableEvent(); 293 // if (this.heartbeatFuture !is null) { 294 // this.heartbeatFuture.cancel(true); 295 // } 296 } 297 298 override 299 protected void handleMessageInternal(MessageBase message) { 300 MessageHeaders headers = message.getHeaders(); 301 version(HUNT_DEBUG) { 302 trace(headers.toString()); 303 } 304 305 SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); 306 string destination = SimpMessageHeaderAccessor.getDestination(headers); 307 string sessionId = SimpMessageHeaderAccessor.getSessionId(headers); 308 309 updateSessionReadTime(sessionId); 310 311 if (!checkDestinationPrefix(destination)) { 312 return; 313 } 314 315 if (SimpMessageType.MESSAGE == messageType) { 316 logMessage(message); 317 sendMessageToSubscribers(destination, message); 318 } 319 else if (SimpMessageType.CONNECT == messageType) { 320 logMessage(message); 321 if (sessionId !is null) { 322 long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers); 323 long[] heartbeatOut = getHeartbeatValue(); 324 // TODO: Tasks pending completion -@zxp at 10/31/2018, 4:39:26 PM 325 // 326 version(Have_hunt_security) { 327 Principal user = null; // SimpMessageHeaderAccessor.getUser(headers); 328 } 329 330 MessageChannel outChannel = getClientOutboundChannelForSession(sessionId); 331 332 // version(Have_hunt_security) { 333 // this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut)); 334 // } else { 335 // this.sessions.put(sessionId, new SessionInfo(sessionId, outChannel, heartbeatIn, heartbeatOut)); 336 // } 337 338 this.sessions.put(sessionId, new SessionInfo(sessionId, outChannel, heartbeatIn, heartbeatOut)); 339 340 SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); 341 initHeaders(connectAck); 342 connectAck.setSessionId(sessionId); 343 // if (user !is null) { 344 // connectAck.setUser(user); 345 // } 346 connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message); 347 connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut); 348 MessageBase messageOut = MessageHelper.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders()); 349 info("responding .........."); 350 getClientOutboundChannel().send(messageOut); 351 info("responding ..........done"); 352 } else { 353 warning("sessionId is null"); 354 } 355 } 356 else if (SimpMessageType.DISCONNECT == messageType) { 357 logMessage(message); 358 if (sessionId !is null) { 359 // Principal user = SimpMessageHeaderAccessor.getUser(headers); 360 361 // version(Have_hunt_security) { 362 // handleDisconnect(sessionId, null, message); 363 // } else { 364 // handleDisconnect(sessionId, message); 365 // } 366 367 handleDisconnect(sessionId, message); 368 } 369 } 370 else if (SimpMessageType.SUBSCRIBE == messageType) { 371 logMessage(message); 372 this.subscriptionRegistry.registerSubscription(message); 373 } 374 else if (SimpMessageType.UNSUBSCRIBE == messageType) { 375 logMessage(message); 376 this.subscriptionRegistry.unregisterSubscription(message); 377 } 378 } 379 380 private void updateSessionReadTime(string sessionId) { 381 if (sessionId !is null) { 382 SessionInfo info = this.sessions.get(sessionId); 383 if (info !is null) { 384 info.setLastReadTime(DateTime.currentTimeMillis); 385 } 386 } 387 } 388 389 private void logMessage(MessageBase message) { 390 version(HUNT_DEBUG) { 391 SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor!SimpMessageHeaderAccessor(message); 392 accessor = (accessor !is null ? accessor : SimpMessageHeaderAccessor.wrap(message)); 393 import hunt.stomp.support.GenericMessage; 394 import hunt.Nullable; 395 GenericMessage!(byte[]) gm = cast(GenericMessage!(byte[]))message; 396 if(gm is null) 397 trace("Processing " ~ typeid(cast(Object)message).name); 398 else 399 trace("Processing " ~ accessor.getShortLogMessage(new Nullable!(byte[])(gm.getPayload()))); 400 } 401 } 402 403 private void initHeaders(SimpMessageHeaderAccessor accessor) { 404 if (getHeaderInitializer() !is null) { 405 getHeaderInitializer().initHeaders(accessor); 406 } 407 } 408 409 private void handleDisconnect(string sessionId, MessageBase origMessage) { 410 this.sessions.remove(sessionId); 411 this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); 412 SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK); 413 accessor.setSessionId(sessionId); 414 // if (user !is null) { 415 // accessor.setUser(user); 416 // } 417 if (origMessage !is null) { 418 accessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, origMessage); 419 } 420 initHeaders(accessor); 421 MessageBase message = MessageHelper.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders()); 422 getClientOutboundChannel().send(message); 423 } 424 425 protected void sendMessageToSubscribers(string destination, MessageBase message) { 426 MultiValueMap!(string,string) subscriptions = this.subscriptionRegistry.findSubscriptions(message); 427 version(HUNT_DEBUG) { 428 if (!subscriptions.isEmpty()) { 429 trace("Broadcasting to " ~ subscriptions.size().to!string() ~ " sessions."); 430 } 431 } 432 long now = DateTime.currentTimeMillis(); 433 foreach(string sessionId, List!string subscriptionIds; subscriptions) { 434 foreach (string subscriptionId ; subscriptionIds) { 435 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 436 initHeaders(headerAccessor); 437 headerAccessor.setSessionId(sessionId); 438 headerAccessor.setSubscriptionId(subscriptionId); 439 headerAccessor.copyHeadersIfAbsent(message.getHeaders()); 440 headerAccessor.setLeaveMutable(true); 441 // warning(message.payloadType); 442 auto gm = cast(GenericMessage!(string))message; 443 if(gm is null) { 444 warning("Can't cast message: %s", typeid(message)); 445 continue; 446 } 447 string payload = gm.getPayload(); 448 MessageBase reply = MessageHelper.createMessage(cast(byte[])payload, headerAccessor.getMessageHeaders()); 449 SessionInfo info = this.sessions.get(sessionId); 450 if (info !is null) { 451 try { 452 info.getClientOutboundChannel().send(reply); 453 } 454 catch (Throwable ex) { 455 errorf("Failed to send " ~ message.to!string() ~ ": \n%s", ex.msg); 456 } 457 finally { 458 info.setLastWriteTime(now); 459 } 460 } 461 } 462 } 463 } 464 465 466 int opCmp(MessageHandler o) { 467 implementationMissing(false); 468 return 0; 469 } 470 471 472 override 473 string toString() { 474 return "SimpleBrokerMessageHandler [" ~ this.subscriptionRegistry.to!string() ~ "]"; 475 } 476 477 478 private class HeartbeatTask : Runnable { 479 480 override 481 void run() { 482 long now =DateTime.currentTimeMillis(); 483 foreach (SessionInfo info ; sessions.values()) { 484 if (info.getReadInterval() > 0 && (now - info.getLastReadTime()) > info.getReadInterval()) { 485 486 // version(Have_hunt_security) { 487 // handleDisconnect(info.getSessionId(), null, null); // info.getUser() 488 // } else { 489 // handleDisconnect(info.getSessionId(), null); // info.getUser() 490 // } 491 handleDisconnect(info.getSessionId(), null); 492 493 } 494 if (info.getWriteInterval() > 0 && (now - info.getLastWriteTime()) > info.getWriteInterval()) { 495 SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT); 496 accessor.setSessionId(info.getSessionId()); 497 // TODO: Tasks pending completion -@zxp at 10/31/2018, 4:39:51 PM 498 // 499 // Principal user = info.getUser(); 500 // if (user !is null) { 501 // accessor.setUser(user); 502 // } 503 initHeaders(accessor); 504 accessor.setLeaveMutable(true); 505 MessageHeaders headers = accessor.getMessageHeaders(); 506 info.getClientOutboundChannel().send(MessageHelper.createMessage(EMPTY_PAYLOAD, headers)); 507 } 508 } 509 } 510 } 511 512 } 513 514 515 516 private class SessionInfo { 517 518 /* STOMP spec: receiver SHOULD take into account an error margin */ 519 private enum long HEARTBEAT_MULTIPLIER = 3; 520 521 private string sessionId; 522 523 // private Principal user; 524 525 private MessageChannel clientOutboundChannel; 526 527 private long readInterval; 528 529 private long writeInterval; 530 531 private long lastReadTime; 532 533 private long lastWriteTime; 534 535 this(string sessionId, MessageChannel outboundChannel, 536 long[] clientHeartbeat, long[] serverHeartbeat) { // Principal user, 537 538 this.sessionId = sessionId; 539 // this.user = user; 540 this.clientOutboundChannel = outboundChannel; 541 if (clientHeartbeat !is null && serverHeartbeat !is null) { 542 this.readInterval = (clientHeartbeat[0] > 0 && serverHeartbeat[1] > 0 ? 543 max(clientHeartbeat[0], serverHeartbeat[1]) * HEARTBEAT_MULTIPLIER : 0); 544 this.writeInterval = (clientHeartbeat[1] > 0 && serverHeartbeat[0] > 0 ? 545 max(clientHeartbeat[1], serverHeartbeat[0]) : 0); 546 } 547 else { 548 this.readInterval = 0; 549 this.writeInterval = 0; 550 } 551 this.lastReadTime = this.lastWriteTime =DateTime.currentTimeMillis(); 552 } 553 554 string getSessionId() { 555 return this.sessionId; 556 } 557 558 559 // Principal getUser() { 560 // return this.user; 561 // } 562 563 MessageChannel getClientOutboundChannel() { 564 return this.clientOutboundChannel; 565 } 566 567 long getReadInterval() { 568 return this.readInterval; 569 } 570 571 long getWriteInterval() { 572 return this.writeInterval; 573 } 574 575 long getLastReadTime() { 576 return this.lastReadTime; 577 } 578 579 void setLastReadTime(long lastReadTime) { 580 this.lastReadTime = lastReadTime; 581 } 582 583 long getLastWriteTime() { 584 return this.lastWriteTime; 585 } 586 587 void setLastWriteTime(long lastWriteTime) { 588 this.lastWriteTime = lastWriteTime; 589 } 590 } 591