我有一个数据集,其中包含 ID 以及句子中特定 ID 的开始和结束位置。这是一个庞大的数据集,约有 2.5 亿行。 现在我需要处理这些数据,根据每个句子的位置信息将这些 Id 转换为序列。这是我为此编写的代码:
def process_chunk(chunk):
sequence = [' '] * chunk.shape[0]
sorted_chunk = chunk.sort_values(by='Feature Start', ascending=True)
i = 0
for _, row in sorted_chunk.iterrows():
sequence[i] = row['Id']
i += 1
sequence = ' '.join(sequence)
return sequence
# Apply the function to each partition of the Dask DataFrame
import dask.dataframe as dd
sample_ddf = dd.from_pandas(sample_df, npartitions=10)
result_ddf = ddf.groupby('sentence_id', 'Feature Start').apply(process_chunk, meta=('x', 'string')).compute()
result_df = result_ddf.reset_index()
result_df.columns = ['sentence_id', 'ordered_sequence']
此逻辑不起作用并输出空序列。我添加了打印语句并验证了它是因为某些块没有任何数据
在上面的代码片段中,sample_df 具有以下列:[sentence_id、Feature Start、Feature End、Id] 如果这是sample_df中的样本数据:
Id sentence_id Feature Start Feature End
24976 120 57 70
23430 120 49 50
09086 120 74 76
10550 120 15 18
10550 120 15 18
上述逻辑理想情况下应该输出示例数据:
print(result_df['ordered_sequence'].tolist())
>>> ['10550 10550 23430 24976 09086']
这个示例之所以有效,是因为只有 5 行,它不适用于我的实际数据集。我得到空序列值。
给出你自定义函数的代码,不需要,直接使用pandas方法即可:
out = (sample_df
.astype({'Id': str})
.sort_values(by=['sentence_id', 'Feature Start'])
.groupby('sentence_id', as_index=False)['Id']
.agg(' '.join)
)
或dask:
out = (dd.from_pandas(sample_df, npartitions=10)
.astype({'Id': str})
.sort_values(by=['sentence_id', 'Feature Start'])
.groupby('sentence_id')['Id']
.apply(' '.join, meta=('ordered_sequence', 'string')).compute()
.reset_index()
)
输出:
sentence_id ordered_sequence
0 120 10550 10550 23430 24976 9086
1 121 10550 10550 23430 24976 9086