1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.exchangesimple;
35
36 import info.magnolia.cms.core.ItemType;
37 import info.magnolia.cms.exchange.ActivationManagerFactory;
38 import info.magnolia.cms.exchange.ExchangeException;
39 import info.magnolia.cms.exchange.Subscriber;
40 import info.magnolia.cms.exchange.Subscription;
41 import info.magnolia.cms.security.SecurityUtil;
42
43 import java.io.IOException;
44 import java.net.MalformedURLException;
45 import java.net.URLConnection;
46 import java.util.Collection;
47 import java.util.Iterator;
48 import java.util.Map;
49 import java.util.Map.Entry;
50 import java.util.concurrent.ConcurrentHashMap;
51
52 import org.apache.commons.lang.StringUtils;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import EDU.oswego.cs.dl.util.concurrent.CountDown;
57 import EDU.oswego.cs.dl.util.concurrent.Sync;
58
59
60
61
62
63
64 public class SimpleSyndicator extends BaseSyndicatorImpl {
65 private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
66
67 public SimpleSyndicator() {
68 }
69
70 @Override
71 public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
72 String nodeUUID = activationContent.getproperty(NODE_UUID);
73 Collection<Subscriber> subscribers = getSubscribers();
74 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
75 final Sync done = new CountDown(subscribers.size());
76 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
77 while (subscriberIterator.hasNext()) {
78 final Subscriber subscriber = subscriberIterator.next();
79 if (subscriber.isActive()) {
80
81 if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
82 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
83 } else {
84 executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
85 }
86 } else {
87
88 done.release();
89 }
90 }
91
92
93 acquireIgnoringInterruption(done);
94
95
96 if (!errors.isEmpty()) {
97 Exception e = null;
98 StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
99 Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
100 while (iter.hasNext()) {
101 Entry<Subscriber, Exception> entry = iter.next();
102 e = entry.getValue();
103 Subscriber subscriber = entry.getKey();
104 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
105 log.error(e.getMessage(), e);
106 }
107
108 throw new ExchangeException(msg.toString(), e);
109 }
110
111 executeInPool(new Runnable() {
112 public void run() {
113 cleanTemporaryStore(activationContent);
114 }
115 });
116 }
117
118 protected Collection<Subscriber> getSubscribers() {
119 return ActivationManagerFactory.getActivationManager().getSubscribers();
120 }
121
122 private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
123 Runnable r = new Runnable() {
124 public void run() {
125 try {
126 activate(subscriber, activationContent, nodePath);
127 } catch (Exception e) {
128 log.error("Failed to activate content.", e);
129 errors.put(subscriber,e);
130 } finally {
131 done.release();
132 }
133 }
134 };
135 return r;
136 }
137
138 @Override
139 public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
140 Collection<Subscriber> subscribers = getSubscribers();
141 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
142 final Sync done = new CountDown(subscribers.size());
143 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
144 while (subscriberIterator.hasNext()) {
145 final Subscriber subscriber = subscriberIterator.next();
146 if (subscriber.isActive()) {
147
148 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
149 } else {
150
151 done.release();
152 }
153 }
154
155
156 acquireIgnoringInterruption(done);
157
158
159 if (!errors.isEmpty()) {
160 Exception e = null;
161 StringBuffer msg = new StringBuffer(errors.size() + " error").append(
162 errors.size() > 1 ? "s" : "").append(" detected: ");
163 Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
164 while (iter.hasNext()) {
165 Entry<Subscriber, Exception> entry = iter.next();
166 e = entry.getValue();
167 Subscriber subscriber = entry.getKey();
168 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
169 log.error(e.getMessage(), e);
170 }
171
172 throw new ExchangeException(msg.toString(), e);
173 }
174 }
175
176 private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
177 Runnable r = new Runnable() {
178 public void run() {
179 try {
180 doDeactivate(subscriber, nodeUUID, nodePath);
181 } catch (Exception e) {
182 log.error("Failed to deactivate content.", e);
183 errors.put(subscriber,e);
184 } finally {
185 done.release();
186 }
187 }
188 };
189 return r;
190 }
191
192
193
194
195
196
197 @Override
198 public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
199 Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
200 if (null != subscription) {
201 String urlString = getDeactivationURL(subscriber);
202 try {
203 URLConnection urlConnection = prepareConnection(subscriber, urlString);
204
205 this.addDeactivationHeaders(urlConnection, nodeUUID);
206 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
207
208
209 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
210 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
211 throw new ExchangeException("Message received from subscriber: " + message);
212 }
213
214 urlConnection.getContent();
215
216 }
217 catch (MalformedURLException e) {
218 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + SecurityUtil.stripPasswordFromUrl(urlString) + "]");
219 }
220 catch (IOException e) {
221 throw new ExchangeException("Not able to send the deactivation request [" + SecurityUtil.stripPasswordFromUrl(urlString) + "]: " + e.getMessage());
222 }
223 catch (Exception e) {
224 throw new ExchangeException(e);
225 }
226 }
227 return null;
228 }
229
230 }