-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvdif_reader.py
126 lines (90 loc) · 3.2 KB
/
vdif_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
__all__ = ["get_spectrum", "get_cal_spectrum"]
# standard library
import re
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from struct import Struct
from typing import Callable, Pattern
# dependent packages
import numpy as np
# constants
LITTLE_ENDIAN: str = "<"  # struct byte-order prefix (little-endian, standard sizes)
UINT: str = "I"  # struct format character: unsigned int (4 bytes)
SHORT: str = "h"  # struct format character: short (2 bytes)
N_ROWS_VDIF_HEAD: int = 8
N_ROWS_CORR_HEAD: int = 64
N_ROWS_CORR_DATA: int = 512
N_UNITS_PER_SCAN: int = 64
# 4 bytes * (8 + 64) header words + 2 bytes * 512 data words = 1312 bytes
N_BYTES_PER_UNIT: int = 1312
# derived from the named constants so the two values cannot drift apart
N_BYTES_PER_SCAN: int = N_BYTES_PER_UNIT * N_UNITS_PER_SCAN
TIME_PER_SCAN: float = 1e-2  # seconds
TIME_FORMAT: str = "%Y%j%H%M%S"
# the dot before the extension is escaped so that only a literal "." matches
VDIF_PATTERN: Pattern = re.compile(r"\w+_(\d+)_\d\.vdif")
# main features
def get_spectrum(
    path: Path,
    integ: float = 1.0,
    delay: float = 0.0,
    chbin: int = 1,
) -> np.ndarray:
    """Return a time-averaged, channel-binned spectrum read from a VDIF file.

    Args:
        path: Path of the VDIF file. Its name must encode the start time
            (see ``get_elapsed_time_from_start``).
        integ: Integration time in seconds.
        delay: Seconds subtracted from the current time when working out
            how much of the file to skip.
        chbin: Number of adjacent channels averaged into one output channel.

    Returns:
        One-dimensional complex array: the mean over the integrated scans,
        further averaged over groups of ``chbin`` channels.
    """
    return integrate_spectra(get_spectra(path, integ, delay), chbin)
def get_cal_spectrum(
    path: Path,
    cal: float = 0.0,
    delay: float = 0.0,
    chbin: int = 1,
) -> np.ndarray:
    """Wait until ``cal`` seconds have elapsed, then return their spectrum.

    Blocks (polling every 0.5 s) until the delay-corrected time elapsed since
    the start time encoded in the file name is no longer less than ``cal``.

    Args:
        path: Path of the VDIF file.
        cal: Time in seconds to wait for; also used as the integration time.
        delay: Seconds subtracted from the current time.
        chbin: Number of adjacent channels averaged into one output channel.

    Returns:
        The spectrum from ``get_spectrum`` integrated over ``cal`` seconds.
    """
    elapsed = get_elapsed_time_from_start(path, delay)

    while elapsed < cal:
        time.sleep(0.5)
        elapsed = get_elapsed_time_from_start(path, delay)

    return get_spectrum(path, integ=cal, delay=delay, chbin=chbin)
# sub features
def get_spectra(path: Path, integ: float = 1.0, delay: float = 0.0) -> np.ndarray:
    """Read the most recent ``integ`` seconds of scans from a VDIF file.

    The read position is derived from the time elapsed since the start time
    in the file name, not from the file size.

    Args:
        path: Path of the VDIF file.
        integ: Integration time in seconds.
        delay: Seconds subtracted from the current time.

    Returns:
        Complex array of shape ``(n_integ, N_UNITS_PER_SCAN * n_chans)``,
        where ``n_integ`` is the number of scans in ``integ`` seconds and
        ``n_chans`` is half of ``N_ROWS_CORR_DATA``.

    Raises:
        ValueError: If fewer scans have elapsed than are to be integrated.
    """
    elapsed = get_elapsed_time_from_start(path, delay)
    scans_elapsed = int(elapsed / TIME_PER_SCAN)
    scans_wanted = int(integ / TIME_PER_SCAN)

    if scans_elapsed < scans_wanted:
        raise ValueError("Not enough number of scans to integrate")

    # each unit carries interleaved (real, imag) values, hence half as many channels
    chans_per_unit = N_ROWS_CORR_DATA // 2
    units_wanted = N_UNITS_PER_SCAN * scans_wanted
    offset = N_BYTES_PER_SCAN * (scans_elapsed - scans_wanted)
    units = np.empty([units_wanted, chans_per_unit], dtype=complex)

    with open(path, "rb") as f:
        f.seek(offset)

        for unit in units:
            # both headers are read only to advance the file position
            read_vdif_head(f)
            read_corr_head(f)
            unit[:] = parse_corr_data(read_corr_data(f))

    return units.reshape([scans_wanted, N_UNITS_PER_SCAN * chans_per_unit])
def integrate_spectra(spectra: np.ndarray, chbin: int = 1) -> np.ndarray:
    """Average spectra over the first axis, then bin adjacent channels.

    Args:
        spectra: Two-dimensional array; axis 0 is averaged out.
        chbin: Number of adjacent channels averaged into one output channel.
            Must be a positive divisor of the number of channels.

    Returns:
        One-dimensional array with ``n_channels // chbin`` elements.

    Raises:
        ValueError: If ``chbin`` is not positive or does not evenly divide
            the number of channels.
    """
    spectrum = spectra.mean(0)
    n_chans = len(spectrum)

    # fail early with a clear message instead of a ZeroDivisionError
    # or an opaque reshape error from numpy
    if chbin < 1:
        raise ValueError(f"chbin must be a positive integer, got {chbin}")

    if n_chans % chbin:
        raise ValueError(
            f"chbin ({chbin}) must evenly divide the number of channels ({n_chans})"
        )

    return spectrum.reshape([n_chans // chbin, chbin]).mean(1)
def get_elapsed_time_from_start(path: Path, delay: float = 0.0) -> float:
    """Return the seconds elapsed since the start time in the file name.

    The start time is the digit group captured by ``VDIF_PATTERN``, parsed
    with ``TIME_FORMAT`` and interpreted as UTC.

    Args:
        path: Path of the VDIF file; only its name is inspected.
        delay: Seconds subtracted from the current time before comparing.

    Returns:
        ``(now - delay) - start`` in seconds (negative if the start time
        lies in the future).

    Raises:
        ValueError: If the file name does not match ``VDIF_PATTERN`` or the
            captured digits do not match ``TIME_FORMAT``.
    """
    match = VDIF_PATTERN.search(path.name)

    if match is None:
        raise ValueError("Cannot parse start time from file name.")

    # timezone-aware datetimes replace the deprecated ``datetime.utcnow()``
    t_start = datetime.strptime(match.group(1), TIME_FORMAT).replace(
        tzinfo=timezone.utc
    )
    t_now = datetime.now(timezone.utc) - timedelta(seconds=delay)
    return (t_now - t_start).total_seconds()
# struct readers
def make_binary_reader(n_rows: int, dtype: str) -> Callable:
    """Create a function that unpacks ``n_rows`` values from a binary file.

    Args:
        n_rows: Number of values to unpack per call.
        dtype: ``struct`` format character of each value.

    Returns:
        Function that takes a binary file object, reads as many bytes as
        the compiled little-endian format requires from its current
        position, and returns the unpacked values as a tuple.
    """
    # compile the format once; the closure reuses it on every call
    unpacker = Struct(f"{LITTLE_ENDIAN}{dtype * n_rows}")

    def reader(f):
        buffer = f.read(unpacker.size)
        return unpacker.unpack(buffer)

    return reader
# reader for the VDIF header: N_ROWS_VDIF_HEAD unsigned ints
read_vdif_head: Callable = make_binary_reader(n_rows=N_ROWS_VDIF_HEAD, dtype=UINT)
# reader for the correlator header: N_ROWS_CORR_HEAD unsigned ints
read_corr_head: Callable = make_binary_reader(n_rows=N_ROWS_CORR_HEAD, dtype=UINT)
# reader for the correlator data: N_ROWS_CORR_DATA shorts
read_corr_data: Callable = make_binary_reader(n_rows=N_ROWS_CORR_DATA, dtype=SHORT)
# struct parsers
def parse_vdif_head(vdif_head: list):
    """Placeholder for the VDIF header parser (not implemented yet).

    Currently ignores ``vdif_head`` and returns ``None``.
    """
    return None
def parse_corr_head(corr_head: list):
    """Placeholder for the correlator header parser (not implemented yet).

    Currently ignores ``corr_head`` and returns ``None``.
    """
    return None
def parse_corr_data(corr_data: list) -> np.ndarray:
    """Convert interleaved (real, imag) values into a complex array.

    Args:
        corr_data: Sequence whose even-indexed items are real parts and
            whose odd-indexed items are the matching imaginary parts.

    Returns:
        Complex array built as ``real + 1j * imag``.
    """
    samples = np.array(corr_data)
    return samples[0::2] + samples[1::2] * 1j