1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.support.ExecutorSubscribableChannel;
18 
19 import hunt.stomp.support.AbstractSubscribableChannel;
20 import hunt.stomp.support.ChannelInterceptor;
21 import hunt.stomp.support.ExecutorChannelInterceptor;
22 
23 import hunt.stomp.Message;
24 import hunt.stomp.MessagingException;
25 import hunt.stomp.MessageChannel;
26 
27 import hunt.collection.ArrayList;
28 import hunt.collection.List;
29 import hunt.Exceptions;
30 import hunt.util.Common;
31 import hunt.logging;
32 
33 import std.conv;
34 
35 // import hunt.concurrent.Executor;
36 
37 /**
38  * A {@link SubscribableChannel} that sends messages to each of its subscribers.
39  *
40  * @author Phillip Webb
41  * @author Rossen Stoyanchev
42  * @since 4.0
43  */
44 class ExecutorSubscribableChannel : AbstractSubscribableChannel {
45 
46 	private Executor executor;
47 
48 	// private List!(ExecutorChannelInterceptor) executorInterceptors = new ArrayList<>(4);
49 
50 
51 	/**
52 	 * Create a new {@link ExecutorSubscribableChannel} instance
53 	 * where messages will be sent in the callers thread.
54 	 */
55 	this() {
56 		this(null);
57 	}
58 
59 	/**
60 	 * Create a new {@link ExecutorSubscribableChannel} instance
61 	 * where messages will be sent via the specified executor.
62 	 * @param executor the executor used to send the message,
63 	 * or {@code null} to execute in the callers thread.
64 	 */
65 	this(Executor executor) {
66 		this.executor = executor;
67 	}
68 	
69 	Executor getExecutor() {
70 		return this.executor;
71 	}
72 
73 	override
74 	void setInterceptors(ChannelInterceptor[] interceptors) {
75 		super.setInterceptors(interceptors);
76 		// this.executorInterceptors.clear();
77 		// interceptors.forEach(this::updateExecutorInterceptorsFor);
78 		foreach(ChannelInterceptor c; interceptors) {
79 			this.updateExecutorInterceptorsFor(c);
80 		}
81 	}
82 
83 	override
84 	void addInterceptor(ChannelInterceptor interceptor) {
85 		super.addInterceptor(interceptor);
86 		updateExecutorInterceptorsFor(interceptor);
87 	}
88 
89 	override
90 	void addInterceptor(int index, ChannelInterceptor interceptor) {
91 		super.addInterceptor(index, interceptor);
92 		updateExecutorInterceptorsFor(interceptor);
93 	}
94 
95 	private void updateExecutorInterceptorsFor(ChannelInterceptor interceptor) {
96 		auto ec = cast(ExecutorChannelInterceptor) interceptor;
97 		if (ec !is null) {
98 			// this.executorInterceptors.add(ec);
99 			implementationMissing(false);
100 		}
101 	}
102 
103 
104 	override
105 	bool sendInternal(MessageBase message, long timeout) {
106 		version(HUNT_DEBUG) {
107 			trace("sending message: ", message.to!string());
108 		}
109 		foreach (MessageHandler handler ; getSubscribers()) {
110 			SendTask sendTask = new SendTask(message, handler);
111 			if (this.executor is null) {
112 				sendTask.run();
113 			} else {
114 				this.executor.execute(sendTask);
115 			}
116 		}
117 		return true;
118 	}
119 
120 
121 	/**
122 	 * Invoke a MessageHandler with ExecutorChannelInterceptors.
123 	 */
124 	private class SendTask : MessageHandlingRunnable {
125 
126 		private MessageBase inputMessage;
127 
128 		private MessageHandler messageHandler;
129 
130 		private int interceptorIndex = -1;
131 
132 		this(MessageBase message, MessageHandler handler) {
133 			version(HUNT_DEBUG) {
134 				tracef("creating SendTask for Message: %s, with handler: %s", 
135 					typeid(cast(Object)message), typeid(cast(Object)handler));
136 			}
137 			this.inputMessage = message;
138 			this.messageHandler = handler;
139 		}
140 
141 		override
142 		MessageBase getMessage() {
143 			return this.inputMessage;
144 		}
145 
146 		override
147 		MessageHandler getMessageHandler() {
148 			return this.messageHandler;
149 		}
150 
151 		override
152 		void run() {
153 			MessageBase message = this.inputMessage;
154 			try {
155 				message = applyBeforeHandle(message);
156 				if (message is null)
157 					return;
158 				this.messageHandler.handleMessage(message);
159 				triggerAfterMessageHandled(message, null);
160 			}
161 			catch (Exception ex) {
162 				triggerAfterMessageHandled(message, ex);
163 				auto e = cast(MessagingException) ex;
164 				if (e !is null) {
165 					throw e;
166 				}
167 				string description = "Failed to handle " ~ message.to!string() ~ 
168 					" to " ~ this.toString() ~ " in " ~ this.messageHandler.to!string();
169 				throw new MessageDeliveryException(message, description, ex);
170 			}
171 			catch (Throwable err) {
172 				string description = "Failed to handle " ~ message.to!string() ~ 
173 					" to " ~ this.toString() ~ " in " ~ this.messageHandler.to!string();
174 				MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
175 				triggerAfterMessageHandled(message, ex2);
176 				throw ex2;
177 			}
178 		}
179 
180 		
181 		private MessageBase applyBeforeHandle(MessageBase message) {
182 			MessageBase messageToUse = message;
183 			// TODO: Tasks pending completion -@zxp at 11/13/2018, 2:17:10 PM
184 			// 
185 			// implementationMissing(false);
186 			// foreach (ExecutorChannelInterceptor interceptor ; executorInterceptors) {
187 			// 	messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler);
188 			// 	if (messageToUse is null) {
189 			// 		string name = interceptor.TypeUtils.getSimpleName(typeid(this));
190 			// 		version(HUNT_DEBUG) {
191 			// 			trace(name ~ " returned null from beforeHandle, i.e. precluding the send.");
192 			// 		}
193 			// 		triggerAfterMessageHandled(message, null);
194 			// 		return null;
195 			// 	}
196 			// 	this.interceptorIndex++;
197 			// }
198 			return messageToUse;
199 		}
200 
201 		private void triggerAfterMessageHandled(MessageBase message, Exception ex) {
202 			// TODO: Tasks pending completion -@zxp at 11/13/2018, 2:17:31 PM
203 			// 
204 			// implementationMissing(false);
205 			for (int i = this.interceptorIndex; i >= 0; i--) {
206 				// ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
207 				// try {
208 				// 	interceptor.afterMessageHandled(message, this.outer, this.messageHandler, ex);
209 				// }
210 				// catch (Throwable ex2) {
211 				// 	errorf("Exception from afterMessageHandled in " ~ interceptor.to!string(), ex2);
212 				// }
213 			}
214 		}
215 	}
216 
217 }