1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package info.magnolia.ui.framework;
35
36
37 import java.io.Serializable;
38 import java.util.ArrayList;
39 import java.util.List;
40 import java.util.Objects;
41 import java.util.Optional;
42 import java.util.function.Consumer;
43 import java.util.function.UnaryOperator;
44
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import io.reactivex.BackpressureStrategy;
49 import io.reactivex.Flowable;
50 import io.reactivex.disposables.Disposable;
51 import io.reactivex.subjects.BehaviorSubject;
52 import io.reactivex.subjects.Subject;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public interface ContextProperty<T> extends Serializable {
68
69 Logger log = LoggerFactory.getLogger(ContextProperty.class);
70
71 Disposable observeNullable(Consumer<T> action);
72
73 Disposable observe(Consumer<Optional<T>> action);
74
75 Optional<T> value();
76
77 void interceptWith(UnaryOperator<T> valueTransformer);
78
79 default void set(T value) {
80 set(value, false);
81 }
82
83 void set(T value, boolean shouldNotifyOnSameItem);
84
85 void update(UnaryOperator<T> updateOperation);
86
87 void update(Consumer<T> updateOperation);
88
89 default T nullableValue() {
90 return value().orElse(null);
91 }
92
93
94
95
96
97 class Wrapper<T> implements ContextProperty<T> {
98
99 private final ContextProperty<T> delegate;
100
101 public Wrapper(ContextProperty<T> delegate) {
102 this.delegate = delegate;
103 }
104
105 @Override
106 public Disposable observeNullable(Consumer<T> action) {
107 return this.delegate.observeNullable(action);
108 }
109
110 @Override
111 public Disposable observe(Consumer<Optional<T>> action) {
112 return this.delegate.observe(action);
113 }
114
115 @Override
116 public Optional<T> value() {
117 return this.delegate.value();
118 }
119
120 @Override
121 public void interceptWith(UnaryOperator<T> valueTransformer) {
122 this.delegate.interceptWith(valueTransformer);
123 }
124
125 @Override
126 public void set(T value, boolean shouldNotifyOnSameItem) {
127 this.delegate.set(value, shouldNotifyOnSameItem);
128 }
129
130 @Override
131 public void update(UnaryOperator<T> updateOperation) {
132 this.delegate.update(updateOperation);
133 }
134
135 @Override
136 public void update(Consumer<T> updateOperation) {
137 this.delegate.update(updateOperation);
138 }
139 }
140
141
142
143
144
145
146
147 class Impl<T> implements ContextProperty<T> {
148
149 private static final Logger log = LoggerFactory.getLogger(Impl.class);
150
151 private T lastValue = null;
152
153 private T pendingValueUpdate = null;
154
155 private Subject<Optional<T>> subject = BehaviorSubject.createDefault(Optional.empty());
156
157 private boolean updatesMuted;
158
159 private List<UnaryOperator<T>> valueTransformers = new ArrayList<>();
160
161 Impl() {
162 subject.onNext(Optional.empty());
163 }
164
165 public Disposable observeNullable(Consumer<T> action) {
166 return asFlowable()
167 .subscribe(
168 optional -> {
169 muteUpdates();
170 try {
171 action.accept(optional.orElse(null));
172 } finally {
173 unmuteUpdates();
174 }
175 },
176 e -> log.error("Failed to dispatch context property change: {}", e.getMessage(), e));
177 }
178
179 public Flowable<Optional<T>> asFlowable() {
180 return subject.toFlowable(BackpressureStrategy.LATEST).map(optionalValue -> Optional.ofNullable(applyValueTransformers(optionalValue.orElse(null))));
181 }
182
183 public T applyValueTransformers(T value) {
184 return valueTransformers.stream().reduce(
185 value,
186 (current, mapper) -> mapper.apply(current),
187 (f, s) -> s);
188 }
189
190 @Override
191 public Disposable observe(Consumer<Optional<T>> action) {
192 return asFlowable().subscribe(value -> {
193 muteUpdates();
194 try {
195 action.accept(value);
196 } finally {
197 unmuteUpdates();
198 }
199 }, e -> log.error("Failed to dispatch context property change: {}", e.getMessage(), e));
200
201 }
202
203 @Override
204 public void update(UnaryOperator<T> updateOperation) {
205 value().ifPresent(value -> {
206 try {
207 doSet(updateOperation.apply(value), true);
208 } catch (Exception e) {
209 log.error("Failed to update context property value", e);
210 }
211 });
212 }
213
214 @Override
215 public void update(Consumer<T> updateOperation) {
216 update(value -> {
217 updateOperation.accept(value);
218 return value;
219 });
220 }
221
222 @Override
223 public Optional<T> value() {
224 return Optional.ofNullable(applyValueTransformers(this.lastValue));
225 }
226
227 @Override
228 public void interceptWith(UnaryOperator<T> valueTransformer) {
229 this.valueTransformers.add(valueTransformer);
230 }
231
232 @Override
233 public void set(T value, boolean shouldNotifyOnSameItem) {
234 doSet(value, shouldNotifyOnSameItem);
235 }
236
237 public void doSet(T value, boolean shouldNotifyOnSameItem) {
238 if (updatesMuted) {
239 pendingValueUpdate = value;
240 return;
241 }
242
243 if (!shouldNotifyOnSameItem && Objects.equals(value, this.lastValue)) {
244 return;
245 }
246
247 this.lastValue = value;
248 muteUpdates();
249 try {
250 this.subject.onNext(Optional.ofNullable(value));
251 } finally {
252 unmuteUpdates();
253 }
254 }
255
256 private void muteUpdates() {
257 this.updatesMuted = true;
258 }
259
260 private void unmuteUpdates() {
261 if (pendingValueUpdate != null) {
262 doSet(pendingValueUpdate, false);
263 pendingValueUpdate = null;
264 }
265
266 this.updatesMuted = false;
267 }
268 }
269 }