diff --git a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
index 9dc1ac5ae..135ebbd34 100644
--- a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
+++ b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -51,25 +51,36 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
     executorMemoryBytes <- executorMemoryBytesOf(data)
   } {
     val applicationDurationMillis = applicationDurationMillisOf(data)
-    val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)
-
-    val resourcesAllocatedForUse =
-      aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
-    val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)
-
-    val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
-    val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
-      case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
-      case false => 0.0
+    if (applicationDurationMillis < 0) {
+      logger.warn(s"applicationDurationMillis is negative. Skipping metrics aggregation: ${applicationDurationMillis}")
+    } else {
+      val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)
+
+      val resourcesAllocatedForUse =
+        aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
+      val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)
+
+      val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
+      val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
+        case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
+        case false => 0.0
+      }
+      // allocated is the total resource used from the cluster
+      if (resourcesAllocatedForUse.isValidLong) {
+        hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong)
+      } else {
+        logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
+        logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
+        logger.warn(s"executorInstances: ${executorInstances}")
+        logger.warn(s"executorMemoryBytes: ${executorMemoryBytes}")
+        logger.warn(s"applicationDurationMillis: ${applicationDurationMillis}")
+        logger.warn(s"totalExecutorTaskTimeMillis: ${totalExecutorTaskTimeMillis}")
+        logger.warn(s"resourcesActuallyUsedWithBuffer: ${resourcesActuallyUsedWithBuffer}")
+        logger.warn(s"resourcesWastedMBSeconds: ${resourcesWastedMBSeconds}")
+        logger.warn(s"allocatedMemoryWasteBufferPercentage: ${allocatedMemoryWasteBufferPercentage}")
+      }
+      hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong)
     }
-    //allocated is the total used resource from the cluster.
-    if (resourcesAllocatedForUse.isValidLong) {
-      hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong)
-    } else {
-      logger.info(s"resourcesAllocatedForUse exceeds Long.MaxValue: ${resourcesAllocatedForUse }")
-    }
-
-    hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong)
   }
 
   private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
index a3c0e1cf2..2278d124e 100644
--- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
+++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -46,11 +46,11 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
       new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo))
     }
 
+    val executorSummaries = Seq(
+      newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
+      newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
+    )
    val restDerivedData = {
-      val executorSummaries = Seq(
-        newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
-        newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
-      )
       SparkRestDerivedData(
         applicationInfo,
         jobDatas = Seq.empty,
@@ -105,6 +105,31 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
       it("doesn't calculate total delay") {
         result.getTotalDelay should be(0L)
       }
+      it("sets resourceUsed to 0 when the application duration is negative") {
+        // make the duration negative
+        val applicationInfo = {
+          val applicationAttemptInfo = {
+            val now = System.currentTimeMillis
+            val duration = -8000000L
+            newFakeApplicationAttemptInfo(Some("1"), startTime = new Date(now - duration), endTime = new Date(now))
+          }
+          new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo))
+        }
+        val restDerivedData = SparkRestDerivedData(
+          applicationInfo,
+          jobDatas = Seq.empty,
+          stageDatas = Seq.empty,
+          executorSummaries = executorSummaries
+        )
+
+        val data = SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
+
+        val aggregator = new SparkMetricsAggregator(aggregatorConfigurationData)
+        aggregator.aggregate(data)
+
+        val result = aggregator.getResult
+        result.getResourceUsed should be(0L)
+      }
     }
 
     describe("when it doesn't have log-derived data") {
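Note on the guard logic above: the patch handles two separate failure modes. Aggregation is skipped outright when applicationDurationMillis is negative (e.g. when the attempt's endTime precedes its startTime, as the new test constructs), and when the BigInt MB-seconds total does not fit in a Long, the inputs are logged instead of a corrupted value being stored. Below is a minimal standalone sketch of the same two checks; allocatedMBSeconds and the constants are illustrative stand-ins, not the project's actual helpers.

object AggregationGuardSketch {
  // Illustrative stand-in for aggregateresourcesAllocatedForUse: MB-seconds
  // computed in BigInt so the multiplication itself cannot overflow.
  def allocatedMBSeconds(executorInstances: Int,
                         executorMemoryBytes: Long,
                         durationMillis: Long): BigInt =
    BigInt(executorInstances) *
      BigInt(executorMemoryBytes >> 20) *  // bytes -> MB
      BigInt(durationMillis / 1000L)       // millis -> seconds

  def main(args: Array[String]): Unit = {
    val durationMillis = -8000000L // endTime before startTime, as in the new test
    if (durationMillis < 0) {
      // First guard: never feed a negative duration into the accounting.
      println(s"applicationDurationMillis is negative, skipping: $durationMillis")
    } else {
      val allocated = allocatedMBSeconds(100, 4L << 30, durationMillis)
      // Second guard: BigInt.isValidLong is true only when the value fits
      // in a signed 64-bit Long, so the narrowing toLong below is safe.
      if (allocated.isValidLong) println(s"resource used: ${allocated.toLong} MB-seconds")
      else println(s"exceeds Long.MaxValue: $allocated")
    }
  }
}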