1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.broker.DefaultSubscriptionRegistry;
18 
19 import hunt.stomp.simp.broker.AbstractSubscriptionRegistry;
20 import hunt.stomp.simp.broker.SubscriptionRegistry;
21 
22 import hunt.collection;
23 import hunt.Exceptions;
24 import hunt.logging;
25 import hunt.text.PathMatcher;
26 
27 import std.array;
28 import std.conv;
29 import std.string;
30 
31 // import java.util.concurrent.ConcurrentHashMap;
32 // import java.util.concurrent.ConcurrentMap;
33 // import java.util.concurrent.CopyOnWriteArraySet;
34 
35 // import hunt.framework.expression.EvaluationContext;
36 // import hunt.framework.expression.Expression;
37 // import hunt.framework.expression.ExpressionParser;
38 // import hunt.framework.expression.PropertyAccessor;
39 // import hunt.framework.expression.TypedValue;
40 // import hunt.framework.expression.spel.SpelEvaluationException;
41 // import hunt.framework.expression.spel.standard.SpelExpressionParser;
42 // import hunt.framework.expression.spel.support.SimpleEvaluationContext;
43 
44 import hunt.stomp.Message;
45 import hunt.stomp.MessageHeaders;
46 import hunt.stomp.simp.SimpMessageHeaderAccessor;
47 import hunt.stomp.support.MessageHeaderAccessor;
48 // import hunt.framework.util.AntPathMatcher;
49 
50 // import hunt.framework.util.LinkedMultiValueMap;
51 // import hunt.framework.util.MultiValueMap;
52 // import hunt.framework.util.PathMatcher;
53 // import hunt.framework.util.StringUtils;
54 
/**
 * In-memory {@link SubscriptionRegistry} implementation. Message destinations
 * are matched against subscription destination patterns with a configurable
 * {@link PathMatcher}, and resolved matches are kept in an internal
 * destination cache.
 *
 * <p>The original documentation of this class describes a
 * {@link #setSelectorHeaderName selector} header whose value is a Spring EL
 * expression used to filter messages. That part is not active in this port:
 * the header name can be configured, but no expression is parsed or evaluated
 * anywhere in this class.
 *
 * @author Rossen Stoyanchev
 * @author Sebastien Deleuze
 * @author Juergen Hoeller
 * @since 4.0
 */
class DefaultSubscriptionRegistry : AbstractSubscriptionRegistry {

	/** Default maximum number of entries for the destination cache: 1024. */
	enum int DEFAULT_CACHE_LIMIT = 1024;

	/** Strategy used to match destination patterns against destinations. */
	private PathMatcher pathMatcher;

	/** Upper bound checked when deciding whether to evict a cached destination. */
	private int cacheLimit = DEFAULT_CACHE_LIMIT;

	/** Name of the (currently unevaluated) selector header. */
	private string selectorHeaderName = "selector";

	/**
	 * Guards the selector filtering branch of filterSubscriptions().
	 * Nothing in this module currently switches it on.
	 */
	private bool selectorHeaderInUse = false;

	/** Cache of already resolved destinations. */
	private DestinationCache destinationCache;

	/** Per-session subscription bookkeeping. */
	private SessionSubscriptionRegistry subscriptionRegistry;

	// TODO: the expression parser and the shared evaluation context of the
	// original implementation have not been ported.

	/**
	 * Create a registry with an {@code AntPathMatcher}, an empty destination
	 * cache and an empty session registry.
	 */
	this() {
		this.pathMatcher = new AntPathMatcher();
		this.destinationCache = new DestinationCache();
		this.subscriptionRegistry = new SessionSubscriptionRegistry();
	}


	/**
	 * Replace the {@link PathMatcher} used for destination matching.
	 */
	void setPathMatcher(PathMatcher pathMatcher) {
		this.pathMatcher = pathMatcher;
	}

	/**
	 * Return the {@link PathMatcher} currently in use.
	 */
	PathMatcher getPathMatcher() {
		return pathMatcher;
	}

	/**
	 * Set the maximum number of entries for the resolved destination cache.
	 * The default is 1024. The value is read each time the cache decides
	 * whether its eldest entry has to be evicted.
	 */
	void setCacheLimit(int cacheLimit) {
		this.cacheLimit = cacheLimit;
	}

	/**
	 * Return the maximum number of entries for the resolved destination cache.
	 */
	int getCacheLimit() {
		return cacheLimit;
	}

	/**
	 * Configure the name of the header that a subscription message may carry
	 * in order to filter the messages matched to that subscription.
	 * <p>By default this is set to "selector".
	 * <p>NOTE: in this port the name is only stored and handed back by
	 * {@link #getSelectorHeaderName()}; the header itself is not read.
	 * @param selectorHeaderName the name to use for a selector header
	 * @since 4.2
	 */
	void setSelectorHeaderName(string selectorHeaderName) {
		this.selectorHeaderName = selectorHeaderName;
	}

	/**
	 * Return the configured selector header name.
	 * @since 4.2
	 */
	string getSelectorHeaderName() {
		return selectorHeaderName;
	}


	/**
	 * Record the subscription for the session, bring the destination cache
	 * up to date and log the addition at info level.
	 * The {@code message} argument is not used because selector expressions
	 * are not supported here.
	 */
	override
	protected void addSubscriptionInternal(
			string sessionId, string subsId, string destination, MessageBase message) {

		subscriptionRegistry.addSubscription(sessionId, subsId, destination);
		destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);

		infof("Subscription added: id=%s, subsId=%s, destination=%s", 
			sessionId, subsId, destination);
	}

	// TODO: getSelectorExpression(MessageHeaders) from the original
	// implementation (parsing of the selector header) is not ported.

	/**
	 * Drop one subscription of a session. The destination cache is only
	 * touched when the session is known and actually held that subscription.
	 */
	override
	protected void removeSubscriptionInternal(string sessionId, string subsId, MessageBase message) {
		SessionSubscriptionInfo sessionInfo = subscriptionRegistry.getSubscriptions(sessionId);
		if (sessionInfo is null) {
			return;
		}

		string removedFrom = sessionInfo.removeSubscription(subsId);
		if (removedFrom is null) {
			return;
		}

		destinationCache.updateAfterRemovedSubscription(sessionId, subsId);
	}

	/**
	 * Forget every subscription of the given session and purge the session
	 * from the destination cache. Unknown sessions are ignored.
	 */
	void unregisterAllSubscriptions(string sessionId) {
		SessionSubscriptionInfo removed = subscriptionRegistry.removeSubscriptions(sessionId);
		if (removed is null) {
			return;
		}
		destinationCache.updateAfterRemovedSession(removed);
	}

	/**
	 * Resolve the {@code sessionId -> subscription ids} matches for a
	 * destination through the cache and pass them through the selector filter.
	 */
	override
	protected MultiValueMap!(string, string) findSubscriptionsInternal(string destination, MessageBase message) {
		return filterSubscriptions(destinationCache.getSubscriptions(destination, message), message);
	}

	/**
	 * Return {@code allMatches} unchanged unless a selector header is in use.
	 * The selector branch is a stub: it reports the missing implementation
	 * and returns a new, empty map.
	 */
	private MultiValueMap!(string, string) filterSubscriptions(
			MultiValueMap!(string, string) allMatches, MessageBase message) {

		if (selectorHeaderInUse) {
			MultiValueMap!(string, string) filtered =
				new LinkedMultiValueMap!(string, string)(allMatches.size());
			// TODO: evaluate each subscription's selector expression against
			// the message and add the passing (sessionId, subId) pairs.
			implementationMissing(false);
			return filtered;
		}
		return allMatches;
	}

	override
	string toString() {
		string cacheInfo = destinationCache.toString();
		string registryInfo = subscriptionRegistry.toString();
		return "DefaultSubscriptionRegistry[" ~ cacheInfo ~ ", " ~ registryInfo ~ "]";
	}


	/**
	 * A cache for destinations previously resolved via
	 * {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(string, Message)}.
	 *
	 * <p>Two maps are kept in step: {@code accessCache} is consulted first and
	 * without taking a lock, while every modification made by this class
	 * happens inside a {@code synchronized (updateCache)} section.
	 */
	private class DestinationCache {

		/** Shorthand for a {@code sessionId -> subscription ids} multi-map. */
		private alias SubsBySession = LinkedMultiValueMap!(string, string);

		/** Map from destination to {@code <sessionId, subscriptionId>} for fast look-ups. */
		private Map!(string, SubsBySession) accessCache;

		/** Map from destination to {@code <sessionId, subscriptionId>} with locking. */
		private Map!(string, SubsBySession) updateCache;


		this() {
			accessCache = new HashMap!(string, SubsBySession)(DEFAULT_CACHE_LIMIT);

			// The third constructor argument presumably selects access order,
			// as in java.util.LinkedHashMap -- TODO confirm against hunt.collection.
			updateCache =
				new class LinkedHashMap!(string, SubsBySession) {
					this() {
						super(DEFAULT_CACHE_LIMIT, 0.75f, true);
					}

					// Evict the eldest destination from both maps once the
					// configured cache limit is exceeded.
					override
					protected bool removeEldestEntry(MapEntry!(string, SubsBySession) eldest) {
						bool overLimit = size() > getCacheLimit();
						if (overLimit) {
							accessCache.remove(eldest.getKey());
						}
						return overLimit;
					}
				};
		}


		/**
		 * Return the matches for {@code destination}. On a cache miss all
		 * registered sessions are scanned; a non-empty result is stored in
		 * both maps (the update map receives a deep copy).
		 */
		SubsBySession getSubscriptions(string destination, MessageBase message) {
			SubsBySession matches = accessCache.get(destination);
			if (matches !is null) {
				return matches;
			}

			synchronized (updateCache) {
				matches = new SubsBySession();
				foreach (SessionSubscriptionInfo session ; subscriptionRegistry.getAllSubscriptions()) {
					foreach (string pattern ; session.getDestinations()) {
						if (!getPathMatcher().match(pattern, destination)) {
							continue;
						}
						foreach (Subscription subscription ; session.getSubscriptions(pattern)) {
							matches.add(session.getSessionId(), subscription.getId());
						}
					}
				}
				if (!matches.isEmpty()) {
					updateCache.put(destination, matches.deepCopy());
					accessCache.put(destination, matches);
				}
			}
			return matches;
		}

		/**
		 * Add the new subscription to every cached destination that matches
		 * the subscription's destination pattern.
		 */
		void updateAfterNewSubscription(string destination, string sessionId, string subsId) {
			synchronized (updateCache) {
				foreach (string cachedDestination, SubsBySession cachedSubs; updateCache) {
					if (!getPathMatcher().match(destination, cachedDestination)) {
						continue;
					}
					// Subscription id's may also be populated via getSubscriptions()
					List!(string) known = cachedSubs.get(sessionId);
					bool alreadyCached = known !is null && known.contains(subsId);
					if (!alreadyCached) {
						cachedSubs.add(sessionId, subsId);
						accessCache.put(cachedDestination, cachedSubs.deepCopy());
					}
				}
			}
		}

		/**
		 * Remove one subscription id of a session from every cached
		 * destination; destinations left without any session are dropped.
		 */
		void updateAfterRemovedSubscription(string sessionId, string subsId) {
			synchronized (updateCache) {
				Set!(string) emptied = new HashSet!(string)();

				foreach (string cachedDestination, SubsBySession cachedSubs; updateCache) {
					List!(string) subsForSession = cachedSubs.get(sessionId);
					if (subsForSession is null) {
						continue;
					}
					subsForSession.remove(subsId);
					if (subsForSession.isEmpty()) {
						cachedSubs.remove(sessionId);
					}
					if (cachedSubs.isEmpty()) {
						emptied.add(cachedDestination);
					}
					else {
						accessCache.put(cachedDestination, cachedSubs.deepCopy());
					}
				}

				evict(emptied);
			}
		}

		/**
		 * Remove a whole session from every cached destination; destinations
		 * left without any session are dropped.
		 */
		void updateAfterRemovedSession(SessionSubscriptionInfo info) {
			synchronized (updateCache) {
				Set!(string) emptied = new HashSet!(string)();

				foreach (string cachedDestination, SubsBySession cachedSubs; updateCache) {
					if (cachedSubs.remove(info.getSessionId()) is null) {
						continue;
					}
					if (cachedSubs.isEmpty()) {
						emptied.add(cachedDestination);
					}
					else {
						accessCache.put(cachedDestination, cachedSubs.deepCopy());
					}
				}

				evict(emptied);
			}
		}

		/**
		 * Drop the given destinations from both maps. Callers invoke this
		 * from within their {@code synchronized (updateCache)} section.
		 */
		private void evict(Set!(string) destinations) {
			foreach (string destination ; destinations) {
				updateCache.remove(destination);
				accessCache.remove(destination);
			}
		}

		override
		string toString() {
			return "cache[" ~ to!string(accessCache.size()) ~ " destination(s)]";
		}
	}


}
406 
407 
/**
 * Session-keyed access to {@link SessionSubscriptionInfo} entries.
 *
 * <p>The entries live in a plain {@code HashMap} (the original code used a
 * concurrent map), so this class performs no synchronization of its own.
 */
private static class SessionSubscriptionRegistry {

    // sessionId -> SessionSubscriptionInfo
    private Map!(string, SessionSubscriptionInfo) sessions;

    this() {
        this.sessions = new HashMap!(string, SessionSubscriptionInfo)();
    }

    /**
     * Return the entry registered for {@code sessionId}, or whatever the
     * underlying map yields for an unknown key (callers test for null).
     */
    SessionSubscriptionInfo getSubscriptions(string sessionId) {
        return sessions.get(sessionId);
    }

    /**
     * Return the entries of all registered sessions.
     */
    SessionSubscriptionInfo[] getAllSubscriptions() {
        return sessions.values();
    }

    /**
     * Register a subscription, creating the session entry on first use.
     * @return the session entry that now holds the subscription
     */
    SessionSubscriptionInfo addSubscription(string sessionId, string subscriptionId,
        string destination) {

        SessionSubscriptionInfo entry = sessions.get(sessionId);
        if (entry is null) {
            SessionSubscriptionInfo created = new SessionSubscriptionInfo(sessionId);
            // Keep an entry that appeared in the meantime instead of ours.
            SessionSubscriptionInfo existing = sessions.putIfAbsent(sessionId, created);
            entry = (existing is null) ? created : existing;
        }
        // TODO: pass a selector expression along once expressions are supported.
        entry.addSubscription(destination, subscriptionId);
        return entry;
    }

    /**
     * Remove the entry of {@code sessionId} and return what the map held for it.
     */
    SessionSubscriptionInfo removeSubscriptions(string sessionId) {
        return sessions.remove(sessionId);
    }

    override
    string toString() {
        return "registry[" ~ to!string(sessions.size()) ~ " sessions]";
    }
}
457 
458 
/**
 * Hold subscriptions for a session, grouped by the destination (pattern)
 * they were registered for.
 */
private class SessionSubscriptionInfo {

    private string sessionId;

    // destination -> subscriptions
    private Map!(string, Set!(Subscription)) destinationLookup;

    /**
     * Params:
     *   sessionId = id of the session this entry belongs to; must not be null
     */
    this(string sessionId) {
        assert(sessionId, "'sessionId' must not be null");
        this.sessionId = sessionId;
        destinationLookup = new HashMap!(string, Set!(Subscription))(4); // new ConcurrentHashMap<>(4);
    }

    /** Return the id of the session this entry belongs to. */
    string getSessionId() {
        return this.sessionId;
    }

    /** Return a snapshot array of all destinations that currently have subscriptions. */
    string[] getDestinations() {
        return this.destinationLookup.byKey.array; // .keySet();
    }

    /** Return the subscriptions stored under exactly the given destination key. */
    Set!(Subscription) getSubscriptions(string destination) {
        return this.destinationLookup.get(destination);
    }

    /**
     * Look up a subscription of this session by its id.
     *
     * Params:
     *   subscriptionId = the id to look for (exact, case-sensitive match)
     * Returns: the matching subscription, or null when the session has none
     *          with that id
     */
    Subscription getSubscription(string subscriptionId) {
        foreach(Set!(Subscription) value; this.destinationLookup.byValue())  {
            if (value is null) {
                continue;
            }
            foreach (Subscription sub ; value) {
                // Compare with ==, like removeSubscription() and
                // Subscription.opEquals do. The former icmp() call returns 0
                // for equal strings, so using it as a condition selected the
                // first subscription whose id did NOT match.
                if (sub.getId() == subscriptionId) {
                    return sub;
                }
            }
        }
        return null;
    }

    /**
     * Add a subscription under the given destination, creating the
     * per-destination set on first use.
     */
    // void addSubscription(string destination, string subscriptionId, Expression selectorExpression) {
    void addSubscription(string destination, string subscriptionId) {
        Set!(Subscription) subs = this.destinationLookup.get(destination);
        if (subs is null) {
            synchronized (this.destinationLookup) {
                // Re-read under the lock: the set may have been created meanwhile.
                subs = this.destinationLookup.get(destination);
                if (subs is null) {
                    // TODO: Tasks pending completion -@zxp at 10/31/2018, 1:41:41 PM
                    // 
                    // subs = new CopyOnWriteArraySet<>();
                    subs = new HashSet!(Subscription)();
                    this.destinationLookup.put(destination, subs);
                }
            }
        }
        // subs.add(new Subscription(subscriptionId, selectorExpression));
        subs.add(new Subscription(subscriptionId));
    }

    /**
     * Remove the subscription with the given id.
     *
     * Returns: the destination the subscription was registered under, or
     *          null when no such subscription exists in this session
     */
    string removeSubscription(string subscriptionId) {
        foreach(string key, Set!(Subscription) subs; this.destinationLookup)  {
            if (subs !is null) {
                foreach (Subscription sub ; subs) {
                    if (sub.getId() == subscriptionId && subs.remove(sub)) {
                        synchronized (this.destinationLookup) {
                            // Drop the destination once its last subscription is gone.
                            if (subs.isEmpty()) {
                                this.destinationLookup.remove(key);
                            }
                        }
                        // Return right away: both collections were just
                        // modified, so iteration must not continue.
                        return key;
                    }
                }
            }
        }
        return null;
    }

    override
    string toString() {
        return "[sessionId=" ~ this.sessionId ~ ", subscriptions=" ~ this.destinationLookup.toString() ~ "]";
    }
}
546 
547 
/**
 * A single subscription, identified by its id. Equality and hashing are
 * based on the id alone.
 */
private final class Subscription {

    private string id;

    // TODO: selector expression support is not ported; once it is, the
    // expression would be stored next to the id.

    /**
     * Params:
     *   id = the subscription id; must not be null
     */
    this(string id) {
        assert(id, "Subscription id must not be null");
        this.id = id;
    }

    /** Return the subscription id. */
    string getId() {
        return id;
    }

    /** Two subscriptions are equal when their ids are equal. */
    override
    bool opEquals(Object other) {
        if (other is this) {
            return true;
        }
        if (auto that = cast(Subscription) other) {
            return id == that.id;
        }
        // null, or not a Subscription at all
        return false;
    }

    /** Hash of the id, in line with opEquals. */
    override
    size_t toHash() @trusted nothrow {
        return hashOf(id);
    }

    override
    string toString() {
        string text = "subscription(id=";
        text ~= id;
        text ~= ")";
        return text;
    }
}
590 
591 
592 // private class SimpMessageHeaderPropertyAccessor : PropertyAccessor {
593 
594 //     override
595 //     Class<?>[] getSpecificTargetClasses() {
596 //         return new Class<?>[] {Message.class, MessageHeaders.class};
597 //     }
598 
599 //     override
600 //         canRead(EvaluationContext context, Object target, string name) {
601 //         return true;
602 //     }
603 
604 //     override
605 //     TypedValue read(EvaluationContext context, Object target, string name) {
606 //         Object value;
607 //         if (target instanceof Message) {
608 //             value = name.equals("headers") ? ((Message) target).getHeaders() : null;
609 //         }
610 //         else if (target instanceof MessageHeaders) {
611 //             MessageHeaders headers = (MessageHeaders) target;
612 //             SimpMessageHeaderAccessor accessor =
613 //                     MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
614 //             assert(accessor !is null, "No SimpMessageHeaderAccessor");
615 //             if ("destination".equalsIgnoreCase(name)) {
616 //                 value = accessor.getDestination();
617 //             }
618 //             else {
619 //                 value = accessor.getFirstNativeHeader(name);
620 //                 if (value is null) {
621 //                     value = headers.get(name);
622 //                 }
623 //             }
624 //         }
625 //         else {
626 //             // Should never happen...
627 //             throw new IllegalStateException("Expected Message or MessageHeaders.");
628 //         }
629 //         return new TypedValue(value);
630 //     }
631 
632 //     override
633 //         canWrite(EvaluationContext context, Object target, string name) {
634 //         return false;
635 //     }
636 
637 //     override
638 //     void write(EvaluationContext context, Object target, string name, Object value) {
639 //     }
640 // }