View Javadoc

1   /**
2    * This file Copyright (c) 2003-2011 Magnolia International
3    * Ltd.  (http://www.magnolia-cms.com). All rights reserved.
4    *
5    *
6    * This file is dual-licensed under both the Magnolia
7    * Network Agreement and the GNU General Public License.
8    * You may elect to use one or the other of these licenses.
9    *
10   * This file is distributed in the hope that it will be
11   * useful, but AS-IS and WITHOUT ANY WARRANTY; without even the
12   * implied warranty of MERCHANTABILITY or FITNESS FOR A
13   * PARTICULAR PURPOSE, TITLE, or NONINFRINGEMENT.
14   * Redistribution, except as permitted by whichever of the GPL
15   * or MNA you select, is prohibited.
16   *
17   * 1. For the GPL license (GPL), you can redistribute and/or
18   * modify this file under the terms of the GNU General
19   * Public License, Version 3, as published by the Free Software
20   * Foundation.  You should have received a copy of the GNU
21   * General Public License, Version 3 along with this program;
22   * if not, write to the Free Software Foundation, Inc., 51
23   * Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
24   *
25   * 2. For the Magnolia Network Agreement (MNA), this file
26   * and the accompanying materials are made available under the
27   * terms of the MNA which accompanies this distribution, and
28   * is available at http://www.magnolia-cms.com/mna.html
29   *
30   * Any modifications to this file must keep this entire header
31   * intact.
32   *
33   */
34  package info.magnolia.module.exchangesimple;
35  
36  import info.magnolia.cms.core.ItemType;
37  import info.magnolia.cms.exchange.ActivationManagerFactory;
38  import info.magnolia.cms.exchange.ExchangeException;
39  import info.magnolia.cms.exchange.Subscriber;
40  import info.magnolia.cms.exchange.Subscription;
41  
42  import java.io.IOException;
43  import java.net.MalformedURLException;
44  import java.net.URLConnection;
45  import java.util.Collection;
46  import java.util.Iterator;
47  import java.util.Map;
48  import java.util.Map.Entry;
49  import java.util.concurrent.ConcurrentHashMap;
50  
51  import org.apache.commons.lang.StringUtils;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  import EDU.oswego.cs.dl.util.concurrent.CountDown;
56  import EDU.oswego.cs.dl.util.concurrent.Sync;
57  
58  
59  /**
60   * Implementation of syndicator that simply sends all activated content over http connection specified in the subscriber.
61   * @author Sameer Charles $Id: SimpleSyndicator.java 48401 2011-08-17 10:48:05Z had $
62   */
63  public class SimpleSyndicator extends BaseSyndicatorImpl {
64      private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
65  
66      public SimpleSyndicator() {
67      }
68  
69      @Override
70      public void activate(final ActivationContent activationContent, String nodePath) throws ExchangeException {
71          String nodeUUID = activationContent.getproperty(NODE_UUID);
72          Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
73          Iterator<Subscriber> subscriberIterator = subscribers.iterator();
74          final Sync done = new CountDown(subscribers.size());
75          final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>(subscribers.size());
76          while (subscriberIterator.hasNext()) {
77              final Subscriber subscriber = subscriberIterator.next();
78              if (subscriber.isActive()) {
79                  // Create runnable task for each subscriber execute
80                  if (Boolean.parseBoolean(activationContent.getproperty(ItemType.DELETED_NODE_MIXIN))) {
81                      executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
82                  } else {
83                      executeInPool(getActivateTask(activationContent, done, errors, subscriber, nodePath));
84                  }
85              } else {
86                  // count down directly
87                  done.release();
88              }
89          } //end of subscriber loop
90  
91          // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user.
92          acquireIgnoringInterruption(done);
93  
94          // collect all the errors and send them back.
95          if (!errors.isEmpty()) {
96              Exception e = null;
97              StringBuffer msg = new StringBuffer(errors.size() + " error").append(errors.size() > 1 ? "s" : "").append(" detected: ");
98              Iterator<Map.Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
99              while (iter.hasNext()) {
100                 Entry<Subscriber, Exception> entry = iter.next();
101                 e = entry.getValue();
102                 Subscriber subscriber = entry.getKey();
103                 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
104                 log.error(e.getMessage(), e);
105             }
106 
107             throw new ExchangeException(msg.toString(), e);
108         }
109 
110         executeInPool(new Runnable() {
111             @Override
112             public void run() {
113                 cleanTemporaryStore(activationContent);
114             }
115         });
116     }
117 
118     private Runnable getActivateTask(final ActivationContent activationContent, final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodePath) {
119         Runnable r = new Runnable() {
120             @Override
121             public void run() {
122                 try {
123                     activate(subscriber, activationContent, nodePath);
124                 } catch (Exception e) {
125                     log.error("Failed to activate content.", e);
126                     errors.put(subscriber,e);
127                 } finally {
128                     done.release();
129                 }
130             }
131         };
132         return r;
133     }
134 
135     @Override
136     public void doDeactivate(String nodeUUID, String nodePath) throws ExchangeException {
137         Collection<Subscriber> subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
138         Iterator<Subscriber> subscriberIterator = subscribers.iterator();
139         final Sync done = new CountDown(subscribers.size());
140         final Map<Subscriber, Exception> errors = new ConcurrentHashMap<Subscriber, Exception>();
141         while (subscriberIterator.hasNext()) {
142             final Subscriber subscriber = subscriberIterator.next();
143             if (subscriber.isActive()) {
144                 // Create runnable task for each subscriber.
145                 executeInPool(getDeactivateTask(done, errors, subscriber, nodeUUID, nodePath));
146             } else {
147                 // count down directly
148                 done.release();
149             }
150         } //end of subscriber loop
151 
152         // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user.
153         acquireIgnoringInterruption(done);
154 
155         // collect all the errors and send them back.
156         if (!errors.isEmpty()) {
157             Exception e = null;
158             StringBuffer msg = new StringBuffer(errors.size() + " error").append(
159                     errors.size() > 1 ? "s" : "").append(" detected: ");
160             Iterator<Entry<Subscriber, Exception>> iter = errors.entrySet().iterator();
161             while (iter.hasNext()) {
162                 Entry<Subscriber, Exception> entry = iter.next();
163                 e = entry.getValue();
164                 Subscriber subscriber = entry.getKey();
165                 msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
166                 log.error(e.getMessage(), e);
167             }
168 
169             throw new ExchangeException(msg.toString(), e);
170         }
171     }
172 
173     private Runnable getDeactivateTask(final Sync done, final Map<Subscriber, Exception> errors, final Subscriber subscriber, final String nodeUUID, final String nodePath) {
174         Runnable r = new Runnable() {
175             @Override
176             public void run() {
177                 try {
178                     doDeactivate(subscriber, nodeUUID, nodePath);
179                 } catch (Exception e) {
180                     log.error("Failed to deactivate content.", e);
181                     errors.put(subscriber,e);
182                 } finally {
183                     done.release();
184                 }
185             }
186         };
187         return r;
188     }
189 
190     /**
191      * Deactivate from a specified subscriber.
192      * @param subscriber
193      * @throws ExchangeException
194      */
195     @Override
196     public String doDeactivate(Subscriber subscriber, String nodeUUID, String path) throws ExchangeException {
197         Subscription subscription = subscriber.getMatchedSubscription(path, this.repositoryName);
198         if (null != subscription) {
199             String urlString = getDeactivationURL(subscriber);
200             try {
201                 URLConnection urlConnection = prepareConnection(subscriber, urlString);
202 
203                 this.addDeactivationHeaders(urlConnection, nodeUUID);
204                 String status = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_STATUS);
205 
206                 // check if the activation failed
207                 if (StringUtils.equals(status, ACTIVATION_FAILED)) {
208                     String message = urlConnection.getHeaderField(ACTIVATION_ATTRIBUTE_MESSAGE);
209                     throw new ExchangeException("Message received from subscriber: " + message);
210                 }
211 
212                 urlConnection.getContent();
213 
214             }
215             catch (MalformedURLException e) {
216                 throw new ExchangeException("Incorrect URL for subscriber " + subscriber + "[" + stripPasswordFromUrl(urlString) + "]");
217             }
218             catch (IOException e) {
219                 throw new ExchangeException("Not able to send the deactivation request [" + stripPasswordFromUrl(urlString) + "]: " + e.getMessage());
220             }
221             catch (Exception e) {
222                 throw new ExchangeException(e);
223             }
224         }
225         return null;
226     }
227 
228 }