1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.rssaggregator.importhandler;
35
36 import info.magnolia.objectfactory.ComponentProvider;
37
38 import java.util.ArrayList;
39 import java.util.List;
40 import java.util.Set;
41 import java.util.concurrent.CancellationException;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.TimeUnit;
47
48 import javax.inject.Inject;
49
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
54
55
56
57
58
59
60
61
62
63
64 public class FastRSSFeedFetcher implements RSSFeedFetcher {
65
66 private static final Logger log = LoggerFactory.getLogger(FastRSSFeedFetcher.class);
67
68 private static final int NUMBER_OF_THREADS = 3;
69
70 private Class<? extends Runnable> feedChannelFetchTask;
71
72 private int shutdownTimeout = 300000;
73
74 private final ExecutorService executorService;
75
76 private ComponentProvider componentProvider;
77
78 private ArrayList<Future<?>> futureList = new ArrayList<Future<?>>();
79
80 @Inject
81 public FastRSSFeedFetcher(ComponentProvider componentProvider) {
82 this.componentProvider = componentProvider;
83 setFeedChannelFetchTask(DefaultFeedChannelFetchTask.class);
84 executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
85 }
86
87 @Override
88 public Set<AggregateFeed> fetchAggregateFeeds(Set<AggregateFeed> aggregateFeeds) {
89 for (AggregateFeed aggregate : aggregateFeeds) {
90 for (FeedChannel channel : aggregate.getChannels()) {
91 Runnable task = componentProvider.newInstance(getFeedChannelFetchTaskClass(), new MgnlHttpURLFeedFetcher(HashMapFeedInfoCache.getInstance()), channel, aggregate.getName());
92 Future<?> future = executorService.submit(task);
93 futureList.add(future);
94 }
95 }
96
97
98 for (Future<?> future : futureList) {
99 waitForFutureResult(future);
100 }
101 return aggregateFeeds;
102 }
103
104 protected void waitForFutureResult(Future<?> future) {
105 try {
106 future.get();
107 } catch (InterruptedException ie) {
108
109 Thread.currentThread().interrupt();
110 } catch (ExecutionException ee) {
111 log.error(ee.getMessage(), ee);
112 } catch (CancellationException e) {
113
114 log.debug("RSS feed update has been cancelled.");
115 }
116 }
117
118
119
120
121 public void shutdown() {
122 List<Runnable> runnables = executorService.shutdownNow();
123 log.info("Canceling {} RSS Update Tasks because of shutdown...", runnables.size());
124
125
126 for (Future<?> f : futureList) {
127 f.cancel(true);
128 }
129
130 try {
131 if (executorService.awaitTermination(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
132 log.info("Termination successful.");
133 } else {
134 log.error("Termination timed out.");
135 }
136 } catch (InterruptedException e) {
137 log.error("Termination error.", e);
138 }
139 }
140
141 public Class<? extends Runnable> getFeedChannelFetchTaskClass() {
142 return feedChannelFetchTask;
143 }
144
145 public void setFeedChannelFetchTask(Class<? extends Runnable> feedChannelFetchTask) {
146 this.feedChannelFetchTask = feedChannelFetchTask;
147 }
148
149 public int getShutdownTimeout() {
150 return shutdownTimeout;
151 }
152
153 public void setShutdownTimeout(int shutdownTimeout) {
154 this.shutdownTimeout = shutdownTimeout;
155 }
156
157 protected ExecutorService getExecutorService() {
158 return executorService;
159 }
160 }