supercat666's picture
add igv
78c921d
raw
history blame
17 kB
"use strict";
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
Object.defineProperty(exports, "__esModule", { value: true });
exports.RecordBatchJSONWriter = exports.RecordBatchFileWriter = exports.RecordBatchStreamWriter = exports.RecordBatchWriter = void 0;
const tslib_1 = require("tslib");
const table_js_1 = require("../table.js");
const message_js_1 = require("./message.js");
const vector_js_1 = require("../vector.js");
const type_js_1 = require("../type.js");
const message_js_2 = require("./metadata/message.js");
const metadata = tslib_1.__importStar(require("./metadata/message.js"));
const file_js_1 = require("./metadata/file.js");
const enum_js_1 = require("../enum.js");
const typecomparator_js_1 = require("../visitor/typecomparator.js");
const stream_js_1 = require("../io/stream.js");
const vectorassembler_js_1 = require("../visitor/vectorassembler.js");
const jsontypeassembler_js_1 = require("../visitor/jsontypeassembler.js");
const jsonvectorassembler_js_1 = require("../visitor/jsonvectorassembler.js");
const buffer_js_1 = require("../util/buffer.js");
const recordbatch_js_1 = require("../recordbatch.js");
const interfaces_js_1 = require("../io/interfaces.js");
const compat_js_1 = require("../util/compat.js");
class RecordBatchWriter extends interfaces_js_1.ReadableInterop {
constructor(options) {
super();
this._position = 0;
this._started = false;
// @ts-ignore
this._sink = new stream_js_1.AsyncByteQueue();
this._schema = null;
this._dictionaryBlocks = [];
this._recordBatchBlocks = [];
this._dictionaryDeltaOffsets = new Map();
(0, compat_js_1.isObject)(options) || (options = { autoDestroy: true, writeLegacyIpcFormat: false });
this._autoDestroy = (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true;
this._writeLegacyIpcFormat = (typeof options.writeLegacyIpcFormat === 'boolean') ? options.writeLegacyIpcFormat : false;
}
/** @nocollapse */
// @ts-ignore
static throughNode(options) {
throw new Error(`"throughNode" not available in this environment`);
}
/** @nocollapse */
static throughDOM(
// @ts-ignore
writableStrategy,
// @ts-ignore
readableStrategy) {
throw new Error(`"throughDOM" not available in this environment`);
}
toString(sync = false) {
return this._sink.toString(sync);
}
toUint8Array(sync = false) {
return this._sink.toUint8Array(sync);
}
writeAll(input) {
if ((0, compat_js_1.isPromise)(input)) {
return input.then((x) => this.writeAll(x));
}
else if ((0, compat_js_1.isAsyncIterable)(input)) {
return writeAllAsync(this, input);
}
return writeAll(this, input);
}
get closed() { return this._sink.closed; }
[Symbol.asyncIterator]() { return this._sink[Symbol.asyncIterator](); }
toDOMStream(options) { return this._sink.toDOMStream(options); }
toNodeStream(options) { return this._sink.toNodeStream(options); }
close() {
return this.reset()._sink.close();
}
abort(reason) {
return this.reset()._sink.abort(reason);
}
finish() {
this._autoDestroy ? this.close() : this.reset(this._sink, this._schema);
return this;
}
reset(sink = this._sink, schema = null) {
if ((sink === this._sink) || (sink instanceof stream_js_1.AsyncByteQueue)) {
this._sink = sink;
}
else {
this._sink = new stream_js_1.AsyncByteQueue();
if (sink && (0, compat_js_1.isWritableDOMStream)(sink)) {
this.toDOMStream({ type: 'bytes' }).pipeTo(sink);
}
else if (sink && (0, compat_js_1.isWritableNodeStream)(sink)) {
this.toNodeStream({ objectMode: false }).pipe(sink);
}
}
if (this._started && this._schema) {
this._writeFooter(this._schema);
}
this._started = false;
this._dictionaryBlocks = [];
this._recordBatchBlocks = [];
this._dictionaryDeltaOffsets = new Map();
if (!schema || !((0, typecomparator_js_1.compareSchemas)(schema, this._schema))) {
if (schema == null) {
this._position = 0;
this._schema = null;
}
else {
this._started = true;
this._schema = schema;
this._writeSchema(schema);
}
}
return this;
}
write(payload) {
let schema = null;
if (!this._sink) {
throw new Error(`RecordBatchWriter is closed`);
}
else if (payload == null) {
return this.finish() && undefined;
}
else if (payload instanceof table_js_1.Table && !(schema = payload.schema)) {
return this.finish() && undefined;
}
else if (payload instanceof recordbatch_js_1.RecordBatch && !(schema = payload.schema)) {
return this.finish() && undefined;
}
if (schema && !(0, typecomparator_js_1.compareSchemas)(schema, this._schema)) {
if (this._started && this._autoDestroy) {
return this.close();
}
this.reset(this._sink, schema);
}
if (payload instanceof recordbatch_js_1.RecordBatch) {
if (!(payload instanceof recordbatch_js_1._InternalEmptyPlaceholderRecordBatch)) {
this._writeRecordBatch(payload);
}
}
else if (payload instanceof table_js_1.Table) {
this.writeAll(payload.batches);
}
else if ((0, compat_js_1.isIterable)(payload)) {
this.writeAll(payload);
}
}
_writeMessage(message, alignment = 8) {
const a = alignment - 1;
const buffer = message_js_2.Message.encode(message);
const flatbufferSize = buffer.byteLength;
const prefixSize = !this._writeLegacyIpcFormat ? 8 : 4;
const alignedSize = (flatbufferSize + prefixSize + a) & ~a;
const nPaddingBytes = alignedSize - flatbufferSize - prefixSize;
if (message.headerType === enum_js_1.MessageHeader.RecordBatch) {
this._recordBatchBlocks.push(new file_js_1.FileBlock(alignedSize, message.bodyLength, this._position));
}
else if (message.headerType === enum_js_1.MessageHeader.DictionaryBatch) {
this._dictionaryBlocks.push(new file_js_1.FileBlock(alignedSize, message.bodyLength, this._position));
}
// If not in legacy pre-0.15.0 mode, write the stream continuation indicator
if (!this._writeLegacyIpcFormat) {
this._write(Int32Array.of(-1));
}
// Write the flatbuffer size prefix including padding
this._write(Int32Array.of(alignedSize - prefixSize));
// Write the flatbuffer
if (flatbufferSize > 0) {
this._write(buffer);
}
// Write any padding
return this._writePadding(nPaddingBytes);
}
_write(chunk) {
if (this._started) {
const buffer = (0, buffer_js_1.toUint8Array)(chunk);
if (buffer && buffer.byteLength > 0) {
this._sink.write(buffer);
this._position += buffer.byteLength;
}
}
return this;
}
_writeSchema(schema) {
return this._writeMessage(message_js_2.Message.from(schema));
}
// @ts-ignore
_writeFooter(schema) {
// eos bytes
return this._writeLegacyIpcFormat
? this._write(Int32Array.of(0))
: this._write(Int32Array.of(-1, 0));
}
_writeMagic() {
return this._write(message_js_1.MAGIC);
}
_writePadding(nBytes) {
return nBytes > 0 ? this._write(new Uint8Array(nBytes)) : this;
}
_writeRecordBatch(batch) {
const { byteLength, nodes, bufferRegions, buffers } = vectorassembler_js_1.VectorAssembler.assemble(batch);
const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions);
const message = message_js_2.Message.from(recordBatch, byteLength);
return this
._writeDictionaries(batch)
._writeMessage(message)
._writeBodyBuffers(buffers);
}
_writeDictionaryBatch(dictionary, id, isDelta = false) {
this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0));
const { byteLength, nodes, bufferRegions, buffers } = vectorassembler_js_1.VectorAssembler.assemble(new vector_js_1.Vector([dictionary]));
const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
const message = message_js_2.Message.from(dictionaryBatch, byteLength);
return this
._writeMessage(message)
._writeBodyBuffers(buffers);
}
_writeBodyBuffers(buffers) {
let buffer;
let size, padding;
for (let i = -1, n = buffers.length; ++i < n;) {
if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) {
this._write(buffer);
if ((padding = ((size + 7) & ~7) - size) > 0) {
this._writePadding(padding);
}
}
}
return this;
}
_writeDictionaries(batch) {
for (let [id, dictionary] of batch.dictionaries) {
let offset = this._dictionaryDeltaOffsets.get(id) || 0;
if (offset === 0 || (dictionary = dictionary === null || dictionary === void 0 ? void 0 : dictionary.slice(offset)).length > 0) {
for (const data of dictionary.data) {
this._writeDictionaryBatch(data, id, offset > 0);
offset += data.length;
}
}
}
return this;
}
}
exports.RecordBatchWriter = RecordBatchWriter;
/**
 * `RecordBatchWriter` subclass that adds a static one-shot `writeAll` helper
 * and otherwise inherits all behavior from the base class.
 * @ignore
 */
class RecordBatchStreamWriter extends RecordBatchWriter {
    /**
     * Creates a new stream writer with `options` and writes all of `input` to it.
     *
     * @param input A promise, an async iterable, or any other input accepted
     *   by the module-level `writeAll` helper.
     * @param options Options forwarded to the writer's constructor.
     * @returns The helper's result; a promise when `input` is a promise or an
     *   async iterable.
     * @nocollapse
     */
    static writeAll(input, options) {
        const streamWriter = new RecordBatchStreamWriter(options);
        // A promise is resolved first and then handed back to the instance method.
        if ((0, compat_js_1.isPromise)(input)) {
            return input.then((resolved) => streamWriter.writeAll(resolved));
        }
        return (0, compat_js_1.isAsyncIterable)(input)
            ? writeAllAsync(streamWriter, input)
            : writeAll(streamWriter, input);
    }
}
exports.RecordBatchStreamWriter = RecordBatchStreamWriter;
/**
 * `RecordBatchWriter` subclass that brackets its output with the `MAGIC`
 * bytes and appends an encoded `Footer` listing the recorded file blocks.
 * @ignore
 */
class RecordBatchFileWriter extends RecordBatchWriter {
    /**
     * Creates a new file writer and writes all of `input` to it.
     *
     * @param input A promise, an async iterable, or any other input accepted
     *   by the module-level `writeAll` helper.
     * @returns The helper's result; a promise when `input` is a promise or an
     *   async iterable.
     * @nocollapse
     */
    static writeAll(input) {
        const fileWriter = new RecordBatchFileWriter();
        // A promise is resolved first and then handed back to the instance method.
        if ((0, compat_js_1.isPromise)(input)) {
            return input.then((resolved) => fileWriter.writeAll(resolved));
        }
        return (0, compat_js_1.isAsyncIterable)(input)
            ? writeAllAsync(fileWriter, input)
            : writeAll(fileWriter, input);
    }
    /** Builds the writer with the base-class defaults and forces `_autoDestroy` on. */
    constructor() {
        super();
        this._autoDestroy = true;
    }
    /**
     * Starts the output with the magic bytes followed by two zero padding
     * bytes. The `schema` argument is not used here.
     * @returns This writer.
     */
    // @ts-ignore
    _writeSchema(schema) {
        const afterMagic = this._writeMagic();
        return afterMagic._writePadding(2);
    }
    /**
     * Ends the output: the base-class end-of-stream bytes, the encoded footer,
     * the footer's byte length as an Int32, and finally the magic bytes.
     * @returns This writer.
     */
    _writeFooter(schema) {
        const footer = new file_js_1.Footer(schema, enum_js_1.MetadataVersion.V4, this._recordBatchBlocks, this._dictionaryBlocks);
        const encodedFooter = file_js_1.Footer.encode(footer);
        const footerSize = encodedFooter.byteLength;
        return super
            ._writeFooter(schema) // end-of-stream marker, for sequential readers
            ._write(encodedFooter) // the footer flatbuffer itself
            ._write(Int32Array.of(footerSize)) // size suffix of the footer
            ._writeMagic(); // closing magic bytes
    }
}
exports.RecordBatchFileWriter = RecordBatchFileWriter;
/**
 * `RecordBatchWriter` subclass that emits a JSON document instead of binary
 * messages. The schema is written immediately; record batches (and the
 * batches that carry dictionaries) are buffered and only serialized when the
 * writer is closed.
 * @ignore
 */
class RecordBatchJSONWriter extends RecordBatchWriter {
    /** Builds the writer with the base-class defaults, forces `_autoDestroy` on and creates the buffers. */
    constructor() {
        super();
        this._autoDestroy = true;
        // Record batches waiting to be serialized by `close()`.
        this._recordBatches = [];
        // Batches whose dictionaries are serialized by `close()`.
        this._dictionaries = [];
    }
    /**
     * Creates a new JSON writer and writes all of `input` to it.
     * @nocollapse
     */
    static writeAll(input) {
        const jsonWriter = new RecordBatchJSONWriter();
        return jsonWriter.writeAll(input);
    }
    /** Binary message framing is not used for JSON output; this is a no-op. */
    _writeMessage() {
        return this;
    }
    /** The JSON document is terminated in `close()`; this is a no-op. */
    // @ts-ignore
    _writeFooter(schema) {
        return this;
    }
    /**
     * Opens the JSON document and writes the `"schema"` member, with every
     * field converted by `fieldToJSON`.
     * @returns This writer.
     */
    _writeSchema(schema) {
        const fields = schema.fields.map((field) => fieldToJSON(field));
        const schemaJSON = JSON.stringify({ fields }, null, 2);
        return this._write(`{\n "schema": ${schemaJSON}`);
    }
    /**
     * Remembers `batch` for later if it carries at least one dictionary.
     * @returns This writer.
     */
    _writeDictionaries(batch) {
        const hasDictionaries = batch.dictionaries.size > 0;
        if (hasDictionaries) {
            this._dictionaries.push(batch);
        }
        return this;
    }
    /**
     * Writes one dictionary batch as JSON, preceded by a separator when it is
     * not the first one, and records a placeholder `FileBlock` for it.
     *
     * @param dictionary Dictionary data chunk to serialize.
     * @param id Dictionary id.
     * @param isDelta Whether the batch is emitted as a delta (default `false`).
     * @returns This writer.
     */
    _writeDictionaryBatch(dictionary, id, isDelta = false) {
        const offsets = this._dictionaryDeltaOffsets;
        // Accumulate how many values of this dictionary have been written so far.
        offsets.set(id, dictionary.length + (offsets.get(id) || 0));
        const isFirst = this._dictionaryBlocks.length === 0;
        this._write(isFirst ? ` ` : `,\n `);
        const json = dictionaryBatchToJSON(dictionary, id, isDelta);
        this._write(`${json}`);
        // The block count doubles as the "is this the first entry" marker above.
        this._dictionaryBlocks.push(new file_js_1.FileBlock(0, 0, 0));
        return this;
    }
    /**
     * Buffers `batch` (and registers its dictionaries) until `close()`.
     * @returns This writer.
     */
    _writeRecordBatch(batch) {
        this._writeDictionaries(batch);
        this._recordBatches.push(batch);
        return this;
    }
    /**
     * Serializes the buffered dictionaries and record batches, terminates the
     * JSON document if a schema was written, clears the buffers and closes
     * the writer through the base class.
     * @returns The result of the base-class `close()`.
     */
    close() {
        const pendingDictionaries = this._dictionaries;
        const pendingBatches = this._recordBatches;
        if (pendingDictionaries.length > 0) {
            this._write(`,\n "dictionaries": [\n`);
            // The base implementation works out which dictionary values still need writing.
            for (const dictionaryOwner of pendingDictionaries) {
                super._writeDictionaries(dictionaryOwner);
            }
            this._write(`\n ]`);
        }
        if (pendingBatches.length > 0) {
            for (const [index, batch] of pendingBatches.entries()) {
                const separator = index === 0 ? `,\n "batches": [\n ` : `,\n `;
                this._write(separator);
                this._write(`${recordBatchToJSON(batch)}`);
                this._recordBatchBlocks.push(new file_js_1.FileBlock(0, 0, 0));
            }
            this._write(`\n ]`);
        }
        if (this._schema) {
            this._write(`\n}`);
        }
        this._dictionaries = [];
        this._recordBatches = [];
        return super.close();
    }
}
exports.RecordBatchJSONWriter = RecordBatchJSONWriter;
/**
 * Synchronously writes every batch of `input` to `writer`, then finishes it.
 *
 * @param writer Object exposing `reset`, `write` and `finish`.
 * @param input A `Table` (the writer is first reset to the table's schema and
 *   its `batches` are written) or any other iterable of payloads.
 * @returns The value returned by `writer.finish()`.
 * @ignore
 */
function writeAll(writer, input) {
    const isTable = input instanceof table_js_1.Table;
    const batches = isTable ? input.batches : input;
    if (isTable) {
        // Keep the writer's current sink, but start over with the table's schema.
        writer.reset(undefined, input.schema);
    }
    for (const batch of batches) {
        writer.write(batch);
    }
    return writer.finish();
}
/**
 * Asynchronously writes every batch produced by `batches` to `writer`, then
 * finishes it.
 *
 * If iteration stops early because of an error, the iterator's `return`
 * method (when present) is awaited before the error is rethrown, and the
 * writer is not finished.
 *
 * @param writer Object exposing `write` and `finish`.
 * @param batches Async iterable of payloads.
 * @returns A promise for the value returned by `writer.finish()`.
 * @ignore
 */
function writeAllAsync(writer, batches) {
    return tslib_1.__awaiter(this, void 0, void 0, function* () {
        let iterator;
        let step;
        let failure;
        try {
            iterator = tslib_1.__asyncValues(batches);
            while (!(step = yield iterator.next()).done) {
                writer.write(step.value);
            }
        }
        catch (caught) {
            // Remember the error so the iterator can be released before rethrowing.
            failure = { error: caught };
        }
        finally {
            try {
                // Release the iterator only when it was abandoned before completion.
                const release = step && !step.done ? iterator.return : undefined;
                if (release) {
                    yield release.call(iterator);
                }
            }
            finally {
                if (failure) {
                    throw failure.error;
                }
            }
        }
        return writer.finish();
    });
}
/**
 * Converts a field into its plain JSON description.
 *
 * @param field Object with `name`, `type` and `nullable` properties.
 * @returns An object with the keys `name`, `nullable`, `type`, `children`
 *   (child fields converted recursively, empty when the type has none) and
 *   `dictionary` (`undefined` unless the type is a dictionary type).
 * @ignore
 */
function fieldToJSON({ name, type, nullable }) {
    const typeAssembler = new jsontypeassembler_js_1.JSONTypeAssembler();
    const jsonType = typeAssembler.visit(type);
    const childFields = type.children || [];
    const children = childFields.map((child) => fieldToJSON(child));
    let dictionary;
    if (type_js_1.DataType.isDictionary(type)) {
        dictionary = {
            'id': type.id,
            'isOrdered': type.isOrdered,
            'indexType': typeAssembler.visit(type.indices)
        };
    }
    // Key order is kept stable because it determines the serialized output order.
    return {
        'name': name,
        'nullable': nullable,
        'type': jsonType,
        'children': children,
        'dictionary': dictionary
    };
}
/**
 * Serializes one dictionary batch as pretty-printed JSON (2-space indent).
 *
 * @param dictionary Dictionary data; wrapped in a `RecordBatch` keyed by `id`
 *   so it can be assembled by `JSONVectorAssembler`.
 * @param id Dictionary id, emitted as `id`.
 * @param isDelta Emitted as `isDelta` (default `false`).
 * @returns The JSON string `{ id, isDelta, data: { count, columns } }`.
 * @ignore
 */
function dictionaryBatchToJSON(dictionary, id, isDelta = false) {
    const wrapper = new recordbatch_js_1.RecordBatch({ [id]: dictionary });
    const assembled = jsonvectorassembler_js_1.JSONVectorAssembler.assemble(wrapper);
    // Only the first element of the assembler's result is used.
    const [columns] = assembled;
    const data = {
        'count': dictionary.length,
        'columns': columns
    };
    const payload = {
        'id': id,
        'isDelta': isDelta,
        'data': data
    };
    return JSON.stringify(payload, null, 2);
}
/**
 * Serializes one record batch as pretty-printed JSON (2-space indent).
 *
 * @param records Record batch; assembled by `JSONVectorAssembler` and read
 *   for its `numRows`.
 * @returns The JSON string `{ count, columns }`.
 * @ignore
 */
function recordBatchToJSON(records) {
    const assembled = jsonvectorassembler_js_1.JSONVectorAssembler.assemble(records);
    // Only the first element of the assembler's result is used.
    const [columns] = assembled;
    const payload = {
        'count': records.numRows,
        'columns': columns
    };
    return JSON.stringify(payload, null, 2);
}
//# sourceMappingURL=writer.js.map