Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor IndexManagement to support custom actions #288

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "src/**/*.kt"
args "src/**/*.kt", "spi/src/main/**/*.kt"
}

check.dependsOn ktlint
Expand All @@ -113,7 +113,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "-F", "src/**/*.kt"
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
}

detekt {
Expand Down Expand Up @@ -148,6 +148,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
compile "org.jetbrains:annotations:13.0"
compile project(path: ":${rootProject.name}-spi", configuration: 'shadow')
compile "org.opensearch:notification:${notification_version}"
compile "org.opensearch:common-utils:${common_utils_version}"
compile "com.github.seancfoley:ipaddress:5.3.3"
Expand Down
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
*/

rootProject.name = 'opensearch-index-management'

include "spi"
project(":spi").name = rootProject.name + "-spi"
85 changes: 85 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import org.opensearch.gradle.test.RestIntegTestTask

plugins {
id 'com.github.johnrengelman.shadow'
id 'jacoco'
}

apply plugin: 'opensearch.java'
apply plugin: 'opensearch.testclusters'
apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'

ext {
projectSubstitutions = [:]
licenseFile = rootProject.file('LICENSE.txt')
noticeFile = rootProject.file('NOTICE')
}

jacoco {
toolVersion = '0.8.5'
reportsDir = file("$buildDir/JacocoReport")
}

jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.destination file("${buildDir}/jacoco/")
}
}
check.dependsOn jacocoTestReport

repositories {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
}

configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
}
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
compileOnly "org.opensearch:common-utils:${common_utils_version}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME)
test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath
}
systemProperty 'tests.security.manager', 'false'
}

task integTest(type: RestIntegTestTask) {
description 'Run integ test with opensearch test framework'
group 'verification'
systemProperty 'tests.security.manager', 'false'
dependsOn test
}
check.dependsOn integTest

testClusters.javaRestTest {
testDistribution = 'INTEG_TEST'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi

import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.DefaultStatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker

/**
* SPI for IndexManagement
*/
interface IndexManagementExtension {

/**
* List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action.
* The ActionParser provides the ability to parse the action
*/
fun getISMActionParsers(): List<ActionParser>

/**
* Status checker is used by IndexManagement to check the status of the extension before executing the actions registered by the extension.
* Actions registered by the plugin can only be executed if in enabled, otherwise the action fails without retries. The status returned
* should represent if the extension is enabled or disabled, and should not represent extension health or the availability of some extension
* dependency.
*/
fun statusChecker(): StatusChecker {
return DefaultStatusChecker()
}

/**
* Name of the extension
*/
fun getExtensionName(): String

/**
* Not Required to override but if extension moves the index metadata outside of cluster state and requires IndexManagement to manage these
* indices provide the metadata service that can provide the index metadata for these indices. An extension need to label the metadata service
* with a type string which is used to distinguish indices in IndexManagement plugin
*/
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
return mapOf()
}

/**
* Caution: Experimental and can be removed in future
*
* If extension wants IndexManagement to determine cluster state indices UUID based on custom index setting if
* present of cluster state override this method.
*/
fun overrideClusterStateIndexUuidSetting(): String? {
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import java.time.Instant

abstract class Action(
val type: String,
val actionIndex: Int
) : ToXContentObject, Writeable {

var configTimeout: ActionTimeout? = null
var configRetry: ActionRetry? = ActionRetry(DEFAULT_RETRIES)
var customAction: Boolean = false

final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
configTimeout?.toXContent(builder, params)
configRetry?.toXContent(builder, params)
// Include a "custom" object wrapper for custom actions to allow extensions to put arbitrary action configs in the config
// index. The EXCLUDE_CUSTOM_FIELD_PARAM is used to not include this wrapper in api responses
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.startObject(CUSTOM_ACTION_FIELD)
populateAction(builder, params)
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.endObject()
return builder.endObject()
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when stored internally or returned as response
*/
open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type).endObject()
}

final override fun writeTo(out: StreamOutput) {
out.writeString(type)
out.writeOptionalWriteable(configTimeout)
out.writeOptionalWriteable(configRetry)
populateAction(out)
}

fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData {
val stateMetaData = managedIndexMetaData.stateMetaData
val actionMetaData = managedIndexMetaData.actionMetaData

return when {
// start a new action
stateMetaData?.name != stateName ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
actionMetaData?.index != this.actionIndex ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
// RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here
else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli())
}
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
*/
open fun populateAction(out: StreamOutput) {
out.writeInt(actionIndex)
}

/**
* Get all the steps associated with the action
*/
abstract fun getSteps(): List<Step>

/**
* Get the current step to execute in the action
*/
abstract fun getStepToExecute(context: StepContext): Step

final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName

/*
* Gets if the managedIndexMetaData reflects a state in which this action has completed successfully. Used in the
* runner when determining if the index metadata should be deleted. If the action isFinishedSuccessfully and
* deleteIndexMetadataAfterFinish is set to true, then we issue a request to delete the managedIndexConfig and its
* managedIndexMetadata.
*/
final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean {
val policyRetryInfo = managedIndexMetaData.policyRetryInfo
if (policyRetryInfo?.failed == true) return false
val actionMetaData = managedIndexMetaData.actionMetaData
if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false
val stepMetaData = managedIndexMetaData.stepMetaData
if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false
return true
}

/*
* Denotes if the index metadata in the config index should be deleted for the index this action has just
* successfully finished running on. This may be used by custom actions which delete some off-cluster index,
* and following the action's success, the managed index config and metadata need to be deleted.
*/
open fun deleteIndexMetadataAfterFinish(): Boolean = false

companion object {
const val DEFAULT_RETRIES = 3L
const val CUSTOM_ACTION_FIELD = "custom"
const val EXCLUDE_CUSTOM_FIELD_PARAM = "exclude_custom"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser

abstract class ActionParser(var customAction: Boolean = false) {

/**
* The action type parser will parse
*/
abstract fun getActionType(): String

/**
* Deserialize Action from stream input
*/
abstract fun fromStreamInput(sin: StreamInput): Action

/**
* Deserialize Action from xContent
*/
abstract fun fromXContent(xcp: XContentParser, index: Int): Action
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

/**
* ISM by default considers all the index metadata to be part of the cluster state,
* if that doesn't hold true and indices metadata is present in some other place and
* ISM still need to manage these indices the following interface provides a mechanism
* for ISM extensions to register the metadata service for the type so ISM can get the
* index metadata for these special type of indices.
*
* ISM Rest APIs allows support for type param which determines the type of index, if there
* is a registered metadata service for the type - ISM will use the service to get the metadata
* else uses the default i.e cluster state
*/
interface IndexMetadataService {

/**
* Returns the index metadata needed for ISM
*/
suspend fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>

/**
* Returns all the indices metadata
*/
suspend fun getMetadataForAllIndices(client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>

/**
* Returns an optional setting path which, when set to true in the index settings, overrides a cluster level metadata write block.
*/
fun getIndexMetadataWriteOverrideSetting(): String? = null
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.cluster.ClusterState

interface StatusChecker {

/**
* checks and returns the status of the extension
*/
fun check(clusterState: ClusterState): Status {
return Status.ENABLED
}
}

enum class Status(private val value: String) {
ENABLED("enabled"),
DISABLED("disabled");

override fun toString(): String {
return value
}
}

class DefaultStatusChecker : StatusChecker
Loading