ThreadPool中的保留顺序:如何在特定索引位置的csv中插入行?

问题描述 投票:0回答:1

我正在编写一个在csv文件上循环的ruby脚本,然后针对每一行从第三方api检索数据,然后将检索到的数据写入csv文件。我试图实现一个thread_pool以便并行处理api调用和行的插入。我不确定自己的做法是否正确,因此欢迎提出任何建议。我遇到的一个具体问题是如何保留原始文件的顺序。我的解决方案是将第一个文件的[[pass the index传递给线程,然后强制该线程将该索引位置的行插入到csv中。

这是我希望多线程的任务类。

class Task def initialize(row, index, conn) @row = row @index = index @file = CSV.open("temp_and_cases_parallel.csv", "ab") @conn = conn end def run get_climate_data writte_climate_data end private def get_climate_data uri = "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/weatherdata/history?&aggregateHours=24&startDateTime=#{@row["day"].strip}T00:00:00&endDateTime=#{@row["day"].strip}T23:59:00&unitGroup=metric&contentType=csv&location=#{@row["lat"].strip},#{@row["long"].strip}&key=#{API_KEY}" response = @conn.get uri puts("calling #{uri}") @climate_info = CSV.parse(response.body, headers: true).first end def writte_climate_data if @index == 1 headers = @row.headers + @climate_info.headers @file << headers end @file << @row.fields + @climate_info.fields end end

所以我想在writte_climate_data中基于@file插入到特定位置的@index中>

这里是线程池的实现:

class ThreadPool def initialize(size: 10) @size = size @tasks = Queue.new @pool = [] end def schedule(*args, &block) @tasks << [block, args] end def start Thread.new do loop do next if @pool.size >= @size task, args = @tasks.pop thread = Thread.new do task.call(*args) end_thread(thread) end @pool << thread end end end def inactive? @tasks.empty? && @pool.empty? end def end_thread(thread) @pool.delete(thread) thread.kill end end

从cases_by_region.csv读取的脚本,并为每一行创建一个带有Task的线程:

RETRY_OPTIONS = { max: 10, interval: 3, interval_randomness: 0.5, backoff_factor: 2 } conn = Faraday.new do |f| f.request :retry, RETRY_OPTIONS end threads = [] thread_pool = ThreadPool.new thread_pool.start # CSV.open("temp_and_cases_parallel.csv", "ab") do |temp_and_cases| CSV.foreach("cases_by_region.csv", headers: true).first(10).each_with_index do |row, index| thread_pool.schedule do Task.new(row, index, conn).run end end # end sleep(1) until thread_pool.inactive?

您将如何继续实现这一目标,如何在生成的csv中保留原始文件中原始行的位置?

我正在编写一个遍历csv文件的ruby脚本,然后针对每一行从第三方api检索数据,然后将检索到的数据写入csv文件。我正在尝试实现...

ruby csv threadpool
1个回答
0
投票
如果需要订购,则不应使用数组。您可以尝试使用哈希数组。
© www.soinside.com 2019 - 2024. All rights reserved.