Concurrent Programming
Concurrency (multithreading) is one of the most difficult topics in Java programming. Concurrency errors are often hard to identify and hard to debug, because they may occur only sporadically and cannot easily be reproduced.
Concurrency is the ability to run several programs or parts of a program in parallel. If a time-consuming task can be performed asynchronously or in parallel, this will improve the program's throughput and interactivity.
A modern computer has multiple CPUs or multiple cores within a single CPU. The ability to leverage these multiple cores can be the key to a successful high-volume application.
Process vs. threads
A process operates independently and is isolated from other processes. It cannot directly access shared data in other processes. The resources of a process, e.g. memory and CPU time, are allocated to it by the operating system.
A thread is a so-called lightweight process. It has its own call stack, but it can access the shared data of other threads in the same process. Each thread has its own memory cache. If a thread reads shared data, it stores the data in its own memory cache.
A thread can re-read the shared data to refresh its cached copy.
A Java application runs in one process by default. Within the application, you work with several threads to achieve parallel processing or asynchronous behavior.
Concurrency in Java
Processes and Threads
A Java program runs in its own process and, by default, in a single thread. Java supports threads as part of the language through the Thread class. A Java application can use this class to create new threads. Since Java 1.5, enhanced concurrency support is also available in the java.util.concurrent package.
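As a minimal sketch of creating threads directly with the Thread class, the following starts a few threads and waits for them with join(). The names ThreadDemo and runWorkers are illustrative and not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadDemo {
    // Starts `count` threads, waits for all of them, and returns how many ran.
    static int runWorkers(int count) throws InterruptedException {
        AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> {
                finished.incrementAndGet(); // the work done by each thread
            }, "worker-" + i);
            threads.add(t);
            t.start(); // start() runs the Runnable on a new thread
        }
        for (Thread t : threads) {
            t.join(); // block until that thread has terminated
        }
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Workers finished: " + runWorkers(4));
    }
}
```

Calling run() instead of start() would execute the task on the calling thread, so no new thread would be created.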
Locks and thread synchronization
Java provides locks to protect certain parts of the code from being executed by multiple threads at the same time. The easiest way to lock a method or a block of code is to mark it with the synchronized keyword.
The Java synchronized keyword ensures:
- that only a single thread can execute a block of code at the same time
- that every thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock
Synchronization is necessary for mutually exclusive access to blocks of code and for reliable communication between threads. You can use the synchronized keyword in a method definition. This ensures that only one thread can enter this method on the same object at the same time. Another thread that calls this method waits until the first thread leaves it.
You can also use the synchronized keyword to protect blocks of code within a method. Such a block is guarded by a key, which can be any Java object (a dedicated private object is usually a safer choice than a string, because string literals may be shared across the program). The key is called the lock.
Any code protected by the same lock can only be executed by one thread at the same time.
For example, wrapping the inner block of a method such as MyMethod in a synchronized statement ensures that only one thread at a time can execute that block.
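A synchronized block of this kind might look like the sketch below. The class SynchronizedCounter, its lock field and the demo method are hypothetical names chosen for illustration; the point is only that every access to count goes through the same lock object.

```java
class SynchronizedCounter {
    private final Object lock = new Object(); // dedicated lock object
    private int count = 0;

    void increment() {
        // work that needs no protection could run here, outside the lock
        synchronized (lock) {
            count++; // read-modify-write is only safe while holding the lock
        }
    }

    int get() {
        synchronized (lock) { // same lock, so the latest value is visible
            return count;
        }
    }

    // Runs `threads` threads that each call increment() `perThread` times.
    static int demo(int threads, int perThread) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }
}
```

Without the synchronized blocks, concurrent increments could be lost and the final count could end up lower than threads * perThread.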
java.util.concurrent
The java.util.concurrent package provides tools for creating concurrent applications. It has too many features to discuss here. In this article, we will mainly focus on some of the most useful utilities, as follows:
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- Locks
- BlockingQueue
- DelayQueue
- SynchronousQueue
- Phaser
- ThreadFactory
Executor
Executor is an interface that represents an object that executes the tasks submitted to it. Depending on the specific implementation (and on where the invocation is initiated), the task runs on a new thread or on the current one. Thus, using this interface, we can decouple the task execution flow from the actual task execution mechanism.
One point to note here is that the Executor does not strictly require the execution of the task to be asynchronous. In the simplest case, the executor can instantly invoke the submitted task in the invoking thread.
To create an executor instance, we need to create an invoker:
public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        // runs the task synchronously on the calling thread
        r.run();
    }
}
// the invoker can be used to execute the task.
public void execute() {
    Executor executor = new Invoker();
    executor.execute(() -> {
        // task to be performed
    });
}
If the task is not accepted by the executor, a RejectedExecutionException will be thrown.
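One common way to see a rejection is to hand a task to an executor that has already been shut down. The sketch below assumes the default rejection policy of the executors created by the Executors factory methods; RejectionDemo is an illustrative name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectionDemo {
    // Returns true if the executor refused a task submitted after shutdown().
    static boolean rejectsAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // from here on, no new tasks are accepted
        try {
            executor.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            return true; // the task was not accepted for execution
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected: " + rejectsAfterShutdown());
    }
}
```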
ExecutorService
ExecutorService is a complete solution for asynchronous processing. It manages an in-memory queue and schedules the submitted tasks based on thread availability.
To use the ExecutorService, we need to create a Runnable class and submit an instance of it to the ExecutorService:
public class Task implements Runnable {
    @Override
    public void run() {
        // task details
    }
}
...
ExecutorService executor = Executors.newFixedThreadPool(10);
public void execute() {
    executor.submit(new Task());
}
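The snippet above never shuts the pool down, and the pool's worker threads would then keep the JVM alive. A fuller sketch of the lifecycle could look as follows; PoolDemo, the pool size of 4 and the 10-second timeout are arbitrary illustrative choices.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolDemo {
    // Submits `tasks` small tasks and returns how many of them completed.
    static int runTasks(int tasks) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    completed.incrementAndGet(); // stands in for real work
                });
            }
        } finally {
            executor.shutdown(); // stop accepting tasks, let queued ones finish
        }
        // Wait for the queued tasks; fall back to interrupting them on timeout.
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
        return completed.get();
    }
}
```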
ScheduledExecutorService
ScheduledExecutorService is an interface similar to ExecutorService, but it can also run tasks after a delay or periodically.
Tasks submitted through the Executor and ExecutorService methods are scheduled without any artificial delay. ScheduledExecutorService can additionally schedule a task to start after a given initial delay and to repeat at a fixed rate or with a fixed delay:
executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);
The scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) method creates and executes a periodic action that is first invoked after the given initial delay and subsequently with the given period, until the task is cancelled or the service instance is shut down.
The scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) method creates and executes a periodic action that is first invoked after the given initial delay and subsequently with the given delay between the completion of one execution and the start of the next.
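A periodic task keeps running until it is cancelled or the service is shut down. The sketch below lets a task fire a fixed number of times and then cancels it through the returned ScheduledFuture. ScheduleDemo and the millisecond values are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    // Returns true once the periodic task has fired at least `runs` times.
    static boolean runPeriodically(int runs) throws InterruptedException {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        // first run after 10 ms, then every 20 ms
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                () -> remaining.countDown(), 10, 20, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } finally {
            handle.cancel(false); // stop further periodic executions
            scheduler.shutdown();
        }
    }
}
```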
Future
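Future is used to represent the outcome of an asynchronous operation. It offers methods to check whether the operation has completed, to wait for it, and to retrieve its result. In addition, the cancel(boolean mayInterruptIfRunning) API attempts to cancel the operation. If the value of the flag is true, the thread running the task is interrupted (it is not forcibly terminated, so the task has to respond to the interruption in order to stop). Otherwise, an in-progress task is allowed to complete. An example of creating a future instance is as follows: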
public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000L);
        return "Hello world";
    });
}
...
// check if the future result is ready and fetch the data if the computation is finished:
if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
...
// specify a timeout for a given operation: if the task takes longer than this limit, a TimeoutException is thrown:
try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}
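To illustrate cancellation, the following sketch cancels a task while it is sleeping and checks that the task actually observed the interrupt. CancelDemo and the two latches are hypothetical helpers used only to make the timing deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    // Returns true if a running task was cancelled and saw the interrupt.
    static boolean cancelRunningTask() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        Future<String> future = executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000L); // simulates long-running work
            } catch (InterruptedException e) {
                interrupted.countDown(); // cancel(true) interrupted the sleep
                throw e;
            }
            return "Hello world";
        });
        started.await(); // make sure the task is really running
        boolean cancelled = future.cancel(true);
        boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
        executor.shutdown();
        return cancelled && future.isCancelled() && sawInterrupt;
    }
}
```

After a successful cancel, calling future.get() throws a CancellationException instead of returning a value.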
CountDownLatch
A CountDownLatch is a synchronization aid that makes it easy to wait until several threads have finished their work. The count that the CountDownLatch monitors (typically the number of threads or events to wait for) is set via the constructor and can then no longer be changed, other than by counting down. Each call to countDown() decrements the count, and await() blocks until it reaches zero. A CountDownLatch can also be used as a start gate that releases several threads at the same time. A CountDownLatch can only be used once. A restart is not possible.
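Both uses can be combined in one small sketch: a latch with a count of 1 acts as the start gate, and a second latch counts the finished workers. LatchDemo and its variable names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Releases all workers at once, waits for them, returns the work count.
    static int runWorkers(int workers) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger results = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await(); // wait for the common start signal
                    results.incrementAndGet(); // stands in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown(); // report completion exactly once
                }
            }).start();
        }
        startGate.countDown(); // open the gate for all workers
        done.await(); // block until every worker has counted down
        return results.get();
    }
}
```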
CyclicBarrier
A CyclicBarrier can be seen as a variant of a CountDownLatch. A fixed number of threads, which we call workers, are started. They run until they reach a barrier and have to wait. An object of the type CyclicBarrier is this barrier and counts the arriving workers. Once all workers have reached the barrier, there are two possibilities. In the simple case, all workers can cross the barrier and possibly go on to the next barrier. In the other case, a barrier action is run first, and all workers keep waiting until it has finished. The barrier action is passed as a Runnable to the CyclicBarrier constructor and is executed by the last worker to arrive. Only then do the workers cross the barrier and possibly reach another one.
In contrast to the CountDownLatch, the barrier can be reused as often as required, together with its barrier action. There are two constructors. The first only takes the number of threads to wait for. In this case, there is no action at the barrier point. The second constructor takes a Runnable as a second argument, which contains the action at the barrier point.
Also in contrast to the CountDownLatch, the await() method has no counterpart: there is no countDown() method, nor any method like notify() or signal(). A CyclicBarrier is therefore not advanceable from outside. It trips only when the required number of threads have called await().
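The reuse of one barrier over several rounds, together with a barrier action, can be sketched as follows. BarrierDemo and the counter barrierTrips are illustrative names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Lets `workers` threads meet at the same barrier `rounds` times and
    // returns how often the barrier action ran.
    static int runRounds(int workers, int rounds) throws InterruptedException {
        AtomicInteger barrierTrips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            barrierTrips.incrementAndGet(); // runs once per trip
        });
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // wait here for all other workers
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return barrierTrips.get();
    }
}
```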
Semaphore
The word semaphore comes from Greek and literally means sign bearer. Common translations are signal, signal generator or traffic light. In computer science, a semaphore is a control instance that ensures that only a certain number of threads can access an object or resource. In Java this control instance is implemented by the java.util.concurrent.Semaphore class developed by Doug Lea (from Java 1.5).
A semaphore allows a fixed and therefore limited number of threads to use a resource (critical section). The set of threads using the resource can change at runtime. Every thread has to acquire a permit first. If the maximum number is reached, no further permit is granted until a thread holding one has finished its work and released it. Internally, the semaphore manages a count of these so-called permits.
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " +
            semaphore.getQueueLength());
    if (semaphore.tryAcquire()) {
        try {
            // ...
        } finally {
            semaphore.release();
        }
    }
}
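The snippet above uses the non-blocking tryAcquire(). As a complementary sketch, the blocking acquire() can be used to cap how many threads are inside a critical section at once. SemaphoreDemo and its counters are hypothetical and exist only to measure the peak concurrency.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreDemo {
    // Returns the highest number of threads seen inside the guarded section.
    static int maxConcurrent(int permits, int threads) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks while no permit is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10); // hold the permit for a moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release(); // always hand the permit back
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return peak.get();
    }
}
```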
Locks
The big disadvantage of synchronized methods is that there is only one intrinsic lock (monitor) per object. While a thread is executing one synchronized method, it holds that lock, so no other thread can enter ANY synchronized method of this object, or any block synchronized on it. Since Java 5, the Lock interface in java.util.concurrent.locks offers explicit lock objects as an alternative.
Object lock = new Object();
// ...
synchronized (lock) {
    // something that requires synchronized access
}
// from Java 5
Lock lock = ...
...
// Trying to enter the critical section
lock.lock(); // blocks until this thread gets the lock
try {
    // access the resource protected by this lock
} finally {
    lock.unlock(); // releases the lock
}
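One capability that explicit Lock objects add over synchronized is a timed acquisition attempt via tryLock. The sketch below uses ReentrantLock as the implementation. LockDemo, deposit and balance are invented names for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockDemo {
    private final Lock lock = new ReentrantLock();
    private int balance = 0;

    // Gives up and returns false if the lock stays busy for too long.
    boolean deposit(int amount, long timeoutMillis) throws InterruptedException {
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // could not enter the critical section in time
        }
        try {
            balance += amount;
            return true;
        } finally {
            lock.unlock(); // always release in finally
        }
    }

    int balance() {
        lock.lock();
        try {
            return balance;
        } finally {
            lock.unlock();
        }
    }
}
```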
BlockingQueue
The BlockingQueue interface is an extension of the Queue interface and has been specially designed for producer-consumer queues. Since it is generic, it can represent queues of any object type. The API documentation describes it as follows: A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
Adding Elements
- add() – returns true if insertion was successful, otherwise throws an IllegalStateException
- put() – inserts the specified element into a queue, waiting for a free slot if necessary
- offer() – returns true if insertion was successful, otherwise false
- offer(E e, long timeout, TimeUnit unit) – tries to insert element into a queue and waits for an available slot within a specified timeout
Retrieving Elements
- take() – waits for a head element of a queue and removes it. If the queue is empty, it blocks and waits for an element to become available
- poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout
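The blocking put() and take() methods listed above are enough for a basic producer-consumer hand-over. In this sketch the capacity of 2 and the sentinel value POISON are illustrative choices, and ArrayBlockingQueue is just one of several implementations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueDemo {
    private static final int POISON = -1; // sentinel that ends consumption

    // Sends 1..items through a small bounded queue and returns their sum.
    static int transfer(int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        while (true) {
            int value = queue.take(); // blocks while the queue is empty
            if (value == POISON) {
                break;
            }
            sum += value;
        }
        producer.join();
        return sum;
    }
}
```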
DelayQueue
An unbounded blocking queue of delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
A DelayQueue can only accept objects whose class implements the Delayed interface. This class should implement the methods compareTo() and getDelay() sensibly, since the DelayQueue uses these two methods to build up and drain the queue.
The DelayQueue uses compareTo() to determine which element must be at the head of the queue. Usually this is the element with the smallest remaining delay. A DelayQueue can hold any number of elements. The head element, and only this element, can be taken once its getDelay() returns a number <= 0. The delay times of the other elements are irrelevant. After a removal, the DelayQueue must determine a new head element.
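A minimal Delayed implementation might look like the sketch below. DelayedMessage and DelayQueueDemo are hypothetical classes, and comparing elements by their remaining delay is one reasonable way to implement compareTo(), not the only one.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedMessage implements Delayed {
    final String text;
    private final long readyAtNanos; // point in time when the delay expires

    DelayedMessage(String text, long delayMillis) {
        this.text = text;
        this.readyAtNanos = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // remaining delay; zero or negative means "expired"
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    // Inserts the slow element first, yet the fast one is taken first.
    static String takeInExpiryOrder() throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 100));
        queue.put(new DelayedMessage("fast", 20));
        String first = queue.take().text; // blocks until a delay has expired
        String second = queue.take().text;
        return first + "," + second;
    }
}
```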
SynchronousQueue
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.
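The hand-off behavior can be sketched as follows: an offer() without a waiting consumer fails immediately, while put() blocks until another thread calls take(). HandoffDemo is an illustrative name.

```java
import java.util.concurrent.SynchronousQueue;

class HandoffDemo {
    static String handOff() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // No consumer is waiting yet, so a non-blocking offer is refused.
        boolean offered = queue.offer("dropped");
        Thread producer = new Thread(() -> {
            try {
                queue.put("hello"); // blocks until a consumer takes it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String received = queue.take(); // rendezvous with the producer
        producer.join();
        // size() is always 0 because the queue has no capacity
        return offered + ":" + received + ":" + queue.size();
    }
}
```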
Phaser
Introduced with Java 7, the Phaser combines the capabilities of CountDownLatch and CyclicBarrier and can also work with a varying number of threads. Similar to CyclicBarrier, it is possible to trigger an additional action at the barrier point. With CyclicBarrier you pass a Runnable to the constructor; with Phaser you create a subclass and override the protected method boolean onAdvance(int phase, int registeredParties).
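Overriding onAdvance could be sketched like this, using an anonymous subclass. Returning true from onAdvance terminates the phaser, which is used here to stop after a fixed number of phases. PhaserDemo and the counter advances are illustrative names.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserDemo {
    // Runs `parties` threads through `phases` phases and returns how many
    // times onAdvance was invoked.
    static int runPhases(int parties, int phases) throws InterruptedException {
        AtomicInteger advances = new AtomicInteger();
        Phaser phaser = new Phaser(parties) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                advances.incrementAndGet(); // action at the barrier point
                // returning true terminates the phaser after the last phase
                return phase >= phases - 1 || registeredParties == 0;
            }
        };
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    phaser.arriveAndAwaitAdvance(); // wait for all parties
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return advances.get();
    }
}
```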
ThreadFactory
As the name suggests, ThreadFactory creates new threads on demand; it can be thought of as acting like a thread pool that does not actually pool any threads. It eliminates much of the boilerplate code otherwise needed to create threads in a consistent way.
A ThreadFactory can be defined as follows:
public class MyThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;
    public MyThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId + " and name : " + t.getName());
        threadId++;
        return t;
    }
}
MyThreadFactory factory = new MyThreadFactory("MyThreadFactory");
for (int i = 0; i < 10; i++) {
    Thread t = factory.newThread(new Task());
    t.start();
}
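A ThreadFactory is also commonly handed to an executor, which then calls newThread() whenever it needs a worker. The variant below is a sketch under that assumption: NamedThreadFactory is a hypothetical class that uses an AtomicInteger so that the counter stays correct even if newThread() is called from several threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger nextId = new AtomicInteger(1);
    private final String prefix;

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // getAndIncrement() is atomic, so thread names are never duplicated
        return new Thread(r, prefix + "-Thread_" + nextId.getAndIncrement());
    }
}

class FactoryDemo {
    // Returns the name of the pool thread that ran the first task.
    static String firstThreadName() throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, new NamedThreadFactory("worker"));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } finally {
            pool.shutdown();
        }
    }
}
```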
Exercise
- Exercise 1:
- Define a Runnable task that sleeps for 3 seconds and then finishes.
- Create 20 threads using a ThreadFactory, each running the above Runnable task.
- Define