我正在使用 Salesforce Bulk 2.0 API 来获取 Salesforce 对象的数据。我已经为此创建了相应的函数 -
/**
* Fetches results from a Job ID of Batch query.
* NOTE : Provided Job ID must be in "JobComplete" state to fetch results
*
* @param jobId Job ID
* @param maxRecords Number of records to fetch in one chunk of API hit
* @return {@link String} CSV-formatted String/results (text/csv)
* @throws IOException Exception in mapping object
* @author ujjawal pandey
*/
public String getJobResults(String jobId, String maxRecords) throws IOException {
String getJobInfoUrl = String.format(JobResourcePath.getJobResultPath(), this.apiUrl,
this.apiVersion, jobId, maxRecords);
String sforceLocator = null;
String filePath = jobId + ".csv";
boolean isFirstDatasetFetch = true;
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
Files.newOutputStream(Paths.get(filePath)),
StandardCharsets.UTF_8));
do {
String getJobInfoUrlLocator = getJobInfoUrl;
HttpClient httpClient = HttpClientBuilder.create().build();
if (sforceLocator != null) {
getJobInfoUrlLocator = getJobInfoUrl + "&locator=" + sforceLocator;
}
LOGGER.info(getJobInfoUrlLocator);
HttpGet httpGet = new HttpGet(getJobInfoUrlLocator);
httpGet.setHeader("Authorization", "Bearer " + accessToken);
HttpResponse response = httpClient.execute(httpGet);
LOGGER.info(response.toString());
int responseCode = response.getStatusLine().getStatusCode();
String responseBody = "";
if (responseCode == Constants.HTTP_STATUS_OK) {
org.apache.http.HttpEntity entity = response.getEntity();
sforceLocator = response.getFirstHeader("Sforce-Locator").getValue();
LOGGER.info("Locator is: " + sforceLocator);
responseBody = EntityUtils.toString(entity);
LOGGER.debug(String.valueOf(responseBody));
if (isFirstDatasetFetch) {
writer.write(responseBody);
isFirstDatasetFetch = false;
} else {
writer.write(responseBody.substring(
responseBody.indexOf('\n') + 1));
}
} else {
LOGGER.error(responseBody);
throw new RuntimeException(responseBody);
}
} while (sforceLocator != null && !sforceLocator.equals("null"));
writer.close();
return filePath;
}
问题是正在创建的 CSV 格式正确,但在某些列中出现了一些奇怪的字符。例如,
â¢
(欧元在它们之间,粘贴使其消失)
现在当我在 Spark 中阅读时,使用以下配置 -
spark.read()
.option("header", "true")
.option("delimiter", ",")
.option("lineSep", "\n")
.option("multiLine", "true")
.option("encoding", "UTF-8")
.csv(hdfsTempCsvStoragePath + "/" + csvPath);
由于以下字符“可能”,我得到了额外的行(4)。 PFA.
我知道这个问题与编码有关,但没有很好的理解。
因为我觉得我错过了一些东西,所以我没有尝试太多。我想到的最后一招是必须清理 CSV,以便 Spark 可以读取它。
我没有代表发表评论。这并不是一个真正的答案。但我有一些评论你应该会觉得有帮助。我不知道斯帕克。
这将使 UTF-8 字符不会被破坏成两个 ASCII 字符。并且它将保留可处理的非 ascii utf-8 字符。
如果您从一开始就可以访问数据,则可以使用此正则表达式来识别之前需要注意的所有字符。
没有人能为您提供最佳方法的建议,因为我们不知道数据、哪些进程正在监视您的批量 api 作业(例如,如果您抛出异常,可以处理数据以供人工审核)、出现特殊字符的频率(以及哪些),或者数据 100% 正确的紧急程度(或者即使这是可能的)。