193 lines
7.7 KiB
JavaScript
193 lines
7.7 KiB
JavaScript
![]() |
"use strict";
|
||
|
/* --------------------------------------------------------------------------------------------
 * Copyright (c) Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See License.txt in the project root for license information.
 * ------------------------------------------------------------------------------------------ */
|
||
|
Object.defineProperty(exports, "__esModule", { value: true });
|
||
|
exports.ReadableStreamMessageReader = exports.AbstractMessageReader = exports.MessageReader = void 0;
|
||
|
const ral_1 = require("./ral");
|
||
|
const Is = require("./is");
|
||
|
const events_1 = require("./events");
|
||
|
const semaphore_1 = require("./semaphore");
|
||
|
var MessageReader;
(function (MessageReader) {
    // Members a value must expose as functions to be treated as a message
    // reader. They are probed in this order.
    const requiredMembers = ['listen', 'dispose', 'onError', 'onClose', 'onPartialMessage'];
    /**
     * Duck-type check for a message reader.
     *
     * @param value The value to inspect.
     * @returns `value` itself when it is falsy; otherwise the result of the
     * last `Is.func` probe that ran (probing stops at the first falsy result).
     */
    function is(value) {
        let verdict = value;
        for (const member of requiredMembers) {
            if (!verdict) {
                break;
            }
            verdict = Is.func(value[member]);
        }
        return verdict;
    }
    MessageReader.is = is;
})(MessageReader = exports.MessageReader || (exports.MessageReader = {}));
|
||
|
/**
 * Base class for message readers. It owns the emitters behind the
 * `onError`, `onClose` and `onPartialMessage` events and provides the
 * `fire*` helpers used to raise them.
 */
class AbstractMessageReader {
    constructor() {
        this.errorEmitter = new events_1.Emitter();
        this.closeEmitter = new events_1.Emitter();
        this.partialMessageEmitter = new events_1.Emitter();
    }
    /**
     * Disposes every emitter created by the constructor.
     */
    dispose() {
        this.errorEmitter.dispose();
        this.closeEmitter.dispose();
        // The partial-message emitter is created alongside the other two, so
        // it has to be released alongside them as well.
        this.partialMessageEmitter.dispose();
    }
    /** Event raised through `fireError`. */
    get onError() {
        return this.errorEmitter.event;
    }
    /**
     * Raises the error event. The payload is normalised with `asError`, so
     * listeners always receive an `Error` instance.
     *
     * @param error The error, or error-like value, to report.
     */
    fireError(error) {
        this.errorEmitter.fire(this.asError(error));
    }
    /** Event raised through `fireClose`. */
    get onClose() {
        return this.closeEmitter.event;
    }
    /** Raises the close event with an `undefined` payload. */
    fireClose() {
        this.closeEmitter.fire(undefined);
    }
    /** Event raised through `firePartialMessage`. */
    get onPartialMessage() {
        return this.partialMessageEmitter.event;
    }
    /**
     * Raises the partial-message event.
     *
     * @param info The payload handed to the listeners unchanged.
     */
    firePartialMessage(info) {
        this.partialMessageEmitter.fire(info);
    }
    /**
     * Normalises an arbitrary value into an `Error`.
     *
     * @param error Any value; `null`, `undefined` and primitives are accepted.
     * @returns `error` itself when it already is an `Error`; otherwise a new
     * `Error` whose text carries `error.message` when that is a string and
     * `'unknown'` when it is not.
     */
    asError(error) {
        if (error instanceof Error) {
            return error;
        }
        // Optional chaining keeps the normalisation itself from throwing when
        // the reported value is null or undefined.
        const reason = Is.string(error?.message) ? error.message : 'unknown';
        return new Error(`Reader received error. Reason: ${reason}`);
    }
}
|
||
|
exports.AbstractMessageReader = AbstractMessageReader;
|
||
|
var ResolvedMessageReaderOptions;
(function (ResolvedMessageReaderOptions) {
    /**
     * Expands the reader options into a fully populated options object.
     *
     * @param options Either nothing (`undefined` or `null`), a charset
     * string, or an object that may carry `charset`, `contentDecoder`,
     * `contentDecoders`, `contentTypeDecoder` and `contentTypeDecoders`.
     * @returns An object with `charset` (defaulting to `'utf-8'`), the
     * optional `contentDecoder`, the `contentTypeDecoder` (defaulting to the
     * runtime's `applicationJson.decoder`) and two maps that index every
     * supplied decoder by its `name`.
     */
    function fromOptions(options) {
        let charset;
        let contentDecoder;
        const contentDecoders = new Map();
        let contentTypeDecoder;
        const contentTypeDecoders = new Map();
        // `== null` so that an explicit `null` is handled like omitted
        // options instead of failing on the property reads below.
        if (options == null || typeof options === 'string') {
            charset = options ?? 'utf-8';
        }
        else {
            charset = options.charset ?? 'utf-8';
            if (options.contentDecoder !== undefined) {
                contentDecoder = options.contentDecoder;
                contentDecoders.set(contentDecoder.name, contentDecoder);
            }
            if (options.contentDecoders !== undefined) {
                for (const decoder of options.contentDecoders) {
                    contentDecoders.set(decoder.name, decoder);
                }
            }
            if (options.contentTypeDecoder !== undefined) {
                contentTypeDecoder = options.contentTypeDecoder;
                contentTypeDecoders.set(contentTypeDecoder.name, contentTypeDecoder);
            }
            if (options.contentTypeDecoders !== undefined) {
                for (const decoder of options.contentTypeDecoders) {
                    contentTypeDecoders.set(decoder.name, decoder);
                }
            }
        }
        // No explicit content type decoder was supplied: fall back to the
        // runtime's JSON decoder and register it under its own name.
        if (contentTypeDecoder === undefined) {
            contentTypeDecoder = (0, ral_1.default)().applicationJson.decoder;
            contentTypeDecoders.set(contentTypeDecoder.name, contentTypeDecoder);
        }
        return { charset, contentDecoder, contentDecoders, contentTypeDecoder, contentTypeDecoders };
    }
    ResolvedMessageReaderOptions.fromOptions = fromOptions;
})(ResolvedMessageReaderOptions || (ResolvedMessageReaderOptions = {}));
|
||
|
/**
 * Message reader that consumes a readable stream, splits the incoming data
 * into header/body framed messages, decodes each body and hands the decoded
 * message to the callback registered through `listen`.
 */
class ReadableStreamMessageReader extends AbstractMessageReader {
    /**
     * @param readable The stream to read from; it must offer `onData`,
     * `onError` and `onClose` registration functions.
     * @param options Optional charset string or options object, resolved via
     * `ResolvedMessageReaderOptions.fromOptions`.
     */
    constructor(readable, options) {
        super();
        this.readable = readable;
        this.options = ResolvedMessageReaderOptions.fromOptions(options);
        this.buffer = (0, ral_1.default)().messageBuffer.create(this.options.charset);
        // Passed as the delay to the runtime's `timer.setTimeout`;
        // presumably milliseconds - confirm against the runtime layer.
        this._partialMessageTimeout = 10000;
        // -1 means "no header parsed yet for the next message".
        this.nextMessageLength = -1;
        this.messageToken = 0;
        // Capacity 1: message bodies are decoded strictly one after another.
        this.readSemaphore = new semaphore_1.Semaphore(1);
    }
    /** Sets the partial-message timeout; a value <= 0 disables the timer. */
    set partialMessageTimeout(timeout) {
        this._partialMessageTimeout = timeout;
    }
    /** The currently configured partial-message timeout. */
    get partialMessageTimeout() {
        return this._partialMessageTimeout;
    }
    /**
     * Disposes the emitters of the base class and cancels a pending
     * partial-message timer. Without the cancellation the timer would keep
     * re-arming itself after the reader has been disposed.
     */
    dispose() {
        super.dispose();
        this.clearPartialMessageTimer();
    }
    /**
     * Starts reading: resets the framing state and subscribes to the data,
     * error and close notifications of the underlying stream.
     *
     * @param callback Invoked with every decoded message.
     * @returns Whatever the stream's `onData` registration returns.
     */
    listen(callback) {
        this.nextMessageLength = -1;
        this.messageToken = 0;
        this.partialMessageTimer = undefined;
        this.callback = callback;
        const result = this.readable.onData((data) => {
            this.onData(data);
        });
        this.readable.onError((error) => this.fireError(error));
        this.readable.onClose(() => this.fireClose());
        return result;
    }
    /**
     * Appends a chunk to the buffer and extracts as many complete messages
     * as the buffer currently holds.
     *
     * @param data The chunk received from the stream.
     */
    onData(data) {
        this.buffer.append(data);
        while (true) {
            if (this.nextMessageLength === -1) {
                const headers = this.buffer.tryReadHeaders(true);
                if (!headers) {
                    return;
                }
                const contentLength = headers.get('content-length');
                if (!contentLength) {
                    this.fireError(new Error('Header must provide a Content-Length property.'));
                    return;
                }
                // Explicit radix: the header value is always decimal.
                const length = Number.parseInt(contentLength, 10);
                if (Number.isNaN(length)) {
                    this.fireError(new Error('Content-Length value must be a number.'));
                    return;
                }
                // A negative length is never valid, and -1 in particular
                // would be mistaken for the "no header parsed" sentinel.
                if (length < 0) {
                    this.fireError(new Error('Content-Length value must not be negative.'));
                    return;
                }
                this.nextMessageLength = length;
            }
            const body = this.buffer.tryReadBody(this.nextMessageLength);
            if (body === undefined) {
                /** We haven't received the full message yet. */
                this.setPartialMessageTimer();
                return;
            }
            this.clearPartialMessageTimer();
            this.nextMessageLength = -1;
            // Make sure that we convert one received message after the
            // other. Otherwise it could happen that a decoding of a second
            // smaller message finished before the decoding of a first larger
            // message and then we would deliver the second message first.
            this.readSemaphore.lock(async () => {
                const bytes = this.options.contentDecoder !== undefined
                    ? await this.options.contentDecoder.decode(body)
                    : body;
                const message = await this.options.contentTypeDecoder.decode(bytes, this.options);
                this.callback(message);
            }).catch((error) => {
                this.fireError(error);
            });
        }
    }
    /** Cancels the pending partial-message timer, if there is one. */
    clearPartialMessageTimer() {
        if (this.partialMessageTimer) {
            this.partialMessageTimer.dispose();
            this.partialMessageTimer = undefined;
        }
    }
    /**
     * (Re)starts the partial-message timer. When it elapses and the message
     * token is unchanged, a partial-message event is fired and the timer is
     * armed again. Does nothing beyond clearing the old timer when the
     * configured timeout is <= 0.
     */
    setPartialMessageTimer() {
        this.clearPartialMessageTimer();
        if (this._partialMessageTimeout <= 0) {
            return;
        }
        this.partialMessageTimer = (0, ral_1.default)().timer.setTimeout((token, timeout) => {
            this.partialMessageTimer = undefined;
            if (token === this.messageToken) {
                this.firePartialMessage({ messageToken: token, waitingTime: timeout });
                this.setPartialMessageTimer();
            }
        }, this._partialMessageTimeout, this.messageToken, this._partialMessageTimeout);
    }
}
|
||
|
exports.ReadableStreamMessageReader = ReadableStreamMessageReader;
|