Skip to content

Commit

Permalink
feat: async api
Browse files Browse the repository at this point in the history
  • Loading branch information
OpportunityLiu committed May 16, 2024
1 parent 2274b50 commit 6858d6b
Show file tree
Hide file tree
Showing 21 changed files with 5,527 additions and 4,014 deletions.
2 changes: 1 addition & 1 deletion .github/actions/install/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ runs:
- name: Install
uses: wyvox/action-setup-pnpm@v3
with:
node-version: ${{ inputs.node-version }}
node-version-file: ''
node-version: ${{ inputs.node-version }}
pnpm-version: ${{ inputs.pnpm-version }}
args: ${{ inputs.args }}
19 changes: 19 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
// 使用 IntelliSense 了解相关属性。
// 悬停以查看现有属性的描述。
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(Windows) 启动",
"type": "cppvsdbg",
"request": "launch",
"program": "node",
"args": ["test.js"],
"stopAtEntry": false,
"cwd": "${workspaceRoot}",
"environment": [],
"console": "integratedTerminal"
}
]
}
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ To explicitly use the wasm or the n-api version, import `@cloudpss/zstd/wasm` an
> [!NOTE]
> If you are using this library in a browser, the `Buffer` classes is replaced with `Uint8Array` classes.
#### `compress(input: BinaryData, level?: number): Buffer`
#### `compress(input: BinaryData, level?: number): Promise<Buffer>` `compressSync(input: BinaryData, level?: number): Buffer`

Compresses the input buffer with the given compression level (default: 4).

#### `decompress(input: BinaryData): Buffer`
#### `decompress(input: BinaryData): Promise<Buffer>` `decompressSync(input: BinaryData): Buffer`

Decompresses the input buffer.

Expand Down
8 changes: 4 additions & 4 deletions benchmark/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ async function main() {
}
console.log(`\u001B[1;33mFile: ${file.name} \tRaw: ${pb(file.content.length)}\u001B[0m`);
for (const level of [-10, -5, -1, 1, 2, 3, 4, 5, 6, 9, 15, 19, 22]) {
const [compressed, napiCompressTime] = time(() => napi.compress(file.content, level));
const [decompressed, napiDecompressTime] = time(() => napi.decompress(compressed));
const [compressed, napiCompressTime] = time(() => napi.compressSync(file.content, level));
const [decompressed, napiDecompressTime] = time(() => napi.decompressSync(compressed));
const [, napiSCompressTime] = await atime(async () => {
const stream = new napi.Compressor(level);
return await pipeline(Readable.from(file.content), stream, async (chunks) => {
Expand All @@ -93,8 +93,8 @@ async function main() {
return Buffer.concat(chunks2);
});
});
const [, wasmCompressTime] = time(() => wasm.compress(file.content, level));
const [, wasmDecompressTime] = time(() => wasm.decompress(compressed));
const [, wasmCompressTime] = time(() => wasm.compressSync(file.content, level));
const [, wasmDecompressTime] = time(() => wasm.decompressSync(compressed));
console.assert(
Buffer.compare(decompressed, file.content) === 0,
'Decompressed data does not match original',
Expand Down
195 changes: 177 additions & 18 deletions lib/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ extern "C"
#include <napi.h>
#include <iostream>

#if 0
#define DEBUG(x) \
#ifdef DEBUG
#define DEBUG_LOG(x) \
(std::cout << x << std::endl)
#else
#define DEBUG(x)
#define DEBUG_LOG(x)
#endif

#define _THROW_IF_FAILED(cond, error, returns) \
Expand Down Expand Up @@ -51,6 +51,84 @@ Napi::Value compress(const Napi::CallbackInfo &info)
return Napi::Buffer<char>::Copy(env, outBuffer.Data(), codeOrSize);
}

class CompressWorker : public Napi::AsyncWorker
{
public:
CompressWorker(Napi::Function &callback, Napi::Buffer<char> &inBuffer, int level)
: Napi::AsyncWorker(callback), level(level),
inBufferPtr(inBuffer.Data()), inBufferSize(inBuffer.Length()), inBufferRef((Napi::Persistent(inBuffer.As<Napi::Object>()))),
outBufferSize(ZSTD_compressBound(inBufferSize)), outBuffer(std::make_unique<char[]>(outBufferSize)),
codeOrSize(0)
{
DEBUG_LOG("[CompressWorker] Constructor level: " << level << ", inBufferSize: " << inBufferSize << ", outBufferSize: " << outBufferSize);
}

void Execute() override
{
DEBUG_LOG("[CompressWorker] Execute");
auto compressCtx = ZSTD_createCCtx();
codeOrSize = ZSTD_compressCCtx(compressCtx,
outBuffer.get(), outBufferSize,
inBufferPtr, inBufferSize,
level);
ZSTD_freeCCtx(compressCtx);
DEBUG_LOG("[CompressWorker] codeOrSize: " << codeOrSize);
}

void OnOK() override
{
DEBUG_LOG("[CompressWorker] OnOK");
Napi::HandleScope scope(Env());
if (ZSTD_isError(codeOrSize))
{
Callback().Call({
Napi::String::New(Env(), ZSTD_getErrorName(codeOrSize)),
Env().Null(),
});
}
else
{
Callback().Call({
Env().Null(),
Napi::Buffer<char>::Copy(Env(), outBuffer.get(), codeOrSize),
});
}
}

~CompressWorker()
{
DEBUG_LOG("[~CompressWorker] Destructor");
}

private:
int level;

size_t inBufferSize;
char *inBufferPtr;
Napi::ObjectReference inBufferRef;

size_t outBufferSize;
std::unique_ptr<char[]> outBuffer;

size_t codeOrSize;
};

Napi::Value compress_async(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
THROW_TYPE_ERROR_IF_FAILED(info.Length() == 3, "Wrong number of arguments");
THROW_TYPE_ERROR_IF_FAILED(info[0].IsBuffer(), "Wrong argument 0");
THROW_TYPE_ERROR_IF_FAILED(info[1].IsNumber(), "Wrong argument 1");
THROW_TYPE_ERROR_IF_FAILED(info[2].IsFunction(), "Wrong argument 2");
auto inBuffer = info[0].As<Napi::Buffer<char>>();
auto level = info[1].As<Napi::Number>().Int32Value();
auto callback = info[2].As<Napi::Function>();
DEBUG_LOG("[compress_async] args checked");
auto worker = new CompressWorker(callback, inBuffer, level);
worker->Queue();
return env.Undefined();
}

static ZSTD_DCtx *const decompressCtx = ZSTD_createDCtx();
Napi::Value decompress(const Napi::CallbackInfo &info)
{
Expand All @@ -74,6 +152,85 @@ Napi::Value decompress(const Napi::CallbackInfo &info)
return Napi::Buffer<char>::Copy(env, outBuffer.Data(), code);
}

class DecompressWorker : public Napi::AsyncWorker
{
public:
DecompressWorker(Napi::Function &callback, Napi::Buffer<char> &inBuffer, size_t outSize)
: Napi::AsyncWorker(callback),
inBufferPtr(inBuffer.Data()), inBufferSize(inBuffer.Length()), inBufferRef((Napi::Persistent(inBuffer.As<Napi::Object>()))),
outBufferSize(outSize), outBuffer(std::make_unique<char[]>(outBufferSize)),
codeOrSize(0)
{
DEBUG_LOG("[DecompressWorker] Constructor inBufferSize: " << inBufferSize << ", outBufferSize: " << outBufferSize);
}

void Execute() override
{
DEBUG_LOG("[DecompressWorker] Execute");
auto decompressCtx = ZSTD_createDCtx();
codeOrSize = ZSTD_decompressDCtx(decompressCtx,
outBuffer.get(), outBufferSize,
inBufferPtr, inBufferSize);
ZSTD_freeDCtx(decompressCtx);
DEBUG_LOG("[DecompressWorker] codeOrSize: " << codeOrSize);
}

void OnOK() override
{
DEBUG_LOG("[DecompressWorker] OnOK");
Napi::HandleScope scope(Env());
if (ZSTD_isError(codeOrSize))
{
Callback().Call({
Napi::String::New(Env(), ZSTD_getErrorName(codeOrSize)),
Env().Null(),
});
}
else
{
Callback().Call({
Env().Null(),
Napi::Buffer<char>::Copy(Env(), outBuffer.get(), codeOrSize),
});
}
}

~DecompressWorker()
{
DEBUG_LOG("[~DecompressWorker] Destructor");
}

private:
size_t inBufferSize;
char *inBufferPtr;
Napi::ObjectReference inBufferRef;

size_t outBufferSize;
std::unique_ptr<char[]> outBuffer;

size_t codeOrSize;
};

Napi::Value decompress_async(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
THROW_TYPE_ERROR_IF_FAILED(info.Length() == 3, "Wrong number of arguments");
THROW_TYPE_ERROR_IF_FAILED(info[0].IsBuffer(), "Wrong argument 0");
THROW_TYPE_ERROR_IF_FAILED(info[1].IsNumber(), "Wrong argument 1");
THROW_TYPE_ERROR_IF_FAILED(info[2].IsFunction(), "Wrong argument 2");
auto inBuffer = info[0].As<Napi::Buffer<char>>();
auto maxSize = info[1].As<Napi::Number>().Int64Value();
auto callback = info[2].As<Napi::Function>();
auto outSize = ZSTD_decompressBound(inBuffer.Data(), inBuffer.Length());
THROW_IF_FAILED(outSize != ZSTD_CONTENTSIZE_ERROR,
"Invalid compressed data");
THROW_IF_FAILED(outSize < (uint64_t)maxSize, "Content size is too large");
DEBUG_LOG("[decompress_async] args checked");
auto worker = new DecompressWorker(callback, inBuffer, outSize);
worker->Queue();
return env.Undefined();
}

class Compressor : public Napi::ObjectWrap<Compressor>
{
public:
Expand All @@ -99,14 +256,14 @@ class Compressor : public Napi::ObjectWrap<Compressor>
THROW_IF_FAILED_VOID(ctx != NULL, "Failed to create compression context");
auto code = ZSTD_initCStream(ctx, level);
THROW_IF_FAILED_VOID(!ZSTD_isError(code), ZSTD_getErrorName(code));
DEBUG("[Compressor]");
DEBUG_LOG("[Compressor]");
}

~Compressor()
{
ZSTD_freeCStream(ctx);
ctx = nullptr;
DEBUG("[~Compressor]");
DEBUG_LOG("[~Compressor]");
}

private:
Expand All @@ -128,18 +285,18 @@ class Compressor : public Napi::ObjectWrap<Compressor>
while (in.pos < in.size)
{
auto code = ZSTD_compressStream(ctx, &out, &in);
DEBUG("[Compress] code: " << code << ", in.pos: " << in.pos << ", in.size: " << in.size << ", out.pos: " << out.pos << ", out.size: " << out.size);
DEBUG_LOG("[Compress] code: " << code << ", in.pos: " << in.pos << ", in.size: " << in.size << ", out.pos: " << out.pos << ", out.size: " << out.size);
THROW_IF_FAILED(!ZSTD_isError(code), ZSTD_getErrorName(code));
if (out.pos == out.size)
{
DEBUG("[Compress] send chunk: " << out.pos);
DEBUG_LOG("[Compress] send chunk: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.size)});
out.pos = 0;
}
}
if (out.pos > 0)
{
DEBUG("[Compress] send chunk final: " << out.pos);
DEBUG_LOG("[Compress] send chunk final: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.pos)});
}
return env.Undefined();
Expand All @@ -156,11 +313,11 @@ class Compressor : public Napi::ObjectWrap<Compressor>
while (true)
{
auto code = ZSTD_endStream(ctx, &out);
DEBUG("[End] code: " << code << ", out.pos: " << out.pos << ", out.size: " << out.size);
DEBUG_LOG("[End] code: " << code << ", out.pos: " << out.pos << ", out.size: " << out.size);
THROW_IF_FAILED(!ZSTD_isError(code), ZSTD_getErrorName(code));
if (out.pos == out.size)
{
DEBUG("[End] send chunk: " << out.pos);
DEBUG_LOG("[End] send chunk: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.size)});
out.pos = 0;
}
Expand All @@ -169,7 +326,7 @@ class Compressor : public Napi::ObjectWrap<Compressor>
}
if (out.pos > 0)
{
DEBUG("[End] send chunk final: " << out.pos);
DEBUG_LOG("[End] send chunk final: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.pos)});
}
return env.Undefined();
Expand Down Expand Up @@ -199,14 +356,14 @@ class Decompressor : public Napi::ObjectWrap<Decompressor>
THROW_IF_FAILED_VOID(ctx != NULL, "Failed to create decompression context");
auto code = ZSTD_initDStream(ctx);
THROW_IF_FAILED_VOID(!ZSTD_isError(code), ZSTD_getErrorName(code));
DEBUG("[Decompressor]");
DEBUG_LOG("[Decompressor]");
}

~Decompressor()
{
ZSTD_freeDStream(ctx);
ctx = nullptr;
DEBUG("[~Decompressor]");
DEBUG_LOG("[~Decompressor]");
}

private:
Expand All @@ -228,18 +385,18 @@ class Decompressor : public Napi::ObjectWrap<Decompressor>
while (in.pos < in.size)
{
auto code = ZSTD_decompressStream(ctx, &out, &in);
DEBUG("[Decompress] code: " << code << ", in.pos: " << in.pos << ", in.size: " << in.size << ", out.pos: " << out.pos << ", out.size: " << out.size);
DEBUG_LOG("[Decompress] code: " << code << ", in.pos: " << in.pos << ", in.size: " << in.size << ", out.pos: " << out.pos << ", out.size: " << out.size);
THROW_IF_FAILED(!ZSTD_isError(code), ZSTD_getErrorName(code));
if (out.pos == out.size)
{
DEBUG("[Decompress] send chunk: " << out.pos);
DEBUG_LOG("[Decompress] send chunk: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.size)});
out.pos = 0;
}
}
if (out.pos > 0)
{
DEBUG("[Decompress] send chunk final: " << out.pos);
DEBUG_LOG("[Decompress] send chunk final: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.pos)});
}
return env.Undefined();
Expand All @@ -256,12 +413,12 @@ class Decompressor : public Napi::ObjectWrap<Decompressor>

ZSTD_inBuffer dummy = {nullptr, 0, 0};
auto code = ZSTD_decompressStream(ctx, &out, &dummy);
DEBUG("[End] code: " << code << ", out.pos: " << out.pos << ", out.size: " << out.size);
DEBUG_LOG("[End] code: " << code << ", out.pos: " << out.pos << ", out.size: " << out.size);
THROW_IF_FAILED(!ZSTD_isError(code), ZSTD_getErrorName(code));

if (out.pos > 0)
{
DEBUG("[End] send chunk: " << out.pos);
DEBUG_LOG("[End] send chunk: " << out.pos);
callback.Call({Napi::Buffer<char>::Copy(env, outBuffer.Data(), out.pos)});
// Frame not complete
THROW_IF_FAILED(code == 0, "Incomplete compressed data");
Expand All @@ -279,7 +436,9 @@ class Decompressor : public Napi::ObjectWrap<Decompressor>
Napi::Object Init(Napi::Env env, Napi::Object exports)
{
EXPORT_FUNCTION(compress);
EXPORT_FUNCTION(compress_async);
EXPORT_FUNCTION(decompress);
EXPORT_FUNCTION(decompress_async);
EXPORT_VALUE(version, Napi::String::New(env, ZSTD_versionString()));
EXPORT_VALUE(minLevel, ZSTD_minCLevel());
EXPORT_VALUE(maxLevel, ZSTD_maxCLevel());
Expand Down
Loading

0 comments on commit 6858d6b

Please sign in to comment.