"use strict"; // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. Object.defineProperty(exports, "__esModule", { value: true }); exports.RecordBatchJSONWriter = exports.RecordBatchFileWriter = exports.RecordBatchStreamWriter = exports.RecordBatchWriter = void 0; const tslib_1 = require("tslib"); const table_js_1 = require("../table.js"); const message_js_1 = require("./message.js"); const vector_js_1 = require("../vector.js"); const type_js_1 = require("../type.js"); const message_js_2 = require("./metadata/message.js"); const metadata = tslib_1.__importStar(require("./metadata/message.js")); const file_js_1 = require("./metadata/file.js"); const enum_js_1 = require("../enum.js"); const typecomparator_js_1 = require("../visitor/typecomparator.js"); const stream_js_1 = require("../io/stream.js"); const vectorassembler_js_1 = require("../visitor/vectorassembler.js"); const jsontypeassembler_js_1 = require("../visitor/jsontypeassembler.js"); const jsonvectorassembler_js_1 = require("../visitor/jsonvectorassembler.js"); const buffer_js_1 = require("../util/buffer.js"); const recordbatch_js_1 = require("../recordbatch.js"); const interfaces_js_1 = require("../io/interfaces.js"); const compat_js_1 = require("../util/compat.js"); class RecordBatchWriter extends interfaces_js_1.ReadableInterop { constructor(options) { super(); this._position = 0; this._started = false; // @ts-ignore this._sink = new stream_js_1.AsyncByteQueue(); this._schema = null; this._dictionaryBlocks = []; this._recordBatchBlocks = []; this._dictionaryDeltaOffsets = new Map(); (0, compat_js_1.isObject)(options) || (options = { autoDestroy: true, writeLegacyIpcFormat: false }); this._autoDestroy = (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true; this._writeLegacyIpcFormat = (typeof options.writeLegacyIpcFormat === 'boolean') ? options.writeLegacyIpcFormat : false; } /** @nocollapse */ // @ts-ignore static throughNode(options) { throw new Error(`"throughNode" not available in this environment`); } /** @nocollapse */ static throughDOM( // @ts-ignore writableStrategy, // @ts-ignore readableStrategy) { throw new Error(`"throughDOM" not available in this environment`); } toString(sync = false) { return this._sink.toString(sync); } toUint8Array(sync = false) { return this._sink.toUint8Array(sync); } writeAll(input) { if ((0, compat_js_1.isPromise)(input)) { return input.then((x) => this.writeAll(x)); } else if ((0, compat_js_1.isAsyncIterable)(input)) { return writeAllAsync(this, input); } return writeAll(this, input); } get closed() { return this._sink.closed; } [Symbol.asyncIterator]() { return this._sink[Symbol.asyncIterator](); } toDOMStream(options) { return this._sink.toDOMStream(options); } toNodeStream(options) { return this._sink.toNodeStream(options); } close() { return this.reset()._sink.close(); } abort(reason) { return this.reset()._sink.abort(reason); } finish() { this._autoDestroy ? this.close() : this.reset(this._sink, this._schema); return this; } reset(sink = this._sink, schema = null) { if ((sink === this._sink) || (sink instanceof stream_js_1.AsyncByteQueue)) { this._sink = sink; } else { this._sink = new stream_js_1.AsyncByteQueue(); if (sink && (0, compat_js_1.isWritableDOMStream)(sink)) { this.toDOMStream({ type: 'bytes' }).pipeTo(sink); } else if (sink && (0, compat_js_1.isWritableNodeStream)(sink)) { this.toNodeStream({ objectMode: false }).pipe(sink); } } if (this._started && this._schema) { this._writeFooter(this._schema); } this._started = false; this._dictionaryBlocks = []; this._recordBatchBlocks = []; this._dictionaryDeltaOffsets = new Map(); if (!schema || !((0, typecomparator_js_1.compareSchemas)(schema, this._schema))) { if (schema == null) { this._position = 0; this._schema = null; } else { this._started = true; this._schema = schema; this._writeSchema(schema); } } return this; } write(payload) { let schema = null; if (!this._sink) { throw new Error(`RecordBatchWriter is closed`); } else if (payload == null) { return this.finish() && undefined; } else if (payload instanceof table_js_1.Table && !(schema = payload.schema)) { return this.finish() && undefined; } else if (payload instanceof recordbatch_js_1.RecordBatch && !(schema = payload.schema)) { return this.finish() && undefined; } if (schema && !(0, typecomparator_js_1.compareSchemas)(schema, this._schema)) { if (this._started && this._autoDestroy) { return this.close(); } this.reset(this._sink, schema); } if (payload instanceof recordbatch_js_1.RecordBatch) { if (!(payload instanceof recordbatch_js_1._InternalEmptyPlaceholderRecordBatch)) { this._writeRecordBatch(payload); } } else if (payload instanceof table_js_1.Table) { this.writeAll(payload.batches); } else if ((0, compat_js_1.isIterable)(payload)) { this.writeAll(payload); } } _writeMessage(message, alignment = 8) { const a = alignment - 1; const buffer = message_js_2.Message.encode(message); const flatbufferSize = buffer.byteLength; const prefixSize = !this._writeLegacyIpcFormat ? 8 : 4; const alignedSize = (flatbufferSize + prefixSize + a) & ~a; const nPaddingBytes = alignedSize - flatbufferSize - prefixSize; if (message.headerType === enum_js_1.MessageHeader.RecordBatch) { this._recordBatchBlocks.push(new file_js_1.FileBlock(alignedSize, message.bodyLength, this._position)); } else if (message.headerType === enum_js_1.MessageHeader.DictionaryBatch) { this._dictionaryBlocks.push(new file_js_1.FileBlock(alignedSize, message.bodyLength, this._position)); } // If not in legacy pre-0.15.0 mode, write the stream continuation indicator if (!this._writeLegacyIpcFormat) { this._write(Int32Array.of(-1)); } // Write the flatbuffer size prefix including padding this._write(Int32Array.of(alignedSize - prefixSize)); // Write the flatbuffer if (flatbufferSize > 0) { this._write(buffer); } // Write any padding return this._writePadding(nPaddingBytes); } _write(chunk) { if (this._started) { const buffer = (0, buffer_js_1.toUint8Array)(chunk); if (buffer && buffer.byteLength > 0) { this._sink.write(buffer); this._position += buffer.byteLength; } } return this; } _writeSchema(schema) { return this._writeMessage(message_js_2.Message.from(schema)); } // @ts-ignore _writeFooter(schema) { // eos bytes return this._writeLegacyIpcFormat ? this._write(Int32Array.of(0)) : this._write(Int32Array.of(-1, 0)); } _writeMagic() { return this._write(message_js_1.MAGIC); } _writePadding(nBytes) { return nBytes > 0 ? this._write(new Uint8Array(nBytes)) : this; } _writeRecordBatch(batch) { const { byteLength, nodes, bufferRegions, buffers } = vectorassembler_js_1.VectorAssembler.assemble(batch); const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions); const message = message_js_2.Message.from(recordBatch, byteLength); return this ._writeDictionaries(batch) ._writeMessage(message) ._writeBodyBuffers(buffers); } _writeDictionaryBatch(dictionary, id, isDelta = false) { this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0)); const { byteLength, nodes, bufferRegions, buffers } = vectorassembler_js_1.VectorAssembler.assemble(new vector_js_1.Vector([dictionary])); const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions); const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta); const message = message_js_2.Message.from(dictionaryBatch, byteLength); return this ._writeMessage(message) ._writeBodyBuffers(buffers); } _writeBodyBuffers(buffers) { let buffer; let size, padding; for (let i = -1, n = buffers.length; ++i < n;) { if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) { this._write(buffer); if ((padding = ((size + 7) & ~7) - size) > 0) { this._writePadding(padding); } } } return this; } _writeDictionaries(batch) { for (let [id, dictionary] of batch.dictionaries) { let offset = this._dictionaryDeltaOffsets.get(id) || 0; if (offset === 0 || (dictionary = dictionary === null || dictionary === void 0 ? void 0 : dictionary.slice(offset)).length > 0) { for (const data of dictionary.data) { this._writeDictionaryBatch(data, id, offset > 0); offset += data.length; } } } return this; } } exports.RecordBatchWriter = RecordBatchWriter; /** @ignore */ class RecordBatchStreamWriter extends RecordBatchWriter { /** @nocollapse */ static writeAll(input, options) { const writer = new RecordBatchStreamWriter(options); if ((0, compat_js_1.isPromise)(input)) { return input.then((x) => writer.writeAll(x)); } else if ((0, compat_js_1.isAsyncIterable)(input)) { return writeAllAsync(writer, input); } return writeAll(writer, input); } } exports.RecordBatchStreamWriter = RecordBatchStreamWriter; /** @ignore */ class RecordBatchFileWriter extends RecordBatchWriter { /** @nocollapse */ static writeAll(input) { const writer = new RecordBatchFileWriter(); if ((0, compat_js_1.isPromise)(input)) { return input.then((x) => writer.writeAll(x)); } else if ((0, compat_js_1.isAsyncIterable)(input)) { return writeAllAsync(writer, input); } return writeAll(writer, input); } constructor() { super(); this._autoDestroy = true; } // @ts-ignore _writeSchema(schema) { return this._writeMagic()._writePadding(2); } _writeFooter(schema) { const buffer = file_js_1.Footer.encode(new file_js_1.Footer(schema, enum_js_1.MetadataVersion.V4, this._recordBatchBlocks, this._dictionaryBlocks)); return super ._writeFooter(schema) // EOS bytes for sequential readers ._write(buffer) // Write the flatbuffer ._write(Int32Array.of(buffer.byteLength)) // then the footer size suffix ._writeMagic(); // then the magic suffix } } exports.RecordBatchFileWriter = RecordBatchFileWriter; /** @ignore */ class RecordBatchJSONWriter extends RecordBatchWriter { constructor() { super(); this._autoDestroy = true; this._recordBatches = []; this._dictionaries = []; } /** @nocollapse */ static writeAll(input) { return new RecordBatchJSONWriter().writeAll(input); } _writeMessage() { return this; } // @ts-ignore _writeFooter(schema) { return this; } _writeSchema(schema) { return this._write(`{\n "schema": ${JSON.stringify({ fields: schema.fields.map(field => fieldToJSON(field)) }, null, 2)}`); } _writeDictionaries(batch) { if (batch.dictionaries.size > 0) { this._dictionaries.push(batch); } return this; } _writeDictionaryBatch(dictionary, id, isDelta = false) { this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0)); this._write(this._dictionaryBlocks.length === 0 ? ` ` : `,\n `); this._write(`${dictionaryBatchToJSON(dictionary, id, isDelta)}`); this._dictionaryBlocks.push(new file_js_1.FileBlock(0, 0, 0)); return this; } _writeRecordBatch(batch) { this._writeDictionaries(batch); this._recordBatches.push(batch); return this; } close() { if (this._dictionaries.length > 0) { this._write(`,\n "dictionaries": [\n`); for (const batch of this._dictionaries) { super._writeDictionaries(batch); } this._write(`\n ]`); } if (this._recordBatches.length > 0) { for (let i = -1, n = this._recordBatches.length; ++i < n;) { this._write(i === 0 ? `,\n "batches": [\n ` : `,\n `); this._write(`${recordBatchToJSON(this._recordBatches[i])}`); this._recordBatchBlocks.push(new file_js_1.FileBlock(0, 0, 0)); } this._write(`\n ]`); } if (this._schema) { this._write(`\n}`); } this._dictionaries = []; this._recordBatches = []; return super.close(); } } exports.RecordBatchJSONWriter = RecordBatchJSONWriter; /** @ignore */ function writeAll(writer, input) { let chunks = input; if (input instanceof table_js_1.Table) { chunks = input.batches; writer.reset(undefined, input.schema); } for (const batch of chunks) { writer.write(batch); } return writer.finish(); } /** @ignore */ function writeAllAsync(writer, batches) { var batches_1, batches_1_1; var e_1, _a; return tslib_1.__awaiter(this, void 0, void 0, function* () { try { for (batches_1 = tslib_1.__asyncValues(batches); batches_1_1 = yield batches_1.next(), !batches_1_1.done;) { const batch = batches_1_1.value; writer.write(batch); } } catch (e_1_1) { e_1 = { error: e_1_1 }; } finally { try { if (batches_1_1 && !batches_1_1.done && (_a = batches_1.return)) yield _a.call(batches_1); } finally { if (e_1) throw e_1.error; } } return writer.finish(); }); } /** @ignore */ function fieldToJSON({ name, type, nullable }) { const assembler = new jsontypeassembler_js_1.JSONTypeAssembler(); return { 'name': name, 'nullable': nullable, 'type': assembler.visit(type), 'children': (type.children || []).map((field) => fieldToJSON(field)), 'dictionary': !type_js_1.DataType.isDictionary(type) ? undefined : { 'id': type.id, 'isOrdered': type.isOrdered, 'indexType': assembler.visit(type.indices) } }; } /** @ignore */ function dictionaryBatchToJSON(dictionary, id, isDelta = false) { const [columns] = jsonvectorassembler_js_1.JSONVectorAssembler.assemble(new recordbatch_js_1.RecordBatch({ [id]: dictionary })); return JSON.stringify({ 'id': id, 'isDelta': isDelta, 'data': { 'count': dictionary.length, 'columns': columns } }, null, 2); } /** @ignore */ function recordBatchToJSON(records) { const [columns] = jsonvectorassembler_js_1.JSONVectorAssembler.assemble(records); return JSON.stringify({ 'count': records.numRows, 'columns': columns }, null, 2); } //# sourceMappingURL=writer.js.map