shithub: mc

ref: 303798469e8afe0e514c667f3c8c7ae2c65e58c5
dir: /lib/thread/rwlock+futex.myr/

View raw version
use std

use "atomic"
use "futex"
use "tls"
use "types"

pkg thread =
	/*
	A reader-writer lock built on a single futex word.

	The 31 low bits of `_state` contain either the number of readers
	holding the lock, or `0x7fffffff` if the lock is held by a single
	writer.

	The high bit is set if there are any waiting readers or writers.

	`_owner` holds the thread id of the writer that currently holds the
	lock, or -1 when no writer holds it. It is only used to diagnose
	relocking and unlocking by the wrong thread.
	*/
	type rwlock = struct
		_state : ftxtag
		_owner : tid
	;;

	/* Returns a new rwlock in the unlocked state. */
	const mkrwlock  : (-> rwlock)
	/* Acquires the lock for reading, waiting while a writer holds it. */
	const rdlock    : (rw : rwlock# -> void)
	/* Acquires the lock for writing, waiting until it is completely free. */
	const wrlock    : (rw : rwlock# -> void)
	/* Attempts a read acquisition without waiting; true on success. */
	const tryrdlock : (rw : rwlock# -> bool)
	/* Attempts a write acquisition without waiting; true on success. */
	const trywrlock : (rw : rwlock# -> bool)
	/* Releases a read hold previously obtained by rdlock/tryrdlock. */
	const rdunlock  : (rw : rwlock# -> void)
	/* Releases the write hold; must be called by the owning thread. */
	const wrunlock  : (rw : rwlock# -> void)
;;

/*
Mask for the 31 low bits of `_state` (the reader count). The all-ones
value under this mask is reserved to mean "held by a writer".
*/
const Nrmask  = 0x7fffffff
/* High bit of `_state`: set while one or more threads are waiting. */
const Waitbit = 0x80000000

/*
Builds an rwlock that nobody holds: no readers, no writer, no waiters,
and no recorded owner.
*/
const mkrwlock = {
	var fresh : rwlock

	fresh._state = 0
	fresh._owner = -1
	-> fresh
}

/*
Takes a read hold on `rw`. While a writer holds the lock the caller sets
the wait bit and waits on the futex word; otherwise it bumps the reader
count. Dies if one more reader would collide with the writer marker.
*/
const rdlock = {rw
	for ; ;
		var cur = xget(&rw._state)
		var holders = cur & Nrmask

		if holders == Nrmask - 1
			std.die("error: rwlock overflowed\n")
		elif holders == Nrmask
			/*
			A writer owns the lock. Publish the wait bit and then
			wait on the word. A failed CAS means the state moved
			underneath us, so go around and look at it again.
			*/
			if xcas(&rw._state, cur, Nrmask | Waitbit) == cur
				ftxwait(&rw._state, Nrmask | Waitbit, 0)
			;;
		else
			/*
			Free, or shared by other readers: in both cases joining
			is just a CAS that adds one to the count. On failure we
			retry from the top with a fresh snapshot.
			*/
			if xcas(&rw._state, cur, cur + 1) == cur
				-> void
			;;
		;;
	;;
}

/*
Takes the exclusive (write) hold on `rw`, waiting until no reader or
writer holds it. Terminates the process with a diagnostic if the calling
thread already holds the write lock.
*/
const wrlock = {rw
	/*
	Relock detection. `_owner` can only ever be set to our tid by this
	thread itself, so the answer cannot change while we are inside this
	call; checking once up front is equivalent to checking on every
	retry.
	*/
	if rw._owner == tid()
		std.fput(std.Err,
			"error: thread {} attempted to relock an rwlock it already holds\n",
			tid())
		std.suicide()
	;;

	for ; ;
		/*
		`_state` must be 0 for a writer to acquire the lock. Anything
		else means the lock is either held or in the process of being
		released by the last reader.
		 */
		var s = xcas(&rw._state, 0, Nrmask)
		if s == 0
			rw._owner = tid()
			-> void
		;;

		/*
		If we fail to acquire the lock, attempt to CAS in the wait bit
		and wait. If the CAS fails, the state of the lock has changed
		so we can try once again to acquire it.
		*/
		if xcas(&rw._state, s, s | Waitbit) == s
			ftxwait(&rw._state, s | Waitbit, 0)
		;;
	;;
}

/*
Non-waiting variant of rdlock. Returns false as soon as a writer is seen
holding the lock, true once the reader count has been incremented. Only
loops to retry a CAS that lost a race. Dies on reader-count overflow.
*/
const tryrdlock = {rw
	for ; ;
		var cur = xget(&rw._state)
		var holders = cur & Nrmask

		if holders == Nrmask - 1
			std.die("error: rwlock overflowed\n")
		elif holders == Nrmask
			-> false
		elif xcas(&rw._state, cur, cur + 1) == cur
			-> true
		;;
	;;
	-> false /* Unreachable */
}

/*
Non-waiting variant of wrlock. Makes a single attempt to move `_state`
from 0 (free) to the writer marker; on success records the caller as
owner and returns true, otherwise returns false.
*/
const trywrlock = {rw
	if xcas(&rw._state, 0, Nrmask) != 0
		-> false
	;;
	rw._owner = tid()
	-> true
}

/*
Drops one read hold on `rw`. Asserts that the reader count was non-zero
before the decrement. The last reader out wakes any waiting writers.
*/
const rdunlock = {rw
	/*
	Only the last reader needs to potentially wake writers so all other
	readers can get away with simply decrementing the reader count via FAA.
	*/
	var prev = xadd(&rw._state, -1)
	std.assert(prev & Nrmask != 0, "error: rwlock underflowed\n")
	if prev & Nrmask == 1 && prev & Waitbit != 0
		/*
		If there are one or more waiting writers and no other readers
		have acquired the lock since we fetched the reader count, then
		the value of `_state` is guaranteed to be `Waitbit`.

		The CAS clears the wait bit, so after it succeeds nothing in
		`_state` records that anyone is still waiting. We therefore
		have to wake every waiter, not just one: a writer that stayed
		asleep would not be woken by the next wrunlock either, since
		that only issues a wake when it sees the wait bit. Woken
		writers that lose the race re-set the wait bit themselves
		before waiting again.
		*/
		if xcas(&rw._state, Waitbit, 0) == Waitbit
			ftxwakeall(&rw._state)
		;;
	;;
}

/*
Releases the write hold on `rw`. Terminates the process with a
diagnostic if the caller is not the thread recorded in `_owner`.
*/
const wrunlock = {rw
	var me = tid()

	if rw._owner != me
		std.fput(std.Err,
			"error: thread {} attempted to unlock an rwlock last held by {}\n",
			me, rw._owner)
		std.suicide()
	;;

	/* Forget the owner before the lock word is released. */
	rw._owner = -1

	/*
	Swap the whole word back to 0 and inspect what was there. A set wait
	bit tells us someone is waiting but not whether the waiters are
	readers, writers, or a mix. Readers must all be woken and a lone
	writer would only need a single wake, but since the two cases are
	indistinguishable here we always wake everyone.
	*/
	var before = xchg(&rw._state, 0)
	if before & Waitbit != 0
		ftxwakeall(&rw._state)
	;;
}