View Javadoc

1   /**
2    * This file Copyright (c) 2003-2010 Magnolia International
3    * Ltd.  (http://www.magnolia-cms.com). All rights reserved.
4    *
5    *
6    * This file is dual-licensed under both the Magnolia
7    * Network Agreement and the GNU General Public License.
8    * You may elect to use one or the other of these licenses.
9    *
10   * This file is distributed in the hope that it will be
11   * useful, but AS-IS and WITHOUT ANY WARRANTY; without even the
12   * implied warranty of MERCHANTABILITY or FITNESS FOR A
13   * PARTICULAR PURPOSE, TITLE, or NONINFRINGEMENT.
14   * Redistribution, except as permitted by whichever of the GPL
15   * or MNA you select, is prohibited.
16   *
17   * 1. For the GPL license (GPL), you can redistribute and/or
18   * modify this file under the terms of the GNU General
19   * Public License, Version 3, as published by the Free Software
20   * Foundation.  You should have received a copy of the GNU
21   * General Public License, Version 3 along with this program;
22   * if not, write to the Free Software Foundation, Inc., 51
23   * Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
24   *
25   * 2. For the Magnolia Network Agreement (MNA), this file
26   * and the accompanying materials are made available under the
27   * terms of the MNA which accompanies this distribution, and
28   * is available at http://www.magnolia-cms.com/mna.html
29   *
30   * Any modifications to this file must keep this entire header
31   * intact.
32   *
33   */
34  package info.magnolia.module.exchangesimple;
35  
36  import info.magnolia.cms.core.ItemType;
37  import info.magnolia.cms.exchange.ActivationManagerFactory;
38  import info.magnolia.cms.exchange.ExchangeException;
39  import info.magnolia.cms.exchange.Subscriber;
40  import info.magnolia.cms.exchange.Subscription;
41  
42  import java.io.IOException;
43  import java.net.HttpURLConnection;
44  import java.net.MalformedURLException;
45  import java.net.URLConnection;
46  import java.util.Collection;
47  import java.util.Iterator;
48  import java.util.Map;
49  import java.util.Map.Entry;
50  import java.util.concurrent.ConcurrentHashMap;
51  
52  import org.apache.commons.lang.StringUtils;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  
56  import EDU.oswego.cs.dl.util.concurrent.CountDown;
57  import EDU.oswego.cs.dl.util.concurrent.Sync;
58  
59  
60  /**
61   * Implementation of syndicator that simply sends all activated content over http connection specified in the subscriber.
62   * @author Sameer Charles $Id: SimpleSyndicator.java 38886 2010-11-08 10:17:20Z had $
63   */
64  public class SimpleSyndicator extends BaseSyndicatorImpl {
65      private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
66  
67      public SimpleSyndicator() {
68      }
69  
70      @Override
71      public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
72          String nodeUUID = activationContent.getproperty(NODE_UUID);
73          Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
74          Iterator<Subscriber> subscriberIterator = subscribers.iterator();
75          final Sync done = new CountDown(subscribers.size());
76          final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
77          while (subscriberIterator.hasNext()) {
78              final Subscriber subscriber = subscriberIterator.next();
79              if (subscriber.isActive()) {
80                  // Create runnable task for each subscriber execute
81                  if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
82                      executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
83                  } else {
84                      executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
85                  }
86              } else {
87                  // count down directly
88                  done.release();
89              }
90          } //end of subscriber loop
91  
92          // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user.
93          acquireIgnoringInterruption(done);
94  
95          // collect all the errors and send them back.
96          if (!errors.isEmpty()) {
97              Exception e = null;
98              StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
99              Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
100             while (iter.hasNext()) {
101                 Entry<Subscriber, Exception> entry = iter.next();
102                 e = entry.getValue();
103                 Subscriber subscriber = entry.getKey();
104                 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
105                 log.error(e.getMessage(), e);
106             }
107 
108             throw new ExchangeException(msg.toString(), e);
109         }
110 
111         executeInPool(new Runnable() {
112             public void run() {
113                 cleanTemporaryStore(activationContent);
114             }
115         });
116     }
117 
118     private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
119         Runnable r = new Runnable() {
120             public void run() {
121                 try {
122                     activate(subscriber, activationContent, nodePath);
123                 } catch (ExchangeException e) {
124                     log.error("Failed to activate content.", e);
125                     errors.put(subscriber,e);
126                 } finally {
127                     done.release();
128                 }
129             }
130         };
131         return r;
132     }
133 
134     /**
135      * Send activation request if subscribed to the activated URI.
136      * @param subscriber
137      * @param activationContent
138      * @throws ExchangeException
139      */
140     @Override
141     public String activate(Subscriber subscriber, ActivationContent activationContent, String nodePath) throws ExchangeException {
142         log.debug("activate");
143         if (null == subscriber) {
144             throw new ExchangeException("Null Subscriber");
145         }
146 
147         String parentPath = null;
148 
149         Subscription subscription = subscriber.getMatchedSubscription(nodePath, this.repositoryName);
150         if (null != subscription) {
151             // its subscribed since we found the matching subscription
152             // unfortunately activationContent is not thread safe and is used by multiple threads in case of multiple subscribers so we can't use it as a vessel for transfer of parentPath value
153             parentPath = this.getMappedPath(this.parent, subscription);
154         } else {
155             log.debug("Exchange : subscriber [{}] is not subscribed to {}", subscriber.getName(), nodePath);
156             return null;
157         }
158         log.debug("Exchange : sending activation request to {} with user {}", subscriber.getName(), this.user.getName()); //$NON-NLS-1$
159 
160         URLConnection urlConnection = null;
161         try {
162             urlConnection = prepareConnection(subscriber);
163             this.addActivationHeaders(urlConnection, activationContent);
164             // set a parent path manually instead of via activationHeaders since it can differ between subscribers.
165             urlConnection.setRequestProperty(PARENT_PATH, parentPath);
166 
167             Transporter.transport((HttpURLConnection) urlConnection, activationContent);
168 
169             String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
170 
171             // check if the activation failed
172             if (StringUtils.equals(status, ACTIVATION_FAILED)) {
173                 String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
174                 throw new ExchangeException("Message received from subscriber: " + message);
175             }
176             urlConnection.getContent();
177             log.debug("Exchange : activation request sent to {}", subscriber.getName()); //$NON-NLS-1$
178         }
179         catch (ExchangeException e) {
180             throw e;
181         }
182         catch (IOException e) {
183             throw new ExchangeException("Not able to send the activation request [" + (urlConnection == null ? null : urlConnection.getURL()) + "]: " + e.getMessage());
184         }
185         catch (Exception e) {
186             throw new ExchangeException(e);
187         }
188         return null;
189     }
190 
191     @Override
192     public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
193         Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
194         Iterator<Subscriber> subscriberIterator = subscribers.iterator();
195         final Sync done = new CountDown(subscribers.size());
196         final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
197         while (subscriberIterator.hasNext()) {
198             final Subscriber subscriber = subscriberIterator.next();
199             if (subscriber.isActive()) {
200                 // Create runnable task for each subscriber.
201                 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
202             } else {
203                 // count down directly
204                 done.release();
205             }
206         } //end of subscriber loop
207 
208         // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user.
209         acquireIgnoringInterruption(done);
210 
211         // collect all the errors and send them back.
212         if (!errors.isEmpty()) {
213             Exception e = null;
214             StringBuffer msg = new StringBuffer(errors.size() + " error").append(
215                     errors.size() > 1 ? "s" : "").append(" detected: ");
216             Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
217             while (iter.hasNext()) {
218                 Entry<Subscriber, Exception> entry = iter.next();
219                 e = entry.getValue();
220                 Subscriber subscriber = entry.getKey();
221                 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
222                 log.error(e.getMessage(), e);
223             }
224 
225             throw new ExchangeException(msg.toString(), e);
226         }
227     }
228 
229     private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
230         Runnable r = new Runnable() {
231             public void run() {
232                 try {
233                     doDeactivate(subscriber, nodeUUID, nodePath);
234                 } catch (ExchangeException e) {
235                     log.error("Failed to deactivate content.", e);
236                     errors.put(subscriber,e);
237                 } finally {
238                     done.release();
239                 }
240             }
241         };
242         return r;
243     }
244 
245     /**
246      * Deactivate from a specified subscriber.
247      * @param subscriber
248      * @throws ExchangeException
249      */
250     @Override
251     public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
252         Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
253         if (null != subscription) {
254             String handle = getDeactivationURL(subscriber);
255             try {
256                 URLConnection urlConnection = prepareConnection(subscriber);
257 
258                 this.addDeactivationHeaders(urlConnection, nodeUUID);
259                 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
260 
261                 // check if the activation failed
262                 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
263                     String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
264                     throw new ExchangeException("Message received from subscriber: " + message);
265                 }
266 
267                 urlConnection.getContent();
268 
269             }
270             catch (MalformedURLException e) {
271                 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + handle + "]");
272             }
273             catch (IOException e) {
274                 throw new ExchangeException("Not able to send the deactivation request [" + handle + "]: " + e.getMessage());
275             }
276             catch (Exception e) {
277                 throw new ExchangeException(e);
278             }
279         }
280         return null;
281     }
282 
283 }