1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.exchangesimple;
35
36 import info.magnolia.cms.core.ItemType;
37 import info.magnolia.cms.exchange.ActivationManagerFactory;
38 import info.magnolia.cms.exchange.ExchangeException;
39 import info.magnolia.cms.exchange.Subscriber;
40 import info.magnolia.cms.exchange.Subscription;
41
42 import java.io.IOException;
43 import java.net.HttpURLConnection;
44 import java.net.MalformedURLException;
45 import java.net.URLConnection;
46 import java.util.Collection;
47 import java.util.Iterator;
48 import java.util.Map;
49 import java.util.Map.Entry;
50 import java.util.concurrent.ConcurrentHashMap;
51
52 import org.apache.commons.lang.StringUtils;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import EDU.oswego.cs.dl.util.concurrent.CountDown;
57 import EDU.oswego.cs.dl.util.concurrent.Sync;
58
59
60
61
62
63
64 public class SimpleSyndicator extends BaseSyndicatorImpl {
65 private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
66
67 public SimpleSyndicator() {
68 }
69
70 @Override
71 public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
72 String nodeUUID = activationContent.getproperty(NODE_UUID);
73 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
74 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
75 final Sync done = new CountDown(subscribers.size());
76 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
77 while (subscriberIterator.hasNext()) {
78 final Subscriber subscriber = subscriberIterator.next();
79 if (subscriber.isActive()) {
80
81 if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
82 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
83 } else {
84 executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
85 }
86 } else {
87
88 done.release();
89 }
90 }
91
92
93 acquireIgnoringInterruption(done);
94
95
96 if (!errors.isEmpty()) {
97 Exception e = null;
98 StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
99 Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
100 while (iter.hasNext()) {
101 Entry<Subscriber, Exception> entry = iter.next();
102 e = entry.getValue();
103 Subscriber subscriber = entry.getKey();
104 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
105 log.error(e.getMessage(), e);
106 }
107
108 throw new ExchangeException(msg.toString(), e);
109 }
110
111 executeInPool(new Runnable() {
112 public void run() {
113 cleanTemporaryStore(activationContent);
114 }
115 });
116 }
117
118 private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
119 Runnable r = new Runnable() {
120 public void run() {
121 try {
122 activate(subscriber, activationContent, nodePath);
123 } catch (ExchangeException e) {
124 log.error("Failed to activate content.", e);
125 errors.put(subscriber,e);
126 } finally {
127 done.release();
128 }
129 }
130 };
131 return r;
132 }
133
134
135
136
137
138
139
140 @Override
141 public String activate(Subscriber subscriber, ActivationContent activationContent, String nodePath) throws ExchangeException {
142 log.debug("activate");
143 if (null == subscriber) {
144 throw new ExchangeException("Null Subscriber");
145 }
146
147 String parentPath = null;
148
149 Subscription subscription = subscriber.getMatchedSubscription(nodePath, this.repositoryName);
150 if (null != subscription) {
151
152
153 parentPath = this.getMappedPath(this.parent, subscription);
154 } else {
155 log.debug("Exchange : subscriber [{}] is not subscribed to {}", subscriber.getName(), nodePath);
156 return null;
157 }
158 log.debug("Exchange : sending activation request to {} with user {}", subscriber.getName(), this.user.getName());
159
160 URLConnection urlConnection = null;
161 try {
162 urlConnection = prepareConnection(subscriber);
163 this.addActivationHeaders(urlConnection, activationContent);
164
165 urlConnection.setRequestProperty(PARENT_PATH, parentPath);
166
167 Transporter.transport((HttpURLConnection) urlConnection, activationContent);
168
169 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
170
171
172 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
173 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
174 throw new ExchangeException("Message received from subscriber: " + message);
175 }
176 urlConnection.getContent();
177 log.debug("Exchange : activation request sent to {}", subscriber.getName());
178 }
179 catch (ExchangeException e) {
180 throw e;
181 }
182 catch (IOException e) {
183 throw new ExchangeException("Not able to send the activation request [" + (urlConnection == null ? null : urlConnection.getURL()) + "]: " + e.getMessage());
184 }
185 catch (Exception e) {
186 throw new ExchangeException(e);
187 }
188 return null;
189 }
190
191 @Override
192 public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
193 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
194 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
195 final Sync done = new CountDown(subscribers.size());
196 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
197 while (subscriberIterator.hasNext()) {
198 final Subscriber subscriber = subscriberIterator.next();
199 if (subscriber.isActive()) {
200
201 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
202 } else {
203
204 done.release();
205 }
206 }
207
208
209 acquireIgnoringInterruption(done);
210
211
212 if (!errors.isEmpty()) {
213 Exception e = null;
214 StringBuffer msg = new StringBuffer(errors.size() + " error").append(
215 errors.size() > 1 ? "s" : "").append(" detected: ");
216 Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
217 while (iter.hasNext()) {
218 Entry<Subscriber, Exception> entry = iter.next();
219 e = entry.getValue();
220 Subscriber subscriber = entry.getKey();
221 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
222 log.error(e.getMessage(), e);
223 }
224
225 throw new ExchangeException(msg.toString(), e);
226 }
227 }
228
229 private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
230 Runnable r = new Runnable() {
231 public void run() {
232 try {
233 doDeactivate(subscriber, nodeUUID, nodePath);
234 } catch (ExchangeException e) {
235 log.error("Failed to deactivate content.", e);
236 errors.put(subscriber,e);
237 } finally {
238 done.release();
239 }
240 }
241 };
242 return r;
243 }
244
245
246
247
248
249
250 @Override
251 public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
252 Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
253 if (null != subscription) {
254 String handle = getDeactivationURL(subscriber);
255 try {
256 URLConnection urlConnection = prepareConnection(subscriber);
257
258 this.addDeactivationHeaders(urlConnection, nodeUUID);
259 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
260
261
262 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
263 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
264 throw new ExchangeException("Message received from subscriber: " + message);
265 }
266
267 urlConnection.getContent();
268
269 }
270 catch (MalformedURLException e) {
271 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + handle + "]");
272 }
273 catch (IOException e) {
274 throw new ExchangeException("Not able to send the deactivation request [" + handle + "]: " + e.getMessage());
275 }
276 catch (Exception e) {
277 throw new ExchangeException(e);
278 }
279 }
280 return null;
281 }
282
283 }