2025/02/15

Building a trait service with Rust and Kafka

In the world of microservices and distributed systems, managing user activity events can be a complex task. This article delves into the development of a trait-based service in Rust, designed to streamline this process.

Central hub for user interactions

This service acts as a central hub, functioning as a single point of entry for user activity data originating from various sources within your application. Based on the collected user activity data, the service constructs a simplified user profile.
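
To make that concrete, here is a rough, hypothetical sketch of the kind of simplified profile the service could build; the struct and field names are illustrative and not part of this project's code:

use std::collections::HashMap;

// Hypothetical: a simplified profile aggregated from a user's activity events
struct UserProfile {
    user_id: String,
    // Most recently reported client OS, if any event carried one
    last_client_os: Option<String>,
    // How many times each event key (e.g. "landing.user.home.click") was seen
    event_counts: HashMap<String, u64>,
}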

Responsibilities

Since this service will be directly accessed by clients (like your application's frontend), it comes with a set of key responsibilities to ensure smooth operation and reliable data handling:

  • Clear API: A well-defined API with clear documentation guides clients on data format and submission methods.

  • Robust Validation: Strong validation checks ensure only valid data reaches Kafka, safeguarding data integrity.

  • Scalability & Performance: The service must handle increasing data volume efficiently as your user base grows.

  • Security: Robust authentication, authorization, and data encryption protect sensitive user activity information.

  • Error Handling & Monitoring: Graceful error handling and proactive monitoring guarantee smooth data delivery and facilitate troubleshooting.

Project setup

Let's move forward with defining the API and building the endpoint to receive those user event messages! Here's a breakdown of the project setup using the Rocket framework and the rdkafka library for interacting with Kafka:

Cargo.toml

[package]
name = "trait-service"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at <https://doc.rust-lang.org/cargo/reference/manifest.html>

[dependencies]
# Web framework for Rust, used for building the API server
rocket = { version = "0.5.0", features = ["json"] }
# Kafka client library for Rust, used for producing and consuming messages
rdkafka = { version = "0.36.2", features = ["cmake-build"] }
# Serialization and deserialization of data
serde = { version = "1.0", features = ["derive"] }

This file defines the project's dependencies, including Rocket for building the web API server, rdkafka for interacting with Kafka, and serde for data serialization and deserialization.

src/main.rs

#[macro_use]
extern crate rocket;

// Load modules
mod event;
mod kafka;

use event::user::controllers::user_controller;

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![user_controller::track])
}

// Load test files
#[cfg(test)]
mod tests;

We'll establish the connection between user requests and the logic that handles them by linking routes to the controller; the Rocket guide covers this in detail: https://rocket.rs/guide/v0.5/

src/event/user/controllers/user_controller.rs

// # Goals of the controller files

// ## Define a route
// The controller establishes a connection between a specific URL path (route) and the corresponding function that handles the request.
// This tells the application how to direct incoming requests to the appropriate logic within the controller.

// ## Parse the incoming request payload
// The request payload contains data sent by the client (like form data or JSON objects).
// The controller parses this data into a usable format, making it accessible for further processing.

// ## Validate the payload
// It's crucial to ensure the received data is in the expected format and adheres to any defined rules.
// The controller validates the payload to prevent unexpected behavior or security vulnerabilities.

// ## Communicate with other services
// Controllers often interact with other parts of the application, like models or services, to retrieve or manipulate data.
// This communication allows the controller to fulfill the request's purpose.

// ## Return a response
// After processing the request, the controller generates a response object containing the data or message to be sent back to the client.
// This response could be HTML content, JSON data, or any format appropriate for the request type.

use rocket::serde::json::{serde_json, Json};

use crate::event::user::models::event::UserEvent;
use crate::kafka;

// Track user interactions with the product.
// Sends the event to the Kafka server and returns a 200 HTTP code;
// message consumers will process the event asynchronously.
// Failure points:
// - Kafka server is down
// - Invalid event payload
#[post("/event/user/track", format = "json", data = "<event>")]
pub async fn track(event: Json<UserEvent<'_>>) -> &'static str {
    // TODO: Validate event payload (see the sketch in the model file below)
    // Serializing a struct that derives Serialize cannot realistically fail here
    let payload = serde_json::to_string(&event.payload).expect("serializable payload");

    // Send the event to the Kafka server
    kafka::send_event(event.key, &payload).await;

    "ok"
}

We extract the parameters from the user's POST request using Rocket's guards. Guards check incoming requests against your rules (like a valid login or required data) before letting them reach the endpoint's code; here, Json<UserEvent> acts as a data guard that parses and type-checks the JSON body.
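
If we later need the "valid login" style of check, a custom request guard is the idiomatic place for it. Below is a minimal sketch, assuming a hypothetical x-api-key header; the header name and acceptance rule are illustrative, not part of this project:

use rocket::http::Status;
use rocket::request::{FromRequest, Outcome, Request};

// Hypothetical guard: requests must carry a non-empty "x-api-key" header
pub struct ApiKey<'r>(pub &'r str);

#[rocket::async_trait]
impl<'r> FromRequest<'r> for ApiKey<'r> {
    type Error = ();

    async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        // A real service would check the key against configuration or a store
        match req.headers().get_one("x-api-key") {
            Some(key) if !key.is_empty() => Outcome::Success(ApiKey(key)),
            _ => Outcome::Error((Status::Unauthorized, ())),
        }
    }
}

Adding an ApiKey<'_> parameter to the track handler would then reject unauthenticated requests before the body is even parsed.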

Additionally, we can integrate data encoding options, including popular formats like Avro. This allows for compact and schema-based encoding, further enhancing data clarity and interoperability within the system.
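
For instance, with the apache-avro crate added as a dependency (an assumption; it is not in this project's Cargo.toml), encoding the payload inside a helper could look roughly like this:

use apache_avro::{Schema, Writer};

use crate::event::user::models::event::UserEventPayload;

// Hypothetical Avro schema mirroring UserEventPayload
let raw_schema = r#"
{
  "type": "record",
  "name": "UserEventPayload",
  "fields": [
    { "name": "client_os", "type": ["null", "string"], "default": null }
  ]
}"#;
let schema = Schema::parse_str(raw_schema).expect("valid Avro schema");

// The Writer embeds the schema in the encoded output (Avro object container format)
let mut writer = Writer::new(&schema, Vec::new());
let payload = UserEventPayload { client_os: Some("iOS") };
writer.append_ser(&payload).expect("payload matches schema");
let encoded: Vec<u8> = writer.into_inner().expect("finished encoding");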

Let’s see what the track request expects.

src/event/user/models/event.rs

// # Goals of the model files

// ## Define data structures
// This ensures consistency in how data is stored and accessed throughout your application.

// ## Show model examples
// This provides practical demonstrations of how the model is used, making it easier for developers to understand and implement.

// ## Define enum values
// This helps limit data to specific options and reduces the chance of errors or unexpected values being entered.

// ## Reduce ambiguity
// By clearly defining the model's structure and behavior, you minimize confusion and make the code more maintainable.

use rocket::serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct UserEventPayload<'a> {
    pub client_os: Option<&'a str>,
}

// UserEvent captures the interaction of a user with the product.
// The key should be unique and self-explanatory;
// it should answer who, what, where, when, why & how.
// key: v1.landing.user.order.create
// payload: { "client_os": "iOS" }
#[derive(Serialize, Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct UserEvent<'a> {
    pub key: &'a str,
    pub payload: UserEventPayload<'a>,
}
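
Before moving to the Kafka client, here is one way the controller's validation TODO could be addressed. The rules below (non-empty key, restricted character set) are hypothetical, chosen to match the dotted key format shown above:

impl<'a> UserEvent<'a> {
    // Hypothetical validation rules for the dotted event-key format
    pub fn validate(&self) -> Result<(), &'static str> {
        if self.key.is_empty() {
            return Err("event key must not be empty");
        }
        // Allow only alphanumerics plus '.' and '_', e.g. "v1.landing.user.order.create"
        let valid = self
            .key
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_');
        if !valid {
            return Err("event key contains unsupported characters");
        }
        Ok(())
    }
}

The track handler could then call event.validate() and return a 400 response for malformed events instead of forwarding them to Kafka.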

src/kafka.rs

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

// TODO: Move constants
const KAFKA_TOPIC_NAME: &str = "user_events";

// Send the event to the Kafka server and log the delivery status
// Note: this creates a new producer on every call; see the note below
pub async fn send_event(key: &str, message: &str) {
    let producer: FutureProducer = kafka_producer().expect("Producer creation error");

    let delivery_status = producer
        .send(
            FutureRecord::to(KAFKA_TOPIC_NAME).payload(message).key(key),
            Duration::from_secs(0),
        )
        .await;

    // Log delivery failures instead of silently dropping the Result
    if let Err((error, _message)) = delivery_status {
        eprintln!("Failed to deliver event {key}: {error}");
    }
}

// Create a Kafka producer
fn kafka_producer() -> Result<FutureProducer, rdkafka::error::KafkaError> {
    // Get the Kafka broker URL from the environment variable, or fall back to the default
    let kafka_broker_url: String =
        std::env::var("KAFKA_BROKER_URL").unwrap_or_else(|_| "localhost:9093".to_string());

    ClientConfig::new()
        .set("bootstrap.servers", kafka_broker_url)
        .set("message.timeout.ms", "5000")
        .create()
}

And here is the Kafka client. It sets up the Kafka producer using environment variables and constants. We could make this module more generic so the underlying Kafka backend can be swapped out, but I'll skip that for this project.
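
One improvement worth flagging: send_event builds a new producer on every call. A minimal sketch of the alternative, assuming kafka_producer is made pub and reusing Rocket's managed state, would create the producer once at startup:

use rdkafka::producer::FutureProducer;

// In main.rs: build the producer once and let Rocket manage it as shared state
#[launch]
fn rocket() -> _ {
    let producer: FutureProducer =
        kafka::kafka_producer().expect("Producer creation error");
    rocket::build()
        .manage(producer)
        .mount("/", routes![user_controller::track])
}

The handler would then take an extra &State<FutureProducer> parameter and pass the producer down to send_event, keeping connection setup off the request path.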

src/tests/event/user/controllers/user_controller.rs

use crate::rocket;
use rocket::http::ContentType;
use rocket::http::Status;
use rocket::local::blocking::{Client, LocalResponse};
use rocket::serde::json::{json, serde_json};

fn rocket_client() -> Client {
    Client::tracked(rocket()).expect("valid rocket instance")
}

fn event(key: &str, payload: serde_json::Value) -> String {
    json!({
        "key": key,
        "payload": payload,
    })
    .to_string()
}

fn empty_payload() -> serde_json::Value {
    json!({})
}

fn invalid_payload() -> serde_json::Value {
    json!({
        "client_os": 33,
    })
}

fn post_user_event<'a>(client: &'a Client, payload: &'a str) -> LocalResponse<'a> {
    client
        .post("/event/user/track")
        .header(ContentType::JSON)
        .body(payload)
        .dispatch()
}

#[test]
fn track_empty_payload() {
    let client = rocket_client();
    let payload = event("landing.user.home.click", empty_payload());
    let response = post_user_event(&client, &payload);

    assert_eq!(response.status(), Status::Ok);
}

#[test]
fn track_invalid_payload() {
    let client = rocket_client();
    let payload = event("landing.user.home.click", invalid_payload());
    let response = post_user_event(&client, &payload);

    assert_eq!(response.status(), Status::UnprocessableEntity);
}

The controller tests are a great start! To truly ensure the service functions flawlessly, we can expand testing to include the Kafka client, data validation, and model behavior.
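
As one example, a model-behavior test could look like this sketch; it relies only on the UserEvent model and serde:

use crate::event::user::models::event::UserEvent;
use rocket::serde::json::serde_json;

#[test]
fn user_event_deserializes_from_json() {
    let raw = r#"{"key":"landing.user.home.click","payload":{"client_os":"iOS"}}"#;
    let event: UserEvent = serde_json::from_str(raw).expect("valid event JSON");

    assert_eq!(event.key, "landing.user.home.click");
    assert_eq!(event.payload.client_os, Some("iOS"));
}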

To streamline development, we'll leverage Docker and Docker Compose. These tools simplify setting up the entire tech stack (including this service, Kafka, and other dependencies) on your local machine. We'll also delve into the fascinating realm of Kafka Streams and ksqlDB, revealing how they work together to seamlessly process and analyze the user activity data we collect.

Follow me on other channels