Skip to content

Commit

Permalink
Migrates to json-event-parser
Browse files Browse the repository at this point in the history
Makes bindings parsing work really in streaming
  • Loading branch information
Tpt committed Sep 5, 2022
1 parent f574346 commit ffcc306
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 48 deletions.
137 changes: 137 additions & 0 deletions lib/JsonStreamQueryTransformer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import {Transform} from "readable-stream";
import {JsonEvent} from "json-event-parser";

export type JsonValue = string | number | boolean | null | JsonValue[] | {[key: string]: JsonValue};

export type QueryDefinition = {
id: string;
path: (string | null)[],
returnContent: boolean
}

export type QueryResult = {
query: string;
value?: JsonValue
}

/**
* Emits all JSON blobs matching one of the given query.
* A query is a path composed of object keys or `null` for array keys.
*/
export class JsonStreamQueryTransformer extends Transform {
private readonly queries: QueryDefinition[];
private keyStack: (string | number)[] = [];
private valueStack: JsonValue[] = [];
private currentQuery?: QueryDefinition = undefined;

constructor(queries: QueryDefinition[]) {
super({writableObjectMode: true, readableObjectMode: true});
this.queries = queries;
}

_transform(event: JsonEvent, encoding: BufferEncoding, callback: (error?: (Error | null), data?: any) => void): void {
this.currentQuery === undefined ? this.onOtherEvent(event) : this.onKeptEvent(event);
callback();
}

private onOtherEvent(event: JsonEvent): void {
switch (event.type) {
case "open-object":
if(event.key !== undefined) {
this.keyStack.push(event.key);
}
this.currentQuery = this.findCurrentQuery();
if(this.currentQuery) {
if(this.currentQuery.returnContent) {
this.valueStack.push({});
} else {
this.push({
query: this.currentQuery.id,
})
this.currentQuery = undefined;
}
}
return;
case "open-array":
if(event.key !== undefined) {
this.keyStack.push(event.key);
}
this.currentQuery = this.findCurrentQuery();
if(this.currentQuery) {
if(this.currentQuery.returnContent) {
this.valueStack.push([]);
} else {
this.push({
query: this.currentQuery.id
})
this.currentQuery = undefined;
}
}
return;
case "value":
if(event.key !== undefined) {
this.keyStack.push(event.key);
}
this.currentQuery = this.findCurrentQuery();
if(this.currentQuery) {
this.push({
query: this.currentQuery.id,
value: event.value
})
this.currentQuery = undefined;
}
case "close-object":
case "close-array":
this.keyStack.pop();
}
}

private findCurrentQuery(): QueryDefinition | undefined {
for(const query of this.queries) {
if(query.path.length === this.keyStack.length && query.path.every((value, i) => value === null ? typeof this.keyStack[i] === 'number' : value === this.keyStack[i])) {
return query;
}
}
return undefined;
}

private onKeptEvent(event: JsonEvent): void {
let value: any;
switch (event.type) {
case "open-object":
value = {};
this.insertInPreviousValue(event.key, value);
this.valueStack.push(value);
return;
case "open-array":
value = [];
this.insertInPreviousValue(event.key, value);
this.valueStack.push(value);
return;
case "value":
this.insertInPreviousValue(event.key, event.value);
return;
case "close-object":
case "close-array":
value = this.valueStack.pop();
if(this.valueStack.length === 0) {
// End
this.push({
query: this.currentQuery.id,
value
})
this.currentQuery = undefined;
this.keyStack.pop();
}
}
}

private insertInPreviousValue(key: number | string | undefined, value: JsonValue): void {
const previousValue = this.valueStack[this.valueStack.length - 1];
if(typeof key === "number") {
(<JsonValue[]>previousValue).push(value);
} else if(typeof key === "string") {
(<any>previousValue)[key] = value;
}
}
}
111 changes: 69 additions & 42 deletions lib/SparqlJsonParser.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {DataFactory} from "rdf-data-factory";
import * as RDF from "@rdfjs/types";
import {Transform} from "readable-stream";

// tslint:disable-next-line:no-var-requires
const JsonParser = require('jsonparse');
import {JsonEvent, JsonEventParser} from "json-event-parser";
import {JsonStreamQueryTransformer, JsonValue, QueryResult} from './JsonStreamQueryTransformer';

/**
* Parser for the SPARQL 1.1 Query Results JSON format.
Expand Down Expand Up @@ -40,40 +39,57 @@ export class SparqlJsonParser {
*/
public parseJsonResultsStream(sparqlResponseStream: NodeJS.ReadableStream): NodeJS.ReadableStream {
const errorListener = (error: Error) => resultStream.emit('error', error);
sparqlResponseStream.on('error', errorListener);

const jsonParser = new JsonParser();
jsonParser.onError = errorListener;
let variablesFound = false;
let resultsFound = false;
jsonParser.onValue = (value: any) => {
if(jsonParser.key === "vars" && jsonParser.stack.length === 2 && jsonParser.stack[1].key === 'head') {
resultStream.emit('variables', value.map((v: string) => this.dataFactory.variable(v)));
variablesFound = true;
} else if(jsonParser.key === "results" && jsonParser.stack.length === 1) {
resultsFound = true;
} else if(typeof jsonParser.key === 'number' && jsonParser.stack.length === 3 && jsonParser.stack[1].key === 'results' && jsonParser.stack[2].key === 'bindings') {
resultStream.push(this.parseJsonBindings(value))
} else if(jsonParser.key === "metadata" && jsonParser.stack.length === 1) {
resultStream.emit('metadata', value);
}
}

const parser = this;
const resultStream = sparqlResponseStream
.on("end", _ => {
if (!resultsFound) {
resultStream.emit("error", new Error("No valid SPARQL query results were found."))
} else if (!variablesFound) {
resultStream.emit('variables', []);
}
})
.pipe(new Transform({
objectMode: true,
transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void) {
jsonParser.write(chunk);
callback();
}
}));
.on('error', errorListener)
.pipe(new JsonEventParser())
.on('error', errorListener)
.pipe(new JsonStreamQueryTransformer([
{id: 'vars', path: ['head', 'vars'], returnContent: true},
{id: 'bindings', path: ['results', 'bindings'], returnContent: false},
{id: 'binding', path: ['results', 'bindings', null], returnContent: true},
{id: 'metadata', path: ['metadata'], returnContent: true},
]))
.on('error', errorListener)
.pipe(
new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(result: QueryResult, encoding: string, callback: (error?: Error | null, data?: any) => void): void {
try {
switch (result.query) {
case 'vars':
variablesFound = true;
this.emit('variables', parser.parseVariableList(result.value));
break;
case 'bindings':
resultsFound = true;
break;
case 'binding':
this.push(parser.parseJsonBindings(result.value));
break;
case 'metadata':
this.emit('metadata', result.value);
}
callback();
} catch (e: any) {
callback(e);
}
},
flush(callback: (error?: Error | null, data?: any) => void): void {
if (!resultsFound) {
callback(new Error("No valid SPARQL query results were found."));
return;
}
if (!variablesFound) {
this.emit('variables', []);
}
callback();
}
})
);
return resultStream;
}

Expand Down Expand Up @@ -134,20 +150,31 @@ export class SparqlJsonParser {
*/
public parseJsonBooleanStream(sparqlResponseStream: NodeJS.ReadableStream): Promise<boolean> {
return new Promise((resolve, reject) => {
const parser = new JsonParser();
parser.onError = reject;
parser.onValue = (value: any) => {
if(parser.key === "boolean" && typeof value === 'boolean' && parser.stack.length === 1) {
resolve(value);
}
}
sparqlResponseStream
.on('error', reject)
.on('data', d => parser.write(d))
.pipe(new JsonEventParser())
.on('error', reject)
.on('data', (event: JsonEvent) => {
if(event.type === 'value' && event.key === 'boolean' && typeof event.value === 'boolean') {
resolve(event.value);
}
})
.on('end', () => reject(new Error('No valid ASK response was found.')));
});
}

private parseVariableList(variables: JsonValue): RDF.Variable[] {
if(!Array.isArray(variables)) {
throw new Error("The variable list should be an array");
}
return variables.map(v => {
if(typeof v === "string") {
return this.dataFactory.variable(v);
} else {
throw new Error("Variable names should be strings");
}
});
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"@rdfjs/types": "*",
"@types/readable-stream": "^2.3.13",
"buffer": "^6.0.3",
"jsonparse": "^1.3.1",
"json-event-parser": "1.0.0-beta.1",
"rdf-data-factory": "^1.1.0",
"readable-stream": "^4.0.0"
},
Expand Down
19 changes: 19 additions & 0 deletions test/SparqlJsonParser-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ describe('SparqlJsonParser', () => {
`)))).toEqual([]);
});

it('should convert a slightly invalid empty SPARQL JSON response (common PHP error)', async () => {
return expect(await arrayifyStream(parser.parseJsonResultsStream(streamifyString(`
{
"head": { "vars": [] },
"results": {
"bindings": {}
}
}
`)))).toEqual([]);
});

it('should convert an empty SPARQL JSON response and emit the variables', async () => {
const stream = parser.parseJsonResultsStream(streamifyString(`
{
Expand Down Expand Up @@ -172,6 +183,14 @@ describe('SparqlJsonParser', () => {
return expect(arrayifyStream(parser.parseJsonResultsStream(streamifyString('{')))).rejects.toBeTruthy();
});

it('should reject on an invalid variables', async () => {
return expect(arrayifyStream(parser.parseJsonResultsStream(streamifyString('{"head": {"vars": null}, "results": {"bindings": []}}')))).rejects.toBeTruthy();
});

it('should reject on an invalid variables 2', async () => {
return expect(arrayifyStream(parser.parseJsonResultsStream(streamifyString('{"head": {"vars": [[]]}, "results": {"bindings": []}}')))).rejects.toBeTruthy();
});

it('should emit an error on an erroring stream', async () => {
const errorStream = new PassThrough();
errorStream._read = () => errorStream.emit('error', new Error('Some stream error'));
Expand Down
13 changes: 8 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2369,6 +2369,14 @@ jsesc@^2.5.1:
resolved "https://registry.yarnpkg.com/jsesc/-/jsesc-2.5.2.tgz#80564d2e483dacf6e8ef209650a67df3f0c283a4"
integrity sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA==

[email protected]:
version "1.0.0-beta.1"
resolved "https://registry.yarnpkg.com/json-event-parser/-/json-event-parser-1.0.0-beta.1.tgz#08ca3dc6c13184012754a8ff2b7e78a0e99bd95f"
integrity sha512-Ylf8GZNH2ftqvzVxOv7AdZ/+hf7Wy29JWvFe2g8wAmgKfyQDlP+H10dGOgwbGmeoPVhx19WDHWq+W8bLs5a1wg==
dependencies:
buffer "^6.0.3"
readable-stream "^4.0.0"

json-parse-better-errors@^1.0.1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/json-parse-better-errors/-/json-parse-better-errors-1.0.2.tgz#bb867cfb3450e69107c131d1c514bab3dc8bcaa9"
Expand Down Expand Up @@ -2399,11 +2407,6 @@ json5@^2.2.1:
resolved "https://registry.yarnpkg.com/json5/-/json5-2.2.1.tgz#655d50ed1e6f95ad1a3caababd2b0efda10b395c"
integrity sha512-1hqLFMSrGHRHxav9q9gNjJ5EXznIxGVO09xQRrwplcS8qs28pZ8s8hupZAmqDwZUmVZ2Qb2jnyPOWcDH8m8dlA==

jsonparse@^1.3.1:
version "1.3.1"
resolved "https://registry.yarnpkg.com/jsonparse/-/jsonparse-1.3.1.tgz#3f4dae4a91fac315f71062f8521cc239f1366280"
integrity sha512-POQXvpdL69+CluYsillJ7SUhKvytYjW9vG/GKpnf+xP8UWgYEM/RaMzHHofbALDiKbbP1W8UEYmgGl39WkPZsg==

jsprim@^1.2.2:
version "1.4.2"
resolved "https://registry.yarnpkg.com/jsprim/-/jsprim-1.4.2.tgz#712c65533a15c878ba59e9ed5f0e26d5b77c5feb"
Expand Down

0 comments on commit ffcc306

Please sign in to comment.