给定一个数据集,我试图创建一个逻辑,在两个列中,我需要强制执行连续性,最后一个目的地(进入列)是每个id的确切下一个起点(从列)。例如这个表
+----+-------+-------+
| id | from | to |
+----+-------+-------+
| 1 | A | B |
| 1 | C | A |
| 2 | D | D |
| 2 | F | G |
| 2 | F | F |
+----+-------+-------+
理想情况下应该是这样的。
+----+-------+-------+
| id | from | to |
+----+-------+-------+
| 1 | A | B |
| 1 | B | C |
| 1 | C | A |
| 2 | D | D |
| 2 | D | F |
| 2 | F | G |
| 2 | G | F |
| 2 | F | F |
+----+-------+-------+
我用Pandas做了这样的工作,我按行循环检查 previous_row['to'] == current_row['from'],同时检查id,也许可以用groupby来避免,你可以看到下面的内容
for i in range(len(df)):
if (i < (len(df)-1)):
if (new.ix[i,"to"] != new.ix[i+1,"from"]) & (new.ix[i,"id"] == new.ix[i+1,"id"]):
new_index = i + 0.5
line = pd.DataFrame({"id":new.ix[i,"id"],
"from":new.ix[i,"to"],"to":new.ix[i+1,"from"],}, index = [new_index])
appendings = pd.concat([appendings,line])
else:
pass
else:
pass
是否有可能将其 "翻译 "为pyspark rdds?
我知道在Pyspark中,循环远不是最佳的复制循环和if-else逻辑。
我考虑过通过分组和从列和到列的zipping,并在单列上工作。主要问题在于,我可以在 "有问题 "的行上产生一个标志,但没有办法在不使用索引操作的情况下插入新行。
这不是一个 pyspark
答案,但只是一个部分答案,以告诉你如何实现没有循环的任务在 pandas
.
你可以试试。
def f(sub_df):
return sub_df.assign(to_=np.roll(sub_df.To, 1)) \
.apply(lambda x: [[x.From, x.To]] if x.to_ == x.From else [[x.to_, x.From], [x.From, x.To]], axis=1) \
.explode() \
.apply(pd.Series)
out = df.groupby('id').apply(f) \
.reset_index(level=1, drop=True) \
.rename(columns={0: "from", 1: "to"})
工作流。
id
使用 groupby
to_
)来拥有前一行。np.roll
执行 圆班 以便保留最后的数值。explode
爆炸 list
的 list
变成每行一个列表。apply(pd.Series)
reset_index
rename
完整代码
# Import module
import pandas as pd
import numpy as np
# create dataset
df = pd.DataFrame({"id": [1,1,2,2,2], "From": ["A", "C", "D", "F", "F"], "To": ["B", "D", "D", "G", "F"]})
# print(df)
def f(sub_df):
return sub_df.assign(to_=np.roll(sub_df.To, 1)) \
.apply(lambda x: [[x.From, x.To]] if x.to_ == x.From else [[x.to_, x.From], [x.From, x.To]], axis=1) \
.explode() \
.apply(pd.Series)
out = df.groupby('id').apply(f) \
.reset_index(level=1, drop=True) \
.rename(columns={0: "from", 1: "to"})
print(out)
# from to
# id
# 1 D A
# 1 A B
# 1 B C
# 1 C D
# 2 F D
# 2 D D
# 2 D F
# 2 F G
# 2 G F
# 2 F F
下一步是将其转化为 PySpark
. 尝试一下,并随时打开一个新的问题与你的尝试。