diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py
index 8c92856cd..705caf659 100644
--- a/awswrangler/cloudwatch.py
+++ b/awswrangler/cloudwatch.py
@@ -245,3 +245,213 @@ def read_logs(
     if "timestamp" in df.columns:
         df["timestamp"] = pd.to_datetime(df["timestamp"])
     return df
+
+
+def describe_log_streams(
+    log_group_name: str,
+    log_stream_name_prefix: Optional[str] = None,
+    order_by: Optional[str] = "LogStreamName",
+    descending: Optional[bool] = False,
+    limit: Optional[int] = 50,
+    boto3_session: Optional[boto3.Session] = None,
+) -> pd.DataFrame:
+    """List the log streams for the specified log group, return results as a Pandas DataFrame.
+
+    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams
+
+    Parameters
+    ----------
+    log_group_name : str
+        The name of the log group.
+    log_stream_name_prefix : str
+        The prefix to match log stream names.
+    order_by : str
+        If the value is LogStreamName, the results are ordered by log stream name.
+        If the value is LastEventTime, the results are ordered by event time.
+        The default value is LogStreamName.
+    descending : bool
+        If the value is True, results are returned in descending order.
+        If the value is False, results are returned in ascending order.
+        The default value is False.
+    limit : Optional[int]
+        The maximum number of items returned. The default is up to 50 items.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session is used if boto3_session is None.
+
+    Returns
+    -------
+    pandas.DataFrame
+        Result as a Pandas DataFrame.
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> df = wr.cloudwatch.describe_log_streams(
+    ...     log_group_name="aws_sdk_pandas_log_group",
+    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
+    ... )
+
+    """
+    client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session)
+    args: Dict[str, Any] = {
+        "logGroupName": log_group_name,
+        "descending": descending,
+        "orderBy": order_by,
+        "limit": limit,
+    }
+    if log_stream_name_prefix and order_by == "LogStreamName":
+        args["logStreamNamePrefix"] = log_stream_name_prefix
+    elif log_stream_name_prefix and order_by == "LastEventTime":
+        raise exceptions.InvalidArgumentCombination(
+            "Cannot call describe_log_streams with both `log_stream_name_prefix` and order_by equal to 'LastEventTime'"
+        )
+    log_streams: List[Dict[str, Any]] = []
+    response: Dict[str, Any] = client_logs.describe_log_streams(**args)
+
+    log_streams += response["logStreams"]
+    # Paginate until the service stops returning a nextToken
+    while "nextToken" in response:
+        response = client_logs.describe_log_streams(
+            **args,
+            nextToken=response["nextToken"],
+        )
+        log_streams += response["logStreams"]
+    if log_streams:
+        df: pd.DataFrame = pd.DataFrame(log_streams)
+        df["logGroupName"] = log_group_name
+        return df
+    return pd.DataFrame()
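Ordering by `LastEventTime` in descending order puts the most recently active stream in the first row. A minimal sketch reusing the docstring's hypothetical log group name, assuming the group exists and has at least one stream:

>>> import awswrangler as wr
>>> df = wr.cloudwatch.describe_log_streams(
...     log_group_name="aws_sdk_pandas_log_group",
...     order_by="LastEventTime",
...     descending=True,
... )
>>> latest_stream = df.iloc[0]["logStreamName"]  # most recently active stream

Note that `log_stream_name_prefix` is deliberately omitted here: as the code above enforces, a prefix cannot be combined with `order_by="LastEventTime"`.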
+
+
+def _filter_log_events(
+    log_group_name: str,
+    log_stream_names: List[str],
+    start_timestamp: Optional[int] = None,
+    end_timestamp: Optional[int] = None,
+    filter_pattern: Optional[str] = None,
+    limit: Optional[int] = 10000,
+    boto3_session: Optional[boto3.Session] = None,
+) -> List[Dict[str, Any]]:
+    """Fetch matching events for one batch of log streams, paginating with nextToken."""
+    client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session)
+    events: List[Dict[str, Any]] = []
+    args: Dict[str, Any] = {
+        "logGroupName": log_group_name,
+        "logStreamNames": log_stream_names,
+        "limit": limit,
+    }
+    if start_timestamp:
+        args["startTime"] = start_timestamp
+    if end_timestamp:
+        args["endTime"] = end_timestamp
+    if filter_pattern:
+        args["filterPattern"] = filter_pattern
+    response: Dict[str, Any] = client_logs.filter_log_events(**args)
+    events += response["events"]
+    while "nextToken" in response:
+        response = client_logs.filter_log_events(
+            **args,
+            nextToken=response["nextToken"],
+        )
+        events += response["events"]
+    return events
+
+
+def filter_log_events(
+    log_group_name: str,
+    log_stream_name_prefix: Optional[str] = None,
+    log_stream_names: Optional[List[str]] = None,
+    filter_pattern: Optional[str] = None,
+    start_time: Optional[datetime.datetime] = None,
+    end_time: Optional[datetime.datetime] = None,
+    boto3_session: Optional[boto3.Session] = None,
+) -> pd.DataFrame:
+    """List log events from the specified log group, return results as a Pandas DataFrame.
+
+    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events
+
+    Note
+    ----
+    Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix.
+
+    Parameters
+    ----------
+    log_group_name : str
+        The name of the log group.
+    log_stream_name_prefix : str
+        Filters the results to include only events from log streams that have names starting with this prefix.
+    log_stream_names : List[str]
+        Filters the results to only events from the log streams in this list.
+    filter_pattern : str
+        The filter pattern to use. If not provided, all the events are matched.
+    start_time : datetime.datetime
+        Events with a timestamp before this time are not returned.
+    end_time : datetime.datetime
+        Events with a timestamp later than this time are not returned.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session is used if boto3_session is None.
+
+    Returns
+    -------
+    pandas.DataFrame
+        Result as a Pandas DataFrame.
+
+    Examples
+    --------
+    Get all log events from log group 'aws_sdk_pandas_log_group' whose log stream
+    names start with 'aws_sdk_pandas_log_stream'
+
+    >>> import awswrangler as wr
+    >>> df = wr.cloudwatch.filter_log_events(
+    ...     log_group_name="aws_sdk_pandas_log_group",
+    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
+    ... )
+
+    Get all log events containing 'REPORT' from log streams
+    'aws_sdk_pandas_log_stream_one' and 'aws_sdk_pandas_log_stream_two'
+    in log group 'aws_sdk_pandas_log_group'
+
+    >>> import awswrangler as wr
+    >>> df = wr.cloudwatch.filter_log_events(
+    ...     log_group_name="aws_sdk_pandas_log_group",
+    ...     log_stream_names=["aws_sdk_pandas_log_stream_one", "aws_sdk_pandas_log_stream_two"],
+    ...     filter_pattern="REPORT",
+    ... )
+
+    """
+    if log_stream_name_prefix and log_stream_names:
+        raise exceptions.InvalidArgumentCombination(
+            "Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix"
+        )
+    _logger.debug("log_group_name: %s", log_group_name)
+
+    events: List[Dict[str, Any]] = []
+    if not log_stream_names:
+        describe_log_streams_args: Dict[str, Any] = {
+            "log_group_name": log_group_name,
+        }
+        if boto3_session:
+            describe_log_streams_args["boto3_session"] = boto3_session
+        if log_stream_name_prefix:
+            describe_log_streams_args["log_stream_name_prefix"] = log_stream_name_prefix
+        log_streams = describe_log_streams(**describe_log_streams_args)
+        log_stream_names = log_streams["logStreamName"].tolist() if len(log_streams.index) else []
+
+    args: Dict[str, Any] = {
+        "log_group_name": log_group_name,
+    }
+    if start_time:
+        args["start_timestamp"] = int(1000 * start_time.timestamp())
+    if end_time:
+        args["end_timestamp"] = int(1000 * end_time.timestamp())
+    if filter_pattern:
+        args["filter_pattern"] = filter_pattern
+    if boto3_session:
+        args["boto3_session"] = boto3_session
+    chunked_log_streams_size: int = 50
+
+    # Query the streams in chunks of 50 to stay within the API's per-call limit on stream names
+    for i in range(0, len(log_stream_names), chunked_log_streams_size):
+        log_streams = log_stream_names[i : i + chunked_log_streams_size]
+        events += _filter_log_events(**args, log_stream_names=log_streams)
+    if events:
+        df: pd.DataFrame = pd.DataFrame(events)
+        df["logGroupName"] = log_group_name
+        return df
+    return pd.DataFrame()
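`start_time` and `end_time` are converted to epoch milliseconds via `int(1000 * dt.timestamp())`, so passing timezone-aware datetimes avoids local-time ambiguity. A minimal sketch of a windowed, pattern-filtered query, reusing the docstring's hypothetical log group name:

>>> import datetime
>>> import awswrangler as wr
>>> end = datetime.datetime.now(datetime.timezone.utc)
>>> start = end - datetime.timedelta(hours=1)
>>> df = wr.cloudwatch.filter_log_events(
...     log_group_name="aws_sdk_pandas_log_group",
...     filter_pattern="REPORT",
...     start_time=start,
...     end_time=end,
... )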
diff --git a/docs/source/api.rst b/docs/source/api.rst
index 420367ffa..e2e290b59 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -338,6 +338,8 @@ Amazon CloudWatch Logs
     run_query
     start_query
     wait_query
+    describe_log_streams
+    filter_log_events
 
 Amazon QuickSight
 -----------------
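The test below seeds every stream by calling `put_log_events` directly, passing the stream's `uploadSequenceToken` (taken from the `describe_log_streams` result) when one is present. A minimal sketch of that call with the hypothetical group and stream names used above:

>>> import boto3
>>> from datetime import datetime
>>> client = boto3.client("logs")
>>> response = client.put_log_events(
...     logGroupName="aws_sdk_pandas_log_group",
...     logStreamName="aws_sdk_pandas_log_stream_one",
...     logEvents=[{"timestamp": int(1000 * datetime.now().timestamp()), "message": "0_REPORT"}],
... )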
diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py
index 16c73095e..5f951350d 100644
--- a/tests/test_cloudwatch.py
+++ b/tests/test_cloudwatch.py
@@ -1,4 +1,5 @@
 import logging
+from datetime import datetime
 
 import boto3
 import pytest
@@ -49,3 +50,68 @@ def test_read_logs(loggroup):
     )
     assert len(df.index) == 5
     assert len(df.columns) == 3
+
+
+def test_describe_log_streams_and_filter_log_events(loggroup):
+    cloudwatch_log_client = boto3.client("logs")
+    log_stream_names = [
+        "aws_sdk_pandas_log_stream_one",
+        "aws_sdk_pandas_log_stream_two",
+        "aws_sdk_pandas_log_stream_three",
+        "aws_sdk_pandas_log_stream_four",
+    ]
+    for log_stream in log_stream_names:
+        try:
+            cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream)
+        except cloudwatch_log_client.exceptions.ResourceAlreadyExistsException:
+            continue
+
+    with pytest.raises(exceptions.InvalidArgumentCombination):
+        wr.cloudwatch.describe_log_streams(
+            log_group_name=loggroup,
+            log_stream_name_prefix="aws_sdk_pandas_log_stream",
+            order_by="LastEventTime",
+            descending=False,
+        )
+
+    log_streams_df = wr.cloudwatch.describe_log_streams(
+        log_group_name=loggroup, order_by="LastEventTime", descending=False
+    )
+
+    assert len(log_streams_df.index) >= 4
+    assert "logGroupName" in log_streams_df.columns
+
+    log_streams_df.dropna(inplace=True)
+    for log_stream in log_streams_df.to_dict("records"):
+        events = []
+        token = log_stream.get("uploadSequenceToken")
+        for i, event_message in enumerate(["REPORT", "DURATION", "key:value", "START", "END"]):
+            events.append({"timestamp": int(1000 * datetime.now().timestamp()), "message": f"{i}_{event_message}"})
+        args = {
+            "logGroupName": log_stream["logGroupName"],
+            "logStreamName": log_stream["logStreamName"],
+            "logEvents": events,
+        }
+        if token:
+            args["sequenceToken"] = token
+        try:
+            cloudwatch_log_client.put_log_events(**args)
+        except cloudwatch_log_client.exceptions.DataAlreadyAcceptedException:
+            pass
+
+    with pytest.raises(exceptions.InvalidArgumentCombination):
+        wr.cloudwatch.filter_log_events(
+            log_group_name=loggroup,
+            log_stream_name_prefix="aws_sdk_pandas_log_stream",
+            log_stream_names=log_streams_df["logStreamName"].tolist(),
+        )
+
+    log_events_df = wr.cloudwatch.filter_log_events(log_group_name=loggroup)
+    assert len(log_events_df.index) >= 4
+    assert "logGroupName" in log_events_df.columns
+
+    filtered_log_events_df = wr.cloudwatch.filter_log_events(
+        log_group_name=loggroup, log_stream_names=log_streams_df["logStreamName"].tolist(), filter_pattern="REPORT"
+    )
+    assert len(filtered_log_events_df.index) >= 4
+    assert "logStreamName" in filtered_log_events_df.columns
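One follow-up note on the returned frames: `filter_log_events` keeps event fields exactly as the service returns them, so its `timestamp` column holds epoch milliseconds, unlike `read_logs`, which converts timestamps with `pd.to_datetime`. A one-line conversion, assuming `df` is a non-empty result of `filter_log_events`:

>>> import pandas as pd
>>> df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")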