1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.exchangesimple;
35
36 import info.magnolia.cms.core.ItemType;
37 import info.magnolia.cms.exchange.ActivationManagerFactory;
38 import info.magnolia.cms.exchange.ExchangeException;
39 import info.magnolia.cms.exchange.Subscriber;
40 import info.magnolia.cms.exchange.Subscription;
41
42 import java.io.IOException;
43 import java.net.MalformedURLException;
44 import java.net.URLConnection;
45 import java.util.Collection;
46 import java.util.Iterator;
47 import java.util.Map;
48 import java.util.Map.Entry;
49 import java.util.concurrent.ConcurrentHashMap;
50
51 import org.apache.commons.lang.StringUtils;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import EDU.oswego.cs.dl.util.concurrent.CountDown;
56 import EDU.oswego.cs.dl.util.concurrent.Sync;
57
58
59
60
61
62
63 public class SimpleSyndicator extends BaseSyndicatorImpl {
64 private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
65
66 public SimpleSyndicator() {
67 }
68
69 @Override
70 public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
71 String nodeUUID = activationContent.getproperty(NODE_UUID);
72 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
73 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
74 final Sync done = new CountDown(subscribers.size());
75 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
76 while (subscriberIterator.hasNext()) {
77 final Subscriber subscriber = subscriberIterator.next();
78 if (subscriber.isActive()) {
79
80 if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
81 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
82 } else {
83 executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
84 }
85 } else {
86
87 done.release();
88 }
89 }
90
91
92 acquireIgnoringInterruption(done);
93
94
95 if (!errors.isEmpty()) {
96 Exception e = null;
97 StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
98 Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
99 while (iter.hasNext()) {
100 Entry<Subscriber, Exception> entry = iter.next();
101 e = entry.getValue();
102 Subscriber subscriber = entry.getKey();
103 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
104 log.error(e.getMessage(), e);
105 }
106
107 throw new ExchangeException(msg.toString(), e);
108 }
109
110 executeInPool(new Runnable() {
111 @Override
112 public void run() {
113 cleanTemporaryStore(activationContent);
114 }
115 });
116 }
117
118 private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
119 Runnable r = new Runnable() {
120 @Override
121 public void run() {
122 try {
123 activate(subscriber, activationContent, nodePath);
124 } catch (Exception e) {
125 log.error("Failed to activate content.", e);
126 errors.put(subscriber,e);
127 } finally {
128 done.release();
129 }
130 }
131 };
132 return r;
133 }
134
135 @Override
136 public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
137 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
138 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
139 final Sync done = new CountDown(subscribers.size());
140 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
141 while (subscriberIterator.hasNext()) {
142 final Subscriber subscriber = subscriberIterator.next();
143 if (subscriber.isActive()) {
144
145 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
146 } else {
147
148 done.release();
149 }
150 }
151
152
153 acquireIgnoringInterruption(done);
154
155
156 if (!errors.isEmpty()) {
157 Exception e = null;
158 StringBuffer msg = new StringBuffer(errors.size() + " error").append(
159 errors.size() > 1 ? "s" : "").append(" detected: ");
160 Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
161 while (iter.hasNext()) {
162 Entry<Subscriber, Exception> entry = iter.next();
163 e = entry.getValue();
164 Subscriber subscriber = entry.getKey();
165 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
166 log.error(e.getMessage(), e);
167 }
168
169 throw new ExchangeException(msg.toString(), e);
170 }
171 }
172
173 private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
174 Runnable r = new Runnable() {
175 @Override
176 public void run() {
177 try {
178 doDeactivate(subscriber, nodeUUID, nodePath);
179 } catch (Exception e) {
180 log.error("Failed to deactivate content.", e);
181 errors.put(subscriber,e);
182 } finally {
183 done.release();
184 }
185 }
186 };
187 return r;
188 }
189
190
191
192
193
194
195 @Override
196 public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
197 Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
198 if (null != subscription) {
199 String urlString = getDeactivationURL(subscriber);
200 try {
201 URLConnection urlConnection = prepareConnection(subscriber, urlString);
202
203 this.addDeactivationHeaders(urlConnection, nodeUUID);
204 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
205
206
207 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
208 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
209 throw new ExchangeException("Message received from subscriber: " + message);
210 }
211
212 urlConnection.getContent();
213
214 }
215 catch (MalformedURLException e) {
216 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + stripPasswordFromUrl(urlString) + "]");
217 }
218 catch (IOException e) {
219 throw new ExchangeException("Not able to send the deactivation request [" + stripPasswordFromUrl(urlString) + "]: " + e.getMessage());
220 }
221 catch (Exception e) {
222 throw new ExchangeException(e);
223 }
224 }
225 return null;
226 }
227
228 }