
Conversation

larryhudson

@larryhudson larryhudson commented Aug 14, 2025

Closes #1041

@larryhudson larryhudson force-pushed the larryhudson/add-bedrock-prompt-caching branch from accb936 to 84c745c on August 14, 2025 19:40
@DouweM DouweM self-assigned this Aug 14, 2025
Collaborator

@DouweM DouweM left a comment

Thanks @larryhudson, I've left some notes on the implementation. I'll review the tests and examples once the implementation itself has stabilized.

@@ -449,7 +461,7 @@ class ToolReturn:


# Ideally this would be a Union of types, but Python 3.9 requires it to be a string, and strings don't work with `isinstance`.
MultiModalContentTypes = (ImageUrl, AudioUrl, DocumentUrl, VideoUrl, BinaryContent)
MultiModalContentTypes = (ImageUrl, AudioUrl, DocumentUrl, VideoUrl, BinaryContent, CachePoint)
Collaborator

I don't think this should be added here, as cache points are not multi-modal content

Author

Done, thanks

if isinstance(item, CachePoint):
if last_block is not None:
# Add cache_control to the previous content block
last_block['cache_control'] = {'type': 'ephemeral'}
Collaborator

If a ModelRequest has two UserPromptParts, or a SystemPromptPart followed by a UserPromptPart, and the cache point is the first item in the second part, I think we should apply this to the last content block of the first part. That means we should probably do this one level up in _map_message. We can allow _map_user_prompt to yield a CachePoint directly and handle it in a special way.

That will also allow putting this after a ToolReturnPart, and having it be added to the tool return part itself.

This would also simplify the example because you can always just add a new UserPromptPart with a CachePoint instead of adding it to the existing one and having to check whether it's a str etc.
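
Roughly this shape, as a sketch only (reusing the user_content_params list that _map_message already builds; not the final implementation):

async for content in self._map_user_prompt(request_part):
    if isinstance(content, CachePoint):
        if user_content_params:
            # Attach the cache marker to whichever block came last, even if it was
            # produced by an earlier part (UserPromptPart, ToolReturnPart, ...) of the same request.
            user_content_params[-1]['cache_control'] = {'type': 'ephemeral'}
    else:
        user_content_params.append(content)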

Author

Done, I've implemented this pattern in models/bedrock.py and will apply the same logic in models/anthropic.py. I think handling the case where the UserPromptPart starts with a CachePoint immediately following the SystemPromptPart makes the code a little hard to follow, so I'm open to any ideas to make it clearer / simpler for the user.

cache_creation_tokens = details.get('cache_creation_input_tokens', 0)
cache_read_tokens = details.get('cache_read_input_tokens', 0)

request_tokens = input_tokens + cache_creation_tokens + cache_read_tokens

return usage.Usage(
Collaborator

Can we leave the Usage changes off for now and do them in a followup PR? We just did some refactoring there: #2378 so I'll have to check how this corresponds to the other places we're going to be using the usage data.

Author

Done, thanks

profile = self._provider.model_profile(self.model_name)
if isinstance(profile, BedrockModelProfile):
return profile.bedrock_supports_prompt_caching
return False
Collaborator

We should use BedrockModelProfile.from_profile(self.profile).bedrock_supports_prompt_caching. We can inline that where we use it and drop this helper method

Author

Done, have cleaned it up

@DouweM
Collaborator

DouweM commented Aug 16, 2025

@larryhudson Just a heads-up that I'll be out this coming week and will be back the 25th. Assuming this is not urgent I'll review it then. If it is, please ping Kludex! Appreciate the patience :)

@larryhudson larryhudson force-pushed the larryhudson/add-bedrock-prompt-caching branch 11 times, most recently from ef70980 to ac6a93c on August 16, 2025 17:59
@larryhudson
Author

@larryhudson Just a heads-up that I'll be out this coming week and will be back the 25th. Assuming this is not urgent I'll review it then. If it is, please ping Kludex! Appreciate the patience :)

Thanks @DouweM, I've spent some time on this today and will spend a little more time cleaning it up. All good to wait until you're back, have a good time off!

@@ -669,9 +731,14 @@ def model_name(self) -> str:
return self._model_name

def _map_usage(self, metadata: ConverseStreamMetadataEventTypeDef) -> usage.RequestUsage:
print('DEBUG: raw usage', metadata['usage'])

return usage.RequestUsage(
input_tokens=metadata['usage']['inputTokens'],

Same remark as above.

Author

Done

@@ -0,0 +1,479 @@
#!/usr/bin/env python3
"""Example script to test prompt caching with AWS Bedrock.
Collaborator

Could (a version of) this be under tests instead of examples?

Author

Yep I can do that, would you want me to rewrite the file so that it is a bunch of tests, following the patterns in the tests directory?

Collaborator

@larryhudson Yep, we can't merge this PR without 100% test coverage, so you can use this as a starting point to write the tests, or just write them from scratch.

@@ -387,7 +389,15 @@ async def _map_message(self, messages: list[ModelMessage]) -> tuple[str, list[Be
system_prompt_parts.append(request_part.content)
elif isinstance(request_part, UserPromptPart):
async for content in self._map_user_prompt(request_part):
user_content_params.append(content)
if isinstance(content, dict) and content.get('type') == 'ephemeral':
Collaborator

I'd rather have _map_user_prompt yield the CachePoint itself so we don't have to do dict introspection here
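
i.e. in _map_user_prompt, roughly (sketch):

if isinstance(item, CachePoint):
    # Pass the CachePoint through untouched and let _map_message decide where to attach it.
    yield item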

Author

Done, good idea

user_content_params.append(content)
if isinstance(content, dict) and content.get('type') == 'ephemeral':
if user_content_params:
# TODO(larryhudson): Ensure the last user content param supports cache_control
Collaborator

We should raise a UserError if there was no previous part to add this to
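
A sketch of what that guard could look like (the helper name here is hypothetical):

def _add_cache_control_to_last_param(params):
    if not params:
        raise UserError('CachePoint must follow other content that it can mark as cacheable')
    params[-1]['cache_control'] = {'type': 'ephemeral'}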

Author

Done, this is now done in '_add_cache_content_to_last_param'



# Supported models: https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-caching.html
ANTHROPIC_CACHING_SUPPORTED_MODELS = ['claude-3-5-sonnet', 'claude-3-5-haiku', 'claude-3-7-sonnet', 'claude-sonnet-4']
Collaborator

Let's make this and the next one sets

Suggested change
ANTHROPIC_CACHING_SUPPORTED_MODELS = ['claude-3-5-sonnet', 'claude-3-5-haiku', 'claude-3-7-sonnet', 'claude-sonnet-4']
ANTHROPIC_CACHING_SUPPORTED_MODELS = {'claude-3-5-sonnet', 'claude-3-5-haiku', 'claude-3-7-sonnet', 'claude-sonnet-4'}

Author

Done

if any(supported in model_name for supported in ANTHROPIC_CACHING_SUPPORTED_MODELS):
return BedrockModelProfile(
bedrock_supports_tool_choice=False,
bedrock_send_back_thinking_parts=True,
Collaborator

Instead of duplicating these 2 fields, can we build just one BedrockModelProfile with bedrock_supports_prompt_caching=any(supported in model_name for supported in ANTHROPIC_CACHING_SUPPORTED_MODELS)
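
i.e. roughly (sketch based only on the fields shown above):

return BedrockModelProfile(
    bedrock_supports_tool_choice=False,
    bedrock_send_back_thinking_parts=True,
    bedrock_supports_prompt_caching=any(
        supported in model_name for supported in ANTHROPIC_CACHING_SUPPORTED_MODELS
    ),
)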

Author

Done

if isinstance(part, SystemPromptPart):
system_prompt.append({'text': part.content})
elif isinstance(part, UserPromptPart):
bedrock_messages.extend(await self._map_user_prompt(part, document_count))
# Handle case where UserPromptPart starts with a CachePoint and follows the SystemPromptPart
cache_point_for_system_prompt, user_prompt_part = self._extract_leading_cache_point(
Collaborator

Instead of extracting, could we have _map_user_prompt return tuple[leading_cache_point: bool, list[MessageUnionTypeDef]]?
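
Sketch of what the call site could look like with that return shape (names as in the current diff, not the final code):

has_leading_cache_point, user_messages = await self._map_user_prompt(part, document_count)
bedrock_messages.extend(user_messages)
if has_leading_cache_point:
    system_prompt.append({'cachePoint': {'type': 'default'}})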

Author

Done, much better!


if (
immediately_follows_system_prompt
and isinstance(part.content, list)
Collaborator

part.content could also itself be a CachePoint right? So maybe we should check if isinstance(list(part.content)[0], CachePoint)

Author

Done

if isinstance(item, CachePoint):
if supports_caching:
# TODO: update the boto3 bedrock type defs so 'cachePoint' is available
content.append({'cachePoint': {'type': 'default'}})

Something worth noting about cachePoint in Bedrock is that BinaryContent should always come first and the user prompt text last. Otherwise, the cachePoint we add to the messages yields the following error: The model returned the following errors: messages.0.content.2.type: Field required.

I'm not sure if Pydantic AI should work around this issue by rearranging the order of the messages to ensure that BinaryContent is always first. This issue was really hard for me to figure out and fix, so I think it might be worth doing; as you can see, the error AWS returns is extremely vague and there's no information about it online. I reported it to AWS and they simply responded by telling me that prompt caching works for them and refused to actually look into it properly :/

Collaborator

@rany2 Hmm, reordering the messages would be very unexpected I think. Can you show me the payload that triggers that error? Because the BinaryContent blocks do have a type field unlike the error message suggests:

if item.is_image:
yield BetaImageBlockParam(
source={'data': io.BytesIO(item.data), 'media_type': item.media_type, 'type': 'base64'}, # type: ignore
type='image',
)
elif item.media_type == 'application/pdf':
yield BetaBase64PDFBlockParam(
source=BetaBase64PDFSourceParam(
data=io.BytesIO(item.data),
media_type='application/pdf',
type='base64',
),
type='document',
)


@DouweM This reproduces the issue I'm describing.

import asyncio
import os

from pydantic_ai import Agent, BinaryContent
from pydantic_ai.messages import CachePoint
from pydantic_ai.models.bedrock import BedrockConverseModel
from pydantic_ai.providers.bedrock import BedrockProvider

agent = Agent(
    model=BedrockConverseModel(
        model_name=os.environ["MODEL"],
        provider=BedrockProvider(
            region_name=os.environ["AWS_REGION"],
            aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        ),
    ),
)


async def amain():
    # This works.
    response = await agent.run(
        [
            "What is 2+2? Provide the answer only.",
            CachePoint(),
        ]
    )
    print(response)

    # This also works.
    response = await agent.run(
        [
            BinaryContent(
                data="What is 2+2? Provide the answer only.",
                media_type="text/plain",
            ),
            "Process the attached text file. Return the answer only.",
            CachePoint(),
        ]
    )
    print(response)

    # botocore.errorfactory.ValidationException: An error occurred (ValidationException) when calling the Converse operation:
    # The model returned the following errors: messages.0.content.2.type: Field required
    response = await agent.run(
        [
            "Process the attached text file. Return the answer only.",
            BinaryContent(
                data="What is 2+2? Provide the answer only.",
                media_type="text/plain",
            ),
            CachePoint(),
        ]
    )
    print(response)


def main():
    asyncio.run(amain())


if __name__ == "__main__":
    main()


To be clear, this is only an issue when you use CachePoint(). Without prompt caching these all work just fine.

Collaborator

@rany2 The error message is incorrect then, right, because item at index 2 (the cache point) does have a type: {'cachePoint': {'type': 'default'}}.

Collaborator

@rany2 Can you please share the request payload that reproduces this? I'll see if I can get this to the attention of someone at Amazon.

Returning a more useful error message sounds like the best option for now


@DouweM This results in a 400 response when sent to the converse endpoint:

{
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "text": "Process the attached text file. Return the answer only."
        },
        {
          "document": {
            "name": "Document 1",
            "format": "txt",
            "source": {
              "bytes": "V2hhdCBpcyAyKzI/IFByb3ZpZGUgdGhlIGFuc3dlciBvbmx5Lg=="
            }
          }
        },
        {
          "cachePoint": {
            "type": "default"
          }
        }
      ]
    }
  ],
  "system": [],
  "inferenceConfig": {}
}

Collaborator

@rany2 Thanks, I've sent it to our contacts at Amazon, let's see what they say.

Author

Thanks both :) I haven't done anything regarding binary content yet - will wait for the response from Amazon

Collaborator

This is the response I got:


I’ve raised this with service team, they’re now looking into that. In the meantime, leaving the text/ document sequence as is and adding another text right before the cachePoint could be a potential workaround:

[
    {"text": "Good example:"},
    {"document": {"name": "Doc_1", ...}}
    {"text": "Bad example:"},
    {"document": {"name": "Doc_2", ...}},
    {"text": "Which is the good example?"},
    {"cachePoint": {"type": "default"}}
]

I wonder if the API is happy if we just include an empty {"text": ""}, that'd be worth a try!
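
For reference, that untested workaround payload would look roughly like this (purely illustrative, reusing the repro above):

content = [
    {'text': 'Process the attached text file. Return the answer only.'},
    {'document': {'name': 'Document 1', 'format': 'txt', 'source': {'bytes': b'...'}}},
    {'text': ''},  # empty text block right before the cache point, if the API accepts it
    {'cachePoint': {'type': 'default'}},
]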

if isinstance(part, SystemPromptPart):
system_prompt.append({'text': part.content})
elif isinstance(part, UserPromptPart):
bedrock_messages.extend(await self._map_user_prompt(part, document_count))
# Handle case where UserPromptPart starts with a CachePoint and follows the SystemPromptPart
Collaborator

If there is no SystemPromptPart, I suppose we should add a leading CachePoint to the toolConfig like in the "tools checkpoint" example here: https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-caching.html

Author

Done

@larryhudson larryhudson force-pushed the larryhudson/add-bedrock-prompt-caching branch 4 times, most recently from f6c4db7 to 300871f on September 6, 2025 16:42
@larryhudson larryhudson force-pushed the larryhudson/add-bedrock-prompt-caching branch from 300871f to a2222e1 on September 6, 2025 16:45
@larryhudson
Author

Hey @DouweM, sorry for the delay on this. I've gone through and addressed your comments. I've also updated the boto3 + mypy-boto3 runtime dependencies so that 'CachePoint' works as expected. I think this is ready for another review when you have time.

@larryhudson larryhudson requested a review from DouweM September 6, 2025 16:48
tools = self._get_tools(model_request_parameters)
if not tools:
return None

if should_add_cache_point:
tools[-1]['cachePoint'] = {'type': 'default'}
Collaborator

This is causing test_bedrock_anthropic_tool_with_thinking to fail with this error:

Invalid number of parameters set for tagged union structure toolConfig.tools[0]. Can only set one of the following keys: toolSpec, cachePoint.

It sounds like the cachePoint has to be a separate item in tools, not a field on the last item.
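
i.e. presumably something like this instead (sketch, inferred from the error message and the Bedrock docs linked above):

if should_add_cache_point:
    # The cachePoint has to be its own entry in the tools list, not a key on the last toolSpec entry.
    tools.append({'cachePoint': {'type': 'default'}})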


@@ -46,7 +46,13 @@ class ThinkingPart(TypedDict):
content: NotRequired[str]


MessagePart: TypeAlias = 'TextPart | ToolCallPart | ToolCallResponsePart | MediaUrlPart | BinaryDataPart | ThinkingPart'
class CachePointPart(TypedDict):
Collaborator

I don't think we need this here; as mentioned at the top of the file, these are supposed to match the OpenTelemetry GenAI spec, which I don't think has cache points.

@@ -637,6 +649,8 @@ def otel_message_parts(self, settings: InstrumentationSettings) -> list[_otel_me
if settings.include_content and settings.include_binary_content:
converted_part['content'] = base64.b64encode(part.data).decode()
parts.append(converted_part)
elif isinstance(part, CachePoint):
parts.append(_otel_messages.CachePointPart(type=part.kind))
Collaborator

See above, I think we should skip it instead -- unless you've confirmed that the OTel spec does have cache points.

| BetaImageBlockParam
| BetaToolResultBlockParam
)
"""Content block parameter types that support cache_control."""
Collaborator

Can you please link to the doc this came from?

tool_config = self._map_tool_config(model_request_parameters)
tool_config = self._map_tool_config(
model_request_parameters,
should_add_cache_point=(
Collaborator

This is now always adding a cache point, independent of has_leading_cache_point. I think we should have map_messages return that value if it wasn't already used by adding it to the system prompt, and then use it here. That may require some reordering.
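
Roughly this shape (hypothetical, just to illustrate the reordering; the flag name is made up):

system_prompt, bedrock_messages, has_unused_leading_cache_point = await self._map_messages(messages)
tool_config = self._map_tool_config(
    model_request_parameters,
    should_add_cache_point=has_unused_leading_cache_point,
)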

part, document_count, profile.bedrock_supports_prompt_caching
)
if has_leading_cache_point:
system_prompt.append({'cachePoint': {'type': 'default'}})
Collaborator

We should only add it to the system prompt if this is the first ModelRequest -- if there were previous ModelResponses from the assistant, we should add it to the last assistant message, I think.
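
i.e. something like this (sketch; the exact message/content shape may differ):

if has_leading_cache_point:
    if not bedrock_messages:
        # First ModelRequest: the cache point can go at the end of the system prompt.
        system_prompt.append({'cachePoint': {'type': 'default'}})
    else:
        # Otherwise attach it to the content of the last message already mapped (the assistant's).
        bedrock_messages[-1]['content'].append({'cachePoint': {'type': 'default'}})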

elif isinstance(item, CachePoint):
if supports_caching:
content.append({'cachePoint': {'type': 'default'}})
continue
Collaborator

I don't think we need this continue

if isinstance(part.content, str):
content.append({'text': part.content})
else:
for item in part.content:
if part.content and isinstance(part.content[0], CachePoint):
Collaborator

You can move this to the isinstance(item, CachePoint) below by changing for item in part.content to for i, item in enumerate(part.content) and then checking if i == 0 to know if it was the first element
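
i.e. roughly (sketch):

for i, item in enumerate(part.content):
    if isinstance(item, CachePoint):
        if i == 0:
            # A leading cache point is reported to the caller instead of emitting a block here.
            has_leading_cache_point = True
        elif supports_caching:
            content.append({'cachePoint': {'type': 'default'}})
    else:
        ...  # handle the other content item types as before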

if isinstance(part.content, str):
content.append({'text': part.content})
else:
for item in part.content:
if part.content and isinstance(part.content[0], CachePoint):
has_leading_cache_point = True
Collaborator

Are we sure that Bedrock needs this special behavior of moving this to the system prompt? Does it not support a cachePoint at the start of user content?


Successfully merging this pull request may close these issues.

Anthropic prompt caching (inc. Anthropic on Bedrock)