co.paralleluniverse.pulsar.core documentation

Pulsar is an implementation of lightweight threads (fibers),
Go-like channels and Erlang-like actors for the JVM.

->timeunit

(->timeunit x)
Constructs an instance of `java.util.concurrent.TimeUnit`.
If argument x is already an instance of `TimeUnit`, the function returns x.
Otherwise, x *must* be a keyword, in which case the following conversion
is performed:

:nanoseconds | :nanos | :ns   -> TimeUnit/NANOSECONDS
:microseconds | :us           -> TimeUnit/MICROSECONDS
:milliseconds | :millis | :ms -> TimeUnit/MILLISECONDS
:seconds | :sec               -> TimeUnit/SECONDS
:minutes | :mins              -> TimeUnit/MINUTES
:hours | :hrs                 -> TimeUnit/HOURS
:days                         -> TimeUnit/DAYS
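
The table above can be read as a plain lookup. The following is a minimal sketch in plain Clojure, not Pulsar's actual source: `keyword-units` and `keyword->timeunit` are hypothetical names, and throwing on an unknown keyword is an assumption, since the text above does not say what happens in that case.

```clojure
(import 'java.util.concurrent.TimeUnit)

;; Hypothetical lookup table mirroring the documented keyword conversions.
(def keyword-units
  {:nanoseconds  TimeUnit/NANOSECONDS
   :nanos        TimeUnit/NANOSECONDS
   :ns           TimeUnit/NANOSECONDS
   :microseconds TimeUnit/MICROSECONDS
   :us           TimeUnit/MICROSECONDS
   :milliseconds TimeUnit/MILLISECONDS
   :millis       TimeUnit/MILLISECONDS
   :ms           TimeUnit/MILLISECONDS
   :seconds      TimeUnit/SECONDS
   :sec          TimeUnit/SECONDS
   :minutes      TimeUnit/MINUTES
   :mins         TimeUnit/MINUTES
   :hours        TimeUnit/HOURS
   :hrs          TimeUnit/HOURS
   :days         TimeUnit/DAYS})

;; Hypothetical stand-in for `->timeunit`.
(defn keyword->timeunit
  [x]
  (if (instance? TimeUnit x)
    x                                   ; already a TimeUnit: return it unchanged
    (or (keyword-units x)
        ;; Assumption: reject anything that is not a known keyword.
        (throw (IllegalArgumentException. (str "Unknown time unit: " x))))))
```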

alive?

(alive? a)
Tests whether or not a strand is alive. 
A strand is alive if it has been started but has not yet died.

channel

(channel capacity overflow-policy)(channel capacity)(channel)
Creates a new channel.

Optional arguments:
capacity        - specifies how many messages the channel can contain (until they are consumed)
                  * A value of `0` designates a *transfer channel*, that blocks both `snd` and `rcv` 
                    until a corresponding operation (`rcv` or `snd` respectively) is called.
                  * A value of `-1` creates an unbounded channel.

                  default: 0

overflow-policy - specifies what `snd` does when the channel's capacity is exhausted.
                  May be one of:
                  * :throw    - throws an exception.
                  * :block    - blocks until a message is consumed and room is available
                  * :drop     - the message is silently dropped
                  * :displace - the oldest message waiting in the queue is discarded to make room for the new message.
                  
                  default: :block

The default channel capacity is 0 and the default policy is :block
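
As an illustration of the three arities, the sketch below creates a few channels using only the argument values listed above. The alias `p` and the var names are arbitrary choices for this example.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

;; No arguments: capacity 0 (a transfer channel) with the :block policy.
(def transfer-ch (p/channel))

;; Capacity only: up to 10 pending messages, default :block policy.
(def bounded-ch (p/channel 10))

;; Capacity and policy: a full channel silently drops new messages.
(def lossy-ch (p/channel 10 :drop))

;; Capacity -1: an unbounded channel.
(def unbounded-ch (p/channel -1))
```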

close!

(close! channel)
Closes a channel.
Messages already in the channel will be received, but all future attempts at `snd`
will silently discard the message.

closed?

(closed? channel)
Tests whether a channel has been closed and contains no more messages that 
can be received.

convert-duration

(convert-duration x from-unit to-unit)
Converts a time duration from one time unit to another.
x is the duration; `from-unit` and `to-unit` are the source
and target units respectively, given as either a j.u.c.TimeUnit instance
or as a keyword, as specified by `->timeunit`.
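
A conversion of this kind can be expressed with `TimeUnit.convert` from the JDK. The sketch below is a hypothetical helper (`convert-duration*`), not Pulsar's implementation, and it accepts only `TimeUnit` instances. Note that `TimeUnit.convert` truncates when converting to a coarser unit; whether `convert-duration` behaves the same way is an assumption.

```clojure
(import 'java.util.concurrent.TimeUnit)

;; Hypothetical helper: converts x from from-unit to to-unit using the JDK.
(defn convert-duration*
  [x ^TimeUnit from-unit ^TimeUnit to-unit]
  (.convert to-unit (long x) from-unit))

;; 2 minutes expressed in seconds.
(def two-minutes-in-seconds
  (convert-duration* 2 TimeUnit/MINUTES TimeUnit/SECONDS))

;; 1500 ms expressed in seconds: the fractional part is truncated.
(def truncated-seconds
  (convert-duration* 1500 TimeUnit/MILLISECONDS TimeUnit/SECONDS))
```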

current-fiber

(current-fiber)
Returns the currently running fiber (lightweight thread), or `nil` if the caller is not running in a fiber.

current-strand

(current-strand)
Returns the currently running fiber (if running in a fiber)
or the current thread (if not).

defsfn

macro

(defsfn & expr)
Defines a suspendable function that can be used by a fiber or actor.
Used exactly like `defn`.

double-channel

(double-channel size overflow-policy)(double-channel size)(double-channel)
Creates a channel that carries primitive `double` values.

fiber

(fiber & args)
Creates, but does not start, a new fiber (a lightweight thread) running in a fork/join pool.

It is much preferable to use `spawn-fiber`.

fiber->future

(fiber->future f)
Takes a spawned fiber and yields a future object that caches the
fiber's result and returns it on all calls to deref/@. If the fiber
has not yet finished, calls to deref/@ will block, unless the variant
of deref with timeout is used. See also - realized?.

fj-pool

A global fork/join pool. The pool uses all available processors and runs in the async mode.

float-channel

(float-channel size overflow-policy)(float-channel size)(float-channel)
Creates a channel that carries primitive `float` values.

int-channel

(int-channel size overflow-policy)(int-channel size)(int-channel)
Creates a channel that carries primitive `int` values.

join

(join s)(join timeout unit s)
Awaits the termination of the given strand or strands, and returns
their result, if applicable.

If a single strand is given, its result is returned;
if a collection is given, a collection of the respective results is returned.

Note that for threads, the result is always `nil`, as threads don't return a value.

If a timeout is supplied and it elapses before the strand has terminated,
a j.u.c.TimeoutException is thrown.

s       - either a strand or a collection of strands.
timeout - how long to wait for the strands' termination
unit    - the unit of the timeout duration. TimeUnit or keyword as in `->timeunit`
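
The three cases described above (single strand, collection, timeout) can be sketched as follows. This assumes fibers are usable in the running JVM (Quasar fibers need bytecode instrumentation to be set up); the alias `p` and the var names are arbitrary.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

;; Single strand: join returns the value of the fiber's function.
(def sum (p/join (p/spawn-fiber (fn [] (+ 1 2)))))

;; Collection of strands: join returns the respective results.
(def workers [(p/spawn-fiber (fn [] :a))
              (p/spawn-fiber (fn [] :b))])
(def results (p/join workers))

;; Timeout: the thread sleeps longer than we are willing to wait,
;; so join throws a j.u.c.TimeoutException.
(def outcome
  (try
    (p/join 100 :ms (p/spawn-thread #(Thread/sleep 1000)))
    (catch java.util.concurrent.TimeoutException _
      :timed-out)))
```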

letsfn

macro

(letsfn fnspecs & body)
Defines a local suspendable function that can be used by a fiber or actor.
Used exactly like `letfn`.

long-channel

(long-channel size overflow-policy)(long-channel size)(long-channel)
Creates a channel that carries primitive `long` values.

make-fj-pool

(make-fj-pool parallelism async)
Creates a new ForkJoinPool with the given parallelism and async mode.

promise

(promise)(promise f)
Returns a promise object that can be read with deref/@, and set,
once only, with deliver. Calls to deref/@ prior to delivery will
block, unless the variant of deref with timeout is used. All
subsequent derefs will return the same delivered value without
blocking. See also - realized?.

Unlike clojure.core/promise, this promise object can be used inside Pulsar fibers.
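
A minimal sketch of a fiber blocking on a promise until another strand delivers it. It uses only the no-argument arity and assumes fibers are usable in the running JVM; the alias `p` and the var names are arbitrary.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def answer (p/promise))

;; The fiber blocks on the promise until a value is delivered.
(def waiter (p/spawn-fiber (fn [] (inc @answer))))

;; Deliver once; any later deref returns the same value without blocking.
(deliver answer 41)

(def waiter-result (p/join waiter))
```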

rcv-double

macro

(rcv-double channel)(rcv-double channel timeout unit)
Receives a double value from a double-channel.

See: `rcv`

rcv-float

macro

(rcv-float channel)(rcv-float channel timeout unit)
Receives a float value from a float-channel.

See: `rcv`

rcv-group

(rcv-group ports)(rcv-group port & ports)
Creates a receive-port (a read-only channel) from a group of channels.
Receiving a message from the group will return the next message available from
any of the channels in the group.
A message received from the group is consumed (removed) from the original member
channel to which it has been sent.
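
For illustration, the sketch below groups two channels and receives from whichever has a message. It assumes `rcv` accepts the same `(channel timeout unit)` arity shown for the primitive `rcv-*` variants, and it is run from a plain thread; the var names are arbitrary.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def c1 (p/channel 10))
(def c2 (p/channel 10))

;; A read-only port over both channels.
(def either (p/rcv-group c1 c2))

;; Only c2 has a message, so the group yields it.
(p/snd c2 :from-c2)
(def received (p/rcv either 100 :ms))

;; The message was consumed from c2 itself, so c2 is now empty.
(def leftover (p/try-rcv c2))
```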

rcv-int

macro

(rcv-int channel)(rcv-int channel timeout unit)
Receives an int value from an int-channel.

See: `rcv`

rcv-long

macro

(rcv-long channel)(rcv-long channel timeout unit)
Receives a long value from a long-channel.

See: `rcv`

select

macro

(select & clauses)
Performs a very similar operation to `sel`, but allows you to specify an action to perform depending 
on which operation has succeeded.
Takes an even number of expressions, ordered as (ops1, action1, ops2, action2 ...) with the ops being 
a channel operation descriptor (remember: a descriptor is either a channel for an `rcv` operation,
or a vector of a channel and a message specifying a `snd` operation) or a collection of descriptors, 
and the actions are Clojure expressions. 
Like `sel`, `select` performs at most one operation, in which case it will run the operation's 
respective action and return its result.

An action expression can bind values to the operation's results.
The action expression may begin with a vector of one or two symbols. In that case, the first symbol 
will be bound to the message returned from the successful receive in the respective ops clause 
(or `nil` if the successful operation is a `snd`), and the second symbol, if present, will be bound 
to the successful operation's channel.

Like `sel`, `select` blocks until an operation succeeds, or, if a `:timeout` option is specified, 
until the timeout (in milliseconds) elapses. If a timeout is specified and elapses, `select` will run
the action in an optional `:else` clause and return its result, or, if an `:else` clause is not present, 
`select` will return `nil`.

Example:

(select :timeout 100 
       c1 ([v] (println "received" v))
       [[c2 m2] [c3 m3]] ([v c] (println "sent to" c))
       :else "timeout!")

In the example, if a message is received from channel `c1`, then it will be printed. 
If a message is sent to either `c2` or `c3`, then the identity of the channel will be printed, 
and if the 100 ms timeout elapses then "timeout!" will be printed.

sfn

macro

(sfn & expr)
Creates a suspendable function that can be used by a fiber or actor.
Used exactly like `fn`.

snd-double

macro

(snd-double channel message)
Sends a double value to a double-channel.  

See: `snd`

snd-float

macro

(snd-float channel message)
Sends a float value to a float-channel.  

See: `snd`

snd-int

macro

(snd-int channel message)
Sends an int value to an int-channel.

See: `snd`

snd-long

macro

(snd-long channel message)
Sends a long value to a long-channel.  

See: `snd`

spawn-fiber

macro

(spawn-fiber :name? :stack-size? :fj-pool? f & args)
Creates and starts a new fiber.

f - the function to run in the fiber.
args - (optional) arguments for the function

Options:
:name str     - the fiber's name
:stack-size n - the fiber's initial stack size
:fj-pool      - the fork-join pool in which the fiber will run
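
A short sketch of the call shape: options come first, then the function, then its arguments. It assumes fibers are usable in the running JVM; `multiply` and the fiber name are made up for the example.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

;; A suspendable function, defined exactly like a defn.
(p/defsfn multiply [a b]
  (* a b))

;; Named fiber running (multiply 6 7).
(def product-fiber (p/spawn-fiber :name "multiplier" multiply 6 7))

;; Wait for the fiber and fetch its result.
(def product (p/join product-fiber))
```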

spawn-thread

(spawn-thread :name? f & args)
Creates and starts a new thread.

f - the function to run in the thread.
args - (optional) arguments to pass to the function

Options:
:name str     - the thread's name

start

(start fiber)
Starts a fiber created with `fiber`.

subscribe!

(subscribe! topic channel)
Subscribes a channel to a topic.
The subscribed channel will receive all messages sent to the topic.

suspendable!

(suspendable! f)(suspendable! x prot)
Makes a function suspendable.

suspendable?

(suspendable? f)
Returns true if a function has been instrumented as suspendable; false otherwise.

ticker-consumer

(ticker-consumer ticker)
Creates a rcv-port (read-only channel) that returns messages from a *ticker channel*.
A ticker channel is a bounded channel with an overflow policy of :displace.

Different ticker consumers are independent (a message received from one is not removed from others),
and guarantee monotonicity (messages are received in order), but if messages are sent to the
ticker channel faster than they are consumed then messages can be lost.

topic

(topic)
Creates a new topic.
A topic is a send-port (a write-only channel) that forwards every message sent to it
to a group of subscribed channels.
Use `subscribe!` and `unsubscribe!` to subscribe and unsubscribe a channel to or from
the topic.
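
The fan-out behaviour can be sketched as below: one message sent to the topic shows up on every subscribed channel. The subscriber channels are given a capacity of 10 so that `snd` does not have to wait for a receiver; the var names are arbitrary, and the code is run from a plain thread.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def news (p/topic))
(def inbox-a (p/channel 10))
(def inbox-b (p/channel 10))

(p/subscribe! news inbox-a)
(p/subscribe! news inbox-b)

;; One send, forwarded to both subscribers.
(p/snd news :hello)
(def got-a (p/try-rcv inbox-a))
(def got-b (p/try-rcv inbox-b))

;; After unsubscribing, inbox-b no longer receives topic messages.
(p/unsubscribe! news inbox-b)
(p/snd news :again)
(def got-b-after (p/try-rcv inbox-b))
```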

try-rcv

(try-rcv channel)
Attempts to immediately (without blocking) receive a message from a channel.
Returns the message if one is immediately available; `nil` otherwise.
This function never blocks.

try-snd

(try-snd channel message)
Tries to immediately send a message to a channel.
If the channel's capacity is exceeded, this function fails and returns `false`.
Returns `true` if the operation succeeded; `false` otherwise.
This function never blocks.
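
Because `try-snd` and `try-rcv` report failure through their return values, callers typically branch on the result. A minimal sketch, with arbitrary var names and a channel capacity chosen for the example:

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def jobs (p/channel 10))

;; try-snd returns true or false instead of blocking.
(def sent? (p/try-snd jobs :job-1))
(when-not sent?
  (println "channel full, :job-1 was not sent"))

;; try-rcv returns nil when nothing is immediately available.
(def first-msg (p/try-rcv jobs))
(def second-msg (p/try-rcv jobs))
(when (nil? second-msg)
  (println "no more messages right now"))
```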

try-snd-double

macro

(try-snd-double channel message)
Tries to immediately send a double value to a double-channel.
Returns `true` if successful, `false` otherwise.  

See: `try-snd`

try-snd-float

macro

(try-snd-float channel message)
Tries to immediately send a float value to a float-channel.
Returns `true` if successful, `false` otherwise.  

See: `try-snd`

try-snd-int

macro

(try-snd-int channel message)
Tries to immediately send an int value to an int-channel.
Returns `true` if successful, `false` otherwise.

See: `try-snd`

try-snd-long

macro

(try-snd-long channel message)
Tries to immediately send a long value to a long-channel.
Returns `true` if successful, `false` otherwise.  

See: `try-snd`

unsubscribe!

(unsubscribe! topic channel)
Unsubscribes a channel from a topic.
The channel will stop receiving messages sent to the topic.