Efficient Multithreaded Task Execution in Java with ExecutorService


Multithreaded task execution in Java using ExecutorService, FutureTask, and Callable allows for efficient execution of concurrent tasks while managing thread pools, enabling better resource management and scalability. Here's an overview of each component and how they work together:

1. ExecutorService

ExecutorService is part of Java's concurrency framework (java.util.concurrent). It abstracts the creation, management, and termination of threads, allowing you to manage a pool of threads to which tasks can be submitted.

➢  You can create an ExecutorService using the Executors factory methods, such as:

java

ExecutorService executor = Executors.newFixedThreadPool(2); // A pool of 2 threads

 

➢  Submit Tasks: You can submit tasks (e.g., Runnable or Callable) to the ExecutorService. It schedules them and manages thread execution.

java

executor.submit(() -> {
    // Task logic here
});

 

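➢  Shutdown: Once all tasks have been submitted, the ExecutorService should be shut down. shutdown() stops the pool from accepting new tasks and lets the tasks already submitted run to completion: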

java

executor.shutdown();
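
Putting the three steps above together, a minimal sketch of the full lifecycle might look like the following. The class name PoolLifecycleSketch, the helper runAndCount, and the 5-second wait are illustrative assumptions and are not part of the original example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: create a pool, submit tasks, then shut it down.
class PoolLifecycleSketch {

    // Runs taskCount trivial tasks on a 2-thread pool and returns how many ran.
    static int runAndCount(int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    completed.incrementAndGet(); // task logic goes here
                });
            }
        } finally {
            executor.shutdown(); // stop accepting new tasks; queued tasks still run
        }
        // Wait (up to an assumed 5 seconds) for the queued tasks to finish
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // give up and interrupt whatever is still running
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks completed: " + runAndCount(4));
    }
}
```

Calling shutdown() in a finally block means the pool threads are released even if submitting a task fails part-way through the loop.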

 

2. Callable

Callable is a functional interface representing a task that returns a result and can throw a checked exception. Runnable, by contrast, returns nothing and cannot throw checked exceptions.

➢  Example of a Callable:

java

Callable<Integer> task = () -> {
    // Some computation
    return 42; // Returning a result
};
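
To see such a Callable actually run, it can be submitted to a pool and its return value read back. The following is a small sketch under assumed names (CallableSubmitSketch and computeOnPool are hypothetical), using a single-thread executor for simplicity.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: submit a Callable and read back the value it returns.
class CallableSubmitSketch {

    static int computeOnPool() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 6 * 7;           // some computation
            Future<Integer> future = executor.submit(task); // returns immediately
            return future.get();                            // blocks until the result is ready
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + computeOnPool());
    }
}
```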

 

3. Future and FutureTask

Future is a placeholder for the result of an asynchronous computation. When you submit a Callable to an ExecutorService, submit() returns a Future representing the pending result. Calling future.get() blocks until the task completes and then returns its result.

➢  FutureTask: FutureTask implements both Runnable and Future. It wraps a Callable so that you can create the task yourself, hand it to an executor (or a plain Thread), and later read the result from the same object.

java

FutureTask<Integer> futureTask = new FutureTask<>(task);
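
Because a FutureTask is a Runnable as well as a Future, it does not strictly need an ExecutorService to run. The sketch below, with the hypothetical names FutureTaskSketch and runOnPlainThread, runs one on a plain Thread and reads the result from the same object.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: a FutureTask run by a plain Thread, without a pool.
class FutureTaskSketch {

    static int runOnPlainThread() throws InterruptedException, ExecutionException {
        Callable<Integer> task = () -> 42;
        FutureTask<Integer> futureTask = new FutureTask<>(task);

        // FutureTask implements Runnable, so a Thread can execute it directly
        Thread worker = new Thread(futureTask);
        worker.start();

        // The same object is also the Future: get() blocks until the worker finishes
        return futureTask.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + runOnPlainThread());
    }
}
```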

 

Example: Multithreaded Task Execution Using ExecutorService, FutureTask, and Callable

java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class MultithreadedTaskExecution {

    // Simulated task that takes some time and returns a result
    public static Callable<Integer> createTask(final int taskId) {
        return () -> {
            System.out.println("Task " + taskId + " started by " + Thread.currentThread().getName());
            Thread.sleep(2000); // Simulate long-running task
            System.out.println("Task " + taskId + " completed by " + Thread.currentThread().getName());
            return taskId;
        };
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // Create an ExecutorService with a fixed thread pool of 3 threads
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // Create a list to hold FutureTask objects
        List<FutureTask<Integer>> futureTasks = new ArrayList<>();

        try {
            // Submit tasks to the ExecutorService
            for (int i = 1; i <= 5; i++) {
                FutureTask<Integer> futureTask = new FutureTask<>(createTask(i));
                futureTasks.add(futureTask);
                executor.submit(futureTask); // FutureTask is a Runnable, so the pool can run it directly
            }

            // Retrieve and process results from FutureTasks
            for (FutureTask<Integer> futureTask : futureTasks) {
                Integer result = futureTask.get(); // Blocks until this task completes
                System.out.println("Task result: " + result);
            }
        } finally {
            // Shut down the ExecutorService, even if a task failed
            executor.shutdown();
        }
        System.out.println("All tasks completed.");
    }
}

 

How This Code Works:

  1. Task Creation:

✓  The createTask method returns a Callable<Integer> representing each task. Each task prints a start message, simulates work with Thread.sleep(2000), prints a completion message, and returns its taskId as the result.

  2. ExecutorService:

✓  The ExecutorService is created using Executors.newFixedThreadPool(3), which provides a pool of 3 threads. With 5 tasks, at most 3 run at once and the remaining 2 wait in the queue.

  3. Task Submission:

✓  Each Callable is wrapped in a FutureTask and handed to the ExecutorService. Because a FutureTask is a Runnable, both submit and execute accept it. The tasks then run asynchronously, and their results can be retrieved later.

  4. Result Retrieval:

✓  The main thread waits for each task in turn using futureTask.get(). This method blocks until that task has finished and then returns its result (in this case, the taskId).

  5. Thread Pool Shutdown:

✓  After all tasks are complete, executor.shutdown() is called to stop accepting new tasks and cleanly shut down the thread pool.
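
Since get() blocks for as long as the task runs, it can be useful to bound the wait. The sketch below uses the timed overload get(timeout, unit), which throws TimeoutException when the result is not ready in time. The class TimedGetSketch, the method getOrFallback, and the fallback-value design are assumptions made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait for a result, but only for a limited time.
class TimedGetSketch {

    // Returns the task's result (1), or fallback if it is not ready within timeoutMillis.
    static int getOrFallback(long taskMillis, long timeoutMillis, int fallback)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(() -> {
                Thread.sleep(taskMillis); // simulate work
                return 1;
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task; its result is no longer wanted
                return fallback;
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Fast task: " + getOrFallback(10, 2000, -1));
        System.out.println("Slow task: " + getOrFallback(2000, 50, -1));
    }
}
```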

Output Example (the exact interleaving varies from run to run):

Task 1 started by pool-1-thread-1

Task 2 started by pool-1-thread-2

Task 3 started by pool-1-thread-3

Task 1 completed by pool-1-thread-1

Task result: 1

Task 4 started by pool-1-thread-1

Task 2 completed by pool-1-thread-2

Task result: 2

Task 5 started by pool-1-thread-2

Task 3 completed by pool-1-thread-3

Task result: 3

Task 4 completed by pool-1-thread-1

Task result: 4

Task 5 completed by pool-1-thread-2

Task result: 5

All tasks completed.

 


Benefits of Using ExecutorService, Callable, and FutureTask:

  1. Efficient Thread Management: ExecutorService manages a pool of threads and reuses them for different tasks, avoiding the overhead of creating and destroying threads manually.
  2. Asynchronous Execution: By submitting tasks as Callable and using Future objects, tasks are executed asynchronously, allowing other operations to proceed while the tasks complete in the background.
  3. Result Handling: Future and FutureTask let you retrieve a task's result, or the exception it threw, once it finishes. Note that get() blocks the calling thread until the task is done; use isDone() or a timed get() when the caller must not wait indefinitely.
  4. Task Coordination: You can submit multiple tasks and call future.get() on them in any order. Each call waits only for its own task, which makes it easier to coordinate and manage concurrent operations.
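
As an illustration of the result-handling point, the sketch below shows how an exception thrown inside a task surfaces to the caller: get() wraps it in an ExecutionException, and getCause() recovers the original. TaskFailureSketch and describeOutcome are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: retrieving either a result or the task's exception.
class TaskFailureSketch {

    static String describeOutcome(Callable<Integer> task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(task);
            return "result: " + future.get();
        } catch (ExecutionException e) {
            // get() wraps whatever the task threw; getCause() recovers it
            return "failed: " + e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describeOutcome(() -> 7));
        System.out.println(describeOutcome(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```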

Notes:

➢  Larger, real-world applications may benefit from more advanced techniques, such as CompletionService (to process results as tasks finish) and thread-pool tuning.

➢  For robustness, handle exceptions thrown by Callable tasks and shut down the ExecutorService properly (for example, by following shutdown() with awaitTermination()).
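
For the CompletionService mentioned in the notes, a minimal sketch might look like this. It wraps a pool in an ExecutorCompletionService so that results can be consumed in the order tasks finish rather than the order they were submitted. The names CompletionOrderSketch and finishOrder are hypothetical, and the method assumes at least one delay is passed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: consume results in completion order.
class CompletionOrderSketch {

    // Submits one sleeping task per delay and returns the delays in finishing order.
    static List<Integer> finishOrder(int... delaysMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(delaysMillis.length);
        CompletionService<Integer> completion = new ExecutorCompletionService<>(executor);
        try {
            for (int delay : delaysMillis) {
                completion.submit(() -> {
                    Thread.sleep(delay); // simulate work of differing length
                    return delay;
                });
            }
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                order.add(completion.take().get()); // take() waits for the next finished task
            }
            return order;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Finish order: " + finishOrder(600, 50, 300));
    }
}
```

With a plain list of futures, a slow first task would delay the processing of every later result; take() avoids that by handing back whichever task finished first.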

 

 

java

package com.kartik;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class ThreadBaseMultitasking {

    /**
     * Upper bound (inclusive) of the range of numbers to check
     */
    public static final int MAX_NUMBER = 2000000000;

    /**
     * Returns the amount of numbers that can be divided by the divisor without remainder.
     * @param first First number to check
     * @param last Last number to check
     * @param divisor Divisor
     * @return Amount of numbers that can be divided by the divisor without remainder
     */
    public static int totalTaskExecution(int first, int last, int divisor) {

        int amount = 0;
        for (int i = first; i <= last; i++) {
            if (i % divisor == 0) {
                amount++;
            }
        }
        return amount;
    }

    /**
     * Returns the amount of numbers that can be divided by the divisor without remainder (using parallel execution).
     * @param first First number to check
     * @param last Last number to check
     * @param divisor Divisor
     * @return Amount of numbers that can be divided by the divisor without remainder
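     * @throws InterruptedException if the calling thread is interrupted while waiting for a result
     * @throws ExecutionException if one of the tasks throws an exception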
     */
    public static int totalTaskExecutionByNumberOfPoolThreadwise(final int first, final int last, final int divisor)
            throws InterruptedException, ExecutionException {

        int amount = 0;

        // Prepare to execute and store the Futures
        int threadNum = 2;
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

        // Run the first half of the numbers on the pool (the split at last / 2 is even only when first is 0, as in main)
        FutureTask<Integer> futureTask_1 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() {
                return ThreadBaseMultitasking.totalTaskExecution(first, last / 2, divisor);
            }
        });
        taskList.add(futureTask_1);
        executor.execute(futureTask_1);

        // Run the second half of the numbers on the pool
        FutureTask<Integer> futureTask_2 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() {
                return ThreadBaseMultitasking.totalTaskExecution(last / 2 + 1, last, divisor);
            }
        });
        taskList.add(futureTask_2);
        executor.execute(futureTask_2);

        // Wait until all results are available and combine them as they arrive
        try {
            for (FutureTask<Integer> futureTask : taskList) {
                amount += futureTask.get(); // blocks until this task has finished
            }
        } finally {
            executor.shutdown(); // release the pool threads even if a task failed
        }

        return amount;
    }

    /**
     * Executing the example.
     * @param args Command line arguments
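     * @throws ExecutionException if one of the parallel tasks throws an exception
     * @throws InterruptedException if the main thread is interrupted while waiting for a result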
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // Sequential execution
        long timeStart = Calendar.getInstance().getTimeInMillis();
        int result = ThreadBaseMultitasking.totalTaskExecution(0, MAX_NUMBER, 3);
        long timeEnd = Calendar.getInstance().getTimeInMillis();
        long timeNeeded = timeEnd - timeStart;
        System.out.println("Result         : " + result + " calculated in " + timeNeeded + " ms");

        // Parallel execution
        long timeStartFuture = Calendar.getInstance().getTimeInMillis();
        int resultFuture = ThreadBaseMultitasking.totalTaskExecutionByNumberOfPoolThreadwise(0, MAX_NUMBER, 3);
        long timeEndFuture = Calendar.getInstance().getTimeInMillis();
        long timeNeededFuture = timeEndFuture - timeStartFuture;
        System.out.println("Result (Future): " + resultFuture + " calculated in " + timeNeededFuture + " ms");
    }
}

 

This program uses ExecutorService, FutureTask, and Callable to count how many numbers from 0 to MAX_NUMBER (in this case, 2 billion) are divisible by a specified divisor, first sequentially and then in parallel.

Key Concepts:

  1. Sequential Execution (totalTaskExecution):

✓  This method checks every number between first and last and counts how many are divisible by the provided divisor. This is a basic, single-threaded operation.

✓  It iterates from first to last and increments a counter whenever a number is divisible by the divisor.

  2. Parallel Execution (totalTaskExecutionByNumberOfPoolThreadwise):

✓  This method improves performance by using a fixed thread pool to divide the work across multiple threads.

✓  The task is split into two: one thread handles the first half of the range, and the second thread handles the second half.

✓  Each task runs concurrently, and the results are aggregated after both tasks complete.

Breakdown of Code:

1. totalTaskExecution Method:

This method performs a straightforward sequential calculation of how many numbers are divisible by the divisor.

➢  It loops through each number from first to last and checks if the number is divisible by divisor without a remainder.

➢  If divisible, it increments a counter and returns the total count.

2. totalTaskExecutionByNumberOfPoolThreadwise Method:

This method utilizes multithreading to perform the same task but more efficiently using two threads.

➢  It splits the range of numbers in half:

✓  The first task checks from first to last / 2.

✓  The second task checks from last / 2 + 1 to last.

➢  Because the split point is last / 2, the two halves are equal in size only when first is 0, as it is in main.

➢  The two tasks run concurrently on the pool's two threads, and each task's result is held by its FutureTask.

➢  After both tasks complete, the results are added together and returned.

3. main Method:

This method runs both sequential and parallel versions of the task and measures the time taken for each approach:

➢  It first calls the totalTaskExecution method to run the task sequentially and calculates the execution time.

➢  Then, it calls totalTaskExecutionByNumberOfPoolThreadwise to perform the task in parallel and calculates the execution time for this approach.

➢  The results and execution times are printed to compare the performance of the two approaches.

Example Output (timings are illustrative):

Result         : 666666667 calculated in 3000 ms

Result (Future): 666666667 calculated in 1500 ms

 

In this example, the parallel version would typically complete faster because the CPU-bound work is divided between two threads that can run on separate cores. The actual time taken will depend on the system's available resources and thread scheduling.
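
The count printed in the output above can be checked without looping. For a positive divisor, the number of multiples in [first, last] is floor(last / divisor) - floor((first - 1) / divisor). The sketch below (DivisibleCountCheck and countMultiples are hypothetical names) uses Math.floorDiv so that the first = 0 case rounds correctly. It assumes divisor > 0, first <= last, and that first is not Integer.MIN_VALUE.

```java
// Hypothetical sketch: closed-form cross-check of the loop-based count.
class DivisibleCountCheck {

    // Counts multiples of divisor in [first, last] without iterating.
    // Assumes divisor > 0 and first <= last.
    static int countMultiples(int first, int last, int divisor) {
        // floorDiv rounds toward negative infinity, so (0 - 1) / 3 becomes -1, not 0
        return Math.floorDiv(last, divisor) - Math.floorDiv(first - 1, divisor);
    }

    public static void main(String[] args) {
        // Same range and divisor as the article's main method
        System.out.println(countMultiples(0, 2_000_000_000, 3)); // prints 666666667
    }
}
```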

Improvements:

➢  Thread Pool Size: The thread pool is currently set to two threads (threadNum = 2). You can experiment with increasing this number based on the number of CPU cores available on the system, which Runtime.getRuntime().availableProcessors() reports.

➢  Dynamic Task Division: Instead of splitting the work into only two tasks, it could be beneficial to split the range further and use more threads for greater parallelism, especially on machines with many cores.
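
One possible way to apply both improvements is sketched below: the range is cut into one chunk per thread, and the thread count is taken from the number of available cores. This is an illustration under assumptions, not the original author's code. ChunkedCountSketch, countSequential, and countParallel are hypothetical names, and the range in main is kept small so the sketch runs quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: split the range into as many chunks as there are threads.
class ChunkedCountSketch {

    // Sequential count, same logic as totalTaskExecution in the listing above.
    static int countSequential(int first, int last, int divisor) {
        int amount = 0;
        for (int i = first; i <= last; i++) {
            if (i % divisor == 0) {
                amount++;
            }
        }
        return amount;
    }

    // Splits [first, last] into one chunk per thread and sums the partial counts.
    static int countParallel(int first, int last, int divisor, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> parts = new ArrayList<>();
            long size = (long) last - first + 1; // long avoids int overflow
            for (int t = 0; t < threads; t++) {
                int from = (int) (first + size * t / threads);
                int to = (int) (first + size * (t + 1) / threads - 1);
                Callable<Integer> chunk = () -> countSequential(from, to, divisor);
                parts.add(executor.submit(chunk));
            }
            int amount = 0;
            for (Future<Integer> part : parts) {
                amount += part.get(); // blocks until this chunk is done
            }
            return amount;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int threads = Runtime.getRuntime().availableProcessors();
        System.out.println("Threads: " + threads);
        System.out.println("Count: " + countParallel(0, 2_000_000, 3, threads));
    }
}
```

Computing the chunk boundaries from first and the range size, rather than from last / 2, keeps the chunks balanced even when the range does not start at 0.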

 

 
