X-Git-Url: http://russells-world.com/code/?p=sdr-websocket.git;a=blobdiff_plain;f=sdrninja-server%2Fradio_math.py;fp=sdrninja-server%2Fradio_math.py;h=00612f12057b85ba54893784ce95f2416e3c6fb6;hp=0000000000000000000000000000000000000000;hb=ee61e3f582cb400a66be85d2bd4cb0570ba24b9b;hpb=61664285a6f1544c938a4879e74a921914930de9 diff --git a/sdrninja-server/radio_math.py b/sdrninja-server/radio_math.py new file mode 100644 index 0000000..00612f1 --- /dev/null +++ b/sdrninja-server/radio_math.py @@ -0,0 +1,204 @@ +#! /usr/bin/env python2 + +from itertools import * +import math, numpy + +tau = 2 * math.pi + +class Translate(object): + "set up once, consumes an I/Q stream, returns an I/Q stream" + def __init__(self, num, den): + angles = [(a*tau*num/den) % tau for a in range(den)] + fir = [complex(math.cos(a), math.sin(a)) for a in angles] + self.looping_fir = cycle(fir) + def __call__(self, stream): + return numpy.array([s1*s2 for s1,s2 in izip(self.looping_fir, stream)]) + +class Downsample(object): + "set up once, consumes an I/Q stream, returns an I/Q stream" + # aka lowpass + def __init__(self, scale): + self.scale = scale + self.offset = 0 + self.window = numpy.hanning(scale * 2) + self.window = self.window / sum(self.window) + def __call__(self, stream): + prev_off = self.offset + self.offset = self.scale - ((len(stream) + self.offset) % self.scale) + # bad edges, does 60x more math than needed + stream2 = numpy.convolve(stream, self.window) + return stream2[prev_off::self.scale] + +class DownsampleFloat(object): + # poor quality, but good temporal accuracy + # uses triangle window + def __init__(self, scale): + self.scale = scale + self.offset = 0 + def __call__(self, stream): + # bad edges + # should be using more numpy magic + scale = self.scale + stream2 = [] + for x in numpy.arange(self.offset, len(stream), scale): + frac = x % 1.0 + window = numpy.concatenate((numpy.arange(1-frac, scale), + numpy.arange(int(scale)+frac-1, 0, -1))) + window = window / sum(window) + start = x - len(window)//2 + start = max(start, 0) + start = min(start, len(stream)-len(window)) + c = sum(stream[start : start+len(window)] * window) + stream2.append(c) + return numpy.array(stream2) + +class Upsample(object): + # use minimal power interpolation? + def __init__(self, scale): + self.scale = scale + self.offset = 0 + def __call__(self, stream): + xp = range(len(stream)) + x2 = numpy.arange(self.offset, len(stream), 1.0/self.scale) + self.offset = (len(stream) + self.offset) % self.scale + reals = numpy.interp(x2, xp, stream.real) + imags = numpy.interp(x2, xp, stream.imag) + return numpy.array([complex(*ri) for ri in zip(reals, imags)]) + +class Bandpass(object): + "set up once, consumes an I/Q stream, returns an I/Q stream" + def __init__(self, center_fc, center_bw, pass_fc, pass_bw): + # some errors from dropping the fractional parts of the ratio + # either optimize tuning, scale up, or use floating point + ratio = (center_fc - pass_fc) / center_bw + self.translate = Translate(ratio * 1024, 1024) + self.downsample = Downsample(int(center_bw/pass_bw)) + def __call__(self, stream): + return self.downsample(self.translate(stream)) + +# check license on matplotlib code +# chop out as much slow junk as possible + +# http://mail.scipy.org/pipermail/numpy-discussion/2003-January/014298.html +# http://cleaver.cnx.rice.edu/eggs_directory/obspy.signal/obspy.signal/obspy/signal/freqattributes.py +# /usr/lib/python2.7/site-packages/matplotlib/mlab.py + +def detrend_none(x): + "Return x: no detrending" + return x + +def window_hanning(x): + "return x times the hanning window of len(x)" + return numpy.hanning(len(x))*x + +def psd(x, NFFT=256, Fs=2, Fc=0, detrend=detrend_none, window=window_hanning, + noverlap=0, pad_to=None, sides='default', scale_by_freq=True): + Pxx,freqs = csd(x, x, NFFT, Fs, detrend, window, noverlap, pad_to, sides, + scale_by_freq) + return Pxx.real, freqs + Fc + +def csd(x, y, NFFT=256, Fs=2, detrend=detrend_none, window=window_hanning, + noverlap=0, pad_to=None, sides='default', scale_by_freq=True): + Pxy, freqs, t = _spectral_helper(x, y, NFFT, Fs, detrend, window, + noverlap, pad_to, sides, scale_by_freq) + + if len(Pxy.shape) == 2 and Pxy.shape[1]>1: + Pxy = Pxy.mean(axis=1) + return Pxy, freqs + + +#This is a helper function that implements the commonality between the +#psd, csd, and spectrogram. It is *NOT* meant to be used outside of mlab +def _spectral_helper(x, y, NFFT=256, Fs=2, detrend=detrend_none, + window=window_hanning, noverlap=0, pad_to=None, sides='default', + scale_by_freq=True): + #The checks for if y is x are so that we can use the same function to + #implement the core of psd(), csd(), and spectrogram() without doing + #extra calculations. We return the unaveraged Pxy, freqs, and t. + same_data = y is x + + #Make sure we're dealing with a numpy array. If y and x were the same + #object to start with, keep them that way + x = numpy.asarray(x) + if not same_data: + y = numpy.asarray(y) + else: + y = x + + # zero pad x and y up to NFFT if they are shorter than NFFT + if len(x)