Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deadlock accessing metric reader storage when running under Flask/Gunicorn with gevent workers #4345

Open
ZachMatuson opened this issue Dec 10, 2024 · 1 comment
Labels
bug Something isn't working

Comments

@ZachMatuson
Copy link

Describe your environment

OS: SLES15SP5 (Docker)
Python version: 3.10.15
SDK version: 1.28.2
API version: 1.28.2

opentelemetry-exporter-otlp-proto-common 1.28.2
opentelemetry-exporter-otlp-proto-grpc 1.28.2
opentelemetry-instrumentation 0.49b2
opentelemetry-instrumentation-dbapi 0.49b2
opentelemetry-instrumentation-elasticsearch 0.49b2
opentelemetry-instrumentation-flask 0.49b2
opentelemetry-instrumentation-kafka-python 0.49b2
opentelemetry-instrumentation-logging 0.49b2
opentelemetry-instrumentation-mysql 0.49b2
opentelemetry-instrumentation-pymysql 0.49b2
opentelemetry-instrumentation-wsgi 0.49b2
opentelemetry-proto 1.28.2
opentelemetry-semantic-conventions 0.49b2
opentelemetry-util-http 0.49b2

gunicorn 22.0.0
flask 3.1.0
gevent 24.11.1

What happened?

We started seeing request timeouts and a memory leak a few months ago, but did not discover the root cause until recently. After enabling the --max-requests flag in Gunicorn, we found that requests were getting stuck while trying to record metrics. It appears that there is a deadlock within OpenTelemetry when running a Flask/Gunicorn REST API with gevent workers.
I also suspect this is causing a memory leak, where stuck gevent threads slowly pile up trying to access the metric_reader. This does not happen when testing against flask directly.

This gevent exception is thrown when the stuck threads are killed after Gunicorn restarts the worker.

Unhandled error
Traceback (most recent call last):
  ...
  File "/hostname/projects/...", line 103, in record_metric
    counter.add(1, {
  File "conda_env/lib/python3.10/site-packages/opentelemetry/metrics/_internal/instrument.py", line 206, in add
    self._real_instrument.add(amount, attributes)
  File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py", line 163, in add
    self._measurement_consumer.consume_measurement(
  File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py", line 82, in consume_measurement
    reader_storage.consume_measurement(measurement)
  File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py", line 117, in consume_measurement
    for view_instrument_match in self._get_or_init_view_instrument_match(
  File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py", line 87, in _get_or_init_view_instrument_match
    with self._lock:
  File "conda_env/lib/python3.10/threading.py", line 168, in acquire
    rc = self._block.acquire(blocking, timeout)
  File "conda_env/lib/python3.10/site-packages/gevent/thread.py", line 121, in acquire
    acquired = BoundedSemaphore.acquire(self, blocking, timeout)
  File "src/gevent/_semaphore.py", line 180, in gevent._gevent_c_semaphore.Semaphore.acquire
  File "src/gevent/_semaphore.py", line 249, in gevent._gevent_c_semaphore.Semaphore.acquire
  File "src/gevent/_abstract_linkable.py", line 521, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait
  File "src/gevent/_abstract_linkable.py", line 487, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
  File "src/gevent/_abstract_linkable.py", line 490, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
  File "src/gevent/_abstract_linkable.py", line 442, in gevent._gevent_c_abstract_linkable.AbstractLinkable._AbstractLinkable__wait_to_be_notified
  File "src/gevent/_abstract_linkable.py", line 451, in gevent._gevent_c_abstract_linkable.AbstractLinkable._switch_to_hub
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
greenlet.GreenletExit

We tested commenting out the calls to increment metrics and requests stopped timing out

Steps to Reproduce

Unfortunately it's not easy to reproduce.

Create a simple flask/gunicorn server and run with gevent workers, then send requests until deadlock happens

entrypoint.py

from flask import Flask

from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import Span
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.instrumentation.flask import FlaskInstrumentor

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics.export import (
    PeriodicExportingMetricReader,
    ConsoleMetricExporter
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider

import logging
import logging.config
import os

from route import bp

logger = logging.getLogger(__name__)

flask_instrumentor: FlaskInstrumentor = FlaskInstrumentor()


def configure_app(initialize_otel: bool = True):
    # setup open telemetry when running directly (gunicorn is handled in post_fork)
    if initialize_otel:
        OTEL_ENDPOINT = 'FILL IN URL'
        resource = Resource(
            {
                ResourceAttributes.SERVICE_NAME: 'backend',
                ResourceAttributes.SERVICE_VERSION: '1.0',
            }
        )

        # setup open telemetry tracer provider
        tracer_provider = TracerProvider(resource=resource)
        exporter = OTLPSpanExporter(OTEL_ENDPOINT, insecure=True)
        tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
        trace.set_tracer_provider(tracer_provider)

        metric_exporter = OTLPMetricExporter(OTEL_ENDPOINT, insecure=True)
        console_metric_exporter = ConsoleMetricExporter()

        meter_provider = MeterProvider(
            resource=resource,
            metric_readers=[
                PeriodicExportingMetricReader(console_metric_exporter),
                # Enable this if you want
                # PeriodicExportingMetricReader(metric_exporter),
            ]
        )
        metrics.set_meter_provider(meter_provider)

    app = Flask(__name__)
    app.register_blueprint(bp)

    flask_instrumentor.instrument_app(app)

    return app


def create_gunicorn(*_args, **kwargs):
    app = configure_app(False)

    gunicorn_logger = logging.getLogger('gunicorn.error')
    app.logger.handlers.extend(gunicorn_logger.handlers)

    return app

def create_flask(*_args, **kwargs):
    app = configure_app()
    return app

if __name__ == '__main__':
    application = configure_app()
    application.run(host='0.0.0.0', port=8000, debug=True)

route.py

from flask import Blueprint

from opentelemetry import trace, metrics
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

call_counter = meter.create_counter('call_counter')
call_counter_2 = meter.create_counter('call_counter_2')

bp = Blueprint('test', __name__, url_prefix='/test')

@bp.route('/', methods=['GET'])
def test():
    call_counter.add(1, {
        'custom_attribute': 'foo'
    })
    return 'Ok'

@bp.route('/2', methods=['GET'])
def test2():
    call_counter_2.add(1, {
        'custom_attribute': 'foo'
    })
    return 'Ok'

gunicorn.conf.py

######################################################
# THIS MUST COME BEFORE ANY MODULE IMPORTS
# Otherwise otel will block if the collector is down
######################################################
# prevents warning when preloading https://github.com/benoitc/gunicorn/issues/2796
try:
    import gevent.monkey
    gevent.monkey.patch_all()
except ImportError:
    pass

from gunicorn.arbiter import Arbiter
from gunicorn.workers.ggevent import GeventWorker
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, ConsoleSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, Attributes
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider

# required to share config across workers
preload_app = True # pylint: disable=C0103

# workaround for BatchSpanProcessor not being fork-safe
# https://opentelemetry-python.readthedocs.io/en/latest/examples/fork-process-model/README.html
def post_fork(server: Arbiter, worker: GeventWorker):
    server.log.info('Worker spawned (pid: %s, worker_id: %s)', worker.pid, worker._worker_id) # type: ignore

    # If workers are not distinguished within attributes, traces and
    # metrics exported from each worker will be indistinguishable. While
    # not necessarily an issue for traces, it is confusing for almost
    # all metric types. A built-in way to identify a worker is by PID
    # but this may lead to high label cardinality. This is caused by
    # each worker generating it's own metrics, so if they are not
    # distinguished from eachother, the metrics appear to go up and down
    # https://github.com/open-telemetry/opentelemetry-python/issues/3001

    OTEL_ENDPOINT = '...'
    resource = Resource({
        ResourceAttributes.SERVICE_NAME: 'backend',
        ResourceAttributes.SERVICE_NAMESPACE: '...',
        ResourceAttributes.SERVICE_VERSION: '1.0',
        ResourceAttributes.DEPLOYMENT_ENVIRONMENT: '...',
        'gunicorn.worker_id': worker._worker_id
    })

    # setup open telemetry tracer provider
    tracer_provider = TracerProvider(resource=resource)
    exporter = OTLPSpanExporter(OTEL_ENDPOINT, insecure=True)
    tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(tracer_provider)

    metric_exporter = OTLPMetricExporter(OTEL_ENDPOINT, insecure=True)
    meter_provider = MeterProvider(resource=resource, metric_readers=[PeriodicExportingMetricReader(metric_exporter)])
    metrics.set_meter_provider(meter_provider)

# Add worker IDs to worker objects. Much smaller metric cardinality than if using PIDs
# from: https://gist.github.com/hynek/ba655c8756924a5febc5285c712a7946
def on_starting(server: Arbiter):
    """Attach a set of IDs that can be temporarily re-used.
      Used on reloads when each worker exists twice."""
    server._worker_id_overload = set() # type: ignore

def nworkers_changed(server: Arbiter, new_value, _old_value):
    """Gets called on startup too.
    Set the current number of workers.  Required if we raise the worker count
    temporarily using TTIN because server.cfg.workers won't be updated and if
    one of those workers dies, we wouldn't know the ids go that far."""
    server._worker_id_current_workers = new_value # type: ignore

def _next_worker_id(server: Arbiter):
    """If there are IDs open for re-use, take one.  Else look for a free one."""
    if server._worker_id_overload: # type: ignore
        return server._worker_id_overload.pop() # type: ignore

    in_use = {w._worker_id for w in (server.WORKERS.values()) if w.alive}
    free = set(range(1, server._worker_id_current_workers + 1)) - in_use # type: ignore

    return free.pop()

def on_reload(server: Arbiter):
    """Add a full set of ids into overload so it can be re-used once."""
    server._worker_id_overload = set(range(1, server.cfg.workers + 1)) # type: ignore

def pre_fork(server: Arbiter, worker: GeventWorker):
    """Attach the next free worker_id before forking"""
    worker._worker_id = _next_worker_id(server) # type: ignore
#!/bin/bash

# exit on any error
set -e

SCRIPT_DIR=`dirname $0`
if [ $SCRIPT_DIR == "." ]
then
    SCRIPT_DIR=$PWD
fi
echo "Starting server"

PORT=8000
WORKERS=2
TIMEOUT=360
MAX_REQUEST=1000
MAX_REQ_JITTER=100
LOG_LEVEL="debug"

gunicorn "entrypoint:create_gunicorn()" \
--bind 0.0.0.0:$PORT \
--workers $WORKERS \
--timeout $TIMEOUT \
--log-level=$LOG_LEVEL \
--keep-alive=360 \
--backlog 8000 \
--max-requests=$MAX_REQUEST \
--max-requests-jitter=$MAX_REQ_JITTER \
--worker-class=gevent \
-c ./example_conf.py

send_requests.sh

#!/bin/bash
while true;
do
    curl -L 127.0.0.1:8000/test --max-time 30 & curl -L 127.0.0.1:8000/test/2 --max-time 30
    wait
done

Expected Result

All requests succeed and metrics are recorded

Actual Result

A small number of requests deadlock

Additional context

No response

Would you like to implement a fix?

No

@ZachMatuson ZachMatuson added the bug Something isn't working label Dec 10, 2024
@Seven4ME
Copy link

Seven4ME commented Dec 17, 2024

Hey, it seems like I’m observing a similar issue.

It works for some time, but then it gets stuck and stops producing any metrics. After some debugging, I found that the problem could be related to a lock inside consume_measurement -> collect of metric_reader_storage, which is used in synchronous metrics. It seems to hang on that lock.

dump from py-spy ->

HEALTH

Thread 30 (idle): "OtelPeriodicExportingMetricReader"
wait (threading.py:316)
wait (threading.py:581)
_ticker (metrics/export.py:105)
run (threading.py:917)
_bootstrap_inner (threading.py:980)
_bootstrap (threading.py:937)

LOCKED

Thread 30 (idle): "OtelPeriodicExportingMetricReader"
aggregate (metrics/_internal/aggregation.py:135)
consume_measurement (metrics/_internal/_view_instrument_match.py:123)
consume_measurement (metrics/_internal/metric_reader_storage.py:120)
consume_measurement (metrics/reader_storage.py:22)
consume_measurement (metrics/_internal/measurement_consumer.py:82)
add (metrics/_internal/instrument.py:177)
on_get_count (monitors/gc.py:138)
callback (monitors/gc.py:127)
collect (metrics/_internal/aggregation.py:272)
collect (metrics/_internal/_view_instrument_match.py:134)
collect (metrics/_internal/metric_reader_storage.py:155)
collect (metrics/_internal/measurement_consumer.py:126)
collect (metrics/_internal/export/init.py:333)
_ticker (metrics/export.py:107)
run (threading.py:917)
_bootstrap_inner (threading.py:980)
_bootstrap (threading.py:937)

As a workaround, I changed the nature of the metrics to observable, and the problem disappeared.
However, this is just a workaround; the root cause should still be addressed and fixed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants