Moonpool.Blocking_queue
A simple blocking queue.
This queue is quite basic and will not behave well under heavy contention. However, it can be sufficient for many practical use cases.
NOTE: this queue will typically block the caller thread in case the operation (push/pop) cannot proceed. Be wary of deadlocks when using the queue from a pool when you expect the other end to also be produced/consumed from the same pool.
See discussion on Fut.wait_block
for more details on deadlocks and how to mitigate the risk of running into them.
More scalable queues can be found in Lockfree (https://github.com/ocaml-multicore/lockfree/)
Unbounded blocking queue.
This queue is thread-safe and will block when calling pop
on it when it's empty.
val create : unit -> _ t
Create a new unbounded queue.
val size : _ t -> int
Number of items currently in the queue. Note that pop
might still block if this returns a non-zero number, since another thread might have consumed the items in the mean time.
val push : 'a t -> 'a -> unit
push q x
pushes x
into q
, and returns ()
.
In the current implementation, push q
will never block for a long time, it will only block while waiting for a lock so it can push the element.
val pop : 'a t -> 'a
pop q
pops the next element in q
. It might block until an element comes.
val close : _ t -> unit
val try_pop : force_lock:bool -> 'a t -> 'a option
try_pop q
immediately pops the first element of q
, if any, or returns None
without blocking.
val try_push : 'a t -> 'a -> bool
try_push q x
tries to push into q
, in which case it returns true
; or it fails to push and returns false
without blocking.
val transfer : 'a t -> 'a Stdlib.Queue.t -> unit
transfer bq q2
transfers all items presently in bq
into q2
in one atomic section, and clears bq
. It blocks if no element is in bq
.
This is useful to consume elements from the queue in batch. Create a Queue.t
locally:
let dowork (work_queue: job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create() in
try
while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
(* get all the events in the incoming blocking queue, in
one single critical section. *)
Bb_queue.transfer work_queue local_q
done
with Bb_queue.Closed -> ()
to_iter q
returns an iterator over all items in the queue. This might not terminate if q
is never closed.
val to_seq : 'a t -> 'a Stdlib.Seq.t
to_gen q
returns a (transient) sequence from the queue.