Building a Scalable Ingestion Pipeline With Rust
This post walks through an audio ingestion pipeline built with Rust, showcasing the design patterns and architectural choices that keep it scalable, maintainable, and efficient.
Overview of the Ingestion Pipeline
The ingestion pipeline is designed to handle audio files efficiently. It processes these files, generates waveforms, and analyzes tracks using a combination of AWS services, such as SQS (Simple Queue Service) and S3 (Simple Storage Service). The architecture of this pipeline allows it to manage multiple tasks at the same time, ensuring that audio files are processed quickly while keeping the system organized and easy to maintain.
Key Design Patterns and Architectural Choices
Asynchronous Programming with Tokio
One of the standout features of this pipeline is its use of the Tokio runtime for asynchronous programming. This approach allows the system to perform non-blocking I/O operations, which is essential when dealing with numerous audio file uploads and processing tasks simultaneously.
By using async/await, the code remains clean and easy to read, making it straightforward to follow the flow of data through the pipeline. This means that while one task is waiting for a file to upload, other tasks can continue running, which significantly improves performance.
use std::sync::Arc;
use color_eyre::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let sqs = Arc::new(SQS::new().await);
    // Each spawned task takes ownership of its own handle to the shared client.
    let ingestion_sqs = Arc::clone(&sqs);
    let track_analysis_sqs = Arc::clone(&sqs);
    let handles = vec![
        tokio::spawn(async move { operations::ingestion::run(&ingestion_sqs).await }),
        tokio::spawn(async move { operations::track_analysis::run(&track_analysis_sqs).await }),
    ];
    // Keep running until Ctrl+C, then cancel both workers.
    tokio::signal::ctrl_c().await?;
    for handle in handles {
        handle.abort();
    }
    Ok(())
}
Message Queuing with AWS SQS
The pipeline leverages AWS SQS for message queuing, which decouples the various components of the system. Each stage of the pipeline can operate independently, processing messages as they arrive. This design choice greatly enhances scalability; if the load increases, you can easily add more consumers to handle the extra messages without needing to change the existing code.
Here is the SQS client pattern we implemented. It lets the pipelines publish to and subscribe to SQS queues asynchronously.
use std::future::Future;

use aws_sdk_sqs::Client;
use color_eyre::{eyre::eyre, Result};
use serde::{de::DeserializeOwned, Serialize};

#[derive(Debug)]
pub struct SQS {
    client: Client,
}

#[derive(Debug, Serialize)]
pub struct Message<T: Serialize> {
    pub body: T,
}

impl SQS {
    pub async fn new() -> Self {
        // Region and credentials are read from the environment.
        let aws_config = aws::load_from_env().await;
        let client = Client::new(&aws_config);
        Self { client }
    }

    /// Serializes `message.body` as JSON and sends it to `queue`.
    pub async fn publish<T>(&self, queue: &str, message: &Message<T>) -> Result<()>
    where
        T: Serialize,
    {
        self.client
            .send_message()
            .queue_url(queue)
            .message_body(serde_json::to_string(&message.body)?)
            .send()
            .await
            .map_err(|e| {
                eyre!(
                    "Failed to publish message to SQS queue: {:?}, error: {:?}",
                    queue,
                    e
                )
            })?;
        Ok(())
    }

    /// Polls `queue` in a loop. A message is deleted only after `callback` succeeds.
    pub async fn subscribe<'a, T, F, Fut>(&'a self, queue: &str, callback: F) -> Result<()>
    where
        T: DeserializeOwned,
        F: Fn(T, &'a SQS) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        loop {
            match self
                .client
                .receive_message()
                .wait_time_seconds(1)
                .queue_url(queue)
                .send()
                .await
            {
                Ok(response) => {
                    for message in response.messages.unwrap_or_default() {
                        log::info!("Received SQS Message: {:?}, from queue: {}", message, queue);
                        // Skip a message without a receipt handle instead of panicking.
                        let Some(receipt_handle) = message.receipt_handle.clone() else {
                            log::error!("SQS message has no receipt handle, queue: {}", queue);
                            continue;
                        };
                        if let Some(body) = message.body {
                            log::info!("Trying to deserialize body: {:?}", body);
                            match serde_json::from_str::<T>(&body) {
                                Ok(payload) => {
                                    if let Err(e) = callback(payload, self).await {
                                        log::error!(
                                            "Callback failed for SQS queue: {}, error: {:?}",
                                            queue,
                                            e
                                        );
                                    } else if let Err(e) = self
                                        .client
                                        .delete_message()
                                        .queue_url(queue)
                                        .receipt_handle(receipt_handle)
                                        .send()
                                        .await
                                    {
                                        log::error!("Failed to delete message from queue: {:?}, error: {:?}", queue, e);
                                    } else {
                                        log::info!("Message deleted from SQS queue");
                                    }
                                }
                                Err(e) => {
                                    log::error!("Error decoding message from SQS queue: {}, error: {:?}", queue, e);
                                }
                            }
                        }
                    }
                }
                Err(e) => {
                    log::error!("Error receiving messages from AWS SQS: {:?}", e);
                }
            }
        }
    }
}
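The key property of the subscribe loop above is that a message is deleted only after its callback succeeds, so a failed message stays available for another attempt. The following is a minimal, std-only sketch of that rule using an in-memory queue. `InMemoryQueue` and `poll_once` are hypothetical names made up for illustration and are not part of the AWS SDK or of the pipeline's code.

```rust
use std::collections::VecDeque;

/// In-memory stand-in for a queue (hypothetical; this is not the SQS API).
pub struct InMemoryQueue {
    messages: VecDeque<String>,
}

impl InMemoryQueue {
    pub fn new(messages: &[&str]) -> Self {
        Self {
            messages: messages.iter().map(|m| m.to_string()).collect(),
        }
    }

    pub fn len(&self) -> usize {
        self.messages.len()
    }

    /// Makes one pass over the messages currently queued.
    /// A message is dropped only when `handler` returns `Ok`; a failed
    /// message is pushed back so a later pass can retry it.
    /// Returns how many messages were handled successfully.
    pub fn poll_once<F>(&mut self, mut handler: F) -> usize
    where
        F: FnMut(&str) -> Result<(), String>,
    {
        let mut handled = 0;
        for _ in 0..self.messages.len() {
            let Some(message) = self.messages.pop_front() else {
                break;
            };
            match handler(&message) {
                // Success: the message is "deleted" by not re-queuing it.
                Ok(()) => handled += 1,
                // Failure: keep the message for a later retry.
                Err(_) => self.messages.push_back(message),
            }
        }
        handled
    }
}
```

With real SQS the retry happens differently: an undeleted message becomes visible again once its visibility timeout expires. A message that can never be processed will therefore keep coming back unless a dead-letter queue or a similar policy is configured.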
Modular Architecture for Enhanced Maintainability
The codebase is organized into distinct modules, each responsible for a specific functionality, such as aws, backend, tasks, and operations. This modular design promotes separation of concerns, making the code easier to maintain and extend.
Each module encapsulates its logic, allowing for independent testing and development. For example, the tasks module contains the Transcoder and WaveformGenerator, which handle specific tasks in the pipeline. This organization not only makes the code cleaner but also allows developers to work on different parts of the system without stepping on each other’s toes.
mod tasks {
    pub mod transcoder;
    pub mod waveform_generator;
}
Task Trait for Abstraction and Flexibility
The Task trait defines a common interface for all tasks within the pipeline. This abstraction allows for different implementations of tasks, such as transcoding and waveform generation, to be executed interchangeably.
By using traits, the code adheres to the Open/Closed Principle, which means you can add new tasks without modifying existing code. This flexibility is vital for adapting to new requirements or technologies as they emerge.
use async_trait::async_trait;
use color_eyre::Result;

// The attribute rewrites `async fn` in the trait so it works with trait objects.
#[async_trait]
pub trait Task: Send + Sync {
    async fn run(self) -> Result<()>;
}
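To make the Open/Closed point concrete, here is a simplified, synchronous sketch of the same idea using only the standard library. The struct fields, the `run_all` helper, and the string results are assumptions made for illustration. The pipeline's real `Task` is async and returns `color_eyre::Result<()>`.

```rust
/// Synchronous stand-in for the pipeline's `Task` trait (illustrative only).
pub trait Task: Send + Sync {
    fn run(&self) -> Result<String, String>;
}

/// Hypothetical task types; the `input` field is an assumption.
pub struct Transcoder {
    pub input: String,
}

pub struct WaveformGenerator {
    pub input: String,
}

impl Task for Transcoder {
    fn run(&self) -> Result<String, String> {
        Ok(format!("transcoded {}", self.input))
    }
}

impl Task for WaveformGenerator {
    fn run(&self) -> Result<String, String> {
        Ok(format!("waveform for {}", self.input))
    }
}

/// Runs every task through the shared interface. Adding a new task type
/// means adding a new `impl Task`; this function does not change.
pub fn run_all(tasks: &[Box<dyn Task>]) -> Vec<Result<String, String>> {
    tasks.iter().map(|task| task.run()).collect()
}
```

A caller would build a `Vec<Box<dyn Task>>` holding a `Transcoder` and a `WaveformGenerator` and pass it to `run_all`, which never needs to know the concrete types.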
Robust Error Handling with Color Eyre
Error handling is a critical aspect of any robust system. The pipeline employs the color_eyre crate for error handling, providing rich context for any issues that arise during execution. This enhances observability, making it easier to diagnose problems when they occur.
By consistently using Result types throughout the codebase, errors are propagated up the call stack, allowing for centralized error handling and logging. This means that when something goes wrong, you have all the information you need to fix it quickly.
use color_eyre::Result;

fn example_function() -> Result<()> {
    // Some code that might fail
    Ok(())
}
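The propagation described above can be illustrated without any external crate. The sketch below uses only the standard library: a low-level parse error is wrapped with context and then bubbled up with `?`. `DurationError`, `parse_duration_ms`, and `total_duration_ms` are hypothetical names. With color_eyre the wrapping would typically be done through its context helpers instead of a hand-written error type.

```rust
use std::error::Error;
use std::fmt;
use std::num::ParseIntError;

/// Error that records which input failed, plus the underlying cause.
#[derive(Debug)]
pub struct DurationError {
    input: String,
    source: ParseIntError,
}

impl fmt::Display for DurationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid duration {:?}: {}", self.input, self.source)
    }
}

impl Error for DurationError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

/// Parses a duration in milliseconds, attaching the offending input on failure.
pub fn parse_duration_ms(input: &str) -> Result<u64, DurationError> {
    input.trim().parse::<u64>().map_err(|source| DurationError {
        input: input.to_string(),
        source,
    })
}

/// Sums several durations. `?` returns early with the first contextual error,
/// so the caller can handle and log failures in one place.
pub fn total_duration_ms(inputs: &[&str]) -> Result<u64, DurationError> {
    let mut total = 0;
    for input in inputs {
        total += parse_duration_ms(input)?;
    }
    Ok(total)
}
```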
Efficient Lazy Initialization with Lazy Static
The lazy_static crate is used to initialize global settings and configurations only when they are first accessed. This approach minimizes startup time and memory usage, as configurations are loaded on demand.
The global SETTING instance acts as a centralized configuration management system, making it easy to access application settings throughout the codebase. This ensures that your application runs efficiently without unnecessary overhead.
#[macro_use]
extern crate lazy_static;

lazy_static! {
    static ref SETTING: Setting = Setting::new().unwrap();
}
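The same initialize-on-first-access behaviour can be sketched with the standard library's `std::sync::OnceLock` (stable since Rust 1.70), which is a swapped-in alternative here and not what the pipeline uses. The `Setting` field, the `INGESTION_QUEUE_URL` variable, and the fallback URL are assumptions made for illustration. The counter exists only to show that the initializer runs a single time.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical settings type; the real `Setting` is not shown in the post.
#[derive(Debug)]
pub struct Setting {
    pub queue_url: String,
}

/// Counts how many times the settings were actually built.
pub static INIT_COUNT: AtomicUsize = AtomicUsize::new(0);

static SETTING: OnceLock<Setting> = OnceLock::new();

/// Returns the global settings, building them on the first call only.
pub fn setting() -> &'static Setting {
    SETTING.get_or_init(|| {
        INIT_COUNT.fetch_add(1, Ordering::SeqCst);
        Setting {
            // Assumed variable name and fallback value, for illustration.
            queue_url: std::env::var("INGESTION_QUEUE_URL")
                .unwrap_or_else(|_| "http://localhost:9324/queue/ingestion".to_string()),
        }
    })
}
```

Every call to `setting()` after the first returns the same reference without re-reading the environment.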
Structured Logging and Monitoring
Finally, the pipeline integrates structured logging using the log crate. This provides valuable insights into the system’s behavior during execution, which is essential for monitoring the health of the pipeline and diagnosing issues in production.
By logging at various levels (info, error), developers can filter logs based on severity, making it easier to identify critical issues and respond promptly.
use log::{info, error};

fn example_logging() {
    info!("This is an info message");
    error!("This is an error message");
}
Scalability Considerations
The design choices made in this ingestion pipeline significantly contribute to its scalability:
- Horizontal Scalability: By using SQS, multiple consumers can process messages concurrently, allowing the system to scale horizontally as the load increases. New instances of the application can be spun up to handle additional traffic without requiring changes to the existing codebase.
- Decoupled Components: Each component of the pipeline operates independently, making it easy to update and modify. For instance, if a new audio processing algorithm is introduced, it can be added as a new task without affecting the existing pipeline.
- Efficient Resource Utilization: The asynchronous nature of the pipeline ensures that resources are utilized efficiently. While one task is waiting for I/O operations (like downloading files from S3), other tasks can continue processing, maximizing throughput.
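The horizontal-scaling point can be sketched with a std-only model of competing consumers. An in-memory channel stands in for the SQS queue and threads stand in for application instances; both are assumptions made for illustration, and `process_with_consumers` is a hypothetical helper. The number of consumers is just a parameter, so the producer side does not change as consumers are added.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Distributes `jobs` across `consumers` worker threads that all pull from
/// one shared queue. Returns `(consumer_id, job)` for every processed job.
pub fn process_with_consumers(jobs: Vec<String>, consumers: usize) -> Vec<(usize, String)> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let (done_tx, done_rx) = mpsc::channel::<(usize, String)>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    // Producer side: enqueue everything, then close the queue.
    for job in jobs {
        job_tx.send(job).expect("queue receiver is alive");
    }
    drop(job_tx);

    let handles: Vec<thread::JoinHandle<()>> = (0..consumers)
        .map(|id| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking the next job.
                let next = job_rx.lock().expect("queue lock poisoned").recv();
                match next {
                    Ok(job) => done_tx.send((id, job)).expect("collector is alive"),
                    // The queue is closed and drained: this consumer stops.
                    Err(_) => break,
                }
            })
        })
        .collect();
    drop(done_tx);

    for handle in handles {
        handle.join().expect("consumer thread panicked");
    }
    done_rx.iter().collect()
}
```

Each job is handed to exactly one consumer, whichever asks first, which mirrors how several SQS consumers normally share a single queue.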
Conclusion: A Robust and Scalable Solution
The ingestion pipeline crafted with Rust exemplifies modern software design principles, emphasizing scalability, maintainability, and robustness. By leveraging asynchronous programming, modular architecture, and effective error handling, the pipeline is well-equipped to meet the demands of processing audio files in a cloud-native environment.