Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions awswrangler/timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ def create_table(
memory_retention_hours: int,
magnetic_retention_days: int,
tags: Optional[Dict[str, str]] = None,
timestream_additional_kwargs: Optional[Dict[str, Any]] = None,
boto3_session: Optional[boto3.Session] = None,
) -> str:
"""Create a new Timestream database.
Expand All @@ -415,6 +416,9 @@ def create_table(
Tags enable you to categorize databases and/or tables, for example,
by purpose, owner, or environment.
e.g. {"foo": "boo", "bar": "xoo"})
timestream_additional_kwargs : Optional[Dict[str, Any]]
Forwarded to botocore requests.
e.g. timestream_additional_kwargs={'MagneticStoreWriteProperties': {'EnableMagneticStoreWrites': True}}
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

Expand All @@ -437,13 +441,15 @@ def create_table(

"""
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
timestream_additional_kwargs = {} if timestream_additional_kwargs is None else timestream_additional_kwargs
args: Dict[str, Any] = {
"DatabaseName": database,
"TableName": table,
"RetentionProperties": {
"MemoryStoreRetentionPeriodInHours": memory_retention_hours,
"MagneticStoreRetentionPeriodInDays": magnetic_retention_days,
},
**timestream_additional_kwargs,
}
if tags is not None:
args["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
Expand Down
28 changes: 28 additions & 0 deletions tests/test_timestream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from datetime import datetime

import boto3
import pandas as pd
import pytest

Expand Down Expand Up @@ -282,3 +283,30 @@ def test_list_tables(timestream_database_and_table):

tables_in_db = wr.timestream.list_tables(database=timestream_database_and_table)
assert f"{timestream_database_and_table}_2" not in tables_in_db


@pytest.mark.parametrize(
"timestream_additional_kwargs",
[None, {"MagneticStoreWriteProperties": {"EnableMagneticStoreWrites": True}}],
)
def test_create_table_additional_kwargs(timestream_database_and_table, timestream_additional_kwargs):
client_timestream = boto3.client("timestream-write")
wr.timestream.create_table(
database=timestream_database_and_table,
table=f"{timestream_database_and_table}_3",
memory_retention_hours=1,
magnetic_retention_days=1,
timestream_additional_kwargs=timestream_additional_kwargs,
)

desc = client_timestream.describe_table(
DatabaseName=timestream_database_and_table, TableName=f"{timestream_database_and_table}_3"
)["Table"]
if timestream_additional_kwargs is None:
assert desc["MagneticStoreWriteProperties"].get("EnableMagneticStoreWrites") is False
elif timestream_additional_kwargs["MagneticStoreWriteProperties"]["EnableMagneticStoreWrites"] is True:
assert desc["MagneticStoreWriteProperties"].get("EnableMagneticStoreWrites") is True

wr.timestream.delete_table(database=timestream_database_and_table, table=f"{timestream_database_and_table}_3")
tables_in_db = wr.timestream.list_tables(database=timestream_database_and_table)
assert f"{timestream_database_and_table}_3" not in tables_in_db