From 1237e55e006d9378b1d096a69dbda14e4d52cdda Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 13:16:31 +0700 Subject: [PATCH 01/26] Added describe_log_streams --- awswrangler/cloudwatch.py | 75 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 8c92856cd..bca8d38a1 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -3,7 +3,7 @@ import datetime import logging import time -from typing import Any, Dict, List, Optional, cast +from typing import Any, Dict, List, Literal, Optional, Union, cast import boto3 import pandas as pd @@ -245,3 +245,76 @@ def read_logs( if "timestamp" in df.columns: df["timestamp"] = pd.to_datetime(df["timestamp"]) return df + + +def describe_log_streams( + log_group_name: str, + log_stream_name_prefix: Optional[str] = None, + order_by: Optional[Union[Literal["LogStreamName"], Literal["LastEventTime"]]] = "LogStreamName", + descending: Optional[bool] = False, + limit: Optional[int] = 50, + boto3_session: Optional[boto3.Session] = None, +) -> pd.DataFrame: + """Lists the log streams for the specified log group, return results as a Pandas DataFrame + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams + + Parameters + ---------- + log_group_name : str + The name of the log group. + log_stream_name_prefix : str + The prefix to match log streams' name + order_by : str + If the value is LogStreamName , the results are ordered by log stream name. + If the value is LastEventTime , the results are ordered by the event time. + The default value is LogStreamName . + descending : bool + If the value is True, results are returned in descending order. + If the value is to False, results are returned in ascending order. + The default value is False. + limit : Optional[int] + The maximum number of items returned. The default is up to 50 items. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + pandas.DataFrame + Result as a Pandas DataFrame. + + Examples + -------- + >>> import awswrangler as wr + >>> df = wr.cloudwatch.describe_log_streams( + ... log_group_name="loggroup", + ... log_stream_name_prefix="test", + ... ) + + """ + client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session) + args: Dict[str, Any] = { + "logGroupName": log_group_name, + "descending": descending, + "orderBy": order_by, + "limit": limit, + } + if log_stream_name_prefix and order_by == "LogStreamName": + args["logStreamNamePrefix"] = log_stream_name_prefix + elif log_stream_name_prefix and order_by == "LastEventTime": + raise exceptions.InvalidArgumentCombination( + "you cannot specify `log_stream_name_prefix` with order_by equal to 'LastEventTime' " + ) + log_streams: List[Dict[str, Any]] = [] + response: Dict[str, Any] = client_logs.describe_log_streams(**args) + + log_streams += response["logStreams"] + while "nextToken" in response: + response = client_logs.describe_log_streams( + **args, + nextToken=response["nextToken"], + ) + log_streams += response["logStreams"] + log_streams_df: pd.DataFrame = pd.DataFrame(log_streams) + log_streams_df["logGroupName"] = log_group_name + return log_streams_df From 0803d4de1c63d62048da312c559fc33ca5084b15 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 13:18:56 +0700 Subject: [PATCH 02/26] Added describe_log_streams to doc --- docs/source/api.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/api.rst b/docs/source/api.rst index 420367ffa..c2d0c8c9b 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -338,6 +338,7 @@ Amazon CloudWatch Logs run_query start_query wait_query + describe_log_streams Amazon QuickSight ----------------- From c5fd9cdbc7bba8a25e5fe47d7cb8fe4e89d4cb8c Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 14:54:27 +0700 Subject: [PATCH 03/26] Added filter_log_events --- awswrangler/cloudwatch.py | 124 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 120 insertions(+), 4 deletions(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index bca8d38a1..dcc008fc9 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -303,7 +303,7 @@ def describe_log_streams( args["logStreamNamePrefix"] = log_stream_name_prefix elif log_stream_name_prefix and order_by == "LastEventTime": raise exceptions.InvalidArgumentCombination( - "you cannot specify `log_stream_name_prefix` with order_by equal to 'LastEventTime' " + "Cannot call describe_log_streams with both `log_stream_name_prefix` and order_by equal 'LastEventTime'" ) log_streams: List[Dict[str, Any]] = [] response: Dict[str, Any] = client_logs.describe_log_streams(**args) @@ -315,6 +315,122 @@ def describe_log_streams( nextToken=response["nextToken"], ) log_streams += response["logStreams"] - log_streams_df: pd.DataFrame = pd.DataFrame(log_streams) - log_streams_df["logGroupName"] = log_group_name - return log_streams_df + df: pd.DataFrame = pd.DataFrame(log_streams) + df["logGroupName"] = log_group_name + return df + + +def _filter_log_events( + log_group_name: str, + log_stream_names: List[str], + start_timestamp: int, + end_timestamp: int, + filter_pattern: Optional[str] = None, + limit: Optional[int] = 10000, + boto3_session: Optional[boto3.Session] = None, +) -> List[Dict[str, Any]]: + client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session) + events: List[Dict[str, Any]] = [] + args: Dict[str, Any] = { + "logGroupName": log_group_name, + "logStreamNames": log_stream_names, + "limit": limit, + "startTime": start_timestamp, + "endTime": end_timestamp, + } + if filter_pattern: + args["filterPattern"] = filter_pattern + response: Dict[str, Any] = client_logs.filter_log_events(**args) + events += response["events"] + while "nextToken" in response: + response = client_logs.filter_log_events( + **args, + nextToken=response["nextToken"], + ) + events += response["logStreams"] + return events + + +def filter_log_events( + log_group_name: str, + log_stream_name_prefix: Optional[str] = None, + log_stream_names: Optional[List[str]] = None, + filter_pattern: Optional[str] = None, + start_time: datetime.datetime = datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.timezone.utc), + end_time: datetime.datetime = datetime.datetime.utcnow(), + boto3_session: Optional[boto3.Session] = None, +) -> pd.DataFrame: + """Lists log events from the specified log group. The results are returned as Pandas DataFrame. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events + + Note + ---- + Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix. + + Parameters + ---------- + log_group_name : str + The name of the log group. + log_stream_name_prefix : str + Filters the results to include only events from log streams that have names starting with this prefix. + log_stream_names: List[str] + Filters the results to only logs from the log streams in this list. + filter_pattern : str + The filter pattern to use. If not provided, all the events are matched. + start_time : datetime.datetime + Events with a timestamp before this time are not returned. + end_time : datetime.datetime + Events with a timestamp later than this time are not returned. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + pandas.DataFrame + Result as a Pandas DataFrame. + + Examples + -------- + >>> import awswrangler as wr + >>> df = wr.cloudwatch.filter_log_events( + ... log_group_name="loggroup", + ... log_stream_name_prefix="test", + ... ) + + """ + if log_stream_name_prefix and log_stream_names: + raise exceptions.InvalidArgumentCombination( + "Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix" + ) + _logger.debug("log_group_name: %s", log_group_name) + start_timestamp: int = int(1000 * start_time.timestamp()) + end_timestamp: int = int(1000 * end_time.timestamp()) + _validate_args(start_timestamp=start_timestamp, end_timestamp=end_timestamp) + + events: List[Dict[str, Any]] = [] + if log_stream_name_prefix and not log_stream_names: + log_stream_names = describe_log_streams( + log_group_name=log_group_name, log_stream_name_prefix=log_stream_name_prefix, boto3_session=boto3_session + )["logStreamName"].tolist() + assert log_stream_names is not None + args: Dict[str, Any] = { + "log_group_name": log_group_name, + "start_timestamp": start_timestamp, + "end_timestamp": end_timestamp, + } + + if filter_pattern: + args["filter_pattern"] = filter_pattern + if boto3_session: + args["boto3_session"] = boto3_session + chunked_log_streams_size: int = 50 + + for i in range(0, len(log_stream_names), chunked_log_streams_size): + log_streams = log_stream_names[i : i + chunked_log_streams_size] + events += _filter_log_events(**args, log_stream_names=log_streams) + if events: + df: pd.DataFrame = pd.DataFrame(events) + df["logGroupName"] = log_group_name + return df + return pd.DataFrame() From ef5a2486600c2d433f298acf578663844b33682b Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 14:56:51 +0700 Subject: [PATCH 04/26] Added filter_log_events to doc --- docs/source/api.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/api.rst b/docs/source/api.rst index c2d0c8c9b..e2e290b59 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -339,6 +339,7 @@ Amazon CloudWatch Logs start_query wait_query describe_log_streams + filter_log_events Amazon QuickSight ----------------- From 60c84a0f6af021d51492af9a8d35e48616677977 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 15:49:54 +0700 Subject: [PATCH 05/26] Changed *_time* to optional --- awswrangler/cloudwatch.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index dcc008fc9..7e95608c0 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -323,8 +323,8 @@ def describe_log_streams( def _filter_log_events( log_group_name: str, log_stream_names: List[str], - start_timestamp: int, - end_timestamp: int, + start_timestamp: Optional[int] = None, + end_timestamp: Optional[int] = None, filter_pattern: Optional[str] = None, limit: Optional[int] = 10000, boto3_session: Optional[boto3.Session] = None, @@ -335,9 +335,11 @@ def _filter_log_events( "logGroupName": log_group_name, "logStreamNames": log_stream_names, "limit": limit, - "startTime": start_timestamp, - "endTime": end_timestamp, } + if start_timestamp: + args["startTime"] = start_timestamp + if end_timestamp: + args["endTime"] = start_timestamp if filter_pattern: args["filterPattern"] = filter_pattern response: Dict[str, Any] = client_logs.filter_log_events(**args) @@ -347,7 +349,7 @@ def _filter_log_events( **args, nextToken=response["nextToken"], ) - events += response["logStreams"] + events += response["events"] return events @@ -356,8 +358,8 @@ def filter_log_events( log_stream_name_prefix: Optional[str] = None, log_stream_names: Optional[List[str]] = None, filter_pattern: Optional[str] = None, - start_time: datetime.datetime = datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.timezone.utc), - end_time: datetime.datetime = datetime.datetime.utcnow(), + start_time: Optional[datetime.datetime] = None, + end_time: Optional[datetime.datetime] = None, boto3_session: Optional[boto3.Session] = None, ) -> pd.DataFrame: """Lists log events from the specified log group. The results are returned as Pandas DataFrame. @@ -404,9 +406,6 @@ def filter_log_events( "Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix" ) _logger.debug("log_group_name: %s", log_group_name) - start_timestamp: int = int(1000 * start_time.timestamp()) - end_timestamp: int = int(1000 * end_time.timestamp()) - _validate_args(start_timestamp=start_timestamp, end_timestamp=end_timestamp) events: List[Dict[str, Any]] = [] if log_stream_name_prefix and not log_stream_names: @@ -416,10 +415,11 @@ def filter_log_events( assert log_stream_names is not None args: Dict[str, Any] = { "log_group_name": log_group_name, - "start_timestamp": start_timestamp, - "end_timestamp": end_timestamp, } - + if start_time: + args["start_timestamp"] = int(1000 * start_time.timestamp()) + if end_time: + args["end_timestamp"] = int(1000 * end_time.timestamp()) if filter_pattern: args["filter_pattern"] = filter_pattern if boto3_session: From 153ddca09f34a0f710021dd4b29c873723157770 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 15:51:42 +0700 Subject: [PATCH 06/26] Added unittest for describe_log_streams and filter_log_events --- tests/test_cloudwatch.py | 45 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 16c73095e..84f2e4c6f 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -1,3 +1,4 @@ +from datetime import datetime import logging import boto3 @@ -49,3 +50,47 @@ def test_read_logs(loggroup): ) assert len(df.index) == 5 assert len(df.columns) == 3 + + +def test_describe_log_streams_and_filter_log_events(loggroup): + cloudwatch_log_client = boto3.client("logs") + log_stream_names = [ + "aws_sdk_pandas_log_stream_one", + "aws_sdk_pandas_log_stream_two", + "aws_sdk_pandas_log_stream_three", + "aws_sdk_pandas_log_stream_four", + ] + for log_stream in log_stream_names: + cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream) + log_streams_df = wr.cloudwatch.describe_log_streams( + log_group_name=loggroup, order_by="LastEventTime", descending=False + ) + assert len(log_streams_df.index) >= 4 + assert "logGroupName" in log_streams_df.columns + for log_stream in log_streams_df.to_dict("records"): + events = [] + token = log_stream.get("uploadSequenceToken") + for i, event_message in enumerate(["REPORT", "DURATION", "key:value", "START", "END"]): + events.append({"timestamp": int(1000 * datetime.now().timestamp()), "message": f"{i}_{event_message}"}) + args = { + "logGroupName": log_stream["logGroupName"], + "logStreamName": log_stream["logStreamName"], + "logEvents": events, + } + if token: + args["sequenceToken"] = token + try: + cloudwatch_log_client.put_log_events(**args) + except cloudwatch_log_client.exceptions.DataAlreadyAcceptedException: + pass # Concurrency + + log_events_df = wr.cloudwatch.filter_log_events( + log_group_name=loggroup, log_stream_name_prefix="aws_sdk_pandas_log_stream" + ) + assert len(set(log_events_df["logStreamName"].tolist())) >= 4 + + filtered_log_events_df = wr.cloudwatch.filter_log_events( + log_group_name=loggroup, log_stream_names=log_stream_names, filter_pattern='"REPORT"' + ) + assert len(filtered_log_events_df.index) >= 4 + assert set(log_events_df["logStreamName"].tolist()) == set(log_stream_names) From 0a14046d6e809b951f94e7d47e0496df9a11954b Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 16:01:33 +0700 Subject: [PATCH 07/26] Sorted import --- tests/test_cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 84f2e4c6f..147298a5a 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -1,5 +1,5 @@ -from datetime import datetime import logging +from datetime import datetime import boto3 import pytest From 839e790704edd67a980280a81c19bbaec153480e Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 16:01:54 +0700 Subject: [PATCH 08/26] Updated doc string --- awswrangler/cloudwatch.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 7e95608c0..4e51701db 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -287,8 +287,8 @@ def describe_log_streams( -------- >>> import awswrangler as wr >>> df = wr.cloudwatch.describe_log_streams( - ... log_group_name="loggroup", - ... log_stream_name_prefix="test", + ... log_group_name="aws_sdk_pandas_log_group", + ... log_stream_name_prefix="aws_sdk_pandas_log_stream", ... ) """ @@ -394,10 +394,23 @@ def filter_log_events( Examples -------- + Get all log events from log group 'aws_sdk_pandas_log_group' that have log stream prefix 'aws_sdk_pandas_log_stream' + + >>> import awswrangler as wr + >>> df = wr.cloudwatch.filter_log_events( + ... log_group_name="aws_sdk_pandas_log_group", + ... log_stream_name_prefix="aws_sdk_pandas_log_stream", + ... ) + + Get all log events contains 'REPORT' from log stream + 'aws_sdk_pandas_log_stream_one' and 'aws_sdk_pandas_log_stream_two' + from log group 'aws_sdk_pandas_log_group' + >>> import awswrangler as wr >>> df = wr.cloudwatch.filter_log_events( - ... log_group_name="loggroup", - ... log_stream_name_prefix="test", + ... log_group_name="aws_sdk_pandas_log_group", + ... log_stream_names=["aws_sdk_pandas_log_stream_one","aws_sdk_pandas_log_stream_two"], + ... filter_pattern='REPORT', ... ) """ From 9bcfb3b7579e70c4c6e8b27c7c06a84d2960f4ea Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 16:59:24 +0700 Subject: [PATCH 09/26] Removed Literal type for python3.7 --- awswrangler/cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 4e51701db..3525fa62a 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -250,7 +250,7 @@ def read_logs( def describe_log_streams( log_group_name: str, log_stream_name_prefix: Optional[str] = None, - order_by: Optional[Union[Literal["LogStreamName"], Literal["LastEventTime"]]] = "LogStreamName", + order_by: Optional[str] = "LogStreamName", descending: Optional[bool] = False, limit: Optional[int] = 50, boto3_session: Optional[boto3.Session] = None, From 8f8697204cc551c5eaf73810df1db10ff07e9f7a Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 17:09:15 +0700 Subject: [PATCH 10/26] Added exception for create_log_stream in test module --- tests/test_cloudwatch.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 147298a5a..0c766c7c0 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -61,7 +61,10 @@ def test_describe_log_streams_and_filter_log_events(loggroup): "aws_sdk_pandas_log_stream_four", ] for log_stream in log_stream_names: - cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream) + try: + cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream) + except cloudwatch_log_client.Client.exceptions.ResourceAlreadyExistsException: + continue log_streams_df = wr.cloudwatch.describe_log_streams( log_group_name=loggroup, order_by="LastEventTime", descending=False ) From 94d176b2b7ffa55919a7fcbc707e7c2105633380 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 17:18:42 +0700 Subject: [PATCH 11/26] Removed Literal and Union unused typing --- awswrangler/cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 3525fa62a..967a06498 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -3,7 +3,7 @@ import datetime import logging import time -from typing import Any, Dict, List, Literal, Optional, Union, cast +from typing import Any, Dict, List, Optional, cast import boto3 import pandas as pd From 792f6e821c7129bf1c88a30de21f3a2d79d5b4ec Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 17:39:45 +0700 Subject: [PATCH 12/26] Corrected doc string --- awswrangler/cloudwatch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 967a06498..5ff3c0d7b 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -255,7 +255,7 @@ def describe_log_streams( limit: Optional[int] = 50, boto3_session: Optional[boto3.Session] = None, ) -> pd.DataFrame: - """Lists the log streams for the specified log group, return results as a Pandas DataFrame + """List the log streams for the specified log group, return results as a Pandas DataFrame. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams @@ -362,7 +362,7 @@ def filter_log_events( end_time: Optional[datetime.datetime] = None, boto3_session: Optional[boto3.Session] = None, ) -> pd.DataFrame: - """Lists log events from the specified log group. The results are returned as Pandas DataFrame. + """List log events from the specified log group. The results are returned as Pandas DataFrame. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events From 89732e04627d87efcc667993b83d7e7e216090a8 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Sun, 20 Nov 2022 17:58:32 +0700 Subject: [PATCH 13/26] Fixed exceptions --- tests/test_cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 0c766c7c0..69f2a6a92 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -63,7 +63,7 @@ def test_describe_log_streams_and_filter_log_events(loggroup): for log_stream in log_stream_names: try: cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream) - except cloudwatch_log_client.Client.exceptions.ResourceAlreadyExistsException: + except cloudwatch_log_client.exceptions.ResourceAlreadyExistsException: continue log_streams_df = wr.cloudwatch.describe_log_streams( log_group_name=loggroup, order_by="LastEventTime", descending=False From c83a6232ad7cdd27f0dd00b1240d4f6f43204ca8 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Mon, 21 Nov 2022 06:19:27 +0700 Subject: [PATCH 14/26] Trigger githubcodebuild again --- tests/test_cloudwatch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 69f2a6a92..edf5faf24 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -70,6 +70,7 @@ def test_describe_log_streams_and_filter_log_events(loggroup): ) assert len(log_streams_df.index) >= 4 assert "logGroupName" in log_streams_df.columns + for log_stream in log_streams_df.to_dict("records"): events = [] token = log_stream.get("uploadSequenceToken") From 4a153221f970d918576db2ac8fa76d1e1ca0b4ff Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Mon, 21 Nov 2022 06:55:20 +0700 Subject: [PATCH 15/26] Added dropna to test --- tests/test_cloudwatch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index edf5faf24..1e429477d 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -71,6 +71,7 @@ def test_describe_log_streams_and_filter_log_events(loggroup): assert len(log_streams_df.index) >= 4 assert "logGroupName" in log_streams_df.columns + log_streams_df.dropna(inplace=True) for log_stream in log_streams_df.to_dict("records"): events = [] token = log_stream.get("uploadSequenceToken") From a88a3a2380bab70be92f9421f1243285e3a0588d Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Mon, 21 Nov 2022 22:57:16 +0700 Subject: [PATCH 16/26] Trigger CodeBuild --- tests/test_cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 1e429477d..b4068ffb2 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -87,7 +87,7 @@ def test_describe_log_streams_and_filter_log_events(loggroup): try: cloudwatch_log_client.put_log_events(**args) except cloudwatch_log_client.exceptions.DataAlreadyAcceptedException: - pass # Concurrency + pass log_events_df = wr.cloudwatch.filter_log_events( log_group_name=loggroup, log_stream_name_prefix="aws_sdk_pandas_log_stream" From 395d24748a6ecb4de21e90fc38b4c07b665bd19e Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Mon, 21 Nov 2022 23:28:34 +0700 Subject: [PATCH 17/26] Changed column len assertion --- tests/test_cloudwatch.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index b4068ffb2..d1546e418 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -92,10 +92,9 @@ def test_describe_log_streams_and_filter_log_events(loggroup): log_events_df = wr.cloudwatch.filter_log_events( log_group_name=loggroup, log_stream_name_prefix="aws_sdk_pandas_log_stream" ) - assert len(set(log_events_df["logStreamName"].tolist())) >= 4 + assert len(log_events_df.index) >= 4 filtered_log_events_df = wr.cloudwatch.filter_log_events( log_group_name=loggroup, log_stream_names=log_stream_names, filter_pattern='"REPORT"' ) assert len(filtered_log_events_df.index) >= 4 - assert set(log_events_df["logStreamName"].tolist()) == set(log_stream_names) From afe4f773c6ec755153f554db624e9f42cd6e54b2 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Tue, 22 Nov 2022 06:22:44 +0700 Subject: [PATCH 18/26] Fixed filter_log_events not accepting None for both log_stream_name_prefix and log_stream_names --- awswrangler/cloudwatch.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 5ff3c0d7b..d01c399ec 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -421,7 +421,14 @@ def filter_log_events( _logger.debug("log_group_name: %s", log_group_name) events: List[Dict[str, Any]] = [] - if log_stream_name_prefix and not log_stream_names: + if not log_stream_names: + describe_log_streams_args: Dict[str, Any] = { + "log_group_name": log_group_name, + } + if boto3_session: + describe_log_streams_args["boto3_session"] = boto3_session + if log_stream_name_prefix: + describe_log_streams_args["log_stream_name_prefix"] = log_stream_name_prefix log_stream_names = describe_log_streams( log_group_name=log_group_name, log_stream_name_prefix=log_stream_name_prefix, boto3_session=boto3_session )["logStreamName"].tolist() From ae5ff239fd78ed2447292c2a2f810098d9703642 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Tue, 22 Nov 2022 06:24:08 +0700 Subject: [PATCH 19/26] Changed filter_log_events in test --- tests/test_cloudwatch.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index d1546e418..67c6495a0 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -89,9 +89,7 @@ def test_describe_log_streams_and_filter_log_events(loggroup): except cloudwatch_log_client.exceptions.DataAlreadyAcceptedException: pass - log_events_df = wr.cloudwatch.filter_log_events( - log_group_name=loggroup, log_stream_name_prefix="aws_sdk_pandas_log_stream" - ) + log_events_df = wr.cloudwatch.filter_log_events(log_group_name=loggroup) assert len(log_events_df.index) >= 4 filtered_log_events_df = wr.cloudwatch.filter_log_events( From b618f6700036f911dfd31d5984023dba930ccf81 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Tue, 22 Nov 2022 07:02:30 +0700 Subject: [PATCH 20/26] Changed filter pattern --- tests/test_cloudwatch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 67c6495a0..1a86f3e49 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -93,6 +93,6 @@ def test_describe_log_streams_and_filter_log_events(loggroup): assert len(log_events_df.index) >= 4 filtered_log_events_df = wr.cloudwatch.filter_log_events( - log_group_name=loggroup, log_stream_names=log_stream_names, filter_pattern='"REPORT"' + log_group_name=loggroup, log_stream_names=log_streams_df["logStreamName"].tolist(), filter_pattern="REPORT" ) - assert len(filtered_log_events_df.index) >= 4 + assert len(filtered_log_events_df.index) > 0 From 5e7b568428bb558b6f5dade58e05632e7f0420d6 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Tue, 22 Nov 2022 19:54:02 +0700 Subject: [PATCH 21/26] Added additional test --- tests/test_cloudwatch.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 1a86f3e49..c4aaacf8b 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -91,8 +91,10 @@ def test_describe_log_streams_and_filter_log_events(loggroup): log_events_df = wr.cloudwatch.filter_log_events(log_group_name=loggroup) assert len(log_events_df.index) >= 4 + assert "logGroupName" in log_events_df.columns filtered_log_events_df = wr.cloudwatch.filter_log_events( log_group_name=loggroup, log_stream_names=log_streams_df["logStreamName"].tolist(), filter_pattern="REPORT" ) - assert len(filtered_log_events_df.index) > 0 + assert len(filtered_log_events_df.index) >= 4 + assert "logStreamName" in log_events_df.columns From 2c33c90ba08cb385b30cb3bda0ca7913ef33ad95 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Tue, 22 Nov 2022 20:29:36 +0700 Subject: [PATCH 22/26] Trigger code build --- tests/test_cloudwatch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index c4aaacf8b..8a77a8e73 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -65,9 +65,11 @@ def test_describe_log_streams_and_filter_log_events(loggroup): cloudwatch_log_client.create_log_stream(logGroupName=loggroup, logStreamName=log_stream) except cloudwatch_log_client.exceptions.ResourceAlreadyExistsException: continue + log_streams_df = wr.cloudwatch.describe_log_streams( log_group_name=loggroup, order_by="LastEventTime", descending=False ) + assert len(log_streams_df.index) >= 4 assert "logGroupName" in log_streams_df.columns From 368cd8910de889cd71e8ed598235d9c2157882a6 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Wed, 23 Nov 2022 07:02:48 +0700 Subject: [PATCH 23/26] Fixed typo in _filter_log_events's arg --- awswrangler/cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index d01c399ec..af245a568 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -339,7 +339,7 @@ def _filter_log_events( if start_timestamp: args["startTime"] = start_timestamp if end_timestamp: - args["endTime"] = start_timestamp + args["endTime"] = end_timestamp if filter_pattern: args["filterPattern"] = filter_pattern response: Dict[str, Any] = client_logs.filter_log_events(**args) From a381b76c787efbce6d08c16df4b04e7838588b9a Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Wed, 23 Nov 2022 20:20:26 +0700 Subject: [PATCH 24/26] Added expected exceptions test --- tests/test_cloudwatch.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_cloudwatch.py b/tests/test_cloudwatch.py index 8a77a8e73..5f951350d 100644 --- a/tests/test_cloudwatch.py +++ b/tests/test_cloudwatch.py @@ -66,6 +66,14 @@ def test_describe_log_streams_and_filter_log_events(loggroup): except cloudwatch_log_client.exceptions.ResourceAlreadyExistsException: continue + with pytest.raises(exceptions.InvalidArgumentCombination): + wr.cloudwatch.describe_log_streams( + log_group_name=loggroup, + log_stream_name_prefix="aws_sdk_pandas_log_stream", + order_by="LastEventTime", + descending=False, + ) + log_streams_df = wr.cloudwatch.describe_log_streams( log_group_name=loggroup, order_by="LastEventTime", descending=False ) @@ -91,6 +99,13 @@ def test_describe_log_streams_and_filter_log_events(loggroup): except cloudwatch_log_client.exceptions.DataAlreadyAcceptedException: pass + with pytest.raises(exceptions.InvalidArgumentCombination): + wr.cloudwatch.filter_log_events( + log_group_name=loggroup, + log_stream_name_prefix="aws_sdk_pandas_log_stream", + log_stream_names=log_streams_df["logStreamName"].tolist(), + ) + log_events_df = wr.cloudwatch.filter_log_events(log_group_name=loggroup) assert len(log_events_df.index) >= 4 assert "logGroupName" in log_events_df.columns From 3429c21bb7d7f6900d8dde5cf141c5aaf3c35e48 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Wed, 23 Nov 2022 20:21:06 +0700 Subject: [PATCH 25/26] describe_log_streams return emptry dataframe if no log_streams found --- awswrangler/cloudwatch.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index af245a568..21c10afeb 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -315,9 +315,11 @@ def describe_log_streams( nextToken=response["nextToken"], ) log_streams += response["logStreams"] - df: pd.DataFrame = pd.DataFrame(log_streams) - df["logGroupName"] = log_group_name - return df + if log_streams: + df: pd.DataFrame = pd.DataFrame(log_streams) + df["logGroupName"] = log_group_name + return df + return pd.DataFrame() def _filter_log_events( From f5838043cb36e01061bc88bed19019992dcf7495 Mon Sep 17 00:00:00 2001 From: Khue Ngoc Dang Date: Wed, 23 Nov 2022 20:22:02 +0700 Subject: [PATCH 26/26] Replaced assertion with if statement --- awswrangler/cloudwatch.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/awswrangler/cloudwatch.py b/awswrangler/cloudwatch.py index 21c10afeb..705caf659 100644 --- a/awswrangler/cloudwatch.py +++ b/awswrangler/cloudwatch.py @@ -431,10 +431,9 @@ def filter_log_events( describe_log_streams_args["boto3_session"] = boto3_session if log_stream_name_prefix: describe_log_streams_args["log_stream_name_prefix"] = log_stream_name_prefix - log_stream_names = describe_log_streams( - log_group_name=log_group_name, log_stream_name_prefix=log_stream_name_prefix, boto3_session=boto3_session - )["logStreamName"].tolist() - assert log_stream_names is not None + log_streams = describe_log_streams(**describe_log_streams_args) + log_stream_names = log_streams["logStreamName"].tolist() if len(log_streams.index) else [] + args: Dict[str, Any] = { "log_group_name": log_group_name, }