1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.broker.AbstractBrokerMessageHandler;
18 
19 import hunt.stomp.simp.broker.BrokerAvailabilityEvent;
20 import hunt.stomp.simp.broker.OrderedMessageSender;
21 
22 import hunt.util.ApplicationEvent;
23 // import hunt.framework.context.ApplicationEventPublisherAware;
24 import hunt.util.Lifecycle;
25 
26 import hunt.stomp.Message;
27 import hunt.stomp.MessageChannel;
28 import hunt.stomp.simp.SimpMessageHeaderAccessor;
29 import hunt.stomp.simp.SimpMessageType;
30 import hunt.stomp.support.ChannelInterceptor;
31 import hunt.stomp.support.InterceptableChannel;
32 
33 import hunt.util.Common;
34 import hunt.collection;
35 import hunt.logging;
36 
37 import core.atomic;
38 import std.array;
39 import std.conv;
40 import std.string;
41 
42 /**
43  * Abstract base class for a {@link MessageHandler} that broker messages to
44  * registered subscribers.
45  *
46  * @author Rossen Stoyanchev
47  * @since 4.0
48  */
49 abstract class AbstractBrokerMessageHandler : MessageHandler, SmartLifecycle { 
50 	// , ApplicationEventPublisherAware, 
51 
52 	private SubscribableChannel clientInboundChannel;
53 
54 	private MessageChannel clientOutboundChannel;
55 
56 	private SubscribableChannel brokerChannel;
57 
58 	private string[] destinationPrefixes;
59 
60 	private bool preservePublishOrder = false;
61 	
62 	private ApplicationEventPublisher eventPublisher;
63 
64 	private shared bool brokerAvailable = false;
65 
66 	private BrokerAvailabilityEvent availableEvent;
67 
68 	private BrokerAvailabilityEvent notAvailableEvent;
69 
70 	private bool autoStartup = true;
71 
72 	private bool running = false;
73 
74 	private Object lifecycleMonitor;
75 
76 	private ChannelInterceptor unsentDisconnectInterceptor;
77 
78 
79 	private void initilize() {
80 		availableEvent = new BrokerAvailabilityEvent(true, this);
81 		notAvailableEvent = new BrokerAvailabilityEvent(false, this);
82 		lifecycleMonitor = new Object();
83 		unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor();
84 	}
85 
86 	/**
87 	 * Constructor with no destination prefixes (matches all destinations).
88 	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
89 	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
90 	 * @param brokerChannel the channel for the application to send messages to the broker
91 	 */
92 	this(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
93 			SubscribableChannel brokerChannel) {
94 		this(inboundChannel, outboundChannel, brokerChannel, []);
95 	}
96 
97 	/**
98 	 * Constructor with destination prefixes to match to destinations of messages.
99 	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
100 	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
101 	 * @param brokerChannel the channel for the application to send messages to the broker
102 	 * @param destinationPrefixes prefixes to use to filter out messages
103 	 */
104 	this(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
105 			SubscribableChannel brokerChannel, string[] destinationPrefixes) {
106 
107 		assert(inboundChannel, "'inboundChannel' must not be null");
108 		assert(outboundChannel, "'outboundChannel' must not be null");
109 		assert(brokerChannel, "'brokerChannel' must not be null");
110 		
111 		initilize();
112 		this.clientInboundChannel = inboundChannel;
113 		this.clientOutboundChannel = outboundChannel;
114 		this.brokerChannel = brokerChannel;
115 
116 		destinationPrefixes = (destinationPrefixes !is null ? destinationPrefixes : []);
117 		this.destinationPrefixes = destinationPrefixes;
118 	}
119 
120 
121 	SubscribableChannel getClientInboundChannel() {
122 		return this.clientInboundChannel;
123 	}
124 
125 	MessageChannel getClientOutboundChannel() {
126 		return this.clientOutboundChannel;
127 	}
128 
129 	SubscribableChannel getBrokerChannel() {
130 		return this.brokerChannel;
131 	}
132 
133 	string[] getDestinationPrefixes() {
134 		return this.destinationPrefixes;
135 	}
136 
137 	/**
138 	 * Whether the client must receive messages in the order of publication.
139 	 * <p>By default messages sent to the {@code "clientOutboundChannel"} may
140 	 * not be processed in the same order because the channel is backed by a
141 	 * ThreadPoolExecutor that in turn does not guarantee processing in order.
142 	 * <p>When this flag is set to {@code true} messages within the same session
143 	 * will be sent to the {@code "clientOutboundChannel"} one at a time in
144 	 * order to preserve the order of publication. Enable this only if needed
145 	 * since there is some performance overhead to keep messages in order.
146 	 * @param preservePublishOrder whether to publish in order
147 	 * @since 5.1
148 	 */
149 	void setPreservePublishOrder(bool preservePublishOrder) {
150 		OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder);
151 		this.preservePublishOrder = preservePublishOrder;
152 	}
153 
154 	/**
155 	 * Whether to ensure messages are received in the order of publication.
156 	 * @since 5.1
157 	 */
158 	bool isPreservePublishOrder() {
159 		return this.preservePublishOrder;
160 	}
161 
162 	// override
163 	void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
164 		this.eventPublisher = publisher;
165 	}
166 
167 	
168 	ApplicationEventPublisher getApplicationEventPublisher() {
169 		return this.eventPublisher;
170 	}
171 
172 	void setAutoStartup(bool autoStartup) {
173 		this.autoStartup = autoStartup;
174 	}
175 
176 	// override
177 	bool isAutoStartup() {
178 		return this.autoStartup;
179 	}
180 
181 	int getPhase() {
182 		return int.max;
183 	}
184 
185 	// override
186 	void start() {
187 		synchronized (this.lifecycleMonitor) {
188 			version(HUNT_DEBUG) info("Starting...");
189 			this.clientInboundChannel.subscribe(this);
190 			this.brokerChannel.subscribe(this);
191 			InterceptableChannel channel = cast(InterceptableChannel) this.clientInboundChannel;
192 			if (channel !is null) {
193 				channel.addInterceptor(0, this.unsentDisconnectInterceptor);
194 			}
195 			startInternal();
196 			this.running = true;
197 			version(HUNT_DEBUG) info("Started.");
198 		}
199 	}
200 
201 	protected void startInternal() {
202 	}
203 
204 	// override
205 	void stop() {
206 		synchronized (this.lifecycleMonitor) {
207 			version(HUNT_DEBUG) info("Stopping...");
208 			stopInternal();
209 			this.clientInboundChannel.unsubscribe(this);
210 			this.brokerChannel.unsubscribe(this);
211 			InterceptableChannel channel = cast(InterceptableChannel) this.clientInboundChannel;
212 			if (channel !is null) {
213 				channel.removeInterceptor(this.unsentDisconnectInterceptor);
214 			}
215 			this.running = false;
216 			version(HUNT_DEBUG) info("Stopped.");
217 		}
218 	}
219 
220 	protected void stopInternal() {
221 	}
222 
223 	// override
224 	final void stop(Runnable callback) {
225 		synchronized (this.lifecycleMonitor) {
226 			stop();
227 			callback.run();
228 		}
229 	}
230 
231 	/**
232 	 * Check whether this message handler is currently running.
233 	 * <p>Note that even when this message handler is running the
234 	 * {@link #isBrokerAvailable()} flag may still independently alternate between
235 	 * being on and off depending on the concrete sub-class implementation.
236 	 */
237 	// override
238 	final bool isRunning() {
239 		return this.running;
240 	}
241 
242 	/**
243 	 * Whether the message broker is currently available and able to process messages.
244 	 * <p>Note that this is in addition to the {@link #isRunning()} flag, which
245 	 * indicates whether this message handler is running. In other words the message
246 	 * handler must first be running and then the {@code #isBrokerAvailable()} flag
247 	 * may still independently alternate between being on and off depending on the
248 	 * concrete sub-class implementation.
249 	 * <p>Application components may implement
250 	 * {@code hunt.framework.context.ApplicationListener&lt;BrokerAvailabilityEvent&gt;}
251 	 * to receive notifications when broker becomes available and unavailable.
252 	 */
253 	bool isBrokerAvailable() {
254 		return this.brokerAvailable;
255 	}
256 
257 
258 	override
259 	void handleMessage(MessageBase message) {
260 		version(HUNT_DEBUG) {
261 			trace("Processing " ~  typeid(cast(Object)message).name);
262 		}
263 		if (this.running) 
264 			handleMessageInternal(message);
265 		else {
266 			version(HUNT_DEBUG) {
267 				trace(this.toString() ~ " not running yet. Ignoring " ~ message.to!string());
268 			}
269 		}
270 	}
271 
272 	protected abstract void handleMessageInternal(MessageBase message);
273 
274 
275 	protected bool checkDestinationPrefix(string destination) {
276 		if (destination is null || this.destinationPrefixes.empty()) {
277 			return true;
278 		}
279 		foreach (string prefix ; this.destinationPrefixes) {
280 			if (destination.startsWith(prefix)) {
281 				return true;
282 			}
283 		}
284 		return false;
285 	}
286 
287 	protected void publishBrokerAvailableEvent() {
288 		bool shouldPublish = cas(&this.brokerAvailable, false, true);
289 		if (this.eventPublisher !is null && shouldPublish) {
290 			version(HUNT_DEBUG) {
291 				info(this.availableEvent);
292 			}
293 			this.eventPublisher.publishEvent(this.availableEvent);
294 		}
295 	}
296 
297 	protected void publishBrokerUnavailableEvent() {
298 		bool shouldPublish = cas(&this.brokerAvailable, true, false);
299 		if (this.eventPublisher !is null && shouldPublish) {
300 			version(HUNT_DEBUG) {
301 				info(this.notAvailableEvent);
302 			}
303 			this.eventPublisher.publishEvent(this.notAvailableEvent);
304 		}
305 	}
306 
307 	/**
308 	 * Get the MessageChannel to use for sending messages to clients, possibly
309 	 * a per-session wrapper when {@code preservePublishOrder=true}.
310 	 * @since 5.1
311 	 */
312 	protected MessageChannel getClientOutboundChannelForSession(string sessionId) {
313 		return this.preservePublishOrder ?
314 				new OrderedMessageSender(getClientOutboundChannel()) : getClientOutboundChannel();
315 	}
316 
317 
318 	/**
319 	 * Detect unsent DISCONNECT messages and process them anyway.
320 	 */
321 	private class UnsentDisconnectChannelInterceptor : ChannelInterceptor {
322 
323 		override
324 		void afterSendCompletion(MessageBase message, 
325 			MessageChannel channel, bool sent, Exception ex) {
326 			if (!sent) {
327 				SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
328 				if (SimpMessageType.DISCONNECT == (messageType)) {
329 					trace("Detected unsent DISCONNECT message. Processing anyway.");
330 					handleMessage(message);
331 				}
332 			}
333 		}
334 	}
335 
336 }