Concurrency |
Seit 2005 sind CPUs nicht nennenswert schneller geworden.
Dafür bekommen wir immer mehr Kerne je CPU
Wenn wir die Hardware voll nutzen wollen, müssen wir Dinge parallel machen.
Keine Concurrency:
"I cannot text you with a drink in my hand, eh"
(Lady Gaga - Telephone)
Concurrency:
"I will put down this drink to text you, then put my phone away and continue drinking, eh"
Wir managen mehrere Aufgaben gleichzeitig
Parallelism:
"I can text you with one hand while I use the other to drink, eh"
Wir bearbeiten mehrere Aufgaben gleichzeitig
Distributed computing:
"Please text this guy while I drink, eh"
Der Übergang zu Concurrency ist der schwierige.
Ob wir am Ende wirklich parallelisieren macht für das Software Design wenig unterschied.
Wir realisieren Concurrency mithilfe von JVM-Threads.
(plattformunabhängige Abstraktion von OS Threads).
Die Operationen innerhalb eines Threads laufen sequentiell ab.
Die Reihenfolge zwischen Threads ist nicht vorbestimmt.
Mit Futures können wir eine Aufgabe auf einen eigenen Thread auslagern:
(do (future (Thread/sleep 4000)
(println "I'll print after 4 seconds"))
(println "I'll print immediately"))
Innerhalb eines Futures können wir einen Rückgabewert generieren
(def future-result
(future (println "Starting hard computation")
(Thread/sleep 10000)
(+ 1 1)))
Auf das Ergebnis können wir durch Dereferenzieren zugreifen:
(println (deref future-result))
(println @future-result)
deref blockiert, bis das Future seine Berechnung abgeschlossen hat.
Der Mehrwert entsteht, wenn ich zwischen Future erstellen und Dereferenzieren etwas anderes mache.
Wir können deref ein Timeout (in Millisekunden) und einen Default Wert mitgeben.
(deref (future (Thread/sleep 2000) 0) 1000 42)
Wir können ein Future fragen, ob es schon fertig ist (ohne zu blocken):
(def result (future (Thread/sleep 10000) (+ 1 1)))
(realized? result)
Futures sind vor allem gut um IO Prozesse auszulagern:
Sobald wir mehrere Threads verwenden, wird es für die Runtime schwierig festzustellen, wann unser Programm endet.
Mit (shutdown-agents) können wir den Thread-Pool schließen. Jeder Thread läuft noch zuende, aber keine neuen Futures werden mehr akzeptiert.
Invers zum Future ist das Promise:
(def promised-value (promise))
(future (println "The promised value is:" @promised-value))
(deliver promised-value (+ 1 1))
Wir können einem Promise nur einmal einen Wert geben.
Jedes weitere deliver hat keine Auswirkung.
(let [p (promise)]
(future (Thread/sleep 1500) (deliver p 1))
(future (Thread/sleep 1000) (deliver p 2))
(future (Thread/sleep 2000) (deliver p 3))
(println @p))
Die Reihenfolge der Rechenschritte wird nun
nicht-deterministisch.
Je nach Code können auch die berechneten Ergebnisse mit jedem Lauf anders sein.
Genau wie bei Futures, können wir auch bei Promises ein Timeout und einen Default Wert angeben:
(let [p (promise)]
(deref p 100 "timed out"))
Da Futures und Promises nur einmal ihren Zustand ändern können, sind sie praktisch immutable.
Manchmal brauchen wir aber die Option, den Zustand öfter zu wechseln.
In Clojure kann ein Atom seinen Zustand beliebig oft ändern.
Ausnahme zur allgemeinen Immutability.
(def my-atom (atom 1))
@my-atom
; => 1
(reset! my-atom 42)
@my-atom
; => 42
(defn counter-increaser [counter]
(dotimes [_ 10000] (reset! counter (+ @counter 1))))
(let [counter (atom 0)
f1 (future (counter-increaser counter))
f2 (future (counter-increaser counter))]
@f1 ; wait till futures finish
@f2
(println @counter))
Hier haben wir eine Race Condition
Beide Threads lesen gleichzeitig(+ @counter 1)
und überschreiben dasreset!
Der korrekte Weg, um den Wert eines Atom zu ändern ist mit swap!
Wir übergeben eine Funktion, die dem Atom sagt, wie es sich updaten soll:
(defn counter-increaser [counter]
(dotimes [_ 10000] (swap! counter #(+ % 1))))
swap! blockiert nicht, sondern macht ein optimistisches Update:
(defn counter-inc [counter-val]
(println counter-val)
(+ 1 counter-val))
(defn counter-increaser [counter]
(dotimes [n 20] (swap! counter counter-inc)))
(let [counter (atom 0)
f1 (future (counter-increaser counter))
f2 (future (counter-increaser counter))]
@f1 ; wait till futures finish
@f2
(println @counter))
Das Beispiel demonstriert noch eine Race Condition: Beide Threads schreiben gleichzeitig nach STDOUT
Idealerweise verwendet ihr eine Logging Library:
Auch wenn wir swap! korrekt nutzen kann es leicht geschehen, dass unser Code nicht-deterministisch wird.
Beispiel: Wir berechnen für verschiedene Objekte eine Punktzahl und fügen diese nacheinander in eine sortierte Liste ein.
Bei Punktgleichstand ist entscheidend, wessen Thread zuerst fertig wurde.
In diesem Fall hilft ein Tie-Breaker damit die Sortierung immer eindeutig ist.
Nicht-deterministische Ergebnisse sind nicht per se schlimm. Resultierende Bugs können aber sehr bösartig sein.
swap! gibt als Rückgabewert den neuen Wert des Atoms zurück.
Mit swap-vals! erhalten wir den alten und den neuen Wert.
So können wir genau prüfen, was beim Update passiert ist:
(defn increase-to-10 [x]
(if (< x 10)
(inc x)
x))
(def v (atom 8))
(swap-vals! v increase-to-10)
Ein weiterer Weg, um ein Atom sicher zu ändern ist compare-and-set!
Wenn der Wert des Atoms gleich dem zweiten Parameter ist, setze ihn auf den dritten.
Gibt true zurück, wenn der Wert geändert wurde.
(def v (atom 5))
(compare-and-set! v 3 7)
(println @v)
(compare-and-set! v 5 7)
(println @v)
Wenn wir keine größere Update Logik wollen kann das einfacher als ein swap! sein.
Noch schwieriger wird es, wenn wir zwei Änderungen synchronisieren müssen:
Für diesen Fall kennt Clojure Refs und Transactions
Refs sind wie Atome, aber sie können zusammen in einer Transaction geändert werden.
Wir erstellen zwei Refs:
(def account-a (ref 100))
(def account-b (ref 100))
(future
(dosync
(alter account-a - 50)
(Thread/sleep 1000)
(alter account-b + 50)))
Erst zum Schluss von dosync werden die Änderungen ausgeführt.
Analog zu swap! wird der Code wiederholt, wenn sich zwischendurch etwas geändert hat.
Wir können so atomare Transaktionen innerhalb eines Clojure Programms ausführen.
Allgemein ist es aber besser, diese Aufgabe einer Datenbank zu überlassen.
Wir können ein Callback (watch) registrieren um Änderungen an einem Atom oder einer Ref mitzubekommen:
(def foo (atom 5))
(defn change-logger [key atom old-state new-state]
(println "Key" key
": Atom changed from"
old-state "to" new-state))
(add-watch foo :logger change-logger)
(swap! foo inc)
(remove-watch foo :logger)
(swap! foo inc)
Ausserdem können wir einen Validator mitgeben, um den zulässigen Wertebereich zu prüfen:
(def percent (atom 50))
(defn validator [new-value]
(and (>= new-value 0)
(<= new-value 100)))
(set-validator! percent validator)
(reset! percent 200)
Wenn der Validator false zurückgibt oder eine Exception wirft,
landet das Update in einer Exception.
Exception Handling kommt an späterer Stelle genauer.
Allgemein ist es besser ohne Exceptions zu arbeiten.
Noch eine Kategorie von mutable State sind Dynamic Vars
Diese werden ähnlich wie reguläre Vars erzeugt:
(def ^:dynamic *notification-address* "test@test.com")
Die * (earmuffs) signalisieren, dass es eine Dynamic Var ist.
Dynamic Vars können wir mit binding überschreiben:
(def ^:dynamic *notification-address* "some@email.com")
(defn send-notification [text]
(println "Sending" text "to" *notification-address*))
(send-notification "Hello World!")
(binding [*notification-address* "test@test.com"]
(send-notification "Test!"))
Mit dynamic Vars können wir default Settings festlegen, ohne diese immer als Parameter zu übergeben.
Diese Option sollte spärlich verwendet werden.
Einige Build-In Settings sind dynamic Vars:
(binding [*out* (clojure.java.io/writer "file.txt")]
(println "Hello World!"))
Atoms, Refs und dynamic Vars geben unseren Programmen State.
Wir sollten State so gut wie möglich vermeiden, um unser Program simpel zu halten.
Vorteile von Statelessness:
Simples Caching mit memoize
(def m (memoize (fn [x]
(Thread/sleep 1000)
(inc x))))
Der Königsweg um Asynchrone Prozesse zu koordinieren sind Queues.
In Clojure heißen diese Channels.
Diese finden wir in einer eigenen Library:
:dependencies [[org.clojure/clojure "1.11.1"]
[org.clojure/core.async "1.6.673"]]
Auf können mit einem Thread auf den Channel schreiben und mit einem anderen lesen.
(require '[clojure.core.async :refer [chan >!! <!!]])
(def c (chan))
(future (>!! c 42) (println "Delivered 42 to channel"))
(<!! c)
>!! blockiert, bis der Wert auch gelesen wird.
<!! blockiert, bis auch ein Wert geschrieben wird.
Wir können dem Channel auch eine Buffer Kapazität mitgeben, damit der schreibende die Threads nicht aufeinander warten müssen:
(require '[clojure.core.async :refer [chan >!! <!!]])
(def c (chan 2))
(future (>!! c 42) (println "Delivered 42 to channel"))
(future (>!! c 1337) (println "Delivered 1337 to channel"))
(future (>!! c 123) (println "Delivered 123 to channel"))
(<!! c)
(<!! c)
(<!! c)
Mit (close!) können wir einem Channel signalisieren, dass er geschlossen ist.
(require '[clojure.core.async :refer [chan close! >!! <!!]])
(def c (chan 1))
(>!! c 42)
(close! c)
(<!! c)
(<!! c)
Wenn wir einen Thread erzeugen wollen, der mehr als nur eine Aufgabe erledigt, können wir das mit (thread) machen:
(require '[clojure.core.async :refer [chan thread close! >!! <!!]])
(defn worker [c]
(loop []
(when-let [value (<!! c)]
(println "Received" value)
(recur)))
(println "Channel closed"))
(def c (chan 10))
(thread (worker c))
(>!! c 42)
(>!! c 1337)
(close! c)
Futures kommunizieren über den dereferenzierten Rückgabewert.
Threads kommunizieren über Channels.
Threads bringen Overhead mit sich.
Abhängig von der JVM Implementierung und vom OS:
Wir sollten nicht tausende Threads parallel am Laufen haben
(defn split-ranges [n m]
(map #(range (* % m) (* (inc %) m)) (range n)))
(println (time (dorun (map #(reduce + %)
(split-ranges 1000 1000)))))
(println (time (dorun (pmap #(reduce + %)
(split-ranges 1000 1000)))))
pmap ist eine parallelisierte Variante von map
Um den Overhead klein zu halten, kann man einen Thread-Pool erzeugen, der eine fixe Zahl (etwas mehr als Anzahl Kerne) an Threads einmal erzeugt und dann für bestimmte Aufgaben wiederverwendet.
Clojure hat leichtgewichtige Prozesse, die sich einen Thread-Pool teilen. Diese werden mit go erzeugt.
(require '[clojure.core.async :refer [chan thread go close! >!! <!! >! <!]])
(defn calculator [in-chan]
(let [out-chan (chan 100)]
(go (loop []
(when-let [value (<! in-chan)]
(>! out-chan (reduce + value))
(recur))))
out-chan))
(defn printer [in-chan]
(go (loop []
(when-let [value (<! in-chan)]
(println value)
(recur)))))
(-> number-chan
calculator
printer)
(dotimes [i 20] (>!! number-chan (range (inc i))))
Ein go Prozess wird geparkt, wenn >! oder <! nicht direkt ausgeführt werden können.
Parken ist deutlich effizienter als den Thread wechseln.
Wenn ein go Prozess blockiert wird, blockiert auch der unterliegende Thread aus dem Thread-Pool.
Wenn die Prozesse keine gerichtete Pipeline (DAG) bilden, müssen wir aufpassen, dass wir keine Deadlocks produzieren.
Deadlocks entstehen, wenn zwei Prozesse jeweils auf den anderen warten.

https://commons.wikimedia.org/wiki/File:An_illustration_of_the_dining_philosophers_problem.png
(require '[clojure.core.async :refer [chan go-loop >!! <!! >! <!]])
(defn calculator [in-chan out-chan]
(go-loop []
(when-let [{value :value n :time-processed} (<! in-chan)]
(Thread/sleep 500)
(>! out-chan {:value (+ value n) :time-processed (inc n)})
(recur))))
(defn coordinator [in-chan calc-chan]
(go-loop []
(when-let [thing (<! in-chan)]
(if (< (:time-processed thing) 10)
(>! calc-chan thing)
(println (:value thing)))
(recur))))
(def coord-chan (chan 1))
(def calc-chan (chan 1))
(calculator calc-chan coord-chan)
(coordinator coord-chan calc-chan)
(>!! coord-chan {:value 1 :time-processed 0})
Durch erhöhen des Channel Buffers wird das Deadlock unwahrscheinlicher, das Problem ist aber nicht beseitigt.
Deadlocks sind teilweise sehr schwer zu finden und zu beseitigen.
Es gibt passende Funktionen um Channels wie Listen zu behandeln:
(require '[clojure.core.async :as async :refer [chan <!!]])
(def c (chan 10))
(async/onto-chan c [1 2 3 4 5])
(def mod-c (async/map #(* % %) [c] 100))
(loop []
(when-let [value (<!! mod-c)]
(println value)
(recur)))