RxPy: read CSV files and process them line by line

Question (votes: 1, answers: 1)

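I want to use RxPy to open a (CSV) file and process it line by line. Specifically, I want to take the following steps:

  1. Provide a filename to the stream
  2. Open the file
  3. Read the file line by line
  4. Remove lines that start with a comment (e.g. #...)
  5. Apply a csv reader
  6. Filter the records that match some criteria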
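
The six steps can be sketched in plain Python first, without RxPY, to pin down what the stream is expected to produce. This is only a sketch under stated assumptions: the helper names (`is_comment`, `read_rows`), the sample content, and the filter condition (second column equals "keep") are hypothetical and not taken from the question.

```python
import csv
import os
import tempfile


def is_comment(line):
    """Step 4: treat lines whose first non-blank character is '#' as comments."""
    return line.lstrip().startswith("#")


def read_rows(filename, predicate):
    """Steps 2-6 for one file: open, read lazily, drop comments, parse, filter."""
    with open(filename, newline="") as f:                          # step 2
        data_lines = (line for line in f if not is_comment(line))  # steps 3-4
        for row in csv.reader(data_lines):                         # step 5
            if row and predicate(row):                             # step 6 (blank lines parse to [])
                yield row


# Hypothetical sample file, written to a temporary location for the demo.
sample = "# a comment\na,keep\nb,drop\n\n# another comment\nc,keep\n"
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write(sample)

# Step 1: the filename goes in, filtered rows come out.
rows = list(read_rows(tmp.name, lambda row: row[1] == "keep"))
os.remove(tmp.name)
print(rows)
```

Because `read_rows` is a generator, the file stays open only while rows are being consumed, which is the property the RxPY version needs to preserve.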

So far I have:

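# Imports assumed for RxPY 1.x
import csv
from rx import Observable
from rx.disposables import AnonymousDisposable

def to_file(filename):
    f = open(filename)
    # Close the file when the resource created by `using` is disposed.
    return Observable.using(
        lambda: AnonymousDisposable(lambda: f.close()),
        lambda d: Observable.just(f)
    )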

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)

This works (with map):

Observable.from_(["filename.csv", "filename2.csv"]) \
   .flat_map(to_file).map(to_reader).subscribe(print_rows)

This does not (with flat_map); it raises ValueError: I/O operation on closed file:

Observable.from_(["filename.csv", "filename2.csv"]) \
   .flat_map(to_file).flat_map(to_rows).subscribe(print)

The second version fails because (see https://github.com/ReactiveX/RxPY/issues/69):

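When the observable from the first flat_map is merged by the second flat_map, the inner subscription is disposed on completion. The file is therefore closed, even though the file handle has already been on_next'ed to the new observable set up by the second flat_map.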
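
The failure mode can be reproduced without RxPY. The sketch below is an analogy, not RxPY's internals: a hypothetical `emit_handle` function hands a file object to a callback and then disposes of it, roughly the way the inner subscription is disposed on completion. A consumer that only stores the handle and reads it later hits the same `ValueError`; a consumer that reads while the resource is still alive does not.

```python
import os
import tempfile


def emit_handle(filename, on_next):
    """Hypothetical stand-in for the 'using' pattern: emit the handle, then dispose."""
    f = open(filename)
    try:
        on_next(f)   # the handle is passed downstream...
    finally:
        f.close()    # ...and closed as soon as the "inner stream" completes


with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write("a,1\nb,2\n")

# Deferred read: the handle is only stored, then read after disposal.
stored = []
emit_handle(tmp.name, stored.append)
try:
    stored[0].readline()
    deferred_error = None
except ValueError as exc:
    deferred_error = str(exc)

# Eager read: lines are consumed while the file is still open.
eager_lines = []
emit_handle(tmp.name, lambda f: eager_lines.extend(line.rstrip("\n") for line in f))

os.remove(tmp.name)
print(deferred_error)
print(eager_lines)
```

This suggests emitting lines or rows, rather than the file handle itself, from inside the scope that owns the file.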

Any idea how to achieve something like this:

Observable.from_(["filename.csv", "filename2.csv"]
   ).flat_map(to_file
   ).filter(comment_lines
   ).filter(empty_lines
   ).map(to_csv_reader
   ).filter(filter_by..
   ).do whatever

Thank you very much for your help,

Klinsmann

python reactivex
1 answer
0 votes

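I have just started working with RxPy and needed to do the same thing. I am surprised nobody has answered your question yet, but I decided to answer in case anyone else needs to know. Suppose you have a CSV file like this: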

$ cat datafile.csv
"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389

Here is a solution:

from rx import Observable
from csv import DictReader

Observable.from_(DictReader(open('datafile.csv', 'r'))) \
          .subscribe(lambda row: 
                     print("{0:3}\t{1:<35}".format(row['iata'], row['airport'][:35]))
          )
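
The one-liner above reads and parses the file, but it never closes it and does not drop comment lines or filter records. One possible extension is sketched below using only the standard library, so it does not depend on the RxPY version: a generator that owns the file and yields filtered `DictReader` rows. The function name `airport_records`, the `#` comment rule, and the state filter are assumptions for illustration. Since `Observable.from_` is given an iterable in the answer, the generator could presumably be passed to it in place of the bare `DictReader`.

```python
import csv
import os
import tempfile


def airport_records(filename, keep):
    """Yield DictReader rows from non-blank, non-comment lines.

    The file is closed once the generator is exhausted.
    """
    with open(filename, newline="") as f:
        lines = (line for line in f if line.strip() and not line.startswith("#"))
        for row in csv.DictReader(lines):
            if keep(row):
                yield row


# Sample rows from the answer, with one comment line added for the demo.
data = (
    '# airports extract\n'
    '"iata","airport","city","state","country","lat","long"\n'
    '"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472\n'
    '"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778\n'
    '"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389\n'
)
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write(data)

# Hypothetical filter: keep airports in Texas or Florida.
codes = [row["iata"] for row in airport_records(tmp.name, lambda r: r["state"] in ("TX", "FL"))]
os.remove(tmp.name)
print(codes)
```

Keeping the `with` block inside the generator ties the file's lifetime to the consumption of rows, which avoids the closed-file error described in the question.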