Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1237e55
Added describe_log_streams
KhueNgocDang Nov 20, 2022
0803d4d
Added describe_log_streams to doc
KhueNgocDang Nov 20, 2022
c5fd9cd
Added filter_log_events
KhueNgocDang Nov 20, 2022
ef5a248
Added filter_log_events to doc
KhueNgocDang Nov 20, 2022
60c84a0
Changed *_time* to optional
KhueNgocDang Nov 20, 2022
153ddca
Added unittest for describe_log_streams and filter_log_events
KhueNgocDang Nov 20, 2022
0a14046
Sorted import
KhueNgocDang Nov 20, 2022
839e790
Updated doc string
KhueNgocDang Nov 20, 2022
9bcfb3b
Removed Literal type for python3.7
KhueNgocDang Nov 20, 2022
8f86972
Added exception for create_log_stream in test module
KhueNgocDang Nov 20, 2022
94d176b
Removed Literal and Union unused typing
KhueNgocDang Nov 20, 2022
792f6e8
Corrected doc string
KhueNgocDang Nov 20, 2022
89732e0
Fixed exceptions
KhueNgocDang Nov 20, 2022
c83a623
Trigger githubcodebuild again
KhueNgocDang Nov 20, 2022
4a15322
Added dropna to test
KhueNgocDang Nov 20, 2022
a88a3a2
Trigger CodeBuild
KhueNgocDang Nov 21, 2022
395d247
Changed column len assertion
KhueNgocDang Nov 21, 2022
afe4f77
Fixed filter_log_events not accepting None for both log_stream_name_p…
KhueNgocDang Nov 21, 2022
ae5ff23
Changed filter_log_events in test
KhueNgocDang Nov 21, 2022
b618f67
Changed filter pattern
KhueNgocDang Nov 22, 2022
5e7b568
Added additional test
KhueNgocDang Nov 22, 2022
2c33c90
Trigger code build
KhueNgocDang Nov 22, 2022
368cd89
Fixed typo in _filter_log_events's arg
KhueNgocDang Nov 23, 2022
a381b76
Added expected exceptions test
KhueNgocDang Nov 23, 2022
3429c21
describe_log_streams returns empty dataframe if no log_streams found
KhueNgocDang Nov 23, 2022
f583804
Replaced assertion with if statement
KhueNgocDang Nov 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions awswrangler/cloudwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,212 @@ def read_logs(
if "timestamp" in df.columns:
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df


def describe_log_streams(
    log_group_name: str,
    log_stream_name_prefix: Optional[str] = None,
    order_by: Optional[str] = "LogStreamName",
    descending: Optional[bool] = False,
    limit: Optional[int] = 50,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """List the log streams for the specified log group, return results as a Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams

    Parameters
    ----------
    log_group_name : str
        The name of the log group.
    log_stream_name_prefix : str, optional
        The prefix to match log streams' names.
        Cannot be combined with ``order_by`` equal to 'LastEventTime'.
    order_by : str, optional
        If the value is 'LogStreamName', the results are ordered by log stream name.
        If the value is 'LastEventTime', the results are ordered by the event time.
        The default value is 'LogStreamName'.
    descending : bool, optional
        If the value is True, results are returned in descending order.
        If the value is False, results are returned in ascending order.
        The default value is False.
    limit : int, optional
        The maximum number of items returned. The default is up to 50 items.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.

    Returns
    -------
    pandas.DataFrame
        Result as a Pandas DataFrame.
        An empty DataFrame is returned if no log streams are found.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If `log_stream_name_prefix` is passed while `order_by` is 'LastEventTime'.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.describe_log_streams(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )

    """
    client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session)
    args: Dict[str, Any] = {
        "logGroupName": log_group_name,
        "descending": descending,
        "orderBy": order_by,
        "limit": limit,
    }
    if log_stream_name_prefix and order_by == "LogStreamName":
        args["logStreamNamePrefix"] = log_stream_name_prefix
    elif log_stream_name_prefix and order_by == "LastEventTime":
        # The CloudWatch Logs API rejects this combination, so fail fast with a clear error.
        raise exceptions.InvalidArgumentCombination(
            "Cannot call describe_log_streams with both `log_stream_name_prefix` and order_by equal 'LastEventTime'"
        )
    log_streams: List[Dict[str, Any]] = []
    response: Dict[str, Any] = client_logs.describe_log_streams(**args)

    log_streams += response["logStreams"]
    # Paginate until the service stops returning a nextToken.
    while "nextToken" in response:
        response = client_logs.describe_log_streams(
            **args,
            nextToken=response["nextToken"],
        )
        log_streams += response["logStreams"]
    if not log_streams:
        # No matching streams: return a truly empty frame instead of a frame
        # containing only the injected "logGroupName" column with zero rows.
        return pd.DataFrame()
    df: pd.DataFrame = pd.DataFrame(log_streams)
    df["logGroupName"] = log_group_name
    return df


def _filter_log_events(
    log_group_name: str,
    log_stream_names: List[str],
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    filter_pattern: Optional[str] = None,
    limit: Optional[int] = 10000,
    boto3_session: Optional[boto3.Session] = None,
) -> List[Dict[str, Any]]:
    """Call CloudWatch Logs ``filter_log_events`` and paginate, returning the raw event dicts.

    ``start_timestamp``/``end_timestamp`` are Unix epoch milliseconds, as expected by the API.
    """
    client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session)
    events: List[Dict[str, Any]] = []
    args: Dict[str, Any] = {
        "logGroupName": log_group_name,
        "logStreamNames": log_stream_names,
        "limit": limit,
    }
    if start_timestamp:
        args["startTime"] = start_timestamp
    if end_timestamp:
        # Bug fix: previously start_timestamp was assigned to "endTime",
        # which silently ignored the caller's end boundary.
        args["endTime"] = end_timestamp
    if filter_pattern:
        args["filterPattern"] = filter_pattern
    response: Dict[str, Any] = client_logs.filter_log_events(**args)
    events += response["events"]
    # Paginate until the service stops returning a nextToken.
    while "nextToken" in response:
        response = client_logs.filter_log_events(
            **args,
            nextToken=response["nextToken"],
        )
        events += response["events"]
    return events


def filter_log_events(
    log_group_name: str,
    log_stream_name_prefix: Optional[str] = None,
    log_stream_names: Optional[List[str]] = None,
    filter_pattern: Optional[str] = None,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """List log events from the specified log group. The results are returned as Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events

    Note
    ----
    Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix.

    Parameters
    ----------
    log_group_name : str
        The name of the log group.
    log_stream_name_prefix : str, optional
        Filters the results to include only events from log streams that have names starting with this prefix.
    log_stream_names : List[str], optional
        Filters the results to only logs from the log streams in this list.
    filter_pattern : str, optional
        The filter pattern to use. If not provided, all the events are matched.
    start_time : datetime.datetime, optional
        Events with a timestamp before this time are not returned.
    end_time : datetime.datetime, optional
        Events with a timestamp later than this time are not returned.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.

    Returns
    -------
    pandas.DataFrame
        Result as a Pandas DataFrame.
        An empty DataFrame is returned if no events are found.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If both `log_stream_names` and `log_stream_name_prefix` are passed.

    Examples
    --------
    Get all log events from log group 'aws_sdk_pandas_log_group' that have log stream prefix 'aws_sdk_pandas_log_stream'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )

    Get all log events containing 'REPORT' from log streams
    'aws_sdk_pandas_log_stream_one' and 'aws_sdk_pandas_log_stream_two'
    from log group 'aws_sdk_pandas_log_group'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_names=["aws_sdk_pandas_log_stream_one","aws_sdk_pandas_log_stream_two"],
    ...     filter_pattern='REPORT',
    ... )

    """
    if log_stream_name_prefix and log_stream_names:
        raise exceptions.InvalidArgumentCombination(
            "Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix"
        )
    _logger.debug("log_group_name: %s", log_group_name)

    events: List[Dict[str, Any]] = []
    if not log_stream_names:
        # Resolve the stream names from the log group (optionally prefix-filtered).
        describe_df: pd.DataFrame = describe_log_streams(
            log_group_name=log_group_name,
            log_stream_name_prefix=log_stream_name_prefix,
            boto3_session=boto3_session,
        )
        # An empty result means there is nothing to scan; guard against a
        # KeyError on the missing "logStreamName" column of an empty frame.
        log_stream_names = describe_df["logStreamName"].tolist() if len(describe_df.index) > 0 else []
    args: Dict[str, Any] = {
        "log_group_name": log_group_name,
    }
    if start_time:
        # The API expects Unix epoch milliseconds.
        args["start_timestamp"] = int(1000 * start_time.timestamp())
    if end_time:
        args["end_timestamp"] = int(1000 * end_time.timestamp())
    if filter_pattern:
        args["filter_pattern"] = filter_pattern
    if boto3_session:
        args["boto3_session"] = boto3_session
    # The API limits how many stream names a single call accepts; query in chunks.
    chunked_log_streams_size: int = 50

    for i in range(0, len(log_stream_names), chunked_log_streams_size):
        log_streams = log_stream_names[i : i + chunked_log_streams_size]
        events += _filter_log_events(**args, log_stream_names=log_streams)
    if events:
        df: pd.DataFrame = pd.DataFrame(events)
        df["logGroupName"] = log_group_name
        return df
    return pd.DataFrame()
2 changes: 2 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ Amazon CloudWatch Logs
run_query
start_query
wait_query
describe_log_streams
filter_log_events

Amazon QuickSight
-----------------
Expand Down
51 changes: 51 additions & 0 deletions tests/test_cloudwatch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime

import boto3
import pytest
Expand Down Expand Up @@ -49,3 +50,53 @@ def test_read_logs(loggroup):
)
assert len(df.index) == 5
assert len(df.columns) == 3


def test_describe_log_streams_and_filter_log_events(loggroup):
    """End-to-end check of describe_log_streams and filter_log_events against a real log group."""
    cloudwatch_log_client = boto3.client("logs")
    log_stream_names = [
        "aws_sdk_pandas_log_stream_one",
        "aws_sdk_pandas_log_stream_two",
        "aws_sdk_pandas_log_stream_three",
        "aws_sdk_pandas_log_stream_four",
    ]
    for log_stream in log_stream_names:
        try:
            cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream)
        except cloudwatch_log_client.exceptions.ResourceAlreadyExistsException:
            # Stream left over from a previous run; reuse it.
            continue

    log_streams_df = wr.cloudwatch.describe_log_streams(
        log_group_name=loggroup, order_by="LastEventTime", descending=False
    )

    assert len(log_streams_df.index) >= 4
    assert "logGroupName" in log_streams_df.columns

    # Drop rows with missing metadata before writing events to the streams.
    log_streams_df.dropna(inplace=True)
    for log_stream in log_streams_df.to_dict("records"):
        events = []
        token = log_stream.get("uploadSequenceToken")
        for i, event_message in enumerate(["REPORT", "DURATION", "key:value", "START", "END"]):
            events.append({"timestamp": int(1000 * datetime.now().timestamp()), "message": f"{i}_{event_message}"})
        args = {
            "logGroupName": log_stream["logGroupName"],
            "logStreamName": log_stream["logStreamName"],
            "logEvents": events,
        }
        if token:
            args["sequenceToken"] = token
        try:
            cloudwatch_log_client.put_log_events(**args)
        except cloudwatch_log_client.exceptions.DataAlreadyAcceptedException:
            # Events with this sequence token were already ingested; nothing to do.
            pass

    log_events_df = wr.cloudwatch.filter_log_events(log_group_name=loggroup)
    assert len(log_events_df.index) >= 4
    assert "logGroupName" in log_events_df.columns

    filtered_log_events_df = wr.cloudwatch.filter_log_events(
        log_group_name=loggroup, log_stream_names=log_streams_df["logStreamName"].tolist(), filter_pattern="REPORT"
    )
    assert len(filtered_log_events_df.index) >= 4
    # Bug fix: assert against the filtered frame, not the unfiltered one.
    assert "logStreamName" in filtered_log_events_df.columns