
· 2 min read

Today, we're excited to announce the release of Engula 0.3! Engula is a cloud-native data structure store, used as a database, cache, and storage engine.

What's in 0.3?

Engula has gone through a redesign in terms of its interface and architecture. The new design is based on the lessons we learned in the past few months, and it will provide a clearer guide for future development. You can check the new design document and discussion for more details.

The most important feature in Engula 0.3 is a set of data structures. In this release, Engula provides five data types: Any, I64, Blob, List, and Map. Each data type provides a set of APIs to manipulate objects of that type. For example, you can add on I64 objects or push elements to List objects.

Moreover, Engula supports ACID transactions across different data structures. For example, you can push an element to a list and insert an index to a map in the same transaction. This feature allows users to build more advanced applications on top of Engula.
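
As a rough sketch with the new Rust client (the messages and index collections below are hypothetical; see the tutorial for the full transaction API):

// Hypothetical: push a message to a List collection and update an index in a
// Map collection atomically within one database-level transaction.
// db is a Database handle; messages and index are hypothetical collections.
let txn = db.begin();
{
    let mut t = messages.begin_with(txn.clone()); // messages: a List<Blob> collection
    t.object("inbox").push_back(b"hello".to_vec());
    t.commit().await?;
}
{
    let mut t = index.begin_with(txn.clone()); // index: a Map<I64> collection
    t.object("by_user").set(b"alice".to_vec(), 1);
    t.commit().await?;
}
// All operations take effect atomically when the database transaction commits.
txn.commit().await?;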

Engula 0.3 also comes with a new server and a Rust client. You can get started with this tutorial. Welcome to explore and have fun!

The new website

As you may have noticed, this Engula website has also gone through a redesign! Special thanks to @iNorthIsle and @tisonkun for their nice work.

· 6 min read

Welcome to the tutorial for Engula 0.3! This tutorial will show you how to use Engula with the Rust client.

Prerequisites

  • rustup
  • Engula requires a nightly toolchain to build. You can install it with:
    rustup install nightly

Start the server

First of all, let's install Engula 0.3 with:

cargo +nightly install engula --version 0.3.0 --locked

Engula 0.3 comes with a standalone server. You can start the server with:

engula server start

The server will start and listen on the default address (http://localhost:21716).

Set up the Rust client

Engula 0.3 also comes with a Rust client. To explore the client, set up a new project first:

cargo new hello-engula
cd hello-engula

And then add the following dependencies to Cargo.toml:

[dependencies]
engula-client = "0.3"
anyhow = "1.0"
tokio = "1.15"

Data model

Before using the client, let's introduce the data model first. An Engula deployment is called a universe. For example, the server you started above represents a universe with a single server.

A universe consists of multiple databases, each of which consists of multiple collections. A collection contains a set of objects of the same data type. Each object has an object identifier that is unique within a collection. Each data type provides a set of APIs to manipulate objects of that type.

Engula 0.3 provides the following data types:

Type    | Notes                                | Rust Value Type
Any     | A value of any supported type.       | Value
I64     | A signed 64-bit integer.             | i64
Blob    | A sequence of bytes.                 | Vec<u8>
List<T> | A sequence of values.                | Vec<T>
Map<T>  | An unordered set of key-value pairs. | HashMap<Vec<u8>, T>

For container types like List and Map, you can further specify the value type. For example, List<Any> contains values of Any type, and Map<I64> contains values of I64 type.

Note that there are some limitations on type compositions. Firstly, the key type of Map is fixed to Blob. Secondly, you cannot specify nested container types. For example, List<Map<I64>> is not allowed. These limitations are by design to prevent overly complicated use cases.

Create databases and collections

Now you can connect to the universe you started above with:

use anyhow::Result;
use engula_client::{Any, Blob, List, Map, Universe, I64};

#[tokio::main]
async fn main() -> Result<()> {
    // The address of the server you started above.
    let url = "http://localhost:21716";
    let uv = Universe::connect(url).await?;
    let db = uv.create_database("db").await?;
    let c1 = db.create_collection::<Any>("c1").await?;
    let c2 = db.create_collection::<I64>("c2").await?;
    let c3 = db.create_collection::<Blob>("c3").await?;
    let c4 = db.create_collection::<List<Any>>("c4").await?;
    let c5 = db.create_collection::<List<I64>>("c5").await?;
    let c6 = db.create_collection::<List<Blob>>("c6").await?;
    let c7 = db.create_collection::<Map<Any>>("c7").await?;
    let c8 = db.create_collection::<Map<I64>>("c8").await?;
    let c9 = db.create_collection::<Map<Blob>>("c9").await?;
    Ok(())
}

This snippet creates a database and multiple collections. The collections listed above enumerate all the supported data type compositions.

Manipulate objects

You can manipulate collections with the basic get/set/delete interfaces:

// Sets the Any object with i64 (I64)
c1.set("o", 1).await?;
// Sets the Any object with Vec<u8> (Blob)
c1.set("o", vec![1u8, 2u8]).await?;
// Sets the Any object with Vec<i64> (List<I64>)
c1.set("o", vec![1i64, 2i64]).await?;
// Gets and prints the object
println!("c1.o = {:?}", c1.get("o").await?);
// Deletes the object
c1.delete("o").await?;
// Sets the I64 object with i64
c2.set("o", 1).await?;
println!("c2.o = {:?}", c2.get("o").await?);
// Sets the Blob object with Vec<u8>
c3.set("o", vec![1, 2]).await?;
println!("c3.o = {:?}", c3.get("o").await?);
// Sets the List<I64> object with Vec<i64>
c5.set("o", vec![1, 2]).await?;
println!("c5.o = {:?}", c5.get("o").await?);
// Sets the Map<Blob> object with HashMap<Vec<u8>, Vec<u8>>
c9.set("o", [(vec![1], vec![1]), (vec![2], vec![2])])
.await?;
println!("c9.o = {:?}", c9.get("o").await?);

Note that some collections are omitted here. You can try to explore them in your project. You can also manipulate individual objects in a collection with the object interface:

// Any object
c1.set("o", 1).await?;
c1.object("o").add(1).await?;
println!("c1.o = {:?}", c1.get("o").await?);
// I64 object
c2.object("o").add(2).await?;
println!("c2.o = {:?}", c2.get("o").await?);
// Blob object
c3.object("o").append(vec![3u8, 4u8]).await?;
println!("c3.o = {:?}", c3.get("o").await?);
println!("c3.o.len = {:?}", c3.object("o").len().await?);
// List<I64> object
c5.object("o").push_back(3).await?;
c5.object("o").push_front(0).await?;
println!("c5.o = {:?}", c5.get("o").await?);
println!("c5.o.len = {:?}", c5.object("o").len().await?);
// Map<Blob> object
c9.object("o").set(vec![3], vec![3]).await?;
c9.object("o").delete(vec![1]).await?;
println!("c9.o = {:?}", c9.get("o").await?);
println!("c9.o.len = {:?}", c9.object("o").len().await?);
println!("c9.o.[3] = {:?}", c9.object("o").get(vec![3]).await?);

As you can see, each data type provides some special APIs to manipulate objects of that type. The Any type is more flexible since it provides APIs of multiple data types. But it is also error-prone because it performs more implicit conversions with less type checking.

Transactions

The data structures introduced above provide rich semantics sufficient for simple use cases. But they only allow you to manipulate individual objects. If you need to build more advanced applications, you will need transactions. For example, you may want to push a message to a list whenever a map is updated. In this case, you will need to update the list and the map atomically for data consistency.

Engula provides powerful transactions for you to build high-level data structures on top of it. Engula supports object-level, collection-level, and database-level transactions.

Object-level transactions allow you to do multiple operations on a single object. For example:

// Updates a List<I64> object in a transaction.
let mut txn = c5.object("txn").begin();
txn.push_back(1).push_front(0);
txn.commit().await?;
println!("c5.txn = {:?}", c5.get("txn").await?);
// Updates a Map<Blob> object in a transaction.
let mut txn = c9.object("txn").begin();
txn.set(vec![1], vec![1])
    .set(vec![2], vec![2])
    .delete(vec![3]);
txn.commit().await?;
println!("c9.txn = {:?}", c9.get("txn").await?);

Collection-level transactions allow you to do multiple operations on multiple objects in the same collection. For example:

// Updates multiple I64 objects in a transaction.
let mut txn = c2.begin();
txn.set("a", 1);
txn.object("b").add(1).sub(2);
txn.commit().await?;
// Updates multiple List<I64> objects in a transaction.
let mut txn = c5.begin();
txn.set("a", vec![1, 2]);
txn.object("b").push_back(3).push_front(0);
txn.commit().await?;

Database-level transactions allow you to do multiple operations on multiple objects across different collections in the same database. For example:

// Begins a database transaction
let txn = db.begin();
{
    // Begins a collection sub-transaction
    let mut t = c5.begin_with(txn.clone());
    t.set("a", vec![1, 2]);
    t.object("b").push_back(3);
    // Commits the sub-transaction.
    // Note that the operations will not be executed until the database transaction commits.
    t.commit().await?;
}
{
    // Begins another collection sub-transaction
    let mut t = c9.begin_with(txn.clone());
    t.set("a", [(vec![1], vec![1]), (vec![2], vec![2])]);
    t.object("b").set(vec![3], vec![3]);
    t.commit().await?;
}
// Commits the database transaction and executes all the sub-transactions.
// Note that this will fail if there is any pending sub-transaction.
txn.commit().await?;

Feedback

That's all for Engula 0.3. There is still a lot of work to do on the data APIs and the internal architecture, and we are looking forward to your feedback. You are welcome to join us on GitHub Discussions and Zulip. See you in the next release!

· 3 min read

Today, we're excited to announce the release of Engula 0.2! Since demo 1, Engula has entered a formal development cycle, and Engula 0.2 is the first official release. Engula is a serverless storage engine that aims to help users build reliable and cost-effective databases.

What's in 0.2?

We have published a design document to introduce Engula's concepts and architecture. For those who don't know much about Engula or haven't been around for a while, Engula has changed a lot, and the design document is the best place to get started.

Engula 0.2 comes with one engine and three kernels:

  • A hash engine that supports simple key-value operations. This engine relies on a kernel to perform stateful operations. It can work with the three kernels below to tackle different use cases.
  • A memory kernel that stores everything in memory. This kernel integrates a memory journal, storage, and manifest for data storage.
  • A file kernel that stores everything in local files. This kernel integrates a file journal, storage, and manifest for data storage.
  • A gRPC kernel that stores data in remote gRPC servers. An engine uses a kernel client to communicate with a kernel server. The kernel server can further connect to a journal server and a storage server for data storage. Engula 0.2 provides a binary to start different kinds of servers easily.

If you want to try it out, we have prepared a tutorial for you. Have fun!

Community development

Engula is an ambitious project that tries to offer a serverless storage engine for the next decade. It requires a lot of effort and different kinds of skills. We believe that a supportive community is vital to the success of Engula, and we are glad to see that the community is going in a promising direction.

If you are interested in joining our community, we have some guidelines for you:

  • Please read the CONTRIBUTING guide first.
  • For formal design documents, you can check the docs.
  • For discussions with a specific topic, you can go to Discussions.
  • For casual discussions, or if you just want to say hi, feel free to join Zulip.

FAQ

Where is 0.1?

Version number 0.1 was already used by demo 1. That was a mistake, but we decided to make use of it and treat demo 1 as version 0.1. You can check the discussion here for more details.

Where is demo 2?

In the demo 1 report, we indicated that we would have a demo 2 to explore the path toward an adaptive storage engine. While the goal is fantastic, the timing is not. We did spend some time on demo 2, but then we realized that Engula doesn't yet have a solid foundation for such an exploration. So we decided to abort demo 2 and start a formal development cycle. You can check the discussion here for more details.

· 5 min read

Welcome to the tutorial for Engula 0.2! In this tutorial, we'll show you how to use an Engula engine.

Prerequisites

  • rustup
  • Engula requires a nightly toolchain to build. You can install it with:
    rustup install nightly

Let's go

First of all, create a new project with:

cargo new hello-engula
cd hello-engula

Then in Cargo.toml, add engula and tokio to the dependencies section:

[dependencies]
engula = "0.2"
tokio = "1.14"

Hash engine

Engula 0.2 comes with a hash engine that provides simple key-value storage. You can use the hash engine in src/main.rs:

use engula::engine::hash::{Engine, Result};
use engula::kernel::mem::Kernel;

#[tokio::main]
async fn main() -> Result<()> {
    let kernel = Kernel::open().await?;
    let engine = Engine::open(kernel).await?;

    println!("✊ Here we go!");
    let key = vec![0];
    let value = vec![1];
    println!("👉 Put {:?} {:?}", key, value);
    engine.put(key.clone(), value.clone()).await?;
    let got = engine.get(&key).await?;
    println!("👉 Get {:?} {:?}", key, got);
    println!("👉 Del {:?}", key);
    engine.delete(key.clone()).await?;
    let got = engine.get(&key).await?;
    println!("👉 Get {:?} {:?}", key, got);
    println!("🤟 Engula works!");

    Ok(())
}

Now, you can run the example with:

cargo +nightly run

That's it. You have successfully run an Engula engine! Let's see what happened here. In the above example, you open a kernel and run an engine on it. Then you use the engine to do some ordinary get/put/delete operations. The engine part should feel intuitive if you have used other storage engines before. But what does the kernel mean here?

Local kernel

In Engula, a kernel provides a stateful environment for an engine. An engine runs on a kernel and stores all its data in the kernel. You can run an engine on different kernels according to your use case. For example, the above example uses a memory kernel that stores everything in memory. The memory kernel is handy for tests, but if you restart the kernel, everything is gone. If that's not what you want, Engula 0.2 also provides a file kernel that stores everything in local files.

To use the file kernel, you need to replace engula::kernel::mem::Kernel with engula::kernel::file::Kernel like this:

use engula::engine::hash::{Engine, Result};
use engula::kernel::file::Kernel;

#[tokio::main]
async fn main() -> Result<()> {
    // Opens a file kernel that stores data in the given path.
    let kernel = Kernel::open("/tmp/engula").await?;
    let engine = Engine::open(kernel).await?;
    ...
}

You can run it again and see what it writes to /tmp/engula.

Nice, you have tried two kernels now, and everything just works. But a storage engine that can store data in memory and local files doesn't seem very exciting, right? Don't worry, we have a third kernel for you: a gRPC kernel. This kernel is more interesting because it makes Engula very different from other storage engines you may have seen.

Remote kernel

A gRPC kernel allows you to run an embedded engine along with your database while storing data in a remote kernel. The remote kernel, in turn, stores data in a remote journal and a remote storage. But you don't need to know how they work for now. You just need to understand their relationship to connect them together.

Kernel

Simply speaking, to use a gRPC kernel, you need to start a journal server and a storage server first. Then start a kernel server that connects to the journal server and the storage server. After that, you can connect to the kernel server and run your engine on it. Engula 0.2 provides an engula binary to start those servers. It's not complicated. Let's try it out!

You need to install the engula binary first:

cargo +nightly install engula

Then start a journal server and a storage server:

engula journal run 127.0.0.1:10001 --mem
engula storage run 127.0.0.1:10002 --mem

And then start a kernel server connecting to the journal server and the storage server:

engula kernel run 127.0.0.1:10003 --journal 127.0.0.1:10001 --storage 127.0.0.1:10002 --mem

Now, we have set up a remote kernel. You may notice the --mem option at the end of each command. It lets you choose how to store data. Using --mem means that the server stores data in memory. You can also try --file <PATH> instead to store data in local files.

Finally, you can connect to the remote kernel and run the engine on it:

use engula::engine::hash::{Engine, Result};
use engula::kernel::grpc::Kernel;

#[tokio::main]
async fn main() -> Result<()> {
    // Connects to the kernel server you just started.
    let kernel = Kernel::connect("127.0.0.1:10003").await?;
    let engine = Engine::open(kernel).await?;
    ...
}

That's it. You only need to import engula::kernel::grpc::Kernel and connect to the kernel server. Then you can use the engine exactly as before, except that it is now powered by a remote kernel!

Well, the idea may not sound so exciting at this moment, as our remote kernel is very primitive for now. However, imagine that Engula provides a serverless kernel that scales automatically according to your workload in the future. Then you can build your database with the new kernel to get a serverless database instantly, without changing any other code at all!

Engula enables you to build a standalone database and a serverless database in the same way. You don't need to pay any unnecessary performance penalty in either case. And we will also help you deploy your database. These are the goals of Engula 1.0. Watch us!

Get more information

That's all for Engula 0.2. If you want to know more about Engula, you can start with the design document. See you in the next release!

· 18 min read

Congratulations, we have finished our first demo and evaluated it on AWS in September 2021. This demo is a proof of concept of an elastic storage engine on the cloud.

In this post, we first describe the architecture and implementation of the demo. We then describe the experiments we conducted on the demo and share some lessons we learned.

Architecture and Implementation

The architecture of the demo is as follows:

Architecture

The top half shows a classic LSM-Tree structure similar to RocksDB. However, the implementation is much simpler due to the limited time and workforce. Specifically, we only support single key-value Put and Get. For puts, all entries are first appended to the journal and then inserted into the memtable. When the size of the memtable reaches a threshold, the database flushes it to a file in the storage. When the number of files in the storage reaches a threshold, the database triggers compactions to merge small files into larger ones. For gets, the database examines the mutable memtable, the immutable memtable, and individual storage files in order (there are no filters). Moreover, to achieve concurrent flushes and compactions, we hash the memtable and the storage into a few shards.
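
As a rough illustration, here is a minimal, single-threaded sketch of that put/get flow; the types, names, and thresholds are ours, not the demo's actual code:

// A toy in-memory model of the flow described above: puts go to the journal
// and the memtable, the memtable is flushed to a "file" at a size threshold,
// and gets check the memtable before the files (newest first).
use std::collections::BTreeMap;

const MEMTABLE_LIMIT: usize = 2; // tiny threshold, for illustration only

#[derive(Default)]
struct Demo {
    journal: Vec<(Vec<u8>, Vec<u8>)>,       // append-only log
    memtable: BTreeMap<Vec<u8>, Vec<u8>>,   // mutable memtable
    files: Vec<BTreeMap<Vec<u8>, Vec<u8>>>, // flushed storage files, newest last
}

impl Demo {
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
        // Append to the journal first, then insert into the memtable.
        self.journal.push((key.clone(), value.clone()));
        self.memtable.insert(key, value);
        // Flush the memtable to a storage file once it reaches the threshold.
        if self.memtable.len() >= MEMTABLE_LIMIT {
            self.files.push(std::mem::take(&mut self.memtable));
        }
    }

    fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
        // Check the memtable first, then the storage files from newest to oldest.
        self.memtable
            .get(key)
            .or_else(|| self.files.iter().rev().find_map(|f| f.get(key)))
    }
}

fn main() {
    let mut db = Demo::default();
    db.put(b"a".to_vec(), b"1".to_vec());
    db.put(b"b".to_vec(), b"2".to_vec()); // triggers a flush
    db.put(b"a".to_vec(), b"3".to_vec()); // newer value shadows the flushed one
    println!("a = {:?}", db.get(b"a"));
}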

The bottom half is what makes the demo interesting. It shows the major difference between the demo and other traditional storage engines. First, we unbundle the journal, storage, and compaction components, enabling us to switch each component to another implementation without affecting the others. Then, for each component, we provide a local implementation and a remote implementation. The local implementation runs in the same process as the database, while the remote implementation wraps the local one into a gRPC service that runs as a standalone process.
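
To make this concrete, here is a hypothetical sketch of the unbundling idea (the trait and type names are ours, not the demo's): each component sits behind a common interface, so a local implementation and a gRPC-backed one are interchangeable.

// Hypothetical sketch: a journal trait with a local implementation and a
// stand-in for a remote (gRPC) implementation.
trait Journal {
    fn append(&mut self, entry: Vec<u8>);
}

struct LocalJournal {
    entries: Vec<Vec<u8>>,
}

impl Journal for LocalJournal {
    fn append(&mut self, entry: Vec<u8>) {
        // Runs in the same process as the database.
        self.entries.push(entry);
    }
}

struct RemoteJournal {
    addr: String,
}

impl Journal for RemoteJournal {
    fn append(&mut self, entry: Vec<u8>) {
        // In the demo this would be a gRPC call to a standalone journal server;
        // we only print here to keep the sketch self-contained.
        println!("send {} bytes to journal server at {}", entry.len(), self.addr);
    }
}

fn main() {
    // The database can be wired up with either implementation.
    let mut journals: Vec<Box<dyn Journal>> = vec![
        Box::new(LocalJournal { entries: Vec::new() }),
        Box::new(RemoteJournal { addr: "127.0.0.1:10001".to_string() }),
    ];
    for journal in journals.iter_mut() {
        journal.append(b"put k v".to_vec());
    }
}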

For more implementation details, please check the code here.

Evaluation

Now let's evaluate the demo we described above. Our goal here is to explore the potential of an elastic storage engine while comparing it to a traditional one. We should note that the demo's absolute performance doesn't matter a lot. The demo's implementation is over-simplified, so its absolute performance provides no reference value outside of it.

In the following sections, we conduct two groups of experiments on the demo. The first group compares the performance between local and remote components with a simple deployment. The second group employs a more sophisticated deployment to demonstrate a cloud-native architecture specific to AWS.

Local versus Remote

Let's start with a simple deployment. We run local and remote components on two different EC2 instances of the same type to make the comparison straightforward. Specifically, we choose two r6gd.2xlarge instances located at us-east-2. Each r6gd.2xlarge instance comes with a local SSD. In addition, we attach a gp3 EBS (5000 IOPS and 500MB/s throughput) to each instance. This extra disk allows us to separate the journal and the storage to reduce IO interference. As for the network performance between these instances, the latency is about 0.1ms, and the bandwidth is up to 10 Gbps.

The deployment is as follows:

Local/Remote Deployment

We also develop a purpose-built benchmark tool that allows us to switch between local and remote components in a configuration file. For all experiments in this section:

  • The benchmark tool always runs on node 1.
  • For the local setup, the local journal, storage, and compaction run in the same process as the benchmark tool on node 1.
  • For the remote setup, the remote journal, storage, and compaction run as standalone processes on node 2, and the benchmark tool communicates with them through gRPC.

Moreover, due to some limitations in the demo, we evaluate read-only and write-only performance separately.

Put

Now let's evaluate the Put performance with the local and remote setups. For more details about the specific setups, please check the local and remote configuration files.

The benchmark results without compactions are as follows:

Put Performance

Table 1 | QPS          | Latency (P95/P99/P999) | CPU Usage (Sum of two nodes)
Local   | 1.01 million | 9.2/12.0/17.5 ms       | 7.4 cores
Remote  | 1.04 million | 9.3/12.2/17.7 ms       | 9.6 cores

Figures 1-3 show the runtime statistics of the local setup (on the left) and the remote setup (on the right).

First of all, we are excited to see that both setups achieve more than 1 million puts per second with sync enabled. As far as we know, 99% of applications never reach 1 million puts per second throughout their lifetime, which means that a well-optimized database can serve all these applications with a single compute node. We also notice that there are some fluctuations in Figures 1 and 3. These fluctuations occur whenever the memtable is flushed, which is expected due to the depleted CPU on node 1 (8 cores).

On the other hand, Table 1 shows that the remote setup achieves similar QPS and latency as the local one. However, the remote setup consumes 30% more CPU due to extra work on RPC and networking, which makes the remote setup unappealing in such a scenario.

Next, we enable compactions and rerun the benchmarks.

Put Performance with Compactions

Table 2 | QPS          | Latency (P95/P99/P999) | CPU Usage (Sum of two nodes)
Local   | 0.80 million | 11.7/15.5/35.2 ms      | 7.6 cores
Remote  | 0.99 million | 8.4/11.2/21.4 ms       | 12.4 cores

This time, the local setup performs worse than before because compactions introduce excessive CPU contentions on node 1. In contrast, the remote setup performs similarly well because node 2 still has sufficient CPU for compactions. But we need to consider what we pay and what we get here. Table 2 shows that the remote setup pays 63% more CPU for 25% higher QPS, which may or may not be a cost-effective deal depending on the application.

Nevertheless, the remote setup does make the storage engine more elastic. Imagine that as the dataset grows, we can partition the remote components and scatter them into multiple nodes. In this way, we can extend the capability of a database with a single compute node as much as possible to avoid distributed transactions.

Get

Now let's evaluate the Get performance with the local and remote setups. For more details about the specific setups, please check the local and remote configuration files.

It is important to note that we keep all storage files in memory during the following experiments, which is a compromise due to Rust's lack of support for asynchronous random access file IO and our limited human resources. We decide to live with that because IO performance is not our primary concern here.

The benchmark results without cache are as follows:

Get Performance

Table 3 | QPS          | Latency (P95/P99/P999) | CPU Usage (Sum of two nodes)
Local   | 0.68 million | 0.03/0.03/0.03 ms      | 8.0 cores
Remote  | 0.10 million | 3.1/3.7/6.6 ms         | 10.5 cores

Figures 7-10 show the runtime statistics of the local setup (on the left) and the remote setup (on the right).

For the local setup, the latency is very low since all data stays in memory. Hence, the benchmark is entirely CPU-bound, and it squeezes all CPUs on node 1.

For the remote setup, the performance is much worse than the local one. The reason for this bad performance is apparent, though. Without cache, every read involves one or more RPCs to retrieve data blocks from the remote storage. Figure 10 shows that the network bandwidth is exhausted, although both nodes still have some spare CPU. This situation is very undesirable because if the network bandwidth of the compute node becomes the bottleneck, all remote components are affected, and adding more nodes is not going to help.

Therefore, we have to admit that using remote storage without cache is not practical.

So let's enable cache and rerun the benchmarks.

Get Performance

Table 4  | QPS          | Latency (P95/P99/P999) | CPU Usage (Sum of two instances)
Local    | 0.70 million | 0.03/0.03/0.05 ms      | 8.0 cores
Remote   | 0.72 million | 2.0/6.2/12.3 ms        | 8.8 cores
Remote 2 | 1.41 million | 0.1/7.1/13.2 ms        | 15.8 cores

According to our statistics, the cache we introduce provides a 0.98 cache hit ratio, which boosts the performance of the remote setup to the same level as the local one. However, the remote setup still suffers from long-tail latency, which is inevitable unless we cache all data on the compute node.

On the other hand, Figure 13 shows that node 1 becomes the bottleneck of both setups now. But node 2 is almost idle here. So can we do something with it? Well, we can run benchmarks on both nodes against the same set of remote components. This setup simulates the use case of running multiple read replicas on a cloud database.

Table 4 (Remote 2) shows the result of running benchmarks on node 1 and node 2 concurrently against the same set of remote components. We successfully utilize all CPUs on both nodes and achieve 1.41 million gets per second. This setup reveals the advantage of remote components, providing flexible ways to leverage available resources across nodes.

Cloud-Native Architecture

While the first group of experiments gives some insights about the demo, they haven't shown the big picture yet. In previous experiments, we run the remote journal, storage, and compaction on the same node, which is not required. We can be more aggressive in separating these components into individual nodes and choose appropriate instance types and disks.

Furthermore, the demo provides other kinds of storage components that make things more interesting:

  • SSTable storage: stores files in SSTable format on file system or S3
  • Parquet storage: stores files in Parquet format on file system or S3
  • Hybrid storage: provides an abstraction to read, write, and SELECT files on different storage components

To showcase these components, we construct a hybrid storage on top of an SSTable storage on a local file system and a Parquet storage on S3. Specifically, whenever the memtable is flushed, the hybrid storage writes one copy to the SSTable storage and another copy to the Parquet storage. The hybrid storage always reads from the SSTable storage to serve row-oriented, latency-sensitive reads; a small sketch of this policy follows the list below. On the other hand, the benefits of adding the Parquet storage here are threefold:

  • Offloads compaction reads from the SSTable storage to make it more stable.
  • Leverages S3 to improve durability at low cost.
  • Leverages S3 SELECT command to serve analytical workloads efficiently. We can even merge the memtable with the results from Parquet files on S3 to provide real-time, strongly consistent analytical services, which gives us a promising architecture for HTAP.
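
To make the write/read policy concrete, here is a hypothetical sketch (the trait and types below are illustrative, not the demo's actual API): flushes fan out to both storages, while foreground reads go only to the SSTable storage.

// Hypothetical sketch of the hybrid storage policy described above.
use std::collections::HashMap;

trait Storage {
    fn write(&mut self, file: &str, data: &[u8]);
    fn read(&self, file: &str) -> Option<Vec<u8>>;
}

// A stand-in for both the SSTable storage and the Parquet storage.
#[derive(Default)]
struct MemStorage(HashMap<String, Vec<u8>>);

impl Storage for MemStorage {
    fn write(&mut self, file: &str, data: &[u8]) {
        self.0.insert(file.to_string(), data.to_vec());
    }
    fn read(&self, file: &str) -> Option<Vec<u8>> {
        self.0.get(file).cloned()
    }
}

struct HybridStorage<A: Storage, B: Storage> {
    sstable: A, // serves row-oriented, latency-sensitive reads
    parquet: B, // S3 copy for compaction reads and analytical queries
}

impl<A: Storage, B: Storage> HybridStorage<A, B> {
    // On flush, write one copy to each underlying storage.
    fn write(&mut self, file: &str, data: &[u8]) {
        self.sstable.write(file, data);
        self.parquet.write(file, data);
    }
    // Foreground reads always go to the SSTable storage.
    fn read(&self, file: &str) -> Option<Vec<u8>> {
        self.sstable.read(file)
    }
}

fn main() {
    let mut hybrid = HybridStorage {
        sstable: MemStorage::default(),
        parquet: MemStorage::default(),
    };
    hybrid.write("000001.sst", b"flushed memtable");
    println!("{:?}", hybrid.read("000001.sst"));
}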

Finally, we get a cloud-native architecture as follows:

Cloud-Native Architecture

Deployment

To make a cost-effective deployment for the architecture above, we need to understand the characteristics of different components first:

  • The journal requires minimal CPU and memory. But it can benefit from a premium disk to write at low latency.
  • The storage requires a medium amount of CPU and memory and a disk with enough capacity and decent performance.
  • The compaction requires a medium amount of CPU and memory but doesn't need any instance storage. Moreover, since compactions are background jobs that can fail without affecting foreground services, they don't require reliable resources. AWS provides EC2 Spot Instances at up to a 90% discount for such fault-tolerant workloads.
  • The compute/benchmark requires a large amount of CPU and memory, and it has to run reliably.

Moreover, let's assume that we are running an OLTP workload that writes at 200MB/s in 1KB value size. Given such a workload and the characteristics above, we decide to give the following setup a try:

Component  | Instance name | Hourly cost             | vCPU | Memory  | Storage                  | Network bandwidth
Compute    | m5.2xlarge    | $0.384                  | 8    | 32 GiB  | -                        | Up to 10 Gbps
Journal    | t3.nano       | $0.0052                 | 2    | 0.5 GiB | io2 (2000 IOPS)          | Up to 5 Gbps
Storage    | r5.xlarge     | $0.252                  | 4    | 32 GiB  | gp3 (5000 IOPS, 500MB/s) | Up to 10 Gbps
Compaction | r5.xlarge     | $0.0418 (Spot Instance) | 4    | 32 GiB  | -                        | Up to 10 Gbps

Benchmark

Now it's time to see if our beautiful architecture and deployment work. For more details about the specific setup, please check the configuration file. To make things simple, we only evaluate Put performance here. The Get performance of this architecture should be similar to the remote setup above, so we will not discuss it further.

The benchmark results for Put are as follows:

Cloud-Native Put Performance 1

Well, it works. But only for a little while, unfortunately. Then the QPS drops, and the latency rises horribly. Don't panic. Let's see what's wrong here and fix it. Figure 17 shows that the CPU usage is very low, far from the limits of our instances. Figure 18 shows that the disk bandwidth is also very low, far from the limits of our provisioned disks. So what's left is the network bandwidth, which also looks very low in Figure 19. Hmm, we must be missing something here.

It turns out that we neglected a critical factor about the network bandwidth of our instances. The "network bandwidth" listed in the table above doesn't tell the whole truth. AWS uses a network I/O credit mechanism. In simple terms, a network bandwidth of "Up to 10 Gbps" gives us no guarantee at all. What we can rely on is the network baseline bandwidth, not the network bandwidth. So we need to find out the baseline bandwidth of our instances:

Component  | Instance name | Hourly cost | Network bandwidth | Network baseline bandwidth
Compute    | m5.2xlarge    | $0.384      | Up to 10 Gbps     | 2.5 Gbps
Journal    | t3.nano       | $0.0052     | Up to 5 Gbps      | 0.032 Gbps
Storage    | r5.xlarge     | $0.252      | Up to 10 Gbps     | 1.25 Gbps
Compaction | r5.xlarge     | $0.0418     | Up to 10 Gbps     | 1.25 Gbps

As we can see, t3.nano for the journal only provides a 0.032 Gbps (4MB/s) baseline bandwidth, which explains the result we get in Figure 19 very well. But what about the initial high performance we observed? That comes from the credits AWS gives the instance when its usage is below the baseline bandwidth. However, the credits are limited, which only allows the instance to burst for a little while. Once the credits are exhausted, the network bandwidth drops to the baseline.

Therefore, we should carefully review the network baseline bandwidth of our previous setup. Specifically, to process 200MB/s of write throughput for the assumed workload, we need at least 1.6 Gbps of network bandwidth. In addition, we have to consider the write amplification of different components (a quick arithmetic check follows the list):

  • The compute writes to the journal and flushes to both the SSTable storage and the Parquet storage, so the minimal bandwidth it requires is 1.6 * 3 = 4.8 Gbps.
  • The journal only needs 1.6 Gbps to receive writes from the compute.
  • The storage receives writes from both the compute and compaction, so it requires 3.2 Gbps.
  • The compaction reads from the Parquet storage and then writes to both the SSTable and Parquet storage, which requires 3.2 Gbps.
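
As a quick sanity check, here is the bandwidth arithmetic above in one small program (it only restates the numbers; 1 MB/s is 0.008 Gbps):

// Back-of-the-envelope check of the bandwidth figures listed above,
// for the assumed workload of 200 MB/s of incoming writes.
fn main() {
    let base_gbps = 200.0 * 8.0 / 1000.0; // 200 MB/s = 1.6 Gbps
    println!("base write traffic:                 {:.1} Gbps", base_gbps);
    println!("compute (journal + 2 flush copies): {:.1} Gbps", base_gbps * 3.0);
    println!("journal (incoming writes):          {:.1} Gbps", base_gbps);
    println!("storage (compute + compaction):     {:.1} Gbps", base_gbps * 2.0);
    println!("compaction (writes 2 copies):       {:.1} Gbps", base_gbps * 2.0);
}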

So, to satisfy the above requirements, we decide to upgrade our setup as follows:

Component  | Instance name | Hourly cost | Network bandwidth | Network baseline bandwidth
Compute    | m5n.2xlarge   | $0.476      | Up to 25 Gbps     | 8.125 Gbps
Journal    | m5n.large     | $0.119      | Up to 25 Gbps     | 2.1 Gbps
Storage    | r5n.xlarge    | $0.298      | Up to 25 Gbps     | 4.1 Gbps
Compaction | r5n.xlarge    | $0.0418     | Up to 25 Gbps     | 4.1 Gbps

This setup costs us 38% more than the previous one.

Now let's rerun the benchmark with this setup to see if it is worth it.

Cloud-Native Put Performance 2

Awesome, we finally get everything working as expected! We accomplish our goal of delivering 200,000 puts per second with a 1KB value size, which is 200MB/s. The latency also looks OK, with P99 at about 10ms. Figure 23 shows that the network throughput of the compute (the purple one) is less than expected (200MB/s * 3). That's because the Parquet storage compresses our files very well.

On the other hand, we have to note that the resource allocation of this setup is still not optimal. For example, we have to choose m5n.large for the journal instead of t3.nano to get enough network bandwidth. But m5n.large costs 20 times as much as t3.nano, and the journal will never use the extra memory on it.

Besides, the network bandwidth calculation of individual components above is very conservative. For applications that require high durability and availability, we may need three copies of the journal and the storage, which brings 6X write amplification to the compute node. Furthermore, as the dataset grows, the compaction has to work more aggressively to maintain a decent read performance. For a typical OLTP application, the estimated write amplification caused by compactions can be 10X. It will be very challenging to choose appropriate instance types to get optimal resource allocation for different components with such conditions. We can think of two possible solutions to reduce network bandwidth for now. The first one is to enable compression to trade CPU for network bandwidth. The second one is to allow the storage to run some compactions locally when necessary.

As for S3 SELECT, we can't always get it to work because something is wrong with the AWS SDK for Rust. What we can say is that the basic idea works in our tests.

Conclusion

Well, that's all we did and learned last month in this demo. The conclusions we get so far are as follows:

  • The unbundled, cloud-native architecture we propose does work. The architecture makes the storage engine more elastic, providing flexible ways to leverage resources across multiple nodes.
  • However, transferring data between different components requires considerably more CPU and network bandwidth. There is no silver bullet. A cloud-native architecture may or may not be cost-effective depending on the workload.
  • For the read performance, using remote storage without cache is not practical due to the excessive network bandwidth it requires. We should try our best to avoid unnecessary cache misses, which is the key to reducing long-tail latency.
  • For the write performance, background jobs like flushes and compactions have significant effects on stability. We should pay attention to the runtime (in particular, the asynchronous runtime in Rust) of the compute node to reduce the interference caused by background jobs.
  • Network bandwidth is a bigger problem than we thought. Being too aggressive in separating every component onto different nodes may not be beneficial. We should be more deliberate before going too far down this path.
  • A well-optimized database on a single node is capable of serving most real-world applications. We should strive to build a storage engine that runs blazing fast on a single node first.
  • Rust's ecosystem is much better than before, thanks to awesome projects like tokio, tonic, tracing, and opentelemetry. However, it still lacks support for asynchronous file IO, and the AWS SDK for Rust is also too young for production. But we believe that fixing these problems is just a matter of time.

In the end, we are very proud of what we achieved in such a short time with such a small team. We also want to thank our early contributors for participating in such a young project.

What's Next

In this demo, we have explored the path toward an elastic storage engine, which is our first design goal. Our second design goal is to make the storage engine adaptive, which means that it should adjust its internal data structure to optimize for dynamic workloads.

So, in the next demo, we will explore the path toward an adaptive storage engine. The next demo will be more challenging than ever because we haven't found any practical references in the industry yet. We still need some time to figure out the roadmap and will share it on GitHub as soon as possible. Meanwhile, if you want to know how it may look in advance, you can check this paper.