Nov 18: Concurrency and Threads. Synchronization. Deadlocks.


Code discussed in lecture

Concurrency and threads

Concurrent programs perform multiple tasks "simultaneously". How is that different from parallel programs? I like the discussion by Simon Peyton Jones in his essay Tackling the Awkward Squad...:

A parallel ... program uses multiple processors to gain performance. For example, it may be faster to evaluate e1 + e2 by evaluating e1 and e2 in parallel, and then add the results. Parallelism has no semantic impact at all: the meaning of a program is unchanged whether it is executed sequentially or in parallel. Furthermore, the results are deterministic; there is no possibility that a parallel program will give one result in one run and a different result in a different run.

In contrast, a concurrent program has concurrency as part of its specification. The program must run concurrent threads, each of which can independently perform input/output. The program may be run on many processors, or on one — that is an implementation choice. The behaviour of the program is, necessarily and by design, non-deterministic. Hence, unlike parallelism, concurrency has a substantial semantic impact.

Java provides concurrent threads. A process is an independent task with its own memory allocation. So if your computer is running mail, word, and a game simultaneously, they are running as separate processes. Threads are light-weight processes that all work on a single task and share the same memory. We have already discussed the fact that graphics are performed in a different thread than your program. However, up to now we have not created our own threads. That is what we will do today.

Let's first look at an example of how threads work. To create a thread you first create an object that extends Thread and overrides run. (It is also possible to implement the interface Runnable, which requires you to implement only one method, run.) Given an object of type Thread you call start on it, which causes it to split off a separate thread running the method run. Consider the example Incrementer.java:

public class Incrementer extends Thread {
  private static int shared;  // A variable shared by all incrementers
  
  private String myName;        // The name of the incrementer
  
  /**
   * Create an incrementer with a given name
   * @param name the name of the incrementer
   */
  public Incrementer(String name) {
    myName = name;
  }
  
  /**
   * Thing to do when started
   */
  public void run() {
    shared = shared + 1;
    int temp = shared;
    System.out.println(myName + " incremented shared value " + shared);
    shared = temp + 1;
    System.out.println(myName + " incremented shared value again " + shared);
  }
  
  public static void main(String [] args) {
    Incrementer inc1 = new Incrementer("first");
    Incrementer inc2 = new Incrementer("second");
    
    shared = 0;
    
    try {
      inc1.start();
      inc2.start();
      inc1.join();
      inc2.join();
    }
    catch(InterruptedException e){
      System.err.println("Exception: " + e.getMessage());
    }
    
    System.out.println("shared at end = " + shared);
  }
}

An Incrementer thread has a name, and it increments the static variable shared twice, printing out the value as it does so. The main program creates two Incrementer objects and calls start() on each. Does this sound familiar? The Timer class is a thread, and you can also call stop() on a thread, although it is usually a bad idea to do so (except for timers).

These threads run concurrently. The main method also calls join() on each of them. This method says that the current thread should wait until the thread that join is called on completes. We will see later what happens if this is not done.

It looks like the threads should each increment shared twice. The order of the printouts and of the increments could be different, but the final answer should be 4, right? Running the program shows that this is not the case. The print statements do interleave and can appear in half a dozen different orders. However, the final value is usually 3 and can be 2. Why?

One thread can increment first and save shared into temp when it equals 1. The other thread can then increment twice and print, and then the first thread can then save temp + 1 into shared. In fact, even if we just had a single shared++ statement in each thread it is possible for each thread to get the value of shared, one to increment it and store it back, and then the other to do so, so shared could end up with value 1.
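One standard way to make an increment indivisible is java.util.concurrent.atomic.AtomicInteger, whose incrementAndGet performs the read-modify-write as a single atomic operation. Here is a sketch of a fixed version of the example (the class name AtomicIncrementer is my own, not from the lecture code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: an Incrementer variant where no update can be lost.
public class AtomicIncrementer extends Thread {
  private static AtomicInteger shared = new AtomicInteger(0);

  public void run() {
    shared.incrementAndGet();  // atomic read-modify-write
    shared.incrementAndGet();  // no other thread can slip in between read and write
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicIncrementer inc1 = new AtomicIncrementer();
    AtomicIncrementer inc2 = new AtomicIncrementer();
    inc1.start();
    inc2.start();
    inc1.join();
    inc2.join();
    System.out.println("shared at end = " + shared.get());  // always 4
  }
}
```

With this change the interleaving of the two threads no longer matters: each of the four increments happens atomically, so the final value is always 4.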

If we had no join calls, the main program could print "shared at end" before the other two threads have completed. They keep running on their own. If you want to stop them you call interrupt, which asks them to quit. (We will see this later.)

Producer/Consumer and Bounded Buffers

Why might we want concurrent threads? A classic example is Producer/Consumer. One or more Producers output items and one or more Consumers input the items that the Producers produce. Each Producer or Consumer runs in its own thread. We need a way for them to communicate, that is to pass the items from Producers to Consumers.

The standard solution to this problem uses a bounded buffer. A bounded buffer is a circular queue like the ones we saw earlier. However, it has the property that calling put (enqueue) on a full bounded buffer does not cause an error. The thread blocks until there is room. Similarly calling get (dequeue) on an empty buffer blocks until something is put into the buffer.

This is not a toy example. Bounded buffers are used all of the time. An example is the "pipe" construct in Unix. The following is a spell checker. The whole command should be on one line.

cat /Users/scot/TextFiles/USConstitution.txt | tr 'A-Z' 'a-z' | tr -cs 'a-z' '\n' | 
sort | uniq | comm -23 - /Users/scot/Documents/Courses/cs10/notes/27/biggerSorted.txt

Let's try to break this down. I don't expect you to understand the commands, but rather the overall information flow.

The cat command is concatenate. It concatenates all files that follow the command and writes the concatenation to the standard output. Here there is only one file, so it reads and outputs it.

The | is the pipe construct. It says to take the output from the command on its left (the cat) and use it as input to the command on its right (the tr).

The tr command is translate. In this case the letters in A-Z get converted to the letters in a-z. So this converts the text to lower case. Note that the output of this tr is input to a second tr, which translates any character NOT in a-z to a newline character (\n). Thus each word ends up on its own line.

The output of this tr becomes the input to sort, which not surprisingly sorts it into alphabetical order and outputs it. The uniq command outputs only unique lines, eliminating duplicates.

Finally comm compares the two files. Each line appearing only in the first file (the input from the pipe) goes into one column, each line appearing only in the second file appears in a second column, and each line appearing in both files appears in a third column. However, the -23 says to suppress outputting columns 2 and 3. The file biggerSorted.txt is a sorted version of a file derived from a bunch of works from Project Gutenberg that I have used in a spell corrector in some previous classes, but with duplicates removed. So what is output from the whole thing is all words in USConstitution.txt that are not found in biggerSorted.txt. These words may be misspelled.

The ability to combine commands this way is one of the reasons that Unix is popular. Want to guess how I converted bigger.txt to biggerSorted.txt?

However, why am I bothering to tell you about this in a lecture on concurrent programming? What do you think is going on inside Unix when it executes this line? Each of the commands is run in its own thread. A pipe becomes a bounded buffer. So the cat puts characters into the bounded buffer from which the first tr gets them, etc. Six threads and five bounded buffers are created. Each thread runs at its own pace as long as it can get input and write output. When one of these is not possible it waits.
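Java's standard library can mimic this pipe idea directly: a PipedOutputStream connected to a PipedInputStream forms a small bounded buffer between two threads, with writes blocking when the pipe is full and reads blocking when it is empty. A minimal sketch (the class name PipeDemo is my own) connecting a "cat"-like producer to a "tr"-like consumer:

```java
import java.io.*;

// Sketch: two threads connected by a pipe, like `cat ... | tr 'A-Z' 'a-z'`.
public class PipeDemo {
  public static void main(String[] args) throws IOException {
    PipedOutputStream out = new PipedOutputStream();
    PipedInputStream in = new PipedInputStream(out);  // connect the two ends

    Thread producer = new Thread(() -> {
      try (PrintWriter w = new PrintWriter(out)) {
        w.println("WE THE PEOPLE");  // the "cat" end: write some text
      }
    });

    Thread consumer = new Thread(() -> {
      try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = r.readLine()) != null) {
          System.out.println(line.toLowerCase());  // the "tr" end: lower-case it
        }
      } catch (IOException e) { }
    });

    producer.start();
    consumer.start();
  }
}
```

When the producer closes its end of the pipe, the consumer's readLine returns null, just as a Unix command sees end-of-file when the command feeding its pipe exits.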

We now look at an implementation of bounded buffers in Java.

A Java implementation of Producer/Consumer using Bounded Buffers

We first look at a program that uses producers and consumers. It is BoundedBufferTest.java:

public class BoundedBufferTest {
  public static void main(String[] args) {
    BoundedBuffer buffer = new BoundedBuffer(5); // buffer has size 5
    Producer prod = new Producer(buffer);
    Consumer cons1 = new Consumer(buffer);
    Consumer cons2 = new Consumer(buffer);
    prod.start();
    cons1.start();
    cons2.start();
    try {
      prod.join();
      cons1.interrupt();
      cons2.interrupt();
    } catch (InterruptedException e) {
    }
    System.out.println("End of Program");
  }
}

This main method creates a producer and two consumers. The producer will put things into buffer and the consumers will get them and process them. This code starts all three and then calls join to wait for the producer's thread to finish. At that point it interrupts the consumers, so that they don't continue running after the main program ends.

The run method of Producer is:

  public void run() {
    try {
      for (int index = 0; index < 100; index ++) {
        buffer.put(index);
        sleep(100);
      }
    } catch (InterruptedException e) {  }
  }

So the producer is a for loop that generates the numbers from 0 to 99 and puts them into the buffer. Between each it sleeps for 100 milliseconds. Calling sleep on a thread puts it to sleep for the number of milliseconds that its parameter represents.

The run method of Consumer is also simple:

  public void run() {
    try{
      while (true) {
        int n = buffer.get();
        int waitTime = 100 + generator.nextInt(200);
        sleep(waitTime);
        System.out.println(n);
      }
    } catch(InterruptedException e){}
  }

The consumer loops forever, reading from the buffer and printing the value of the get. It also sleeps, but it does so for a random amount of time between 100 and 300 milliseconds.

We are now ready to look at the implementation of BoundedBuffer.java:

public class BoundedBuffer {
   protected int numSlots;              // Size of the bounded buffer (queue)
   private int[] buffer;                // Hold the circular queue
   private int takeOut = 0, putIn = 0;  // Mark front and back of queue
   private int count=0;                 // Number of items in the queue
   
  /** Create a new bounded buffer with numSlots slots
   * 
   * @param numSlots the size of the bounded buffer to be created
   */
   public BoundedBuffer(int numSlots) { 
      if (numSlots <= 0) {
         throw new IllegalArgumentException("numSlots <= 0");
      } 
      this.numSlots = numSlots; 
      buffer = new int[numSlots];
   }
   
   /**
    * Add an item to the back of the bounded buffer.
    * Block if full.
    * @param value the thing to add
    * @throws InterruptedException
    */
   public synchronized void put(int value) throws InterruptedException {
      while (count == numSlots) 
        wait();
      buffer[putIn] = value;
      putIn = (putIn + 1) % numSlots;
      count++;
      notifyAll();
   }

   /** 
    * Remove an item from the front of the bounded buffer.
    * Block if empty.
    * @return the value removed from the front of the bounded buffer
    * @throws InterruptedException
    */
   public synchronized int get() throws InterruptedException {
      while (count == 0) 
        wait();
      int value = buffer[takeOut];
      takeOut = (takeOut + 1) % numSlots;
      count--;
      notifyAll();
      return value;
   }
}

The interesting parts are the put and get methods. They are similar to what we saw when we looked at storing a queue in an array, but there are a couple of differences:

The synchronized keyword says that only one thread at a time is allowed to be executing any synchronized method of a given object. Why is this important? If two threads both tried to put at the same time it would be possible for both of them to write to the slot buffer[putIn] and then both of them to increment putIn, losing one of the values and leaving a slot with garbage in it. Similarly, two threads executing get could both get the same value and the next value could be skipped.

Would it be safe for one thread to get and another to put simultaneously? At first glance it looks safe because they are changing different parts of the buffer, but a subtle bug could occur if both tried to update count at the same time. Both could read count, update it, and write it back. The second one to write would overwrite the first, and count would change value, when after a put and a get it should be back to its original value.

One mechanism for dealing with synchronization is called a monitor. Java provides a version of a monitor. With a monitor there are critical sections of code that only one thread should be allowed to execute at a time. If every method in a class is synchronized (except constructors, which cannot and do not need to be synchronized) then we would have a standard monitor. Java allows more flexibility, in that it allows only some methods in the class to be synchronized. This increases the amount of concurrency allowed, but can cause problems when a programmer fails to realize that certain operations cannot be safely performed simultaneously.

The basic idea of a monitor is that only one thread can be inside of it at a time. If several threads make method calls one is allowed to proceed and the others must wait. One of them will be allowed to proceed once the first thread leaves the critical region (the synchronized method). Java implements this by providing a lock for each object. Only one thread can hold the lock at a time, and it gives it up when it returns from the synchronized method. The other threads wait for the lock to become available.
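A minimal sketch of this machinery (the class name SafeCounter is my own): a counter whose increment is a critical section. Without synchronized, two threads could both read the same old value of count and one of the two increments would be lost, as described above.

```java
// Sketch: a monitor protecting a single counter.
public class SafeCounter {
  private int count = 0;

  // Only one thread at a time may be inside any synchronized
  // method of a given SafeCounter object.
  public synchronized void increment() {
    count = count + 1;  // read-modify-write, now indivisible
  }

  public synchronized int get() {
    return count;
  }
}
```

A thread calling increment acquires the object's lock on entry and releases it on return, so two concurrent increments are forced to happen one after the other.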

This provides enough machinery for simple cases like preventing two threads from simultaneously updating a counter. However, we need more machinery to make more general synchronized methods like those in the BoundedBuffer work. Suppose get is called on an empty queue. It should wait until the queue is not empty. One way to do this would be to have a statement:

  while (count == 0) 
    ;

However, this leads to a problem (besides the fact that busy-waiting like this wastes a lot of computer time). The only way for the thread to proceed is for another thread to call put to add an item and increment count. But this cannot happen while the first thread is still inside get, holding the lock!

To get around this there are three additional methods: wait, notify, and notifyAll. The wait method removes the thread from the synchronized method and puts it into a list of threads waiting on the BoundedBuffer object. (It is possible to wait on any object, not just this, but for now we will only look at this form.) Therefore other threads can proceed and eventually some thread will call put and make an item available.

But if the thread is waiting in a list of waiting threads it is no longer examining count. How is it to discover that it can now proceed?

The solution is that when a thread leaves a synchronized method it calls either notify or notifyAll. The notify method picks an arbitrary thread from the list of waiting threads and starts it running again from where it called wait. (Note that the waiting list is not a FIFO queue, which will have implications on fairness that we will discuss later.) The notifyAll method reactivates all of the threads currently waiting, and the first to respond gets to continue. The rest wait for the lock, but do not need to be notified again. This is why there is a while loop around the wait rather than an if test. When the first thread leaves the synchronized region another will proceed, and by then the buffer may again be empty.

Note that there are two ways a thread can end up waiting. The first is that a method call to a synchronized method happens when another thread is in a synchronized method. The Java runtime environment takes care of this. The other is to explicitly call wait. Such threads continue to wait until they are signaled via notify or notifyAll.
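For completeness: since Java 5 the standard library has offered a ready-made bounded buffer, java.util.concurrent.ArrayBlockingQueue, whose put blocks when the queue is full and whose take blocks when it is empty, just like our BoundedBuffer's put and get. A sketch (the class name BlockingQueueDemo is my own):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Sketch: the library's bounded buffer in place of our hand-built one.
public class BlockingQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(5);

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          buffer.put(i);  // blocks if the buffer already holds 5 items
        }
      } catch (InterruptedException e) { }
    });

    producer.start();
    for (int i = 0; i < 10; i++) {
      System.out.println(buffer.take());  // blocks if the buffer is empty
    }
    producer.join();
  }
}
```

Internally it uses the same kind of lock-and-wait machinery we just built by hand; writing your own BoundedBuffer is still the best way to understand what the library class is doing.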

Sieve of Eratosthenes

Eratosthenes was a Greek born in the third century BC in what is now Libya. He was the third Librarian of the great library of Alexandria. He computed the radius of the earth and the distance to the sun and moon with impressive accuracy. (There is some dispute on how accurate, because he stated his answers in stadia, a distance measure that is approximately the length of a stadium. There were different versions of stadia in use, and the Egyptian stadion, about 157.5 meters, gave very accurate answers.) But our interest in him is due to a method of computing prime numbers called the Sieve of Eratosthenes.

His idea was to write out the integers starting at 2. Two is a prime, and he eliminated all multiples of 2 by crossing out every second number, starting with 4. The next number not crossed out, 3, must be prime, and he then crossed out every third number, starting with 6. Then 5 is the next number not yet crossed out, so it is prime, etc.

An obvious way of implementing this uses a boolean array isPrime originally set to all true. Then isPrime[4], isPrime[6], ... are set to false. Then isPrime[6], isPrime[9], ... are set to false and so on. The entries of isPrime that remain true are the primes.
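A sketch of that array-based version (the class name ArraySieve is my own; the inner loop starts at p*p rather than 2*p, a standard refinement, because smaller multiples of p were already crossed out by smaller primes):

```java
// Sketch: the classic array-based Sieve of Eratosthenes.
public class ArraySieve {
  public static boolean[] sieve(int n) {
    boolean[] isPrime = new boolean[n + 1];
    for (int i = 2; i <= n; i++) {
      isPrime[i] = true;  // assume everything from 2 up is prime
    }
    for (int p = 2; p * p <= n; p++) {
      if (isPrime[p]) {
        for (int m = p * p; m <= n; m += p) {
          isPrime[m] = false;  // cross out the multiples of p
        }
      }
    }
    return isPrime;
  }

  public static void main(String[] args) {
    boolean[] isPrime = sieve(100);
    for (int i = 2; i <= 100; i++) {
      if (isPrime[i]) {
        System.out.println(i);  // the entries still true are the primes
      }
    }
  }
}
```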

An alternate way is to use threads and bounded buffers. The first thread generates integers starting with 2. It passes these on via a bounded buffer to a second thread that prints out 2 as a prime and passes on all integers not divisible by 2. These are fed via another bounded buffer to a third thread that prints out 3 and passes on the integers it receives that are not divisible by 3. To generate n primes we create n threads and bounded buffers. The code in Sieve.java does this.

public class Sieve extends Thread {

  private BoundedBuffer in, out;
  private int myprime;

  public Sieve ( BoundedBuffer store ) {
    in = store;
  }

  public void run() {
    int n;
    try {
      // First int seen will be a prime, because it passed all previous filters
      myprime = in.get();  

      if ( myprime < 0 ) {
        return;    // Done with input (base case)
      } else {
        System.out.println(myprime);  // Note each filter prints one prime only!

        out = new BoundedBuffer ( 5 );  

        Sieve s = new Sieve ( out );  // Set up next filter in the chain
        s.start();

        n = in.get();
        while ( n >= 0 ) {
          if ( n % myprime != 0 ) {  // Filter out multiples of myprime
            out.put(n);
          }
          n = in.get();
        }
        out.put(n);  // output the negative to signal end

      }
    } catch (InterruptedException e) {}
  }  


  public static void main ( String[] args ) {

    BoundedBuffer b = new BoundedBuffer(5);
    try { 
      Sieve s = new Sieve(b);
      s.start();

      for ( int i = 2; i <= 100; i++ ) {
        b.put(i);
      }
      b.put(-1);   // Signal that the stream of numbers is done
    } catch (InterruptedException e) {} 
  }
}

The main method creates a BoundedBuffer b and passes it to a Sieve object. It then writes the integers from 2 to 100 to the bounded buffer, ending by writing -1 to the bounded buffer as a signal that the stream of integers is done.

The run method of Sieve does what was described above. It creates a bounded buffer out and creates a new Sieve that will use out as its input. It prints the first thing it gets from its input buffer and saves it in myprime. It then gets each successive item from the bounded buffer and puts it to out if it is not divisible by myprime. Thus each Sieve object is both a consumer and a producer. It stops when it sees a negative number, but first puts that number into the out buffer.