|
|||||||||||||||||||
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | | | | |||||||||||||||||||
|
Effective Java
|
||||||||||||||||||
Concurrency UpdatesEs gibt einige Ergänzungen in den Abstraktionen der java.util.concurrent.* -Packages (siehe / CONC /).Ergänzungen zu Atomics
Die Atomic-Klassen im Package
java.util.concurrent.atomic
haben neue Methoden bekommen, mit denen Zugriffsmodi wie bei den in Java
9 neuen Variable Handles möglich sind. Beispiele sind Methoden wie
get/setPlain()
,
getAcquire()
,
setRelease()
,
get/setOpaque()
,
weakCompareAndSetVolatile()
usw. in den Klassen
AtomicBoolean
,
AtomicLong
,
AtomicReference
etc.
Spin-Wait-Hints
Die Thread-Klasse hat eine neue statische
Methode
onSpinWait()
, mit der der aktuelle
Thread zu erkennen gibt, dass er in eine Spin-Wait-Loop (auch Busy-Wait
genannt) geht (siehe /
SPIN
/).
Auf manchen Plattformen kann dieser Hinweis zur Laufzeit genutzt werden,
um die Performance zu verbessern. Das ist beispielsweise auf Intel-Prozessoren
so, denn im x86-Instruction-Set gibt es extra eine PAUSE-Anweisung für
solche Situationen. Es handelt sich aber bei der Methode
onSpinWait()
lediglich um einen Hinweis, der auch ignoriert werden kann, wenn die Plattform
keine entsprechenden Optimierungsmöglichkeiten bietet.
Ergänzungen zum CompletableFutureDie Klasse CompletableFuture , die mit Java 8 zum JDK hinzugekommen ist, hat einige neue Methoden bekommen.
Bislang gab es keine Möglichkeit, über
das
CompletableFuture
eine Task
mit Timeout zu starten. Man kann sie starten und Funktionalität für
die asynchrone Resultatverarbeitung anhängen. Falls aber die Task zu lange
braucht, bis sie das Ergebnis liefert, konnte man bislang nicht ohne Weiteres
nach einer bestimmten Zeit abbrechen und ohne das Resultat weitermachen.
Dafür gibt es jetzt mit Java 9 die Methoden
orTimeout()
und
completeOnTimeout()
. Hier ein Beispiel:
CompletableFuture .supplyAsync(()->blockingReadPage(url)) . orTimeout (1,TimeUnit.SECONDS) .whenComplete((r,e)->{ if (r!=null) System.out.println(r); if (e!=null && e instanceof TimeoutException) System.out.println("time limit exceeded");
);
Die Methode
or
Timeout()
bewirkt, dass das
CompletableFuture
nach
Ablauf der Timeout-Zeit mit einer
TimeoutException
zurückkommt. Bei der Methode
completeOnTimeout()
kommt keine
TimeoutException
, sondern
man kann einen Ersatzwert angeben, der dann im Falle des Timeouts anstelle
des eigentlichen Resultats verwendet wird.
Der Timeout bezieht sich übrigens nicht
auf die Ablaufzeit der Task, sondern die Zeit läuft bereits, sobald die
Methode
supplyAsync()
die Task an der
unterliegenden Executor übergeben hat. Wenn die Task dort längere Zeit
in der Task-Queue steht, dann ist die Zeit schon abgelaufen, ehe die Task
überhaupt gestartet wurde.
Wenn man beispielsweise eine ganze Reihe
von Tasks unmittelbar nacheinander losschickt, dann werden nur die ersten
erfolgreich enden und alle späteren scheitern mit
TimeoutException
.
So wie hier:
for (String symbol : stockSymbols) { CompletableFuture .supplyAsync(()->getStockInfo(symbol)) .orTimeout(1,TimeUnit.SECONDS) .whenComplete((r,e)->{ if (r!=null) System.out.println(r); if (e!=null && e instanceof TimeoutException) System.out.println("time limit exceeded"); );
}
Der Output sieht dann beispielsweise so aus: GOOG: "Alphabet Inc.",835.24,"+12.03 - +1.46%" INTC: "Intel Corporation",35.93,"-0.27 - -0.75%" MXIM: "Maxim Integrated Products, Inc.",44.64,"+0.34 - +0.77%" time limit exceeded time limit exceeded time limit exceeded
time limit exceeded
Eine andere neue Method im CompletableFuture
ist die Methode
completeAsync()
. Sie
ist eine Vereinfachung für diejenigen, die
CompletableFuture
s
als Returnwert an ihrer Schnittstelle zurückgeben wollen. Bislang musste
man ein leeres "incomplete" Future zurückgeben und selber dafür sorgen,
dass das Resultat asynchron produziert und anschließend über die Methode
complete()
in dem leeren Future abgelegt wird. Mit
completeAsync()
kann man die asynchrone Resultatproduktion nun mit Hilfe des
CompletableFuture
machen.
Beispiel:
CompletableFuture<String> getWebpage(URL url) { CompletableFuture<String> future = new CompletableFuture<>(); Supplier<String> task = () -> blockingReadPage(url); Executor pool = ... grab a thread pool ... return future. completeAsync (task,pool);
}
Es gibt noch ein paar andere Kleinigkeiten. Beipielsweise kann man sich mit delayedExecutor(long,TimeUnit) einen Thread-Pool holen, den man beim supplyAsync() , runAsync() , etc. als Executor mitgeben kann. Er startet die Tasks erst nach dem angegebenen "delay". Dahinter steht ein ScheduledThreadPoolExecutor mit einem Daemon-Thread, der die Tasks mit der gewünschten Verzögerung an den eigentlichen Executor (z.B. an den ForkJoin-Common-Pool) übergibt. Reactive Programming with java.util.concurrent.Flow
Im
java.util.concurrent
Package gibt es eine neue Klasse
Flow
.
Sie implementieren ein API, das von der Reactive Stream Initiative (siehe
/
REAC
/)
als Standard für die asynchrone Verarbeitung von Datenströmen definiert
wurde. Damit soll folgendes Problem gelöst werden:
Es geht um Multithread-Anwendungen, in denen
ein Strom von Daten (der
Stream
) asynchron von einer Quelle (oft
Source
oder
Producer
genannt) zu einem Ziel (
Sink
oder
Consumer
genannt) transferiert wird. Das ist oft als Push-Strategie implementiert,
wo die Quelle die Initiative übernimmt und ihre Daten an das Ziel sendet.
Dabei kann es vorkommen, dass die Quelle die Daten schneller sendet, als
das Ziel sie verarbeiten kann. Diese Situation wird als
Overflow
bezeichnet. Dann müssen die Daten gepuffert werden und dieser Puffer kann
zum Engpass werden, denn wenn er voll ist, muss die Quelle anhalten und
warten, bis das Ziel bzw. der Puffer wieder Daten aufnehmen kann. Wenn
es zu dieser Overflow-Situation kommt, dann ist natürlich der Effekt der
Parallelverarbeitung weg, weil nun die Quell-Threads auf die Ziel-Threads
warten müssen.
Im Manifest der Reactive Stream Initiative
wird eine Lösung dafür vorgeschlagen, bei der die Datenflussmenge kontrolliert
wird, damit solche blockierenden Overflow-Situationen gar nicht erst entstehen.
Die Idee ist, dass das Ziel einen Gegendruck (die sogenannte
Backpressure
)
aufbaut, indem das Ziel der Quelle sagt, wie viele Daten es aufnehmen kann,
statt einfach nur alles zu akzeptieren, was von der Quelle kommt. Die
Implementierung soll so sein, dass die Quelle nun in Kenntnis der Kapazität
des Ziels dafür sorgt, dass es nicht zur Blockade kommt. Das wird als
Non-Blocking
Backpressure
bezeichnet.
Für die Implementierung der Reactive Stream
Ideen gibt es mit Java 9 die Klasse
Flow
im
java.util.concurrent
Package. Sie
implementiert das Publish-Subscriber-Pattern. Publish-Subscriber ist
ein Messing-Pattern: der
Publisher
stellt mehreren
Subscriber
n
Daten zur Verfügung. Die Subscriber spezifizieren in Form einer
Subscription
,
welche und wie viele Daten sie haben wollen. Die Subscription dient der
Entkopplung von Publisher und Subscriber.
Um nun das Ziel der Reactive Stream Initiative,
die Non-Blocking Backpressure, zu erreichen, ist eine Pull-Strategie implementiert,
bei der der Subscriber die Initiative übernimmt und von sich aus nur eine
begrenzte Menge von Daten anfordert.
Für das Publish-Subscriber-Pattern gibt
es vier neue Interfaces
Flow.Publisher
,
Flow.Subscriber
,
Flow.Processor
und
Flow.Subscription
und eine neue Klasse
SubmissionPublisher
,
die das
Flow.Publisher
Interface implementiert.
|
|||||||||||||||||||
© Copyright 1995-2018 by Angelika Langer. All Rights Reserved. URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/91.Java9.What-is-new-in-Java-9/90.java-9.1.overview.ready_8.html> last update: 26 Oct 2018 |