Pipeline
A Tenzir pipeline is a chain of operators that represents a dataflow. Operators are the atomic building blocks that produce, transform, or consume data. Think of them as Unix or PowerShell commands where the result of one command feeds into the next:
Our pipelines have 3 types of operators: inputs that produce data, outputs that consume data, and transformations that do both:
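As a minimal sketch (the file paths are hypothetical), the following pipeline chains one operator of each kind:

```tql
// Input: produce events from a local file (hypothetical path).
from "/tmp/events.json"
// Transformation: keep only the fields of interest.
select src_ip, dest_ip
// Output: consume events by writing them to another file (hypothetical path).
to "/tmp/trimmed.json"
```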
Structured and Unstructured Dataflow
Tenzir pipelines make one more distinction: the elements that the operators push through the pipeline are typed. An operator has an upstream and downstream type:
When composing pipelines out of operators, upstream/downstream types of adjacent operators have to match. Otherwise the pipeline is malformed. We call any void-to-void operator sequence a closed pipeline. Only closed pipelines can execute.
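For example, the following sketch (the path is a placeholder) is a closed pipeline: it starts with a void-to-events input and ends with an events-to-void output, with matching event types in between:

```tql
// void → events: input
from "/tmp/input.json"
// events → events: transformation
where src_ip in 10.0.0.0/8
// events → void: output
import
```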
If a pipeline is not closed, Tenzir attempts to auto-complete the missing input and output operators. When you run a pipeline on the command line, we implicitly read JSON from stdin and write JSON to stdout. When you run a pipeline in the app and do not provide a sink, we only append serve to make the pipeline accessible as a REST API and extract data piecemeal through your browser.
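To make this concrete, a bare transformation such as where src_ip in 10.0.0.0/8 run on the command line behaves roughly as if it were completed like the sketch below; the stdin/stdout and JSON operators shown here are an assumption for illustration, not necessarily the exact operators Tenzir inserts.

```tql
// Implicitly prepended on the command line: read JSON from stdin (assumed completion).
load_stdin
read_json
// The pipeline you actually wrote.
where src_ip in 10.0.0.0/8
// Implicitly appended: write JSON to stdout (assumed completion).
write_json
save_stdout
```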
Operators can be polymorphic in that they can have more than a single upstream and downstream type. For example, buffer accepts both bytes and events.
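For illustration, the sketch below (topic and field names are placeholders) uses buffer once on bytes and once on events within the same pipeline:

```tql
load_kafka "topic"
// Here buffer operates on bytes, upstream of the parser.
buffer
read_ndjson
// Here buffer operates on events, downstream of the parser.
buffer
select host, message
import
```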
Many Tenzir pipelines use the from and to operators to get data in and out, respectively. For example, to load data from a local JSON file, filter events where a certain field matches a predicate, and store the result in an S3 bucket in Parquet format, you can write the following pipeline:
from "/path/to/file.json"where src_ip in 10.0.0.0/8to "s3://bucket/dir/file.parquet"
This pipeline consists of three operators:
The from operator is a void-to-events input operator, where is an events-to-events transformation operator, and to is an events-to-void output operator.
Other inputs provide bytes first, and you need to parse them into events before you can transform them.
load_kafka "topic"read_ndjsonselect host, messagewrite_yamlsave_zmq "tcp://1.2.3.4"
With these building blocks in place, you can create all kinds of pipelines, as long as they follow the two principal rules of (1) sequencing inputs, transformations, and outputs, and (2) ensuring that the upstream and downstream types of adjacent operators match. Here is another example of a valid pipeline:
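The sketch below (the file path and ZeroMQ endpoint are placeholders) pairs an event-producing file input with a byte-oriented network output:

```tql
// Produce events from a local file (hypothetical path).
from "/var/log/events.json"
// Keep only the fields of interest.
select host, message
// Turn events into bytes and ship them over ZeroMQ.
write_yaml
save_zmq "tcp://1.2.3.4"
```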
Multi-Schema Dataflows
Every event that flows through a pipeline is part of a data frame with a schema. Internally, these data frames are represented as Apache Arrow record batches, each potentially encoding tens of thousands of events at once. This innate batching is the reason why pipelines can achieve high throughput.
What is unique about Tenzir is that a single pipeline can run with multiple schemas, even though the events are data frames internally. Tenzir parsers (bytes-to-events operators) are capable of emitting events with changing schemas. This behavior differs from other engines that work with data frames, where operators can only execute on a single schema. In this light, Tenzir combines the performance of structured query engines with the flexibility of document-oriented engines.
If an operator detects a schema change, it creates a new batch of events. In terms of performance, the worst case for Tenzir is an ordered stream of schema-switching events, with every event having a different schema than the previous one. But even in those scenarios, operators can efficiently build homogeneous batches when the inter-event order does not matter. Similar to predicate pushdown, Tenzir operators support “ordering pushdown” to signal to upstream operators that the event order only matters intra-schema but not inter-schema. In this case the operator transparently “demultiplexes” a heterogeneous event stream into N homogeneous streams. The sort operator is an example of such an operator; it pushes its ordering requirements upstream, allowing parsers to efficiently create multiple streams of events in parallel.
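As a sketch (the timestamp field name is hypothetical), a pipeline like the following lets sort push its ordering requirement upstream, so the parser can build homogeneous per-schema batches in parallel instead of preserving a global inter-schema order:

```tql
// Multi-schema input: the parser may emit events with changing schemas.
from "/path/to/eve.json"
// sort pushes its ordering requirement upstream: the incoming inter-schema
// order does not matter, so the parser can batch per schema in parallel.
sort timestamp
import
```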
Some operators only work with a single schema internally, such as write_csv, which first writes a header line; all subsequent rows then have to adhere to the emitted schema. Such operators cannot handle events with changing schemas.
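A common pattern is therefore to narrow the stream to a single schema before such an operator. The sketch below assumes the @name metadata field holds the schema name and uses a hypothetical schema and output path:

```tql
export
// Keep only a single schema so that the CSV header matches every row.
where @name == "suricata.alert"
write_csv
save_file "/tmp/alerts.csv"
```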
It’s important to mention that most of the time you don’t have to worry about schemas. They are there for you when you want to work with them, but it’s often enough to just specify the fields that you want to work with, e.g., where id.orig_h in 10.0.0.0/8 or select src_ip, dest_ip, proto. Schemas are inferred automatically in parsers, but you can also seed a parser with a schema that you define explicitly.
Unified Live Stream Processing and Historical Queries
Engines for event stream processing and batch processing of historical data have vastly different requirements. We believe that we found a sweet spot with our language and accompanying execution engine that makes working with both types of workloads incredibly easy: just pick an input operator at the beginning of the pipeline that points to your data source, be it an infinite stream or a stored dataset. Tenzir will figure out the rest.
Our desired user experience for interacting with historical data looks like this:
- Ingest: to store data at a node, create a pipeline that ends with import.
- Query: to run a historical query over data at the node, create a pipeline that begins with export.
For example, to ingest JSON from Kafka, you write from "kafka://topic" | import. To query the stored data, you write export | where file == 42.
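Written out over multiple lines, these two pipelines look as follows:

```tql
// Ingest: read JSON from Kafka and store it at the node.
from "kafka://topic"
import
```

```tql
// Query: filter events stored at the node.
export
where file == 42
```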
The example with export
suggests that the pipeline first exports everything,
and only then starts filtering with where
, performing a full scan over the
stored data. But this is not what’s happening. Pipelines support predicate
pushdown for every operator. This means that export
receives the filter
expression before it starts executing, enabling index lookups or other
optimizations to efficiently execute queries with high selectivity where scans
would be sub-optimal.
The key insight here is to realize that optimizations like predicate pushdown extend to the storage engine and do not only apply to the streaming executor.
The Tenzir native storage engine is not a full-fledged database, but rather a catalog with a thin indexing layer over a set of Parquet/Feather files. These sparse indexes (sketch data structures, such as min-max synopses, Bloom filters, etc.) avoid full scans for every query. The catalog tracks evolving schemas, performs expression binding, and provides a transactional interface to add and replace partitions during compaction.
The diagram below shows the main components of the storage engine:
Because of this transparent optimization, you can just exchange the input operator of a pipeline to switch between historical and streaming execution, and everything works as expected. A typical use case begins with some exploratory data analysis involving a few export pipelines and then deploys the pipeline on streaming data by exchanging the input with a Kafka stream.
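For example, the following historical pipeline (the output path and topic are placeholders) becomes a streaming pipeline by replacing only its input operators:

```tql
// Historical: analyze data stored at the node.
export
where src_ip in 10.0.0.0/8
to "/tmp/result.json"
```

```tql
// Streaming: the same analysis over a live Kafka feed.
load_kafka "topic"
read_ndjson
where src_ip in 10.0.0.0/8
to "/tmp/result.json"
```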
Built-in Networking to Create Data Fabrics
Tenzir pipelines have built-in network communication, allowing you to create a distributed fabric of dataflows to express intricate use cases that go beyond single-machine processing. There are two types of network connections: implicit and explicit ones:
An implicit network connection exists, for example, when you use the tenzir binary on the command line to run a pipeline that ends in import:
from "/file/eve.json"where tag != "foo"import
Or one that begins with export:
```tql
export
where src_ip in 10.0.0.0/8
to "/tmp/result.json"
```
Tenzir pipelines eschew networking wherever possible to minimize latency and maximize throughput, which results in the following operator placement for the above examples:
The executor generally transfers ownership of operators between processes as late as possible to prefer local, high-bandwidth communication. For maximum control over the placement of computation, you can override the automatic operator location with the local and remote operators.
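As a rough sketch, and assuming local and remote accept a nested pipeline to scope placement (the exact syntax may differ), overriding placement could look like this:

```tql
export
// Run the filtering at the remote node so fewer events cross the network
// (assumed block syntax for scoping operators to a location).
remote {
  where src_ip in 10.0.0.0/8
}
to "/tmp/result.json"
```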
The above examples are implicit network connections because they’re not visible in the pipeline definition. An explicit network connection terminates a pipeline with an input or output operator:
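For example, the following sketch (the endpoint is a placeholder) ends in an explicit network connection by handing its results to a ZeroMQ socket:

```tql
export
where src_ip in 10.0.0.0/8
// Explicit network connection: the output operator speaks ZeroMQ.
write_yaml
save_zmq "tcp://1.2.3.4"
```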
The fictive data fabric above consists of a heterogeneous set of technologies interconnected by pipelines. Because you have full control over the location where each pipeline runs, you can push it all the way to the “last mile.” This helps especially when compliance and data residency concerns must be properly addressed.