Skip to content

Commit e918889

Browse files
authored
refactor: 优化树结构实体类的保存逻辑和性能 (#223)
1 parent e577bc3 commit e918889

File tree

4 files changed

+434
-120
lines changed

4 files changed

+434
-120
lines changed

hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/service/ReactiveTreeSortEntityService.java

+119-113
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package org.hswebframework.web.crud.service;
22

3+
import org.apache.commons.collections4.CollectionUtils;
34
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
45
import org.hswebframework.ezorm.rdb.operator.dml.Terms;
56
import org.hswebframework.utils.RandomUtil;
6-
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
7-
import org.hswebframework.web.api.crud.entity.TransactionManagers;
8-
import org.hswebframework.web.api.crud.entity.TreeSortSupportEntity;
9-
import org.hswebframework.web.api.crud.entity.TreeSupportEntity;
7+
import org.hswebframework.web.api.crud.entity.*;
108
import org.hswebframework.web.exception.ValidationException;
119
import org.hswebframework.web.id.IDGenerator;
1210
import org.hswebframework.web.validator.CreateGroup;
@@ -86,12 +84,13 @@ default Flux<E> queryIncludeChildren(Collection<K> idList) {
8684

8785
return findById(idList)
8886
.concatMap(e -> !StringUtils.hasText(e.getPath()) || !duplicateCheck.add(e.getPath())
89-
? Mono.just(e)
90-
: createQuery()
91-
.where()
92-
//使用path快速查询
93-
.like$("path", e.getPath())
94-
.fetch())
87+
? Mono.just(e)
88+
: createQuery()
89+
.where()
90+
//使用path快速查询
91+
.like$("path", e.getPath())
92+
.fetch(),
93+
Integer.MAX_VALUE)
9594
.distinct(TreeSupportEntity::getId);
9695
}
9796

@@ -114,7 +113,7 @@ default Flux<E> queryIncludeParent(Collection<K> idList) {
114113
.accept(Terms.Like.reversal("path", e.getPath(), false, true))
115114
.notEmpty("path")
116115
.notNull("path")
117-
.fetch())
116+
.fetch(), Integer.MAX_VALUE)
118117
.distinct(TreeSupportEntity::getId);
119118
}
120119

@@ -156,20 +155,18 @@ default Mono<Integer> insert(E data) {
156155
default Mono<Integer> insertBatch(Publisher<? extends Collection<E>> entityPublisher) {
157156
return this
158157
.getRepository()
159-
.insertBatch(Flux.from(entityPublisher)
160-
.flatMap(Flux::fromIterable)
161-
.collectList()
162-
.flatMap(this::checkParentId)
163-
.flatMapIterable(Function.identity())
164-
.flatMap(this::applyTreeProperty)
165-
.flatMap(e -> Flux.fromIterable(TreeSupportEntity.expandTree2List(e, getIDGenerator())))
166-
.buffer(getBufferSize()));
158+
.insertBatch(new TreeSortServiceHelper<>(this)
159+
.prepare(Flux.from(entityPublisher)
160+
.flatMapIterable(Function.identity()))
161+
.doOnNext(e -> e.tryValidate(CreateGroup.class))
162+
.buffer(getBufferSize()));
167163
}
168164

169165
default int getBufferSize() {
170166
return 200;
171167
}
172168

169+
@Deprecated
173170
default Mono<E> applyTreeProperty(E ele) {
174171
if (StringUtils.hasText(ele.getPath()) ||
175172
ObjectUtils.isEmpty(ele.getParentId())) {
@@ -182,6 +179,7 @@ default Mono<E> applyTreeProperty(E ele) {
182179
.thenReturn(ele);
183180
}
184181

182+
@Deprecated
185183
//校验是否有循环依赖,修改父节点为自己的子节点?
186184
default Mono<E> checkCyclicDependency(K id, E ele) {
187185
if (ObjectUtils.isEmpty(id)) {
@@ -197,6 +195,7 @@ default Mono<E> checkCyclicDependency(K id, E ele) {
197195
.then(Mono.just(ele));
198196
}
199197

198+
@Deprecated
200199
default Mono<Collection<E>> checkParentId(Collection<E> source) {
201200

202201
Set<K> idSet = source
@@ -221,107 +220,126 @@ default Mono<Collection<E>> checkParentId(Collection<E> source) {
221220

222221
return this
223222
.createQuery()
223+
.select("id")
224224
.in("id", readyToCheck)
225-
.count()
226-
.doOnNext(count -> {
227-
if (count != readyToCheck.size()) {
228-
throw new ValidationException("parentId", "error.tree_entity_parent_id_not_exist");
225+
.fetch()
226+
.doOnNext(e -> readyToCheck.remove(e.getId()))
227+
.then(Mono.fromSupplier(() -> {
228+
if (!readyToCheck.isEmpty()) {
229+
throw new ValidationException(
230+
"error.tree_entity_parent_id_not_exist",
231+
Collections.singletonList(
232+
new ValidationException.Detail(
233+
"parentId",
234+
"error.tree_entity_parent_id_not_exist",
235+
readyToCheck))
236+
);
229237
}
230-
})
231-
.thenReturn(source);
238+
return source;
239+
}));
232240

233241
}
234242

243+
@Deprecated
235244
//重构子节点的path
236-
default Mono<Void> refactorChildPath(K id, String path, Consumer<E> pathAccepter) {
237-
return this
238-
.createQuery()
239-
.where("parentId", id)
240-
.fetch()
241-
.flatMap(e -> {
242-
if (ObjectUtils.isEmpty(path)) {
243-
e.setPath(RandomUtil.randomChar(4));
244-
} else {
245-
e.setPath(path + "-" + RandomUtil.randomChar(4));
246-
}
247-
pathAccepter.accept(e);
248-
if (e.getParentId() != null) {
249-
return this
250-
.refactorChildPath(e.getId(), e.getPath(), pathAccepter)
251-
.thenReturn(e);
252-
}
253-
return Mono.just(e);
254-
})
255-
.as(getRepository()::save)
256-
.then();
245+
default void refactorChildPath(K id, Function<K, Collection<E>> childGetter, String path, Consumer<E> pathAccepter) {
246+
247+
Collection<E> children = childGetter.apply(id);
248+
if (CollectionUtils.isEmpty(children)) {
249+
return;
250+
}
251+
for (E child : children) {
252+
if (ObjectUtils.isEmpty(path)) {
253+
child.setPath(RandomUtil.randomChar(4));
254+
} else {
255+
child.setPath(path + "-" + RandomUtil.randomChar(4));
256+
}
257+
pathAccepter.accept(child);
258+
this.refactorChildPath(child.getId(), childGetter, child.getPath(), pathAccepter);
259+
}
260+
257261
}
258262

259263
@Override
264+
@Transactional(rollbackFor = Throwable.class,
265+
transactionManager = TransactionManagers.reactiveTransactionManager)
260266
default Mono<SaveResult> save(Publisher<E> entityPublisher) {
261-
return Flux
262-
.from(entityPublisher)
263-
//1.先平铺
264-
.flatMapIterable(e -> TreeSupportEntity.expandTree2List(e, getIDGenerator()))
265-
.collectList()
266-
.flatMap(this::checkParentId)
267-
.flatMapIterable(list -> {
268-
Map<K, E> map = list
269-
.stream()
270-
.filter(e -> e.getId() != null)
271-
.collect(Collectors.toMap(TreeSupportEntity::getId, Function.identity()));
272-
//2. 重新组装树结构
273-
return TreeSupportEntity.list2tree(list,
274-
this::setChildren,
275-
(Predicate<E>) e -> this.isRootNode(e) || map.get(e.getParentId()) == null);
276-
277-
})
278-
//执行验证
267+
return new TreeSortServiceHelper<>(this)
268+
.prepare(Flux.from(entityPublisher))
279269
.doOnNext(e -> e.tryValidate(CreateGroup.class))
280-
//再次平铺为
281-
.flatMapIterable(e -> TreeSupportEntity.expandTree2List(e, getIDGenerator()))
282-
//重构path
283-
.as(this::tryRefactorPath)
284270
.buffer(getBufferSize())
285271
.flatMap(this.getRepository()::save)
286272
.reduce(SaveResult::merge);
287273

288274
}
289275

276+
@Deprecated
290277
default Flux<E> tryRefactorPath(Flux<E> stream) {
291278
Flux<E> cache = stream.cache();
292279
Mono<Map<K, E>> mapping = cache
293280
.filter(e -> null != e.getId())
294281
.collectMap(TreeSupportEntity::getId, Function.identity())
295282
.defaultIfEmpty(Collections.emptyMap());
296283

297-
//查询出旧数据
298-
Mono<Map<K, E>> olds = cache
299-
.filter(e -> null != e.getId())
300-
.map(TreeSupportEntity::getId)
301-
.as(this::findById)
302-
.collectMap(TreeSupportEntity::getId, Function.identity())
303-
.defaultIfEmpty(Collections.emptyMap());
304-
284+
Mono<Map<K, E>> allDataFetcher =
285+
cache
286+
.filter(e -> null != e.getId())
287+
.flatMapIterable(e -> e.getParentId() != null ?
288+
Arrays.asList(e.getId(), e.getParentId()) :
289+
Collections.singleton(e.getId()))
290+
.collect(Collectors.toSet())
291+
.flatMap(list -> this
292+
.queryIncludeChildren(list)
293+
.collect(
294+
Collectors.toMap(
295+
TreeSupportEntity::getId,
296+
Function.identity()
297+
)
298+
));
305299

306300
return Mono
307-
.zip(mapping, olds)
301+
.zip(mapping, allDataFetcher)
308302
.flatMapMany(tp2 -> {
309-
Map<K, E> map = tp2.getT1();
303+
//本次提交的数据
304+
Map<K, E> thisTime = tp2.getT1();
305+
//旧的数据
310306
Map<K, E> oldMap = tp2.getT2();
311307

308+
Map<K, E> allMap = new LinkedHashMap<>(oldMap);
309+
allMap.putAll(thisTime);
310+
311+
//子节点映射
312+
Map<K, Map<K, E>> childMapping = new LinkedHashMap<>();
313+
314+
List<E> all = new ArrayList<>(oldMap.values());
315+
all.addAll(thisTime.values());
316+
317+
for (E value : all) {
318+
if (isRootNode(value) || value.getId() == null) {
319+
continue;
320+
}
321+
childMapping
322+
.computeIfAbsent(value.getParentId(), ignore -> new LinkedHashMap<>())
323+
.put(value.getId(), value);
324+
}
325+
326+
Function<K, Collection<E>> childGetter
327+
= id -> childMapping
328+
.getOrDefault(id, Collections.emptyMap())
329+
.values();
312330
return cache
313-
.flatMap(data -> {
331+
.concatMap(data -> {
314332
E old = data.getId() == null ? null : oldMap.get(data.getId());
315333
K parentId = old != null ? old.getParentId() : data.getParentId();
316-
E oldParent = parentId == null ? null : oldMap.get(parentId);
334+
E oldParent = parentId == null ? null : allMap.get(parentId);
335+
317336
if (old != null) {
318337
K newParentId = data.getParentId();
319338
//父节点发生变化,更新所有子节点path
320339
if (!Objects.equals(parentId, newParentId)) {
321-
List<Mono<Void>> jobs = new ArrayList<>();
322340
Consumer<E> childConsumer = child -> {
323341
//更新了父节点,但是同时也传入的对应的子节点
324-
E readyToUpdate = map.get(child.getId());
342+
E readyToUpdate = thisTime.get(child.getId());
325343
if (null != readyToUpdate) {
326344
readyToUpdate.setPath(child.getPath());
327345
}
@@ -330,51 +348,39 @@ default Flux<E> tryRefactorPath(Flux<E> stream) {
330348
//变更到了顶级节点
331349
if (isRootNode(data)) {
332350
data.setPath(RandomUtil.randomChar(4));
333-
jobs.add(this.refactorChildPath(old.getId(), data.getPath(), childConsumer));
351+
this.refactorChildPath(old.getId(), childGetter, data.getPath(), childConsumer);
352+
//重新保存所有子节点
353+
return Flux
354+
.fromIterable(childGetter.apply(old.getId()))
355+
.concatWithValues(data);
334356
} else {
335-
if (null != oldParent) {
336-
data.setPath(oldParent.getPath() + "-" + RandomUtil.randomChar(4));
337-
jobs.add(this.refactorChildPath(old.getId(), data.getPath(), childConsumer));
338-
} else {
339-
jobs.add(this.findById(newParentId)
340-
.flatMap(parent -> {
341-
data.setPath(parent.getPath() + "-" + RandomUtil.randomChar(4));
342-
return this.refactorChildPath(data.getId(), data.getPath(), childConsumer);
343-
})
344-
);
357+
E newParent = allMap.get(newParentId);
358+
if (null != newParent) {
359+
data.setPath(newParent.getPath() + "-" + RandomUtil.randomChar(4));
360+
this.refactorChildPath(data.getId(), childGetter, data.getPath(), childConsumer);
361+
//重新保存所有子节点
362+
return Flux.fromIterable(childGetter.apply(data.getId()))
363+
.concatWithValues(data);
345364
}
346365
}
347-
return Flux.merge(jobs)
348-
.then(Mono.just(data));
366+
return Mono.just(data);
349367
} else {
350-
//父节点未变化则使用原始的path
351-
Consumer<E> pathRefactor = (parent) -> {
352-
if (old.getPath().startsWith(parent.getPath())) {
368+
369+
if (oldParent != null) {
370+
if (old.getPath().startsWith(oldParent.getPath())) {
353371
data.setPath(old.getPath());
354372
} else {
355-
data.setPath(parent.getPath() + "-" + RandomUtil.randomChar(4));
373+
data.setPath(oldParent.getPath() + "-" + RandomUtil.randomChar(4));
356374
}
357-
};
358-
if (oldParent != null) {
359-
pathRefactor.accept(oldParent);
360-
} else if (parentId != null) {
361-
return findById(parentId)
362-
.switchIfEmpty(Mono.fromRunnable(() -> {
363-
data.setParentId(null);
364-
data.setLevel(1);
365-
data.setPath(old.getPath());
366-
}))
367-
.doOnNext(pathRefactor)
368-
.thenReturn(data);
369375
} else {
370376
data.setPath(old.getPath());
371377
}
372-
373378
}
374379
}
375380
return Mono.just(data);
376381
});
377-
});
382+
})
383+
.distinct(TreeSupportEntity::getId);
378384
}
379385

380386
@Override

0 commit comments

Comments
 (0)