1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.broker.AbstractBrokerMessageHandler; 18 19 import hunt.stomp.simp.broker.BrokerAvailabilityEvent; 20 import hunt.stomp.simp.broker.OrderedMessageSender; 21 22 import hunt.util.ApplicationEvent; 23 // import hunt.framework.context.ApplicationEventPublisherAware; 24 import hunt.util.Lifecycle; 25 26 import hunt.stomp.Message; 27 import hunt.stomp.MessageChannel; 28 import hunt.stomp.simp.SimpMessageHeaderAccessor; 29 import hunt.stomp.simp.SimpMessageType; 30 import hunt.stomp.support.ChannelInterceptor; 31 import hunt.stomp.support.InterceptableChannel; 32 33 import hunt.util.Common; 34 import hunt.collection; 35 import hunt.logging; 36 37 import core.atomic; 38 import std.array; 39 import std.conv; 40 import std.string; 41 42 /** 43 * Abstract base class for a {@link MessageHandler} that broker messages to 44 * registered subscribers. 45 * 46 * @author Rossen Stoyanchev 47 * @since 4.0 48 */ 49 abstract class AbstractBrokerMessageHandler : MessageHandler, SmartLifecycle { 50 // , ApplicationEventPublisherAware, 51 52 private SubscribableChannel clientInboundChannel; 53 54 private MessageChannel clientOutboundChannel; 55 56 private SubscribableChannel brokerChannel; 57 58 private string[] destinationPrefixes; 59 60 private bool preservePublishOrder = false; 61 62 private ApplicationEventPublisher eventPublisher; 63 64 private shared bool brokerAvailable = false; 65 66 private BrokerAvailabilityEvent availableEvent; 67 68 private BrokerAvailabilityEvent notAvailableEvent; 69 70 private bool autoStartup = true; 71 72 private bool running = false; 73 74 private Object lifecycleMonitor; 75 76 private ChannelInterceptor unsentDisconnectInterceptor; 77 78 79 private void initilize() { 80 availableEvent = new BrokerAvailabilityEvent(true, this); 81 notAvailableEvent = new BrokerAvailabilityEvent(false, this); 82 lifecycleMonitor = new Object(); 83 unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor(); 84 } 85 86 /** 87 * Constructor with no destination prefixes (matches all destinations). 88 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 89 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 90 * @param brokerChannel the channel for the application to send messages to the broker 91 */ 92 this(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 93 SubscribableChannel brokerChannel) { 94 this(inboundChannel, outboundChannel, brokerChannel, []); 95 } 96 97 /** 98 * Constructor with destination prefixes to match to destinations of messages. 99 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 100 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 101 * @param brokerChannel the channel for the application to send messages to the broker 102 * @param destinationPrefixes prefixes to use to filter out messages 103 */ 104 this(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 105 SubscribableChannel brokerChannel, string[] destinationPrefixes) { 106 107 assert(inboundChannel, "'inboundChannel' must not be null"); 108 assert(outboundChannel, "'outboundChannel' must not be null"); 109 assert(brokerChannel, "'brokerChannel' must not be null"); 110 111 initilize(); 112 this.clientInboundChannel = inboundChannel; 113 this.clientOutboundChannel = outboundChannel; 114 this.brokerChannel = brokerChannel; 115 116 destinationPrefixes = (destinationPrefixes !is null ? destinationPrefixes : []); 117 this.destinationPrefixes = destinationPrefixes; 118 } 119 120 121 SubscribableChannel getClientInboundChannel() { 122 return this.clientInboundChannel; 123 } 124 125 MessageChannel getClientOutboundChannel() { 126 return this.clientOutboundChannel; 127 } 128 129 SubscribableChannel getBrokerChannel() { 130 return this.brokerChannel; 131 } 132 133 string[] getDestinationPrefixes() { 134 return this.destinationPrefixes; 135 } 136 137 /** 138 * Whether the client must receive messages in the order of publication. 139 * <p>By default messages sent to the {@code "clientOutboundChannel"} may 140 * not be processed in the same order because the channel is backed by a 141 * ThreadPoolExecutor that in turn does not guarantee processing in order. 142 * <p>When this flag is set to {@code true} messages within the same session 143 * will be sent to the {@code "clientOutboundChannel"} one at a time in 144 * order to preserve the order of publication. Enable this only if needed 145 * since there is some performance overhead to keep messages in order. 146 * @param preservePublishOrder whether to publish in order 147 * @since 5.1 148 */ 149 void setPreservePublishOrder(bool preservePublishOrder) { 150 OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder); 151 this.preservePublishOrder = preservePublishOrder; 152 } 153 154 /** 155 * Whether to ensure messages are received in the order of publication. 156 * @since 5.1 157 */ 158 bool isPreservePublishOrder() { 159 return this.preservePublishOrder; 160 } 161 162 // override 163 void setApplicationEventPublisher(ApplicationEventPublisher publisher) { 164 this.eventPublisher = publisher; 165 } 166 167 168 ApplicationEventPublisher getApplicationEventPublisher() { 169 return this.eventPublisher; 170 } 171 172 void setAutoStartup(bool autoStartup) { 173 this.autoStartup = autoStartup; 174 } 175 176 // override 177 bool isAutoStartup() { 178 return this.autoStartup; 179 } 180 181 int getPhase() { 182 return int.max; 183 } 184 185 // override 186 void start() { 187 synchronized (this.lifecycleMonitor) { 188 version(HUNT_DEBUG) info("Starting..."); 189 this.clientInboundChannel.subscribe(this); 190 this.brokerChannel.subscribe(this); 191 InterceptableChannel channel = cast(InterceptableChannel) this.clientInboundChannel; 192 if (channel !is null) { 193 channel.addInterceptor(0, this.unsentDisconnectInterceptor); 194 } 195 startInternal(); 196 this.running = true; 197 version(HUNT_DEBUG) info("Started."); 198 } 199 } 200 201 protected void startInternal() { 202 } 203 204 // override 205 void stop() { 206 synchronized (this.lifecycleMonitor) { 207 version(HUNT_DEBUG) info("Stopping..."); 208 stopInternal(); 209 this.clientInboundChannel.unsubscribe(this); 210 this.brokerChannel.unsubscribe(this); 211 InterceptableChannel channel = cast(InterceptableChannel) this.clientInboundChannel; 212 if (channel !is null) { 213 channel.removeInterceptor(this.unsentDisconnectInterceptor); 214 } 215 this.running = false; 216 version(HUNT_DEBUG) info("Stopped."); 217 } 218 } 219 220 protected void stopInternal() { 221 } 222 223 // override 224 final void stop(Runnable callback) { 225 synchronized (this.lifecycleMonitor) { 226 stop(); 227 callback.run(); 228 } 229 } 230 231 /** 232 * Check whether this message handler is currently running. 233 * <p>Note that even when this message handler is running the 234 * {@link #isBrokerAvailable()} flag may still independently alternate between 235 * being on and off depending on the concrete sub-class implementation. 236 */ 237 // override 238 final bool isRunning() { 239 return this.running; 240 } 241 242 /** 243 * Whether the message broker is currently available and able to process messages. 244 * <p>Note that this is in addition to the {@link #isRunning()} flag, which 245 * indicates whether this message handler is running. In other words the message 246 * handler must first be running and then the {@code #isBrokerAvailable()} flag 247 * may still independently alternate between being on and off depending on the 248 * concrete sub-class implementation. 249 * <p>Application components may implement 250 * {@code hunt.framework.context.ApplicationListener<BrokerAvailabilityEvent>} 251 * to receive notifications when broker becomes available and unavailable. 252 */ 253 bool isBrokerAvailable() { 254 return this.brokerAvailable; 255 } 256 257 258 override 259 void handleMessage(MessageBase message) { 260 version(HUNT_DEBUG) { 261 trace("Processing " ~ typeid(cast(Object)message).name); 262 } 263 if (this.running) 264 handleMessageInternal(message); 265 else { 266 version(HUNT_DEBUG) { 267 trace(this.toString() ~ " not running yet. Ignoring " ~ message.to!string()); 268 } 269 } 270 } 271 272 protected abstract void handleMessageInternal(MessageBase message); 273 274 275 protected bool checkDestinationPrefix(string destination) { 276 if (destination is null || this.destinationPrefixes.empty()) { 277 return true; 278 } 279 foreach (string prefix ; this.destinationPrefixes) { 280 if (destination.startsWith(prefix)) { 281 return true; 282 } 283 } 284 return false; 285 } 286 287 protected void publishBrokerAvailableEvent() { 288 bool shouldPublish = cas(&this.brokerAvailable, false, true); 289 if (this.eventPublisher !is null && shouldPublish) { 290 version(HUNT_DEBUG) { 291 info(this.availableEvent); 292 } 293 this.eventPublisher.publishEvent(this.availableEvent); 294 } 295 } 296 297 protected void publishBrokerUnavailableEvent() { 298 bool shouldPublish = cas(&this.brokerAvailable, true, false); 299 if (this.eventPublisher !is null && shouldPublish) { 300 version(HUNT_DEBUG) { 301 info(this.notAvailableEvent); 302 } 303 this.eventPublisher.publishEvent(this.notAvailableEvent); 304 } 305 } 306 307 /** 308 * Get the MessageChannel to use for sending messages to clients, possibly 309 * a per-session wrapper when {@code preservePublishOrder=true}. 310 * @since 5.1 311 */ 312 protected MessageChannel getClientOutboundChannelForSession(string sessionId) { 313 return this.preservePublishOrder ? 314 new OrderedMessageSender(getClientOutboundChannel()) : getClientOutboundChannel(); 315 } 316 317 318 /** 319 * Detect unsent DISCONNECT messages and process them anyway. 320 */ 321 private class UnsentDisconnectChannelInterceptor : ChannelInterceptor { 322 323 override 324 void afterSendCompletion(MessageBase message, 325 MessageChannel channel, bool sent, Exception ex) { 326 if (!sent) { 327 SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders()); 328 if (SimpMessageType.DISCONNECT == (messageType)) { 329 trace("Detected unsent DISCONNECT message. Processing anyway."); 330 handleMessage(message); 331 } 332 } 333 } 334 } 335 336 }