@@ -1,3 +1,4 @@
+from dataclasses import dataclass
 from enum import Enum
 from typing import Any
 from typing import Optional
@@ -7,8 +8,11 @@
 from ddtrace.debugging._config import di_config
 from ddtrace.debugging._encoding import LogSignalJsonEncoder
 from ddtrace.debugging._encoding import SignalQueue
+from ddtrace.debugging._encoding import SnapshotJsonEncoder
 from ddtrace.debugging._metrics import metrics
 from ddtrace.debugging._signal.collector import SignalCollector
+from ddtrace.debugging._signal.model import SignalTrack
+from ddtrace.internal import agent
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.periodic import ForksafeAwakeablePeriodicService
 from ddtrace.internal.utils.http import connector
@@ -27,6 +31,12 @@ class UploaderProduct(str, Enum):
     CODE_ORIGIN_SPAN = "code_origin.span"


+@dataclass
+class UploaderTrack:
+    endpoint: str
+    queue: SignalQueue
+
+
 class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
     """Logs intake uploader.

@@ -36,26 +46,48 @@ class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):

     _instance: Optional["LogsIntakeUploaderV1"] = None
     _products: Set[UploaderProduct] = set()
+    _agent_endpoints: Set[str] = set()

     __queue__ = SignalQueue
     __collector__ = SignalCollector

-    ENDPOINT = di_config._intake_endpoint
-
     RETRY_ATTEMPTS = 3

     def __init__(self, interval: Optional[float] = None) -> None:
         super().__init__(interval if interval is not None else di_config.upload_interval_seconds)

-        self._queue = self.__queue__(encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full)
-        self._collector = self.__collector__(self._queue)
+        endpoint_suffix = f"?ddtags={quote(di_config.tags)}" if di_config._tags_in_qs and di_config.tags else ""
+        if not self._agent_endpoints:
+            try:
+                agent_info = agent.info()
+                self._agent_endpoints = set(agent_info.get("endpoints", [])) if agent_info is not None else set()
+            except Exception:
+                pass  # nosec B110
+
+        snapshot_track = "/debugger/v1/input"
+        if "/debugger/v2/input" in self._agent_endpoints:
+            snapshot_track = "/debugger/v2/input"
+        elif "/debugger/v1/diagnostics" in self._agent_endpoints:
+            snapshot_track = "/debugger/v1/diagnostics"
+
+        self._tracks = {
+            SignalTrack.LOGS: UploaderTrack(
+                endpoint=f"/debugger/v1/input{endpoint_suffix}",
+                queue=self.__queue__(
+                    encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
+                ),
+            ),
+            SignalTrack.SNAPSHOT: UploaderTrack(
+                endpoint=f"{snapshot_track}{endpoint_suffix}",
+                queue=self.__queue__(encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full),
+            ),
+        }
+        self._collector = self.__collector__({t: ut.queue for t, ut in self._tracks.items()})
         self._headers = {
             "Content-type": "application/json; charset=utf-8",
             "Accept": "text/plain",
         }

-        if di_config._tags_in_qs and di_config.tags:
-            self.ENDPOINT += f"?ddtags={quote(di_config.tags)}"
         self._connect = connector(di_config._intake_url, timeout=di_config.upload_timeout)

         # Make it retry-able
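Aside: the endpoint negotiation added above is self-contained enough to read in isolation. A minimal sketch of the selection logic, assuming `agent_endpoints` holds the `"endpoints"` list reported by the agent's info response (empty when the agent is unreachable):

```python
# Sketch of the snapshot-track selection above, not the ddtrace API.
# `agent_endpoints` is assumed to be the set of endpoints advertised by
# the agent; when the lookup fails it stays empty and we fall back to v1.
from typing import Set


def select_snapshot_endpoint(agent_endpoints: Set[str]) -> str:
    endpoint = "/debugger/v1/input"  # default for old agents
    if "/debugger/v2/input" in agent_endpoints:
        endpoint = "/debugger/v2/input"  # preferred when available
    elif "/debugger/v1/diagnostics" in agent_endpoints:
        endpoint = "/debugger/v1/diagnostics"  # intermediate agents
    return endpoint


assert select_snapshot_endpoint(set()) == "/debugger/v1/input"
assert select_snapshot_endpoint({"/debugger/v2/input"}) == "/debugger/v2/input"
```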
@@ -65,33 +97,31 @@ def __init__(self, interval: Optional[float] = None) -> None:
         )(self._write)

         log.debug(
-            "Logs intake uploader initialized (url: %s, endpoint: %s, interval: %f)",
+            "Logs intake uploader initialized (url: %s, endpoints: %s, interval: %f)",
             di_config._intake_url,
-            self.ENDPOINT,
+            {t: ut.endpoint for t, ut in self._tracks.items()},
             self.interval,
         )

-    def _write(self, payload: bytes) -> None:
+        self._flush_full = False
+
+    def _write(self, payload: bytes, endpoint: str) -> None:
         try:
             with self._connect() as conn:
-                conn.request(
-                    "POST",
-                    self.ENDPOINT,
-                    payload,
-                    headers=self._headers,
-                )
+                conn.request("POST", endpoint, payload, headers=self._headers)
                 resp = conn.getresponse()
                 if not (200 <= resp.status < 300):
-                    log.error("Failed to upload payload: [%d] %r", resp.status, resp.read())
+                    log.error("Failed to upload payload to endpoint %s: [%d] %r", endpoint, resp.status, resp.read())
                     meter.increment("upload.error", tags={"status": str(resp.status)})
                 else:
                     meter.increment("upload.success")
                     meter.distribution("upload.size", len(payload))
         except Exception:
-            log.error("Failed to write payload", exc_info=True)
+            log.error("Failed to write payload to endpoint %s", endpoint, exc_info=True)
             meter.increment("error")

     def _on_buffer_full(self, _item: Any, _encoded: bytes) -> None:
+        self._flush_full = True
         self.upload()

     def upload(self) -> None:
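The new `_flush_full` flag turns a full buffer into a deferred flush: `_on_buffer_full` runs on the instrumented code path, so it only raises the flag and wakes the periodic service instead of writing inline. A toy sketch of that handshake, with hypothetical names (an `Event`-based wakeup stands in for `ForksafeAwakeablePeriodicService`):

```python
import threading


class ToyUploader:
    """Toy wake-and-drain handshake (hypothetical, not the ddtrace API)."""

    def __init__(self) -> None:
        self._flush_full = False
        self._wake = threading.Event()

    def on_buffer_full(self) -> None:
        # Hot path: only raise the flag and wake the uploader thread;
        # the actual network write happens in periodic().
        self._flush_full = True
        self._wake.set()

    def periodic(self) -> None:
        # Uploader thread: full buffers get drained first, then any
        # track that still has pending items.
        if self._flush_full:
            self._flush_full = False
            ...  # flush tracks whose queue is full
        ...  # flush tracks with pending items
```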
@@ -100,20 +130,32 @@ def upload(self) -> None:

     def reset(self) -> None:
         """Reset the buffer on fork."""
-        self._queue = self.__queue__(encoder=self._queue._encoder, on_full=self._on_buffer_full)
-        self._collector._encoder = self._queue
+        for track in self._tracks.values():
+            track.queue = self.__queue__(encoder=track.queue._encoder, on_full=self._on_buffer_full)
+        self._collector._tracks = {t: ut.queue for t, ut in self._tracks.items()}
+
+    def _flush_track(self, track: UploaderTrack) -> None:
+        queue = track.queue
+        payload = queue.flush()
+        if payload is not None:
+            try:
+                self._write_with_backoff(payload, track.endpoint)
+                meter.distribution("batch.cardinality", queue.count)
+            except Exception:
+                log.debug("Cannot upload logs payload", exc_info=True)

     def periodic(self) -> None:
         """Upload the buffer content to the logs intake."""
-        count = self._queue.count
-        if count:
-            payload = self._queue.flush()
-            if payload is not None:
-                try:
-                    self._write_with_backoff(payload)
-                    meter.distribution("batch.cardinality", count)
-                except Exception:
-                    log.debug("Cannot upload logs payload", exc_info=True)
+        if self._flush_full:
+            # We received the signal to flush a full buffer
+            self._flush_full = False
+            for track in self._tracks.values():
+                if track.queue.is_full():
+                    self._flush_track(track)
+
+        for track in self._tracks.values():
+            if track.queue.count:
+                self._flush_track(track)

     on_shutdown = periodic

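Overall, the single `ENDPOINT`/`_queue` pair becomes a map from `SignalTrack` to `UploaderTrack`, and each track is flushed to its own intake path. A condensed sketch of the resulting drain loop, assuming only that each queue exposes a `flush()` that returns `None` when empty (names here are illustrative):

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Track:
    """Illustrative stand-in for UploaderTrack."""

    endpoint: str
    flush: Callable[[], Optional[bytes]]  # None means nothing buffered


def drain(tracks: Dict[str, Track], write: Callable[[bytes, str], None]) -> None:
    # Each track is written to its own endpoint; an empty track is
    # skipped, mirroring the `payload is not None` check in _flush_track.
    for track in tracks.values():
        payload = track.flush()
        if payload is not None:
            write(payload, track.endpoint)


# Example: logs and snapshots go to different debugger endpoints.
tracks = {
    "logs": Track("/debugger/v1/input", lambda: b'{"log": 1}'),
    "snapshot": Track("/debugger/v2/input", lambda: None),
}
drain(tracks, lambda payload, endpoint: print(endpoint, payload))
```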