Imandrakit_thread.Sync_queueBlocking queue
A simple blocking queue.
This queue is quite basic and will not behave well under heavy contention. However, it can be sufficient for many practical use cases.
NOTE: this queue will typically block the caller thread in case the operation (push/pop) cannot proceed. Be wary of deadlocks when using the queue from a pool when you expect the other end to also be produced/consumed from the same pool.
See discussion on Fut.wait_block for more details on deadlocks and how to mitigate the risk of running into them.
More scalable queues can be found in Lockfree (https://github.com/ocaml-multicore/lockfree/)
type 'a t = 'a Moonpool.Blocking_queue.tUnbounded blocking queue.
This queue is thread-safe and will block when calling pop on it when it's empty.
val create : unit -> _ tCreate a new unbounded queue.
val size : _ t -> intNumber of items currently in the queue. Note that pop might still block if this returns a non-zero number, since another thread might have consumed the items in the mean time.
val push : 'a t -> 'a -> unitpush q x pushes x into q, and returns ().
In the current implementation, push q will never block for a long time, it will only block while waiting for a lock so it can push the element.
val pop : 'a t -> 'apop q pops the next element in q. It might block until an element comes.
val close : _ t -> unitval try_pop : force_lock:bool -> 'a t -> 'a optiontry_pop q immediately pops the first element of q, if any, or returns None without blocking.
val try_push : 'a t -> 'a -> booltry_push q x tries to push into q, in which case it returns true; or it fails to push and returns false without blocking.
val transfer : 'a t -> 'a Stdlib.Queue.t -> unittransfer bq q2 transfers all items presently in bq into q2 in one atomic section, and clears bq. It blocks if no element is in bq.
This is useful to consume elements from the queue in batch. Create a Queue.t locally:
let dowork (work_queue: job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create() in
try
while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
(* get all the events in the incoming blocking queue, in
one single critical section. *)
Bb_queue.transfer work_queue local_q
done
with Bb_queue.Closed -> ()val to_seq : 'a t -> 'a Stdlib.Seq.tto_gen q returns a (transient) sequence from the queue.
val pp : Imandrakit.Fmt.t -> 'a t -> unit