#!/usr/bin/python3 -q
"""This module contains the main functions and the auxiliary/control functions needed to calculate the IBP index.
"""
from scipy import special
import numpy as np
import cdflib
import os
from datetime import datetime
import warnings
import torch
import pandas as pd
#=== UTILS =====================================================================
[docs]
def tiler(*args):
"""Provides mixed combinations of arguments.
Creates copies of the arguments that have length equal to the product of the length of the arguments.
This results in an ordered set of all combinations of the individual values from each of the arguements.
Parameters
----------
*args : array-likes
Each argument is an array-like of possibly different length.
Returns
-------
list of array-likes
A list of ordered combination of the input arguments.
Examples
--------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.tiler([1, 2, 3],['A', 'B'])
[array([1, 1, 2, 2, 3, 3]), array(['A', 'B', 'A', 'B', 'A', 'B'], dtype='<U1')]
>>> ibpcalc.tiler([17,13],[1, 2, 3],['A', 'B'])
[array([17, 17, 17, 17, 17, 17, 13, 13, 13, 13, 13, 13]), \
array([1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3]), \
array(['A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B'],
dtype='<U1')]
"""
arg_number = len(args)
if arg_number < 1:
return None
elif arg_number == 1:
return np.array(args[0])
lengths = list(map(len,args))
maybe_tile = lambda arg,i: np.tile(arg,np.prod(lengths[:i])) if i > 0 else arg
return [
maybe_tile(
np.repeat(args[i], np.prod(lengths[(i + 1):]))
if i < arg_number - 1 else args[i], i)
for i in range(len(args))
]
[docs]
def tile_aggregate(result,*args,aggregator=np.mean):
"""Compresses tiles and aggregate results on the last axis of tiled ranges
Parameters
----------
result : numpy.array
Array that is aggregated.
*args : list or numpy.array
Along the last element is crompressed.
aggregator : function, optional
Specifies how to aggregate. the defaults is `np.mean`.
Returns
-------
list of numpy.array
last element is the compressed result
Example
-------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.tile_aggregate(np.array([3,2,3,3,2,1,2,1]), [10,12], [20,21,22,24])
(10, 12, array([2.75, 1.5 ]))
>>> ibpcalc.tile_aggregate(np.array([3,2]), np.array([12]), np.array([20,21]))
(12, array([2.5]))
>>> ibpcalc.tile_aggregate(\
np.array([3,2,2,1,4,2,3,3,1,4,8,5,9,6,7,4,5,2,1,6,7,9,5,7]), \
[9,10,11], [20,21], [7,8,5,6])
(array([ 9, 9, 10, 10, 11, 11]), array([20, 21, 20, 21, 20, 21]), \
array([2. , 3. , 4.5, 6.5, 3.5, 7. ]))
"""
if len(args) <= 1:
return aggregator(result)
*preserved, collapesed = args
reshaped= result.reshape(np.prod(list(map(len,preserved))),len(collapesed))
return (*tiler(*preserved), aggregator(reshaped, axis=1))
[docs]
def doyFromMonth(month):
'''Calculate day of year from the 15th of the month
Parameters
----------
month : int
Value as the month in the year, with january being month 1, ``1 <= month <= 12``
Returns
-------
int
Day of year.
Example
-------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.doyFromMonth(7)
196
'''
doy_month = [15, 46, 74, 105, 135, 166, 196, 227, 258, 288, 319, 349]
if isinstance(month, (int,np.integer)) and month in range(1,13):
return doy_month[month-1]
else:
raise ValueError("Value " + str(month) + " out of range or wrong type!")
[docs]
def monthFromDoy(doy):
'''Calculate month from day of the year
Parameters
----------
doy : int
Day of the year, ``1 <= doy <= 365``.
Returns
-------
int
Value as the month in the year, with january being month 1.
Example
-------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.monthFromDoy(275)
10
'''
if isinstance(doy, (int,np.integer)) and doy in range(1,366):
return int(datetime.strptime(str(doy), '%j').month)
else:
raise ValueError("Value " + str(doy) + " out of range or wrong type!")
[docs]
def monthfromString(month_str):
'''Convert abbreviated month name to *int*
Parameters
----------
month_str : str
Abbreviated month name. *['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']*
Returns
-------
int
Value as the month in the year, with january being month 1.
Example
-------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.monthfromString('Mar')
3
'''
months = [ datetime(2000,m,1).strftime("%b") for m in range(1,13) ]
if isinstance(month_str, str) and month_str in months:
return months.index(month_str)+1
else:
raise ValueError("Wrong month string: " + str(month_str))
[docs]
def checkDoyMonth(day_month):
'''Control if input is day of the year (*int*) or abbreviated month name (*str*).
Parameters
----------
day_month : int or str or list
Value to be checked
Returns
-------
doy_out : list of int(s)
list of days of the year
Example
-------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.checkDoyMonth(['Mar',200,1])
[74, 200, 1]
>>> ibpcalc.checkDoyMonth('Dec')
[349]
'''
if not isinstance(day_month, list):
day_month = [day_month]
doy_out = []
for e in day_month:
if isinstance(e, str):
doy_out.append(doyFromMonth(monthfromString(e)))
elif isinstance(e, int) and e in range(1,366):
doy_out.append(e)
else:
raise ValueError("Wrong day_month value: " + str(e))
return doy_out
[docs]
def checkParameter(para, para_range):
'''Check if *para* or element of *para* in *para_range*.
Parameters
----------
para : int or float or list
Value to be checked
para_range : range
searching range
Returns
-------
numpy.array
Numpy.array contains *para*.
Example
-------
>>> from ibpmodel import ibpcalc
>>> ibpcalc.checkParameter(3,range(0,5))
array([3])
>>> ibpcalc.checkParameter([0,1,2,3,4],range(0,5))
array([0, 1, 2, 3, 4])
'''
if isinstance(para, (list,np.ndarray)) and not False in list(map(lambda i: True if para_range[0] <= i <= para_range[-1] else False, para)):
return np.array(para)
elif isinstance(para, (float, int)) and para_range[0] <= para <= para_range[-1]:
return np.array([para])
else:
raise ValueError("Value " + str(para) + " out of range or wrong type!")
[docs]
def checkSolarRadioRange(f107, elecDensity=False):
'''Check if Solar Radio Flux *f107* in range.
Parameters
----------
f107 : int or float or list
Value to be checked
elecDensity : bool, optional
Use of the electron density. The default is False.
Returns
-------
numpy.array
Numpy.array contains *f107*.
'''
if elecDensity:
f107_range = range(60,1000)
else:
f107_range = range(80,1000)
f107 = checkParameter(f107, f107_range)
if max(f107) > 200:
warnings.warn('You are using values for Solar Radio Flux greater than 200sfu. \nPlease note: The model is not designed for F10.7 larger than 200sfu.')
return f107
#=== IBP ROUTINES =============================================================
[docs]
def read_model_file(file=None, elecDensity=False):
"""Load a model or coefficient file into a Python object.
If no file is specified:
- When elecDensity is True, loads a PyTorch model checkpoint (.pth) from the package. (SW_OPER_IBP_CLI_2__00000000T000000_99999999T999999_0101.pth)
- When elecDensity is False, loads a CDF file containing climatological coefficients. (SW_OPER_IBP_CLI_2__00000000T000000_99999999T999999_0101.cdf)
Parameters
----------
file : str, optional
Path to a .pth or .cdf file containing model parameters or climatological data.
If None, a default file bundled with the package will be used based on climatological coefficients.
elecDensity : bool, optional
If True, loads a neural network model for electron density prediction.
If False, loads climatological coefficients from a CDF file.
Default is False.
Returns
-------
BubblePipeline or dict
If elecDensity is True, returns a BubblePipeline object containing the model and scaler.
If elecDensity is False, returns a dictionary with variables extracted from the CDF file.
"""
if file == None:
basepath = os.path.dirname(__file__)
if elecDensity:
file = os.path.abspath(os.path.join(basepath,'SW_OPER_IBP_CLI_2__00000000T000000_99999999T999999_0101.pth'))
elif not elecDensity:
file = os.path.abspath(os.path.join(basepath,'SW_OPER_IBP_CLI_2__00000000T000000_99999999T999999_0101.cdf'))
if elecDensity:
checkpoint = torch.load(file, map_location="cpu", weights_only=False)
model = BubbleMLP(7, checkpoint['config']['model_config']['hidden_layers'], checkpoint['config']['model_config']['dropouts'], checkpoint['config']['model_config']['activation'])
model.load_state_dict(checkpoint["model_state_dict"])
scaler = checkpoint["scaler"]
data = BubblePipeline(model, scaler)
else:
cdf_variables = ['Parameters', 'Intensity', 'Monthly_LT_Shift', 'Density_Estimators', 'Density_Estimator_Lons']
data = {}
with cdflib.CDF(file) as cdf:
for key in cdf_variables:
data[key] = cdf.varget(key)
return data
[docs]
def compute_probability(time, expected_bubbles, expected_lifetime, mu, sigma):
"""Computes the probability of Ionospheric Plasma Bubbles.
Parameters
----------
time : float or ndarray of floats
The local time, can be eiter as an array or not.
expected_bubbles : float or ndarray of floats
The expected amount of bubbles, can be eiter as an array or not.
expected_lifetime : float or ndarray of floats
The expected lifetime of each bubble, can be eiter as an array or not.
mu : float or ndarray of floats
The mean position of the probability distribution in the model, can be eiter as an array or not.
sigma : float or ndarray of floats
The standard deviation of the probability distribution in the model, can be eiter as an array or not.
Returns
-------
float or ndarray of floats
The Ionospheric Bubble Index, value(s) between 0.0 and 1.0, it has the same length as all of the inputs
Note
----
Plasma Bubbles are large-scale depletions compared to the background
ionosphere, occurring in the equatorial F-region, in particular
after sunset. They are assumably driven by Rayleigh-Taylor
instability and already in the past extensively studied by different
techniques, showing occurrence probabilities depending on
evironmental parameters as season, location, local time and sun
activity.
For a climatologic model of these dependencies, extracted from fairly
long time series of distortions in the magnetic field readings of the
LEO satellites CHAMP (2000-2010) and Swarm (since 2013) the function
calculates a probability density.
Examples
--------
>>> import numpy as np
>>> from ibpmodel import ibpcalc
>>> ibpcalc.compute_probability(0.0,0.5,0.5,0.5,0.5)
0.049701856965716384
>>> a = np.arange(24)+0.5
>>> ibpcalc.compute_probability(a,a,a,a,a)
array([0.12259724, 0.32454412, 0.48001002, 0.5996932 , 0.69182957,
0.76275943, 0.81736377, 0.85940013, 0.89176121, 0.91667393,
0.93585262, 0.95061706, 0.96198326, 0.97073336, 0.9774695 ,
0.98265522, 0.98664737, 0.98972067, 0.99208661, 0.99390799,
0.99531015, 0.99638959, 0.99722058, 0.9978603 ])
>>> ibpcalc.compute_probability(a,2*a,1.2,1.3,0.4)
array([2.00069488e-02, 7.81272947e-01, 8.55868576e-01, 6.93670227e-01,
4.83704476e-01, 2.96120013e-01, 1.65026216e-01, 8.64714939e-02,
4.35684741e-02, 2.14048463e-02, 1.03395302e-02, 4.93490050e-03,
2.33423700e-03, 1.09629097e-03, 5.11888048e-04, 2.37840684e-04,
1.10040886e-04, 5.07234746e-05, 2.33043267e-05, 1.06755465e-05,
4.87751438e-06, 2.22316484e-06, 1.01112283e-06, 4.58962618e-07])
"""
expected_bubbles = np.maximum(0,np.array(expected_bubbles, dtype = 'float'))
lambda_1 = np.array(expected_bubbles, dtype = 'float')
lambda_2 = np.array(1 / expected_lifetime, dtype = 'float')
mu = np.array(mu, dtype = 'float')
sigma = np.array(sigma, dtype = 'float')
time = np.array(time, dtype = 'float')
# Transform problem to be purely in terms of error function
# (by Ask Neve Gamby). Based on rewriting the integrant to an
# exponential of a quadric polynomia, and then using coordinate
# transformation and scaling of y axis to go to a basic
# exp(-x**2) form, which can be easily evaluated by an error function
outer_mult = -1. / (2 * np.pi * sigma**2)**0.5
inner_mult = -1. / (2*sigma**2)
x0 = mu - lambda_2 / (2*inner_mult)
y0 = lambda_2 * (mu - time - lambda_2 / (4 * inner_mult))
inner_mult_sqrt = (-inner_mult)**0.5
outer_mult_corrected2 = (np.pi)**0.5 * outer_mult * np.exp(y0) / inner_mult_sqrt
integrated = (0.5 + special.erf(
(time - x0) * inner_mult_sqrt
) * 0.5) * outer_mult_corrected2
return 1.0 - np.exp(lambda_1 * integrated)
[docs]
def fourier_model(coeffients, theta, periode = 365.0):
"""Computes a value based on a model of fourier components up to 2nd degree.
Parameters
----------
coeffients : array-like of numbers
The coefficients of the fourier series up to second degree,
with even numbers representing cosinus and odd sinus.
The shape of `coefficients` must be *(5,*theta.shape)*.
theta : number or ndarray of numbers
The part representing the phase in the fourier expansion.
It is in the same units as the periode.
periode : number, optional
The amount of `theta` need for one periode of the 1st degree
of the fourier expansion. Defaults to 365 (the days in a year).
Returns
-------
number or ndarray of numbers
The shape of this is equivalent to the shape of `theta`.
Examples
--------
>>> from ibpmodel import ibpcalc
>>> import numpy as np
>>> ibpcalc.fourier_model([0,1,-1,0,0],180)
1.0420963481067607
>>> ibpcalc.fourier_model([0,1,-1,-0.5,0.7],1,periode=10)
-0.48044810416758793
>>> ibpcalc.fourier_model([0,1,-1,-0.5,0.7],np.arange(11),10)
array([-0.3 , -0.4804481 , -0.218165 , 0.98765424, 2.0886424 ,
1.7 , -0.03798462, -1.50224404, -1.53249278, -0.70496209,
-0.3 ])
"""
base = theta * (np.pi * 2. / periode)
base2 = base * 2
return (
coeffients[0] +
coeffients[1] * np.sin(base) +
coeffients[2] * np.cos(base) +
coeffients[3] * np.sin(base2) +
coeffients[4] * np.cos(base2)
)
[docs]
def compute_lambda(longitude, params, f107, gosc_val, density, month = 0):
"""Computes the 'lambda' parameter.
Lambda is the intensity, one of the parameters describing
the final probability and modeled as a Poisson process.
Parameters
----------
longitude : float or ndarray of floats
The longitude(s) of the point(s) we calculate lambda for.
params : array-like
Parameter values from the model (CDF-file).
f107 : float or ndarray of floats
The Solar Radio Flux (F10.7 index).
gosc_val : float or ndarray of floats
result of method `fourier_model()`
density : ndarray of floats
Density values from the model (CDF-file).
month : int or ndarray of ints, optional
The number of months since the year started, meaning January would
be 0. The default is 0.
Returns
-------
float or ndarray of floats
Examples
--------
>>> from ibpmodel import ibpcalc
>>> import numpy as np
>>> ibpcalc.compute_lambda(1,[-232.54229262, 4.67324294], 123.4, 17.3, \
np.array([2.1,3.0]))
1084.307658528
>>> params = np.array([-232.54229262, 4.67324294, 1.34695254, -1.31448553, 1.09712955])
>>> densi_once = np.sin(np.arange(360)*np.pi/180) * 3 + 5
>>> ibpcalc.compute_lambda(1,[-232.54229262, 4.67324294], 123.4, 17.3, densi_once)
1788.2556529203105
>>> densi_year = np.array([ (densi_once -3)* np.cos(month*np.pi/6) \
+ 8 for month in range(12)])
>>> ibpcalc.compute_lambda(1,[-232.54229262, 4.67324294], 123.4, 17.3, densi_year, \
month=5)
1851.5944384882494
>>> months = np.array((np.sin(np.arange(20))+1) % 12, dtype='int')
>>> ibpcalc.compute_lambda(1,[-232.54229262, 4.67324294], 123.4, 17.3, \
densi_year[:,months], month=months)
array([2149.6915391, 2149.6915391, 2149.6915391, 2149.6915391,
2149.6915391, 2149.6915391, 2149.6915391, 2149.6915391,
2149.6915391, 2149.6915391, 2149.6915391, 2149.6915391,
2149.6915391, 2149.6915391, 2149.6915391, 2149.6915391,
2149.6915391, 2149.6915391, 2149.6915391, 2149.6915391])
"""
number_of_density = np.prod(density.shape)
selection = np.array(
(longitude + 180.) *
(number_of_density / 360) + month,
dtype = 'int'
)
selected_density = density.reshape(number_of_density)[selection]
return np.maximum(
0,
selected_density * (params[0] + f107 * params[1] + gosc_val)
)
[docs]
def align_time_of_year(day_of_year, month):
"""Guess day of year and month from each other.
Parameters
----------
day_of_year : int or ndarray of ints
Time as the day of year, restricted to ``0 < day_of_year <= 365``,
with the value 0 meaning it should be calculated based on `month`
(which gives the median day of the `month`).
month : int or ndarray of ints
Time as the month in the year, with january being month 1,
so ``0 < month <= 12``.
Returns
-------
day_of_year : int or ndarray of ints
A recalculated possible copy of the `day_of_year` parameter.
month : int or ndarray of ints
A recalculated copy of the `month` parameter.
Examples
--------
>>> from ibpmodel import ibpcalc
>>> import numpy as np
>>> ibpcalc.align_time_of_year(0,6)
(166, 6)
>>> ibpcalc.align_time_of_year(162,3)
(162, 6)
>>> ibpcalc.align_time_of_year(0,np.arange(12)+1)
(array([ 16, 45, 75, 105, 136, 166, 197, 228, 258, 289, 319, 350]), array([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]))
>>> ibpcalc.align_time_of_year(np.array([0,33,21,17,267,0,115,96,315,0,172,256]),np.arange(12)+1)
(array([ 16, 33, 21, 17, 267, 166, 115, 96, 315, 289, 172, 256]), array([ 1, 2, 1, 1, 9, 6, 4, 4, 11, 10, 6, 9]))
"""
# This is a side-effect free version by Ask Neve Gamby.
#
# 2019-03-05: Now always recalculating 'month' (clumsy type handling?),
# Martin Rother (rother@gfz-potsdam.de).
is_array = isinstance(day_of_year,np.ndarray)
if is_array != isinstance(month,np.ndarray):
#ensure that both now are arrays
if is_array:
month = np.array([month] * len(day_of_year))
else:
day_of_year = np.array([day_of_year] * len(month))
is_array = True
days_in_month = np.array([31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31])
days_at_month = np.cumsum([0, *days_in_month])
if (
not is_array and day_of_year == 0
or is_array and any(day_of_year == 0)
):
day_of_year_guess = days_at_month[:-1] + days_in_month // 2 + (days_in_month % 2 > 0 )
guesses = day_of_year_guess[month - 1]
if is_array:
day_of_year = np.array(day_of_year)
day_of_year[day_of_year == 0] = guesses[day_of_year == 0]
else:
day_of_year = guesses
month = sum(day_of_year > dat for dat in days_at_month)
return (day_of_year, month)
[docs]
def compute_probability_exp(day_of_year, month, longitude, local_time, f107, data):
"""Compute the Ionospheric Bubble Probability, representing the probability of bubble occurrence.
This function supports two modes:
- If `data` is a dictionary (from a CDF file), it uses a climatological model based on Fourier components.
- If `data` is a BubblePipeline object (from a PyTorch .pth file), it uses a trained neural network model.
Parameters
----------
day_of_year : int or ndarray of ints
Time as the day of year, restricted to ``0 < day_of_year <= 356``,
with the value 0 meaning it should be calculated based on month
(which gives the median day of the month).
month : int or ndarray of ints
Time as the month in the year, with january being month 1, ``0 < month <= 12``.
longitude : float or ndarray of floats
The geographical longitude(s), ``-180 <= longitude <= 180``.
local_time : float or ndarray of floats
The local time, can be eiter as an array or not, ``-6.0 <= local_time <= 24``.
f107 : float or ndarray of floats
The Solar Radio Flux (F10.7 index), ``0.0 <= f107 <= 200.0``.
data : dict or BubblePipeline
Either:
- A dictionary containing climatological model parameters from a CDF file.
- A BubblePipeline object wrapping a trained neural network model and scaler.
Returns
-------
float or ndarray of floats
The Ionospheric Bubble Index, value(s) between 0.0 and 1.0,
it has the same length as all of the inputs
Examples
--------
>>> from ibpmodel import ibpcalc
>>> import numpy as np
>>> data = ibpcalc.read_model_file()
>>> ibpcalc.compute_probability_exp(0, 3, 12, 2, 150, data)
0.08662858683870789
>>> ibpcalc.compute_probability_exp(0, 2, 12, 2, 150, data)
0.058342698283923244
Note
----
Rearranged/rewritten and optimized by
Ask Neve Gamby <aknvg@space.dtu.dk>.
The resolution of the function `gosc` is higher than monthly.
If a `day_of_year` is known, the results can be more precise.
It is now possible to calculate with either ndarrays or single values,
for all parameters except data (which has a special structure).
Note that this version is, compared with the initial version
more closely resembling the original 'R' code, much more efficient
due to vectorization.
"""
# CHECKS:
if np.any( (local_time < -6.0) | (local_time > 24.0) ):
raise ValueError("Local time(s) or hour to midnight out of range!")
#if np.any( (f107 < 0.0) | (f107 > 200.0) ):
# raise ValueError("F10.7 parameter(s) out of valid range!")
# Normalize time selected
(day_of_year, month) = align_time_of_year(day_of_year, month)
if isinstance(data, BubblePipeline):
# Now directly use:
return data.predict(pd.DataFrame({
'DOY': np.atleast_1d(day_of_year),
'LT': np.atleast_1d(local_time),
'Longitude': np.atleast_1d(longitude),
'ff': np.atleast_1d(f107)
}))
elif isinstance(data, dict):
# Extraction
density = np.array(data['Density_Estimators'])
shifts = np.array(data['Monthly_LT_Shift' ])
params = np.array(data['Parameters' ])
gosc = np.array(data['Intensity' ])
gosc_val = fourier_model(gosc, day_of_year, 365.0)
# Force LT to be between -6 and 6 (needed by model)
local_time = ((local_time + 12) % 24) - 12
lambda0 = compute_lambda(
longitude,
params,
f107,
gosc_val,
density,
month - 1
)
shifttime = fourier_model(shifts[:, month - 1], longitude, 360.0)
return compute_probability(
local_time,
lambda0,
params[2],
params[3] + shifttime,
params[4]
)
#=== NEURAL NETWORK MODEL =====================================================
[docs]
class BubbleMLP(torch.nn.Module):
"""
A customizable multi-layer perceptron (MLP) model using PyTorch.
Designed for binary classification with optional dropout and batch normalization.
"""
def __init__(self, input_dim, hidden_layers, dropouts, activation_f):
"""
Initializes the MLP architecture.
Args:
input_dim (int): Number of input features.
hidden_layers (list of int): Sizes of hidden layers.
dropouts (list of float): Dropout rates for each hidden layer.
activation_f (torch.nn.Module): Activation function class (e.g., torch.nn.ReLU).
"""
super().__init__()
layers = []
prev_dim = input_dim
# Create a fresh instance of the activation function for each layer
activation_l = [type(activation_f)() for _ in hidden_layers]
# Build hidden layers with batch normalization, activation, and dropout
for h, d, ac in zip(hidden_layers, dropouts, activation_l):
layers.append(torch.nn.Linear(prev_dim, h)) # Fully connected layer
layers.append(torch.nn.BatchNorm1d(h)) # Batch normalization
layers.append(ac) # Activation function
layers.append(torch.nn.Dropout(d)) # Dropout for regularization
prev_dim = h
# Final output layer with sigmoid activation for binary classification
layers.append(torch.nn.Linear(prev_dim, 1))
layers.append(torch.nn.Sigmoid())
# Wrap all layers into a sequential model
self.net = torch.nn.Sequential(*layers)
[docs]
def forward(self, x):
"""
Forward pass through the network.
Args:
x (torch.Tensor): Input tensor of shape (batch_size, input_dim).
Returns:
torch.Tensor: Output probabilities of shape (batch_size, 1).
"""
return self.net(x)
[docs]
class BubblePipeline:
"""
A pipeline for preprocessing input data and generating predictions using BubbleMLP.
"""
def __init__(self, model, scaler=None):
"""
Initializes the pipeline.
Args:
model (BubbleMLP): Trained PyTorch model.
scaler (sklearn scaler, optional): Scaler for normalizing 'ff' feature.
"""
self.model = model
self.scaler = scaler
self.model.eval() # Set model to evaluation mode (disables dropout, etc.)
[docs]
def prepare_features(self, df):
"""
Transforms raw input DataFrame into model-ready features.
Args:
df (pandas.DataFrame): Input data with columns 'DOY', 'LT', 'Longitude', 'ff'.
Returns:
pandas.DataFrame: DataFrame with engineered features.
"""
df = df.copy()
# Encode cyclical features using sine and cosine transformations
df['DOY_sin'] = np.sin(2 * np.pi * df['DOY'].astype(float) / 365)
df['DOY_cos'] = np.cos(2 * np.pi * df['DOY'].astype(float) / 365)
df['LT_sin'] = np.sin(2 * np.pi * df['LT'] / 24)
df['LT_cos'] = np.cos(2 * np.pi * df['LT'] / 24)
df['Lon_sin'] = np.sin(2 * np.pi * (df['Longitude'] + 180) / 360)
df['Lon_cos'] = np.cos(2 * np.pi * (df['Longitude'] + 180) / 360)
# Optionally scale the 'ff' feature
df['ff2'] = df['ff']
if self.scaler is not None:
df['ff2'] = self.scaler.transform(df[['ff']].values)
return df
[docs]
def predict(self, df):
"""
Generates predictions from the input DataFrame.
Args:
df (pandas.DataFrame): Raw input data.
Returns:
numpy.ndarray: Predicted probabilities as a 1D array.
"""
# Select and prepare features
feats = ['DOY_sin', 'DOY_cos', 'LT_sin', 'LT_cos', 'Lon_sin', 'Lon_cos', 'ff2']
X = self.prepare_features(df)[feats].values.astype(np.float32)
# Convert to tensor and make predictions without tracking gradients
with torch.no_grad():
X_tensor = torch.tensor(X, dtype=torch.float32)
preds = self.model(X_tensor).numpy().flatten()
return preds
#===============================================================================
if __name__ == "__main__":
import doctest
if int(np.__version__.split(".")[0]) >= 2:
np.set_printoptions(legacy="1.25")
doctest.testmod(verbose = True)