I believe I have found the solution: close the bulk job only at the end of processing.
Keep a count of how many batches have been extracted, for example in a variable "finalBatchCount".
Once all the batches currently listed have been read, and just before closing the Bulk Query Job, call "bulkConnection.getBatchInfoList(job.getId()).getBatchInfo().length" again; if "finalBatchCount" is still less than that value (minus the original, unprocessed query batch), read the records from the batches that have appeared since.
Only once every batch has been read, close the Bulk Query Job.
Looping this way makes sure that all the batches have been read. A minimal sketch of the core loop is shown just below, followed by the full code for reference.
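Distilled from the full program below, the core of the approach looks roughly like this (a sketch only; the batch-reading details are omitted):

int finalBatchCount = 0;
BatchInfo[] bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
// ... first pass: read every batch currently listed, incrementing finalBatchCount ...
// Before closing, re-check whether PK chunking has created more batches
// (-1 because the original query batch is never processed).
while (finalBatchCount < bulkConnection.getBatchInfoList(job.getId()).getBatchInfo().length - 1) {
    bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
    // ... read the batches that appeared since the last check ...
}
bulkConnection.closeJob(job.getId());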
import java.io.*;
import java.util.*;
import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
public class BulkQueryDirect {

    public static void main(String[] args)
            throws AsyncApiException, ConnectionException, IOException {
        BulkQueryDirect exampleQuery = new BulkQueryDirect();
        System.setProperty("https.protocols", "TLSv1.2");
        BulkConnection bulkConnection = exampleQuery.login();
        // Enable PK chunking so the query is split into multiple result batches.
        bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=200");
        exampleQuery.doBulkQuery(bulkConnection);
    }
    public BulkConnection login() {
        String userName = "abc@abc.com";
        String passWord = "passwordSecuritytoken";
        String url = "https://login.salesforce.com/services/Soap/u/32.0";
        BulkConnection _bulkConnection = null;
        try {
            ConnectorConfig partnerConfig = new ConnectorConfig();
            partnerConfig.setUsername(userName);
            partnerConfig.setPassword(passWord);
            partnerConfig.setAuthEndpoint(url);
            // Creating the PartnerConnection performs the login and stores the
            // session id in partnerConfig.
            new PartnerConnection(partnerConfig);
            ConnectorConfig config = new ConnectorConfig();
            config.setSessionId(partnerConfig.getSessionId());
            String soapEndpoint = partnerConfig.getServiceEndpoint();
            String apiVersion = "32.0";
            String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
                    + "async/" + apiVersion;
            config.setRestEndpoint(restEndpoint);
            config.setCompression(true);
            config.setTraceMessage(false);
            _bulkConnection = new BulkConnection(config);
        } catch (AsyncApiException aae) {
            aae.printStackTrace();
        } catch (ConnectionException ce) {
            ce.printStackTrace();
        }
        return _bulkConnection;
    }
    public void doBulkQuery(BulkConnection bulkConnection) {
        try {
            int finalBatchCount = 0;
            JobInfo job = new JobInfo();
            job.setObject("tmpTest__c");
            job.setOperation(OperationEnum.query);
            job.setConcurrencyMode(ConcurrencyMode.Parallel);
            job.setContentType(ContentType.CSV);
            job = bulkConnection.createJob(job);
            assert job.getId() != null;
            System.out.println("Job id is " + job.getId());
            job = bulkConnection.getJobStatus(job.getId());
            String query = "SELECT Name, Id FROM tmpTest__c";
            ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes());
            bulkConnection.createBatchFromStream(job, bout);
            // With PK chunking the first entry is the original query batch; wait
            // until at least one chunked batch has been created as well.
            BatchInfo[] bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
            while (bListInfo.length < 2) {
                try { Thread.sleep(5000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
                bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
            }
            System.out.println("Number of batches is " + bListInfo.length);
            int numberOfBatchesForQueryExtract = 0;
            int numberOfRecordsExtracted = 0;
            // First pass: read every batch currently listed. Index 0 is skipped
            // because it is the original query batch, which PK chunking leaves
            // in the NotProcessed state.
            for (int ib = 1; ib < bListInfo.length; ib++) {
                finalBatchCount++;
                BatchInfo info = bListInfo[ib];
                numberOfBatchesForQueryExtract++;
                String[] queryResults = null;
                for (int i = 0; i < 10000; i++) {
                    info = bulkConnection.getBatchInfo(job.getId(), info.getId());
                    if (info.getState() == BatchStateEnum.Completed) {
                        QueryResultList list = bulkConnection.getQueryResultList(job.getId(), info.getId());
                        queryResults = list.getResult();
                        break;
                    } else if (info.getState() == BatchStateEnum.Failed) {
                        System.out.println("-------------- failed ----------" + info);
                        break;
                    } else {
                        System.out.println("-------------- waiting ----------" + info);
                        // Pause between status polls.
                        try { Thread.sleep(10000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
                    }
                }
                System.out.println("Batch " + ib + " data is");
                if (queryResults != null) {
                    for (String resultId : queryResults) {
                        InputStream resultStream = bulkConnection.getQueryResultStream(job.getId(), info.getId(), resultId);
                        BufferedReader lineReader = new BufferedReader(new InputStreamReader(resultStream, "UTF-8"));
                        String lineString = null;
                        while ((lineString = lineReader.readLine()) != null) {
                            System.out.println("lineString : " + lineString);
                            numberOfRecordsExtracted++;
                        }
                    }
                }
            }
            // Catch-up pass: before closing the job, re-check whether PK chunking
            // has created more batches since the first pass (-1 because the
            // original query batch is never read).
            while (finalBatchCount < (bulkConnection.getBatchInfoList(job.getId()).getBatchInfo().length - 1)) {
                bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
                // Start at the first index that has not been read yet:
                // finalBatchCount data batches plus the skipped original batch.
                for (int ib = finalBatchCount + 1; ib < bListInfo.length; ib++) {
                    finalBatchCount++;
                    BatchInfo info = bListInfo[ib];
                    numberOfBatchesForQueryExtract++;
                    String[] queryResults = null;
                    for (int i = 0; i < 10000; i++) {
                        info = bulkConnection.getBatchInfo(job.getId(), info.getId());
                        if (info.getState() == BatchStateEnum.Completed) {
                            QueryResultList list = bulkConnection.getQueryResultList(job.getId(), info.getId());
                            queryResults = list.getResult();
                            break;
                        } else if (info.getState() == BatchStateEnum.Failed) {
                            System.out.println("-------------- failed ----------" + info);
                            break;
                        } else {
                            System.out.println("-------------- waiting ----------" + info);
                            // Pause between status polls.
                            try { Thread.sleep(10000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
                        }
                    }
                    System.out.println("Batch " + ib + " data is");
                    if (queryResults != null) {
                        for (String resultId : queryResults) {
                            InputStream resultStream = bulkConnection.getQueryResultStream(job.getId(), info.getId(), resultId);
                            BufferedReader lineReader = new BufferedReader(new InputStreamReader(resultStream, "UTF-8"));
                            String lineString = null;
                            while ((lineString = lineReader.readLine()) != null) {
                                System.out.println("lineString : " + lineString);
                                numberOfRecordsExtracted++;
                            }
                        }
                    }
                }
            }
            bulkConnection.closeJob(job.getId());
            System.out.println("Number of batches created with chunkSize=200 is " + numberOfBatchesForQueryExtract);
            System.out.println("Number of total records extracted is " + numberOfRecordsExtracted);
            System.out.println("Final Total Batch Count is " + finalBatchCount);
            System.out.println("BatchList length is " + bulkConnection.getBatchInfoList(job.getId()).getBatchInfo().length);
        } catch (AsyncApiException aae) {
            aae.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public String convertInputStreamToString(InputStream inputStream) {
        StringBuilder result = new StringBuilder();
        String line;
        String newLine = System.getProperty("line.separator");
        boolean flag = false;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
            while ((line = reader.readLine()) != null) {
                result.append(flag ? newLine : "").append(line);
                flag = true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result.toString();
    }
    public static int countLines(String filename) {
        InputStream is = null;
        try {
            is = new BufferedInputStream(new FileInputStream(filename));
            byte[] c = new byte[1024];
            int count = 0;
            int readChars = 0;
            boolean empty = true;
            while ((readChars = is.read(c)) != -1) {
                empty = false;
                for (int i = 0; i < readChars; ++i) {
                    if (c[i] == '\n') {
                        ++count;
                    }
                }
            }
            return (count == 0 && !empty) ? 1 : count;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (is != null) {
                    is.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return 0;
    }
}
Best Answer
The batch size that's specified in the documentation applies to a single batch. You can submit as many batches as you like, subject to the limit of 2,000 batches per rolling 24-hour period.
10,000 records is the maximum size you can set for a single batch, so if you have 50K records, at least 5 batches are required.
The Bulk API can process any number of records; the only limit to watch is the number of batches per day, which is 2,000 per rolling 24-hour period.
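As a quick worked example (using the figures quoted above, which you should confirm against your org's current limits), the minimum number of batches for a given record count can be estimated like this:

// Minimum batches needed, assuming the 10,000-records-per-batch figure above.
long records = 50_000;
long maxRecordsPerBatch = 10_000;
long minBatches = (records + maxRecordsPerBatch - 1) / maxRecordsPerBatch; // = 5
System.out.println("Minimum batches: " + minBatches);
// At 2,000 batches per rolling 24-hour period, 5 batches is well within the limit.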