Neural-Quantum Correlation Simulation Program

Neural-Quantum Correlation Simulation Program

 

"""
Poia Theory Simulation: Neural-Quantum Correlation Experiment

This program simulates the proposed experiment testing for correlations between
neural coherence states and quantum random number generator (QRNG) outputs.
It implements the core mathematical models from the Poia Theory and allows
for parameter exploration and statistical analysis.
"""

import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
import pandas as pd
from sklearn.metrics import mutual_info_score
from tqdm import tqdm
import seaborn as sns
from scipy import stats

# Set random seed for reproducibility
np.random.seed(42)

class PoiaSimulation:
    def __init__(self, params=None):
        """Initialize the simulation with default or custom parameters."""
        # Default parameters
        self.params = {
            # Neural parameters
            'n_channels': 64,           # Number of EEG channels
            'fs': 500,                  # Sampling frequency (Hz)
            'trial_duration': 15,       # Trial duration in seconds
            'n_trials': 200,            # Number of trials
            'n_subjects': 24,           # Number of subjects
            
            # Coherence parameters
            'phi_critical': 0.27,       # Critical integrated information threshold
            'phi_std': 0.04,            # Standard deviation of phi across subjects
            'alpha_band': [8, 12],      # Alpha frequency band (Hz)
            'theta_band': [4, 7],       # Theta frequency band (Hz)
            
            # Quantum parameters
            'lambda_0': 1e-6,           # Base coupling parameter
            'alpha': 1.5,               # Scaling exponent for lambda(Phi)
            'qrng_bias': 0.5,           # Baseline QRNG bias (0.5 = unbiased)
            'amplification': 1e3,       # Amplification factor for quantum effects
            
            # Experimental conditions
            'conditions': ['rest', 'focused', 'open', 'cognitive', 'control'],
            'condition_phi_mod': {      # Phi modification by condition
                'rest': 1.0,            # Baseline
                'focused': 1.2,         # Increased in focused attention
                'open': 1.1,            # Slightly increased in open monitoring
                'cognitive': 0.9,       # Decreased during cognitive load
                'control': 0.0,         # No effect in control condition
            },
            
            # Noise parameters
            'neural_noise': 0.2,        # Neural measurement noise level
            'quantum_noise': 0.01,      # Quantum measurement noise level
            'em_noise': 0.05,           # Environmental EM noise level
        }
        
        # Update with custom parameters if provided
        if params:
            self.params.update(params)
        
        # Initialize data structures
        self.subjects = self._initialize_subjects()
        self.results = {
            'neural_coherence': {},
            'qrng_output': {},
            'correlation': {},
            'mutual_info': {},
            'statistics': {}
        }
    
    def _initialize_subjects(self):
        """Initialize subject parameters with individual variations."""
        subjects = []
        for i in range(self.params['n_subjects']):
            # Generate subject-specific parameters with natural variation
            phi_base = np.random.normal(
                self.params['phi_critical'], 
                self.params['phi_std']
            )
            
            # Ensure phi_base is positive
            phi_base = max(0.1, phi_base)
            
            # Add individual alpha peak frequency variation
            alpha_peak = np.random.normal(10, 0.5)  # Center of alpha band with variation
            
            subject = {
                'id': i,
                'phi_base': phi_base,
                'alpha_peak': alpha_peak,
                'neural_noise_factor': np.random.uniform(0.8, 1.2),  # Individual noise variation
                'quantum_susceptibility': np.random.uniform(0.7, 1.3)  # Individual quantum effect variation
            }
            subjects.append(subject)
        return subjects
    
    def _generate_neural_oscillations(self, duration, condition, subject):
        """Generate simulated EEG data with realistic oscillations."""
        fs = self.params['fs']
        n_samples = int(duration * fs)
        n_channels = self.params['n_channels']
        
        # Time vector
        t = np.arange(n_samples) / fs
        
        # Base oscillations (pink noise as background)
        eeg_data = np.zeros((n_channels, n_samples))
        for ch in range(n_channels):
            # Generate pink noise (1/f)
            pink_noise = self._generate_pink_noise(n_samples)
            eeg_data[ch, :] = pink_noise
        
        # Add oscillatory components with condition-specific modulation
        condition_mod = self.params['condition_phi_mod'][condition]
        
        # Skip oscillations for control condition
        if condition != 'control':
            # Alpha oscillations
            alpha_amp = 1.0 * condition_mod
            if condition == 'focused':
                alpha_amp *= 1.3  # Enhanced alpha in focused attention
            
            alpha_freq = subject['alpha_peak']
            alpha_phase = np.random.uniform(0, 2*np.pi)
            alpha_osc = alpha_amp * np.sin(2 * np.pi * alpha_freq * t + alpha_phase)
            
            # Theta oscillations
            theta_amp = 0.7 * condition_mod
            if condition == 'open':
                theta_amp *= 1.4  # Enhanced theta in open monitoring
            
            theta_freq = np.random.uniform(5, 6)  # Theta frequency with variation
            theta_phase = np.random.uniform(0, 2*np.pi)
            theta_osc = theta_amp * np.sin(2 * np.pi * theta_freq * t + theta_phase)
            
            # Add oscillations to channels with spatial patterns
            for ch in range(n_channels):
                # Spatial distribution - alpha stronger in posterior channels
                alpha_spatial = np.exp(-(ch - 0.7*n_channels)**2 / (0.3*n_channels)**2)
                
                # Spatial distribution - theta stronger in frontal channels
                theta_spatial = np.exp(-(ch - 0.3*n_channels)**2 / (0.3*n_channels)**2)
                
                eeg_data[ch, :] += alpha_spatial * alpha_osc + theta_spatial * theta_osc
        
        # Add random noise
        noise_level = self.params['neural_noise'] * subject['neural_noise_factor']
        noise = noise_level * np.random.randn(n_channels, n_samples)
        eeg_data += noise
        
        return eeg_data
    
    def _generate_pink_noise(self, n_samples):
        """Generate pink noise (1/f spectrum)."""
        white_noise = np.random.randn(n_samples)
        
        # Create pink noise by filtering white noise
        b, a = signal.butter(1, 0.9, 'low')
        pink_noise = signal.lfilter(b, a, white_noise)
        
        # Normalize
        pink_noise = pink_noise / np.std(pink_noise)
        
        return pink_noise
    
    def _calculate_neural_coherence(self, eeg_data, band):
        """Calculate neural coherence in specified frequency band."""
        fs = self.params['fs']
        n_channels = self.params['n_channels']
        
        # Apply bandpass filter
        low, high = band
        b, a = signal.butter(3, [low/(fs/2), high/(fs/2)], 'bandpass')
        filtered_data = np.zeros_like(eeg_data)
        
        for ch in range(n_channels):
            filtered_data[ch, :] = signal.filtfilt(b, a, eeg_data[ch, :])
        
        # Calculate Hilbert transform to get analytic signal
        analytic_signal = signal.hilbert(filtered_data)
        phase = np.angle(analytic_signal)
        
        # Calculate phase synchronization index (PSI) across channels
        n_pairs = 0
        psi_sum = 0
        
        for i in range(n_channels):
            for j in range(i+1, n_channels):
                phase_diff = phase[i, :] - phase[j, :]
                psi = np.abs(np.mean(np.exp(1j * phase_diff)))
                psi_sum += psi
                n_pairs += 1
        
        # Average PSI across all channel pairs
        mean_psi = psi_sum / n_pairs if n_pairs > 0 else 0
        
        return mean_psi
    
    def _calculate_phi(self, eeg_data, subject, condition):
        """Calculate integrated information (Phi) from EEG data."""
        # In a real implementation, this would be a complex calculation
        # Here we use a simplified approach based on coherence and condition
        
        # Calculate coherence in alpha and theta bands
        alpha_coherence = self._calculate_neural_coherence(eeg_data, self.params['alpha_band'])
        theta_coherence = self._calculate_neural_coherence(eeg_data, self.params['theta_band'])
        
        # Weighted combination as a proxy for Phi
        phi = 0.6 * alpha_coherence + 0.4 * theta_coherence
        
        # Apply subject-specific baseline and condition modifier
        condition_mod = self.params['condition_phi_mod'][condition]
        phi = subject['phi_base'] * condition_mod * phi
        
        # Add some random variation
        phi = max(0, phi + np.random.normal(0, 0.05))
        
        return phi
    
    def _calculate_lambda(self, phi):
        """Calculate lambda(Phi) scaling function."""
        phi_critical = self.params['phi_critical']
        
        # If phi is below critical threshold, lambda is zero
        if phi < phi_critical:
            return 0
        
        # Otherwise, calculate lambda based on scaling function
        lambda_0 = self.params['lambda_0']
        alpha = self.params['alpha']
        
        lambda_phi = lambda_0 * ((phi - phi_critical) ** alpha)
        
        return lambda_phi
    
    def _generate_qrng_output(self, n_samples, phi):
        """Generate quantum random number generator output potentially influenced by phi."""
        # Calculate lambda based on phi
        lambda_phi = self._calculate_lambda(phi)
        
        # Apply amplification factor
        effect_size = lambda_phi * self.params['amplification']
        
        # Base probability (0.5 for unbiased)
        p = self.params['qrng_bias']
        
        # Modified probability based on phi
        p_mod = p + effect_size
        
        # Ensure probability is valid
        p_mod = max(0, min(1, p_mod))
        
        # Generate binary outcomes
        qrng_output = np.random.binomial(1, p_mod, n_samples)
        
        # Add measurement noise
        noise = np.random.normal(0, self.params['quantum_noise'], n_samples)
        qrng_output_noisy = qrng_output + noise
        
        return qrng_output_noisy, p_mod
    
    def run_trial(self, subject_idx, condition):
        """Run a single experimental trial."""
        subject = self.subjects[subject_idx]
        duration = self.params['trial_duration']
        fs = self.params['fs']
        n_samples = int(duration * fs)
        
        # Generate neural data
        eeg_data = self._generate_neural_oscillations(duration, condition, subject)
        
        # Calculate Phi
        phi = self._calculate_phi(eeg_data, subject, condition)
        
        # Generate QRNG output
        qrng_output, p_mod = self._generate_qrng_output(n_samples, phi)
        
        # Calculate neural coherence in different bands
        alpha_coherence = self._calculate_neural_coherence(eeg_data, self.params['alpha_band'])
        theta_coherence = self._calculate_neural_coherence(eeg_data, self.params['theta_band'])
        
        # Calculate correlation between neural coherence and QRNG output
        # We'll use a sliding window approach
        window_size = int(1 * fs)  # 1-second windows
        n_windows = n_samples // window_size
        
        coherence_windows = []
        qrng_bias_windows = []
        
        for w in range(n_windows):
            start = w * window_size
            end = start + window_size
            
            # Calculate coherence in this window
            window_eeg = eeg_data[:, start:end]
            window_alpha_coh = self._calculate_neural_coherence(window_eeg, self.params['alpha_band'])
            coherence_windows.append(window_alpha_coh)
            
            # Calculate QRNG bias in this window
            window_qrng = qrng_output[start:end]
            qrng_bias = np.mean(window_qrng)
            qrng_bias_windows.append(qrng_bias)
        
        # Calculate correlation
        correlation = np.corrcoef(coherence_windows, qrng_bias_windows)[0, 1]
        
        # Calculate mutual information
        coherence_bins = np.linspace(min(coherence_windows), max(coherence_windows), 10)
        qrng_bins = np.linspace(min(qrng_bias_windows), max(qrng_bias_windows), 10)
        
        coherence_binned = np.digitize(coherence_windows, coherence_bins)
        qrng_binned = np.digitize(qrng_bias_windows, qrng_bins)
        
        mutual_info = mutual_info_score(coherence_binned, qrng_binned)
        
        return {
            'subject': subject_idx,
            'condition': condition,
            'phi': phi,
            'alpha_coherence': alpha_coherence,
            'theta_coherence': theta_coherence,
            'qrng_bias': p_mod,
            'correlation': correlation,
            'mutual_info': mutual_info,
            'coherence_windows': coherence_windows,
            'qrng_windows': qrng_bias_windows
        }
    
    def run_experiment(self):
        """Run the full experiment across all subjects and conditions."""
        n_subjects = self.params['n_subjects']
        n_trials = self.params['n_trials']
        conditions = self.params['conditions']
        
        all_results = []
        
        for subject_idx in tqdm(range(n_subjects), desc="Processing subjects"):
            for condition in conditions:
                for trial in range(n_trials // len(conditions)):
                    result = self.run_trial(subject_idx, condition)
                    all_results.append(result)
        
        # Convert to DataFrame for easier analysis
        self.results['raw_data'] = pd.DataFrame(all_results)
        
        # Aggregate results by condition
        self._analyze_results()
        
        return self.results
    
    def _analyze_results(self):
        """Analyze experimental results and calculate statistics."""
        df = self.results['raw_data']
        
        # Group by condition
        grouped = df.groupby('condition')
        
        # Calculate mean and std for key metrics
        self.results['summary'] = grouped.agg({
            'phi': ['mean', 'std'],
            'alpha_coherence': ['mean', 'std'],
            'theta_coherence': ['mean', 'std'],
            'correlation': ['mean', 'std'],
            'mutual_info': ['mean', 'std']
        })
        
        # Statistical tests
        conditions = self.params['conditions']
        stats_results = {}
        
        # Compare each condition to control
        for condition in conditions:
            if condition == 'control':
                continue
                
            # T-test for correlation
            condition_corr = df[df['condition'] == condition]['correlation']
            control_corr = df[df['condition'] == 'control']['correlation']
            
            t_stat, p_val = stats.ttest_ind(condition_corr, control_corr)
            
            # Effect size (Cohen's d)
            d = (condition_corr.mean() - control_corr.mean()) / np.sqrt(
                (condition_corr.std()**2 + control_corr.std()**2) / 2
            )
            
            stats_results[condition] = {
                't_statistic': t_stat,
                'p_value': p_val,
                'cohens_d': d
            }
        
        self.results['statistics'] = stats_results
    
    def plot_results(self):
        """Plot the experimental results."""
        df = self.results['raw_data']
        summary = self.results['summary']
        
        # Set up the figure
        fig = plt.figure(figsize=(15, 12))
        
        # 1. Plot Phi by condition
        ax1 = fig.add_subplot(2, 2, 1)
        sns.boxplot(x='condition', y='phi', data=df, ax=ax1)
        ax1.set_title('Integrated Information (Phi) by Condition')
        ax1.set_ylabel('Phi')
        
        # 2. Plot correlation by condition
        ax2 = fig.add_subplot(2, 2, 2)
        sns.boxplot(x='condition', y='correlation', data=df, ax=ax2)
        ax2.set_title('Neural-Quantum Correlation by Condition')
        ax2.set_ylabel('Correlation Coefficient')
        
        # 3. Scatter plot of Phi vs Correlation
        ax3 = fig.add_subplot(2, 2, 3)
        sns.scatterplot(x='phi', y='correlation', hue='condition', data=df, ax=ax3)
        ax3.set_title('Correlation vs Phi')
        ax3.set_xlabel('Phi')
        ax3.set_ylabel('Correlation Coefficient')
        
        # 4. Example time series for one trial
        ax4 = fig.add_subplot(2, 2, 4)
        example = df.iloc[0]
        coherence_windows = example['coherence_windows']
        qrng_windows = example['qrng_windows']
        
        # Normalize for visualization
        coherence_norm = (np.array(coherence_windows) - np.mean(coherence_windows)) / np.std(coherence_windows)
        qrng_norm = (np.array(qrng_windows) - np.mean(qrng_windows)) / np.std(qrng_windows)
        
        ax4.plot(coherence_norm, label='Neural Coherence')
        ax4.plot(qrng_norm, label='QRNG Bias')
        ax4.set_title(f'Example Time Series (Subject {example["subject"]}, {example["condition"]})')
        ax4.set_xlabel('Window')
        ax4.set_ylabel('Normalized Value')
        ax4.legend()
        
        plt.tight_layout()
        
        # Create a second figure for statistical results
        fig2 = plt.figure(figsize=(12, 6))
        
        # Plot effect sizes
        stats_results = self.results['statistics']
        conditions = list(stats_results.keys())
        effect_sizes = [stats_results[c]['cohens_d'] for c in conditions]
        p_values = [stats_results[c]['p_value'] for c in conditions]
        
        ax_stats = fig2.add_subplot(1, 1, 1)
        bars = ax_stats.bar(conditions, effect_sizes)
        
        # Color bars by significance
        for i, p in enumerate(p_values):
            if p < 0.01:
                bars[i].set_color('darkgreen')
            elif p < 0.05:
                bars[i].set_color('green')
            else:
                bars[i].set_color('gray')
        
        ax_stats.axhline(y=0, color='k', linestyle='-', alpha=0.3)
        ax_stats.set_title("Effect Size (Cohen's d) by Condition vs Control")
        ax_stats.set_ylabel("Cohen's d")
        
        # Add p-value annotations
        for i, p in enumerate(p_values):
            if p < 0.001:
                text = "p < 0.001"
            else:
                text = f"p = {p:.3f}"
            ax_stats.annotate(text, (i, effect_sizes[i]), 
                             ha='center', va='bottom' if effect_sizes[i] > 0 else 'top')
        
        plt.tight_layout()
        
        return fig, fig2

    def run_parameter_sweep(self, parameter, values):
        """Run a parameter sweep to test sensitivity."""
        original_value = self.params[parameter]
        results = []
        
        for value in tqdm(values, desc=f"Sweeping {parameter}"):
            # Update parameter
            self.params[parameter] = value
            
            # Run a smaller version of the experiment
            small_n_subjects = min(5, self.params['n_subjects'])
            small_n_trials = min(20, self.params['n_trials'])
            
            temp_subjects = self.subjects[:small_n_subjects]
            temp_n_subjects = self.params['n_subjects']
            temp_n_trials = self.params['n_trials']
            
            self.subjects = self._initialize_subjects()[:small_n_subjects]
            self.params['n_subjects'] = small_n_subjects
            self.params['n_trials'] = small_n_trials
            
            # Run experiment
            self.run_experiment()
            
            # Extract key results
            effect_sizes = [self.results['statistics'][c]['cohens_d'] 
                           for c in self.results['statistics']]
            mean_effect = np.mean(effect_sizes)
            
            p_values = [self.results['statistics'][c]['p_value'] 
                       for c in self.results['statistics']]
            mean_p = np.mean(p_values)
            
            results.append({
                'parameter_value': value,
                'mean_effect_size': mean_effect,
                'mean_p_value': mean_p,
                'significant_conditions': sum([p < 0.05 for p in p_values])
            })
            
            # Restore subjects
            self.subjects = temp_subjects
            self.params['n_subjects'] = temp_n_subjects
            self.params['n_trials'] = temp_n_trials
        
        # Restore original parameter value
        self.params[parameter] = original_value
        
        # Convert to DataFrame
        sweep_results = pd.DataFrame(results)
        
        # Plot results
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        ax1.plot(sweep_results['parameter_value'], sweep_results['mean_effect_size'], 'o-')
        ax1.set_title(f'Effect Size vs {parameter}')
        ax1.set_xlabel(parameter)
        ax1.set_ylabel("Mean Cohen's d")
        
        ax2.plot(sweep_results['parameter_value'], sweep_results['mean_p_value'], 'o-')
        ax2.set_title(f'p-value vs {parameter}')
        ax2.set_xlabel(parameter)
        ax2.set_ylabel("Mean p-value")
        ax2.axhline(y=0.05, color='r', linestyle='--', alpha=0.7)
        
        plt.tight_layout()
        
        return sweep_results, fig


# Example usage
if __name__ == "__main__":
    print("Initializing Poia Neural-Quantum Correlation Simulation...")
    
    # Create simulation with default parameters
    sim = PoiaSimulation()
    
    print("Running experiment...")
    results = sim.run_experiment()
    
    print("Plotting results...")
    fig1, fig2 = sim.plot_results()
    
    print("\nSummary of results by condition:")
    print(sim.results['summary'])
    
    print("\nStatistical test results:")
    for condition, stats in sim.results['statistics'].items():
        print(f"{condition}: t={stats['t_statistic']:.3f}, p={stats['p_value']:.3f}, d={stats['cohens_d']:.3f}")
    
    print("\nRunning parameter sweep for amplification factor...")
    sweep_values = np.logspace(2, 4, 10)  # 10^2 to 10^4
    sweep_results, sweep_fig = sim.run_parameter_sweep('amplification', sweep_values)
    
    print("Simulation complete!")
    
    # Show plots
    plt.show()