john pfeiffer
  • Home
  • Categories
  • Tags
  • Archives

Concurrency Multithreaded Advanced ExecutorService Callbacks

//2012-11-26 johnpfeiffer
package net.kittyandbear;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Demonstrates finding the position of the minimum value in a list of int
 * arrays ("buckets"), once with a plain sequential scan and once by handing
 * each bucket to an {@link ExecutorService} task and collecting the
 * per-bucket answers through {@link Future}s.
 */
public class ConcurrencyExample
{

    private int arraySize = 400000;
    private int bucketCount = 20; // thread count matches bucket count

    /** Bucket (list position) holding the minimum found by the last {@link #nonThreadedSearch}. */
    int bucketIndexOfMinimum = 0;
    /** Index inside that bucket of the minimum found by the last {@link #nonThreadedSearch}. */
    int indexOfMinimum = 0;
    /** Minimum value found by the last {@link #nonThreadedSearch} that saw at least one element. */
    int currentMinimum;

    /**
     * Builds the random data, runs both searches, and prints the timings and
     * the location of the minimum found by each.
     *
     * @param args unused
     * @throws InterruptedException kept in the signature for compatibility
     */
    public static void main(String[] args) throws InterruptedException
    {
        ConcurrencyExample main = new ConcurrencyExample();

        System.out.println("data = " + main.arraySize + " random integers in each array, " + main.bucketCount + " arrays in the List");
        List<int[]> dataBuckets = new ArrayList<int[]>();

        for (int i = 0; i < main.bucketCount; i++)
        {
            dataBuckets.add(main.setupRandomData(main.arraySize));
        }
        // main.displayRandomData( dataBuckets ); //DEBUG

        System.out.println("\nstart non threaded search for the index of the minimum value...");
        long start = System.currentTimeMillis();

        main.nonThreadedSearch(dataBuckets);
        long nonThreadedElapsed = System.currentTimeMillis() - start;
        System.out.println("NonThreaded = " + nonThreadedElapsed + " ms");
        System.out.println("NonThreaded Minimum " + dataBuckets.get(main.bucketIndexOfMinimum)[main.indexOfMinimum] + " found at "
                + main.bucketIndexOfMinimum + "," + main.indexOfMinimum);


        System.out.println("\nstart threaded search for the index of the minimum value...");
        start = System.currentTimeMillis();

        Map<Integer, Integer> threadedSearchResults = main.threadedSearch(dataBuckets);

        // Reduce the per-bucket minima to the overall minimum.
        int threadedBucketIdOfMinimum = 0;
        int threadedIndexOfMinimum = 0;
        int threadedMinimum = dataBuckets.get(0)[0];

        for (Map.Entry<Integer, Integer> entry : threadedSearchResults.entrySet())
        {
            int currentBucketId = entry.getKey();
            int currentBucketMinimumIndex = entry.getValue();
            int currentBucketMinimum = dataBuckets.get(currentBucketId)[currentBucketMinimumIndex];
            if (currentBucketMinimum < threadedMinimum)
            {
                threadedBucketIdOfMinimum = currentBucketId;
                threadedIndexOfMinimum = currentBucketMinimumIndex;
                threadedMinimum = currentBucketMinimum;
            }
            System.out.println( "index = " + entry.getValue() + " , value = " + currentBucketMinimum );
        }

        long threadedElapsed = System.currentTimeMillis() - start;
        System.out.println("Threaded = " + threadedElapsed + " ms");
        System.out.println("Threaded Minimum " + dataBuckets.get(threadedBucketIdOfMinimum)[threadedIndexOfMinimum] + " found at "
                + threadedBucketIdOfMinimum + "," + threadedIndexOfMinimum);

    }

    /**
     * Creates an array filled with random integers.
     *
     * @param size number of elements to generate
     * @return a new array of {@code size} random ints
     */
    int[] setupRandomData(int size)
    {
        int[] array = new int[size];
        // One generator per call: constructing a SecureRandom per element is
        // needlessly expensive and adds nothing to the randomness.
        SecureRandom random = new SecureRandom();
        for (int i = 0; i < size; i++)
        {
            array[i] = random.nextInt();
        }
        return array;
    }

    /**
     * Prints every value of every bucket to standard output (debug helper).
     *
     * @param dataBuckets the buckets to print
     */
    void displayRandomData(List<int[]> dataBuckets)
    {
        int i = 0;
        for (int[] current : dataBuckets)
        {
            System.out.println("\n\nBucket # " + i);
            for (int k = 0; k < current.length; k++)
            {
                System.out.println(current[k]);
            }
            i++;
        }
    }

    /**
     * Sequentially scans every bucket and records the position of the
     * smallest value in {@link #bucketIndexOfMinimum} and
     * {@link #indexOfMinimum}, and the value itself in {@link #currentMinimum}.
     * On ties the first occurrence wins.
     *
     * <p>The position fields are reset to 0,0 at the start of every call, so
     * a previous search never influences (or breaks) a later one. If the data
     * contains no elements at all the position stays at 0,0 and
     * {@link #currentMinimum} is left untouched.
     *
     * @param dataBuckets the buckets to search
     */
    void nonThreadedSearch(List<int[]> dataBuckets)
    {
        // Start from a clean state instead of the previous search's position,
        // which may not even exist in the new data.
        bucketIndexOfMinimum = 0;
        indexOfMinimum = 0;

        boolean found = false;
        int minimum = 0;

        int i = 0;
        for (int[] current : dataBuckets)
        {
            // System.out.println("\n\n NonThreaded Searching Bucket # " + i);
            for (int k = 0; k < current.length; k++)
            {
                if (!found || current[k] < minimum)
                {
                    bucketIndexOfMinimum = i;
                    indexOfMinimum = k;
                    minimum = current[k];
                    found = true;
                }
            }
            i++;
        }

        if (found)
        {
            currentMinimum = minimum;
        }
    }

    /**
     * Searches every bucket on its own pool task and collects, per bucket,
     * the index of that bucket's minimum value.
     *
     * <p>Empty buckets have no minimum and therefore get no entry in the
     * result. A bucket whose task fails is reported on standard error and
     * likewise gets no entry. If the calling thread is interrupted while
     * waiting, the interrupt flag is restored, outstanding work is cancelled
     * and the results gathered so far are returned.
     *
     * @param dataBuckets the buckets to search
     * @return map from bucket position in {@code dataBuckets} to the index of
     *         the minimum value inside that bucket
     */
    // Assign each bucket a thread
    HashMap<Integer,Integer> threadedSearch(List<int[]> dataBuckets )
    {
        HashMap<Integer,Integer> results = new HashMap<Integer,Integer>();
        if (dataBuckets.isEmpty())
        {
            return results; // nothing to do, so do not spin up a pool
        }

        ExecutorService executor = Executors.newFixedThreadPool(bucketCount);
        try
        {
            Map<Integer, Future<Integer>> pending = new HashMap<Integer, Future<Integer>>();
            int bucketId = 0;
            for (int[] bucket : dataBuckets)
            {
                // An empty bucket has no valid index to report; skip it.
                if (bucket == null || bucket.length > 0)
                {
                    Callable<Integer> worker = new MyCallable(bucket);
                    pending.put(bucketId, executor.submit(worker));
                }
                bucketId++;
            }

            for (Map.Entry<Integer, Future<Integer>> entry : pending.entrySet())
            {
                try
                {
                    results.put(entry.getKey(), entry.getValue().get());
                }
                catch (ExecutionException e)
                {
                    // Best effort: report the failed bucket and keep going.
                    System.err.println("search of bucket " + entry.getKey() + " failed: " + e.getCause());
                    e.printStackTrace();
                }
                catch (InterruptedException e)
                {
                    // Preserve the interrupt for the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    executor.shutdownNow();
                    break;
                }
            }
        }
        finally
        {
            // Always release the pool threads, even if submit/get threw.
            executor.shutdown();
        }
        return results;
    }

    /**
     * Task that finds the index of the minimum value in one bucket.
     */
    // Object (the "task") passed to a thread constructor and can return a value
    public class MyCallable implements Callable<Integer>
    {
        private final int[] data;

        /**
         * @param data the bucket this task will scan
         */
        MyCallable(int[] data)
        {
            this.data = data;
        }

        /**
         * Scans the bucket for its smallest value; on ties the first
         * occurrence wins.
         *
         * @return index of the minimum value, or 0 when the bucket is empty
         */
        @Override
        public Integer call() throws Exception
        {
            int indexOfMinimum = 0;
            for (int i = 0; i < data.length; i++)
            {
                if (data[i] < data[indexOfMinimum])
                {
                    indexOfMinimum = i;
                }
            }
            return indexOfMinimum;
        }

    } // end inner class MyCallable

} // end class

  • « Concurrency Multithreaded Simple
  • OxygenSpaceCLI pom.xml »

Published

Nov 27, 2012

Category

java

~383 words

Tags

  • advanced 5
  • callbacks 2
  • concurrency 10
  • executorservice 1
  • java 252
  • multithreaded 5