在 Locust 中,将一次调用返回的相关值用于(同一类内的)另一次调用

问题描述 投票:0回答:1

我有以下可以正常运行的代码:

import json
import requests
import os
import pandas as pd
import random
from locust import task, events
from locust.contrib.fasthttp import FastHttpUser
from azure.identity import DefaultAzureCredential

server_name = "https://test"

credential = DefaultAzureCredential()
token = credential.get_token("xxx")
token = token.token


def read_componentid_writepowerplant(csv_file=None):
    """Return one randomly chosen value from the CSV's COMPONENTID column.

    Args:
        csv_file: Path of the CSV file to read. When omitted, the original
            hard-coded ``powerplants.csv`` location under
            ``C:/PythonScripting/MyCode/pythonProject`` is used.

    Returns:
        The sampled component id converted to a plain ``int``.

    Raises:
        KeyError: If the CSV has no ``COMPONENTID`` column.
    """
    if csv_file is None:
        csv_file = os.path.join(
            "C:", os.sep, "PythonScripting", "MyCode", "pythonProject",
            "powerplants.csv",
        )
    component_ids = pd.read_csv(csv_file)["COMPONENTID"]
    # sample(1) yields a one-element Series; extract the scalar explicitly,
    # because calling int() directly on a Series is deprecated in pandas 2.x.
    return int(component_ids.sample(1).iloc[0])


def set_value_writepowerplant():
    """Return a random integer drawn from the half-open range [500, 900)."""
    return random.randrange(500, 900)


   null = None
   global hpsId


   def setpayload_writepowerplant(dt_value):
   myjson = {
      "name": "Jostedal",
      "units": [
          {
              "generatorName": "Jostedal G1",
              "componentId": int(read_componentid_writepowerplant()),
              "timeSeries": null
          }
      ],
      "hpsId": 10014,
      "powerPlantId": 31109,
      "readPeriod": [
          1711407600,
          1711494000
      ],
      "timeAxis": {
          "t0": 1711407600,
          "dt": int(dt_value),
          "n": 96
      }
  }
  return myjson


@events.request.add_listener
def my_request_handler(request_type, name, response_time, response_length, response,
                       context, exception, start_time, url, **kwargs):
    """Print a one-line success or failure message for a finished request.

    Registered as a listener on ``events.request``. Only ``name`` and
    ``exception`` are used; the remaining parameters are accepted but ignored.
    """
    if not exception:
        print(f"Successfully made a request to: {name}")
        return
    print(f"Request to {name} failed with exception {exception}")


class MffrApi(FastHttpUser):
    """Locust user that fetches all power plants and then posts a single one.

    The ``hpsId`` obtained by ``read_powerplant`` is stored on the user
    instance so that ``write_powerplant`` can send it in its payload; no
    module-level ``global`` is needed to share the value between methods.
    """

    host = server_name
    network_timeout = 1000000.0
    connection_timeout = 1000000.0

    # Correlated value captured by read_powerplant(). While it is still None
    # the payload keeps whatever hpsId setpayload_writepowerplant() provides.
    hpsid = None

    def read_powerplant(self):
        """GET the power-plant list and remember the ``hpsId`` of entry 15."""
        resp = self.client.get(
            "/xxx/all/",
            headers={"Authorization": f"Bearer {token}"},
            name="/GetAllPowerplants",
        )
        # Keep the value on the instance so later requests can reuse it.
        self.hpsid = resp.json()[15]["hpsId"]
        print(self.hpsid)

    def write_powerplant(self):
        """POST a single power plant, using the previously read ``hpsId``."""
        payload = setpayload_writepowerplant(set_value_writepowerplant())
        if self.hpsid is not None:
            # Override the static value with the one read from the server.
            payload["hpsId"] = self.hpsid
        self.client.post(
            "/xxx",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            name="/PostSingelPowerplant",
            data=json.dumps(payload, indent=2),
        )

    @task(1)
    def test_read_mfrr(self):
        """Run the read request followed by the dependent write request."""
        self.read_powerplant()
        self.write_powerplant()

我希望将方法“read_powerplant”中的相关值“hpsid”放入方法“setpayload_writepowerplant”中的静态json有效负载中,以便在方法“write_powerplant”中使用。

我是否需要将此变量定义为全局变量才能做到这一点?

或者我是否需要更改某些方法的作用域,才能支持在方法之间相互引用值?

python scope locust
1个回答
0
投票

只需将其保存为用户的对象属性即可。另外,无需手动转储 json。

def read_powerplant(self):
    …
    self.hpsid = resp.json()[15]["hpsId"]

def write_powerplant(self):
    resp = self.client.post(f'/xxx', json={"hpsId": self.hpsid, …})
    …
© www.soinside.com 2019 - 2024. All rights reserved.