1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.broker.SimpleBrokerMessageHandler;
18 
19 import hunt.stomp.simp.broker.AbstractBrokerMessageHandler;
20 import hunt.stomp.simp.broker.DefaultSubscriptionRegistry;
21 import hunt.stomp.simp.broker.SubscriptionRegistry;
22 
23 import hunt.stomp.Message;
24 import hunt.stomp.MessageChannel;
25 import hunt.stomp.MessageHeaders;
26 import hunt.stomp.simp.SimpMessageHeaderAccessor;
27 import hunt.stomp.simp.SimpMessageType;
28 import hunt.stomp.support.GenericMessage;
29 import hunt.stomp.support.MessageBuilder;
30 import hunt.stomp.support.MessageHeaderAccessor;
31 
32 // import hunt.framework.task.TaskScheduler;
33 
34 import hunt.collection;
35 import hunt.util.DateTime;
36 import hunt.util.Common;
37 import hunt.Exceptions;
38 import hunt.logging;
39 import hunt.text.PathMatcher;
40 
41 // dfmt off
42 version(Have_hunt_security) {
43     import hunt.security.Principal;
44 }
45 // dfmt on
46 
47 import std.algorithm;
48 import std.conv;
49 
50 /**
51  * A "simple" message broker that recognizes the message types defined in
52  * {@link SimpMessageType}, keeps track of subscriptions with the help of a
53  * {@link SubscriptionRegistry} and sends messages to subscribers.
54  *
55  * @author Rossen Stoyanchev
56  * @author Juergen Hoeller
57  * @since 4.0
58  */
59 class SimpleBrokerMessageHandler : AbstractBrokerMessageHandler {
60 
61 	private enum byte[] EMPTY_PAYLOAD = [];
62 
63 	private PathMatcher pathMatcher;
64 	
65 	private int cacheLimit;
66 	
67 	private string selectorHeaderName = "selector";
68 	
69 	// private TaskScheduler taskScheduler;
70 	
71 	private long[] heartbeatValue;
72 	
73 	private MessageHeaderInitializer headerInitializer;
74 
75 	private SubscriptionRegistry subscriptionRegistry;
76 
77 	private Map!(string, SessionInfo) sessions;
78 
79 	
80 	// private ScheduledFuture<?> heartbeatFuture;
81 
82 
83 	/**
84 	 * Create a SimpleBrokerMessageHandler instance with the given message channels
85 	 * and destination prefixes.
86 	 * @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
87 	 * @param clientOutboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
88 	 * @param brokerChannel the channel for the application to send messages to the broker
89 	 * @param destinationPrefixes prefixes to use to filter out messages
90 	 */
91 	this(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel,
92 			SubscribableChannel brokerChannel, string[] destinationPrefixes) {
93 
94 		super(clientInboundChannel, clientOutboundChannel, brokerChannel, destinationPrefixes);
95 		sessions = new HashMap!(string, SessionInfo)();
96 		this.subscriptionRegistry = new DefaultSubscriptionRegistry();
97 	}
98 
99 
100 	/**
101 	 * Configure a custom SubscriptionRegistry to use for storing subscriptions.
102 	 * <p><strong>Note</strong> that when a custom PathMatcher is configured via
103 	 * {@link #setPathMatcher}, if the custom registry is not an instance of
104 	 * {@link DefaultSubscriptionRegistry}, the provided PathMatcher is not used
105 	 * and must be configured directly on the custom registry.
106 	 */
107 	void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
108 		assert(subscriptionRegistry, "SubscriptionRegistry must not be null");
109 		this.subscriptionRegistry = subscriptionRegistry;
110 		initPathMatcherToUse();
111 		initCacheLimitToUse();
112 		initSelectorHeaderNameToUse();
113 	}
114 
115 	SubscriptionRegistry getSubscriptionRegistry() {
116 		return this.subscriptionRegistry;
117 	}
118 
119 	/**
120 	 * When configured, the given PathMatcher is passed down to the underlying
121 	 * SubscriptionRegistry to use for matching destination to subscriptions.
122 	 * <p>Default is a standard {@link hunt.framework.util.AntPathMatcher}.
123 	 * @since 4.1
124 	 * @see #setSubscriptionRegistry
125 	 * @see DefaultSubscriptionRegistry#setPathMatcher
126 	 * @see hunt.framework.util.AntPathMatcher
127 	 */
128 	void setPathMatcher(PathMatcher pathMatcher) {
129 		this.pathMatcher = pathMatcher;
130 		initPathMatcherToUse();
131 	}
132 
133 	private void initPathMatcherToUse() {
134 		auto s = cast(DefaultSubscriptionRegistry) this.subscriptionRegistry;
135 		if (this.pathMatcher !is null && s !is null) {
136 			s.setPathMatcher(this.pathMatcher);
137 		}
138 	}
139 
140 	/**
141 	 * When configured, the specified cache limit is passed down to the
142 	 * underlying SubscriptionRegistry, overriding any default there.
143 	 * <p>With a standard {@link DefaultSubscriptionRegistry}, the default
144 	 * cache limit is 1024.
145 	 * @since 4.3.2
146 	 * @see #setSubscriptionRegistry
147 	 * @see DefaultSubscriptionRegistry#setCacheLimit
148 	 * @see DefaultSubscriptionRegistry#DEFAULT_CACHE_LIMIT
149 	 */
150 	void setCacheLimit(int cacheLimit) {
151 		this.cacheLimit = cacheLimit;
152 		initCacheLimitToUse();
153 	}
154 
155 	private void initCacheLimitToUse() {
156 		auto s = cast(DefaultSubscriptionRegistry) this.subscriptionRegistry;
157 		if (s !is null) {
158 			s.setCacheLimit(this.cacheLimit);
159 		}
160 	}
161 
162 	/**
163 	 * Configure the name of a header that a subscription message can have for
164 	 * the purpose of filtering messages matched to the subscription. The header
165 	 * value is expected to be a Spring EL  expression to be applied to
166 	 * the headers of messages matched to the subscription.
167 	 * <p>For example:
168 	 * <pre>
169 	 * headers.foo == 'bar'
170 	 * </pre>
171 	 * <p>By default this is set to "selector". You can set it to a different
172 	 * name, or to {@code null} to turn off support for a selector header.
173 	 * @param selectorHeaderName the name to use for a selector header
174 	 * @since 4.3.17
175 	 * @see #setSubscriptionRegistry
176 	 * @see DefaultSubscriptionRegistry#setSelectorHeaderName(string)
177 	 */
178 	void setSelectorHeaderName(string selectorHeaderName) {
179 		this.selectorHeaderName = selectorHeaderName;
180 		initSelectorHeaderNameToUse();
181 	}
182 
183 	private void initSelectorHeaderNameToUse() {
184 		auto s = cast(DefaultSubscriptionRegistry) this.subscriptionRegistry;
185 		if (s !is null) {
186 			s.setSelectorHeaderName(this.selectorHeaderName);
187 		}
188 	}
189 
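	/*
	 * NOTE: setTaskScheduler is currently commented out (see below). This is
	 * kept as a plain comment so that it is not picked up as documentation for
	 * the next active declaration.
	 *
	 * Configure the {@link hunt.framework.scheduling.TaskScheduler} to
	 * use for providing heartbeat support. Setting this property also sets the
	 * {@link #setHeartbeatValue heartbeatValue} to "10000, 10000".
	 * <p>By default this is not set.
	 * @since 4.2
	 */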
197 	// void setTaskScheduler(TaskScheduler taskScheduler) {
198 	// 	this.taskScheduler = taskScheduler;
199 	// 	if (taskScheduler !is null && this.heartbeatValue is null) {
200 	// 		this.heartbeatValue = [10000, 10000];
201 	// 	}
202 	// }
203 
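	/*
	 * NOTE: getTaskScheduler is currently commented out (see below). This is
	 * kept as a plain comment so that it is not picked up as documentation for
	 * the next active declaration.
	 *
	 * Return the configured TaskScheduler.
	 * @since 4.2
	 */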
208 	
209 	// TaskScheduler getTaskScheduler() {
210 	// 	return this.taskScheduler;
211 	// }
212 
213 	/**
214 	 * Configure the value for the heart-beat settings. The first number
215 	 * represents how often the server will write or send a heartbeat.
216 	 * The second is how often the client should write. 0 means no heartbeats.
217 	 * <p>By default this is set to "0, 0" unless the {@link #setTaskScheduler
218 	 * taskScheduler} in which case the default becomes "10000,10000"
219 	 * (in milliseconds).
220 	 * @since 4.2
221 	 */
222 	void setHeartbeatValue(long[] heartbeat) {
223 		if (heartbeat !is null && (heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0)) {
224 			throw new IllegalArgumentException("Invalid heart-beat: " ~ heartbeat.to!string());
225 		}
226 		this.heartbeatValue = heartbeat;
227 	}
228 
229 	/**
230 	 * The configured value for the heart-beat settings.
231 	 * @since 4.2
232 	 */
233 	
234 	long[] getHeartbeatValue() {
235 		return this.heartbeatValue;
236 	}
237 
238 	/**
239 	 * Configure a {@link MessageHeaderInitializer} to apply to the headers
240 	 * of all messages sent to the client outbound channel.
241 	 * <p>By default this property is not set.
242 	 * @since 4.1
243 	 */
244 	void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
245 		this.headerInitializer = headerInitializer;
246 	}
247 
248 	/**
249 	 * Return the configured header initializer.
250 	 * @since 4.1
251 	 */
252 	
253 	MessageHeaderInitializer getHeaderInitializer() {
254 		return this.headerInitializer;
255 	}
256 
257 
258 	override
259 	void startInternal() {
260 		publishBrokerAvailableEvent();
261 		// implementationMissing(false);
262 		// TODO: Tasks pending completion -@zxp at 11/13/2018, 3:18:33 PM
263 		// 
264 		// if (this.taskScheduler !is null) {
265 		// 	long interval = initHeartbeatTaskDelay();
266 		// 	if (interval > 0) {
267 		// 		// this.heartbeatFuture = this.taskScheduler.scheduleWithFixedDelay(new HeartbeatTask(), interval);
268 		// 		implementationMissing(false);
269 		// 	}
270 		// }
271 		// else {
272 		// 	assert(getHeartbeatValue() is null ||
273 		// 			(getHeartbeatValue()[0] == 0 && getHeartbeatValue()[1] == 0),
274 		// 			"Heartbeat values configured but no TaskScheduler provided");
275 		// }
276 	}
277 
278 	private long initHeartbeatTaskDelay() {
279 		if (getHeartbeatValue() is null) {
280 			return 0;
281 		}
282 		else if (getHeartbeatValue()[0] > 0 && getHeartbeatValue()[1] > 0) {
283 			return min(getHeartbeatValue()[0], getHeartbeatValue()[1]);
284 		}
285 		else {
286 			return (getHeartbeatValue()[0] > 0 ? getHeartbeatValue()[0] : getHeartbeatValue()[1]);
287 		}
288 	}
289 
290 	override
291 	void stopInternal() {
292 		publishBrokerUnavailableEvent();
293 		// if (this.heartbeatFuture !is null) {
294 		// 	this.heartbeatFuture.cancel(true);
295 		// }
296 	}
297 
298 	override
299 	protected void handleMessageInternal(MessageBase message) {
300 		MessageHeaders headers = message.getHeaders();
301 		version(HUNT_DEBUG) {
302 			trace(headers.toString());
303 		}
304 
305 		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
306 		string destination = SimpMessageHeaderAccessor.getDestination(headers);
307 		string sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
308 
309 		updateSessionReadTime(sessionId);
310 
311 		if (!checkDestinationPrefix(destination)) {
312 			return;
313 		}
314 
315 		if (SimpMessageType.MESSAGE == messageType) {
316 			logMessage(message);
317 			sendMessageToSubscribers(destination, message);
318 		}
319 		else if (SimpMessageType.CONNECT == messageType) {
320 			logMessage(message);
321 			if (sessionId !is null) {
322 				long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
323 				long[] heartbeatOut = getHeartbeatValue();
324 				// TODO: Tasks pending completion -@zxp at 10/31/2018, 4:39:26 PM
325 				// 
326 				version(Have_hunt_security) {
327 					Principal user = null; // SimpMessageHeaderAccessor.getUser(headers);
328 				}
329 			
330 				MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
331 
332 				// version(Have_hunt_security) {
333 				// 	this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
334 				// } else {
335 				// 	this.sessions.put(sessionId, new SessionInfo(sessionId, outChannel, heartbeatIn, heartbeatOut));
336 				// }
337 
338 					this.sessions.put(sessionId, new SessionInfo(sessionId, outChannel, heartbeatIn, heartbeatOut));
339 
340 				SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
341 				initHeaders(connectAck);
342 				connectAck.setSessionId(sessionId);
343 				// if (user !is null) {
344 				// 	connectAck.setUser(user);
345 				// }
346 				connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
347 				connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
348 				MessageBase messageOut = MessageHelper.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
349 				info("responding ..........");
350 				getClientOutboundChannel().send(messageOut);
351 				info("responding ..........done");
352 			} else {
353 				warning("sessionId is null");
354 			}
355 		}
356 		else if (SimpMessageType.DISCONNECT == messageType) {
357 			logMessage(message);
358 			if (sessionId !is null) {
359 				// Principal user = SimpMessageHeaderAccessor.getUser(headers);
360 
361 				// version(Have_hunt_security) {
362 				// 	handleDisconnect(sessionId, null, message);
363 				// } else {
364 				// 	handleDisconnect(sessionId, message);
365 				// }
366 
367 					handleDisconnect(sessionId, message);				
368 			}
369 		}
370 		else if (SimpMessageType.SUBSCRIBE == messageType) {
371 			logMessage(message);
372 			this.subscriptionRegistry.registerSubscription(message);
373 		}
374 		else if (SimpMessageType.UNSUBSCRIBE == messageType) {
375 			logMessage(message);
376 			this.subscriptionRegistry.unregisterSubscription(message);
377 		}
378 	}
379 
380 	private void updateSessionReadTime(string sessionId) {
381 		if (sessionId !is null) {
382 			SessionInfo info = this.sessions.get(sessionId);
383 			if (info !is null) {
384 				info.setLastReadTime(DateTime.currentTimeMillis);
385 			}
386 		}
387 	}
388 
389 	private void logMessage(MessageBase message) {
390 		version(HUNT_DEBUG) {
391 			SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor!SimpMessageHeaderAccessor(message);
392 			accessor = (accessor !is null ? accessor : SimpMessageHeaderAccessor.wrap(message));
393 			import hunt.stomp.support.GenericMessage;
394 			import hunt.Nullable;
395 			GenericMessage!(byte[]) gm = cast(GenericMessage!(byte[]))message;
396 			if(gm is null) 
397 				trace("Processing " ~ typeid(cast(Object)message).name);
398 			else
399 				trace("Processing " ~ accessor.getShortLogMessage(new Nullable!(byte[])(gm.getPayload())));
400 		}
401 	}
402 
403 	private void initHeaders(SimpMessageHeaderAccessor accessor) {
404 		if (getHeaderInitializer() !is null) {
405 			getHeaderInitializer().initHeaders(accessor);
406 		}
407 	}
408 
409 	private void handleDisconnect(string sessionId, MessageBase origMessage) {
410 		this.sessions.remove(sessionId);
411 		this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
412 		SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
413 		accessor.setSessionId(sessionId);
414 		// if (user !is null) {
415 		// 	accessor.setUser(user);
416 		// }
417 		if (origMessage !is null) {
418 			accessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, origMessage);
419 		}
420 		initHeaders(accessor);
421 		MessageBase message = MessageHelper.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
422 		getClientOutboundChannel().send(message);
423 	}
424 
425 	protected void sendMessageToSubscribers(string destination, MessageBase message) {
426 		MultiValueMap!(string,string) subscriptions = this.subscriptionRegistry.findSubscriptions(message);
427 		version(HUNT_DEBUG) {
428 			if (!subscriptions.isEmpty()) {
429 				trace("Broadcasting to " ~ subscriptions.size().to!string() ~ " sessions.");
430 			}
431 		}
432 		long now = DateTime.currentTimeMillis();
433 		foreach(string sessionId, List!string subscriptionIds; subscriptions) {
434 			foreach (string subscriptionId ; subscriptionIds) {
435 				SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
436 				initHeaders(headerAccessor);
437 				headerAccessor.setSessionId(sessionId);
438 				headerAccessor.setSubscriptionId(subscriptionId);
439 				headerAccessor.copyHeadersIfAbsent(message.getHeaders());
440 				headerAccessor.setLeaveMutable(true);
441 				// warning(message.payloadType);
442 				auto gm = cast(GenericMessage!(string))message;
443 				if(gm is null) {
444 					warning("Can't cast message: %s", typeid(message));
445 					continue;
446 				}
447 				string payload = gm.getPayload();
448 				MessageBase reply = MessageHelper.createMessage(cast(byte[])payload, headerAccessor.getMessageHeaders());
449 				SessionInfo info = this.sessions.get(sessionId);
450 				if (info !is null) {
451 					try {
452 						info.getClientOutboundChannel().send(reply);
453 					}
454 					catch (Throwable ex) {
455 						errorf("Failed to send " ~ message.to!string()  ~ ": \n%s", ex.msg);
456 					}
457 					finally {
458 						info.setLastWriteTime(now);
459 					}
460 				}
461 			}
462 		}
463 	}
464 
465 
466 	int opCmp(MessageHandler o) {
467 		implementationMissing(false);
468 		return 0;
469 	}
470 
471 
472 	override
473 	string toString() {
474 		return "SimpleBrokerMessageHandler [" ~ this.subscriptionRegistry.to!string() ~ "]";
475 	}
476 
477 
478 	private class HeartbeatTask : Runnable {
479 
480 		override
481 		void run() {
482 			long now =DateTime.currentTimeMillis();
483 			foreach (SessionInfo info ; sessions.values()) {
484 				if (info.getReadInterval() > 0 && (now - info.getLastReadTime()) > info.getReadInterval()) {
485 
486 					// version(Have_hunt_security) {
487 					// 	handleDisconnect(info.getSessionId(), null, null); // info.getUser()
488 					// } else {
489 					// 	handleDisconnect(info.getSessionId(), null); // info.getUser()
490 					// }
491 					handleDisconnect(info.getSessionId(), null);
492 
493 				}
494 				if (info.getWriteInterval() > 0 && (now - info.getLastWriteTime()) > info.getWriteInterval()) {
495 					SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
496 					accessor.setSessionId(info.getSessionId());
497 					// TODO: Tasks pending completion -@zxp at 10/31/2018, 4:39:51 PM
498 					// 
499 					// Principal user = info.getUser();
500 					// if (user !is null) {
501 					// 	accessor.setUser(user);
502 					// }
503 					initHeaders(accessor);
504 					accessor.setLeaveMutable(true);
505 					MessageHeaders headers = accessor.getMessageHeaders();
506 					info.getClientOutboundChannel().send(MessageHelper.createMessage(EMPTY_PAYLOAD, headers));
507 				}
508 			}
509 		}
510 	}
511 
512 }
513 
514 
515 
/**
 * Per-session bookkeeping: the session id, the channel used to reach the
 * client, the negotiated heartbeat intervals and the times of the last read
 * from and last write to the client.
 */
private class SessionInfo {

	/* STOMP spec: receiver SHOULD take into account an error margin */
	private enum long HEARTBEAT_MULTIPLIER = 3;

	private string sessionId;

	// TODO: the session's user (Principal) is not tracked yet.

	private MessageChannel clientOutboundChannel;

	// Both intervals are in the unit of the heart-beat values; 0 means disabled.
	private long readInterval;

	private long writeInterval;

	private long lastReadTime;

	private long lastWriteTime;

	/**
	 * Create the bookkeeping entry for a session.
	 * <p>Both intervals stay 0 when either heart-beat array is null. Otherwise
	 * the read interval is the larger of {@code clientHeartbeat[0]} and
	 * {@code serverHeartbeat[1]} times {@code HEARTBEAT_MULTIPLIER} when both
	 * are positive, and the write interval is the larger of
	 * {@code clientHeartbeat[1]} and {@code serverHeartbeat[0]} when both are
	 * positive. The last read and write times start at the current time.
	 * @param sessionId the id of the session
	 * @param outboundChannel the channel for sending messages to this session's client
	 * @param clientHeartbeat the heart-beat values sent by the client, may be null
	 * @param serverHeartbeat the heart-beat values configured on the broker, may be null
	 */
	this(string sessionId, MessageChannel outboundChannel,
			long[] clientHeartbeat, long[] serverHeartbeat) {

		this.sessionId = sessionId;
		this.clientOutboundChannel = outboundChannel;

		// Heartbeats are off unless both sides provide values.
		this.readInterval = 0;
		this.writeInterval = 0;
		if (clientHeartbeat !is null && serverHeartbeat !is null) {
			if (clientHeartbeat[0] > 0 && serverHeartbeat[1] > 0) {
				this.readInterval = max(clientHeartbeat[0], serverHeartbeat[1]) * HEARTBEAT_MULTIPLIER;
			}
			if (clientHeartbeat[1] > 0 && serverHeartbeat[0] > 0) {
				this.writeInterval = max(clientHeartbeat[1], serverHeartbeat[0]);
			}
		}

		immutable long createdAt = DateTime.currentTimeMillis();
		this.lastWriteTime = createdAt;
		this.lastReadTime = createdAt;
	}

	/** Return the id of the session. */
	string getSessionId() {
		return sessionId;
	}

	/** Return the channel for sending messages to this session's client. */
	MessageChannel getClientOutboundChannel() {
		return clientOutboundChannel;
	}

	/** Return the read interval; 0 means no read timeout. */
	long getReadInterval() {
		return readInterval;
	}

	/** Return the write interval; 0 means no heartbeats are written. */
	long getWriteInterval() {
		return writeInterval;
	}

	/** Return the time of the last read from the client. */
	long getLastReadTime() {
		return lastReadTime;
	}

	/** Set the time of the last read from the client. */
	void setLastReadTime(long lastReadTime) {
		this.lastReadTime = lastReadTime;
	}

	/** Return the time of the last write to the client. */
	long getLastWriteTime() {
		return lastWriteTime;
	}

	/** Set the time of the last write to the client. */
	void setLastWriteTime(long lastWriteTime) {
		this.lastWriteTime = lastWriteTime;
	}
}
591