/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.support.AbstractMessageChannel;

import hunt.stomp.support.ChannelInterceptor;
import hunt.stomp.support.InterceptableChannel;

import hunt.stomp.Message;
import hunt.stomp.MessageChannel;
import hunt.stomp.MessagingException;

import hunt.Object;
import hunt.collection;
import hunt.logging;
import hunt.util.ObjectUtils;
import hunt.util.TypeUtils;

import std.conv;

/**
 * Abstract base class for {@link MessageChannel} implementations.
 *
 * Holds an ordered list of {@link ChannelInterceptor}s and runs them around
 * every {@code send}; the actual delivery is delegated to
 * {@link #sendInternal}, which subclasses must provide.
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
abstract class AbstractMessageChannel : MessageChannel, InterceptableChannel {

    /** Interceptors in invocation order; created in the constructor. */
    private List!(ChannelInterceptor) interceptors;

    /** Name used by {@link #toString}; defaults to "SimpleTypeName@identityHex". */
    private string beanName;

    string id;

    /**
     * Creates the channel with an empty interceptor list and a default bean
     * name built from the simple type name and the identity hex string of
     * this instance.
     */
    this() {
        interceptors = new ArrayList!ChannelInterceptor(5);
        this.beanName = TypeUtils.getSimpleName(typeid(this)) ~ "@" ~ ObjectUtils.getIdentityHexString(this);
    }


    /**
     * A message channel uses the bean name primarily for logging purposes.
     */
    // override
    void setBeanName(string name) {
        this.beanName = name;
    }

    /**
     * Return the bean name for this message channel.
     */
    string getBeanName() {
        return this.beanName;
    }


    /**
     * Replaces the current interceptors with the given ones, keeping the
     * order of the array.
     */
    override
    void setInterceptors(ChannelInterceptor[] interceptors) {
        this.interceptors.clear();
        this.interceptors.addAll(interceptors);
    }

    /** Appends an interceptor to the end of the list. */
    override
    void addInterceptor(ChannelInterceptor interceptor) {
        this.interceptors.add(interceptor);
    }

    /** Inserts an interceptor at the given position in the list. */
    override
    void addInterceptor(int index, ChannelInterceptor interceptor) {
        this.interceptors.add(index, interceptor);
    }

    /**
     * Returns the interceptor list held by this channel. The list itself is
     * returned, not a copy or a read-only view.
     */
    override
    List!(ChannelInterceptor) getInterceptors() {
        return this.interceptors; // Collections.unmodifiableList(this.interceptors);
    }

    /** Removes the given interceptor; returns the result of the list's remove call. */
    override
    bool removeInterceptor(ChannelInterceptor interceptor) {
        return this.interceptors.remove(interceptor);
    }

    /** Removes and returns the interceptor at the given position. */
    override
    ChannelInterceptor removeInterceptor(int index) {
        return this.interceptors.removeAt(index);
    }

    // override
    // bool send(MessageBase message) {
    //     return send(message, INDEFINITE_TIMEOUT);
    // }

    /**
     * Sends a message through the interceptor chain and then through
     * {@link #sendInternal}.
     *
     * Sequence: every interceptor's {@code preSend} runs first (any of them
     * may replace the message or veto the send by returning null), then
     * {@code sendInternal}, then every interceptor's {@code postSend}, and
     * finally {@code afterSendCompletion} on the interceptors whose
     * {@code preSend} completed.
     *
     * Params:
     *   message = the message to send; must not be null (checked with an
     *             {@code assert}, so only in builds that keep assertions)
     *   timeout = passed through unchanged to {@code sendInternal}
     *
     * Returns: {@code false} if an interceptor vetoed the send, otherwise the
     *          value returned by {@code sendInternal}.
     *
     * Throws: a {@code MessagingException} raised during the send is rethrown
     *         as is; any other failure is wrapped in a
     *         {@code MessageDeliveryException}.
     */
    override
    bool send(MessageBase message, long timeout) {
        assert(message !is null, "Message must not be null");
        MessageBase messageToUse = message;
        ChannelInterceptorChain chain = new ChannelInterceptorChain();
        bool sent = false;
        try {
            messageToUse = chain.applyPreSend(messageToUse, this);
            if (messageToUse is null) {
                // Vetoed by an interceptor; applyPreSend already ran the completion callbacks.
                return false;
            }
            sent = sendInternal(messageToUse, timeout);
            chain.applyPostSend(messageToUse, this, sent);
            chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
            return sent;
        }
        catch (Exception ex) {
            chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
            // Messaging exceptions keep their own type; everything else is wrapped.
            MessagingException mex = cast(MessagingException) ex;
            if (mex !is null)
                throw mex;
            throw new MessageDeliveryException(messageToUse, "Failed to send message to " ~ this.toString(), ex);
        }
        catch (Throwable err) {
            // Non-Exception throwables are wrapped first so that the completion
            // callbacks receive an Exception instance.
            MessageDeliveryException ex2 =
                new MessageDeliveryException(messageToUse, "Failed to send message to " ~ this.toString(), err);
            chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
            throw ex2;
        }
    }

    /**
     * Performs the actual delivery. Called by {@link #send} with the message
     * produced by the interceptors' {@code preSend} calls.
     *
     * Returns: whether the message was sent; this value is handed to the
     *          interceptors and returned from {@code send}.
     */
    protected abstract bool sendInternal(MessageBase message, long timeout);


    /** Returns "SimpleTypeName[beanName]". */
    override
    string toString() {
        return TypeUtils.getSimpleName(typeid(this)) ~ "[" ~ this.beanName ~ "]";
    }


    /**
     * Assists with the invocation of the configured channel interceptors.
     *
     * One instance tracks a single send or receive operation: it counts how
     * many interceptors completed their "pre" callback so that the matching
     * completion callback is invoked on exactly those, in reverse order.
     */
    protected class ChannelInterceptorChain {

        /** Index of the last interceptor whose preSend returned a message; -1 if none. */
        private int sendInterceptorIndex = -1;

        /** Index of the last interceptor whose preReceive returned true; -1 if none. */
        private int receiveInterceptorIndex = -1;


        /**
         * Runs {@code preSend} on each interceptor in order, feeding each one
         * the message returned by the previous one.
         *
         * Returns: the resulting message, or null if an interceptor returned
         *          null. In that case the completion callbacks have already
         *          been triggered with {@code sent == false}.
         */
        MessageBase applyPreSend(MessageBase message, MessageChannel channel) {
            MessageBase messageToUse = message;
            foreach (ChannelInterceptor interceptor ; interceptors) {
                MessageBase resolvedMessage = interceptor.preSend(messageToUse, channel);
                if (resolvedMessage is null) {
                    version(HUNT_DEBUG) {
                        string name = TypeUtils.getSimpleName(typeid(interceptor));
                        trace(name ~ " returned null from preSend, i.e. precluding the send.");
                    }
                    triggerAfterSendCompletion(messageToUse, channel, false, null);
                    return null;
                }
                messageToUse = resolvedMessage;
                this.sendInterceptorIndex++;
            }
            return messageToUse;
        }

        /** Runs {@code postSend} on every interceptor in order. */
        void applyPostSend(MessageBase message, MessageChannel channel, bool sent) {
            foreach (ChannelInterceptor interceptor ; interceptors) {
                interceptor.postSend(message, channel, sent);
            }
        }

        /**
         * Invokes {@code afterSendCompletion} in reverse order on the
         * interceptors whose {@code preSend} completed. A failure in one
         * callback is logged and does not stop the remaining callbacks.
         */
        void triggerAfterSendCompletion(MessageBase message, MessageChannel channel,
                bool sent, Exception ex) {
            for (int i = this.sendInterceptorIndex; i >= 0; i--) {
                ChannelInterceptor interceptor = interceptors.get(i);
                try {
                    interceptor.afterSendCompletion(message, channel, sent, ex);
                }
                catch (Throwable ex2) {
                    // Constant format string: the interceptor text and the failure
                    // message are passed as arguments, never interpreted as a format.
                    errorf("Exception from afterSendCompletion in %s: %s",
                            interceptor.toString(), ex2.msg);
                }
            }
        }

        /**
         * Runs {@code preReceive} on each interceptor in order.
         *
         * Returns: true if every interceptor returned true; false as soon as
         *          one returns false, after triggering the receive completion
         *          callbacks.
         */
        bool applyPreReceive(MessageChannel channel) {
            foreach (ChannelInterceptor interceptor ; interceptors) {
                if (!interceptor.preReceive(channel)) {
                    triggerAfterReceiveCompletion(null, channel, null);
                    return false;
                }
                this.receiveInterceptorIndex++;
            }
            return true;
        }


        /**
         * Runs {@code postReceive} on each interceptor in order, feeding each
         * one the message returned by the previous one.
         *
         * Returns: the resulting message, or null as soon as an interceptor
         *          returns null.
         */
        MessageBase applyPostReceive(MessageBase message, MessageChannel channel) {
            MessageBase messageToUse = message;
            foreach (ChannelInterceptor interceptor ; interceptors) {
                messageToUse = interceptor.postReceive(messageToUse, channel);
                if (messageToUse is null) {
                    return null;
                }
            }
            return messageToUse;
        }

        /**
         * Invokes {@code afterReceiveCompletion} in reverse order on the
         * interceptors whose {@code preReceive} returned true. A failure in
         * one callback is logged and does not stop the remaining callbacks.
         */
        void triggerAfterReceiveCompletion(MessageBase message,
                MessageChannel channel, Exception ex) {

            for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
                ChannelInterceptor interceptor = interceptors.get(i);
                try {
                    interceptor.afterReceiveCompletion(message, channel, ex);
                }
                catch (Throwable ex2) {
                    // Logged the same way as in triggerAfterSendCompletion so that a
                    // failing interceptor is never swallowed without a trace.
                    errorf("Exception from afterReceiveCompletion in %s: %s",
                            interceptor.toString(), ex2.msg);
                }
            }
        }
    }

}