Contents

Using Logstash to add AbuseIPDB confidence scores to IP addresses

Introduction

I really wanted a way to enrich the data collected from my honeypots with data from third-party sources, and AbuseIPDB is one such source. If you don’t know it already, AbuseIPDB lets you query its API to check whether an IP has previously been reported for malicious activity. You can also pull an up-to-date blacklist of thousands of IP addresses, which is really neat.

AbuseIPDB gives each reported IP address an abuse confidence score as a percentage, based on the number, frequency and types of attacks reported.

To see if the IPs attacking my honeypots are known offenders or servers that have only recently joined the dark side, I wanted to add the latest abuse confidence score to each of the logs generated by HoneyPotDB.

This looks like a job for Logstash

I use Logstash to parse and analyse logs from HoneyPotDB pots, so this would be the perfect place to add this enrichment. Logstash can pull data from third-party sources via its HTTP filter plugin, which is exactly what we need here.

Now, AbuseIPDB allows up to 1,000 check API calls per day on their free plan. This can be increased to 5,000 if you add a supporter badge (like the one below) to your site, but that’s still not enough for the number of logs HoneyPotDB generates, so I need to cache results somehow…

https://www.abuseipdb.com/contributor/44828.svg
AbuseIPDB

Logstash has an extensive list of filter plugins that can be used to translate or enrich data. One of these is the Memcached filter plugin, which lets you set and get data from a Memcached instance — perfect! Now, I would prefer to use something a bit more mainstream like Redis, but Logstash only has input and output plugins for Redis, and for what we’re doing we need a filter plugin, so Memcached will do absolutely fine :)
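One caveat: depending on your Logstash version and distribution, the Memcached filter may not be bundled by default. Here’s a quick sketch for checking and installing it (the /usr/share/logstash path assumes a standard RPM install; adjust for yours):

# See whether the memcached filter is already installed
/usr/share/logstash/bin/logstash-plugin list | grep memcached

# Install it if it's missing
/usr/share/logstash/bin/logstash-plugin install logstash-filter-memcached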

Setting up Memcached

So, first of all, let’s get a Memcached instance installed locally. On a CentOS 7 server this is really simple.
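Here’s a minimal sketch of that install (memcached is in the standard CentOS 7 repos, so no extra repositories should be needed):

# Install memcached and start it on the default port (11211)
sudo yum install -y memcached
sudo systemctl start memcached
sudo systemctl enable memcached

# Confirm it's listening
sudo ss -tlnp | grep 11211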

With Memcached up and running, let’s see if we can connect to it using telnet.

[root@localhost ~]# telnet localhost 11211
Trying ::1...
Connected to localhost.
Escape character is '^]'.
quit
Connection closed by foreign host.
[root@localhost ~]#

Awesome.

You can get some quick stats from Memcached using the stats command, which is pretty cool, but there is also a nice GUI called phpMemcachedAdmin. I’m not going to show how to install that here; it’s just as simple as installing any PHP-based app on a LAMP stack. Speaking of which, I used Docker to spin one of these up on my Logstash instance quickly: https://hub.docker.com/r/mattrayner/lamp.
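If you just want a quick look at the counters without a GUI, you can pull them straight over Memcached’s text protocol with nc (same idea as the telnet session above):

# Grab the get/set counters from the stats output
printf 'stats\r\nquit\r\n' | nc localhost 11211 | grep -E 'cmd_get|cmd_set|get_hits|get_misses'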

Using the AbuseIPDB API

AbuseIPDB has a really nice API that we can use to get data about IP addresses using their CHECK endpoint. For example, using curl to send a test query:

curl -G https://api.abuseipdb.com/api/v2/check \
  --data-urlencode "ipAddress=118.25.6.39" \
  -d maxAgeInDays=90 \
  -d verbose \
  -H "Key: $YOUR_API_KEY" \
  -H "Accept: application/json"

Which gives the JSON response 👍:

{
    "data": {
      "ipAddress": "118.25.6.39",
      "isPublic": true,
      "ipVersion": 4,
      "isWhitelisted": false,
      "abuseConfidenceScore": 100,
      "countryCode": "CN",
      "countryName": "China",
      "usageType": "Data Center/Web Hosting/Transit",
      "isp": "Tencent Cloud Computing (Beijing) Co. Ltd",
      "domain": "tencent.com",
      "hostnames": [],
      "totalReports": 1,
      "numDistinctUsers": 1,
      "lastReportedAt": "2018-12-20T20:55:14+00:00",
      "reports": [
        {
          "reportedAt": "2018-12-20T20:55:14+00:00",
          "comment": "Dec 20 20:55:14 srv206 sshd[13937]: Invalid user oracle from 118.25.6.39",
          "categories": [
            18,
            22
          ],
          "reporterId": 1,
          "reporterCountryCode": "US",
          "reporterCountryName": "United States"
        }
      ]
    }
  }
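The field we actually care about for enrichment is data.abuseConfidenceScore. If you want to eyeball just that value on the command line, piping the same curl through jq works nicely (assuming jq is installed):

curl -sG https://api.abuseipdb.com/api/v2/check \
  --data-urlencode "ipAddress=118.25.6.39" \
  -d maxAgeInDays=90 \
  -H "Key: $YOUR_API_KEY" \
  -H "Accept: application/json" | jq '.data.abuseConfidenceScore'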

Creating Logstash Pipelines

Now for the fun bit! Logstash has a really neat feature that lets you create pipelines that listen on a kind of internal socket. You can then bounce events between pipelines to perform various actions and analysis before outputting to Elasticsearch. You can also enable persistent queues, which save events to disk until they have been output, to prevent data loss.
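If you haven’t used pipeline-to-pipeline communication before, the wiring is just a pipeline output on one side pointing at a pipeline input’s address on the other. A minimal sketch (the address name here is illustrative):

# upstream.conf - sends events to the internal address "downstream"
output {
  pipeline { send_to => ["downstream"] }
}

# downstream.conf - receives them
input {
  pipeline { address => "downstream" }
}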

Ingress

So, for my setup, I use Filebeat to ship logs from honeypot servers to Logstash and add metadata tags accordingly. Just to show you an example, here is mine below — your ingress pipeline will vary:

# [ honeypot_filebeat Pipeline ]
# This pipeline handles all incoming Filebeat events from honeypots
#

# Filebeat input from Wazuh.
input {
  beats {
    id => "filebeat_ingress-beats-01"
    port => 5044
    codec => "json"
    add_field => { "[@metadata][source_type]" => "filebeat_honeypot" }
  }
}

# Remove the Filebeat 'host' field as it causes mapping issues with the wazuh-alerts Elasticsearch template
filter {
  mutate {
    remove_field => [ "host" ]
  }

  if [eventid] {
    if "cowrie" in [eventid] {
      mutate {
        id => "ingress_mutate-01"
        add_field => { "[@metadata][log_type]" => "cowrie" }
      }
      mutate {
        id => "ingress_mutate-02"
        add_tag => "added_cowrie_type"
      }
    }
  }

  if [flow_id] or [alert][signature_id] {
    mutate {
      id => "ingress_mutate-03"
      add_field => { "[@metadata][log_type]" => "suricata" }
    }
    mutate {
      id => "ingress_mutate-04"
      add_tag => "added_suricata_type"
    }
  }
}

# Output to the whois pipeline
output {
  if [@metadata][source_type] == "filebeat_honeypot" {
    pipeline { send_to => [filter_whois] }
  }
}

In the above, I have my main input block with a beats input configured on port 5044. This decodes the received data as JSON and adds a source_type metadata field. I then check whether the event came from Cowrie (the SSH honeypot, identified by the eventid field) or from Suricata (NIDS logs, identified by flow_id or alert.signature_id). Finally, I output all logs to another Logstash pipeline that adds WHOIS data using an ip-api.com lookup — more on that in another post :D
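For completeness, the Filebeat side of this is just a standard Logstash output pointing at port 5044. A minimal filebeat.yml sketch (the log path and Logstash host are placeholders; yours will differ):

filebeat.inputs:
  - type: log
    paths:
      - /opt/cowrie/var/log/cowrie/cowrie.json  # example Cowrie JSON log path

output.logstash:
  hosts: ["your-logstash-host:5044"]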

AbuseIPDB check and caching

The WhoIs pipeline then outputs to my AbuseIPDB check pipeline below:

Important things to change!
Make sure you change YOUR_KEY_HERE to your AbuseIPDB API key.
input { pipeline { address => filter_abuseipdb } }

filter {
  # Try to pull IP info from the cache
  if [src_ip] {
    memcached {
      hosts => ["192.168.1.25:11211"]
      namespace => "abuseipdb"
      get => {
        "%{[src_ip]}" => "[abuseipdb]"
      }
      add_tag => ["abuseipdb_from_cache"]
      id => "memcached-abuseipdb-get"
    }
  }

  # Cache miss: query the AbuseIPDB API and cache the response
  if ! [abuseipdb] and [src_ip] {
    if "." in [src_ip] or ":" in [src_ip] {
      http {
        id => "abuseipdb-http-01"
        url => "https://api.abuseipdb.com/api/v2/check?ipAddress=%{[src_ip]}"
        verb => "GET"
        connect_timeout => 15
        headers => { "Accept" => "application/json" "Key" => "YOUR_KEY_HERE" }
        target_body => "[abuseipdb]"
        target_headers => "[@metadata][abuseipdb_response_headers]"
      }
      if [abuseipdb] {
        mutate { convert => { "[abuseipdb]" => "string" } }
        memcached {
          hosts => ["192.168.1.25:11211"]
          namespace => "abuseipdb"
          set => {
            "[abuseipdb]" => "%{[src_ip]}"
          }
          ttl => 7200
          add_tag => ["abuseipdb_cached"]
          id => "memcached-abuseipdb-set"
        }
      }
    }
  }

  if [abuseipdb] {
    mutate { convert => { "[abuseipdb]" => "string" } }
  }
}

output {
  if [@metadata][source_type] == "filebeat_honeypot" {
    pipeline { send_to => [abuseipdb_report] }
  }
}

This pipeline checks whether the event has the source IP field src_ip. If it does, it tries a Memcached get using the Memcached filter to see if the IP’s confidence score has already been cached. If the lookup is successful, the cached data (which is just the regular AbuseIPDB JSON response string) is saved into the abuseipdb field on the event, and the tag abuseipdb_from_cache is added.

Next, I have a condition block that runs when the abuseipdb field does NOT exist and a source IP has been passed. I double-check that the src_ip field contains an IP by checking for a . or : character (which is good enough :P), then use the HTTP filter to poke the AbuseIPDB API for the IP’s confidence score.

Here's a quick tip!
Just so you know, %{[SOME][FIELD]} lets you interpolate event fields into a string, which is very useful.
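For example, a quick sketch of the difference between a bare field reference and string interpolation (this add_field is purely illustrative):

filter {
  mutate {
    # [src_ip] on its own is a field reference; %{[src_ip]} expands its value inside a string
    add_field => { "[@metadata][check_url]" => "https://api.abuseipdb.com/api/v2/check?ipAddress=%{[src_ip]}" }
  }
}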

I then save the response body to the abuseipdb field. To cache the returned result, I convert the abuseipdb field to a string and use the Memcached filter to do a set, with the TTL set to 7200 seconds (2 hours), which defines how long the data is cached before being deleted. A tag is also added to show that the event has been cached.
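You can sanity-check that entries are actually landing in Memcached from the server side. A quick sketch, assuming the filter joins the namespace and key with a colon (if the key doesn’t match, check your plugin version’s docs for the separator it uses):

# Look up the cached AbuseIPDB response for a given source IP
printf 'get abuseipdb:118.25.6.39\r\nquit\r\n' | nc localhost 11211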

I then do a check to see if the abuseipdb field exists and make sure it’s a string before forwarding the event to my AbuseIPDB report pipeline.

AbuseIPDB report pipeline

I also want to report the IP addresses I see to AbuseIPDB to let others know about these sneaky little bastards. Here is my pipeline to do that:

Important things to change!
Be sure to change YOUR_KEY_HERE to your key, URL_ENCODED_MESSAGE to a URL-encoded comment, and CATEGORIES to the right category IDs.
input { pipeline { address => abuseipdb_report } }

filter {
  # Check whether this IP has already been reported recently
  if [src_ip] {
    memcached {
      hosts => ["192.168.1.25:11211"]
      namespace => "abuseipdb_reported"
      get => {
        "%{[src_ip]}" => "[abuseipdb_reported]"
      }
      add_tag => ["abuseipdb_reported_from_cache"]
      id => "memcached-abuseipdb_reported-get"
    }
    if ! [abuseipdb_reported] {
      if "." in [src_ip] or ":" in [src_ip] {
        if "SSH" in [sensor] or "ssh" in [sensor] {
          http {
            id => "abuseipdb_reported-http-01"
            url => "https://api.abuseipdb.com/api/v2/report?ip=%{[src_ip]}&categories=CATEGORIES&comment=URL_ENCODED_MESSAGE"
            verb => "POST"
            connect_timeout => 15
            headers => { "Accept" => "application/json" "Key" => "YOUR_KEY_HERE" }
            target_body => "[abuseipdb_reported]"
            target_headers => "[@metadata][abuseipdb_reported_response_headers]"
            add_tag => [ "abuseipdb_reported" ]
          }
          mutate { convert => { "[abuseipdb_reported]" => "string" } }
          memcached {
            hosts => ["192.168.1.25:11211"]
            namespace => "abuseipdb_reported"
            set => {
              "[abuseipdb_reported]" => "%{[src_ip]}"
            }
            ttl => 900
            add_tag => ["abuseipdb_reported_cached"]
            id => "memcached-abuseipdb_reported-set"
          }
        }
      }
    }
  }

  if [abuseipdb_reported] {
    mutate { convert => { "[abuseipdb_reported]" => "string" } }
  }
}

output {
  if [@metadata][source_type] == "filebeat_honeypot" {
    pipeline { send_to => [elasticsearch_egress] }
  }
}

This pipeline is very similar to the previous one. I first check whether the IP has already been reported in the last 15 minutes (AbuseIPDB returns an HTTP 429 if you report the same IP again too soon, wasting API requests); if it has, the event gets the tag abuseipdb_reported_from_cache. If not, I use the HTTP filter to report the IP address, setting a category and comment.

I then cache the response body in Memcached as before, but this time with the TTL set to 15 minutes (900 seconds). Everything gets string-ified before the event is finally sent on to Elasticsearch. Phew!
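If you want to test the report endpoint by hand before wiring it into the pipeline, a curl sketch like this should do it (categories 18 and 22 are brute-force and SSH, matching the response example earlier; --data-urlencode handles the comment encoding for you, and note this files a real report against whatever IP you pass):

curl https://api.abuseipdb.com/api/v2/report \
  --data-urlencode "ip=118.25.6.39" \
  -d "categories=18,22" \
  --data-urlencode "comment=SSH brute-force attempts observed by honeypot" \
  -H "Key: $YOUR_API_KEY" \
  -H "Accept: application/json"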

One last thing: be sure to define all these pipelines in your Logstash pipelines.yml file. Here is mine, for example:

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: ingress
  path.config: "/etc/logstash/conf.d/ingress.conf"
  queue.type: persisted
- pipeline.id: filter_abuseipdb
  path.config: "/etc/logstash/conf.d/abuseipdb.conf"
- pipeline.id: filter_abuseipdb_report
  path.config: "/etc/logstash/conf.d/abuseipdb_report.conf"
- pipeline.id: elasticsearch_egress
  path.config: "/etc/logstash/conf.d/elasticsearch_egress.conf"

So, does it work?

Like a charm! This actually worked a lot better than expected. The image below shows an example Cowrie event in Kibana with AbuseIPDB data:

/8-logstash-adding-abuseipdb/abuseipdb_cache.png
AbuseIPDB Cache

Looking at the tags, this event was pulled from the cache :D

Monitoring Memcached

As mentioned towards the start, you can use tools like phpMemcachedAdmin for really easy monitoring and maintenance of your Memcached instance(s). I use it because it makes pretty metrics and shit, and it’s nice to see the total number of GET and SET requests with a requests-per-second value :D

/8-logstash-adding-abuseipdb/memcachedadmin.png
Monitoring Memcached