Indexed: 2019/04/20 17:06:00 Posted: 2010-06-22 20:24:05 Tags: java, multithreading

I have a Java application where the main thread starts two other threads. When one of these threads terminates, the main thread may start another thread, depending on the result of the terminated thread.

Example: the main thread creates two threads, A and B. Thread A loads a picture and thread B loads another picture. If A terminates and has loaded its picture successfully, a new thread C is created to do some further work, and so on.

How can I do this? I do not want to busy-wait in the main thread, checking every 100 ms whether one of the two threads has finished. I think I cannot use a thread pool, because the number of active threads (in this case A and B) will vary widely and it is the main thread's decision whether to create a new thread.

This is a rough sketch of the "busy waiting" solution:

public class TestThreads {
 private class MyThread extends Thread {
  volatile boolean done = false;
  int steps;

  @Override
  public void run() {
   for (int i=0; i<steps; i++) {
    System.out.println(Thread.currentThread().getName() + ": " + i);
    try {
     Thread.sleep(1000);
    } catch (InterruptedException exc) {  }
   }
   done = true;
   synchronized (this) {
    notify();
   }
  }

  public void waitFor(long ms) {
   synchronized (this) {
    try {
     wait(ms);
    } catch (InterruptedException exc) {  }    
   }
  }
 }

 public void startTest() {
  MyThread a = new MyThread();
  a.steps = 6;
  a.start();

  MyThread b = new MyThread();
  b.steps = 3;
  b.start();

  while (true) {
   if (!a.done) {
    a.waitFor(100);
    if (a.done) {
     System.out.println("C will be started, because A is done.");
    }
   }

   if (!b.done) {
    b.waitFor(100);
    if (b.done) {
     System.out.println("C will be started, because B is done.");
    }
   }

   if (a.done && b.done) {
    break;
   }
  }
 }

 public static void main(String[] args) {
  TestThreads test = new TestThreads();
  test.startTest();
 }
}

This sounds like a classic case for using a ThreadPoolExecutor for performing the tasks concurrently, and wrapping it with an ExecutorCompletionService, for collecting the results as they arrive.

For example, assuming that tasks contains a set of tasks to execute in parallel, each returning a String value when it terminates, the code to process the results as they become available can be something like:

List<Callable<String>> tasks = ....;
Executor ex = Executors.newFixedThreadPool(10);
ExecutorCompletionService<String> ecs = new ExecutorCompletionService<String>(ex);
for (Callable<String> task : tasks) 
    ecs.submit(task);
for(int i = 0; i < tasks.size(); i++) {
    String result = ecs.take().get();
    //Do something with result
}

If you include the identity of the task as a part of the returned value, then you can make decisions depending on the completion order.
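As a rough sketch of that idea, applied to the A/B/C scenario from the question: the class `CompletionOrderDemo`, the result type `LoadResult`, and the helper `loader` are hypothetical names invented for illustration, and the sleeps merely stand in for loading a picture. The main thread blocks in `take()` rather than polling, and submits C once A reports success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    // Hypothetical result type: carries the task's identity with its outcome.
    static final class LoadResult {
        final String taskName;
        final boolean success;

        LoadResult(String taskName, boolean success) {
            this.taskName = taskName;
            this.success = success;
        }
    }

    // Hypothetical stand-in for "load a picture": sleeps, then reports success.
    static Callable<LoadResult> loader(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return new LoadResult(name, true);
        };
    }

    // Runs A and B, submits C once A has succeeded, and returns the task
    // names in the order in which the tasks completed.
    static List<String> run() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();
        ExecutorCompletionService<LoadResult> ecs =
                new ExecutorCompletionService<LoadResult>(pool);
        List<String> order = new ArrayList<String>();
        try {
            ecs.submit(loader("A", 50));
            ecs.submit(loader("B", 400));
            int pending = 2;
            while (pending > 0) {
                LoadResult r = ecs.take().get(); // blocks until some task finishes
                pending--;
                order.add(r.taskName);
                if (r.taskName.equals("A") && r.success) {
                    ecs.submit(loader("C", 50)); // the main thread decides to start C
                    pending++;
                }
            }
        } finally {
            pool.shutdown();
        }
        return order;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Completion order: " + run());
    }
}
```

Keeping a `pending` counter in the main thread means the loop also waits for tasks that were submitted in reaction to earlier results.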

Check out Semaphore:

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it.

So, whenever one of your threads finishes, it releases one permit, which is then acquired by the main thread.
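A minimal sketch of how this could look, assuming each worker also records its name in a queue so the main thread can tell which one finished. `SemaphoreSignalDemo`, `worker`, and the queue are hypothetical names chosen for illustration; only `Semaphore.acquire()` and `release()` come from the suggestion above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

class SemaphoreSignalDemo {
    // Starts a worker that sleeps (a stand-in for real work), records its
    // name, and then releases one permit to signal that it has finished.
    static Thread worker(String name, long millis,
                         Queue<String> finishedNames, Semaphore finished) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finishedNames.add(name); // publish identity before signalling
                finished.release();      // one permit per finished worker
            }
        }, name);
        t.start();
        return t;
    }

    // Returns the worker names in the order the main thread observed them.
    static List<String> run() throws InterruptedException {
        Semaphore finished = new Semaphore(0); // no permits until a worker ends
        Queue<String> finishedNames = new ConcurrentLinkedQueue<String>();
        worker("A", 100, finishedNames, finished);
        worker("B", 20, finishedNames, finished);

        List<String> order = new ArrayList<String>();
        for (int i = 0; i < 2; i++) {
            finished.acquire();              // blocks without polling
            order.add(finishedNames.poll()); // decide here whether to start C
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Finished in order: " + run());
    }
}
```

Because a released permit stays available until it is acquired, a worker that finishes before the main thread starts waiting is not missed.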

You should use a thread pool. In a thread pool, you have a fixed number of threads and tasks are kept in a queue; whenever a thread is available, a task is taken off the queue and executed by that thread.

Here is a link to the Sun tutorial on thread pooling.

Edit: I just noticed that you wrote in your question that you think you cannot use thread pooling. I don't see why that is the case. If you are worried about creation overhead, you can have threads created on demand rather than all at once, and once created, an idle thread is not really going to hurt anything.
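To illustrate on-demand creation, here is a small sketch using a ThreadPoolExecutor with a core size of zero and a SynchronousQueue, which is, as far as I know, roughly how Executors.newCachedThreadPool() is configured. The class name `OnDemandPoolDemo` and the latch that keeps the tasks busy are assumptions made for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class OnDemandPoolDemo {
    // Returns the pool size before any task is submitted and while two
    // tasks are running.
    static int[] run() throws InterruptedException {
        // Core size 0: no threads exist up front. Idle threads are
        // reclaimed after 60 seconds.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        int before = pool.getPoolSize();

        // The latch keeps both tasks busy so each needs its own thread.
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 2; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int during = pool.getPoolSize();

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { before, during };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = run();
        System.out.println("threads before: " + sizes[0] + ", while busy: " + sizes[1]);
    }
}
```

With this configuration the pool grows and shrinks with the number of active tasks, which addresses the concern about the thread count varying.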

You also say that it's the main thread's decision to create a new Thread or not, but does it really need to be? I think that may just overcomplicate things for you.

Is there a reason to control the thread execution directly instead of using something like ExecutorService?

@danben got there first, but I fell into the same pooling trap.

A lot of the complexity in your code is that the main thread is trying to wait on two different objects. There's nothing which says you can't use wait and notify on another object, and if your tasks are ( A or B ) then C, the code below will work - wait on a reference which is set to indicate the first task to complete.

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class BiggieThreads
{
    private static class MyTask implements Runnable
    {
        final int steps;
        final AtomicReference<MyTask> shared;
        final String name;

        MyTask ( int steps, AtomicReference<MyTask> shared, String name )
        {
            this.shared = shared;
            this.steps = steps;
            this.name = name;
        }

        @Override
        public void run()
        {
            for ( int i = 1; i <= steps; i++ ) {
                System.out.println ( "Running: " + this  + " " + i + "/" + steps);
                try {
                    Thread.sleep ( 100 );
                } catch ( InterruptedException exc ) {  }
            }

            // notify if this is the first to complete
            if ( shared.compareAndSet ( null, this ) )
                synchronized ( shared ) {
                    shared.notify();
                }

            System.out.println ( "Completed: " + this );
        }

        @Override
        public String toString ()
        {
            return name;
        }
    }

    public void startTest() throws InterruptedException
    {
        final ExecutorService pool = Executors.newFixedThreadPool ( 3 );
        final AtomicReference<MyTask> shared = new AtomicReference<MyTask>();

        Random random = new Random();

        synchronized ( shared ) {
            // tasks launched while lock on shared held to prevent
            // them notifying before this thread waits
            pool.execute ( new MyTask ( random.nextInt ( 5 ) + 3, shared, "a" ) );
            pool.execute ( new MyTask ( random.nextInt ( 5 ) + 3, shared, "b" ) );

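            // loop guards against spurious wakeups: only proceed
            // once a task has actually registered itself
            while ( shared.get() == null )
                shared.wait();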
        }

        System.out.println ( "Reported: " + shared.get() );

        pool.shutdown();
    }

    public static void main ( String[] args ) throws InterruptedException
    {
        BiggieThreads test = new BiggieThreads ();
        test.startTest();
    }
}

I'd tend to use a semaphore for this job in production: although the wait/notify version is quite simple, using a semaphore puts a name to the behaviour, so there's less to work out when you next read the code.
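For comparison, a sketch of the same "first of A or B to complete" logic with a Semaphore in place of wait and notify. This is one possible reading of the suggestion, not a drop-in replacement for the class above; `FirstDoneDemo`, `task`, and the fixed sleep times are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

class FirstDoneDemo {
    // A task that sleeps (a stand-in for real work) and, if it is the first
    // to complete, records its name and releases the single permit.
    static Runnable task(String name, long millis,
                         AtomicReference<String> winner, Semaphore firstDone) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (winner.compareAndSet(null, name)) {
                firstDone.release(); // only the first finisher signals
            }
        };
    }

    // Returns the name of whichever task completed first.
    static String run() throws InterruptedException {
        Semaphore firstDone = new Semaphore(0);
        AtomicReference<String> winner = new AtomicReference<String>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.execute(task("a", 300, winner, firstDone));
            pool.execute(task("b", 30, winner, firstDone));
            firstDone.acquire(); // blocks until the first task has finished
            return winner.get(); // task C could be started here
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Reported: " + run());
    }
}
```

Since the permit is retained until acquired, the tasks no longer need to be launched while holding a lock, and there is no wait loop to get wrong.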