diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py
index f61bfa601..d392fe9a7 100644
--- a/awswrangler/pandas.py
+++ b/awswrangler/pandas.py
@@ -17,7 +17,8 @@
                                     EmptyDataframe, InvalidSerDe,
                                     InvalidCompression)
 from awswrangler.utils import calculate_bounders
-from awswrangler import s3, athena
+from awswrangler import s3
+from awswrangler.athena import Athena
 
 logger = logging.getLogger(__name__)
 
@@ -607,8 +608,21 @@ def to_s3(self,
         :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
         :return: List of objects written on S3
         """
+        if not partition_cols:
+            partition_cols = []
+        if not cast_columns:
+            cast_columns = {}
         dataframe = Pandas.normalize_columns_names_athena(dataframe,
                                                           inplace=inplace)
+        cast_columns = {
+            Athena.normalize_column_name(k): v
+            for k, v in cast_columns.items()
+        }
+        logger.debug(f"cast_columns: {cast_columns}")
+        partition_cols = [
+            Athena.normalize_column_name(x) for x in partition_cols
+        ]
+        logger.debug(f"partition_cols: {partition_cols}")
         dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe,
                                                    inplace=inplace)
         if compression is not None:
@@ -628,8 +642,6 @@ def to_s3(self,
             raise UnsupportedFileFormat(file_format)
         if dataframe.empty:
             raise EmptyDataframe()
-        if not partition_cols:
-            partition_cols = []
         if ((mode == "overwrite") or ((mode == "overwrite_partitions") and  # noqa
                                       (not partition_cols))):
@@ -1042,7 +1054,7 @@ def normalize_columns_names_athena(dataframe, inplace=True):
         if inplace is False:
             dataframe = dataframe.copy(deep=True)
         dataframe.columns = [
-            athena.Athena.normalize_column_name(x) for x in dataframe.columns
+            Athena.normalize_column_name(x) for x in dataframe.columns
         ]
         return dataframe
 
diff --git a/testing/test_awswrangler/test_pandas.py b/testing/test_awswrangler/test_pandas.py
index e7df77fd2..cf20137f6 100644
--- a/testing/test_awswrangler/test_pandas.py
+++ b/testing/test_awswrangler/test_pandas.py
@@ -781,16 +781,22 @@ def test_read_sql_athena_with_time_zone(session, bucket, database):
 
 def test_normalize_columns_names_athena():
     dataframe = pandas.DataFrame({
-        "CammelCase": [1, 2, 3],
+        "CamelCase": [1, 2, 3],
         "With Spaces": [4, 5, 6],
         "With-Dash": [7, 8, 9],
         "Ãccént": [10, 11, 12],
+        "with.dot": [10, 11, 12],
+        "Camel_Case2": [13, 14, 15],
+        "Camel___Case3": [16, 17, 18]
     })
     Pandas.normalize_columns_names_athena(dataframe=dataframe, inplace=True)
-    assert dataframe.columns[0] == "cammel_case"
+    assert dataframe.columns[0] == "camel_case"
     assert dataframe.columns[1] == "with_spaces"
     assert dataframe.columns[2] == "with_dash"
     assert dataframe.columns[3] == "accent"
+    assert dataframe.columns[4] == "with_dot"
+    assert dataframe.columns[5] == "camel_case2"
+    assert dataframe.columns[6] == "camel_case3"
 
 
 def test_to_parquet_with_normalize(
@@ -799,11 +805,13 @@ def test_to_parquet_with_normalize(
     database,
 ):
     dataframe = pandas.DataFrame({
-        "CammelCase": [1, 2, 3],
+        "CamelCase": [1, 2, 3],
         "With Spaces": [4, 5, 6],
         "With-Dash": [7, 8, 9],
         "Ãccént": [10, 11, 12],
         "with.dot": [10, 11, 12],
+        "Camel_Case2": [13, 14, 15],
+        "Camel___Case3": [16, 17, 18]
     })
     session.pandas.to_parquet(dataframe=dataframe,
                               database=database,
@@ -818,11 +826,57 @@ def test_to_parquet_with_normalize(
         sleep(2)
     assert len(dataframe.index) == len(dataframe2.index)
     assert (len(list(dataframe.columns)) + 1) == len(list(dataframe2.columns))
-    assert dataframe2.columns[0] == "cammel_case"
+    assert dataframe2.columns[0] == "camel_case"
     assert dataframe2.columns[1] == "with_spaces"
     assert dataframe2.columns[2] == "with_dash"
     assert dataframe2.columns[3] == "accent"
     assert dataframe2.columns[4] == "with_dot"
+    assert dataframe2.columns[5] == "camel_case2"
+    assert dataframe2.columns[6] == "camel_case3"
+
+
+def test_to_parquet_with_normalize_and_cast(
+        session,
+        bucket,
+        database,
+):
+    dataframe = pandas.DataFrame({
+        "CamelCase": [1, 2, 3],
+        "With Spaces": [4, 5, 6],
+        "With-Dash": [7, 8, 9],
+        "Ãccént": [10, 11, 12],
+        "with.dot": [10, 11, 12],
+        "Camel_Case2": [13, 14, 15],
+        "Camel___Case3": [16, 17, 18]
+    })
+    session.pandas.to_parquet(dataframe=dataframe,
+                              database=database,
+                              path=f"s3://{bucket}/TestTable-with.dot/",
+                              mode="overwrite",
+                              partition_cols=["CamelCase"],
+                              cast_columns={
+                                  "Camel_Case2": "double",
+                                  "Camel___Case3": "float"
+                              })
+    dataframe2 = None
+    for counter in range(10):
+        dataframe2 = session.pandas.read_sql_athena(
+            sql="select * from test_table_with_dot", database=database)
+        if len(dataframe.index) == len(dataframe2.index):
+            break
+        sleep(2)
+    assert len(dataframe.index) == len(dataframe2.index)
+    assert (len(list(dataframe.columns)) + 1) == len(list(dataframe2.columns))
+    assert dataframe2.columns[0] == "with_spaces"
+    assert dataframe2.columns[1] == "with_dash"
+    assert dataframe2.columns[2] == "accent"
+    assert dataframe2.columns[3] == "with_dot"
+    assert dataframe2.columns[4] == "camel_case2"
+    assert dataframe2.columns[5] == "camel_case3"
+    assert dataframe2.columns[6] == "__index_level_0__"
+    assert dataframe2.columns[7] == "camel_case"
+    assert dataframe2[dataframe2.columns[4]].dtype == "float64"
+    assert dataframe2[dataframe2.columns[5]].dtype == "float64"
 
 
 def test_drop_duplicated_columns():
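
Note on the expected names asserted above: they are produced by Athena.normalize_column_name
(awswrangler/athena.py), which this diff imports but does not show. As a minimal, hypothetical
sketch only, not the library's actual implementation, the behavior the tests rely on can be
approximated as follows; the helper name normalize_column_name_sketch is invented purely for
illustration:

    import re
    import unicodedata


    def normalize_column_name_sketch(name: str) -> str:
        # Hypothetical approximation of Athena.normalize_column_name,
        # reconstructed from the expected values asserted in the tests above.
        # Strip accents: "Ãccént" -> "Accent"
        name = (unicodedata.normalize("NFKD", name)
                .encode("ascii", "ignore")
                .decode("ascii"))
        # Split CamelCase on lower-to-upper boundaries: "CamelCase" -> "Camel_Case"
        name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
        # Replace runs of non-alphanumeric characters (spaces, dashes, dots,
        # repeated underscores) with a single "_": "Camel___Case3" -> "Camel_Case3"
        name = re.sub(r"[^A-Za-z0-9]+", "_", name)
        return name.strip("_").lower()


    assert normalize_column_name_sketch("CamelCase") == "camel_case"
    assert normalize_column_name_sketch("With Spaces") == "with_spaces"
    assert normalize_column_name_sketch("Ãccént") == "accent"
    assert normalize_column_name_sketch("Camel___Case3") == "camel_case3"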