//2012-11-26 johnpfeiffer
package net.kittyandbear;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Small benchmark comparing a single-threaded scan with a thread-pool based
 * scan when looking for the position of the minimum value spread across
 * several {@code int[]} "buckets".
 *
 * <p>Instances are not thread-safe: {@link #nonThreadedSearch(List)} records
 * its result in instance fields.
 */
public class ConcurrencyExample
{
    /** Number of random integers generated for each bucket by {@code main}. */
    private final int arraySize = 400000;

    /** Number of buckets created by {@code main}; also the size of the thread pool. */
    private final int bucketCount = 20;

    /** Bucket position of the minimum found by the last {@link #nonThreadedSearch(List)}. */
    int bucketIndexOfMinimum = 0;

    /** Position inside that bucket of the minimum found by the last {@link #nonThreadedSearch(List)}. */
    int indexOfMinimum = 0;

    /** Minimum value found by the last {@link #nonThreadedSearch(List)} that saw at least one element. */
    int currentMinimum;

    /**
     * Generates the random buckets, runs both search strategies and prints
     * the elapsed time and the location of the minimum for each.
     *
     * @param args unused
     * @throws InterruptedException declared for backward compatibility
     */
    public static void main(String[] args) throws InterruptedException
    {
        ConcurrencyExample main = new ConcurrencyExample();
        System.out.println("data = " + main.arraySize + " random integers in each array, " + main.bucketCount + " arrays in the List");

        List<int[]> dataBuckets = new ArrayList<int[]>(main.bucketCount);
        for (int i = 0; i < main.bucketCount; i++)
        {
            dataBuckets.add(main.setupRandomData(main.arraySize));
        }
        // main.displayRandomData( dataBuckets ); //DEBUG

        System.out.println("\nstart non threaded search for the index of the minimum value...");
        long start = System.currentTimeMillis();
        main.nonThreadedSearch(dataBuckets);
        long nonThreadedElapsed = System.currentTimeMillis() - start;
        System.out.println("NonThreaded = " + nonThreadedElapsed + " ms");
        System.out.println("NonThreaded Minimum " + dataBuckets.get(main.bucketIndexOfMinimum)[main.indexOfMinimum] + " found at "
                + main.bucketIndexOfMinimum + "," + main.indexOfMinimum);

        System.out.println("\nstart threaded search for the index of the minimum value...");
        start = System.currentTimeMillis();
        HashMap<Integer, Integer> threadedSearchResults = main.threadedSearch(dataBuckets);

        // Reduce the per-bucket winners to the overall winner.
        int threadedBucketIdOfMinimum = 0;
        int threadedIndexOfMinimum = 0;
        int threadedMinimum = dataBuckets.get(0)[0];
        for (Map.Entry<Integer, Integer> entry : threadedSearchResults.entrySet())
        {
            int currentBucketId = entry.getKey();
            int currentBucketMinimumIndex = entry.getValue();
            int currentBucketMinimum = dataBuckets.get(currentBucketId)[currentBucketMinimumIndex];
            if (currentBucketMinimum < threadedMinimum)
            {
                threadedBucketIdOfMinimum = currentBucketId;
                threadedIndexOfMinimum = currentBucketMinimumIndex;
                threadedMinimum = currentBucketMinimum;
            }
        }
        long threadedElapsed = System.currentTimeMillis() - start;

        // Report the per-bucket results only after the clock has been stopped,
        // so console I/O does not inflate the threaded measurement.
        for (Map.Entry<Integer, Integer> entry : threadedSearchResults.entrySet())
        {
            int value = dataBuckets.get(entry.getKey())[entry.getValue()];
            System.out.println( "index = " + entry.getValue() + " , value = " + value );
        }
        System.out.println("Threaded = " + threadedElapsed + " ms");
        System.out.println("Threaded Minimum " + dataBuckets.get(threadedBucketIdOfMinimum)[threadedIndexOfMinimum] + " found at "
                + threadedBucketIdOfMinimum + "," + threadedIndexOfMinimum);
    }

    /**
     * Creates an array filled with random integers.
     *
     * @param size number of elements to generate
     * @return a new array of {@code size} random {@code int} values
     * @throws NegativeArraySizeException if {@code size} is negative
     */
    int[] setupRandomData(int size)
    {
        int[] array = new int[size];
        // One generator for the whole array; constructing one per element is needlessly expensive.
        SecureRandom random = new SecureRandom();
        for (int i = 0; i < size; i++)
        {
            array[i] = random.nextInt();
        }
        return array;
    }

    /**
     * Prints every value of every bucket to standard output (debug helper).
     *
     * @param dataBuckets buckets to print
     */
    void displayRandomData(List<int[]> dataBuckets)
    {
        int bucketNumber = 0;
        for (int[] bucket : dataBuckets)
        {
            System.out.println("\n\nBucket # " + bucketNumber);
            for (int value : bucket)
            {
                System.out.println(value);
            }
            bucketNumber++;
        }
    }

    /**
     * Scans all buckets sequentially and stores the location of the first
     * occurrence of the smallest value in {@link #bucketIndexOfMinimum} and
     * {@link #indexOfMinimum}, and the value itself in {@link #currentMinimum}.
     *
     * <p>The result fields are reset at the start of every call, so a search
     * never starts from positions left behind by an earlier search on
     * different data. If the buckets contain no elements at all, both index
     * fields are left at {@code 0} and {@link #currentMinimum} is unchanged.
     *
     * @param dataBuckets buckets to search
     */
    void nonThreadedSearch(List<int[]> dataBuckets)
    {
        bucketIndexOfMinimum = 0;
        indexOfMinimum = 0;

        boolean found = false;
        int smallest = 0;
        int bucketNumber = 0;
        for (int[] bucket : dataBuckets)
        {
            for (int k = 0; k < bucket.length; k++)
            {
                // Strict comparison keeps the first occurrence when values tie.
                if (!found || bucket[k] < smallest)
                {
                    found = true;
                    smallest = bucket[k];
                    bucketIndexOfMinimum = bucketNumber;
                    indexOfMinimum = k;
                }
            }
            bucketNumber++;
        }
        if (found)
        {
            currentMinimum = smallest;
        }
    }

    /**
     * Searches every bucket on a worker thread from a fixed-size pool and
     * collects, per bucket, the index of that bucket's minimum value.
     *
     * <p>A bucket whose task fails is reported on standard error and omitted
     * from the result. If the calling thread is interrupted while waiting,
     * its interrupt status is restored and the results gathered so far are
     * returned.
     *
     * @param dataBuckets buckets to search
     * @return map from bucket position in {@code dataBuckets} to the index of
     *         the minimum value inside that bucket
     */
    HashMap<Integer,Integer> threadedSearch(List<int[]> dataBuckets )
    {
        HashMap<Integer, Integer> results = new HashMap<Integer, Integer>();
        if (dataBuckets.isEmpty())
        {
            return results; // nothing to search, no need to start a pool
        }

        ExecutorService executor = Executors.newFixedThreadPool(bucketCount);
        try
        {
            // Position in this list is the bucket id.
            List<Future<Integer>> pending = new ArrayList<Future<Integer>>(dataBuckets.size());
            for (int[] bucket : dataBuckets)
            {
                pending.add(executor.submit(new MyCallable(bucket)));
            }

            for (int bucketId = 0; bucketId < pending.size(); bucketId++)
            {
                try
                {
                    results.put(bucketId, pending.get(bucketId).get());
                }
                catch (ExecutionException e)
                {
                    // Best effort: keep collecting the other buckets.
                    System.err.println("search of bucket " + bucketId + " failed");
                    e.printStackTrace();
                }
                catch (InterruptedException e)
                {
                    // Preserve the interrupt for the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        finally
        {
            // Always release the pool threads, even if submission or collection throws.
            executor.shutdown();
        }
        return results;
    }

    /**
     * Task that finds the index of the smallest value in one bucket.
     *
     * <p>Kept as a (non-static) inner class so existing code that creates it
     * through an enclosing instance continues to compile.
     */
    public class MyCallable implements Callable<Integer>
    {
        private final int[] data;

        /**
         * @param data bucket to search; the array is not copied
         */
        MyCallable(int[] data)
        {
            this.data = data;
        }

        /**
         * @return index of the first occurrence of the smallest value in the
         *         bucket, or {@code 0} when the bucket is empty
         */
        @Override
        public Integer call() throws Exception
        {
            int indexOfMinimum = 0;
            for (int i = 1; i < data.length; i++)
            {
                if (data[i] < data[indexOfMinimum])
                {
                    indexOfMinimum = i;
                }
            }
            return indexOfMinimum;
        }
    } // end inner class MyCallable
} // end class