February 20, 2022

Implement your own Clojure Atoms

Let’s implement our own Clojure atoms for fun and for educational purposes. Do not use this in an actual application =).

Figure 1. Enhance

First, we take a look at how the reset! and swap! operations are implemented:

From Clojure core.clj:
(defn reset!
  "Sets the value of atom to newval without regard for the
  current value. Returns newval."
  {:added "1.0"
   :static true}
  [^clojure.lang.IAtom atom newval] (.reset atom newval))

(defn swap!
  "Atomically swaps the value of atom to be:
  (apply f current-value-of-atom args). Note that f may be called
  multiple times, and thus should be free of side effects.  Returns
  the value that was swapped in."
  {:added "1.0"
   :static true}
  ([^clojure.lang.IAtom atom f] (.swap atom f))
  ([^clojure.lang.IAtom atom f x] (.swap atom f x))
  ([^clojure.lang.IAtom atom f x y] (.swap atom f x y))
  ([^clojure.lang.IAtom atom f x y & args] (.swap atom f x y args)))

Both swap! and reset! simply call methods of the clojure.lang.IAtom interface, so we can implement those methods for our own atom:

Methods of IAtom:
public interface IAtom{
    Object swap(IFn f);
    Object swap(IFn f, Object arg);
    Object swap(IFn f, Object arg1, Object arg2);
    Object swap(IFn f, Object x, Object y, ISeq args);
    boolean compareAndSet(Object oldv, Object newv);
    Object reset(Object newval);
}
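
To make the delegation concrete, here is a small sketch that calls the IAtom methods directly on a built-in atom through Java interop. The var name `built-in` is only illustrative; this is roughly what reset! and swap! do for us behind the scenes.

```clojure
(import '(clojure.lang IAtom))

;; `built-in` is an illustrative name for an ordinary Clojure atom.
(def built-in (atom 1))

;; The built-in atom implements IAtom, so the interface methods
;; can be called directly via interop.
(println (instance? IAtom built-in))     ; => true
(println (.reset ^IAtom built-in 5))     ; => 5
(println (.swap ^IAtom built-in inc))    ; => 6
(println (.swap ^IAtom built-in + 1 2))  ; => 9
(println @built-in)                      ; => 9
```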

Let’s implement reset! first. We use a Java AtomicReference to hold the value. On reset! we just set that value:

reset!
(ns experiment
  (:import (clojure.lang IAtom)
           (java.util.concurrent.atomic AtomicReference)))

(defn my-atom-impl [init-value]
  (let [atomic-ref (AtomicReference. init-value)]
    (proxy [IAtom] []
      (reset [newval]
        (.set atomic-ref newval)
        newval))))

compare-and-set! is also easy. We forward the call to AtomicReference.compareAndSet; there is nothing else to do:

compare-and-set!
```
(compareAndSet [oldv newv]
  ;; delegate to the AtomicReference that holds our value
  (.compareAndSet atomic-ref oldv newv))
```

Let’s try these operations:

(def instance (my-atom-impl 1))
(println (reset! instance 2)) ; => 2
(println (compare-and-set! instance 2 3)) ; => true
(println (compare-and-set! instance 4 5)) ; => false
(println (deref instance)) ; => Crash class experiment.proxy$java.lang.Object$IAtom$8f19b142 cannot be cast to class java.util.concurrent.Future

Everything works, except that we can’t get the value out of our atom. Again, if we look at the deref source, we see that it expects an IDeref interface:

From Clojure core.clj:
(defn deref
  "Also reader macro: @ref/@agent/@var/@atom/@delay/@future/@promise. Within a transaction,
  returns the in-transaction-value of ref, else returns the
  most-recently-committed value of ref. When applied to a var, agent
  or atom, returns its current state. When applied to a delay, forces
  it if not already forced. When applied to a future, will block if
  computation not complete. When applied to a promise, will block
  until a value is delivered.  The variant taking a timeout can be
  used for blocking references (futures and promises), and will return
  timeout-val if the timeout (in milliseconds) is reached before a
  value is available. See also - realized?."
  {:added "1.0"
   :static true}
  ([ref] (if (instance? clojure.lang.IDeref ref)
           (.deref ^clojure.lang.IDeref ref)
           (deref-future ref)))
  ([ref timeout-ms timeout-val]
     (if (instance? clojure.lang.IBlockingDeref ref)
       (.deref ^clojure.lang.IBlockingDeref ref timeout-ms timeout-val)
       (deref-future ref timeout-ms timeout-val))))

Methods of IDeref:
public interface IDeref{
    Object deref() ;
}

Let’s implement that interface as well:

(proxy [IAtom IDeref] []
   (deref [] (.get atomic-ref))
   ...
)
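
Putting the pieces so far together, a minimal runnable sketch could look like the following. The namespace `experiment.sketch` and the name `my-atom-sketch` are made up for this example. It uses keywords as values because AtomicReference.compareAndSet compares by identity, and keywords are interned.

```clojure
(ns experiment.sketch
  (:import (clojure.lang IAtom IDeref)
           (java.util.concurrent.atomic AtomicReference)))

;; Hypothetical name; supports reset!, compare-and-set! and deref only.
(defn my-atom-sketch [init-value]
  (let [atomic-ref (AtomicReference. init-value)]
    (proxy [IAtom IDeref] []
      (deref [] (.get atomic-ref))
      (reset [newval]
        (.set atomic-ref newval)
        newval)
      (compareAndSet [oldv newv]
        (.compareAndSet atomic-ref oldv newv)))))

(def instance (my-atom-sketch :a))
(println (reset! instance :b))               ; => :b
(println (compare-and-set! instance :b :c))  ; => true
(println (compare-and-set! instance :x :y))  ; => false
(println @instance)                          ; => :c
```

With IDeref in place, both `(deref instance)` and the `@instance` reader macro work.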

Finally, we implement the swap! method. Because it updates the value atomically based on the current value, it has to retry whenever another thread changed the value in the meantime:

swap!
(swap
        ([iFn]
         (loop [old (.get atomic-ref)]
           (let [new (iFn old)]
             (if (.compareAndSet atomic-ref old new)
               new
               (recur (.get atomic-ref))))))
        ([iFn o] (left-out-as-excersise-for-you))
        ([iFn o o1] (left-out-as-excersise-for-you))
        ([iFn o o1 iSeq] (left-out-as-excersise-for-you)))

We read the old value, compute the new value with the given function, and swap it in only if the stored value hasn’t changed in the meantime. If the value in the atomic reference did change, we retry with the changed value. This is why the function passed to swap! may be called multiple times.
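
The retry behaviour can be observed on a built-in atom, which uses the same compare-and-set loop. The sketch below counts how often the update function runs while several threads swap concurrently. The names `calls`, `counter` and `counting-inc` are made up for this example, and the side effect inside the function exists only to make the retries visible.

```clojure
(ns experiment.retry-demo
  (:import (java.util.concurrent.atomic AtomicLong)))

(def calls (AtomicLong. 0))   ; how often the update function ran
(def counter (atom 0))

(defn counting-inc [n]
  ;; Side effect for demonstration only; real swap! functions should be pure.
  (.incrementAndGet ^AtomicLong calls)
  (inc n))

;; 8 threads, each performing 1000 swaps on the same atom.
(let [threads (vec (repeatedly 8 (fn []
                                   (Thread. ^Runnable
                                            (fn []
                                              (dotimes [_ 1000]
                                                (swap! counter counting-inc)))))))]
  (run! (fn [^Thread t] (.start t)) threads)
  (run! (fn [^Thread t] (.join t)) threads))

(println "final value:" @counter)       ; always 8000
(println "calls to f: " (.get calls))   ; at least 8000, more when retries happened
```

The final value is always 8000 because every successful swap increments exactly once. The call count depends on how much contention occurred, so it varies from run to run.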

Atom Watchers

Let’s continue with the watchers. Peek into the Clojure source and we see that it’s part of the IRef interface:

add-watch from Clojure core
(defn add-watch
  "Adds a watch function to an agent/atom/var/ref reference. The watch
  fn must be a fn of 4 args: a key, the reference, its old-state, its
  new-state. Whenever the reference's state might have been changed,
  any registered watches will have their functions called. The watch fn
  will be called synchronously, on the agent's thread if an agent,
  before any pending sends if agent or ref. Note that an atom's or
  ref's state may have changed again prior to the fn call, so use
  old/new-state rather than derefing the reference. Note also that watch
  fns may be called from multiple threads simultaneously. Var watchers
  are triggered only by root binding changes, not thread-local
  set!s. Keys must be unique per reference, and can be used to remove
  the watch with remove-watch, but are otherwise considered opaque by
  the watch mechanism."
  {:added "1.0"
   :static true}
  [^clojure.lang.IRef reference key fn] (.addWatch reference key fn))

Methods of IRef:
public interface IRef extends IDeref{
    void setValidator(IFn vf);
    IFn getValidator();
    IPersistentMap getWatches();
    IRef addWatch(Object key, IFn callback);
    IRef removeWatch(Object key);
}
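
Before implementing this ourselves, it helps to see the contract on a built-in atom. In this small sketch the names `counter`, `seen` and the `:logger` key are made up; the watch function records the key and the old and new state it receives.

```clojure
(def counter (atom 0))
(def seen (atom []))   ; collects what the watch function observed

;; A watch fn takes four args: key, reference, old state, new state.
(add-watch counter :logger
           (fn [key reference old-state new-state]
             (swap! seen conj [key old-state new-state])))

(swap! counter inc)
(reset! counter 10)

;; After removal, further updates are no longer observed.
(remove-watch counter :logger)
(swap! counter inc)

(println @seen)     ; => [[:logger 0 1] [:logger 1 10]]
(println @counter)  ; => 11
```

This is the behaviour our own addWatch and removeWatch need to reproduce.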

Implementing the watchers makes the implementation a bit more tricky, as we can’t delegate calls 1:1 to the Java AtomicReference anymore. We need to call the watchers at the right time.

First, we use a ConcurrentHashMap to keep the watchers. This makes adding/removing watchers thread-safe. Plus we create a utility function to call the watchers:

(let [atomic-ref (AtomicReference. init-value)
    watchers (ConcurrentHashMap.)
    notify-watchers (fn [atom old new]
                      (doseq [^Map$Entry e watchers]
                        ((.getValue e) (.getKey e) atom old new)))]

Adding and removing watchers simply delegates to the concurrent map:

(addWatch [key callback]
  (.put watchers key callback)
  this)
(removeWatch [key]
  (.remove watchers key)
  this)

For each of the updating functions, we have to call the watchers at the right time. For compareAndSet, that is only when the update succeeded. For reset, we also need to get the old value first. For swap, it is when the atomic update succeeded:

(compareAndSet [oldv newv]
  (let [success (.compareAndSet atomic-ref oldv newv)]
    (when success
      (notify-watchers this oldv newv))
    success))
(reset [newval]
  (let [old (.getAndSet atomic-ref newval)]
    (notify-watchers this old newval)
    newval))
(swap
  ([iFn]
   (loop [old (.get atomic-ref)]
     (let [new (iFn old)]
       (if (.compareAndSet atomic-ref old new)
         (do
           (notify-watchers this old new)
           new)
         (recur (.get atomic-ref))))))
  ...
)

Conclusion

In summary: Clojure atoms are not magic. Each function on an atom delegates to a Java interface method. We could continue in this fashion, implementing the missing methods.

You can also peek into the actual Clojure code if you want to see the complete implementation.

PS: Here’s the complete listing of the implementation in this post:
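(ns experiment
  (:import (clojure.lang IAtom IRef)
           (java.util Map$Entry)
           (java.util.concurrent ConcurrentHashMap)
           (java.util.concurrent.atomic AtomicReference)))

(defn left-out-as-excersise-for-you [] (throw (AssertionError. "Not yet implemented")))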
(defn my-atom-impl [init-value]
  (let [atomic-ref (AtomicReference. init-value)
        watchers (ConcurrentHashMap.)
        notify-watchers (fn [atom old new]
                          (doseq [^Map$Entry e watchers]
                            ((.getValue e) (.getKey e) atom old new)))]
    (proxy [IAtom IRef] []
      (addWatch [key callback]
        (.put watchers key callback)
        this)
      (removeWatch [key]
        (.remove watchers key)
        this)
      (deref [] (.get atomic-ref))
      (compareAndSet [oldv newv]
        (let [success (.compareAndSet atomic-ref oldv newv)]
          (when success
            (notify-watchers this oldv newv))
          success))
      (reset [newval]
        (let [old (.getAndSet atomic-ref newval)]
          (notify-watchers this old newval)
          newval))
      (swap
        ([iFn]
         (loop [old (.get atomic-ref)]
           (let [new (iFn old)]
             (if (.compareAndSet atomic-ref old new)
               (do
                 (notify-watchers this old new)
                 new)
               (recur (.get atomic-ref))))))
        ([iFn o] (left-out-as-excersise-for-you))
        ([iFn o o1] (left-out-as-excersise-for-you))
        ([iFn o o1 iSeq] (left-out-as-excersise-for-you))))))
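
If you want to check your solution for the swap arities left as an exercise, one possible approach is sketched below. It is not the way Clojure itself implements them. The helper `cas-swap!` and the constructor `my-atom-with-swap` are hypothetical names, and watchers are omitted to keep the sketch short.

```clojure
(ns experiment.swap-arities
  (:import (clojure.lang IAtom IDeref)
           (java.util.concurrent.atomic AtomicReference)))

;; Hypothetical helper: the compare-and-set retry loop, for any number of args.
(defn cas-swap! [^AtomicReference atomic-ref f & args]
  (loop [old (.get atomic-ref)]
    (let [new (apply f old args)]
      (if (.compareAndSet atomic-ref old new)
        new
        (recur (.get atomic-ref))))))

(defn my-atom-with-swap [init-value]
  (let [atomic-ref (AtomicReference. init-value)]
    (proxy [IAtom IDeref] []
      (deref [] (.get atomic-ref))
      (swap
        ([iFn] (cas-swap! atomic-ref iFn))
        ([iFn o] (cas-swap! atomic-ref iFn o))
        ([iFn o o1] (cas-swap! atomic-ref iFn o o1))
        ;; iSeq holds the remaining arguments as a sequence
        ([iFn o o1 iSeq] (apply cas-swap! atomic-ref iFn o o1 iSeq))))))

(def a (my-atom-with-swap 1))
(println (swap! a inc))        ; => 2
(println (swap! a + 10))       ; => 12
(println (swap! a + 1 2))      ; => 15
(println (swap! a + 1 2 3 4))  ; => 25
```

Moving the retry loop into one helper keeps the four arities from repeating the same loop four times.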