Skip to content

Commit

Permalink
start to work with wait
Browse files Browse the repository at this point in the history
  • Loading branch information
Hanaasagi committed Jun 16, 2023
1 parent 3013b47 commit 751289c
Showing 1 changed file with 103 additions and 14 deletions.
117 changes: 103 additions & 14 deletions src/multithread.zig
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
const std = @import("std");
const testing = std.testing;

// TODO:
// 1. getConst and getMut
// 2. support argumetns like thread.spawn
// 3. optimise lock

pub fn Lazy(comptime T: type, comptime f: fn () T) type {
return struct {
cell: OnceCell(T),
Expand All @@ -23,29 +28,45 @@ pub fn Lazy(comptime T: type, comptime f: fn () T) type {

pub fn OnceCell(comptime T: type) type {
return struct {
cell: T = undefined,
mutex: std.Thread.Mutex = std.Thread.Mutex{},
done: bool = false,
// cell: T = undefined,
// mutex: std.Thread.Mutex = std.Thread.Mutex{},
// done: bool = false,

cell: T,
mutex: std.Thread.Mutex,
done: std.atomic.Atomic(bool),
const Self = @This();

pub fn init() Self {
return Self{};
return Self{
.cell = undefined,
.mutex = std.Thread.Mutex{},
.done = std.atomic.Atomic(bool).init(false),
};
}

/// Creates a new initialized cell.
pub fn initWithValue(value: T) Self {
return Self{
.cell = value,
.done = true,
};
}

/// Gets the reference to the underlying value.
/// Returns `null` if the cell is uninitialized, or being initialized.
/// This method never blocks.
pub fn get(self: *Self) ?*T {
if (self.isInitialize()) {
return &self.cell;
}
return null;
}

pub fn getUnchecked(self: *Self) *T {
std.debug.assert(self.isInitialize());

return &self.cell;
}

/// Gets the reference to the underlying value, initializing it with `f` if the cell was uninitialized.
/// Many threads may call `getOrInit` concurrently with different initializing functions,
/// but it is guaranteed that only one function will be executed.
/// This method may block when the cell is not initialized.
pub fn getOrInit(self: *Self, comptime f: fn () T) *T {
// Fast path check
if (self.get()) |value| {
Expand All @@ -58,12 +79,42 @@ pub fn OnceCell(comptime T: type) type {
return self.getUnchecked();
}

/// Get the reference to the underlying value, without checking if the cell is initialized.
pub fn getUnchecked(self: *Self) *T {
std.debug.assert(self.isInitialize());

return &self.cell;
}

/// Takes the value out of this OnceCell, moving it back to an uninitialized state.
pub fn take(self: *Self) ?T {
if (self.isInitialize()) {
defer self.done.store(false, .Release);

var cell = self.cell;
self.cell = undefined;
return cell;
}
return null;
}

/// Gets the reference to the underlying value, blocking the current thread until it is set.
// pub fn wait(self: *Self) *T {
// if (self.done.load(.Monotonic) == false) {
// std.Thread.Futex.wait(&self.done, false);
// }

// while (self.done.swap(false, .Acquire) != true) {
// std.Thread.Futex.wait(&self.done, false);
// }
// }

// --------------------------------------------------------------------------------
// Core API
// --------------------------------------------------------------------------------

fn isInitialize(self: Self) bool {
return @atomicLoad(bool, &self.done, .Acquire);
return self.done.load(.Acquire);
}

fn initialize(self: *Self, comptime f: fn () T) void {
Expand All @@ -73,10 +124,9 @@ pub fn OnceCell(comptime T: type) type {
defer self.mutex.unlock();

// The first thread to acquire the mutex gets to run the initializer

if (!self.done) {
if (!self.done.loadUnchecked()) {
self.cell = f();
defer @atomicStore(bool, &self.done, true, .Release);
defer self.done.store(true, .Release);
}
}
};
Expand Down Expand Up @@ -121,6 +171,27 @@ test "test global map" {
defer r2.*.deinit();
}

var globalMap2 = OnceCell(std.StringHashMap(i32)).init();

test "test global map take" {
_ = globalMap2.getOrInit(returnMap);
var r1 = globalMap2.take().?;
defer r1.deinit();

try testing.expect(r1.get("b") != null);
try testing.expect(r1.get("b").? == 2);

var r2 = globalMap2.take();
try testing.expect(r2 == null);

_ = globalMap2.getOrInit(returnMap);

var r3 = globalMap2.take().?;
defer r3.deinit();
try testing.expect(r3.get("b") != null);
try testing.expect(r3.get("b").? == 2);
}

test "test assume init" {
var cell1 = OnceCell(i32).init();
const r1 = cell1.getOrInit(return_1);
Expand Down Expand Up @@ -176,6 +247,24 @@ test "test multithread shared value" {
try testing.expectEqual(@as(i32, 1), shared);
}

// var cell4 = OnceCell(i32).init();

// test "test wait" {
// var threads: [10]std.Thread = undefined;
// defer for (threads) |handle| handle.join();

// for (&threads) |*handle| {
// handle.* = try std.Thread.spawn(.{}, struct {
// fn thread_fn(x: u8) void {
// _ = x;
// _ = cell4.wait();
// }
// }.thread_fn, .{0});
// }

// _ = cell4.getOrInit(return_1);
// }

var LazyMap = Lazy(std.StringHashMap(i32), returnMap).init();

test "test lazy" {
Expand Down

0 comments on commit 751289c

Please sign in to comment.