// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import { makeData } from '../data.js';
import { Vector } from '../vector.js';
import { DataType, Struct, TypeMap } from '../type.js';
import { MessageHeader } from '../enum.js';
import { Footer } from './metadata/file.js';
import { Schema, Field } from '../schema.js';
import streamAdapters from '../io/adapters.js';
import { Message } from './metadata/message.js';
import * as metadata from './metadata/message.js';
import { ArrayBufferViewInput } from '../util/buffer.js';
import { ByteStream, AsyncByteStream } from '../io/stream.js';
import { RandomAccessFile, AsyncRandomAccessFile } from '../io/file.js';
import { VectorLoader, JSONVectorLoader } from '../visitor/vectorloader.js';
import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatch.js';
import {
    FileHandle,
    ArrowJSONLike,
    ITERATOR_DONE,
    ReadableInterop,
} from '../io/interfaces.js';
import {
    MessageReader, AsyncMessageReader, JSONMessageReader,
    checkForMagicArrowString, magicLength, magicAndPadding, magicX2AndPadding
} from './message.js';
import {
    isPromise,
    isIterable, isAsyncIterable,
    isIteratorResult, isArrowJSON,
    isFileHandle, isFetchResponse,
    isReadableDOMStream, isReadableNodeStream
} from '../util/compat.js';

/** @ignore Arrow JSON object (read synchronously). */
export type FromArg0 = ArrowJSONLike;
/** @ignore Promise of Arrow JSON. */
export type FromArg1 = PromiseLike<ArrowJSONLike>;
/** @ignore In-memory bytes, or a synchronous iterable of byte chunks. */
export type FromArg2 = Iterable<ArrayBufferViewInput> | ArrayBufferViewInput;
/** @ignore Promise of in-memory bytes or of a synchronous iterable of byte chunks. */
export type FromArg3 = PromiseLike<Iterable<ArrayBufferViewInput> | ArrayBufferViewInput>;
/** @ignore Asynchronous byte sources: fetch Response, Node/DOM readable streams, async iterables. */
export type FromArg4 = Response | NodeJS.ReadableStream | ReadableStream<ArrayBufferViewInput> | AsyncIterable<ArrayBufferViewInput>;
/** @ignore File handles, or promises of a file handle / asynchronous byte source. */
export type FromArg5 = FileHandle | PromiseLike<FileHandle> | PromiseLike<FromArg4>;
/** @ignore Every input shape accepted by `RecordBatchReader.from`. */
export type FromArgs = FromArg0 | FromArg1 | FromArg2 | FromArg3 | FromArg4 | FromArg5;

/** @ignore Options accepted by `open()`. */
type OpenOptions = { autoDestroy?: boolean };
/** @ignore */
type RecordBatchReaders<T extends TypeMap = any> = RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
/** @ignore */
type AsyncRecordBatchReaders<T extends TypeMap = any> = AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>;
/** @ignore */
type RecordBatchFileReaders<T extends TypeMap = any> = RecordBatchFileReader<T> | AsyncRecordBatchFileReader<T>;
/** @ignore */
type RecordBatchStreamReaders<T extends TypeMap = any> = RecordBatchStreamReader<T> | AsyncRecordBatchStreamReader<T>;

/**
 * Public entry point for reading Arrow record batches.
 *
 * A `RecordBatchReader` is a thin facade: every getter and method forwards to a
 * private `RecordBatchReaderImpl` supplied at construction (JSON, IPC file, or
 * IPC stream; sync or async). Use {@link RecordBatchReader.from} to obtain the
 * concrete reader appropriate for a given input.
 */
export class RecordBatchReader<T extends TypeMap = any> extends ReadableInterop<RecordBatch<T>> {

    protected _impl: RecordBatchReaderImpls<T>;
    protected constructor(impl: RecordBatchReaderImpls<T>) {
        super();
        this._impl = impl;
    }

    public get closed() { return this._impl.closed; }
    public get schema() { return this._impl.schema; }
    public get autoDestroy() { return this._impl.autoDestroy; }
    public get dictionaries() { return this._impl.dictionaries; }
    public get numDictionaries() { return this._impl.numDictionaries; }
    public get numRecordBatches() { return this._impl.numRecordBatches; }
    /** The IPC file footer, or `null` when this reader is not a file reader. */
    public get footer(): Footer | null { return this._impl.isFile() ? this._impl.footer : null; }

    public isSync(): this is RecordBatchReaders<T> { return this._impl.isSync(); }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return this._impl.isAsync(); }
    public isFile(): this is RecordBatchFileReaders<T> { return this._impl.isFile(); }
    public isStream(): this is RecordBatchStreamReaders<T> { return this._impl.isStream(); }

    public next() {
        return this._impl.next();
    }
    public throw(value?: any) {
        return this._impl.throw(value);
    }
    public return(value?: any) {
        return this._impl.return(value);
    }
    public cancel() {
        return this._impl.cancel();
    }
    /**
     * Resets the impl (optionally to a new schema) and drops the cached
     * `_DOMStream` / `_nodeStream` adapters held by this reader.
     */
    public reset(schema?: Schema<T> | null): this {
        this._impl.reset(schema);
        this._DOMStream = undefined;
        this._nodeStream = undefined;
        return this;
    }
    /**
     * Opens the underlying impl. Returns `this` directly when the impl opens
     * synchronously, or a Promise resolving to `this` when it returns a promise.
     */
    public open(options?: OpenOptions) {
        const opening = this._impl.open(options);
        return isPromise(opening) ? opening.then(() => this) : this;
    }
    /**
     * Random access to the record batch at `index`. Only file readers support
     * this; every other reader returns `null`.
     */
    public readRecordBatch(index: number): RecordBatch<T> | null | Promise<RecordBatch<T> | null> {
        return this._impl.isFile() ? this._impl.readRecordBatch(index) : null;
    }
    public [Symbol.iterator](): IterableIterator<RecordBatch<T>> {
        return (this._impl as IterableIterator<RecordBatch<T>>)[Symbol.iterator]();
    }
    public [Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>> {
        return (this._impl as AsyncIterableIterator<RecordBatch<T>>)[Symbol.asyncIterator]();
    }
    /** Wraps this reader as a (sync or async, per `isSync()`) iterable and adapts it to a DOM stream. */
    public toDOMStream() {
        return streamAdapters.toDOMStream<RecordBatch<T>>(
            (this.isSync()
                ? { [Symbol.iterator]: () => this } as Iterable<RecordBatch<T>>
                : { [Symbol.asyncIterator]: () => this } as AsyncIterable<RecordBatch<T>>));
    }
    /** Wraps this reader as a (sync or async, per `isSync()`) iterable and adapts it to an object-mode Node stream. */
    public toNodeStream() {
        return streamAdapters.toNodeStream<RecordBatch<T>>(
            (this.isSync()
                ? { [Symbol.iterator]: () => this } as Iterable<RecordBatch<T>>
                : { [Symbol.asyncIterator]: () => this } as AsyncIterable<RecordBatch<T>>),
            { objectMode: true });
    }

    /**
     * Always throws here. NOTE(review): presumably replaced by a
     * Node-specific implementation elsewhere — confirm.
     * @nocollapse
     */
    // @ts-ignore
    public static throughNode(options?: import('stream').DuplexOptions & { autoDestroy: boolean }): import('stream').Duplex {
        throw new Error(`"throughNode" not available in this environment`);
    }
    /**
     * Always throws here. NOTE(review): presumably replaced by a
     * DOM-specific implementation elsewhere — confirm.
     * @nocollapse
     */
    public static throughDOM<T extends TypeMap>(
        // @ts-ignore
        writableStrategy?: ByteLengthQueuingStrategy,
        // @ts-ignore
        readableStrategy?: { autoDestroy: boolean }
    ): { writable: WritableStream<Uint8Array>; readable: ReadableStream<RecordBatch<T>> } {
        throw new Error(`"throughDOM" not available in this environment`);
    }

    public static from<T extends RecordBatchReader>(source: T): T;
    public static from<T extends TypeMap = any>(source: FromArg0): RecordBatchStreamReader<T>;
    public static from<T extends TypeMap = any>(source: FromArg1): Promise<RecordBatchStreamReader<T>>;
    public static from<T extends TypeMap = any>(source: FromArg2): RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
    public static from<T extends TypeMap = any>(source: FromArg3): Promise<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
    public static from<T extends TypeMap = any>(source: FromArg4): Promise<RecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
    public static from<T extends TypeMap = any>(source: FromArg5): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
    /**
     * Creates the reader appropriate for `source`:
     * - an existing `RecordBatchReader` is returned unchanged;
     * - Arrow JSON gets a synchronous JSON stream reader;
     * - a file handle gets an async reader (file or stream, by magic bytes);
     * - a promise is awaited and its result dispatched again;
     * - fetch Responses, DOM/Node readable streams and async iterables are
     *   wrapped in an `AsyncByteStream`;
     * - anything else is wrapped in a synchronous `ByteStream`.
     * @nocollapse
     */
    public static from<T extends TypeMap = any>(source: any) {
        if (source instanceof RecordBatchReader) {
            return source;
        } else if (isArrowJSON(source)) {
            return fromArrowJSON<T>(source);
        } else if (isFileHandle(source)) {
            return fromFileHandle<T>(source);
        } else if (isPromise(source)) {
            return (async () => await RecordBatchReader.from<any>(await source))();
        } else if (isFetchResponse(source) || isReadableDOMStream(source) || isReadableNodeStream(source) || isAsyncIterable(source)) {
            return fromAsyncByteStream<T>(new AsyncByteStream(source));
        }
        return fromByteStream<T>(new ByteStream(source));
    }

    public static readAll<T extends RecordBatchReader>(source: T): T extends RecordBatchReaders ? IterableIterator<T> : AsyncIterableIterator<T>;
    public static readAll<T extends TypeMap = any>(source: FromArg0): IterableIterator<RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg1): AsyncIterableIterator<RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg2): IterableIterator<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg3): AsyncIterableIterator<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg4): AsyncIterableIterator<AsyncRecordBatchReaders<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg5): AsyncIterableIterator<AsyncRecordBatchReaders<T>>;
    /**
     * Returns an iterator that yields a reader once per table in `source`.
     * The same reader instance is yielded repeatedly: after each table it is
     * reset and re-opened, and iteration stops once it closes. JSON, in-memory
     * bytes, sync iterables and iterator results are handled synchronously;
     * everything else asynchronously.
     * @nocollapse
     */
    public static readAll<T extends TypeMap = any>(source: any) {
        if (source instanceof RecordBatchReader) {
            return source.isSync() ? readAllSync(source) : readAllAsync(source as AsyncRecordBatchReaders<T>);
        } else if (isArrowJSON(source) || ArrayBuffer.isView(source) || isIterable(source) || isIteratorResult(source)) {
            return readAllSync<T>(source) as IterableIterator<RecordBatchReaders<T>>;
        }
        return readAllAsync<T>(source) as AsyncIterableIterator<RecordBatchReaders<T> | AsyncRecordBatchReaders<T>>;
    }
}

//
// Since TS is a structural type system, we define the following subclass stubs
// so that concrete types exist to associate with the interfaces below.
//
// The implementation for each RecordBatchReader is hidden away in the set of
// `RecordBatchReaderImpl` classes in the second half of this file. This allows
// us to export a single RecordBatchReader class, and swap out the impl based
// on the io primitives or underlying arrow (JSON, file, or stream) at runtime.
//
// Async/await makes our job a bit harder, since it forces everything to be
// either fully sync or fully async. This is why the logic for the reader impls
// has been duplicated into both sync and async variants. Since the RBR
// delegates to its impl, an RBR with an AsyncRecordBatchFileReaderImpl for
// example will return async/await-friendly Promises, but one with a (sync)
// RecordBatchStreamReaderImpl will always return values. Nothing should be
// different about their logic, aside from the async handling.
// This is also why this code looks highly structured, as it should be nearly
// identical and easy to follow.
//

/**
 * Synchronous reader for Arrow IPC streams and Arrow JSON.
 * @ignore
 */
export class RecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
    constructor(protected _impl: RecordBatchStreamReaderImpl<T>) { super(_impl); }
    /** Drains every remaining record batch into an array. */
    public readAll() { return [...this]; }
    public [Symbol.iterator]() { return (this._impl as IterableIterator<RecordBatch<T>>)[Symbol.iterator](); }
    /** Exposes the synchronous batches through the async-iteration protocol. */
    public async *[Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>> { yield* this[Symbol.iterator](); }
}

/**
 * Asynchronous reader for Arrow IPC streams; it is not synchronously iterable.
 * @ignore
 */
export class AsyncRecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
    constructor(protected _impl: AsyncRecordBatchStreamReaderImpl<T>) { super(_impl); }
    /** Drains every remaining record batch into an array. */
    public async readAll() {
        const batches = new Array<RecordBatch<T>>();
        for await (const batch of this) { batches.push(batch); }
        return batches;
    }
    public [Symbol.iterator](): IterableIterator<RecordBatch<T>> { throw new Error(`AsyncRecordBatchStreamReader is not Iterable`); }
    public [Symbol.asyncIterator]() { return (this._impl as AsyncIterableIterator<RecordBatch<T>>)[Symbol.asyncIterator](); }
}

/**
 * Synchronous reader for the footer-indexed Arrow IPC file format.
 * @ignore
 */
export class RecordBatchFileReader<T extends TypeMap = any> extends RecordBatchStreamReader<T> {
    constructor(protected _impl: RecordBatchFileReaderImpl<T>) { super(_impl); }
}

/**
 * Asynchronous reader for the footer-indexed Arrow IPC file format.
 * @ignore
 */
export class AsyncRecordBatchFileReader<T extends TypeMap = any> extends AsyncRecordBatchStreamReader<T> {
    constructor(protected _impl: AsyncRecordBatchFileReaderImpl<T>) { super(_impl); }
}

//
// Now override the return types for each sync/async RecordBatchReader variant
//

/** @ignore */
export interface RecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
    open(options?: OpenOptions | undefined): this;
    cancel(): void;
    throw(value?: any): IteratorResult<any>;
    return(value?: any): IteratorResult<any>;
    next(value?: any): IteratorResult<RecordBatch<T>>;
}

/** @ignore */
export interface AsyncRecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
    open(options?: OpenOptions | undefined): Promise<this>;
    cancel(): Promise<void>;
    throw(value?: any): Promise<IteratorResult<any>>;
    return(value?: any): Promise<IteratorResult<any>>;
    next(value?: any): Promise<IteratorResult<RecordBatch<T>>>;
}

/** @ignore */
export interface RecordBatchFileReader<T extends TypeMap = any> extends RecordBatchStreamReader<T> {
    readRecordBatch(index: number): RecordBatch<T> | null;
}

/** @ignore */
export interface AsyncRecordBatchFileReader<T extends TypeMap = any> extends AsyncRecordBatchStreamReader<T> {
    readRecordBatch(index: number): Promise<RecordBatch<T> | null>;
}

/** @ignore Every impl a `RecordBatchReader` may delegate to. */
type RecordBatchReaderImpls<T extends TypeMap = any> =
    RecordBatchJSONReaderImpl<T> |
    RecordBatchFileReaderImpl<T> |
    RecordBatchStreamReaderImpl<T> |
    AsyncRecordBatchFileReaderImpl<T> |
    AsyncRecordBatchStreamReaderImpl<T>;

/** @ignore */
interface RecordBatchReaderImpl<T extends TypeMap = any> {

    closed: boolean;
    schema: Schema<T>;
    autoDestroy: boolean;
    dictionaries: Map<number, Vector>;

    isFile(): this is RecordBatchFileReaders<T>;
    isStream(): this is RecordBatchStreamReaders<T>;
    isSync(): this is RecordBatchReaders<T>;
    isAsync(): this is AsyncRecordBatchReaders<T>;

    reset(schema?: Schema<T> | null): this;
}

/** @ignore */
interface RecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> {

    open(options?: OpenOptions): this;
    cancel(): void;

    throw(value?: any): IteratorResult<any>;
    return(value?: any): IteratorResult<any>;
    next(value?: any): IteratorResult<RecordBatch<T>>;

    [Symbol.iterator](): IterableIterator<RecordBatch<T>>;
}

/** @ignore */
interface AsyncRecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> {

    open(options?: OpenOptions): Promise<this>;
    cancel(): Promise<void>;

    throw(value?: any): Promise<IteratorResult<any>>;
    return(value?: any): Promise<IteratorResult<any>>;
    next(value?: any): Promise<IteratorResult<RecordBatch<T>>>;

    [Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>>;
}

/** @ignore */
interface RecordBatchFileReaderImpl<T extends TypeMap = any> extends RecordBatchStreamReaderImpl<T> {
    readRecordBatch(index: number): RecordBatch<T> | null;
}

/** @ignore */
interface AsyncRecordBatchFileReaderImpl<T extends TypeMap = any> extends AsyncRecordBatchStreamReaderImpl<T> {
    readRecordBatch(index: number): Promise<RecordBatch<T> | null>;
}

/**
 * State and batch/dictionary decoding shared by every reader impl.
 * @ignore
 */
abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordBatchReaderImpl<T> {

    declare public schema: Schema<T>;
    public closed = false;
    public autoDestroy = true;
    public dictionaries: Map<number, Vector>;

    // Number of dictionary / record batch messages consumed so far.
    protected _dictionaryIndex = 0;
    protected _recordBatchIndex = 0;
    public get numDictionaries() { return this._dictionaryIndex; }
    public get numRecordBatches() { return this._recordBatchIndex; }

    constructor(dictionaries = new Map<number, Vector>()) {
        this.dictionaries = dictionaries;
    }

    public isSync(): this is RecordBatchReaders<T> { return false; }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return false; }
    public isFile(): this is RecordBatchFileReaders<T> { return false; }
    public isStream(): this is RecordBatchStreamReaders<T> { return false; }

    /**
     * Zeroes both counters, replaces the schema (possibly with `undefined`),
     * and starts a fresh, empty dictionary map.
     */
    public reset(schema?: Schema<T> | null) {
        this._dictionaryIndex = 0;
        this._recordBatchIndex = 0;
        this.schema = schema as any;
        this.dictionaries = new Map();
        return this;
    }

    /** Decodes a record batch body into a `RecordBatch` using the current schema. */
    protected _loadRecordBatch(header: metadata.RecordBatch, body: any) {
        const children = this._loadVectors(header, body, this.schema.fields);
        const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children });
        return new RecordBatch(this.schema, data);
    }
    /**
     * Decodes a dictionary batch.
     *
     * A delta batch (`isDelta`) for an id that already has a dictionary is
     * appended to it. Every other batch — the first one for an id, or a
     * non-delta batch for an id already seen — produces a new dictionary. Per
     * the Arrow IPC spec, a non-delta batch for a known id *replaces* the old
     * dictionary; previously it was silently ignored.
     */
    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) {
        const { id, isDelta } = header;
        const { dictionaries, schema } = this;
        const dictionary = dictionaries.get(id);
        const type = schema.dictionaries.get(id)!;
        const data = this._loadVectors(header.data, body, [type]);
        return (dictionary && isDelta
            ? dictionary.concat(new Vector(data))
            : new Vector(data)).memoize() as Vector;
    }
    /** Decodes binary IPC buffers; the JSON impl overrides this. */
    protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
        return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries).visitMany(types);
    }
}

/**
 * Synchronous impl that pulls messages in order from a `MessageReader`
 * (binary IPC) or a `JSONMessageReader` (Arrow JSON).
 * @ignore
 */
class RecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> implements IterableIterator<RecordBatch<T>> {

    protected _reader: MessageReader;
    protected _handle: ByteStream | ArrowJSONLike;

    constructor(source: ByteStream | ArrowJSONLike, dictionaries?: Map<number, Vector>) {
        super(dictionaries);
        this._reader = !isArrowJSON(source)
            ? new MessageReader(this._handle = source)
            : new JSONMessageReader(this._handle = source);
    }

    public isSync(): this is RecordBatchReaders<T> { return true; }
    public isStream(): this is RecordBatchStreamReaders<T> { return true; }
    public [Symbol.iterator](): IterableIterator<RecordBatch<T>> {
        return this as IterableIterator<RecordBatch<T>>;
    }
    /** Closes unconditionally (once), shuts down the message reader, and drops references. */
    public cancel() {
        // `(this.closed = true)` is an intentional assignment: mark closed exactly once.
        if (!this.closed && (this.closed = true)) {
            this.reset()._reader.return();
            this._reader = null as any;
            this.dictionaries = null as any;
        }
    }
    /**
     * Applies `options.autoDestroy`, reads the schema if none is known yet, and
     * cancels the reader when no schema can be read.
     */
    public open(options?: OpenOptions) {
        if (!this.closed) {
            this.autoDestroy = shouldAutoDestroy(this, options);
            if (!(this.schema || (this.schema = this._reader.readSchema()!))) {
                this.cancel();
            }
        }
        return this;
    }
    /** Closes and forwards to the message reader only when `autoDestroy` is set. */
    public throw(value?: any): IteratorResult<any> {
        if (!this.closed && this.autoDestroy && (this.closed = true)) {
            return this.reset()._reader.throw(value);
        }
        return ITERATOR_DONE;
    }
    /**
     * Closes and forwards to the message reader only when `autoDestroy` is set.
     * With `autoDestroy: false` (as `readAllSync` uses) the reader stays open
     * so it can be reset and re-opened for the next table.
     */
    public return(value?: any): IteratorResult<any> {
        if (!this.closed && this.autoDestroy && (this.closed = true)) {
            return this.reset()._reader.return(value);
        }
        return ITERATOR_DONE;
    }
    /**
     * Consumes messages until a record batch is decoded. Schema messages reset
     * state; dictionary batches update `dictionaries`. When a schema was seen
     * but the table has no batches, one empty placeholder batch is yielded.
     */
    public next(): IteratorResult<RecordBatch<T>> {
        if (this.closed) { return ITERATOR_DONE; }
        let message: Message<MessageHeader> | null;
        const { _reader: reader } = this;
        while (message = this._readNextMessageAndValidate()) {
            if (message.isSchema()) {
                this.reset(message.header());
            } else if (message.isRecordBatch()) {
                this._recordBatchIndex++;
                const header = message.header();
                const buffer = reader.readMessageBody(message.bodyLength);
                const recordBatch = this._loadRecordBatch(header, buffer);
                return { done: false, value: recordBatch };
            } else if (message.isDictionaryBatch()) {
                this._dictionaryIndex++;
                const header = message.header();
                const buffer = reader.readMessageBody(message.bodyLength);
                const vector = this._loadDictionaryBatch(header, buffer);
                this.dictionaries.set(header.id, vector);
            }
        }
        if (this.schema && this._recordBatchIndex === 0) {
            this._recordBatchIndex++;
            return { done: false, value: new _InternalEmptyPlaceholderRecordBatch<T>(this.schema) };
        }
        return this.return();
    }
    protected _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null) {
        return this._reader.readMessage<T>(type);
    }
}

/**
 * Asynchronous counterpart of `RecordBatchStreamReaderImpl` over an `AsyncMessageReader`.
 * @ignore
 */
class AsyncRecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> implements AsyncIterableIterator<RecordBatch<T>> {

    protected _handle: AsyncByteStream;
    protected _reader: AsyncMessageReader;

    constructor(source: AsyncByteStream, dictionaries?: Map<number, Vector>) {
        super(dictionaries);
        this._reader = new AsyncMessageReader(this._handle = source);
    }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return true; }
    public isStream(): this is RecordBatchStreamReaders<T> { return true; }
    public [Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>> {
        return this as AsyncIterableIterator<RecordBatch<T>>;
    }
    /** Closes unconditionally (once), shuts down the message reader, and drops references. */
    public async cancel() {
        if (!this.closed && (this.closed = true)) {
            await this.reset()._reader.return();
            this._reader = null as any;
            this.dictionaries = null as any;
        }
    }
    /** Async counterpart of the sync `open`: reads the schema if needed, cancels if none. */
    public async open(options?: OpenOptions) {
        if (!this.closed) {
            this.autoDestroy = shouldAutoDestroy(this, options);
            if (!(this.schema || (this.schema = (await this._reader.readSchema())!))) {
                await this.cancel();
            }
        }
        return this;
    }
    /** Closes and forwards to the message reader only when `autoDestroy` is set. */
    public async throw(value?: any): Promise<IteratorResult<any>> {
        if (!this.closed && this.autoDestroy && (this.closed = true)) {
            return await this.reset()._reader.throw(value);
        }
        return ITERATOR_DONE;
    }
    /** Closes and forwards to the message reader only when `autoDestroy` is set. */
    public async return(value?: any): Promise<IteratorResult<any>> {
        if (!this.closed && this.autoDestroy && (this.closed = true)) {
            return await this.reset()._reader.return(value);
        }
        return ITERATOR_DONE;
    }
    /** Async counterpart of the sync `next`; same message handling. */
    public async next() {
        if (this.closed) { return ITERATOR_DONE; }
        let message: Message<MessageHeader> | null;
        const { _reader: reader } = this;
        while (message = await this._readNextMessageAndValidate()) {
            if (message.isSchema()) {
                // reset() is synchronous; no await needed.
                this.reset(message.header());
            } else if (message.isRecordBatch()) {
                this._recordBatchIndex++;
                const header = message.header();
                const buffer = await reader.readMessageBody(message.bodyLength);
                const recordBatch = this._loadRecordBatch(header, buffer);
                return { done: false, value: recordBatch };
            } else if (message.isDictionaryBatch()) {
                this._dictionaryIndex++;
                const header = message.header();
                const buffer = await reader.readMessageBody(message.bodyLength);
                const vector = this._loadDictionaryBatch(header, buffer);
                this.dictionaries.set(header.id, vector);
            }
        }
        if (this.schema && this._recordBatchIndex === 0) {
            this._recordBatchIndex++;
            return { done: false, value: new _InternalEmptyPlaceholderRecordBatch<T>(this.schema) };
        }
        return await this.return();
    }
    protected async _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null) {
        return await this._reader.readMessage<T>(type);
    }
}

/**
 * Synchronous file-format impl: reads the footer, eagerly loads every
 * dictionary batch it lists, then seeks to record batch blocks by index.
 * @ignore
 */
class RecordBatchFileReaderImpl<T extends TypeMap = any> extends RecordBatchStreamReaderImpl<T> {

    protected _footer?: Footer;
    declare protected _handle: RandomAccessFile;
    public get footer() { return this._footer!; }
    // Counts come from the footer, not from how many messages were consumed.
    public get numDictionaries() { return this._footer ? this._footer.numDictionaries : 0; }
    public get numRecordBatches() { return this._footer ? this._footer.numRecordBatches : 0; }

    constructor(source: RandomAccessFile | ArrayBufferViewInput, dictionaries?: Map<number, Vector>) {
        super(source instanceof RandomAccessFile ? source : new RandomAccessFile(source), dictionaries);
    }
    public isSync(): this is RecordBatchReaders<T> { return true; }
    public isFile(): this is RecordBatchFileReaders<T> { return true; }
    /** Reads the footer (once), takes the schema from it, and loads all dictionary batches. */
    public open(options?: OpenOptions) {
        if (!this.closed && !this._footer) {
            this.schema = (this._footer = this._readFooter()).schema;
            for (const block of this._footer.dictionaryBatches()) {
                block && this._readDictionaryBatch(this._dictionaryIndex++);
            }
        }
        return super.open(options);
    }
    /** Random access to record batch `index`; `null` if closed or no such batch can be read. */
    public readRecordBatch(index: number) {
        if (this.closed) { return null; }
        if (!this._footer) { this.open(); }
        const block = this._footer?.getRecordBatch(index);
        if (block && this._handle.seek(block.offset)) {
            const message = this._reader.readMessage(MessageHeader.RecordBatch);
            if (message?.isRecordBatch()) {
                const header = message.header();
                const buffer = this._reader.readMessageBody(message.bodyLength);
                const recordBatch = this._loadRecordBatch(header, buffer);
                return recordBatch;
            }
        }
        return null;
    }
    protected _readDictionaryBatch(index: number) {
        const block = this._footer?.getDictionaryBatch(index);
        if (block && this._handle.seek(block.offset)) {
            const message = this._reader.readMessage(MessageHeader.DictionaryBatch);
            if (message?.isDictionaryBatch()) {
                const header = message.header();
                const buffer = this._reader.readMessageBody(message.bodyLength);
                const vector = this._loadDictionaryBatch(header, buffer);
                this.dictionaries.set(header.id, vector);
            }
        }
    }
    /**
     * Reads the int32 footer length stored `magicAndPadding` bytes before the
     * end of the file, then decodes the footer bytes immediately preceding it.
     */
    protected _readFooter() {
        const { _handle } = this;
        const offset = _handle.size - magicAndPadding;
        const length = _handle.readInt32(offset);
        const buffer = _handle.readAt(offset - length, length);
        return Footer.decode(buffer);
    }
    /** Seeks to the next unread record batch block; `null` once all batches are consumed. */
    protected _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null): Message<T> | null {
        if (!this._footer) { this.open(); }
        if (this._footer && this._recordBatchIndex < this.numRecordBatches) {
            const block = this._footer.getRecordBatch(this._recordBatchIndex);
            if (block && this._handle.seek(block.offset)) {
                return this._reader.readMessage(type);
            }
        }
        return null;
    }
}

/**
 * Asynchronous file-format impl over an `AsyncRandomAccessFile`.
 * @ignore
 */
class AsyncRecordBatchFileReaderImpl<T extends TypeMap = any> extends AsyncRecordBatchStreamReaderImpl<T>
    implements AsyncRecordBatchFileReaderImpl<T> {

    protected _footer?: Footer;
    declare protected _handle: AsyncRandomAccessFile;
    public get footer() { return this._footer!; }
    // Counts come from the footer, not from how many messages were consumed.
    public get numDictionaries() { return this._footer ? this._footer.numDictionaries : 0; }
    public get numRecordBatches() { return this._footer ? this._footer.numRecordBatches : 0; }

    constructor(source: FileHandle, byteLength?: number, dictionaries?: Map<number, Vector>);
    constructor(source: FileHandle | AsyncRandomAccessFile, dictionaries?: Map<number, Vector>);
    constructor(source: FileHandle | AsyncRandomAccessFile, ...rest: any[]) {
        // Optional trailing args: an optional numeric byteLength, then an
        // optional dictionaries Map. Only consume rest[0] as byteLength when it
        // really is a number, so a Map in that position reaches the check below.
        const byteLength = typeof rest[0] === 'number' ? rest.shift() as number : undefined;
        const dictionaries = rest[0] instanceof Map ? rest.shift() as Map<number, Vector> : undefined;
        super(source instanceof AsyncRandomAccessFile ? source : new AsyncRandomAccessFile(source, byteLength), dictionaries);
    }
    public isFile(): this is RecordBatchFileReaders<T> { return true; }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return true; }
    /** Reads the footer (once), takes the schema from it, and loads all dictionary batches. */
    public async open(options?: OpenOptions) {
        if (!this.closed && !this._footer) {
            this.schema = (this._footer = await this._readFooter()).schema;
            for (const block of this._footer.dictionaryBatches()) {
                block && await this._readDictionaryBatch(this._dictionaryIndex++);
            }
        }
        return await super.open(options);
    }
    /** Random access to record batch `index`; `null` if closed or no such batch can be read. */
    public async readRecordBatch(index: number) {
        if (this.closed) { return null; }
        if (!this._footer) { await this.open(); }
        const block = this._footer?.getRecordBatch(index);
        if (block && (await this._handle.seek(block.offset))) {
            const message = await this._reader.readMessage(MessageHeader.RecordBatch);
            if (message?.isRecordBatch()) {
                const header = message.header();
                const buffer = await this._reader.readMessageBody(message.bodyLength);
                const recordBatch = this._loadRecordBatch(header, buffer);
                return recordBatch;
            }
        }
        return null;
    }
    protected async _readDictionaryBatch(index: number) {
        const block = this._footer?.getDictionaryBatch(index);
        if (block && (await this._handle.seek(block.offset))) {
            const message = await this._reader.readMessage(MessageHeader.DictionaryBatch);
            if (message?.isDictionaryBatch()) {
                const header = message.header();
                const buffer = await this._reader.readMessageBody(message.bodyLength);
                const vector = this._loadDictionaryBatch(header, buffer);
                this.dictionaries.set(header.id, vector);
            }
        }
    }
    /**
     * Waits for any pending handle work, then reads the int32 footer length
     * stored `magicAndPadding` bytes before the end and decodes the footer.
     */
    protected async _readFooter() {
        const { _handle } = this;
        _handle._pending && await _handle._pending;
        const offset = _handle.size - magicAndPadding;
        const length = await _handle.readInt32(offset);
        const buffer = await _handle.readAt(offset - length, length);
        return Footer.decode(buffer);
    }
    /** Seeks to the next unread record batch block; `null` once all batches are consumed. */
    protected async _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null): Promise<Message<T> | null> {
        if (!this._footer) { await this.open(); }
        if (this._footer && this._recordBatchIndex < this.numRecordBatches) {
            const block = this._footer.getRecordBatch(this._recordBatchIndex);
            if (block && await this._handle.seek(block.offset)) {
                return await this._reader.readMessage(type);
            }
        }
        return null;
    }
}

/**
 * Stream impl for Arrow JSON: identical to the binary stream impl except that
 * buffers are decoded with `JSONVectorLoader`.
 * @ignore
 */
class RecordBatchJSONReaderImpl<T extends TypeMap = any> extends RecordBatchStreamReaderImpl<T> {
    constructor(source: ArrowJSONLike, dictionaries?: Map<number, Vector>) {
        super(source, dictionaries);
    }
    protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
        return new JSONVectorLoader(body, header.nodes, header.buffers, this.dictionaries).visitMany(types);
    }
}

//
// Define some helper functions and static implementations down here. There's
// a bit of branching in the static methods that can lead to the same routines
// being executed, so we've broken those out here for readability.
//

/** @ignore Uses `options.autoDestroy` when it is a boolean, else keeps the current setting. */
function shouldAutoDestroy(self: { autoDestroy: boolean }, options?: OpenOptions) {
    return options && typeof options.autoDestroy === 'boolean' ? options.autoDestroy : self.autoDestroy;
}

/**
 * Yields `reader` once per table in the source. Opening with
 * `autoDestroy: false` keeps the reader open at the end of each table so it
 * can be reset and re-opened; the reader is always cancelled on exit.
 * @ignore
 */
function* readAllSync<T extends TypeMap = any>(source: RecordBatchReaders<T> | FromArg0 | FromArg2) {
    const reader = RecordBatchReader.from<T>(source as any) as RecordBatchReaders<T>;
    try {
        if (!reader.open({ autoDestroy: false }).closed) {
            do { yield reader; } while (!(reader.reset().open()).closed);
        }
    } finally { reader.cancel(); }
}

/**
 * Async counterpart of `readAllSync`.
 * @ignore
 */
async function* readAllAsync<T extends TypeMap = any>(source: AsyncRecordBatchReaders<T> | FromArg1 | FromArg3 | FromArg4 | FromArg5) {
    const reader = await RecordBatchReader.from<T>(source as any) as RecordBatchReader<T>;
    try {
        if (!(await reader.open({ autoDestroy: false })).closed) {
            do { yield reader; } while (!(await reader.reset().open()).closed);
        }
    } finally { await reader.cancel(); }
}

/** @ignore */
function fromArrowJSON<T extends TypeMap>(source: ArrowJSONLike) {
    return new RecordBatchStreamReader(new RecordBatchJSONReaderImpl<T>(source));
}

/**
 * Peeks the magic length rounded up to a multiple of 8 bytes. Fewer than 4
 * bytes yields a reader over an empty generator; the Arrow magic string yields
 * a file reader over all remaining bytes; anything else yields a stream reader.
 * @ignore
 */
function fromByteStream<T extends TypeMap>(source: ByteStream) {
    const bytes = source.peek((magicLength + 7) & ~7);
    return bytes && bytes.byteLength >= 4 ? !checkForMagicArrowString(bytes)
        ? new RecordBatchStreamReader(new RecordBatchStreamReaderImpl<T>(source))
        : new RecordBatchFileReader(new RecordBatchFileReaderImpl<T>(source.read()))
        : new RecordBatchStreamReader(new RecordBatchStreamReaderImpl<T>(function* (): any { }()));
}

/**
 * Async counterpart of `fromByteStream`. A file-format source is read in full
 * and handled by the synchronous file reader.
 * @ignore
 */
async function fromAsyncByteStream<T extends TypeMap>(source: AsyncByteStream) {
    const bytes = await source.peek((magicLength + 7) & ~7);
    return bytes && bytes.byteLength >= 4 ? !checkForMagicArrowString(bytes)
        ? new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl<T>(source))
        : new RecordBatchFileReader(new RecordBatchFileReaderImpl<T>(await source.read()))
        : new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl<T>(async function* (): any { }()));
}

/**
 * Returns an async file reader when the handle is at least
 * `magicX2AndPadding` bytes long and starts with the Arrow magic string;
 * otherwise an async stream reader.
 * @ignore
 */
async function fromFileHandle<T extends TypeMap>(source: FileHandle) {
    const { size } = await source.stat();
    const file = new AsyncRandomAccessFile(source, size);
    if (size >= magicX2AndPadding && checkForMagicArrowString(await file.readAt(0, (magicLength + 7) & ~7))) {
        return new AsyncRecordBatchFileReader(new AsyncRecordBatchFileReaderImpl<T>(file));
    }
    return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl<T>(file));
}