Skip to content

Commit f2072c9

Browse files
authored
Merge pull request #327 from Sanketika-Obsrv/release-1.5.0
Release 1.5.0 to main
2 parents c37e24a + 6172a88 commit f2072c9

File tree

13 files changed

+255
-77
lines changed

13 files changed

+255
-77
lines changed

api-service/src/controllers/DataIngestion/DataIngestionController.ts

+30-43
Original file line numberDiff line numberDiff line change
@@ -5,48 +5,35 @@ import { schemaValidation } from "../../services/ValidationService";
55
import { ResponseHandler } from "../../helpers/ResponseHandler";
66
import { send } from "../../connections/kafkaConnection";
77
import { datasetService } from "../../services/DatasetService";
8-
import logger from "../../logger";
98
import { config } from "../../configs/Config";
9+
import { obsrvError } from "../../types/ObsrvError";
1010

11-
const errorObject = {
12-
datasetNotFound: {
13-
"message": "Dataset with id not found",
14-
"statusCode": 404,
15-
"errCode": "BAD_REQUEST",
16-
"code": "DATASET_NOT_FOUND"
17-
},
18-
topicNotFound: {
19-
"message": "Entry topic is not defined",
20-
"statusCode": 404,
21-
"errCode": "BAD_REQUEST",
22-
"code": "TOPIC_NOT_FOUND"
11+
const apiId = "api.data.in";
12+
13+
const requestValidation = async (req: Request) => {
14+
const datasetKey = req.params.dataset_id.trim();
15+
16+
const isValidSchema = schemaValidation(req.body, validationSchema)
17+
if (!isValidSchema?.isValid) {
18+
throw obsrvError("", "DATA_INGESTION_INVALID_INPUT", isValidSchema?.message, "BAD_REQUEST", 400)
2319
}
20+
const dataset = await datasetService.getDatasetWithDatasetkey(datasetKey, ["id", "entry_topic", "api_version", "dataset_config", "dataset_id", "extraction_config"], true)
21+
if (_.isEmpty(dataset)) {
22+
throw obsrvError(datasetKey, "DATASET_NOT_FOUND", `Dataset with id/alias name '${datasetKey}' not found`, "NOT_FOUND", 404)
23+
}
24+
return dataset
2425
}
25-
const apiId = "api.data.in";
2626

2727
const dataIn = async (req: Request, res: Response) => {
2828

29-
const requestBody = req.body;
30-
const datasetId = req.params.dataset_id.trim();
31-
32-
const isValidSchema = schemaValidation(requestBody, validationSchema)
33-
if (!isValidSchema?.isValid) {
34-
logger.error({ apiId, message: isValidSchema?.message, code: "DATA_INGESTION_INVALID_INPUT" })
35-
return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_INGESTION_INVALID_INPUT" }, req, res);
36-
}
37-
const dataset = await datasetService.getDataset(datasetId, ["id", "entry_topic", "api_version", "dataset_config"], true)
38-
if (!dataset) {
39-
logger.error({ apiId, message: `Dataset with id ${datasetId} not found in live table`, code: "DATASET_NOT_FOUND" })
40-
return ResponseHandler.errorResponse(errorObject.datasetNotFound, req, res);
41-
}
42-
const { entry_topic, dataset_config, extraction_config, api_version } = dataset
43-
const entryTopic = api_version !== "v2" ? _.get(dataset_config, "entry_topic") : entry_topic
44-
if (!entryTopic) {
45-
logger.error({ apiId, message: "Entry topic not found", code: "TOPIC_NOT_FOUND" })
46-
return ResponseHandler.errorResponse(errorObject.topicNotFound, req, res);
47-
}
48-
await send(addMetadataToEvents(datasetId, requestBody, extraction_config), entryTopic)
49-
ResponseHandler.successResponse(req, res, { status: 200, data: { message: "Data ingested successfully" } });
29+
const dataset = await requestValidation(req)
30+
const { entry_topic, dataset_config, extraction_config, api_version, dataset_id } = dataset
31+
const entryTopic = api_version !== "v2" ? _.get(dataset_config, "entry_topic") : entry_topic
32+
if (!entryTopic) {
33+
throw obsrvError(dataset_id, "TOPIC_NOT_FOUND", `Entry topic not found`, "NOT_FOUND", 404)
34+
}
35+
await send(addMetadataToEvents(dataset_id, req.body, extraction_config), entryTopic)
36+
ResponseHandler.successResponse(req, res, { status: 200, data: { message: "Data ingested successfully" } });
5037

5138
}
5239

@@ -59,14 +46,14 @@ const addMetadataToEvents = (datasetId: string, payload: any, extraction_config:
5946
if (Array.isArray(validData)) {
6047
const extraction_key: string = _.get(extraction_config, "extraction_key", 'events');
6148
const dedup_key: string = _.get(extraction_config, "dedup_config.dedup_key", 'id');
62-
const payload: any = {
63-
"obsrv_meta": obsrvMeta,
64-
"dataset": datasetId,
65-
"msgid": mid
66-
};
67-
payload[extraction_key] = validData;
68-
payload[dedup_key] = mid
69-
return payload;
49+
const payload: any = {
50+
"obsrv_meta": obsrvMeta,
51+
"dataset": datasetId,
52+
"msgid": mid
53+
};
54+
payload[extraction_key] = validData;
55+
payload[dedup_key] = mid
56+
return payload;
7057
}
7158
else {
7259
return ({

api-service/src/controllers/DataOut/DataOutController.ts

+18-6
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,29 @@ import validationSchema from "./DataOutValidationSchema.json";
66
import { validateQuery } from "./QueryValidator";
77
import * as _ from "lodash";
88
import { executeNativeQuery, executeSqlQuery } from "../../connections/druidConnection";
9+
import { datasetService } from "../../services/DatasetService";
10+
import { obsrvError } from "../../types/ObsrvError";
911

1012
export const apiId = "api.data.out";
13+
14+
const requestValidation = async (req: Request) => {
15+
const datasetKey = req.params?.dataset_id;
16+
const isValidSchema = schemaValidation(req.body, validationSchema);
17+
if (!isValidSchema?.isValid) {
18+
throw obsrvError(datasetKey, "DATA_OUT_INVALID_INPUT", isValidSchema?.message, "BAD_REQUEST", 400)
19+
}
20+
const dataset = await datasetService.getDatasetWithDatasetkey(datasetKey, ["dataset_id"], true)
21+
if (_.isEmpty(dataset)) {
22+
throw obsrvError(datasetKey, "DATASET_NOT_FOUND", `Dataset with id/alias name '${datasetKey}' not found`, "NOT_FOUND", 404)
23+
}
24+
return dataset
25+
}
26+
1127
const dataOut = async (req: Request, res: Response) => {
12-
const datasetId = req.params?.dataset_id;
1328
const requestBody = req.body;
1429
const msgid = _.get(req, "body.params.msgid");
15-
const isValidSchema = schemaValidation(requestBody, validationSchema);
16-
if (!isValidSchema?.isValid) {
17-
logger.error({ apiId, datasetId, msgid, requestBody, message: isValidSchema?.message, code: "DATA_OUT_INVALID_INPUT" })
18-
return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_OUT_INVALID_INPUT" }, req, res);
19-
}
30+
const dataset = await requestValidation(req)
31+
const datasetId = _.get(dataset, "dataset_id")
2032
const isValidQuery: any = await validateQuery(req.body, datasetId);
2133
const query = _.get(req, "body.query", "")
2234

api-service/src/controllers/DataOut/QueryValidator.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ const setQueryLimits = (queryPayload: any) => {
117117

118118
const getDataSourceFromPayload = (queryPayload: any) => {
119119
if (_.isString(queryPayload.query)) {
120-
queryPayload?.query.replace(/from\s+["'`]?[\w-]+["'`]?(\s+where\s+)/i, ` from "${dataset_id}"$1`);
120+
queryPayload.query = queryPayload.query.replace(/from\s+["'`]?[\w-]+["'`]?(\s+where\s+)/i, ` from "${dataset_id}"$1`);
121121
return dataset_id
122122
}
123123
if (_.isObject(queryPayload.query)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { Request, Response } from "express";
2+
import { schemaValidation } from "../../services/ValidationService";
3+
import DatasetAliasSchema from "./DatasetAliasValidationSchema.json"
4+
import { obsrvError } from "../../types/ObsrvError";
5+
import _ from "lodash";
6+
import { datasetService } from "../../services/DatasetService";
7+
import { Op } from "sequelize";
8+
import { Dataset } from "../../models/Dataset";
9+
import { ResponseHandler } from "../../helpers/ResponseHandler";
10+
import httpStatus from "http-status";
11+
12+
13+
const validateRequest = async (req: Request) => {
14+
15+
const isRequestValid: Record<string, any> = schemaValidation(req.body, DatasetAliasSchema)
16+
if (!isRequestValid.isValid) {
17+
throw obsrvError("", "DATASET_ALIAS_INPUT_INVALID", isRequestValid.message, "BAD_REQUEST", 400)
18+
}
19+
20+
const { dataset_id, action, alias_name: alias } = _.get(req, ["body", "request"])
21+
let datasetAlias = alias
22+
const dataset = await datasetService.getDataset(dataset_id, ["id", "name", "alias"], true);
23+
if (_.isEmpty(dataset)) {
24+
throw obsrvError(dataset_id, "DATASET_NOT_EXISTS", `Dataset does not exists with id:${dataset_id}`, "NOT_FOUND", 404);
25+
}
26+
27+
if (action === "attach") {
28+
if (_.get(dataset, "alias")) {
29+
throw obsrvError(dataset_id, "DATASET_ALIAS_EXISTS", `Dataset already has alias '${_.get(dataset, "alias")}' associated with it. Please detach the existing alias and try again`, "BAD_REQUEST", 400);
30+
}
31+
32+
const datasetList = await datasetService.findDatasets({ [Op.or]: [{ dataset_id: alias }, { name: alias }, { alias }] }, ["id"]);
33+
const draftDatasetList = await datasetService.findDraftDatasets({ [Op.or]: [{ dataset_id: alias }, { name: alias }] }, ["id"]);
34+
if (!(_.isEmpty(datasetList) && _.isEmpty(draftDatasetList))) {
35+
throw obsrvError(dataset_id, "DATASET_ALIAS_NOT_UNIQUE", `Dataset alias must be unique. The alias '${alias}' cannot be the same as the dataset id, dataset name or alias name of any other dataset.`, "BAD_REQUEST", 400);
36+
}
37+
}
38+
39+
if (action === "detach") {
40+
const existingAliasName = _.get(dataset, "alias")
41+
if (!existingAliasName) {
42+
throw obsrvError(dataset_id, "DATASET_ALIAS_NOT_EXISTS", `Dataset '${dataset_id}' does not have any alias associated with it`, "BAD_REQUEST", 400);
43+
}
44+
datasetAlias = existingAliasName;
45+
}
46+
47+
return datasetAlias
48+
}
49+
50+
const datasetAlias = async (req: Request, res: Response) => {
51+
const dataset_alias = await validateRequest(req)
52+
const { dataset_id, action, alias_name } = _.get(req, ["body", "request"])
53+
const userID = (req as any)?.userID;
54+
switch (action) {
55+
case "attach":
56+
await attachAlias(dataset_id, alias_name, userID);
57+
break;
58+
case "detach":
59+
await detachAlias(dataset_id, userID);
60+
break;
61+
}
62+
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: `Dataset alias name '${dataset_alias}' ${action}ed successfully`, dataset_id } });
63+
}
64+
65+
const attachAlias = async (dataset_id: string, alias_name: string, userID: string) => {
66+
await Dataset.update({ alias: alias_name, updated_by: userID }, { where: { id: dataset_id } });
67+
}
68+
69+
const detachAlias = async (dataset_id: string, userID: string) => {
70+
await Dataset.update({ alias: null, updated_by: userID }, { where: { id: dataset_id } });
71+
}
72+
73+
export default datasetAlias;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{
2+
"type": "object",
3+
"properties": {
4+
"id": {
5+
"type": "string",
6+
"enum": [
7+
"api.datasets.alias"
8+
]
9+
},
10+
"ver": {
11+
"type": "string"
12+
},
13+
"ts": {
14+
"type": "string"
15+
},
16+
"params": {
17+
"type": "object",
18+
"properties": {
19+
"msgid": {
20+
"type": "string"
21+
}
22+
},
23+
"required": [
24+
"msgid"
25+
],
26+
"additionalProperties": false
27+
},
28+
"request": {
29+
"type": "object",
30+
"properties": {
31+
"action": {
32+
"type": "string",
33+
"enum": [
34+
"attach",
35+
"detach"
36+
]
37+
},
38+
"dataset_id": {
39+
"type": "string"
40+
},
41+
"alias_name": {
42+
"type": "string"
43+
}
44+
},
45+
"if": {
46+
"properties": {
47+
"action": {
48+
"const": "attach"
49+
}
50+
}
51+
},
52+
"then": {
53+
"properties": {
54+
"alias_name": {
55+
"minLength": 1
56+
}
57+
},
58+
"required": [
59+
"alias_name"
60+
]
61+
},
62+
"required": [
63+
"action",
64+
"dataset_id"
65+
],
66+
"additionalProperties": false
67+
}
68+
},
69+
"required": [
70+
"id",
71+
"ver",
72+
"ts",
73+
"params",
74+
"request"
75+
],
76+
"additionalProperties": false
77+
}

api-service/src/controllers/DatasetExport/DatasetExport.ts

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const validateDataset = async (req: Request) => {
2525
const migratedConfigs = await datasetService.migrateDatasetV1(dataset_id, datasetRecord)
2626
datasetRecord = { ...datasetRecord, ...migratedConfigs }
2727
}
28+
29+
datasetRecord = _.omit(datasetRecord, "alias")
2830
return datasetRecord;
2931
}
3032

api-service/src/controllers/DatasetList/DatasetList.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const draftDatasetStatus = ["Draft", "ReadyToPublish"]
1515
const defaultFields = ["dataset_id", "name", "type", "status", "tags", "version", "api_version", "dataset_config", "created_date", "updated_date"]
1616

1717
const datasetList = async (req: Request, res: Response) => {
18-
18+
1919
const isRequestValid: Record<string, any> = schemaValidation(req.body, DatasetCreate)
2020
if (!isRequestValid.isValid) {
2121
throw obsrvError("", "DATASET_LIST_INPUT_INVALID", isRequestValid.message, "BAD_REQUEST", 400)
@@ -24,9 +24,9 @@ const datasetList = async (req: Request, res: Response) => {
2424
const datasetBody = req.body.request;
2525
const datasetList = await listDatasets(datasetBody)
2626
const responseData = { data: datasetList, count: _.size(datasetList) }
27-
logger.info({req: req.body, resmsgid: _.get(res, "resmsgid"), message: `Datasets are listed successfully with a dataset count (${_.size(datasetList)})` })
27+
logger.info({ req: req.body, resmsgid: _.get(res, "resmsgid"), message: `Datasets are listed successfully with a dataset count (${_.size(datasetList)})` })
2828
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responseData });
29-
29+
3030
}
3131

3232
const listDatasets = async (request: Record<string, any>): Promise<Record<string, any>> => {
@@ -36,7 +36,7 @@ const listDatasets = async (request: Record<string, any>): Promise<Record<string
3636
const status = _.isArray(datasetStatus) ? datasetStatus : _.compact([datasetStatus])
3737
const draftFilters = _.set(_.cloneDeep(filters), "status", _.isEmpty(status) ? draftDatasetStatus : _.intersection(status, draftDatasetStatus));
3838
const liveFilters = _.set(_.cloneDeep(filters), "status", _.isEmpty(status) ? liveDatasetStatus : _.intersection(status, liveDatasetStatus));
39-
const liveDatasetList = await datasetService.findDatasets(liveFilters, defaultFields, [["updated_date", "DESC"]]);
39+
const liveDatasetList = await datasetService.findDatasets(liveFilters, [...defaultFields, "alias"], [["updated_date", "DESC"]]);
4040
const draftDatasetList = await datasetService.findDraftDatasets(draftFilters, [...defaultFields, "data_schema", "validation_config", "dedup_config", "denorm_config", "connectors_config", "version_key"], [["updated_date", "DESC"]]);
4141
return _.compact(_.concat(liveDatasetList, draftDatasetList));
4242
}

api-service/src/controllers/DatasetRead/DatasetRead.ts

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { DatasetDraft } from "../../models/DatasetDraft";
66
import { datasetService, getV1Connectors } from "../../services/DatasetService";
77
import { obsrvError } from "../../types/ObsrvError";
88
import { cipherService } from "../../services/CipherService";
9+
import { Dataset } from "../../models/Dataset";
910

1011
export const apiId = "api.datasets.read";
1112
export const errorCode = "DATASET_READ_FAILURE"
@@ -16,9 +17,9 @@ export const defaultFields = ["dataset_id", "name", "type", "status", "tags", "v
1617
const validateRequest = (req: Request) => {
1718

1819
const { dataset_id } = req.params;
19-
const { fields } = req.query;
20+
const { fields, mode } = req.query;
2021
const fieldValues = fields ? _.split(fields as string, ",") : [];
21-
const invalidFields = _.difference(fieldValues, Object.keys(DatasetDraft.getAttributes()));
22+
const invalidFields = mode === "edit" ? _.difference(fieldValues, Object.keys(DatasetDraft.getAttributes())) : _.difference(fieldValues, Object.keys(Dataset.getAttributes()));
2223
if (!_.isEmpty(invalidFields)) {
2324
throw obsrvError(dataset_id, "DATASET_INVALID_FIELDS", `The specified fields [${invalidFields}] in the dataset cannot be found.`, "BAD_REQUEST", 400);
2425
}
@@ -37,7 +38,7 @@ const datasetRead = async (req: Request, res: Response) => {
3738
throw obsrvError(dataset_id, "DATASET_NOT_FOUND", `Dataset with the given dataset_id:${dataset_id} not found`, "NOT_FOUND", 404);
3839
}
3940
if (dataset.connectors_config) {
40-
dataset.connectors_config = processConnectorsConfig(dataset.connectors_config);
41+
dataset.connectors_config = processConnectorsConfig(dataset.connectors_config);
4142
}
4243
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: dataset });
4344
}

api-service/src/middlewares/userPermissions.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
"api.datasets.update",
4949
"api.datasets.import",
5050
"api.datasets.copy",
51-
"api.datasets.dataschema"
51+
"api.datasets.dataschema",
52+
"api.datasets.alias"
5253
],
5354
"data": [
5455
"api.data.in",

api-service/src/models/Dataset.ts

+4
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ export const Dataset = sequelize.define("datasets", {
7575
entry_topic: {
7676
type: DataTypes.STRING,
7777
allowNull: false
78+
},
79+
alias: {
80+
type: DataTypes.STRING,
81+
allowNull: true
7882
}
7983
}, {
8084
tableName: "datasets",

api-service/src/routes/Router.ts

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import connectorRegisterController from "../controllers/ConnectorRegister/Connec
3434
import dataMetrics from "../controllers/DataMetrics/DataMetricsController";
3535
import datasetMetrics from "../controllers/DatasetMetrics/DatasetMetricsController";
3636
import { dataAnalyzePII } from "../controllers/DataAnalyzePII/DataAnalyzePIIController";
37+
import datasetAlias from "../controllers/DatasetAlias/DatasetAlias";
3738

3839
export const router = express.Router();
3940

@@ -62,6 +63,7 @@ router.post("/connectors/list", setDataToRequestObject("api.connectors.list"), o
6263
router.get("/connectors/read/:id", setDataToRequestObject("api.connectors.read"), onRequest({entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.readConnectors, operationType: OperationType.GET}), checkRBAC.handler(), ConnectorsRead);
6364
router.post("/datasets/import", setDataToRequestObject("api.datasets.import"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), DatasetImport);
6465
router.post("/connector/register", setDataToRequestObject("api.connector.register"), onRequest({ entity: Entity.Management }), connectorRegisterController);
66+
router.post("/datasets/alias", setDataToRequestObject("api.datasets.alias"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), datasetAlias);
6567
router.post("/data/analyze/pii", setDataToRequestObject("api.data.analyze.pii"), onRequest({ entity: Entity.Management }),checkRBAC.handler(), dataAnalyzePII);
6668
//Wrapper Service
6769
router.post("/obsrv/data/sql-query", setDataToRequestObject("api.obsrv.data.sql-query"), onRequest({ entity: Entity.Data_out }), checkRBAC.handler(), sqlQuery);

0 commit comments

Comments
 (0)