Spaces:
Sleeping
Sleeping
// Licensed to the Apache Software Foundation (ASF) under one | |
// or more contributor license agreements. See the NOTICE file | |
// distributed with this work for additional information | |
// regarding copyright ownership. The ASF licenses this file | |
// to you under the Apache License, Version 2.0 (the | |
// "License"); you may not use this file except in compliance | |
// with the License. You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, | |
// software distributed under the License is distributed on an | |
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
// KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations | |
// under the License. | |
import { makeData } from '../data.js'; | |
import { Vector } from '../vector.js'; | |
import { DataType, Struct, TypeMap } from '../type.js'; | |
import { MessageHeader } from '../enum.js'; | |
import { Footer } from './metadata/file.js'; | |
import { Schema, Field } from '../schema.js'; | |
import streamAdapters from '../io/adapters.js'; | |
import { Message } from './metadata/message.js'; | |
import * as metadata from './metadata/message.js'; | |
import { ArrayBufferViewInput } from '../util/buffer.js'; | |
import { ByteStream, AsyncByteStream } from '../io/stream.js'; | |
import { RandomAccessFile, AsyncRandomAccessFile } from '../io/file.js'; | |
import { VectorLoader, JSONVectorLoader } from '../visitor/vectorloader.js'; | |
import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatch.js'; | |
import { | |
FileHandle, | |
ArrowJSONLike, | |
ITERATOR_DONE, | |
ReadableInterop, | |
} from '../io/interfaces.js'; | |
import { | |
MessageReader, AsyncMessageReader, JSONMessageReader, | |
checkForMagicArrowString, magicLength, magicAndPadding, magicX2AndPadding | |
} from './message.js'; | |
import { | |
isPromise, | |
isIterable, isAsyncIterable, | |
isIteratorResult, isArrowJSON, | |
isFileHandle, isFetchResponse, | |
isReadableDOMStream, isReadableNodeStream | |
} from '../util/compat.js'; | |
/**
 * Source accepted by `RecordBatchReader.from`: an Arrow JSON object.
 * @ignore
 */
export type FromArg0 = ArrowJSONLike;
/**
 * Source accepted by `RecordBatchReader.from`: a promise-like of an Arrow JSON object.
 * @ignore
 */
export type FromArg1 = PromiseLike<ArrowJSONLike>;
/**
 * Source accepted by `RecordBatchReader.from`: a single `ArrayBufferViewInput`,
 * or a synchronous iterable of them.
 * @ignore
 */
export type FromArg2 =
    | Iterable<ArrayBufferViewInput>
    | ArrayBufferViewInput;
/**
 * Promise-like flavour of `FromArg2`.
 * @ignore
 */
export type FromArg3 = PromiseLike<
    | Iterable<ArrayBufferViewInput>
    | ArrayBufferViewInput
>;
/**
 * Sources that can only be consumed asynchronously: a fetch `Response`, a
 * Node.js readable stream, a DOM `ReadableStream`, or an async iterable.
 * @ignore
 */
export type FromArg4 =
    | Response
    | NodeJS.ReadableStream
    | ReadableStream<ArrayBufferViewInput>
    | AsyncIterable<ArrayBufferViewInput>;
/**
 * A file handle, a promise-like of a file handle, or a promise-like of any `FromArg4` source.
 * @ignore
 */
export type FromArg5 =
    | FileHandle
    | PromiseLike<FileHandle>
    | PromiseLike<FromArg4>;
/**
 * Union of every source type accepted by `RecordBatchReader.from`.
 * @ignore
 */
export type FromArgs =
    | FromArg0
    | FromArg1
    | FromArg2
    | FromArg3
    | FromArg4
    | FromArg5;
/**
 * Options accepted by the readers' `open()` methods.
 * @ignore
 */
type OpenOptions = { autoDestroy?: boolean };
/**
 * The two synchronous reader classes.
 * @ignore
 */
type RecordBatchReaders<T extends TypeMap = any> =
    | RecordBatchFileReader<T>
    | RecordBatchStreamReader<T>;
/**
 * The two asynchronous reader classes.
 * @ignore
 */
type AsyncRecordBatchReaders<T extends TypeMap = any> =
    | AsyncRecordBatchFileReader<T>
    | AsyncRecordBatchStreamReader<T>;
/**
 * The file reader classes, sync and async.
 * @ignore
 */
type RecordBatchFileReaders<T extends TypeMap = any> =
    | RecordBatchFileReader<T>
    | AsyncRecordBatchFileReader<T>;
/**
 * The stream reader classes, sync and async.
 * @ignore
 */
type RecordBatchStreamReaders<T extends TypeMap = any> =
    | RecordBatchStreamReader<T>
    | AsyncRecordBatchStreamReader<T>;
/**
 * Reads `RecordBatch`es from an Arrow source by forwarding every instance
 * member to an implementation object supplied at construction. Whether the
 * iterator-style members return plain values or Promises depends on that
 * implementation (see `isSync()` / `isAsync()`).
 */
export class RecordBatchReader<T extends TypeMap = any> extends ReadableInterop<RecordBatch<T>> {

    /** Implementation object that all instance members delegate to. */
    protected _impl: RecordBatchReaderImpls<T>;

    protected constructor(impl: RecordBatchReaderImpls<T>) {
        super();
        this._impl = impl;
    }

    /** `closed` flag of the implementation. */
    public get closed() { return this._impl.closed; }
    /** Schema currently held by the implementation. */
    public get schema() { return this._impl.schema; }
    /** `autoDestroy` flag of the implementation. */
    public get autoDestroy() { return this._impl.autoDestroy; }
    /** Dictionary vectors held by the implementation, keyed by dictionary id. */
    public get dictionaries() { return this._impl.dictionaries; }
    /** Dictionary count as reported by the implementation. */
    public get numDictionaries() { return this._impl.numDictionaries; }
    /** Record batch count as reported by the implementation. */
    public get numRecordBatches() { return this._impl.numRecordBatches; }

    /** The file footer when the implementation is a file reader, otherwise `null`. */
    public get footer(): Footer | null {
        const impl = this._impl;
        if (impl.isFile()) {
            return impl.footer;
        }
        return null;
    }

    public isSync(): this is RecordBatchReaders<T> { return this._impl.isSync(); }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return this._impl.isAsync(); }
    public isFile(): this is RecordBatchFileReaders<T> { return this._impl.isFile(); }
    public isStream(): this is RecordBatchStreamReaders<T> { return this._impl.isStream(); }

    /** Forwards to the implementation's `next()`. */
    public next() {
        return this._impl.next();
    }

    /** Forwards to the implementation's `throw()`. */
    public throw(value?: any) {
        return this._impl.throw(value);
    }

    /** Forwards to the implementation's `return()`. */
    public return(value?: any) {
        return this._impl.return(value);
    }

    /** Forwards to the implementation's `cancel()`. */
    public cancel() {
        return this._impl.cancel();
    }

    /**
     * Resets the implementation (optionally with a new schema) and clears the
     * `_DOMStream` / `_nodeStream` fields.
     * @returns This reader.
     */
    public reset(schema?: Schema<T> | null): this {
        this._impl.reset(schema);
        this._DOMStream = undefined;
        this._nodeStream = undefined;
        return this;
    }

    /**
     * Opens the implementation.
     * @returns This reader, or a Promise of it when the implementation's
     *          `open()` returned a promise.
     */
    public open(options?: OpenOptions) {
        const pending = this._impl.open(options);
        if (isPromise(pending)) {
            return pending.then(() => this);
        }
        return this;
    }

    /**
     * Reads the record batch at `index` when the implementation is a file
     * reader; returns `null` for every other implementation.
     */
    public readRecordBatch(index: number): RecordBatch<T> | null | Promise<RecordBatch<T> | null> {
        const impl = this._impl;
        if (impl.isFile()) {
            return impl.readRecordBatch(index);
        }
        return null;
    }

    public [Symbol.iterator](): IterableIterator<RecordBatch<T>> {
        const iterable = <IterableIterator<RecordBatch<T>>>this._impl;
        return iterable[Symbol.iterator]();
    }

    public [Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>> {
        const iterable = <AsyncIterableIterator<RecordBatch<T>>>this._impl;
        return iterable[Symbol.asyncIterator]();
    }

    /** Adapts this reader to a DOM stream through `streamAdapters.toDOMStream`. */
    public toDOMStream() {
        // Expose this reader as a (sync or async) iterable whose iterator is the reader itself.
        const source = this.isSync()
            ? { [Symbol.iterator]: () => this } as Iterable<RecordBatch<T>>
            : { [Symbol.asyncIterator]: () => this } as AsyncIterable<RecordBatch<T>>;
        return streamAdapters.toDOMStream<RecordBatch<T>>(source);
    }

    /** Adapts this reader to an object-mode Node stream through `streamAdapters.toNodeStream`. */
    public toNodeStream() {
        const source = this.isSync()
            ? { [Symbol.iterator]: () => this } as Iterable<RecordBatch<T>>
            : { [Symbol.asyncIterator]: () => this } as AsyncIterable<RecordBatch<T>>;
        return streamAdapters.toNodeStream<RecordBatch<T>>(source, { objectMode: true });
    }

    /**
     * Default body: always throws.
     * @nocollapse
     */
    // @ts-ignore
    public static throughNode(options?: import('stream').DuplexOptions & { autoDestroy: boolean }): import('stream').Duplex {
        throw new Error(`"throughNode" not available in this environment`);
    }

    /**
     * Default body: always throws.
     * @nocollapse
     */
    public static throughDOM<T extends TypeMap>(
        // @ts-ignore
        writableStrategy?: ByteLengthQueuingStrategy,
        // @ts-ignore
        readableStrategy?: { autoDestroy: boolean }
    ): { writable: WritableStream<Uint8Array>; readable: ReadableStream<RecordBatch<T>> } {
        throw new Error(`"throughDOM" not available in this environment`);
    }

    public static from<T extends RecordBatchReader>(source: T): T;
    public static from<T extends TypeMap = any>(source: FromArg0): RecordBatchStreamReader<T>;
    public static from<T extends TypeMap = any>(source: FromArg1): Promise<RecordBatchStreamReader<T>>;
    public static from<T extends TypeMap = any>(source: FromArg2): RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
    public static from<T extends TypeMap = any>(source: FromArg3): Promise<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
    public static from<T extends TypeMap = any>(source: FromArg4): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
    public static from<T extends TypeMap = any>(source: FromArg5): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
    /**
     * Creates a reader for `source`, choosing the factory from the runtime
     * shape of the argument. An existing reader is returned unchanged.
     * @nocollapse
     */
    public static from<T extends TypeMap = any>(source: any) {
        if (source instanceof RecordBatchReader) {
            return source;
        }
        if (isArrowJSON(source)) {
            return fromArrowJSON<T>(source);
        }
        if (isFileHandle(source)) {
            return fromFileHandle<T>(source);
        }
        if (isPromise<any>(source)) {
            // Wait for the promised source, then dispatch again on its resolved value.
            return (async () => await RecordBatchReader.from<any>(await source))();
        }
        if (isFetchResponse(source) || isReadableDOMStream(source) || isReadableNodeStream(source) || isAsyncIterable(source)) {
            return fromAsyncByteStream<T>(new AsyncByteStream(source));
        }
        // Everything else is wrapped in a synchronous byte stream.
        return fromByteStream<T>(new ByteStream(source));
    }

    public static readAll<T extends RecordBatchReader>(source: T): T extends RecordBatchReaders ? IterableIterator<T> : AsyncIterableIterator<T>;
    public static readAll<T extends TypeMap = any>(source: FromArg0): IterableIterator<RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg1): AsyncIterableIterator<RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg2): IterableIterator<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg3): AsyncIterableIterator<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg4): AsyncIterableIterator<AsyncRecordBatchReaders<T>>;
    public static readAll<T extends TypeMap = any>(source: FromArg5): AsyncIterableIterator<AsyncRecordBatchReaders<T>>;
    /**
     * Returns a sync or async generator (see `readAllSync` / `readAllAsync`)
     * that repeatedly yields a reader over `source` until it reports closed.
     * @nocollapse
     */
    public static readAll<T extends TypeMap = any>(source: any) {
        if (source instanceof RecordBatchReader) {
            if (source.isSync()) {
                return readAllSync(source);
            }
            return readAllAsync(source as AsyncRecordBatchReaders<T>);
        }
        if (isArrowJSON(source) || ArrayBuffer.isView(source) || isIterable<ArrayBufferViewInput>(source) || isIteratorResult(source)) {
            return readAllSync<T>(source) as IterableIterator<RecordBatchReaders<T>>;
        }
        return readAllAsync<T>(source) as AsyncIterableIterator<RecordBatchReaders<T> | AsyncRecordBatchReaders<T>>;
    }
}
// | |
// Since TS is a structural type system, we define the following subclass stubs | |
// so that concrete types exist to associate with the interfaces below.
// | |
// The implementation for each RecordBatchReader is hidden away in the set of | |
// `RecordBatchReaderImpl` classes in the second half of this file. This allows | |
// us to export a single RecordBatchReader class, and swap out the impl based | |
// on the io primitives or underlying arrow (JSON, file, or stream) at runtime. | |
// | |
// Async/await makes our job a bit harder, since it forces everything to be | |
// either fully sync or fully async. This is why the logic for the reader impls | |
// has been duplicated into both sync and async variants. Since the RBR | |
// delegates to its impl, an RBR with an AsyncRecordBatchFileReaderImpl for | |
// example will return async/await-friendly Promises, but one with a (sync) | |
// RecordBatchStreamReaderImpl will always return values. Nothing should be | |
// different about their logic, aside from the async handling. This is also why | |
// this code looks highly structured, as it should be nearly identical and easy | |
// to follow. | |
// | |
/**
 * Reader stub bound to a synchronous stream implementation.
 * @ignore
 */
export class RecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {

    constructor(protected _impl: RecordBatchStreamReaderImpl<T>) {
        super(_impl);
    }

    /** Iterates this reader to completion and returns the batches as an array. */
    public readAll() {
        const batches = new Array<RecordBatch<T>>();
        for (const batch of this) {
            batches.push(batch);
        }
        return batches;
    }

    public [Symbol.iterator]() {
        const impl = this._impl as IterableIterator<RecordBatch<T>>;
        return impl[Symbol.iterator]();
    }

    /** Async view over the synchronous iterator. */
    public async *[Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>> {
        yield* this[Symbol.iterator]();
    }
}
/**
 * Reader stub bound to an asynchronous stream implementation.
 * @ignore
 */
export class AsyncRecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {

    constructor(protected _impl: AsyncRecordBatchStreamReaderImpl<T>) {
        super(_impl);
    }

    /** Asynchronously iterates this reader to completion and resolves with the batches. */
    public async readAll() {
        const collected = new Array<RecordBatch<T>>();
        for await (const recordBatch of this) {
            collected.push(recordBatch);
        }
        return collected;
    }

    /** Synchronous iteration is not supported: always throws. */
    public [Symbol.iterator](): IterableIterator<RecordBatch<T>> {
        throw new Error(`AsyncRecordBatchStreamReader is not Iterable`);
    }

    public [Symbol.asyncIterator]() {
        const impl = this._impl as AsyncIterableIterator<RecordBatch<T>>;
        return impl[Symbol.asyncIterator]();
    }
}
/**
 * Reader stub bound to a synchronous file implementation.
 * @ignore
 */
export class RecordBatchFileReader<T extends TypeMap = any> extends RecordBatchStreamReader<T> {

    constructor(protected _impl: RecordBatchFileReaderImpl<T>) {
        super(_impl);
    }
}
/**
 * Reader stub bound to an asynchronous file implementation.
 * @ignore
 */
export class AsyncRecordBatchFileReader<T extends TypeMap = any> extends AsyncRecordBatchStreamReader<T> {

    constructor(protected _impl: AsyncRecordBatchFileReaderImpl<T>) {
        super(_impl);
    }
}
// | |
// Now override the return types for each sync/async RecordBatchReader variant | |
// | |
/**
 * Return-type overrides for the synchronous stream reader: every member
 * yields a plain value rather than a Promise.
 * @ignore
 */
export interface RecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
    /** Next iterator result. */
    next(value?: any): IteratorResult<RecordBatch<T>>;
    /** Iterator `return`. */
    return(value?: any): IteratorResult<any>;
    /** Iterator `throw`. */
    throw(value?: any): IteratorResult<any>;
    /** Cancels the reader. */
    cancel(): void;
    /** Opens the reader and returns it. */
    open(options?: OpenOptions | undefined): this;
}
/**
 * Return-type overrides for the asynchronous stream reader: every member
 * yields a Promise.
 * @ignore
 */
export interface AsyncRecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
    /** Promise of the next iterator result. */
    next(value?: any): Promise<IteratorResult<RecordBatch<T>>>;
    /** Iterator `return`, as a Promise. */
    return(value?: any): Promise<IteratorResult<any>>;
    /** Iterator `throw`, as a Promise. */
    throw(value?: any): Promise<IteratorResult<any>>;
    /** Cancels the reader. */
    cancel(): Promise<void>;
    /** Opens the reader and resolves with it. */
    open(options?: OpenOptions | undefined): Promise<this>;
}
/**
 * Return-type override for the synchronous file reader.
 * @ignore
 */
export interface RecordBatchFileReader<T extends TypeMap = any> extends RecordBatchStreamReader<T> {
    /** Record batch at `index`, or `null`. */
    readRecordBatch(index: number): RecordBatch<T> | null;
}
/**
 * Return-type override for the asynchronous file reader.
 * @ignore
 */
export interface AsyncRecordBatchFileReader<T extends TypeMap = any> extends AsyncRecordBatchStreamReader<T> {
    /** Promise of the record batch at `index`, or of `null`. */
    readRecordBatch(index: number): Promise<RecordBatch<T> | null>;
}
/**
 * Union of every implementation class a `RecordBatchReader` can delegate to.
 * @ignore
 */
type RecordBatchReaderImpls<T extends TypeMap = any> =
    | RecordBatchJSONReaderImpl<T>
    | RecordBatchFileReaderImpl<T>
    | RecordBatchStreamReaderImpl<T>
    | AsyncRecordBatchFileReaderImpl<T>
    | AsyncRecordBatchStreamReaderImpl<T>;
/**
 * Members common to every reader implementation.
 * @ignore
 */
interface RecordBatchReaderImpl<T extends TypeMap = any> {
    /** Schema of the batches being read. */
    schema: Schema<T>;
    /** Dictionary vectors keyed by dictionary id. */
    dictionaries: Map<number, Vector>;
    /** Whether the implementation has been closed. */
    closed: boolean;
    /** `autoDestroy` setting of the implementation. */
    autoDestroy: boolean;
    isSync(): this is RecordBatchReaders<T>;
    isAsync(): this is AsyncRecordBatchReaders<T>;
    isFile(): this is RecordBatchFileReaders<T>;
    isStream(): this is RecordBatchStreamReaders<T>;
    /** Resets the implementation, optionally installing a new schema. */
    reset(schema?: Schema<T> | null): this;
}
/**
 * Synchronous stream implementation contract: a sync iterable iterator of record batches.
 * @ignore
 */
interface RecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> {
    [Symbol.iterator](): IterableIterator<RecordBatch<T>>;
    next(value?: any): IteratorResult<RecordBatch<T>>;
    return(value?: any): IteratorResult<any>;
    throw(value?: any): IteratorResult<any>;
    cancel(): void;
    open(options?: OpenOptions): this;
}
/**
 * Asynchronous stream implementation contract: an async iterable iterator of record batches.
 * @ignore
 */
interface AsyncRecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> {
    [Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>>;
    next(value?: any): Promise<IteratorResult<RecordBatch<T>>>;
    return(value?: any): Promise<IteratorResult<any>>;
    throw(value?: any): Promise<IteratorResult<any>>;
    cancel(): Promise<void>;
    open(options?: OpenOptions): Promise<this>;
}
/**
 * Synchronous file implementation contract: adds access to a record batch by index.
 * @ignore
 */
interface RecordBatchFileReaderImpl<T extends TypeMap = any> extends RecordBatchStreamReaderImpl<T> {
    /** Record batch at `index`, or `null`. */
    readRecordBatch(index: number): RecordBatch<T> | null;
}
/**
 * Asynchronous file implementation contract: adds access to a record batch by index.
 * @ignore
 */
interface AsyncRecordBatchFileReaderImpl<T extends TypeMap = any> extends AsyncRecordBatchStreamReaderImpl<T> {
    /** Promise of the record batch at `index`, or of `null`. */
    readRecordBatch(index: number): Promise<RecordBatch<T> | null>;
}
/**
 * State and decoding helpers shared by every reader implementation: the
 * current schema, the dictionary map, the batch counters, and the routines
 * that turn message headers plus bodies into `RecordBatch`es and dictionary
 * `Vector`s.
 * @ignore
 */
abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordBatchReaderImpl<T> {

    declare public schema: Schema<T>;

    public closed = false;
    public autoDestroy = true;
    /** Dictionary vectors keyed by dictionary id. */
    public dictionaries: Map<number, Vector>;

    protected _dictionaryIndex = 0;
    protected _recordBatchIndex = 0;

    public get numDictionaries() { return this._dictionaryIndex; }
    public get numRecordBatches() { return this._recordBatchIndex; }

    /**
     * @param dictionaries Initial dictionary map; a new empty map when omitted.
     */
    constructor(dictionaries = new Map<number, Vector>()) {
        this.dictionaries = dictionaries;
    }

    // Subclasses override the predicates that apply to them.
    public isSync(): this is RecordBatchReaders<T> { return false; }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return false; }
    public isFile(): this is RecordBatchFileReaders<T> { return false; }
    public isStream(): this is RecordBatchStreamReaders<T> { return false; }

    /**
     * Zeroes both counters, installs `schema` (possibly `undefined`/`null`)
     * and replaces the dictionary map with a new empty one.
     * @returns This implementation.
     */
    public reset(schema?: Schema<T> | null) {
        this._dictionaryIndex = 0;
        this._recordBatchIndex = 0;
        this.schema = <any>schema;
        this.dictionaries = new Map();
        return this;
    }

    /**
     * Builds a `RecordBatch` for the current schema from a record batch
     * header and its message body.
     */
    protected _loadRecordBatch(header: metadata.RecordBatch, body: any) {
        const children = this._loadVectors(header, body, this.schema.fields);
        const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children });
        return new RecordBatch(this.schema, data);
    }

    /**
     * Decodes a dictionary batch into the vector that should be stored for
     * `header.id`. The caller stores the result in `dictionaries`.
     *
     * - Delta batch and a dictionary already exists for the id: the new
     *   values are concatenated onto the existing dictionary.
     * - Otherwise the decoded values become the dictionary. This includes a
     *   non-delta batch for an id that is already loaded, which replaces the
     *   previous dictionary instead of being ignored.
     *
     * @returns The memoized dictionary vector.
     */
    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) {
        const { id, isDelta } = header;
        const { dictionaries, schema } = this;
        const existing = dictionaries.get(id);
        const type = schema.dictionaries.get(id)!;
        const data = this._loadVectors(header.data, body, [type]);
        const incoming = new Vector(data);
        return (existing && isDelta ? existing.concat(incoming) : incoming).memoize() as Vector;
    }

    /**
     * Decodes the buffers described by `header` for each of `types` using a
     * `VectorLoader` over `body`.
     */
    protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
        return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries).visitMany(types);
    }
}
/**
 * Synchronous reader over an Arrow stream, fed either by a `ByteStream`
 * (via `MessageReader`) or by an Arrow JSON object (via `JSONMessageReader`).
 * @ignore
 */
class RecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> implements IterableIterator<RecordBatch<T>> {

    protected _reader: MessageReader;
    protected _handle: ByteStream | ArrowJSONLike;

    constructor(source: ByteStream | ArrowJSONLike, dictionaries?: Map<number, Vector>) {
        super(dictionaries);
        this._handle = source;
        this._reader = isArrowJSON(source)
            ? new JSONMessageReader(source)
            : new MessageReader(source);
    }

    public isSync(): this is RecordBatchReaders<T> { return true; }
    public isStream(): this is RecordBatchStreamReaders<T> { return true; }

    /** The implementation is its own iterator. */
    public [Symbol.iterator](): IterableIterator<RecordBatch<T>> {
        return this as IterableIterator<RecordBatch<T>>;
    }

    /**
     * Closes the reader regardless of `autoDestroy`: resets state, calls
     * `return()` on the message reader, then drops the message reader and
     * the dictionary map. A no-op when already closed.
     */
    public cancel() {
        if (this.closed) { return; }
        this.closed = true;
        this.reset()._reader.return();
        this._reader = <any>null;
        this.dictionaries = <any>null;
    }

    /**
     * Applies the `autoDestroy` option and, when no schema is set yet, reads
     * one from the message reader; the reader is cancelled if none is found.
     * @returns This implementation.
     */
    public open(options?: OpenOptions) {
        if (this.closed) { return this; }
        this.autoDestroy = shouldAutoDestroy(this, options);
        if (!this.schema) {
            this.schema = this._reader.readSchema()!;
            if (!this.schema) {
                this.cancel();
            }
        }
        return this;
    }

    /**
     * When open and `autoDestroy` is set: marks the reader closed, resets it
     * and forwards `value` to the message reader's `throw()`. Otherwise
     * returns `ITERATOR_DONE`.
     */
    public throw(value?: any): IteratorResult<any> {
        if (this.closed || !this.autoDestroy) { return ITERATOR_DONE; }
        this.closed = true;
        return this.reset()._reader.throw(value);
    }

    /**
     * When open and `autoDestroy` is set: marks the reader closed, resets it
     * and forwards `value` to the message reader's `return()`. Otherwise
     * returns `ITERATOR_DONE`.
     */
    public return(value?: any): IteratorResult<any> {
        if (this.closed || !this.autoDestroy) { return ITERATOR_DONE; }
        this.closed = true;
        return this.reset()._reader.return(value);
    }

    /**
     * Consumes messages until a record batch is produced. Schema messages
     * reset the reader with the new schema; dictionary batches are decoded
     * and stored in `dictionaries`.
     */
    public next(): IteratorResult<RecordBatch<T>> {
        if (this.closed) { return ITERATOR_DONE; }
        const messages = this._reader;
        for (;;) {
            const message: Message | null = this._readNextMessageAndValidate();
            if (!message) { break; }
            if (message.isSchema()) {
                this.reset(message.header());
                continue;
            }
            if (message.isRecordBatch()) {
                this._recordBatchIndex++;
                const batchHeader = message.header();
                const batchBody = messages.readMessageBody(message.bodyLength);
                const batch = this._loadRecordBatch(batchHeader, batchBody);
                return { done: false, value: batch };
            }
            if (message.isDictionaryBatch()) {
                this._dictionaryIndex++;
                const dictHeader = message.header();
                const dictBody = messages.readMessageBody(message.bodyLength);
                const dictVector = this._loadDictionaryBatch(dictHeader, dictBody);
                this.dictionaries.set(dictHeader.id, dictVector);
            }
        }
        // No more messages: if a schema is known but no batch was ever
        // produced, emit a single placeholder batch before finishing.
        if (this.schema && this._recordBatchIndex === 0) {
            this._recordBatchIndex++;
            return { done: false, value: new _InternalEmptyPlaceholderRecordBatch<T>(this.schema) };
        }
        return this.return();
    }

    /** Reads the next message from the message reader. */
    protected _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null) {
        return this._reader.readMessage<T>(type);
    }
}
/**
 * Asynchronous reader over an Arrow stream backed by an `AsyncByteStream`.
 * Mirrors the synchronous stream implementation with awaited I/O.
 * @ignore
 */
class AsyncRecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchReaderImpl<T> implements AsyncIterableIterator<RecordBatch<T>> {

    protected _handle: AsyncByteStream;
    protected _reader: AsyncMessageReader;

    constructor(source: AsyncByteStream, dictionaries?: Map<number, Vector>) {
        super(dictionaries);
        this._handle = source;
        this._reader = new AsyncMessageReader(source);
    }

    public isAsync(): this is AsyncRecordBatchReaders<T> { return true; }
    public isStream(): this is RecordBatchStreamReaders<T> { return true; }

    /** The implementation is its own async iterator. */
    public [Symbol.asyncIterator](): AsyncIterableIterator<RecordBatch<T>> {
        return this as AsyncIterableIterator<RecordBatch<T>>;
    }

    /**
     * Closes the reader regardless of `autoDestroy`: resets state, awaits
     * `return()` on the message reader, then drops the message reader and
     * the dictionary map. A no-op when already closed.
     */
    public async cancel() {
        if (this.closed) { return; }
        this.closed = true;
        await this.reset()._reader.return();
        this._reader = <any>null;
        this.dictionaries = <any>null;
    }

    /**
     * Applies the `autoDestroy` option and, when no schema is set yet, reads
     * one from the message reader; the reader is cancelled if none is found.
     * @returns A Promise of this implementation.
     */
    public async open(options?: OpenOptions) {
        if (this.closed) { return this; }
        this.autoDestroy = shouldAutoDestroy(this, options);
        if (!this.schema) {
            this.schema = (await this._reader.readSchema())!;
            if (!this.schema) {
                await this.cancel();
            }
        }
        return this;
    }

    /**
     * When open and `autoDestroy` is set: marks the reader closed, resets it
     * and forwards `value` to the message reader's `throw()`. Otherwise
     * resolves with `ITERATOR_DONE`.
     */
    public async throw(value?: any): Promise<IteratorResult<any>> {
        if (this.closed || !this.autoDestroy) { return ITERATOR_DONE; }
        this.closed = true;
        return await this.reset()._reader.throw(value);
    }

    /**
     * When open and `autoDestroy` is set: marks the reader closed, resets it
     * and forwards `value` to the message reader's `return()`. Otherwise
     * resolves with `ITERATOR_DONE`.
     */
    public async return(value?: any): Promise<IteratorResult<any>> {
        if (this.closed || !this.autoDestroy) { return ITERATOR_DONE; }
        this.closed = true;
        return await this.reset()._reader.return(value);
    }

    /**
     * Consumes messages until a record batch is produced. Schema messages
     * reset the reader with the new schema; dictionary batches are decoded
     * and stored in `dictionaries`.
     */
    public async next() {
        if (this.closed) { return ITERATOR_DONE; }
        const messages = this._reader;
        for (;;) {
            const message: Message | null = await this._readNextMessageAndValidate();
            if (!message) { break; }
            if (message.isSchema()) {
                await this.reset(message.header());
                continue;
            }
            if (message.isRecordBatch()) {
                this._recordBatchIndex++;
                const batchHeader = message.header();
                const batchBody = await messages.readMessageBody(message.bodyLength);
                const batch = this._loadRecordBatch(batchHeader, batchBody);
                return { done: false, value: batch };
            }
            if (message.isDictionaryBatch()) {
                this._dictionaryIndex++;
                const dictHeader = message.header();
                const dictBody = await messages.readMessageBody(message.bodyLength);
                const dictVector = this._loadDictionaryBatch(dictHeader, dictBody);
                this.dictionaries.set(dictHeader.id, dictVector);
            }
        }
        // No more messages: if a schema is known but no batch was ever
        // produced, emit a single placeholder batch before finishing.
        if (this.schema && this._recordBatchIndex === 0) {
            this._recordBatchIndex++;
            return { done: false, value: new _InternalEmptyPlaceholderRecordBatch<T>(this.schema) };
        }
        return await this.return();
    }

    /** Reads the next message from the message reader. */
    protected async _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null) {
        return await this._reader.readMessage<T>(type);
    }
}
/**
 * Synchronous reader over a `RandomAccessFile`. Uses the decoded footer to
 * locate dictionary and record batches, which also allows a record batch to
 * be read by index.
 * @ignore
 */
class RecordBatchFileReaderImpl<T extends TypeMap = any> extends RecordBatchStreamReaderImpl<T> {

    protected _footer?: Footer;
    declare protected _handle: RandomAccessFile;

    /** The decoded footer; not set until the reader has been opened. */
    public get footer() { return this._footer!; }
    /** Dictionary count from the footer, or 0 before the footer is read. */
    public get numDictionaries() { return this._footer ? this._footer.numDictionaries : 0; }
    /** Record batch count from the footer, or 0 before the footer is read. */
    public get numRecordBatches() { return this._footer ? this._footer.numRecordBatches : 0; }

    /**
     * @param source An existing `RandomAccessFile`, or input from which one is constructed.
     * @param dictionaries Optional initial dictionary map.
     */
    constructor(source: RandomAccessFile | ArrayBufferViewInput, dictionaries?: Map<number, Vector>) {
        super(source instanceof RandomAccessFile ? source : new RandomAccessFile(source), dictionaries);
    }

    public isSync(): this is RecordBatchReaders<T> { return true; }
    public isFile(): this is RecordBatchFileReaders<T> { return true; }

    /**
     * On first use, decodes the footer, takes its schema and loads each
     * dictionary batch the footer lists; then defers to the stream `open()`.
     */
    public open(options?: OpenOptions) {
        if (!this.closed && !this._footer) {
            const footer = this._readFooter();
            this._footer = footer;
            this.schema = footer.schema;
            for (const block of footer.dictionaryBatches()) {
                if (block) {
                    this._readDictionaryBatch(this._dictionaryIndex++);
                }
            }
        }
        return super.open(options);
    }

    /**
     * Seeks to the record batch block at `index` and decodes it.
     * @returns The batch, or `null` when the reader is closed, the footer has
     *          no such block, the seek fails, or the message at that offset
     *          is not a record batch.
     */
    public readRecordBatch(index: number) {
        if (this.closed) { return null; }
        if (!this._footer) { this.open(); }
        const block = this._footer?.getRecordBatch(index);
        if (!block || !this._handle.seek(block.offset)) { return null; }
        const message = this._reader.readMessage(MessageHeader.RecordBatch);
        if (message?.isRecordBatch()) {
            const batchHeader = message.header();
            const batchBody = this._reader.readMessageBody(message.bodyLength);
            return this._loadRecordBatch(batchHeader, batchBody);
        }
        return null;
    }

    /** Seeks to the dictionary block at `index`, decodes it and stores it in `dictionaries`. */
    protected _readDictionaryBatch(index: number) {
        const block = this._footer?.getDictionaryBatch(index);
        if (!block || !this._handle.seek(block.offset)) { return; }
        const message = this._reader.readMessage(MessageHeader.DictionaryBatch);
        if (message?.isDictionaryBatch()) {
            const dictHeader = message.header();
            const dictBody = this._reader.readMessageBody(message.bodyLength);
            const dictVector = this._loadDictionaryBatch(dictHeader, dictBody);
            this.dictionaries.set(dictHeader.id, dictVector);
        }
    }

    /**
     * Reads a 32-bit length located `magicAndPadding` bytes before the end of
     * the file, then decodes the footer from the `length` bytes that precede it.
     */
    protected _readFooter() {
        const file = this._handle;
        const lengthOffset = file.size - magicAndPadding;
        const footerLength = file.readInt32(lengthOffset);
        const footerBytes = file.readAt(lengthOffset - footerLength, footerLength);
        return Footer.decode(footerBytes);
    }

    /**
     * Positions the file at the block for the current record batch index and
     * reads the message there.
     * @returns The message, or `null` when all batches have been read, the
     *          block is missing, or the seek fails.
     */
    protected _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null): Message<T> | null {
        if (!this._footer) { this.open(); }
        const footer = this._footer;
        if (!footer || !(this._recordBatchIndex < this.numRecordBatches)) { return null; }
        const block = footer.getRecordBatch(this._recordBatchIndex);
        if (block && this._handle.seek(block.offset)) {
            return this._reader.readMessage(type);
        }
        return null;
    }
}
/**
 * Asynchronous reader over an `AsyncRandomAccessFile`. Uses the decoded
 * footer to locate dictionary and record batches, which also allows a record
 * batch to be read by index.
 * @ignore
 */
class AsyncRecordBatchFileReaderImpl<T extends TypeMap = any> extends AsyncRecordBatchStreamReaderImpl<T>
    implements AsyncRecordBatchFileReaderImpl<T> {

    protected _footer?: Footer;
    declare protected _handle: AsyncRandomAccessFile;

    /** The decoded footer; not set until the reader has been opened. */
    public get footer() { return this._footer!; }
    /** Dictionary count from the footer, or 0 before the footer is read. */
    public get numDictionaries() { return this._footer ? this._footer.numDictionaries : 0; }
    /** Record batch count from the footer, or 0 before the footer is read. */
    public get numRecordBatches() { return this._footer ? this._footer.numRecordBatches : 0; }

    constructor(source: FileHandle, byteLength?: number, dictionaries?: Map<number, Vector>);
    constructor(source: FileHandle | AsyncRandomAccessFile, dictionaries?: Map<number, Vector>);
    /**
     * @param source An existing `AsyncRandomAccessFile`, or a file handle from which one is constructed.
     * @param rest Either `(byteLength?, dictionaries?)` or `(dictionaries?)`, matching the overloads above.
     */
    constructor(source: FileHandle | AsyncRandomAccessFile, ...rest: any[]) {
        // Only consume the leading argument as `byteLength` when it really is
        // a number; otherwise it is left in place so a `Map` passed in that
        // position is picked up as the dictionaries below.
        const byteLength = typeof rest[0] === 'number' ? <number>rest.shift() : undefined;
        const dictionaries = rest[0] instanceof Map ? <Map<number, Vector>>rest.shift() : undefined;
        super(source instanceof AsyncRandomAccessFile ? source : new AsyncRandomAccessFile(source, byteLength), dictionaries);
    }

    public isFile(): this is RecordBatchFileReaders<T> { return true; }
    public isAsync(): this is AsyncRecordBatchReaders<T> { return true; }

    /**
     * On first use, decodes the footer, takes its schema and loads each
     * dictionary batch the footer lists (one at a time); then defers to the
     * stream `open()`.
     */
    public async open(options?: OpenOptions) {
        if (!this.closed && !this._footer) {
            this.schema = (this._footer = await this._readFooter()).schema;
            for (const block of this._footer.dictionaryBatches()) {
                block && await this._readDictionaryBatch(this._dictionaryIndex++);
            }
        }
        return await super.open(options);
    }

    /**
     * Seeks to the record batch block at `index` and decodes it.
     * @returns A Promise of the batch, or of `null` when the reader is
     *          closed, the footer has no such block, the seek fails, or the
     *          message at that offset is not a record batch.
     */
    public async readRecordBatch(index: number) {
        if (this.closed) { return null; }
        if (!this._footer) { await this.open(); }
        const block = this._footer?.getRecordBatch(index);
        if (block && (await this._handle.seek(block.offset))) {
            const message = await this._reader.readMessage(MessageHeader.RecordBatch);
            if (message?.isRecordBatch()) {
                const header = message.header();
                const buffer = await this._reader.readMessageBody(message.bodyLength);
                const recordBatch = this._loadRecordBatch(header, buffer);
                return recordBatch;
            }
        }
        return null;
    }

    /** Seeks to the dictionary block at `index`, decodes it and stores it in `dictionaries`. */
    protected async _readDictionaryBatch(index: number) {
        const block = this._footer?.getDictionaryBatch(index);
        if (block && (await this._handle.seek(block.offset))) {
            const message = await this._reader.readMessage(MessageHeader.DictionaryBatch);
            if (message?.isDictionaryBatch()) {
                const header = message.header();
                const buffer = await this._reader.readMessageBody(message.bodyLength);
                const vector = this._loadDictionaryBatch(header, buffer);
                this.dictionaries.set(header.id, vector);
            }
        }
    }

    /**
     * Waits for the handle's `_pending` promise if there is one, reads a
     * 32-bit length located `magicAndPadding` bytes before the end of the
     * file, then decodes the footer from the `length` bytes that precede it.
     */
    protected async _readFooter() {
        const { _handle } = this;
        _handle._pending && await _handle._pending;
        const offset = _handle.size - magicAndPadding;
        const length = await _handle.readInt32(offset);
        const buffer = await _handle.readAt(offset - length, length);
        return Footer.decode(buffer);
    }

    /**
     * Positions the file at the block for the current record batch index and
     * reads the message there.
     * @returns A Promise of the message, or of `null` when all batches have
     *          been read, the block is missing, or the seek fails.
     */
    protected async _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null): Promise<Message<T> | null> {
        if (!this._footer) { await this.open(); }
        if (this._footer && this._recordBatchIndex < this.numRecordBatches) {
            const block = this._footer.getRecordBatch(this._recordBatchIndex);
            if (block && await this._handle.seek(block.offset)) {
                return await this._reader.readMessage(type);
            }
        }
        return null;
    }
}
/**
 * Synchronous stream reader for Arrow JSON sources; decodes vectors with
 * `JSONVectorLoader` instead of `VectorLoader`.
 * @ignore
 */
class RecordBatchJSONReaderImpl<T extends TypeMap = any> extends RecordBatchStreamReaderImpl<T> {

    constructor(source: ArrowJSONLike, dictionaries?: Map<number, Vector>) {
        super(source, dictionaries);
    }

    /** Decodes the buffers described by `header` for each of `types` using a `JSONVectorLoader`. */
    protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
        const loader = new JSONVectorLoader(body, header.nodes, header.buffers, this.dictionaries);
        return loader.visitMany(types);
    }
}
// | |
// Define some helper functions and static implementations down here. There's | |
// a bit of branching in the static methods that can lead to the same routines | |
// being executed, so we've broken those out here for readability. | |
// | |
/**
 * Resolves the effective `autoDestroy` setting.
 * @param self Object holding the current `autoDestroy` value.
 * @param options Optional open options.
 * @returns `options.autoDestroy` when it is a boolean, otherwise `self.autoDestroy`.
 * @ignore
 */
function shouldAutoDestroy(self: { autoDestroy: boolean }, options?: OpenOptions) {
    const requested = options ? options['autoDestroy'] : undefined;
    if (typeof requested === 'boolean') {
        return requested;
    }
    return self['autoDestroy'];
}
/**
 * Generator behind the synchronous `RecordBatchReader.readAll`. Opens a reader
 * over `source` with `autoDestroy` disabled and yields that same reader each
 * time it is open; after each yield the reader is reset and reopened, and the
 * loop ends once it reports closed. The reader is cancelled when the
 * generator finishes or is abandoned.
 * @ignore
 */
function* readAllSync<T extends TypeMap = any>(source: RecordBatchReaders<T> | FromArg0 | FromArg2) {
    const reader = RecordBatchReader.from<T>(<any>source) as RecordBatchReaders<T>;
    try {
        let stillOpen = !reader.open({ autoDestroy: false }).closed;
        while (stillOpen) {
            yield reader;
            stillOpen = !reader.reset().open().closed;
        }
    } finally {
        reader.cancel();
    }
}
/**
 * Async generator behind the asynchronous `RecordBatchReader.readAll`. Opens a
 * reader over `source` with `autoDestroy` disabled and yields that same reader
 * each time it is open; after each yield the reader is reset and reopened, and
 * the loop ends once it reports closed. The reader is cancelled when the
 * generator finishes or is abandoned.
 * @ignore
 */
async function* readAllAsync<T extends TypeMap = any>(source: AsyncRecordBatchReaders<T> | FromArg1 | FromArg3 | FromArg4 | FromArg5) {
    const reader = await RecordBatchReader.from<T>(<any>source) as RecordBatchReader<T>;
    try {
        let stillOpen = !(await reader.open({ autoDestroy: false })).closed;
        while (stillOpen) {
            yield reader;
            stillOpen = !(await reader.reset().open()).closed;
        }
    } finally {
        await reader.cancel();
    }
}
/**
 * Wraps an Arrow JSON object in a stream reader backed by the JSON implementation.
 * @ignore
 */
function fromArrowJSON<T extends TypeMap>(source: ArrowJSONLike) {
    const impl = new RecordBatchJSONReaderImpl<T>(source);
    return new RecordBatchStreamReader(impl);
}
/**
 * Picks a synchronous reader for a byte stream by peeking at its first bytes
 * (`magicLength` rounded up to a multiple of 8):
 * - fewer than 4 bytes available: a stream reader over an empty source;
 * - bytes pass `checkForMagicArrowString`: a file reader over the fully-read stream;
 * - otherwise: a stream reader over `source`.
 * @ignore
 */
function fromByteStream<T extends TypeMap>(source: ByteStream) {
    const head = source.peek((magicLength + 7) & ~7);
    if (!(head && head.byteLength >= 4)) {
        return new RecordBatchStreamReader(new RecordBatchStreamReaderImpl<T>(function* (): any { }()));
    }
    if (checkForMagicArrowString(head)) {
        return new RecordBatchFileReader(new RecordBatchFileReaderImpl<T>(source.read()));
    }
    return new RecordBatchStreamReader(new RecordBatchStreamReaderImpl<T>(source));
}
/**
 * Picks a reader for an async byte stream by peeking at its first bytes
 * (`magicLength` rounded up to a multiple of 8):
 * - fewer than 4 bytes available: an async stream reader over an empty source;
 * - bytes pass `checkForMagicArrowString`: the stream is read in full and
 *   handed to the synchronous file reader;
 * - otherwise: an async stream reader over `source`.
 * @ignore
 */
async function fromAsyncByteStream<T extends TypeMap>(source: AsyncByteStream) {
    const head = await source.peek((magicLength + 7) & ~7);
    if (!(head && head.byteLength >= 4)) {
        return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl<T>(async function* (): any { }()));
    }
    if (checkForMagicArrowString(head)) {
        return new RecordBatchFileReader(new RecordBatchFileReaderImpl<T>(await source.read()));
    }
    return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl<T>(source));
}
/**
 * Picks an async reader for a file handle. The handle is stat'ed for its
 * size; when the size is at least `magicX2AndPadding` and the leading bytes
 * pass `checkForMagicArrowString`, an async file reader is returned,
 * otherwise an async stream reader over the same random-access file.
 * @ignore
 */
async function fromFileHandle<T extends TypeMap>(source: FileHandle) {
    const stats = await source.stat();
    const file = new AsyncRandomAccessFile(source, stats.size);
    // Only read the leading bytes when the file is large enough to be checked.
    if (stats.size >= magicX2AndPadding) {
        const head = await file.readAt(0, (magicLength + 7) & ~7);
        if (checkForMagicArrowString(head)) {
            return new AsyncRecordBatchFileReader(new AsyncRecordBatchFileReaderImpl<T>(file));
        }
    }
    return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl<T>(file));
}