"""Extract trigram mentions from cached n-gram files and plot them as a timeline.

Input files are the ones matching ``*.ngrams*.txt`` inside the cache directory.
A file name must contain ``_YYYYMMDD_HHMMSS.``; that stamp is the file's base
time. Every data line has the shape::

    <offset-seconds> <word> [<word> ...] <count>

and the timestamp of a line is the file's base time plus its offset.

The script writes ``timeline_<search>_<res>.csv`` and
``timeline_<search>_<res>.png`` into the current working directory.
"""

import argparse
import glob
import os
import re
import sys
from datetime import datetime, timedelta

import pandas as pd

# NOTE: matplotlib and seaborn are imported inside _render_chart(), the only
# place they are used, so that the parsing and aggregation functions can be
# imported without the plotting stack.

# Extracts the date (YYYYMMDD) and time (HHMMSS) embedded in a cache file name.
_FILENAME_STAMP = re.compile(r'_(\d{8})_(\d{6})\.')

# Format accepted by --start / --end.
_CLI_STAMP_FORMAT = "%Y%m%d%H%M"

# Friendly --res values mapped to pandas frequency aliases.
_RESOLUTION_TO_FREQ = {
    '10min': '10min',
    '30min': '30min',
    'hour': 'h',
    'day': 'D',
    'week': 'W',
    'month': 'MS',
    'year': 'YS',
}


def parse_arguments(argv=None):
    """Parse the command line.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace`` with the attributes ``search``,
        ``ignore_case``, ``dir``, ``res``, ``start`` and ``end``.
    """
    parser = argparse.ArgumentParser(
        description="Extract trigram mentions and generate a timeline.")
    parser.add_argument(
        '--search', type=str, required=True,
        help='REQUIRED: Trigram prefix to search for (e.g., "Strait")')
    parser.add_argument(
        '-i', '--ignore-case', action='store_true',
        help='Make the search case-insensitive')
    parser.add_argument(
        '--dir', type=str, default='./CACHE/',
        help='Directory containing the ngram files (default: ./CACHE/)')
    parser.add_argument(
        '--res', type=str, default='day',
        choices=list(_RESOLUTION_TO_FREQ),
        help='Resolution of the timeline (default: day)')
    parser.add_argument(
        '--start', type=str, default=None,
        help='Start date/time in YYYYMMDDHHMM format (default: include all past)')
    parser.add_argument(
        '--end', type=str, default=None,
        help='End date/time in YYYYMMDDHHMM format (default: include all future)')
    return parser.parse_args(argv)


def _file_start_time(filename):
    """Return the datetime encoded in *filename*, or None if there is none."""
    match = _FILENAME_STAMP.search(filename)
    if match is None:
        return None
    date_str, time_str = match.groups()
    try:
        return datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M%S")
    except ValueError:
        # Digits are present but do not form a real date/time (e.g. month 13).
        return None


def _parse_line(line):
    """Split one data line into ``(offset_sec, trigram, count)``.

    Returns None when the line does not have the expected
    ``<int> <word>... <int>`` shape.
    """
    parts = line.split()
    # Need at least offset, one word, and count.
    if len(parts) < 3:
        return None
    try:
        offset_sec = int(parts[0])
        count = int(parts[-1])
    except ValueError:
        return None
    return offset_sec, " ".join(parts[1:-1]), count


def process_files(cache_dir, search_prefix, start_dt=None, end_dt=None, ignore_case=False):
    """Collect every mention whose trigram starts with *search_prefix*.

    Args:
        cache_dir: Directory holding the ``*.ngrams*.txt`` files. It is taken
            literally; glob metacharacters in the path are escaped.
        search_prefix: Prefix the (space-joined) trigram must start with.
        start_dt: Optional inclusive lower bound for a mention's timestamp.
        end_dt: Optional inclusive upper bound for a mention's timestamp.
        ignore_case: Compare lower-cased text when true.

    Returns:
        A list of ``{'timestamp': datetime, 'count': int}`` dicts, in sorted
        file-path order and, within a file, in line order. Files without a
        parseable stamp in their name and lines that do not match the expected
        format are skipped silently.
    """
    records = []

    # Lower the prefix once instead of on every line.
    if ignore_case:
        search_prefix = search_prefix.lower()

    # glob.escape keeps characters such as '[' in the directory name from
    # being interpreted as wildcards.
    file_pattern = os.path.join(glob.escape(cache_dir), "*.ngrams*.txt")

    # Sorted so the result order does not depend on directory listing order.
    for filepath in sorted(glob.glob(file_pattern)):
        file_dt = _file_start_time(os.path.basename(filepath))
        if file_dt is None:
            continue

        # Skip files that start after the end bound. This shortcut is only
        # exact while offsets inside the file are non-negative.
        if end_dt is not None and file_dt > end_dt:
            continue

        # errors='replace' keeps a single undecodable byte from aborting the
        # whole scan; the affected character becomes U+FFFD.
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                parsed = _parse_line(line)
                if parsed is None:
                    continue
                offset_sec, trigram, count = parsed

                candidate = trigram.lower() if ignore_case else trigram
                if not candidate.startswith(search_prefix):
                    continue

                try:
                    actual_dt = file_dt + timedelta(seconds=offset_sec)
                except OverflowError:
                    # Offset too large to represent as a date: treat the line
                    # as malformed, like any other unparseable line.
                    continue

                if start_dt is not None and actual_dt < start_dt:
                    continue
                if end_dt is not None and actual_dt > end_dt:
                    continue

                records.append({'timestamp': actual_dt, 'count': count})

    return records


def aggregate_records(records, resolution):
    """Sum mention counts per time bucket.

    Args:
        records: Non-empty list of ``{'timestamp', 'count'}`` dicts as returned
            by :func:`process_files`.
        resolution: One of the ``--res`` choices (``'10min'``, ``'30min'``,
            ``'hour'``, ``'day'``, ``'week'``, ``'month'``, ``'year'``).

    Returns:
        A DataFrame indexed by bucket timestamp with a single ``count`` column.

    Raises:
        KeyError: If *resolution* is not a known choice.
    """
    freq = _RESOLUTION_TO_FREQ[resolution]
    df = pd.DataFrame(records)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.set_index('timestamp')
    # Buckets without any record are kept by resample(); fillna(0) guarantees
    # they read as zero rather than NaN.
    return df.resample(freq).sum().fillna(0)


def _parse_cli_stamp(value):
    """Turn a YYYYMMDDHHMM string into a datetime; empty/None becomes None.

    Raises:
        ValueError: If *value* is non-empty but not in the expected format.
    """
    if not value:
        return None
    return datetime.strptime(value, _CLI_STAMP_FORMAT)


def _safe_filename_part(text):
    """Make *text* safe to embed in a file name.

    Lower-cases and strips the text, then replaces every character that is not
    a word character, '.' or '-' (spaces, path separators, ...) with '_'.
    """
    return re.sub(r'[^\w.-]', '_', text.lower().strip())


def _render_chart(aggregated, search, ignore_case, start_dt, end_dt, png_filename):
    """Draw the aggregated counts as a filled line chart and save it as PNG."""
    import matplotlib.pyplot as plt
    import seaborn as sns

    sns.set_theme(style="whitegrid", context="paper", font_scale=1.2)
    fig, ax = plt.subplots(figsize=(12, 6), dpi=300)

    # Line plus a translucent fill underneath it.
    ax.plot(aggregated.index, aggregated['count'], color='#1f77b4',
            linewidth=2, marker='o', markersize=4)
    ax.fill_between(aggregated.index, aggregated['count'], color='#1f77b4', alpha=0.2)

    # Title, with the date window appended when one was requested.
    title_suffix = ""
    if start_dt is not None or end_dt is not None:
        start_str = start_dt.strftime('%Y-%m-%d') if start_dt is not None else "Beginning"
        end_str = end_dt.strftime('%Y-%m-%d') if end_dt is not None else "Present"
        title_suffix = f"\n({start_str} to {end_str})"

    case_label = " (Case Insensitive)" if ignore_case else ""
    ax.set_title(
        f"Mentions of Trigrams Starting with '{search}'{case_label}{title_suffix}",
        fontsize=16, fontweight='bold', pad=15)
    # NOTE(review): the axis is labelled UTC, but nothing in this script
    # attaches or converts a timezone - confirm the cache files are in UTC.
    ax.set_xlabel("Date (UTC)", fontsize=13, fontweight='semibold', labelpad=10)
    ax.set_ylabel("Mention Count", fontsize=13, fontweight='semibold', labelpad=10)

    # Aesthetic formatting.
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.margins(x=0.01)

    # Slant the date labels on the X-axis so they do not overlap.
    fig.autofmt_xdate()
    plt.tight_layout()

    fig.savefig(png_filename, bbox_inches='tight')
    # Release the figure's memory once it is on disk.
    plt.close(fig)


def main(argv=None):
    """Run the extraction, write the CSV and render the chart.

    Args:
        argv: Optional list of argument strings; ``None`` uses ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success (including "no records found"),
        1 when the arguments are invalid or the directory does not exist.
    """
    args = parse_arguments(argv)

    # Parse start and end bounds.
    bounds = {}
    for flag, raw, example in (('--start', args.start, '202605010000'),
                               ('--end', args.end, '202605312359')):
        try:
            bounds[flag] = _parse_cli_stamp(raw)
        except ValueError:
            print(f"Error: {flag} must be in YYYYMMDDHHMM format (e.g., {example}).",
                  file=sys.stderr)
            return 1
    start_dt = bounds['--start']
    end_dt = bounds['--end']

    if start_dt is not None and end_dt is not None and start_dt > end_dt:
        print("Error: --start must not be later than --end.", file=sys.stderr)
        return 1

    if not os.path.isdir(args.dir):
        print(f"Error: directory '{args.dir}' does not exist.", file=sys.stderr)
        return 1

    print(f"Scanning directory '{args.dir}' for trigrams starting with '{args.search}'...")
    if args.ignore_case:
        print("(Case-insensitive mode enabled)")
    if start_dt is not None or end_dt is not None:
        print(f"Filtering between {start_dt or 'Beginning'} and {end_dt or 'End'}")

    records = process_files(args.dir, args.search, start_dt, end_dt, args.ignore_case)

    if not records:
        print(f"No records found for '{args.search}' in the given timeframe. Exiting.")
        return 0

    print(f"Aggregating data at '{args.res}' resolution...")
    aggregated = aggregate_records(records, args.res)

    # Export to CSV.
    safe_name = _safe_filename_part(args.search)
    csv_filename = f"timeline_{safe_name}_{args.res}.csv"
    aggregated.to_csv(csv_filename)
    print(f"✓ Saved data to {csv_filename}")

    # Export to PNG.
    png_filename = f"timeline_{safe_name}_{args.res}.png"
    _render_chart(aggregated, args.search, args.ignore_case, start_dt, end_dt, png_filename)
    print(f"✓ Saved beautiful chart to {png_filename}")
    return 0


if __name__ == "__main__":
    sys.exit(main())