Beacon Huntress Modules¶
There are two main modules in the beacon detection data pipeline: beacon.py and ingest.py. beacon.py runs beacon detection; ingest.py builds the data pipeline. Both modules are intended for use in a Jupyter Notebook environment only.
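A typical run chains the two modules: ingest builds the bronze layer and delta files, and beacon searches a delta file for beacons. The sketch below shows that flow end to end; the paths are placeholders, the parameter values are illustrative, and the optional filter step (build_filter_files()) is omitted for brevity.
import ingest
import beacon

# Build the bronze layer from raw Zeek data (TCP only, with DNS added).
ingest.build_bronze_layer(
    src_loc = "/tmp/source/",
    bronze_loc = "/tmp/bronze/zeek/raw/parquet/2022/04/22",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
)

# Create delta files from the bronze data.
ingest.build_delta_files(
    src_loc = "/tmp/bronze/zeek/raw/parquet/2022/04/22",
    delta_file_loc = "/tmp/delta"
)

# Run a beacon search against the newest delta file.
delta_file = ingest.get_latest_file(folder_loc = "/tmp/delta/")
gold_file = beacon.dbscan_clustering(
    delta_file = delta_file,
    delta_column = "delta_mins",
    minimum_delta = 1,
    spans = [[0, 5], [5, 10]],
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False
)
print(gold_file)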
beacon.py¶
The Beacon module runs various beacon detection algorithms against an ingest-created delta file. Currently, all delta files used within this module must be in Parquet format.
Note
Ensure that you have the __pycache__ directory in the same location as the beacon.py file.
Import Module¶
import beacon
These are the available methods:
- agglomerative_clustering()
- cli_results()
- cluster_conns()
- dbscan_clustering()
- dbscan_by_variance()
- get_dns()
- packet()
Note
Any changes made to the beacon.py module within JupyterLab will require a reload of the beacon.py module. Use the code below to reload it.
Reload Module¶
import importlib
imported_module = importlib.import_module("beacon")
importlib.reload(imported_module)
import beacon
agglomerative_clustering(**kwargs)¶
Run an agglomerative clustering algorithm against a delta file to identify cluster points to be used to find a beacon.
Request Syntax¶
import beacon
request = beacon.agglomerative_clustering(
    delta_file = "string",
    delta_column = "string",
    max_variance = float,
    min_records = int,
    cluster_factor = float,
    line_amounts = [ int, ],
    min_delta_time = int,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string)
  Source delta column that you want to search.
  Options: delta_ms (milliseconds), delta_mins (minutes).
- max_variance (float) -- [REQUIRED]
  Variance threshold for any potential beacons.
- min_records (int) -- [REQUIRED]
  Minimum number of delta records to search.
- cluster_factor (float) -- [REQUIRED]
  The likelihood percentage for a cluster.
- line_amounts (list) -- [REQUIRED]
  Line amounts to process at a time, in list format.
- min_delta_time (int) -- [REQUIRED]
  Minimum delta time to search by, in milliseconds.
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.agglomerative_clustering(
    delta_file = "/tmp/delta/delta_file.parquet",
    max_variance = 0.01,
    min_records = 10,
    cluster_factor = 0.2,
    line_amounts = [1],
    min_delta_time = 300000,
    gold_loc = "/tmp/gold/beacon/agg_cluster",
    cli_return = False
)
print(gold_file)
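The returned gold file path can be handed straight to cli_results() (documented next) to print the findings. A minimal sketch, assuming the default Parquet file type:
import beacon

# Print the results of the clustering run above to the screen.
beacon.cli_results(
    gold_file = gold_file
)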
cli_results(**kwargs)¶
Return Command Line Interface (CLI) results from a gold file. Results will be printed to the screen.
Request Syntax¶
import beacon
request = beacon.cli_results(
    gold_file = "string",
    file_type = "string"
)
Parameters¶
- gold_file (string) -- [REQUIRED]
  Gold file location.
- file_type (string)
  Gold file type (CSV or Parquet).
  Default = "parquet"
Returns¶
None
Example Run¶
import beacon
beacon.cli_results(
    gold_file = "/tmp/gold/data/12345678.parquet"
)
cluster_conns(**kwargs)¶
Cluster connections from a delta file into time-based connection groups in order to identify potential beacons.
Request Syntax¶
import beacon
request = beacon.cluster_conns(
    delta_file = "string",
    delta_column = "string",
    conn_cnt = int,
    conn_group = int,
    threshold = int,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- conn_cnt (int)
  Total connection count for a group. Greater than or equal (>=).
  Default = 10
- conn_group (int)
  Total number of connection groups. Greater than or equal (>=).
  Default = 5
- threshold (int)
  The time threshold in minutes when determining connection groups.
  Default = 60
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.cluster_conns(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    conn_cnt = 10,
    conn_group = 5,
    threshold = 60,
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False,
    overwrite = False,
    verbose = False
)
print(gold_file)
dbscan_clustering(**kwargs)¶
Run a DBSCAN clustering algorithm against a delta file to identify cluster points to be used to find a beacon.
Request Syntax¶
import beacon
request = beacon.dbscan_clustering(
    delta_file = "string",
    delta_column = "string",
    minimum_delta = int,
    spans = [[ int, int ], [ int, int ], ],
    minimum_points_in_cluster = int,
    minimum_likelihood = float,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- minimum_delta (int) -- [REQUIRED]
  Minimum number of delta records to search using your delta column.
- spans (list)
  Spans of time that you wish to search, in list format.
  Example: [[0, 5], [5, 10]] will search within two time spans, 0-5 min and 5-10 min.
  Default = []
- minimum_points_in_cluster (int)
  Minimum number of points required to form a cluster.
  Default = 4
- minimum_likelihood (float)
  Likelihood value (threshold) used to identify a potential beacon.
  Default = 0.70
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.dbscan_clustering(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    minimum_delta = 1,
    spans = [[0, 5], [2, 15], [15, 35], [30, 60]],
    minimum_points_in_cluster = 4,
    minimum_likelihood = 0.70,
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False
)
print(gold_file)
dbscan_by_variance(**kwargs)¶
Run a DBSCAN clustering search, filtering out records by delta variance percentage and average delta time. The source delta file must be in Parquet format.
Request Syntax¶
import beacon
request = beacon.dbscan_by_variance(
    delta_file = "string",
    delta_column = "string",
    avg_delta = int,
    conn_cnt = int,
    span_avg = int,
    variance_per = int,
    minimum_likelihood = float,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- avg_delta (int) -- [REQUIRED]
  Average delta time to include in the search using your delta column. Less than or equal (<=).
- conn_cnt (int)
  Total connection count for filtering. Greater than or equal (>=).
  Default = 4
- span_avg (int)
  The percentage to increase and decrease from the connection's total delta span (see the worked sketch after this list).
  Example: 15 will increase/decrease the minimum and maximum of the delta span by 15%.
  delta min = 5
  delta max = 10
  span min = 4.25 (5 - (5 * 15%))
  span max = 11.5 (10 + (10 * 15%))
  Default = 15
- variance_per (int)
  Total variance percentage for filtering. Greater than or equal (>=).
  Default = 4
- minimum_likelihood (float)
  Likelihood value (threshold) used to identify a potential beacon.
  Default = 0.70
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
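The span_avg calculation can be sanity-checked in plain Python. This is a standalone illustration of the percentage math above, not a call into the module:
# Widen a delta span by span_avg percent on each side (illustration only).
delta_min, delta_max = 5, 10
span_avg = 15  # percent

span_min = delta_min - (delta_min * span_avg / 100)  # 5 - 0.75 = 4.25
span_max = delta_max + (delta_max * span_avg / 100)  # 10 + 1.5 = 11.5
print(span_min, span_max)  # 4.25 11.5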
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.dbscan_by_variance(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    avg_delta = 10,
    conn_cnt = 5,
    span_avg = 15,
    variance_per = 10,
    minimum_likelihood = 0.70,
    gold_loc = "/tmp/gold/beacon/dbscan_var",
    cli_return = False
)
print(gold_file)
get_dns(**kwargs)¶
Look up a DNS record for an IP.
Request Syntax¶
import beacon
request = beacon.get_dns(
    ip = "string"
)
Parameters¶
- ip (string) -- [REQUIRED]
  IP for which you want to look up a DNS entry.
  Default = None
Returns¶
DNS Value (string)
Example Run¶
import beacon
dns = beacon.get_dns(
    ip = "127.0.0.1"
)
print(dns)
packet(**kwargs)¶
Run a beacon search by packet size uniqueness.
Request Syntax¶
import beacon
request = beacon.packet(
    delta_file = "string",
    delta_column = "string",
    avg_delta = int,
    conn_cnt = int,
    min_unique_percent = int,
    threshold = int,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- avg_delta (int) -- [REQUIRED]
  Average delta time to include in the search using your delta column. Less than or equal (<=).
- conn_cnt (int)
  Total connection count for a group. Greater than or equal (>=).
  Default = 5
- min_unique_percent (int)
  Lowest packet uniqueness as a percentage. For instance, if you have 10 connections with 9 of them being the same packet size, your unique packet size is 10%.
  Default = 5
- threshold (int)
  The time threshold in minutes for determining connection groups.
  Default = 60
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.packet(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    avg_delta = 15,
    conn_cnt = 5,
    min_unique_percent = 5,
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False,
    overwrite = False,
    verbose = False
)
print(gold_file)
ingest.py¶
Below are the steps for loading the ingest.py module. Ensure that you have the __pycache__ directory in the same location as the ingest.py file.
import ingest
Any changes made to the ingest.py module within JupyterLab will require a reload of the ingest.py module. Use the code below to reload it.
import importlib
imported_module = importlib.import_module("ingest")
importlib.reload(imported_module)
import ingest
These are the available methods:
- add_dns()
- build_bronze_layer()
- build_delta_files()
- build_filter_files()
- build_null_files()
- build_raw()
- convert_parquet_to_csv()
- download_s3_folder()
- filter_dataframe()
- get_latest_file()
- unzip()
add_dns(**kwargs)¶
Add source and destination DNS entries to file(s) based upon a whitelist Parquet file or Pandas DataFrame. Both the source and destination files need to be in Parquet format.
Request Syntax¶
import ingest
request = ingest.add_dns(
    src_file = "string",
    dest_loc = "string",
    file_type = "string",
    dns_file = "string",
    dns_df = Pandas_DataFrame,
    verbose = True|False
)
Parameters¶
- src_file (string) -- [REQUIRED]
  Source folder or file location.
- dest_loc (string) -- [REQUIRED]
  Destination folder or file location.
- file_type (string) -- [REQUIRED]
  Destination file type (CSV or Parquet).
  Parquet format is recommended!
- dns_file (string)
  Source DNS lookup file. The lookup file must be in Parquet format.
  Default = ""
- dns_df (Pandas DataFrame)
  Source DNS Pandas DataFrame.
  Default = ""
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
# Folder Location
ingest.add_dns(
    src_file = "/tmp/source_files",
    dest_loc = "/tmp/dest_files",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
)
# Single File
ingest.add_dns(
    src_file = "/tmp/source_files/source_file.parquet",
    dest_loc = "/tmp/dest_files",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
)
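add_dns() can also take an in-memory whitelist via dns_df instead of a Parquet file. A minimal sketch is below; the column names ip and dns are assumptions for illustration and must match whatever columns your whitelist actually uses:
import pandas as pd
import ingest

# Hypothetical whitelist DataFrame; the column names are assumptions.
dns_df = pd.DataFrame({
    "ip": ["127.0.0.1"],
    "dns": ["localhost"]
})

ingest.add_dns(
    src_file = "/tmp/source_files",
    dest_loc = "/tmp/dest_files",
    file_type = "parquet",
    dns_df = dns_df
)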
build_bronze_layer(**kwargs)¶
Create a bronze data layer for a source folder location. Bronze data will contain TCP data only and will include both source and destination DNS.
Request Syntax¶
import ingest
request = ingest.build_bronze_layer(
    src_loc = "string",
    bronze_loc = "string",
    dns_file = "string",
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- src_loc (string) -- [REQUIRED]
  Raw source folder location.
- bronze_loc (string) -- [REQUIRED]
  Bronze layer folder location. The destination location for all bronze files.
- dns_file (string)
  Source DNS lookup file. The lookup file must be in Parquet format.
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
ingest.build_bronze_layer(
    src_loc = "/tmp/source/",
    bronze_loc = "/tmp/bronze/zeek/raw/parquet/2022/04/22",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
)
build_delta_files(**kwargs)¶
Create a delta file from a Parquet folder location or file. Current units of measurement are milliseconds and minutes. Source and destination files must be in Parquet format.
Note
Destination files are named delta_epochtime.parquet.
Request Syntax¶
import ingest
request = ingest.build_delta_files(
    src_loc = "string",
    delta_file_loc = "string",
    delta_file_type = "string",
    overwrite = True|False
)
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source folder or file location.
- delta_file_loc (string) -- [REQUIRED]
  Destination folder or file location for delta files.
- delta_file_type (string)
  Destination file type (CSV or Parquet).
  Parquet format is recommended!
  Default = parquet
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
ingest.build_delta_files(
    src_loc = "/tmp/filtered/",
    delta_file_loc = "/tmp/delta"
)
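Because destination files are named delta_epochtime.parquet, the newest delta file can be picked up with get_latest_file() (documented below) and handed to a beacon search:
import ingest

# Grab the delta file that build_delta_files() just wrote.
latest_delta = ingest.get_latest_file(
    folder_loc = "/tmp/delta/"
)
print(latest_delta)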
build_filter_files(**kwargs)¶
Create filtered files by searching for ports, source/destination IPs, or DNS entries. A default metadata.json will be created for historical filter identification purposes.
Request Syntax¶
import ingest
request = ingest.build_filter_files(
    src_loc = "string",
    dest_file = "string",
    port_filter = [ int, ],
    port_exclude = True|False,
    src_filter = [ "string", ],
    src_exclude = True|False,
    dest_filter = [ "string", ],
    dest_exclude = True|False,
    s_dns_filter = [ "string", ],
    s_dns_exclude = True|False,
    d_dns_filter = [ "string", ],
    d_dns_exclude = True|False,
    match_filter = [ "string", ],
    match_exclude = True|False,
    file_type = "string",
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source folder location.
- dest_file (string)
  Destination file location for filters.
  Use a unique folder name at the end to identify your filter - a data folder will be appended automatically.
  Default = ""
- port_filter (list)
  Ports that you want to filter by, in a list format. Inclusive results unless port_exclude is set.
  Default = None
- port_exclude (boolean)
  Exclusive or inclusive search on port_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = False
- src_filter (list)
  Source IP that you want to filter by, in a list format.
  Default search will look for items equal to match.
  Use of a wildcard (*) will perform a wildcard search.
  Default = None
- src_exclude (boolean)
  Exclusive or inclusive search on src_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- dest_filter (list)
  Destination IP that you want to filter by, in a list format.
  Default search will look for items equal to match.
  Use of a wildcard (*) will perform a wildcard search.
  Default = None
- dest_exclude (boolean)
  Exclusive or inclusive search on dest_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- s_dns_filter (list)
  Source DNS that you want to filter by, in a list format.
  Default search will look for items equal to match.
  Use of a wildcard (*) will perform a wildcard search.
  Default = None
- s_dns_exclude (boolean)
  Exclusive or inclusive search on s_dns_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- d_dns_filter (list)
  Destination DNS that you want to filter by, in a list format.
  Default search will look for items equal to match.
  Use of a wildcard (*) will perform a wildcard search.
  Default = None
- d_dns_exclude (boolean)
  Exclusive or inclusive search on d_dns_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- match_filter (list)
  Source and Destination DNS that you want to filter by, in a list format.
  Default search will look for items equal to match.
  Use of a wildcard (*) will perform a wildcard search.
  In order to match, objects must be in both source and destination.
  Default = None
- match_exclude (boolean)
  Exclusive or inclusive search on match_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- file_type (string)
  Destination file types. Parquet or CSV.
  Parquet format is recommended!
  Default = parquet
- overwrite (boolean)
  Overwrite existing location. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
# NO INCLUDE FILE
# ONLY PORTS 80 & 443 WILL BE INCLUDED.
# EXAMPLE IPS WILL BE REMOVED.
ingest.build_filter_files(
    src_loc = "/tmp/source",
    dest_file = "/tmp/dest/filtered/test_filter",
    port_filter = [80, 443],
    src_filter = ["127.0.0.1", "9.9.9.9"],
    dest_filter = ["127.0.0.1", "9.9.9.9"]
)
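Because the DNS filters support wildcard (*) searches, excluding an entire domain from the results might look like the sketch below; the domain name is a placeholder:
import ingest

# EXCLUDE ANY DESTINATION DNS UNDER example.com (d_dns_exclude defaults to True).
ingest.build_filter_files(
    src_loc = "/tmp/source",
    dest_file = "/tmp/dest/filtered/no_example_dns",
    d_dns_filter = ["*.example.com"]
)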
build_null_files(**kwargs)¶
Create filter files with NULL DNS entries.
Request Syntax¶
import ingest
request = ingest.build_null_files(
    src_loc = "string",
    dest_loc = "string",
    overwrite = True|False,
    file_type = "string"
)
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source folder or file location.
- dest_loc (string) -- [REQUIRED]
  Destination folder location.
- overwrite (boolean)
  Overwrite existing location. True or False (case-sensitive).
  Default = False
- file_type (string)
  Destination file types. Parquet or CSV.
  Parquet format is recommended!
  Default = parquet
Returns¶
None
Example Run¶
import ingest
ingest.build_null_files(
    src_loc = "/tmp/source",
    dest_loc = "/tmp/filtered/no_match"
)
build_raw(**kwargs)¶
Build initial Parquet file from raw JSON Zeek file. To be used only for single files. Use build_bronze_layer() for folder-level processing.
Request Syntax¶
import ingest
ingest.build_raw(
    src_loc = "string",
    dest_loc = "string",
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source file.
- dest_loc (string) -- [REQUIRED]
  Destination Parquet file.
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
ingest.build_raw(
    src_loc = "/tmp/source/test.json",
    dest_loc = "/tmp/raw/test.parquet"
)
convert_parquet_to_csv(**kwargs)¶
Create a CSV file from a Parquet file.
Request Syntax¶
import ingest
request = ingest.convert_parquet_to_csv(
    par_file = "string",
    csv_file = "string"
)
Parameters¶
- par_file (string) -- [REQUIRED]
  Source Parquet file.
- csv_file (string) -- [REQUIRED]
  Destination CSV file.
Returns¶
None
Example Run¶
import ingest
ingest.convert_parquet_to_csv(
    par_file = "/tmp/source/test.parquet",
    csv_file = "/tmp/dest/test.csv"
)
download_s3_folder(**kwargs)¶
Download an AWS S3 folder to a local folder. Must have an AWS CLI Profile configured.
Request Syntax¶
import ingest
request = ingest.download_s3_folder(
    s3_loc = "string",
    local_dest = "string",
    profile = "string"
)
Parameters¶
- s3_loc (string) -- [REQUIRED]
  S3 folder location.
- local_dest (string) -- [REQUIRED]
  Local destination folder.
- profile (string)
  AWS CLI Profile name.
  Default = default
Returns¶
None
Example Run¶
import ingest
ingest.download_s3_folder(
    s3_loc = "s3://bucket/foldername",
    local_dest = "/tmp/bucketname/foldername"
)
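If your AWS credentials live under a named CLI profile rather than the default, pass it with profile; the profile name below is a placeholder:
import ingest

ingest.download_s3_folder(
    s3_loc = "s3://bucket/foldername",
    local_dest = "/tmp/bucketname/foldername",
    profile = "my_profile"
)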
filter_dataframe(**kwargs)¶
Filter a Zeek Pandas DataFrame for matching values. Returns a Pandas DataFrame.
Request Syntax¶
import ingest
request = ingest.filter_dataframe(
    df = Pandas_DataFrame,
    filter_vals = [ "string", ],
    filter_column = "string",
    filter_type = "string",
    ret_org = True|False,
    verbose = True|False
)
Parameters¶
- df (Pandas DataFrame) -- [REQUIRED]
  Pandas DataFrame.
- filter_vals (list) -- [REQUIRED]
  Values that you want to filter by, in a list format. String value search types:
  Default search will look for items equal to match.
  Use of a wildcard (*) will perform a wildcard search.
- filter_column (string) -- [REQUIRED]
  Pandas DataFrame column that you want to search by. Case-sensitive.
  Valid Options
  * community_id
  * conn_state
  * duration
  * history
  * id.orig_h
  * id.orig_p
  * id.resp_h
  * id.resp_p
  * local_orig
  * local_resp
  * missed_bytes
  * orig_bytes
  * orig_ip_bytes
  * orig_pkts
  * proto
  * resp_bytes
  * resp_ip_bytes
  * resp_pkts
  * service
  * ts
  * uid
- filter_type (string) -- [REQUIRED]
  The type of filter that you are using.
  Valid Options
  * int (integer)
  * string (string)
  * match (matching Source and Destination DNS values)
- ret_org (boolean)
  Return the original DataFrame if no results are gathered. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Pandas Dataframe
Example Run¶
import ingest
import pandas as pd
src_df = pd.read_parquet("/tmp/dest/test.parquet")
new_df = ingest.filter_dataframe(
    df = src_df,
    filter_vals = ["127.0.0.1", "localhost"],
    filter_column = "id.orig_h",
    filter_type = "string"
)
print(new_df)
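Because string searches honor the wildcard (*), a subnet-style match on the originating IP could look like the sketch below (the address range is a placeholder):
import ingest
import pandas as pd

src_df = pd.read_parquet("/tmp/dest/test.parquet")

# Wildcard match on any originating IP in 192.168.x.x.
new_df = ingest.filter_dataframe(
    df = src_df,
    filter_vals = ["192.168.*"],
    filter_column = "id.orig_h",
    filter_type = "string"
)
print(new_df)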
get_latest_file(**kwargs)¶
Get the most recent file from a folder location. Method is used to grab the latest delta or gold file. Returns a full path.
Request Syntax¶
import ingest
request = ingest.get_latest_file(
    folder_loc = "string",
    file_type = "string"
)
Parameters¶
- folder_loc (string) -- [REQUIRED]
  Source folder location.
- file_type (string)
  Destination file type that you want to search for. Parquet or CSV.
  Parquet format is recommended!
  Default = parquet
Returns¶
Latest file path (string)
Example Run¶
import ingest
# PULL LATEST DELTA FILE
max_file = ingest.get_latest_file(
    folder_loc = "/tmp/delta/"
)
print(max_file)
unzip(**kwargs)¶
Unzip a raw source file. Only tar files are available for unzipping.
Request Syntax¶
import ingest
request = ingest.unzip(
    zip_file = "string",
    dest_loc = "string",
    file_type = "string"
)
Parameters¶
- zip_file (string) -- [REQUIRED]
  Source zip file.
- dest_loc (string) -- [REQUIRED]
  Destination folder where the zip file will be extracted.
- file_type (string)
  Zip file type. Currently the only valid option is tar.
  Valid Options
  * tar
  Default = tar
Returns¶
None
Example Run¶
import ingest
ingest.unzip(
    zip_file = "/tmp/aw_data_file.tar",
    dest_loc = "/tmp/raw/data"
)