Batch Processing
Process large PCAP datasets efficiently.
Overview
When processing multiple PCAP files or large datasets:

- Use parallel processing to spread many files across CPU cores
- Use streaming to keep memory usage bounded
- Use the DPKT capture backend for faster packet parsing
Processing Multiple Files
Sequential Processing
Python
import joyfuljay as jj
import pandas as pd
from pathlib import Path

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

# Collect one DataFrame per capture and concatenate once at the end;
# concatenating inside the loop would re-copy all rows on every iteration.
all_data = []
pcap_dir = Path("./captures")

# sorted() makes the file order (and therefore the row order) deterministic.
for pcap_file in sorted(pcap_dir.glob("*.pcap")):
    print(f"Processing {pcap_file.name}...")
    df = pipeline.process_pcap(str(pcap_file))
    df["source_file"] = pcap_file.name  # remember which capture each flow came from
    all_data.append(df)

# pd.concat raises ValueError on an empty list, so report "no captures" clearly.
if not all_data:
    raise SystemExit(f"No .pcap files found in {pcap_dir}")

combined = pd.concat(all_data, ignore_index=True)
combined.to_csv("all_features.csv", index=False)
print(f"Total flows: {len(combined)}")
Parallel Processing
Python
import joyfuljay as jj
from pathlib import Path


def main():
    """Extract timing/TLS features from every PCAP in ./captures using 4 workers."""
    config = jj.Config(features=["timing", "tls"])
    pipeline = jj.Pipeline(config)

    # Get all PCAP files (sorted for a deterministic order)
    pcap_files = sorted(Path("./captures").glob("*.pcap"))
    if not pcap_files:
        raise SystemExit("No .pcap files found in ./captures")

    # Process in parallel (uses multiple CPU cores)
    df = pipeline.process_pcaps_batch(
        [str(f) for f in pcap_files],
        num_workers=4,  # Number of parallel workers
    )

    df.to_csv("all_features.csv", index=False)
    print(f"Processed {len(pcap_files)} files, {len(df)} flows")


# Guard the entry point. If the workers are separate processes started with
# the "spawn" method (the default on Windows and macOS), each worker
# re-imports this script, and unguarded top-level code would run again there.
if __name__ == "__main__":
    main()
Memory-Efficient Processing
Streaming to File
When the extracted features are too large to hold in memory, write each flow to disk as it is produced:
Python
import joyfuljay as jj
from joyfuljay.output import StreamingWriter

# Extract only the timing and TLS feature groups.
config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

# Each flow's feature record is handed to the writer as soon as it is
# produced, so the complete result set is never built up in memory.
with StreamingWriter("output.csv", format="csv") as csv_out:
    for flow_record in pipeline.iter_features("large_capture.pcap"):
        csv_out.write(flow_record)

print("Processing complete")
Streaming Multiple Files
Python
import joyfuljay as jj
from joyfuljay.output import StreamingWriter
from pathlib import Path

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

capture_dir = Path("./captures")
pcap_files = list(capture_dir.glob("*.pcap"))

# A single writer receives the rows of every capture, so all flows end up
# in one CSV, each tagged with the name of the file it came from.
with StreamingWriter("all_features.csv", format="csv") as writer:
    for capture in pcap_files:
        label = capture.name
        print(f"Processing {label}...")
        for row in pipeline.iter_features(str(capture)):
            row["source_file"] = label
            writer.write(row)

print(f"Processed {len(pcap_files)} files")
Fast Processing with DPKT
The DPKT backend parses packets much faster than Scapy (up to about 10x, depending on the capture):
Python
import joyfuljay as jj

# Select the DPKT backend explicitly. It is chosen by name through
# `capture_backend`, so no backend class needs to be imported.
config = jj.Config(
    features=["timing", "size"],
    capture_backend="dpkt",  # much faster than Scapy
)
pipeline = jj.Pipeline(config)
df = pipeline.process_pcap("large_capture.pcap")
Database Output
SQLite
Python
import joyfuljay as jj
from joyfuljay.output import DatabaseWriter
from pathlib import Path

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

# Every flow from every capture is written to the "flows" table of
# features.db, together with the name of its source capture.
with DatabaseWriter("sqlite:///features.db", table="flows") as db:
    for capture in Path("./captures").glob("*.pcap"):
        for row in pipeline.iter_features(str(capture)):
            row["source_file"] = capture.name
            db.write(row)

print("Data written to features.db")
PostgreSQL
Python
import os
from urllib.parse import quote

import joyfuljay as jj
from joyfuljay.output import DatabaseWriter

# Build the pipeline here so the example runs on its own.
config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

# Take the password from the environment instead of hard-coding it.
# Percent-encode it so characters such as "@", ":" or "/" do not break the URL.
password = quote(os.environ["PGPASSWORD"], safe="")
connection_string = f"postgresql://user:{password}@localhost/traffic_db"

with DatabaseWriter(connection_string, table="network_flows") as writer:
    for features in pipeline.iter_features("capture.pcap"):
        writer.write(features)
Parquet Output
Parquet is a compressed, columnar file format that is efficient to store and reload for large datasets:
Python
import joyfuljay as jj
import pandas as pd

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

# Extract every flow into a DataFrame, then persist it as Parquet.
parquet_path = "features.parquet"
df = pipeline.process_pcap("capture.pcap")
df.to_parquet(parquet_path, index=False)

# Read the saved features back into a DataFrame.
df_loaded = pd.read_parquet(parquet_path)
Streaming to Parquet
Python
import joyfuljay as jj
from joyfuljay.output import StreamingWriter

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

with StreamingWriter("features.parquet", format="parquet") as parquet_out:
    # Rows are passed to the writer one at a time as they are extracted.
    for record in pipeline.iter_features("large_capture.pcap"):
        parquet_out.write(record)
Progress Monitoring
With Rich Progress Bar
Python
import joyfuljay as jj
import pandas as pd
from joyfuljay.utils import create_progress
from pathlib import Path

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

pcap_files = list(Path("./captures").glob("*.pcap"))

# One progress step per capture; collect each capture's DataFrame.
all_data = []
with create_progress() as progress:
    task = progress.add_task("Processing PCAPs", total=len(pcap_files))
    for pcap_file in pcap_files:
        all_data.append(pipeline.process_pcap(str(pcap_file)))
        progress.advance(task)

# Combine the per-capture results (pd.concat rejects an empty list).
if all_data:
    combined = pd.concat(all_data, ignore_index=True)
    print(f"Total flows: {len(combined)}")
else:
    print("No .pcap files found in ./captures")
Simple Progress
Python
import joyfuljay as jj
from joyfuljay.utils import SimpleProgress
from pathlib import Path

config = jj.Config(features=["timing", "tls"])
pipeline = jj.Pipeline(config)

pcap_files = list(Path("./captures").glob("*.pcap"))

results = []  # one DataFrame per capture
progress = SimpleProgress(total=len(pcap_files), description="Processing")
try:
    for pcap_file in pcap_files:
        results.append(pipeline.process_pcap(str(pcap_file)))
        progress.update(1)
finally:
    # Close the progress display even if a capture fails to process.
    progress.close()
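Sampling for Large Datasets
Process only a fraction of the packets. Because sampling drops packets, per-flow statistics such as packet counts and timing will differ from those of a full run: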
Python
import joyfuljay as jj

# sampling_rate=0.1 -> only 10% of the packets are processed.
config = jj.Config(features=["timing", "tls"], sampling_rate=0.1)
pipeline = jj.Pipeline(config)
df = pipeline.process_pcap("very_large_capture.pcap")
Complete Batch Processing Script
Python
#!/usr/bin/env python3
"""Batch process PCAP files with progress and error handling.

Every ``*.pcap`` and ``*.pcapng`` file under INPUT_DIR (searched recursively)
is run through a JoyfulJay pipeline, and all flow features are written to a
single output file.

* ``--workers 1`` (default): features are streamed to the output file by
  file. A capture that raises is recorded, skipped, and reported at the end.
  Each row gets a ``source_file`` column holding the capture's path relative
  to INPUT_DIR.
* ``--workers N`` (N > 1): all captures are passed to
  ``Pipeline.process_pcaps_batch``, and the resulting DataFrame is written
  once processing finishes.

Exits with status 1 if INPUT_DIR is missing or not a directory, if no
captures are found, or if any capture failed to process.
"""
import argparse
import sys
from pathlib import Path

import joyfuljay as jj
from joyfuljay.output import StreamingWriter
from joyfuljay.utils import create_progress, is_rich_available


def _find_pcaps(input_dir):
    """Return every .pcap/.pcapng file under *input_dir*, sorted for a stable order."""
    return sorted(
        path
        for pattern in ("**/*.pcap", "**/*.pcapng")
        for path in input_dir.glob(pattern)
        if path.is_file()
    )


def _source_label(pcap_file, input_dir):
    """Return *pcap_file*'s path relative to *input_dir*, using forward slashes."""
    # The search is recursive, so the bare file name alone could collide
    # between subdirectories; the relative path cannot.
    return pcap_file.relative_to(input_dir).as_posix()


def _stream_file(pipeline, writer, pcap_file, input_dir, errors):
    """Stream one capture's features into *writer*; record a failure in *errors*."""
    source = _source_label(pcap_file, input_dir)
    try:
        for features in pipeline.iter_features(str(pcap_file)):
            features["source_file"] = source
            writer.write(features)
    except Exception as e:  # keep going; all failures are reported at the end
        # Rows already passed to writer.write() for this file are not removed.
        errors.append((source, str(e)))


def _write_dataframe(df, output, fmt):
    """Write *df* to *output* as CSV, Parquet, or JSON Lines ("json")."""
    if fmt == "csv":
        df.to_csv(output, index=False)
    elif fmt == "parquet":
        df.to_parquet(output, index=False)
    else:
        # NOTE(review): confirm StreamingWriter(format="json") also emits
        # JSON Lines, so that both processing modes produce the same format.
        df.to_json(output, orient="records", lines=True)


def main():
    """Parse arguments, process every capture, and write the combined output."""
    parser = argparse.ArgumentParser(description="Batch PCAP processor")
    parser.add_argument("input_dir", help="Directory containing PCAP files")
    parser.add_argument("-o", "--output", default="features.csv", help="Output file")
    parser.add_argument(
        "-f", "--format", default="csv", choices=["csv", "parquet", "json"],
        help="Output format",
    )
    parser.add_argument("-w", "--workers", type=int, default=1, help="Parallel workers")
    parser.add_argument(
        "--features", nargs="+", default=["timing", "tls"],
        help="Feature groups to extract",
    )
    args = parser.parse_args()

    input_dir = Path(args.input_dir)
    if not input_dir.exists():
        print(f"Error: {input_dir} does not exist", file=sys.stderr)
        sys.exit(1)
    if not input_dir.is_dir():
        print(f"Error: {input_dir} is not a directory", file=sys.stderr)
        sys.exit(1)

    pcap_files = _find_pcaps(input_dir)
    if not pcap_files:
        print(f"No PCAP files found in {input_dir}", file=sys.stderr)
        sys.exit(1)

    print(f"Found {len(pcap_files)} PCAP files")
    print(f"Features: {args.features}")
    print(f"Output: {args.output} ({args.format})")

    config = jj.Config(features=args.features)
    pipeline = jj.Pipeline(config)
    errors = []  # (source label, error message) for each capture that failed

    if args.workers > 1:
        # Parallel processing
        # NOTE(review): failures are not caught per file here; whether
        # process_pcaps_batch skips or raises on a bad capture is up to the
        # library. Confirm.
        print(f"Processing with {args.workers} workers...")
        df = pipeline.process_pcaps_batch(
            [str(f) for f in pcap_files],
            num_workers=args.workers,
        )
        _write_dataframe(df, args.output, args.format)
    else:
        # Streaming processing
        with StreamingWriter(args.output, format=args.format) as writer:
            if is_rich_available():
                with create_progress() as progress:
                    task = progress.add_task("Processing", total=len(pcap_files))
                    for pcap_file in pcap_files:
                        _stream_file(pipeline, writer, pcap_file, input_dir, errors)
                        progress.advance(task)
            else:
                total = len(pcap_files)
                for i, pcap_file in enumerate(pcap_files, 1):
                    print(f"[{i}/{total}] {_source_label(pcap_file, input_dir)}")
                    _stream_file(pipeline, writer, pcap_file, input_dir, errors)

    print(f"\nProcessing complete: {args.output}")
    if errors:
        print(f"\nErrors ({len(errors)}):", file=sys.stderr)
        for source, message in errors:
            print(f"  {source}: {message}", file=sys.stderr)
        # Non-zero exit so shell scripts and schedulers notice partial failures.
        sys.exit(1)


if __name__ == "__main__":
    main()
CLI Batch Commands
Bash
# Extract features from every PCAP in ./captures into a single CSV file
jj extract ./captures/*.pcap \
    -o features.csv

# Same, restricted to the timing, tls and fingerprint feature groups
jj extract ./captures/*.pcap \
    -o features.csv \
    --features timing tls fingerprint

# Keep watching ./incoming for new files, writing Parquet output to ./processed
jj watch ./incoming \
    --output ./processed \
    --format parquet
See Also
- CLI Reference - Command-line options
- Configuration - All config options
- API Reference - Pipeline methods