Beacon Huntress Modules¶
There are two main modules that can be used for the beacon detection data pipeline: beacon.py and ingest.py.
Beacon.py is used for beacon detection, and ingest.py is used for creating the data pipeline. These modules should only be used in a Jupyter Notebook environment.
beacon.py¶
The Beacon module runs various beacon detection algorithms against a delta file created by the ingest module. Currently, all delta files used within this module must be in Parquet format.
Note
Ensure that you have the __pycache__ directory in the same location as the beacon.py file.
Import Module¶
import beacon
These are the available methods:
- agglomerative_clustering()
- cli_results()
- cluster_conns()
- dbscan_clustering()
- dbscan_by_variance()
- get_dns()
- packet()
Note
Any changes made to the beacon.py module within JupyterLab will require a reload of the beacon.py module.
Use the code below to reload the beacon.py module.
Reload Module¶
import importlib
imported_module = importlib.import_module("beacon")
importlib.reload(imported_module)
import beacon
agglomerative_clustering(**kwargs)¶
Run an agglomerative clustering algorithm against a delta file to identify cluster points to be used to find a beacon.
Request Syntax¶
import beacon
request = beacon.agglomerative_clustering(
    delta_file = "string",
    delta_column = "string",
    max_variance = float,
    min_records = int,
    cluster_factor = float,
    line_amounts = [ int, ],
    min_delta_time = int,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string)
  Source delta column that you want to search.
  Options: delta_ms (milliseconds) or delta_mins (minutes).
- max_variance (float) -- [REQUIRED]
  Variance threshold for any potential beacons.
- min_records (int) -- [REQUIRED]
  Minimum number of delta records to search.
- cluster_factor (float) -- [REQUIRED]
  The likelihood percentage for a cluster.
- line_amounts (list) -- [REQUIRED]
  Line amounts to process at a time, in list format.
- min_delta_time (int) -- [REQUIRED]
  Minimum delta time to search by, in milliseconds.
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.agglomerative_clustering(
    delta_file = "/tmp/delta/delta_file.parquet",
    max_variance = 0.01,
    min_records = 10,
    cluster_factor = 0.2,
    line_amounts = [1],
    min_delta_time = 300000,
    gold_loc = "/tmp/gold/beacon/agg_cluster",
    cli_return = False
)
print(gold_file)
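To picture what this step does, the sketch below clusters delta times with scikit-learn's AgglomerativeClustering and flags large, low-variance clusters. It is a minimal sketch of the idea, not the module's implementation; the delta_ms column name, the cluster count, and the variance check are illustrative assumptions.
# A minimal sketch of the clustering idea, not the module's implementation.
# The "delta_ms" column, n_clusters value, and thresholds are assumptions.
import pandas as pd
from sklearn.cluster import AgglomerativeClustering

df = pd.read_parquet("/tmp/delta/delta_file.parquet")
deltas = df[["delta_ms"]].dropna()

# Group the inter-connection delta times into clusters.
deltas["cluster"] = AgglomerativeClustering(n_clusters=4).fit_predict(deltas)

# A large, low-variance cluster of deltas suggests a regular, beacon-like cadence.
for cluster_id, grp in deltas.groupby("cluster"):
    if len(grp) >= 10 and grp["delta_ms"].std() <= 0.01 * grp["delta_ms"].mean():
        print(f"candidate cluster {cluster_id}: {len(grp)} records")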
cli_results(**kwargs)¶
Return Command Line Interface (CLI) results from a gold file. Results will be printed to the screen.
Request Syntax¶
import beacon
request = beacon.cli_results(
    gold_file = "string",
    file_type = "string
)
Parameters¶
- gold_file (string) -- [REQUIRED]
  Gold file location.
- file_type (string)
  Gold file type (CSV or Parquet).
  Default = "parquet"
Returns¶
None
Example Run¶
import beacon
beacon.cli_results(
    gold_file = "/tmp/gold/data/12345678.parquet"
)
cluster_conns(**kwargs)¶
Group connections by a time threshold to find connection groups that may indicate a beacon.
Request Syntax¶
import beacon
request = beacon.cluster_conns(
    delta_file = "string",
    delta_column = "string",
    conn_cnt = int,
    conn_group = int,
    threshold = int,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- conn_cnt (int)
  Total connection count for a group. Greater than or equal (>=).
  Default = 10
- conn_group (int)
  Total number of connection groups. Greater than or equal (>=).
  Default = 5
- threshold (int)
  The time threshold in minutes when determining connection groups.
  Default = 60
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.cluster_conns(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    conn_cnt = 10,
    conn_group = 5,
    threshold = 60,
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False,
    overwrite = False,
    verbose = False
)
print(gold_file)
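As a rough illustration of the grouping logic (not the module's code), the pandas sketch below splits each source/destination pair's connections into a new group whenever the gap between consecutive connections exceeds the threshold. The Zeek-style column names and a seconds-based ts are assumptions.
# A rough pandas sketch of time-threshold grouping, not the module's code.
# Zeek-style columns ("ts" in epoch seconds, "id.orig_h", "id.resp_h") are assumptions.
import pandas as pd

df = pd.read_parquet("/tmp/delta/delta_file.parquet").sort_values("ts")
pair = [df["id.orig_h"], df["id.resp_h"]]

# Start a new connection group whenever the gap between consecutive
# connections for a pair exceeds the 60-minute threshold.
gap = df.groupby(pair)["ts"].diff()
df["conn_group"] = (gap > 60 * 60).groupby(pair).cumsum()

# Keep pairs with at least 5 groups of at least 10 connections each.
group_sizes = df.groupby(["id.orig_h", "id.resp_h", "conn_group"]).size()
big_groups = group_sizes[group_sizes >= 10]
print(big_groups.groupby(level=[0, 1]).size().loc[lambda s: s >= 5])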
dbscan_clustering(**kwargs)¶
Run a DBSCAN cluster algorithm against a delta file to identify cluster points to be used to find a beacon.
Request Syntax¶
import beacon
request = beacon.dbscan_clustering(
    delta_file = "string",
    delta_column = "string",
    minimum_delta = int,
    spans = [[ int, int ], [ int, int], ],
    minimum_points_in_cluster = int,
    minimum_likelihood = float,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- minimum_delta (int) -- [REQUIRED]
  Minimum number of delta records to search using your delta column.
- spans (list)
  Spans of time that you wish to search, in list format.
  Example: [[0, 5], [5, 10]] will search within two time spans, 0-5 min and 5-10 min.
  Default = []
- minimum_points_in_cluster (int)
  Minimum number of points required to form a cluster.
  Default = 4
- minimum_likelihood (float)
  Likelihood value (threshold) used to identify a potential beacon.
  Default = 0.70
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.dbscan_clustering(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    minimum_delta = 1,
    spans = [[0, 5], [2, 15], [15, 35], [30, 60]],
    minimum_points_in_cluster = 4,
    minimum_likelihood = 0.70,
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False
)
print(gold_file)
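The sketch below shows the DBSCAN step in isolation for a single span. The eps value and the likelihood definition (share of points that land in a cluster) are illustrative assumptions, not the module's internals.
# A minimal DBSCAN sketch with scikit-learn, not the module's implementation.
# The eps value and the likelihood definition are assumptions.
import pandas as pd
from sklearn.cluster import DBSCAN

df = pd.read_parquet("/tmp/delta/delta_file.parquet")
span = df[(df["delta_mins"] >= 0) & (df["delta_mins"] < 5)]  # the [0, 5] span

labels = DBSCAN(eps=0.5, min_samples=4).fit_predict(span[["delta_mins"]])

# Points labelled -1 are DBSCAN noise; treat the share of clustered points
# as a rough likelihood that the deltas follow a regular cadence.
likelihood = (labels != -1).mean()
if likelihood >= 0.70:
    print(f"potential beacon: {likelihood:.0%} of deltas fall in clusters")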
dbscan_by_variance(**kwargs)¶
Run a DBSCAN cluster by filtering out records by delta variance percentage and average delta time. Source delta file must be in Parquet format.
Request Syntax¶
import beacon
request = beacon.dbscan_by_variance(
    delta_file = "string",
    delta_column = "string",
    avg_delta = int,
    conn_cnt = int,
    span_avg = int,
    variance_per = int,
    minimum_likelihood = float,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- avg_delta (int) -- [REQUIRED]
  Average delta time to include in the search using your delta column. Less than or equal (<=).
- conn_cnt (int)
  Total connection count for filtering. Greater than or equal (>=).
  Default = 4
- span_avg (int)
  The percentage to increase and decrease from the connection's total delta span.
  Example: 15 will increase/decrease the minimum and maximum of the delta span by 15% (see the arithmetic sketch after the example run below).
  delta min = 5
  delta max = 10
  span min = 4.25 (5 - (5 * 15%))
  span max = 11.5 (10 + (10 * 15%))
  Default = 15
- variance_per (int)
  Total variance percentage for filtering. Greater than or equal (>=).
  Default = 4
- minimum_likelihood (float)
  Likelihood value (threshold) used to identify a potential beacon.
  Default = 0.70
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.dbscan_by_variance(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    avg_delta = 10,
    conn_cnt = 5,
    span_avg = 15,
    variance_per = 10,
    minimum_likelihood = 0.70,
    gold_loc = "/tmp/gold/beacon/dbscan_var",
    cli_return = False
)
print(gold_file)
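The span_avg widening described in the parameters above works out as simple arithmetic:
# The span_avg example from the parameter description, as arithmetic.
span_avg = 15                 # percent
delta_min, delta_max = 5, 10
span_min = delta_min - delta_min * span_avg / 100   # 5 - 0.75 = 4.25
span_max = delta_max + delta_max * span_avg / 100   # 10 + 1.5 = 11.5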
get_dns(**kwargs)¶
Look up a DNS record for an IP.
Request Syntax¶
import beacon
request = beacon.get_dns(
    ip = "string"
)
Parameters¶
- ip (string) -- [REQUIRED]
  IP address for which you want to look up a DNS entry.
Returns¶
DNS Value (string)
Example Run¶
import beacon
dns = beacon.get_dns(
    ip = "127.0.0.1"
)
print(dns)
packet(**kwargs)¶
Run a Beacon search by packet size uniqueness.
Request Syntax¶
import beacon
request = beacon.packet(
    delta_file = "string",
    delta_column = "string",
    avg_delta = int,
    conn_cnt = int,
    min_unique_percent = int,
    threshold = int,
    gold_loc = "string",
    cli_return = True|False,
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- delta_file (string) -- [REQUIRED]
  Source delta file.
- delta_column (string) -- [REQUIRED]
  Source delta column that you want to search.
- avg_delta (int) -- [REQUIRED]
  Average delta time to include in the search using your delta column. Less than or equal (<=).
- conn_cnt (int)
  Total connection count for a group. Greater than or equal (>=).
  Default = 5
- min_unique_percent (int)
  Lowest packet-size uniqueness as a percentage. For instance, if you have 10 connections with 9 of them being the same packet size, your unique packet-size percentage is 10% (one reading of this is sketched after the example run below).
  Default = 5
- threshold (int)
  The time threshold in minutes for determining connection groups.
  Default = 60
- gold_loc (string)
  Results file location. Blank string ("") for no result file.
  Default = ""
- cli_return (boolean)
  Return Command Line Interface (CLI) results. Will display as a print statement and not a return variable.
  Default = True
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Gold file (string)
Example Run¶
import beacon
gold_file = beacon.packet(
    delta_file = "/tmp/delta/delta_file.parquet",
    delta_column = "delta_mins",
    avg_delta = 15,
    conn_cnt = 5,
    min_unique_percent = 5,
    gold_loc = "/tmp/gold/beacon/dbscan",
    cli_return = False,
    overwrite = False,
    verbose = False
)
print(gold_file)
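One way to reconcile the min_unique_percent example above is to treat uniqueness as the share of connections whose packet size occurs exactly once. This is an interpretation, not the module's code:
# A sketch of packet-size uniqueness under one reading of the example:
# the share of connections whose packet size occurs exactly once.
import pandas as pd

sizes = pd.Series([512] * 9 + [1400])        # 10 connections, 9 share a size
occurs_once = sizes.map(sizes.value_counts()) == 1
print(100 * occurs_once.mean())              # 10.0 (%)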
ingest.py¶
The Ingest module is used to create the data pipeline that feeds beacon detection.
Note
Ensure that you have the __pycache__ directory in the same location as the ingest.py file.
Import Module¶
import ingest
Note
Any changes made to the ingest.py module within JupyterLab will require a reload of the ingest.py module.
Use the code below to reload the ingest.py module.
Reload Module¶
import importlib
imported_module = importlib.import_module("ingest")
importlib.reload(imported_module)
import ingest
These are the available methods:
- add_dns()
- build_bronze_layer()
- build_delta_files()
- build_filter_files()
- build_null_files()
- build_raw()
- convert_parquet_to_csv()
- download_s3_folder()
- filter_dataframe()
- get_latest_file()
- unzip()
add_dns(**kwargs)¶
Add source and destination DNS entries to file(s) based upon a whitelist Parquet file or Pandas DataFrame. Both the source and destination files need to be in Parquet format.
Request Syntax¶
import ingest
request = ingest.add_dns(
    src_file = "string",
    dest_loc = "string",
    file_type = "string",
    dns_file = "string"
    dns_df = Pandas_DataFrame,
    verbose = True|False
)
Parameters¶
- src_file (string) -- [REQUIRED]
  Source folder or file location.
- dest_loc (string) -- [REQUIRED]
  Destination folder or file location.
- file_type (string) -- [REQUIRED]
  Destination file type (CSV or Parquet). Parquet format is recommended!
- dns_file (string)
  Source DNS lookup file. The lookup file must be in Parquet format.
  Default = ""
- dns_df (Pandas DataFrame)
  Source DNS Pandas DataFrame.
  Default = ""
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
# Folder Location
ingest.add_dns(
    src_file = "/tmp/source_files",
    dest_loc = "/tmp/dest_files",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
)
# Single File
ingest.add_dns(
    src_file = "/tmp/source_files/source_file.parquet",
    dest_loc = "/tmp/dest_files",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
)
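Conceptually the enrichment is a left join of the connection data against the lookup file. A minimal pandas sketch, assuming lookup columns named "ip" and "dns" (not confirmed by the module):
# A conceptual pandas sketch of the DNS enrichment, not the module's code.
# The lookup file's column names ("ip", "dns") are assumptions.
import pandas as pd

conns = pd.read_parquet("/tmp/source_files/source_file.parquet")
dns = pd.read_parquet("/tmp/whitelist/whitelist_ip.parquet")

# Left-join DNS names onto both endpoints of each connection.
conns = conns.merge(dns.rename(columns={"ip": "id.orig_h", "dns": "source_dns"}),
                    on="id.orig_h", how="left")
conns = conns.merge(dns.rename(columns={"ip": "id.resp_h", "dns": "dest_dns"}),
                    on="id.resp_h", how="left")
conns.to_parquet("/tmp/dest_files/source_file.parquet")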
build_bronze_layer(**kwargs)¶
Create a bronze data layer for a source folder location. Bronze data will contain TCP data only and will include both source and destination DNS.
Request Syntax¶
import ingest
request = ingest.build_bronze_layer(
    src_loc = "string",
    bronze_loc = "string",
    dns_file = "string",
    overwrite = True|False,
    verbose = True|False
)
Parameters¶
- src_loc (string) -- [REQUIRED]
  Raw source folder location.
- bronze_loc (string) -- [REQUIRED]
  Bronze layer folder location. The destination location for all bronze files.
- dns_file (string)
  Source DNS lookup file. The lookup file must be in Parquet format.
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
ingest.build_bronze_layer(
    src_loc="/tmp/source/",
    bronze_loc="/tmp/bronze/zeek/raw/parquet/2022/04/22",
    dns_file = "/tmp/whitelist/whitelist_ip.parquet"
    )
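The TCP-only restriction amounts to a protocol filter on the raw Zeek connection data. A minimal sketch, assuming the usual Zeek "proto" column:
# A minimal sketch of the bronze layer's TCP-only filter, not the module's code.
# The Zeek "proto" column name is an assumption.
import pandas as pd

df = pd.read_parquet("/tmp/source/conn.parquet")
df[df["proto"] == "tcp"].to_parquet(
    "/tmp/bronze/zeek/raw/parquet/2022/04/22/conn.parquet")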
build_delta_files(**kwargs)¶
Create a delta file from a Parquet folder location or file. Current units of measurement are milliseconds and minutes. Source and destination files must be in Parquet format. 
Note
Destination files are named delta_epochtime.parquet.
Request Syntax¶
import ingest
request = ingest.build_delta_files(
    src_loc = "string",
    delta_file_loc = "string",
    delta_file_type = "string",
    overwrite = True|False,
    )
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source folder or file location.
- delta_file_loc (string) -- [REQUIRED]
  Destination folder or file location for delta files.
- delta_file_type (string)
  Destination file type (CSV or Parquet). Parquet format is recommended!
  Default = parquet
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
ingest.build_delta_files(
    src_loc = "/tmp/filtered/",
    delta_file_loc = "/tmp/delta"
    )
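A delta here is the gap between consecutive connections for a source/destination pair. The sketch below shows that calculation with pandas; the Zeek-style columns, a seconds-based ts, and the epoch in the output filename are assumptions.
# A pandas sketch of the delta calculation, not the module's code.
# Zeek-style columns ("ts" in epoch seconds, "id.orig_h", "id.resp_h") are assumptions.
import pandas as pd

df = pd.read_parquet("/tmp/filtered/conn.parquet").sort_values("ts")
delta_s = df.groupby(["id.orig_h", "id.resp_h"])["ts"].diff()

df["delta_ms"] = delta_s * 1000      # milliseconds
df["delta_mins"] = delta_s / 60      # minutes

# Destination named delta_<epochtime>.parquet per the note above (epoch illustrative).
df.dropna(subset=["delta_ms"]).to_parquet("/tmp/delta/delta_1650585600.parquet")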
build_filter_files(**kwargs)¶
Create filtered files by searching for Ports, Source/Destination IPs or DNS entries. A default metadata.json will be created for historical filter identification purposes.
Request Syntax¶
import ingest
request = ingest.build_filter_files(
    src_loc = "string",
    dest_file = "string",
    port_filter = [ int, ],
    port_exclude = True|False,
    src_filter = [ "string", ],
    src_exclude = True|False,
    dest_filter = [ "string", ],
    dest_exclude = True|False,
    s_dns_filter = [ "string", ],
    s_dns_exclude = True|False,
    d_dns_filter = [ "string", ],
    d_dns_exclude = True|False,
    match_filter = [ "string", ],
    match_exclude = True|False,
    file_type = "string",
    overwrite = True|False,
    verbose = True|False
    )
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source folder location.
- dest_file (string)
  Destination file location for filters. Use a unique folder name at the end to identify your filter; a data folder will be appended automatically.
  Default = ""
- port_filter (list)
  Ports that you want to filter by, in a list format.
  Default = None
- port_exclude (boolean)
  Exclusive or inclusive search on port_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = False
- src_filter (list)
  Source IPs that you want to filter by, in a list format.
  Default search will look for items equal to match. Use of a wildcard (*) will perform a wildcard search.
  Default = None
- src_exclude (boolean)
  Exclusive or inclusive search on src_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- dest_filter (list)
  Destination IPs that you want to filter by, in a list format.
  Default search will look for items equal to match. Use of a wildcard (*) will perform a wildcard search.
  Default = None
- dest_exclude (boolean)
  Exclusive or inclusive search on dest_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- s_dns_filter (list)
  Source DNS entries that you want to filter by, in a list format.
  Default search will look for items equal to match. Use of a wildcard (*) will perform a wildcard search.
  Default = None
- s_dns_exclude (boolean)
  Exclusive or inclusive search on s_dns_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- d_dns_filter (list)
  Destination DNS entries that you want to filter by, in a list format.
  Default search will look for items equal to match. Use of a wildcard (*) will perform a wildcard search.
  Default = None
- d_dns_exclude (boolean)
  Exclusive or inclusive search on d_dns_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- match_filter (list)
  Source and Destination DNS entries that you want to filter by, in a list format.
  Default search will look for items equal to match. Use of a wildcard (*) will perform a wildcard search.
  In order to match, values must appear in both source and destination.
  Default = None
- match_exclude (boolean)
  Exclusive or inclusive search on match_filter values. True or False (case-sensitive).
  Exclusive = True (not in)
  Inclusive = False (in)
  Default = True
- file_type (string)
  Destination file type. Parquet or CSV. Parquet format is recommended!
  Default = parquet
- overwrite (boolean)
  Overwrite existing location. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
# NO INCLUDE FILE
# ONLY PORTS 80 & 443 WILL BE INCLUDED.
# EXAMPLE IPS WILL BE REMOVED.
ingest.build_filter_files(
    src_loc = "/tmp/source",
    dest_file = "/tmp/dest/filtered/test_filter",
    port_filter = [80, 443],
    src_filter = ["127.0.0.1", "9.9.9.9"],
    dest_filter = ["127.0.0.1", "9.9.9.9"])
build_null_files(**kwargs)¶
Create filter files with NULL DNS entries.
Request Syntax¶
import ingest
request = ingest.build_null_files(
    src_loc = "string",
    dest_loc = "string",
    overwrite = True|False,
    file_type = "string"
    )
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source folder or file location.
- dest_loc (string) -- [REQUIRED]
  Destination folder location.
- overwrite (boolean)
  Overwrite existing location. True or False (case-sensitive).
  Default = False
- file_type (string)
  Destination file type. Parquet or CSV. Parquet format is recommended!
  Default = parquet
Returns¶
None
Example Run¶
import ingest
ingest.build_null_files(
    src_loc = "/tmp/source",
    dest_loc = "/tmp/filtered/no_match")
build_raw(**kwargs)¶
Build initial Parquet file from raw JSON Zeek file. To be used only for single files. Use build_bronze_layer() for folder-level processing.
Request Syntax¶
import ingest
ingest.build_raw(
    src_loc = "string",
    dest_loc = "string",
    overwrite = True|False,
    verbose = True|False
    )
Parameters¶
- src_loc (string) -- [REQUIRED]
  Source JSON file.
- dest_loc (string) -- [REQUIRED]
  Destination Parquet file.
- overwrite (boolean)
  Overwrite existing files. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
None
Example Run¶
import ingest
ingest.build_raw(
    src_loc = "/tmp/source/test.json",
    dest_loc = "/tmp/raw/test.parquet"
    )
convert_parquet_to_csv(**kwargs)¶
Create a CSV file from a Parquet file.
Request Syntax¶
import ingest
request = ingest.convert_parquet_to_csv(
    par_file = "string",
    csv_file = "string"
    )
Parameters¶
- par_file (string) -- [REQUIRED]
  Source Parquet file.
- csv_file (string) -- [REQUIRED]
  Destination CSV file.
Returns¶
None
Example Run¶
import ingest
ingest.convert_parquet_to_csv(
    par_file = "/tmp/source/test.parquet",
    csv_file = "/tmp/dest/test.csv"
    )
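Functionally this is equivalent to a pandas round-trip (a sketch, not the module's internals):
# What the conversion amounts to with pandas (a sketch, not the module's code).
import pandas as pd

pd.read_parquet("/tmp/source/test.parquet").to_csv("/tmp/dest/test.csv", index=False)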
download_s3_folder(**kwargs)¶
Download an AWS S3 folder to a local folder. Must have an AWS CLI Profile configured.
Request Syntax¶
import ingest
request = ingest.download_s3_folder(
    s3_loc = "string",
    csv_file = "string",
    profile = "string"
    )
Parameters¶
- s3_loc (string) -- [REQUIRED]
  S3 folder location.
- local_dest (string) -- [REQUIRED]
  Local destination folder.
- profile (string)
  AWS CLI profile name.
  Default = default
Returns¶
None
Example Run¶
import ingest
ingest.download_s3_folder(
    s3_loc = "s3://bucket/foldername",
    local_dest = "/tmp/bucketname/foldername")
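Under the hood this kind of download is typically a boto3 session with a named profile. A minimal sketch, with the bucket and prefix taken from the example above (not the module's code):
# A minimal boto3 sketch of an S3 folder download, not the module's code.
import os
import boto3

session = boto3.Session(profile_name="default")
bucket = session.resource("s3").Bucket("bucket")

for obj in bucket.objects.filter(Prefix="foldername/"):
    if obj.key.endswith("/"):          # skip folder placeholder keys
        continue
    local_path = os.path.join("/tmp/bucketname", obj.key)
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    bucket.download_file(obj.key, local_path)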
filter_dataframe(**kwargs)¶
Filter a Zeek Pandas DataFrame for matching values. Returns a Pandas DataFrame.
Request Syntax¶
import ingest
request = ingest.filter_dataframe(
    df = Pandas_DataFrame,
    filter_vals = [ "string", ],
    filter_column = "string",
    filter_type = "string",
    ret_org = True|False,
    verbose = True|False
    )
Parameters¶
- df (Pandas DataFrame) -- [REQUIRED]
  Pandas DataFrame.
- filter_vals (list) -- [REQUIRED]
  Values that you want to filter by, in a list format.
  String value search types: the default search will look for items equal to match; use of a wildcard (*) will perform a wildcard search.
- filter_column (string) -- [REQUIRED]
  Pandas DataFrame column that you want to search by. Case-sensitive.
  Valid Options
  * community_id
  * conn_state
  * duration
  * history
  * id.orig_h
  * id.orig_p
  * id.resp_h
  * id.resp_p
  * local_orig
  * local_resp
  * missed_bytes
  * orig_bytes
  * orig_ip_bytes
  * orig_pkts
  * proto
  * resp_bytes
  * resp_ip_bytes
  * resp_pkts
  * service
  * ts
  * uid
- filter_type (string) -- [REQUIRED]
  The type of filter that you are using.
  Valid Options
  * int (integer)
  * string (string)
  * match (matching Source and Destination DNS values)
- ret_org (boolean)
  Return the original DataFrame if no results are gathered. True or False (case-sensitive).
  Default = False
- verbose (boolean)
  Verbose logging. True or False (case-sensitive).
  Default = False
Returns¶
Pandas Dataframe
Example Run¶
import ingest
import pandas as pd
src_df = pd.read_parquet("/tmp/dest/test.parquet")
new_df = ingest.filter_dataframe(
    df = src_df,
    filter_vals = ["127.0.0.1", "localhost"],
    filter_column = "id.orig_h",
    filter_type = "string"
    )
print(new_df)
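The wildcard behavior can be pictured with fnmatch-style matching; this is an assumption about the implementation, not confirmed by the module:
# A sketch of wildcard matching with fnmatch; an assumed implementation detail.
import fnmatch
import pandas as pd

df = pd.DataFrame({"id.orig_h": ["10.0.0.1", "10.0.5.9", "192.168.1.2"]})
mask = df["id.orig_h"].map(lambda v: fnmatch.fnmatch(v, "10.0.*"))
print(df[mask])    # rows whose source IP matches the 10.0.* pattern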
get_latest_file(**kwargs)¶
Get the most recent file from a folder location. Method is used to grab the latest delta or gold file. Returns a full path.
Request Syntax¶
import ingest
request = ingest.get_latest_file(
    folder_loc = "string",
    file_type = "string"
    )
Parameters¶
- folder_loc (string) -- [REQUIRED]
  Source folder location.
- file_type (string)
  File type that you want to search for. Parquet or CSV. Parquet format is recommended!
  Default = parquet
Returns¶
Full file path (string)
Example Run¶
import ingest
# PULL LATEST DELTA FILE
max_file = ingest.get_latest_file(
    folder_loc = "/tmp/delta/"
    )
print(max_file)
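"Latest" here can be read as most recent by modification time; a one-line sketch with pathlib (an assumption about how the selection works):
# A pathlib sketch of latest-file selection by modification time, not the module's code.
from pathlib import Path

latest = max(Path("/tmp/delta/").glob("*.parquet"), key=lambda p: p.stat().st_mtime)
print(latest)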
unzip(**kwargs)¶
Unzip a raw source file. Currently, only tar files can be extracted.
Request Syntax¶
import ingest
request = ingest.unzip(
    zip_file = "string",
    dest_loc = "string",
    file_type = "string"
    )
Parameters¶
- zip_file (string) -- [REQUIRED]
  Source zip file.
- dest_loc (string) -- [REQUIRED]
  Destination folder where the zip file will be extracted.
- file_type (string)
  Zip file type. Currently the only valid option is tar.
  Default = tar
Returns¶
None
Example Run¶
import ingest
ingest.unzip(
    zip_file = "/tmp/aw_data_file.tar",
    dest_loc = "/tmp/raw/data"
    )
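For a plain tar archive, the extraction is equivalent to the standard library's tarfile (a sketch, not the module's code):
# A standard-library sketch of the tar extraction, not the module's code.
import tarfile

with tarfile.open("/tmp/aw_data_file.tar") as tar:
    tar.extractall("/tmp/raw/data")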