Skip to content

Коллектор sFlow на Logstash

В данном посте будет описана процедура установки коллектора sFlow на базе ELK на CentOS 7 для сбора трафика со свитча Juniper.

Компоненты:

  • Logstash — система принимающая данные, фильтрующая их и передающая их в Elasticsearch.
  • Elasticsearch — база данных, которая хранит информацию и осуществляет по ней поиск.
  • Kibana — служит для визуального представления данных, хранящихся в Elasticsearch.

Взаимодействие компонентов выглядит так:

sflow-elk.png

10.10.10.10 - адрес сервера CentOS, где установлен ELK

Установка Java JDK

Для работы Elasticsearch рекомендуется Java версии 8.

wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u111-b14/jdk-8u111-linux-x64.rpm
rpm -Uhv jdk-8u111-linux-x64.rpm

Проверяем версию

java -version

Установка Elacticsearch

rpm --import http://packages.elastic.co/GPG-KEY-elasticsearch
nano /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-5.x] name=Elasticsearch repository for 5.x packages baseurl=https://artifacts.elastic.co/packages/5.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
yum install elasticsearch
systemctl daemon-reload
systemctl start elasticsearch.service
systemctl enable elasticsearch

Проверяем статус сервиса

systemctl status elasticsearch

Создаем сервис для FirewallD:

cp /usr/lib/firewalld/services/ssh.xml /etc/firewalld/services/elasticsearch.xml
nano /etc/firewalld/services/elasticsearch.xml
<service> 
<short>Elasticsearch</short> 
<description>Elasticsearch</description> 
<port protocol="tcp" port="9200"/> 
</service>

Открываем доступ в сетевом экране:

firewall-cmd --permanent --zone=public --add-service=elasticsearch
firewall-cmd --reload

Проверяем сетевую доступность Elasticsearch

ss -naut | grep :9200
curl -X GET http://localhost:9200
curl -X GET 'http://localhost:9200/_nodes?pretty'

Изменяем настройки

nano /etc/elasticsearch/elasticsearch.yml
cluster.name: sflowcluster 
node.name: "sflowcollector" 
network.host: localhost

Применяем изменения

systemctl restart elasticsearch

Установка Logstash

nano /etc/yum.repos.d/logstash.repo
[logstash-5.x] name=Logstash repository for 5.x packages baseurl=https://artifacts.elastic.co/packages/5.x/yum gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
yum install logstash
systemctl daemon-reload
systemctl start logstash
systemctl enable logstash

Проверяем статус сервиса

systemctl status logstash

Установка Kibana

nano /etc/yum.repos.d/kibana.repo
[kibana-5.x] name=Kibana repository for 5.x packages baseurl=https://artifacts.elastic.co/packages/5.x/yum gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
yum install kibana.x86_64
systemctl daemon-reload
systemctl start kibana
systemctl enable kibana

Проверяем статус сервиса

systemctl status kibana

Проверяем автозагрузку всех сервисов

systemctl list-unit-files --type=service | grep -E 'elasticsearch|kibana|logstash'

Проверяем сетевую доступность Kibana

ss -naut | grep :5601

Добавляем в сетевой фильтр

cp /usr/lib/firewalld/services/ssh.xml /etc/firewalld/services/kibana.xml
nano /etc/firewalld/services/kibana.xml
<service> 
<short>Kibana</short> 
<description>Kibana</description> 
<port protocol="tcp" port="5601"/> 
</service>

Открываем доступ к Kibana через сетевой фильтр

firewall-cmd --permanent --zone=public --add-service=kibana
firewall-cmd --reload

Изменяем настройки

nano /etc/kibana/kibana.yml
server.host: 10.10.10.10 
server.name: "sflowcollector"

Применяем изменения

systemctl restart kibana

Настройка Logstash

cp /usr/lib/firewalld/services/ssh.xml /etc/firewalld/services/sflow.xml
nano  /etc/firewalld/services/sflow.xml
<service> 
<short>sflow</short> 
<description>sflow</description> 
<port protocol="udp" port="6343"/> 
</service>

Открываем доступ к sflow через сетевой фильтр

firewall-cmd --permanent --zone=public --add-service=sflow
firewall-cmd --reload

Проверяем что все добавленные сервисы открыты в сетевом фильтре

firewall-cmd --list-all

Устанавливаем logstash плагины для поддержки sflow

/usr/share/logstash/bin/logstash-plugin install logstash-codec-sflow
/usr/share/logstash/bin/logstash-plugin install logstash-filter-translate

Проверяем, что плагины есть в списке

/usr/share/logstash/bin/logstash-plugin list

Настраиваем Logstash на обработку sflow

nano /etc/logstash/conf.d/config.conf
input {
udp {
port => 6343
codec => sflow {}
}
}
filter {
translate {
field => protocol
dictionary => [ "1", "ETHERNET",
"11", "IP"
]
fallback => "UNKNOWN"
destination => protocol
override => true
}
translate {
field => eth_type
dictionary => [ "2048", "IP",
"33024", "802.1Q VLAN"
]
fallback => "UNKNOWN"
destination => eth_type
override => true
}
translate {
field => vlan_type
dictionary => [ "2048", "IP"
]
fallback => "UNKNOWN"
destination => vlan_type
override => true
}
translate {
field => ip_protocol
dictionary => [ "6", "TCP",
"17", "UDP",
"50", "Encapsulating Security Payload"
]
fallback => "UNKNOWN"
destination => ip_protocol
override => true
}
mutate {
convert => { "src_port" => "integer"
"dst_port" => "integer" }
}
}
output {
elasticsearch {
index => "sflow-%{+YYYY.MM.dd}"
hosts => ["localhost"]
}
}

В файле конфигурации Logstash мы настроили следующее:

  • открыть порт udp 6343
  • обрабатывать входящий трафик на порту udp 6343 кодеком sflow
  • преобразовывать номера протоколов в их названия
  • конвертировать тип данных портов в целочисленный
  • хранить данные в elastichsearch в индексе sflow-*

Проверяем корректность конфигурационного файла

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/config.conf -t

Применяем изменения

systemctl restart logstash

Проверяем сетевую доступность sflow

ss -naut | grep :6343

Настраиваем Juniper

set protocols sflow polling-interval 30 sample-rate ingress 1000 egress 1000
set protocols sflow collector 10.10.10.10 udp-port 6343
set protocols sflow interfaces ge-0/0/47

Настраиваем Kibana

Проверяем что индекс в Elastichsearch создался

curl -X GET 'http://localhost:9200/sflow-*

Заходим на Kibana

http://10.10.0.52:5601/

настраиваем индекс по умолчанию Management Index name or pattern sflow-* .

Пробуем отобразить интересующий трафик

  • Добавляем поля Discover -> Add fields (src_ip, dst_ip, dst_port, ip_protocol)
  • Пробуем поиск по источнику Discover -> src_ip: 10.10.*

Может потребоваться удалить сохраненную информацию в Elastisearch

curl -X DELETE 'http://localhost:9200/sflow-*

Настраиваем удаление старых indeces

Со временем количество данных, хранящихся в Elaasticsearch может занять все свободное место на диске. Если ELK используется только для просмотра новых данных, то можно настроить удаление старых indeces. Проверяем количество indeces и занимаемое ими место

curl -X GET http://localhost:9200/_cat/indices?v

Установим утилиту curator, которая позволит нам удалять indeces

nano /etc/yum.repos.d/curator.repo
[curator-4]
name=CentOS/RHEL 7 repository for Elasticsearch Curator 4.x packages
baseurl=http://packages.elastic.co/curator/4/centos/7
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
yum install elasticsearch-curator.x86_64

Проверим indeces, старше 30 дней

curator_cli show_indices --filter_list '[{"filtertype":"age","source":"name","direction":"older", "timestring": "'%Y.%m.%d'", "unit":"days","unit_count":30},{"filtertype":"pattern","kind":"prefix","value":"sflow-"}]'

Добавляем задание в cron на удаление indeces старше 90 дней

cp /etc/cron.daily/logrotate /etc/cron.daily/curator
nano /etc/cron.daily/curator
#!/bin/sh
/usr/bin/curator_cli delete_indices --filter_list '[{"filtertype":"age","source":"name","direction":"older", "timestring": "'%Y.%m.%d'", "unit":"days","unit_count":90},{"filtertype":"pattern","kind":"prefix","value":"sflow-"}]'

Стоить заметить, что это минимальная настройка для проверки работоспособности. При вводе ELK в эксплуатацию следует подумать о аутентификации, шифровании, оптимизации работы Java.

Источник 1

Источник 2