Commit 1af02d8

remove JValue imports to support json4s 4.0.7
1 parent: da770f7

7 files changed, with 64 additions and 64 deletions


spark-plugin/build.sbt

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 import xerial.sbt.Sonatype._
 
-lazy val versionNum: String = "0.4.0"
+lazy val versionNum: String = "0.4.1-SNAPSHOT"
 lazy val scala212 = "2.12.18"
 lazy val scala213 = "2.13.12"
 lazy val supportedScalaVersions = List(scala212, scala213)
@@ -22,7 +22,7 @@ lazy val dataflint = project
 
 lazy val plugin = (project in file("plugin"))
   .settings(
-    name := "spark",
+    name := "dataflint-spark-dbr-14-plus",
     organization := "io.dataflint",
     crossScalaVersions := supportedScalaVersions,
    version := (if (git.gitCurrentTags.value.exists(_.startsWith("v"))) {
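Note: the last hunk line is truncated mid-expression; the surrounding settings suggest a tag-driven version switch via the sbt-git plugin (which provides git.gitCurrentTags). As a loose, hypothetical sketch of that shape, not necessarily the project's actual expression:

// build.sbt sketch, hypothetical: publish the stripped v* tag as the version
// when HEAD is tagged, otherwise fall back to the SNAPSHOT number.
// Assumes the sbt-git plugin, which provides git.gitCurrentTags.
version := (if (git.gitCurrentTags.value.exists(_.startsWith("v"))) {
  git.gitCurrentTags.value.find(_.startsWith("v")).get.stripPrefix("v")
} else {
  versionNum
})

With crossScalaVersions set as above, something like sbt "+plugin/publishLocal" would then build the renamed dataflint-spark-dbr-14-plus artifact for both Scala 2.12 and 2.13.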

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintApplicationInfoPage.scala

Lines changed: 2 additions & 3 deletions
@@ -3,22 +3,21 @@ package org.apache.spark.dataflint.api
 import org.apache.spark.dataflint.listener.DataflintStore
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, WebUIPage}
-import org.json4s.JValue
 import org.json4s.{Extraction, JObject}
 
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 class DataflintApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore)
   extends WebUIPage("applicationinfo") with Logging {
-  override def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest) = {
     try {
       val runIdConfigFromStore = ui.store.environmentInfo().sparkProperties.find(_._1 == "spark.dataflint.runId").map(_._2)
       val runIdPotentiallyFromConfig = if (runIdConfigFromStore.isEmpty) ui.conf.getOption("spark.dataflint.runId") else runIdConfigFromStore
       val applicationInfo = ui.store.applicationInfo()
       val environmentInfo = dataflintStore.environmentInfo()
       val dataFlintApplicationInfo = DataFlintApplicationInfo(runIdPotentiallyFromConfig, applicationInfo, environmentInfo)
-      val jsonValue: JValue = Extraction.decompose(dataFlintApplicationInfo)(org.json4s.DefaultFormats)
+      val jsonValue = Extraction.decompose(dataFlintApplicationInfo)(org.json4s.DefaultFormats)
       jsonValue
     }
     catch {
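
Note: per the commit message, dropping the explicit `: JValue` result type (and the import) is what allows the plugin to build against json4s 4.0.7. With the annotation gone, scalac infers the result type from whichever json4s version sits on the compile-time classpath, so the source never has to name the JValue alias at all. A minimal sketch of the pattern, with a hypothetical Payload case class standing in for the plugin's data classes:

import org.json4s.{DefaultFormats, Extraction}

object RenderJsonSketch {
  case class Payload(runId: Option[String], nodeCount: Int) // hypothetical payload

  // No explicit `: JValue` result type: scalac infers it from whichever
  // json4s version is on the compile-time classpath, so this source carries
  // no direct reference to the JValue alias.
  def renderJson(payload: Payload) =
    Extraction.decompose(payload)(DefaultFormats)

  def main(args: Array[String]): Unit =
    println(renderJson(Payload(Some("run-42"), nodeCount = 3))) // prints the json4s AST
}

The decompose call itself is unchanged on either json4s line; only the explicit type ascription tied the source to one of them.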

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintCachedStoragePage.scala

Lines changed: 3 additions & 3 deletions
@@ -4,14 +4,14 @@ import org.apache.spark.dataflint.listener.{DataflintExecutorStorageInfo, Datafl
 import org.apache.spark.internal.Logging
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.ui.{SparkUI, WebUIPage}
-import org.json4s.{Extraction, JObject, JValue}
+import org.json4s.{Extraction, JObject}
 
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 class DataflintCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore)
   extends WebUIPage("cachedstorage") with Logging {
-  override def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest) = {
     try {
       val liveRddStorage = ui.store.rddList()
       val rddStorage = dataflintStore.rddStorageInfo()
@@ -42,7 +42,7 @@ class DataflintCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore)
           val cached = rddStorage.find(_.rddId == rdd.id)
           liveCached.getOrElse(cached)
         }))).toMap
-      val jsonValue: JValue = Extraction.decompose(graphs)(org.json4s.DefaultFormats)
+      val jsonValue = Extraction.decompose(graphs)(org.json4s.DefaultFormats)
       jsonValue
     }
     catch {

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintIcebergPage.scala

Lines changed: 8 additions & 8 deletions
@@ -4,25 +4,24 @@ import org.apache.spark.dataflint.listener.DataflintStore
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, WebUIPage}
 import org.json4s.{Extraction, JObject}
-import org.json4s.JValue
 
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 class DataflintIcebergPage(ui: SparkUI, dataflintStore: DataflintStore)
   extends WebUIPage("iceberg") with Logging {
-  override def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest) = {
     try {
       val offset = request.getParameter("offset")
       val length = request.getParameter("length")
       if (offset == null || length == null) {
-        return JObject()
+        JObject()
+      } else {
+        val commits = dataflintStore.icebergCommits(offset.toInt, length.toInt)
+        val icebergInfo = IcebergInfo(commitsInfo = commits)
+        val jsonValue = Extraction.decompose(icebergInfo)(org.json4s.DefaultFormats)
+        jsonValue
       }
-
-      val commits = dataflintStore.icebergCommits(offset.toInt, length.toInt)
-      val icebergInfo = IcebergInfo(commitsInfo = commits)
-      val jsonValue: JValue = Extraction.decompose(icebergInfo)(org.json4s.DefaultFormats)
-      jsonValue
     }
     catch {
       case e: Throwable => {
@@ -31,5 +30,6 @@ class DataflintIcebergPage(ui: SparkUI, dataflintStore: DataflintStore)
       }
     }
   }
+
   override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]()
 }
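
Note: the if/else restructuring here is not cosmetic. Scala rejects a `return` statement inside a method whose result type is inferred (scalac fails with "method ... has return statement; needs result type"), so once the `: JValue` annotation is removed, the early `return JObject()` must become a branch of an expression. A standalone sketch of the same refactor, using a hypothetical lookup method:

object EarlyReturnSketch {
  // Fails to compile once the result type is inferred:
  //   def lookup(key: String) = {
  //     if (key == null) return None   // error: needs result type
  //     Some(key.length)
  //   }

  // The equivalent expression-style version compiles fine:
  def lookup(key: String) =
    if (key == null) None else Some(key.length)

  def main(args: Array[String]): Unit =
    println((lookup(null), lookup("iceberg"))) // (None,Some(7))
}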

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLMetricsPage.scala

Lines changed: 20 additions & 19 deletions
@@ -3,7 +3,7 @@ package org.apache.spark.dataflint.api
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SparkPlanGraph}
 import org.apache.spark.ui.{SparkUI, WebUIPage}
-import org.json4s.{Extraction, JObject, JValue}
+import org.json4s.{Extraction, JObject}
 
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
@@ -12,7 +12,7 @@ class DataflintSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatu
   extends WebUIPage("sqlmetrics") with Logging {
   private var sqlListenerCache: Option[SQLAppStatusListener] = None
 
-  override def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest) = {
     try {
       if (sqlListenerCache.isEmpty) {
         sqlListenerCache = sqlListener()
@@ -21,24 +21,25 @@ class DataflintSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatu
       val sqlStore = new SQLAppStatusStore(ui.store.store, sqlListenerCache)
       val executionId = request.getParameter("executionId")
       if (executionId == null) {
-        return JObject()
+        JObject()
+      } else {
+        val executionIdLong = executionId.toLong
+        val metrics = sqlStore.executionMetrics(executionIdLong)
+        val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined
+        val graph = if (isDatabricks) {
+          val exec = sqlStore.execution(executionIdLong).get
+          val planVersion = exec.getClass.getMethod("latestVersion").invoke(exec).asInstanceOf[Long]
+          sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head.invoke(sqlStore, executionIdLong.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
+        } else
+          sqlStore.planGraph(executionIdLong)
+        val nodesMetrics = graph.allNodes.map(node => NodeMetrics(node.id, node.name, node.metrics.map(metric => {
+          NodeMetric(metric.name, metrics.get(metric.accumulatorId))
+        }).toSeq))
+          // filter nodes without metrics
+          .filter(nodeMetrics => !nodeMetrics.metrics.forall(_.value.isEmpty))
+        val jValue = Extraction.decompose(nodesMetrics)(org.json4s.DefaultFormats)
+        jValue
       }
-      val executionIdLong = executionId.toLong
-      val metrics = sqlStore.executionMetrics(executionIdLong)
-      val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined
-      val graph = if (isDatabricks) {
-        val exec = sqlStore.execution(executionIdLong).get
-        val planVersion = exec.getClass.getMethod("latestVersion").invoke(exec).asInstanceOf[Long]
-        sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head.invoke(sqlStore, executionIdLong.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
-      } else
-        sqlStore.planGraph(executionIdLong)
-      val nodesMetrics = graph.allNodes.map(node => NodeMetrics(node.id, node.name, node.metrics.map(metric => {
-        NodeMetric(metric.name, metrics.get(metric.accumulatorId))
-      }).toSeq))
-        // filter nodes without metrics
-        .filter(nodeMetrics => !nodeMetrics.metrics.forall(_.value.isEmpty))
-      val jValue: JValue = Extraction.decompose(nodesMetrics)(org.json4s.DefaultFormats)
-      jValue
     } catch {
       case e: Throwable => {
         logError("failed to serve dataflint SQL metrics", e)

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala

Lines changed: 26 additions & 26 deletions
@@ -4,15 +4,15 @@ import org.apache.spark.dataflint.listener.DataflintStore
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SparkPlanGraph}
 import org.apache.spark.ui.{SparkUI, WebUIPage}
-import org.json4s.{Extraction, JArray, JObject, JValue}
+import org.json4s.{Extraction, JArray, JObject}
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener])
   extends WebUIPage("sqlplan") with Logging {
   private var sqlListenerCache: Option[SQLAppStatusListener] = None
 
-  override def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest) = {
     try {
       if (sqlListenerCache.isEmpty) {
         sqlListenerCache = sqlListener()
@@ -22,38 +22,38 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe
       val offset = request.getParameter("offset")
       val length = request.getParameter("length")
       if (offset == null || length == null) {
-        return JArray(List())
-      }
-
-      val executionList = sqlStore.executionsList(offset.toInt, length.toInt)
+        JArray(List())
+      } else {
+        val executionList = sqlStore.executionsList(offset.toInt, length.toInt)
 
-      val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined
+        val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined
 
-      val latestVersionReader = if(isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("latestVersion")) else None
-      val planGraphReader = if(isDatabricks) Some(sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head) else None
-      val rddScopesToStagesReader = if(isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("rddScopesToStages")) else None
+        val latestVersionReader = if (isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("latestVersion")) else None
+        val planGraphReader = if (isDatabricks) Some(sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head) else None
+        val rddScopesToStagesReader = if (isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("rddScopesToStages")) else None
 
-      val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt)
+        val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt)
 
-      val sqlPlans = executionList.map { exec =>
-        val graph = if (isDatabricks) {
-          val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long]
-          planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
-        } else
+        val sqlPlans = executionList.map { exec =>
+          val graph = if (isDatabricks) {
+            val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long]
+            planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
+          } else
            sqlStore.planGraph(exec.executionId)
 
-        val rddScopesToStages = if(isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None
+          val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None
 
-        val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId)
-        SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages,
-          graph.allNodes.map(node => {
-            val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id))
-            NodePlan(node.id, node.desc, rddScopeId)
-          }).toSeq
-        )
+          val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId)
+          SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages,
+            graph.allNodes.map(node => {
+              val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id))
+              NodePlan(node.id, node.desc, rddScopeId)
+            }).toSeq
+          )
+        }
+        val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats)
+        jsonValue
       }
-      val jsonValue: JValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats)
-      jsonValue
     }
     catch {
      case e: Throwable => {
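
Note: unlike the metrics page, this page resolves the reflective Method handles (latestVersionReader, planGraphReader, rddScopesToStagesReader) once per request and only invokes them inside the per-execution loop, avoiding a getMethod lookup per execution. A small sketch of that hoisting, with a hypothetical Exec class:

object HoistedReflectionSketch {
  class Exec(val executionId: Long) { def latestVersion: Long = executionId * 10 } // hypothetical

  def main(args: Array[String]): Unit = {
    val execs = Seq(new Exec(1), new Exec(2))
    // Resolve the Method once, outside the loop ...
    val latestVersionReader =
      if (execs.nonEmpty) Some(execs.head.getClass.getMethod("latestVersion")) else None
    // ... and only invoke it per element.
    val versions = execs.map(e => latestVersionReader.get.invoke(e).asInstanceOf[Long])
    println(versions) // List(10, 20)
  }
}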

spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLStagesRddPage.scala

Lines changed: 3 additions & 3 deletions
@@ -2,22 +2,22 @@ package org.apache.spark.dataflint.api
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, WebUIPage}
-import org.json4s.{Extraction, JObject, JValue}
+import org.json4s.{Extraction, JObject}
 
 import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 class DataflintSQLStagesRddPage(ui: SparkUI)
   extends WebUIPage("stagesrdd") with Logging {
-  override def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest) = {
     try {
       val graphs = ui.store.stageList(null)
         .filter(_.submissionTime.isDefined) // filter skipped or pending stages
         .map(stage => Tuple2(stage.stageId,
           ui.store.operationGraphForStage(stage.stageId).rootCluster.childClusters
             .map(rdd => Tuple2(rdd.id, rdd.name)).toMap))
         .toMap
-      val jsonValue: JValue = Extraction.decompose(graphs)(org.json4s.DefaultFormats)
+      val jsonValue = Extraction.decompose(graphs)(org.json4s.DefaultFormats)
       jsonValue
     }
     catch {
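
Note: for reference, Extraction.decompose turns this page's nested stageId -> (rddId -> rddName) map into a json4s AST whose map keys become JSON object fields. A runnable sketch with made-up stage data, assuming json4s-jackson is on the classpath for printing:

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.jackson.JsonMethods

object StagesRddSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stageId -> (rddId -> rddName) data, mirroring the page's shape.
    val graphs: Map[Int, Map[Int, String]] = Map(3 -> Map(7 -> "mapPartitions", 8 -> "shuffle"))
    val jsonValue = Extraction.decompose(graphs)(DefaultFormats)
    println(JsonMethods.compact(JsonMethods.render(jsonValue)))
    // prints something like: {"3":{"7":"mapPartitions","8":"shuffle"}}
  }
}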
