“There comes a time in all good programs when components or subsystems must stop communicating directly with one another.”
core.async provides channels, which allow processes to communicate in a decoupled way by putting messages on queues. core.async.flow (C.A.F) is built on top of these channels and provides two higher-level abstractions: the Process, a thread of activity, and the Flow, a directed graph of Processes which communicate via channels. The objective of C.A.F is to separate the application logic from its topology, execution, communication, lifecycle, monitoring and error handling.
Flows provide and centralize these things.
The flow is a directed graph, where the nodes are the processes, and the edges are connections between the inputs and outputs of the processes. C.A.F handles all the internals of channel creation and process instantiation. The C.A.F API provides functions for starting/stopping, pausing/resuming, state-querying, and injecting data into the flow.
The usage API:
;; Create a flow by providing its config - i.e. the flow graph definition
(def g (flow/create-flow gdef))
;; start the flow, wrapping it in a monitoring function
(monitoring (flow/start g))
;; the flow actually starts paused, so you need to resume it to make
;; it do stuff
(flow/resume g)
;; wait a bit for some output to print to repl, then pause it
(flow/pause g)
(flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]])
(flow/ping g)
(flow/stop g)
A flow-graph definition has two primary top-level keys: processes and connections, called :procs and :conns. These are your nodes and edges.
The flow-graph in the gist linked below is very simple, without any branching. It looks like this:
dice-source
↓
dedupe
↓
craps-finder
↓
printer
The dice-source rolls two 6-sided dice. After going through a deduplication function to remove consecutive identical rolls, the craps-finder removes cases where the dice don’t sum to 2, 3 or 12 (presumably to do with the rules of craps, which I don’t know). The output is then printed.
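Leaving flows aside for a moment, the logic of this pipeline can be sketched with ordinary sequence functions. This only illustrates what the three stages compute, not how C.A.F runs them. The fixed `rolls` vector and the `craps?` name are made up for the example.

```clojure
;; Hypothetical stand-in for the dice-source: a fixed vector of rolls.
(def rolls [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]])

;; True when the two dice sum to 2, 3 or 12.
(defn craps? [roll]
  (contains? #{2 3 12} (apply + roll)))

;; dedupe drops consecutive duplicates, filter keeps the craps rolls.
(def result
  (->> rolls
       dedupe
       (filter craps?)
       vec))

(prn result) ;; prints [[1 2] [2 1] [6 6] [1 1]]
```

In the flow version each stage becomes a process and the `->>` threading is replaced by channel connections.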
The full definition of the graph looks like this:
(def gdef
  {:procs
   {:dice-source
    {:proc (flow/process
            {:describe (fn [] {:outs {:out "roll the dice!"}})
             :introduce (fn [_]
                          (Thread/sleep 200)
                          [nil {:out [[(inc (rand-int 6)) (inc (rand-int 6))]]}])})}

    :craps-finder
    {:proc (-> #(when (#{2 3 12} (apply + %)) %) flow/lift1->step flow/step-process)}

    :dedupe
    {:proc (flow/step-process #'ddupe)}

    :prn-sink
    {:proc (flow/process
            {:describe (fn [] {:ins {:in "gimme stuff to print!"}})
             :transform (fn [_ _ v] (prn v))})}}
   :conns
   [[[:dice-source :out] [:dedupe :in]]
    [[:dedupe :out] [:craps-finder :in]]
    [[:craps-finder :out] [:prn-sink :in]]]})
The connections part is pretty simple: a sequence of outchan to inchan pairs, each creating a link between the two. The flow/process function takes a map of functions:
Describe (required) is a 0-arity function which returns a map with keys :ins and :outs, which enumerate the inchans and outchans. The values are just docstrings. For each key in :ins a channel will be created, and the same for :outs. No key can be in both ins and outs. These keys will be used to coordinate the flow in the connections section. If you provide an ins key :foo for node :bar, your connection tuple will be [:bar :foo].
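To make the link between the :describe keys and the connection tuples concrete, here is a small sketch of a checker that compares a set of connections against describe maps. It works on plain data and is not part of C.A.F. The names `described`, `valid-endpoint?` and `bad-conns`, and the :bar/:baz processes, are invented for illustration.

```clojure
;; Hypothetical describe results, keyed by process id.
(def described
  {:bar {:ins {:foo "messages to process"} :outs {:out "results"}}
   :baz {:ins {:in "results from :bar"}}})

(def conns
  [[[:bar :out] [:baz :in]]])

;; An endpoint [pid chan-key] is valid when chan-key appears under the
;; given direction (:ins or :outs) of that process's describe map.
(defn valid-endpoint? [described direction [pid chan-key]]
  (contains? (get-in described [pid direction]) chan-key))

;; Returns the connections whose source is not a declared out
;; or whose target is not a declared in.
(defn bad-conns [described conns]
  (remove (fn [[from to]]
            (and (valid-endpoint? described :outs from)
                 (valid-endpoint? described :ins to)))
          conns))

(prn (bad-conns described conns))
;; :foo is an input of :bar, so using it as a source is reported
(prn (bad-conns described [[[:bar :foo] [:baz :in]]]))
```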
A process can be a transforming process, taking inputs, or an introducing process, with no input channel. For any process you create, you have to provide either the transform or introduce key, but not both.
Transform is a 3-arity function which is called whenever a message arrives. It does stuff with the input message and (usually) will drop a message on an output channel. The 3 params are state, in-name and msg, and it returns a [state' output-map]. The output-map is a map of output-channel names to vectors of values, and it can be nil. You can see the return shape most clearly in the dice-source process (which uses introduce rather than transform, but returns the same thing):
[nil ;; no state, so state is nil
 ;; puts two random numbers on the :out chan
 {:out [[(inc (rand-int 6)) (inc (rand-int 6))]]}]
Introduce is a 1-arity function which takes a state and returns a [state' output-map]. It will be called repeatedly to introduce data into the flow.
Init is a 1-arity function returning the initial state. It’s not shown explicitly in the gist, but I mention it because it comes up later.
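Because :init and :transform are plain functions, their contract can be tried at the REPL without starting a flow. The sketch below is a hypothetical running-total process map. The names `summer` and `run-by-hand` are invented here, and `run-by-hand` only imitates what a process loop might do with the [state' output-map] return value.

```clojure
;; A hypothetical process map: keeps a running total in its state
;; and emits the new total on :out for every message.
(def summer
  {:describe  (fn [] {:ins {:in "numbers"} :outs {:out "running totals"}})
   :init      (fn [_] {:total 0})
   :transform (fn [{:keys [total]} _in-name n]
                (let [total' (+ total n)]
                  [{:total total'} {:out [total']}]))})

;; Imitates a process loop: threads the state through each call and
;; collects everything destined for :out.
(defn run-by-hand [{:keys [init transform]} msgs]
  (loop [state (init {}), msgs msgs, emitted []]
    (if-let [[m & more] (seq msgs)]
      (let [[state' outs] (transform state :in m)]
        (recur state' more (into emitted (:out outs))))
      {:state state :emitted emitted})))

(prn (run-by-hand summer [1 2 3]))
;; prints {:state {:total 6}, :emitted [1 3 6]}
```

The map would presumably be handed to flow/process in the same way as the process maps in the gist; that step is left out so the block runs without the library.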
;; prn-sink
;; a process with a single inchan 'in' and no outchan.
;; it prints any messages it receives. Note the transform returns nil
(flow/process
 {:describe (fn [] {:ins {:in "gimme stuff to print!"}})
  :transform (fn [_ _ v] (prn v))})

;; dice-source
;; a stateless introducing process which generates 2 random dice rolls
;; every 200ms and puts it on the only outchan 'out'
(flow/process
 {:describe (fn [] {:outs {:out "roll the dice!"}})
  :introduce (fn [_]
               (Thread/sleep 200)
               [nil {:out [[(inc (rand-int 6)) (inc (rand-int 6))]]}])})
The second way of describing a process is a step-process. It’s just sugar on top of the process function. A step process takes a single function with 0, 1 and 3 arities, corresponding to the describe, init, and transform keys defined above. Looking at ddupe:
(defn ddupe
  ([] {:ins {:in "stuff"}
       :outs {:out "stuff w/o consecutive dupes"}})
  ([_] {:last nil})
  ([{:keys [last]} _ v]
   [;; updates the state to the msg
    {:last v}
    ;; returns the input message, provided it's not the same as the
    ;; last one
    (when (not= last v) {:out [v]})]))
This could equally be described as
(def ddupe2
  {:describe (fn [] {:ins {:in "stuff"}
                     :outs {:out "stuff w/o consecutive dupes"}})
   :init (fn [_] {:last nil})
   :transform (fn [{:keys [last]} _ v]
                [{:last v} (when (not= last v) {:out [v]})])})
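Since a step function is an ordinary multi-arity function, each arity can be called directly at the REPL. A rough sketch, repeating the ddupe definition so the block stands alone and using the three arities as described in this post (other versions of the library may expect a different set of arities):

```clojure
(defn ddupe
  ([] {:ins {:in "stuff"}
       :outs {:out "stuff w/o consecutive dupes"}})
  ([_] {:last nil})
  ([{:keys [last]} _ v]
   [{:last v} (when (not= last v) {:out [v]})]))

;; 0-arity: the describe map
(prn (ddupe))
;; 1-arity: the initial state
(prn (ddupe {}))                        ;; {:last nil}
;; 3-arity with a new value: state updated, value emitted
(prn (ddupe {:last nil} :in [2 1]))     ;; [{:last [2 1]} {:out [[2 1]]}]
;; 3-arity with a repeat: state updated, nothing emitted
(prn (ddupe {:last [2 1]} :in [2 1]))   ;; [{:last [2 1]} nil]
```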
The final process is craps-finder:
(-> #(when (#{2 3 12} (apply + %)) %)
    flow/lift1->step
    flow/step-process)

;; separating out the function more explicitly:
;; The function returns the roll when it sums 2, 3 or 12. Otherwise nil
(defn craps-finder [roll]
  (when (#{2 3 12} (apply + roll)) roll))
lift1 is a specific case of a more general function lift*. lift* takes a stateless function of msg->[msg'] and turns it into a step function (which step-process can then wrap) with a single inchan and outchan. Its transform arity looks like this:
(fn [_ _ msg] [nil {:out (f msg)}])
Note that the out value must be a vector, which is no good if your function is, say, an int->int function. When lift* is applied it’ll put an int on the outchan, which isn’t allowed. lift1 wraps the result in a vector for you:
(fn [x] (when-some [m (f x)] (vector m)))
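Putting the two fragments together gives a sketch of how the lifting might work. The functions below are illustrative re-implementations based only on the fragments quoted in this post. `my-lift*->step` and `my-lift1->step` are made-up names, not the library functions, and the docstrings in the describe arity are placeholders.

```clojure
;; Wraps f (msg -> collection of msgs) as a stateless step function
;; with one input :in and one output :out.
(defn my-lift*->step [f]
  (fn
    ([] {:ins {:in "argument to f"} :outs {:out "return of f"}})
    ([_] nil)
    ([_ _ msg] [nil {:out (f msg)}])))

;; Wraps f (msg -> msg or nil) by putting a non-nil result in a vector.
(defn my-lift1->step [f]
  (my-lift*->step (fn [x] (when-some [m (f x)] (vector m)))))

(def inc-step (my-lift1->step inc))
(def craps-step (my-lift1->step #(when (#{2 3 12} (apply + %)) %)))

(prn (inc-step nil :in 1))        ;; [nil {:out [2]}]
(prn (craps-step nil :in [6 6]))  ;; [nil {:out [[6 6]]}]
(prn (craps-step nil :in [4 3]))  ;; [nil {:out nil}]
```

The nil case is what lets craps-finder act as a filter: when the function returns nil, nothing is placed on the outchan.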
The last part of the gist has the following, which doesn’t actually use flows, but illustrates the monitoring functionality of flows:
(defn monitoring [{:keys [report-chan error-chan]}]
  (prn "========= monitoring start")
  (async/thread
    (loop []
      (let [[val port] (async/alts!! [report-chan error-chan])]
        (if (nil? val)
          (prn "========= monitoring shutdown")
          (do
            (prn (str "======== message from " (if (= port error-chan) :error-chan :report-chan)))
            (pp/pprint val)
            (recur))))))
  nil)

(monitoring (flow/start g))
From this you can see that flow/start returns a map with keys :report-chan and :error-chan. The monitoring function just takes stuff off these two channels and prints it. As we’ll see, this is used with the ping function, which writes to these channels. Presumably this would be used for logging and the like, in the manner of stdout/stderr.
Loading up the gist (you need to have {io.github.clojure/core.async {:git/sha "c5a524c6607a792bc51a81f727e7d24ef4cae23b"}} in your deps), we can play around with it.
(def g (flow/create-flow gdef))
;; it's a flow object
g
;; #object[clojure.core.async.flow.impl$create_flow$reify__17448 0x5c36347f "clojure.core.async.flow.impl$create_flow$reify__17448@5c36347f"]
If we start the flow, we can see that it prints the ‘monitoring start’ message:
(monitoring (flow/start g))
;; printed to repl
"========= monitoring start"
We can ping the flow, which prints the messages from the report-chan (and presumably the error-chan) to the repl. Note that the procs are all paused initially.
(flow/ping g)
"======== message from :report-chan"
#:clojure.core.async.flow{:report :ping,
                          :pid :prn-sink,
                          :status :paused,
                          :state nil,
                          :count 0,
                          :ins
                          {:in
                           {:buffer-type FixedBuffer,
                            :buffer-count 0,
                            :put-count 0,
                            :take-count 0,
                            :closed? false}},
                          :outs {}}
"======== message from :report-chan"
#:clojure.core.async.flow{:report :ping,
                          :pid :craps-finder,
                          :status :paused,
                          :state nil,
                          :count 0,
                          :ins
                          {:in
                           {:buffer-type FixedBuffer,
                            :buffer-count 0,
                            :put-count 0,
                            :take-count 0,
                            :closed? false}},
                          :outs
                          {:out
                           {:buffer-type FixedBuffer,
                            :buffer-count 0,
                            :put-count 0,
                            :take-count 0,
                            :closed? false}}}
"======== message from :report-chan"
#:clojure.core.async.flow{:report :ping,
                          :pid :dedupe,
                          :status :paused,
                          :state {:last nil},
                          :count 0,
                          :ins
                          {:in
                           {:buffer-type FixedBuffer,
                            :buffer-count 0,
                            :put-count 0,
                            :take-count 0,
                            :closed? false}},
                          :outs
                          {:out
                           {:buffer-type FixedBuffer,
                            :buffer-count 0,
                            :put-count 0,
                            :take-count 0,
                            :closed? false}}}
"======== message from :report-chan"
#:clojure.core.async.flow{:report :ping,
                          :pid :dice-source,
                          :status :paused,
                          :state nil,
                          :count 0,
                          :ins {},
                          :outs
                          {:out
                           {:buffer-type FixedBuffer,
                            :buffer-count 0,
                            :put-count 0,
                            :take-count 0,
                            :closed? false}}}
When I resume, I start to get output printed to the repl:
(flow/resume g)
;; to repl
[1 2]
[2 1]
[2 1]
[1 1]
[2 1]
[6 6]
[6 6]
And a ping shows the following (edited).
{:report :ping,
 :pid :craps-finder,
 :status :running,
 :state nil,
 :count 76,
 :ins {:in :snipped},
 :outs {:out :snipped}}
{:report :ping,
 :pid :dedupe,
 :status :running,
 :state {:last [3 4]},
 :count 79,
 :ins {:in :snipped},
 :outs {:out :snipped}}
If I pause and then inject, I get the following report. Note the buffer-count of 7 on both the craps-finder :in and the dedupe :out: they are the same channel, and the inject just put the messages on that channel.
(flow/pause g)
(flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]])
(flow/ping g)
{:report :ping,
 :pid :craps-finder,
 :status :paused,
 :state nil,
 :count 85,
 :ins
 {:in
  {:buffer-type FixedBuffer,
   :buffer-count 7,
   :put-count 0,
   :take-count 0,
   :closed? false}},
 :outs {:out :snip}}
{:report :ping,
 :pid :dedupe,
 :status :paused,
 :state {:last [1 1]},
 :count 88,
 :ins {:in :snip},
 :outs
 {:out
  {:buffer-type FixedBuffer,
   :buffer-count 7,
   :put-count 0,
   :take-count 0,
   :closed? false}}}
If I resume the flow again, I get the values I injected (except the [4 3], which was filtered out by craps-finder) on the repl:
(flow/resume g)
[1 2]
[2 1]
[2 1]
[6 6]
[6 6]
[1 1]
Doing the stop just shuts it all down.
There are several things in the docs which are not covered by the Gist:
Every entry in the :procs map of the graph here has only the :proc key (i.e. the actual process). An entry can also take an :args key (a map which is passed on to the process constructor) and a :chan-opts map of configurations for the channels that will be created. By default the channels created have a buffer size of 10 before they fill up and start exerting back pressure.
Multiple inputs/outputs for a proc are not shown here, but possible. A consequence of not having a multiple inchan process is that there’s no illustration of using the second argument of the transform, which is the inchan name, but presumably the pattern will be a pretty standard “if inchan is a, do this, but if it’s b, do this” sort of thing.
Multiple connections, e.g. one proc connecting one output to the inputs of two separate procs, aren’t shown here. I think it’s just more tuples in :conns, but it’s not explicit.
There’s a final set of keys to flow creation which allows the ExecutorService to be specified. I don’t know what that is, but the keys are :mixed-exec/:io-exec/:compute-exec.
For most of the flow-level controls there are proc-level equivalents, e.g. (pause-proc flow pid).
The :params and :workload keys of the :describe map in process creation are not illustrated, but I think they’re just documentation, as opposed to ins and outs, which are used in channel creation.
A proc can have a :transition function, which will be called when the proc (or flow) is paused, resumed or stopped. This allows stuff like cleaning up state.
It’s not shown how to use :introduce for more ‘realistic’ workflows, which would presumably be taking input from an API or something. The docs mention a ::flow/control channel.
Processes accept an option key which can include a :workload key, which can be :mixed, :io or :compute. I think this affects the internals of what kinds of threads are used, or how they are used.
There is one other function in the API ns: futurize, though I don’t know enough about async programming to understand what it does.
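Going back to the multiple-input and multiple-connection points above, here is a guess at what they might look like, written as plain data and a plain step function so it can be tried without a running flow. Everything here is hypothetical: the `totaller` function, the :source and :logger process ids, and the assumption that fan-out is expressed by repeating the same output in two connection tuples. The step function follows the 0/1/3-arity shape used in this post.

```clojure
;; Hypothetical two-input step function: messages on :a are added to a
;; running total, any message on :b resets it. Dispatches on in-name.
(defn totaller
  ([] {:ins {:a "numbers to add" :b "anything; resets the total"}
       :outs {:out "running total"}})
  ([_] {:total 0})
  ([{:keys [total] :as state} in-name msg]
   (case in-name
     :a (let [total' (+ total msg)]
          [{:total total'} {:out [total']}])
     :b [{:total 0} {:out [0]}]
     ;; unknown input name: leave the state alone, emit nothing
     [state nil])))

;; Assumed shape for fan-out: the same output appears in two tuples.
(def fan-out-conns
  [[[:source :out] [:totaller :a]]
   [[:source :out] [:logger :in]]])

(prn (totaller {:total 5} :a 2))       ;; [{:total 7} {:out [7]}]
(prn (totaller {:total 5} :b :reset))  ;; [{:total 0} {:out [0]}]
```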