Comparing Rust and Python for Asynchronous and Multithreaded Operations – Qdrant vector database


Introduction

In this guide, we will compare Rust and Python for asynchronous and multithreaded operations, focusing on uploading data to a Qdrant vector search engine. We will evaluate how using Arc in Rust and AsyncQdrantClient in Python affects performance. Imagine a company approaches you to speed up their code, and you deliver a real improvement. There might be a nice drink in it for you!


Performance Expectation:

On a newer machine with better CPU and I/O capabilities, the performance difference between parallel and sequential uploads should be more noticeable due to better handling of concurrent tasks and faster processing speeds.
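The gap this paragraph predicts is easy to reproduce with plain threads and a simulated network delay. The sketch below is illustrative only: fake_upsert and its 50 ms sleep are hypothetical stand-ins for a real Qdrant call, not part of either implementation in this guide.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for one network upsert call (~50 ms round trip).
fn fake_upsert(_chunk_id: usize) {
    thread::sleep(Duration::from_millis(50));
}

// Upload chunks one after another.
fn sequential(chunks: usize) -> Duration {
    let start = Instant::now();
    for i in 0..chunks {
        fake_upsert(i);
    }
    start.elapsed()
}

// Upload every chunk on its own thread, then wait for all of them.
fn parallel(chunks: usize) -> Duration {
    let start = Instant::now();
    let handles: Vec<_> = (0..chunks)
        .map(|i| thread::spawn(move || fake_upsert(i)))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    start.elapsed()
}

fn main() {
    // Ten 50 ms "uploads": roughly 500 ms sequentially, one delay's worth in parallel.
    println!("sequential: {:?}", sequential(10));
    println!("parallel:   {:?}", parallel(10));
}
```

Because the work here is waiting on I/O rather than burning CPU, the parallel version finishes in roughly the time of a single call. The same reasoning applies to tokio tasks and asyncio coroutines in the real implementations below.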


Rust Implementation

Setup

First, add the required crates to your Cargo.toml. Note that the code below also uses serde_json and thiserror, and the #[tokio::main] attribute needs tokio's "macros" feature:

[dependencies]
futures = "0.3.30"
qdrant-client = "1.10.2"
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.39.1", features = ["rt-multi-thread", "macros"] }

Rust Code for Parallel and Sequential Upload

use qdrant_client::qdrant::{
    Condition, CreateCollectionBuilder, Distance, Filter, PointStruct, ScalarQuantizationBuilder,
    SearchParamsBuilder, SearchPointsBuilder, UpsertPointsBuilder, VectorParamsBuilder,
};
use qdrant_client::{Payload, Qdrant, QdrantError};
use futures::future::join_all;
use std::sync::Arc;
use thiserror::Error;
use tokio::task::JoinError;
use tokio::time::Instant;

#[derive(Error, Debug)]
enum CustomError {
    #[error("Qdrant error: {0}")]
    Qdrant(#[from] QdrantError),
    #[error("Join error: {0}")]
    Join(#[from] JoinError),
}

#[tokio::main]
async fn main() -> Result<(), CustomError> {
    // Example of top level client
    let client = Arc::new(Qdrant::from_url("http://localhost:6334").build()?);

    let collections_list = client.list_collections().await?;
    dbg!(collections_list);

    let collection_name = "test";
    client.delete_collection(collection_name).await?;

    client
        .create_collection(
            CreateCollectionBuilder::new(collection_name)
                .vectors_config(VectorParamsBuilder::new(10, Distance::Cosine))
                .quantization_config(ScalarQuantizationBuilder::default()),
        )
        .await?;

    let collection_info = client.collection_info(collection_name).await?;
    dbg!(collection_info);

    let payload: Payload = serde_json::json!(
        {
            "foo": "Bar",
            "bar": 12,
            "baz": {
                "qux": "quux"
            }
        }
    )
    .try_into()
    .unwrap();

    // Create multiple points
    let mut points = Vec::new();
    for i in 0..100 {  // Example with 100 points
        points.push(PointStruct::new(i, vec![12.; 10], payload.clone()));
    }

    // Measure time for parallel upload
    let start_parallel = Instant::now();

    // Split points into chunks and upsert them in parallel
    let chunk_size = 10;
    let mut tasks = Vec::new();
    for chunk in points.chunks(chunk_size) {
        let client_clone = Arc::clone(&client);
        let points_chunk = chunk.to_vec();
        let collection_name = collection_name.to_string();
        // Spawn a new task for each chunk to run concurrently
        tasks.push(tokio::spawn(async move {
            client_clone
                .upsert_points(UpsertPointsBuilder::new(&collection_name, points_chunk))
                .await
        }));
    }

    // Wait for all tasks to complete
    let results = join_all(tasks).await;
    for result in results {
        result??; // Convert JoinError to CustomError and propagate QdrantError if any
    }

    let duration_parallel = start_parallel.elapsed();

    // Sequential upload for comparison
    let start_sequential = Instant::now();
    for chunk in points.chunks(chunk_size) {
        client
            .upsert_points(UpsertPointsBuilder::new(collection_name, chunk.to_vec()))
            .await?;
    }

    let duration_sequential = start_sequential.elapsed();

    let search_result = client
        .search_points(
            SearchPointsBuilder::new(collection_name, [11.; 10], 10)
                .filter(Filter::all([Condition::matches("bar", 12)]))
                .with_payload(true)
                .params(SearchParamsBuilder::default().exact(true)),
        )
        .await?;
    dbg!(&search_result);

    let found_point = search_result.result.into_iter().next().unwrap();
    let mut payload = found_point.payload;
    let baz_payload = payload.remove("baz").unwrap().into_json();
    println!("baz: {}", baz_payload);

    // Print durations at the end
    println!("Parallel upload took: {:?}", duration_parallel);
    println!("Sequential upload took: {:?}", duration_sequential);

    Ok(())
}

Python Implementation

Setup

First, install the necessary packages (the script below also uses NumPy):

pip install qdrant-client numpy

Python Code for Parallel and Sequential Upload

import asyncio
import numpy as np
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct, Filter, FieldCondition, Range
from time import time

async def main():
    client = AsyncQdrantClient(url="http://localhost:6333")

    collection_name = "test"

    # Recreate collection
    await client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=100, distance=Distance.COSINE),
    )

    # Create a complex payload
    payload = {
        "foo": "Bar",
        "bar": 12,
        "baz": {
            "qux": "quux",
            "nested": {
                "level1": {
                    "level2": "data",
                    "array": [1, 2, 3, 4, 5]
                }
            }
        }
    }

    # Generate multiple points
    num_points = 10000
    vector_size = 100
    vectors = np.random.rand(num_points, vector_size)
    points = [
        PointStruct(
            id=idx,
            vector=vector.tolist(),
            payload=payload
        )
        for idx, vector in enumerate(vectors)
    ]

    # Parallel upload
    start_parallel = time()

    chunk_size = 100
    tasks = [
        client.upsert(
            collection_name=collection_name,
            points=points[i:i + chunk_size]
        )
        for i in range(0, len(points), chunk_size)
    ]
    await asyncio.gather(*tasks)

    duration_parallel = time() - start_parallel

    # Sequential upload
    start_sequential = time()

    for i in range(0, len(points), chunk_size):
        await client.upsert(
            collection_name=collection_name,
            points=points[i:i + chunk_size]
        )

    duration_sequential = time() - start_sequential

    # Perform a search query
    query_vector = np.random.rand(vector_size).tolist()
    search_result = await client.search(
        collection_name=collection_name,
        query_vector=query_vector,
        query_filter=Filter(
            must=[FieldCondition(
                key='bar',
                range=Range(gte=12)
            )]
        ),
        limit=10,
    )
    print(search_result)

    # Print durations
    print(f"Parallel upload took: {duration_parallel:.2f} seconds")
    print(f"Sequential upload took: {duration_sequential:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Explanation of Arc Usage

Arc (Atomic Reference Counted) is used in Rust to allow multiple threads or async tasks to share ownership of the same instance of data, such as a Qdrant client.

  • Shared Ownership: Arc enables multiple parts of your program to own and use the same resource safely.
  • Efficiency: Cloning an Arc is cheap compared to cloning the underlying data.
  • Concurrency: With Arc, multiple threads or tasks can concurrently access a shared resource without needing a mutex for read-only access.
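The three bullets above can be seen in a few lines of stdlib-only Rust. This is a minimal sketch: the Client struct here is a hypothetical stand-in for the Qdrant client that the main example wraps in Arc.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a heavyweight client (the real code shares a Qdrant client).
struct Client {
    url: String,
}

fn main() {
    let client = Arc::new(Client {
        url: "http://localhost:6334".to_string(),
    });

    let mut handles = Vec::new();
    for i in 0..4 {
        // Cloning the Arc only bumps an atomic reference count;
        // the Client itself is never copied.
        let client = Arc::clone(&client);
        handles.push(thread::spawn(move || {
            // Read-only access from many threads needs no Mutex.
            format!("task {} using {}", i, client.url)
        }));
    }
    for h in handles {
        println!("{}", h.join().unwrap());
    }

    // Each thread's clone is dropped when it finishes,
    // so only the original handle remains here.
    assert_eq!(Arc::strong_count(&client), 1);
}
```

This is exactly the pattern the Rust upload code uses: Arc::clone(&client) before each tokio::spawn hands every task its own cheap handle to the one underlying client.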

Summary

This guide demonstrates how to implement and compare parallel and sequential data uploads in Rust and Python using their respective async and concurrency features. Both implementations aim to perform the same tasks with similar performance considerations. As the timings below show, we get quicker upserts with Rust. Nice!

On this machine, going from sequential Python to parallel Rust roughly halves the time taken. (Bear in mind that the listings above use different point counts and vector sizes; for a strict like-for-like comparison, give both the same workload.)

Python:
Parallel upload took: 5.09 seconds
Sequential upload took: 7.62 seconds

Rust:
Parallel upload took: 3.620016425s
Sequential upload took: 5.665954612s
❯ lscpu
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         39 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  8
  On-line CPU(s) list:   0-7
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Core(TM) i5-8350U CPU @ 1.70GHz
    CPU family:          6
    Model:               142
    Thread(s) per core:  2
    Core(s) per socket:  4
    Socket(s):           1
    Stepping:            10
    CPU max MHz:         3600.0000
    CPU min MHz:         400.0000
