Skip to content

Commit

Permalink
feat: improve performance of response data parsing (#1580)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Sun <[email protected]>
Co-authored-by: mShan0 <[email protected]>
  • Loading branch information
3 people authored Oct 21, 2023
1 parent 443701f commit 78a4530
Show file tree
Hide file tree
Showing 21 changed files with 2,043 additions and 1,569 deletions.
565 changes: 312 additions & 253 deletions src/metadata-parser.ts

Large diffs are not rendered by default.

164 changes: 88 additions & 76 deletions src/token/colmetadata-token-parser.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import metadataParse, { type Metadata } from '../metadata-parser';
import { readMetadata, type Metadata } from '../metadata-parser';

import Parser, { type ParserOptions } from './stream-parser';
import { ColMetadataToken } from './token';
import { NotEnoughDataError, Result, readBVarChar, readUInt16LE, readUInt8, readUsVarChar } from './helpers';

export interface ColumnMetadata extends Metadata {
/**
Expand All @@ -12,101 +13,112 @@ export interface ColumnMetadata extends Metadata {
tableName?: string | string[] | undefined;
}

function readTableName(parser: Parser, options: ParserOptions, metadata: Metadata, callback: (tableName?: string | string[]) => void) {
if (metadata.type.hasTableName) {
if (options.tdsVersion >= '7_2') {
parser.readUInt8((numberOfTableNameParts) => {
const tableName: string[] = [];
function readTableName(buf: Buffer, offset: number, metadata: Metadata, options: ParserOptions): Result<string | string[] | undefined> {
if (!metadata.type.hasTableName) {
return new Result(undefined, offset);
}

let i = 0;
function next(done: () => void) {
if (numberOfTableNameParts === i) {
return done();
}
if (options.tdsVersion < '7_2') {
return readUsVarChar(buf, offset);
}

parser.readUsVarChar((part) => {
tableName.push(part);
let numberOfTableNameParts;
({ offset, value: numberOfTableNameParts } = readUInt8(buf, offset));

i++;
const tableName: string[] = [];
for (let i = 0; i < numberOfTableNameParts; i++) {
let tableNamePart;
({ offset, value: tableNamePart } = readUsVarChar(buf, offset));

next(done);
});
}

next(() => {
callback(tableName);
});
});
} else {
parser.readUsVarChar(callback);
}
} else {
callback(undefined);
tableName.push(tableNamePart);
}

return new Result(tableName, offset);
}

function readColumnName(parser: Parser, options: ParserOptions, index: number, metadata: Metadata, callback: (colName: string) => void) {
parser.readBVarChar((colName) => {
if (options.columnNameReplacer) {
callback(options.columnNameReplacer(colName, index, metadata));
} else if (options.camelCaseColumns) {
callback(colName.replace(/^[A-Z]/, function(s) {
return s.toLowerCase();
}));
} else {
callback(colName);
}
});
function readColumnName(buf: Buffer, offset: number, index: number, metadata: Metadata, options: ParserOptions): Result<string> {
let colName;
({ offset, value: colName } = readBVarChar(buf, offset));

if (options.columnNameReplacer) {
return new Result(options.columnNameReplacer(colName, index, metadata), offset);
} else if (options.camelCaseColumns) {
return new Result(colName.replace(/^[A-Z]/, function(s) {
return s.toLowerCase();
}), offset);
} else {
return new Result(colName, offset);
}
}

function readColumn(parser: Parser, options: ParserOptions, index: number, callback: (column: ColumnMetadata) => void) {
metadataParse(parser, options, (metadata) => {
readTableName(parser, options, metadata, (tableName) => {
readColumnName(parser, options, index, metadata, (colName) => {
callback({
userType: metadata.userType,
flags: metadata.flags,
type: metadata.type,
collation: metadata.collation,
precision: metadata.precision,
scale: metadata.scale,
udtInfo: metadata.udtInfo,
dataLength: metadata.dataLength,
schema: metadata.schema,
colName: colName,
tableName: tableName
});
});
});
});
function readColumn(buf: Buffer, offset: number, options: ParserOptions, index: number) {
let metadata;
({ offset, value: metadata } = readMetadata(buf, offset, options));

let tableName;
({ offset, value: tableName } = readTableName(buf, offset, metadata, options));

let colName;
({ offset, value: colName } = readColumnName(buf, offset, index, metadata, options));

return new Result({
userType: metadata.userType,
flags: metadata.flags,
type: metadata.type,
collation: metadata.collation,
precision: metadata.precision,
scale: metadata.scale,
udtInfo: metadata.udtInfo,
dataLength: metadata.dataLength,
schema: metadata.schema,
colName: colName,
tableName: tableName
}, offset);
}

async function colMetadataParser(parser: Parser): Promise<ColMetadataToken> {
while (parser.buffer.length - parser.position < 2) {
await parser.streamBuffer.waitForChunk();
}
let columnCount;

while (true) {
let offset;

try {
({ offset, value: columnCount } = readUInt16LE(parser.buffer, parser.position));
} catch (err) {
if (err instanceof NotEnoughDataError) {
await parser.waitForChunk();
continue;
}

throw err;
}

const columnCount = parser.buffer.readUInt16LE(parser.position);
parser.position += 2;
parser.position = offset;
break;
}

const columns: ColumnMetadata[] = [];
for (let i = 0; i < columnCount; i++) {
let column: ColumnMetadata;

readColumn(parser, parser.options, i, (c) => {
column = c;
});
while (true) {
let column: ColumnMetadata;
let offset;

try {
({ offset, value: column } = readColumn(parser.buffer, parser.position, parser.options, i));
} catch (err: any) {
if (err instanceof NotEnoughDataError) {
await parser.waitForChunk();
continue;
}

while (parser.suspended) {
await parser.streamBuffer.waitForChunk();
throw err;
}

parser.suspended = false;
const next = parser.next!;
parser.position = offset;
columns.push(column);

next();
break;
}

columns.push(column!);
}

return new ColMetadataToken(columns);
Expand Down
76 changes: 36 additions & 40 deletions src/token/done-token-parser.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Parser, { type ParserOptions } from './stream-parser';
import { type ParserOptions } from './stream-parser';
import { DoneToken, DoneInProcToken, DoneProcToken } from './token';
import { Result, readBigUInt64LE, readUInt16LE, readUInt32LE } from './helpers';

// s2.2.7.5/6/7

Expand All @@ -22,51 +23,46 @@ interface TokenData {
curCmd: number;
}

function parseToken(parser: Parser, options: ParserOptions, callback: (data: TokenData) => void) {
parser.readUInt16LE((status) => {
const more = !!(status & STATUS.MORE);
const sqlError = !!(status & STATUS.ERROR);
const rowCountValid = !!(status & STATUS.COUNT);
const attention = !!(status & STATUS.ATTN);
const serverError = !!(status & STATUS.SRVERROR);
function readToken(buf: Buffer, offset: number, options: ParserOptions): Result<TokenData> {
let status;
({ offset, value: status } = readUInt16LE(buf, offset));

parser.readUInt16LE((curCmd) => {
const next = (rowCount: number) => {
callback({
more: more,
sqlError: sqlError,
attention: attention,
serverError: serverError,
rowCount: rowCountValid ? rowCount : undefined,
curCmd: curCmd
});
};
const more = !!(status & STATUS.MORE);
const sqlError = !!(status & STATUS.ERROR);
const rowCountValid = !!(status & STATUS.COUNT);
const attention = !!(status & STATUS.ATTN);
const serverError = !!(status & STATUS.SRVERROR);

if (options.tdsVersion < '7_2') {
parser.readUInt32LE(next);
} else {
parser.readBigUInt64LE((rowCount) => {
next(Number(rowCount));
});
}
});
});
let curCmd;
({ offset, value: curCmd } = readUInt16LE(buf, offset));

let rowCount;
({ offset, value: rowCount } = (options.tdsVersion < '7_2' ? readUInt32LE : readBigUInt64LE)(buf, offset));

return new Result({
more: more,
sqlError: sqlError,
attention: attention,
serverError: serverError,
rowCount: rowCountValid ? Number(rowCount) : undefined,
curCmd: curCmd
}, offset);
}

export function doneParser(parser: Parser, options: ParserOptions, callback: (token: DoneToken) => void) {
parseToken(parser, options, (data) => {
callback(new DoneToken(data));
});
export function doneParser(buf: Buffer, offset: number, options: ParserOptions): Result<DoneToken> {
let value;
({ offset, value } = readToken(buf, offset, options));
return new Result(new DoneToken(value), offset);
}

export function doneInProcParser(parser: Parser, options: ParserOptions, callback: (token: DoneInProcToken) => void) {
parseToken(parser, options, (data) => {
callback(new DoneInProcToken(data));
});
export function doneInProcParser(buf: Buffer, offset: number, options: ParserOptions): Result<DoneInProcToken> {
let value;
({ offset, value } = readToken(buf, offset, options));
return new Result(new DoneInProcToken(value), offset);
}

export function doneProcParser(parser: Parser, options: ParserOptions, callback: (token: DoneProcToken) => void) {
parseToken(parser, options, (data) => {
callback(new DoneProcToken(data));
});
export function doneProcParser(buf: Buffer, offset: number, options: ParserOptions): Result<DoneProcToken> {
let value;
({ offset, value } = readToken(buf, offset, options));
return new Result(new DoneProcToken(value), offset);
}
Loading

0 comments on commit 78a4530

Please sign in to comment.