Split compression from encodings #422

Merged · 5 commits · Jul 9, 2024
38 changes: 29 additions & 9 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -12,6 +12,7 @@ members = [
     "vortex-expr",
     "vortex-flatbuffers",
     "vortex-ipc",
+    "vortex-sampling-compressor",
 ]
 resolver = "2"

@@ -57,9 +58,8 @@ datafusion-execution = "39.0.0"
 datafusion-expr = "39.0.0"
 datafusion-physical-expr = "39.0.0"
 datafusion-physical-plan = "39.0.0"
-derive_builder = "0.20.0"
 divan = "0.1.14"
-duckdb = { version = "0.10.1", features = ["bundled"] }
+duckdb = { version = "0.10.2", features = ["bundled"] }
 enum-iterator = "2.0.0"
 enum_dispatch = "0.3.13"
 fallible-iterator = "0.3.0"
5 changes: 3 additions & 2 deletions bench-vortex/Cargo.toml
@@ -24,7 +24,7 @@ csv = { workspace = true }
 datafusion = { workspace = true }
 enum-iterator = { workspace = true }
 flexbuffers = { workspace = true }
-futures = { workspace = true }
+futures = { workspace = true, features = ["executor"] }
 humansize = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
@@ -47,8 +47,9 @@ vortex-dtype = { path = "../vortex-dtype" }
 vortex-error = { path = "../vortex-error", features = ["parquet"] }
 vortex-fastlanes = { path = "../encodings/fastlanes" }
 vortex-ipc = { path = "../vortex-ipc" }
-vortex-ree = { path = "../encodings/runend" }
 vortex-roaring = { path = "../encodings/roaring" }
+vortex-runend = { path = "../encodings/runend" }
+vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }

 [dev-dependencies]
 criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
25 changes: 21 additions & 4 deletions bench-vortex/benches/datafusion_benchmark.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::sync::Arc;

 use arrow_array::builder::{StringBuilder, UInt32Builder};
@@ -11,22 +12,38 @@ use datafusion::execution::memory_pool::human_readable_size;
 use datafusion::logical_expr::lit;
 use datafusion::prelude::{col, count_distinct, DataFrame, SessionContext};
 use lazy_static::lazy_static;
-use vortex::compress::Compressor;
+use vortex::compress::CompressionStrategy;
 use vortex::encoding::EncodingRef;
 use vortex::{Array, Context, IntoArray, ToArrayData};
 use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};
 use vortex_dict::DictEncoding;
 use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
+use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
+use vortex_sampling_compressor::compressors::delta::DeltaCompressor;
+use vortex_sampling_compressor::compressors::dict::DictCompressor;
+use vortex_sampling_compressor::compressors::r#for::FoRCompressor;
+use vortex_sampling_compressor::compressors::CompressorRef;
+use vortex_sampling_compressor::SamplingCompressor;

 lazy_static! {
     pub static ref CTX: Context = Context::default().with_encodings([
         &BitPackedEncoding as EncodingRef,
         &DictEncoding,
         &FoREncoding,
-        &DeltaEncoding,
+        &DeltaEncoding
     ]);
 }

+lazy_static! {
+    pub static ref COMPRESSORS: HashSet<CompressorRef<'static>> = [
+        &BitPackedCompressor as CompressorRef<'static>,
+        &DictCompressor,
+        &FoRCompressor,
+        &DeltaCompressor
+    ]
+    .into();
+}
+
 fn toy_dataset_arrow() -> RecordBatch {
     // 64,000 rows of string and numeric data.
     // 8,000 values of first string, second string, third string, etc.
@@ -73,8 +90,8 @@ fn toy_dataset_vortex(compress: bool) -> Array {
         "uncompressed size: {:?}",
         human_readable_size(uncompressed.nbytes())
     );
-    let compressor = Compressor::new(&CTX);
-    let compressed = compressor.compress(&uncompressed, None).unwrap();
+    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
+    let compressed = compressor.compress(&uncompressed).unwrap();
     println!(
         "vortex compressed size: {:?}",
         human_readable_size(compressed.nbytes())
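The hunk above is the heart of the split: encodings are still registered on a Context (used when reading arrays back), while compression is now configured by handing a set of CompressorRefs to SamplingCompressor, which implements the CompressionStrategy trait. A minimal sketch of the resulting call pattern, assuming only the imports and signatures visible in this diff:

use std::collections::HashSet;

use vortex::compress::CompressionStrategy;
use vortex::Array;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::dict::DictCompressor;
use vortex_sampling_compressor::compressors::CompressorRef;
use vortex_sampling_compressor::SamplingCompressor;

fn compress(uncompressed: &Array) -> Array {
    // Pick the compressors to sample from; this replaces the old
    // Compressor::new(&CTX), which tied compression to the encoding Context.
    let compressors: HashSet<CompressorRef<'static>> =
        [&BitPackedCompressor as CompressorRef<'static>, &DictCompressor].into();

    // SamplingCompressor implements CompressionStrategy, so callers can hold
    // it behind a trait object, as the benchmark code above does.
    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(compressors);

    // The old second argument to compress(&array, None) is gone.
    compressor.compress(uncompressed).unwrap()
}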
42 changes: 35 additions & 7 deletions bench-vortex/src/lib.rs
@@ -1,5 +1,6 @@
 #![feature(exit_status_error)]

+use std::collections::HashSet;
 use std::env::temp_dir;
 use std::fs::{create_dir_all, File};
 use std::path::{Path, PathBuf};
@@ -14,16 +15,26 @@ use parquet::arrow::ProjectionMask;
 use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
 use vortex::array::chunked::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::compress::Compressor;
+use vortex::compress::CompressionStrategy;
 use vortex::encoding::EncodingRef;
 use vortex::{Array, Context, IntoArray, ToArrayData};
 use vortex_alp::ALPEncoding;
 use vortex_datetime_parts::DateTimePartsEncoding;
 use vortex_dict::DictEncoding;
 use vortex_dtype::DType;
 use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
-use vortex_ree::REEEncoding;
 use vortex_roaring::RoaringBoolEncoding;
+use vortex_runend::RunEndEncoding;
+use vortex_sampling_compressor::compressors::alp::ALPCompressor;
+use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
+use vortex_sampling_compressor::compressors::dict::DictCompressor;
+use vortex_sampling_compressor::compressors::localdatetime::DateTimePartsCompressor;
+use vortex_sampling_compressor::compressors::r#for::FoRCompressor;
+use vortex_sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
+use vortex_sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
+use vortex_sampling_compressor::compressors::sparse::SparseCompressor;
+use vortex_sampling_compressor::compressors::CompressorRef;
+use vortex_sampling_compressor::SamplingCompressor;

 use crate::data_downloads::FileType;
 use crate::reader::BATCH_SIZE;
@@ -44,14 +55,28 @@ lazy_static! {
         &FoREncoding,
         &DateTimePartsEncoding,
         // &DeltaEncoding, Blows up the search space too much.
-        &REEEncoding,
+        &RunEndEncoding,
         &RoaringBoolEncoding,
         // &RoaringIntEncoding,
         // Doesn't offer anything more than FoR really
         // &ZigZagEncoding,
     ]);
 }

+lazy_static! {
+    pub static ref COMPRESSORS: HashSet<CompressorRef<'static>> = [
+        &ALPCompressor as CompressorRef<'static>,
+        &DictCompressor,
+        &BitPackedCompressor,
+        &FoRCompressor,
+        &DateTimePartsCompressor,
+        &DEFAULT_RUN_END_COMPRESSOR,
+        &RoaringBoolCompressor,
+        &SparseCompressor
+    ]
+    .into();
+}
+
 /// Creates a file if it doesn't already exist.
 /// NB: Does NOT modify the given path to ensure that it resides in the data directory.
 pub fn idempotent<T, E, P: IdempotentPath + ?Sized>(
@@ -139,13 +164,14 @@ pub fn compress_taxi_data() -> Array {

     let schema = reader.schema();
     let mut uncompressed_size: usize = 0;
+    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
     let chunks = reader
         .into_iter()
         .map(|batch_result| batch_result.unwrap())
         .map(|batch| batch.to_array_data().into_array())
         .map(|array| {
             uncompressed_size += array.nbytes();
-            Compressor::new(&CTX).compress(&array, None).unwrap()
+            compressor.compress(&array).unwrap()
         })
         .collect_vec();
@@ -215,11 +241,12 @@ mod test {
     use log::LevelFilter;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use vortex::arrow::FromArrowArray;
-    use vortex::compress::Compressor;
+    use vortex::compress::CompressionStrategy;
     use vortex::{ArrayData, IntoArray, IntoCanonical};
+    use vortex_sampling_compressor::SamplingCompressor;

     use crate::taxi_data::taxi_data_parquet;
-    use crate::{compress_taxi_data, setup_logger, CTX};
+    use crate::{compress_taxi_data, setup_logger, COMPRESSORS};

     #[ignore]
     #[test]
@@ -252,13 +279,14 @@
         let file = File::open(taxi_data_parquet()).unwrap();
         let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
         let reader = builder.with_limit(1).build().unwrap();
+        let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());

         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
             let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
             let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();

-            let compressed = Compressor::new(&CTX).compress(&vortex_array, None).unwrap();
+            let compressed = compressor.compress(&vortex_array).unwrap();
             let compressed_as_arrow = compressed.into_canonical().unwrap().into_arrow();
             assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
         }
8 changes: 5 additions & 3 deletions bench-vortex/src/reader.rs
@@ -22,7 +22,7 @@ use tokio::runtime::Runtime;
 use vortex::array::chunked::ChunkedArray;
 use vortex::array::primitive::PrimitiveArray;
 use vortex::arrow::FromArrowType;
-use vortex::compress::Compressor;
+use vortex::compress::CompressionStrategy;
 use vortex::stream::ArrayStreamExt;
 use vortex::{Array, IntoArray, IntoCanonical, ToArrayData, ViewContext};
 use vortex_buffer::Buffer;
@@ -32,8 +32,9 @@ use vortex_ipc::chunked_reader::ChunkedArrayReader;
 use vortex_ipc::io::{TokioAdapter, VortexWrite};
 use vortex_ipc::writer::ArrayWriter;
 use vortex_ipc::MessageReader;
+use vortex_sampling_compressor::SamplingCompressor;

-use crate::CTX;
+use crate::{COMPRESSORS, CTX};

 pub const BATCH_SIZE: usize = 65_536;

@@ -106,11 +107,12 @@ pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray>

     let dtype = DType::from_arrow(reader.schema());

+    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
     let chunks = reader
         .map(|batch_result| batch_result.unwrap())
         .map(|record_batch| {
             let vortex_array = record_batch.to_array_data().into_array();
-            Compressor::new(&CTX).compress(&vortex_array, None).unwrap()
+            compressor.compress(&vortex_array).unwrap()
         })
         .collect_vec();
     ChunkedArray::try_new(chunks, dtype)
10 changes: 5 additions & 5 deletions encodings/alp/src/alp.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};

 const SAMPLE_SIZE: usize = 32;

-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Exponents {
     pub e: u8,
     pub f: u8,
@@ -48,7 +48,7 @@ pub trait ALPFloat: Float + 'static {
         for f in 0..e {
             let (_, encoded, exc_pos, exc_patches) = Self::encode(
                 sample.as_deref().unwrap_or(values),
-                Some(&Exponents { e, f }),
+                Some(Exponents { e, f }),
             );
             let size =
                 (encoded.len() + exc_patches.len()) * size_of::<Self>() + (exc_pos.len() * 4);
@@ -66,9 +66,9 @@

     fn encode(
         values: &[Self],
-        exponents: Option<&Exponents>,
+        exponents: Option<Exponents>,
     ) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
-        let exp = exponents.map_or_else(|| Self::find_best_exponents(values), Exponents::clone);
+        let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values));

         let mut exc_pos = Vec::new();
         let mut exc_value = Vec::new();
@@ -111,7 +111,7 @@
     }

     #[inline]
-    fn decode_single(encoded: Self::ALPInt, exponents: &Exponents) -> Self {
+    fn decode_single(encoded: Self::ALPInt, exponents: Exponents) -> Self {
         let encoded_float: Self = Self::from(encoded).unwrap();
         encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
     }
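Making Exponents Copy lets the trait pass it by value everywhere, dropping a layer of borrows from encode/decode. A small round-trip sketch under the new signatures; the import path, the f64 impl, and the concrete values are assumptions for illustration (decode_single computes encoded * 10^f / 10^e, so e = 2, f = 0 decodes 123 back to 1.23 without exceptions):

use vortex_alp::{ALPFloat, Exponents};

fn alp_round_trip() {
    let values: Vec<f64> = vec![1.23, 4.56, 7.89];

    // Exponents is Copy now, so it is passed by value rather than by reference.
    let exponents = Exponents { e: 2, f: 0 };
    let (exp, encoded, _exc_positions, _exc_values) =
        <f64 as ALPFloat>::encode(&values, Some(exponents));

    // decode_single also takes Exponents by value under the new signature.
    let decoded = <f64 as ALPFloat>::decode_single(encoded[0], exp);
    assert!((decoded - values[0]).abs() < 1e-9);
}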
4 changes: 2 additions & 2 deletions encodings/alp/src/array.rs
@@ -66,8 +66,8 @@ impl ALPArray {
     }

     #[inline]
-    pub fn exponents(&self) -> &Exponents {
-        &self.metadata().exponents
+    pub fn exponents(&self) -> Exponents {
+        self.metadata().exponents
     }

     pub fn patches(&self) -> Option<Array> {