
Commit e32a147

Merge pull request #40 from awslabs/pandas-duplicated_cols
Removing Pandas duplicated columns
2 parents: 5f9cc93 + 999946c
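In short, this commit adds an `inplace` flag to `Pandas.to_csv`, `to_parquet`, `to_s3`, and `data_to_s3`, and drops duplicated column names (logging a warning) before anything is written to S3. A minimal usage sketch of the new flag, assuming an awswrangler `Session` like the one the tests below use; the bucket and database names are placeholders:

import pandas
import awswrangler

session = awswrangler.Session()

df = pandas.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
df.columns = ["a", "a", "c"]  # two columns now share the name "a"

# inplace=False (new in this commit) deep-copies the DataFrame before
# normalizing column names and dropping duplicates, so `df` is untouched.
session.pandas.to_parquet(dataframe=df,
                          database="my_database",       # placeholder Glue database
                          path="s3://my-bucket/test/",  # placeholder bucket
                          mode="overwrite",
                          inplace=False)

assert list(df.columns) == ["a", "a", "c"]  # original columns preserved

With the default `inplace=True`, the call works on `df` itself, which the new docstring describes as the cheapest option in CPU and memory.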

2 files changed: +71 −9 lines changed

awswrangler/pandas.py

Lines changed: 31 additions & 9 deletions
@@ -488,6 +488,7 @@ def to_csv(
             mode="append",
             procs_cpu_bound=None,
             procs_io_bound=None,
+            inplace=True,
     ):
         """
         Write a Pandas Dataframe as CSV files on S3
@@ -504,6 +505,7 @@ def to_csv(
         :param mode: "append", "overwrite", "overwrite_partitions"
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
+        :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
         :return: List of objects written on S3
         """
         if serde not in Pandas.VALID_CSV_SERDES:
@@ -522,7 +524,8 @@ def to_csv(
                           compression=None,
                           procs_cpu_bound=procs_cpu_bound,
                           procs_io_bound=procs_io_bound,
-                          extra_args=extra_args)
+                          extra_args=extra_args,
+                          inplace=inplace)
 
     def to_parquet(self,
                    dataframe,
@@ -535,7 +538,8 @@ def to_parquet(self,
                    compression="snappy",
                    procs_cpu_bound=None,
                    procs_io_bound=None,
-                   cast_columns=None):
+                   cast_columns=None,
+                   inplace=True):
         """
         Write a Pandas Dataframe as parquet files on S3
         Optionally writes metadata on AWS Glue.
@@ -550,7 +554,8 @@ def to_parquet(self,
         :param compression: None, snappy, gzip, lzo
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
-        :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"})
+        :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted (E.g. {"col name": "bigint", "col2 name": "int"})
+        :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
         :return: List of objects written on S3
         """
         return self.to_s3(dataframe=dataframe,
@@ -564,7 +569,8 @@ def to_parquet(self,
                           compression=compression,
                           procs_cpu_bound=procs_cpu_bound,
                           procs_io_bound=procs_io_bound,
-                          cast_columns=cast_columns)
+                          cast_columns=cast_columns,
+                          inplace=inplace)
 
     def to_s3(self,
               dataframe,
@@ -579,7 +585,8 @@ def to_s3(self,
               procs_cpu_bound=None,
               procs_io_bound=None,
               cast_columns=None,
-              extra_args=None):
+              extra_args=None,
+              inplace=True):
         """
         Write a Pandas Dataframe on S3
         Optionally writes metadata on AWS Glue.
@@ -597,9 +604,13 @@ def to_s3(self,
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format)
         :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
+        :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
         :return: List of objects written on S3
         """
-        Pandas.normalize_columns_names_athena(dataframe, inplace=True)
+        dataframe = Pandas.normalize_columns_names_athena(dataframe,
+                                                          inplace=inplace)
+        dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe,
+                                                   inplace=inplace)
         if compression is not None:
             compression = compression.lower()
         file_format = file_format.lower()
@@ -635,7 +646,8 @@ def to_s3(self,
                                             procs_cpu_bound=procs_cpu_bound,
                                             procs_io_bound=procs_io_bound,
                                             cast_columns=cast_columns,
-                                            extra_args=extra_args)
+                                            extra_args=extra_args,
+                                            inplace=inplace)
         if database:
             self._session.glue.metadata_to_glue(dataframe=dataframe,
                                                 path=path,
@@ -662,7 +674,8 @@ def data_to_s3(self,
                    procs_cpu_bound=None,
                    procs_io_bound=None,
                    cast_columns=None,
-                   extra_args=None):
+                   extra_args=None,
+                   inplace=True):
         if not procs_cpu_bound:
             procs_cpu_bound = self._session.procs_cpu_bound
         if not procs_io_bound:
@@ -704,7 +717,8 @@ def data_to_s3(self,
                     session_primitives=self._session.primitives,
                     file_format=file_format,
                     cast_columns=cast_columns,
-                    extra_args=extra_args)
+                    extra_args=extra_args,
+                    isolated_dataframe=inplace)
         if mode == "overwrite_partitions" and partition_cols:
             if procs_io_bound > procs_cpu_bound:
                 num_procs = floor(
@@ -1034,3 +1048,11 @@ def normalize_columns_names_athena(dataframe, inplace=True):
             athena.Athena.normalize_column_name(x) for x in dataframe.columns
         ]
         return dataframe
+
+    @staticmethod
+    def drop_duplicated_columns(dataframe, inplace=True):
+        if inplace is False:
+            dataframe = dataframe.copy(deep=True)
+        duplicated_cols = dataframe.columns.duplicated()
+        logger.warning(f"Dropping repeated columns: {duplicated_cols}")
+        return dataframe.loc[:, ~duplicated_cols]
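For reference, pandas' `Index.duplicated()` returns a boolean mask flagging every occurrence of a label after the first, so `drop_duplicated_columns` keeps the first column under each repeated name (note the warning logs that boolean mask rather than the dropped names). A standalone sketch of the same technique in plain pandas:

import pandas

df = pandas.DataFrame([[1, 4, 7], [2, 5, 8], [3, 6, 9]],
                      columns=["a", "a", "c"])

mask = df.columns.duplicated()  # array([False,  True, False])
deduped = df.loc[:, ~mask]      # keeps the first "a" and "c"

print(list(deduped.columns))    # ['a', 'c']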

testing/test_awswrangler/test_pandas.py

Lines changed: 40 additions & 0 deletions
@@ -824,3 +824,43 @@ def test_to_parquet_with_normalize(
     assert dataframe2.columns[2] == "with_dash"
     assert dataframe2.columns[3] == "accent"
     assert dataframe2.columns[4] == "with_dot"
+
+
+def test_drop_duplicated_columns():
+    dataframe = pandas.DataFrame({
+        "a": [1, 2, 3],
+        "b": [4, 5, 6],
+        "c": [7, 8, 9],
+    })
+    dataframe.columns = ["a", "a", "c"]
+    dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe)
+    assert dataframe.columns[0] == "a"
+    assert dataframe.columns[1] == "c"
+
+
+def test_to_parquet_duplicated_columns(
+    session,
+    bucket,
+    database,
+):
+    dataframe = pandas.DataFrame({
+        "a": [1, 2, 3],
+        "b": [4, 5, 6],
+        "c": [7, 8, 9],
+    })
+    dataframe.columns = ["a", "a", "c"]
+    session.pandas.to_parquet(dataframe=dataframe,
+                              database=database,
+                              path=f"s3://{bucket}/test/",
+                              mode="overwrite")
+    dataframe2 = None
+    for counter in range(10):
+        dataframe2 = session.pandas.read_sql_athena(sql="select * from test",
+                                                    database=database)
+        if len(dataframe.index) == len(dataframe2.index):
+            break
+        sleep(2)
+    assert len(dataframe.index) == len(dataframe2.index)
+    assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
+    assert dataframe2.columns[0] == "a"
+    assert dataframe2.columns[1] == "c"
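The integration test polls because a table registered in Glue by `to_parquet` is not guaranteed to be queryable through Athena immediately; it retries the `SELECT` up to 10 times with a 2-second sleep between attempts. The same retry loop, pulled out as a generic sketch (`fetch` is a hypothetical stand-in for any eventually consistent read):

import time

def poll_until(fetch, expected_len, attempts=10, delay=2):
    """Retry an eventually consistent read until it returns the
    expected number of rows or the attempts run out."""
    result = None
    for _ in range(attempts):
        result = fetch()
        if len(result.index) == expected_len:
            break
        time.sleep(delay)
    return result

# e.g. poll_until(lambda: session.pandas.read_sql_athena(
#          sql="select * from test", database=database), expected_len=3)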
