Spring Batch : Tasklet with multi threaded executor has very bad performances related to Throttling algorithm


As Alex said, this behaviour seems to be part of the contract, as per the javadocs of:

"Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads."

Look at:

TaskExecutorRepeatTemplate#waitForResults
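To make that contract concrete, here is a minimal plain-Java sketch of "hand out the next unit of work, then wait for all results". It is not Spring Batch's TaskExecutorRepeatTemplate; the class WaitForAllModel and its methods are hypothetical names, and Thread.sleep stands in for one item's read/process/write. The point it shows is that waitForResults cannot return before the slowest submitted task finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified model of the "next result / wait for all results" contract.
// NOT Spring Batch code: it only mirrors the idea described in the javadoc.
public class WaitForAllModel {

    private final ExecutorService executor;
    private final List<Future<Integer>> pending = new ArrayList<>();

    public WaitForAllModel(int threads) {
        this.executor = Executors.newFixedThreadPool(threads);
    }

    // "Gets the next result": hands one unit of work to a worker thread.
    public void submit(int itemId, long workMillis) {
        pending.add(executor.submit(() -> {
            Thread.sleep(workMillis); // stand-in for read/process/write of one item
            return itemId;
        }));
    }

    // "Waits for all the results": blocks until every pending task is done,
    // so the caller cannot move on before the slowest task has finished.
    public List<Integer> waitForResults() throws InterruptedException, ExecutionException {
        List<Integer> results = new ArrayList<>();
        for (Future<Integer> f : pending) {
            results.add(f.get());
        }
        pending.clear();
        return results;
    }

    public void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        WaitForAllModel model = new WaitForAllModel(4);
        long start = System.nanoTime();
        model.submit(1, 10);
        model.submit(2, 10);
        model.submit(3, 10);
        model.submit(4, 300); // one slow "sibling" dominates the wait
        List<Integer> results = model.waitForResults();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results + " after ~" + elapsedMs + " ms");
        model.shutdown();
    }
}
```

Three of the four tasks finish in about 10 ms, yet the caller still waits roughly 300 ms.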

Another option for you would be to use partitioning:

  • A TaskExecutorPartitionHandler that executes each partition (read by the partitioned ItemReader) in its own thread, see below
  • A Partitioner implementation that gives the ranges to be processed by the ItemReader, see ColumnRangePartitioner below
  • A reader that reads data using the values the Partitioner has filled in, see the myItemReader configuration below

Michael Minella explains this in Chapter 11 of his book Pro Spring Batch:

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>
</batch:job>

<!-- This one will create partitions by splitting the column range into gridSize slices -->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>

<!-- This one will handle every partition in a thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1"/>
    <property name="gridSize" value="10"/>
</bean>

<batch:step id="step1">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="myItemReader"
                     writer="manipulatableWriterForTests" commit-interval="1"
                     skip-limit="30000">
            <batch:skippable-exception-classes>
                <batch:include class="java.lang.Exception"/>
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<!-- scope="step" is critical here: #{stepExecutionContext[...]} is only resolved for step-scoped beans -->
<bean id="myItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <= ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <!-- minValue and maxValue are filled in by the Partitioner for each partition in an ExecutionContext -->
                    <value>#{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>
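Why this helps with slow items can be sketched without Spring at all: each partition owns its own id range and walks it in its own thread, so a slow row only delays its own partition, and only the overall step waits for all partitions at the end. This is a hypothetical plain-Java model (PartitionedModel, Range and processAll are illustrative names, not Spring Batch types), with a counter standing in for reading, processing and writing each row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical plain-Java model of partitioned processing (not Spring Batch code).
public class PartitionedModel {

    // One partition: the [minValue, maxValue] id range a Partitioner would put in its ExecutionContext.
    public record Range(int minValue, int maxValue) {}

    // Runs one task per partition on `threads` workers; returns how many ids were handled in total.
    public static int processAll(List<Range> partitions, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Range r : partitions) {
                futures.add(pool.submit(() -> {
                    int handled = 0;
                    // Each partition walks its own range independently of the others.
                    for (int id = r.minValue(); id <= r.maxValue(); id++) {
                        handled++; // stand-in for read/process/write of row `id`
                    }
                    return handled;
                }));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                total += f.get(); // only the master step waits for every partition
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```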

ColumnRangePartitioner.java:

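package ...;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

// column, table and dataSource must all be set on the myPartitioner bean
public class ColumnRangePartitioner implements Partitioner {

    private JdbcTemplate jdbcTemplate;
    private String column;
    private String table;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = queryForInt("SELECT MIN(" + column + ") from " + table);
        int max = queryForInt("SELECT MAX(" + column + ") from " + table);
        // "+ 1" keeps targetSize >= 1 (otherwise the loop never advances when
        // max - min < gridSize) and yields at most gridSize partitions
        int targetSize = (max - min) / gridSize + 1;
        System.out.println("Our partition size will be " + targetSize);
        System.out.println("We will have at most " + gridSize + " partitions");
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // These keys are read by the reader via #{stepExecutionContext[...]}
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            System.out.println("minValue = " + start);
            System.out.println("maxValue = " + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        System.out.println("We are returning " + result.size() + " partitions");
        return result;
    }

    // Reads a single integer (MIN or MAX of the column) from the database
    private int queryForInt(String sql) {
        return jdbcTemplate.queryForObject(sql, Integer.class);
    }

    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setTable(String table) {
        this.table = table;
    }
}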
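The boundary arithmetic of the partitioner loop can be checked without a database. The sketch below lifts just that loop into a hypothetical RangeSplitter.split method (not part of Spring Batch). It uses (max - min) / gridSize + 1 as the partition width: with a plain (max - min) / gridSize, a range narrower than gridSize gives a width of 0 and the loop never advances, and wider ranges can produce more than gridSize partitions (for ids 1..100 and gridSize 10, a width of 9 yields 12 partitions).

```java
import java.util.ArrayList;
import java.util.List;

// Database-free sketch of the ColumnRangePartitioner boundary loop (hypothetical helper).
public class RangeSplitter {

    // Returns {minValue, maxValue} pairs covering [min, max] in at most gridSize slices.
    public static List<int[]> split(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1; // always >= 1
        List<int[]> ranges = new ArrayList<>();
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max; // the last slice may be shorter
            }
            ranges.add(new int[] {start, end});
            start += targetSize;
            end += targetSize;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints ten ranges: 1..10, 11..20, ..., 91..100
        for (int[] r : split(1, 100, 10)) {
            System.out.println("minValue = " + r[0] + ", maxValue = " + r[1]);
        }
    }
}
```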


Here's what I think is going on:

  • As you said, your ThreadPoolTaskExecutor is limited to 15 threads
  • The framework's "chunk" is causing each item in the JdbcCursorItemReader (up to the thread limit) to be executed in a different thread
  • But the Spring Batch framework is also waiting for each of the threads (i.e., all 15) to complete its individual read/process/write flow before moving on to the next chunk, given your commit interval of 1. On occasion, this causes 14 threads to wait almost 60 seconds on a sibling thread that is taking forever to complete.

In other words, for this multi-threaded approach in Spring Batch to be helpful, each thread needs to take about the same amount of time. In your scenario, where processing time varies hugely between items, you are hitting a limitation: many of your threads have finished and are waiting on a long-running sibling thread before they can move on to the next chunk of processing.
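The cost of that waiting can be estimated with a back-of-the-envelope model. This sketch is hypothetical (ThrottleModel and the workload are made up, and it models the behaviour described above, not Spring Batch's actual scheduler): waveTime assumes every group of `threads` items must finish before the next group starts, while queueTime lets each item go to whichever thread frees up first.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Back-of-the-envelope timing model (hypothetical, not Spring Batch code).
public class ThrottleModel {

    // Every group of `threads` items must all finish before the next group starts.
    public static long waveTime(long[] durations, int threads) {
        long total = 0;
        for (int i = 0; i < durations.length; i += threads) {
            long slowest = 0;
            for (int j = i; j < Math.min(i + threads, durations.length); j++) {
                slowest = Math.max(slowest, durations[j]);
            }
            total += slowest; // the whole group waits for its slowest item
        }
        return total;
    }

    // Each item goes to whichever thread frees up first; no thread waits on a sibling.
    public static long queueTime(long[] durations, int threads) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int t = 0; t < threads; t++) {
            freeAt.add(0L);
        }
        long makespan = 0;
        for (long d : durations) {
            long finish = freeAt.poll() + d;
            makespan = Math.max(makespan, finish);
            freeAt.add(finish);
        }
        return makespan;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 30 items of 1 s each, except two items of 60 s.
        long[] durations = new long[30];
        Arrays.fill(durations, 1);
        durations[0] = 60;
        durations[15] = 60;
        System.out.println("wave model:  " + waveTime(durations, 15) + " s"); // 120 s
        System.out.println("queue model: " + queueTime(durations, 15) + " s"); // 61 s
    }
}
```

With 15 threads, two 60-second items cost about 120 seconds in the wave model but only about 61 seconds when idle threads keep pulling new items. The model ignores commit and connection overhead.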

My suggestion:

  • Generally, I'd say that increasing your commit interval should help somewhat, since it should allow more than one cursor item to be processed in a single thread in between commits even if one of the threads is stuck on a long-running write. However, if you're unlucky, multiple long transactions could occur in the same thread and make matters worse (e.g., 120 sec. between commits in a single thread for a commit interval of 2).
  • Specifically, I'd suggest increasing your thread pool size to a much larger number, even 2x or 3x your maximum number of database connections. Some of your threads will then block while trying to acquire a connection, but you should actually see higher throughput, because your long-running threads no longer stop other threads from taking new items from the cursor and continuing your batch job's work in the meantime. At the beginning of a chunk, the number of pending threads will greatly exceed the number of available database connections, so the OS scheduler will churn a bit as it wakes threads that are blocked on acquiring a connection and then has to suspend them again. However, since most of your threads complete their work and release their connection relatively quickly, overall throughput should improve: many threads keep acquiring connections, doing work, releasing connections and letting further threads do the same, even while your long-running threads are still doing their thing.
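The safety side of that suggestion can be sketched in plain Java, assuming a Semaphore as a stand-in for the database connection pool (OversizedPoolModel and run are hypothetical names). With more worker threads than permits, the surplus threads simply block in acquire(), and the number of tasks holding a "connection" at once never exceeds the permit count. This only illustrates the blocking behaviour; it does not prove the throughput claim for any real workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a Semaphore stands in for a database connection pool.
public class OversizedPoolModel {

    // Runs `items` tasks on `threads` workers sharing `connections` permits.
    // Returns the peak number of tasks that held a "connection" at the same time.
    public static int run(int items, int threads, int connections) throws InterruptedException {
        Semaphore connectionPool = new Semaphore(connections, true);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger done = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < items; i++) {
            workers.execute(() -> {
                try {
                    connectionPool.acquire(); // surplus threads block here, like waiting on a DataSource
                    try {
                        peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
                        Thread.sleep(5); // stand-in for a short read/process/write
                        done.incrementAndGet();
                    } finally {
                        inUse.decrementAndGet();
                        connectionPool.release(); // hand the "connection" to the next waiting thread
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.MINUTES);
        if (done.get() != items) {
            throw new IllegalStateException("not all items finished");
        }
        return peak.get();
    }
}
```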


In my case, if I don't set the throttle-limit, only 4 threads enter the read() method of the ItemReader; 4 is also the default throttle-limit when none is specified on the tasklet tag, as per the Spring Batch documentation.

If I specify more threads, e.g. 10, 20 or 100, still only 8 threads enter the read() method of the ItemReader.
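The post doesn't show the task executor configuration, so the following is only a hypothesis worth checking. Spring's ThreadPoolTaskExecutor is backed by java.util.concurrent.ThreadPoolExecutor, and its default queue capacity is unbounded; with an unbounded queue, ThreadPoolExecutor never creates threads beyond corePoolSize, whatever maxPoolSize says. If the executor had, say, a core size of 8 (a hypothetical value), raising throttle-limit above 8 would not add threads. The sketch below (CorePoolCapDemo and workersCreated are illustrative names) demonstrates that rule with the plain JDK class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of ThreadPoolExecutor's queuing rule (plain JDK, no Spring).
public class CorePoolCapDemo {

    // Submits `tasks` blocking tasks and returns how many worker threads the pool created.
    public static int workersCreated(int core, int max, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // unbounded queue
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the thread busy, like a slow read()/write()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Once `core` threads exist, further tasks are queued instead of getting new threads.
        int created = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return created;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(workersCreated(8, 100, 20)); // prints 8
    }
}
```

If this turns out to be the cause, giving the executor a core size at least as large as the throttle-limit (or a bounded queue) would be the usual adjustment.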