1 /* 2 * Copyright 2002-2018 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 module hunt.stomp.simp.broker.OrderedMessageSender; 18 19 import hunt.stomp.support.ChannelInterceptor; 20 21 import hunt.collection; 22 import hunt.util.Common; 23 import hunt.Exceptions; 24 import hunt.logging; 25 26 import hunt.stomp.Message; 27 import hunt.stomp.MessageChannel; 28 import hunt.stomp.simp.SimpMessageHeaderAccessor; 29 import hunt.stomp.support.ExecutorChannelInterceptor; 30 import hunt.stomp.support.ExecutorSubscribableChannel; 31 import hunt.stomp.support.MessageHeaderAccessor; 32 33 import core.atomic; 34 import std.conv; 35 import std.container.dlist; 36 37 /** 38 * Submit messages to an {@link ExecutorSubscribableChannel}, one at a time. 39 * The channel must have been configured with {@link #configureOutboundChannel}. 40 * 41 * @author Rossen Stoyanchev 42 * @since 5.1 43 */ 44 class OrderedMessageSender : MessageChannel { 45 46 enum string COMPLETION_TASK_HEADER = "simpSendCompletionTask"; 47 48 private MessageChannel channel; 49 50 // private Queue!(MessageBase) messages; 51 private DList!(MessageBase) messages; 52 53 private shared bool sendInProgress = false; 54 55 56 this(MessageChannel channel) { 57 this.channel = channel; 58 } 59 60 private void initlize() { 61 // messages = new ConcurrentLinkedQueue<>(); 62 // messages = new LinkedQueue!(MessageBase)(); 63 } 64 65 // bool send(MessageBase message) { 66 // return send(message, -1); 67 // } 68 69 override 70 bool send(MessageBase message, long timeout) { 71 this.messages.insertBack(message); 72 trySend(); 73 return true; 74 } 75 76 private void trySend() { 77 // Take sendInProgress flag only if queue is not empty 78 if (this.messages.empty) { 79 return; 80 } 81 82 if (cas(&this.sendInProgress, false, true)) { 83 sendNextMessage(); 84 } 85 } 86 87 private void sendNextMessage() { 88 for (;;) { 89 MessageBase message = this.messages.front; 90 this.messages.removeFront(); 91 if (message !is null) { 92 try { 93 addCompletionCallback(message); 94 if (this.channel.send(message)) { 95 return; 96 } 97 } 98 catch (Throwable ex) { 99 version(HUNT_DEBUG) { 100 error("Failed to send " ~ message.to!string(), ex); 101 } 102 } 103 } 104 else { 105 // We ran out of messages.. 106 this.sendInProgress = false; 107 trySend(); 108 break; 109 } 110 } 111 } 112 113 private void addCompletionCallback(MessageBase msg) { 114 implementationMissing(false); 115 // SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(msg, SimpMessageHeaderAccessor.class); 116 // assert(accessor !is null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor"); 117 // accessor.setHeader(COMPLETION_TASK_HEADER, &this.sendNextMessage); 118 } 119 120 121 /** 122 * Install or remove an {@link ExecutorChannelInterceptor} that invokes a 123 * completion task once the message is handled. 124 * @param channel the channel to configure 125 * @param preservePublishOrder whether preserve order is on or off based on 126 * which an interceptor is either added or removed. 127 */ 128 static void configureOutboundChannel(MessageChannel channel, bool preservePublishOrder) { 129 if (preservePublishOrder) { 130 ExecutorSubscribableChannel execChannel = cast(ExecutorSubscribableChannel) channel; 131 assert(execChannel !is null, 132 "An ExecutorSubscribableChannel is required for `preservePublishOrder`"); 133 implementationMissing(false); 134 // if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CallbackInterceptor)) { 135 // execChannel.addInterceptor(0, new CallbackInterceptor()); 136 // } 137 } 138 else { 139 ExecutorSubscribableChannel execChannel = cast(ExecutorSubscribableChannel) channel; 140 if(execChannel !is null) { 141 foreach(ChannelInterceptor i; execChannel.getInterceptors()) { 142 CallbackInterceptor ci = cast(CallbackInterceptor)i; 143 if(ci !is null) { 144 execChannel.removeInterceptor(ci); 145 break; 146 } 147 } 148 } 149 } 150 } 151 152 } 153 154 155 156 private class CallbackInterceptor : ExecutorChannelInterceptor { 157 158 override 159 void afterMessageHandled( 160 MessageBase msg, MessageChannel ch, MessageHandler handler, Exception ex) { 161 162 Runnable task = cast(Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER); 163 if (task !is null) { 164 task.run(); 165 } 166 } 167 }