Combine Amazon Translate with Elasticsearch and Skedler to build a cost-efficient multi-lingual omnichannel customer care – Part 2 of 2

In the previous post, we presented a system architecture to translate text from multiple languages with AWS Translate, index this information in Elasticsearch 6.2.3 for fast search, visualize the data with Kibana, and automate the delivery of customized intelligence with Skedler Reports and Alerts.

In this post, we are going to see how to implement the previously described architecture.

The main steps are:

  • Define API Gateway HTTP endpoints
  • Build an AWS Lambda function
  • Deploy to AWS with Serverless framework
  • Translate text with AWS Translate
  • Index to Elasticsearch 6.2.3
  • Search in Elasticsearch by language – full-text search
  • Visualize, report and monitor with Kibana dashboards
  • Use Skedler Reports and Alerts for reporting, monitoring and alerting

We are going to define two HTTP API methods, one to translate and index new inquiries and another one to search for them.  We will use AWS API Gateway, a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale.

The core of our solution will receive the inquiry (as a string) to be translated, translate it and index the text with the translations to Elasticsearch.

We will use AWS Lambda. It lets you run code without provisioning or managing servers. You pay only for the compute time you consume – there is no charge when your code is not running.  With Lambda, you can run code for virtually any type of application or backend service – all with zero administration.  Just upload your code and Lambda takes care of everything required to run and scale your code with high availability. You can set up your code to automatically trigger from other AWS services or call it directly from any web or mobile app.

To deploy our AWS Lambda function and the API Gateway HTTP endpoints, we will use the Serverless Framework.  Serverless is a toolkit for deploying and operating serverless architectures.

1. API Gateway

We are going to configure the following HTTP endpoints:

  • HTTP POST /createTicket
  • HTTP GET /searchTicket

The createTicket endpoint will be used to translate the text using AWS Translate and to index the document in Elasticsearch. The searchTicket endpoint will be used to search for documents in different languages.  The handler for each endpoint will be an AWS Lambda function.

Below is the serverless.yml section where we have defined the two endpoints.

functions:
  create:
    handler: handler.create_ticket
    events:
      - http:
          path: createTicket
          method: post
  search:
    handler: handler.search_ticket
    events:
      - http:
          path: searchTicket
          method: get

2. AWS Lambda

Once we have defined the two endpoints, we need to write the Lambda function. We will not focus on deploying the function to AWS; the Serverless Framework will take care of that. The function we are going to write will perform the following:

  • Get the input that needs to be translated (the customer inquiry)
  • Invoke AWS Translate and get the translations of the input
  • Create the Elasticsearch document
  • Index the document

def create_ticket(event, context):
    body = json.loads(event['body'])
    text = body['text']
    customer_code = body['customerCode']
    country = body['country']

Detect the language of the input text with AWS Comprehend:

translate_client = boto3.client('translate')
comprehend_client = boto3.client('comprehend')

def create_ticket(event, context):
    body = json.loads(event['body'])
    text = body['text']
    customer_code = body['customerCode']
    country = body['country']
    target_languages = os.environ['AWS_TRANSLATE_SUPPORTED_LANGUAGES'].split()
    dominant_language = comprehend_client.detect_dominant_language(
        Text=text
    )['Languages'][0]['LanguageCode']

and invoke AWS Translate to translate it into the target languages:

translate_client = boto3.client('translate')

def get_translation(text, source_dominant_language, target_dominant_language):
    return translate_client.translate_text(
        Text=text,
        SourceLanguageCode=source_dominant_language,
        TargetLanguageCode=target_dominant_language
    )['TranslatedText']
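To collect all the translations for a ticket, the handler can loop over the configured target languages and build a dictionary keyed by language code. The following is a minimal sketch under that assumption (the variable names follow the snippets above; it is not the exact original code):

translations = {}
for target_language in target_languages:
    if target_language != dominant_language:
        translations[target_language] = get_translation(text, dominant_language, target_language)

# Keep an English version as the reference text if the inquiry is not already in English
english_text = text if dominant_language == 'en' else get_translation(text, dominant_language, 'en')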

Create the JSON document and index it to Elasticsearch:

es = Elasticsearch(
    [os.environ['ELASTICSEARCH_HOST']],
    verify_certs=False
)

def index_new_document(english_text, translations, customer_code, country):
    result_nested_obj = []
    for key, value in translations.items():
        result_nested_obj.append({"language": key, "text": value})
    doc = {
        "text": english_text,
        "language": 'en',
        "translations": result_nested_obj,
        "timestamp": datetime.now(),
        "customer_code": customer_code,
        "country": country,
        "ticket_number": str(uuid.uuid4())
    }
    es.index(index=os.environ['ELASTICSEARCH_INDEX_NAME'], doc_type=os.environ['ELASTICSEARCH_TYPE_NAME'], body=doc)

As you may have noticed, we used environment variables. They are defined in the serverless.yml configuration file.

provider:
  name: aws
  runtime: python3.6
  region: eu-west-1
  memorySize: 1024
  timeout: 300
  environment:
    AWS_TRANSLATE_SUPPORTED_LANGUAGES: 'ar zh fr de pt es' # supported AWS Translate languages
    ELASTICSEARCH_HOST: 'https://yourElasticsearchHost'
    ELASTICSEARCH_INDEX_NAME: 'customercare'
    ELASTICSEARCH_TYPE_NAME: 'ticket'

3. Deploy

We are now ready to deploy our code to AWS.

This is what my serverless.yml looks like:

We specified the provider (AWS), the runtime (Python 3.6), the environment variables, our HTTP endpoints and the AWS Lambda function handlers.

service: awstranslate

provider:
  name: aws
  runtime: python3.6
  region: eu-west-1
  memorySize: 1024
  timeout: 300
  environment:
    AWS_TRANSLATE_SUPPORTED_LANGUAGES: 'ar zh fr de pt es' # supported AWS Translate languages
    ELASTICSEARCH_HOST: 'https://yourElasticsearchHost'
    ELASTICSEARCH_INDEX_NAME: 'customercare'
    ELASTICSEARCH_TYPE_NAME: 'ticket'

functions:
  create:
    handler: handler.create_ticket
    events:
      - http:
          path: createTicket
          method: post
          cors: true
  search:
    handler: handler.search_ticket
    events:
      - http:
          path: searchTicket
          method: get

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: false


Deploy to AWS:

serverless deploy --aws-s3-accelerate
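After a successful deploy, Serverless prints the generated API Gateway endpoint URLs. As a quick smoke test, you can call the createTicket endpoint with curl (the URL below is a placeholder for whatever your deploy prints):

curl -X POST https://your-api-id.execute-api.eu-west-1.amazonaws.com/dev/createTicket \
  -H 'Content-Type: application/json' \
  -d '{"text": "Hola, mi pedido no ha llegado", "customerCode": "C-1001", "country": "ES"}'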

4. Index to Elasticsearch

Given an inquiry, we now have a list of translations that we want to index in Elasticsearch 6.2.3.
We create a new index called customercare and a new type called ticket.

The ticket type will have the following properties:

  • text: the English text
  • language: the language of the text
  • country: the country from where we received the inquiry
  • ticket number: an ID generated to uniquely identify an inquiry
  • timestamp: index time
  • translations: list of the translations (text and language)

PUT /customercare
{
  "mappings": {
    "ticket": {
      "properties": {
        "text": { "type": "text" },
        "language": { "type": "keyword" },
        "country": { "type": "keyword" },
        "ticket_number": { "type": "keyword" },
        "timestamp": { "type": "date" },
        "translations": {
          "type": "nested",
          "properties": {
            "text": { "type": "text" },
            "language": { "type": "keyword" }
          }
        }
      }
    }
  }
}
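For reference, a ticket document produced by the index_new_document function above would look roughly like this (illustrative values only):

{
  "text": "Hello, my order has not arrived yet",
  "language": "en",
  "translations": [
    { "language": "es", "text": "Hola, mi pedido todavia no ha llegado" },
    { "language": "fr", "text": "Bonjour, ma commande n'est pas encore arrivee" }
  ],
  "timestamp": "2018-05-14T10:21:33",
  "customer_code": "C-1001",
  "country": "ES",
  "ticket_number": "a1b2c3d4-0000-0000-0000-000000000000"
}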

5. Search in Elasticsearch

Now that the data is indexed in Elasticsearch, we can run queries that search across languages.

Examples:

Full-text search through translations:

GET customercare/_search
{
  "query": {
    "nested": {
      "path": "translations",
      "query": {
        "match": {
          "translations.text": "your text"
        }
      }
    }
  }
}

Full-text search through English text and translations:

GET customercare/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "nested": {
            "path": "translations",
            "query": {
              "match": {
                "translations.text": "tree"
              }
            }
          }
        },
        {
          "term": {
            "text": "tree"
          }
        }
      ]
    }
  }
}

Number of inquiries by a customer (full-text search):

GET customercare/_search
{
  "aggs": {
    "inquiries_by_customer": {
      "terms": {
        "field": "customer_code"
      }
    }
  },
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "text": "tree"
          }
        }
      ]
    }
  }
}
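The search_ticket Lambda handler referenced in serverless.yml is not shown above. A minimal sketch of what it could look like, reusing the queries above (the query-string parameter name q is an assumption, not part of the original post):

def search_ticket(event, context):
    # Hypothetical handler: free-text search across the English text and the nested translations
    query_text = (event.get('queryStringParameters') or {}).get('q', '')
    result = es.search(
        index=os.environ['ELASTICSEARCH_INDEX_NAME'],
        body={
            "query": {
                "bool": {
                    "should": [
                        {"match": {"text": query_text}},
                        {"nested": {
                            "path": "translations",
                            "query": {"match": {"translations.text": query_text}}
                        }}
                    ]
                }
            }
        }
    )
    return {"statusCode": 200, "body": json.dumps(result['hits'])}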

6. Visualize, Report, and Monitor with Kibana dashboards and search

With Kibana you can create a set of visualizations/dashboards to search for inquiries by language and to monitor index metrics (like number of translations or number of translations by customer).

Examples of Kibana dashboards:

Top languages, inquiries by language and customer, and geolocation of inquiries:

Inquiry counts by language and customer, and top customers by language:

7. Use Skedler Reports and Alerts to easily monitor data

Using Skedler, an easy to use report scheduling and distribution application for Elasticsearch-Kibana-Grafana, you can centrally schedule and distribute custom reports from Kibana Dashboards and Saved Searches as hourly/daily/weekly/monthly PDF, XLS or PNG reports to various stakeholders. If you want to read more about it: Skedler Overview.

We have created a custom report using Skedler Report Templates that provides an overview of the tickets based on languages and countries of origin.  The custom report generated by Skedler is shown below:


If you want to get notified when something happens in your index, for example, a certain entity is detected or the number of negative feedback by customers crosses a threshold value, you can use Skedler Alerts. It simplifies how you create and manage alert rules for Elasticsearch and it provides a flexible approach to notifications (it supports multiple notifications, from Email to Slack and Webhook).

We have seen how to schedule a report generation. We are now going to see how to use Skedler Alerts to get notified when something happens in our index. For example, if the number of inquiries from a specific country hits a certain threshold.

Choose the Alert Condition. For example: “the number of tickets in English must be higher than zero”.

or “the number of tickets in English coming from Italy and containing a given word must be higher than zero”.

This is what the Skedler Alert notification looks like in Slack:

Conclusion

In this two-part blog series, we learnt how to build our own multi-lingual omni-channel customer care platform using AWS Translate, Elasticsearch, and Skedler. Let us know your thoughts about this approach. Send your comments to hello at skedler dot com.

Combine Amazon Translate with Elasticsearch and Skedler to build a cost-efficient multi-lingual omnichannel customer care – Part 1

Every organization provides services to customers before, during and after a purchase.  For organizations whose customers are spread all over the world, the customer care team has to handle requests in different languages.  Meeting the customer satisfaction SLA for a global multi-lingual customer base without breaking the bank is a significant challenge.  How can you enable your customer care team to respond to inquiries in different languages?  Is it feasible for organizations to handle customer inquiries from across the globe efficiently without compromising on quality?

With Amazon's introduction of AWS Translate, combined with ELK and Skedler, you now can!

In this two-part blog post, we are going to present a system architecture to translate customer inquiries in different languages with AWS Translate, index this information in Elasticsearch 6.2.3 for fast search, visualize the data with Kibana 6.2.3, and automate reporting and alerting using Skedler.  In Part I, we will discuss the key components, architecture, and common use cases. In Part II, we will dive into the details on how to implement this architecture.

Let us begin by breaking down the business requirement into use cases:

  • Enable customer care teams (based in the US or other English-speaking countries) to respond to tickets/questions from customers all over the world, automatically translated, across multiple channels such as email and chat
  • Build a searchable index of tickets, questions, responses, translations, and customer satisfaction scores to measure key topics and customer satisfaction, and to identify topics for automation (auto-reply via chatbots or a knowledge base)
  • Use Skedler reporting and alerting to generate KPIs on the above and alert if the customer satisfaction score falls below threshold levels

The components that we need are the following:

  • AWS API Gateway
  • AWS Lambda
  • AWS Translate
  • Elasticsearch 6.2.3
  • Kibana 6.2.3
  • Skedler Reports and Alerts

System architecture:


A Bit about AWS Translate

At the re:Invent 2017 conference, Amazon Web Services presented Amazon Translate, a new machine learning (natural language processing) service.


Amazon Translate is a neural machine translation service that delivers fast, high-quality, and affordable language translation. Neural machine translation is a form of language translation automation that uses deep learning models to deliver more accurate and more natural sounding translation than traditional statistical and rule-based translation algorithms. Amazon Translate allows you to localize content – such as websites and applications – for international users, and to easily translate large volumes of text efficiently.

Alternatives to AWS Translate include Google Cloud Translation API and Azure Translator Text.

You can find more details about AWS Translate in the following links.

> AWS official documentation: What is Amazon Translate?
> Blog post: Amazon Translate Now Generally Available
> Blog post: Introducing Amazon Translate – Real-time Language Translation
> AWS Machine Learning blog: Amazon Translate

Conclusion

In this post we presented a system architecture that performs the following:

  • Text Translation with AWS Translate
  • Index and fast search – Elasticsearch
  • Dashboard visualization – Kibana
  • Automated Customizable Reporting and Alerting – Skedler Reports and Alerts

AWS Translate + ELK + Skedler is a robust solution for handling multi-lingual customer support inquiries in a high-quality, cost-efficient way.

Excited and ready to dive into the details?  In the next post (Part 2 of 2), you can see how to implement the described architecture.

How to Extract Business Insights from Audio Using AWS Transcribe, AWS Comprehend and Elasticsearch – Part 2 of 2

In the previous post, we presented a system architecture to convert audio and voice into written text with AWS Transcribe, extract useful information for quick understanding of content with AWS Comprehend, index this information in Elasticsearch 6.2 for fast search and visualize the data with Kibana 6.2.

In this post we are going to see how to implement the previously described architecture.
The main steps performed in the process are:

  1. Configure S3 Event Notification
  2. Consume messages from Amazon SQS queue
  3. Convert the recording to text with AWS Transcribe
  4. Entities/key phrases/sentiment detection using AWS Comprehend
  5. Index to Elasticsearch 6.2
  6. Search in Elasticsearch by entities/sentiment/key phrases/customer
  7. Visualize, report and monitor with Kibana dashboards
  8. Use Skedler Reports and Alerts for reporting, monitoring and alerting

1. Configure S3 Event Notification

When a new recording has been uploaded to the S3 bucket, a message will be sent to an Amazon SQS queue.

You can read more information on how to configure the S3 Bucket and read the queue programmatically here: Configuring Amazon S3 Event Notifications.

This is what a notification message from S3 looks like. The information we need is the object key and the bucket name.

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "requestParameters": { "sourceIPAddress": "xxx.xxx.xx.xx" },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "ev",
        "bucket": {
          "name": "your_bucket",
          "arn": "arn:aws:s3:::your_bucket"
        },
        "object": {
          "key": "my_new_recording.mp3",
          "size": 567
        }
      }
    }
  ]
}

2. Consume messages from Amazon SQS queue

Now that the S3 bucket has been configured, a notification will be sent to the SQS queue when a recording is uploaded to the bucket. We are going to build a consumer that will perform the following operations:

  • Start a new AWS Transcribe transcription job
  • Check the status of the job
  • When the job is done, perform text analysis with AWS Comprehend
  • Index the results to Elasticsearch

With this code, you can read the messages from an SQS queue, fetch the bucket and key (used in S3) of the uploaded document, and use them to invoke AWS Transcribe for the speech-to-text task:

import boto3
import time
import json

AWS_ACCESS_KEY = 'yourAWS_ACCESS_KEY'
AWS_SECRET_ACCESS_KEY = 'yourAWS_SECRET_ACCESS_KEY'
AWS_REGION = 'yourAWS_SUBSCRIPTION_REGION'
SQS_QUEUE_NAME = 'SQS_QUEUE_NAME'

sqs_resource_connection = boto3.resource(
    'sqs',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY,
    region_name = AWS_REGION
)

queue = sqs_resource_connection.get_queue_by_name(QueueName = SQS_QUEUE_NAME)

while True:
    messages = queue.receive_messages(MaxNumberOfMessages = 1, WaitTimeSeconds = 5)
    for message in messages:
        body = json.loads(message.body)
        key_name = body['Records'][0]['s3']['object']['key']
        bucket_name = body['Records'][0]['s3']['bucket']['name']
        object_url = f'https://s3.amazonaws.com/{bucket_name}/{key_name}'
        # Start the AWS Transcribe transcription job
        # Check job status
        # Run text analysis
        # Index to Elasticsearch
        message.delete()
    time.sleep(10)

3. AWS Transcribe – Start Transcription Job

Once we have consumed an S3 message and we have the URL of the newly uploaded document, we can start a new (asynchronous) transcription job to perform the speech-to-text task.

We are going to use the start_transcription_job method.

It takes a job name, the S3 url and the media format as parameters.

To use the AWS Transcribe API be sure that your AWS Python SDK – Boto3 is updated.

pip install boto3 --upgrade

import boto3

client_transcribe = boto3.client(
    'transcribe',
    region_name='us-east-1' # service still in preview
)

def start_transcribe_job(job_name, media_file_uri):
    response = client_transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        LanguageCode='en-US', # TODO: use parameter when more languages will be available
        MediaFormat='mp3', # feel free to change it
        Media={
            'MediaFileUri': media_file_uri
        }
    )
    return response['TranscriptionJob']['TranscriptionJobName']
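Transcription job names must be unique within your account and region, so it is convenient to derive them from the S3 key. A minimal usage sketch (the naming scheme below is just an illustration):

import uuid

# key_name and object_url come from the SQS consumer loop shown earlier
job_name = f"{key_name.replace('.', '-')}-{uuid.uuid4()}"
start_transcribe_job(job_name, object_url)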

Read more details here: Python Boto3 AWS Transcribe.

3a. AWS Transcribe – Check Job Status

Due to the asynchronous nature of the transcription job (it could take a while depending on the length and complexity of your recordings), we need to check the job status.

Once the status is “COMPLETED”, we can retrieve the result of the job (the text converted from the recording).

import json
import time
import urllib.request

def get_transcribe_job_response(job_name):
    job_status = 'IN_PROGRESS'
    while job_status == 'IN_PROGRESS':
        job = client_transcribe.get_transcription_job(
            TranscriptionJobName=job_name
        )
        job_status = job['TranscriptionJob']['TranscriptionJobStatus']
        time.sleep(5)
    if job_status == 'FAILED':
        raise Exception(f'Job {job_name} failed')
    elif job_status == 'COMPLETED':
        job_result = job['TranscriptionJob']['Transcript']['TranscriptFileUri']
        with urllib.request.urlopen(job_result) as url:
            return json.loads(url.read().decode())['results']['transcripts'][0]

Here’s how the output looks:

{
    "jobName": "myFirstJob",
    "accountId": "1111111",
    "results": {
        "transcripts": [{
            "transcript": "welcome back"
        }],
        "items": [{
            "start_time": "0.990",
            "end_time": "1.300",
            "alternatives": [{
                "confidence": "0.9999",
                "content": "welcome"
            }],
            "type": "pronunciation"
        }, {
            "start_time": "1.300",
            "end_time": "1.440",
            "alternatives": [{
                "confidence": "1.0000",
                "content": "back"
            }],
            "type": "pronunciation"
        }]
    }
}

4. AWS Comprehend – Text Analysis

We have converted our recording to text. Now, we can run the text analysis using AWS Comprehend. The analysis will extract the following elements from the text:

  • Sentiment
  • Entities
  • Key phrases

import boto3
import sys

client_comprehend = boto3.client(
    'comprehend',
    region_name = 'yourRegion'
)

def comprehend_analysis(plain_text):
    # Max bytes size supported by AWS Comprehend
    # https://boto3.readthedocs.io/en/latest/reference/services/comprehend.html#Comprehend.Client.detect_dominant_language
    # https://boto3.readthedocs.io/en/latest/reference/services/comprehend.html#Comprehend.Client.detect_entities
    while sys.getsizeof(plain_text) > 5000:
        plain_text = plain_text[:-1]

    dominant_language_response = client_comprehend.detect_dominant_language(
        Text=plain_text
    )
    dominant_language = sorted(dominant_language_response['Languages'], key=lambda k: k['LanguageCode'])[0]['LanguageCode']
    if dominant_language not in ['en', 'es']:
        dominant_language = 'en'

    response_entities = client_comprehend.detect_entities(
        Text=plain_text,
        LanguageCode=dominant_language
    )
    response_key_phrases = client_comprehend.detect_key_phrases(
        Text=plain_text,
        LanguageCode=dominant_language
    )
    response_sentiment = client_comprehend.detect_sentiment(
        Text=plain_text,
        LanguageCode=dominant_language
    )

    entities = list(set([x['Type'] for x in response_entities['Entities']]))
    key_phrases = list(set([x['Text'] for x in response_key_phrases['KeyPhrases']]))
    sentiment = response_sentiment['Sentiment']
    return entities, key_phrases, sentiment

Read more details here: Python Boto3 AWS Comprehend.

5. Index to Elasticsearch

Given a recording, we now have a set of elements that characterize it. Now, we want to index this information to Elasticsearch 6.2. I created a new index called audioarchive and a new type called recording.

The recording type we are going to create will have the following properties:

  • customer id: the id of the customer who submitted the recording (substring of the s3 key)
  • entities: the list of entities detected by AWS Comprehend
  • key phrases: the list of key phrases detected by AWS Comprehend
  • sentiment: the sentiment of the document detected by AWS Comprehend
  • s3Location: link to the document in the S3 bucket

Create the new index:

curl -XPUT 'esHost:9200/audioarchive/' -H 'Content-Type: application/json' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 0
        }
    }
}'

Add the new mapping:

curl -X PUT "esHost:9200/audioarchive/recording/_mapping" -H 'Content-Type: application/json' -d '{
    "recording" : {
        "properties" : {
            "customerId" : { "type" : "keyword" },
            "entities" : { "type" : "keyword" },
            "keyPhrases" : { "type" : "keyword" },
            "sentiment" : { "type" : "keyword" },
            "s3Location" : { "type" : "text" }
        }
    }
}'

We can now index the new document:

from elasticsearch import Elasticsearch

es_client = Elasticsearch('esHost')

def create_es_document(customer_id, entities, sentiment, key_phrases, s3_location):
    return {
        "customerId": customer_id,
        "entities": entities,
        "sentiment": sentiment,
        "keyPhrases": key_phrases,
        "s3Location": s3_location
    }

def index_to_es(document, index_name, type):
    es_client.index(index=index_name, doc_type=type, body=document)

doc = create_es_document(1, ['entity1', 'entity2'], 'positive', ['k1', 'k2'], 'https://your_bucket.s3.amazonaws.com/your_object_key')

index_to_es(doc, INDEX_NAME, TYPE_NAME)
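Putting the pieces together, the body of the SQS consumer loop from step 2 could look roughly like this. It is only a sketch reusing the functions defined above; deriving the customer id from the S3 key prefix is an assumption, not part of the original code:

# Inside the "for message in messages:" loop of the SQS consumer
job_name = key_name.replace('.', '-')
start_transcribe_job(job_name, object_url)

# Blocks until the transcription job completes, then returns the transcript object
transcript = get_transcribe_job_response(job_name)['transcript']

# Extract entities, key phrases and sentiment from the transcript
entities, key_phrases, sentiment = comprehend_analysis(transcript)

# Assumption: the S3 key is prefixed with the customer id, e.g. "42/feedback.mp3"
customer_id = key_name.split('/')[0]

doc = create_es_document(customer_id, entities, sentiment, key_phrases, object_url)
index_to_es(doc, 'audioarchive', 'recording')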

6. Search in Elasticsearch by entities, sentiment, key phrases or customer

Now that we indexed the data in Elasticsearch, we can perform some queries to extract business insights from the recordings.

Examples:

Number of positive recordings that contain the “feedback” key phrase, by customer:

POST audioarchive/recording/_search?size=0
{
  "aggs": {
    "recordings_by_customer": {
      "terms": {
        "field": "customerId"
      }
    }
  },
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "sentiment": "Positive"
          }
        },
        {
          "match": {
            "keyPhrases": "feedback"
          }
        }
      ]
    }
  }
}

Number of recordings by sentiment.

POST audioarchive/recording/_search?size=0
{
  "aggs": {
    "recordings_by_sentiment": {
      "terms": {
        "field": "sentiment"
      }
    }
  }
}

What are the main key phrases in the negative recordings?

POST audioarchive/recording/_search?size=0
{
  "aggs": {
    "top_key_phrases": {
      "terms": {
        "field": "keyPhrases"
      }
    }
  },
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "sentiment": "Negative"
          }
        },
        {
          "match": {
            "sentiment": "Mixed"
          }
        }
      ]
    }
  }
}

7. Visualize, Report, and Monitor with Kibana dashboards and search

With Kibana, you can create a set of visualizations/dashboards to search for recordings by customer or entity and to monitor index metrics (like the number of positive recordings, the number of recordings by customer, and the most common entities/key phrases in the recordings).

Examples of Kibana dashboards:

Percentage of documents by sentiment, percentage of positive feedback and key phrases:


Number of recordings by customers, and sentiment by customers:


Most common entities and heat map sentiment-entities:


8. Use Skedler Reports and Alerts to easily monitor data

Using Skedler, an easy to use report scheduling and distribution application for Elasticsearch-Kibana-Grafana, you can centrally schedule and distribute custom reports from Kibana Dashboards and Saved Searches as hourly/daily/weekly/monthly PDF, XLS or PNG reports to various stakeholders. If you want to read more about it: Skedler Overview.


If you want to get notified when something happens in your index, for example, a certain entity is detected or the number of negative recordings by a customer reaches a certain value, you can use Skedler Alerts. It simplifies how you create and manage alert rules for Elasticsearch and it provides a flexible approach to notifications (it supports multiple notification channels, from Email to Slack and Webhook).

Conclusion

In this post, we have seen how to use Elasticsearch as the search engine for customer recordings. We used the speech-to-text power of AWS Transcribe to convert our recordings to text and then AWS Comprehend to extract semantic information from the text. We then used Kibana to aggregate the data and create useful visualizations and dashboards, and finally scheduled and distributed custom reports from the Kibana dashboards using Skedler Reports.

Environment configurations:

  • Elasticsearch and Kibana 6.2
  • Python 3.6.3 and AWS SDK Boto3 1.6.3
  • Ubuntu 16.04.3 LTS
  • Skedler Reports & Alerts

Extract business insights from audio using AWS Transcribe, AWS Comprehend and Elasticsearch – Part 1

Many businesses struggle to gain actionable insights from customer recordings because they are locked in voice and audio files that can’t be analyzed. They have a gold mine of potential information from product feedback, customer service recordings and more, but it’s seemingly locked in a black box.

Until recently, transcribing audio files to text has been time-consuming or inaccurate.
Speech to text is the process of converting speech input into digital text, based on speech recognition. The best solutions were either not accurate enough, too expensive to scale or didn’t play well with legacy analysis tools. With Amazon’s introduction of AWS Transcribe, that has changed.

In this two-part blog post, we are going to present a system architecture to convert audio and voice into written text with AWS Transcribe, extract useful information for quick understanding of content with AWS Comprehend, index this information in Elasticsearch 6.2 for fast search and visualize the data with Kibana 6.2.  In Part I, you can learn about the key components, architecture, and common use cases.  In Part II, you can learn how to implement this architecture.

We are going to analyze some customer recordings (complaints, product feedbacks, customer support) to extract useful information and answer the following questions:

  • How many positive recordings do I have?
  • How many customers are complaining (negative feedback) about my products?
  • What is the sentiment about my product?
  • Which entities/key phrases are the most common in my recordings?

The components that we are going to use are the following:

  • AWS S3 bucket
  • AWS Transcribe
  • AWS Comprehend
  • Elasticsearch 6.2
  • Kibana 6.2
  • Skedler Reports and Alerts

System architecture:

This architecture is useful when you want to extract insights from a set of audio/voice recordings. You will be able to convert your recordings to text, extract semantic details from the text, perform fast searches/aggregations on the data, and visualize and report on it.

Examples of common applications are:

  • transcription of customer service calls
  • generation of subtitles on audio and video content
  • conversion of audio file (for example podcast) to text
  • search for keywords or inappropriate words within an audio file

 

AWS Transcribe

At the re:Invent 2017 conference, Amazon Web Services presented Amazon Transcribe, a new machine learning (natural language processing) service.

Amazon Transcribe is an automatic speech recognition (ASR) service that makes it easy for developers to add speech to text capability to their applications. Using the Amazon Transcribe API, you can analyze audio files stored in Amazon S3 and have the service return a text file of the transcribed speech.

Instead of AWS Transcribe, you can use similar services to perform speech to text analysis, like: Azure Bing Speech API or Google Cloud Speech API.

> The service is still in preview; watch the launch video here: AWS re:Invent 2017: Introducing Amazon Transcribe.

> You can read more about it here: Amazon Transcribe – Accurate Speech To Text At Scale.

 

AWS Comprehend

Amazon Comprehend is a natural language processing (NLP) service that uses machine learning to find insights and relationships in text. Amazon Comprehend identifies the language of the text; extracts key phrases, places, people, brands, or events; understands how positive or negative the text is, and automatically organizes a collection of text files by topic. – AWS Service Page


It analyzes text and tells you what it finds, starting with the language, from Afrikaans to Yoruba, with 98 more in between. It can identify different types of entities (people, places, brands, products, and so forth), key phrases, sentiment (positive, negative, mixed, or neutral), and extract key phrases, all from a text in English or Spanish. Finally, Comprehend’s topic modeling service extracts topics from large sets of documents for analysis or topic-based grouping. – Jeff Barr – Amazon Comprehend – Continuously Trained Natural Language Processing.

Instead of AWS Comprehend, you can use similar services to perform Natural Language Processing, like: Google Cloud Platform – Natural Language API or Microsoft Azure – Text Analytics API.
I prefer to use AWS Comprehend because the service constantly learns and improves from a variety of information sources, including Amazon.com product descriptions and consumer reviews – one of the largest natural language data sets in the world. This means it will keep pace with the evolution of language and it is fully integrated with AWS S3 and AWS Glue (so you can load documents and texts from various AWS data stores such as Amazon Redshift, Amazon RDS, Amazon DynamoDB, etc.).

Once you have a text file of the audio recording, you can feed it into Amazon Comprehend for analysis of the sentiment, tone and other insights.

> Here you can find an AWS Comprehend use case: How to Combine Text Analytics and Search using AWS Comprehend and Elasticsearch 6.0.

 

Conclusion

In this post we have seen a system architecture that performs the following:

  • Speech to text task – AWS Transcribe
  • Text analysis – AWS Comprehend
  • Index and fast search – Elasticsearch
  • Dashboard visualization – Kibana
  • Automatic Reporting and Alerting – Skedler Reports and Alerts

Amazon Transcribe and Comprehend can be powerful tools in helping you unlock the potential insights from voice and video recordings that were previously too costly to access. Having these insights makes it easier to understand trends in issues and consumer behavior, brand and product sentiment, Net Promoter Score, as well as product ideas and suggestions, and more.

In the next post (Part 2 of 2), you can see how to implement the described architecture.

Application Performance Monitoring with Elasticsearch 6.1, Kibana and Skedler Alerts

Have you ever wondered how to easily monitor the performance of your application and how to house your application metrics in Elasticsearch? The answer is Elastic APM.
Elastic Application Performance Management (APM) is a new feature available in Elasticsearch 6.1 (beta in 6.1, alpha in 6.0).  A few months ago, Opbeat (an application performance monitoring – APM – company) joined forces with Elastic, and their product is now Elastic APM.

Adding APM (Application Performance Monitoring) to the Elastic Stack is a natural next step in providing users with end-to-end monitoring, from logging to server-level metrics, to application-level metrics, all the way to the end-user experience in the browser or client.

In this post, we are going to see how to monitor the performance of a Python Flask application using the APM feature of Elasticsearch and how to get notified (webhook or email) when something happens in your application by Skedler Alerts.

Here you can read more about Opbeat acquisition and APM announcement:

APM Overview

First of all, let's see how APM works. What is written below is taken from here: APM Overview.

APM is an application performance monitoring system built on the Elastic Stack. It uses Elasticsearch as its data store and allows you to monitor the performance of thousands of applications in real time.

With APM, you can automatically collect detailed performance information from inside your applications and it requires only minor changes to your application. APM will automatically instrument your application and measure the response time for incoming requests. It also automatically measures what your application was doing while it was preparing the response.

APM components:


APM agents

APM agents are open source libraries written in the same language as your application. You install them into your application as you would install any other library. The agents hook into your application and start collecting performance metrics and errors. All data collected by the agents is sent on to the APM Server.

APM Server

APM server is an open source application written in Go which runs on your servers. It listens on port 8200 by default and receives data from agents periodically. The API is a simple JSON based HTTP API. APM Server builds Elasticsearch documents from the data received from agents. These documents are stored in an Elasticsearch cluster. A single APM Server process can typically handle data from hundreds of agents.

Right now, the following APM agents are available:

In this post, we are not going to cover how to install and configure the APM Server; the procedure is well documented here: Elastic – APM.

Use case

The new APM feature can be used when you need a free solution to monitor your Python/Node.js/Ruby/JS application and you want to use Elasticsearch's search power and Kibana's visualizations to look at your application metrics. If you integrate the alerting feature of Skedler Alerts (licensed), you can get notified in a flexible way, from webhook to email, when something happens in your application.

Example of applications and notifications by Alerts:

  • Back office application written in Python with the Flask or Django framework: get a Slack notification when the number of HTTP 4xx errors is higher than a given threshold in the last 30 minutes – access control alert
  • Application server written in Node.js: get an email message when an unhandled exception is raised by the application in the last hour – error handling alert
  • Batch process script written in Ruby: get a daily Slack notification with the details of all the operations of the day – application summary alert

Python Flask Application

Flask is a micro web framework written in Python and based on the Werkzeug toolkit and Jinja2 template engine. You can read more about it here: Welcome to Flask.
In this example, we will assume we have some web APIs written with Flask. We want to monitor our application (API calls) and get notified when the number of errors is particularly high or when some endpoint gets too many calls.

Given a set of Flask API endpoints:
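For illustration only, such a set of endpoints might look like the following minimal Flask application (the routes are hypothetical, not from the original post):

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/api/items', methods=['GET'])
def list_items():
    # Hypothetical endpoint returning a static list
    return jsonify([{'id': 1, 'name': 'item one'}])

@app.route('/api/items/<int:item_id>', methods=['GET'])
def get_item(item_id):
    # Hypothetical endpoint returning a single item
    return jsonify({'id': item_id, 'name': f'item {item_id}'})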

We are going to add a few lines of code to send the application metrics to the APM Server (which will index them into Elasticsearch). First of all, install the Elastic APM dependencies:

$ pip install elastic-apm[flask]

and import them:

from elasticapm.contrib.flask import ElasticAPM

Configure Elastic APM in your application by specifying the APM Server URL, optionally a secret token (the one you set in the APM Server config.yml), and your application name.

# configure ELASTIC_APM in your application's settings
app.config['ELASTIC_APM'] = {
    # allowed app_name chars: a-z, A-Z, 0-9, -, _, and space
    'APP_NAME': 'yourApplicationName',
    # 'SECRET_TOKEN': 'yourToken', # if you set one in the APM Server configuration
    'SERVER_URL': 'http://apmServer:8200' # your APM Server URL
}

apm = ElasticAPM(app)

We are now monitoring our application and housing our metrics in Elasticsearch!
You can capture additional events or send additional data to the APM Server.

Capture exceptions:

try:
    1 / 0
except ZeroDivisionError:
    apm.capture_exception()

Log generic message:

apm.capture_message('hello, world!')

Send extra information:

@app.route('/')
def bar():
    try:
        1 / 0
    except ZeroDivisionError:
        app.logger.error('Math is hard',
            exc_info=True,
            extra={
                'good_at_math': False,
            }
        )

Elasticsearch and Kibana

All the collected metrics are stored in an Elasticsearch index (apm-6.1.1-*) as documents of type doc.
Here is an extract of the doc type mapping (related to the HTTP request/response):

"request": {
  "properties": {
    "http_version": {
      "type": "keyword",
      "ignore_above": 1024
    },
    "method": {
      "type": "keyword",
      "ignore_above": 1024
    },
    "url": {
      "properties": {
        "pathname": {
          "type": "keyword",
          "ignore_above": 1024
        },
        "port": {
          "type": "keyword",
          "ignore_above": 1024
        },
        "protocol": {
          "type": "keyword",
          "ignore_above": 1024
        }
      }
    },
    ...
  }
},
"response": {
  "properties": {
    "finished": {
      "type": "boolean"
    },
    "status_code": {
      "type": "long"
    },
    ...
  }
}

Here you can find the full type mapping: doc type mapping.

Example of indexed document:


Once our application is configured, all the metrics will be stored in Elasticsearch and we can use the default Kibana APM UI to view them.


Response times and responses per minute (HTTP 2xx and HTTP 4xx):


Request details:

Elasticsearch Alerts with Skedler

We are now sending our application's metrics to Elasticsearch and we have a nice way to view them, but we will not be looking at the Kibana APM UI all the time to check that everything is OK.
Wouldn’t it be nice if we could receive a Slack notification or an email when something is wrong so we can look at the dashboard?

This is where Skedler Alerts comes into the picture!

It simplifies how you monitor data in Elasticsearch for abnormal patterns, drill down to root cause and alert using webhooks and email.  You can design your rules for detecting patterns, spikes, new events, and threshold violations using Skedler’s easy to use UI. You can correlate across indexes, filter events and compare against baseline conditions to detect abnormal patterns in data.  

Read more about Skedler Alerts here:

From the Alerts UI (to see how to install Alerts, take a look here: Install Alerts) let’s define a new Webhook (I took the webhook URL from my Slack team setting, read more here: Slack Incoming Webhook):


We want to get a notification when our application returns an HTTP 400 error. Define a new Alert rule (Threshold type):

Filter by the context.response.status_code == 400 field:

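For reference, the condition above boils down to a simple filter on the APM index. Conceptually it is equivalent to a query like the following (the index pattern and field path are taken from the mapping extract above; the exact query Skedler Alerts builds may differ):

GET apm-6.1.1-*/_search
{
  "query": {
    "term": {
      "context.response.status_code": 400
    }
  }
}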

Choose your schedule and action.

In the picture below, the job will run every minute and the notification will be sent to the Slack webhook. You can define your Slack message template.


Once the alert fires, we get notified in the Slack channel.


You can now create as many new Alert rules as you need to get notified when something happens in your application.

The application metrics are written by the APM Server to a standard Elasticsearch index, so you can write your own Alert rules (there are no constraints on the APM index).

Here you can find some useful resources about Skedler Alerts:

Conclusion

In this post, we have seen how to monitor the performance of your application with Elastic APM, automatically send the metrics to Elasticsearch, and use Skedler Alerts to get notified when something is wrong.

Monitoring the performance of your applications is something that you should always do to improve, fix, and manage your application.

You should use Elastic APM if you are looking for something free, easy to configure, and fully integrated with Elasticsearch (metrics are stored in a normal index) and Kibana (you have a dedicated APM UI and you can build your own dashboards).

You should use Skedler Alerts if you want to be notified about your applications’ metrics. It provides a nice dashboard where you can configure your alert rules and supports webhook and email notifications with a custom template.

How to Combine Text Analytics and Search using AWS Comprehend and Elasticsearch 6.0

How to automatically extract metadata from documents? How to index them and perform fast searches? In this post, we are going to see how to automatically extract metadata from a document using Amazon AWS Comprehend and Elasticsearch 6.0 for fast search and analysis.

The architecture we present improves search and automatic classification of documents (using the metadata) for your organization.

Using the automatically extracted metadata you can search for documents and find what you need.

We are going to use the following components:

 

Architecture

 

AWS Comprehend and Elasticsearch

Example of applications:

  • Voice of customer analytics: You can use Amazon Comprehend to analyze customer interactions in the form of documents, support emails, online comments, etc., and discover what factors drive the most positive and negative experiences. You can then use these insights to improve your products and services.
  • Semantic search: You can use Amazon Comprehend to provide a better search experience by enabling your search engine to index key phrases, entities, and sentiment. This enables you to focus the search on the intent and the context of the articles instead of basic keywords.
  • Knowledge management and discovery: You can use Amazon Comprehend to organize and categorize your documents by topic for easier discovery, and then personalize content recommendations for readers by recommending other articles related to the same topic.

When we talk about metadata, I like the following definition:

Metadata summarizes basic information about data, which can make finding and working with particular instances of data easier. For example, author, date created, date modified, and file size are examples of very basic document metadata. Having the ability to filter through that metadata makes it much easier for someone to locate a specific document.

We are going to focus on the following metadata:

  • Document content type (PDF, Plain Text, HTML, Docx)
  • Document dominant language
  • Document entities
  • Key phrases
  • Sentiment
  • Document length
  • Country of origin of the document (metadata taken from the user details – ip address)

Amazon S3 will be the main document storage. Once a document has been uploaded to S3 (you can easily use the AWS SDK to upload a document to S3 from your application), a notification is sent to an SQS queue and then consumed by a consumer.

The consumer gets the uploaded document and detects the entities/key phrases/sentiment using AWS Comprehend. Then it indexes the document to Elasticsearch. We use the Elasticsearch pre-processor plugins, Attachment Processor and Geoip Processor, to perform the other metadata extraction (more details below).

Here are the main steps performed in the process:

  1. Upload a document to S3 bucket
  2. Event notification from S3 to a SQS queue
  3. Event consumed by a consumer
  4. Entities/key phrases/sentiment detection using AWS Comprehend
  5. Index to Elasticsearch
  6. ES Ingestion pre-processing: extract document metadata using Attachment and Geoip Processor plugin
  7. Search in Elasticsearch by entities/sentiment/key phrases/language/content type/source country and full-text search
  8. Use Kibana for dashboard and search
  9. Use Skedler and Alerts for reporting, monitoring and alerting

 

In the example, we used AWS S3 as document storage. But you could extend the architecture and use the following:

  • SharePoint: create an event receiver and once a document has been uploaded extract the metadata and index it to Elasticsearch. Then search and get the document on SharePoint
  • Box, Dropbox and Google Drive: extract the metadata from the document stored in a folder and then easily search for them
  • Similar Object storage (i.e. Azure Blob Storage)

Event notification

When a document has been uploaded to the S3 bucket a message will be sent to an Amazon SQS queue. You can read more information on how to configure the S3 Bucket and read the queue programmatically here: Configuring Amazon S3 Event Notifications.

This is what a notification message from S3 looks like. The information we need is the sourceIPAddress and the object key:

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "requestParameters": { "sourceIPAddress": "xxx.xxx.xx.xx" },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "ev",
        "bucket": {
          "name": "your_bucket",
          "arn": "arn:aws:s3:::your_bucket"
        },
        "object": {
          "key": "my_document.docx",
          "size": 567
        }
      }
    }
  ]
}

Consume messages from Amazon SQS queue

Now that the S3 bucket has been configured, a notification will be sent to the SQS queue when a document is uploaded to the bucket. We are going to build a consumer that will read this message and perform the entities/key phrases/sentiment detection using AWS Comprehend. You can optionally read a batch of messages (change the MaxNumberOfMessages parameter) from the queue and run the task against a set of documents (batch processing).

With this code, you can read the messages from an SQS queue, fetch the bucket and key (used in S3) of the uploaded document, and use them to invoke AWS Comprehend for the metadata detection task:

import boto3
import time
import json
import os

AWS_ACCESS_KEY = 'yourAWS_ACCESS_KEY'
AWS_SECRET_ACCESS_KEY = 'yourAWS_SECRET_ACCESS_KEY'
AWS_REGION = 'yourAWS_SUBSCRIPTION_REGION'
SQS_QUEUE_NAME = 'SQS_QUEUE_NAME'

sqs_resource_connection = boto3.resource(
    'sqs',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY,
    region_name = AWS_REGION
)

queue = sqs_resource_connection.get_queue_by_name(QueueName = SQS_QUEUE_NAME)

while True:
    messages = queue.receive_messages(MaxNumberOfMessages = 1, WaitTimeSeconds = 5)
    for message in messages:
        body = json.loads(message.body)
        filename_key = body['Records'][0]['s3']['object']['key']
        ip = body['Records'][0]['requestParameters']['sourceIPAddress']
        # Here we will run the entities document detection with AWS Comprehend
        # and index the result to Elasticsearch
        message.delete()
    time.sleep(10)

We will download the uploaded document from S3.

import boto3

AWS_ACCESS_KEY = 'your_key'
AWS_SECRET_ACCESS_KEY = 'your_secret_key'
BUCKET_NAME = 'your bucket'

s3_client_connection = boto3.client(
    's3',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY
)

def consume_from_sqs(message):
    # Extract uploaded document details
    ip = message['requestParameters']['sourceIPAddress']
    key = message['s3']['object']['key']
    # download the document from S3
    local_path = "{}".format(key)
    s3_client_connection.download_file(BUCKET_NAME, key, local_path)
    # detect entities
    entities = process_document(local_path)

AWS Comprehend

Amazon Comprehend is a new AWS service presented at the re:invent 2017.
Amazon Comprehend is a natural language processing (NLP) service that uses machine learning to find insights and relationships in text. Amazon Comprehend identifies the language of the text; extracts key phrases, places, people, brands, or events; understands how positive or negative the text is, and automatically organizes a collection of text files by topic. – AWS Service Page


It analyzes text and tells you what it finds, starting with the language, from Afrikaans to Yoruba, with 98 more in between. It can identify different types of entities (people, places, brands, products, and so forth), key phrases, sentiment (positive, negative, mixed, or neutral), and extract key phrases, all from a text in English or Spanish. Finally, Comprehend‘s topic modeling service extracts topics from large sets of documents for analysis or topic-based grouping. – Jeff Barr – Amazon Comprehend – Continuously Trained Natural Language Processing.

Instead of AWS Comprehend, you can use similar services to perform Natural Language Processing, like Google Cloud Platform – Natural Language API or Microsoft Azure – Text Analytics API.

Entities Detection

With this code, we can invoke the entities detection of AWS Comprehend. We will use the object key to download the object from S3.

Once you have downloaded the document, invoke the detect_entities method of AWS Comprehend.

import boto3
import sys

AWS_ACCESS_KEY = 'your_key'
AWS_SECRET_ACCESS_KEY = 'your_secret_key'
BUCKET_NAME = 'your bucket'

client_comprehend = boto3.client(
    'comprehend',
    region_name = 'eu-west-1',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY
)

def process_document(file_path):
    filename = file_path.split("/")[-1]
    extension = filename.split(".")[-1]
    plain_text = ''

    # you can find the methods to extract the text from different document types here:
    # https://gist.github.com/mz1991/97ee3f7045c8fd0e6f21ab14f9e588c7
    if extension == "pdf":
        plain_text = get_pdf_text(file_path)
    if extension == "docx":
        plain_text = get_docx_text(file_path)
    if extension == "txt" or extension == "csv":
        plain_text = get_txt_text(file_path)
    # Add your custom file extension handler

    # Max bytes size supported by AWS Comprehend
    # https://boto3.readthedocs.io/en/latest/reference/services/comprehend.html#Comprehend.Client.detect_dominant_language
    # https://boto3.readthedocs.io/en/latest/reference/services/comprehend.html#Comprehend.Client.detect_entities
    while sys.getsizeof(plain_text) > 5000:
        plain_text = plain_text[:-1]

    dominant_language_response = client_comprehend.detect_dominant_language(
        Text=plain_text
    )
    dominant_language = sorted(dominant_language_response['Languages'], key=lambda k: k['LanguageCode'])[0]['LanguageCode']

    # The service now only supports English and Spanish. In future more languages will be available.
    if dominant_language not in ['en', 'es']:
        dominant_language = 'en'

    response = client_comprehend.detect_entities(
        Text=plain_text,
        LanguageCode=dominant_language
    )
    entities = list(set([x['Type'] for x in response['Entities']]))
    return entities

Key phrases

To extract the key phrases use the detect_key_phrases method of AWS Comprehend.

response_key_phrases = client_comprehend.detect_key_phrases(
    Text=plain_text,
    LanguageCode=dominant_language
)

key_phrases = list(set([x['Text'] for x in response_key_phrases['KeyPhrases']]))

Sentiment

To extract the sentiment (positive, negative, neutral) use the detect_sentiment method of AWS Comprehend.

response_sentiment = client_comprehend.detect_sentiment(
    Text=plain_text,
    LanguageCode=dominant_language
)

sentiment = response_sentiment['Sentiment']

Index to Elasticsearch

Given a document, we now have a set of metadata that identifies it. Next, we index this metadata to Elasticsearch and use a pipeline to extract the remaining metadata. To do so, I created a new index called library and a new type called document.

Since we are going to use Elasticsearch 6.0 and Kibana 6.0, I suggest you read the following resources:

The document type we are going to create will have the following properties:

  • title: the title of the document (s3 key)
  • data: the base64 encoding of the document (used from the Attachment plugin to extract metadata)
  • ip: field that will contain the ip address of the user that uploaded the document (so we can extract the location details)
  • entities: the list of entities detected by AWS Comprehend
  • key phrases: the list of key phrases detected by AWS Comprehend
  • sentiment: the sentiment of the document detected by AWS Comprehend
  • s3Location: link to the document in the S3 bucket

Create a new index:

curl -XPUT 'esHost:9200/library/' -H 'Content-Type: application/json' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 0
        }
    }
}'

Create a new mapping. As you may notice, in ES 6.0 the string type has been replaced by the text and keyword types (see String type in ES 6.0).

curl -X PUT "esHost:9200/library/document/_mapping" -H 'Content-Type: application/json' -d '{
    "document" : {
        "properties" : {
            "title" : { "type" : "text" },
            "data" : { "type" : "binary", "doc_values": false, "store": false },
            "ip" : { "type" : "keyword" },
            "entities" : { "type" : "text" },
            "keyPhrases" : { "type" : "text" },
            "sentiment" : { "type" : "text" },
            "s3Location" : { "type" : "text" }
        }
    }
}'

To pre-process documents before indexing them, we define a pipeline that specifies a series of processors. Each processor transforms the document in some way. For example, you may have a pipeline with one processor that removes a field from the document, followed by another processor that renames a field. Our pipeline will extract the document metadata (from the base64-encoded data) and the location information from the IP address.

The attachment processor uses the Ingest Attachment plugin and the geoip processor uses the Ingest Geoip plugin.

curl -X PUT 'esHost:9200/_ingest/pipeline/documentpipeline' -H 'Content-Type: application/json' -d '
{
  "description" : "Pipeline description",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "properties": ["content", "content_type", "language", "content_length"]
      }
    },
    {
      "geoip" : {
        "field" : "ip"
      }
    }
  ]
}'

Read more about ingestion and pipeline here: Ingest Node, Pipeline Definition.
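If you prefer to manage the pipeline from code instead of curl, here is a minimal sketch that creates the same pipeline with the elasticsearch-py client (the esHost address is the same placeholder used in the indexing snippet below):

from elasticsearch import Elasticsearch

es_client = Elasticsearch('esHost')

# Create (or update) the ingest pipeline that will be referenced at index time
es_client.ingest.put_pipeline(
    id='documentpipeline',
    body={
        "description": "Pipeline description",
        "processors": [
            {
                "attachment": {
                    "field": "data",
                    "properties": ["content", "content_type", "language", "content_length"]
                }
            },
            {
                "geoip": {
                    "field": "ip"
                }
            }
        ]
    }
)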

If you want, you can write your custom pre-processor and invoke AWS Comprehend in the ingestion phase: Writing Your Own Ingest Processor for Elasticsearch.

We can now index a new document:

from elasticsearch import Elasticsearch
import base64

es_client = Elasticsearch('esHost')

def create_es_document(title, base64data, ip, entities, key_phrases, sentiment, s3_location):
    return {
        "title": title,
        "data": base64data.decode("utf-8"),
        "ip": ip,
        "entities": entities,
        "keyPhrases": key_phrases,
        "sentiment": sentiment,
        "s3Location": s3_location
    }

base64data = base64.b64encode(open('your_file', "rb").read())

document = create_es_document('the title of the document', base64data, 'xxx.xxx.xx.xx', ['entity1', 'entity2'], ['k1', 'k2'], 'positive', 'https://your_bucket.s3.amazonaws.com/your_object_key')

es_client.index(index='library', doc_type='document', body=document, pipeline='documentpipeline') # note the pipeline parameter

This is what an indexed document looks like. Notice the attachment and geoip sections: they contain the language, content type, content length, and the user's location details.

{
  "_index": "library",
  "_type": "document",
  "_id": "o_lllsFmAtreId4Ib84",
  "_score": 1,
  "_source": {
    "geoip": {
      "continent_name": "North America",
      "city_name": "Ottawa",
      "country_iso_code": "CA",
      "region_name": "Ontario",
      "location": {
        "lon": -75.8265,
        "lat": 45.3433
      }
    },
    "data": "UEsDBBQABgAIAA...",
    "attachment": {
      "content_type": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
      "language": "en",
      "content": "file content",
      "content_length": 120400
    },
    "entities": [
      "Organization",
      "Location",
      "Date"
    ],
    "keyPhrases": [
      "k1",
      "k2"
    ],
    "sentiment": "positive",
    "ip": "xx.xxx.xx.xxx",
    "s3Location": "https://your_bucket.s3.amazonaws.com/A Christmas Carol, by Charles Dickens.docx",
    "title": "A Christmas Carol, by Charles Dickens.docx"
  }
}

Visualize, Report, and Monitor

With Kibana you can create a set of visualizations/dashboards to search for documents by entities and to monitor index metrics (such as the number of documents by language, the most contributing countries, documents by content type, and so on). The documents-by-language metric, for example, maps directly to a terms aggregation, as sketched below.
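As a quick check outside Kibana, here is a minimal sketch of that metric as a terms aggregation; it assumes the default dynamic mapping created a keyword sub-field for the attachment.language field produced by the pipeline:

from elasticsearch import Elasticsearch

es_client = Elasticsearch('esHost')

# Count documents per detected language (field added by the attachment processor)
response = es_client.search(
    index='library',
    body={
        "size": 0,
        "aggs": {
            "documents_by_language": {
                "terms": {"field": "attachment.language.keyword"}
            }
        }
    }
)

for bucket in response['aggregations']['documents_by_language']['buckets']:
    print(bucket['key'], bucket['doc_count'])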

Using Skedler, an easy to use report scheduling and distribution application for Elasticsearch-Kibana-Grafana, you can centrally schedule and distribute custom reports from Kibana Dashboards and Saved Searches as hourly/daily/weekly/monthly PDF, XLS or PNG reports to various stakeholders. If you want to read more about it: Skedler Overview.

Example Kibana dashboards (AWS Comprehend and Elasticsearch): number of documents by language, countries that upload the most documents, and countries by the number of uploaded documents.

If you want to get notified when something happens in your index, for example when a certain entity is detected or when the number of documents by country or by language reaches a certain value, you can use Alerts. It simplifies how you create and manage alert rules for Elasticsearch and provides a flexible approach to notifications (from Email to Slack and Webhooks).

Conclusion

In this post we have seen how to use Elasticsearch as the search engine for document metadata. You can extend your system by adding this pipeline to automatically extract the document metadata and index it to Elasticsearch for fast search (semantic search).

By automatically extracting metadata from your documents, you can easily classify and search for them (knowledge management and discovery) by content, entities, content type, dominant language, and source country (where the document was uploaded from).

I ran this demo using the following environment configurations:

  • Elasticsearch and Kibana 6.0.0
  • Python 3.4 and AWS SDK Boto3 1.4.8
  • Ubuntu 14.04
  • Skedler Reports and Alerts

Machine learning with Amazon Rekognition and Elasticsearch

In this post we are going to see how to build a machine learning system that performs image recognition and uses Elasticsearch as the search engine for the labels identified within the images. Image recognition is the process of identifying and detecting an object or a feature in a digital image or video.

The components that we will use are the following:

  • Elasticsearch
  • Kibana
  • Skedler Reports and Alerts
  • Amazon S3 bucket
  • Amazon Simple Queue Service (optionally, you can replace this with AWS Lambda)
  • Amazon Rekognition

The idea is to build a system that runs image recognition against images stored in an S3 bucket and indexes the results (a set of labels with their confidence scores) to Elasticsearch. In other words, we are going to use Elasticsearch as the search engine for the labels found in the images.

If you are not familiar with one or more of the components listed above, I suggest you read more about them before continuing.

Amazon Rekognition is a service that makes it easy to add image analysis to your applications. With Rekognition, you can detect objects, scenes, faces; recognize celebrities; and identify inappropriate content in images.

These are the main steps performed in the process:

  • Upload an image to S3 bucket
  • Event notification from S3 to an SQS queue
  • Event consumed by a consumer
  • Image recognition on the image using AWS Rekognition
  • The result of the labels detection is indexed in Elasticsearch
  • Search in Elasticsearch by labels
  • Get the results from Elasticsearch and get the images from S3
  • Use Kibana for dashboard and search
  • Use Skedler and Alerts for reporting, monitoring and alerting

Architecture:

Use Case

This system architecture can be useful when you need to detect the labels in a picture and perform fast searches.

Example of applications:

  • Smart photo gallery: find things in your photos. Detect labels automatically and use Elasticsearch to search for them
  • Product catalog: automatically classify the products of a catalog. Take a photo of a product and get it classified
  • Content moderation: get notified when NSFW content is uploaded
  • Accessibility camera app: help people with disabilities to see and take pictures

Event notification

When an image is uploaded to the S3 bucket, a message will be stored in an Amazon SQS queue. You can read more about how to configure the S3 bucket and read the queue programmatically here: Configuring Amazon S3 Event Notifications. A minimal configuration sketch follows.
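For reference, here is a minimal sketch of how the notification could be wired up with boto3 (the queue ARN is a placeholder, and the SQS queue policy is assumed to already allow S3 to send messages to it):

import boto3

s3_client = boto3.client('s3')

# Send an event to the SQS queue for every object created in the bucket
s3_client.put_bucket_notification_configuration(
    Bucket='yourBucket',
    NotificationConfiguration={
        'QueueConfigurations': [
            {
                'Id': 'myS3EventConfiguration',
                'QueueArn': 'arn:aws:sqs:your_aws_region:your_account_id:SQS_QUEUE_NAME',
                'Events': ['s3:ObjectCreated:*']
            }
        ]
    }
)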

This is what an event notification message from S3 looks like (we need the bucket and key information):

{
  "Records": [
    {
      "eventSource": "aws:s3",
      "awsRegion": "your_aws_region",
      "eventTime": "2017-11-16T12:47:48.435Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "configurationId": "myS3EventConfiguration",
        "bucket": {
          "name": "yourBucket",
          "arn": "arn:aws:s3:::yourBucket"
        },
        "object": {
          "key": "myImage.jpg",
          "eTag": "790xxxxxe255kkkk011ayyyy908"
        }
      }
    }
  ]
}

Consume messages from the Amazon SQS queue

Now that the S3 bucket is configured, when an image is uploaded to the bucket an event is generated and a message is saved to the SQS queue. We are going to build a consumer that reads this message and performs image label detection using AWS Rekognition. You can also read a batch of messages (change the MaxNumberOfMessages parameter) from the queue and run the task against a set of images (batch processing), or use an AWS Lambda notification instead of SQS.

With the following code you can read messages from the SQS queue, fetch the bucket name and key of the uploaded file, and use them to invoke AWS Rekognition for the label detection task:

import boto3
import time
import json

AWS_ACCESS_KEY = 'yourAWS_ACCESS_KEY'
AWS_SECRET_ACCESS_KEY = 'yourAWS_SECRET_ACCESS_KEY'
AWS_REGION = 'yourAWS_REGION'
SQS_QUEUE_NAME = 'SQS_QUEUE_NAME'
BUCKET_NAME = 'S3_BUCKET_NAME'

sqs_resource_connection = boto3.resource(
    'sqs',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY,
    region_name = AWS_REGION
)

queue = sqs_resource_connection.get_queue_by_name(QueueName = SQS_QUEUE_NAME)

while True:
    messages = queue.receive_messages(MaxNumberOfMessages = 1, WaitTimeSeconds = 5)
    for message in messages:
        body = json.loads(message.body)
        filename_key = body['Records'][0]['s3']['object']['key']
        # Run the image label detection with AWS Rekognition (we need the bucket name and file key)
        # and index the label detection result to Elasticsearch
        # invoke_aws_reko(BUCKET_NAME, filename_key)
        message.delete()
    time.sleep(10)

Image recognition task

Now that we have the key of the uploaded image, we can use AWS Rekognition to run the image recognition task.

The following function invokes the detect_labels method to get the labels of the image. It returns a dictionary with the identified labels and their confidence scores.

import boto3

AWS_ACCESS_KEY = 'yourAWS_ACCESS_KEY'
AWS_SECRET_ACCESS_KEY = 'yourAWS_SECRET_ACCESS_KEY'
AWS_REGION = 'yourAWS_REGION'

rekognition_client_connection = boto3.client(
    'rekognition',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY,
    region_name = AWS_REGION
)

def invoke_aws_reko(bucket_name, filename_key):
    result_dictionary = {}
    reko_response = rekognition_client_connection.detect_labels(Image={
        'S3Object': {
            'Bucket': bucket_name,
            'Name': filename_key,
        }
    })
    if 'Labels' in reko_response and len(reko_response['Labels']) > 0:
        for item in reko_response['Labels']:
            print('{} : {}'.format(item['Name'], item['Confidence']))
            result_dictionary[item['Name']] = item['Confidence']
    # We will index this information to Elasticsearch
    return result_dictionary

Index to Elasticsearch

Given an image, we now have a set of labels that identify it. Next, we index these labels to Elasticsearch. To do so, I created a new index called imagerepository and a new type called image.

The image type we are going to create will have the following properties:

  • title: the title of the image
  • s3_location: the link to the S3 resource
  • labels: field that will contain the result of the detection task

For the labels property I used the Nested datatype. It allows arrays of objects to be indexed and queried independently of each other.

You can read more about it in the Elasticsearch documentation.

We will not store the image itself in Elasticsearch, just the URL of the image within the S3 bucket.

New Index:

curl -XPUT 'esHost:9200/imagerepository/' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 0
        }
    }
}'

New type:

curl -X PUT "esHost:9200/imagerepository/image/_mapping" -d '{
    "image" : {
        "properties" : {
            "title" : { "type" : "string" },
            "s3_location" : { "type" : "string" },
            "labels" : {
                "type" : "nested"
            }
        }
    }
}'

You can now try to post a dummy document:

curl -XPOST 'esHost:9200/imagerepository/image/' -d '
{
    "title" : "test",
    "s3_location" : "http://mybucket/test.jpg",
    "labels" : [
        {"label": "test_tag1", "confidence": 0.5},
        {"label": "test_tag2", "confidence": 0.38},
        {"label": "test_tag3", "confidence": 0.12}
    ]
}'

We can also index a new document using the Elasticsearch Python SDK.

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Dictionary with the result (labels and scores) - output of the invoke_aws_reko function
result_dictionary = {}
result_dictionary['label_1'] = 0.3
result_dictionary['label_2'] = 0.5
result_dictionary['label_3'] = 0.2

img_s3_location = "https://yourbucket/dummy_picture.jpg"
img_title = "dummy_picture.jpg"

def index_new_document(img_title, img_s3_location, result_dictionary):
    result_nested_obj = []
    for key, value in result_dictionary.items():
        # Use the same field names as the dummy document and the queries below (label/confidence)
        result_nested_obj.append({"label": key, "confidence": value})
    doc = {
        "title": img_title,
        "s3_location": img_s3_location,
        "labels": result_nested_obj
    }
    # es is an instance of Elasticsearch
    res = es.index(index='imagerepository', doc_type='image', body=doc)

# Call the function
index_new_document(img_title, img_s3_location, result_dictionary)

Search

Now that we have indexed our documents in Elasticsearch, we can search for them.

These are examples of the queries we can run:

  • Give me all the images that represent this object (search by label = object_name)
  • What does this image (given its title) represent?
  • Give me all the images that represent this object with at least 90% confidence (search by label = object_name and confidence >= 0.9)

I wrote some Sense queries for the label-based searches; a Python sketch for the title-based query follows them.

Images that represent a waterfall:

POST imagerepository/_search
{
   "query": {
      "nested": {
         "path": "labels",
         "query": {
            "bool": {
               "must": [
                  {
                     "match": {
                        "labels.label": "waterfall"
                     }
                  }
               ]
            }
         }
      }
   }
}

Images that represent a pizza with at least 90% confidence:

POST imagerepository/_search
{
   "query": {
      "nested": {
         "path": "labels",
         "query": {
            "bool": {
               "must": [
                  {
                     "match": {
                        "labels.label": "pizza"
                     }
                  },
                  {
                     "range": {
                        "labels.confidence": {
                            "gte": 0.90
                        }
                     }
                  }
               ]
            }
         }
      }
   }
}
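The second type of query from the list above (what does a given image represent?) is not shown in Sense; here is a minimal sketch in Python of how it could look, reusing the same index and field names as the indexing snippet:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Look up an image by its title and print the labels attached to it
response = es.search(
    index='imagerepository',
    body={
        "query": {
            "match": {"title": "dummy_picture.jpg"}
        }
    }
)

for hit in response['hits']['hits']:
    for label in hit['_source']['labels']:
        print(label['label'], label['confidence'])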

Visualize and monitor

With Kibana you can create a set of visualizations/dashboards to search for images by label and to monitor index metrics (such as the number of pictures by label, the most common labels, and so on). The most-common-labels metric, for example, maps to a nested terms aggregation, as sketched below.
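As a quick sanity check outside Kibana, here is a minimal sketch of that metric; it assumes the default dynamic mapping created a keyword sub-field for labels.label inside the nested objects:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Most common labels across all indexed images
response = es.search(
    index='imagerepository',
    body={
        "size": 0,
        "aggs": {
            "labels": {
                "nested": {"path": "labels"},
                "aggs": {
                    "most_common_labels": {
                        "terms": {"field": "labels.label.keyword"}
                    }
                }
            }
        }
    }
)

for bucket in response['aggregations']['labels']['most_common_labels']['buckets']:
    print(bucket['key'], bucket['doc_count'])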

Using Skedler, an easy to use report scheduling and distribution application for Elasticsearch-Kibana-Grafana, you can centrally schedule and distribute custom reports from Kibana Dashboards and Saved Searches as hourly/daily/weekly/monthly PDF, XLS or PNG reports to various stakeholders. If you want to read more about it: Skedler Review.

With Kibana you can create a dashboard to visualize the label metrics. Here you can see an example with bar/pie charts and a tag cloud visualization (you can easily schedule and distribute the dashboard with Skedler).

If you want to get notified when something happens in your index, for example when a certain label is detected or the number of labels reaches a certain value, you can use Alerts. It simplifies how you create and manage alert rules for Elasticsearch and provides a flexible approach to notifications (from Email to Slack and Webhooks).

Conclusion

In this post we have seen how to combine the power of Elasticsearch search with the AWS Rekognition machine learning service. The processing pipeline also includes an S3 bucket (where the images are stored) and an SQS queue used to receive event notifications when a new image is stored to S3 (and is ready for the label detection task). This use case shows how to use Elasticsearch as a search engine not only for logs.

If you are familiar with AWS Lambda, you can replace the SQS queue and the consumer with a function (S3 notifications support AWS Lambda as a destination) and call the AWS Rekognition service from your Lambda function, as sketched below. Keep in mind that Lambda had a 5-minute execution limit at the time of writing, and you can't invoke the function in batch over a set of images, so you pay for a Lambda execution for each image.
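A minimal sketch of such a handler follows; it assumes the invoke_aws_reko and index_new_document functions from the snippets above are packaged with the Lambda deployment, and the structure is illustrative only:

import urllib.parse

# Hypothetical Lambda handler triggered by S3 ObjectCreated events
def lambda_handler(event, context):
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 event notifications
        filename_key = urllib.parse.unquote_plus(record['s3']['object']['key'])

        # Detect the labels with AWS Rekognition
        result_dictionary = invoke_aws_reko(bucket_name, filename_key)

        # Index the labels to Elasticsearch
        s3_location = 'https://{}.s3.amazonaws.com/{}'.format(bucket_name, filename_key)
        index_new_document(filename_key, s3_location, result_dictionary)

    return {'processed': len(event['Records'])}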

I ran this demo using the following environment configuration:

  • Elasticsearch 5.0.0
  • Python 3.4 and Boto3 AWS SDK
  • Ubuntu 14.04

A similar use case (using Tensorflow instead of AWS Rekognition) was presented at the Elastic Stack in a Day event in Milan, Italy (June 2017); you can find the slides here: Tensorflow and Elasticsearch: Image Recognition.
