1
+ from __future__ import annotations
1
2
import os
2
3
import random
3
4
import threading
@@ -15,23 +16,18 @@ class LogBatcher:
15
16
MAX_LOGS_BEFORE_FLUSH = 100
16
17
FLUSH_WAIT_TIME = 5.0
17
18
18
- def __init__ (
19
- self ,
20
- capture_func , # type: Callable[[Envelope], None]
21
- ):
22
- # type: (...) -> None
23
- self ._log_buffer = [] # type: List[Log]
19
+ def __init__ (self , capture_func : Callable [[Envelope ], None ]) -> None :
20
+ self ._log_buffer : List [Log ] = []
24
21
self ._capture_func = capture_func
25
22
self ._running = True
26
23
self ._lock = threading .Lock ()
27
24
28
- self ._flush_event = threading .Event () # type: threading.Event
25
+ self ._flush_event : threading .Event = threading .Event ()
29
26
30
- self ._flusher = None # type : Optional[threading.Thread]
31
- self ._flusher_pid = None # type : Optional[int]
27
+ self ._flusher : Optional [threading .Thread ] = None
28
+ self ._flusher_pid : Optional [int ] = None
32
29
33
- def _ensure_thread (self ):
34
- # type: (...) -> bool
30
+ def _ensure_thread (self ) -> bool :
35
31
"""For forking processes we might need to restart this thread.
36
32
This ensures that our process actually has that thread running.
37
33
"""
@@ -63,18 +59,13 @@ def _ensure_thread(self):
63
59
64
60
return True
65
61
66
- def _flush_loop (self ):
67
- # type: (...) -> None
62
+ def _flush_loop (self ) -> None :
68
63
while self ._running :
69
64
self ._flush_event .wait (self .FLUSH_WAIT_TIME + random .random ())
70
65
self ._flush_event .clear ()
71
66
self ._flush ()
72
67
73
- def add (
74
- self ,
75
- log , # type: Log
76
- ):
77
- # type: (...) -> None
68
+ def add (self , log : Log ) -> None :
78
69
if not self ._ensure_thread () or self ._flusher is None :
79
70
return None
80
71
@@ -83,24 +74,20 @@ def add(
83
74
if len (self ._log_buffer ) >= self .MAX_LOGS_BEFORE_FLUSH :
84
75
self ._flush_event .set ()
85
76
86
- def kill (self ):
87
- # type: (...) -> None
77
+ def kill (self ) -> None :
88
78
if self ._flusher is None :
89
79
return
90
80
91
81
self ._running = False
92
82
self ._flush_event .set ()
93
83
self ._flusher = None
94
84
95
- def flush (self ):
96
- # type: (...) -> None
85
+ def flush (self ) -> None :
97
86
self ._flush ()
98
87
99
88
@staticmethod
100
- def _log_to_transport_format (log ):
101
- # type: (Log) -> Any
102
- def format_attribute (val ):
103
- # type: (int | float | str | bool) -> Any
89
+ def _log_to_transport_format (log : Log ) -> Any :
90
+ def format_attribute (val : int | float | str | bool ) -> Any :
104
91
if isinstance (val , bool ):
105
92
return {"value" : val , "type" : "boolean" }
106
93
if isinstance (val , int ):
@@ -128,8 +115,7 @@ def format_attribute(val):
128
115
129
116
return res
130
117
131
- def _flush (self ):
132
- # type: (...) -> Optional[Envelope]
118
+ def _flush (self ) -> Optional [Envelope ]:
133
119
134
120
envelope = Envelope (
135
121
headers = {"sent_at" : format_timestamp (datetime .now (timezone .utc ))}
0 commit comments