1 /*
2  * Copyright 2002-2018 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 module hunt.stomp.simp.broker.OrderedMessageSender;
18 
19 import hunt.stomp.support.ChannelInterceptor;
20 
21 import hunt.collection;
22 import hunt.util.Common;
23 import hunt.Exceptions;
24 import hunt.logging;
25 
26 import hunt.stomp.Message;
27 import hunt.stomp.MessageChannel;
28 import hunt.stomp.simp.SimpMessageHeaderAccessor;
29 import hunt.stomp.support.ExecutorChannelInterceptor;
30 import hunt.stomp.support.ExecutorSubscribableChannel;
31 import hunt.stomp.support.MessageHeaderAccessor;
32 
33 import core.atomic;
34 import std.conv;
35 import std.container.dlist;
36 
37 /**
38  * Submit messages to an {@link ExecutorSubscribableChannel}, one at a time.
39  * The channel must have been configured with {@link #configureOutboundChannel}.
40  *
41  * @author Rossen Stoyanchev
42  * @since 5.1
43  */
44 class OrderedMessageSender : MessageChannel {
45 
46 	enum string COMPLETION_TASK_HEADER = "simpSendCompletionTask";
47 
48 	private MessageChannel channel;
49 
50 	// private Queue!(MessageBase) messages;
51 	private DList!(MessageBase) messages;
52 
53 	private shared bool sendInProgress = false;
54 
55 
56 	this(MessageChannel channel) {
57 		this.channel = channel;
58 	}
59 
60 	private void initlize() {
61 		// messages = new ConcurrentLinkedQueue<>();
62 		// messages = new LinkedQueue!(MessageBase)();
63 	}
64 
65 	// bool send(MessageBase message) {
66 	// 	return send(message, -1);
67 	// }
68 
69 	override
70 	bool send(MessageBase message, long timeout) {
71 		this.messages.insertBack(message);
72 		trySend();
73 		return true;
74 	}
75 
76 	private void trySend() {
77 		// Take sendInProgress flag only if queue is not empty
78 		if (this.messages.empty) {
79 			return;
80 		}
81 
82 		if (cas(&this.sendInProgress, false, true)) {
83 			sendNextMessage();
84 		}
85 	}
86 
87 	private void sendNextMessage() {
88 		for (;;) {
89 			MessageBase message = this.messages.front;
90 			this.messages.removeFront();
91 			if (message !is null) {
92 				try {
93 					addCompletionCallback(message);
94 					if (this.channel.send(message)) {
95 						return;
96 					}
97 				}
98 				catch (Throwable ex) {
99 					version(HUNT_DEBUG) {
100 						error("Failed to send " ~ message.to!string(), ex);
101 					}
102 				}
103 			}
104 			else {
105 				// We ran out of messages..
106 				this.sendInProgress = false;
107 				trySend();
108 				break;
109 			}
110 		}
111 	}
112 
113 	private void addCompletionCallback(MessageBase msg) {
114 		implementationMissing(false);
115 		// SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(msg, SimpMessageHeaderAccessor.class);
116 		// assert(accessor !is null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor");
117 		// accessor.setHeader(COMPLETION_TASK_HEADER, &this.sendNextMessage);
118 	}
119 
120 
121 	/**
122 	 * Install or remove an {@link ExecutorChannelInterceptor} that invokes a
123 	 * completion task once the message is handled.
124 	 * @param channel the channel to configure
125 	 * @param preservePublishOrder whether preserve order is on or off based on
126 	 * which an interceptor is either added or removed.
127 	 */
128 	static void configureOutboundChannel(MessageChannel channel, bool preservePublishOrder) {
129 		if (preservePublishOrder) {
130 			ExecutorSubscribableChannel execChannel = cast(ExecutorSubscribableChannel) channel;
131 			assert(execChannel !is null, 
132 				"An ExecutorSubscribableChannel is required for `preservePublishOrder`");
133 			implementationMissing(false);
134 			// if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CallbackInterceptor)) {
135 			// 	execChannel.addInterceptor(0, new CallbackInterceptor());
136 			// }
137 		}
138 		else {
139 			ExecutorSubscribableChannel execChannel = cast(ExecutorSubscribableChannel) channel;
140 			if(execChannel !is null) {
141 				foreach(ChannelInterceptor i; execChannel.getInterceptors()) {
142 					CallbackInterceptor ci = cast(CallbackInterceptor)i;
143 					if(ci !is null)	{
144 						execChannel.removeInterceptor(ci);
145 						break;
146 					}
147 				}
148 			}
149 		}
150 	}
151 
152 }
153 
154 
155 
156 private class CallbackInterceptor : ExecutorChannelInterceptor {
157 
158 	override
159 	void afterMessageHandled(
160 			MessageBase msg, MessageChannel ch, MessageHandler handler, Exception ex) {
161 
162 		Runnable task = cast(Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER);
163 		if (task !is null) {
164 			task.run();
165 		}
166 	}
167 }