/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.simp.config.AbstractMessageBrokerConfiguration;

import hunt.stomp.simp.config.ChannelRegistration;
import hunt.stomp.simp.config.MessageBrokerRegistry;

// import hunt.framework.beans.BeanUtils;
// import hunt.framework.beans.factory.BeanInitializationException;
// import hunt.framework.context.ApplicationContext;
// import hunt.framework.context.ApplicationContextAware;
// import hunt.framework.context.annotation.Bean;
// import hunt.framework.context.event.SmartApplicationListener;


import hunt.stomp.converter.ByteArrayMessageConverter;
import hunt.stomp.converter.CompositeMessageConverter;
import hunt.stomp.converter.DefaultContentTypeResolver;
import hunt.stomp.converter.JsonMessageConverter;
import hunt.stomp.converter.MessageConverter;
import hunt.stomp.converter.StringMessageConverter;
// import hunt.stomp.handler.invocation.HandlerMethodArgumentResolver;
// import hunt.stomp.handler.invocation.HandlerMethodReturnValueHandler;

import hunt.stomp.simp.SimpMessagingTemplate;
import hunt.stomp.simp.annotation.SimpAnnotationMethodMessageHandler;
import hunt.stomp.simp.broker.AbstractBrokerMessageHandler;
import hunt.stomp.simp.broker.SimpleBrokerMessageHandler;
import hunt.stomp.simp.stomp.StompBrokerRelayMessageHandler;
// import hunt.stomp.simp.user.DefaultUserDestinationResolver;
// import hunt.stomp.simp.user.MultiServerUserRegistry;
// import hunt.stomp.simp.user.SimpUserRegistry;
// import hunt.stomp.simp.user.UserDestinationMessageHandler;
// import hunt.stomp.simp.user.UserDestinationResolver;
// import hunt.stomp.simp.user.UserRegistryMessageHandler;
import hunt.stomp.support.AbstractSubscribableChannel;
import hunt.stomp.support.ExecutorSubscribableChannel;
import hunt.stomp.support.ImmutableMessageChannelInterceptor;
// import hunt.framework.scheduling.concurrent.ThreadPoolTaskExecutor;
// import hunt.framework.scheduling.concurrent.ThreadPoolTaskScheduler;

import hunt.collection;
import hunt.util.MimeType;
import hunt.Exceptions;
import hunt.text.PathMatcher;

import std.string;

/**
 * Base configuration for handling messages with simple messaging protocols
 * such as STOMP.
 *
 * <p>{@link #clientInboundChannel()} and {@link #clientOutboundChannel()} carry
 * messages from and to remote clients, while {@link #brokerChannel()} carries
 * messages from within the application to the respective message handlers.
 * Each of these three channels is created on first access and the same
 * instance is returned afterwards.
 *
 * <p>The handlers and helpers built on top of the channels are
 * <ul>
 * <li>{@link #simpAnnotationMethodMessageHandler()}</li>
 * <li>{@link #simpleBrokerMessageHandler()}</li>
 * <li>{@link #stompBrokerRelayMessageHandler()} (not ported yet, returns {@code null})</li>
 * <li>{@link #brokerMessagingTemplate()}, usable by application code to send messages</li>
 * </ul>
 *
 * <p>Subclasses are responsible for the part of the configuration that feeds
 * messages to and from the client inbound/outbound channels (e.g. STOMP over
 * WebSocket), and may customize the setup through the {@code configure*} hooks.
 *
 * @author Rossen Stoyanchev
 * @author Brian Clozel
 * @since 4.0
 */
abstract class AbstractMessageBrokerConfiguration { // : ApplicationContextAware

    // Only referenced by the validator lookup that is still commented out below.
    private enum string MVC_VALIDATOR_NAME = "mvcValidator";

    // private static bool jackson2Present = ClassUtils.isPresent(
    // 		"com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());


    // protected ApplicationContext applicationContext;

    // Lazily created by getClientInboundChannelRegistration().
    private ChannelRegistration clientInboundChannelRegistration;

    // Lazily created by getClientOutboundChannelRegistration().
    private ChannelRegistration clientOutboundChannelRegistration;

    // Lazily created by getBrokerRegistry().
    private MessageBrokerRegistry brokerRegistry;


    // Protected constructor (not ported yet).
    // protected this(ApplicationContext context) {
    // 	this.applicationContext = context;
    // }

    // override
    // void setApplicationContext(ApplicationContext applicationContext) {
    // 	this.applicationContext = applicationContext;
    // }


    // ApplicationContext getApplicationContext() {
    // 	return this.applicationContext;
    // }

    /**
     * Returns the channel for messages coming from clients, creating it on the
     * first call. The new channel gets the id "inbound" and, when the inbound
     * registration has interceptors, those interceptors.
     */
    AbstractSubscribableChannel clientInboundChannel() {
        if (inboundChannel !is null) {
            return inboundChannel;
        }

        // ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
        inboundChannel = new ExecutorSubscribableChannel(null);
        // channel.setLogger(SimpLogging.forLog(channel.getLogger()));
        inboundChannel.id = "inbound";

        ChannelRegistration registration = getClientInboundChannelRegistration();
        if (registration.hasInterceptors()) {
            inboundChannel.setInterceptors(registration.getInterceptors());
        }
        return inboundChannel;
    }
    private AbstractSubscribableChannel inboundChannel;


    // ThreadPoolTaskExecutor clientInboundChannelExecutor() {
    // 	TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
    // 	ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    // 	executor.setThreadNamePrefix("clientInboundChannel-");
    // 	return executor;
    // }

    /**
     * Returns the registration for the client inbound channel. On the first
     * call a fresh registration is passed to
     * {@link #configureClientInboundChannel(ChannelRegistration)}, then an
     * {@link ImmutableMessageChannelInterceptor} is added to it and the result
     * is remembered for later calls.
     */
    protected final ChannelRegistration getClientInboundChannelRegistration() {
        if (this.clientInboundChannelRegistration !is null) {
            return this.clientInboundChannelRegistration;
        }

        ChannelRegistration created = new ChannelRegistration();
        configureClientInboundChannel(created);
        created.addInterceptors(new ImmutableMessageChannelInterceptor());
        this.clientInboundChannelRegistration = created;
        return this.clientInboundChannelRegistration;
    }

    /**
     * A hook for subclasses to customize the message channel for inbound messages
     * from WebSocket clients. The default implementation does nothing.
     */
    protected void configureClientInboundChannel(ChannelRegistration registration) {
    }


    /**
     * Returns the channel for messages going out to clients, creating it on the
     * first call. The new channel gets the id "outbound" and, when the outbound
     * registration has interceptors, those interceptors.
     */
    AbstractSubscribableChannel clientOutboundChannel() {
        if (outboundChannel !is null) {
            return outboundChannel;
        }

        // ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
        outboundChannel = new ExecutorSubscribableChannel(null);
        outboundChannel.id = "outbound";

        ChannelRegistration registration = getClientOutboundChannelRegistration();
        if (registration.hasInterceptors()) {
            outboundChannel.setInterceptors(registration.getInterceptors());
        }
        return outboundChannel;
    }
    private AbstractSubscribableChannel outboundChannel;


    // ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
    // 	TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
    // 	ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    // 	executor.setThreadNamePrefix("clientOutboundChannel-");
    // 	return executor;
    // }

    /**
     * Returns the registration for the client outbound channel. On the first
     * call a fresh registration is passed to
     * {@link #configureClientOutboundChannel(ChannelRegistration)}, then an
     * {@link ImmutableMessageChannelInterceptor} is added to it and the result
     * is remembered for later calls.
     */
    protected final ChannelRegistration getClientOutboundChannelRegistration() {
        if (this.clientOutboundChannelRegistration !is null) {
            return this.clientOutboundChannelRegistration;
        }

        ChannelRegistration created = new ChannelRegistration();
        configureClientOutboundChannel(created);
        created.addInterceptors(new ImmutableMessageChannelInterceptor());
        this.clientOutboundChannelRegistration = created;
        return this.clientOutboundChannelRegistration;
    }

    /**
     * A hook for subclasses to customize the message channel for messages from
     * the application or message broker to WebSocket clients. The default
     * implementation does nothing.
     */
    protected void configureClientOutboundChannel(ChannelRegistration registration) {
    }


    /**
     * Returns the broker channel, creating it on the first call. The new
     * channel gets the id "brokerChannel"; an
     * {@link ImmutableMessageChannelInterceptor} is added to the broker channel
     * registration and all of that registration's interceptors are installed
     * on the channel.
     */
    AbstractSubscribableChannel brokerChannel() {
        if (_brokerChannel !is null) {
            return _brokerChannel;
        }

        ChannelRegistration registration = getBrokerRegistry().getBrokerChannelRegistration();

        // ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
        // 		new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
        if (registration.hasTaskExecutor()) {
            _brokerChannel = new ExecutorSubscribableChannel(null);
        } else {
            _brokerChannel = new ExecutorSubscribableChannel();
        }
        _brokerChannel.id = "brokerChannel";

        registration.addInterceptors(new ImmutableMessageChannelInterceptor());
        // channel.setLogger(SimpLogging.forLog(channel.getLogger()));
        _brokerChannel.setInterceptors(registration.getInterceptors());
        return _brokerChannel;
    }
    private AbstractSubscribableChannel _brokerChannel;


    // ThreadPoolTaskExecutor brokerChannelExecutor() {
    // 	ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
    // 	ThreadPoolTaskExecutor executor;
    // 	if (reg.hasTaskExecutor()) {
    // 		executor = reg.taskExecutor().getTaskExecutor();
    // 	}
    // 	else {
    // 		// Should never be used
    // 		executor = new ThreadPoolTaskExecutor();
    // 		executor.setCorePoolSize(0);
    // 		executor.setMaxPoolSize(1);
    // 		executor.setQueueCapacity(0);
    // 	}
    // 	executor.setThreadNamePrefix("brokerChannel-");
    // 	return executor;
    // }

    /**
     * An accessor for the {@link MessageBrokerRegistry} that creates it on the
     * first call from the client inbound and outbound channels, hands it to
     * {@link #configureMessageBroker(MessageBrokerRegistry)} once, and returns
     * the same instance on later calls.
     */
    protected final MessageBrokerRegistry getBrokerRegistry() {
        if (this.brokerRegistry !is null) {
            return this.brokerRegistry;
        }

        MessageBrokerRegistry created =
            new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
        configureMessageBroker(created);
        this.brokerRegistry = created;
        return this.brokerRegistry;
    }

    /**
     * A hook for subclasses to customize message broker configuration through the
     * provided {@link MessageBrokerRegistry} instance. The default
     * implementation does nothing.
     */
    protected void configureMessageBroker(MessageBrokerRegistry registry) {
    }

    /**
     * Provide access to the configured PathMatcher for access from other
     * configuration classes. The value is whatever the broker registry
     * returns from {@code getPathMatcher()}.
     */
    final PathMatcher getPathMatcher() {
        MessageBrokerRegistry registry = getBrokerRegistry();
        return registry.getPathMatcher();
    }


    /**
     * Builds a new annotation-method message handler on every call. The handler
     * comes from {@link #createAnnotationMethodMessageHandler()} and is given
     * the registry's application destination prefixes, a converter from
     * {@link #brokerMessageConverter()}, and the registry's path matcher when
     * that is not {@code null}.
     */
    SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
        SimpAnnotationMethodMessageHandler annotationHandler = createAnnotationMethodMessageHandler();

        // The registry is created once and cached, so it can be held in a local here.
        MessageBrokerRegistry registry = getBrokerRegistry();
        annotationHandler.setDestinationPrefixes(registry.getApplicationDestinationPrefixes());
        annotationHandler.setMessageConverter(brokerMessageConverter());
        // handler.setValidator(simpValidator());

        // List!(HandlerMethodArgumentResolver) argumentResolvers =
        // 	new ArrayList!HandlerMethodArgumentResolver();
        // addArgumentResolvers(argumentResolvers);
        // handler.setCustomArgumentResolvers(argumentResolvers);

        // List!(HandlerMethodReturnValueHandler) returnValueHandlers =
        // 	new ArrayList!(HandlerMethodReturnValueHandler)();
        // addReturnValueHandlers(returnValueHandlers);
        // handler.setCustomReturnValueHandlers(returnValueHandlers);

        PathMatcher matcher = registry.getPathMatcher();
        if (matcher !is null) {
            annotationHandler.setPathMatcher(matcher);
        }
        return annotationHandler;
    }

    /**
     * Protected method for plugging in a custom subclass of
     * {@link hunt.stomp.simp.annotation.SimpAnnotationMethodMessageHandler
     * SimpAnnotationMethodMessageHandler}. The default builds one from the
     * client inbound channel, the client outbound channel and a messaging
     * template from {@link #brokerMessagingTemplate()}.
     * @since 4.2
     */
    protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
        // Evaluated in the same left-to-right order as the constructor arguments.
        AbstractSubscribableChannel inbound = clientInboundChannel();
        AbstractSubscribableChannel outbound = clientOutboundChannel();
        SimpMessagingTemplate messagingTemplate = brokerMessagingTemplate();
        return new SimpAnnotationMethodMessageHandler(inbound, outbound, messagingTemplate);
    }

    // protected void addArgumentResolvers(List!(HandlerMethodArgumentResolver) argumentResolvers) {
    // }

    // protected void addReturnValueHandlers(List!(HandlerMethodReturnValueHandler) returnValueHandlers) {
    // }



    /**
     * Asks the broker registry for the simple broker handler bound to the
     * broker channel.
     * @return the handler, or {@code null} when the registry returns none
     */
    AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
        MessageBrokerRegistry registry = getBrokerRegistry();
        SimpleBrokerMessageHandler simpleBroker = registry.getSimpleBroker(brokerChannel());
        if (simpleBroker !is null) {
            updateUserDestinationResolver(simpleBroker);
        }
        return simpleBroker;
    }

    /**
     * Inspects the first destination prefix of the given handler. The action
     * for a prefix without a leading "/" depends on the user destination
     * resolver, which is not ported yet, so this currently changes nothing.
     */
    private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler) {
        string[] prefixes = handler.getDestinationPrefixes();
        // if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
        bool firstPrefixLacksSlash = prefixes.length > 0 && !prefixes[0].startsWith("/");
        if (firstPrefixLacksSlash) {
            // (cast(DefaultUserDestinationResolver) userDestinationResolver()).setRemoveLeadingSlash(true);
        }
    }

    /**
     * Placeholder for the STOMP broker relay handler.
     * @return always {@code null}; the relay is not ported yet
     */
    AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {

        // TODO: Tasks pending completion -@zxp at 10/31/2018, 5:39:16 PM
        //
        // implementationMissing(false);
        //
        // StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
        // if (handler is null) {
        // 	return null;
        // }

        // Map!(string, MessageHandler) subscriptions = new HashMap!(string, MessageHandler)(4);
        // string destination = getBrokerRegistry().getUserDestinationBroadcast();
        // if (destination !is null) {
        // 	subscriptions.put(destination, userDestinationMessageHandler());
        // }
        // destination = getBrokerRegistry().getUserRegistryBroadcast();
        // if (destination !is null) {
        // 	subscriptions.put(destination, userRegistryMessageHandler());
        // }
        // handler.setSystemSubscriptions(subscriptions);
        // updateUserDestinationResolver(handler);
        // return handler;
        return null;
    }


    // UserDestinationMessageHandler userDestinationMessageHandler() {
    // 	UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
    // 			brokerChannel(), userDestinationResolver());
    // 	string destination = getBrokerRegistry().getUserDestinationBroadcast();
    // 	if (destination !is null) {
    // 		handler.setBroadcastDestination(destination);
    // 	}
    // 	return handler;
    // }


    // MessageHandler userRegistryMessageHandler() {
    // 	if (getBrokerRegistry().getUserRegistryBroadcast() is null) {
    // 		return null;
    // 	}
    // 	SimpUserRegistry userRegistry = userRegistry();
    // 	Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
    // 	return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
    // 			brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
    // 			messageBrokerTaskScheduler());
    // }

    // Expose alias for 4.1 compatibility
    // ThreadPoolTaskScheduler messageBrokerTaskScheduler() {
    // 	ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    // 	scheduler.setThreadNamePrefix("MessageBroker-");
    // 	scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
    // 	scheduler.setRemoveOnCancelPolicy(true);
    // 	return scheduler;
    // }


    /**
     * Builds a new messaging template on the broker channel on every call. The
     * registry's user destination prefix is applied when it is not
     * {@code null}, and a converter from {@link #brokerMessageConverter()} is
     * installed.
     */
    SimpMessagingTemplate brokerMessagingTemplate() {
        SimpMessagingTemplate messagingTemplate = new SimpMessagingTemplate(brokerChannel());

        string userPrefix = getBrokerRegistry().getUserDestinationPrefix();
        if (userPrefix !is null) {
            messagingTemplate.setUserDestinationPrefix(userPrefix);
        }

        messagingTemplate.setMessageConverter(brokerMessageConverter());
        return messagingTemplate;
    }


    /**
     * Builds a new composite converter on every call. When
     * {@link #configureMessageConverters(MessageConverter[])} returns
     * {@code true}, a string converter, a byte array converter and the
     * converter from {@link #createJsonConverter()} are appended, in that
     * order.
     */
    CompositeMessageConverter brokerMessageConverter() {
        MessageConverter[] converters;
        if (configureMessageConverters(converters)) {
            converters ~= new StringMessageConverter();
            converters ~= new ByteArrayMessageConverter();
            converters ~= createJsonConverter();
            // if (jackson2Present) {
            // 	converters.add(createJacksonConverter());
            // }
        }
        return new CompositeMessageConverter(converters);
    }

    /**
     * Creates a JSON converter whose content type resolver defaults to
     * {@code MimeType.APPLICATION_JSON}.
     */
    protected JsonMessageConverter createJsonConverter() {
        DefaultContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
        contentTypeResolver.setDefaultMimeType(MimeType.APPLICATION_JSON);

        JsonMessageConverter jsonConverter = new JsonMessageConverter();
        jsonConverter.setContentTypeResolver(contentTypeResolver);
        return jsonConverter;
    }

    /**
     * Override this method to add custom message converters.
     *
     * <p>NOTE(review): the array is passed by value (no {@code ref}) and is empty
     * at the call site, so converters appended here do not appear to reach
     * {@link #brokerMessageConverter()}; confirm before relying on this hook
     * for anything other than its return value.
     *
     * @param messageConverters the array to add converters to, initially empty
     * @return {@code true} if default message converters should be added to list,
     * {@code false} if no more converters should be added. The default
     * implementation returns {@code true}.
     */
    protected bool configureMessageConverters(MessageConverter[] messageConverters) {
        return true;
    }


    // UserDestinationResolver userDestinationResolver() {
    // 	DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry());
    // 	string prefix = getBrokerRegistry().getUserDestinationPrefix();
    // 	if (prefix !is null) {
    // 		resolver.setUserDestinationPrefix(prefix);
    // 	}
    // 	return resolver;
    // }



    // SimpUserRegistry userRegistry() {
    // 	SimpUserRegistry registry = createLocalUserRegistry();
    // 	if (registry is null) {
    // 		registry = createLocalUserRegistry(getBrokerRegistry().getUserRegistryOrder());
    // 	}
    // 	broadcast = getBrokerRegistry().getUserRegistryBroadcast() !is null;
    // 	return (broadcast ? new MultiServerUserRegistry(registry) : registry);
    // }

    // Create the user registry that provides access to local users.
    // @deprecated as of 5.1 in favor of {@link #createLocalUserRegistry(int)}
    //
    // @Deprecated
    // protected SimpUserRegistry createLocalUserRegistry() {
    // 	return null;
    // }

    // Create the user registry that provides access to local users.
    // @param order the order to use as a {@link SmartApplicationListener}.
    // @since 5.1
    //
    // protected abstract SimpUserRegistry createLocalUserRegistry(int order);

    // Return a {@link hunt.framework.validation.Validator
    // hunt.framework.validation.Validators} instance for validating
    // {@code @Payload} method arguments.
    // <p>In order, this method tries to get a Validator instance:
    // <ul>
    // <li>delegating to getValidator() first</li>
    // <li>if none returned, getting an existing instance with its well-known name "mvcValidator",
    // created by an MVC configuration</li>
    // <li>if none returned, checking the classpath for the presence of a JSR-303 implementation
    // before creating a {@code OptionalValidatorFactoryBean}</li>
    // <li>returning a no-op Validator instance</li>
    // </ul>
    //
    // protected Validator simpValidator() {
    // 	Validator validator = getValidator();
    // 	if (validator is null) {
    // 		if (this.applicationContext !is null && this.applicationContext.containsBean(MVC_VALIDATOR_NAME)) {
    // 			validator = this.applicationContext.getBean(MVC_VALIDATOR_NAME, Validator.class);
    // 		}
    // 		else if (ClassUtils.isPresent("javax.validation.Validator", getClass().getClassLoader())) {
    // 			Class<?> clazz;
    // 			try {
    // 				string className = "hunt.framework.validation.beanvalidation.OptionalValidatorFactoryBean";
    // 				clazz = ClassUtils.forName(className, AbstractMessageBrokerConfiguration.class.getClassLoader());
    // 			}
    // 			catch (Throwable ex) {
    // 				throw new BeanInitializationException("Could not find default validator class", ex);
    // 			}
    // 			validator = (Validator) BeanUtils.instantiateClass(clazz);
    // 		}
    // 		else {
    // 			validator = new Validator() {
    // 				override
    // 				bool supports(Class<?> clazz) {
    // 					return false;
    // 				}
    // 				override
    // 				void validate(Object target, Errors errors) {
    // 				}
    // 			};
    // 		}
    // 	}
    // 	return validator;
    // }

    // Override this method to provide a custom {@link Validator}.
    // @since 4.0.1
    //
    // Validator getValidator() {
    // 	return null;
    // }

}