2024 · 12 min read

Leveraging AI to Classify Tweets Asynchronously with Python: A Step-by-Step Guide

Python, AI, NLP, Async, AWS

Khalil Adib

Senior AI Engineer

In the fast-paced world of social media, keeping up with the flood of information can be challenging. Whether you're a marketer, researcher, or just someone keen on staying informed, categorizing tweets effectively can make a huge difference. In this blog post, we'll walk you through how to classify tweets asynchronously using AI-based models in Python. By the end of this guide, you'll have a powerful tool that not only classifies tweets but does so with impressive speed and efficiency.

Why Asynchronous Processing?

Before diving into the code, let's briefly discuss why asynchronous processing is crucial, especially when dealing with multiple tasks. Traditional synchronous processing handles tasks one at a time: each request must finish before the next one starts, which is slow when you have numerous tweets to classify. Asynchronous processing, on the other hand, lets the program start many requests and wait for them concurrently, so the total time approaches that of the slowest request rather than the sum of all of them. This is particularly beneficial for I/O-bound work such as calls to APIs or hosted models that may take a few seconds to respond.
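
To make the difference concrete, here is a minimal sketch that uses only the standard library. The `fake_api_call` coroutine is a hypothetical stand-in for a slow network request (it just sleeps), so the numbers it produces are illustrative and say nothing about real model latency.

```python
import asyncio
import time


async def fake_api_call(idx: int, delay: float = 0.2) -> int:
    """Hypothetical stand-in for a network request; sleeping yields control to other tasks."""
    await asyncio.sleep(delay)
    return idx


async def run_sequentially(n: int) -> list:
    # Each call is awaited to completion before the next one starts.
    return [await fake_api_call(i) for i in range(n)]


async def run_concurrently(n: int) -> list:
    # All calls are scheduled together; gather returns results in input order.
    return await asyncio.gather(*(fake_api_call(i) for i in range(n)))


def timed(coro) -> tuple:
    """Run a coroutine to completion and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    _, sequential_time = timed(run_sequentially(5))
    _, concurrent_time = timed(run_concurrently(5))
    print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The sequential version should take roughly the sum of the delays, while the concurrent version should take roughly one delay, because the waiting overlaps.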

Setting Up the Environment

To get started, you need to set up an environment with the necessary dependencies. This includes installing boto3, the AWS SDK for Python, and aioboto3, which provides an asynchronous version of the boto3 interface.

pip install boto3
pip install aioboto3

These libraries will enable us to make API calls to AWS services asynchronously, so one slow request doesn't hold up the rest while we classify tweets.

Importing the Required Libraries

Next, let's import the necessary libraries. These will help us manage asynchronous tasks, handle JSON data, and interact with AWS services.

import os
import json
import time
import aioboto3
import asyncio
from typing import Dict, Union

Initializing the AWS Session

To interact with AWS, we'll create an aioboto3 session using environment variables to store our AWS profile and region information. This setup keeps account-specific settings out of the source code and makes the script easy to configure across different environments.

PROFILE_NAME = os.getenv("PROFILE_NAME")
REGION_NAME = os.getenv("REGION_NAME")
session = aioboto3.Session(profile_name=PROFILE_NAME, region_name=REGION_NAME)
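
If either variable is unset, `os.getenv` returns `None`, and the problem only surfaces later when the first request is made. One possible way to fail earlier is sketched below. The helper name `read_aws_settings` is hypothetical, and treating the region as mandatory is an assumption made for this example, not a requirement stated by the original script.

```python
import os
from typing import Dict, Mapping, Optional


def read_aws_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build session keyword arguments from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    region = env.get("REGION_NAME")
    profile = env.get("PROFILE_NAME")

    # Assumption for this sketch: a missing region is treated as a configuration error.
    if not region:
        raise RuntimeError("REGION_NAME is not set")

    settings = {"region_name": region}
    # The profile is optional here; it is only passed along when it is set.
    if profile:
        settings["profile_name"] = profile
    return settings
```

The returned dictionary uses the same keyword names as the session call above, so it could be passed as `aioboto3.Session(**read_aws_settings())`.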

Choosing the Right AI Model

For this project, we have a dictionary that holds metadata for various AI models, including model IDs, names, and pricing details. This allows us to dynamically select and use different models based on our requirements.

# supported model dictionary that has model Id, pricing, etc...
supported_llm_ids = {
    'mistral-large': {
        'model_id': 'mistral.mistral-large-2402-v1:0',
        'model_name': 'mistral',
        'input_price': 0.008,
        'output_price': 0.024,
    },
    'mistral-8x7b': {
        'model_id': 'mistral.mixtral-8x7b-instruct-v0:1',
        'model_name': 'mistral',
        'input_price': 0.00045,
        'output_price': 0.0007,
    },
    'mistral-7b': {
        'model_id': 'mistral.mistral-7b-instruct-v0:2',
        'model_name': 'mistral',
        'input_price': 0.00015,
        'output_price': 0.0002,
    },
    'sonnet': {
        'model_id': 'anthropic.claude-3-sonnet-20240229-v1:0',
        'model_name': 'claude-3',
        'input_price': 0.003,
        'output_price': 0.015,
    },
    'haiku': {
        'model_id': 'anthropic.claude-3-haiku-20240307-v1:0',
        'model_name': 'claude-3',
        'input_price': 0.00025,
        'output_price': 0.00125,
    },
    'claude-2:1': {
        'model_id': 'anthropic.claude-v2:1',
        'model_name': 'claude-2',
        'input_price': 0.008,
        'output_price': 0.024,
    },
    'claude-2': {
        'model_id': 'anthropic.claude-v2',
        'model_name': 'claude-2',
        'input_price': 0.008,
        'output_price': 0.024,
    },
}
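
The pricing fields can be used to estimate what a batch of requests might cost. The sketch below assumes the prices are in USD per 1,000 tokens, which the dictionary itself does not state, and the `estimate_cost` helper and token counts are hypothetical. It copies two entries so that it runs on its own.

```python
# Two entries copied from the dictionary above so the example is self-contained.
pricing = {
    "sonnet": {"input_price": 0.003, "output_price": 0.015},
    "haiku": {"input_price": 0.00025, "output_price": 0.00125},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the cost of one request, assuming prices are USD per 1,000 tokens."""
    entry = pricing[model]
    input_cost = (input_tokens / 1000) * entry["input_price"]
    output_cost = (output_tokens / 1000) * entry["output_price"]
    return input_cost + output_cost


if __name__ == "__main__":
    # Hypothetical sizes: a short classification prompt and a very short answer.
    for name in pricing:
        print(name, estimate_cost(name, input_tokens=80, output_tokens=15))
```

For a short classification task like this one, comparing the estimate across models is a quick way to decide whether a smaller model is worth trying first.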

Building the Input Prompt

The prompt_builder function constructs the input prompt for the model. This prompt will instruct the model to classify the tweet.

def prompt_builder(tweet):
    return f"""Please classify the topic of the following tweet:
<tweet>
{tweet}
</tweet>

please return the output in JSON format:
{{'class': <class>}}
"""

Generating the Payload

The get_payload function generates the payload for the API call dynamically based on the selected model. This function formats the input text and other parameters according to the model's requirements.

# this function is to generate payload dynamically based on model type
def get_payload(model: str, **kwargs) -> Dict:
    assert model in supported_llm_ids, f'You should choose one of the following: {list(supported_llm_ids)}'
    model = supported_llm_ids[model]
    payload = {
        'modelId': model['model_id'],
        'contentType': 'application/json',
        'accept': 'application/json',
        'body': None,
    }

    model_name = model['model_name']
    match model_name:
        case 'mistral':
            body = json.dumps(
                {
                    'prompt': f'<s>[INST] {kwargs.get("text", "")}[/INST]',
                    'max_tokens': int(kwargs.get('max_tokens', 1024)),
                    'temperature': float(kwargs.get('temperature', 0.5)),
                    'top_p': min(float(kwargs.get('top_p', 0.9)), 1),
                    'top_k': min(int(kwargs.get('top_k', 50)), 200),
                }
            )
        case 'claude-2':
            body = json.dumps(
                {
                    'prompt': f'Human: {kwargs.get("text", "")}\n\nAssistant:',
                    'max_tokens_to_sample': int(kwargs.get('max_tokens', 1024)),
                    'temperature': float(kwargs.get('temperature', 0.5)),
                    'top_p': float(kwargs.get('top_p', 1)),
                    'top_k': int(kwargs.get('top_k', 250)),
                    'stop_sequences': [kwargs.get('stop_sequences', '\n\nHuman:')],
                    'anthropic_version': 'bedrock-2023-05-31',
                }
            )
        case 'claude-3':
            body = json.dumps(
                {
                    'anthropic_version': 'bedrock-2023-05-31',
                    'max_tokens': int(kwargs.get('max_tokens', 1024)),
                    'temperature': float(kwargs.get('temperature', 0.5)),
                    'top_p': float(kwargs.get('top_p', 1)),
                    'top_k': int(kwargs.get('top_k', 250)),
                    'messages': [{'role': 'user', 'content': [{'type': 'text', 'text': kwargs.get('text', '')}]}],
                }
            )
        case _:
            raise ValueError(f'Unsupported model family: {model_name}')

    payload['body'] = body
    return payload

Processing the Output Asynchronously

After receiving the output from the API call, the process_output_async function processes the data asynchronously, extracting the relevant information based on the model used.

# function to process the output from Bedrock asynchronously
async def process_output_async(model: str, bedrock_response: Dict, json_parsable: bool = True) -> Union[str, Dict]:
    output = await bedrock_response['body'].read()
    output = output.decode('utf-8')
    model_name = supported_llm_ids[model]['model_name']
    match model_name:
        case 'claude-2':
            output = json.loads(output)['completion']
        case 'claude-3':
            output = json.loads(output)['content'][0]['text']
        case 'mistral':
            output = json.loads(output)['outputs'][0]['text']
        case _:
            raise ValueError(f'Unsupported model family: {model_name}')

    if json_parsable:
        output = json.loads(output)
    return output
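
Note that `json.loads` only succeeds when the model returns strict JSON. The prompt above shows the expected format with single quotes, and a model may also wrap its answer in extra text, so strict parsing can fail. A more forgiving parser might look like the sketch below. The `parse_model_json` helper is hypothetical, and falling back to Python literal syntax is one possible approach rather than part of the original script.

```python
import ast
import json
import re
from typing import Any


def parse_model_json(text: str) -> Any:
    """Best-effort parsing of a JSON-like object embedded in model output (hypothetical helper)."""
    # Grab everything from the first '{' to the last '}' so surrounding prose is ignored.
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        raise ValueError(f"No JSON object found in: {text!r}")
    candidate = match.group(0)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        # Fall back to Python literal syntax, e.g. {'class': 'Travel'} with single quotes.
        # literal_eval only evaluates literals, so it does not execute arbitrary code.
        return ast.literal_eval(candidate)


if __name__ == "__main__":
    print(parse_model_json("Here is the result:\n{'class': 'Travel'}"))
```

Such a helper could be applied to the raw string returned when `json_parsable=False` is used.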

Orchestrating the Classification Process

The main function orchestrates the process of sending the request to the model and processing the response. It leverages the get_payload and process_output_async functions to manage these tasks.

async def main(**kwargs):
    payload = get_payload(text=kwargs['input_prompt'], model=kwargs['model_name'])
    print(f"Start of the prompt {kwargs['idx']}")
    # Create an asynchronous client using aioboto3
    async with session.client("bedrock-runtime") as bedrock_client:
        response = await bedrock_client.invoke_model(**payload)
        output = await process_output_async(model=kwargs['model_name'], bedrock_response=response, json_parsable=False)
    print(f"End of prompt {kwargs['idx']}")
    return output

Defining the List of Tweets

Now, let's define the list of tweets we want to classify. Each tweet is a string representing a social media post.

TWEETS = [
    "Just tried the new coffee shop downtown. Best latte I've ever had! ☕ #coffee #latte",
    "Excited for the weekend getaway! 🌄✈️ #travel #adventure",
    "Had an amazing workout today. Feeling strong and energized! 💪 #fitness #health",
    "Just finished reading 'The Great Gatsby'. What a classic! 📚 #books #reading",
    "Who else is watching the new episode of 'The Crown'? 👑 #TV #TheCrown",
    "Feeling grateful for all the support from friends and family. ❤️ #gratitude #support",
    "Tried cooking a new recipe tonight and it turned out great! 🍴 #cooking #foodie",
    "Can't believe how quickly the year is flying by. Time to set new goals! ⏳ #NewYear #goals",
    "The sunset tonight was absolutely breathtaking. 🌅 #sunset #nature",
    "Planning a movie night this weekend. Any recommendations? 🎬 #movies #film"
]

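Running All Tasks Concurrently

The main_wrapper function is responsible for running all asynchronous calls concurrently. It creates one coroutine per tweet and passes them to asyncio.gather, which schedules them together on the event loop and returns the results in the same order as the input list, regardless of which request finishes first.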

async def main_wrapper():
    tasks = [
        main(idx=idx, input_prompt=prompt_builder(tweet=tweet), model_name='sonnet')
        for idx, tweet in enumerate(TWEETS)
    ]
    async_results = await asyncio.gather(*tasks)
    return async_results
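
With ten tweets, launching every request at once is fine. For much larger lists it may be worth capping how many requests are in flight at a time. The sketch below shows one way to do that with `asyncio.Semaphore`. The `gather_with_limit` helper and its default limit are hypothetical and not part of the original script.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def gather_with_limit(
    factories: List[Callable[[], Awaitable[Any]]],
    limit: int = 5,
) -> List[Any]:
    """Run coroutine factories concurrently, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[Any]]) -> Any:
        # The coroutine is only created and awaited once a slot is free.
        async with semaphore:
            return await factory()

    # return_exceptions=True keeps one failed call from discarding the other results;
    # failures come back as exception objects in the corresponding position.
    return await asyncio.gather(*(run(f) for f in factories), return_exceptions=True)
```

Each factory is a zero-argument callable that returns a coroutine. For example, `functools.partial(main, idx=idx, input_prompt=prompt_builder(tweet=tweet), model_name='sonnet')` could serve as one factory per tweet.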

Executing the Script

Finally, in the main execution block, the script runs the main_wrapper function using asyncio.run. It also measures the total execution time and prints the classification results for each tweet.

if __name__ == '__main__':
    start_time = time.time()
    results = asyncio.run(main_wrapper())
    end_time = time.time()
    # Print the results
    for i, result in enumerate(results, start=1):
        print(f"Result {i}: {result}")
    execution_time = end_time - start_time
    print(f"Execution time: {execution_time} seconds")

Results and Performance

When you execute the script, it classifies each tweet asynchronously and prints the results along with the total time taken. Here's an example of the output:

Start of the prompt 0
Start of the prompt 1
Start of the prompt 2
Start of the prompt 3
Start of the prompt 4
Start of the prompt 5
Start of the prompt 6
Start of the prompt 7
Start of the prompt 8
Start of the prompt 9
End of prompt 4
End of prompt 2
End of prompt 1
End of prompt 8
End of prompt 5
End of prompt 9
End of prompt 6
End of prompt 0
End of prompt 3
End of prompt 7
Result 1: {'class': 'Food & Drink'}
Result 2: {'class': 'Travel'}
Result 3: {'class': 'fitness'}
Result 4: {'class': 'literature'}
Result 5: {'class': 'Entertainment'}
Result 6: {'class': 'Personal'}
Result 7: {'class': 'Food & Cooking'}
Result 8: {'class': 'Personal Life'}
Result 9: {'class': 'nature'}
Result 10: {'class': 'Entertainment'}
Execution time: 2.5976717472076416 seconds
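
Notice that the labels are free-form: casing varies ('Travel' vs. 'fitness'), and similar tweets receive different classes ('Food & Drink' vs. 'Food & Cooking'). If you want to count tweets per class, a light normalization step can help with the casing part. The helpers below are a hypothetical sketch and assume each result has already been parsed into a dictionary with a 'class' key.

```python
from collections import Counter
from typing import Dict, Iterable


def normalize_label(label: str) -> str:
    """Collapse repeated whitespace and apply title case so 'fitness' and 'Fitness' match."""
    return " ".join(label.split()).title()


def count_labels(results: Iterable[Dict[str, str]]) -> Counter:
    """Count how many results fall under each normalized class label."""
    return Counter(normalize_label(result["class"]) for result in results)


if __name__ == "__main__":
    sample = [{"class": "Entertainment"}, {"class": "fitness"}, {"class": "entertainment"}]
    print(count_labels(sample))
```

Normalization does not merge near-synonyms such as 'Food & Drink' and 'Food & Cooking'. Listing the allowed classes in the prompt is another option for getting consistent labels.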

Conclusion

By following this guide, you've learned how to classify tweets asynchronously using Python and AI models. Because the requests overlap instead of running back to back, the ten tweets in the example run were classified in about 2.6 seconds in total, and the same pattern extends to larger batches as long as you stay within your account's request quotas. Whether you're analyzing sentiment, categorizing topics, or simply organizing content, asynchronous processing with AI models is a powerful tool that can significantly boost your productivity.

Happy coding!