From 208db77eaa16c04222e323a26b952a3251862304 Mon Sep 17 00:00:00 2001
From: Tpt
Date: Wed, 31 Aug 2022 14:04:58 +0200
Subject: [PATCH] Migrates to json-event-parser

Makes bindings parsing work really in streaming
---
 lib/JsonStreamQueryTransformer.ts | 137 ++++++++++++++++++++++++++++++
 lib/SparqlJsonParser.ts           | 111 +++++++++++++++---------
 package.json                      |   2 +-
 test/SparqlJsonParser-test.ts     |  19 +++++
 yarn.lock                         |   7 ++
 5 files changed, 233 insertions(+), 43 deletions(-)
 create mode 100644 lib/JsonStreamQueryTransformer.ts

diff --git a/lib/JsonStreamQueryTransformer.ts b/lib/JsonStreamQueryTransformer.ts
new file mode 100644
index 0000000..295b4c6
--- /dev/null
+++ b/lib/JsonStreamQueryTransformer.ts
@@ -0,0 +1,137 @@
+import {Transform} from "readable-stream";
+import {JsonEvent} from "json-event-parser";
+
+export type JsonValue = string | number | boolean | null | JsonValue[] | {[key: string]: JsonValue};
+
+export type QueryDefinition = {
+  id: string;
+  path: (string | null)[],
+  returnContent: boolean
+}
+
+export type QueryResult = {
+  query: string;
+  value?: JsonValue
+}
+
+/**
+ * Emits all JSON blobs matching one of the given query.
+ * A query is a path composed of object keys or `null` for array keys.
+ */
+export class JsonStreamQueryTransformer extends Transform {
+  private readonly queries: QueryDefinition[];
+  private keyStack: (string | number)[] = [];
+  private valueStack: JsonValue[] = [];
+  private currentQuery?: QueryDefinition = undefined;
+
+  constructor(queries: QueryDefinition[]) {
+    super({writableObjectMode: true, readableObjectMode: true});
+    this.queries = queries;
+  }
+
+  _transform(event: JsonEvent, encoding: BufferEncoding, callback: (error?: (Error | null), data?: any) => void): void {
+    this.currentQuery === undefined ? this.onOtherEvent(event) : this.onKeptEvent(event);
+    callback();
+  }
+
+  private onOtherEvent(event: JsonEvent): void {
+    switch (event.type) {
+      case "open-object":
+        if(event.key !== undefined) {
+          this.keyStack.push(event.key);
+        }
+        this.currentQuery = this.findCurrentQuery();
+        if(this.currentQuery) {
+          if(this.currentQuery.returnContent) {
+            this.valueStack.push({});
+          } else {
+            this.push({
+              query: this.currentQuery.id,
+            })
+            this.currentQuery = undefined;
+          }
+        }
+        return;
+      case "open-array":
+        if(event.key !== undefined) {
+          this.keyStack.push(event.key);
+        }
+        this.currentQuery = this.findCurrentQuery();
+        if(this.currentQuery) {
+          if(this.currentQuery.returnContent) {
+            this.valueStack.push([]);
+          } else {
+            this.push({
+              query: this.currentQuery.id
+            })
+            this.currentQuery = undefined;
+          }
+        }
+        return;
+      case "value":
+        if(event.key !== undefined) {
+          this.keyStack.push(event.key);
+        }
+        this.currentQuery = this.findCurrentQuery();
+        if(this.currentQuery) {
+          this.push({
+            query: this.currentQuery.id,
+            value: event.value
+          })
+          this.currentQuery = undefined;
+        }
+      case "close-object":
+      case "close-array":
+        this.keyStack.pop();
+    }
+  }
+
+  private findCurrentQuery(): QueryDefinition | undefined {
+    for(const query of this.queries) {
+      if(query.path.length === this.keyStack.length && query.path.every((value, i) => value === null ? typeof this.keyStack[i] === 'number' : value === this.keyStack[i])) {
+        return query;
+      }
+    }
+    return undefined;
+  }
+
+  private onKeptEvent(event: JsonEvent): void {
+    let value: any;
+    switch (event.type) {
+      case "open-object":
+        value = {};
+        this.insertInPreviousValue(event.key, value);
+        this.valueStack.push(value);
+        return;
+      case "open-array":
+        value = [];
+        this.insertInPreviousValue(event.key, value);
+        this.valueStack.push(value);
+        return;
+      case "value":
+        this.insertInPreviousValue(event.key, event.value);
+        return;
+      case "close-object":
+      case "close-array":
+        value = this.valueStack.pop();
+        if(this.valueStack.length === 0) {
+          // End
+          this.push({
+            query: this.currentQuery.id,
+            value
+          })
+          this.currentQuery = undefined;
+          this.keyStack.pop();
+        }
+    }
+  }
+
+  private insertInPreviousValue(key: number | string | undefined, value: JsonValue): void {
+    const previousValue = this.valueStack[this.valueStack.length - 1];
+    if(typeof key === "number") {
+      (previousValue).push(value);
+    } else if(typeof key === "string") {
+      (previousValue)[key] = value;
+    }
+  }
+}
\ No newline at end of file
diff --git a/lib/SparqlJsonParser.ts b/lib/SparqlJsonParser.ts
index 32e3d27..b290e2d 100644
--- a/lib/SparqlJsonParser.ts
+++ b/lib/SparqlJsonParser.ts
@@ -1,9 +1,8 @@
 import {DataFactory} from "rdf-data-factory";
 import * as RDF from "@rdfjs/types";
 import {Transform} from "readable-stream";
-
-// tslint:disable-next-line:no-var-requires
-const JsonParser = require('jsonparse');
+import {JsonEvent, JsonEventParser} from "json-event-parser";
+import {JsonStreamQueryTransformer, JsonValue, QueryResult} from './JsonStreamQueryTransformer';
 
 /**
  * Parser for the SPARQL 1.1 Query Results JSON format.
@@ -40,40 +39,57 @@ export class SparqlJsonParser {
    */
   public parseJsonResultsStream(sparqlResponseStream: NodeJS.ReadableStream): NodeJS.ReadableStream {
     const errorListener = (error: Error) => resultStream.emit('error', error);
-    sparqlResponseStream.on('error', errorListener);
-
-    const jsonParser = new JsonParser();
-    jsonParser.onError = errorListener;
     let variablesFound = false;
     let resultsFound = false;
-    jsonParser.onValue = (value: any) => {
-      if(jsonParser.key === "vars" && jsonParser.stack.length === 2 && jsonParser.stack[1].key === 'head') {
-        resultStream.emit('variables', value.map((v: string) => this.dataFactory.variable(v)));
-        variablesFound = true;
-      } else if(jsonParser.key === "results" && jsonParser.stack.length === 1) {
-        resultsFound = true;
-      } else if(typeof jsonParser.key === 'number' && jsonParser.stack.length === 3 && jsonParser.stack[1].key === 'results' && jsonParser.stack[2].key === 'bindings') {
-        resultStream.push(this.parseJsonBindings(value))
-      } else if(jsonParser.key === "metadata" && jsonParser.stack.length === 1) {
-        resultStream.emit('metadata', value);
-      }
-    }
-
+    const parser = this;
     const resultStream = sparqlResponseStream
-      .on("end", _ => {
-        if (!resultsFound) {
-          resultStream.emit("error", new Error("No valid SPARQL query results were found."))
-        } else if (!variablesFound) {
-          resultStream.emit('variables', []);
-        }
-      })
-      .pipe(new Transform({
-        objectMode: true,
-        transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void) {
-          jsonParser.write(chunk);
-          callback();
-        }
-      }));
+      .on('error', errorListener)
+      .pipe(new JsonEventParser())
+      .on('error', errorListener)
+      .pipe(new JsonStreamQueryTransformer([
+        {id: 'vars', path: ['head', 'vars'], returnContent: true},
+        {id: 'bindings', path: ['results', 'bindings'], returnContent: false},
+        {id: 'binding', path: ['results', 'bindings', null], returnContent: true},
+        {id: 'metadata', path: ['metadata'], returnContent: true},
+      ]))
+      .on('error', errorListener)
+      .pipe(
+        new Transform({
+          writableObjectMode: true,
+          readableObjectMode: true,
+          transform(result: QueryResult, encoding: string, callback: (error?: Error | null, data?: any) => void): void {
+            try {
+              switch (result.query) {
+                case 'vars':
+                  variablesFound = true;
+                  this.emit('variables', parser.parseVariableList(result.value));
+                  break;
+                case 'bindings':
+                  resultsFound = true;
+                  break;
+                case 'binding':
+                  this.push(parser.parseJsonBindings(result.value));
+                  break;
+                case 'metadata':
+                  this.emit('metadata', result.value);
+              }
+              callback();
+            } catch (e: any) {
+              callback(e);
+            }
+          },
+          flush(callback: (error?: Error | null, data?: any) => void): void {
+            if (!resultsFound) {
+              callback(new Error("No valid SPARQL query results were found."));
+              return;
+            }
+            if (!variablesFound) {
+              this.emit('variables', []);
+            }
+            callback();
+          }
+        })
+      );
     return resultStream;
   }
 
@@ -134,20 +150,31 @@ export class SparqlJsonParser {
    */
   public parseJsonBooleanStream(sparqlResponseStream: NodeJS.ReadableStream): Promise<boolean> {
     return new Promise((resolve, reject) => {
-      const parser = new JsonParser();
-      parser.onError = reject;
-      parser.onValue = (value: any) => {
-        if(parser.key === "boolean" && typeof value === 'boolean' && parser.stack.length === 1) {
-          resolve(value);
-        }
-      }
       sparqlResponseStream
         .on('error', reject)
-        .on('data', d => parser.write(d))
+        .pipe(new JsonEventParser())
+        .on('error', reject)
+        .on('data', (event: JsonEvent) => {
+          if(event.type === 'value' && event.key === 'boolean' && typeof event.value === 'boolean') {
+            resolve(event.value);
+          }
+        })
         .on('end', () => reject(new Error('No valid ASK response was found.')));
     });
   }
 
+  private parseVariableList(variables: JsonValue): RDF.Variable[] {
+    if(!Array.isArray(variables)) {
+      throw new Error("The variable list should be an array");
+    }
+    return variables.map(v => {
+      if(typeof v === "string") {
+        return this.dataFactory.variable(v);
+      } else {
+        throw new Error("Variable names should be strings");
+      }
+    });
+  }
 }
 
 /**
diff --git a/package.json b/package.json
index 1cf541c..928664f 100644
--- a/package.json
+++ b/package.json
@@ -84,7 +84,7 @@
     "@rdfjs/types": "*",
     "@types/readable-stream": "^2.3.13",
     "buffer": "^6.0.3",
-    "jsonparse": "^1.3.1",
+    "json-event-parser": "https://github.com/comunica/json-event-parser.js",
     "rdf-data-factory": "^1.1.0",
     "readable-stream": "^4.0.0"
   },
diff --git a/test/SparqlJsonParser-test.ts b/test/SparqlJsonParser-test.ts
index cda34d1..847423f 100644
--- a/test/SparqlJsonParser-test.ts
+++ b/test/SparqlJsonParser-test.ts
@@ -96,6 +96,17 @@ describe('SparqlJsonParser', () => {
 `)))).toEqual([]);
     });
 
+    it('should convert a slightly invalid empty SPARQL JSON response (common PHP error)', async () => {
+      return expect(await arrayifyStream(parser.parseJsonResultsStream(streamifyString(`
+{
+  "head": { "vars": [] },
+  "results": {
+    "bindings": {}
+  }
+}
+`)))).toEqual([]);
+    });
+
     it('should convert an empty SPARQL JSON response and emit the variables', async () => {
       const stream = parser.parseJsonResultsStream(streamifyString(`
 {
@@ -172,6 +183,14 @@ describe('SparqlJsonParser', () => {
       return expect(arrayifyStream(parser.parseJsonResultsStream(streamifyString('{')))).rejects.toBeTruthy();
     });
 
+    it('should reject on an invalid variables', async () => {
+      return expect(arrayifyStream(parser.parseJsonResultsStream(streamifyString('{"head": {"vars": null}, "results": {"bindings": []}}')))).rejects.toBeTruthy();
+    });
+
+    it('should reject on an invalid variables 2', async () => {
+      return expect(arrayifyStream(parser.parseJsonResultsStream(streamifyString('{"head": {"vars": [[]]}, "results": {"bindings": []}}')))).rejects.toBeTruthy();
+    });
+
     it('should emit an error on an erroring stream', async () => {
       const errorStream = new PassThrough();
       errorStream._read = () => errorStream.emit('error', new Error('Some stream error'));
diff --git a/yarn.lock b/yarn.lock
index 0e19688..68bed07 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -2369,6 +2369,13 @@ jsesc@^2.5.1:
   resolved "https://registry.yarnpkg.com/jsesc/-/jsesc-2.5.2.tgz#80564d2e483dacf6e8ef209650a67df3f0c283a4"
   integrity sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA==
 
+"json-event-parser@https://github.com/comunica/json-event-parser.js":
+  version "1.0.0"
+  resolved "https://github.com/comunica/json-event-parser.js#8a7a19558c60f66350820c9b3c11c1be2396b1b2"
+  dependencies:
+    buffer "^6.0.3"
+    readable-stream "^4.0.0"
+
 json-parse-better-errors@^1.0.1:
   version "1.0.2"
   resolved "https://registry.yarnpkg.com/json-parse-better-errors/-/json-parse-better-errors-1.0.2.tgz#bb867cfb3450e69107c131d1c514bab3dc8bcaa9"