By Niren Shah
In cybersecurity, knowing the history of an asset is absolutely critical. It lets you understand how your environment is evolving over time: which new assets showed up, which ones disappeared, who is talking to whom from a communication standpoint, etc. Consider the following simple problem: for every asset (say, a network interface identified by its IP address), you want to keep its current state (the services it exposes) along with the last "N" days of history of that state.
Most storage technologies will let you model a data structure to do that. The problem is when you're dealing with millions of assets that generate hundreds of thousands of events every second. At that kind of scale, you want to be as efficient as possible and you need to avoid as many transactions as possible - especially the "get-and-check-and-write" problem. By that I mean:
- get the document for the asset from the store
- check whether it exists and inspect its current state
- modify it client-side (update the current state, append to the history)
- write it back, hoping nothing else modified it in the meantime
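Here's a minimal sketch of that anti-pattern, written against the elasticsearch Python client and the index layout used later in this post; the helper name and retry strategy are just for illustration:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConflictError, NotFoundError

es = Elasticsearch("http://localhost:9200")

def get_check_and_write(ip, dt, services):
    # 1. GET: one round trip to fetch the document, if it exists
    try:
        doc = es.get(index="interface", doc_type="_doc", id=ip)
        source, version = doc["_source"], doc["_version"]
    except NotFoundError:
        source, version = {"history": {}}, None

    # 2. CHECK and modify the state client-side
    source["current"] = {"services": services}
    source["history"][dt] = {"services": services}
    while len(source["history"]) > 2:  # keep N = 2 days
        source["history"].pop(min(source["history"]))

    # 3. WRITE: a second round trip, guarded by the version we read;
    #    if anything else touched the document in between, this
    #    fails with a conflict and the whole dance starts over
    try:
        es.index(index="interface", doc_type="_doc", id=ip,
                 body=source, version=version)
    except ConflictError:
        get_check_and_write(ip, dt, services)

Two round trips per event, plus retries under contention - that's a lot of overhead at hundreds of thousands of events per second.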
Ideally, you want to create idempotent "upsert" transactions to make the system as fast as possible. That means:
- a single request per event: create the document if it doesn't exist, update it if it does
- no client-side read-modify-write cycle, and therefore no version conflicts to retry
- replaying the same event for the same date leaves the document in the same state
We use Elasticsearch extensively across a wide variety of products. If you're not familiar with Elasticsearch, the official getting-started guide is a good place to begin, and the official Docker image makes it super easy to spin up an instance and play around.
Elasticsearch is awesome. It is a distributed full-text indexing technology that doubles as a perfectly reasonable document storage engine. It scales to hundreds of servers handling millions of transactions per minute. It has some really elegant aggregation features and a well-designed JSON-based API. It also has pluggable scripting engines that you can use to perform on-the-fly manipulation of requests. Now, generally, I'm not a big fan of writing business logic inside storage technologies (stored procedures and such), as it can get out of hand if you're not careful. But selectively using these features can make for an elegant architecture.
For this example, we're using a couple of features:
- stored scripts, written in Painless, Elasticsearch's built-in scripting language
- scripted upserts via the update API, which run the stored script whether the document is being created or updated
The idea is pretty simple:
- every observation of an asset is an upsert keyed by the asset's ID (the interface's IP address here)
- a stored script runs on each upsert: it sets the "current" state, records the same state in a date-keyed "history" map, and trims that map to the last "N" days
Here's the script, annotated:
// automatically create a new history section if it doesn't exist,
// using a TreeMap, which keeps the history sorted by date
// (note: once the document has round-tripped through Elasticsearch,
// this map comes back as a plain map, so the trim below assumes
// the dates arrive in ascending order)
if (ctx._source.history == null) {
  ctx._source.history = new TreeMap();
}
// set the incoming data as the current state and as the history
// entry for the supplied date
ctx._source.history[params.dt] = params.current;
ctx._source.current = params.current;
// trim the map to keep only N (2 in our example) days of history by
// dropping the oldest key (each upsert adds at most one new day)
if (ctx._source.history.size() > 2) {
  ctx._source.history.remove(ctx._source.history.keySet().iterator().next());
}
So, long story short, here's what the document looks like after the first upsert (for 20180101):
"hits": [
{
"_index": "interface", "_type": "_doc",
"_id": "192.168.1.1", "_score": 1.0,
"_source": {
"current": {
"services": [{ "port": "22", "proto": "TCP" } ]
},
"history": {
"20180101": { "services": [ { "port": "22", "proto": "TCP" } ] }
}
}
}
]
"hits": [
{
"_index": "interface", "_type": "_doc",
"_id": "192.168.1.1", "_score": 1.0,
"_source": {
"current": {
"services": [ { "port": "22", "proto": "TCP" }, { "port": "12345", "proto": "TCP" } ]
},
"history": {
"20180101": { "services": [ { "port": "22", "proto": "TCP" } ] },
"20180102": { "services": [ { "port": "22", "proto": "TCP" }, { "port": "12345", "proto": "TCP" } ] }
}
}
}
]
There you go! You can upsert as many assets as you wish without worrying whether an asset already exists, the script does the dirty work of keeping the latest "N" days of history, and you always have the "current" data. You've also removed the whole optimistic-locking problem, since each upsert is a single idempotent transaction! And trust me, it is blindingly fast: 100K upserts a second on a reasonably sized cluster is very possible :).
What a helpful little Painless script!!!
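To make that throughput claim concrete, here's a minimal sketch of driving these scripted upserts in batches from Python with the elasticsearch client's bulk helper. It assumes the "interface" index and the "keep_history" stored script from the HTTP requests below already exist; the events list is made up for illustration.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch("http://localhost:9200")

def history_actions(events):
    # each event: (ip, "YYYYMMDD" date, list of service dicts)
    for ip, dt, services in events:
        yield {
            "_op_type": "update",
            "_index": "interface",
            "_type": "_doc",
            "_id": ip,
            "_retry_on_conflict": 3,
            "scripted_upsert": True,
            "script": {
                "id": "keep_history",
                "params": {"current": {"services": services}, "dt": dt},
            },
            "upsert": {},
        }

events = [
    ("192.168.1.1", "20180102", [{"port": "22", "proto": "TCP"},
                                 {"port": "12345", "proto": "TCP"}]),
    ("192.168.1.2", "20180102", [{"port": "443", "proto": "TCP"}]),
]

# send thousands of upserts per HTTP round trip; each one is a
# self-contained, idempotent operation - no get-and-check-and-write
for ok, item in streaming_bulk(es, history_actions(events), chunk_size=1000):
    if not ok:
        print("failed:", item)

Because each action is independent, you can fan these out across as many writer processes as you like without any coordination.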
HTTP Requests
# once you have an Elasticsearch instance running, you can just "curl" the following HTTP requests to play around
# create an index with the following mapping to hold the current and historical data
PUT http://localhost:9200/interface
Content-Type: application/json
{
"settings": {
"number_of_replicas": 0
},
"mappings": {
"_doc": {
"properties": {
"current": {"type": "nested"},
"history": {"type": "nested"}
}
}
}
}
# create a stored script to execute at every upsert to keep "N" (2 in this example) days of history
POST http://localhost:9200/_scripts/keep_history
Content-Type: application/json
{
"script": {
"lang": "painless",
"source":
"if (ctx._source.history == null) ctx._source.history = new TreeMap();
ctx._source.history[params.dt]=params.current;
ctx._source.current = params.current;
if (ctx._source.history.size() > 2)
ctx._source.history.remove(ctx._source.history.keySet().iterator().next());
"
}
}
# upsert a document for a date (20180101)
POST http://localhost:9200/interface/_doc/192.168.1.1/_update
Content-Type: application/json
{
"scripted_upsert":true,
"script": {
"id": "keep_history",
"params": {
"current": {
"services": [{"port": "22","proto": "TCP"}]
},
"dt":"20180101"
}
},
"upsert": {}
}
# upsert a document for another date (20180102)
POST http://localhost:9200/interface/_doc/192.168.1.1/_update
Content-Type: application/json
{
"scripted_upsert":true,
"script": {
"id": "keep_history",
"params": {
"current": {
"services": [
{"port": "22","proto": "TCP"},
{"port": "12345","proto": "TCP"}
]
},
"dt":"20180102"
}
},
"upsert": {}
}
# look at the data
POST http://localhost:9200/interface/_search
Content-Type: application/json
{
"query": {
"match_all": {}
}
}
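And to see the trimming in action, upsert a third day; with N = 2, 20180101 should fall out of the history. A quick sketch with the Python client (same assumptions as above):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# upsert a third day (20180103); the stored script keeps only 2 days
es.update(
    index="interface", doc_type="_doc", id="192.168.1.1",
    body={
        "scripted_upsert": True,
        "script": {
            "id": "keep_history",
            "params": {
                "current": {"services": [{"port": "22", "proto": "TCP"}]},
                "dt": "20180103",
            },
        },
        "upsert": {},
    },
)

# the oldest day (20180101) has been trimmed away
doc = es.get(index="interface", doc_type="_doc", id="192.168.1.1")
print(sorted(doc["_source"]["history"]))  # ['20180102', '20180103']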