Implement your own Clojure Atoms
Let’s implement our own Clojure atoms for fun and educational purposes. Do not use them in your actual application =).
First, we take a look at how the reset! and swap! operations are implemented:
(defn reset!
  "Sets the value of atom to newval without regard for the
  current value. Returns newval."
  {:added "1.0"
   :static true}
  [^clojure.lang.IAtom atom newval] (.reset atom newval))

(defn swap!
  "Atomically swaps the value of atom to be:
  (apply f current-value-of-atom args). Note that f may be called
  multiple times, and thus should be free of side effects. Returns
  the value that was swapped in."
  {:added "1.0"
   :static true}
  ([^clojure.lang.IAtom atom f] (.swap atom f))
  ([^clojure.lang.IAtom atom f x] (.swap atom f x))
  ([^clojure.lang.IAtom atom f x y] (.swap atom f x y))
  ([^clojure.lang.IAtom atom f x y & args] (.swap atom f x y args)))
The swap! and reset! operations call methods on the clojure.lang.IAtom interface.
We can implement those methods for our own atom implementation:
public interface IAtom {
    Object swap(IFn f);
    Object swap(IFn f, Object arg);
    Object swap(IFn f, Object arg1, Object arg2);
    Object swap(IFn f, Object x, Object y, ISeq args);
    boolean compareAndSet(Object oldv, Object newv);
    Object reset(Object newval);
}
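A quick REPL check makes the delegation visible. The sketch below only uses the built-in atom and calls the interface methods directly; the var name `a` is just for illustration.

```clojure
;; Sketch: the built-in atom is an IAtom, so reset! and swap!
;; are plain interface calls under the hood.
(def a (atom 1))

(instance? clojure.lang.IAtom a)      ; => true

;; Calling the interface methods directly behaves like reset!/swap!.
(.reset ^clojure.lang.IAtom a 10)     ; same as (reset! a 10)
(.swap ^clojure.lang.IAtom a inc)     ; same as (swap! a inc)

@a                                    ; => 11
```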
Let’s implement reset! first. We use a Java AtomicReference to hold the value.
On reset! we just set that value:
(ns experiment
  (:import (clojure.lang IAtom)
           (java.util.concurrent.atomic AtomicReference)))

(defn my-atom-impl [init-value]
  (let [atomic-ref (AtomicReference. init-value)]
    (proxy [IAtom] []
      (reset [newval]
        (.set atomic-ref newval)
        newval))))
compare-and-set! is also easy. We forward that to AtomicReference.compareAndSet, nothing else to do:
```
(compareAndSet [oldv newv]
  ;; the AtomicReference is the target of the call
  (.compareAndSet atomic-ref oldv newv))
```
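One consequence of forwarding directly: AtomicReference.compareAndSet compares by object identity, not by value equality. The following sketch illustrates this with two equal but distinct String objects; the names `s1`, `s2` and `atomic-ref` are only for this example.

```clojure
(import '(java.util.concurrent.atomic AtomicReference))

;; Two distinct String objects with the same contents.
(def s1 (String. "hello"))
(def s2 (String. "hello"))

(def atomic-ref (AtomicReference. s1))

(= s1 s2)                             ; => true  (value equality)
(identical? s1 s2)                    ; => false (different objects)

;; compareAndSet only succeeds for the very object that is stored.
(.compareAndSet atomic-ref s2 "bye")  ; => false
(.compareAndSet atomic-ref s1 "bye")  ; => true
(.get atomic-ref)                     ; => "bye"
```

The examples in this post use small integers, which the JVM caches when boxing, so the identity check happens to succeed for them.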
Let’s try these operations:
(def instance (my-atom-impl 1))
(println (reset! instance 2)) ; => 2
(println (compare-and-set! instance 2 3)) ; => true
(println (compare-and-set! instance 4 5)) ; => false
(println (deref instance)) ; => Crash class experiment.proxy$java.lang.Object$IAtom$8f19b142 cannot be cast to class java.util.concurrent.Future
Everything works, but we can’t get the value from our atom.
Again, if we look at the deref source, we see that it expects an IDeref interface:
(defn deref
  "Also reader macro: @ref/@agent/@var/@atom/@delay/@future/@promise. Within a transaction,
  returns the in-transaction-value of ref, else returns the
  most-recently-committed value of ref. When applied to a var, agent
  or atom, returns its current state. When applied to a delay, forces
  it if not already forced. When applied to a future, will block if
  computation not complete. When applied to a promise, will block
  until a value is delivered. The variant taking a timeout can be
  used for blocking references (futures and promises), and will return
  timeout-val if the timeout (in milliseconds) is reached before a
  value is available. See also - realized?."
  {:added "1.0"
   :static true}
  ([ref] (if (instance? clojure.lang.IDeref ref)
           (.deref ^clojure.lang.IDeref ref)
           (deref-future ref)))
  ([ref timeout-ms timeout-val]
   (if (instance? clojure.lang.IBlockingDeref ref)
     (.deref ^clojure.lang.IBlockingDeref ref timeout-ms timeout-val)
     (deref-future ref timeout-ms timeout-val))))
public interface IDeref {
    Object deref();
}
Let’s implement that interface as well:
(proxy [IAtom IDeref] []
  (deref [] (.get atomic-ref))
  ...
  )
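Put together, the pieces so far could look like the following sketch. The name `simple-atom` is made up for this example so it does not clash with the `my-atom-impl` function from the post, and `import` is used instead of an `ns` form so the snippet can be pasted into any REPL.

```clojure
(import '(clojure.lang IAtom IDeref)
        '(java.util.concurrent.atomic AtomicReference))

;; Hypothetical name; supports only deref, reset! and compare-and-set!.
(defn simple-atom [init-value]
  (let [atomic-ref (AtomicReference. init-value)]
    (proxy [IAtom IDeref] []
      (deref [] (.get atomic-ref))
      (reset [newval]
        (.set atomic-ref newval)
        newval)
      (compareAndSet [oldv newv]
        (.compareAndSet atomic-ref oldv newv)))))

(def instance (simple-atom 1))
(reset! instance 2)                ; => 2
(compare-and-set! instance 2 3)    ; => true
@instance                          ; => 3
```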
At last, we implement the swap! method. Because this method atomically updates the value based on the current value, we have to ensure that it retries if another update happened in the meantime:
(swap
  ([iFn]
   (loop [old (.get atomic-ref)]
     (let [new (iFn old)]
       (if (.compareAndSet atomic-ref old new)
         new
         (recur (.get atomic-ref))))))
  ([iFn o] (left-out-as-excersise-for-you))
  ([iFn o o1] (left-out-as-excersise-for-you))
  ([iFn o o1 iSeq] (left-out-as-excersise-for-you)))
We get the old value, compute the new value with the specified function, and swap it in only if the stored value didn’t change in the meantime.
If the value in the atomic reference changed, we retry with the changed value.
This is why the function passed to swap! may be called multiple times.
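The retry can be provoked deterministically without threads. In this sketch the update function itself plays the role of a competing writer on its first invocation. It uses the built-in atom, and the names `state` and `attempts` are only for illustration.

```clojure
(def state (atom 0))
(def attempts (atom 0))

(swap! state
       (fn [v]
         ;; On the first attempt only, change the atom behind swap!'s back,
         ;; simulating an update from another thread.
         (when (= 1 (swap! attempts inc))
           (reset! state 100))
         (inc v)))
;; => 101

@attempts ; => 2, the function ran once with 0 and once more with 100
```

The first result (computed from 0) is thrown away because the compare-and-set fails, which is why side effects inside the update function are a bad idea.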
Atom Watchers
Let’s continue with the watchers.
Peek into the Clojure source and we see that it’s part of the IRef interface:
(defn add-watch
  "Adds a watch function to an agent/atom/var/ref reference. The watch
  fn must be a fn of 4 args: a key, the reference, its old-state, its
  new-state. Whenever the reference's state might have been changed,
  any registered watches will have their functions called. The watch fn
  will be called synchronously, on the agent's thread if an agent,
  before any pending sends if agent or ref. Note that an atom's or
  ref's state may have changed again prior to the fn call, so use
  old/new-state rather than derefing the reference. Note also that watch
  fns may be called from multiple threads simultaneously. Var watchers
  are triggered only by root binding changes, not thread-local
  set!s. Keys must be unique per reference, and can be used to remove
  the watch with remove-watch, but are otherwise considered opaque by
  the watch mechanism."
  {:added "1.0"
   :static true}
  [^clojure.lang.IRef reference key fn] (.addWatch reference key fn))
public interface IRef extends IDeref {
    void setValidator(IFn vf);
    IFn getValidator();
    IPersistentMap getWatches();
    IRef addWatch(Object key, IFn callback);
    IRef removeWatch(Object key);
}
Implementing the watchers makes the implementation a bit more tricky, as we can’t delegate calls 1:1 to the Java AtomicReference anymore. We need to call the watchers at the right time.
First, we use a ConcurrentHashMap to keep the watchers. This makes adding/removing watchers thread-safe. Plus we create a utility function to call the watchers:
(let [atomic-ref (AtomicReference. init-value)
      watchers (ConcurrentHashMap.)
      notify-watchers (fn [atom old new]
                        (doseq [^Map$Entry e watchers]
                          ((.getValue e) (.getKey e) atom old new)))]
Adding and removing watchers delegates to the concurrent map:
(addWatch [key callback]
  (.put watchers key callback)
  this)
(removeWatch [key]
  (.remove watchers key)
  this)
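To see the watcher registry in isolation, here is a small sketch that pulls the map and the notify function out of the proxy. The top-level names `watchers`, `notify-watchers` and `seen` are assumptions for this example. Note that the `^Map$Entry` type hint needs `java.util.Map$Entry` to be imported.

```clojure
(import '(java.util Map$Entry)
        '(java.util.concurrent ConcurrentHashMap))

(def watchers (ConcurrentHashMap.))
(def seen (atom []))

;; Calls every registered watcher with its key, the reference, old and new value.
(defn notify-watchers [reference old-val new-val]
  (doseq [^Map$Entry e watchers]
    ((.getValue e) (.getKey e) reference old-val new-val)))

(.put watchers :log
      (fn [k _reference old-val new-val]
        (swap! seen conj [k old-val new-val])))

(notify-watchers nil 1 2)   ; the :log watcher records the change
(.remove watchers :log)
(notify-watchers nil 2 3)   ; nobody is registered anymore

@seen ; => [[:log 1 2]]
```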
For each of the updating functions, we have to call the watchers at the right time. For compareAndSet, that is only when the update succeeded. For reset, we need to ensure we also get the old value first. For swap, it is when the atomic update worked:
(compareAndSet [oldv newv]
  (let [success (.compareAndSet atomic-ref oldv newv)]
    (when success
      (notify-watchers this oldv newv))
    success))
(reset [newval]
  (let [old (.getAndSet atomic-ref newval)]
    (notify-watchers this old newval)
    newval))
(swap
  ([iFn]
   (loop [old (.get atomic-ref)]
     (let [new (iFn old)]
       (if (.compareAndSet atomic-ref old new)
         (do
           (notify-watchers this old new)
           new)
         (recur (.get atomic-ref))))))
  ...
  )
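As a reference for the behaviour we are trying to mimic, the following sketch runs the same sequence of operations against the built-in atom. The names `events` and `watched` are only for illustration. A failed compare-and-set! does not trigger the watcher.

```clojure
(def events (atom []))
(def watched (atom 1))

(add-watch watched :log
           (fn [k _reference old-val new-val]
             (swap! events conj [k old-val new-val])))

(reset! watched 2)                  ; notifies with old 1, new 2
(compare-and-set! watched 2 3)      ; succeeds, notifies with old 2, new 3
(compare-and-set! watched 99 100)   ; fails, no notification
(swap! watched inc)                 ; notifies with old 3, new 4

@events ; => [[:log 1 2] [:log 2 3] [:log 3 4]]

(remove-watch watched :log)
(reset! watched 0)                  ; no watcher left, nothing recorded
```

Swapping `(atom 1)` for the custom implementation should record the same events once all three update methods call the watchers.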
Conclusion
In summary: Clojure atoms are not magic. Each function on an atom delegates to a Java interface method. We could continue in this fashion, implementing the missing methods.
You can also peek into the actual Clojure code if you want to see the complete implementation.
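As an illustration of "continuing in this fashion", here is a rough sketch of validator support via setValidator and getValidator. It is a simplification and not how Clojure itself does it: the names `validated-atom` and `validate!` are made up, only reset is covered, and unlike the real implementation the current value is not re-checked when a validator is installed.

```clojure
(import '(clojure.lang IAtom IRef)
        '(java.util.concurrent.atomic AtomicReference))

(defn validated-atom [init-value]
  (let [atomic-ref (AtomicReference. init-value)
        validator  (AtomicReference.)   ; holds the validator fn, initially nil
        ;; Hypothetical helper: throws if a validator is set and rejects v.
        validate!  (fn [v]
                     (when-let [vf (.get validator)]
                       (when-not (vf v)
                         (throw (IllegalStateException. "Invalid reference state")))))]
    (proxy [IAtom IRef] []
      (deref [] (.get atomic-ref))
      (setValidator [vf] (.set validator vf))
      (getValidator [] (.get validator))
      (reset [newval]
        (validate! newval)              ; reject before touching the value
        (.set atomic-ref newval)
        newval))))

(def v (validated-atom 1))
(set-validator! v pos?)
(reset! v 5)                            ; => 5
(try
  (reset! v -1)
  (catch IllegalStateException e
    (.getMessage e)))                   ; => "Invalid reference state"
@v                                      ; => 5, the rejected value was not stored
```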
(ns experiment
  (:import (clojure.lang IAtom IRef)
           (java.util Map$Entry)
           (java.util.concurrent ConcurrentHashMap)
           (java.util.concurrent.atomic AtomicReference)))

(defn left-out-as-excersise-for-you [] (throw (AssertionError. "Not yet implemented")))

(defn my-atom-impl [init-value]
  (let [atomic-ref (AtomicReference. init-value)
        watchers (ConcurrentHashMap.)
        ;; calls every watcher with its key, the atom, the old and the new value
        notify-watchers (fn [atom old new]
                          (doseq [^Map$Entry e watchers]
                            ((.getValue e) (.getKey e) atom old new)))]
    (proxy [IAtom IRef] []
      (addWatch [key callback]
        (.put watchers key callback)
        this)
      (removeWatch [key]
        (.remove watchers key)
        this)
      (deref [] (.get atomic-ref))
      (compareAndSet [oldv newv]
        (let [success (.compareAndSet atomic-ref oldv newv)]
          (when success
            (notify-watchers this oldv newv))
          success))
      (reset [newval]
        (let [old (.getAndSet atomic-ref newval)]
          (notify-watchers this old newval)
          newval))
      (swap
        ([iFn]
         ;; retry until no other update happened between read and write
         (loop [old (.get atomic-ref)]
           (let [new (iFn old)]
             (if (.compareAndSet atomic-ref old new)
               (do
                 (notify-watchers this old new)
                 new)
               (recur (.get atomic-ref))))))
        ([iFn o] (left-out-as-excersise-for-you))
        ([iFn o o1] (left-out-as-excersise-for-you))
        ([iFn o o1 iSeq] (left-out-as-excersise-for-you))))))
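If you want to compare notes on the exercise, one possible way to fill in the remaining swap arities is to move the retry loop into a helper and pass it a one-argument update function. This is a sketch without watchers; `cas-loop` and `swappable-atom` are hypothetical names that do not appear in Clojure itself.

```clojure
(import '(clojure.lang IAtom IDeref)
        '(java.util.concurrent.atomic AtomicReference))

;; Hypothetical helper: retries until the compare-and-set succeeds.
(defn- cas-loop [^AtomicReference atomic-ref update-fn]
  (loop [old-val (.get atomic-ref)]
    (let [new-val (update-fn old-val)]
      (if (.compareAndSet atomic-ref old-val new-val)
        new-val
        (recur (.get atomic-ref))))))

(defn swappable-atom [init-value]
  (let [atomic-ref (AtomicReference. init-value)]
    (proxy [IAtom IDeref] []
      (deref [] (.get atomic-ref))
      (swap
        ([f] (cas-loop atomic-ref f))
        ([f x] (cas-loop atomic-ref #(f % x)))
        ([f x y] (cas-loop atomic-ref #(f % x y)))
        ;; args arrives as a seq of the remaining arguments
        ([f x y args] (cas-loop atomic-ref #(apply f % x y args)))))))

(def counter (swappable-atom 0))
(swap! counter inc)          ; => 1
(swap! counter + 10)         ; => 11
(swap! counter + 1 2)        ; => 14
(swap! counter + 1 2 3 4)    ; => 24
```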