1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.broker.DefaultSubscriptionRegistry; 18 19 import hunt.stomp.simp.broker.AbstractSubscriptionRegistry; 20 import hunt.stomp.simp.broker.SubscriptionRegistry; 21 22 import hunt.collection; 23 import hunt.Exceptions; 24 import hunt.logging; 25 import hunt.text.PathMatcher; 26 27 import std.array; 28 import std.conv; 29 import std.string; 30 31 // import java.util.concurrent.ConcurrentHashMap; 32 // import java.util.concurrent.ConcurrentMap; 33 // import java.util.concurrent.CopyOnWriteArraySet; 34 35 // import hunt.framework.expression.EvaluationContext; 36 // import hunt.framework.expression.Expression; 37 // import hunt.framework.expression.ExpressionParser; 38 // import hunt.framework.expression.PropertyAccessor; 39 // import hunt.framework.expression.TypedValue; 40 // import hunt.framework.expression.spel.SpelEvaluationException; 41 // import hunt.framework.expression.spel.standard.SpelExpressionParser; 42 // import hunt.framework.expression.spel.support.SimpleEvaluationContext; 43 44 import hunt.stomp.Message; 45 import hunt.stomp.MessageHeaders; 46 import hunt.stomp.simp.SimpMessageHeaderAccessor; 47 import hunt.stomp.support.MessageHeaderAccessor; 48 // import hunt.framework.util.AntPathMatcher; 49 50 // import hunt.framework.util.LinkedMultiValueMap; 51 // import hunt.framework.util.MultiValueMap; 52 // 
// import hunt.framework.util.PathMatcher;
// import hunt.framework.util.StringUtils;

/**
 * Implementation of {@link SubscriptionRegistry} that stores subscriptions
 * in memory and uses a {@link hunt.framework.util.PathMatcher PathMatcher}
 * for matching destinations.
 *
 * <p>As of 4.2, this class supports a {@link #setSelectorHeaderName selector}
 * header on subscription messages with Spring EL expressions evaluated against
 * the headers to filter out messages in addition to destination matching.
 *
 * <p>Port note: in this module the selector support is not active. The code
 * that parses and evaluates selector expressions is commented out, so
 * {@code selectorHeaderInUse} is never set to {@code true} and
 * {@code filterSubscriptions} returns every destination match unfiltered.
 *
 * @author Rossen Stoyanchev
 * @author Sebastien Deleuze
 * @author Juergen Hoeller
 * @since 4.0
 */
class DefaultSubscriptionRegistry : AbstractSubscriptionRegistry {

    /** Default maximum number of entries for the destination cache: 1024. */
    enum int DEFAULT_CACHE_LIMIT = 1024;

    // Static evaluation context to reuse (disabled together with the selector support).
    // private __gshared EvaluationContext messageEvalContext;


    // Matcher used to test a destination pattern against a concrete destination.
    private PathMatcher pathMatcher;

    // Upper bound on the number of cached destinations; read by the cache's
    // eviction hook each time it is consulted.
    private int cacheLimit = DEFAULT_CACHE_LIMIT;

    // Name of the subscription header that would carry a selector expression.
    private string selectorHeaderName = "selector";

    // Only assigned here in the visible code: the statement that would set it
    // to true lives in the commented-out getSelectorExpression().
    private bool selectorHeaderInUse = false;

    // private ExpressionParser expressionParser;

    // Destination -> matching (sessionId, subscriptionId) pairs, resolved lazily.
    private DestinationCache destinationCache;

    // sessionId -> subscriptions held by that session.
    private SessionSubscriptionRegistry subscriptionRegistry;

    // shared static this() {
    //     messageEvalContext =
    //             SimpleEvaluationContext.forPropertyAccessors(new SimpMessageHeaderPropertyAccessor()).build();
    // }

    /**
     * Create a registry with an {@code AntPathMatcher}, an empty destination
     * cache and an empty session registry.
     */
    this() {
        pathMatcher = new AntPathMatcher();
        // expressionParser = new SpelExpressionParser();
        destinationCache = new DestinationCache();
        subscriptionRegistry = new SessionSubscriptionRegistry();
    }


    /**
     * Specify the {@link PathMatcher} to use.
     */
    void setPathMatcher(PathMatcher pathMatcher) {
        this.pathMatcher = pathMatcher;
    }

    /**
     * Return the configured {@link PathMatcher}.
     */
    PathMatcher getPathMatcher() {
        return this.pathMatcher;
    }

    /**
     * Specify the maximum number of entries for the resolved destination cache.
     * Default is 1024.
     */
    void setCacheLimit(int cacheLimit) {
        this.cacheLimit = cacheLimit;
    }

    /**
     * Return the maximum number of entries for the resolved destination cache.
     */
    int getCacheLimit() {
        return this.cacheLimit;
    }

    /**
     * Configure the name of a header that a subscription message can have for
     * the purpose of filtering messages matched to the subscription. The header
     * value is expected to be a Spring EL expression to be applied to
     * the headers of messages matched to the subscription.
     * <p>For example:
     * <pre>
     * headers.foo == 'bar'
     * </pre>
     * <p>By default this is set to "selector". You can set it to a different
     * name, or to {@code null} to turn off support for a selector header.
     * <p>Port note: the value is only stored; no active code in this class
     * reads the header, because selector parsing is commented out.
     * @param selectorHeaderName the name to use for a selector header
     * @since 4.2
     */
    void setSelectorHeaderName(string selectorHeaderName) {
        this.selectorHeaderName = selectorHeaderName;
    }

    /**
     * Return the configured name of the selector header.
     * @since 4.2
     */

    string getSelectorHeaderName() {
        return this.selectorHeaderName;
    }


    /**
     * Record a new subscription and bring the destination cache up to date.
     *
     * <p>The subscription is first stored in the session registry, then every
     * cached destination matched by {@code destination} (treated as a pattern)
     * gains the new (sessionId, subsId) pair. {@code message} is not used
     * while selector parsing is disabled.
     *
     * @param sessionId the id of the subscribing session
     * @param subsId the subscription id
     * @param destination the destination (pattern) subscribed to
     * @param message the subscription message (currently unused)
     */
    override
    protected void addSubscriptionInternal(
            string sessionId, string subsId, string destination, MessageBase message) {

        // Expression expression = getSelectorExpression(message.getHeaders());
        // this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
        this.subscriptionRegistry.addSubscription(sessionId, subsId, destination);
        this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);

        infof("Subscription added: id=%s, subsId=%s, destination=%s",
            sessionId, subsId, destination);
    }


    // private Expression getSelectorExpression(MessageHeaders headers) {
    //     Expression expression = null;
    //     if (getSelectorHeaderName() !is null) {
    //         string selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
    //         if (selector !is null) {
    //             try {
    //                 expression = this.expressionParser.parseExpression(selector);
    //                 this.selectorHeaderInUse = true;
    //                 version(HUNT_DEBUG) {
    //                     trace("Subscription selector: [" ~ selector ~ "]");
    //                 }
    //             }
    //             catch (Throwable ex) {
    //                 version(HUNT_DEBUG) {
    //                     trace("Failed to parse selector: " ~ selector, ex);
    //                 }
    //             }
    //         }
    //     }
    //     return expression;
    // }

    /**
     * Remove a single subscription of a session.
     *
     * <p>Does nothing when the session is unknown. The destination cache is
     * only touched when the session actually held the subscription, i.e. when
     * {@code removeSubscription} reports the destination it was stored under.
     *
     * @param sessionId the id of the session
     * @param subsId the subscription id to remove
     * @param message the unsubscribe message (unused here)
     */
    override
    protected void removeSubscriptionInternal(string sessionId, string subsId, MessageBase message) {
        SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
        if (info !is null) {
            string destination = info.removeSubscription(subsId);
            if (destination !is null) {
                this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId);
            }
        }
    }

    /**
     * Drop every subscription of the given session and purge the session from
     * all cached destinations. Does nothing when the session is unknown.
     *
     * @param sessionId the id of the session to unregister
     */
    // override
    void unregisterAllSubscriptions(string sessionId) {
        SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId);
        if (info !is null) {
            this.destinationCache.updateAfterRemovedSession(info);
        }
    }

    /**
     * Look up the subscriptions matching {@code destination} through the
     * destination cache and pass them through {@code filterSubscriptions}.
     *
     * @param destination the concrete destination of the message
     * @param message the message being routed
     * @return a map from session id to the matching subscription ids
     */
    override
    protected MultiValueMap!(string, string) findSubscriptionsInternal(string destination, MessageBase message) {
        MultiValueMap!(string, string) result = this.destinationCache.getSubscriptions(destination, message);
        return filterSubscriptions(result, message);
    }

    /**
     * Apply selector filtering to the destination matches.
     *
     * <p>While {@code selectorHeaderInUse} is {@code false} -- which is always
     * the case in the visible code -- {@code allMatches} is returned as is.
     *
     * <p>NOTE(review): the other branch is unfinished. It allocates an empty
     * result, calls {@code implementationMissing(false)} and returns that
     * empty map, so no subscription would match. Presumably the call only
     * reports the gap rather than throwing -- confirm against hunt.Exceptions.
     *
     * @param allMatches the subscriptions matched by destination
     * @param message the message being routed (unused while filtering is disabled)
     * @return the subscriptions that should receive the message
     */
    private MultiValueMap!(string, string) filterSubscriptions(
            MultiValueMap!(string, string) allMatches, MessageBase message) {

        if (!this.selectorHeaderInUse) {
            return allMatches;
        }
        MultiValueMap!(string, string) result = new LinkedMultiValueMap!(string, string)(allMatches.size());
        implementationMissing(false);
        // allMatches.forEach((sessionId, subIds) -> {
        //     for (string subId : subIds) {
        //         SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
        //         if (info is null) {
        //             continue;
        //         }
        //         Subscription sub = info.getSubscription(subId);
        //         if (sub is null) {
        //             continue;
        //         }
        //         Expression expression = sub.getSelectorExpression();
        //         if (expression is null) {
        //             result.add(sessionId, subId);
        //             continue;
        //         }
        //         try {
        //             if (Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) {
        //                 result.add(sessionId, subId);
        //             }
        //         }
        //         catch (SpelEvaluationException ex) {
        //             version(HUNT_DEBUG) {
        //                 trace("Failed to evaluate selector: " ~ ex.getMessage());
        //             }
        //         }
        //         catch (Throwable ex) {
        //             trace("Failed to evaluate selector", ex);
        //         }
        //     }
        // });
        return result;
    }

    /**
     * Return a short description combining the cache and the session registry
     * descriptions.
     */
    override
    string toString() {
        return "DefaultSubscriptionRegistry[" ~ this.destinationCache.toString() ~
            ", " ~ this.subscriptionRegistry.toString() ~ "]";
    }


    /**
     * A cache for destinations previously resolved via
     * {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(string, Message)}.
     *
     * <p>Two maps are kept per destination: {@code accessCache} serves look-ups
     * and {@code updateCache} is the copy that is mutated. After construction,
     * every use of {@code updateCache} in this class happens inside
     * {@code synchronized (this.updateCache)}.
     */
    private class DestinationCache {

        /** Map from destination to {@code <sessionId, subscriptionId>} for fast look-ups. */
        // Read by getSubscriptions() and toString() without taking the lock.
        private Map!(string, LinkedMultiValueMap!(string, string)) accessCache;

        /** Map from destination to {@code <sessionId, subscriptionId>} with locking. */

        private Map!(string, LinkedMultiValueMap!(string, string)) updateCache;


        /**
         * Create both maps with an initial capacity of DEFAULT_CACHE_LIMIT.
         * {@code updateCache} is an anonymous LinkedHashMap subclass whose
         * eviction hook also removes the evicted key from {@code accessCache}.
         */
        this() {
            accessCache = new HashMap!(string, LinkedMultiValueMap!(string, string))(DEFAULT_CACHE_LIMIT);

            updateCache =
                    new class LinkedHashMap!(string, LinkedMultiValueMap!(string, string)) {
                        this() {
                            // NOTE(review): third argument presumably selects access
                            // ordering, as in java.util.LinkedHashMap -- confirm.
                            super(DEFAULT_CACHE_LIMIT, 0.75f, true);
                        }

                        // Evict once the map holds more entries than the enclosing
                        // registry's current cache limit, keeping accessCache in step.
                        override
                        protected bool removeEldestEntry(MapEntry!(string, LinkedMultiValueMap!(string, string)) eldest) {
                            if (size() > getCacheLimit()) {
                                accessCache.remove(eldest.getKey());
                                return true;
                            }
                            else {
                                return false;
                            }
                        }
                    };
        }


        /**
         * Return the (sessionId, subscriptionId) pairs whose destination
         * pattern matches {@code destination}.
         *
         * <p>A hit in {@code accessCache} is returned directly. On a miss the
         * lock is taken, every destination pattern of every session is matched
         * against {@code destination}, and a non-empty result is stored in both
         * caches ({@code updateCache} receives a deep copy). Empty results are
         * not cached. {@code message} is not used.
         *
         * @param destination the concrete destination to resolve
         * @param message the message being routed (unused)
         * @return the matches; an empty map (not {@code null}) when a miss
         * resolves to no match
         */
        LinkedMultiValueMap!(string, string) getSubscriptions(string destination, MessageBase message) {
            LinkedMultiValueMap!(string, string) result = this.accessCache.get(destination);
            if (result is null) {
                synchronized (this.updateCache) {
                    result = new LinkedMultiValueMap!(string, string)();
                    foreach (SessionSubscriptionInfo info ; subscriptionRegistry.getAllSubscriptions()) {
                        foreach (string destinationPattern ; info.getDestinations()) {
                            if (getPathMatcher().match(destinationPattern, destination)) {
                                foreach (Subscription sub ; info.getSubscriptions(destinationPattern)) {
                                    result.add(info.sessionId, sub.getId());
                                }
                            }
                        }
                    }
                    if (!result.isEmpty()) {
                        // The two caches must not share one mutable value.
                        this.updateCache.put(destination, result.deepCopy());
                        this.accessCache.put(destination, result);
                    }
                }
            }
            return result;
        }

        /**
         * Add a new subscription to every cached destination that the
         * subscription's {@code destination} pattern matches, then republish
         * a deep copy of the changed entry to {@code accessCache}.
         *
         * @param destination the destination pattern that was subscribed to
         * @param sessionId the subscribing session
         * @param subsId the subscription id
         */
        void updateAfterNewSubscription(string destination, string sessionId, string subsId) {
            synchronized (this.updateCache) {
                foreach(string cachedDestination,
                        LinkedMultiValueMap!(string, string) subscriptions; this.updateCache) {
                    if (getPathMatcher().match(destination, cachedDestination)) {
                        // Subscription id's may also be populated via getSubscriptions()
                        List!(string) subsForSession = subscriptions.get(sessionId);
                        if (subsForSession is null || !subsForSession.contains(subsId)) {
                            subscriptions.add(sessionId, subsId);
                            this.accessCache.put(cachedDestination, subscriptions.deepCopy());
                        }
                    }
                }
            }
        }

        /**
         * Remove one subscription of a session from every cached destination.
         *
         * <p>A session left without subscriptions is removed from the entry.
         * An entry left without sessions is dropped from both caches; any
         * other touched entry is republished to {@code accessCache}.
         *
         * @param sessionId the session that held the subscription
         * @param subsId the subscription id that was removed
         */
        void updateAfterRemovedSubscription(string sessionId, string subsId) {
            synchronized (this.updateCache) {
                // Collected first; the removal happens after the iteration below.
                Set!(string) destinationsToRemove = new HashSet!(string)();

                foreach(string destination,
                        LinkedMultiValueMap!(string, string) sessionMap; this.updateCache) {
                    List!(string) subscriptions = sessionMap.get(sessionId);
                    if (subscriptions !is null) {
                        subscriptions.remove(subsId);
                        if (subscriptions.isEmpty()) {
                            sessionMap.remove(sessionId);
                        }
                        if (sessionMap.isEmpty()) {
                            destinationsToRemove.add(destination);
                        }
                        else {
                            this.accessCache.put(destination, sessionMap.deepCopy());
                        }
                    }
                }

                foreach (string destination ; destinationsToRemove) {
                    this.updateCache.remove(destination);
                    this.accessCache.remove(destination);
                }
            }
        }

        /**
         * Remove a whole session from every cached destination.
         *
         * <p>An entry left without sessions is dropped from both caches; any
         * other entry that contained the session is republished to
         * {@code accessCache}.
         *
         * @param info the subscription info of the removed session
         */
        void updateAfterRemovedSession(SessionSubscriptionInfo info) {
            synchronized (this.updateCache) {
                // Collected first; the removal happens after the iteration below.
                Set!(string) destinationsToRemove = new HashSet!(string)();

                foreach(string destination,
                        LinkedMultiValueMap!(string, string) sessionMap; this.updateCache) {
                    if (sessionMap.remove(info.getSessionId()) !is null) {
                        if (sessionMap.isEmpty()) {
                            destinationsToRemove.add(destination);
                        }
                        else {
                            this.accessCache.put(destination, sessionMap.deepCopy());
                        }
                    }
                }

                foreach (string destination ; destinationsToRemove) {
                    this.updateCache.remove(destination);
                    this.accessCache.remove(destination);
                }
            }
        }

        /**
         * Return a description containing the number of entries in
         * {@code accessCache}.
         */
        override
        string toString() {
            return "cache[" ~ this.accessCache.size().to!string() ~ " destination(s)]";
        }
    }


}


/**
 * Provide access to session subscriptions by sessionId.
 *
 * <p>Port note: the backing map is a plain {@code HashMap}; the concurrent
 * map declaration is kept below as a comment.
 */
private static class SessionSubscriptionRegistry {

    // sessionId -> SessionSubscriptionInfo
    // private ConcurrentMap!(string, SessionSubscriptionInfo) sessions = new ConcurrentHashMap<>();
    private Map!(string, SessionSubscriptionInfo) sessions;

    /** Create an empty registry. */
    this() {
        sessions = new HashMap!(string, SessionSubscriptionInfo)();
    }


    /**
     * Return the subscription info stored for the session, or whatever the
     * map's {@code get} yields for an unknown key (callers test for
     * {@code null}).
     */
    SessionSubscriptionInfo getSubscriptions(string sessionId) {
        return this.sessions.get(sessionId);
    }

    /** Return the subscription info of every registered session. */
    SessionSubscriptionInfo[] getAllSubscriptions() {
        return this.sessions.values();
    }

    /**
     * Add a subscription to the session, creating the session's info object
     * on first use.
     *
     * @param sessionId the subscribing session
     * @param subscriptionId the subscription id
     * @param destination the destination (pattern) subscribed to
     * @return the info object the subscription was added to
     */
    SessionSubscriptionInfo addSubscription(string sessionId, string subscriptionId,
            string destination) {
            // string destination, Expression selectorExpression) {

        SessionSubscriptionInfo info = this.sessions.get(sessionId);
        if (info is null) {
            info = new SessionSubscriptionInfo(sessionId);
            // NOTE(review): a non-null return is presumably the entry that was
            // already present, as with java.util.Map.putIfAbsent -- confirm.
            SessionSubscriptionInfo value = this.sessions.putIfAbsent(sessionId, info);
            if (value !is null) {
                info = value;
            }
        }
        // info.addSubscription(destination, subscriptionId, selectorExpression);
        info.addSubscription(destination, subscriptionId);
        return info;
    }


    /**
     * Remove the session from the registry and return what the map's
     * {@code remove} yields (callers test the result for {@code null}).
     */
    SessionSubscriptionInfo removeSubscriptions(string sessionId) {
        return this.sessions.remove(sessionId);
    }

    /** Return a description containing the number of registered sessions. */
    override
    string toString() {
        return "registry[" ~ this.sessions.size().to!string() ~ " sessions]";
    }
}


/**
 * Hold subscriptions for a session.
*/
private class SessionSubscriptionInfo {

    // Id of the session whose subscriptions are held here.
    private string sessionId;

    // destination -> subscriptions
    private Map!(string, Set!(Subscription)) destinationLookup;

    /**
     * Create an empty holder for the given session.
     *
     * @param sessionId the session id; only checked through {@code assert},
     * so the check is absent from builds that compile assertions out
     */
    this(string sessionId) {
        assert(sessionId, "'sessionId' must not be null");
        this.sessionId = sessionId;
        destinationLookup = new HashMap!(string, Set!(Subscription))(4); // new ConcurrentHashMap<>(4);
    }

    /** Return the id of the session this object belongs to. */
    string getSessionId() {
        return this.sessionId;
    }

    /** Return a snapshot array of the destinations that currently have an entry. */
    string[] getDestinations() {
        return this.destinationLookup.byKey.array; // .keySet();
    }

    /**
     * Return the subscriptions stored under the exact {@code destination} key,
     * or whatever the map's {@code get} yields when there is no such key.
     */
    Set!(Subscription) getSubscriptions(string destination) {
        return this.destinationLookup.get(destination);
    }


    /**
     * Find a subscription of this session by id, ignoring case.
     *
     * @param subscriptionId the id to look for
     * @return the first subscription whose id equals {@code subscriptionId}
     * case-insensitively, or {@code null} when there is none
     */
    Subscription getSubscription(string subscriptionId) {
        // for (Map.Entry<string, Set<Subscription>> destinationEntry :
        //         this.destinationLookup.entrySet()) {
        foreach(Set!(Subscription) value; this.destinationLookup.byValue()) {
            foreach (Subscription sub ; value) {
                // icmp() is a three-way comparison: 0 means "equal". Using its
                // result directly as a condition selected the first id that
                // did NOT match.
                if (icmp(sub.getId(), subscriptionId) == 0) {
                    return sub;
                }
            }
        }
        return null;
    }

    /**
     * Add a subscription under the given destination, creating the
     * destination's set on first use.
     *
     * <p>The set is looked up once without the lock and, when missing, looked
     * up again and created inside {@code synchronized (this.destinationLookup)}.
     * The final {@code add} happens outside the lock.
     *
     * @param destination the destination (pattern) subscribed to
     * @param subscriptionId the subscription id
     */
    // void addSubscription(string destination, string subscriptionId, Expression selectorExpression) {
    void addSubscription(string destination, string subscriptionId) {
        Set!(Subscription) subs = this.destinationLookup.get(destination);
        if (subs is null) {
            synchronized (this.destinationLookup) {
                subs = this.destinationLookup.get(destination);
                if (subs is null) {
                    // TODO: Tasks pending completion -@zxp at 10/31/2018, 1:41:41 PM
                    //
                    // subs = new CopyOnWriteArraySet<>();
                    subs = new HashSet!(Subscription)();
                    this.destinationLookup.put(destination, subs);
                }
            }
        }
        // subs.add(new Subscription(subscriptionId, selectorExpression));
        subs.add(new Subscription(subscriptionId));
    }


    /**
     * Remove the subscription with exactly the given id (case-sensitive).
     *
     * <p>When the removal empties a destination's set, the destination entry
     * itself is removed as well. The method returns immediately after the
     * first successful removal, so neither collection is iterated further
     * once it has been modified.
     *
     * @param subscriptionId the id of the subscription to remove
     * @return the destination the subscription was stored under, or
     * {@code null} when no subscription was removed
     */
    string removeSubscription(string subscriptionId) {
        // for (Map.Entry<string, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry :
        //         this.destinationLookup.entrySet()) {
        foreach(string key, Set!(Subscription) subs; this.destinationLookup) {
            if (subs !is null) {
                foreach (Subscription sub ; subs) {
                    if (sub.getId() == subscriptionId && subs.remove(sub)) {
                        synchronized (this.destinationLookup) {
                            if (subs.isEmpty()) {
                                this.destinationLookup.remove(key);
                            }
                        }
                        return key;
                    }
                }
            }
        }
        return null;
    }

    /** Return a description with the session id and the destination map. */
    override
    string toString() {
        return "[sessionId=" ~ this.sessionId ~ ", subscriptions=" ~ this.destinationLookup.toString() ~ "]";
    }
}


/**
 * A single subscription, identified by its id. Equality and hashing are
 * based on the id only.
 */
private final class Subscription {

    private string id;

    // private Expression selectorExpression;

    /**
     * @param id the subscription id; only checked through {@code assert}
     */
    // this(string id, Expression selector) {
    this(string id) {
        assert(id, "Subscription id must not be null");
        this.id = id;
        // this.selectorExpression = selector;
    }

    /** Return the subscription id. */
    string getId() {
        return this.id;
    }


    // Expression getSelectorExpression() {
    //     return this.selectorExpression;
    // }

    /**
     * Two subscriptions are equal when their ids are equal (case-sensitive);
     * anything that is not a {@code Subscription} is unequal.
     */
    override
    bool opEquals(Object other) {
        if(this is other)
            return true;
        Subscription ot = cast(Subscription) other;
        if(ot is null)
            return false;
        return this.id == ot.id;
    }

    /** Hash of the id, consistent with {@code opEquals}. */
    override
    size_t toHash() @trusted nothrow {
        return hashOf(this.id);
    }

    override
    string toString() {
        return "subscription(id=" ~ this.id ~ ")";
    }
}


// private class SimpMessageHeaderPropertyAccessor : PropertyAccessor {

//     override
//     Class<?>[] getSpecificTargetClasses() {
//         return new Class<?>[] {Message.class, MessageHeaders.class};
//     }

//     override
//     canRead(EvaluationContext context, Object target, string name) {
//         return true;
//     }

//     override
//     TypedValue read(EvaluationContext context, Object target, string name) {
//         Object value;
//         if (target instanceof Message) {
//             value = name.equals("headers") ? ((Message) target).getHeaders() : null;
//         }
//         else if (target instanceof MessageHeaders) {
//             MessageHeaders headers = (MessageHeaders) target;
//             SimpMessageHeaderAccessor accessor =
//                     MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
//             assert(accessor !is null, "No SimpMessageHeaderAccessor");
//             if ("destination".equalsIgnoreCase(name)) {
//                 value = accessor.getDestination();
//             }
//             else {
//                 value = accessor.getFirstNativeHeader(name);
//                 if (value is null) {
//                     value = headers.get(name);
//                 }
//             }
//         }
//         else {
//             // Should never happen...
//             throw new IllegalStateException("Expected Message or MessageHeaders.");
//         }
//         return new TypedValue(value);
//     }

//     override
//     canWrite(EvaluationContext context, Object target, string name) {
//         return false;
//     }

//     override
//     void write(EvaluationContext context, Object target, string name, Object value) {
//     }
// }