Splunk Python SDK API作业。结果限制为50k个结果。试图设置一个偏移量以拉取多个50k的块,但不知道如何使其工作]]

问题描述 投票:1回答:1

[我有一个工作,谁的工作['resultCount']是36.7万,但是无论我做什么,我似乎都不能超过前50,000个块。

我从这里得到了一段答案,这段代码是针对具有相似最终目标和设置的人的:https://answers.splunk.com/answers/114045/python-sdk-results-resultsreader-extremely-slow.html

rs = job.results(count=maxRecords, offset=self._offset)
results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))

我写了下面的代码,我在上面弄了些花,但是我不能让offset = self._offset做任何事情,我也不知道它应该做什么。

class SplunkConnector(object):
def __init__(self, username, password, customerGuid):
    self.username = username
    self.password = password
    self.customerGuid = customerGuid
    flag = True
    while flag:
        try:
            self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
            flag = False
        except binding.HTTPError as e:
            json_log.debug(str(e))

def search(self, query_dict):
    query = query_dict['search']
    label = query_dict['label']
    search_headers = query_dict['headers']
    customer = query_dict['customer']
    customerGuid = query_dict['customerGuid']
    try:
        earliest_time = query_dict['earliest_time']
        latest_time = query_dict['latest_time']
    except KeyError:
        earliest_time = '-1d@d'
        latest_time = '@d'
    json_log.debug('Starting %s customerGuid=%s' % (label, self.customerGuid))
    kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
    job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
    while True:
        try:
            while not job.is_ready():
                pass
            stats = {"isDone": job["isDone"],
                     "label": label,
                     "customer": customer,
                     "customerGuid": customerGuid,
                     "doneProgress": float(job["doneProgress"]) * 100,
                     "scanCount": int(job["scanCount"]),
                     "eventCount": int(job["eventCount"]),
                     "resultCount": int(job["resultCount"])}

            json_log.debug(stats)

            if stats["isDone"] == "1":
                json_log.debug("\n\nDone!\n\n")
                break
            sleep(2)
            stats = {"isDone": job["isDone"],
                     "label": label,
                     "customer": customer,
                     "customerGuid": customerGuid,
                     "doneProgress": float(job["doneProgress"]) * 100}

            json_log.debug(stats)

            if stats["isDone"] == "1":
                json_log.debug('Search %s finished for customerGuid=%s'
                               % (label, customerGuid))
                break
            sleep(2)

        except binding.HTTPError as e:
            json_log.debug(str(e))
            pass
        except AttributeError:
            stats = {"isDone": job["isDone"],
                     "label": label,
                     "customer": customer,
                     "customerGuid": customerGuid,
                     "doneProgress": float(job["doneProgress"]) * 100}

            json_log.debug(stats)

            if stats["isDone"] == "1":
                json_log.debug('Search %s finished for customerGuid=%s'
                               % (label, customerGuid))
                break
            sleep(2)

    # Get the results and display them
    result_count = job['resultCount']
    rs = job.results(count=0)
    rr = results.ResultsReader(io.BufferedReader(rs))
    results_list = []
    for result in rr:
        if isinstance(result, results.Message):
            # Diagnostic messages may be returned in the results
            json_log.debug('%s: %s label=%s customerGuid=%s'
                           % (result.type, result.message, label, customerGuid))
        elif isinstance(result, dict):
            # Normal events are returned as dicts
            keys, values = [], []

            for header in search_headers:
                if header not in result.keys():
                    print(header)
                    result[header] = ''

            for key, value in result.items():
                if key in search_headers:
                    keys.append(str(key))
                    values.append(str(value))
            if not results_list == []:
                results_list.append(values)
            else:
                results_list.append(keys)
                results_list.append(values)

    output = io.BytesIO()
    writer = csv.writer(output, delimiter=',')
    writer.writerows(results_list)
    output_string = output.getvalue()
    assert rr.is_preview is False

    job.cancel()
    return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def searches(self, query_list):
        print(query_list)
        if type(query_list) == dict:
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            results = pool.map(self.search, query_list)
            pool.terminate()

        print(results)
        search_results = {item[0]: item[1] for item in results}
        print(search_results)
        return search_results

[我有一个工作,谁的工作['resultCount']是36.7万,但是无论我做什么,我似乎都不能超过前50,000个块。我从这里的答案中读取了这段代码,该代码是给......>

python-3.x python-2.7 api splunk splunk-sdk
1个回答
0
投票

我能够成功完成此工作。我下面的代码应演示如何实现此目的。

import io
import csv
from time import sleep
import splunklib.results as results
import splunklib.client as client
import splunklib.binding as binding
from multiprocessing.pool import ThreadPool
from contextlib import closing



class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
        flag = True
        while flag:
            try:
                job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
                flag = False
            except binding.HTTPError as e:
                pass
            pass
        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}

                if stats["isDone"] == "1":
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                if stats["isDone"] == "1":
                    break
                sleep(2)

            except binding.HTTPError as e:
                pass
            except AttributeError:

                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                if stats["isDone"] == "1":
                    break
                sleep(2)

        result_count = job['resultCount']
        offset = 0
        count = 50000
        results_list = self.results_getter(job, label, customerGuid, search_headers, True, count, offset, result_count)


        while len(results_list) < int(result_count) + 1:
            offset += count
            placeholder = self.results_getter(job, label, customerGuid, search_headers, False, count, offset, result_count)
            results_list.extend(placeholder)

        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def results_getter(self, job, label, customerGuid, search_headers, first, count, offset, result_count):
        # Get the results and display them
        kwargs_paginate = {"count": count,
                           "offset": offset}
        blocksearch_results = job.results(**kwargs_paginate)
        results_list = []

        reader = results.ResultsReader(blocksearch_results)

        for result in reader:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []

                for header in search_headers:
                    if header not in result.keys():
                        result[header] = ''

                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if not results_list == []:
                    results_list.append(values)
                elif first:
                    results_list.append(keys)
                    results_list.append(values)
                else:
                    results_list.append(values)

        assert not reader.is_preview
        return results_list

    def searches(self, query_list):
        if type(query_list) == dict:
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            results = pool.map(self.search, query_list)
            pool.terminate()

        search_results = {item[0]: item[1] for item in results}
        return search_results
© www.soinside.com 2019 - 2024. All rights reserved.