CUE language:
```text
message: #Identity & {
    first: "John"
    Last:  "Doe"
    Age:   40
}

#Identity: {
    // first name of the person
    first: =~ "[A-Z].*"
    // Last name of the person
    Last: =~ "[A-Z].*"
    // Age of the person
    Age?: number & < 130
}
```
Note: More on the choice of the CUE language will come later in this article.
CUE’s tooling makes it easy for a computer to understand the syntax and facilitates data validation (e.g., calling `cue vet identity` validates the data against its definition; if you don’t have the tooling locally, you can play with the example directly in the CUE playground). Changing the last name from `Doe` to `doe` or setting an age above 130 would result in an error. The validation prevents sending noise or garbage over the channel.
In summary, the role of the message emitter is to expose its semantics in a language defined by the “federated governance” and to emit a message that is syntactically and functionally coherent with the definition (as validated by the “federated computational governance”).
Once we have transformed the information into an understandable message (this operation is usually called a marshaling process), we pass it to the transmitter to encode it into a signal and emit the data.
The role of the signal is to ensure that the information propagates safely over the communication channel. On top of that, the signal format should allow multiplexing to avoid cooking a spaghetti of channels in our mesh. Encapsulating the message into an envelope is a way to address the problem.
The envelope allows creating a shared structure. This structure handles metadata such as the emitter of the message, its type, its source, and so on.
Once again, it is the role of the “federated computational governance” to define the envelope format and standards. CloudEvents is one of those standards. It standardizes the exchange of messages between cloud services.
In summary, the role of the transmitter is to transmit the message over the channel by encapsulating it into an event (aka marshaling the event). The event envelope is standardized by the governance. The transmitter is a capability offered by a “self-serve infrastructure” (the data products should be able to transmit events autonomously).
The role of the channel is to store and expose the events to the receivers. Furthermore, the channel’s role is to validate that the message, once accepted, is delivered to the authorized and intended receivers. This will guarantee the security and trust of the whole infrastructure. It is not the role of the channel to analyze the message in any way. It is, therefore, independent of the type of messages (think about the telephone, you can speak English or French on the phone).
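To make these responsibilities concrete, here is a minimal in-memory sketch in Go. It is only an illustration under stated assumptions: the `Channel` type and its `Authorize`, `Publish`, and `Read` methods are hypothetical names invented for this example, and the sketch is neither concurrency-safe nor persistent. What it shows is that the channel stores opaque bytes and checks who may read them, without ever looking inside the message.

```go
package main

import (
	"errors"
	"fmt"
)

// Channel is a hypothetical in-memory stand-in for the real medium.
// It stores opaque events per topic and never inspects their content.
type Channel struct {
	events  map[string][][]byte        // topic -> stored events
	allowed map[string]map[string]bool // topic -> authorized receivers
}

// NewChannel returns an empty channel.
func NewChannel() *Channel {
	return &Channel{
		events:  map[string][][]byte{},
		allowed: map[string]map[string]bool{},
	}
}

// Authorize grants a receiver the right to read a topic.
func (c *Channel) Authorize(topic, receiver string) {
	if c.allowed[topic] == nil {
		c.allowed[topic] = map[string]bool{}
	}
	c.allowed[topic][receiver] = true
}

// Publish stores the event as-is, whatever its format.
func (c *Channel) Publish(topic string, event []byte) {
	c.events[topic] = append(c.events[topic], event)
}

// Read exposes the stored events to authorized receivers only.
func (c *Channel) Read(topic, receiver string) ([][]byte, error) {
	if !c.allowed[topic][receiver] {
		return nil, errors.New("receiver not authorized")
	}
	return c.events[topic], nil
}

func main() {
	c := NewChannel()
	c.Authorize("identity", "billing")
	c.Publish("identity", []byte("opaque event"))

	events, err := c.Read("identity", "billing")
	fmt.Println(len(events), err) // 1 <nil>

	_, err = c.Read("identity", "stranger")
	fmt.Println(err) // receiver not authorized
}
```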
Now that we have all the concepts, let’s implement the self-serve communication infrastructure that will facilitate the development of the product while enforcing the rules of the federated computational governance.
First, let’s summarize the pipeline using the Unix pipe (`|`) symbol:

```shell
# Send:
collect_data | marshal_message | emit_message | validate_message | marshal_event | send_to_channel

# Receive:
filter_events_from_channel | read_from_channel | unmarshal_event | unmarshal_data | profit
```
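The send side of this pipeline can be sketched in Go as a chain of stages. This is only an illustration: the `Stage` type, the `Pipeline` helper, and the two placeholder stages are hypothetical names invented here, not part of the platform's code. The point is that each stage consumes the output of the previous one, and the first error stops the flow.

```go
package main

import (
	"errors"
	"fmt"
)

// Stage is a hypothetical unit of the pipeline: bytes in, bytes out,
// or an error that stops the flow.
type Stage func([]byte) ([]byte, error)

// Pipeline chains stages the way the Unix pipe chains commands:
// the output of one stage becomes the input of the next.
func Pipeline(stages ...Stage) Stage {
	return func(in []byte) ([]byte, error) {
		var err error
		for _, s := range stages {
			if in, err = s(in); err != nil {
				return nil, err
			}
		}
		return in, nil
	}
}

// validateMessage is a placeholder for the validation step;
// in this sketch it only rejects empty messages.
func validateMessage(b []byte) ([]byte, error) {
	if len(b) == 0 {
		return nil, errors.New("empty message")
	}
	return b, nil
}

// marshalEvent is a placeholder that wraps the message in a toy envelope.
func marshalEvent(b []byte) ([]byte, error) {
	return []byte("event(" + string(b) + ")"), nil
}

func main() {
	send := Pipeline(validateMessage, marshalEvent)

	out, err := send([]byte(`{"first":"John"}`))
	fmt.Println(string(out), err) // event({"first":"John"}) <nil>

	_, err = send(nil)
	fmt.Println(err) // empty message
}
```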
To facilitate the development and maintenance of the mesh, the self-serve communication infrastructure (let’s call it a streaming platform) will provide those capabilities:
On top of that, it must provide a repository of data schemas (data catalog) to make the information addressable.
We will build a product, part of a global platform; its purpose is to interconnect the mesh nodes and provide a standard way to expose the information. We will refer to what we are building as the streaming platform for the rest of the document.
As explained before, the system should be generic enough to be loosely coupled with the semantics of the data.
More precisely, it should be the role of the data-product owner to express the schema and the business validation rules. We can therefore consider the validation and data cataloging capabilities as the configuration of a generic instance of the streaming platform.
Note: The Site Reliability Engineering book defines a configuration as a human-computer interface for modifying system behavior.
In our trivial implementation, we use the CUE language because it is accessible and concise in the definition.
Amongst its strengths, CUE:
Our streaming platform is therefore a generic validation and message publishing interface configured with CUE.
Once we have configured our streaming platform to handle and understand any kind of messages described in CUE, we need to provide an end-user interface that facilitates the ingestion, validation, and transmission of data.
CUE stands for Configure/Unify/Execute. This is a perfect summary of what we are trying to achieve: we configure the platform to understand a definition of information; internally, the platform unifies the definition and the data and executes the validation.
This is what the `cue vet` command we issued before does under the hood. But we may want to turn it into a service for ease of testing and robustness. This simple code snippet shows the power of the SDK: fewer than ten lines are required to validate data against a schema (including the functional constraints).
```go
import "cuelang.org/go/cue"

// DataProduct holds the compiled CUE definition used to validate incoming data.
type DataProduct struct {
	definition cue.Value
	// ...
}

// ExtractData compiles the raw bytes, unifies them with the definition,
// and returns the compiled data along with any validation error.
func (d *DataProduct) ExtractData(b []byte) (cue.Value, error) {
	data := d.definition.Context().CompileBytes(b)
	unified := d.definition.Unify(data)
	opts := []cue.Option{
		cue.Attributes(true),
		cue.Definitions(true),
		cue.Hidden(true),
	}
	return data, unified.Validate(opts...)
}
```
Note: It is beyond the scope of this article to detail the implementation of the services, but as a proof of concept, you can refer to this gist for a complete example with an HTTP handler. The gist also holds a functional test that shows different validation scenarios.
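To give an idea of what exposing the validation as a service could look like, here is a minimal sketch using only the Go standard library. It is not the code from the gist: the `Validator` type, `NewHandler`, `rejectEmpty`, and `post` are hypothetical names, and the stand-in validator merely rejects empty payloads where a real one would call the CUE-based validation.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Validator is a hypothetical abstraction over the validation step.
type Validator func(payload []byte) error

// NewHandler returns a handler that validates the POSTed body and
// answers "ok" or a 400 carrying the validation error.
func NewHandler(validate Validator) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := validate(body); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		fmt.Fprintln(w, "ok")
	})
}

// rejectEmpty is a stand-in validator used only for this sketch.
func rejectEmpty(b []byte) error {
	if len(b) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// post exercises the handler in memory, without opening a socket.
func post(h http.Handler, body string) (int, string) {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	h := NewHandler(rejectEmpty)

	code, msg := post(h, `{"first": "John"}`)
	fmt.Println(code, msg) // 200 ok

	code, msg = post(h, "")
	fmt.Println(code, msg) // 400 empty payload
}
```

Keeping the validator behind a function type means the handler can be tested without any schema, which is the "ease of testing" mentioned above.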
So far, we’ve seen that it requires minimal effort to express a schema and validate the data at the entrance of the streaming platform.
Before submitting it to a communication channel (to be defined later), let’s ensure that we write a comprehensible envelope. We’ve expressed it already: interoperability is key to the success of the mesh. Using a standard envelope will guarantee that the message can move out of the platform’s ecosystem.
CloudEvents is a standard format of the Cloud Native Computing Foundation (CNCF) that addresses this need. The CloudEvents specification standardizes the structure of the envelope by introducing concepts such as the source of the event, the type of the event, or its unique identifier (helpful in tracing and telemetry).
The Federated Governance role ensures that the declarations of the sources and the event types are registered correctly in a catalog and widely accessible to any data consumer. Our streaming data platform will encapsulate the data into a CloudEvent.
Example of event serialized in JSON:
```json
{
  "specversion": "1.0",
  "id": "1234-4567-8910-1234-5678",
  "source": "MySource",
  "type": "MySource:newPerson",
  "datacontenttype": "application/json",
  "data_base64": "MyMessageInJSONEncodedInBase64=="
}
```
Of course, the platform can handle the encoding of the event easily.
Once again, it is beyond the scope of this article to show how to do it, but this gist contains all the required information a reader may need to go deeper into the implementation.
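As a rough idea of what this encoding involves, the sketch below builds an envelope with the same attributes as the JSON example above, using only the Go standard library. It is an assumption-laden illustration rather than the gist's code: the `Event` struct and the `Wrap` function are hypothetical, and a real implementation would more likely rely on a CloudEvents SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the attributes of the JSON example above.
type Event struct {
	SpecVersion     string `json:"specversion"`
	ID              string `json:"id"`
	Source          string `json:"source"`
	Type            string `json:"type"`
	DataContentType string `json:"datacontenttype"`
	// encoding/json serializes a []byte as a base64 string,
	// which matches the data_base64 attribute.
	DataBase64 []byte `json:"data_base64"`
}

// Wrap encapsulates an already validated message into an envelope.
func Wrap(id, source, eventType string, message []byte) ([]byte, error) {
	return json.Marshal(Event{
		SpecVersion:     "1.0",
		ID:              id,
		Source:          source,
		Type:            eventType,
		DataContentType: "application/json",
		DataBase64:      message,
	})
}

func main() {
	out, err := Wrap("1234", "MySource", "MySource:newPerson",
		[]byte(`{"first":"John","Last":"Doe","Age":40}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```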
Now that we have a signal, it is time to propagate it over a communication channel.
The channel is a medium of communication. Therefore, in Shannon’s model, it can be anything that can act as a buffer between the emitter and the receiver. But in our context, we may add some required features:
For robustness and efficiency, a solution like Kafka is probably a safe choice, but to move fast, managed solutions such as Google PubSub could do the trick. As we validate the data at the channel’s entry to avoid garbage in/garbage out, there is no need for an intrinsic validation mechanism. Kafka is part of the infrastructure (in the sense of a hexagonal architecture); keeping the validation outside of Kafka ensures strong infrastructure segregation and its independence from the streaming platform product.
Note: We won’t dig into partitioning problems in this article, nor will we use the partitioning extension of the CloudEvents specification. To continue the coding journey, you can refer to this implementation of a Kafka connect that publishes a “CloudEvent” into a topic.
An essential part of our journey is the ability of the consumer to extract and understand the data from the signal.
One solution is to expose the schema definition in CUE; an alternative is to provide a standard OpenAPI definition of the schema. This last option has the significant advantage of being compatible with most development languages and frameworks. Therefore, coding a data consumer will become straightforward, and the time-to-market will decrease.
CUE’s toolkit and SDK make it easy to convert a set of definitions into an OpenAPI v3 specification. The command-line utility from the standard CUE toolkit can perform such a job:
```shell
❯ ( cat << EOF
#Identity: {
  // first name of the person
  first: =~ "[A-Z].*"
  // Last name of the person
  Last: =~ "[A-Z].*"
  // Age of the person
  Age?: number & < 130
 }
EOF
) | cue export --out=openapi -
{
  "openapi": "3.0.0",
  "info": {
    "title": "Generated by cue.",
    "version": "no version"
  },
  "paths": {},
  "components": {
    "schemas": {
      "Identity": {
        "type": "object",
        "required": [
          "first",
          "Last"
        ],
        "properties": {
          "first": {
            "description": "first name of the person",
            "type": "string",
            "pattern": "[A-Z].*"
          },
          "Last": {
            "description": "Last name of the person",
            "type": "string",
            "pattern": "[A-Z].*"
          },
          "Age": {
            "description": "Age of the person",
            "type": "number",
            "maximum": 130,
            "exclusiveMaximum": true
          }
        }
      }
    }
  }
}
```
And for our POC, we will once again use the SDK as exposed in this gist.
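On the receiving side, a consumer written in Go could decode an event along these lines. The sketch is only an assumption of what such a consumer might look like: the `Identity` struct is written by hand here to mirror the `#Identity` definition (in practice it could be generated from the OpenAPI specification), and `Decode` and `sampleEvent` are hypothetical helpers. It covers the unmarshal steps of the receive pipeline: filter on the event type, open the envelope, then decode the payload.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Identity mirrors the #Identity definition; Age is optional.
type Identity struct {
	First string  `json:"first"`
	Last  string  `json:"Last"`
	Age   float64 `json:"Age,omitempty"`
}

// Decode opens the envelope, checks the event type, and unmarshals
// the base64-encoded payload into an Identity.
func Decode(raw []byte, wantType string) (Identity, error) {
	var env struct {
		Type string `json:"type"`
		Data []byte `json:"data_base64"` // decoded from base64 by encoding/json
	}
	var id Identity
	if err := json.Unmarshal(raw, &env); err != nil {
		return id, err
	}
	if env.Type != wantType {
		return id, fmt.Errorf("unexpected event type %q", env.Type)
	}
	err := json.Unmarshal(env.Data, &id)
	return id, err
}

// sampleEvent builds an event similar to the JSON example shown earlier.
func sampleEvent() []byte {
	payload := base64.StdEncoding.EncodeToString(
		[]byte(`{"first":"John","Last":"Doe","Age":40}`))
	return []byte(`{"specversion":"1.0","id":"1234","source":"MySource",` +
		`"type":"MySource:newPerson","datacontenttype":"application/json",` +
		`"data_base64":"` + payload + `"}`)
}

func main() {
	id, err := Decode(sampleEvent(), "MySource:newPerson")
	fmt.Println(id.First, id.Last, id.Age, err) // John Doe 40 <nil>
}
```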
Gluing all the code we’ve walked through allows generating an elementary web server that:
All of this in 100 lines of code that you can find here.
You can feed the system with this definition:
```text
#Identity: {
    // first name of the person
    first: =~ "[A-Z].*"
    // Last name of the person
    Last: =~ "[A-Z].*"
    // Age of the person
    Age?: number & < 130
}
```
Then query the server for the OpenAPI:
```shell
curl http://localhost:8181/openapi
{
  "openapi": "3.0.0",
  "info": {
    "title": "Generated by cue.",
    "version": "no version"
  },
  "paths": {},
  "components": {
    "schemas": {
      ...
}
```
Or send good data …
```shell
❯ curl -XPOST -d'{"first": "John","Last": "Doe","Age": 40}' http://localhost:8181/
ok
```
… or bad data …
```shell
❯ curl -XPOST -d'{"first": "John","Last": "Doe","Age": 140}' http://localhost:8181/
#Identity.Age: invalid value 140 (out of bound <130)
```
… and, if you have a Kafka broker running on localhost:9092, it will send the message over the wire:
```shell
❯ curl -XPOST -d'{"first": "John","Last": "Doe","Age": 40}' http://localhost:8181/
sent to the channel ok
```
The code we’ve generated is, obviously, not production-ready; nevertheless, the core is based on CUE, and we can legitimately wonder if it will scale. CUE is designed to be O(n), and this simple benchmark shows that the code can ingest, validate, encode, and send about a thousand events (1,024 iterations at roughly 2.1 ms each) into a local Kafka topic in a run of about 2.5 seconds:
```shell
> go test -run=NONE -bench=. -benchmem
goos: darwin
goarch: amd64
pkg: owulveryck.github.io/test1
cpu: Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz
BenchmarkRun-12    1024    2135823 ns/op    55261 B/op    537 allocs/op
PASS
ok      owulveryck.github.io/test1    2.503s
```
Through this article, we’ve built a complete communication and streaming mechanism to interconnect the nodes of a mesh.
This streaming mechanism is part of a global platform and will be operated by a platform team (as defined in the book Team Topologies by Matthew Skelton and Manuel Pais).
The users of this capability are different stream-aligned teams (in the context of Team Topologies, a stream-aligned team is organized around the flow of work and can deliver value directly to the customer or end user).
Within the stream-aligned team, the data-product owner can use the CUE language to describe the semantics and constraints of its data. The developers will use the validation process to feed the stream with data.
The consumers of data will exploit the data cataloging capability and build other products thanks to the data they will find on the wire.
Meanwhile, the CloudEvents format ensures that the signal can be propagated through the infrastructure in an agnostic way. It also opens the possibility to build data products on a pure serverless architecture, but let’s keep that warm for another article.
Final note: this article presents a single way to expose data through event streaming. To be complete, a “pull” mechanism should be defined as a standard to fetch the information via, for example, a set of REST APIs.