1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.support.AbstractMessageChannel;
18 
19 import hunt.stomp.support.ChannelInterceptor;
20 import hunt.stomp.support.InterceptableChannel;
21 
22 import hunt.stomp.Message;
23 import hunt.stomp.MessageChannel;
24 import hunt.stomp.MessagingException;
25 
26 import hunt.Object;
27 import hunt.collection;
28 import hunt.logging;
29 import hunt.util.ObjectUtils;
30 import hunt.util.TypeUtils;
31 
32 import std.conv;
33 
34 /**
35  * Abstract base class for {@link MessageChannel} implementations.
36  *
37  * @author Rossen Stoyanchev
38  * @since 4.0
39  */
40 abstract class AbstractMessageChannel : MessageChannel, InterceptableChannel { 
41 
	// Interceptors consulted by ChannelInterceptorChain; allocated in the
	// constructor and mutated through the InterceptableChannel methods below.
	private List!(ChannelInterceptor) interceptors;

	// Name used by toString(); the constructor sets a default of
	// "<simple type name>@<identity hex string>", setBeanName() overrides it.
	private string beanName;

	// Public, freely assignable identifier. It is not read or written anywhere
	// else in this class.
	string id;
47 
48 	this() {
49 		interceptors = new ArrayList!ChannelInterceptor(5);
50 		this.beanName = TypeUtils.getSimpleName(typeid(this)) ~ "@" ~ ObjectUtils.getIdentityHexString(this);
51 	}
52 
53 	// private void initialize() {
54 	// 	interceptors = new ArrayList!ChannelInterceptor(5);
55 	// }
56 
57 
58 	/**
59 	 * A message channel uses the bean name primarily for logging purposes.
60 	 */
61 	// override
62 	void setBeanName(string name) {
63 		this.beanName = name;
64 	}
65 
66 	/**
67 	 * Return the bean name for this message channel.
68 	 */
69 	string getBeanName() {
70 		return this.beanName;
71 	}
72 
73 
74 	override
75 	void setInterceptors(ChannelInterceptor[] interceptors) {
76 		this.interceptors.clear();
77 		this.interceptors.addAll(interceptors);
78 	}
79 
80 	override
81 	void addInterceptor(ChannelInterceptor interceptor) {
82 		this.interceptors.add(interceptor);
83 	}
84 
85 	override
86 	void addInterceptor(int index, ChannelInterceptor interceptor) {
87 		this.interceptors.add(index, interceptor);
88 	}
89 
90 	override
91 	List!(ChannelInterceptor) getInterceptors() {
92 		return this.interceptors; // Collections.unmodifiableList(this.interceptors);
93 	}
94 
95 	override
96 	bool removeInterceptor(ChannelInterceptor interceptor) {
97 		return this.interceptors.remove(interceptor);
98 	}
99 
100 	override
101 	ChannelInterceptor removeInterceptor(int index) {
102 		return this.interceptors.removeAt(index);
103 	}
104 
105 	// override
106 	// bool send(MessageBase message) {
107 	// 	return send(message, INDEFINITE_TIMEOUT);
108 	// }
109 
110 	override
111 	bool send(MessageBase message, long timeout) {
112 		assert(message !is null, "Message must not be null");
113 		MessageBase messageToUse = message;
114 		ChannelInterceptorChain chain = new ChannelInterceptorChain();
115 		bool sent = false;
116 		try {
117 			messageToUse = chain.applyPreSend(messageToUse, this);
118 			if (messageToUse is null) {
119 				return false;
120 			}
121 			sent = sendInternal(messageToUse, timeout);
122 			chain.applyPostSend(messageToUse, this, sent);
123 			chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
124 			return sent;
125 		}
126 		catch (Exception ex) {
127 			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
128 			MessagingException mex = cast(MessagingException) ex;
129 			if (mex !is null) 
130 				throw mex;
131 			throw new MessageDeliveryException(messageToUse,"Failed to send message to " ~ this.toString(), ex);
132 		}
133 		catch (Throwable err) {
134 			MessageDeliveryException ex2 =
135 					new MessageDeliveryException(messageToUse, "Failed to send message to " ~ this.toString(), err);
136 			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
137 			throw ex2;
138 		}
139 	}
140 
	/**
	 * Perform the actual send; to be implemented by subclasses.
	 *
	 * <p>Called from {@link #send} once every interceptor's {@code preSend}
	 * has returned a non-null message. Anything thrown from here is handled
	 * by the catch blocks in {@code send}.
	 *
	 * @param message the message as it came out of the {@code preSend} chain
	 * @param timeout the timeout value passed to {@code send}, forwarded unchanged
	 * @return a flag that {@code send} passes on to the {@code postSend} and
	 * {@code afterSendCompletion} callbacks and then returns to its caller
	 */
	protected abstract bool sendInternal(MessageBase message, long timeout);
142 
143 
144 	override
145 	string toString() {
146 		return TypeUtils.getSimpleName(typeid(this)) ~ "[" ~ this.beanName ~ "]";
147 	}
148 
149 
150 	/**
151 	 * Assists with the invocation of the configured channel interceptors.
152 	 */
153 	protected class ChannelInterceptorChain {
154 
155 		private int sendInterceptorIndex = -1;
156 
157 		private int receiveInterceptorIndex = -1;
158 
159 		
160 		MessageBase applyPreSend(MessageBase message, MessageChannel channel) {
161 			MessageBase messageToUse = message;
162 			foreach (ChannelInterceptor interceptor ; interceptors) {
163 				MessageBase resolvedMessage = interceptor.preSend(messageToUse, channel);
164 				if (resolvedMessage is null) {
165 					string name = TypeUtils.getSimpleName(typeid(interceptor));
166 					version(HUNT_DEBUG) {
167 						trace(name ~ " returned null from preSend, i.e. precluding the send.");
168 					}
169 					triggerAfterSendCompletion(messageToUse, channel, false, null);
170 					return null;
171 				}
172 				messageToUse = resolvedMessage;
173 				this.sendInterceptorIndex++;
174 			}
175 			return messageToUse;
176 		}
177 
178 		void applyPostSend(MessageBase message, MessageChannel channel, bool sent) {
179 			foreach (ChannelInterceptor interceptor ; interceptors) {
180 				interceptor.postSend(message, channel, sent);
181 			}
182 		}
183 
184 		void triggerAfterSendCompletion(MessageBase message, MessageChannel channel,
185 				bool sent, Exception ex) {
186 			for (int i = this.sendInterceptorIndex; i >= 0; i--) {
187 				ChannelInterceptor interceptor = interceptors.get(i);
188 				try {
189 					interceptor.afterSendCompletion(message, channel, sent, ex);
190 				}
191 				catch (Throwable ex2) {
192 					errorf("Exception from afterSendCompletion in " ~ interceptor.toString(), ex2);
193 				}
194 			}
195 		}
196 
197 		bool applyPreReceive(MessageChannel channel) {
198 			foreach (ChannelInterceptor interceptor ; interceptors) {
199 				if (!interceptor.preReceive(channel)) {
200 					triggerAfterReceiveCompletion(null, channel, null);
201 					return false;
202 				}
203 				this.receiveInterceptorIndex++;
204 			}
205 			return true;
206 		}
207 
208 		
209 		MessageBase applyPostReceive(MessageBase message, MessageChannel channel) {
210 			MessageBase messageToUse = message;
211 			foreach (ChannelInterceptor interceptor ; interceptors) {
212 				messageToUse = interceptor.postReceive(messageToUse, channel);
213 				if (messageToUse is null) {
214 					return null;
215 				}
216 			}
217 			return messageToUse;
218 		}
219 
220 		void triggerAfterReceiveCompletion(MessageBase message, 
221 			MessageChannel channel, Exception ex) {
222 
223 			for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
224 				ChannelInterceptor interceptor = interceptors.get(i);
225 				try {
226 					interceptor.afterReceiveCompletion(message, channel, ex);
227 				}
228 				catch (Throwable ex2) {
229 					version(HUNT_DEBUG) {
230 						errorf("Exception from afterReceiveCompletion in " ~ interceptor.to!string, ex2);
231 					}
232 				}
233 			}
234 		}
235 	}
236 
237 }