Recursive tracing issue when using opentelemetry-instrumentation-psycopg2 with psycopg2

Problem description

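When I use the opentelemetry-instrumentation-psycopg2 library together with psycopg2's ThreadedConnectionPool, I run into a recursion problem. It seems to occur only with some probability when concurrency is involved, so I put together a reproduction.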

Reproducing the problem

  • otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
      http:

exporters:
  logging:
    loglevel: debug

  jaeger:
    endpoint: jaeger-all-in-one:14250
    tls:
      insecure: true
processors:
  batch:

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [logging, jaeger]
      processors: [batch]
  • docker-compose.yaml
version: "3"

services:
  jaeger-all-in-one:
    image: jaegertracing/all-in-one:1.42
    restart: always
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "16686:16686" # server frontend
      - "14268:14268" # HTTP collector
      - "14250:14250" # gRPC collector

  otel-collector:
    image: otel/opentelemetry-collector:0.72.0
    restart: always
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"   # OTLP gRPC receiver
      - "4318:4318"   # OTLP Http receiver
    depends_on:
      - jaeger-all-in-one

  # database
  postgres:
    image: postgres:13.2-alpine
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=12345678
      - POSTGRES_DB=example
    ports:
      - "5432:5432"
  • requirements.txt
psycopg2==2.9.7
opentelemetry-instrumentation-psycopg2>=0.33b0
opentelemetry-exporter-otlp>=1.12.0
  • index.py
import logging
import threading
import time
from psycopg2 import pool
import psycopg2
from opentelemetry import trace
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# this database uses the connection pool
class PostgresDatabasePool:
    def __init__(self, minconn, maxconn, host, database, user, password, port):
        self.db_pool = pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=host,
            database=database,
            user=user,
            password=password,
            port=port
        )

    def execute_query(self, query):
        conn = None
        cursor = None
        try:
            conn = self.db_pool.getconn()
            cursor = conn.cursor()
            cursor.execute(query)
            logging.info(f"execute command: {query}")
        except Exception as e:
            logging.error(f"SQL error: {e}")
        finally:
            if cursor:
                cursor.close()
            if conn:
                self.db_pool.putconn(conn)

# this database uses a single default connection
class PostgresDatabase:
    def __init__(self, host, database, user, password, port):
        self.conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password,
            port=port
        )
        self.conn.autocommit = True
        self.cursor = self.conn.cursor()

    def execute_query(self, query):
        if not self.cursor:
            logging.warning("Please connect first")
            return
        try:
            self.cursor.execute(query)
            logging.info(f"execute command: {query}")
        except Exception as e:
            logging.error(f"SQL error: {e}")

def delete1_with_db_pool(thread_name):
    while True:
        with tracer.start_as_current_span("delete1_with_db_pool", kind=trace.SpanKind.INTERNAL):
            dbPool.execute_query("DELETE FROM public.test1;")
            dbPool.execute_query("DELETE FROM public.test1;")
            dbPool.execute_query("DELETE FROM public.test1;")
            dbPool.execute_query("DELETE FROM public.test1;")
            dbPool.execute_query("DELETE FROM public.test1;")
        time.sleep(5)

def delete2_with_db_pool(thread_name):
    while True:
        with tracer.start_as_current_span("delete2_with_db_pool", kind=trace.SpanKind.INTERNAL):
            dbPool.execute_query("DELETE FROM public.test2;")
            dbPool.execute_query("DELETE FROM public.test2;")
            dbPool.execute_query("DELETE FROM public.test2;")
            dbPool.execute_query("DELETE FROM public.test2;")
            dbPool.execute_query("DELETE FROM public.test2;")
        time.sleep(5)

def delete1(thread_name):
    while True:
        with tracer.start_as_current_span("delete1", kind=trace.SpanKind.INTERNAL):
            db.execute_query("DELETE FROM public.test1;")
            db.execute_query("DELETE FROM public.test1;")
            db.execute_query("DELETE FROM public.test1;")
            db.execute_query("DELETE FROM public.test1;")
            db.execute_query("DELETE FROM public.test1;")
        time.sleep(5)

def delete2(thread_name):
    while True:
        with tracer.start_as_current_span("delete2", kind=trace.SpanKind.INTERNAL):
            db.execute_query("DELETE FROM public.test2;")
            db.execute_query("DELETE FROM public.test2;")
            db.execute_query("DELETE FROM public.test2;")
            db.execute_query("DELETE FROM public.test2;")
            db.execute_query("DELETE FROM public.test2;")
        time.sleep(5)

# tracing
resource = Resource(attributes={SERVICE_NAME: 'Demo_Bug'})
provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint='localhost:4317', insecure=True)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("test", "1.0.0")

# Psycopg2
Psycopg2Instrumentor().instrument()

# init database
dbPool = PostgresDatabasePool(1, 5, "localhost", "example", "root", "12345678", 5432)
db = PostgresDatabase("localhost", "example", "root", "12345678", 5432)

# create table
db.execute_query("CREATE TABLE IF NOT EXISTS public.test1 (id serial  NOT NULL , text varchar(64)  NOT NULL);")
db.execute_query("CREATE TABLE IF NOT EXISTS public.test2 (id serial  NOT NULL , text varchar(64)  NOT NULL);")

# init thread
threads = [threading.Thread(target=delete1_with_db_pool, args=("Thread-1",)), threading.Thread(target=delete2_with_db_pool, args=("Thread-2",)),
           threading.Thread(target=delete1, args=("Thread-3",)), threading.Thread(target=delete2, args=("Thread-4",))]

# start
for thread in threads:
    thread.start()

# wait
for thread in threads:
    thread.join()
  • Run the following commands and open the Jaeger dashboard; you will see the problem.
docker-compose up -d
pip install -r requirements.txt
python index.py

Suspected root cause

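I suspect the problem is that we use the pool.ThreadedConnectionPool() function from the psycopg2 library to reuse database connections. It looks like opentelemetry-instrumentation-psycopg2 does not yet handle this case.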
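
To make the suspicion concrete, here is a minimal sketch of how reusing one connection object could in principle lead to nested wrapping. It is purely illustrative: FakeConnection, wrap_factory and nesting_depth are hypothetical names, and nothing here is taken from psycopg2 or from opentelemetry-instrumentation-psycopg2. Whether the library actually behaves this way is not confirmed.

```python
# Hypothetical illustration only: none of these names come from psycopg2
# or opentelemetry-instrumentation-psycopg2.


class FakeConnection:
    """Stands in for a pooled connection that is reused across checkouts."""

    def __init__(self):
        self.cursor_factory = lambda: "cursor"


def wrap_factory(conn):
    """Naive instrumentation: wraps whatever factory is currently set,
    without checking whether it has already been wrapped."""
    inner = conn.cursor_factory

    def traced_factory():
        # A real tracer would start a span here before delegating.
        return inner()

    traced_factory.inner = inner
    conn.cursor_factory = traced_factory


def nesting_depth(conn):
    """Counts how many wrapper layers sit on top of the original factory."""
    depth = 0
    factory = conn.cursor_factory
    while hasattr(factory, "inner"):
        depth += 1
        factory = factory.inner
    return depth


conn = FakeConnection()
for _ in range(3):  # three checkouts of the same pooled connection
    wrap_factory(conn)

print(nesting_depth(conn))  # 3
```

With a fresh connection per call the depth would stay at 1; with a reused connection each checkout adds another layer, which would show up as ever deeper nested spans.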

I have also posted this issue on the opentelemetry-python-contrib GitHub repository.

python concurrency psycopg2 trace open-telemetry
1 Answer

When I switched Python versions and ran the same code, the test results changed.

With Python versions 3.6 through 3.8, the recursion problem occurs. With versions 3.9 through 3.11, it does not. This is strange.

Here are my test results for each Python version:

  • 3.6.15 (recursion problem)
  • 3.7.16 (recursion problem)
  • 3.8.16 (recursion problem)
  • 3.9.16 (works fine)
  • 3.10.9 (works fine)
  • 3.11.1 (works fine)

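I suspect the root cause is tied to the Python version. If you want to use psycopg2's ThreadedConnectionPool with this instrumentation, make sure your Python version is 3.9.x or later; otherwise you will hit the recursion problem.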
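
As a small sketch of how that advice could be enforced in index.py, the check below compares the running interpreter against the 3.9 threshold. The threshold comes only from the test results listed above, and MIN_SAFE_VERSION and pool_instrumentation_is_safe are hypothetical names introduced for this example.

```python
import sys

# Assumption: 3.9 is the first version that worked in the tests listed above.
MIN_SAFE_VERSION = (3, 9)


def pool_instrumentation_is_safe(version_info=None):
    """Return True if the given (or running) Python version is 3.9 or later."""
    if version_info is None:
        version_info = sys.version_info
    major_minor = tuple(version_info[:2])
    return major_minor >= MIN_SAFE_VERSION


print(pool_instrumentation_is_safe((3, 8, 16)))  # False
print(pool_instrumentation_is_safe((3, 9, 16)))  # True
```

One option would be to call this before Psycopg2Instrumentor().instrument() and log a warning when it returns False, so that the problem is visible at startup instead of only in the Jaeger dashboard.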
