1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.module.cache;
35
36 import info.magnolia.cms.util.ObservationUtil;
37 import info.magnolia.repository.RepositoryManager;
38
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45
46 import javax.inject.Inject;
47 import javax.jcr.RepositoryException;
48 import javax.jcr.observation.Event;
49 import javax.jcr.observation.EventIterator;
50 import javax.jcr.observation.EventListener;
51
52 import org.apache.commons.lang3.StringUtils;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56
57
58
59 public abstract class AbstractListeningFlushPolicy implements FlushPolicy {
60
61 private static final Logger log = LoggerFactory.getLogger(AbstractListeningFlushPolicy.class);
62
63 private List<String> workspaces;
64 private List<String> excludedWorkspaces = new ArrayList<>();
65 private Map<String, EventListener> registeredListeners = new HashMap<>();
66
67 private final CacheModule cacheModule;
68 private final RepositoryManager repositoryManager;
69
70 @Inject
71 public AbstractListeningFlushPolicy(CacheModule cacheModule, RepositoryManager repositoryManager) {
72 this.cacheModule = cacheModule;
73 this.repositoryManager = repositoryManager;
74 }
75
76 @Override
77 public void start(Cache cache) {
78 for (final String workspace : this.getWorkspacesToProcess()) {
79 try {
80 if (repositoryManager.getWorkspaceMapping(workspace) != null) {
81 for (String path : getPaths(cache, workspace)) {
82 final CacheCleaner cacheCleaner = new CacheCleaner(cache, workspace);
83 final EventListener listener = ObservationUtil.instanciateDeferredEventListener(cacheCleaner, 5000, 30000);
84 ObservationUtil.registerChangeListener(workspace, path, listener);
85 registeredListeners.put(cache.getName() + ":" + workspace + ":" + path, listener);
86 }
87 }
88 } catch (Exception e) {
89 log.warn("Failed to register cache flushing observation for workspace '{}' (cache named {}): ", workspace, cache.getName(), e);
90 }
91 }
92 }
93
94 protected Collection<String> getPaths(Cache cache, String workspace) {
95 return Arrays.asList("/");
96 }
97
98 @Override
99 public void stop(Cache cache) {
100 for (Map.Entry<String, EventListener> entry : new HashMap<>(registeredListeners).entrySet()) {
101 if (entry.getValue() == null) {
102
103 continue;
104 }
105 if (StringUtils.startsWith(entry.getKey(), cache.getName() + ":")) {
106 ObservationUtil.unregisterChangeListener(StringUtils.substringBetween(entry.getKey(), ":"), entry.getValue());
107 registeredListeners.remove(entry.getKey());
108 }
109 }
110 }
111
112 private List<String> getWorkspacesToProcess() {
113 if (this.getWorkspaces() != null) {
114 return this.getWorkspaces();
115 } else {
116 ArrayList<String> workspaces = new ArrayList<>(repositoryManager.getWorkspaceNames());
117 workspaces.removeAll(this.getExcludedWorkspaces());
118 return workspaces;
119 }
120 }
121
122
123
124
125
126
127 protected abstract boolean preHandleEvents(Cache cache, String repository);
128
129
130
131
132
133 protected abstract void postHandleEvents(Cache cache, String repository);
134
135
136
137
138
139
140 protected abstract void handleSingleEvent(Cache cache, String repository, Event event);
141
142
143
144
145
146 protected void flushByUUID(String uuid, String repository, Cache cache) {
147
148 final ContentCachingConfiguration config = cacheModule.getContentCaching(cache.getName());
149 final CachePolicy policy = config.getCachePolicy();
150 if (policy == null) {
151
152 return;
153 }
154
155 Object[] cacheEntryKeys = config.getCachePolicy().retrieveCacheKeys(uuid, repository);
156 log.debug("Flushing {} due to detected content {}:{} update.", cacheEntryKeys, repository, uuid);
157
158 if (cacheEntryKeys == null || cacheEntryKeys.length == 0) {
159
160 return;
161 }
162 for (Object key : cacheEntryKeys) {
163 cache.remove(key);
164 }
165
166 }
167
168
169
170
171 protected class CacheCleaner implements EventListener {
172 private final Cache cache;
173 private final String repository;
174
175 public CacheCleaner(Cache cache, String repository) {
176 this.cache = cache;
177 this.repository = repository;
178 }
179
180 @Override
181 public void onEvent(EventIterator events) {
182 List<Event> eventList = new ArrayList<Event>();
183
184 while (events.hasNext()) {
185 final Event event = events.nextEvent();
186 try {
187 if (!event.getPath().startsWith("/jcr:")) {
188 eventList.add(event);
189 }
190 } catch (RepositoryException e) {
191 log.warn("Failed to process an event {}, the observation based cache flushing might not have been fully completed.", event.toString());
192 }
193 }
194 if (eventList.isEmpty()) {
195 return;
196 }
197
198 boolean shouldContinue = preHandleEvents(cache, repository);
199 if (shouldContinue) {
200 for (Event event : eventList) {
201 handleSingleEvent(cache, repository, event);
202 }
203 postHandleEvents(cache, repository);
204 }
205 }
206 }
207
208
209
210
211
212
213
214 public List<String> getWorkspaces() {
215 return workspaces;
216 }
217
218 public void setWorkspaces(List<String> workspaces) {
219 if (!this.getExcludedWorkspaces().isEmpty()) {
220 log.error("You should configure only 'workspaces' or 'excludedWorkspaces' on {}. Not both of them.", this.getClass());
221 return;
222 }
223 this.workspaces = workspaces;
224 }
225
226
227
228
229 public List<String> getExcludedWorkspaces() {
230 return excludedWorkspaces;
231 }
232
233 public void setExcludedWorkspaces(List<String> excludedWorkspaces) {
234 if (this.getWorkspaces() != null) {
235 log.error("You should configure only 'workspaces' or 'excludedWorkspaces' on {}. Not both of them.", this.getClass());
236 return;
237 }
238 this.excludedWorkspaces = excludedWorkspaces;
239 }
240 }