Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elk os 5 #6

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 348 additions & 0 deletions Elk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
import subprocess
import json
import datetime
import socket
import os
import urllib.request
import re
import sys
import glob
from requests import get
import requests
import boto3
from ec2_metadata import ec2_metadata
import uuid
import datetime
from pprint import pprint
from elasticsearch import Elasticsearch
import time
from art import *
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid wildcard imports



def escape_ansi(line):
"""
when running lynis, the output in the terminal is colored.
when getting the data in python, there is a lot of unwanted ANSI stuff (from the coloring)
that this function removes.
:param line: getting the "ruined" text
:return: the "correct" text
"""
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line)


def vuls(vuls_root, sudo_password):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider removing the sudo_password argument.
I think it is better to require the script to be run as a privileged user from the beginning (sudo python3 ELK.py)

data = {}
vuls_scan = 'sudo docker run --rm \
-v ~/.ssh:/root/.ssh:ro \
-v $PWD:/vuls \
-v $PWD/vuls-log:/var/log/vuls \
-v /etc/localtime:/etc/localtime:ro \
-v /etc/timezone:/etc/timezone:ro \
vuls/vuls scan \
-config=./config.toml # path to config.toml in docker'

vuls_report = 'sudo docker run --rm \
-v ~/.ssh:/root/.ssh:ro \
-v $PWD:/vuls \
-v $PWD/vuls-log:/var/log/vuls \
-v /etc/localtime:/etc/localtime:ro \
vuls/vuls report \
-format-json \
-config=./config.toml # path to report.toml in docker'
# sudo_password += sudo_password+ " command"

commands = ["cd /", "cd " + vuls_root, sudo_password + vuls_scan]
to_execute = "" # the string that will run in the terminal at the end
for i in commands:
to_execute += i + ';' # merging the commands into one line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are using a lot of "+" to concut strings it is much better to use format strings like this:
to_execute = f"cd /; cd {vuls_root}; {sudo_password}{vuls_scan};"

https://realpython.com/python-string-formatting/#3-string-interpolation-f-strings-python-36

output1 = subprocess.getoutput(to_execute) # running the commands in the terminal and get the output.
# running the scan and then the report- in order to get just the report output.
commands = ["cd /", "cd " + vuls_root, sudo_password + vuls_report]
to_execute = ""
for i in commands:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the for loop is not necessary if using a format string as mentioned above.

to_execute += i + ';'
output = subprocess.getoutput(to_execute)
# getting the data from the new json file:
directory = "/" + vuls_root + "/results"
output = subprocess.getoutput("sudo " + " chmod -R 777 " + directory) # giving access
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. again use format strings
  2. security-wise giving everyone full permissions to the dir is not recommended. I believe you used it to read the results files with a normal (non-sudo) user for testing.

# we need the newest folder from the result folder:
subfolders = [f.path for f in os.scandir(directory) if f.is_dir()]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extreme solution for getting the last modified directory.
python os module has a builtin method to get it:
https://docs.python.org/3/library/os.path.html#os.path.getmtime

something like that should do it:
latest_subfolder = max(subfolders, key=os.path.getmtime)

max = 0
max_file = ""
for i in subfolders:
# print (i)
temp = i
i = i.replace(directory, "")
i = i.replace("-", "")
i = i.replace("/", "")
i = i.replace(":", "")
i = i.replace("+", "")
i = i.replace("T", "")
i = i.replace("Z", "")
try:
if int(i) > int(max):
max = i
max_file = temp
except:
max_file = temp
json_file = glob.glob(max_file + "/*")[0] # there is only one file in each folder.

with open(json_file, 'r') as outfile:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 you are using "with" statement
to make it perfect:
json modules has "load" & "dump" (no "s") to serialize \ deserialize a json file object.
take a look here:
https://stackabuse.com/reading-and-writing-json-to-a-file-in-python/

json_dict = json.loads(outfile.read()) # the json string
# loading into variable pretty big json file (about 20000 lines) and not really need all of it.
json_cves = json_dict["scannedCves"]

data = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already initialized at the beginning of the method

with open('cves.json', 'w') as outfile:
outfile.write("")
for cve_name in json_cves:
data = json_cves[cve_name]
with open('cves.json', 'a') as outfile:
outfile.write(json.dumps(data) + "\n")


def chkrotkit(sudo_password):
second_commnd = sudo_password +" chkrootkit"
commands = ["cd /", second_commnd]
to_execute = "" # the string that will run in the terminal at the end
for i in commands:
to_execute += i + ';'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use format strings

output_rootkit = subprocess.getoutput(to_execute) # getting the output from the terminal
# cleaning the output from the terminal and prepare it fot json-ing.
text = output_rootkit
text = text.replace("ROOTDIR is `/'", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be easier to maintain if all replace strings are in a list object:
chkrotkit_strs = ["Checking", ", it may take a while", ...]
then loop over the list and replace.

text = text.replace("Checking", "")
text = text.replace(", it may take a while", "")
text = text.replace("...", "")
text = text.replace("", "")
text = text.replace(" `", "")
text = text.replace("'", "")
text = text.split("\n")
anomaly = "Searching for Ambients rootkit"
anomaly1 = "Searching for suspicious files and dirs"
data = {}
index = 0
mini_index = 1
temp = ""

with open('rootkit.json', 'w') as outfile:
outfile.write("")
for i in text:
if " " not in i and "/" in i:
data = {"scnned file:": i}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: scanned

with open('rootkit.json', 'a') as outfile:
outfile.write(json.dumps(data) + "\n")
data = {}
else:
i = i.split(" ")
# i eg: ['basename', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 'not infected']
for j in i:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complex to understand.
I if understand correctly you can split "j.split(anomaly)" and continue working on the rest of the string after the anomaly.
there is no need for nested loops.
to make it much more readable split this to a few small functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second anomaly contain a list of suspicious files, the loop inside run over them. if its not an anomaly (there are 2), it run regularly.

if anomaly in j or anomaly1 in j:
# pass ?
if anomaly in j:
# maybe calling a file or user this name will break the program.
j = j.split(anomaly)
data[anomaly] = j
else:
for k in text:
if "Searching" not in text[index + mini_index]:
temp = temp + text[index + mini_index] + ", "
text[index + mini_index] = ""
mini_index += 1
# print "here"

else:
break
data[anomaly1] = temp

elif j != "" and j != i[0]:
if "infected" in j or "found" in j:
data[j] = i[0]
# print (data)
"""
# to get rootkit as terminal:
data[i[0]] = j
# without the else below.
"""
else:
data[i[0]] = j

with open('rootkit.json', 'a') as outfile:
if data != {}:
outfile.write(json.dumps(data) + "\n")
data = {}
index += 1


def lynis(directory, sudo_password):
to_execute = ""
"""
if cloned from their github:
run it from the lynis dir:
sudo ./lynis audit system

The git version gives a bit more output, I have a comparison in my mail.
"""
commands = ["cd /", sudo_password + " lynis audit system"]
for i in commands:
to_execute += i + ';'

output_lynis = subprocess.getoutput(to_execute) # saving terminal's output
title = ""
text = output_lynis

escaped_line = escape_ansi(text) # cleaning output from ANSI stuff.

text = escaped_line
text = text.split("Boot and services") # first title, don't need the things before it.
# cleaning text and prepare to json-ing:
text = text[1]
text = text.replace("'", "")
text = text.replace(".", "")
text = text.replace(" ", "was empty")
text = text.split("\n")
title = "Boot and services" # we deleted it earlier, this will be the first.
data = {}
with open('lynis.json', 'w') as outfile:
outfile.write("") # got to clean the file before appending.
for line in text:
line = line.replace("-", "")
if "[+]" in line:
title = line
elif line != "":
try:
line = line.split("[")
data["title"] = title.replace("[+]", "")
line[1] = line[1].replace("]", "")
data[line[0]] = line[1]

with open('lynis.json', 'a') as outfile:
outfile.write(json.dumps(data) + "\n")
data = {}
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too broad exception with a pass - nobody will notice something went wrong here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this catches all of the unwanted things like "----------------------------------------------" from the lynis output.

# print(e)
pass


def send_json_to_ELK(file_name, index_name, instance_id, time, account_id, session_id, type_of, es):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function names should be lowercase

"""
file got to be in ndjson format
"""
try:
with open(file_name) as fp:
for line in fp:
line = line.replace("\n", "")
# line = line.replace(" ", "")
line = line.strip()
# jdoc = {"hostname": hostname, "ipaddr": ipaddr, "type": type_of, "data": json.loads(line)}
if type_of != "lynis":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can initiate a base dict:
mydict = {"instance_id": instance_id, "time": time, "account_id": account_id,
"session_id": session_id,
"type_of_scan": type_of}
at the beginning of each loop interval and use: mydict.update({"added_key": "value"})
to populate more data to the dict.
this way you have the initial format once - maintainable

jdoc = {"instance_id": instance_id, "time": time, "account_id": account_id,
"session_id": session_id,
"type_of_scan": type_of, "data": json.loads(line)}
else:
# lynis:
# need to break the title thing:
json_line = json.loads(line)
title = json_line["title"]
json_line.pop("title")
jdoc = {"instance_id": instance_id, "time": time, "account_id": account_id,
"session_id": session_id,
"type_of_scan": type_of, "title": title, title: json_line} # about "title":title
es.index(index=index_name, doc_type='_doc', body=jdoc)

print("finished upload: " + index_name)
except Exception as e:
print(e)


def main():
tprint("ELK EC2 SCAN")
link = input("insert your Elk URL (e.g: localhost:9200) : ")
username = input("insert your Elk username for auth(if there is no auth, click ENTER): ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ELK

# (?) need to get the directories too (vuls and lynis), sudo password

if username != "":
password = input("insert your Elk password for auth: ")
elastic = Elasticsearch([link], http_auth=(username, password))
else:
elastic = Elasticsearch([link])
print("")
print("running ...")

# get IP:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)

sudo_pass = '' # sudo password of the machine
sudo_password = "echo " + sudo_pass + " | sudo -S "

begin_time = datetime.datetime.now()

vuls_directory = "home/ubuntu/idannos"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read directories path from user input with default to a sub dir in users home folder

lynis_directory = "home/ubuntu/lynis" # don't really use it the function, explanation there.

chkrotkit(sudo_password)
print("finished rootkit")

vuls(vuls_directory, sudo_password)
print("finished vuls")

lynis(lynis_directory, sudo_password)
print("finished lynis")

# get the public IP:
public_ip = str(get('https://api.ipify.org').text)

# get the instance_id:
instance_id = ec2_metadata.instance_id
# uuid:
uuid_string = str(uuid.uuid4())

# timeStamp:
date_and_hour = datetime.datetime.now()
temp = str(date_and_hour).split(" ")
date = temp[0] # getting the date only without hours


# need to fill this before running:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boto3 has a built-in default to read these arguments from environment variables if set or read them from user input

try:
ACCESS_KEY = ''
SECRET_KEY = ''
iam = boto3.resource('iam',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
)
account_id = iam.CurrentUser().arn.split(':')[4]
except Exception as e:
account_id = "failed to get account_id"
# all of the uploading take about 8 seconds:
send_json_to_ELK("rootkit.json", "aws_rootkit_scan_", instance_id, date, account_id, uuid_string, "rootkit",elastic)
send_json_to_ELK("cves.json", "aws_vuls_cves_scan_", instance_id, date, account_id, uuid_string, "vuls", elastic)
send_json_to_ELK("lynis.json", "aws_lynis_scan_", instance_id, date, account_id, uuid_string, "lynis", elastic)
# how to see: in kibana -> settings -> index patterns -> create index pattern -> providing the names etc.

print("Took: ", datetime.datetime.now() - begin_time, " to execute.")
# locally about 2:30 minutes. less on ec2- about 1:30


if __name__ == "__main__":
main()

"""
mini tutorial before running:
- install Vuls with docker from: https://vuls.io/docs/en/tutorial-docker.html
- install chkrootkit: apt-get install chkrootkit
- install lynis: apt-get install lynis

- Helping with setting auth to ELK: https://github.com/deviantony/docker-elk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a requirements.txt file to the repo.
https://realpython.com/lessons/using-requirement-files/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link gives 404

pip3 install:
boto3
art
ec2_metadata
elasticsearch
requests
datetime
re
"""