Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions databases/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[typing.Mappin
return None
return Record(row, result_columns, self._dialect)

async def fetch_val(
self, query: ClauseElement, column: typing.Any = 0
) -> typing.Any:
assert self._connection is not None, "Connection is not acquired"
query, args, _ = self._compile(query)
return await self._connection.fetchval(query, *args, column=column)

async def execute(self, query: ClauseElement) -> typing.Any:
assert self._connection is not None, "Connection is not acquired"
query, args, result_columns = self._compile(query)
Expand Down
5 changes: 3 additions & 2 deletions databases/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ async def fetch_val(
column: typing.Any = 0,
) -> typing.Any:
async with self._query_lock:
row = await self._connection.fetch_one(self._build_query(query, values))
return None if row is None else row[column]
return await self._connection.fetch_val(
self._build_query(query, values), column
)

async def execute(
self, query: typing.Union[ClauseElement, str], values: dict = None
Expand Down
6 changes: 6 additions & 0 deletions databases/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[typing.Mapping]:
async def fetch_one(self, query: ClauseElement) -> typing.Optional[typing.Mapping]:
raise NotImplementedError() # pragma: no cover

async def fetch_val(
self, query: ClauseElement, column: typing.Any = 0
) -> typing.Any:
row = await self.fetch_one(query)
return None if row is None else row[column]

async def execute(self, query: ClauseElement) -> typing.Any:
raise NotImplementedError() # pragma: no cover

Expand Down
16 changes: 16 additions & 0 deletions tests/test_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ async def test_queries(database_url):
result = await database.fetch_val(query=query)
assert result == "example1"

# row access
query = sqlalchemy.sql.select([notes.c.text])
result = await database.fetch_one(query=query)
assert result["text"] == "example1"
assert result[0] == "example1"

# iterate()
query = notes.select()
iterate_results = []
Expand Down Expand Up @@ -202,6 +208,16 @@ async def test_queries_raw(database_url):
assert result["text"] == "example2"
assert result["completed"] == False

# fetch_val()
query = "SELECT completed FROM notes WHERE text = :text"
result = await database.fetch_val(query=query, values={"text": "example1"})
assert result == True
query = "SELECT completed FROM notes WHERE text = :text"
result = await database.fetch_val(
query=query, values={"text": "example1"}, column="completed"
)
assert result == True

# iterate()
query = "SELECT * FROM notes"
iterate_results = []
Expand Down