gRPC 执行完毕后,gRPC 断言失败

问题描述 投票:0回答:1

我正在使用grpcio版本1.48.2

我的应用程序本质上是不同 docker 容器上的 gRPC 服务器的集合,线程池中最多有 10 个工作线程。

最近我开始遇到断言错误,导致 gRPC 服务器关闭:

[E0218 1000 ev_epoll1_linux.cc:1142] ASSERTION FAILED: next_worker->state == KICKED

此错误是在 gRPC 调用成功执行之后立即发生的。(调用确实成功完成了,因为我在末尾有一条日志输出表明调用成功。)

我尝试深入研究这个错误,并在 grpcio 源代码中找到了断言失败所在的函数:

(断言在第 1142 行附近失败,这意味着它是在下面这个函数内部失败的:)


// Kicks one worker of `pollset`: marks it KICKED via SET_KICK_STATE and, where
// the chosen branch requires it, wakes it through its condition variable
// (gpr_cv_signal) or through the global wakeup fd (grpc_wakeup_fd_wakeup).
//
// pollset:         the pollset whose worker is to be kicked.
// specific_worker: the worker to kick, or nullptr to let this function choose
//                  between pollset->root_worker and root_worker->next.
//
// Returns GRPC_ERROR_NONE unless a wakeup through global_wakeup_fd was
// attempted, in which case the result of grpc_wakeup_fd_wakeup is returned.
//
// NOTE(review): no locking is visible in this function; presumably the caller
// holds the pollset's lock while worker states are read and written — confirm.
static grpc_error_handle pollset_kick(grpc_pollset* pollset,
                                      grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  GRPC_STATS_INC_POLLSET_KICK();
  grpc_error_handle ret_err = GRPC_ERROR_NONE;
  // Trace-only: build and log a one-line summary of the pollset, the current
  // thread's pollset/worker, and the kick state of the workers involved.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        static_cast<void*>(g_current_thread_pollset),
        static_cast<void*>(g_current_thread_worker), pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      // root_worker->next is dereferenced without a null check.
      // NOTE(review): presumably the worker list is circular so `next` is
      // never null (a lone root would point at itself) — confirm.
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
  }

  // Case 1: no specific worker requested — pick one from the pollset.
  if (specific_worker == nullptr) {
    if (g_current_thread_pollset != pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        // No worker exists at all: record the kick on the pollset itself.
        GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      // The branches below are order-dependent: each one relies on every
      // earlier condition in the chain having been false.
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        // Root is already KICKED; no wakeup is issued.
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        // The worker after root is already KICKED; no wakeup is issued.
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try and wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        // Root is the only worker and equals g_active_poller: wake it through
        // the global wakeup fd and propagate that call's result.
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        // next_worker is UNKICKED: signal its condition variable, which is
        // asserted to have been initialized.
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          // next_worker is the designated poller but root is not: kick root
          // instead, signalling its cv only if that cv was initialized.
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          // Both root and next_worker are DESIGNATED_POLLER: kick next_worker
          // through the global wakeup fd and propagate that call's result.
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        // Reached only when next_worker->state matched none of KICKED,
        // UNKICKED or DESIGNATED_POLLER in the checks above. The assertion
        // below therefore fails unless the state became KICKED after the
        // earlier `next_worker->state == KICKED` test was evaluated.
        // NOTE(review): presumably this guards against an unexpected state
        // value in next_worker — confirm against the kick-state enum.
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      // The calling thread's current pollset is this pollset: only the
      // statistic is bumped; no state change and no wakeup.
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    // Every path through the block above ends in `goto done`.
    GPR_UNREACHABLE_CODE(goto done);
  }

  // Case 2: a specific worker was requested.
  if (specific_worker->state == KICKED) {
    // Already KICKED: nothing to do.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (g_current_thread_worker == specific_worker) {
    // The target is the calling thread's own worker: mark it KICKED without
    // issuing any wakeup.
    GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    // The target equals g_active_poller: wake it through the global wakeup fd
    // and propagate that call's result.
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    // The target has an initialized condition variable: signal it.
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    // No initialized cv and not the active poller: only mark it KICKED.
    GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  // Single exit point shared by all branches.
  return ret_err;
}

我不知道该如何进一步调试,以弄清楚这里到底发生了什么。我不打算修改 grpcio 的源代码,也无意归咎于它,但这有可能是 grpcio 库本身的错误吗?

希望能得到任何帮助或建议,帮我弄清楚这个断言出了什么问题:它为什么会发生?可能是什么原因导致的?

提前非常感谢,伊夫。

grpc assertion epoll grpc-python grpcio
1个回答
0
投票

遇到同样的问题,但使用 Rails GCP Run Job,有更新吗?

© www.soinside.com 2019 - 2024. All rights reserved.