
"ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()" #1625


Description

@awsrossw

Receiving this error when running an AWS Data Wrangler-based Lambda function:

[ERROR] ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
Traceback (most recent call last):
  File "/opt/python/lib/python3.9/site-packages/codeguru_profiler_agent/aws_lambda/profiler_decorator.py", line 52, in profiler_decorate
    return function(event, context)
  File "/opt/python/lib/python3.9/site-packages/codeguru_profiler_agent/aws_lambda/lambda_handler.py", line 91, in call_handler
    return handler_function(event, context)
  File "/var/task/lambda_function.py", line 75, in lambda_handler
    wr.opensearch.index_df(client=OS_client,df=df,index=os_index,bulk_size=6000)
  File "/opt/python/awswrangler/opensearch/_write.py", line 433, in index_df
    return index_documents(client=client, documents=_df_doc_generator(df), index=index, doc_type=doc_type, **kwargs)
  File "/opt/python/awswrangler/opensearch/_write.py", line 520, in index_documents
    documents = list(documents)
  File "/opt/python/awswrangler/opensearch/_write.py", line 82, in _df_doc_generator
    yield {k: _deserialize(v) for k, v in document.items() if notna(v)}
  File "/opt/python/awswrangler/opensearch/_write.py", line 82, in <dictcomp>
    yield {k: _deserialize(v) for k, v in document.items() if notna(v)}
END RequestId: c04929d3-b0b7-4760-8f32-75aed2d41e2f

Updated to version 8 of the AWS Data Wrangler Lambda layer and confirmed it is v2.16.1.
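For context on the failure: pandas.notna() returns an elementwise boolean array when it is given an array-like value, so the "if notna(v)" check in _df_doc_generator raises exactly this ValueError whenever a DataFrame cell holds a list or NumPy array (e.g. a list-typed column read back from Parquet). A minimal sketch of the failure mode:

import numpy as np
from pandas import notna

v = np.array(["a", "b"])  # e.g. a cell from a list-typed Parquet column
print(notna(v))           # -> [ True  True ]: an array, not a single bool
if notna(v):              # raises "The truth value of an array ... is ambiguous"
    pass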

Note: I'm not a programmer... (there are probably some very ugly things in here)

import awswrangler as wr
import pandas as pd
from datetime import datetime
import json
import logging
import boto3


logger = logging.getLogger()
logger.setLevel(logging.INFO)
os_endpoint="https://search-aes-siem-htycyka7g5xv54tjdlukq5zrx4.us-east-1.es.amazonaws.com/"

OS_client = wr.opensearch.connect(
    host=os_endpoint,
    region='us-east-1'
)

#default index in case there is no category in the log; also used for testing
os_index = "demoindex915"

def lambda_handler(event, context):

    # retrieve bucket name and file_key from the S3 event
    if event['source'] == 'aws.s3' and event['detail-type'] == 'Object Created':
        # s3 notification delivered via EventBridge
        bucket_name = event['detail']['bucket']['name']
        file_key = event['detail']['object']['key']
        bucket_file = "s3://" + bucket_name + "/" + file_key
        logger.info('Reading {} from {}'.format(file_key, bucket_name))

    #use our own creds
    #df = wr.s3.read_parquet(bucket_file,use_threads=True,path_suffix="gz.parquet",validate_schema=True)
    
    #use AssumeRoleCred
    sts_connection = boto3.client('sts')
    datalake_creds = sts_connection.assume_role(
        RoleArn="arn:aws:iam::956801307635:role/Moose-01afd9dd-c103-48c7-b5d0-d527e4f9ebb7-DO-NOT-DELETE",
        RoleSessionName="cross_acct_lambda"
    )
    
    ACCESS_KEY = datalake_creds['Credentials']['AccessKeyId']
    SECRET_KEY = datalake_creds['Credentials']['SecretAccessKey']
    SESSION_TOKEN = datalake_creds['Credentials']['SessionToken']

    datalake_s3 = boto3.session.Session(
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        aws_session_token=SESSION_TOKEN
    )
    
    df = wr.s3.read_parquet(bucket_file, use_threads=True, path_suffix="gz.parquet", validate_schema=True, boto3_session=datalake_s3)
    #uncomment after testing
    #os_index = "moose-"+(df['category'].iloc[0]).replace(" ", "_").lower()
    
    if "answers" in df.columns:
        # keep only rows with a non-empty answers list, then expand to one row per answer
        df = df[df['answers'].map(lambda d: len(d)) > 0]
        df = df.explode('answers')
        
    
    wr.opensearch.create_index(
        client=OS_client,
        index=os_index,
        mappings={
            "dynamic": "runtime",
            # dynamic templates must be listed under "dynamic_templates"
            "dynamic_templates": [
                {
                    "strings": {
                        "match_mapping_type": "string",
                        "mapping": {"type": "keyword"}
                    }
                }
            ]
        }
    )
                
    # NOTE: to_json() returns a string and leaves df unchanged, so this call has no effect
    df.to_json(orient="table")
    logger.info('Writing {} to {}'.format(file_key, os_endpoint))
    wr.opensearch.index_df(client=OS_client, df=df, index=os_index, bulk_size=6000)
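A possible workaround until this is fixed in the library: build the document dicts yourself and call wr.opensearch.index_documents (the function index_df delegates to, per the traceback), skipping the notna() filter that chokes on array-valued cells. This is only a sketch: it assumes no Decimal-typed columns (index_df normally converts those during its own per-cell deserialization), and the _clean helper below is hypothetical, not part of awswrangler:

import numpy as np
from pandas import notna

def _clean(record):
    # hypothetical helper: drop null scalar values, but keep list/array
    # values (which pandas.notna() cannot reduce to a single bool)
    out = {}
    for k, v in record.items():
        if isinstance(v, np.ndarray):
            v = v.tolist()
        if isinstance(v, list) or notna(v):
            out[k] = v
    return out

documents = (_clean(r) for r in df.to_dict(orient="records"))
wr.opensearch.index_documents(client=OS_client, documents=documents,
                              index=os_index, bulk_size=6000)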
    
