Leveraging AI to Classify Tweets Asynchronously with Python: A Step-by-Step Guide
Khalil Adib
Senior AI Engineer
In the fast-paced world of social media, keeping up with the flood of information can be challenging. Whether you're a marketer, researcher, or just someone keen on staying informed, categorizing tweets effectively can make a huge difference. In this blog post, we'll walk you through how to classify tweets asynchronously using AI-based models in Python. By the end of this guide, you'll have a powerful tool that not only classifies tweets but does so with impressive speed and efficiency.
Why Asynchronous Processing?
Before diving into the code, let's briefly discuss why asynchronous processing is crucial, especially when dealing with multiple tasks. Traditional synchronous processing handles tasks one at a time, which is slow when you have numerous tweets to classify. Asynchronous processing, on the other hand, lets multiple tasks run concurrently: while one request is waiting on the network, another can start. This is particularly beneficial when working with APIs or models that may take a few seconds to respond.
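To make the difference concrete, here's a minimal, self-contained sketch (no AWS involved) where each fake API call sleeps for one second to simulate a slow network round trip. Run sequentially, ten calls would take about ten seconds; with asyncio.gather they finish in roughly one:

import asyncio
import time

async def fake_api_call(i: int) -> int:
    await asyncio.sleep(1)  # stand-in for a slow network round trip
    return i

async def run_all():
    start = time.time()
    # Launch all ten calls at once and wait for them together
    results = await asyncio.gather(*(fake_api_call(i) for i in range(10)))
    print(f"{len(results)} calls finished in {time.time() - start:.1f}s")  # ~1.0s, not ~10s

asyncio.run(run_all())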
Setting Up the Environment
To get started, you need to set up an environment with the necessary dependencies: boto3, the AWS SDK for Python, and aioboto3, an async wrapper around it that lets us call AWS services from asyncio code.
pip install boto3
pip install aioboto3
These libraries will enable us to make API calls to AWS services asynchronously, allowing us to classify tweets without waiting for each response before sending the next request.
Importing the Required Libraries
Next, let's import the necessary libraries. These will help us manage asynchronous tasks, handle JSON data, and interact with AWS services.
import os
import json
import time
import aioboto3
import asyncio
from typing import Dict, Union
Initializing the AWS Session
To interact with AWS, we'll create an aioboto3 session using environment variables to store our AWS profile and region information. This setup ensures that our script is secure and easy to configure across different environments.
PROFILE_NAME = os.getenv("PROFILE_NAME")
REGION_NAME = os.getenv("REGION_NAME")
session = aioboto3.Session(profile_name=PROFILE_NAME, region_name=REGION_NAME)
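Before running the script, make sure these variables are set in your shell. The values below are placeholders; use whichever profile name you have configured in ~/.aws/credentials and a region where Bedrock is available:

export PROFILE_NAME=my-aws-profile
export REGION_NAME=us-east-1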
Choosing the Right AI Model
For this project, we have a dictionary that holds metadata for various AI models, including model IDs, names, and pricing details. This allows us to dynamically select and use different models based on our requirements.
# Supported model dictionary: Bedrock model ID, model family, and pricing (USD per 1,000 tokens)
supported_llm_ids = {
    'mistral-large': {
        'model_id': 'mistral.mistral-large-2402-v1:0',
        'model_name': 'mistral',
        'input_price': 0.008,
        'output_price': 0.024,
    },
    'mistral-8x7b': {
        'model_id': 'mistral.mixtral-8x7b-instruct-v0:1',
        'model_name': 'mistral',
        'input_price': 0.00045,
        'output_price': 0.0007,
    },
    'mistral-7b': {
        'model_id': 'mistral.mistral-7b-instruct-v0:2',
        'model_name': 'mistral',
        'input_price': 0.00015,
        'output_price': 0.0002,
    },
    'sonnet': {
        'model_id': 'anthropic.claude-3-sonnet-20240229-v1:0',
        'model_name': 'claude-3',
        'input_price': 0.003,
        'output_price': 0.015,
    },
    'haiku': {
        'model_id': 'anthropic.claude-3-haiku-20240307-v1:0',
        'model_name': 'claude-3',
        'input_price': 0.00025,
        'output_price': 0.00125,
    },
    'claude-2:1': {
        'model_id': 'anthropic.claude-v2:1',
        'model_name': 'claude-2',
        'input_price': 0.008,
        'output_price': 0.024,
    },
    'claude-2': {
        'model_id': 'anthropic.claude-v2',
        'model_name': 'claude-2',
        'input_price': 0.008,
        'output_price': 0.024,
    },
}
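The pricing fields come in handy for estimating what a batch will cost before you run it. The helper below is not part of the original pipeline; it's a hypothetical sketch that assumes the prices above are quoted in USD per 1,000 tokens, which matches Bedrock's published pricing for these models:

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Rough cost estimate, assuming prices are USD per 1,000 tokens."""
    cfg = supported_llm_ids[model]
    return (input_tokens / 1000) * cfg['input_price'] + (output_tokens / 1000) * cfg['output_price']

# e.g. 10 tweets at ~150 input tokens and ~20 output tokens each, on 'sonnet'
print(f"${estimate_cost('sonnet', 10 * 150, 10 * 20):.4f}")  # ~$0.0075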
Building the Input Prompt
The prompt_builder function constructs the input prompt for the model. This prompt will instruct the model to classify the tweet.
def prompt_builder(tweet):
    return f"""Please classify the topic of the following tweet:
<tweet>
{tweet}
</tweet>
Please return the output in JSON format:
{{"class": <class>}}
"""
Generating the Payload
The get_payload function generates the payload for the API call dynamically based on the selected model. It formats the input text and other parameters according to each model family's expected request schema. (Note that the match statement used below requires Python 3.10 or later.)
# Generate the request payload dynamically based on the model family
def get_payload(model: str, **kwargs) -> Dict:
    assert model in supported_llm_ids, f'You should choose one of the following: {list(supported_llm_ids.keys())}'
    model = supported_llm_ids[model]
    payload = {
        'modelId': model['model_id'],
        'contentType': 'application/json',
        'accept': 'application/json',
        'body': None,
    }
    model_name = model['model_name']
    match model_name:
        case 'mistral':
            body = json.dumps(
                {
                    'prompt': f'<s>[INST] {kwargs.get("text", "")}[/INST]',
                    'max_tokens': int(kwargs.get('max_tokens', 1024)),
                    'temperature': float(kwargs.get('temperature', 0.5)),
                    'top_p': min(float(kwargs.get('top_p', 0.9)), 1),
                    'top_k': min(int(kwargs.get('top_k', 50)), 200),
                }
            )
        case 'claude-2':
            body = json.dumps(
                {
                    'prompt': f'Human: {kwargs.get("text", "")}\n\nAssistant:',
                    'max_tokens_to_sample': int(kwargs.get('max_tokens', 1024)),
                    'temperature': float(kwargs.get('temperature', 0.5)),
                    'top_p': float(kwargs.get('top_p', 1)),
                    'top_k': int(kwargs.get('top_k', 250)),
                    'stop_sequences': [kwargs.get('stop_sequences', '\n\nHuman:')],
                    'anthropic_version': 'bedrock-2023-05-31',
                }
            )
        case 'claude-3':
            body = json.dumps(
                {
                    'anthropic_version': 'bedrock-2023-05-31',
                    'max_tokens': int(kwargs.get('max_tokens', 1024)),
                    'temperature': float(kwargs.get('temperature', 0.5)),
                    'top_p': float(kwargs.get('top_p', 1)),
                    'top_k': int(kwargs.get('top_k', 250)),
                    'messages': [{'role': 'user', 'content': [{'type': 'text', 'text': kwargs.get('text', '')}]}],
                }
            )
        case _:
            raise ValueError(f'Unsupported model family: {model_name}')
    payload['body'] = body
    return payload
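A quick sanity check of the builder, using the cheap 'haiku' entry:

payload = get_payload(model='haiku', text='Hello, Bedrock!', max_tokens=256)
print(payload['modelId'])                          # anthropic.claude-3-haiku-20240307-v1:0
print(json.loads(payload['body'])['max_tokens'])   # 256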
Processing the Output Asynchronously
After receiving the output from the API call, the process_output_async function processes the data asynchronously, extracting the relevant information based on the model used.
# Process the response body from Bedrock asynchronously
async def process_output_async(model: str, bedrock_response: Dict, json_parsable: bool = True) -> Union[str, Dict]:
    output = await bedrock_response['body'].read()
    output = output.decode('utf-8')
    model_name = supported_llm_ids[model]['model_name']
    match model_name:
        case 'claude-2':
            output = json.loads(output)['completion']
        case 'claude-3':
            output = json.loads(output)['content'][0]['text']
        case 'mistral':
            output = json.loads(output)['outputs'][0]['text']
        case _:
            raise ValueError(f'Unsupported model family: {model_name}')
    if json_parsable:
        output = json.loads(output)
    return output
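You can unit-test this logic without touching AWS by faking the response object. The _FakeBody class below is purely illustrative; the real body returned by invoke_model through aioboto3 exposes an async read() in the same way:

class _FakeBody:
    def __init__(self, data: bytes):
        self._data = data
    async def read(self) -> bytes:
        return self._data

# A fabricated response in the Claude 3 shape, for testing only
fake_claude3 = {'body': _FakeBody(json.dumps(
    {'content': [{'type': 'text', 'text': '{"class": "coffee"}'}]}
).encode('utf-8'))}

print(asyncio.run(process_output_async('sonnet', fake_claude3)))  # {'class': 'coffee'}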
Orchestrating the Classification Process
The main function orchestrates the process of sending the request to the model and processing the response. It leverages the get_payload and process_output_async functions to manage these tasks.
async def main(**kwargs):
    payload = get_payload(text=kwargs['input_prompt'], model=kwargs['model_name'])
    print(f"Start of the prompt {kwargs['idx']}")
    # Create an asynchronous Bedrock client using aioboto3
    async with session.client("bedrock-runtime") as bedrock_client:
        response = await bedrock_client.invoke_model(**payload)
        output = await process_output_async(model=kwargs['model_name'], bedrock_response=response, json_parsable=False)
    print(f"End of prompt {kwargs['idx']}")
    return output
Defining the List of Tweets
Now, let's define the list of tweets we want to classify. Each tweet is a string representing a social media post.
TWEETS = [
    "Just tried the new coffee shop downtown. Best latte I've ever had! ☕ #coffee #latte",
    "Excited for the weekend getaway! 🌞✈️ #travel #adventure",
    "Had an amazing workout today. Feeling strong and energized! 💪 #fitness #health",
    "Just finished reading 'The Great Gatsby'. What a classic! 📚 #books #reading",
    "Who else is watching the new episode of 'The Crown'? 📺 #TV #TheCrown",
    "Feeling grateful for all the support from friends and family. ❤️ #gratitude #support",
    "Tried cooking a new recipe tonight and it turned out great! 🍴 #cooking #foodie",
    "Can't believe how quickly the year is flying by. Time to set new goals! ⏳ #NewYear #goals",
    "The sunset tonight was absolutely breathtaking. 🌅 #sunset #nature",
    "Planning a movie night this weekend. Any recommendations? 🎬 #movies #film"
]
Running All Tasks in Parallel
The main_wrapper function runs all of the asynchronous calls concurrently. It builds a list of coroutines, one per tweet, and awaits them together with asyncio.gather.
async def main_wrapper():
    tasks = [
        main(idx=idx, input_prompt=prompt_builder(tweet=tweet), model_name='sonnet')
        for idx, tweet in enumerate(TWEETS)
    ]
    async_results = await asyncio.gather(*tasks)
    return async_results
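One caveat: Bedrock enforces per-model request quotas, so firing hundreds of requests at once can trigger ThrottlingException errors. A common pattern, not part of the original script and shown here only as a sketch, is to cap concurrency with an asyncio.Semaphore:

semaphore = asyncio.Semaphore(5)  # allow at most 5 in-flight Bedrock calls

async def main_limited(**kwargs):
    # Wait for a free slot before invoking the model
    async with semaphore:
        return await main(**kwargs)

# In main_wrapper, build the tasks with main_limited(...) instead of main(...)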
Executing the Script
Finally, in the main execution block, the script runs the main_wrapper function using asyncio.run. It also measures the total execution time and prints the classification results for each tweet.
if __name__ == '__main__':
    start_time = time.time()
    results = asyncio.run(main_wrapper())
    end_time = time.time()
    # Print the results
    for i, result in enumerate(results, start=1):
        print(f"Result {i}: {result}")
    execution_time = end_time - start_time
    print(f"Execution time: {execution_time} seconds")
Results and Performance
When you execute the script, it classifies each tweet asynchronously and prints the results along with the total time taken. Notice in the log below that the "End of prompt" lines arrive out of order: the ten requests overlap rather than waiting for one another. Here's an example of the output:
Start of the prompt 0
Start of the prompt 1
Start of the prompt 2
Start of the prompt 3
Start of the prompt 4
Start of the prompt 5
Start of the prompt 6
Start of the prompt 7
Start of the prompt 8
Start of the prompt 9
End of prompt 4
End of prompt 2
End of prompt 1
End of prompt 8
End of prompt 5
End of prompt 9
End of prompt 6
End of prompt 0
End of prompt 3
End of prompt 7
Result 1: {'class': 'Food & Drink'}
Result 2: {'class': 'Travel'}
Result 3: {'class': 'fitness'}
Result 4: {'class': 'literature'}
Result 5: {'class': 'Entertainment'}
Result 6: {'class': 'Personal'}
Result 7: {'class': 'Food & Cooking'}
Result 8: {'class': 'Personal Life'}
Result 9: {'class': 'nature'}
Result 10: {'class': 'Entertainment'}
Execution time: 2.5976717472076416 seconds
Conclusion
By following this guide, you've learned how to classify tweets asynchronously using Python and AI models. This approach not only makes the classification process more efficient but also scales well when dealing with large datasets. Whether you're analyzing sentiment, categorizing topics, or simply organizing content, asynchronous processing with AI models is a powerful tool that can significantly boost your productivity.
Happy coding!