Skip to content

Commit ee4882f

Browse files
authored
Merge pull request #854 from AbsaOSS/pr/759
Pr/759
2 parents 3706fb3 + f74a3d1 commit ee4882f

File tree

15 files changed

+204
-46
lines changed

15 files changed

+204
-46
lines changed

README.md

+10-1
Original file line numberDiff line numberDiff line change
@@ -698,10 +698,19 @@ When one of these commands occurs spline will let you know by logging a warning.
698698
### Plugin API
699699

700700
Using a plugin API you can capture lineage from a 3rd party data source provider.
701-
Spline discover plugins automatically by scanning a classpath, so no special steps required to register and configure a plugin.
701+
By default, Spline discovers plugins automatically by scanning the classpath, so no special steps are required to register and configure a plugin.
702702
All you need is to create a class extending the `za.co.absa.spline.harvester.plugin.Plugin` marker trait
703703
mixed with one or more `*Processing` traits, depending on your intention.
704704

705+
To disable automatic plugin discovery and speed up initialization, set `spline.scanClasspath` to `false` in your configuration file.
706+
Then, you will need to register all necessary plugins one by one, using the `spline.plugins.{className}.enabled` property, e.g.:
707+
```properties
708+
# Disable automatic plugin discovery to save on bootstrap time
709+
spline.scanClasspath=false
710+
# This explicitly registers and enables a plugin with the class name 'com.example.MyPlugin'
711+
spline.plugins.com.example.MyPlugin.enabled=true
712+
```
713+
705714
There are three general processing traits:
706715

707716
- `DataSourceFormatNameResolving` - returns a name of a data provider/format in use.

core/src/main/resources/spline.default.yaml

+62
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,17 @@ spline:
242242
# can be a json string or url to json file
243243
rules: { }
244244

245+
# ===========================================
245246
# Plugins configuration.
247+
# ===========================================
248+
249+
# Classpath scanning is required for auto-discovery of Spline agent plugins.
250+
# It may, however, add to the job's startup time, depending on the size
251+
# of your Spark application classpath.
252+
# If you are experiencing long job startup times, try to disable classpath
253+
# scanning and instead register your custom plugins explicitly in the configuration.
254+
scanClasspath: true # true | false
255+
246256
# The `key` is the fully specified plugin class name.
247257
# For example:
248258
# ---
@@ -251,6 +261,58 @@ spline:
251261
# prop1: foo
252262
# prop2: bar
253263
plugins:
264+
265+
za.co.absa.spline.harvester.plugin.composite.LogicalRelationPlugin:
266+
enabled: true
267+
268+
za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin:
269+
enabled: true
270+
271+
za.co.absa.spline.harvester.plugin.embedded.CobrixPlugin:
272+
enabled: true
273+
274+
za.co.absa.spline.harvester.plugin.embedded.ExcelPlugin:
275+
enabled: true
276+
277+
za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin:
278+
enabled: true
279+
280+
za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin:
281+
enabled: true
282+
283+
za.co.absa.spline.harvester.plugin.embedded.RDDPlugin:
284+
enabled: true
285+
286+
za.co.absa.spline.harvester.plugin.embedded.MongoPlugin:
287+
enabled: true
288+
289+
za.co.absa.spline.harvester.plugin.embedded.AvroPlugin:
290+
enabled: true
291+
292+
za.co.absa.spline.harvester.plugin.embedded.CassandraPlugin:
293+
enabled: true
294+
295+
za.co.absa.spline.harvester.plugin.embedded.DatabricksPlugin:
296+
enabled: true
297+
298+
za.co.absa.spline.harvester.plugin.embedded.SQLPlugin:
299+
enabled: true
300+
301+
za.co.absa.spline.harvester.plugin.embedded.JDBCPlugin:
302+
enabled: true
303+
304+
za.co.absa.spline.harvester.plugin.embedded.KafkaPlugin:
305+
enabled: true
306+
307+
za.co.absa.spline.harvester.plugin.embedded.XMLPlugin:
308+
enabled: true
309+
310+
za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin:
311+
enabled: true
312+
313+
za.co.absa.spline.harvester.plugin.embedded.ElasticSearchPlugin:
314+
enabled: true
315+
254316
za.co.absa.spline.harvester.plugin.embedded.NonPersistentActionsCapturePlugin:
255317
enabled: false
256318
funcNames:

core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
2424
import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
2525
import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
2626
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
27+
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
2728
import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}
2829

2930
import scala.collection.JavaConverters._
@@ -37,7 +38,7 @@ private[spline] trait AgentBOM {
3738
def lineageDispatcher: LineageDispatcher
3839
def iwdStrategy: IgnoredWriteDetectionStrategy
3940
def execPlanUUIDVersion: UUIDVersion
40-
def pluginsConfig: Configuration
41+
def pluginsConfig: PluginsConfiguration
4142
}
4243

4344
object AgentBOM {
@@ -62,8 +63,11 @@ object AgentBOM {
6263
mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
6364
}
6465

65-
override def pluginsConfig: Configuration = {
66-
mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
66+
override def pluginsConfig: PluginsConfiguration = {
67+
PluginsConfiguration(
68+
mergedConfig.getRequiredBoolean(ConfProperty.ScanClasspath),
69+
mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
70+
)
6771
}
6872

6973
override lazy val postProcessingFilter: Option[PostProcessingFilter] = {

core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala

+15
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ object AgentConfig {
8181
this
8282
}
8383

84+
def scanClasspath(enabled: Boolean): this.type = synchronized {
85+
options += ConfProperty.ScanClasspath -> enabled
86+
this
87+
}
88+
89+
def enablePlugin(name: String): this.type = synchronized {
90+
options += s"${ConfProperty.PluginsConfigNamespace}.$name.enabled" -> true
91+
this
92+
}
93+
8494
def build(): AgentConfig = new AgentConfig {
8595
options.foreach(tupled(addProperty))
8696
}
@@ -123,6 +133,11 @@ object AgentConfig {
123133
*/
124134
val IgnoreWriteDetectionStrategy = "spline.IWDStrategy"
125135

136+
/**
137+
* Should the classpath be scanned at startup
138+
*/
139+
val ScanClasspath = "spline.scanClasspath"
140+
126141
val PluginsConfigNamespace = "spline.plugins"
127142

128143
def dispatcherClassName(logicalName: String): String = s"$RootLineageDispatcher.$logicalName.${HierarchicalObjectFactory.ClassName}"

core/src/main/scala/za/co/absa/spline/agent/SplineAgent.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package za.co.absa.spline.agent
1818

19-
import org.apache.commons.configuration.Configuration
2019
import org.apache.spark.internal.Logging
2120
import org.apache.spark.sql.SparkSession
2221
import org.apache.spark.sql.execution.QueryExecution
@@ -30,6 +29,7 @@ import za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor
3029
import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
3130
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
3231
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
32+
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
3333
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry
3434
import za.co.absa.spline.harvester.postprocessing._
3535
import za.co.absa.spline.harvester.qualifier.HDFSPathQualifier
@@ -54,7 +54,7 @@ object SplineAgent extends Logging {
5454
)
5555

5656
def create(
57-
pluginsConfig: Configuration,
57+
pluginsConfig: PluginsConfiguration,
5858
session: SparkSession,
5959
lineageDispatcher: LineageDispatcher,
6060
userPostProcessingFilter: Option[PostProcessingFilter],

core/src/main/scala/za/co/absa/spline/harvester/builder/dsformat/PluggableDataSourceFormatResolver.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class PluggableDataSourceFormatResolver(pluginRegistry: PluginRegistry) extends
2525
private val processFn =
2626
pluginRegistry.plugins[DataSourceFormatNameResolving]
2727
.map(_.formatNameResolver)
28-
.reduce(_ orElse _)
28+
.reduceOption(_ orElse _)
29+
.getOrElse(PartialFunction.empty)
2930
.orElse[AnyRef, String] {
3031
case dsr: DataSourceRegister => dsr.shortName
3132
case o => o.toString

core/src/main/scala/za/co/absa/spline/harvester/builder/read/PluggableReadCommandExtractor.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ class PluggableReadCommandExtractor(
3434
private val planProcessFn =
3535
pluginRegistry.plugins[ReadNodeProcessing]
3636
.map(_.readNodeProcessor)
37-
.reduce(_ orElse _)
37+
.reduceOption(_ orElse _)
38+
.getOrElse(PartialFunction.empty)
3839

3940
private val rddProcessFn =
4041
pluginRegistry.plugins[RddReadNodeProcessing]
4142
.map(_.rddReadNodeProcessor)
42-
.reduce(_ orElse _)
43+
.reduceOption(_ orElse _)
44+
.getOrElse(PartialFunction.empty)
4345

4446
override def asReadCommand(planOrRdd: PlanOrRdd): Option[ReadCommand] = {
4547
val res = planOrRdd match {

core/src/main/scala/za/co/absa/spline/harvester/builder/write/PluggableWriteCommandExtractor.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class PluggableWriteCommandExtractor(
3838
private val processFn: ((FuncName, LogicalPlan)) => Option[WriteNodeInfo] =
3939
pluginRegistry.plugins[WriteNodeProcessing]
4040
.map(_.writeNodeProcessor)
41-
.reduce(_ orElse _)
41+
.reduceOption(_ orElse _)
42+
.getOrElse(PartialFunction.empty)
4243
.lift
4344

4445
def asWriteCommand(funcName: FuncName, logicalPlan: LogicalPlan): Option[WriteCommand] = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2023 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.harvester.plugin
18+
19+
import org.apache.commons.configuration.Configuration
20+
21+
case class PluginsConfiguration(
22+
classpathScanEnabled: Boolean,
23+
config: Configuration
24+
)

core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/LogicalRelationPlugin.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ class LogicalRelationPlugin(pluginRegistry: PluginRegistry) extends Plugin with
3030
private lazy val baseRelProcessor =
3131
pluginRegistry.plugins[BaseRelationProcessing]
3232
.map(_.baseRelationProcessor)
33-
.reduce(_ orElse _)
33+
.reduceOption(_ orElse _)
34+
.getOrElse(PartialFunction.empty)
3435

3536
override val readNodeProcessor: PartialFunction[LogicalPlan, ReadNodeInfo] = {
3637
case lr: LogicalRelation

core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/SaveIntoDataSourceCommandPlugin.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ class SaveIntoDataSourceCommandPlugin(
3939
private lazy val rpProcessor =
4040
pluginRegistry.plugins[RelationProviderProcessing]
4141
.map(_.relationProviderProcessor)
42-
.reduce(_ orElse _)
43-
42+
.reduceOption(_ orElse _)
43+
.getOrElse(PartialFunction.empty)
4444

4545
override def writeNodeProcessor: PartialFunction[(FuncName, LogicalPlan), WriteNodeInfo] = {
4646
case (_, cmd: SaveIntoDataSourceCommand) => cmd match {

core/src/main/scala/za/co/absa/spline/harvester/plugin/registry/AutoDiscoveryPluginRegistry.scala

+44-15
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ import org.apache.commons.configuration.Configuration
2121
import org.apache.commons.lang.ClassUtils.{getAllInterfaces, getAllSuperclasses}
2222
import org.apache.spark.internal.Logging
2323
import za.co.absa.spline.commons.lang.ARM
24-
import za.co.absa.spline.harvester.plugin.Plugin
2524
import za.co.absa.spline.harvester.plugin.Plugin.Precedence
26-
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledByDefault, EnabledConfProperty, PluginClasses, getOnlyOrThrow}
25+
import za.co.absa.spline.harvester.plugin.{Plugin, PluginsConfiguration}
2726

2827
import javax.annotation.Priority
2928
import scala.collection.JavaConverters._
@@ -32,11 +31,13 @@ import scala.util.Try
3231
import scala.util.control.NonFatal
3332

3433
class AutoDiscoveryPluginRegistry(
35-
conf: Configuration,
34+
pluginsConf: PluginsConfiguration,
3635
injectables: AnyRef*
3736
) extends PluginRegistry
3837
with Logging {
3938

39+
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry._
40+
4041
private val injectablesByType: Map[Class[_], Seq[_ <: AnyRef]] = {
4142
val typedInjectables =
4243
for {
@@ -47,13 +48,30 @@ class AutoDiscoveryPluginRegistry(
4748
typedInjectables.groupBy(_._1).mapValues(_.map(_._2))
4849
}
4950

50-
private val allPlugins: Seq[Plugin] =
51-
for (pc <- PluginClasses if isPluginEnabled(pc)) yield {
52-
logInfo(s"Loading plugin: $pc")
53-
instantiatePlugin(pc)
54-
.recover({ case NonFatal(e) => throw new RuntimeException(s"Plugin instantiation failure: $pc", e) })
51+
private val allPlugins: Seq[Plugin] = {
52+
val discoveredClasses: Seq[Class[Plugin]] =
53+
if (pluginsConf.classpathScanEnabled) scanForPluginClasses()
54+
else {
55+
logInfo(s"Classpath scanning is DISABLED. Only explicitly configured plugins will be loaded.")
56+
Seq.empty
57+
}
58+
59+
val configuredClasses: Seq[Class[Plugin]] = getRegisteredPluginClasses(pluginsConf.config)
60+
61+
val allFoundPluginClasses: Seq[Class[Plugin]] = (discoveredClasses ++ configuredClasses).distinct
62+
63+
val allSortedPluginClasses = allFoundPluginClasses
64+
.map(c => c -> priorityOf(c))
65+
.sortBy({ case (_, p) => p })
66+
.map({ case (c, _) => c })
67+
68+
for (cls <- allSortedPluginClasses if isPluginEnabled(cls)) yield {
69+
logInfo(s"Loading plugin: $cls")
70+
instantiatePlugin(cls)
71+
.recover({ case NonFatal(e) => throw new RuntimeException(s"Plugin instantiation failure: $cls", e) })
5572
.get
5673
}
74+
}
5775

5876
override def plugins[A: ClassTag]: Seq[Plugin with A] = {
5977
val ct = implicitly[ClassTag[A]]
@@ -65,7 +83,7 @@ class AutoDiscoveryPluginRegistry(
6583
val constr = getOnlyOrThrow(constrs, s"Plugin class must have a single public constructor: ${constrs.mkString(", ")}")
6684
val args = constr.getParameterTypes.map {
6785
case ct if classOf[Configuration].isAssignableFrom(ct) =>
68-
conf.subset(pluginClass.getName)
86+
pluginsConf.config.subset(pluginClass.getName)
6987
case pt =>
7088
val candidates = injectablesByType.getOrElse(pt, sys.error(s"Cannot bind $pt. No value found"))
7189
getOnlyOrThrow(candidates, s"Ambiguous constructor parameter binding. Multiple values found for $pt: ${candidates.length}")
@@ -74,7 +92,7 @@ class AutoDiscoveryPluginRegistry(
7492
}
7593

7694
private def isPluginEnabled(pc: Class[Plugin]): Boolean = {
77-
val pluginConf = conf.subset(pc.getName)
95+
val pluginConf = pluginsConf.config.subset(pc.getName)
7896
val isEnabled = pluginConf.getBoolean(EnabledConfProperty, EnabledByDefault)
7997
if (!isEnabled) {
8098
logWarning(s"Plugin ${pc.getName} is disabled in the configuration.")
@@ -89,22 +107,33 @@ object AutoDiscoveryPluginRegistry extends Logging {
89107
private val EnabledConfProperty = "enabled"
90108
private val EnabledByDefault = true
91109

92-
private val PluginClasses: Seq[Class[Plugin]] = {
110+
private def scanForPluginClasses(): Seq[Class[Plugin]] = {
93111
logDebug("Scanning for plugins")
94112
val classGraph = new ClassGraph().enableClassInfo
95113
for {
96114
scanResult <- ARM.managed(classGraph.scan)
97-
(cls, prt) <- scanResult
115+
cls <- scanResult
98116
.getClassesImplementing(classOf[Plugin].getName)
99117
.loadClasses.asScala.asInstanceOf[Seq[Class[Plugin]]]
100-
.map(c => c -> priorityOf(c))
101-
.sortBy(_._2)
102118
} yield {
103-
logDebug(s"Found plugin [priority=$prt]\t: $cls")
119+
logDebug(s"Discovered plugin: $cls")
104120
cls
105121
}
106122
}
107123

124+
private def getRegisteredPluginClasses(conf: Configuration): Seq[Class[Plugin]] = {
125+
for {
126+
key <- conf.getKeys.asScala.toSeq
127+
if key.endsWith(s".$EnabledConfProperty") // Looking for keys ending with ".enabled", since plugins must be explicitly enabled
128+
className = key.dropRight(EnabledConfProperty.length + 1) // Dropping ".enabled" to get plugin class name
129+
cls = Class.forName(className)
130+
if classOf[Plugin].isAssignableFrom(cls)
131+
} yield {
132+
logDebug(s"Found registered plugin: $cls")
133+
cls.asInstanceOf[Class[Plugin]]
134+
}
135+
}
136+
108137
private def priorityOf(c: Class[Plugin]): Int =
109138
Option(c.getAnnotation(classOf[Priority]))
110139
.map(_.value)

0 commit comments

Comments
 (0)