Skip to content

Commit e2661ce

Browse files
committed
#509 log normalization
1 parent 4c3c3ce commit e2661ce

15 files changed

+147
-105
lines changed

saturn-core/src/main/java/com/vip/saturn/job/basic/AbstractElasticJob.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.vip.saturn.job.internal.storage.JobNodePath;
2727
import com.vip.saturn.job.trigger.SaturnScheduler;
2828
import com.vip.saturn.job.trigger.SaturnTrigger;
29+
import com.vip.saturn.job.utils.LogUtils;
2930
import org.apache.curator.framework.CuratorFramework;
3031
import org.apache.zookeeper.data.Stat;
3132
import org.slf4j.Logger;
@@ -105,12 +106,12 @@ protected void init() {
105106
}
106107

107108
public final void execute() {
108-
log.trace("Saturn start to execute job [{}].", jobName);
109+
LogUtils.debug(log, jobName, "Saturn start to execute job [{}]", jobName);
109110
// 对每一个jobScheduler,作业对象只有一份,多次使用,所以每次开始执行前先要reset
110111
reset();
111112

112113
if (configService == null) {
113-
log.warn("configService is null");
114+
LogUtils.warn(log, jobName, "configService is null");
114115
return;
115116
}
116117

@@ -121,23 +122,22 @@ public final void execute() {
121122
}
122123

123124
if (!configService.isJobEnabled()) {
124-
if (log.isDebugEnabled()) {
125-
log.debug("{} is disabled, cannot be continued, do nothing about business.", jobName);
126-
}
125+
LogUtils.debug(log, jobName, "{} is disabled, cannot be continued, do nothing about business.",
126+
jobName);
127127
return;
128128
}
129129

130130
shardingContext = executionContextService.getJobExecutionShardingContext();
131131
if (shardingContext.getShardingItems() == null || shardingContext.getShardingItems().isEmpty()) {
132-
if (log.isDebugEnabled()) {
133-
log.debug("{} 's items of the executor is empty, do nothing about business.", jobName);
134-
}
132+
LogUtils.debug(log, jobName, "{} 's items of the executor is empty, do nothing about business.",
133+
jobName);
135134
callbackWhenShardingItemIsEmpty(shardingContext);
136135
return;
137136
}
138137

139138
if (configService.isInPausePeriod()) {
140-
log.info("the job {} current running time is in pausePeriod, do nothing about business.", jobName);
139+
LogUtils.info(log, jobName,
140+
"the job {} current running time is in pausePeriod, do nothing about business.", jobName);
141141
return;
142142
}
143143

@@ -147,7 +147,8 @@ public final void execute() {
147147
failoverService.failoverIfNecessary();
148148
}
149149

150-
log.trace("Saturn finish to execute job [{}], sharding context:{}.", jobName, shardingContext);
150+
LogUtils.debug(log, jobName, "Saturn finish to execute job [{}], sharding context:{}.", jobName,
151+
shardingContext);
151152
} catch (Exception e) {
152153
log.warn(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, e.getMessage()), e);
153154
} finally {
@@ -198,15 +199,17 @@ private boolean checkIfZkLostAfterExecution(final Integer item) {
198199
if (itemStat != null) {
199200
long ephemeralOwner = itemStat.getEphemeralOwner();
200201
if (ephemeralOwner != sessionId) {
201-
log.info("[{}] msg=item={} 's running node doesn't belong to current zk, node sessionid is {}, current zk sessionid is {}",
202-
jobName, item, ephemeralOwner, sessionId);
202+
LogUtils.info(log, jobName,
203+
"item={} 's running node doesn't belong to current zk, node sessionid is {}, current zk sessionid is {}",
204+
item, ephemeralOwner, sessionId);
203205
return false;
204206
} else {
205207
return true;
206208
}
207209
}
208210
// 如果itemStat是空,要么是已经failover完了,要么是没有节点failover;两种情况都返回false
209-
log.info("[{}] msg=item={} 's running node is not exists, zk sessionid={} ", jobName, item, sessionId);
211+
LogUtils.info(log, jobName, "item={} 's running node is not exists, zk sessionid={} ", item, sessionId);
212+
210213
return false;
211214
} catch (Throwable e) {
212215
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, e.getMessage()), e);

saturn-core/src/main/java/com/vip/saturn/job/basic/AbstractSaturnJob.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.vip.saturn.job.exception.JobException;
99
import com.vip.saturn.job.executor.SaturnExecutorService;
1010
import com.vip.saturn.job.internal.statistics.ProcessCountStatistics;
11+
import com.vip.saturn.job.utils.LogUtils;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
1314
import org.springframework.util.PropertyPlaceholderHelper;
@@ -31,7 +32,7 @@ public abstract class AbstractSaturnJob extends AbstractElasticJob {
3132
@Override
3233
protected final void executeJob(final JobExecutionMultipleShardingContext shardingContext) {
3334
if (!(shardingContext instanceof SaturnExecutionContext)) {
34-
log.error("[{}] msg=!!! The context must be instance of SaturnJobExecutionContext !!!", jobName);
35+
LogUtils.error(log, jobName, "!!! The context must be instance of SaturnJobExecutionContext !!!");
3536
return;
3637
}
3738
long start = System.currentTimeMillis();
@@ -48,14 +49,14 @@ protected final void executeJob(final JobExecutionMultipleShardingContext shardi
4849
// items为需要处理的作业分片
4950
List<Integer> items = shardingContext.getShardingItems();
5051

51-
log.info("[{}] msg=Job {} handle items: {}", jobName, jobName, items);
52+
LogUtils.info(log, jobName, "Job {} handle items: {}", jobName, items);
5253

5354
for (Integer item : items) {
5455
// 兼容配置错误,如配置3个分片, 参数表配置为0=*, 2=*, 则1分片不会执行
5556
if (!shardingItemParameters.containsKey(item)) {
56-
log.error(
57+
LogUtils.error(log, jobName,
5758
"The {} item's parameter is not valid, will not execute the business code, please check shardingItemParameters",
58-
item);
59+
items);
5960
SaturnJobReturn errRet = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL,
6061
"Config of parameter is not valid, check shardingItemParameters", SaturnSystemErrorGroup.FAIL);
6162
retMap.put(item, errRet);
@@ -75,7 +76,7 @@ protected final void executeJob(final JobExecutionMultipleShardingContext shardi
7576
}
7677
}
7778
long end = System.currentTimeMillis();
78-
log.info("[{}] msg={} finished, totalCost={}ms, return={}", jobName, jobName, (end - start), retMap);
79+
LogUtils.info(log, jobName, "{} finished, totalCost={}ms, return={}", jobName, (end - start), retMap);
7980
}
8081

8182
protected void updateExecuteResult(SaturnJobReturn saturnJobReturn, SaturnExecutionContext saturnContext,
@@ -175,7 +176,7 @@ public String logBusinessExceptionIfNecessary(String jobName, Throwable t) {
175176
if (message == null) {
176177
message = t.toString();
177178
}
178-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, message), t);
179+
LogUtils.error(log, jobName, message, t);
179180
return message;
180181
}
181182

saturn-core/src/main/java/com/vip/saturn/job/basic/JavaShardingItemCallable.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.vip.saturn.job.SaturnSystemErrorGroup;
66
import com.vip.saturn.job.SaturnSystemReturnCode;
77
import com.vip.saturn.job.java.SaturnJavaJob;
8+
import com.vip.saturn.job.utils.LogUtils;
89
import com.vip.saturn.job.utils.SaturnSystemOutputStream;
910
import java.util.concurrent.atomic.AtomicInteger;
1011
import org.slf4j.Logger;
@@ -172,7 +173,7 @@ public SaturnJobReturn call() {
172173

173174
// 不是超时,不是强制停止。 打印错误日志,设置SaturnJobReturn。
174175
if (status.get() != TIMEOUT && status.get() != FORCE_STOP) {
175-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, t.getMessage()), t);
176+
LogUtils.error(log, jobName, t.toString(), t);
176177
temp = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
177178
SaturnSystemErrorGroup.FAIL);
178179
}
@@ -185,7 +186,8 @@ public SaturnJobReturn call() {
185186
if (saturnJob != null && saturnJob.getConfigService().showNormalLog()) {
186187
String jobLog = SaturnSystemOutputStream.clearAndGetLog();
187188
if (jobLog != null && jobLog.length() > SaturnConstant.MAX_JOB_LOG_DATA_LENGTH) {
188-
log.info("As the job log exceed max length, only the previous {} characters will be reported",
189+
LogUtils.info(log, jobName,
190+
"As the job log exceed max length, only the previous {} characters will be reported",
189191
SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);
190192
jobLog = jobLog.substring(0, SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);
191193
}
@@ -224,31 +226,31 @@ public void beforeTimeout() {
224226
try {
225227
((SaturnJavaJob) saturnJob).beforeTimeout(jobName, item, itemValue, shardingContext, this);
226228
} catch (Throwable t) {
227-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, t.getMessage()), t);
229+
LogUtils.error(log, jobName, t.toString(), t);
228230
}
229231
}
230232

231233
protected void onTimeout() {
232234
try {
233235
((SaturnJavaJob) saturnJob).postTimeout(jobName, item, itemValue, shardingContext, this);
234236
} catch (Throwable t) {
235-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, t.getMessage()), t);
237+
LogUtils.error(log, jobName, t.toString(), t);
236238
}
237239
}
238240

239241
public void beforeForceStop() {
240242
try {
241243
((SaturnJavaJob) saturnJob).beforeForceStop(jobName, item, itemValue, shardingContext, this);
242244
} catch (Throwable t) {
243-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, t.getMessage()), t);
245+
LogUtils.error(log, jobName, t.toString(), t);
244246
}
245247
}
246248

247249
protected void postForceStop() {
248250
try {
249251
((SaturnJavaJob) saturnJob).postForceStop(jobName, item, itemValue, shardingContext, this);
250252
} catch (Throwable t) {
251-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, t.getMessage()), t);
253+
LogUtils.error(log, jobName, t.toString(), t);
252254
}
253255
}
254256

saturn-core/src/main/java/com/vip/saturn/job/basic/JobScheduler.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.vip.saturn.job.threads.SaturnThreadFactory;
3737
import com.vip.saturn.job.threads.TaskQueue;
3838
import com.vip.saturn.job.trigger.SaturnScheduler;
39+
import com.vip.saturn.job.utils.LogUtils;
3940
import org.apache.curator.framework.CuratorFramework;
4041
import org.quartz.Trigger;
4142
import org.quartz.spi.OperableTrigger;
@@ -179,7 +180,7 @@ private void createJob() {
179180
try {
180181
job = (AbstractElasticJob) jobClass.newInstance();
181182
} catch (Exception e) {
182-
log.error("unexptected error", e);
183+
LogUtils.error(log, jobName, "unexptected error", e);
183184
throw new JobException(e);
184185
}
185186
job.setJobScheduler(this);
@@ -205,8 +206,7 @@ private void initExecutorService() {
205206
public void reCreateExecutorService() {
206207
synchronized (isShutdownFlag) {
207208
if (isShutdownFlag.get()) {
208-
log.warn(SaturnConstant.LOG_FORMAT, jobName,
209-
"the jobScheduler was shutdown, cannot re-create business thread pool");
209+
LogUtils.warn(log, jobName, "the jobScheduler was shutdown, cannot re-create business thread pool");
210210
return;
211211
}
212212
executionService.shutdown();
@@ -241,7 +241,7 @@ public Date getNextFireTimePausePeriodEffected() {
241241
}
242242
return nextFireTime;
243243
} catch (Throwable t) {
244-
log.error("fail to get next fire time", t);
244+
LogUtils.error(log, jobName, "fail to get next fire time", t);
245245
return null;
246246
}
247247
}
@@ -307,7 +307,7 @@ public void shutdown(boolean removejob) {
307307
try {
308308
Thread.sleep(500);// NOSONAR
309309
} catch (InterruptedException ignore) {
310-
log.warn(ignore.getMessage());
310+
LogUtils.warn(log, jobName, ignore.getMessage());
311311
}
312312
jobNodeStorage.deleteJobNode();
313313
saturnExecutorService.removeJobName(jobName);

saturn-core/src/main/java/com/vip/saturn/job/basic/ShardingItemFutureTask.java

+14-17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.vip.saturn.job.basic;
22

33
import com.vip.saturn.job.SaturnJobReturn;
4+
import com.vip.saturn.job.utils.LogUtils;
45
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
67

@@ -72,12 +73,11 @@ public SaturnJobReturn call() throws Exception {
7273
@Override
7374
public void uncaughtException(Thread t, Throwable e) {
7475
if (e instanceof IllegalMonitorStateException || e instanceof ThreadDeath) {
75-
log.warn(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, callable.getJobName(),
76-
"business thread pool maybe crashed"), e);
76+
LogUtils.warn(log, callable.getJobName(), "business thread pool maybe crashed", e);
7777
if (callFuture != null) {
7878
callFuture.cancel(false);
7979
}
80-
log.warn(SaturnConstant.LOG_FORMAT, callable.getJobName(),
80+
LogUtils.warn(log, callable.getJobName(),
8181
"close the old business thread pool, and re-create new one");
8282
callable.getSaturnJob().getJobScheduler().reCreateExecutorService();
8383
}
@@ -89,8 +89,8 @@ public void uncaughtException(Thread t, Throwable e) {
8989
return ret;
9090
} finally {
9191
done();
92-
log.debug("job:[{}] item:[{}] finish execution, which takes {}ms", callable.getJobName(),
93-
callable.getItem(), callable.getExecutionTime());
92+
LogUtils.debug(log, callable.getJobName(), "job:[{}] item:[{}] finish execution, which takes {}ms",
93+
callable.getJobName(), callable.getItem(), callable.getExecutionTime());
9494
}
9595
}
9696

@@ -110,17 +110,15 @@ private void done() {
110110
callable.onTimeout();
111111
}
112112
} catch (Throwable t) {
113-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, callable.getJobName(), t.getMessage()),
114-
t);
113+
LogUtils.error(log, callable.getJobName(), t.toString(), t);
115114
}
116115

117116
try {
118117
if (callable.isForceStop()) {
119118
callable.postForceStop();
120119
}
121120
} catch (Throwable t) {
122-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, callable.getJobName(), t.getMessage()),
123-
t);
121+
LogUtils.error(log, callable.getJobName(), t.toString(), t);
124122
}
125123

126124
callable.checkAndSetSaturnJobReturn();
@@ -133,8 +131,7 @@ private void done() {
133131
doneFinallyCallback.call();
134132
}
135133
} catch (Exception e) {
136-
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, callable.getJobName(), e.getMessage()),
137-
e);
134+
LogUtils.error(log, callable.getJobName(), e.toString(), e);
138135
}
139136
}
140137
}
@@ -146,31 +143,31 @@ public static void killRunningBusinessThread(ShardingItemFutureTask shardingItem
146143
try {
147144
// interrupt thread one time, wait business thread to break, wait 2000ms at most
148145
if (!isBusinessBreak(shardingItemFutureTask, shardingItemCallable)) {
149-
log.info("try to interrupt business thread");
146+
LogUtils.info(log, shardingItemCallable.getJobName(), "try to interrupt business thread");
150147
businessThread.interrupt();
151148
for (int i = 0; i < 20; i++) {
152149
if (isBusinessBreak(shardingItemFutureTask, shardingItemCallable)) {
153-
log.info("interrupt business thread done");
150+
LogUtils.info(log, shardingItemCallable.getJobName(), "interrupt business thread done");
154151
return;
155152
}
156153
Thread.sleep(100L);
157154
}
158155
}
159156
// stop thread
160157
while (!isBusinessBreak(shardingItemFutureTask, shardingItemCallable)) {
161-
log.info("try to force stop business thread");
158+
LogUtils.info(log, shardingItemCallable.getJobName(), "try to force stop business thread");
162159
businessThread.stop();
163160
if (isBusinessBreak(shardingItemFutureTask, shardingItemCallable)) {
164-
log.info("force stop business thread done");
161+
LogUtils.info(log, shardingItemCallable.getJobName(), "force stop business thread done");
165162
return;
166163
}
167164
Thread.sleep(50L);
168165
}
169-
log.info("kill business thread done");
166+
LogUtils.info(log, shardingItemCallable.getJobName(), "kill business thread done");
170167
} catch (InterruptedException e) {// NOSONAR
171168
}
172169
} else {
173-
log.warn("business thread is null while killing it");
170+
LogUtils.warn(log, shardingItemCallable.getJobName(), "business thread is null while killing it");
174171
}
175172
}
176173

saturn-core/src/main/java/com/vip/saturn/job/basic/ShutdownHandler.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.vip.saturn.job.basic;
22

3+
import com.vip.saturn.job.utils.LogEvents;
4+
import com.vip.saturn.job.utils.LogUtils;
35
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
57
import sun.misc.Signal;
@@ -71,15 +73,15 @@ public void handle(Signal sn) {
7173
isHandling.set(false);
7274
}
7375
} else {
74-
log.info("shutdown is handling");
76+
LogUtils.info(log, LogEvents.ExecutorEvent.SHUTDOWN, "shutdown is handling");
7577
}
7678
}
7779

7880
private void doHandle() {
79-
log.info("msg=Received the kill command");
81+
LogUtils.info(log, LogEvents.ExecutorEvent.SHUTDOWN, "Received the kill command");
8082
callExecutorListeners();
8183
callGlobalListeners();
82-
log.info("msg=Saturn executor is closed");
84+
LogUtils.info(log, LogEvents.ExecutorEvent.SHUTDOWN, "Saturn executor is closed");
8385
if (isExit) {
8486
exit();
8587
}
@@ -96,7 +98,7 @@ private static void callExecutorListeners() {
9698
runnable.run();
9799
}
98100
} catch (Exception e) {
99-
log.error("msg=" + e.getMessage(), e);
101+
LogUtils.info(log, LogEvents.ExecutorEvent.SHUTDOWN, e.toString(), e);
100102
}
101103
}
102104
}
@@ -110,7 +112,7 @@ private static void callGlobalListeners() {
110112
runnable.run();
111113
}
112114
} catch (Exception e) {
113-
log.error("msg=" + e.getMessage(), e);
115+
LogUtils.info(log, LogEvents.ExecutorEvent.SHUTDOWN, e.toString(), e);
114116
}
115117
}
116118
globalListeners.clear();

0 commit comments

Comments
 (0)