Main loop does not wait for the Python multiprocessing pool to finish, and freezes

Problem description

This is my first Python project in 10 years and my first experience with Python multiprocessing, so it may just be some very basic mistake I haven't spotted.

I'm stuck on a Python multiprocessing web crawler. The crawler checks the main page for changes, then walks the subcategories in parallel and adds items to a list. Those items are then checked in parallel and extracted via Selenium (the content is loaded into the page dynamically when an item is clicked, and I couldn't figure out any other way to handle that).

The main loop:

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import time
from bs4 import BeautifulSoup
import pickledb
import random
import multiprocessing
import itertools

import config


requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def getAllSubCategories(pageNumber, items):
    # check website and look for subcategories that are "worth" extracting
    url = 'https://www.google.com' + str(pageNumber)
    response = requests.get(url, verify=False, headers=config.headers, cookies=config.cookies)
    pageSoup = BeautifulSoup(response.content, features='html.parser')
    elements = pageSoup.find(...)
    if not elements: # website not loading properly
        return getAllSubCategories(pageNumber, items)

    for element in elements:
        items.append(element)


def checkAndExtract(item, ignoredItems, itemsToIgnore):
    # check if items are already extracted; if not, extract them if they contain a keyword
    import checker
    import extractor

    if item not in ignoredItems:
        if checker.check(item):
            extractor.extract(item, itemsToIgnore)
        else: itemsToIgnore.append(item)


if __name__ == '__main__':
    multiprocessing.freeze_support()

    itemsToIgnore = multiprocessing.Manager().list()

    crawlUrl = 'https://www.google.com/'
    db = pickledb.load('myDB.db', False)

    while True:
        try:
            # check main website for changes
            response = requests.get(crawlUrl, verify=False, headers=config.headers, cookies=config.cookies)
            soup = BeautifulSoup(response.content, features='html.parser')
            mainCondition = soup.find(...)

            if mainCondition:
                numberOfPages = soup.find(...)

                ignoredItems = db.get('ignoredItems')
                if not ignoredItems:
                    db.lcreate('ignoredItems')
                    ignoredItems = db.get('ignoredItems')

                items = multiprocessing.Manager().list()
                # get all items from subcategories
                with multiprocessing.Pool(30) as pool:
                    pool.starmap(getAllSubCategories, zip(range(numberOfPages, 0, -1), itertools.repeat(items)))

                itemsToIgnore[:] = []
                # loop through all items
                with multiprocessing.Pool(30) as pool:
                    pool.starmap(checkAndExtract, zip(items, itertools.repeat(ignoredItems), itertools.repeat(itemsToIgnore)))

                for item in itemsToIgnore:
                    if item not in db.get('ignoredItems'): db.ladd('ignoredItems', item)
                db.dump()

            time.sleep(random.randint(10, 20))
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(e)
            continue

The checker:

import config

def check(item):
    title = item...
    try:
        for keyword in config.keywords: # just a string array
            if keyword.lower() in title.lower():
                return True
    except Exception as e:
        print(e)

    return False

The extractor:

from selenium import webdriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
import time

import config

def extract(item, itemsToIgnore):
    driver = webdriver.Chrome('./chromedriver')
    driver.implicitly_wait(3)
    driver.get('https://www.google.com')
    for key in config.cookies:
        driver.add_cookie({'name': key, 'value': config.cookies[key], 'domain': '.google.com'})
    try:
        driver.get('https://www.google.com')

        wait = WebDriverWait(driver, 10)
        if driver.title == 'Page Not Found':
            extract(item, itemsToIgnore)
            return

        driver.find_element_by_xpath('...').click()
        time.sleep(1)
        button = wait.until(EC.element_to_be_clickable((By.XPATH, '...')))
        button.click()
        # and some extraction magic
    except:
        extract(item, itemsToIgnore) # try again

Everything works, and a few test runs were successful. But sometimes the loop starts over before the pool has finished its work. In the log I can see the item checker return true, yet the extractor never even starts and the main process begins the next iteration:

2019-12-23 00:21:16,614 [SpawnPoolWorker-6220] [INFO ] check returns true
2019-12-23 00:21:18,142 [MainProcess         ] [DEBUG] starting next iteration
2019-12-23 00:21:39,630 [SpawnPoolWorker-6247] [INFO ] checking subcategory

I also suspect that the pool is somehow not being cleaned up, since I doubt the SpawnPoolWorker-XXXX numbers should be that high. It also freezes after about an hour, which may be related to the same issue.

python python-3.x multiprocessing web-crawler python-multiprocessing
1 Answer

I fixed the looping issue by switching from Win7 to Win10, or alternatively by switching from starmap to starmap_async and then calling get() on the result.
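Applied to the subcategory pool from the question, that change would look roughly like this (an untested sketch; the pool size, getAllSubCategories and its arguments are taken from the code above):

with multiprocessing.Pool(30) as pool:
    # submit the work asynchronously instead of using the blocking starmap ...
    result = pool.starmap_async(getAllSubCategories,
                                zip(range(numberOfPages, 0, -1), itertools.repeat(items)))
    # ... and wait explicitly until every worker has finished;
    # get() also re-raises any exception that occurred in a worker
    result.get()

The same pattern applies to the second pool that calls checkAndExtract.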

The freezing was most likely caused by calling requests.get() without passing a timeout value.
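With a timeout, requests raises an exception instead of hanging forever on a stalled connection, for example (the connect/read timeout values here are just an illustrative choice):

# timeout=(connect timeout, read timeout) in seconds
response = requests.get(crawlUrl, verify=False, headers=config.headers,
                        cookies=config.cookies, timeout=(5, 30))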
