Skip to content

Commit

Permalink
Migrates to json-event-parser
Browse files Browse the repository at this point in the history
Keeps buffering all the data in memory
  • Loading branch information
Tpt committed Aug 24, 2022
1 parent df0af6d commit 732a605
Show file tree
Hide file tree
Showing 6 changed files with 729 additions and 744 deletions.
6 changes: 3 additions & 3 deletions lib/ContextTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import {JsonLdContextNormalized} from "jsonld-context-parser";
*/
export class ContextTree {

private readonly subTrees: {[key: string]: ContextTree} = {};
private readonly subTrees: {[key: string | number]: ContextTree} = {};
private context: Promise<JsonLdContextNormalized> | null;

public getContext(keys: string[]): Promise<{ context: JsonLdContextNormalized, depth: number }> | null {
public getContext(keys: (string | number)[]): Promise<{ context: JsonLdContextNormalized, depth: number }> | null {
if (keys.length > 0) {
const [head, ...tail] = keys;
const subTree = this.subTrees[head];
Expand All @@ -25,7 +25,7 @@ export class ContextTree {
return this.context ? this.context.then((context) => ({ context, depth: 0 })) : null;
}

public setContext(keys: any[], context: Promise<JsonLdContextNormalized> | null) {
public setContext(keys: (string | number)[], context: Promise<JsonLdContextNormalized> | null) {
if (keys.length === 0) {
this.context = context;
} else {
Expand Down
163 changes: 93 additions & 70 deletions lib/JsonLdParser.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import * as RDF from "@rdfjs/types";
// tslint:disable-next-line:no-var-requires
const Parser = require('jsonparse');
import {ERROR_CODES, ErrorCoded, IDocumentLoader, JsonLdContext, Util as ContextUtil} from "jsonld-context-parser";
import {PassThrough, Transform, Readable} from "readable-stream";
// @ts-ignore The types are not updated yet
import {PassThrough, Transform, Stream, pipeline} from "readable-stream";
import {EntryHandlerArrayValue} from "./entryhandler/EntryHandlerArrayValue";
import {EntryHandlerContainer} from "./entryhandler/EntryHandlerContainer";
import {EntryHandlerInvalidFallback} from "./entryhandler/EntryHandlerInvalidFallback";
Expand All @@ -20,6 +19,9 @@ import {EntryHandlerKeywordValue} from "./entryhandler/keyword/EntryHandlerKeywo
import {ParsingContext} from "./ParsingContext";
import {Util} from "./Util";
import {parse as parseLinkHeader} from "http-link-header";
import {JsonEventParser} from "json-event-parser";
import {JsonEvent} from "json-event-parser/lib/JsonEventParser";


/**
* A stream transformer that parses JSON-LD (text) streams to an {@link RDF.Stream}.
Expand All @@ -46,11 +48,10 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
private readonly parsingContext: ParsingContext;
private readonly util: Util;

private readonly jsonParser: any;
// Jobs that are not started yet that process a @context (only used if streamingProfile is false)
private readonly contextJobs: (() => Promise<void>)[][];
// Jobs that are not started yet that process a @type (only used if streamingProfile is false)
private readonly typeJobs: { job: () => Promise<void>, keys: string[] }[];
private readonly typeJobs: { job: () => Promise<void>, keys: (string | number)[] }[];
// Jobs that are not started yet because of a missing @context or @type (only used if streamingProfile is false)
private readonly contextAwaitingJobs: { job: () => Promise<void>, keys: string[] }[];

Expand All @@ -60,30 +61,27 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
private lastKeys: any[];
// A promise representing the last job
private lastOnValueJob: Promise<void>;
// The keys inside of the JSON tree
private readonly jsonKeyStack: (string | number)[];
// The value inside of the JSON tree
private readonly jsonValueStack: any[];

constructor(options?: IJsonLdParserOptions) {
super({ readableObjectMode: true });
super({ readableObjectMode: true, writableObjectMode: true });
options = options || {};
this.options = options;
this.parsingContext = new ParsingContext({ parser: this, ...options });
this.util = new Util({ dataFactory: options.dataFactory, parsingContext: this.parsingContext });

this.jsonParser = new Parser();
this.contextJobs = [];
this.typeJobs = [];
this.contextAwaitingJobs = [];

this.lastDepth = 0;
this.lastKeys = [];
this.lastOnValueJob = Promise.resolve();

this.attachJsonParserListeners();

this.on('end', () => {
if (typeof this.jsonParser.mode !== 'undefined') {
this.emit('error', new Error('Unclosed document'))
}
})
this.jsonKeyStack = [];
this.jsonValueStack = [];
}

/**
Expand Down Expand Up @@ -157,22 +155,20 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
* @return {RDF.Stream} A quad stream.
*/
public import(stream: EventEmitter): RDF.Stream {
if('pipe' in stream) {
stream.on('error', (error) => parsed.emit('error', error));
const parsed = (<Readable>stream).pipe(new JsonLdParser(this.options));
return parsed;
} else {
const output = new PassThrough({ readableObjectMode: true });
stream.on('error', (error) => parsed.emit('error', error));
stream.on('data', (data) => output.push(data));
stream.on('end', () => output.push(null));
const parsed = output.pipe(new JsonLdParser(this.options));
return parsed;
let input: Stream = (<Stream>stream);
if(!('pipe' in stream)) {
input = new PassThrough({ readableObjectMode: true });
stream.on('error', (error) => input.emit('error', error));
stream.on('data', (data) => input.push(data));
stream.on('end', () => input.push(null));
}
return pipeline(input, new JsonEventParser(), new JsonLdParser(this.options), (err: any) => {
// We ignore the error?
});
}

public _transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void): void {
this.jsonParser.write(chunk);
public _transform(event: any, _encoding: string, callback: (error?: Error | null, data?: any) => void): void {
this.onJsonEvent(event);
this.lastOnValueJob
.then(() => callback(), (error) => callback(error));
}
Expand All @@ -199,7 +195,7 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
if (listPointer) {
// Terminate the list if it had at least one value
if (listPointer.value) {
this.emit('data', this.util.dataFactory.quad(listPointer.value, this.util.rdfRest, this.util.rdfNil,
this.push(this.util.dataFactory.quad(listPointer.value, this.util.rdfRest, this.util.rdfNil,
this.util.getDefaultGraph()));
}

Expand Down Expand Up @@ -413,65 +409,79 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
*
* This should only be called once.
*/
protected attachJsonParserListeners() {
// Listen to json parser events
this.jsonParser.onValue = (value: any) => {
const depth = this.jsonParser.stack.length;
const keys = (new Array(depth + 1).fill(0)).map((v, i) => {
return i === depth ? this.jsonParser.key : this.jsonParser.stack[i].key;
});

if (!this.isParsingContextInner(depth)) { // Don't parse inner nodes inside @context
const valueJobCb = () => this.newOnValueJob(keys, value, depth, true);
if (!this.parsingContext.streamingProfile
protected onJsonEvent(event: JsonEvent) {
let key: any;
let value: any;
switch (event.type) {
case 'open-object':
this.insertInStack(event.key, {}, true);
return;
case 'open-array':
this.insertInStack(event.key, [], true);
return;
case 'value':
this.insertInStack(event.key, event.value, false);
key = event.key;
value = event.value;
break;
case 'close-object':
case 'close-array':
key = this.jsonKeyStack[this.jsonKeyStack.length - 1];
value = this.jsonValueStack[this.jsonValueStack.length - 1];
}

const depth = this.jsonKeyStack.length;
const keys = <string[]><any[]>[undefined, ...this.jsonKeyStack];

if (!this.isParsingContextInner()) { // Don't parse inner nodes inside @context
const valueJobCb = () => this.newOnValueJob(keys, value, depth, true);
if (!this.parsingContext.streamingProfile
&& !this.parsingContext.contextTree.getContext(keys.slice(0, -1))) {
// If an out-of-order context is allowed,
// we have to buffer everything.
// We store jobs for @context's and @type's separately,
// because at the end, we have to process them first.
// We also handle @type because these *could* introduce a type-scoped context.
if (keys[depth] === '@context') {
let jobs = this.contextJobs[depth];
if (!jobs) {
jobs = this.contextJobs[depth] = [];
}
jobs.push(valueJobCb);
} else if (keys[depth] === '@type'
|| typeof keys[depth] === 'number' && keys[depth - 1] === '@type') { // Also capture @type with array values
// Remove @type from keys, because we want it to apply to parent later on
this.typeJobs.push({ job: valueJobCb, keys: keys.slice(0, keys.length - 1) });
} else {
this.contextAwaitingJobs.push({ job: valueJobCb, keys });
if (key === '@context') {
let jobs = this.contextJobs[depth];
if (!jobs) {
jobs = this.contextJobs[depth] = [];
}
jobs.push(valueJobCb);
} else if (key === '@type'
|| typeof key === 'number' && this.jsonKeyStack[this.jsonKeyStack.length - 2] === '@type') { // Also capture @type with array values
// Remove @type from keys, because we want it to apply to parent later on
this.typeJobs.push({ job: valueJobCb, keys: keys.slice(0, keys.length - 1) });
} else {
// Make sure that our value jobs are chained synchronously
this.lastOnValueJob = this.lastOnValueJob.then(valueJobCb);
this.contextAwaitingJobs.push({ job: valueJobCb, keys });
}
} else {
// Make sure that our value jobs are chained synchronously
this.lastOnValueJob = this.lastOnValueJob.then(valueJobCb);
}

// Execute all buffered jobs on deeper levels
if (!this.parsingContext.streamingProfile && depth === 0) {
this.lastOnValueJob = this.lastOnValueJob
if (!this.parsingContext.streamingProfile && depth === 0) {
this.lastOnValueJob = this.lastOnValueJob
.then(() => this.executeBufferedJobs());
}
}
};
this.jsonParser.onError = (error: Error) => {
this.emit('error', error);
};
}

switch (event.type) {
case 'close-object':
case 'close-array':
this.jsonValueStack.pop();
case "value":
this.jsonKeyStack.pop();
}
}

/**
* Check if the parser is currently parsing an element that is part of an @context entry.
* @param {number} depth A depth.
* @return {boolean} A boolean.
*/
protected isParsingContextInner(depth: number) {
for (let i = depth; i > 0; i--) {
if (this.jsonParser.stack[i - 1].key === '@context') {
return true;
}
}
return false;
protected isParsingContextInner() {
return this.jsonKeyStack.slice(0, -1).includes('@context');
}

/**
Expand All @@ -497,7 +507,7 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
// We check all possible parent nodes for the current job, from root to leaves.
if (this.typeJobs.length > 0) {
// First collect all applicable type jobs
const applicableTypeJobs: { job: () => Promise<void>, keys: string[] }[] = [];
const applicableTypeJobs: { job: () => Promise<void>, keys: (string | number)[] }[] = [];
const applicableTypeJobIds: number[] = [];
for (let i = 0; i < this.typeJobs.length; i++) {
const typeJob = this.typeJobs[i];
Expand Down Expand Up @@ -526,6 +536,19 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
await job.job();
}
}

/**
 * Records a newly seen JSON entry in the key stack and value stack.
 *
 * - A string key is pushed on the key stack and the value is stored under that key
 *   on the entry currently on top of the value stack.
 * - A numeric key is pushed on the key stack and the value is appended to the entry
 *   currently on top of the value stack.
 * - Any other key (e.g. undefined) leaves the key stack and the parent untouched.
 *
 * @param key The object key or array index of the entry, if any.
 * @param value The value of the entry; for containers, the (still empty) object or array.
 * @param push If the value must become the new top of the value stack.
 */
private insertInStack(key: string | number | undefined, value: any, push: boolean): void {
  if (typeof key === 'string') {
    this.jsonKeyStack.push(key);
    // Define an own property instead of assigning: for the key '__proto__', a plain
    // `parent[key] = value` would invoke the inherited setter, which replaces the
    // parent's prototype (or silently drops primitive values) instead of storing the entry.
    Object.defineProperty(this.jsonValueStack[this.jsonValueStack.length - 1], key, {
      configurable: true,
      enumerable: true,
      value,
      writable: true,
    });
  } else if (typeof key === 'number') {
    this.jsonKeyStack.push(key);
    this.jsonValueStack[this.jsonValueStack.length - 1].push(value);
  }
  if (push) {
    // Containers become the parent of the entries that follow until they are closed.
    this.jsonValueStack.push(value);
  }
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class Util {
* @param needle An array to check if it is a prefix.
* @param haystack An array to look in.
*/
public static isPrefixArray(needle: string[], haystack: string[]): boolean {
public static isPrefixArray(needle: any[], haystack: any[]): boolean {
if (needle.length > haystack.length) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"canonicalize": "^1.0.1",
"http-link-header": "^1.0.2",
"jsonld-context-parser": "^2.1.3",
"jsonparse": "^1.3.1",
"json-event-parser": "https://github.com/comunica/json-event-parser.js",
"rdf-data-factory": "^1.1.0",
"readable-stream": "^4.0.0"
},
Expand Down
Loading

0 comments on commit 732a605

Please sign in to comment.