Skip to content

Commit

Permalink
attempt merge ii
Browse files Browse the repository at this point in the history
  • Loading branch information
psubram3 committed Nov 22, 2024
1 parent c62023e commit cf1dfe9
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 163 deletions.
223 changes: 126 additions & 97 deletions src/packages/external-source/external-source.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,131 @@
import type { Express, Request, Response } from 'express';
import type {
DerivationGroupInsertInput,
ExternalSourceInsertInput,
ExternalSourceTypeInsertInput,
CreateExternalSourceResponse,
CreateExternalSourceTypeResponse,
GetExternalSourceTypeAttributeSchemaResponse,
GetExternalEventTypeAttributeSchemaResponse,
UploadExternalSourceJSON,
UploadAttributeJSON,
ExternalEventTypeInsertInput,
ExternalEventInsertInput,
ExternalEventJson,
ExternalEvent,
ExternalSourceInsertInput,
} from '../../types/external-source.js';
import Ajv from 'ajv';
import { getEnv } from '../../env.js';
import getLogger from '../../logger.js';
import gql from './gql.js';
import { attributeSchemaMetaschema, externalSourceSchema } from '../schemas/external-event-validation-schemata.js';
import { attributeSchemaMetaschema, baseExternalSourceSchema } from '../schemas/external-event-validation-schemata.js';
import { HasuraError } from '../../types/hasura.js';
import { auth } from '../auth/middleware.js';
import rateLimit from 'express-rate-limit';
import multer from 'multer';
import { parseJSONFile } from '../../util/fileParser.js';
import { convertDoyToYmd } from '../../util/time.js';

const upload = multer({ limits: { fieldSize: 25 * 1024 * 1024 } });
const logger = getLogger('packages/external-source/external-source');
const { RATE_LIMITER_LOGIN_MAX, HASURA_API_URL } = getEnv();
const GQL_API_URL = `${HASURA_API_URL}/v1/graphql`;
const ajv = new Ajv();
const compiledAttributeMetaschema = ajv.compile(attributeSchemaMetaschema);
const compiledExternalSourceSchema = ajv.compile(externalSourceSchema);
const refreshLimiter = rateLimit({
legacyHeaders: false,
max: RATE_LIMITER_LOGIN_MAX,
standardHeaders: true,
windowMs: 15 * 60 * 1000, // 15 minutes
});

function updateSchemaWithDefs(defs: { event_types: any, source_type: any }) {//: Ajv.ValidateFunction | undefined {
// build if statement
const ifThenElse: { [key: string]: any } = {

};
let ifThenElsePointer = ifThenElse;
const keys = Object.keys(defs.event_types);

// handling if there's only 1 event type
if (keys.length === 1) {
// no need for ifThenElse, simply create localSchemaCopy and update properties.events.items.properties.attributes
// to match the event type in defs, and verify the event_type_name matches the def name
const localSchemaCopy = structuredClone(baseExternalSourceSchema);
const event_type_name = keys[0];
const event_type_schema = defs.event_types[event_type_name];
const source_type_name = Object.keys(defs.source_type)[0];
const source_type_schema = defs.source_type[source_type_name];

localSchemaCopy.properties.events.items.properties.attributes = event_type_schema;
localSchemaCopy.properties.events.items.properties.event_type_name = { "const": event_type_name };

// insert def for "source" attributes
localSchemaCopy.properties.source.properties.attributes = source_type_schema;

const localAjv = new Ajv();
return localAjv.addSchema(defs).compile(localSchemaCopy);
}

// handle n event types
for (let i = 0; i < keys.length - 1; i++) {
const key = keys[i];
console.log("NOW ON:", key);
ifThenElsePointer["if"] = {
properties: {
event_type_name: {
const: key
}
}
};
ifThenElsePointer["then"] = {
properties: {
attributes: {
$ref: `#/$defs/event_types/${key}`
}
}
};
ifThenElsePointer["else"] = {

};
ifThenElsePointer = ifThenElsePointer["else"];
}

// fill in the final else with the last element
const key = keys[keys.length - 1];
ifThenElsePointer["properties"] = {
attributes: {
$ref: `#/$defs/event_types/${key}`
}
}

// insert if statement into local copy of baseExternalSourceSchema
const localSchemaCopy = structuredClone(baseExternalSourceSchema);
localSchemaCopy.properties.events.items["if"] = ifThenElse["if"];
localSchemaCopy.properties.events.items["then"] = ifThenElse["then"];
localSchemaCopy.properties.events.items["else"] = ifThenElse["else"];

// insert def for "source" attributes
const sourceTypeKey = Object.keys(defs.source_type)[0];
localSchemaCopy.properties.source.properties.attributes = { $ref: `#/$defs/source_type/${sourceTypeKey}`}


// add defs
localSchemaCopy.$defs = {
event_types: {},
source_type: {
[sourceTypeKey]: defs.source_type[sourceTypeKey]
}
}
for (const event_type of keys) {
localSchemaCopy.$defs.event_types[event_type] = defs.event_types[event_type];
}

// compile with defs, return
const localAjv = new Ajv();
return localAjv.compile(localSchemaCopy);
}

async function uploadExternalSourceEventTypes(req: Request, res: Response) {
const authorizationHeader = req.get('authorization');

const {
headers: { 'x-hasura-role': roleHeader, 'x-hasura-user-id': userHeader },
} = req;

const { file } = req;
const { body } = req;
logger.info(`POST /uploadExternalSourceEventTypes: Uploading External Source and Event Types...`);

const headers: HeadersInit = {
Expand All @@ -58,10 +136,8 @@ async function uploadExternalSourceEventTypes(req: Request, res: Response) {
'x-hasura-user-id': userHeader ? `${userHeader}` : '',
};

const uploadedExternalSourceEventTypeAttributeSchema = await parseJSONFile<UploadAttributeJSON>(file);

// Validate uploaded attribute schemas are formatted validly
const schemasAreValid: boolean = await compiledAttributeMetaschema(uploadedExternalSourceEventTypeAttributeSchema);
const schemasAreValid: boolean = await compiledAttributeMetaschema(body);
if (!schemasAreValid) {
logger.error(
`POST /uploadExternalSourceEventTypes: Schema validation failed for uploaded source and event types.`,
Expand All @@ -77,19 +153,19 @@ async function uploadExternalSourceEventTypes(req: Request, res: Response) {
const externalSourceTypeInput: ExternalSourceTypeInsertInput = [];
const externalEventTypeInput: ExternalEventTypeInsertInput = [];

const external_event_types = uploadedExternalSourceEventTypeAttributeSchema.event_types;
const external_event_types = body.event_types;
const event_type_keys = Object.keys(external_event_types);
for (const external_event_type of event_type_keys) {
externalSourceTypeInput.push({
externalEventTypeInput.push({
attribute_schema: external_event_types[external_event_type],
name: external_event_type
})
}

const external_source_types = uploadedExternalSourceEventTypeAttributeSchema.source_types;
const external_source_types = body.source_types;
const source_type_keys = Object.keys(external_source_types);
for (const external_source_type of source_type_keys) {
externalEventTypeInput.push({
externalSourceTypeInput.push({
attribute_schema: external_source_types[external_source_type],
name: external_source_type
})
Expand Down Expand Up @@ -117,22 +193,20 @@ async function uploadExternalSource(req: Request, res: Response) {
headers: { 'x-hasura-role': roleHeader, 'x-hasura-user-id': userHeader },
} = req;
const { body } = req;
const { source, external_events } = body;
const { attributes, derivation_group_name, key, end_time, start_time, source_type_name, valid_at } = source;
const parsedAttributes = JSON.parse(attributes);
const parsedExternalEvents: ExternalEvent[] = JSON.parse(external_events);

// Re-package the fields as a JSON object to be validated by the meta-schema
const { source, events } = body;
const parsedSource = JSON.parse(source);
const parsedExternalEvents: ExternalEvent[] = JSON.parse(events);
const { attributes, derivation_group_name, key, period, source_type_name, valid_at } = parsedSource;

// re-package the fields as a JSON object to be posted
const externalSourceJson = {
external_events: parsedExternalEvents,
events: parsedExternalEvents,
source: {
attributes: parsedAttributes,
attributes: attributes,
derivation_group_name: derivation_group_name,
key: key,
period: {
end_time: end_time,
start_time: start_time,
},
period: period,
source_type_name: source_type_name,
valid_at: valid_at,
},
Expand Down Expand Up @@ -163,96 +237,51 @@ async function uploadExternalSource(req: Request, res: Response) {
});

const attributeSchemaJson = await attributeSchemas.json();
const { external_event_type, external_source_type } = attributeSchemaJson.data;

const defs: { event_types: any, source_type: any } = {
event_types: {

// TODO: assemble megaschema from attribute schemas
console.log(attributeSchemaJson);
},
source_type: {
[external_source_type[0].name]: external_source_type[0].attribute_schema
}
};

return;
for (const event_type of external_event_type) {
defs.event_types[event_type.name] = event_type.attribute_schema
}

// Assemble megaschema from attribute schemas
const compiledExternalSourceMegaschema: Ajv.ValidateFunction = updateSchemaWithDefs(defs);

// Verify that this is a valid external source
let sourceIsValid: boolean = false;
sourceIsValid = await compiledExternalSourceSchema(body);
sourceIsValid = await compiledExternalSourceMegaschema(externalSourceJson);
if (sourceIsValid) {
logger.info(`POST /uploadExternalSource: External Source ${key}'s formatting is valid`);
} else {
logger.error(`POST /uploadExternalSource: External Source ${key}'s formatting is invalid:\n${JSON.stringify(compiledExternalSourceSchema.errors)}`);
res.status(500).send({ message: `External Source ${key}'s formatting is invalid:\n${JSON.stringify(compiledExternalSourceSchema.errors)}` });
logger.error(`POST /uploadExternalSource: External Source ${key}'s formatting is invalid:\n${JSON.stringify(compiledExternalSourceMegaschema.errors)}`);
res.status(500).send({ message: `External Source ${key}'s formatting is invalid:\n${JSON.stringify(compiledExternalSourceMegaschema.errors)}` });
return;
}

const usedExternalEventTypes = parsedExternalEvents.reduce((acc: string[], externalEvent: ExternalEventInsertInput) => {
if (!acc.includes(externalEvent.event_type_name)) {
acc.push(externalEvent.event_type_name);
}
return acc;
}, []);

const usedExternalEventTypesAttributesSchemas: Record<string, Ajv.ValidateFunction> = {};
for (const eventType of usedExternalEventTypes) {
const eventAttributeSchema = await fetch(GQL_API_URL, {
body: JSON.stringify({
query: gql.GET_EXTERNAL_EVENT_TYPE_ATTRIBUTE_SCHEMA,
variables: {
name: eventType,
},
}),
headers,
method: 'POST',
});
const eventTypeJSONResponse = await eventAttributeSchema.json();
const getExternalEventTypeAttributeSchemaResponse = eventTypeJSONResponse.data as
| GetExternalEventTypeAttributeSchemaResponse
| HasuraError;
if (
(getExternalEventTypeAttributeSchemaResponse as GetExternalEventTypeAttributeSchemaResponse)
.external_event_type_by_pk?.attribute_schema !== null
) {
const { external_event_type_by_pk: eventAttributeSchema } =
getExternalEventTypeAttributeSchemaResponse as GetExternalEventTypeAttributeSchemaResponse;
if (eventAttributeSchema !== undefined && eventAttributeSchema !== null) {
usedExternalEventTypesAttributesSchemas[eventType] = ajv.compile(eventAttributeSchema.attribute_schema);
}
}
}

// Validate attributes of all External Events in the source
for (const externalEvent of parsedExternalEvents) {
try {
const currentEventType = externalEvent.event_type_name;
const currentEventSchema: Ajv.ValidateFunction = usedExternalEventTypesAttributesSchemas[currentEventType];
const eventAttributesAreValid = await currentEventSchema(externalEvent.attributes);
if (!eventAttributesAreValid) {
throw new Error(
`External Event '${externalEvent.key
}' does not have a valid set of attributes, per it's type's schema:\n${JSON.stringify(
currentEventSchema.errors,
)}`,
);
}
} catch (error) {
logger.error(`POST /uploadExternalSource: External Event ${externalEvent.key}'s attributes are invalid`);
res.status(500).send({ message: (error as Error).message });
return;
}
}

// Run the Hasura migration for creating an external source
const derivationGroupInsert: DerivationGroupInsertInput = {
name: derivation_group_name,
source_type_name: source_type_name,
};

const externalSourceInsert: ExternalSourceInsertInput = {
attributes: parsedAttributes,
attributes: attributes,
derivation_group_name: derivation_group_name,
end_time: end_time,
end_time: period.end_time,
external_events: {
data: parsedExternalEvents,
},
key: key,
source_type_name: source_type_name,
start_time: start_time,
start_time: period.start_time,
valid_at: valid_at,
};

Expand Down Expand Up @@ -361,7 +390,7 @@ export default (app: Express) => {
* type: string
* end_time:
* type: string
* external_events:
* events:
* type: object
* properties:
* data:
Expand All @@ -380,7 +409,7 @@ export default (app: Express) => {
* - attributes
* derivation_group_name
* end_time
* external_events
* events
* key
* source_type_name
* start_time
Expand Down
Loading

0 comments on commit cf1dfe9

Please sign in to comment.