/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module hunt.stomp.simp.broker.AbstractSubscriptionRegistry;

import hunt.stomp.simp.broker.SubscriptionRegistry;

import hunt.stomp.Message;
import hunt.stomp.MessageHeaders;
import hunt.stomp.simp.SimpMessageHeaderAccessor;
import hunt.stomp.simp.SimpMessageType;

import hunt.collection;
import hunt.Exceptions;
import hunt.logging;

import std.conv;
// import hunt.framework.util.CollectionUtils;
// import hunt.framework.util.LinkedMultiValueMap;
// import hunt.framework.util.MultiValueMap;

/**
 * Abstract base class for implementations of {@link SubscriptionRegistry} that
 * looks up information in messages but delegates to abstract methods for the
 * actual storage and retrieval.
 *
 * <p>The three public entry points validate the message type, extract the
 * session id, subscription id and/or destination from the message headers,
 * and only then call the corresponding {@code *Internal} hook. When a required
 * header is missing the operation is skipped (an error is logged only when
 * compiled with {@code version(HUNT_DEBUG)}).
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
abstract class AbstractSubscriptionRegistry : SubscriptionRegistry {

	/**
	 * Single empty map instance returned by {@link #findSubscriptions} when the
	 * message carries no destination. It is {@code __gshared} and the very same
	 * instance is handed to every caller.
	 */
	private __gshared MultiValueMap!(string, string) EMPTY_MAP;

	shared static this() {
		EMPTY_MAP = new LinkedMultiValueMap!(string, string)();
	}


	/**
	 * Register the subscription described by the given SUBSCRIBE message.
	 *
	 * <p>Reads the session id, subscription id and destination from the
	 * message headers and passes them to {@link #addSubscriptionInternal}.
	 * If any of the three is {@code null} the method returns without
	 * registering anything.
	 *
	 * @param message the message to take the subscription details from
	 * @throws IllegalArgumentException if the message type is not SUBSCRIBE
	 */
	override
	final void registerSubscription(MessageBase message) {
		MessageHeaders headers = message.getHeaders();

		SimpMessageType messageType =
				cast(SimpMessageType)SimpMessageHeaderAccessor.getMessageType(headers);
		if (SimpMessageType.SUBSCRIBE != messageType) {
			throw new IllegalArgumentException("Expected SUBSCRIBE: " ~ (cast(Object)message).toString());
		}

		string sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		if (sessionId is null) {
			version(HUNT_DEBUG) {
				error("No sessionId in " ~ (cast(Object)message).toString());
			}
			return;
		}

		string subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
		if (subscriptionId is null) {
			version(HUNT_DEBUG) {
				error("No subscriptionId in " ~ (cast(Object)message).toString());
			}
			return;
		}

		string destination = SimpMessageHeaderAccessor.getDestination(headers);
		if (destination is null) {
			version(HUNT_DEBUG) {
				error("No destination in " ~ (cast(Object)message).toString());
			}
			return;
		}

		addSubscriptionInternal(sessionId, subscriptionId, destination, message);
	}

	/**
	 * Remove the subscription described by the given UNSUBSCRIBE message.
	 *
	 * <p>Reads the session id and subscription id from the message headers and
	 * passes them to {@link #removeSubscriptionInternal}. If either is
	 * {@code null} the method returns without removing anything.
	 *
	 * @param message the message to take the subscription details from
	 * @throws IllegalArgumentException if the message type is not UNSUBSCRIBE
	 */
	override
	final void unregisterSubscription(MessageBase message) {
		MessageHeaders headers = message.getHeaders();

		SimpMessageType messageType =
				cast(SimpMessageType)SimpMessageHeaderAccessor.getMessageType(headers);
		// Must be written with `!=`: unary `!` binds tighter than `==`, so
		// `!UNSUBSCRIBE == messageType` would negate the constant first and
		// compare the resulting bool with the type instead of rejecting
		// non-UNSUBSCRIBE messages.
		if (SimpMessageType.UNSUBSCRIBE != messageType) {
			throw new IllegalArgumentException("Expected UNSUBSCRIBE: " ~ (cast(Object)message).toString());
		}

		string sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		if (sessionId is null) {
			version(HUNT_DEBUG) {
				error("No sessionId in " ~ (cast(Object)message).toString());
			}
			return;
		}

		string subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
		if (subscriptionId is null) {
			version(HUNT_DEBUG) {
				error("No subscriptionId in " ~ (cast(Object)message).toString());
			}
			return;
		}

		removeSubscriptionInternal(sessionId, subscriptionId, message);
	}

	/**
	 * Look up the subscriptions matching the destination of the given message.
	 *
	 * <p>Reads the destination from the message headers and passes it to
	 * {@link #findSubscriptionsInternal}. If the destination is {@code null}
	 * the shared {@code EMPTY_MAP} instance is returned instead.
	 *
	 * @param message the message whose destination is looked up
	 * @return the result of {@link #findSubscriptionsInternal}, or the shared
	 * empty map when the message has no destination
	 * @throws IllegalArgumentException if the message type is not MESSAGE
	 */
	override
	final MultiValueMap!(string, string) findSubscriptions(MessageBase message) {
		MessageHeaders headers = message.getHeaders();

		SimpMessageType type =
				cast(SimpMessageType)SimpMessageHeaderAccessor.getMessageType(headers);
		if (SimpMessageType.MESSAGE != type) {
			throw new IllegalArgumentException("Unexpected message type: " ~ type.to!string());
		}

		string destination = SimpMessageHeaderAccessor.getDestination(headers);
		if (destination is null) {
			version(HUNT_DEBUG) {
				error("No destination in " ~ message.to!string());
			}
			return EMPTY_MAP;
		}

		return findSubscriptionsInternal(destination, message);
	}


	/**
	 * Storage hook called by {@link #registerSubscription} once the message
	 * type has been checked and all three identifiers are non-null.
	 *
	 * @param sessionId the session id read from the message headers
	 * @param subscriptionId the subscription id read from the message headers
	 * @param destination the destination read from the message headers
	 * @param message the original SUBSCRIBE message
	 */
	protected abstract void addSubscriptionInternal(
			string sessionId, string subscriptionId, string destination, MessageBase message);

	/**
	 * Removal hook called by {@link #unregisterSubscription} once the message
	 * type has been checked and both identifiers are non-null.
	 *
	 * @param sessionId the session id read from the message headers
	 * @param subscriptionId the subscription id read from the message headers
	 * @param message the original UNSUBSCRIBE message
	 */
	protected abstract void removeSubscriptionInternal(
			string sessionId, string subscriptionId, MessageBase message);

	/**
	 * Lookup hook called by {@link #findSubscriptions} once the message type
	 * has been checked and the destination is non-null.
	 *
	 * @param destination the destination read from the message headers
	 * @param message the original message
	 * @return the map that {@link #findSubscriptions} hands back to its caller
	 */
	protected abstract MultiValueMap!(string, string) findSubscriptionsInternal(
			string destination, MessageBase message);

}