Building a Scalable Ingestion Pipeline With Rust

This post walks through an audio ingestion pipeline built with Rust, focusing on the design patterns and architectural choices that keep it scalable, maintainable, and efficient.

Photo by Christian Bass on Unsplash.

Overview of the Ingestion Pipeline

The ingestion pipeline is designed to handle audio files efficiently. It processes these files, generates waveforms, and analyzes tracks using a combination of AWS services, such as SQS (Simple Queue Service) and S3 (Simple Storage Service). The architecture of this pipeline allows it to manage multiple tasks at the same time, ensuring that audio files are processed quickly while keeping the system organized and easy to maintain.

Key Design Patterns and Architectural Choices

Asynchronous Programming with Tokio

One of the standout features of this pipeline is its use of the Tokio runtime for asynchronous programming. This approach allows the system to perform non-blocking I/O operations, which is essential when dealing with numerous audio file uploads and processing tasks simultaneously.

By using async/await, the code stays clean and readable, and the flow of data through the pipeline is easy to follow. While one task is waiting for a file to upload, the runtime can keep driving other tasks, which improves throughput for I/O-bound work like this.

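use std::sync::Arc;

use color_eyre::Result;

#[tokio::main]
async fn main() -> Result<()> {
	let sqs = Arc::new(SQS::new().await);

	// Each spawned task needs its own Arc clone: moving `sqs` itself into
	// the first task would leave nothing for the second one to use.
	let ingestion_sqs = Arc::clone(&sqs);
	let ingestion_handle = tokio::spawn(async move { operations::ingestion::run(&ingestion_sqs).await });
	let track_analysis_handle = tokio::spawn(async move { operations::track_analysis::run(&sqs).await });
	let handles = vec![ingestion_handle, track_analysis_handle];

	// Run until Ctrl-C, then cancel both long-running tasks.
	tokio::signal::ctrl_c().await?;
	for handle in handles {
		handle.abort();
	}
	Ok(())
}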
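The ownership side of this pattern can be shown without an async runtime. The following is a minimal sketch, not the pipeline's code: it assumes a hypothetical `Client` struct in place of the SQS wrapper and uses `std::thread::spawn` as a stand-in for `tokio::spawn`. It shows that every worker receives its own `Arc` clone, so no worker takes the original handle away from the others.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the shared SQS client.
#[derive(Debug)]
struct Client {
    name: String,
}

/// Spawns one worker per label. Each worker gets its own `Arc` clone,
/// so no single closure takes ownership of the original handle.
fn run_workers(client: Arc<Client>, labels: &[&str]) -> Vec<String> {
    let handles: Vec<_> = labels
        .iter()
        .map(|label| {
            let client = Arc::clone(&client); // cheap reference-count bump
            let label = label.to_string();
            thread::spawn(move || format!("{label} using {}", client.name))
        })
        .collect();

    // Join in spawn order so the result order is deterministic.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}
```

Calling `run_workers(client, &["ingestion", "track_analysis"])` returns one line per worker. The same cloning rule applies to `tokio::spawn` with `async move` blocks.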

Message Queuing with AWS SQS

The pipeline leverages AWS SQS for message queuing, which decouples the various components of the system. Each stage of the pipeline can operate independently, processing messages as they arrive. This design choice greatly enhances scalability; if the load increases, you can easily add more consumers to handle the extra messages without needing to change the existing code.
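The idea of adding consumers without touching the producer can be sketched in memory. This is an illustration only: a `std::sync::mpsc` channel stands in for the SQS queue, threads stand in for consumer processes, and the function name `drain_with_consumers` is hypothetical.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Drains `jobs` with `consumers` worker threads and returns how many
/// jobs each worker handled. An in-memory channel stands in for the queue.
fn drain_with_consumers(jobs: Vec<u32>, consumers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<u32>();
    for job in jobs {
        tx.send(job).expect("receiver is alive");
    }
    drop(tx); // closing the sender lets workers stop once the queue is empty

    let rx = Arc::new(Mutex::new(rx));
    let handles: Vec<_> = (0..consumers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // The lock is held only while taking one job off the queue.
                    let next = rx.lock().expect("lock poisoned").recv();
                    match next {
                        Ok(_job) => handled += 1,
                        Err(_) => break, // channel closed and empty
                    }
                }
                handled
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("consumer panicked"))
        .collect()
}
```

Changing the `consumers` argument changes how the work is split, while the code that enqueues the jobs stays the same. That is the property the queue gives the real pipeline.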

Here is the pattern we implemented for the SQS client that allows us to easily publish or subscribe to SQS queues in an asynchronous way from within the pipelines.

use std::future::Future;

use aws_sdk_sqs::Client;
use color_eyre::{eyre::eyre, Result};
use serde::{de::DeserializeOwned, Serialize};

#[derive(Debug)]
pub struct SQS {
	client: Client,
}

#[derive(Debug, Serialize)]
pub struct Message<T: Serialize> {
	pub body: T,
}

impl SQS {
	pub async fn new() -> Self {
		let aws_config = aws::load_from_env().await;
		let client = Client::new(&aws_config);
		Self { client }
	}

	pub async fn publish<T>(&self, queue: &str, message: &Message<T>) -> Result<()>
	where
		T: Serialize,
	{
		self.client
			.send_message()
			.queue_url(queue)
			.message_body(serde_json::to_string(&message.body)?)
			.send()
			.await
			.map_err(|e| {
				eyre!(
					"Failed to publish message to SQS queue: {:?}, error: {:?}",
					queue,
					e
				)
			})?;
		Ok(())
	}

	pub async fn subscribe<'a, T, F, Fut>(&'a self, queue: &str, callback: F) -> Result<()>
	where
		T: DeserializeOwned,
		F: Fn(T, &'a SQS) -> Fut,
		Fut: Future<Output = Result<()>>,
	{
		loop {
			match self
				.client
				.receive_message()
				.wait_time_seconds(1)
				.queue_url(queue)
				.send()
				.await
			{
				Ok(response) => {
					for message in response.messages.unwrap_or_default() {
						log::info!("Received SQS Message: {:?}, from queue: {}", message, queue);
						let cloned_message = message.clone();
						let receipt_handle = cloned_message.receipt_handle().unwrap();
						if let Some(body) = message.body {
							log::info!("Trying to deserialize body: {:?}", body);
							match serde_json::from_str::<T>(&body) {
								Ok(message) => {
									if let Err(e) = callback(message, self).await {
										log::error!(
											"Callback failed for SQS queue: {}, error: {:?}",
											queue,
											e
										);
									} else {
										if let Err(e) = self
											.client
											.delete_message()
											.queue_url(queue)
											.receipt_handle(receipt_handle)
											.send()
											.await
										{
											log::error!("Failed to delete message from queue: {:?}, error: {:?}", queue, e);
										} else {
											log::info!("Message deleted from SQS queue");
										}
									}
								}
								Err(e) => {
									log::error!("Error decoding message from SQS queue: {}, error: {:?}", queue, e);
								}
							}
						}
					}
				}
				Err(e) => {
					log::error!("Error receiving messages from AWS SQS: {:?}", e);
				}
			}
		}
	}
}
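The key behavior of the subscribe loop is that a message is deleted only after its callback succeeds. With SQS, a message that is not deleted becomes visible again once its visibility timeout expires, so a failed callback leads to a later retry. The sketch below models that rule in memory. It is an assumption-laden illustration: a `VecDeque` stands in for the queue, and `poll_once` is a hypothetical name.

```rust
use std::collections::VecDeque;

/// Runs `handler` once over every message currently in `queue`.
/// Messages whose handler returns `Ok` are removed ("deleted");
/// failed messages are put back so a later poll can retry them.
/// Returns the number of messages that were deleted.
fn poll_once<F>(queue: &mut VecDeque<String>, mut handler: F) -> usize
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut deleted = 0;
    // The length is read once, so re-queued messages are not retried in this poll.
    for _ in 0..queue.len() {
        let Some(message) = queue.pop_front() else { break };
        match handler(&message) {
            Ok(()) => deleted += 1,
            Err(_) => queue.push_back(message), // stays available for a retry
        }
    }
    deleted
}
```

Because a failed message is delivered again, handlers in a design like this should be safe to run more than once for the same message.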

Modular Architecture for Enhanced Maintainability

The codebase is organized into distinct modules, each responsible for a specific functionality, such as aws, backend, tasks, and operations. This modular design promotes separation of concerns, making the code easier to maintain and extend.

Each module encapsulates its logic, allowing for independent testing and development. For example, the tasks module contains the Transcoder and WaveformGenerator, which handle specific tasks in the pipeline. This organization not only makes the code cleaner but also allows developers to work on different parts of the system without stepping on each other’s toes.

mod tasks {
	pub mod transcoder;
	pub mod waveform_generator;
}

Task Trait for Abstraction and Flexibility

The Task trait defines a common interface for all tasks within the pipeline. This abstraction allows for different implementations of tasks, such as transcoding and waveform generation, to be executed interchangeably.

By using traits, the code adheres to the Open/Closed Principle, which means you can add new tasks without modifying existing code. This flexibility is vital for adapting to new requirements or technologies as they emerge.

use async_trait::async_trait;
use color_eyre::Result;

#[async_trait]
pub trait Task: Send + Sync {
	async fn run(self) -> Result<()>;
}
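To illustrate how the trait keeps callers closed to modification, here is a minimal synchronous sketch. It is not the pipeline's implementation: the trait below drops `async` so that it needs no runtime, and the struct fields, return values, and `execute` helper are hypothetical.

```rust
/// Synchronous analogue of the `Task` trait, so the sketch needs no async runtime.
trait Task {
    fn run(self) -> Result<String, String>;
}

/// Hypothetical task types: the fields and results are illustrative only.
struct Transcoder {
    input: String,
}

struct WaveformGenerator {
    input: String,
}

impl Task for Transcoder {
    fn run(self) -> Result<String, String> {
        Ok(format!("transcoded {}", self.input))
    }
}

impl Task for WaveformGenerator {
    fn run(self) -> Result<String, String> {
        if self.input.is_empty() {
            return Err("no input file".to_string());
        }
        Ok(format!("waveform for {}", self.input))
    }
}

/// Accepts any `Task`; adding a new task type requires no change here.
fn execute<T: Task>(task: T) -> Result<String, String> {
    task.run()
}
```

A new task type only needs its own `impl Task` block. `execute` and the existing implementations stay untouched.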

Robust Error Handling with Color Eyre

Error handling is a critical aspect of any robust system. The pipeline employs the color_eyre crate for error handling, providing rich context for any issues that arise during execution. This enhances observability, making it easier to diagnose problems when they occur.

By consistently using Result types throughout the codebase, errors are propagated up the call stack, allowing for centralized error handling and logging. This means that when something goes wrong, you have all the information you need to fix it quickly.

use color_eyre::Result;

fn example_function() -> Result<()> {
	// Some code that might fail
	Ok(())
}
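The propagate-with-context idea can be sketched using only the standard library. The example below is an assumption, not the pipeline's code: `IngestError`, `parse_sample_rate`, and `describe_track` are hypothetical names, and the hand-written wrapper plays the role that color_eyre's reports and `wrap_err` would play in the real codebase.

```rust
use std::fmt;
use std::num::ParseIntError;

/// Hypothetical error type that wraps a low-level error with context.
#[derive(Debug)]
struct IngestError {
    context: String,
    source: ParseIntError,
}

impl fmt::Display for IngestError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.context, self.source)
    }
}

impl std::error::Error for IngestError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}

/// Parses a sample rate and attaches the offending input to any failure.
fn parse_sample_rate(raw: &str) -> Result<u32, IngestError> {
    raw.trim().parse::<u32>().map_err(|source| IngestError {
        context: format!("invalid sample rate {raw:?}"),
        source,
    })
}

/// The `?` operator hands the error, context included, to the caller.
fn describe_track(raw_rate: &str) -> Result<String, IngestError> {
    let rate = parse_sample_rate(raw_rate)?;
    Ok(format!("{rate} Hz"))
}
```

The caller of `describe_track` sees both what the code was trying to do and the underlying parse failure, without any logging at the point where the error occurred.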

Efficient Lazy Initialization with Lazy Static

The lazy_static crate is used to initialize global settings and configurations only when they are first accessed. This approach minimizes startup time and memory usage, as configurations are loaded on demand.

The global SETTING instance acts as a centralized configuration store, giving the whole codebase a single read-only access point for application settings. The configuration is built once, on first use, and every later access reuses the same value.

#[macro_use]
extern crate lazy_static;

lazy_static! {
	static ref SETTING: Setting = Setting::new().unwrap();
}
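The initialize-on-first-access behavior can be observed directly with a small sketch. It swaps in the standard library's `std::sync::OnceLock` (available since Rust 1.70) instead of `lazy_static`, so it needs no external crate. The `Setting` field, the `settings` accessor, and the `INIT_CALLS` counter are hypothetical and exist only to make the behavior visible.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical settings type; the field is illustrative only.
#[derive(Debug)]
struct Setting {
    queue_name: String,
}

/// Counts how often the initializer runs, purely for demonstration.
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);
static SETTING: OnceLock<Setting> = OnceLock::new();

/// Returns the global settings, building them on first access only.
fn settings() -> &'static Setting {
    SETTING.get_or_init(|| {
        INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        Setting {
            queue_name: "ingestion".to_string(),
        }
    })
}
```

Nothing is constructed until `settings()` is first called, and repeated calls return the same instance without running the initializer again.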

Structured Logging and Monitoring

Finally, the pipeline logs through the log crate, a lightweight facade that forwards records to whichever logger implementation the binary installs. These logs provide insight into the system’s behavior during execution, which is essential for monitoring the health of the pipeline and diagnosing issues in production.

By logging at various levels (info, error), developers can filter logs based on severity, making it easier to identify critical issues and respond promptly.

use log::{info, error};

fn example_logging() {
	info!("This is an info message");
	error!("This is an error message");
}
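Filtering by severity comes down to comparing each record's level against a threshold. The sketch below shows that comparison with a hypothetical `Level` enum and `Record` struct. These are not the log crate's types, and the ordering chosen here (least to most severe) is an assumption made for the example.

```rust
/// Severity levels ordered from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Debug,
    Info,
    Warn,
    Error,
}

/// Hypothetical log record: a level plus a message.
struct Record {
    level: Level,
    message: String,
}

/// Keeps only the messages at or above `min_level`.
fn filter_by_level(records: &[Record], min_level: Level) -> Vec<&str> {
    records
        .iter()
        .filter(|r| r.level >= min_level)
        .map(|r| r.message.as_str())
        .collect()
}
```

In production, the same effect is usually achieved by configuring the installed logger's maximum level rather than filtering records by hand.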

Scalability Considerations

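The design choices described above all contribute to the pipeline’s scalability: Tokio’s non-blocking I/O lets many uploads and processing tasks make progress concurrently, and SQS decouples the stages so that more consumers can be added as load grows without changing existing code.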

Conclusion: A Robust and Scalable Solution

The ingestion pipeline crafted with Rust exemplifies modern software design principles, emphasizing scalability, maintainability, and robustness. By leveraging asynchronous programming, modular architecture, and effective error handling, the pipeline is well-equipped to meet the demands of processing audio files in a cloud-native environment.