Laravel 8 - 处理数据库中的数百万条记录

问题描述 投票:0回答:1

数据库表中有数百万条记录。

购买

id (increments), device_id (int), app_id (int), status(boolean), receipt (string)

我的框架laravel 8,我使用mysql

我创建了一个命令

应用程序/控制台/命令/ProcessPurchase.php

<?php

namespace App\Console\Commands;

use App\Models\Purchase;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Queue;
use App\Jobs\ProcessPurchaseJob;

class ProcessPurchase extends Command
{
/**
 * The name and signature of the console command.
 *
 * @var string
 */
protected $signature = 'process:purchase';

/**
 * The console command description.
 *
 * @var string
 */
protected $description = 'Command description';

/**
 * Create a new command instance.
 *
 * @return void
 */
public function __construct()
{
    parent::__construct();
}

/**
 * Execute the console command.
 *
 * @return int
 */
public function handle()
{
    Log::info('process:purchase command start successfully.');
    $purchases = Purchase::getUnequalizedPurchases(250);
    if(count($purchases) > 0) {
        foreach ($purchases as $purchase) {
            //push job to the queue
            Queue::push((new ProcessPurchaseJob($purchase, $purchase->os)));
        }
    } else {
        Log::info('process:purchase command end successfully.');
    }
}
}

并创造了一份工作

应用程序/控制台/命令/ProcessPurchaseJob.php

<?php

namespace App\Jobs;

use App\Models\Purchase;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class ProcessPurchaseJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

private $purchase, $os;

/**
 * Create a new job instance.
 *
 * @return void
 */
public function __construct(Purchase $purchase, string $os)
{
    $this->purchase = $purchase;
    $this->os = $os;
}

/**
 * Execute the job.
 *
 * @return void
 */
public function handle()
{
    //getting data from the third-party endpoint
    if($purchaseData = getPurchaseDataFromMarket($this->purchase->receipt, $this->os)) {
        $this->purchase->expire_date = $purchaseData['expire_date'];
        $this->purchase->status = $purchaseData['status'];
        try {
            $this->purchase->save();
            Log::info('purchase job runned successfully. receipt : ' . $this->purchase->receipt);
        } catch (\Exception $exception) {
            Log::error($exception->getMessage());
        }
    } else {
        Log::error('purchase request error. receipt : ' . $this->purchase->receipt);
    }
}
}

并添加到 app/http/Console/Kernel.php

$schedule->command('process:purchase')
        ->everyFiveMinutes();

要启动 cron,请执行此命令。

./vendor/bin/sail artisan schedule:work

要执行队列中的作业,请执行此命令

./vendor/bin/sail artisan queue:work --queue=high,default

我应该向第三方 HTTP 端点发送请求并更新每条记录。

此过程每条记录大约需要 1.5 秒。因此,对于 10.000.000 条记录,需要 173.6 天。 我应该可以在短时间内处理所有这些数据。 (比如12小时等)

我该怎么办?

php mysql laravel queue job-scheduling
1个回答
0
投票

我通常使用分页来处理这个问题,作业和函数都是递归执行的,每个页面的所有循环过程都在单独的作业中递归执行。

class ProcessPurchaseJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $page;

    public function __construct($page = null)
    {
        $this->page = $page ?? 1;
    }

    public function handle()
    {
        $batchSize = 1000; // Amount of data per page

        $purchases = Purchase::orderBy('id')
            ->paginate($batchSize, ['*'], 'page', $this->page);

        foreach ($purchases as $item) {
            // Execute your purchase logic code
        }

        // If there are still next pages, call the job again recursively
        if ($users->hasMorePages()) {
            dispatch(new ProcessPurchaseJob($users->currentPage() + 1));
        }
    }
}

然后在任何地方打电话

ProcessPurchaseJob::dispatch();
© www.soinside.com 2019 - 2024. All rights reserved.