Skip to content

Commit 42e5f51

Browse files
authored
Merge branch 'main' into taegyunkim/prof-12434-uwsgi
2 parents 3569721 + 60bf567 commit 42e5f51

File tree

175 files changed

+5142
-4942
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

175 files changed

+5142
-4942
lines changed

benchmarks/django_simple/scenario.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,17 @@ def _(loops):
6969
from ddtrace.contrib.internal.django import database
7070

7171
try:
72-
database.get_conn_config.invalidate()
72+
database.get_conn_config.cache_clear()
7373
except Exception:
7474
pass
7575

7676
try:
77-
database.get_service_name.invalidate()
77+
database.get_service_name.cache_clear()
7878
except Exception:
7979
pass
8080

8181
try:
82-
database.get_conn_service_name.invalidate()
82+
database.get_conn_service_name.cache_clear()
8383
except Exception:
8484
pass
8585
except Exception:
@@ -90,12 +90,12 @@ def _(loops):
9090
from ddtrace.contrib.internal.django import cache
9191

9292
try:
93-
cache.get_service_name.invalidate()
93+
cache.get_service_name.cache_clear()
9494
except Exception:
9595
pass
9696

9797
try:
98-
cache.func_cache_operation.invalidate()
98+
cache.func_cache_operation.cache_clear()
9999
except Exception:
100100
pass
101101
except Exception:

ddtrace/appsec/_utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,15 +367,20 @@ def unpatching_popen():
367367
Context manager to temporarily unpatch `subprocess.Popen` for testing purposes.
368368
This is useful to ensure that the original `Popen` behavior is restored after the context.
369369
"""
370+
import os
370371
import subprocess # nosec B404
371372

373+
from ddtrace.internal._unpatched import unpatched_close
372374
from ddtrace.internal._unpatched import unpatched_Popen
373375

376+
original_os_close = os.close
377+
os.close = unpatched_close
374378
original_popen = subprocess.Popen
375379
subprocess.Popen = unpatched_Popen
376380
asm_config._bypass_instrumentation_for_waf = True
377381
try:
378382
yield
379383
finally:
380384
subprocess.Popen = original_popen
385+
os.close = original_os_close
381386
asm_config._bypass_instrumentation_for_waf = False

ddtrace/contrib/internal/django/middleware.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from inspect import iscoroutinefunction
12
from inspect import isfunction
23
from types import FunctionType
34
from typing import Any
@@ -134,17 +135,42 @@ def traced_auth_middleware_process_request(func: FunctionType, args: Tuple[Any],
134135
def traced_middleware_factory(func: FunctionType, args: Tuple[Any], kwargs: Dict[str, Any]) -> Any:
135136
middleware = func(*args, **kwargs)
136137

137-
if isfunction(middleware):
138-
if hasattr(func, "__module__") and hasattr(func, "__qualname__"):
139-
resource = f"{func.__module__}.{func.__qualname__}"
140-
else:
141-
resource = func_name(func)
138+
if not isfunction(middleware):
139+
return middleware
140+
141+
if hasattr(func, "__module__") and hasattr(func, "__qualname__"):
142+
resource = f"{func.__module__}.{func.__qualname__}"
143+
else:
144+
resource = func_name(func)
145+
146+
if iscoroutinefunction(middleware):
147+
# Handle async middleware - create async wrapper
148+
async def traced_async_middleware_func(*args, **kwargs):
149+
# The first argument for all middleware is the request object
150+
# DEV: Do `optional=true` to avoid raising an error for middleware that don't follow the convention
151+
# DEV: This is a function, so no `self` argument, so request is at position 0
152+
request = get_argument_value(args, kwargs, 0, "request", optional=True)
153+
154+
with core.context_with_data(
155+
"django.middleware.func",
156+
span_name="django.middleware",
157+
resource=resource,
158+
tags={
159+
COMPONENT: config_django.integration_name,
160+
},
161+
tracer=config_django._tracer,
162+
request=request,
163+
):
164+
return await middleware(*args, **kwargs)
142165

166+
return traced_async_middleware_func
167+
else:
168+
# Handle sync middleware - use original wrapping approach
143169
def traced_middleware_func(func: FunctionType, args: Tuple[Any], kwargs: Dict[str, Any]) -> Any:
144170
# The first argument for all middleware is the request object
145171
# DEV: Do `optional=true` to avoid raising an error for middleware that don't follow the convention
146172
# DEV: This is a function, so no `self` argument, so request is at position 0
147-
request = get_argument_value(args, kwargs, 0, "request")
173+
request = get_argument_value(args, kwargs, 0, "request", optional=True)
148174

149175
with core.context_with_data(
150176
"django.middleware.func",

ddtrace/internal/_unpatched.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
previous_loaded_modules = frozenset(sys.modules.keys())
1717
from subprocess import Popen as unpatched_Popen # noqa # nosec B404
18+
from os import close as unpatched_close # noqa: F401, E402
1819

1920
loaded_modules = frozenset(sys.modules.keys())
2021
for module in previous_loaded_modules - loaded_modules:

ddtrace/internal/utils/cache.py

Lines changed: 7 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
from functools import lru_cache
12
from functools import wraps
23
from inspect import FullArgSpec
34
from inspect import getfullargspec
45
from inspect import isgeneratorfunction
5-
from threading import RLock
66
from typing import Any # noqa:F401
77
from typing import Callable # noqa:F401
88
from typing import Optional # noqa:F401
99
from typing import Type # noqa:F401
10-
from typing import TypeVar # noqa:F401
10+
from typing import TypeVar
1111

1212

1313
miss = object()
@@ -17,78 +17,14 @@
1717
M = Callable[[Any, T], Any]
1818

1919

20-
class LFUCache(dict):
21-
"""Simple LFU cache implementation.
20+
def cached(maxsize: int = 256) -> Callable[[Callable], Callable]:
21+
def _(f: Callable) -> Callable:
22+
return lru_cache(maxsize)(f)
2223

23-
This cache is designed for memoizing functions with a single hashable
24-
argument. The eviction policy is LFU, i.e. the least frequently used values
25-
are evicted when the cache is full. The amortized cost of shrinking the
26-
cache when it grows beyond the requested size is O(log(size)).
27-
"""
28-
29-
def __init__(self, maxsize=256):
30-
# type: (int) -> None
31-
self.maxsize = maxsize
32-
self.lock = RLock()
33-
self.count_lock = RLock()
34-
35-
def get(self, key, f): # type: ignore[override]
36-
# type: (T, F) -> Any
37-
"""Get a value from the cache.
38-
39-
If the value with the given key is not in the cache, the expensive
40-
function ``f`` is called on the key to generate it. The return value is
41-
then stored in the cache and returned to the caller.
42-
"""
43-
44-
_ = super(LFUCache, self).get(key, miss)
45-
if _ is not miss:
46-
with self.count_lock:
47-
value, count = _
48-
self[key] = (value, count + 1)
49-
return value
50-
51-
with self.lock:
52-
_ = super(LFUCache, self).get(key, miss)
53-
if _ is not miss:
54-
with self.count_lock:
55-
value, count = _
56-
self[key] = (value, count + 1)
57-
return value
58-
59-
# Cache miss: ensure that we have enough space in the cache
60-
# by evicting half of the entries when we go over the threshold
61-
while len(self) >= self.maxsize:
62-
for h in sorted(self, key=lambda h: self[h][1])[: self.maxsize >> 1]:
63-
del self[h]
64-
65-
value = f(key)
66-
67-
self[key] = (value, 1)
68-
69-
return value
70-
71-
72-
def cached(maxsize=256):
73-
# type: (int) -> Callable[[F], F]
74-
"""Decorator for memoizing functions of a single argument (LFU policy)."""
75-
76-
def cached_wrapper(f):
77-
# type: (F) -> F
78-
cache = LFUCache(maxsize)
79-
80-
def cached_f(key):
81-
# type: (T) -> Any
82-
return cache.get(key, f)
83-
84-
cached_f.invalidate = cache.clear # type: ignore[attr-defined]
85-
86-
return cached_f
87-
88-
return cached_wrapper
24+
return _
8925

9026

91-
class CachedMethodDescriptor(object):
27+
class CachedMethodDescriptor:
9228
def __init__(self, method, maxsize):
9329
# type: (M, int) -> None
9430
self._method = method

ddtrace/llmobs/_experiment.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@
4444
DatasetRecordInputType = Dict[str, NonNoneJSONType]
4545

4646

47+
class Project(TypedDict):
48+
name: str
49+
_id: str
50+
51+
4752
class DatasetRecordRaw(TypedDict):
4853
input_data: DatasetRecordInputType
4954
expected_output: JSONType
@@ -106,13 +111,15 @@ class Dataset:
106111
def __init__(
107112
self,
108113
name: str,
114+
project: Project,
109115
dataset_id: str,
110116
records: List[DatasetRecord],
111117
description: str,
112118
version: int,
113119
_dne_client: "LLMObsExperimentsClient",
114120
) -> None:
115121
self.name = name
122+
self.project = project
116123
self.description = description
117124
self._id = dataset_id
118125
self._version = version
@@ -335,8 +342,8 @@ def run(
335342
)
336343
return []
337344

338-
project_id = self._llmobs_instance._dne_client.project_create_or_get(self._project_name)
339-
self._project_id = project_id
345+
project = self._llmobs_instance._dne_client.project_create_or_get(self._project_name)
346+
self._project_id = project.get("_id", "")
340347

341348
experiment_id, experiment_run_name = self._llmobs_instance._dne_client.experiment_create(
342349
self.name,
@@ -416,6 +423,7 @@ def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional
416423
subset_name = "[Test subset of {} records] {}".format(sample_size, self._dataset.name)
417424
subset_dataset = Dataset(
418425
name=subset_name,
426+
project=self._dataset.project,
419427
dataset_id=self._dataset._id,
420428
records=subset_records,
421429
description=self._dataset.description,

ddtrace/llmobs/_llmobs.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
from ddtrace.llmobs._experiment import Experiment
9393
from ddtrace.llmobs._experiment import ExperimentConfigType
9494
from ddtrace.llmobs._experiment import JSONType
95+
from ddtrace.llmobs._experiment import Project
9596
from ddtrace.llmobs._utils import AnnotationContext
9697
from ddtrace.llmobs._utils import LinkTracker
9798
from ddtrace.llmobs._utils import _get_ml_app
@@ -212,6 +213,7 @@ def __init__(
212213
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
213214
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
214215
_app_key=self._app_key,
216+
_default_project=Project(name=self._project_name, _id=""),
215217
is_agentless=True, # agent proxy doesn't seem to work for experiments
216218
)
217219

@@ -645,15 +647,21 @@ def enable(
645647
)
646648

647649
@classmethod
648-
def pull_dataset(cls, name: str) -> Dataset:
649-
ds = cls._instance._dne_client.dataset_get_with_records(name)
650+
def pull_dataset(cls, dataset_name: str, project_name: Optional[str] = None) -> Dataset:
651+
ds = cls._instance._dne_client.dataset_get_with_records(dataset_name, (project_name or cls._project_name))
650652
return ds
651653

652654
@classmethod
653-
def create_dataset(cls, name: str, description: str = "", records: Optional[List[DatasetRecord]] = None) -> Dataset:
655+
def create_dataset(
656+
cls,
657+
dataset_name: str,
658+
project_name: Optional[str] = None,
659+
description: str = "",
660+
records: Optional[List[DatasetRecord]] = None,
661+
) -> Dataset:
654662
if records is None:
655663
records = []
656-
ds = cls._instance._dne_client.dataset_create(name, description)
664+
ds = cls._instance._dne_client.dataset_create(dataset_name, project_name, description)
657665
for r in records:
658666
ds.append(r)
659667
if len(records) > 0:
@@ -669,19 +677,20 @@ def create_dataset_from_csv(
669677
expected_output_columns: Optional[List[str]] = None,
670678
metadata_columns: Optional[List[str]] = None,
671679
csv_delimiter: str = ",",
672-
description="",
680+
description: str = "",
681+
project_name: Optional[str] = None,
673682
) -> Dataset:
674683
if expected_output_columns is None:
675684
expected_output_columns = []
676685
if metadata_columns is None:
677686
metadata_columns = []
678-
ds = cls._instance._dne_client.dataset_create(dataset_name, description)
679687

680688
# Store the original field size limit to restore it later
681689
original_field_size_limit = csv.field_size_limit()
682690

683691
csv.field_size_limit(EXPERIMENT_CSV_FIELD_MAX_SIZE) # 10mb
684692

693+
records = []
685694
try:
686695
with open(csv_path, mode="r") as csvfile:
687696
content = csvfile.readline().strip()
@@ -708,7 +717,7 @@ def create_dataset_from_csv(
708717
raise ValueError(f"Metadata columns not found in CSV header: {missing_metadata_columns}")
709718

710719
for row in rows:
711-
ds.append(
720+
records.append(
712721
DatasetRecord(
713722
input_data={col: row[col] for col in input_data_columns},
714723
expected_output={col: row[col] for col in expected_output_columns},
@@ -721,6 +730,9 @@ def create_dataset_from_csv(
721730
# Always restore the original field size limit
722731
csv.field_size_limit(original_field_size_limit)
723732

733+
ds = cls._instance._dne_client.dataset_create(dataset_name, project_name, description)
734+
for r in records:
735+
ds.append(r)
724736
if len(ds) > 0:
725737
cls._instance._dne_client.dataset_bulk_upload(ds._id, ds._records)
726738
return ds

0 commit comments

Comments (0)