1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.exchangesimple;
35
36 import info.magnolia.cms.core.ItemType;
37 import info.magnolia.cms.exchange.ActivationManagerFactory;
38 import info.magnolia.cms.exchange.ExchangeException;
39 import info.magnolia.cms.exchange.Subscriber;
40 import info.magnolia.cms.exchange.Subscription;
41 import info.magnolia.cms.security.SecurityUtil;
42
43 import java.io.IOException;
44 import java.net.MalformedURLException;
45 import java.net.URLConnection;
46 import java.util.Collection;
47 import java.util.Iterator;
48 import java.util.Map;
49 import java.util.Map.Entry;
50 import java.util.concurrent.ConcurrentHashMap;
51
52 import org.apache.commons.lang.StringUtils;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import EDU.oswego.cs.dl.util.concurrent.CountDown;
57 import EDU.oswego.cs.dl.util.concurrent.Sync;
58
59
60
61
62
63
64 public class SimpleSyndicator extends BaseSyndicatorImpl {
65 private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
66
67 public SimpleSyndicator() {
68 }
69
70 @Override
71 public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
72 String nodeUUID = activationContent.getproperty(NODE_UUID);
73 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
74 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
75 final Sync done = new CountDown(subscribers.size());
76 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
77 while (subscriberIterator.hasNext()) {
78 final Subscriber subscriber = subscriberIterator.next();
79 if (subscriber.isActive()) {
80
81 if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
82 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
83 } else {
84 executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
85 }
86 } else {
87
88 done.release();
89 }
90 }
91
92
93 acquireIgnoringInterruption(done);
94
95
96 if (!errors.isEmpty()) {
97 Exception e = null;
98 StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
99 Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
100 while (iter.hasNext()) {
101 Entry<Subscriber, Exception> entry = iter.next();
102 e = entry.getValue();
103 Subscriber subscriber = entry.getKey();
104 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
105 log.error(e.getMessage(), e);
106 }
107
108 throw new ExchangeException(msg.toString(), e);
109 }
110
111 executeInPool(new Runnable() {
112 public void run() {
113 cleanTemporaryStore(activationContent);
114 }
115 });
116 }
117
118 private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
119 Runnable r = new Runnable() {
120 public void run() {
121 try {
122 activate(subscriber, activationContent, nodePath);
123 } catch (Exception e) {
124 log.error("Failed to activate content.", e);
125 errors.put(subscriber,e);
126 } finally {
127 done.release();
128 }
129 }
130 };
131 return r;
132 }
133
134 @Override
135 public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
136 Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
137 Iterator<Subscriber> subscriberIterator = subscribers.iterator();
138 final Sync done = new CountDown(subscribers.size());
139 final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
140 while (subscriberIterator.hasNext()) {
141 final Subscriber subscriber = subscriberIterator.next();
142 if (subscriber.isActive()) {
143
144 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
145 } else {
146
147 done.release();
148 }
149 }
150
151
152 acquireIgnoringInterruption(done);
153
154
155 if (!errors.isEmpty()) {
156 Exception e = null;
157 StringBuffer msg = new StringBuffer(errors.size() + " error").append(
158 errors.size() > 1 ? "s" : "").append(" detected: ");
159 Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
160 while (iter.hasNext()) {
161 Entry<Subscriber, Exception> entry = iter.next();
162 e = entry.getValue();
163 Subscriber subscriber = entry.getKey();
164 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
165 log.error(e.getMessage(), e);
166 }
167
168 throw new ExchangeException(msg.toString(), e);
169 }
170 }
171
172 private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
173 Runnable r = new Runnable() {
174 public void run() {
175 try {
176 doDeactivate(subscriber, nodeUUID, nodePath);
177 } catch (Exception e) {
178 log.error("Failed to deactivate content.", e);
179 errors.put(subscriber,e);
180 } finally {
181 done.release();
182 }
183 }
184 };
185 return r;
186 }
187
188
189
190
191
192
193 @Override
194 public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
195 Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
196 if (null != subscription) {
197 String urlString = getDeactivationURL(subscriber);
198 try {
199 URLConnection urlConnection = prepareConnection(subscriber, urlString);
200
201 this.addDeactivationHeaders(urlConnection, nodeUUID);
202 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
203
204
205 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
206 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
207 throw new ExchangeException("Message received from subscriber: " + message);
208 }
209
210 urlConnection.getContent();
211
212 }
213 catch (MalformedURLException e) {
214 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + SecurityUtil.stripPasswordFromUrl(urlString) + "]");
215 }
216 catch (IOException e) {
217 throw new ExchangeException("Not able to send the deactivation request [" + SecurityUtil.stripPasswordFromUrl(urlString) + "]: " + e.getMessage());
218 }
219 catch (Exception e) {
220 throw new ExchangeException(e);
221 }
222 }
223 return null;
224 }
225
226 }