1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.exchangesimple;
35
36 import info.magnolia.cms.core.ItemType;
37 import info.magnolia.cms.exchange.ActivationManagerFactory;
38 import info.magnolia.cms.exchange.ExchangeException;
39 import info.magnolia.cms.exchange.Subscriber;
40 import info.magnolia.cms.exchange.Subscription;
41
42 import java.io.IOException;
43 import java.net.MalformedURLException;
44 import java.net.URLConnection;
45 import java.util.Collection;
46 import java.util.Iterator;
47 import java.util.Map;
48 import java.util.Map.Entry;
49 import java.util.concurrent.ConcurrentHashMap;
50
51 import org.apache.commons.lang.StringUtils;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import EDU.oswego.cs.dl.util.concurrent.CountDown;
56 import EDU.oswego.cs.dl.util.concurrent.Sync;
57
58
59
60
61
62
63 public class SimpleSyndicator extends BaseSyndicatorImpl {
64 private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
65
66 public SimpleSyndicator() {
67 }
68
69 @Override
70 public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
71 String nodeUUID = activationContent.getproperty(NODE_UUID);
72 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
73 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
74 final Sync done = new CountDown(subscribers.size());
75 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
76 while (subscriberIterator.hasNext()) {
77 final Subscriber subscriber = subscriberIterator.next();
78 if (subscriber.isActive()) {
79
80 if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
81 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
82 } else {
83 executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
84 }
85 } else {
86
87 done.release();
88 }
89 }
90
91
92 acquireIgnoringInterruption(done);
93
94
95 if (!errors.isEmpty()) {
96 Exception e = null;
97 StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
98 Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
99 while (iter.hasNext()) {
100 Entry<Subscriber, Exception> entry = iter.next();
101 e = entry.getValue();
102 Subscriber subscriber = entry.getKey();
103 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
104 log.error(e.getMessage(), e);
105 }
106
107 throw new ExchangeException(msg.toString(), e);
108 }
109
110 executeInPool(new Runnable() {
111 public void run() {
112 cleanTemporaryStore(activationContent);
113 }
114 });
115 }
116
117 private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
118 Runnable r = new Runnable() {
119 public void run() {
120 try {
121 activate(subscriber, activationContent, nodePath);
122 } catch (Exception e) {
123 log.error("Failed to activate content.", e);
124 errors.put(subscriber,e);
125 } finally {
126 done.release();
127 }
128 }
129 };
130 return r;
131 }
132
133 @Override
134 public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
135 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
136 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
137 final Sync done = new CountDown(subscribers.size());
138 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
139 while (subscriberIterator.hasNext()) {
140 final Subscriber subscriber = subscriberIterator.next();
141 if (subscriber.isActive()) {
142
143 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
144 } else {
145
146 done.release();
147 }
148 }
149
150
151 acquireIgnoringInterruption(done);
152
153
154 if (!errors.isEmpty()) {
155 Exception e = null;
156 StringBuffer msg = new StringBuffer(errors.size() + " error").append(
157 errors.size() > 1 ? "s" : "").append(" detected: ");
158 Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
159 while (iter.hasNext()) {
160 Entry<Subscriber, Exception> entry = iter.next();
161 e = entry.getValue();
162 Subscriber subscriber = entry.getKey();
163 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
164 log.error(e.getMessage(), e);
165 }
166
167 throw new ExchangeException(msg.toString(), e);
168 }
169 }
170
171 private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
172 Runnable r = new Runnable() {
173 public void run() {
174 try {
175 doDeactivate(subscriber, nodeUUID, nodePath);
176 } catch (Exception e) {
177 log.error("Failed to deactivate content.", e);
178 errors.put(subscriber,e);
179 } finally {
180 done.release();
181 }
182 }
183 };
184 return r;
185 }
186
187
188
189
190
191
192 @Override
193 public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
194 Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
195 if (null != subscription) {
196 String urlString = getDeactivationURL(subscriber);
197 try {
198 URLConnection urlConnection = prepareConnection(subscriber, urlString);
199
200 this.addDeactivationHeaders(urlConnection, nodeUUID);
201 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
202
203
204 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
205 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
206 throw new ExchangeException("Message received from subscriber: " + message);
207 }
208
209 urlConnection.getContent();
210
211 }
212 catch (MalformedURLException e) {
213 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + stripPasswordFromUrl(urlString) + "]");
214 }
215 catch (IOException e) {
216 throw new ExchangeException("Not able to send the deactivation request [" + stripPasswordFromUrl(urlString) + "]: " + e.getMessage());
217 }
218 catch (Exception e) {
219 throw new ExchangeException(e);
220 }
221 }
222 return null;
223 }
224
225 }