Module Imandrakit_thread.Sync_queue

Blocking queue

Unbounded blocking queue.

This queue is thread-safe and will block when calling pop on it when it's empty.

val create : unit -> _ t

Create a new unbounded queue.

val size : _ t -> int

Number of items currently in the queue. Note that pop might still block if this returns a non-zero number, since another thread might have consumed the items in the mean time.

  • since 0.2
exception Closed
val push : 'a t -> 'a -> unit

push q x pushes x into q, and returns ().

In the current implementation, push q will never block for a long time, it will only block while waiting for a lock so it can push the element.

  • raises Closed

    if the queue is closed (by a previous call to close q)

val pop : 'a t -> 'a

pop q pops the next element in q. It might block until an element comes.

  • raises Closed

    if the queue was closed before a new element was available.

val close : _ t -> unit

Close the queue, meaning there won't be any more push allowed, ie push will raise Closed.

pop will keep working and will return the elements present in the queue, until it's entirely drained; then pop will also raise Closed.

val try_pop : force_lock:bool -> 'a t -> 'a option

try_pop q immediately pops the first element of q, if any, or returns None without blocking.

  • parameter force_lock

    if true, use Mutex.lock (which can block under contention); if false, use Mutex.try_lock, which might return None even in presence of an element if there's contention

val try_push : 'a t -> 'a -> bool

try_push q x tries to push into q, in which case it returns true; or it fails to push and returns false without blocking.

  • raises Closed

    if the locking succeeded but the queue is closed.

val transfer : 'a t -> 'a Stdlib.Queue.t -> unit

transfer bq q2 transfers all items presently in bq into q2 in one atomic section, and clears bq. It blocks if no element is in bq.

This is useful to consume elements from the queue in batch. Create a Queue.t locally:

let dowork (work_queue: job Bb_queue.t) =
  (* local queue, not thread safe *)
  let local_q = Queue.create() in
  try
    while true do
      (* work on local events, already on this thread *)
      while not (Queue.is_empty local_q) do
        let job = Queue.pop local_q in
        process_job job
      done;

      (* get all the events in the incoming blocking queue, in
         one single critical section. *)
      Bb_queue.transfer work_queue local_q
    done
  with Bb_queue.Closed -> ()
  • since 0.4
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_gen : 'a t -> 'a gen

to_gen q returns a generator from the queue.

  • since 0.4
val to_seq : 'a t -> 'a Stdlib.Seq.t

to_gen q returns a (transient) sequence from the queue.

  • since 0.4
val pp : Imandrakit.Fmt.t -> 'a t -> unit
val to_iter : 'a t -> 'a Iter.t

Iterate on items.