Comparing Rust and Python for Asynchronous and Multithreaded Operations – Qdrant vector database
Introduction
In this guide, we will compare Rust and Python for asynchronous and multithreaded operations, focusing on uploading data to a Qdrant vector search engine. We will evaluate how using Arc in Rust and AsyncQdrantClient in Python impacts performance. Imagine a company approaches you to speed up their code and you deliver a real improvement; there might be a nice drink in it for you.
Performance Expectation:
On a newer machine with better CPU and I/O capabilities, the performance difference between parallel and sequential uploads should be more noticeable due to better handling of concurrent tasks and faster processing speeds.
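The intuition behind that expectation can be sketched with plain threads: when each upload mostly waits on I/O, overlapping the waits cuts the total wall-clock time. This toy example (no Qdrant involved) fakes a 50 ms "upload" with a sleep; the function names and timings are illustrative assumptions, not part of the benchmark code below:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Stand-in for a network round-trip such as an upsert request.
fn fake_upload() {
    thread::sleep(Duration::from_millis(50));
}

// Run n uploads one after another.
fn sequential(n: usize) -> Duration {
    let start = Instant::now();
    for _ in 0..n {
        fake_upload();
    }
    start.elapsed()
}

// Run n uploads at the same time, each on its own thread.
fn parallel(n: usize) -> Duration {
    let start = Instant::now();
    let handles: Vec<_> = (0..n).map(|_| thread::spawn(fake_upload)).collect();
    for h in handles {
        h.join().unwrap();
    }
    start.elapsed()
}

fn main() {
    let seq = sequential(4); // ~200 ms: the four waits add up
    let par = parallel(4);   // ~50 ms: the four waits overlap
    println!("sequential: {:?}, parallel: {:?}", seq, par);
    assert!(par < seq);
}
```

The same overlap happens in the async versions below, except tokio tasks and asyncio coroutines replace OS threads.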
Rust Implementation
Setup
First, install the necessary crates:
[dependencies]
futures = "0.3.30"
qdrant-client = "1.10.2"
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.39.1", features = ["rt-multi-thread"] }
Rust Code for Parallel and Sequential Upload
use qdrant_client::qdrant::{
    Condition, CreateCollectionBuilder, Distance, Filter, PointStruct, ScalarQuantizationBuilder,
    SearchParamsBuilder, SearchPointsBuilder, UpsertPointsBuilder, VectorParamsBuilder,
};
use qdrant_client::{Payload, Qdrant, QdrantError};
use futures::future::join_all;
use std::sync::Arc;
use thiserror::Error;
use tokio::task::JoinError;
use tokio::time::Instant;

#[derive(Error, Debug)]
enum CustomError {
    #[error("Qdrant error: {0}")]
    Qdrant(#[from] QdrantError),
    #[error("Join error: {0}")]
    Join(#[from] JoinError),
}

#[tokio::main]
async fn main() -> Result<(), CustomError> {
    // Example of top level client, wrapped in an Arc so tasks can share it
    let client = Arc::new(Qdrant::from_url("http://localhost:6334").build()?);
    let collections_list = client.list_collections().await?;
    dbg!(collections_list);
    let collection_name = "test";
    client.delete_collection(collection_name).await?;
    client
        .create_collection(
            CreateCollectionBuilder::new(collection_name)
                .vectors_config(VectorParamsBuilder::new(10, Distance::Cosine))
                .quantization_config(ScalarQuantizationBuilder::default()),
        )
        .await?;
    let collection_info = client.collection_info(collection_name).await?;
    dbg!(collection_info);
    let payload: Payload = serde_json::json!(
        {
            "foo": "Bar",
            "bar": 12,
            "baz": {
                "qux": "quux"
            }
        }
    )
    .try_into()
    .unwrap();
    // Create multiple points
    let mut points = Vec::new();
    for i in 0..100 {
        // Example with 100 points
        points.push(PointStruct::new(i, vec![12.; 10], payload.clone()));
    }
    // Measure time for parallel upload
    let start_parallel = Instant::now();
    // Split points into chunks and upsert them in parallel
    let chunk_size = 10;
    let mut tasks = Vec::new();
    for chunk in points.chunks(chunk_size) {
        let client_clone = Arc::clone(&client);
        let points_chunk = chunk.to_vec();
        let collection_name = collection_name.to_string();
        // Spawn a new task for each chunk to run concurrently
        tasks.push(tokio::spawn(async move {
            client_clone
                .upsert_points(UpsertPointsBuilder::new(&collection_name, points_chunk))
                .await
        }));
    }
    // Wait for all tasks to complete
    let results = join_all(tasks).await;
    for result in results {
        result??; // Convert JoinError to CustomError and propagate QdrantError if any
    }
    let duration_parallel = start_parallel.elapsed();
    // Sequential upload for comparison
    let start_sequential = Instant::now();
    for chunk in points.chunks(chunk_size) {
        client
            .upsert_points(UpsertPointsBuilder::new(collection_name, chunk.to_vec()))
            .await?;
    }
    let duration_sequential = start_sequential.elapsed();
    let search_result = client
        .search_points(
            SearchPointsBuilder::new(collection_name, [11.; 10], 10)
                .filter(Filter::all([Condition::matches("bar", 12)]))
                .with_payload(true)
                .params(SearchParamsBuilder::default().exact(true)),
        )
        .await?;
    dbg!(&search_result);
    let found_point = search_result.result.into_iter().next().unwrap();
    let mut payload = found_point.payload;
    let baz_payload = payload.remove("baz").unwrap().into_json();
    println!("baz: {}", baz_payload);
    // Print durations at the end
    println!("Parallel upload took: {:?}", duration_parallel);
    println!("Sequential upload took: {:?}", duration_sequential);
    Ok(())
}
Python Implementation
Setup
First, install the necessary packages (the script also uses NumPy):
pip install qdrant-client numpy
Python Code for Parallel and Sequential Upload
import asyncio
import numpy as np
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct, Filter, FieldCondition, Range
from time import time

async def main():
    client = AsyncQdrantClient(url="http://localhost:6333")
    collection_name = "test"
    # Recreate collection
    await client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=100, distance=Distance.COSINE),
    )
    # Create a complex payload
    payload = {
        "foo": "Bar",
        "bar": 12,
        "baz": {
            "qux": "quux",
            "nested": {
                "level1": {
                    "level2": "data",
                    "array": [1, 2, 3, 4, 5]
                }
            }
        }
    }
    # Generate multiple points
    num_points = 10000
    vector_size = 100
    vectors = np.random.rand(num_points, vector_size)
    points = [
        PointStruct(
            id=idx,
            vector=vector.tolist(),
            payload=payload
        )
        for idx, vector in enumerate(vectors)
    ]
    # Parallel upload
    start_parallel = time()
    chunk_size = 100
    tasks = [
        client.upsert(
            collection_name=collection_name,
            points=points[i:i + chunk_size]
        )
        for i in range(0, len(points), chunk_size)
    ]
    await asyncio.gather(*tasks)
    duration_parallel = time() - start_parallel
    # Sequential upload
    start_sequential = time()
    for i in range(0, len(points), chunk_size):
        await client.upsert(
            collection_name=collection_name,
            points=points[i:i + chunk_size]
        )
    duration_sequential = time() - start_sequential
    # Perform a search query
    query_vector = np.random.rand(vector_size).tolist()
    search_result = await client.search(
        collection_name=collection_name,
        query_vector=query_vector,
        query_filter=Filter(
            must=[FieldCondition(
                key='bar',
                range=Range(gte=12)
            )]
        ),
        limit=10,
    )
    print(search_result)
    # Print durations
    print(f"Parallel upload took: {duration_parallel:.2f} seconds")
    print(f"Sequential upload took: {duration_sequential:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())
Explanation of Arc Usage
Arc (Atomic Reference Counted) is used in Rust to allow multiple threads or async tasks to share ownership of the same instance of data, such as a Qdrant client.
- Shared Ownership: Arc enables multiple parts of your program to own and use the same resource safely.
- Efficiency: Cloning an Arc is cheap compared to cloning the underlying data.
- Concurrency: With Arc, multiple threads or tasks can concurrently access a shared resource without needing a mutex for read-only access.
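A minimal sketch of this sharing pattern, using a hypothetical Client struct as a stand-in for a real Qdrant client:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for an expensive-to-clone resource such as a Qdrant client.
struct Client {
    url: String,
}

fn main() {
    let client = Arc::new(Client {
        url: "http://localhost:6334".to_string(),
    });
    let mut handles = Vec::new();
    for i in 0..4 {
        // Cloning the Arc only bumps an atomic reference count;
        // the Client itself is never copied.
        let shared = Arc::clone(&client);
        handles.push(thread::spawn(move || {
            format!("task {} uses {}", i, shared.url)
        }));
    }
    for h in handles {
        println!("{}", h.join().unwrap());
    }
    // Every thread's clone has been dropped, so only the original remains.
    assert_eq!(Arc::strong_count(&client), 1);
}
```

No mutex is needed here because each thread only reads through the Arc; for mutation you would wrap the inner data in a Mutex or RwLock.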
Summary
This guide demonstrates how to implement and compare parallel and sequential data uploads in Rust and Python using their respective async and concurrency features. Both implementations aim to perform the same tasks with similar performance considerations. As you can see, we get quicker upserts with Rust. Nice!
If you go from sequential Python to parallel Rust, you roughly halve the time taken (7.62 seconds down to 3.62 seconds).
Python:
Parallel upload took: 5.09 seconds
Sequential upload took: 7.62 seconds
Rust:
Parallel upload took: 3.620016425s
Sequential upload took: 5.665954612s
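As a quick sanity check on that claim, dividing the measured times gives the speedup directly (numbers taken from the runs above):

```rust
fn main() {
    // Measured wall-clock times from the two runs above, in seconds.
    let python_sequential = 7.62_f64;
    let rust_parallel = 3.62_f64;
    let speedup = python_sequential / rust_parallel;
    println!("speedup: {:.2}x", speedup); // prints "speedup: 2.10x"
    assert!(speedup > 2.0);
}
```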
❯ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 8
On-line CPU(s) list: 0-7
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz
CPU family: 6
Model: 142
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
Stepping: 10
CPU max MHz: 3600.0000
CPU min MHz: 400.0000