Nov 18: Concurrency and Threads. Synchronization. Deadlocks.
Code discussed in lecture
Concurrency and threads
Concurrent programs perform multiple tasks "simultaneously". How is that different from parallel programs? I like the discussion by Simon Peyton Jones in his essay Tackling the Awkward Squad...:
A parallel ... program uses multiple processors to gain performance. For example, it may be faster to evaluate e1 + e2 by evaluating e1 and e2 in parallel, and then add the results. Parallelism has no semantic impact at all: the meaning of a program is unchanged whether it is executed sequentially or in parallel. Furthermore, the results are deterministic; there is no possibility that a parallel program will give one result in one run and a different result in a different run.
In contrast, a concurrent program has concurrency as part of its specification. The program must run concurrent threads, each of which can independently perform input/output. The program may be run on many processors, or on one — that is an implementation choice. The behaviour of the program is, necessarily and by design, non-deterministic. Hence, unlike parallelism, concurrency has a substantial semantic impact.
Java provides concurrent threads. A process is an independent task with its own memory allocation. So if your computer is running mail, word processing, and a game simultaneously, they are running as separate processes. Threads are light-weight processes that all work on a single task and share the same memory. We have already discussed the fact that graphics are performed in a different thread from your program. However, up to now we have not created our own threads. That is what we will do today.
Let's first look at an example of how threads work. To create a thread you first create an object that extends Thread and overrides run. (It is also possible to implement the interface Runnable, which requires you to implement only one method, run.) Given an object of type Thread you call start on it, which causes it to split off a separate thread running the method run. Consider the example Incrementer.java:
public class Incrementer extends Thread {
    private static int shared;   // A variable shared by all incrementers
    private String myName;       // The name of the incrementer

    /**
     * Create an incrementer with a given name
     * @param name the name of the incrementer
     */
    public Incrementer(String name) {
        myName = name;
    }

    /**
     * Thing to do when started
     */
    public void run() {
        shared = shared + 1;
        int temp = shared;
        System.out.println(myName + " incremented shared value " + shared);
        shared = temp + 1;
        System.out.println(myName + " incremented shared value again " + shared);
    }

    public static void main(String[] args) {
        Incrementer inc1 = new Incrementer("first");
        Incrementer inc2 = new Incrementer("second");
        shared = 0;
        try {
            inc1.start();
            inc2.start();
            inc1.join();
            inc2.join();
        }
        catch (InterruptedException e) {
            System.err.println("Exception: " + e.getMessage());
        }
        System.out.println("shared at end = " + shared);
    }
}
An Incrementer thread has a name, and it increments the static variable shared twice, printing out the value as it does so. The main program creates two Incrementer objects and calls start() on each. Does this sound familiar? The Timer class is a thread, and you can also call stop() on a thread, although it is usually a bad idea to do so (except for timers).
These threads run concurrently. The main method also calls join() on each of them. This method says that the current thread should wait until the thread that join is called on completes. We will show later what happens if this is not done.
It looks like the threads should each increment shared twice. The order of the printouts and of the increments could be different, but the final answer should be 4, right? Running the program shows this is not the case. The print statements do interleave and can appear in half a dozen different orders. However, the final value is usually 3 and can be 2. Why?
One thread can increment first and save shared into temp when it equals 1. The other thread can then increment twice and print, and then the first thread can save temp + 1 into shared. In fact, even if we just had a single shared++ statement in each thread it is possible for each thread to get the value of shared, one to increment it and store it back, and then the other to do so, so shared could end up with value 1.
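To make this concrete, here is a minimal sketch (my own example, not part of the lecture code; the class name is made up) in which each thread does nothing but shared++ in a loop. Because ++ is really a read, an add, and a write, updates from the two threads can be lost when they interleave.
public class LostUpdateDemo extends Thread {
    private static int shared = 0;

    public void run() {
        for (int i = 0; i < 100000; i++) {
            shared++;               // not atomic: read shared, add 1, write it back
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new LostUpdateDemo();
        Thread b = new LostUpdateDemo();
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("shared = " + shared);  // may well be less than 200000
    }
}
Run it a few times; whenever the two loops overlap, some increments are lost and the final value comes out below 200000.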
If we have no join calls the main program can print "shared at end" before the other two threads have completed. They keep running on their own. If you want to stop them you call interrupt to kill them. (We will see this later.)
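As an aside, here is a minimal sketch of the Runnable alternative mentioned above (the class name and message are my own, not from the lecture code). You implement run in the Runnable, wrap it in a Thread object, and call start as before.
public class HelloRunnable implements Runnable {
    public void run() {
        System.out.println("hello from " + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new HelloRunnable());  // wrap the Runnable in a Thread
        t.start();
        t.join();                                    // wait for it to finish
    }
}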
Producer/Consumer and Bounded Buffers
Why might we want concurrent threads? A classic example is Producer/Consumer. One or more Producers output items and one or more Consumers input the items that the Producers produce. Each Producer or Consumer runs in its own thread. We need a way for them to communicate, that is, to pass the items from Producers to Consumers.
The standard solution to this problem uses a bounded buffer. A bounded buffer is a circular queue like the ones we saw earlier. However, it has the property that calling put (enqueue) on a full bounded buffer does not cause an error. The thread blocks until there is room. Similarly, calling get (dequeue) on an empty buffer blocks until something is put into the buffer.
This is not a toy example. Bounded buffers are used all of the time. An example is the "pipe" construct in Unix. The following is a spell checker. The whole command should be on one line.
cat /Users/scot/TextFiles/USConstitution.txt | tr 'A-Z' 'a-z' | tr -cs 'a-z' '\n' |
sort | uniq | comm -23 - /Users/scot/Documents/Courses/cs10/notes/27/biggerSorted.txt
Let's try to break this down. I don't expect you to understand the commands, but rather the overall information flow.
The cat command is concatenate. It concatenates all files that follow the command and writes the concatenation to the standard output. Here there is only one file, so it reads and outputs it.
The | is the pipe construct. It says to take the output from the command on its left (the cat) and use it as input to the command on its right (the tr).
The tr command is translate. In this case the letters in A-Z get converted to the letters in a-z. So this converts the text to lower case. Note that the output of this tr is input to a second tr, which translates any character NOT in a-z to a newline character (\n). Thus each word ends up on its own line.
The output of this tr becomes the input to sort, which not surprisingly sorts it into alphabetical order and outputs it. The uniq command outputs only unique lines, eliminating duplicates.
Finally, comm compares two sorted files. Each line appearing only in the first file (the input from the pipe) goes into one column, each line appearing only in the second file appears in a second column, and each line appearing in both files appears in a third column. However, the -23 says to suppress outputting columns 2 and 3. The file biggerSorted.txt is a sorted version of a file derived from a bunch of works from Project Gutenberg that I have used in a spell corrector in some previous classes, but with duplicates removed. So what is output from the whole thing is all the words in USConstitution.txt that are not found in biggerSorted.txt. These words may be misspelled.
The ability to combine commands this way is one of the reasons that Unix is popular. Want to guess how I converted bigger.txt to biggerSorted.txt?
However, why am I bothering to tell you about this in a lecture on concurrent
programming? What do you think is going on inside Unix when it executes this line?
Each of the commands runs as its own independent thread of execution (in Unix, each is actually a separate process), and each pipe acts as a bounded buffer. So the cat puts characters into the bounded buffer from which the first tr gets them, etc. Six threads and five bounded buffers are created. Each thread runs at its own pace as long as it can get input and write output. When one of these is not possible it waits.
We now look at an implementation of bounded buffers in Java.
A Java implementation of Producer/Consumer using Bounded Buffers
We first look at a program that uses producers and consumers. It is BoundedBufferTest.java:
public class BoundedBufferTest {
    public static void main(String[] args) {
        BoundedBuffer buffer = new BoundedBuffer(5);   // buffer has size 5
        Producer prod = new Producer(buffer);
        Consumer cons1 = new Consumer(buffer);
        Consumer cons2 = new Consumer(buffer);
        prod.start();
        cons1.start();
        cons2.start();
        try {
            prod.join();
            cons1.interrupt();
            cons2.interrupt();
        } catch (InterruptedException e) {
        }
        System.out.println("End of Program");
    }
}
This main method creates a producer and two consumers. The producer will put things into buffer and the consumers will get them and process them. This code starts all three and then calls join to wait for the producer's thread to finish. At that point it interrupts the consumers, so that they don't continue running after the main program ends.
The run method of Producer is:
public void run() {
    try {
        for (int index = 0; index < 100; index++) {
            buffer.put(index);
            sleep(100);
        }
    } catch (InterruptedException e) { }
}
So the producer is a for loop that generates the numbers from 0 to 99 and puts them into the buffer. Between puts it sleeps for 100 milliseconds. Calling sleep (a static method of Thread) puts the current thread to sleep for the number of milliseconds given by its parameter.
The run method of Consumer is also simple:
public void run() {
    try {
        while (true) {
            int n = buffer.get();
            int waitTime = 100 + generator.nextInt(200);
            sleep(waitTime);
            System.out.println(n);
        }
    } catch (InterruptedException e) { }
}
The consumer loops forever, reading from the buffer and printing the value returned by get. It also sleeps, but it does so for a random amount of time between 100 and 300 milliseconds (generator here is presumably a java.util.Random field of the Consumer class).
We are now ready to look at the implementation of BoundedBuffer.java:
public class BoundedBuffer {
    protected int numSlots;              // Size of the bounded buffer (queue)
    private int[] buffer;                // Holds the circular queue
    private int takeOut = 0, putIn = 0;  // Mark front and back of queue
    private int count = 0;               // Number of items in the queue

    /** Create a new bounded buffer with numSlots slots
     *
     * @param numSlots the size of the bounded buffer to be created
     */
    public BoundedBuffer(int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException("numSlots <= 0");
        }
        this.numSlots = numSlots;
        buffer = new int[numSlots];
    }

    /**
     * Add an item to the back of the bounded buffer.
     * Block if full.
     * @param value the thing to add
     * @throws InterruptedException
     */
    public synchronized void put(int value) throws InterruptedException {
        while (count == numSlots)
            wait();
        buffer[putIn] = value;
        putIn = (putIn + 1) % numSlots;
        count++;
        notifyAll();
    }

    /**
     * Remove an item from the front of the bounded buffer.
     * Block if empty.
     * @return the item removed from the front of the buffer
     * @throws InterruptedException
     */
    public synchronized int get() throws InterruptedException {
        while (count == 0)
            wait();
        int value = buffer[takeOut];
        takeOut = (takeOut + 1) % numSlots;
        count--;
        notifyAll();
        return value;
    }
}
The interesting parts are the put and get methods. They are similar to what we saw when we looked at storing a queue in an array, but there are a couple of differences:
- They keep a count of the number of items. This way it is possible to distinguish between a full and an empty queue when putIn == takeOut.
- They are synchronized.
The synchronized keyword says that only one thread at a time is allowed to be executing any synchronized method of a given object. Why is this important? If two threads both tried to put at the same time it would be possible for both of them to write to the slot buffer[putIn] and then both of them to increment putIn, losing one of the values and leaving a slot with garbage in it. Similarly, two threads executing get could both get the same value and the next value could be skipped.
Would it be safe for one thread to get and another to put simultaneously? At first glance it looks safe because they are changing different parts of the buffer, but a subtle bug could occur if both tried to update count at the same time. Both could read count, update it, and write it back. The second one to write would overwrite the first, so count would end up changed by one when, after a put and a get, it should be back to its original value.
One mechanism for dealing with synchronization is called a monitor. Java provides a version of a monitor. With a monitor there are critical sections of code that only one thread should be allowed to execute at a time. If every method in a class is synchronized (except constructors, which cannot and do not need to be synchronized) then we would have a standard monitor. Java allows more flexibility, in that it allows only some methods in the class to be synchronized. This increases the amount of concurrency allowed, but can cause problems when a programmer fails to realize that certain operations cannot be safely performed simultaneously.
The basic idea of a monitor is that only one thread can be inside of it at a time. If several threads make method calls, one is allowed to proceed and the others must wait. One of them will be allowed to proceed once the first thread leaves the critical region (the synchronized method). Java implements this by providing a lock for each object. Only one thread can hold the lock at a time, and it gives it up when it returns from the synchronized method. The other threads wait for the lock to become available.
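For instance, here is a minimal sketch of a monitor-style class, assuming all we need is a thread-safe counter (the class name is made up for illustration). Because both methods are synchronized, only one thread at a time can be executing either of them on a given object, so the read-modify-write in increment cannot be interleaved with another thread's update.
public class SafeCounter {
    private int count = 0;

    public synchronized void increment() {
        count++;              // safe: no other thread can be inside increment or getCount now
    }

    public synchronized int getCount() {
        return count;
    }
}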
This provides enough machinery for simple cases like preventing two threads from simultaneously updating a counter. However, we need more machinery to make more general synchronized methods like those in the BoundedBuffer work. Suppose get is called on an empty queue. It should wait until the queue is not empty. One way to do this would be to have a statement:
while (count == 0)
    ;
However, this leads to a problem (besides the fact that it wastes a lot of computer time). The only way for the thread to proceed is for another thread to call put to add an item and increment count. But this cannot happen while the first thread is still inside the synchronized get!
To get around this there are three additional methods: wait, notify, and notifyAll. (These are actually methods of Object, not Thread.) The wait method removes the thread from the synchronized method, releases the lock, and puts the thread into a list of threads waiting on the BoundedBuffer object. (It is possible to wait on any object, not just this, but for now we will only look at this form.) Therefore other threads can proceed and eventually some thread will call put and make an item available. But if the thread is waiting in a list of waiting threads it is no longer examining count. How is it to discover that it can now proceed?
The solution is that before a thread leaves a synchronized method it calls either notify or notifyAll. The notify method picks an arbitrary thread from the list of waiting threads and starts it running again from where it called wait. (Note that the waiting list is not a FIFO queue, which has implications for fairness that we will discuss later.) The notifyAll method reactivates all of the threads currently waiting, and the first to acquire the lock gets to continue. The rest wait for the lock, but do not need to be notified again. This is why there is a while loop around the wait rather than an if test. When the first thread leaves the synchronized region another will proceed, and by then the buffer may again be empty.
Note that there are two ways a thread can end up waiting. The first is that a call to a synchronized method happens while another thread is in a synchronized method of the same object. The Java runtime environment takes care of this. The other is to explicitly call wait. Such threads continue to wait until they are signaled via notify or notifyAll.
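To see the wait/notifyAll pattern in a smaller setting than BoundedBuffer, here is a minimal sketch of a one-slot buffer (my own example, not from the lecture code). Note the while loops around wait, for exactly the reason discussed above.
public class OneSlotBuffer {
    private int value;
    private boolean full = false;

    public synchronized void put(int v) throws InterruptedException {
        while (full)             // wait until the slot is empty
            wait();
        value = v;
        full = true;
        notifyAll();             // wake any thread waiting in get
    }

    public synchronized int get() throws InterruptedException {
        while (!full)            // wait until the slot has something in it
            wait();
        full = false;
        notifyAll();             // wake any thread waiting in put
        return value;
    }
}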
Sieve of Eratosthenes
Eratosthenes was a Greek born in the third century BC in what is now Libya. He was the third Librarian of the great library of Alexandria. He computed the radius of the earth and the distance to the sun and moon with impressive accuracy. (There is some dispute on how accurate, because he stated his answers in stadia, a distance measure that is approximately the length of a stadium. There were different versions of stadia in use, and the Egyptian stadion, about 157.5 meters, gave very accurate answers.) But our interest in him is due to a method of computing prime numbers called the Sieve of Eratosthenes.
His idea was to write out integers starting at 2. Two is a prime, and he eliminated all multiples of 2 by crossing out every second number starting with 4. 3 is the next number not crossed out, so it must be prime, and he then crossed out every third number starting with 6. 5 is the next number not yet crossed out, so it is a prime, etc.
An obvious way of implementing this uses a boolean array isPrime, originally set to all true. Then isPrime[4], isPrime[6], ... are set to false. Then isPrime[6], isPrime[9], ... are set to false, and so on. The entries of isPrime that remain true are the primes.
An alternate way is to use threads and bounded buffers. The first thread generates integers starting with 2. It passes these on via a bounded buffer to a second thread that prints out 2 as prime and passes on all integers not divisible by 2. These are fed via another bounded buffer to a third thread that prints out 3 and passes on the integers that it receives that are not divisible by 3. To generate n primes we create n threads and bounded buffers. The code in Sieve.java does this.
public class Sieve extends Thread {
    private BoundedBuffer in, out;
    private int myprime;

    public Sieve(BoundedBuffer store) {
        in = store;
    }

    public void run() {
        int n;
        try {
            // First int seen will be a prime, because it passed all previous filters
            myprime = in.get();
            if (myprime < 0) {
                return;                        // Done with input (base case)
            } else {
                System.out.println(myprime);   // Note each filter prints one prime only!
                out = new BoundedBuffer(5);
                Sieve s = new Sieve(out);      // Set up next filter in the chain
                s.start();
                n = in.get();
                while (n >= 0) {
                    if (n % myprime != 0) {    // Filter out multiples of myprime
                        out.put(n);
                    }
                    n = in.get();
                }
                out.put(n);                    // Output the negative to signal end
            }
        } catch (InterruptedException e) {}
    }

    public static void main(String[] args) {
        BoundedBuffer b = new BoundedBuffer(5);
        try {
            Sieve s = new Sieve(b);
            s.start();
            for (int i = 2; i <= 100; i++) {
                b.put(i);
            }
            b.put(-1);                         // Signal that the stream of numbers is done
        } catch (InterruptedException e) {}
    }
}
The main method creates a BoundedBuffer b and passes it to a Sieve object. It then writes the integers from 2 to 100 to the bounded buffer, ending by writing -1 to the bounded buffer as a signal that the stream of integers is done.
The run method of Sieve does what was described above. It saves the first thing it gets from its input buffer in myprime and prints it. It then creates a bounded buffer out and creates a new Sieve that will use out as its input. After that it gets each successive thing from its input buffer and puts it to out if it is not divisible by myprime. Thus each Sieve object is both a consumer and a producer. It stops when it sees a negative number, but first puts that number into the out buffer.