CSV 中出现奇怪的字符,导致 Spark 无法读取

问题描述 投票:0回答:1

我正在使用 Salesforce Bulk 2.0 API 来获取 Salesforce 对象的数据。我已经为此创建了相应的函数 -

/**
 * Fetches results from a Job ID of Batch query.
 * NOTE : Provided Job ID must be in "JobComplete" state to fetch results
 *
 * @param jobId      Job ID
 * @param maxRecords Number of records to fetch in one chunk of API hit
 * @return {@link String} CSV-formatted String/results (text/csv)
 * @throws IOException Exception in mapping object
 * @author ujjawal pandey
 */
public String getJobResults(String jobId, String maxRecords) throws IOException {
    String getJobInfoUrl = String.format(JobResourcePath.getJobResultPath(), this.apiUrl,
            this.apiVersion, jobId, maxRecords);
    String sforceLocator = null;
    String filePath = jobId + ".csv";
    boolean isFirstDatasetFetch = true;
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
            Files.newOutputStream(Paths.get(filePath)),
            StandardCharsets.UTF_8));
    do {
        String getJobInfoUrlLocator = getJobInfoUrl;
        HttpClient httpClient = HttpClientBuilder.create().build();
        if (sforceLocator != null) {
            getJobInfoUrlLocator = getJobInfoUrl + "&locator=" + sforceLocator;
        }

        LOGGER.info(getJobInfoUrlLocator);
        HttpGet httpGet = new HttpGet(getJobInfoUrlLocator);
        httpGet.setHeader("Authorization", "Bearer " + accessToken);

        HttpResponse response = httpClient.execute(httpGet);
        LOGGER.info(response.toString());
        int responseCode = response.getStatusLine().getStatusCode();
        String responseBody = "";

        if (responseCode == Constants.HTTP_STATUS_OK) {
            org.apache.http.HttpEntity entity = response.getEntity();
            sforceLocator = response.getFirstHeader("Sforce-Locator").getValue();
            LOGGER.info("Locator is: " + sforceLocator);

            responseBody = EntityUtils.toString(entity);

            LOGGER.debug(String.valueOf(responseBody));
            if (isFirstDatasetFetch) {
                writer.write(responseBody);
                isFirstDatasetFetch = false;
            } else {
                writer.write(responseBody.substring(
                        responseBody.indexOf('\n') + 1));
            }
        } else {
            LOGGER.error(responseBody);
            throw new RuntimeException(responseBody);
        }
    } while (sforceLocator != null && !sforceLocator.equals("null"));
    writer.close();
    return filePath;
}

问题是正在创建的 CSV 格式正确,但在某些列中出现了一些奇怪的字符。例如,

â¢
(欧元在它们之间,粘贴使其消失)

现在当我在 Spark 中阅读时,使用以下配置 -

spark.read()
     .option("header", "true")
     .option("delimiter", ",")
     .option("lineSep", "\n")
     .option("multiLine", "true")
     .option("encoding", "UTF-8")
     .csv(hdfsTempCsvStoragePath + "/" + csvPath);

由于以下字符“可能”,我得到了额外的行(4)。 PFA.

Screenshot of dataframe

我知道这个问题与编码有关,但没有很好的理解。

  1. 由于这个问题的发生,我在这里缺少什么(基本概念)?
  2. 解决这个问题的最佳方法是什么?

因为我觉得我错过了一些东西,所以我没有尝试太多。我想到的最后一招是必须清理 CSV,以便 Spark 可以读取它。

csv apache-spark encoding salesforce
1个回答
0
投票

我没有代表发表评论。这并不是一个真正的答案。但我有一些评论你应该会觉得有帮助。我不知道斯帕克。

  1. 执行创建 CSV 的 Java 时,将此属性添加到命令行:-Dfile.encoding=UTF-8

这将使 UTF-8 字符不会被破坏成两个 ASCII 字符。并且它将保留可处理的非 ascii utf-8 字符。

  1. 您是否应该继续清理 CSV。这个正则表达式可以用来识别特殊字符:/[^\x00-\x7f]/

如果您从一开始就可以访问数据,则可以使用此正则表达式来识别之前需要注意的所有字符。

没有人能为您提供最佳方法的建议,因为我们不知道数据、哪些进程正在监视您的批量 api 作业(例如,如果您抛出异常,可以处理数据以供人工审核)、出现特殊字符的频率(以及哪些),或者数据 100% 正确的紧急程度(或者即使这是可能的)。

© www.soinside.com 2019 - 2024. All rights reserved.