When writing code in any programming language, choosing the appropriate constructs is crucial for creating readable and maintainable code, and Rust is no different. In this post, I will discuss a specific dilemma I often find myself thinking about: deciding between combinators and match clauses.

Let’s consider an example where an application needs to retrieve the value of an environment variable, convert it to a map, and then serialize it to JSON. The first approach uses combinators and closures:

std::env::var("SELECTOR")
    .ok()
    .and_then(|env_var| selector_field(env_var.as_str()).ok())
    .map(|sel_map| serde_json::to_value(sel_map).unwrap())

For simplicity, I omit the definition of the fn selector_field(&str) -> Result<BTreeMap<String, String>, Error> function (note that it returns a Result, which is why the code can call .ok() and .map_err on it). This version is concise and elegant, but it doesn’t log a warning when, or explain why, the result turns out to be None. To add some context and make debugging easier, we can log some useful information. We can modify the code like this:

std::env::var("SELECTOR")
    .map_err(|err| {
        log::warn!("Failed to find environment configuration for selector: {}", err);
    })
    .ok()
    .and_then(|env_var| {
        selector_field(env_var.as_str())
            .map_err(|err| {
                log::warn!("Failed to parse selector: {}", err);
            })
            .ok()
    })
    .map(|sel_map| serde_json::to_value(sel_map).unwrap())

While functional, this version feels convoluted and harder to read due to the nested closures. In such cases, I tend to use a match clause, as I think it is a better option:

let env_var = match std::env::var("SELECTOR") {
    Ok(env_var) => env_var,
    Err(err) => {
        log::warn!("Failed to find environment configuration for selector: {}", err);
        return None;
    }
};
let sel_map = match selector_field(env_var.as_str()) {
    Ok(sel_map) => sel_map,
    Err(err) => {
        log::warn!("Failed to parse selector: {}", err);
        return None;
    }
};
Some(serde_json::to_value(sel_map).unwrap())

This version is more readable and easier to understand. Thus, I think it’s useful to consider the following guidelines when choosing between combinators and match clauses:

  1. If you need to add multiple lines of code inside the closures when using combinators, a match clause might be a better choice.
  2. When error handling is minimal or not needed (conversion to Option or use of ? is possible) and the closures stay simple, combinators are a good choice (see the sketch below).
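For instance, applying guideline 2 to the example above: if we drop the logging requirement, the whole chain collapses into a short function using ?. This is a minimal sketch, assuming selector_field returns a Result as discussed and wrapping the logic in a hypothetical function that returns an Option:

fn selector_json() -> Option<serde_json::Value> {
    // Without the logging requirement, ? keeps this concise.
    let env_var = std::env::var("SELECTOR").ok()?;
    let sel_map = selector_field(env_var.as_str()).ok()?;
    Some(serde_json::to_value(sel_map).unwrap())
}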

Combinators in Rust are a powerful tool for writing concise and functional code, but sometimes a match clause is still the better choice for readability. When deciding between the two, consider the complexity of error handling and the need for multiple lines of code inside closures used in combinators. By striking the right balance, we can create more readable and maintainable Rust code.

A few days ago I had to decompress some gzip content using Rust. I quickly found the flate2 crate, which is well maintained and widely used. I decided it would be a great fit for my little project: a CLI tool that downloads some content from AWS S3, decompresses it, and stores it in a local SQLite3 database.

I wrote my small CLI tool, and it seemed to work perfectly on the first run. It was all nice until, some time later, I noticed that some lines of my compressed files were missing. The code was running fine and didn’t produce any warnings. After a few hours of debugging, I was sure there was a bug somewhere in my code. I went on and wrote a bunch more tests to try to isolate the problem in a reproducible way, without any luck. All tests passed, and there was no indication of a problem anywhere.

At some point, I decided to test with a sample from the original compressed files, and that is how I managed to reproduce the bug. My code was based on the sample from the README of the flate2 crate:

use std::io::prelude::*;
use flate2::read::GzDecoder;

fn main() {
    let mut d = GzDecoder::new("...".as_bytes());
    let mut s = String::new();
    d.read_to_string(&mut s).unwrap();
    println!("{}", s);
}

Unfortunately, the GzDecoder could not successfully decompress my files. I then went to the official library repository and found this issue, which describes exactly my problem. I had to use the MultiGzDecoder instead; after this change it all worked as expected and I could successfully decompress my files. The example in the README of flate2 should probably be this:

use std::io::prelude::*;
use flate2::read::MultiGzDecoder;

fn main() {
    let mut d = MultiGzDecoder::new("...".as_bytes());
    let mut s = String::new();
    d.read_to_string(&mut s).unwrap();
    println!("{}", s);
}

Finally, I still don’t know for sure why the MultiGzDecoder works where the GzDecoder doesn’t. The likely explanation is that the gzip format allows multiple compressed streams (members) to be concatenated into a single file: GzDecoder stops after the first member, while MultiGzDecoder keeps reading through all of them. If you know more, I will update this blog with your answer.

Following what has been happening on Twitter, I decided to leave. I had an account on Twitter for the last 14 years. It was fun, until it wasn’t anymore. The toxicity and the fear of “going viral” prevented me from posting a lot of possibly interesting things over the last few years. Yes, even before Elon Musk bought the company and sped up the ruin of the site.

I just wanted to share this fact here so that people who used to follow me on Twitter can find out why my account is not showing up. I used to have more than 1,000 followers there, but I doubt many of them will even notice my absence.

People need very good self-esteem to handle the toxicity that comes with a tweet getting more than 50 retweets. Everything I have ever posted that got even half of that attracted trolls and people questioning my existence. I don’t need that toxicity in my life, thanks.

You can find me here in this blog, on Mastodon in my two accounts, and on LinkedIn. You can also email me.

A while ago, I started working with GStreamer. I wasn’t familiar with the framework before, and it took me a while to grasp how it works. I recently came up with a mental model that has been helping me understand how to use the GStreamer API, and I thought sharing it here could help others too.

GStreamer is a multimedia framework. It is meant to be used to create applications that can manipulate multimedia content. We don’t usually manipulate the multimedia content in GStreamer directly; instead, we orchestrate elements that deal with the content, and GStreamer provides us with an API for orchestrating those elements. There are various types of elements: from audio and video encoding, decoding, demuxing, and muxing to closed captions, image composition, and all kinds of metadata handling. There are over 90 elements available in the official GStreamer distribution, so it is likely that you’ll find elements for most, if not all, of the operations you might want to do.

Element types

When working with GStreamer, you start by building a pipeline out of the elements that process your content. Building a pipeline generally consists of creating the elements you need and connecting them in a meaningful way. For example, to play a file, we need to read its content, decode it to raw audio, and send the raw audio to an audio device.

In GStreamer, there are a few general types of elements, which can be categorized according to the way they connect with each other. Elements connect using one or more pads, which provide input and output streams of content. Input pads are called sink pads, and output pads are called source pads.

Source elements

Source elements have a single source pad always available.

Source elements read from external resources or produce content themselves. They have one output stream, or pad (the source pad), available at all times (known as a static pad). They do not accept any content as input (no sink pads).

Destination elements

Sink elements have a single sink pad always available.

Destination elements, also known as sinks, are elements that receive a single stream of content. They don’t produce any content as output (no source pads). They usually have a single input (sink) pad available at all times (a static pad).

Demuxer elements

Demuxer elements have a single sink pad always available and possibly multiple source pads created later on demand.

Demuxer elements usually receive one stream of content and split it into multiple output streams. They usually have a single input (sink) pad available at all times and will create new output (source) pads as soon as they figure out what types of content are available in the stream they are receiving.

Muxer elements

Muxer elements have a single source pad always available and multiple sink pads created later on demand.

Muxer elements receive multiple streams of content (through sink pads) and mix them together into a single output stream (the source pad). Muxers usually have an output (source) pad available at all times. Whenever we want a muxer to process some content, we need to request new input (sink) pads to be created, as in the sketch below.
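Jumping ahead a little to the Rust API used later in this post, requesting a sink pad from a muxer could look like this minimal sketch (assuming the matroskamux element, whose audio request-pad template is named audio_%u):

use gstreamer as gst;
use gst::prelude::*;

fn main() {
    gst::init().unwrap();
    let muxer = gst::ElementFactory::make("matroskamux").build().unwrap();
    // Muxer sink pads are created on demand from a pad template.
    let audio_pad = muxer.request_pad_simple("audio_%u").unwrap();
    println!("Requested pad: {}", audio_pad.name());
}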

Filter elements

Filter elements have a sink and a source pad always available.

Filter elements generally have a single input (sink) pad and a single output (source) pad. The pads in a filter element are usually static (always available). Typical examples are converter elements such as videoconvert and audioconvert.

Multimedia manipulation using pseudocode

You now know about elements and their most common types. How do we put them together? In GStreamer, elements usually connect inside a container (or bin) called a pipeline. We put elements into it and use API calls to make the elements connect. Let’s say we want to play music from a file. We need to read the file, decode the content to raw audio, then send this content to an audio device.

Inspired by JavaScript and HTML manipulation, one could imagine a pseudo pipeline with made-up elements, like this:

// Create our pipeline container
let pipeline = createElement("pipeline");

// Create our elements
let file_reader = createElement("filesource");
file_reader.setAttribute("location", "/Users/rafaelcaricio/astronaut.wav");

// Generic decoder which is similar
// to a demuxer type of element
let decoder = createElement("decoder");
let audio_device = createElement("audiodevice");

// Add all elements to the Pipeline container
pipeline.appendChild(file_reader);
pipeline.appendChild(decoder);
pipeline.appendChild(audio_device);

// Tell our elements how to connect
file_reader.link(decoder);

// Demuxer-type elements only create source pads
// after they have inspected some content, so we
// add an event listener here to link the pads later,
// whenever a new stream pad becomes available
let audio_device_pad = audio_device.getDestinationPad();
decoder.addEventListener("streamFound", function (stream_source_pad) {
    stream_source_pad.link(audio_device_pad);
})

pipeline.dispatchEvent("playing");

pipeline.waitUntilEnd();

pipeline.dispatchEvent("stop");

In this pseudocode, we create elements and manipulate them so they connect together inside a pipeline container. If you have ever worked with web development, this should look familiar. It is comparable to how you will think when working with GStreamer.

Using the GStreamer API

Let’s translate this example to real GStreamer code that plays a WAV file. We need to select an element for every operation we want to do: read from a file, decode the content, and send it to an audio device. For those, we have filesrc, decodebin, and autoaudiosink, respectively.

  • filesrc: Reads content from a file and has one output pad.
  • decodebin: An abstract element that identifies the content it receives, assembles other elements to decode it, and then creates new pads for the types of content that are available.
  • autoaudiosink: Selects the first available audio output device and sends the received audio content to be played.

Using the GStreamer API in Rust, it looks like this:

use gstreamer as gst;
use gst::prelude::*;
use gst::MessageView;

fn main() {
    gst::init().unwrap();
    
    // Create our pipeline container
    let pipeline = gst::Pipeline::default();

    let file_reader = gst::ElementFactory::make("filesrc").build().unwrap();
    file_reader.set_property("location", "/Users/rafaelcaricio/astronaut.wav");

    let demuxer = gst::ElementFactory::make("decodebin").build().unwrap();
    let audio_device = gst::ElementFactory::make("autoaudiosink").build().unwrap();

    pipeline.add(&file_reader).unwrap();
    pipeline.add(&demuxer).unwrap();
    pipeline.add(&audio_device).unwrap();

    file_reader.link(&demuxer).unwrap();

    // Our event listener to connect pads later on
    let audio_device_pad = audio_device.static_pad("sink").unwrap();
    demuxer.connect_pad_added(move |_, pad| {
        pad.link(&audio_device_pad).unwrap();
    });

    pipeline.set_state(gst::State::Playing).unwrap();
    
    // Wait until end, handle errors, etc
    let bus = pipeline.bus().unwrap();
    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        match msg.view() {
            MessageView::Eos(..) => break,
            MessageView::Error(err) => {
                println!("Oh no.. something wrong happened: {:?}", err);
                break
            }
            _ => continue,
        }
    }

    pipeline.set_state(gst::State::Null).unwrap();
}

As you can see, the fake JavaScript API we created for manipulating elements (as if they were HTML elements) and the real GStreamer API are analogous. The GStreamer API is vast and provides many operations to manipulate elements inside pipelines or inside other elements (also called bins). There are elements for many different operations you might want to perform, not only conversions between content types.

Here I generated a visualization of the GStreamer pipeline we wrote:

I like to think of elements as functionalities that we can piece together like LEGO blocks. If a source pad is of the same type as the sink pad, then they can connect. Pad types are called capabilities (or simply “caps”) in GStreamer, and two pads are compatible when their caps have compatible fields. Caps look very much like the MIME types we see in the HTTP Content-Type header. An example of caps is audio/x-wav, which means audio content in the WAV format, or video/x-raw,format=RGBA,width=480,height=320,framerate=30/1 which – you guessed it – is decoded raw video content in RGBA format with the size and frame rate specified.
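Here is a rough sketch of how caps can be built and checked for compatibility using the Rust bindings (illustrative only, not code from the pipeline above):

use gstreamer as gst;

fn main() {
    gst::init().unwrap();
    // Caps describing decoded raw video, like the example above.
    let raw_video = gst::Caps::builder("video/x-raw")
        .field("format", "RGBA")
        .field("width", 480i32)
        .field("height", 320i32)
        .field("framerate", gst::Fraction::new(30, 1))
        .build();
    // Less specific caps: any raw video.
    let any_raw_video = gst::Caps::new_empty_simple("video/x-raw");
    // Pads can link when their caps intersect.
    assert!(raw_video.can_intersect(&any_raw_video));
}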

Conclusion

I hope this brief introduction has given you a good high-level overview of the basic concepts and helps you jump-start writing your own GStreamer multimedia processing pipelines. I recommend checking out the GStreamer documentation and browsing through some of the elements; something might catch your eye. There is extensive documentation, and there are tutorials available in C and Rust that delve further into the concepts I introduced in this blog post. I’ve also created a basic GStreamer project template that you can use. The GStreamer community is very friendly and welcoming to newcomers. We usually hang around in the official GStreamer IRC chat, which can be accessed using the matrix.org bridge. Feel free to say “hi”. See you around!

Today I was working, on my macOS machine, on a project that uses GStreamer and reads content using the SRT protocol. The SRT elements could not be found on my system after installing GStreamer using Homebrew, even though I had installed all the GStreamer sub-packages from Homebrew.

$ brew install gstreamer gst-plugins-base \
    gst-plugins-good gst-plugins-bad \
    gst-plugins-ugly gst-libav \
    gst-rtsp-server gst-editing-services

Still, I could not find the srtsrc element, for example.

$ gst-launch-1.0 -v srtsrc uri="srt://127.0.0.1:7001" ! fakesink
WARNING: erroneous pipeline: no element "srtsrc"

For some reason, not clear to me at the time, I did not have the plugin installed. This prompted me to look at the source code of GStreamer to find out how the element is enabled. I knew that GStreamer contains many plugins that depend on different third-party libraries, and the SRT set of elements resides in the gst-plugins-bad bundle. It then became clear to me that the SRT elements are only compiled if libsrt is available on the host system at compilation time.

GStreamer meson file showing that it only enables the SRT element if the dependency is found.

Ok, now I knew what might be causing the SRT plugin to not be available in my GStreamer Homebrew installation. To confirm it, I checked the Homebrew formula for its dependencies.

As I guessed, libsrt is not a dependency of that formula. This means that the meson configuration we saw earlier excludes the SRT plugin from the compilation process.

The fix

We need to modify the gst-plugins-bad formula locally and then install from source.

First, we uninstall the gst-plugins-bad formula.

$ brew rm gst-plugins-bad

Then we edit our local formula to add the libsrt dependency. This is not strictly required, as we could just install libsrt manually and then recompile from source. But we will add it here anyway, to make sure it is not overridden the next time the package updates. The following command will open your default editor, as defined in the EDITOR environment variable.

$ brew edit gst-plugins-bad

Add the line below to the dependency list:

depends_on "srt"

We now install the package from source.

$ brew reinstall --build-from-source gst-plugins-bad

That’s it. Now you should have the SRT plugin installed, and all its elements will be available on your system. We can double-check that by inspecting one of the elements, like srtsrc.

$ gst-inspect-1.0 srtsrc

You should see the element’s detailed description, including its pad templates and properties.

Yay! That is it. Now I can continue my work with all the SRT elements available in my macOS GStreamer installation from Homebrew.

I quite frequently stumble upon people in the Python community being misled into thinking that using async Python code will make their APIs “run faster”. Async Python is a great feature, but it should be used with care. One point that I constantly find overlooked is the mixing of sync and async code. The general rule is that we should never mix blocking code with async code. In this post, I would like to present a simplified example where we can observe how using async Python can hurt the performance of an API, and then see how we can fix it.

Our example application is a FastAPI service that needs to call two operations from an external API while handling an HTTP request.

These are all the dependencies we will use for the example:

# file requirements.txt
fastapi[all]==0.65.1
uvicorn[standard]==0.13.4
requests==2.25.1
httpx==0.18.2

Let’s look at the example API code:

# file app/application.py
from fastapi import FastAPI
import requests
import uuid
import logging

logging.basicConfig(format="%(asctime)s %(message)s")
log = logging.getLogger("myapp")
log.setLevel(logging.DEBUG)

app = FastAPI()

EXTERNAL_API_ENDPOINT = "http://localhost:8888"


@app.get("/healthcheck")
async def healthcheck():
    return {"status": "ok"}


#
# Async mixed with blocking
#

def internal_signing_op(op_num: int, request_id: str) -> None:
    session = requests.Session()
    response = session.request("GET", EXTERNAL_API_ENDPOINT, timeout=2000)
    print(f"{request_id} {op_num}: {response}")


def sign_op1(request_id: str) -> None:
    internal_signing_op(1, request_id)


def sign_op2(request_id: str) -> None:
    internal_signing_op(2, request_id)


@app.get("/async-blocking")
async def root():
    request_id = str(uuid.uuid4())

    print(f"{request_id}: started processing")

    sign_op1(request_id)
    sign_op2(request_id)

    print(f"{request_id}: finished!")
    return {"message": "hello world"}

Here we have a simple application that replicates the behavior I’m trying to point out: we have mixed async code with the synchronous requests library. The code works fine, but there is one problem. To understand it, we need to recap how Uvicorn works. Uvicorn executes our application server by spawning workers (OS sub-processes) that handle the requests coming into our server. Every worker (sub-process) is a fully-featured CPython instance and has its own I/O loop that runs our FastAPI application.

The main process holds a socket that is shared with the workers; it accepts HTTP requests and hands them over to the workers, which actually process them. We can have as many workers as we want, usually as many as the number of CPU cores. In our case, to make the behavior easier to analyze, we are going to run only a single worker. We execute our server with the following command:

uvicorn app.application:app --workers 1

I’ve set up a fake external API that we will use for this example. It is just a simple server that takes a long time to execute some obscure operation (sleep(20) 😄):

# file external_api.py
import asyncio
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    await asyncio.sleep(20)
    return {"message": "Hello World"}

We spin up the external API server using this command:

uvicorn external_api:app --port 8888 --workers 1

We set 1 worker here for no particular reason; the important part is to make the external API run on port 8888, which is the one we’ve hardcoded in our example application.

Full working tree of the example for reference:

.
├── app
│   ├── __init__.py
│   └── application.py
├── external_api.py
└── requirements.txt

1 directory, 4 files

Now we can call our application with the mixed async and sync code and observe what is printed out. I used httpie to make the requests: I opened two consoles and made two distinct HTTP requests to our application within the 20-second timeframe. This is the output:

❯ uvicorn app.application:app --workers 1 --log-level error
2021-07-07 20:08:57,962 9631c187-8f46-402a-b8ea-a15496643b81: started processing
2021-07-07 20:09:17,978 9631c187-8f46-402a-b8ea-a15496643b81 1: <Response [200]>
2021-07-07 20:09:37,987 9631c187-8f46-402a-b8ea-a15496643b81 2: <Response [200]>
2021-07-07 20:09:37,987 9631c187-8f46-402a-b8ea-a15496643b81: finished!
2021-07-07 20:09:37,988 694ee4be-a15a-49f6-ad60-7c140135a1f6: started processing
2021-07-07 20:09:57,997 694ee4be-a15a-49f6-ad60-7c140135a1f6 1: <Response [200]>
2021-07-07 20:10:18,004 694ee4be-a15a-49f6-ad60-7c140135a1f6 2: <Response [200]>
2021-07-07 20:10:18,004 694ee4be-a15a-49f6-ad60-7c140135a1f6: finished!

As we can observe in the output, even though I made both requests “in parallel” (within the same second), the server only accepted and started processing the second request (694ee4be-a15a-49f6-ad60-7c140135a1f6) after the full execution of the first request (9631c187-8f46-402a-b8ea-a15496643b81), which took a full 40 seconds. During those 40 seconds, there was no task switching, and the worker event loop was completely blocked. All requests to the API stall for the full 40 seconds, including requests to any other endpoints that might exist in other parts of the application. Even if the other requests don’t call the external API, they cannot execute because the worker event loop is blocked. If we call the GET /healthcheck endpoint, it will not execute either.

One way to hide this problem and keep our server accepting requests while workers are blocked is to increase the number of workers. But those new workers can also be blocked by sync calls, and our API becomes susceptible to a DDoS attack. The real solution is to not let our workers get blocked at all: our API should be fully async. For that, we need to replace the requests library with a library that supports async.

Let’s now implement a “v2” version of our example API, still calling the same fake external API that takes 20 seconds to respond, and again running Uvicorn with a single worker.

Here is the code with the updated implementation (note that it also needs import httpx added to the imports at the top of app/application.py):

#
# Async end-to-end
#


async def v2_internal_signing_op(op_num: int, request_id: str) -> None:
    """Calls external API endpoint and returns the response as a dict."""
    async with httpx.AsyncClient() as session:
        response = await session.request("GET", EXTERNAL_API_ENDPOINT, timeout=2000)
    log.debug(f"{request_id} {op_num}: {response}")


async def v2_sign_op1(request_id: str) -> None:
    await v2_internal_signing_op(1, request_id)


async def v2_sign_op2(request_id: str) -> None:
    await v2_internal_signing_op(2, request_id)


@app.get("/all-async")
async def v2_root():
    request_id = str(uuid.uuid4())

    log.debug(f"{request_id}: started processing")

    await v2_sign_op1(request_id)
    await v2_sign_op2(request_id)

    log.debug(f"{request_id}: finished!")
    return {"message": "hello world"}

Notice that I’ve replaced the requests library with the httpx library, which supports async HTTP calls and has an API very similar to the one requests provides. The code is functionally equivalent to our previous mixed implementation, but now it is async end to end. Let’s execute our API using the same command as before.

uvicorn app.application:app --workers 1

Then we call the API using httpie, this time against the fully async endpoint:

http localhost:8000/all-async

The console output is:

2021-07-07 23:30:21,673 da97310b-1d20-4082-8f90-b2e163523b83: started processing
2021-07-07 23:30:23,768 291f556e-038d-4230-8b3b-8e8270383e62: started processing
2021-07-07 23:30:41,718 da97310b-1d20-4082-8f90-b2e163523b83 1: <Response [200 OK]>
2021-07-07 23:30:43,781 291f556e-038d-4230-8b3b-8e8270383e62 1: <Response [200 OK]>
2021-07-07 23:31:01,740 da97310b-1d20-4082-8f90-b2e163523b83 2: <Response [200 OK]>
2021-07-07 23:31:01,740 da97310b-1d20-4082-8f90-b2e163523b83: finished!
2021-07-07 23:31:03,801 291f556e-038d-4230-8b3b-8e8270383e62 2: <Response [200 OK]>
2021-07-07 23:31:03,801 291f556e-038d-4230-8b3b-8e8270383e62: finished!

We can observe in the output that both requests started processing immediately, while each one is still sequential within its own request lifecycle. The event loop of the Uvicorn worker is not blocked; that is why the second request could continue processing even though the external API had not finished its operation. Other requests, like GET /healthcheck, are not impacted by the slow execution of the external API. Overall, our application continues to serve other requests independently of the external API.
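As a side note, if the two signing operations are independent of each other, we could go one step further and run them concurrently within a single request, for example with asyncio.gather. This is a hypothetical variant, not part of the original example:

import asyncio

@app.get("/all-async-concurrent")
async def v3_root():
    request_id = str(uuid.uuid4())
    log.debug(f"{request_id}: started processing")
    # Both operations run concurrently: the total wait is
    # roughly 20 seconds instead of roughly 40.
    await asyncio.gather(v2_sign_op1(request_id), v2_sign_op2(request_id))
    log.debug(f"{request_id}: finished!")
    return {"message": "hello world"}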

When using async Python, one must be careful about which libraries to use. Even though a library might be very popular in the Python community, that doesn’t mean it will play well in an async application. Choosing the right libraries makes the execution of the application more concurrent by not blocking the I/O loop, and the overall throughput of the application will be better, as more unrelated requests can be processed by the same Uvicorn worker.
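When a fully async replacement for a blocking library isn’t available, one escape hatch is to offload the blocking call to a thread pool, so the event loop keeps serving other requests in the meantime. Here is a minimal sketch using run_in_executor; again, this is hypothetical and not part of the example above:

import asyncio
import requests

async def call_blocking_api(url: str) -> None:
    loop = asyncio.get_running_loop()
    # requests blocks, so run it on the default thread pool executor
    # instead of on the event loop itself.
    response = await loop.run_in_executor(None, lambda: requests.get(url, timeout=2000))
    print(response)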

I’ve used async Python in some applications I maintain, and it was challenging to choose the right libraries. The team has to be on the watch for places in the code where the event loop may block. Even the built-in Python logging library or a “print” statement is going to block the I/O loop. Usually, those blocking calls are negligible, but it is important to understand that they are there. I highly recommend reading the official documentation for other tips on developing async code in Python. Have you developed an async Python API? What was your experience?