Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zlib: refactor zlib internals #23360

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
928 changes: 518 additions & 410 deletions src/node_zlib.cc
Original file line number Diff line number Diff line change
@@ -88,32 +88,75 @@ enum node_zlib_mode {
#define GZIP_HEADER_ID1 0x1f
#define GZIP_HEADER_ID2 0x8b

/**
* Deflate/Inflate
*/
class ZCtx : public AsyncWrap, public ThreadPoolWork {
// Result type for compression operations. A default-constructed value
// represents success; IsError() distinguishes the two states by whether
// a zlib status-code string was recorded.
struct CompressionError {
  CompressionError() = default;
  CompressionError(const char* message, const char* code, int err)
      : message(message), code(code), err(err) {}

  // An error is present iff a status code string was set.
  inline bool IsError() const { return code != nullptr; }

  const char* message = nullptr;  // human-readable description
  const char* code = nullptr;     // zlib status name, e.g. "Z_DATA_ERROR"
  int err = 0;                    // raw zlib status value
};

// Holds all zlib-specific state for one (de)compression stream: the
// underlying z_stream, its configuration, and the optional preset
// dictionary. The "streaming-related" entry points form the generic
// interface a CompressionStream wrapper drives; the rest is zlib-only.
class ZlibContext : public MemoryRetainer {
 public:
  ZlibContext() = default;

  // Streaming-related, should be available for all compression libraries:
  void Close();
  void DoThreadPoolWork();
  void SetBuffers(char* in, uint32_t in_len, char* out, uint32_t out_len);
  void SetFlush(int flush);
  void GetAfterWriteOffsets(uint32_t* avail_in, uint32_t* avail_out) const;
  CompressionError GetErrorInfo() const;

  // Zlib-specific:
  CompressionError Init(int level, int window_bits, int mem_level, int strategy,
                        std::vector<unsigned char>&& dictionary);
  inline void SetMode(node_zlib_mode mode) { mode_ = mode; }
  void SetAllocationFunctions(alloc_func alloc, free_func free, void* opaque);
  CompressionError ResetStream();
  CompressionError SetParams(int level, int strategy);

  SET_MEMORY_INFO_NAME(ZlibContext)
  SET_SELF_SIZE(ZlibContext)

  void MemoryInfo(MemoryTracker* tracker) const override {
    tracker->TrackField("dictionary", dictionary_);
  }

 private:
  // Builds a CompressionError from `message` (preferring zlib's own
  // strm_.msg when set) together with the current err_ value.
  CompressionError ErrorForMessage(const char* message) const;
  CompressionError SetDictionary();

  int err_ = 0;                          // last zlib status code
  int flush_ = 0;                        // flush mode for the next work chunk
  int level_ = 0;                        // compression level
  int mem_level_ = 0;
  node_zlib_mode mode_ = NONE;           // deflate/inflate/gzip/... variant
  int strategy_ = 0;
  int window_bits_ = 0;
  unsigned int gzip_id_bytes_read_ = 0;  // gzip magic bytes seen in UNZIP mode
  std::vector<unsigned char> dictionary_;

  z_stream strm_;                        // underlying zlib stream state

  DISALLOW_COPY_AND_ASSIGN(ZlibContext);
};

template <typename CompressionContext>
class CompressionStream : public AsyncWrap, public ThreadPoolWork {
public:
ZCtx(Environment* env, Local<Object> wrap, node_zlib_mode mode)
CompressionStream(Environment* env, Local<Object> wrap)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB),
ThreadPoolWork(env),
err_(0),
flush_(0),
init_done_(false),
level_(0),
memLevel_(0),
mode_(mode),
strategy_(0),
windowBits_(0),
write_in_progress_(false),
pending_close_(false),
refs_(0),
gzip_id_bytes_read_(0),
write_result_(nullptr) {
MakeWeak();
}


~ZCtx() override {
~CompressionStream() override {
CHECK_EQ(false, write_in_progress_ && "write in progress");
Close();
CHECK_EQ(zlib_memory_, 0);
@@ -127,27 +170,16 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
}

pending_close_ = false;
closed_ = true;
CHECK(init_done_ && "close before init");
CHECK_LE(mode_, UNZIP);

AllocScope alloc_scope(this);
int status = Z_OK;
if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) {
status = deflateEnd(&strm_);
} else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW ||
mode_ == UNZIP) {
status = inflateEnd(&strm_);
}

CHECK(status == Z_OK || status == Z_DATA_ERROR);
mode_ = NONE;

dictionary_.clear();
ctx_.Close();
}


static void Close(const FunctionCallbackInfo<Value>& args) {
ZCtx* ctx;
CompressionStream* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
ctx->Close();
}
@@ -198,7 +230,7 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf)));
out = Buffer::Data(out_buf) + out_off;

ZCtx* ctx;
CompressionStream* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());

ctx->Write<async>(flush, in, in_len, out, out_len);
@@ -211,26 +243,22 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
AllocScope alloc_scope(this);

CHECK(init_done_ && "write before init");
CHECK(mode_ != NONE && "already finalized");
CHECK(!closed_ && "already finalized");

CHECK_EQ(false, write_in_progress_);
CHECK_EQ(false, pending_close_);
write_in_progress_ = true;
Ref();

strm_.avail_in = in_len;
strm_.next_in = reinterpret_cast<Bytef*>(in);
strm_.avail_out = out_len;
strm_.next_out = reinterpret_cast<Bytef*>(out);
flush_ = flush;
ctx_.SetBuffers(in, in_len, out, out_len);
ctx_.SetFlush(flush);

if (!async) {
// sync version
env()->PrintSyncTrace();
DoThreadPoolWork();
if (CheckError()) {
write_result_[0] = strm_.avail_out;
write_result_[1] = strm_.avail_in;
UpdateWriteResult();
write_in_progress_ = false;
}
Unref();
@@ -241,142 +269,24 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
ScheduleWork();
}

void UpdateWriteResult() {
ctx_.GetAfterWriteOffsets(&write_result_[1], &write_result_[0]);
}

// thread pool!
// This function may be called multiple times on the uv_work pool
// for a single write() call, until all of the input bytes have
// been consumed.
void DoThreadPoolWork() override {
const Bytef* next_expected_header_byte = nullptr;

// If the avail_out is left at 0, then it means that it ran out
// of room. If there was avail_out left over, then it means
// that all of the input was consumed.
switch (mode_) {
case DEFLATE:
case GZIP:
case DEFLATERAW:
err_ = deflate(&strm_, flush_);
break;
case UNZIP:
if (strm_.avail_in > 0) {
next_expected_header_byte = strm_.next_in;
}

switch (gzip_id_bytes_read_) {
case 0:
if (next_expected_header_byte == nullptr) {
break;
}

if (*next_expected_header_byte == GZIP_HEADER_ID1) {
gzip_id_bytes_read_ = 1;
next_expected_header_byte++;

if (strm_.avail_in == 1) {
// The only available byte was already read.
break;
}
} else {
mode_ = INFLATE;
break;
}

// fallthrough
case 1:
if (next_expected_header_byte == nullptr) {
break;
}

if (*next_expected_header_byte == GZIP_HEADER_ID2) {
gzip_id_bytes_read_ = 2;
mode_ = GUNZIP;
} else {
// There is no actual difference between INFLATE and INFLATERAW
// (after initialization).
mode_ = INFLATE;
}

break;
default:
CHECK(0 && "invalid number of gzip magic number bytes read");
}

// fallthrough
case INFLATE:
case GUNZIP:
case INFLATERAW:
err_ = inflate(&strm_, flush_);

// If data was encoded with dictionary (INFLATERAW will have it set in
// SetDictionary, don't repeat that here)
if (mode_ != INFLATERAW &&
err_ == Z_NEED_DICT &&
!dictionary_.empty()) {
// Load it
err_ = inflateSetDictionary(&strm_,
dictionary_.data(),
dictionary_.size());
if (err_ == Z_OK) {
// And try to decode again
err_ = inflate(&strm_, flush_);
} else if (err_ == Z_DATA_ERROR) {
// Both inflateSetDictionary() and inflate() return Z_DATA_ERROR.
// Make it possible for After() to tell a bad dictionary from bad
// input.
err_ = Z_NEED_DICT;
}
}

while (strm_.avail_in > 0 &&
mode_ == GUNZIP &&
err_ == Z_STREAM_END &&
strm_.next_in[0] != 0x00) {
// Bytes remain in input buffer. Perhaps this is another compressed
// member in the same archive, or just trailing garbage.
// Trailing zero bytes are okay, though, since they are frequently
// used for padding.

Reset();
err_ = inflate(&strm_, flush_);
}
break;
default:
UNREACHABLE();
}

// pass any errors back to the main thread to deal with.

// now After will emit the output, and
// either schedule another call to Process,
// or shift the queue and call Process.
ctx_.DoThreadPoolWork();
}


bool CheckError() {
// Acceptable error states depend on the type of zlib stream.
switch (err_) {
case Z_OK:
case Z_BUF_ERROR:
if (strm_.avail_out != 0 && flush_ == Z_FINISH) {
Error("unexpected end of file");
return false;
}
case Z_STREAM_END:
// normal statuses, not fatal
break;
case Z_NEED_DICT:
if (dictionary_.empty())
Error("Missing dictionary");
else
Error("Bad dictionary");
return false;
default:
// something else.
Error("Zlib error");
return false;
}

return true;
const CompressionError err = ctx_.GetErrorInfo();
if (!err.IsError()) return true;
EmitError(err);
return false;
}


@@ -400,8 +310,7 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
if (!CheckError())
return;

write_result_[0] = strm_.avail_out;
write_result_[1] = strm_.avail_in;
UpdateWriteResult();

// call the write() cb
Local<Function> cb = PersistentToLocal(env()->isolate(),
@@ -413,19 +322,15 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
}

// TODO(addaleax): Switch to modern error system (node_errors.h).
void Error(const char* message) {
void EmitError(const CompressionError& err) {
// If you hit this assertion, you forgot to enter the v8::Context first.
CHECK_EQ(env()->context(), env()->isolate()->GetCurrentContext());

if (strm_.msg != nullptr) {
message = strm_.msg;
}

HandleScope scope(env()->isolate());
Local<Value> args[3] = {
OneByteString(env()->isolate(), message),
Integer::New(env()->isolate(), err_),
OneByteString(env()->isolate(), ZlibStrerror(err_))
OneByteString(env()->isolate(), err.message),
Integer::New(env()->isolate(), err.err),
OneByteString(env()->isolate(), err.code)
};
MakeCallback(env()->onerror_string(), arraysize(args), args);

@@ -435,12 +340,107 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
Close();
}

void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("compression context", ctx_);
tracker->TrackFieldWithSize("zlib_memory",
zlib_memory_ + unreported_allocations_);
}

protected:
CompressionContext* context() { return &ctx_; }

void InitStream(uint32_t* write_result, Local<Function> write_js_callback) {
write_result_ = write_result;
write_js_callback_.Reset(env()->isolate(), write_js_callback);
init_done_ = true;
}

// Allocation functions provided to zlib itself. We store the real size of
// the allocated memory chunk just before the "payload" memory we return
// to zlib.
// Because we use zlib off the thread pool, we can not report memory directly
// to V8; rather, we first store it as "unreported" memory in a separate
// field and later report it back from the main thread.
static void* AllocForZlib(void* data, uInt items, uInt size) {
CompressionStream* ctx = static_cast<CompressionStream*>(data);
size_t real_size =
MultiplyWithOverflowCheck(static_cast<size_t>(items),
static_cast<size_t>(size)) + sizeof(size_t);
char* memory = UncheckedMalloc(real_size);
if (UNLIKELY(memory == nullptr)) return nullptr;
*reinterpret_cast<size_t*>(memory) = real_size;
ctx->unreported_allocations_.fetch_add(real_size,
std::memory_order_relaxed);
return memory + sizeof(size_t);
}

static void FreeForZlib(void* data, void* pointer) {
if (UNLIKELY(pointer == nullptr)) return;
CompressionStream* ctx = static_cast<CompressionStream*>(data);
char* real_pointer = static_cast<char*>(pointer) - sizeof(size_t);
size_t real_size = *reinterpret_cast<size_t*>(real_pointer);
ctx->unreported_allocations_.fetch_sub(real_size,
std::memory_order_relaxed);
free(real_pointer);
}

// This is called on the main thread after zlib may have allocated something
// in order to report it back to V8.
void AdjustAmountOfExternalAllocatedMemory() {
ssize_t report =
unreported_allocations_.exchange(0, std::memory_order_relaxed);
if (report == 0) return;
CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast<size_t>(-report));
zlib_memory_ += report;
env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report);
}

struct AllocScope {
explicit AllocScope(CompressionStream* stream) : stream(stream) {}
~AllocScope() { stream->AdjustAmountOfExternalAllocatedMemory(); }
CompressionStream* stream;
};

private:
void Ref() {
if (++refs_ == 1) {
ClearWeak();
}
}

void Unref() {
CHECK_GT(refs_, 0);
if (--refs_ == 0) {
MakeWeak();
}
}

bool init_done_ = false;
bool write_in_progress_ = false;
bool pending_close_ = false;
bool closed_ = false;
unsigned int refs_ = 0;
uint32_t* write_result_ = nullptr;
Persistent<Function> write_js_callback_;
std::atomic<ssize_t> unreported_allocations_{0};
size_t zlib_memory_ = 0;

CompressionContext ctx_;
};

class ZlibStream : public CompressionStream<ZlibContext> {
public:
ZlibStream(Environment* env, Local<Object> wrap, node_zlib_mode mode)
: CompressionStream(env, wrap) {
context()->SetMode(mode);
}

static void New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsInt32());
node_zlib_mode mode =
static_cast<node_zlib_mode>(args[0].As<Int32>()->Value());
new ZCtx(env, args.This(), mode);
new ZlibStream(env, args.This(), mode);
}

// just pull the ints out of the args and call the other Init
@@ -459,42 +459,25 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
"init(windowBits, level, memLevel, strategy, writeResult, writeCallback,"
" dictionary)");

ZCtx* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
ZlibStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

Local<Context> context = args.GetIsolate()->GetCurrentContext();

// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
uint32_t windowBits;
if (!args[0]->Uint32Value(context).To(&windowBits)) return;

if (!((windowBits == 0) &&
(ctx->mode_ == INFLATE ||
ctx->mode_ == GUNZIP ||
ctx->mode_ == UNZIP))) {
CHECK(
(windowBits >= Z_MIN_WINDOWBITS && windowBits <= Z_MAX_WINDOWBITS) &&
"invalid windowBits");
}
uint32_t window_bits;
if (!args[0]->Uint32Value(context).To(&window_bits)) return;

int level;
int32_t level;
if (!args[1]->Int32Value(context).To(&level)) return;
CHECK((level >= Z_MIN_LEVEL && level <= Z_MAX_LEVEL) &&
"invalid compression level");

uint32_t memLevel;
if (!args[2]->Uint32Value(context).To(&memLevel)) return;
CHECK((memLevel >= Z_MIN_MEMLEVEL && memLevel <= Z_MAX_MEMLEVEL) &&
"invalid memlevel");
uint32_t mem_level;
if (!args[2]->Uint32Value(context).To(&mem_level)) return;

uint32_t strategy;
if (!args[3]->Uint32Value(context).To(&strategy)) return;
CHECK((strategy == Z_FILTERED || strategy == Z_HUFFMAN_ONLY ||
strategy == Z_RLE || strategy == Z_FIXED ||
strategy == Z_DEFAULT_STRATEGY) &&
"invalid strategy");

CHECK(args[4]->IsUint32Array());
Local<Uint32Array> array = args[4].As<Uint32Array>();
@@ -512,279 +495,404 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork {
data + Buffer::Length(args[6]));
}

bool ret = ctx->Init(level, windowBits, memLevel, strategy, write_result,
write_js_callback, std::move(dictionary));
if (ret)
ctx->SetDictionary();
wrap->InitStream(write_result, write_js_callback);

AllocScope alloc_scope(wrap);
wrap->context()->SetAllocationFunctions(
AllocForZlib, FreeForZlib, static_cast<CompressionStream*>(wrap));
const CompressionError err =
wrap->context()->Init(level, window_bits, mem_level, strategy,
std::move(dictionary));
if (err.IsError())
wrap->EmitError(err);

return args.GetReturnValue().Set(ret);
return args.GetReturnValue().Set(!err.IsError());
}

static void Params(const FunctionCallbackInfo<Value>& args) {
CHECK(args.Length() == 2 && "params(level, strategy)");
ZCtx* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
Environment* env = ctx->env();
ZlibStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
Local<Context> context = args.GetIsolate()->GetCurrentContext();
int level;
if (!args[0]->Int32Value(env->context()).To(&level)) return;
if (!args[0]->Int32Value(context).To(&level)) return;
int strategy;
if (!args[1]->Int32Value(env->context()).To(&strategy)) return;
ctx->Params(level, strategy);
if (!args[1]->Int32Value(context).To(&strategy)) return;

AllocScope alloc_scope(wrap);
const CompressionError err = wrap->context()->SetParams(level, strategy);
if (err.IsError())
wrap->EmitError(err);
}

static void Reset(const FunctionCallbackInfo<Value> &args) {
ZCtx* ctx;
ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder());
ctx->Reset();
ctx->SetDictionary();
ZlibStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

AllocScope alloc_scope(wrap);
const CompressionError err = wrap->context()->ResetStream();
if (err.IsError())
wrap->EmitError(err);
}

bool Init(int level, int windowBits, int memLevel,
int strategy, uint32_t* write_result,
Local<Function> write_js_callback,
std::vector<unsigned char>&& dictionary) {
AllocScope alloc_scope(this);
level_ = level;
windowBits_ = windowBits;
memLevel_ = memLevel;
strategy_ = strategy;
SET_MEMORY_INFO_NAME(ZlibStream)
SET_SELF_SIZE(ZlibStream)
};

strm_.zalloc = AllocForZlib;
strm_.zfree = FreeForZlib;
strm_.opaque = static_cast<void*>(this);

flush_ = Z_NO_FLUSH;
void ZlibContext::Close() {
CHECK_LE(mode_, UNZIP);

err_ = Z_OK;
int status = Z_OK;
if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) {
status = deflateEnd(&strm_);
} else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW ||
mode_ == UNZIP) {
status = inflateEnd(&strm_);
}

if (mode_ == GZIP || mode_ == GUNZIP) {
windowBits_ += 16;
}
CHECK(status == Z_OK || status == Z_DATA_ERROR);
mode_ = NONE;

if (mode_ == UNZIP) {
windowBits_ += 32;
}
dictionary_.clear();
}

if (mode_ == DEFLATERAW || mode_ == INFLATERAW) {
windowBits_ *= -1;
}

switch (mode_) {
case DEFLATE:
case GZIP:
case DEFLATERAW:
err_ = deflateInit2(&strm_,
level_,
Z_DEFLATED,
windowBits_,
memLevel_,
strategy_);
break;
case INFLATE:
case GUNZIP:
case INFLATERAW:
case UNZIP:
err_ = inflateInit2(&strm_, windowBits_);
break;
default:
UNREACHABLE();
}
void ZlibContext::DoThreadPoolWork() {
const Bytef* next_expected_header_byte = nullptr;

// If the avail_out is left at 0, then it means that it ran out
// of room. If there was avail_out left over, then it means
// that all of the input was consumed.
switch (mode_) {
case DEFLATE:
case GZIP:
case DEFLATERAW:
err_ = deflate(&strm_, flush_);
break;
case UNZIP:
if (strm_.avail_in > 0) {
next_expected_header_byte = strm_.next_in;
}

dictionary_ = std::move(dictionary);
switch (gzip_id_bytes_read_) {
case 0:
if (next_expected_header_byte == nullptr) {
break;
}

write_in_progress_ = false;
init_done_ = true;
if (*next_expected_header_byte == GZIP_HEADER_ID1) {
gzip_id_bytes_read_ = 1;
next_expected_header_byte++;

if (err_ != Z_OK) {
dictionary_.clear();
mode_ = NONE;
return false;
}
if (strm_.avail_in == 1) {
// The only available byte was already read.
break;
}
} else {
mode_ = INFLATE;
break;
}

write_result_ = write_result;
write_js_callback_.Reset(env()->isolate(), write_js_callback);
return true;
}
// fallthrough
case 1:
if (next_expected_header_byte == nullptr) {
break;
}

void SetDictionary() {
if (dictionary_.empty())
return;
if (*next_expected_header_byte == GZIP_HEADER_ID2) {
gzip_id_bytes_read_ = 2;
mode_ = GUNZIP;
} else {
// There is no actual difference between INFLATE and INFLATERAW
// (after initialization).
mode_ = INFLATE;
}

err_ = Z_OK;
break;
default:
CHECK(0 && "invalid number of gzip magic number bytes read");
}

switch (mode_) {
case DEFLATE:
case DEFLATERAW:
err_ = deflateSetDictionary(&strm_,
dictionary_.data(),
dictionary_.size());
break;
case INFLATERAW:
// The other inflate cases will have the dictionary set when inflate()
// returns Z_NEED_DICT in Process()
// fallthrough
case INFLATE:
case GUNZIP:
case INFLATERAW:
err_ = inflate(&strm_, flush_);

// If data was encoded with dictionary (INFLATERAW will have it set in
// SetDictionary, don't repeat that here)
if (mode_ != INFLATERAW &&
err_ == Z_NEED_DICT &&
!dictionary_.empty()) {
// Load it
err_ = inflateSetDictionary(&strm_,
dictionary_.data(),
dictionary_.size());
break;
default:
break;
}
if (err_ == Z_OK) {
// And try to decode again
err_ = inflate(&strm_, flush_);
} else if (err_ == Z_DATA_ERROR) {
// Both inflateSetDictionary() and inflate() return Z_DATA_ERROR.
// Make it possible for After() to tell a bad dictionary from bad
// input.
err_ = Z_NEED_DICT;
}
}

if (err_ != Z_OK) {
Error("Failed to set dictionary");
}
while (strm_.avail_in > 0 &&
mode_ == GUNZIP &&
err_ == Z_STREAM_END &&
strm_.next_in[0] != 0x00) {
// Bytes remain in input buffer. Perhaps this is another compressed
// member in the same archive, or just trailing garbage.
// Trailing zero bytes are okay, though, since they are frequently
// used for padding.

ResetStream();
err_ = inflate(&strm_, flush_);
}
break;
default:
UNREACHABLE();
}
}

void Params(int level, int strategy) {
AllocScope alloc_scope(this);

err_ = Z_OK;
// Point the zlib stream at the caller-provided input and output buffers
// for the next work chunk.
void ZlibContext::SetBuffers(char* in, uint32_t in_len,
                             char* out, uint32_t out_len) {
  strm_.next_in = reinterpret_cast<Bytef*>(in);
  strm_.avail_in = in_len;
  strm_.next_out = reinterpret_cast<Bytef*>(out);
  strm_.avail_out = out_len;
}


// Record the flush mode (e.g. Z_NO_FLUSH, Z_FINISH) that the next
// DoThreadPoolWork() call will pass to deflate()/inflate().
void ZlibContext::SetFlush(int flush) {
  flush_ = flush;
}

switch (mode_) {
case DEFLATE:
case DEFLATERAW:
err_ = deflateParams(&strm_, level, strategy);
break;
default:
break;
}

if (err_ != Z_OK && err_ != Z_BUF_ERROR) {
Error("Failed to set parameters");
// Report how many input bytes zlib left unconsumed (avail_in) and how
// much output space remains (avail_out) after the last work chunk; the
// wrapper copies these into the JS-visible write-result array.
void ZlibContext::GetAfterWriteOffsets(uint32_t* avail_in,
                                       uint32_t* avail_out) const {
  *avail_out = strm_.avail_out;
  *avail_in = strm_.avail_in;
}


// Build a CompressionError for the current err_ status. zlib's own
// message string (strm_.msg), when present, is more specific than the
// caller-provided fallback, so it takes precedence.
CompressionError ZlibContext::ErrorForMessage(const char* message) const {
  const char* effective_message =
      strm_.msg != nullptr ? strm_.msg : message;
  return CompressionError(effective_message, ZlibStrerror(err_), err_);
}


CompressionError ZlibContext::GetErrorInfo() const {
// Acceptable error states depend on the type of zlib stream.
switch (err_) {
case Z_OK:
case Z_BUF_ERROR:
if (strm_.avail_out != 0 && flush_ == Z_FINISH) {
return ErrorForMessage("unexpected end of file");
}
case Z_STREAM_END:
// normal statuses, not fatal
break;
case Z_NEED_DICT:
if (dictionary_.empty())
return ErrorForMessage("Missing dictionary");
else
return ErrorForMessage("Bad dictionary");
default:
// something else.
return ErrorForMessage("Zlib error");
}

void Reset() {
AllocScope alloc_scope(this);
return CompressionError {};
}

err_ = Z_OK;

switch (mode_) {
case DEFLATE:
case DEFLATERAW:
case GZIP:
err_ = deflateReset(&strm_);
break;
case INFLATE:
case INFLATERAW:
case GUNZIP:
err_ = inflateReset(&strm_);
break;
default:
break;
}

if (err_ != Z_OK) {
Error("Failed to reset stream");
}
CompressionError ZlibContext::ResetStream() {
err_ = Z_OK;

switch (mode_) {
case DEFLATE:
case DEFLATERAW:
case GZIP:
err_ = deflateReset(&strm_);
break;
case INFLATE:
case INFLATERAW:
case GUNZIP:
err_ = inflateReset(&strm_);
break;
default:
break;
}

void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("dictionary", dictionary_);
tracker->TrackFieldWithSize("zlib_memory",
zlib_memory_ + unreported_allocations_);
if (err_ != Z_OK)
return ErrorForMessage("Failed to reset stream");

return SetDictionary();
}


// Install custom allocator callbacks on the zlib stream; `opaque` is
// handed back to the callbacks unchanged on every call.
void ZlibContext::SetAllocationFunctions(alloc_func alloc,
                                         free_func free,
                                         void* opaque) {
  z_stream* s = &strm_;
  s->zalloc = alloc;
  s->zfree = free;
  s->opaque = opaque;
}


CompressionError ZlibContext::Init(
int level, int window_bits, int mem_level, int strategy,
std::vector<unsigned char>&& dictionary) {
if (!((window_bits == 0) &&
(mode_ == INFLATE ||
mode_ == GUNZIP ||
mode_ == UNZIP))) {
CHECK(
(window_bits >= Z_MIN_WINDOWBITS && window_bits <= Z_MAX_WINDOWBITS) &&
"invalid windowBits");
}

SET_MEMORY_INFO_NAME(ZCtx)
SET_SELF_SIZE(ZCtx)
CHECK((level >= Z_MIN_LEVEL && level <= Z_MAX_LEVEL) &&
"invalid compression level");

private:
void Ref() {
if (++refs_ == 1) {
ClearWeak();
}
CHECK((mem_level >= Z_MIN_MEMLEVEL && mem_level <= Z_MAX_MEMLEVEL) &&
"invalid memlevel");

CHECK((strategy == Z_FILTERED || strategy == Z_HUFFMAN_ONLY ||
strategy == Z_RLE || strategy == Z_FIXED ||
strategy == Z_DEFAULT_STRATEGY) &&
"invalid strategy");

level_ = level;
window_bits_ = window_bits;
mem_level_ = mem_level;
strategy_ = strategy;

flush_ = Z_NO_FLUSH;

err_ = Z_OK;

if (mode_ == GZIP || mode_ == GUNZIP) {
window_bits_ += 16;
}

void Unref() {
CHECK_GT(refs_, 0);
if (--refs_ == 0) {
MakeWeak();
}
if (mode_ == UNZIP) {
window_bits_ += 32;
}

// Allocation functions provided to zlib itself. We store the real size of
// the allocated memory chunk just before the "payload" memory we return
// to zlib.
// Because we use zlib off the thread pool, we can not report memory directly
// to V8; rather, we first store it as "unreported" memory in a separate
// field and later report it back from the main thread.
static void* AllocForZlib(void* data, uInt items, uInt size) {
ZCtx* ctx = static_cast<ZCtx*>(data);
size_t real_size =
MultiplyWithOverflowCheck(static_cast<size_t>(items),
static_cast<size_t>(size)) + sizeof(size_t);
char* memory = UncheckedMalloc(real_size);
if (UNLIKELY(memory == nullptr)) return nullptr;
*reinterpret_cast<size_t*>(memory) = real_size;
ctx->unreported_allocations_.fetch_add(real_size,
std::memory_order_relaxed);
return memory + sizeof(size_t);
if (mode_ == DEFLATERAW || mode_ == INFLATERAW) {
window_bits_ *= -1;
}

static void FreeForZlib(void* data, void* pointer) {
if (UNLIKELY(pointer == nullptr)) return;
ZCtx* ctx = static_cast<ZCtx*>(data);
char* real_pointer = static_cast<char*>(pointer) - sizeof(size_t);
size_t real_size = *reinterpret_cast<size_t*>(real_pointer);
ctx->unreported_allocations_.fetch_sub(real_size,
std::memory_order_relaxed);
free(real_pointer);
switch (mode_) {
case DEFLATE:
case GZIP:
case DEFLATERAW:
err_ = deflateInit2(&strm_,
level_,
Z_DEFLATED,
window_bits_,
mem_level_,
strategy_);
break;
case INFLATE:
case GUNZIP:
case INFLATERAW:
case UNZIP:
err_ = inflateInit2(&strm_, window_bits_);
break;
default:
UNREACHABLE();
}

// This is called on the main thread after zlib may have allocated something
// in order to report it back to V8.
void AdjustAmountOfExternalAllocatedMemory() {
ssize_t report =
unreported_allocations_.exchange(0, std::memory_order_relaxed);
if (report == 0) return;
CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast<size_t>(-report));
zlib_memory_ += report;
env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report);
dictionary_ = std::move(dictionary);

if (err_ != Z_OK) {
dictionary_.clear();
mode_ = NONE;
return ErrorForMessage(nullptr);
}

struct AllocScope {
explicit AllocScope(ZCtx* ctx) : ctx(ctx) {}
~AllocScope() { ctx->AdjustAmountOfExternalAllocatedMemory(); }
ZCtx* ctx;
};
return SetDictionary();
}

std::vector<unsigned char> dictionary_;
int err_;
int flush_;
bool init_done_;
int level_;
int memLevel_;
node_zlib_mode mode_;
int strategy_;
z_stream strm_;
int windowBits_;
bool write_in_progress_;
bool pending_close_;
unsigned int refs_;
unsigned int gzip_id_bytes_read_;
uint32_t* write_result_;
Persistent<Function> write_js_callback_;
std::atomic<ssize_t> unreported_allocations_{0};
size_t zlib_memory_ = 0;
};

// Apply the preset dictionary (if any) to the stream. For deflate modes
// it is set eagerly; for INFLATERAW it must also be set eagerly because
// raw streams never report Z_NEED_DICT. No-op when no dictionary exists.
CompressionError ZlibContext::SetDictionary() {
  if (dictionary_.empty())
    return CompressionError {};

  err_ = Z_OK;

  if (mode_ == DEFLATE || mode_ == DEFLATERAW) {
    err_ = deflateSetDictionary(&strm_,
                                dictionary_.data(),
                                dictionary_.size());
  } else if (mode_ == INFLATERAW) {
    // The other inflate cases will have the dictionary set when inflate()
    // returns Z_NEED_DICT in Process()
    err_ = inflateSetDictionary(&strm_,
                                dictionary_.data(),
                                dictionary_.size());
  }

  if (err_ != Z_OK)
    return ErrorForMessage("Failed to set dictionary");

  return CompressionError {};
}


// Change compression level/strategy mid-stream. Only deflate modes
// support this; for all other modes the call is a successful no-op.
CompressionError ZlibContext::SetParams(int level, int strategy) {
  err_ = Z_OK;

  if (mode_ == DEFLATE || mode_ == DEFLATERAW)
    err_ = deflateParams(&strm_, level, strategy);

  // Z_BUF_ERROR from deflateParams() is not treated as fatal here.
  if (err_ == Z_OK || err_ == Z_BUF_ERROR)
    return CompressionError {};

  return ErrorForMessage("Failed to set parameters");
}


void Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
Local<FunctionTemplate> z = env->NewFunctionTemplate(ZCtx::New);
Local<FunctionTemplate> z = env->NewFunctionTemplate(ZlibStream::New);

z->InstanceTemplate()->SetInternalFieldCount(1);
z->Inherit(AsyncWrap::GetConstructorTemplate(env));

env->SetProtoMethod(z, "write", ZCtx::Write<true>);
env->SetProtoMethod(z, "writeSync", ZCtx::Write<false>);
env->SetProtoMethod(z, "init", ZCtx::Init);
env->SetProtoMethod(z, "close", ZCtx::Close);
env->SetProtoMethod(z, "params", ZCtx::Params);
env->SetProtoMethod(z, "reset", ZCtx::Reset);
env->SetProtoMethod(z, "write", ZlibStream::Write<true>);
env->SetProtoMethod(z, "writeSync", ZlibStream::Write<false>);
env->SetProtoMethod(z, "close", ZlibStream::Close);

env->SetProtoMethod(z, "init", ZlibStream::Init);
env->SetProtoMethod(z, "params", ZlibStream::Params);
env->SetProtoMethod(z, "reset", ZlibStream::Reset);

Local<String> zlibString = FIXED_ONE_BYTE_STRING(env->isolate(), "Zlib");
z->SetClassName(zlibString);
4 changes: 2 additions & 2 deletions test/parallel/test-heapdump-zlib.js
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ require('../common');
const { validateSnapshotNodes } = require('../common/heap');
const zlib = require('zlib');

validateSnapshotNodes('Node / ZCtx', []);
validateSnapshotNodes('Node / ZlibStream', []);
// eslint-disable-next-line no-unused-vars
const gunzip = zlib.createGunzip();
validateSnapshotNodes('Node / ZCtx', [
validateSnapshotNodes('Node / ZlibStream', [
{
children: [
{ node_name: 'Zlib', edge_name: 'wrapped' },