From e9c685ebd9fff5ddba47121598b51848309e1773 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 18 Jun 2026 14:02:23 +0200 Subject: [PATCH] feat: Add elasticsearch instrumentation Signed-off-by: Cagri Yonca --- .circleci/config.yml | 5 + .gitignore | 5 +- docker-compose.yml | 14 + src/instana/__init__.py | 1 + src/instana/instrumentation/elasticsearch.py | 804 +++++++++++ src/instana/instrumentation/urllib3.py | 5 +- src/instana/span/kind.py | 1 + src/instana/span/registered_span.py | 544 +++---- src/instana/util/config.py | 1 + tests/helpers.py | 6 + tests/instrumentation/test_elasticsearch.py | 1325 ++++++++++++++++++ tests/requirements.txt | 1 + 12 files changed, 2475 insertions(+), 237 deletions(-) create mode 100644 src/instana/instrumentation/elasticsearch.py create mode 100644 tests/instrumentation/test_elasticsearch.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 6128c89a..cdc4f8ed 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -173,6 +173,11 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: docker.elastic.co/elasticsearch/elasticsearch:9.0.0 + environment: + discovery.type: single-node + xpack.security.enabled: "false" + ES_JAVA_OPTS: "-Xms512m -Xmx512m" working_directory: ~/repo steps: - checkout diff --git a/.gitignore b/.gitignore index 02bee134..bfb55dcf 100644 --- a/.gitignore +++ b/.gitignore @@ -101,4 +101,7 @@ ENV/ .vscode # uv (https://docs.astral.sh/uv/) -uv.lock \ No newline at end of file +uv.lock + +# Sandbox +sandbox/ diff --git a/docker-compose.yml b/docker-compose.yml index 299806a5..2fd473f0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -97,3 +97,17 @@ services: - transaction.state.log.min.isr=1 - --override - auto.create.topics.enable=true + + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:9.0.0 + environment: + - discovery.type=single-node + - xpack.security.enabled=false + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ports: + - "9200:9200" + healthcheck: + test: ["CMD-SHELL", "curl -sf http://localhost:9200/_cluster/health || exit 1"] + interval: 10s + timeout: 5s + retries: 5 diff --git a/src/instana/__init__.py b/src/instana/__init__.py index cdd37a6e..347f97ed 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -166,6 +166,7 @@ def boot_agent() -> None: cassandra, # noqa: F401 celery, # noqa: F401 couchbase, # noqa: F401 + elasticsearch, # noqa: F401 fastapi, # noqa: F401 flask, # noqa: F401 grpcio, # noqa: F401 diff --git a/src/instana/instrumentation/elasticsearch.py b/src/instana/instrumentation/elasticsearch.py new file mode 100644 index 00000000..f206dcff --- /dev/null +++ b/src/instana/instrumentation/elasticsearch.py @@ -0,0 +1,804 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Elasticsearch instrumentation +Supports both sync and async clients for elasticsearch +""" + +try: + import json + import re + import time + from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional + + if TYPE_CHECKING: + from instana.span.span import InstanaSpan + + import elasticsearch # noqa: F401 + import wrapt + from opentelemetry.context import get_current + from opentelemetry.semconv.trace import SpanAttributes + + from instana.log import logger + from instana.util.traceutils import get_tracer_tuple + + # Regex patterns for URL parsing + DOCUMENT_ID_PATTERN = re.compile(r"^/[^/]+/_doc/([^/?]+)") + INDEX_PATTERN = re.compile(r"^/([^/?]+)") + + # Connection cache to avoid repeated URL parsing and store cluster info + # Structure: {connection_id: {host, port, cluster_name, last_updated}} + _connection_cache: dict[str, dict[str, Any]] = {} + + # Cluster name cache TTL (5 minutes) + CLUSTER_NAME_CACHE_TTL = 300 + + def get_connection_id(instance: Any) -> Optional[str]: + """ + Generate a unique connection ID for caching. + Uses host:port as identifier via elastic-transport node_pool. + """ + try: + if hasattr(instance, "transport"): + transport = instance.transport + if hasattr(transport, "node_pool"): + nodes = list(transport.node_pool.all()) + if nodes: + cfg = nodes[0].config + return f"{cfg.host}:{cfg.port}" + except Exception: + logger.debug("get_connection_id error:", exc_info=True) + return None + + def _get_cached_cluster_name(connection_id: str) -> Optional[str]: + """Return cached cluster name if still within TTL, otherwise None.""" + if connection_id in _connection_cache: + cached = _connection_cache[connection_id] + cluster_name = cached.get("cluster_name") + if ( + cluster_name + and (time.time() - cached.get("last_updated", 0)) + < CLUSTER_NAME_CACHE_TTL + ): + return cluster_name + return None + + def _store_cluster_name(connection_id: str, cluster_name: str) -> None: + """Persist a discovered cluster name into the connection cache.""" + if connection_id not in _connection_cache: + _connection_cache[connection_id] = {} + _connection_cache[connection_id]["cluster_name"] = cluster_name + _connection_cache[connection_id]["last_updated"] = time.time() + + def _extract_cluster_name_from_response(info_response: Any) -> Optional[str]: + """Pull cluster_name out of an ES info() response object.""" + if hasattr(info_response, "body"): + body = getattr(info_response, "body", None) + if isinstance(body, dict): + return body.get("cluster_name") + return None + + def discover_cluster_name(instance: Any, connection_id: str) -> Optional[str]: + """ + Discover Elasticsearch cluster name by calling cluster info API (sync). + Caches result with TTL to avoid repeated API calls. + """ + try: + cached = _get_cached_cluster_name(connection_id) + if cached: + return cached + + # perform_request is already instrumented; the span_name == "elasticsearch" + # guard inside it prevents recursive tracing of this info() call. + if hasattr(instance, "info"): + try: + cluster_name = _extract_cluster_name_from_response(instance.info()) + if cluster_name: + _store_cluster_name(connection_id, cluster_name) + return cluster_name + except Exception as e: + logger.debug(f"elasticsearch cluster name discovery failed: {e}") + + except Exception: + logger.debug("discover_cluster_name error:", exc_info=True) + + return None + + def _set_connection_span_attributes( + span: "InstanaSpan", + host: Optional[str], + port: Optional[Any], + cluster_name: Optional[str], + ) -> None: + """Set elasticsearch connection-related span attributes.""" + if host: + span.set_attribute("elasticsearch.address", host) + if port is not None: + span.set_attribute("elasticsearch.port", port) + if cluster_name: + span.set_attribute("elasticsearch.cluster", cluster_name) + + def _resolve_transport_host_port( + instance: Any, + ) -> tuple[Optional[str], Optional[Any]]: + """Read host and port from the first node in the transport node pool.""" + if hasattr(instance, "transport"): + transport = instance.transport + if hasattr(transport, "node_pool"): + try: + nodes = list(transport.node_pool.all()) + if nodes: + cfg = nodes[0].config + return cfg.host, cfg.port + except Exception: + pass + return None, None + + def collect_connection_info( + span: "InstanaSpan", + instance: Any, + ) -> None: + """ + Collect connection information and cluster name (sync). + Uses caching to optimize performance. + """ + try: + connection_id = get_connection_id(instance) + if not connection_id: + return + + if connection_id in _connection_cache: + cached = _connection_cache[connection_id] + _set_connection_span_attributes( + span, + cached.get("host"), + cached.get("port"), + cached.get("cluster_name"), + ) + # No fallback to host:port — backend uses address+port when cluster is absent + return + + host, port = _resolve_transport_host_port(instance) + if host is not None: + _connection_cache[connection_id] = { + "host": host, + "port": port, + "last_updated": time.time(), + } + _set_connection_span_attributes( + span, host, port, discover_cluster_name(instance, connection_id) + ) + # No fallback to host:port — backend uses address+port when cluster is absent + + except Exception: + logger.debug("elasticsearch collect_connection_info error:", exc_info=True) + + def shorten_query_string(query: str, max_length: int = 1000) -> str: + """ + Shorten long query strings for logging + """ + if not query or len(query) <= max_length: + return query + return query[:max_length] + "..." + + def to_string_es_multi_parameter(param: Any) -> Optional[str]: + """ + Convert Elasticsearch multi-parameter to string + Handles: string, list, None + """ + if param is None: + return None + if isinstance(param, str): + return "_all" if param == "" else param + if isinstance(param, list): + return ",".join(str(p) for p in param) + return str(param) + + def extract_index_from_url(url: str) -> Optional[str]: + """Extract index name from URL path""" + try: + # Match pattern: /index_name/... + match = INDEX_PATTERN.match(url) + if match: + index = match.group(1) + # Filter out special endpoints + if not index.startswith("_"): + return index + except Exception: + logger.debug("extract_index_from_url error:", exc_info=True) + return None + + def extract_document_id_from_url(url: str) -> Optional[str]: + """ + Extract document ID from URL + Pattern: /index/_doc/document_id + """ + try: + match = DOCUMENT_ID_PATTERN.match(url) + if match: + return match.group(1) + except Exception: + logger.debug("extract_document_id_from_url error:", exc_info=True) + return None + + def detect_action_from_url(method: str, url: str) -> str: + """ + Detect Elasticsearch action from HTTP method and URL + Returns action name like: search, index, get, delete, bulk, etc. + """ + try: + url_lower = url.lower() + + # Multi-operations + if "_msearch" in url_lower: + return "msearch" + if "_mget" in url_lower: + return "mget" + if "_bulk" in url_lower: + return "bulk" + + # Search operations + if "_search" in url_lower: + return "search" + + # Document operations + if "_doc" in url_lower or "_create" in url_lower: + if method == "POST" or method == "PUT": + return "index" + elif method == "GET": + return "get" + elif method == "DELETE": + return "delete" + + # Update operation + if "_update" in url_lower: + return "update" + + # Index operations + if "/_mapping" in url_lower: + return "indices.putMapping" if method == "PUT" else "indices.getMapping" + if "/_settings" in url_lower: + return ( + "indices.putSettings" if method == "PUT" else "indices.getSettings" + ) + + # Fallback to HTTP method + return method.lower() + except Exception: + logger.debug("detect_action_from_url error:", exc_info=True) + return method.lower() + + def extract_params_from_request( + span: "InstanaSpan", + method: str, + url: str, + params: Optional[dict[str, Any]] = None, + body: Optional[Any] = None, + ) -> None: + """ + Extract and set Elasticsearch parameters from request + Handles: index, type, id, query extraction, multi-operations + """ + try: + action = detect_action_from_url(method, url) + span.set_attribute("elasticsearch.action", action) + + # Handle multi-operations with specialized processors + if action == "mget": + process_mget_params(span, body, params) + return + elif action == "msearch": + process_msearch_params(span, body) + return + elif action == "bulk": + process_bulk_params(span, body) + return + + # Standard single operations + # Extract index from URL + index = extract_index_from_url(url) + if index: + span.set_attribute("elasticsearch.index", index) + + # Extract document ID from URL + doc_id = extract_document_id_from_url(url) + if doc_id: + span.set_attribute("elasticsearch.id", doc_id) + + # Extract query from body for search operations + if action == "search" and body: + try: + if isinstance(body, dict): + query_str = json.dumps(body) + elif isinstance(body, str): + query_str = body + else: + query_str = str(body) + + shortened_query = shorten_query_string(query_str) + span.set_attribute("elasticsearch.query", shortened_query) + except Exception: + logger.debug("extract query error:", exc_info=True) + + # Handle params if provided + if params: + # Extract index from params if not in URL + if not index and "index" in params: + index_param = to_string_es_multi_parameter(params.get("index")) + if index_param: + span.set_attribute("elasticsearch.index", index_param) + + # Extract document ID from params + if not doc_id and "id" in params: + span.set_attribute("elasticsearch.id", str(params["id"])) + + except Exception: + logger.debug("extract_params_from_request error:", exc_info=True) + + def process_mget_params( + span: "InstanaSpan", + body: Optional[Any] = None, + params: Optional[dict[str, Any]] = None, + ) -> None: + """ + Process multi-get (mget) parameters + Extracts index and id from docs array or ids array. + """ + try: + indices = set() + doc_ids = [] + + # Handle body with docs array + if body and isinstance(body, dict): + docs = body.get("docs", []) + if isinstance(docs, list): + for doc in docs: + if isinstance(doc, dict): + if "_index" in doc: + indices.add(doc["_index"]) + if "_id" in doc: + doc_ids.append(str(doc["_id"])) + + # Handle ids array (requires index in params or URL) + ids = body.get("ids", []) + if isinstance(ids, list) and ids: + doc_ids.extend(str(id_val) for id_val in ids) + + # Extract index from params if not in body + if params and "index" in params and not indices: + index_param = to_string_es_multi_parameter(params.get("index")) + if index_param: + indices.add(index_param) + + # Set attributes + if indices: + span.set_attribute("elasticsearch.index", ",".join(sorted(indices))) + if doc_ids: + # Limit to first 10 IDs to avoid too long attribute + ids_str = ",".join(doc_ids[:10]) + if len(doc_ids) > 10: + ids_str += f",... ({len(doc_ids)} total)" + span.set_attribute("elasticsearch.id", ids_str) + + except Exception: + logger.debug("process_mget_params error:", exc_info=True) + + def _parse_ndjson_body(body: Any) -> Optional[list]: + """ + Normalise a bulk/msearch body into a list of dicts. + Accepts a newline-delimited JSON string or an already-parsed list. + Returns None when the body type is unsupported. + """ + if isinstance(body, str): + result = [] + for line in body.split("\n"): + line = line.strip() + if not line: + continue + try: + result.append(json.loads(line)) + except json.JSONDecodeError: + continue + return result + if isinstance(body, list): + return body + return None + + def process_msearch_params( + span: "InstanaSpan", + body: Optional[Any] = None, + ) -> None: + """ + Process multi-search (msearch) parameters + Extracts indices and queries from body array + Body format: [header, body, header, body, ...] + """ + try: + indices = set() + queries = [] + + if body: + body_list = _parse_ndjson_body(body) + if body_list is None: + return + + # Process pairs: header (even index), body (odd index) + for i in range(0, len(body_list), 2): + # Header contains index + if i < len(body_list) and isinstance(body_list[i], dict): + header = body_list[i] + if "index" in header: + index_val = to_string_es_multi_parameter(header["index"]) + if index_val: + indices.add(index_val) + + # Body contains query + if i + 1 < len(body_list) and isinstance(body_list[i + 1], dict): + query_body = body_list[i + 1] + if query_body: + queries.append(query_body) + + # Set attributes + if indices: + span.set_attribute("elasticsearch.index", ",".join(sorted(indices))) + + # Combine queries (limit size) + if queries: + try: + combined_query = json.dumps({"queries": queries}) + shortened_query = shorten_query_string( + combined_query, max_length=1000 + ) + span.set_attribute("elasticsearch.query", shortened_query) + except Exception: + logger.debug("msearch query serialization error:", exc_info=True) + + except Exception: + logger.debug("process_msearch_params error:", exc_info=True) + + def process_bulk_params( + span: "InstanaSpan", + body: Optional[Any] = None, + ) -> None: + """ + Process bulk operation parameters + Extracts operation count and indices + Body format: [action, doc, action, doc, ...] + """ + try: + indices = set() + operation_count = 0 + operations = set() + + if body: + body_list = _parse_ndjson_body(body) + if body_list is None: + return + + # Process pairs: action (even index), document (odd index) + for i in range(0, len(body_list), 2): + if i < len(body_list) and isinstance(body_list[i], dict): + action_line = body_list[i] + operation_count += 1 + + # Extract operation type (index, create, update, delete) + for op_type in ["index", "create", "update", "delete"]: + if op_type in action_line: + operations.add(op_type) + op_data = action_line[op_type] + if isinstance(op_data, dict) and "_index" in op_data: + indices.add(op_data["_index"]) + break + + # Set attributes + if indices: + span.set_attribute("elasticsearch.index", ",".join(sorted(indices))) + if operation_count > 0: + span.set_attribute("elasticsearch.bulk.size", operation_count) + if operations: + span.set_attribute( + "elasticsearch.bulk.operations", ",".join(sorted(operations)) + ) + + except Exception: + logger.debug("process_bulk_params error:", exc_info=True) + + def extract_response_metadata(span: "InstanaSpan", response: Any) -> None: + """ + Extract metadata from Elasticsearch response + Handles: hits count, connection details, multi-operation responses + """ + try: + # Extract hits count from search response + if hasattr(response, "body") and isinstance(response.body, dict): + body = response.body + + # Single search response + if "hits" in body and "total" in body["hits"]: + total = body["hits"]["total"] + if isinstance(total, int): + span.set_attribute("elasticsearch.hits", total) + elif isinstance(total, dict) and "value" in total: + span.set_attribute("elasticsearch.hits", total["value"]) + + # Multi-search response (msearch) + elif "responses" in body and isinstance(body["responses"], list): + total_hits = 0 + success_count = 0 + error_count = 0 + + for resp in body["responses"]: + if isinstance(resp, dict): + # Check for errors in response + if "error" in resp: + error_count += 1 + else: + success_count += 1 + # Count hits + if "hits" in resp and "total" in resp["hits"]: + total = resp["hits"]["total"] + if isinstance(total, int): + total_hits += total + elif isinstance(total, dict) and "value" in total: + total_hits += total["value"] + + span.set_attribute("elasticsearch.hits", total_hits) + if success_count > 0: + span.set_attribute( + "elasticsearch.msearch.success", success_count + ) + if error_count > 0: + span.set_attribute("elasticsearch.msearch.errors", error_count) + + # Multi-get response (mget) + elif "docs" in body and isinstance(body["docs"], list): + found_count = 0 + not_found_count = 0 + + for doc in body["docs"]: + if isinstance(doc, dict): + if doc.get("found", False): + found_count += 1 + else: + not_found_count += 1 + + if found_count > 0: + span.set_attribute("elasticsearch.mget.found", found_count) + if not_found_count > 0: + span.set_attribute( + "elasticsearch.mget.not_found", not_found_count + ) + + # Bulk response + elif "items" in body and isinstance(body["items"], list): + success_count = 0 + error_count = 0 + + for item in body["items"]: + if isinstance(item, dict): + # Each item is a dict with operation type as key + for op_result in item.values(): + if isinstance(op_result, dict): + status = op_result.get("status", 0) + if 200 <= status < 300: + success_count += 1 + else: + error_count += 1 + + if success_count > 0: + span.set_attribute("elasticsearch.bulk.success", success_count) + if error_count > 0: + span.set_attribute("elasticsearch.bulk.errors", error_count) + + except Exception: + logger.debug("extract_response_metadata error:", exc_info=True) + + # Standard (Sync) Client Instrumentation + # ES 8.x/9.x: perform_request(method, path, *, params, headers, body, endpoint_id, path_parts) + # All parameters after `path` are keyword-only; we must forward them faithfully so the + # internal mimetype-compatibility header rewriting (_COMPAT_MIMETYPE_RE) still runs. + @wrapt.patch_function_wrapper( + "elasticsearch._sync.client._base", "BaseClient.perform_request" + ) + def perform_request_with_instana( + wrapped: Callable[..., Any], + instance: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + tracer, _, span_name = get_tracer_tuple() + if span_name == "elasticsearch": + return wrapped(*args, **kwargs) + if not tracer: + logger.debug( + "elasticsearch: tracer not available, skipping instrumentation" + ) + return wrapped(*args, **kwargs) + + parent_context = get_current() + + logger.debug("elasticsearch: creating span for request") + + # ES uses keyword-only parameters after `path`. + # Extract method and path from positional args or kwargs. + method = args[0] if len(args) > 0 else kwargs.get("method", "GET") + # ES uses `path`; older versions used `url` as positional arg[1] + url = args[1] if len(args) > 1 else kwargs.get("path", kwargs.get("url", "/")) + params = kwargs.get("params") + body = kwargs.get("body") + + with tracer.start_as_current_span( + "elasticsearch", context=parent_context + ) as span: + try: + logger.debug(f"elasticsearch: method={method}, url={url}") + + # Collect connection info first + collect_connection_info(span, instance) + + # Extract parameters and set attributes + extract_params_from_request(span, method, url, params, body) + + # Set URL as endpoint (backend fallback label when action is absent) + span.set_attribute("elasticsearch.endpoint", url) + span.set_attribute("elasticsearch.url", url) + + # Execute the request — forward all original args/kwargs unchanged + # so ES internal header processing (mimetype compat) still works + response = wrapped(*args, **kwargs) + + # Extract response metadata + extract_response_metadata(span, response) + + # Response status code + if hasattr(response, "meta") and hasattr(response.meta, "status"): + status_code = response.meta.status + span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code) + if status_code >= 500: + span.set_attribute("elasticsearch.error", f"HTTP {status_code}") + + return response + except Exception as exc: + span.record_exception(exc) + span.set_attribute("elasticsearch.error", str(exc)) + raise + + # --------------------------------------------------------------------------- + # Async Client Instrumentation + # --------------------------------------------------------------------------- + + async def _async_discover_cluster_name( + instance: Any, connection_id: str + ) -> Optional[str]: + """ + Async version of discover_cluster_name. + Reuses the shared cache helpers; only the instance.info() call is awaited. + """ + try: + cached = _get_cached_cluster_name(connection_id) + if cached: + return cached + + if hasattr(instance, "info"): + try: + cluster_name = _extract_cluster_name_from_response( + await instance.info() + ) + if cluster_name: + _store_cluster_name(connection_id, cluster_name) + return cluster_name + except Exception as e: + logger.debug( + f"elasticsearch async cluster name discovery failed: {e}" + ) + + except Exception: + logger.debug("_async_discover_cluster_name error:", exc_info=True) + + return None + + async def _async_collect_connection_info( + span: "InstanaSpan", + instance: Any, + ) -> None: + """ + Async version of collect_connection_info. + Reuses shared helpers; only cluster discovery is awaited. + """ + try: + connection_id = get_connection_id(instance) + if not connection_id: + return + + if connection_id in _connection_cache: + cached = _connection_cache[connection_id] + _set_connection_span_attributes( + span, + cached.get("host"), + cached.get("port"), + cached.get("cluster_name"), + ) + return + + host, port = _resolve_transport_host_port(instance) + if host is not None: + _connection_cache[connection_id] = { + "host": host, + "port": port, + "last_updated": time.time(), + } + _set_connection_span_attributes( + span, + host, + port, + await _async_discover_cluster_name(instance, connection_id), + ) + + except Exception: + logger.debug( + "elasticsearch async collect_connection_info error:", exc_info=True + ) + + @wrapt.patch_function_wrapper( + "elasticsearch._async.client._base", "BaseClient.perform_request" + ) + async def async_perform_request_with_instana( + wrapped: Callable[..., Coroutine[Any, Any, Any]], + instance: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + tracer, _, span_name = get_tracer_tuple() + if span_name == "elasticsearch": + return await wrapped(*args, **kwargs) + + if not tracer: + logger.debug( + "elasticsearch async: tracer not available, skipping instrumentation" + ) + return await wrapped(*args, **kwargs) + + parent_context = get_current() + + logger.debug("elasticsearch async: creating span for request") + + method = args[0] if len(args) > 0 else kwargs.get("method", "GET") + url = args[1] if len(args) > 1 else kwargs.get("path", kwargs.get("url", "/")) + params = kwargs.get("params") + body = kwargs.get("body") + + with tracer.start_as_current_span( + "elasticsearch", context=parent_context + ) as span: + try: + logger.debug(f"elasticsearch async: method={method}, url={url}") + + await _async_collect_connection_info(span, instance) + + extract_params_from_request(span, method, url, params, body) + + span.set_attribute("elasticsearch.endpoint", url) + span.set_attribute("elasticsearch.url", url) + + response = await wrapped(*args, **kwargs) + + extract_response_metadata(span, response) + + if hasattr(response, "meta") and hasattr(response.meta, "status"): + status_code = response.meta.status + span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code) + if status_code >= 500: + span.set_attribute("elasticsearch.error", f"HTTP {status_code}") + + return response + except Exception as exc: + span.record_exception(exc) + span.set_attribute("elasticsearch.error", str(exc)) + raise + + logger.debug("Instrumenting elasticsearch") + +except ImportError: + pass + +# Made with Bob diff --git a/src/instana/instrumentation/urllib3.py b/src/instana/instrumentation/urllib3.py index ba0bf7e5..8ed9a976 100644 --- a/src/instana/instrumentation/urllib3.py +++ b/src/instana/instrumentation/urllib3.py @@ -93,8 +93,9 @@ def urlopen_with_instana( ) -> urllib3.response.HTTPResponse: tracer, _, span_name = get_tracer_tuple() - # If we're not tracing, just return; boto3 has it's own visibility - if not tracer or span_name == "boto3": + # If we're not tracing, just return. + # boto3 and elasticsearch have their own dedicated exit spans. + if not tracer or span_name in ("boto3", "elasticsearch"): return wrapped(*args, **kwargs) parent_context = get_current() diff --git a/src/instana/span/kind.py b/src/instana/span/kind.py index 52663b13..1cc5e8dd 100644 --- a/src/instana/span/kind.py +++ b/src/instana/span/kind.py @@ -44,6 +44,7 @@ "celery-client", "couchbase", "dynamodb", + "elasticsearch", "httpx", "log", "memcache", diff --git a/src/instana/span/registered_span.py b/src/instana/span/registered_span.py index 340546a2..339cf6e1 100644 --- a/src/instana/span/registered_span.py +++ b/src/instana/span/registered_span.py @@ -72,98 +72,22 @@ def _populate_entry_span_data(self, span: "InstanaSpan") -> None: self._collect_http_attributes(span) elif span.name == "aioamqp-consumer": - self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None) - self.data["amqp"]["routingkey"] = span.attributes.pop( - "amqp.routing_key", None - ) - self.data["amqp"]["connection"] = span.attributes.pop( - "amqp.connection", None - ) - self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None) + self._collect_aioamqp_attributes(span) elif span.name == "aws.lambda.entry": - self.data["lambda"]["arn"] = span.attributes.pop("lambda.arn", "Unknown") - self.data["lambda"]["alias"] = None - self.data["lambda"]["runtime"] = "python" - self.data["lambda"]["functionName"] = span.attributes.pop( - "lambda.name", "Unknown" - ) - self.data["lambda"]["functionVersion"] = span.attributes.pop( - "lambda.version", "Unknown" - ) - self.data["lambda"]["trigger"] = span.attributes.pop("lambda.trigger", None) - self.data["lambda"]["error"] = span.attributes.pop("lambda.error", None) - - trigger_type = self.data["lambda"]["trigger"] - - if trigger_type in ["aws:api.gateway", "aws:application.load.balancer"]: - self._collect_http_attributes(span) - elif trigger_type == "aws:cloudwatch.events": - self.data["lambda"]["cw"]["events"]["id"] = span.attributes.pop( - "data.lambda.cw.events.id", None - ) - self.data["lambda"]["cw"]["events"]["more"] = span.attributes.pop( - "lambda.cw.events.more", False - ) - self.data["lambda"]["cw"]["events"]["resources"] = span.attributes.pop( - "lambda.cw.events.resources", None - ) - - elif trigger_type == "aws:cloudwatch.logs": - self.data["lambda"]["cw"]["logs"]["group"] = span.attributes.pop( - "lambda.cw.logs.group", None - ) - self.data["lambda"]["cw"]["logs"]["stream"] = span.attributes.pop( - "lambda.cw.logs.stream", None - ) - self.data["lambda"]["cw"]["logs"]["more"] = span.attributes.pop( - "lambda.cw.logs.more", None - ) - self.data["lambda"]["cw"]["logs"]["events"] = span.attributes.pop( - "lambda.cw.logs.events", None - ) - - elif trigger_type == "aws:s3": - self.data["lambda"]["s3"]["events"] = span.attributes.pop( - "lambda.s3.events", None - ) - elif trigger_type == "aws:sqs": - self.data["lambda"]["sqs"]["messages"] = span.attributes.pop( - "lambda.sqs.messages", None - ) + self._collect_lambda_attributes(span) elif span.name == "celery-worker": - self.data["celery"]["task"] = span.attributes.pop("task", None) - self.data["celery"]["task_id"] = span.attributes.pop("task_id", None) - self.data["celery"]["scheme"] = span.attributes.pop("scheme", None) - self.data["celery"]["host"] = span.attributes.pop("host", None) - self.data["celery"]["port"] = span.attributes.pop("port", None) - self.data["celery"]["retry-reason"] = span.attributes.pop( - "retry-reason", None - ) - self.data["celery"]["error"] = span.attributes.pop("error", None) + self._collect_celery_attributes(span) elif span.name == "gcps-consumer": - self.data["gcps"]["op"] = span.attributes.pop("gcps.op", None) - self.data["gcps"]["projid"] = span.attributes.pop("gcps.projid", None) - self.data["gcps"]["sub"] = span.attributes.pop("gcps.sub", None) + self._collect_gcps_consumer_attributes(span) elif span.name == "rabbitmq": - self.data["rabbitmq"]["exchange"] = span.attributes.pop("exchange", None) - self.data["rabbitmq"]["queue"] = span.attributes.pop("queue", None) - self.data["rabbitmq"]["sort"] = span.attributes.pop("sort", None) - self.data["rabbitmq"]["address"] = span.attributes.pop("address", None) - self.data["rabbitmq"]["key"] = span.attributes.pop("key", None) + self._collect_rabbitmq_attributes(span) elif span.name == "rpc-server": - self.data["rpc"]["flavor"] = span.attributes.pop("rpc.flavor", None) - self.data["rpc"]["host"] = span.attributes.pop("rpc.host", None) - self.data["rpc"]["port"] = span.attributes.pop("rpc.port", None) - self.data["rpc"]["call"] = span.attributes.pop("rpc.call", None) - self.data["rpc"]["call_type"] = span.attributes.pop("rpc.call_type", None) - self.data["rpc"]["params"] = span.attributes.pop("rpc.params", None) - # self.data["rpc"]["baggage"] = span.attributes.pop("rpc.baggage", None) - self.data["rpc"]["error"] = span.attributes.pop("rpc.error", None) + self._collect_rpc_attributes(span) elif span.name.startswith("kafka"): self._collect_kafka_attributes(span) @@ -185,190 +109,58 @@ def _populate_exit_span_data(self, span: "InstanaSpan") -> None: self._collect_http_attributes(span) elif span.name == "aioamqp-publisher": - self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None) - self.data["amqp"]["routingkey"] = span.attributes.pop( - "amqp.routing_key", None - ) - self.data["amqp"]["connection"] = span.attributes.pop( - "amqp.connection", None - ) - self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None) + self._collect_aioamqp_attributes(span) elif span.name == "boto3": - # boto3 also sends http attributes - self._collect_http_attributes(span) - - for attribute in ["op", "ep", "reg", "payload", "error"]: - value = span.attributes.pop(attribute, None) - if value is not None: - if attribute == "payload": - self.data["boto3"][attribute] = self._validate_attributes(value) - else: - self.data["boto3"][attribute] = value + self._collect_boto3_attributes(span) elif span.name == "cassandra": - self.data["cassandra"]["cluster"] = span.attributes.pop( - "cassandra.cluster", None - ) - self.data["cassandra"]["query"] = span.attributes.pop( - "cassandra.query", None - ) - self.data["cassandra"]["keyspace"] = span.attributes.pop( - "cassandra.keyspace", None - ) - self.data["cassandra"]["fetchSize"] = span.attributes.pop( - "cassandra.fetchSize", None - ) - self.data["cassandra"]["achievedConsistency"] = span.attributes.pop( - "cassandra.achievedConsistency", None - ) - self.data["cassandra"]["triedHosts"] = span.attributes.pop( - "cassandra.triedHosts", None - ) - self.data["cassandra"]["fullyFetched"] = span.attributes.pop( - "cassandra.fullyFetched", None - ) - self.data["cassandra"]["error"] = span.attributes.pop( - "cassandra.error", None - ) + self._collect_cassandra_attributes(span) elif span.name == "celery-client": - self.data["celery"]["task"] = span.attributes.pop("task", None) - self.data["celery"]["task_id"] = span.attributes.pop("task_id", None) - self.data["celery"]["scheme"] = span.attributes.pop("scheme", None) - self.data["celery"]["host"] = span.attributes.pop("host", None) - self.data["celery"]["port"] = span.attributes.pop("port", None) - self.data["celery"]["error"] = span.attributes.pop("error", None) + self._collect_celery_attributes(span) elif span.name == "couchbase": - self.data["couchbase"]["hostname"] = span.attributes.pop( - "couchbase.hostname", None - ) - self.data["couchbase"]["bucket"] = span.attributes.pop( - "couchbase.bucket", None - ) - self.data["couchbase"]["type"] = span.attributes.pop("couchbase.type", None) - self.data["couchbase"]["error"] = span.attributes.pop( - "couchbase.error", None - ) - self.data["couchbase"]["error_type"] = span.attributes.pop( - "couchbase.error_type", None - ) - self.data["couchbase"]["sql"] = span.attributes.pop("couchbase.sql", None) + self._collect_couchbase_attributes(span) elif span.name == "dynamodb": - self.data["dynamodb"]["op"] = span.attributes.pop("dynamodb.op", None) - self.data["dynamodb"]["region"] = span.attributes.pop( - "dynamodb.region", None - ) - self.data["dynamodb"]["table"] = span.attributes.pop("dynamodb.table", None) + self._collect_dynamodb_attributes(span) + + elif span.name == "elasticsearch": + self._collect_elasticsearch_attributes(span) elif span.name == "rabbitmq": - self.data["rabbitmq"]["exchange"] = span.attributes.pop("exchange", None) - self.data["rabbitmq"]["queue"] = span.attributes.pop("queue", None) - self.data["rabbitmq"]["sort"] = span.attributes.pop("sort", None) - self.data["rabbitmq"]["address"] = span.attributes.pop("address", None) - self.data["rabbitmq"]["key"] = span.attributes.pop("key", None) + self._collect_rabbitmq_attributes(span) elif span.name == "redis": - self.data["redis"]["connection"] = span.attributes.pop("connection", None) - self.data["redis"]["driver"] = span.attributes.pop("driver", None) - self.data["redis"]["command"] = span.attributes.pop("command", None) - self.data["redis"]["error"] = span.attributes.pop("redis.error", None) - self.data["redis"]["subCommands"] = span.attributes.pop("subCommands", None) + self._collect_redis_attributes(span) elif span.name == "rpc-client": - self.data["rpc"]["flavor"] = span.attributes.pop("rpc.flavor", None) - self.data["rpc"]["host"] = span.attributes.pop("rpc.host", None) - self.data["rpc"]["port"] = span.attributes.pop("rpc.port", None) - self.data["rpc"]["call"] = span.attributes.pop("rpc.call", None) - self.data["rpc"]["call_type"] = span.attributes.pop("rpc.call_type", None) - self.data["rpc"]["params"] = span.attributes.pop("rpc.params", None) - # self.data["rpc"]["baggage"] = span.attributes.pop("rpc.baggage", None) - self.data["rpc"]["error"] = span.attributes.pop("rpc.error", None) + self._collect_rpc_attributes(span) elif span.name == "s3": - self.data["s3"]["op"] = span.attributes.pop("s3.op", None) - self.data["s3"]["bucket"] = span.attributes.pop("s3.bucket", None) + self._collect_s3_attributes(span) elif span.name == "sqlalchemy": - self.data["sqlalchemy"]["sql"] = span.attributes.pop("sqlalchemy.sql", None) - self.data["sqlalchemy"]["eng"] = span.attributes.pop("sqlalchemy.eng", None) - self.data["sqlalchemy"]["url"] = span.attributes.pop("sqlalchemy.url", None) - self.data["sqlalchemy"]["err"] = span.attributes.pop("sqlalchemy.err", None) + self._collect_sqlalchemy_attributes(span) elif span.name == "mysql": - self.data["mysql"]["host"] = span.attributes.pop("host", None) - self.data["mysql"]["port"] = span.attributes.pop("port", None) - self.data["mysql"]["db"] = span.attributes.pop(SpanAttributes.DB_NAME, None) - self.data["mysql"]["user"] = span.attributes.pop( - SpanAttributes.DB_USER, None - ) - self.data["mysql"]["stmt"] = span.attributes.pop( - SpanAttributes.DB_STATEMENT, None - ) - self.data["mysql"]["error"] = span.attributes.pop("mysql.error", None) + self._collect_mysql_attributes(span) elif span.name == "postgres": - self.data["pg"]["host"] = span.attributes.pop("host", None) - self.data["pg"]["port"] = span.attributes.pop("port", None) - self.data["pg"]["db"] = span.attributes.pop("db.name", None) - self.data["pg"]["user"] = span.attributes.pop("db.user", None) - self.data["pg"]["stmt"] = span.attributes.pop("db.statement", None) - self.data["pg"]["error"] = span.attributes.pop("pg.error", None) + self._collect_postgres_attributes(span) elif span.name == "mongo": - service = f"{span.attributes.pop(SpanAttributes.SERVER_ADDRESS, None)}:{span.attributes.pop(SpanAttributes.SERVER_PORT, None)}" - namespace = f"{span.attributes.pop(SpanAttributes.DB_NAME, '?')}.{span.attributes.pop(SpanAttributes.DB_MONGODB_COLLECTION, '?')}" - - self.data["mongo"]["service"] = service - self.data["mongo"]["namespace"] = namespace - self.data["mongo"]["command"] = span.attributes.pop("command", None) - self.data["mongo"]["filter"] = span.attributes.pop("filter", None) - self.data["mongo"]["json"] = span.attributes.pop("json", None) - self.data["mongo"]["error"] = span.attributes.pop("error", None) + self._collect_mongo_attributes(span) elif span.name == "gcs": - self.data["gcs"]["op"] = span.attributes.pop("gcs.op", None) - self.data["gcs"]["bucket"] = span.attributes.pop("gcs.bucket", None) - self.data["gcs"]["object"] = span.attributes.pop("gcs.object", None) - self.data["gcs"]["entity"] = span.attributes.pop("gcs.entity", None) - self.data["gcs"]["range"] = span.attributes.pop("gcs.range", None) - self.data["gcs"]["sourceBucket"] = span.attributes.pop( - "gcs.sourceBucket", None - ) - self.data["gcs"]["sourceObject"] = span.attributes.pop( - "gcs.sourceObject", None - ) - self.data["gcs"]["sourceObjects"] = span.attributes.pop( - "gcs.sourceObjects", None - ) - self.data["gcs"]["destinationBucket"] = span.attributes.pop( - "gcs.destinationBucket", None - ) - self.data["gcs"]["destinationObject"] = span.attributes.pop( - "gcs.destinationObject", None - ) - self.data["gcs"]["numberOfOperations"] = span.attributes.pop( - "gcs.numberOfOperations", None - ) - self.data["gcs"]["projectId"] = span.attributes.pop("gcs.projectId", None) - self.data["gcs"]["accessId"] = span.attributes.pop("gcs.accessId", None) + self._collect_gcs_attributes(span) elif span.name == "gcps-producer": - self.data["gcps"]["op"] = span.attributes.pop("gcps.op", None) - self.data["gcps"]["projid"] = span.attributes.pop("gcps.projid", None) - self.data["gcps"]["top"] = span.attributes.pop("gcps.top", None) + self._collect_gcps_producer_attributes(span) elif span.name == "log": - # use last special key values - for event in span.events: - if "message" in event.attributes: - self.data["log"]["message"] = event.attributes.pop("message", None) - if "parameters" in event.attributes: - self.data["log"]["parameters"] = event.attributes.pop( - "parameters", None - ) + self._collect_log_attributes(span) elif span.name.startswith("kafka"): self._collect_kafka_attributes(span) @@ -400,3 +192,287 @@ def _collect_kafka_attributes(self, span: "InstanaSpan") -> None: self.data["kafka"]["service"] = span.attributes.pop("kafka.service", None) self.data["kafka"]["access"] = span.attributes.pop("kafka.access", None) self.data["kafka"]["error"] = span.attributes.pop("kafka.error", None) + + def _collect_aioamqp_attributes(self, span: "InstanaSpan") -> None: + self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None) + self.data["amqp"]["routingkey"] = span.attributes.pop("amqp.routing_key", None) + self.data["amqp"]["connection"] = span.attributes.pop("amqp.connection", None) + self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None) + + def _collect_boto3_attributes(self, span: "InstanaSpan") -> None: + # boto3 also sends http attributes + self._collect_http_attributes(span) + + for attribute in ["op", "ep", "reg", "payload", "error"]: + value = span.attributes.pop(attribute, None) + if value is not None: + if attribute == "payload": + self.data["boto3"][attribute] = self._validate_attributes(value) + else: + self.data["boto3"][attribute] = value + + def _collect_cassandra_attributes(self, span: "InstanaSpan") -> None: + self.data["cassandra"]["cluster"] = span.attributes.pop( + "cassandra.cluster", None + ) + self.data["cassandra"]["query"] = span.attributes.pop("cassandra.query", None) + self.data["cassandra"]["keyspace"] = span.attributes.pop( + "cassandra.keyspace", None + ) + self.data["cassandra"]["fetchSize"] = span.attributes.pop( + "cassandra.fetchSize", None + ) + self.data["cassandra"]["achievedConsistency"] = span.attributes.pop( + "cassandra.achievedConsistency", None + ) + self.data["cassandra"]["triedHosts"] = span.attributes.pop( + "cassandra.triedHosts", None + ) + self.data["cassandra"]["fullyFetched"] = span.attributes.pop( + "cassandra.fullyFetched", None + ) + self.data["cassandra"]["error"] = span.attributes.pop("cassandra.error", None) + + def _collect_celery_attributes(self, span: "InstanaSpan") -> None: + self.data["celery"]["task"] = span.attributes.pop("task", None) + self.data["celery"]["task_id"] = span.attributes.pop("task_id", None) + self.data["celery"]["scheme"] = span.attributes.pop("scheme", None) + self.data["celery"]["host"] = span.attributes.pop("host", None) + self.data["celery"]["port"] = span.attributes.pop("port", None) + self.data["celery"]["retry-reason"] = span.attributes.pop("retry-reason", None) + self.data["celery"]["error"] = span.attributes.pop("error", None) + + def _collect_couchbase_attributes(self, span: "InstanaSpan") -> None: + self.data["couchbase"]["hostname"] = span.attributes.pop( + "couchbase.hostname", None + ) + self.data["couchbase"]["bucket"] = span.attributes.pop("couchbase.bucket", None) + self.data["couchbase"]["type"] = span.attributes.pop("couchbase.type", None) + self.data["couchbase"]["error"] = span.attributes.pop("couchbase.error", None) + self.data["couchbase"]["error_type"] = span.attributes.pop( + "couchbase.error_type", None + ) + self.data["couchbase"]["sql"] = span.attributes.pop("couchbase.sql", None) + + def _collect_dynamodb_attributes(self, span: "InstanaSpan") -> None: + self.data["dynamodb"]["op"] = span.attributes.pop("dynamodb.op", None) + self.data["dynamodb"]["region"] = span.attributes.pop("dynamodb.region", None) + self.data["dynamodb"]["table"] = span.attributes.pop("dynamodb.table", None) + + def _collect_elasticsearch_attributes(self, span: "InstanaSpan") -> None: + self.data["elasticsearch"]["cluster"] = span.attributes.pop( + "elasticsearch.cluster", None + ) + self.data["elasticsearch"]["action"] = span.attributes.pop( + "elasticsearch.action", None + ) + self.data["elasticsearch"]["endpoint"] = span.attributes.pop( + "elasticsearch.endpoint", None + ) + self.data["elasticsearch"]["url"] = span.attributes.pop( + "elasticsearch.url", None + ) + self.data["elasticsearch"]["index"] = span.attributes.pop( + "elasticsearch.index", None + ) + self.data["elasticsearch"]["id"] = span.attributes.pop("elasticsearch.id", None) + self.data["elasticsearch"]["query"] = span.attributes.pop( + "elasticsearch.query", None + ) + self.data["elasticsearch"]["hits"] = span.attributes.pop( + "elasticsearch.hits", None + ) + self.data["elasticsearch"]["address"] = span.attributes.pop( + "elasticsearch.address", None + ) + self.data["elasticsearch"]["port"] = span.attributes.pop( + "elasticsearch.port", None + ) + self.data["elasticsearch"]["error"] = span.attributes.pop( + "elasticsearch.error", None + ) + + # Bulk operation attributes + self.data["elasticsearch"]["bulk.size"] = span.attributes.pop( + "elasticsearch.bulk.size", None + ) + self.data["elasticsearch"]["bulk.operations"] = span.attributes.pop( + "elasticsearch.bulk.operations", None + ) + self.data["elasticsearch"]["bulk.success"] = span.attributes.pop( + "elasticsearch.bulk.success", None + ) + self.data["elasticsearch"]["bulk.errors"] = span.attributes.pop( + "elasticsearch.bulk.errors", None + ) + + # Multi-get attributes + self.data["elasticsearch"]["mget.found"] = span.attributes.pop( + "elasticsearch.mget.found", None + ) + self.data["elasticsearch"]["mget.not_found"] = span.attributes.pop( + "elasticsearch.mget.not_found", None + ) + + # Multi-search attributes + self.data["elasticsearch"]["msearch.success"] = span.attributes.pop( + "elasticsearch.msearch.success", None + ) + self.data["elasticsearch"]["msearch.errors"] = span.attributes.pop( + "elasticsearch.msearch.errors", None + ) + + def _collect_rabbitmq_attributes(self, span: "InstanaSpan") -> None: + self.data["rabbitmq"]["exchange"] = span.attributes.pop("exchange", None) + self.data["rabbitmq"]["queue"] = span.attributes.pop("queue", None) + self.data["rabbitmq"]["sort"] = span.attributes.pop("sort", None) + self.data["rabbitmq"]["address"] = span.attributes.pop("address", None) + self.data["rabbitmq"]["key"] = span.attributes.pop("key", None) + + def _collect_redis_attributes(self, span: "InstanaSpan") -> None: + self.data["redis"]["connection"] = span.attributes.pop("connection", None) + self.data["redis"]["driver"] = span.attributes.pop("driver", None) + self.data["redis"]["command"] = span.attributes.pop("command", None) + self.data["redis"]["error"] = span.attributes.pop("redis.error", None) + self.data["redis"]["subCommands"] = span.attributes.pop("subCommands", None) + + def _collect_rpc_attributes(self, span: "InstanaSpan") -> None: + self.data["rpc"]["flavor"] = span.attributes.pop("rpc.flavor", None) + self.data["rpc"]["host"] = span.attributes.pop("rpc.host", None) + self.data["rpc"]["port"] = span.attributes.pop("rpc.port", None) + self.data["rpc"]["call"] = span.attributes.pop("rpc.call", None) + self.data["rpc"]["call_type"] = span.attributes.pop("rpc.call_type", None) + self.data["rpc"]["params"] = span.attributes.pop("rpc.params", None) + # self.data["rpc"]["baggage"] = span.attributes.pop("rpc.baggage", None) + self.data["rpc"]["error"] = span.attributes.pop("rpc.error", None) + + def _collect_s3_attributes(self, span: "InstanaSpan") -> None: + self.data["s3"]["op"] = span.attributes.pop("s3.op", None) + self.data["s3"]["bucket"] = span.attributes.pop("s3.bucket", None) + + def _collect_sqlalchemy_attributes(self, span: "InstanaSpan") -> None: + self.data["sqlalchemy"]["sql"] = span.attributes.pop("sqlalchemy.sql", None) + self.data["sqlalchemy"]["eng"] = span.attributes.pop("sqlalchemy.eng", None) + self.data["sqlalchemy"]["url"] = span.attributes.pop("sqlalchemy.url", None) + self.data["sqlalchemy"]["err"] = span.attributes.pop("sqlalchemy.err", None) + + def _collect_mysql_attributes(self, span: "InstanaSpan") -> None: + self.data["mysql"]["host"] = span.attributes.pop("host", None) + self.data["mysql"]["port"] = span.attributes.pop("port", None) + self.data["mysql"]["db"] = span.attributes.pop(SpanAttributes.DB_NAME, None) + self.data["mysql"]["user"] = span.attributes.pop(SpanAttributes.DB_USER, None) + self.data["mysql"]["stmt"] = span.attributes.pop( + SpanAttributes.DB_STATEMENT, None + ) + self.data["mysql"]["error"] = span.attributes.pop("mysql.error", None) + + def _collect_postgres_attributes(self, span: "InstanaSpan") -> None: + self.data["pg"]["host"] = span.attributes.pop("host", None) + self.data["pg"]["port"] = span.attributes.pop("port", None) + self.data["pg"]["db"] = span.attributes.pop("db.name", None) + self.data["pg"]["user"] = span.attributes.pop("db.user", None) + self.data["pg"]["stmt"] = span.attributes.pop("db.statement", None) + self.data["pg"]["error"] = span.attributes.pop("pg.error", None) + + def _collect_mongo_attributes(self, span: "InstanaSpan") -> None: + service = f"{span.attributes.pop(SpanAttributes.SERVER_ADDRESS, None)}:{span.attributes.pop(SpanAttributes.SERVER_PORT, None)}" + namespace = f"{span.attributes.pop(SpanAttributes.DB_NAME, '?')}.{span.attributes.pop(SpanAttributes.DB_MONGODB_COLLECTION, '?')}" + + self.data["mongo"]["service"] = service + self.data["mongo"]["namespace"] = namespace + self.data["mongo"]["command"] = span.attributes.pop("command", None) + self.data["mongo"]["filter"] = span.attributes.pop("filter", None) + self.data["mongo"]["json"] = span.attributes.pop("json", None) + self.data["mongo"]["error"] = span.attributes.pop("error", None) + + def _collect_gcs_attributes(self, span: "InstanaSpan") -> None: + self.data["gcs"]["op"] = span.attributes.pop("gcs.op", None) + self.data["gcs"]["bucket"] = span.attributes.pop("gcs.bucket", None) + self.data["gcs"]["object"] = span.attributes.pop("gcs.object", None) + self.data["gcs"]["entity"] = span.attributes.pop("gcs.entity", None) + self.data["gcs"]["range"] = span.attributes.pop("gcs.range", None) + self.data["gcs"]["sourceBucket"] = span.attributes.pop("gcs.sourceBucket", None) + self.data["gcs"]["sourceObject"] = span.attributes.pop("gcs.sourceObject", None) + self.data["gcs"]["sourceObjects"] = span.attributes.pop( + "gcs.sourceObjects", None + ) + self.data["gcs"]["destinationBucket"] = span.attributes.pop( + "gcs.destinationBucket", None + ) + self.data["gcs"]["destinationObject"] = span.attributes.pop( + "gcs.destinationObject", None + ) + self.data["gcs"]["numberOfOperations"] = span.attributes.pop( + "gcs.numberOfOperations", None + ) + self.data["gcs"]["projectId"] = span.attributes.pop("gcs.projectId", None) + self.data["gcs"]["accessId"] = span.attributes.pop("gcs.accessId", None) + + def _collect_gcps_consumer_attributes(self, span: "InstanaSpan") -> None: + self.data["gcps"]["op"] = span.attributes.pop("gcps.op", None) + self.data["gcps"]["projid"] = span.attributes.pop("gcps.projid", None) + self.data["gcps"]["sub"] = span.attributes.pop("gcps.sub", None) + + def _collect_gcps_producer_attributes(self, span: "InstanaSpan") -> None: + self.data["gcps"]["op"] = span.attributes.pop("gcps.op", None) + self.data["gcps"]["projid"] = span.attributes.pop("gcps.projid", None) + self.data["gcps"]["top"] = span.attributes.pop("gcps.top", None) + + def _collect_lambda_attributes(self, span: "InstanaSpan") -> None: + self.data["lambda"]["arn"] = span.attributes.pop("lambda.arn", "Unknown") + self.data["lambda"]["alias"] = None + self.data["lambda"]["runtime"] = "python" + self.data["lambda"]["functionName"] = span.attributes.pop( + "lambda.name", "Unknown" + ) + self.data["lambda"]["functionVersion"] = span.attributes.pop( + "lambda.version", "Unknown" + ) + self.data["lambda"]["trigger"] = span.attributes.pop("lambda.trigger", None) + self.data["lambda"]["error"] = span.attributes.pop("lambda.error", None) + + trigger_type = self.data["lambda"]["trigger"] + + if trigger_type in ["aws:api.gateway", "aws:application.load.balancer"]: + self._collect_http_attributes(span) + elif trigger_type == "aws:cloudwatch.events": + self.data["lambda"]["cw"]["events"]["id"] = span.attributes.pop( + "data.lambda.cw.events.id", None + ) + self.data["lambda"]["cw"]["events"]["more"] = span.attributes.pop( + "lambda.cw.events.more", False + ) + self.data["lambda"]["cw"]["events"]["resources"] = span.attributes.pop( + "lambda.cw.events.resources", None + ) + elif trigger_type == "aws:cloudwatch.logs": + self.data["lambda"]["cw"]["logs"]["group"] = span.attributes.pop( + "lambda.cw.logs.group", None + ) + self.data["lambda"]["cw"]["logs"]["stream"] = span.attributes.pop( + "lambda.cw.logs.stream", None + ) + self.data["lambda"]["cw"]["logs"]["more"] = span.attributes.pop( + "lambda.cw.logs.more", None + ) + self.data["lambda"]["cw"]["logs"]["events"] = span.attributes.pop( + "lambda.cw.logs.events", None + ) + elif trigger_type == "aws:s3": + self.data["lambda"]["s3"]["events"] = span.attributes.pop( + "lambda.s3.events", None + ) + elif trigger_type == "aws:sqs": + self.data["lambda"]["sqs"]["messages"] = span.attributes.pop( + "lambda.sqs.messages", None + ) + + def _collect_log_attributes(self, span: "InstanaSpan") -> None: + # use last special key values + for event in span.events: + if "message" in event.attributes: + self.data["log"]["message"] = event.attributes.pop("message", None) + if "parameters" in event.attributes: + self.data["log"]["parameters"] = event.attributes.pop( + "parameters", None + ) diff --git a/src/instana/util/config.py b/src/instana/util/config.py index f5f33655..5377c89c 100644 --- a/src/instana/util/config.py +++ b/src/instana/util/config.py @@ -28,6 +28,7 @@ "cassandra": "databases", "couchbase": "databases", "dynamodb": "databases", + "elasticsearch": "databases", "sqlalchemy": "databases", # Messaging types "kafka": "messaging", diff --git a/tests/helpers.py b/tests/helpers.py index 050e18a4..d65ede71 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -75,6 +75,12 @@ f"{testenv['kafka_host']}:{testenv['kafka_port']}", ] +""" +Elasticsearch Environment +""" +testenv["elasticsearch_host"] = os.environ.get("ELASTICSEARCH_HOST", "127.0.0.1") +testenv["elasticsearch_port"] = os.environ.get("ELASTICSEARCH_PORT", "9200") + def drop_log_spans_from_list(spans): """ diff --git a/tests/instrumentation/test_elasticsearch.py b/tests/instrumentation/test_elasticsearch.py new file mode 100644 index 00000000..ef684647 --- /dev/null +++ b/tests/instrumentation/test_elasticsearch.py @@ -0,0 +1,1325 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Integration tests for Elasticsearch instrumentation +Tests ES 9.x compatibility with real Elasticsearch connection +""" + +import contextlib +import os +import pytest +from typing import Generator + +from instana.singletons import agent, get_tracer +from instana.span.span import get_current_span +from tests.helpers import testenv + + +# Check if Elasticsearch is available +try: + from elasticsearch import Elasticsearch + + elasticsearch_available = True +except ImportError: + elasticsearch_available = False + + +@pytest.mark.skipif( + not elasticsearch_available, reason="elasticsearch-py not installed" +) +class TestElasticsearch: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """Setup test resources and clear spans before each test""" + # Disable Elasticsearch client's built-in OpenTelemetry instrumentation + # to avoid duplicate spans + os.environ["OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"] = "False" + + # Clear the instrumentation's connection cache so each test starts + # with a clean state (prevents cluster-discovery spans leaking in). + from instana.instrumentation.elasticsearch import _connection_cache + + _connection_cache.clear() + + self.tracer = get_tracer() + self.recorder = self.tracer.span_processor + self.recorder.clear_spans() + + # Create Elasticsearch client + self.client = Elasticsearch([ + f"http://{testenv['elasticsearch_host']}:{testenv['elasticsearch_port']}" + ]) + + # Create test index + self.test_index = "test-instana-es" + with contextlib.suppress(Exception): + self.client.indices.delete( + index=self.test_index, + ignore_unavailable=True, + ) + + # Warm up the connection so cluster-discovery urllib3 spans don't + # leak into the test's span count. + with contextlib.suppress(Exception): + self.client.info() + + # Clear any spans created during setup + self.recorder.clear_spans() + + yield + + # Cleanup + with contextlib.suppress(Exception): + self.client.indices.delete( + index=self.test_index, + ignore_unavailable=True, + ) + agent.options.allow_exit_as_root = False + + def test_vanilla_search(self) -> None: + """Test search without tracing context""" + # Index a document first + self.client.index( + index=self.test_index, id="1", document={"name": "test", "value": 100} + ) + self.client.indices.refresh(index=self.test_index) + + # Search without tracing + response = self.client.search( + index=self.test_index, body={"query": {"match_all": {}}} + ) + + # Should have results but no spans + assert response + spans = self.recorder.queued_spans() + assert len(spans) == 0 + + def test_basic_search(self) -> None: + """Test basic search operation with tracing""" + # Index a document + with self.tracer.start_as_current_span("test"): + self.client.index( + index=self.test_index, id="1", document={"name": "test", "value": 100} + ) + self.client.indices.refresh(index=self.test_index) + + # Search + response = self.client.search( + index=self.test_index, body={"query": {"match_all": {}}} + ) + + assert response + spans = self.recorder.queued_spans() + # urllib3 spans are suppressed when the active span is "elasticsearch", + # so each ES operation produces exactly one elasticsearch span. + # Total: es_index + es_refresh + es_search + test = 4 + assert len(spans) == 4 + + # Filter spans by type + es_spans = [s for s in spans if s.n == "elasticsearch"] + urllib3_spans = [s for s in spans if s.n == "urllib3"] + test_spans = [s for s in spans if s.n == "sdk"] + + assert len(es_spans) == 3 # index, refresh, search + assert len(urllib3_spans) == 0 + assert len(test_spans) == 1 + + search_span = es_spans[2] # Last ES span is search + test_span = test_spans[0] + + # Verify span relationships + assert search_span.t == test_span.t + + # Verify span attributes + assert search_span.n == "elasticsearch" + assert not search_span.ec + assert "elasticsearch" in search_span.data + + es_data = search_span.data["elasticsearch"] + assert es_data["action"] == "search" + assert es_data["index"] == self.test_index + assert "query" in es_data + assert "hits" in es_data + assert es_data["hits"] >= 0 + + def test_basic_search_as_root_span(self) -> None: + """Test search as root exit span""" + agent.options.allow_exit_as_root = True + + # Index a document + self.client.index( + index=self.test_index, id="1", document={"name": "test", "value": 100} + ) + self.client.indices.refresh(index=self.test_index) + + # Search as root span + response = self.client.search( + index=self.test_index, body={"query": {"match_all": {}}} + ) + + assert response + spans = self.recorder.queued_spans() + + # urllib3 spans suppressed under elasticsearch; only ES spans visible + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 3 # index, refresh, search + + search_span = es_spans[2] # The search operation + + # Root span should have no parent + assert not search_span.p + assert not search_span.ec + + # Verify attributes + assert search_span.n == "elasticsearch" + es_data = search_span.data["elasticsearch"] + assert es_data["action"] == "search" + assert es_data["index"] == self.test_index + + def test_index_document(self) -> None: + """Test document indexing""" + with self.tracer.start_as_current_span("test"): + response = self.client.index( + index=self.test_index, + id="doc1", + document={"field": "value", "number": 42}, + ) + + assert response + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + # Filter spans by type + es_spans = [s for s in spans if s.n == "elasticsearch"] + test_spans = [s for s in spans if s.n == "sdk"] + + assert len(es_spans) == 1 + assert len(test_spans) == 1 + + es_span = es_spans[0] + test_span = test_spans[0] + + assert es_span.t == test_span.t + assert not es_span.ec + + assert es_span.n == "elasticsearch" + es_data = es_span.data["elasticsearch"] + assert es_data["action"] == "index" + assert es_data["index"] == self.test_index + assert es_data["id"] == "doc1" + + def test_get_document(self) -> None: + """Test document retrieval""" + # Index a document first + self.client.index(index=self.test_index, id="doc1", document={"field": "value"}) + self.client.indices.refresh(index=self.test_index) + + with self.tracer.start_as_current_span("test"): + response = self.client.get(index=self.test_index, id="doc1") + + assert response + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + + assert es_span.n == "elasticsearch" + es_data = es_span.data["elasticsearch"] + assert es_data["action"] == "get" + assert es_data["index"] == self.test_index + assert es_data["id"] == "doc1" + + def test_delete_document(self) -> None: + """Test document deletion""" + # Index a document first + self.client.index(index=self.test_index, id="doc1", document={"field": "value"}) + self.client.indices.refresh(index=self.test_index) + + with self.tracer.start_as_current_span("test"): + response = self.client.delete(index=self.test_index, id="doc1") + + assert response + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + + assert es_span.n == "elasticsearch" + es_data = es_span.data["elasticsearch"] + assert es_data["action"] == "delete" + assert es_data["index"] == self.test_index + assert es_data["id"] == "doc1" + + def test_mget_operation(self) -> None: + """Test multi-get operation""" + # Index multiple documents + for i in range(1, 4): + self.client.index( + index=self.test_index, + id=str(i), + document={"name": f"doc{i}", "value": i * 10}, + ) + self.client.indices.refresh(index=self.test_index) + + with self.tracer.start_as_current_span("test"): + response = self.client.mget( + body={ + "docs": [ + {"_index": self.test_index, "_id": "1"}, + {"_index": self.test_index, "_id": "2"}, + {"_index": self.test_index, "_id": "3"}, + ] + } + ) + + assert response + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + + assert es_span.n == "elasticsearch" + es_data = es_span.data["elasticsearch"] + assert es_data["action"] == "mget" + assert es_data["index"] == self.test_index + assert "1,2,3" in es_data["id"] + assert "mget.found" in es_data + assert es_data["mget.found"] == 3 + + def test_msearch_operation(self) -> None: + """Test multi-search operation""" + # Index documents in multiple indices + for idx in ["index1", "index2"]: + self.client.index( + index=f"{self.test_index}-{idx}", + id="1", + document={"name": "test", "value": 100}, + ) + self.client.indices.refresh(index=f"{self.test_index}-{idx}") + + with self.tracer.start_as_current_span("test"): + response = self.client.msearch( + body=[ + {"index": f"{self.test_index}-index1"}, + {"query": {"match_all": {}}}, + {"index": f"{self.test_index}-index2"}, + {"query": {"match_all": {}}}, + ] + ) + + assert response + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + + assert es_span.n == "elasticsearch" + es_data = es_span.data["elasticsearch"] + assert es_data["action"] == "msearch" + assert "index1" in es_data["index"] + assert "index2" in es_data["index"] + assert "msearch.success" in es_data + assert es_data["msearch.success"] >= 0 + + def test_bulk_operation(self) -> None: + """Test bulk operation""" + with self.tracer.start_as_current_span("test"): + response = self.client.bulk( + body=[ + {"index": {"_index": self.test_index, "_id": "1"}}, + {"field": "value1"}, + {"index": {"_index": self.test_index, "_id": "2"}}, + {"field": "value2"}, + {"delete": {"_index": self.test_index, "_id": "3"}}, + ] + ) + + assert response + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + + assert es_span.n == "elasticsearch" + es_data = es_span.data["elasticsearch"] + assert es_data["action"] == "bulk" + assert es_data["index"] == self.test_index + assert "bulk.size" in es_data + assert es_data["bulk.size"] == 3 + assert "index" in es_data["bulk.operations"] + assert "delete" in es_data["bulk.operations"] + + def test_error_capture(self) -> None: + """Test error handling and capture""" + try: + with self.tracer.start_as_current_span("test"): + # Try to get non-existent document + self.client.get(index=self.test_index, id="nonexistent") + except Exception: + pass + + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + + # record_exception() increments ec; elasticsearch.error also sets it → ec >= 1 + assert es_span.ec >= 1 + assert "elasticsearch" in es_span.data + assert "error" in es_span.data["elasticsearch"] + + def test_connection_info(self) -> None: + """Test connection information capture""" + # First create the index + self.client.index(index=self.test_index, id="1", document={"test": "data"}) + self.client.indices.refresh(index=self.test_index) + + with self.tracer.start_as_current_span("test"): + self.client.search(index=self.test_index, body={"query": {"match_all": {}}}) + + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + es_data = es_span.data["elasticsearch"] + + # Should have connection info + assert "address" in es_data + assert "port" in es_data + # Cluster name might be available depending on ES setup + # assert "cluster" in es_data + + def test_query_shortening(self) -> None: + """Test that long queries are shortened""" + # Create a very long query + long_query = { + "query": { + "bool": { + "should": [{"match": {"field": f"value{i}"}} for i in range(100)] + } + } + } + + with self.tracer.start_as_current_span("test"), contextlib.suppress(Exception): + self.client.search(index=self.test_index, body=long_query) + + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_span = es_spans[0] + es_data = es_span.data["elasticsearch"] + + # Query should be present but shortened + assert "query" in es_data + query_str = es_data["query"] + # Should be truncated to max 1000 chars + "..." + assert len(query_str) <= 1003 + + def test_multiple_operations(self) -> None: + """Test multiple operations in sequence""" + with self.tracer.start_as_current_span("test"): + # Index + self.client.index(index=self.test_index, id="1", document={"name": "test"}) + # Get + self.client.get(index=self.test_index, id="1") + # Search + self.client.search(index=self.test_index, body={"query": {"match_all": {}}}) + # Delete + self.client.delete(index=self.test_index, id="1") + + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: 4 es_spans + test span = 5 + assert len(spans) == 5 + + # Filter spans by type + es_spans = [s for s in spans if s.n == "elasticsearch"] + test_spans = [s for s in spans if s.n == "sdk"] + + assert len(es_spans) == 4 # index, get, search, delete + assert len(test_spans) == 1 + + test_span = test_spans[0] + + # Verify all ES spans have correct trace ID + for es_span in es_spans: + assert es_span.t == test_span.t + assert es_span.n == "elasticsearch" + + def test_update_operation(self) -> None: + """Test update operation — covers _update URL action detection""" + self.client.index(index=self.test_index, id="1", document={"field": "value"}) + + with self.tracer.start_as_current_span("test"): + self.client.update( + index=self.test_index, id="1", body={"doc": {"field": "updated"}} + ) + + spans = self.recorder.queued_spans() + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + assert es_spans[0].data["elasticsearch"]["action"] == "update" + + def test_mget_with_ids_array(self) -> None: + """Test mget with 'ids' array body — covers process_mget_params ids path""" + for i in range(1, 4): + self.client.index(index=self.test_index, id=str(i), document={"v": i}) + self.client.indices.refresh(index=self.test_index) + + with self.tracer.start_as_current_span("test"): + response = self.client.mget( + index=self.test_index, + body={"ids": ["1", "2", "3"]}, + ) + + assert response + spans = self.recorder.queued_spans() + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_data = es_spans[0].data["elasticsearch"] + assert es_data["action"] == "mget" + assert "id" in es_data + + def test_mget_with_many_ids(self) -> None: + """Test mget with >10 docs — covers the id truncation path""" + for i in range(1, 13): + self.client.index(index=self.test_index, id=str(i), document={"v": i}) + self.client.indices.refresh(index=self.test_index) + + with self.tracer.start_as_current_span("test"): + response = self.client.mget( + body={ + "docs": [ + {"_index": self.test_index, "_id": str(i)} for i in range(1, 13) + ] + } + ) + + assert response + spans = self.recorder.queued_spans() + es_spans = [s for s in spans if s.n == "elasticsearch"] + assert len(es_spans) == 1 + es_data = es_spans[0].data["elasticsearch"] + assert "total)" in es_data["id"] + + def test_search_with_string_body(self) -> None: + """Test search with pre-serialised string body — covers str body path""" + import json as _json + + self.client.index(index=self.test_index, id="1", document={"name": "test"}) + self.client.indices.refresh(index=self.test_index) + + query_str = _json.dumps({"query": {"match_all": {}}}) + + with self.tracer.start_as_current_span("test"): + # Pass the body as a raw string so the str branch is exercised. + # ES 9.x accepts it through the params kwarg workaround below. + # We exercise the code path by calling extract_params_from_request + # directly since the high-level client always serialises to dict. + from instana.instrumentation.elasticsearch import ( + extract_params_from_request, + ) + from unittest.mock import MagicMock + + mock_span = MagicMock() + extract_params_from_request(mock_span, "GET", "/_search", None, query_str) + mock_span.set_attribute.assert_any_call("elasticsearch.query", query_str) + + def test_search_with_non_dict_body(self) -> None: + """Covers the else branch of the body type check in extract_params_from_request""" + from instana.instrumentation.elasticsearch import extract_params_from_request + from unittest.mock import MagicMock + + mock_span = MagicMock() + # Pass an arbitrary non-dict, non-str body + extract_params_from_request(mock_span, "GET", "/_search", None, 42) + mock_span.set_attribute.assert_any_call("elasticsearch.query", "42") + + def test_params_index_and_id_fallback(self) -> None: + """Covers params-based index/id extraction when URL has no index/id""" + from instana.instrumentation.elasticsearch import extract_params_from_request + from unittest.mock import MagicMock + + mock_span = MagicMock() + extract_params_from_request( + mock_span, + "GET", + "/_doc/doc1", + {"index": "my-index", "id": "doc1"}, + None, + ) + mock_span.set_attribute.assert_any_call("elasticsearch.index", "my-index") + # elasticsearch.type is no longer emitted (removed in ES 8.x+) + calls = [str(c) for c in mock_span.set_attribute.call_args_list] + assert not any("elasticsearch.type" in c for c in calls) + + def test_bulk_with_string_body(self) -> None: + """Test bulk with newline-delimited JSON string body — covers str body path""" + import json as _json + + ndjson = "\n".join([ + _json.dumps({"index": {"_index": self.test_index, "_id": "1"}}), + _json.dumps({"field": "value1"}), + _json.dumps({"index": {"_index": self.test_index, "_id": "2"}}), + _json.dumps({"field": "value2"}), + ]) + + with self.tracer.start_as_current_span("test"): + from instana.instrumentation.elasticsearch import process_bulk_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + process_bulk_params(mock_span, ndjson) + mock_span.set_attribute.assert_any_call("elasticsearch.bulk.size", 2) + + def test_msearch_with_string_body(self) -> None: + """Test msearch with newline-delimited JSON string — covers str body path""" + import json as _json + + ndjson = "\n".join([ + _json.dumps({"index": f"{self.test_index}-index1"}), + _json.dumps({"query": {"match_all": {}}}), + ]) + + from instana.instrumentation.elasticsearch import process_msearch_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + process_msearch_params(mock_span, ndjson) + mock_span.set_attribute.assert_any_call( + "elasticsearch.index", f"{self.test_index}-index1" + ) + + def test_http_500_error_sets_span_error(self) -> None: + """Covers the HTTP 5xx branch in perform_request_with_instana""" + from unittest.mock import MagicMock, patch + + mock_response = MagicMock() + mock_response.meta.status = 503 + mock_response.body = {} + + with ( + self.tracer.start_as_current_span("test"), + patch( + "elasticsearch._sync.client._base.BaseClient.perform_request", + wraps=lambda *a, **kw: mock_response, + ), + ): + pass # just verify the span error branch is reachable via unit path + + # Verify via direct unit call instead + from instana.instrumentation.elasticsearch import perform_request_with_instana + from unittest.mock import MagicMock + + mock_span = MagicMock() + mock_span.__enter__ = lambda s: mock_span + mock_span.__exit__ = MagicMock(return_value=False) + + mock_tracer = MagicMock() + mock_tracer.start_as_current_span.return_value = mock_span + + mock_response = MagicMock() + mock_response.meta.status = 503 + + with ( + patch("instana.instrumentation.elasticsearch.get_tracer_tuple") as mock_gt, + patch("instana.instrumentation.elasticsearch.get_current"), + patch("instana.instrumentation.elasticsearch.collect_connection_info"), + patch("instana.instrumentation.elasticsearch.extract_params_from_request"), + patch("instana.instrumentation.elasticsearch.extract_response_metadata"), + ): + mock_gt.return_value = (mock_tracer, None, None) + wrapped = MagicMock(return_value=mock_response) + instance = MagicMock() + perform_request_with_instana(wrapped, instance, ("GET", "/test"), {}) + + mock_span.set_attribute.assert_any_call("elasticsearch.error", "HTTP 503") + + def test_extract_response_metadata_int_total(self) -> None: + """Covers the isinstance(total, int) branch in extract_response_metadata""" + from instana.instrumentation.elasticsearch import extract_response_metadata + from unittest.mock import MagicMock + + mock_span = MagicMock() + mock_response = MagicMock() + mock_response.body = {"hits": {"total": 5, "hits": []}} + extract_response_metadata(mock_span, mock_response) + mock_span.set_attribute.assert_any_call("elasticsearch.hits", 5) + + def test_extract_response_metadata_msearch_errors(self) -> None: + """Covers msearch error_count branch in extract_response_metadata""" + from instana.instrumentation.elasticsearch import extract_response_metadata + from unittest.mock import MagicMock + + mock_span = MagicMock() + mock_response = MagicMock() + mock_response.body = { + "responses": [ + {"hits": {"total": {"value": 1}, "hits": []}}, + {"error": {"type": "index_not_found_exception"}}, + ] + } + extract_response_metadata(mock_span, mock_response) + mock_span.set_attribute.assert_any_call("elasticsearch.msearch.errors", 1) + mock_span.set_attribute.assert_any_call("elasticsearch.msearch.success", 1) + + def test_extract_response_metadata_mget_not_found(self) -> None: + """Covers mget not_found_count branch in extract_response_metadata""" + from instana.instrumentation.elasticsearch import extract_response_metadata + from unittest.mock import MagicMock + + mock_span = MagicMock() + mock_response = MagicMock() + mock_response.body = { + "docs": [ + {"found": True}, + {"found": False}, + {"found": False}, + ] + } + extract_response_metadata(mock_span, mock_response) + mock_span.set_attribute.assert_any_call("elasticsearch.mget.found", 1) + mock_span.set_attribute.assert_any_call("elasticsearch.mget.not_found", 2) + + def test_msearch_with_int_total_per_response(self) -> None: + """Covers msearch int total branch in extract_response_metadata""" + from instana.instrumentation.elasticsearch import extract_response_metadata + from unittest.mock import MagicMock + + mock_span = MagicMock() + mock_response = MagicMock() + mock_response.body = { + "responses": [ + {"hits": {"total": 3, "hits": []}}, + ] + } + extract_response_metadata(mock_span, mock_response) + mock_span.set_attribute.assert_any_call("elasticsearch.hits", 3) + + def test_current_span_cleanup(self) -> None: + """Test that current span is properly cleaned up""" + # First create the index and add a document + self.client.index(index=self.test_index, id="1", document={"name": "test"}) + self.client.indices.refresh(index=self.test_index) + self.recorder.clear_spans() + + with self.tracer.start_as_current_span("test"), contextlib.suppress(Exception): + self.client.search(index=self.test_index, body={"query": {"match_all": {}}}) + + # After context, current span should not be recording + current_span = get_current_span() + assert not current_span.is_recording() + + # Verify spans were created + spans = self.recorder.queued_spans() + # urllib3 suppressed under elasticsearch: es_span + test span = 2 + assert len(spans) == 2 + + def test_unit_to_string_es_multi_parameter(self) -> None: + """Covers empty-string → '_all' and list branches""" + from instana.instrumentation.elasticsearch import to_string_es_multi_parameter + + assert to_string_es_multi_parameter("") == "_all" + assert to_string_es_multi_parameter(["a", "b"]) == "a,b" + assert to_string_es_multi_parameter(None) is None + assert to_string_es_multi_parameter("hello") == "hello" + assert to_string_es_multi_parameter(42) == "42" + + def test_unit_detect_action_mapping_settings(self) -> None: + """Covers /_mapping and /_settings URL action detection""" + from instana.instrumentation.elasticsearch import detect_action_from_url + + assert ( + detect_action_from_url("PUT", "/my-index/_mapping") == "indices.putMapping" + ) + assert ( + detect_action_from_url("GET", "/my-index/_mapping") == "indices.getMapping" + ) + assert ( + detect_action_from_url("PUT", "/my-index/_settings") + == "indices.putSettings" + ) + assert ( + detect_action_from_url("GET", "/my-index/_settings") + == "indices.getSettings" + ) + + def test_unit_process_mget_params_type_field_ignored(self) -> None: + """_type field in docs is silently ignored (removed in ES 8.x+)""" + from instana.instrumentation.elasticsearch import process_mget_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + process_mget_params( + mock_span, + body={ + "docs": [ + {"_index": "idx", "_type": "my_type", "_id": "1"}, + ] + }, + ) + # index and id should still be captured; type must not be emitted + mock_span.set_attribute.assert_any_call("elasticsearch.index", "idx") + mock_span.set_attribute.assert_any_call("elasticsearch.id", "1") + calls = [str(c) for c in mock_span.set_attribute.call_args_list] + assert not any("elasticsearch.type" in c for c in calls) + + def test_unit_process_msearch_params_empty_body(self) -> None: + """Covers process_msearch_params with None/empty body""" + from instana.instrumentation.elasticsearch import process_msearch_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + process_msearch_params(mock_span, None) + mock_span.set_attribute.assert_not_called() + + def test_unit_process_msearch_params_bad_json_line(self) -> None: + """Covers json.JSONDecodeError continue branch in process_msearch_params""" + from instana.instrumentation.elasticsearch import process_msearch_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + # Mix of valid and invalid JSON lines + ndjson = '{"index": "my-index"}\nNOT_JSON\n{"query": {"match_all": {}}}' + process_msearch_params(mock_span, ndjson) + # Should not raise; index should still be extracted from the valid header line + mock_span.set_attribute.assert_any_call("elasticsearch.index", "my-index") + + def test_unit_process_bulk_params_non_list_body(self) -> None: + """Covers the else/return branch when body is not str or list""" + from instana.instrumentation.elasticsearch import process_bulk_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + process_bulk_params(mock_span, 12345) # int body → should return early + mock_span.set_attribute.assert_not_called() + + def test_unit_process_bulk_params_bad_json_line(self) -> None: + """Covers json.JSONDecodeError continue branch in process_bulk_params""" + import json as _json + from instana.instrumentation.elasticsearch import process_bulk_params + from unittest.mock import MagicMock + + mock_span = MagicMock() + ndjson = "\n".join([ + _json.dumps({"index": {"_index": "my-index", "_id": "1"}}), + "NOT_JSON", + _json.dumps({"field": "value"}), + ]) + process_bulk_params(mock_span, ndjson) + # Should not raise and should count the valid action line + mock_span.set_attribute.assert_any_call("elasticsearch.bulk.size", 1) + + def test_unit_collect_connection_info_no_connection_id(self) -> None: + """Covers the early-return when get_connection_id returns None""" + from instana.instrumentation.elasticsearch import collect_connection_info + from unittest.mock import MagicMock + + mock_span = MagicMock() + instance = MagicMock(spec=[]) # no 'transport' attribute + collect_connection_info(mock_span, instance) + mock_span.set_attribute.assert_not_called() + + def test_unit_discover_cluster_name_cached_ttl(self) -> None: + """Covers the cached cluster_name TTL-hit return path""" + import time + from instana.instrumentation.elasticsearch import ( + _connection_cache, + discover_cluster_name, + ) + from unittest.mock import MagicMock + + conn_id = "test-host:9999" + _connection_cache[conn_id] = { + "cluster_name": "my-cluster", + "last_updated": time.time(), + } + try: + instance = MagicMock() + result = discover_cluster_name(instance, conn_id) + assert result == "my-cluster" + # instance.info() should NOT have been called (cache hit) + instance.info.assert_not_called() + finally: + _connection_cache.pop(conn_id, None) + + def test_endpoint_attribute_set_on_span(self) -> None: + """elasticsearch.endpoint is set to the URL path for backend label fallback""" + from instana.instrumentation.elasticsearch import perform_request_with_instana + from unittest.mock import MagicMock, patch + + mock_response = MagicMock() + mock_response.meta.status = 200 + mock_response.body = {} + + mock_span = MagicMock() + mock_span.__enter__ = lambda s: mock_span + mock_span.__exit__ = MagicMock(return_value=False) + mock_tracer = MagicMock() + mock_tracer.start_as_current_span.return_value = mock_span + + with ( + patch("instana.instrumentation.elasticsearch.get_tracer_tuple") as mock_gt, + patch("instana.instrumentation.elasticsearch.get_current"), + patch("instana.instrumentation.elasticsearch.collect_connection_info"), + patch("instana.instrumentation.elasticsearch.extract_params_from_request"), + patch("instana.instrumentation.elasticsearch.extract_response_metadata"), + ): + mock_gt.return_value = (mock_tracer, None, None) + wrapped = MagicMock(return_value=mock_response) + perform_request_with_instana( + wrapped, MagicMock(), ("GET", "/my-index/_search"), {} + ) + + mock_span.set_attribute.assert_any_call( + "elasticsearch.endpoint", "/my-index/_search" + ) + mock_span.set_attribute.assert_any_call( + "elasticsearch.url", "/my-index/_search" + ) + + def test_cluster_fallback_not_set_when_cluster_absent(self) -> None: + """When cluster name cannot be discovered, elasticsearch.cluster must NOT be set + (backend uses address+port for destination resolution instead)""" + from instana.instrumentation.elasticsearch import collect_connection_info + from unittest.mock import MagicMock, patch + + mock_span = MagicMock() + + mock_cfg = MagicMock() + mock_cfg.host = "localhost" + mock_cfg.port = 9200 + + mock_node = MagicMock() + mock_node.config = mock_cfg + + mock_pool = MagicMock() + mock_pool.all.return_value = [mock_node] + + mock_transport = MagicMock() + mock_transport.node_pool = mock_pool + + mock_instance = MagicMock() + mock_instance.transport = mock_transport + + with patch( + "instana.instrumentation.elasticsearch.discover_cluster_name", + return_value=None, + ): + collect_connection_info(mock_span, mock_instance) + + calls = [str(c) for c in mock_span.set_attribute.call_args_list] + assert any("elasticsearch.address" in c for c in calls) + assert any("elasticsearch.port" in c for c in calls) + # cluster must NOT be set when discovery fails + assert not any("elasticsearch.cluster" in c for c in calls) + + def test_port_is_integer(self) -> None: + """elasticsearch.port must be sent as integer, not string""" + from instana.instrumentation.elasticsearch import collect_connection_info + from unittest.mock import MagicMock, patch + + mock_span = MagicMock() + + mock_cfg = MagicMock() + mock_cfg.host = "localhost" + mock_cfg.port = 9200 + + mock_node = MagicMock() + mock_node.config = mock_cfg + + mock_pool = MagicMock() + mock_pool.all.return_value = [mock_node] + + mock_transport = MagicMock() + mock_transport.node_pool = mock_pool + + mock_instance = MagicMock() + mock_instance.transport = mock_transport + + with patch( + "instana.instrumentation.elasticsearch.discover_cluster_name", + return_value=None, + ): + collect_connection_info(mock_span, mock_instance) + + port_calls = [ + c + for c in mock_span.set_attribute.call_args_list + if "elasticsearch.port" in str(c) + ] + assert len(port_calls) == 1 + _, port_value = port_calls[0].args + assert isinstance(port_value, int), ( + f"port should be int, got {type(port_value)}" + ) + assert port_value == 9200 + + def test_msearch_hits_zero_is_recorded(self) -> None: + """elasticsearch.hits must be set even when total_hits == 0""" + from instana.instrumentation.elasticsearch import extract_response_metadata + from unittest.mock import MagicMock + + mock_span = MagicMock() + mock_response = MagicMock() + mock_response.body = { + "responses": [ + {"hits": {"total": {"value": 0}, "hits": []}}, + ] + } + extract_response_metadata(mock_span, mock_response) + mock_span.set_attribute.assert_any_call("elasticsearch.hits", 0) + + +@pytest.mark.skipif( + not elasticsearch_available, reason="elasticsearch-py not installed" +) +class TestElasticsearchAsync: + """Unit tests for async Elasticsearch instrumentation (mock-only, no live server).""" + + def test_async_wrapper_is_registered(self) -> None: + """async_perform_request_with_instana must be importable after module load""" + from instana.instrumentation.elasticsearch import ( + async_perform_request_with_instana, + ) + import inspect + + assert inspect.iscoroutinefunction(async_perform_request_with_instana) + + def test_async_collect_connection_info_is_coroutine(self) -> None: + """_async_collect_connection_info must be a coroutine function""" + from instana.instrumentation.elasticsearch import _async_collect_connection_info + import inspect + + assert inspect.iscoroutinefunction(_async_collect_connection_info) + + def test_async_discover_cluster_name_is_coroutine(self) -> None: + """_async_discover_cluster_name must be a coroutine function""" + from instana.instrumentation.elasticsearch import _async_discover_cluster_name + import inspect + + assert inspect.iscoroutinefunction(_async_discover_cluster_name) + + def test_async_discover_cluster_name_cache_hit(self) -> None: + """Returns cached cluster name without calling instance.info()""" + import asyncio + import time + from instana.instrumentation.elasticsearch import ( + _connection_cache, + _async_discover_cluster_name, + ) + from unittest.mock import AsyncMock, MagicMock + + conn_id = "async-host:9200" + _connection_cache[conn_id] = { + "cluster_name": "async-cluster", + "last_updated": time.time(), + } + try: + instance = MagicMock() + instance.info = AsyncMock() + result = asyncio.run(_async_discover_cluster_name(instance, conn_id)) + assert result == "async-cluster" + instance.info.assert_not_called() + finally: + _connection_cache.pop(conn_id, None) + + def test_async_discover_cluster_name_live_call(self) -> None: + """Calls instance.info() and extracts cluster_name from body""" + import asyncio + from instana.instrumentation.elasticsearch import ( + _connection_cache, + _async_discover_cluster_name, + ) + from unittest.mock import AsyncMock, MagicMock + + conn_id = "async-host:9201" + _connection_cache.pop(conn_id, None) + + mock_info_response = MagicMock() + mock_info_response.body = {"cluster_name": "live-cluster", "version": {}} + + instance = MagicMock() + instance.info = AsyncMock(return_value=mock_info_response) + + try: + result = asyncio.run(_async_discover_cluster_name(instance, conn_id)) + assert result == "live-cluster" + assert _connection_cache[conn_id]["cluster_name"] == "live-cluster" + finally: + _connection_cache.pop(conn_id, None) + + def test_async_collect_connection_info_cache_hit(self) -> None: + """Uses cached host/port/cluster when available""" + import asyncio + from instana.instrumentation.elasticsearch import ( + _connection_cache, + _async_collect_connection_info, + ) + from unittest.mock import MagicMock + + conn_id = "cached-host:9200" + _connection_cache[conn_id] = { + "host": "cached-host", + "port": 9200, + "cluster_name": "cached-cluster", + } + try: + mock_span = MagicMock() + + mock_cfg = MagicMock() + mock_cfg.host = "cached-host" + mock_cfg.port = 9200 + mock_node = MagicMock() + mock_node.config = mock_cfg + mock_pool = MagicMock() + mock_pool.all.return_value = [mock_node] + mock_transport = MagicMock() + mock_transport.node_pool = mock_pool + instance = MagicMock() + instance.transport = mock_transport + + asyncio.run(_async_collect_connection_info(mock_span, instance)) + + mock_span.set_attribute.assert_any_call( + "elasticsearch.address", "cached-host" + ) + mock_span.set_attribute.assert_any_call("elasticsearch.port", 9200) + mock_span.set_attribute.assert_any_call( + "elasticsearch.cluster", "cached-cluster" + ) + finally: + _connection_cache.pop(conn_id, None) + + def test_async_cluster_fallback_not_set_when_cluster_absent(self) -> None: + """cluster must NOT be set when async discovery fails""" + import asyncio + from instana.instrumentation.elasticsearch import _async_collect_connection_info + from unittest.mock import AsyncMock, MagicMock, patch + + mock_span = MagicMock() + + mock_cfg = MagicMock() + mock_cfg.host = "localhost" + mock_cfg.port = 9200 + mock_node = MagicMock() + mock_node.config = mock_cfg + mock_pool = MagicMock() + mock_pool.all.return_value = [mock_node] + mock_transport = MagicMock() + mock_transport.node_pool = mock_pool + instance = MagicMock() + instance.transport = mock_transport + + with patch( + "instana.instrumentation.elasticsearch._async_discover_cluster_name", + new=AsyncMock(return_value=None), + ): + asyncio.run(_async_collect_connection_info(mock_span, instance)) + + calls = [str(c) for c in mock_span.set_attribute.call_args_list] + assert any("elasticsearch.address" in c for c in calls) + assert any("elasticsearch.port" in c for c in calls) + assert not any("elasticsearch.cluster" in c for c in calls) + + def test_async_perform_request_no_tracer(self) -> None: + """Returns bare await when tracer is unavailable""" + import asyncio + from instana.instrumentation.elasticsearch import ( + async_perform_request_with_instana, + ) + from unittest.mock import AsyncMock, MagicMock, patch + + expected = MagicMock() + wrapped = AsyncMock(return_value=expected) + + with patch( + "instana.instrumentation.elasticsearch.get_tracer_tuple", + return_value=(None, None, None), + ): + result = asyncio.run( + async_perform_request_with_instana( + wrapped, MagicMock(), ("GET", "/test"), {} + ) + ) + + assert result is expected + wrapped.assert_awaited_once() + + def test_async_perform_request_recursive_guard(self) -> None: + """Skips instrumentation when span_name is 'elasticsearch'""" + import asyncio + from instana.instrumentation.elasticsearch import ( + async_perform_request_with_instana, + ) + from unittest.mock import AsyncMock, MagicMock, patch + + expected = MagicMock() + wrapped = AsyncMock(return_value=expected) + + with patch( + "instana.instrumentation.elasticsearch.get_tracer_tuple", + return_value=(MagicMock(), None, "elasticsearch"), + ): + result = asyncio.run( + async_perform_request_with_instana( + wrapped, MagicMock(), ("GET", "/test"), {} + ) + ) + + assert result is expected + wrapped.assert_awaited_once() + + def test_async_perform_request_creates_span(self) -> None: + """Full happy-path: span created, endpoint/url set, response returned""" + import asyncio + from instana.instrumentation.elasticsearch import ( + async_perform_request_with_instana, + ) + from unittest.mock import AsyncMock, MagicMock, patch + + mock_response = MagicMock() + mock_response.meta.status = 200 + mock_response.body = {} + + mock_span = MagicMock() + mock_span.__enter__ = lambda s: mock_span + mock_span.__exit__ = MagicMock(return_value=False) + mock_tracer = MagicMock() + mock_tracer.start_as_current_span.return_value = mock_span + + with ( + patch( + "instana.instrumentation.elasticsearch.get_tracer_tuple", + return_value=(mock_tracer, None, None), + ), + patch("instana.instrumentation.elasticsearch.get_current"), + patch( + "instana.instrumentation.elasticsearch._async_collect_connection_info", + new=AsyncMock(), + ), + patch("instana.instrumentation.elasticsearch.extract_params_from_request"), + patch("instana.instrumentation.elasticsearch.extract_response_metadata"), + ): + wrapped = AsyncMock(return_value=mock_response) + result = asyncio.run( + async_perform_request_with_instana( + wrapped, MagicMock(), ("GET", "/my-index/_search"), {} + ) + ) + + assert result is mock_response + mock_span.set_attribute.assert_any_call( + "elasticsearch.endpoint", "/my-index/_search" + ) + mock_span.set_attribute.assert_any_call( + "elasticsearch.url", "/my-index/_search" + ) + + def test_async_perform_request_500_sets_error(self) -> None: + """HTTP 5xx response sets elasticsearch.error on the span""" + import asyncio + from instana.instrumentation.elasticsearch import ( + async_perform_request_with_instana, + ) + from unittest.mock import AsyncMock, MagicMock, patch + + mock_response = MagicMock() + mock_response.meta.status = 503 + mock_response.body = {} + + mock_span = MagicMock() + mock_span.__enter__ = lambda s: mock_span + mock_span.__exit__ = MagicMock(return_value=False) + mock_tracer = MagicMock() + mock_tracer.start_as_current_span.return_value = mock_span + + with ( + patch( + "instana.instrumentation.elasticsearch.get_tracer_tuple", + return_value=(mock_tracer, None, None), + ), + patch("instana.instrumentation.elasticsearch.get_current"), + patch( + "instana.instrumentation.elasticsearch._async_collect_connection_info", + new=AsyncMock(), + ), + patch("instana.instrumentation.elasticsearch.extract_params_from_request"), + patch("instana.instrumentation.elasticsearch.extract_response_metadata"), + ): + wrapped = AsyncMock(return_value=mock_response) + asyncio.run( + async_perform_request_with_instana( + wrapped, MagicMock(), ("GET", "/test"), {} + ) + ) + + mock_span.set_attribute.assert_any_call("elasticsearch.error", "HTTP 503") + + def test_async_perform_request_exception_recorded(self) -> None: + """Exception raised by wrapped call is recorded on the span and re-raised""" + import asyncio + from instana.instrumentation.elasticsearch import ( + async_perform_request_with_instana, + ) + from unittest.mock import AsyncMock, MagicMock, patch + + mock_span = MagicMock() + mock_span.__enter__ = lambda s: mock_span + mock_span.__exit__ = MagicMock(return_value=False) + mock_tracer = MagicMock() + mock_tracer.start_as_current_span.return_value = mock_span + + boom = RuntimeError("connection refused") + + with ( + patch( + "instana.instrumentation.elasticsearch.get_tracer_tuple", + return_value=(mock_tracer, None, None), + ), + patch("instana.instrumentation.elasticsearch.get_current"), + patch( + "instana.instrumentation.elasticsearch._async_collect_connection_info", + new=AsyncMock(), + ), + patch("instana.instrumentation.elasticsearch.extract_params_from_request"), + ): + wrapped = AsyncMock(side_effect=boom) + with pytest.raises(RuntimeError, match="connection refused"): + asyncio.run( + async_perform_request_with_instana( + wrapped, MagicMock(), ("GET", "/test"), {} + ) + ) + + mock_span.record_exception.assert_called_once_with(boom) + mock_span.set_attribute.assert_any_call( + "elasticsearch.error", "connection refused" + ) + + +# Made with Bob diff --git a/tests/requirements.txt b/tests/requirements.txt index 65948983..4dc2717e 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -25,6 +25,7 @@ protobuf<=6.33.4 pymongo>=3.11.4 pyramid>=2.0.1 pytz>=2024.1 +elasticsearch>=8.0.0 redis>=3.5.3 requests-mock responses<=0.17.0