Spaces:
Running
Running
// Licensed to the Apache Software Foundation (ASF) under one | |
// or more contributor license agreements. See the NOTICE file | |
// distributed with this work for additional information | |
// regarding copyright ownership. The ASF licenses this file | |
// to you under the Apache License, Version 2.0 (the | |
// "License"); you may not use this file except in compliance | |
// with the License. You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, | |
// software distributed under the License is distributed on an | |
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
// KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations | |
// under the License. | |
/* eslint-disable unicorn/no-array-for-each */ | |
import * as fs from 'fs'; | |
import * as stream from 'stream'; | |
import { valueToString } from '../util/pretty.js'; | |
import { Schema, RecordBatch, RecordBatchReader, AsyncByteQueue } from '../Arrow.node.js'; | |
import commandLineUsage from 'command-line-usage'; | |
import commandLineArgs from 'command-line-args'; | |
import padLeft from 'pad-left'; | |
// @ts-ignore | |
import { parse as bignumJSONParse } from 'json-bignum'; | |
const argv = commandLineArgs(cliOpts(), { partial: true }); | |
const files = argv.help ? [] : [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean); | |
const state = { ...argv, closed: false, maxColWidths: [10] }; | |
type ToStringState = { | |
hr?: string; | |
sep?: string; | |
schema?: any; | |
closed?: boolean; | |
metadata?: boolean; | |
maxColWidths: number[]; | |
}; | |
(async () => { | |
const sources = argv.help ? [] : [ | |
...files.map((file) => () => fs.createReadStream(file)), | |
...(process.stdin.isTTY ? [] : [() => process.stdin]) | |
].filter(Boolean) as (() => NodeJS.ReadableStream)[]; | |
let reader: RecordBatchReader | null; | |
let hasReaders = false; | |
for (const source of sources) { | |
if (state.closed) { break; } | |
for await (reader of recordBatchReaders(source)) { | |
hasReaders = true; | |
const transformToString = batchesToString(state, reader.schema); | |
await pipeTo( | |
reader.pipe(transformToString), | |
process.stdout, { end: false } | |
).catch(() => state.closed = true); // Handle EPIPE errors | |
} | |
if (state.closed) { break; } | |
} | |
return hasReaders ? 0 : print_usage(); | |
})() | |
.then((x) => +x || 0, (err) => { | |
if (err) { | |
console.error(`${err?.stack || err}`); | |
} | |
return process.exitCode || 1; | |
}).then((code) => process.exit(code)); | |
function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) { | |
return new Promise((resolve, reject) => { | |
source.on('end', onEnd).pipe(sink, opts).on('error', onErr); | |
function onEnd() { done(undefined, resolve); } | |
function onErr(err: any) { done(err, reject); } | |
function done(e: any, cb: (e?: any) => void) { | |
source.removeListener('end', onEnd); | |
sink.removeListener('error', onErr); | |
cb(e); | |
} | |
}); | |
} | |
async function* recordBatchReaders(createSourceStream: () => NodeJS.ReadableStream): AsyncGenerator<RecordBatchReader, void, void> { | |
const json = new AsyncByteQueue(); | |
const stream = new AsyncByteQueue(); | |
const source = createSourceStream(); | |
let reader: RecordBatchReader | null = null; | |
let readers: AsyncIterable<RecordBatchReader> | null = null; | |
// tee the input source, just in case it's JSON | |
source.on('end', () => [stream, json].forEach((y) => y.close())) | |
.on('data', (x) => [stream, json].forEach((y) => y.write(x))) | |
.on('error', (e) => [stream, json].forEach((y) => y.abort(e))); | |
try { | |
for await (reader of RecordBatchReader.readAll(stream)) { | |
reader && (yield reader); | |
} | |
if (reader) return; | |
} catch { readers = null; } | |
if (!readers) { | |
await json.closed; | |
if (source instanceof fs.ReadStream) { source.close(); } | |
// If the data in the `json` ByteQueue parses to JSON, then assume it's Arrow JSON from a file or stdin | |
try { | |
for await (reader of RecordBatchReader.readAll(bignumJSONParse(await json.toString()))) { | |
reader && (yield reader); | |
} | |
} catch { readers = null; } | |
} | |
} | |
function batchesToString(state: ToStringState, schema: Schema) { | |
let rowId = 0; | |
let batchId = -1; | |
let maxColWidths = [10]; | |
const { hr, sep, metadata } = state; | |
const header = ['row_id', ...schema.fields.map((f) => `${f}`)].map(val => valueToString(val)); | |
state.maxColWidths = header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)); | |
return new stream.Transform({ | |
encoding: 'utf8', | |
writableObjectMode: true, | |
readableObjectMode: false, | |
final(cb: (error?: Error | null) => void) { | |
// if there were no batches, then print the Schema, and metadata | |
if (batchId === -1) { | |
hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`); | |
this.push(`${formatRow(header, maxColWidths, sep)}\n`); | |
if (metadata && schema.metadata.size > 0) { | |
this.push(`metadata:\n${formatMetadata(schema.metadata)}\n`); | |
} | |
} | |
hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`); | |
cb(); | |
}, | |
transform(batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) { | |
batch = !state.schema?.length ? batch : batch.select(state.schema); | |
if (state.closed) { return cb(undefined, null); } | |
// Pass one to convert to strings and count max column widths | |
state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length))); | |
// If this is the first batch in a stream, print a top horizontal rule, schema metadata, and | |
if (++batchId === 0) { | |
hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`); | |
if (metadata && batch.schema.metadata.size > 0) { | |
this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`); | |
hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`); | |
} | |
if (batch.numRows <= 0 || batch.numCols <= 0) { | |
this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`); | |
} | |
} | |
if (batch.numRows > 0 && batch.numCols > 0) { | |
// If any of the column widths changed, print the header again | |
if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) { | |
this.push(`${formatRow(header, state.maxColWidths, sep)}\n`); | |
} | |
maxColWidths = state.maxColWidths; | |
for (const row of batch) { | |
if (state.closed) { break; } else if (!row) { continue; } | |
if (rowId % 350 === 0) { | |
this.push(`${formatRow(header, maxColWidths, sep)}\n`); | |
} | |
this.push(`${formatRow([rowId++, ...row.toArray()].map(v => valueToString(v)), maxColWidths, sep)}\n`); | |
} | |
} | |
cb(); | |
} | |
}); | |
} | |
function horizontalRule(maxColWidths: number[], hr = '', sep = ' | ') { | |
return ` ${padLeft('', maxColWidths.reduce((x, y) => x + y, -2 + maxColWidths.length * sep.length), hr)}`; | |
} | |
function formatRow(row: string[] = [], maxColWidths: number[] = [], sep = ' | ') { | |
return `${row.map((x, j) => padLeft(x, maxColWidths[j])).join(sep)}`; | |
} | |
function formatMetadataValue(value = '') { | |
let parsed = value; | |
try { | |
parsed = JSON.stringify(JSON.parse(value), null, 2); | |
} catch { parsed = value; } | |
return valueToString(parsed).split('\n').join('\n '); | |
} | |
function formatMetadata(metadata: Map<string, string>) { | |
return [...metadata].map(([key, val]) => | |
` ${key}: ${formatMetadataValue(val)}` | |
).join(', \n'); | |
} | |
function measureColumnWidths(rowId: number, batch: RecordBatch, maxColWidths: number[] = []) { | |
let val: any, j = 0; | |
for (const row of batch) { | |
if (!row) { continue; } | |
maxColWidths[j = 0] = Math.max(maxColWidths[0] || 0, (`${rowId++}`).length); | |
for (val of row) { | |
if (val && typedArrayElementWidths.has(val.constructor) && (typeof val[Symbol.toPrimitive] !== 'function')) { | |
// If we're printing a column of TypedArrays, ensure the column is wide enough to accommodate | |
// the widest possible element for a given byte size, since JS omits leading zeroes. For example: | |
// 1 | [1137743649,2170567488,244696391,2122556476] | |
// 2 | null | |
// 3 | [637174007,2142281880,961736230,2912449282] | |
// 4 | [1035112265,21832886,412842672,2207710517] | |
// 5 | null | |
// 6 | null | |
// 7 | [2755142991,4192423256,2994359,467878370] | |
const elementWidth = typedArrayElementWidths.get(val.constructor)!; | |
maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0, | |
2 + // brackets on each end | |
(val.length - 1) + // commas between elements | |
(val.length * elementWidth) // width of stringified 2^N-1 | |
); | |
} else { | |
maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0, valueToString(val).length); | |
} | |
++j; | |
} | |
} | |
return maxColWidths; | |
} | |
// Measure the stringified representation of 2^N-1 for each TypedArray variant | |
const typedArrayElementWidths = (() => { | |
const maxElementWidth = (ArrayType: any) => { | |
const octets = Array.from({ length: ArrayType.BYTES_PER_ELEMENT - 1 }, _ => 255); | |
return `${new ArrayType(new Uint8Array([...octets, 254]).buffer)[0]}`.length; | |
}; | |
return new Map<any, number>([ | |
[Int8Array, maxElementWidth(Int8Array)], | |
[Int16Array, maxElementWidth(Int16Array)], | |
[Int32Array, maxElementWidth(Int32Array)], | |
[Uint8Array, maxElementWidth(Uint8Array)], | |
[Uint16Array, maxElementWidth(Uint16Array)], | |
[Uint32Array, maxElementWidth(Uint32Array)], | |
[Float32Array, maxElementWidth(Float32Array)], | |
[Float64Array, maxElementWidth(Float64Array)], | |
[Uint8ClampedArray, maxElementWidth(Uint8ClampedArray)] | |
]); | |
})(); | |
function cliOpts() { | |
return [ | |
{ | |
type: String, | |
name: 'schema', alias: 's', | |
optional: true, multiple: true, | |
typeLabel: '{underline columns}', | |
description: 'A space-delimited list of column names' | |
}, | |
{ | |
type: String, | |
name: 'file', alias: 'f', | |
optional: true, multiple: true, | |
description: 'The Arrow file to read' | |
}, | |
{ | |
type: String, | |
name: 'sep', optional: true, default: ' | ', | |
description: 'The column separator character (default: " | ")' | |
}, | |
{ | |
type: String, | |
name: 'hr', optional: true, default: '', | |
description: 'The horizontal border character (default: "")' | |
}, | |
{ | |
type: Boolean, | |
name: 'metadata', alias: 'm', | |
optional: true, default: false, | |
description: 'Flag to print Schema metadata (default: false)' | |
}, | |
{ | |
type: Boolean, | |
name: 'help', optional: true, default: false, | |
description: 'Print this usage guide.' | |
} | |
]; | |
} | |
function print_usage() { | |
console.log(commandLineUsage([ | |
{ | |
header: 'arrow2csv', | |
content: 'Print a CSV from an Arrow file' | |
}, | |
{ | |
header: 'Synopsis', | |
content: [ | |
'$ arrow2csv {underline file.arrow} [{bold --schema} column_name ...]', | |
'$ arrow2csv [{bold --schema} column_name ...] [{bold --file} {underline file.arrow}]', | |
'$ arrow2csv {bold -s} column_1 {bold -s} column_2 [{bold -f} {underline file.arrow}]', | |
'$ arrow2csv [{bold --help}]' | |
] | |
}, | |
{ | |
header: 'Options', | |
optionList: cliOpts() | |
}, | |
{ | |
header: 'Example', | |
content: [ | |
'$ arrow2csv --schema foo baz --sep " , " -f simple.arrow', | |
'> "row_id", "foo: Int32", "baz: Utf8"', | |
'> 0, 1, "aa"', | |
'> 1, null, null', | |
'> 2, 3, null', | |
'> 3, 4, "bbb"', | |
'> 4, 5, "cccc"', | |
] | |
} | |
])); | |
return 1; | |
} | |