From 5d30561ec7fcd0d39815a1b84b7f308298d27ed9 Mon Sep 17 00:00:00 2001 From: shankar Date: Fri, 14 Apr 2017 19:06:11 +0530 Subject: [PATCH 1/3] Fix SparkMetricsAggregator to not produce negative ResourceUsage Sometimes the end time is 0 in the applicationInfo, most likely because the ApplicationEnd event was never fired. Fix MetricsAggregator to handle this by setting the duration to 0 in those cases. The fetcher still needs to set an appropriate end time; that will be handled in a later PR. --- .../spark/SparkMetricsAggregator.scala | 19 +++++++++-- .../spark/SparkMetricsAggregatorTest.scala | 33 ++++++++++++++++--- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala index 9dc1ac5ae..6053f3d54 100644 --- a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala +++ b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala @@ -63,10 +63,18 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator case false => 0.0 } //allocated is the total used resource from the cluster. 
- if (resourcesAllocatedForUse.isValidLong) { + if (resourcesAllocatedForUse.isValidLong && resourcesAllocatedForUse > 0) { hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong) } else { - logger.info(s"resourcesAllocatedForUse exceeds Long.MaxValue: ${resourcesAllocatedForUse }") + logger.warn(s"resourcesAllocatedForUse exceeds Long.MaxValue: ${resourcesAllocatedForUse}") + logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}") + logger.warn(s"executorInstances: ${executorInstances}") + logger.warn(s"executorMemoryBytes:${executorMemoryBytes}") + logger.warn(s"applicationDurationMillis:${applicationDurationMillis}") + logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}") + logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}") + logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}") + logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}") } hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong) @@ -99,7 +107,12 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator private def applicationDurationMillisOf(data: SparkApplicationData): Long = { require(data.applicationInfo.attempts.nonEmpty) val lastApplicationAttemptInfo = data.applicationInfo.attempts.last - lastApplicationAttemptInfo.endTime.getTime - lastApplicationAttemptInfo.startTime.getTime + if(lastApplicationAttemptInfo.endTime.getTime < lastApplicationAttemptInfo.startTime.getTime) { + logger.info(s"Negative duration:${lastApplicationAttemptInfo.attemptId.get} startTime:${lastApplicationAttemptInfo.startTime.getTime} endTime:${lastApplicationAttemptInfo.endTime.getTime} ") + 0L + } else { + lastApplicationAttemptInfo.endTime.getTime - lastApplicationAttemptInfo.startTime.getTime + } } private def totalExecutorTaskTimeMillisOf(data: SparkApplicationData): BigInt = { diff --git 
a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index a3c0e1cf2..2278d124e 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -46,11 +46,11 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers { new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo)) } + val executorSummaries = Seq( + newFakeExecutorSummary(id = "1", totalDuration = 1000000L), + newFakeExecutorSummary(id = "2", totalDuration = 3000000L) + ) val restDerivedData = { - val executorSummaries = Seq( - newFakeExecutorSummary(id = "1", totalDuration = 1000000L), - newFakeExecutorSummary(id = "2", totalDuration = 3000000L) - ) SparkRestDerivedData( applicationInfo, jobDatas = Seq.empty, @@ -105,6 +105,31 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers { it("doesn't calculate total delay") { result.getTotalDelay should be(0L) } + it("sets resourceused as 0 when duration is negative") { + //make the duration negative + val applicationInfo = { + val applicationAttemptInfo = { + val now = System.currentTimeMillis + val duration = -8000000L + newFakeApplicationAttemptInfo(Some("1"), startTime = new Date(now - duration), endTime = new Date(now)) + } + new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo)) + } + val restDerivedData = SparkRestDerivedData( + applicationInfo, + jobDatas = Seq.empty, + stageDatas = Seq.empty, + executorSummaries = executorSummaries + ) + + val data = SparkApplicationData(appId, restDerivedData, Some(logDerivedData)) + + val aggregator = new SparkMetricsAggregator(aggregatorConfigurationData) + aggregator.aggregate(data) + + val result = aggregator.getResult + result.getResourceUsed should be(0L) + } } describe("when it doesn't have log-derived data") { From b5562b0d2e5df687a7ab5b5772312f8f77df1e30 Mon Sep 17 00:00:00 2001 From: 
shankar Date: Tue, 18 Apr 2017 09:18:29 +0530 Subject: [PATCH 2/3] Skip aggregation if duration is negative instead of hardcoding to 0 --- .../spark/SparkMetricsAggregator.scala | 54 ++++++++----------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala index 6053f3d54..804661ab1 100644 --- a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala +++ b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala @@ -51,33 +51,28 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator executorMemoryBytes <- executorMemoryBytesOf(data) } { val applicationDurationMillis = applicationDurationMillisOf(data) - val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data) - - val resourcesAllocatedForUse = - aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis) - val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis) - - val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage) - val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match { - case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer - case false => 0.0 - } - //allocated is the total used resource from the cluster. 
- if (resourcesAllocatedForUse.isValidLong && resourcesAllocatedForUse > 0) { - hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong) - } else { - logger.warn(s"resourcesAllocatedForUse exceeds Long.MaxValue: ${resourcesAllocatedForUse}") - logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}") - logger.warn(s"executorInstances: ${executorInstances}") - logger.warn(s"executorMemoryBytes:${executorMemoryBytes}") - logger.warn(s"applicationDurationMillis:${applicationDurationMillis}") - logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}") - logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}") - logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}") - logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}") + if( applicationDurationMillis < 0) { + logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}") + } else { + val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data) + + val resourcesAllocatedForUse = + aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis) + val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis) + + val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage) + val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match { + case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer + case false => 0.0 + } + //allocated is the total used resource from the cluster. 
+ if (resourcesAllocatedForUse.isValidLong && resourcesAllocatedForUse >= 0) { + hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong) + } + if( resourcesWastedMBSeconds >= 0.0) { + hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong) + } } - - hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong) } private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = { @@ -107,12 +102,7 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator private def applicationDurationMillisOf(data: SparkApplicationData): Long = { require(data.applicationInfo.attempts.nonEmpty) val lastApplicationAttemptInfo = data.applicationInfo.attempts.last - if(lastApplicationAttemptInfo.endTime.getTime < lastApplicationAttemptInfo.startTime.getTime) { - logger.info(s"Negative duration:${lastApplicationAttemptInfo.attemptId.get} startTime:${lastApplicationAttemptInfo.startTime.getTime} endTime:${lastApplicationAttemptInfo.endTime.getTime} ") - 0L - } else { - lastApplicationAttemptInfo.endTime.getTime - lastApplicationAttemptInfo.startTime.getTime - } + lastApplicationAttemptInfo.endTime.getTime - lastApplicationAttemptInfo.startTime.getTime } private def totalExecutorTaskTimeMillisOf(data: SparkApplicationData): BigInt = { From d70e5f7f7143dd43ce53a88fc6343e56869f2a32 Mon Sep 17 00:00:00 2001 From: shankar Date: Tue, 18 Apr 2017 10:09:35 +0530 Subject: [PATCH 3/3] fix code review comments --- .../spark/SparkMetricsAggregator.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala index 804661ab1..135ebbd34 100644 --- a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala +++ b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala @@ -66,12 +66,20 @@ class 
SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator case false => 0.0 } //allocated is the total used resource from the cluster. - if (resourcesAllocatedForUse.isValidLong && resourcesAllocatedForUse >= 0) { + if (resourcesAllocatedForUse.isValidLong) { hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong) + } else { + logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue") + logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}") + logger.warn(s"executorInstances: ${executorInstances}") + logger.warn(s"executorMemoryBytes:${executorMemoryBytes}") + logger.warn(s"applicationDurationMillis:${applicationDurationMillis}") + logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}") + logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}") + logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}") + logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}") } - if( resourcesWastedMBSeconds >= 0.0) { - hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong) - } + hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong) } }