# Source code for daschlab.lightcurves

```
# Copyright 2024 the President and Fellows of Harvard College
# Licensed under the MIT License
"""
DASCH lightcurve data.
The main class provided by this module is `Lightcurve`, instances of which can
be obtained with the `daschlab.Session.lightcurve()` method.
.. _lc-rejection:
Rejecting Data Points
=====================
`Lightcurve` tables come with a uint64 column named ``"reject"`` that is always
initialized with zeros. If you are analyzing a lightcurve and wish to discard
various bad data points from it, you should set their ``"reject"`` values to
something non-zero. Most lightcurve analysis functions will automatically drop
points with non-zero ``"reject"`` values.
.. _lc-filtering:
Filtering and Subsetting
========================
`Lightcurve` objects are instances of the `astropy.timeseries.TimeSeries` class,
which in turn are instances of the `astropy.table.Table` class. In your data
analysis you can use all of the usual methods and functions that these
superclasses provide. In addition, the `Lightcurve` class provides a convenient
set of tools for subsetting and filtering data.
These tools take the form of paired “actions” and “selections”. The syntax for
using them is as follows::
lc = sess.lightcurve(some_local_id)
# Get a subset of the data containing only the detections
# brighter than 13th mag. The action is `keep_only` and
# the selection is `brighter`.
bright = lc.keep_only.brighter(13)
# From that subset, remove points from the "ai" series.
# The action is `drop` and the selection is `series`.
no_ai = bright.drop.series("ai")
# Count remaining points not from narrow-field telescopes.
# The action is `count` and the selection is `narrow`, and
# `not_` is a modifier with the intuitive behavior.
n = no_ai.count.not_.narrow()
These operations can be conveniently chained together. More complex boolean
logic can be implemented with the
`~daschlab.photometry.PhotometrySelector.where` selection and the
`~daschlab.photometry.Photometry.match` action.
In terms of implementation, an item like ``lc.keep_only`` is a property that
returns a helper `~daschlab.photometry.PhotometrySelector` object, which is what
actually implements methods such as
`~daschlab.photometry.PhotometrySelector.brighter()`. The selector object helps
pair the desired action with the desired selection while maintaining a
convenient code syntax.
DASCH lightcurves often contain many nondetections (upper limits), which means
that you may need to be careful about selection logic. For instance::
lc = sess.lightcurve(some_local_id)
n1 = lc.count.brighter(13)
n2 = lc.count.not_.detected_and_fainter(13)
# This will NOT generally hold, because n2 will include
# nondetections while n1 will not.
assert n1 == n2
"""
from typing import List, Optional
from astropy.coordinates import SkyCoord
from astropy.table import Column, MaskedColumn
from astropy.time import Time
from astropy.timeseries import TimeSeries
from astropy import units as u
from astropy.utils.masked import Masked
from bokeh.plotting import figure, show
import numpy as np
from .apiclient import ApiClient
from .photometry import (
_tabulate_phot_data,
_postproc_phot_table,
Photometry,
PhotometrySelector,
)
# Public API of this module; everything else is an implementation detail.
__all__ = [
    "Lightcurve",
    "LightcurveSelector",
    "merge",
]
class LightcurveSelector(PhotometrySelector):
    """
    A helper object that supports `Lightcurve` filtering functionality.

    Instances of this class are returned by lightcurve selection "action verbs"
    such as `~daschlab.photometry.Photometry.keep_only`. Invoking one of the
    selection methods below applies the pending action to the matching portion
    of the lightcurve data.
    """

    def sep_below(
        self,
        sep_limit: u.Quantity = 20 * u.arcsec,
        pos: Optional[SkyCoord] = None,
        **kwargs,
    ) -> "Lightcurve":
        """
        Act on rows whose positional separations from the source location
        are below the limit.

        Parameters
        ==========
        sep_limit : optional `astropy.units.Quantity`, default 20 arcsec
            The separation limit; must be an angular quantity.
        pos : optional `astropy.coordinates.SkyCoord` or `None`
            The reference position for the separation computation. When
            omitted, `~daschlab.lightcurves.Lightcurve.mean_pos()` is used.
        **kwargs
            Forwarded to the underlying action.

        Returns
        =======
        Usually another `Lightcurve`, although some actions return other
        types — e.g. `~daschlab.photometry.Photometry.count` yields an `int`.

        Examples
        ========
        Keep only detections within 10 arcsec of the mean source position::

            from astropy import units as u

            lc = sess.lightcurve(some_local_id)
            near = lc.keep_only.sep_below(10 * u.arcsec)

        Notes
        =====
        Nondetection rows carry no position and therefore never match.
        """
        reference = self._table.mean_pos() if pos is None else pos
        separations = reference.separation(self._table["pos"])
        return self._apply(separations < sep_limit, **kwargs)

    def sep_above(
        self,
        sep_limit: u.Quantity = 20 * u.arcsec,
        pos: Optional[SkyCoord] = None,
        **kwargs,
    ) -> "Lightcurve":
        """
        Act on rows whose positional separations from the source location
        are above the limit.

        Parameters
        ==========
        sep_limit : optional `astropy.units.Quantity`, default 20 arcsec
            The separation limit; must be an angular quantity.
        pos : optional `astropy.coordinates.SkyCoord` or `None`
            The reference position for the separation computation. When
            omitted, `~daschlab.lightcurves.Lightcurve.mean_pos()` is used.
        **kwargs
            Forwarded to the underlying action.

        Returns
        =======
        Usually another `Lightcurve`, although some actions return other
        types — e.g. `~daschlab.photometry.Photometry.count` yields an `int`.

        Examples
        ========
        Keep only detections farther than 10 arcsec from the mean source
        position::

            from astropy import units as u

            lc = sess.lightcurve(some_local_id)
            near = lc.keep_only.sep_above(10 * u.arcsec)

        Notes
        =====
        Nondetection rows carry no position and therefore never match.
        """
        reference = self._table.mean_pos() if pos is None else pos
        separations = reference.separation(self._table["pos"])
        return self._apply(separations > sep_limit, **kwargs)
class Lightcurve(Photometry, TimeSeries):
    """
    A DASCH lightcurve data table.

    A `Lightcurve` is a specialized form of `astropy.table.Table`. It is a
    subclass of both `astropy.timeseries.TimeSeries` and
    `daschlab.photometry.Photometry`, equipping it with the methods associated
    with both of those specializations. You can use all of the usual methods and
    properties made available by these parent classes; items they provide are
    not documented here.

    The actual data contained in these tables — the columns — are documented
    elsewhere, `on the main DASCH website`_.

    .. _on the main DASCH website: https://dasch.cfa.harvard.edu/drnext/lightcurve-columns/

    You should not construct `Lightcurve` instances directly. Instead, obtain
    lightcurves using the `daschlab.Session.lightcurve()` method.

    See :ref:`the module-level documentation <lc-filtering>` for a summary of
    the filtering and subsetting functionality provided by this class.

    Cheat sheet:

    - ``time`` is HJD midpoint
    - ``magcal_magdep`` is preferred calibrated phot measurement
    - legacy plotter error bar is ``magcal_local_rms * error_bar_factor``, the
      latter being set to match the error bars to the empirical RMS, if this
      would shrink the error bars
    """

    # Hook used by the Photometry action/selection machinery so that selection
    # verbs on this class return LightcurveSelector helpers.
    Selector = LightcurveSelector

    def mean_pos(self) -> SkyCoord:
        """
        Obtain the mean source position from the lightcurve points.

        Returns
        =======
        `astropy.coordinates.SkyCoord`
            The mean position of the non-rejected detections

        Notes
        =====
        Average is done in degrees naively, so if your source has RA values of
        both 0 and 360, you might get seriously bogus results.
        """
        # Only non-rejected detections contribute; nondetections have no
        # meaningful position.
        detns = self.keep_only.nonrej_detected(verbose=False)
        mra = detns["pos"].ra.deg.mean()
        mdec = detns["pos"].dec.deg.mean()
        return SkyCoord(mra, mdec, unit=u.deg, frame="icrs")

    def plot(
        self, x_axis: str = "year", callout: Optional[np.ndarray] = None
    ) -> figure:
        """
        Plot the lightcurve using default settings.

        Parameters
        ==========
        x_axis : optional `str`, default ``"year"``
            The name of the column to use for the X axis. ``"year"`` is a
            synthetic column calculated on-the-fly from the ``"time"`` column,
            corresponding to the ``jyear`` property of the `astropy.time.Time`
            object.
        callout : optional `numpy.ndarray`, default `None`
            If provided, this should be a boolean array of the same size as the
            lightcurve table. Points (both detections and nondetections) for
            which the array is true will be visually "called out" in the plot,
            highlighted in red.

        Returns
        =======
        `bokeh.plotting.figure`
            A plot.

        Examples
        ========
        The `~daschlab.photometry.Photometry.match` selector combines
        conveniently with the *callout* functionality in constructs like this::

            # Call out points in the lightcurve with the "suspected defect" flag
            lc.plot(callout=lc.match.any_aflags(AFlags.SUSPECTED_DEFECT))

        Notes
        =====
        The function `bokeh.io.show` (imported as ``bokeh.plotting.show``) is
        called on the figure before it is returned, so you don't need to do that
        yourself.
        """
        # We have to split by `callout` before dropping any rows, because it is
        # a boolean filter array sized just for us. After that's done, the next
        # order of business is to drop rejected rows. Then we can add helper
        # columns that we don't want to preserve in `self`.
        if callout is None:
            main = self.drop.rejected(verbose=False)
            called_out = None
            callout_detect = None
            callout_limit = None
        else:
            called_out, main = self.split_by.where(callout)
            called_out = called_out.drop.rejected(verbose=False)
            called_out["year"] = called_out["time"].jyear
            main = main.drop.rejected(verbose=False)

        # The "year" helper column is added to `main` on both paths; it exists
        # only on these working copies, never on `self`.
        main["year"] = main["time"].jyear
        main_detect, main_limit = main.split_by.detected()

        if callout is not None:
            callout_detect, callout_limit = called_out.split_by.detected()

        p = figure(
            tools="pan,wheel_zoom,box_zoom,reset,hover",
            tooltips=[
                ("LocalID", "@local_id"),
                ("Mag.", "@magcal_magdep"),
                ("Lim. Mag.", "@limiting_mag_local"),
                ("Epoch", "@year{0000.0000}"),
                (
                    "Plate",
                    "@series@platenum / expLocID @exp_local_id",
                ),
            ],
        )

        # Draw nondetections (upper limits) first so detections render on top.
        if len(main_limit):
            p.scatter(
                x_axis,
                "limiting_mag_local",
                marker="inverted_triangle",
                fill_color="lightgray",
                line_color=None,
                source=main_limit.to_pandas(),
            )

        if callout_limit and len(callout_limit):
            p.scatter(
                x_axis,
                "limiting_mag_local",
                marker="inverted_triangle",
                fill_color="lightcoral",
                line_color=None,
                source=callout_limit.to_pandas(),
            )

        if len(main_detect):
            p.scatter(x_axis, "magcal_magdep", source=main_detect.to_pandas())

        if callout_detect and len(callout_detect):
            p.scatter(
                x_axis,
                "magcal_magdep",
                fill_color="crimson",
                line_color=None,
                source=callout_detect.to_pandas(),
            )

        # Magnitudes: smaller numbers are brighter, so flip the Y axis.
        p.y_range.flipped = True
        show(p)
        return p
def _query_lc(
    client: ApiClient,
    refcat: str,
    gsc_bin_index: int,
    ref_number: int,
) -> Lightcurve:
    # Fetch a single source's lightcurve from the DASCH data API and return it
    # as a fully post-processed `Lightcurve` table.
    #
    # The integer coercions matter: the `json` module refuses to serialize
    # NumPy integer types such as `np.uint64`.
    request = {
        "refcat": refcat,
        "gsc_bin_index": int(gsc_bin_index),
        "ref_number": int(ref_number),
    }

    data = client.invoke("lightcurve", request)

    if not isinstance(data, list):
        # Imported lazily to avoid a circular import at module load time.
        from . import InteractiveError

        raise InteractiveError(f"lightcurve API request failed: {data!r}")

    return _postproc_phot_table(Lightcurve, _tabulate_phot_data(data))
# Merging lightcurves
[docs]
def merge(lcs: List[Lightcurve]) -> Lightcurve:
    """
    Merge multiple lightcurves under the assumption that they all contain data
    for the same source.

    Parameters
    ==========
    lcs : list of `Lightcurve`
        The lightcurves to be merged. The provided value must be iterable and
        indexable.

    Returns
    =======
    merged_lc : `Lightcurve`
        The merged lightcurve.

    Notes
    =====
    This function is intended to address the `source splitting`_ issue that can
    affect DASCH lightcurves. In some cases, detections of the same astronomical
    source end up split across multiple DASCH lightcurves. When this happens,
    for each exposure containing a detection of the source, the detection will
    be assigned to one of several lightcurves quasi-randomly, depending on
    factors like image defects and the local errors of the WCS solution for the
    exposure in question.

    .. _source splitting: https://dasch.cfa.harvard.edu/drnext/ki/source-splitting/

    This function accomplishes the merger by unifying all of the input
    lightcurves as keyed by their exposure identifier. For each exposure where
    there is only one non-rejected detection among all the inputs, that
    detection is chosen as the "best" one for that particular exposure. These
    unique detections are used to calculate a mean magnitude for the source. For
    cases where more than one input lightcurve has a non-rejected detection for
    a given exposure, the detection with the magnitude closest to this mean
    value is selected. Finally, for any exposures where *no* lightcurves have a
    non-rejected detection, the data from the zero'th input lightcurve are used.

    The returned lightcurve has columns matching those of the zero'th input
    lightcurve, plus the following:

    - ``source_lc_index`` specifies which input lightcurve a particular row's
      data came from. This index number is relative to the *lcs* argument, which
      does not necessarily have to align with reference catalog source numbers.
    - ``source_lc_row`` specifies which row of the input lightcurve was the
      source of the output row.
    - ``merge_n_hits`` specifies how many lightcurves contained a non-rejected
      detection for this exposure. When things are working well, most rows
      should have only one hit. You may wish to reject rows where this quantity
      is larger than one, since they may indicate that the source flux is split
      between multiple SExtractor detections.
    """
    lc0 = lcs[0]
    colnames = lc0.colnames

    # Analyze the first curve to check that we know what to do with all of its
    # columns. Each column is classified into one of six storage categories
    # that determine how we'll buffer and rebuild it below.
    T_SKYCOORD, T_TIME, T_COL, T_MASKCOL, T_MASKQUANT, T_QUANT = range(6)
    coltypes = np.zeros(len(lc0.colnames), dtype=int)

    for idx, cname in enumerate(colnames):
        col = lc0[cname]

        # Order matters here: MaskedColumn is a Column subclass, and Masked
        # quantities must be recognized before plain Quantity.
        if isinstance(col, SkyCoord):
            coltypes[idx] = T_SKYCOORD
        elif isinstance(col, Time):
            coltypes[idx] = T_TIME
        elif isinstance(col, MaskedColumn):
            coltypes[idx] = T_MASKCOL
        elif isinstance(col, Column):
            coltypes[idx] = T_COL
        elif isinstance(col, Masked):
            coltypes[idx] = T_MASKQUANT
        elif isinstance(col, u.Quantity):
            coltypes[idx] = T_QUANT
        else:
            raise ValueError(
                f"unrecognized type of input column `{cname}`: {col.__class__.__name__}"
            )

    # Group detections in each candidate lightcurve by exposure. Some rows have
    # exp_local_id = -1 if the exposure is missing from the session's exposure
    # list; we want/need to distinguish these, so we assign them temporary ID's
    # that are negative.
    dets_by_exp = {}
    untabulated_exps = {}

    for i_lc, lc in enumerate(lcs):
        for i_row, row in enumerate(lc):
            expid = row["exp_local_id"]

            if expid == -1:
                # Key untabulated exposures by plate identity instead;
                # synthetic IDs start at -2 so they never collide with -1.
                key = (row["series"], row["platenum"], row["mosnum"], row["solnum"])
                expid = untabulated_exps.get(key)

                if expid is None:
                    expid = -(len(untabulated_exps) + 2)
                    untabulated_exps[key] = expid

            # Only non-rejected rows with an actual magnitude count as
            # detections for merging purposes.
            if not row["magcal_magdep"].mask and row["reject"] == 0:
                dets_by_exp.setdefault(expid, []).append((i_lc, i_row))

    # Go back to lc0 and figure out which rows have no detections in any
    # lightcurve. We could in principle do this for *all* input lightcurves, but
    # the marginal gain ought to be tiny at best, unless something very weird is
    # happening.
    no_dets = []

    for i_row, row in enumerate(lc0):
        expid = row["exp_local_id"]

        if expid == -1:
            key = (row["series"], row["platenum"], row["mosnum"], row["solnum"])
            expid = untabulated_exps[key]

        if dets_by_exp.get(expid) is None:
            no_dets.append((expid, i_row))

    # We now know how many output rows we're going to have
    n_det = len(dets_by_exp)
    n_tot = n_det + len(no_dets)

    # Allocate buffers that will hold the merged table data. Masked and
    # SkyCoord categories need paired arrays (value+mask, or ra+dec).
    buffers = []

    for cname, ctype in zip(colnames, coltypes):
        if ctype == T_SKYCOORD:
            b = (np.zeros(n_tot), np.zeros(n_tot))
        elif ctype == T_TIME:
            b = np.zeros(n_tot)
        elif ctype == T_COL:
            b = np.zeros(n_tot, dtype=lc0[cname].dtype)
        elif ctype == T_MASKCOL:
            b = (np.zeros(n_tot, dtype=lc0[cname].dtype), np.zeros(n_tot, dtype=bool))
        elif ctype == T_MASKQUANT:
            b = (np.zeros(n_tot), np.zeros(n_tot, dtype=bool))
        elif ctype == T_QUANT:
            b = np.zeros(n_tot)

        buffers.append(b)

    # Calculate a mean magnitude for all unambiguous detections: exposures
    # where exactly one input lightcurve has a valid detection with a positive,
    # finite uncertainty. The mean is inverse-variance weighted.
    mags = np.zeros(n_det)
    umags = np.zeros(n_det)
    idx = 0

    for expid, hits in dets_by_exp.items():
        if len(hits) == 1:
            i_lc, i_row = hits[0]
            row = lcs[i_lc][i_row]
            mag = row["magcal_magdep"].filled(np.nan).value
            umag = row["magcal_local_rms"].filled(np.nan).value

            if np.isfinite(umag) and umag > 0:
                mags[idx] = mag
                umags[idx] = umag
                idx += 1

    mags = mags[:idx]
    umags = umags[:idx]
    weights = umags**-2
    mean_mag = (mags * weights).sum() / weights.sum()
    del mags, umags, weights

    # Use the mean mag to decide which point we'll prefer if there are ambiguous
    # detections. This is a super naive approach and will probably benefit from
    # refinement.
    det_expids = np.zeros(n_det, dtype=int)
    det_i_lcs = np.zeros(n_det, dtype=np.uint32)
    det_i_rows = np.zeros(n_det, dtype=np.uint32)
    det_n_hits = np.zeros(n_det, dtype=np.uint32)

    for idx, (expid, hits) in enumerate(dets_by_exp.items()):
        det_expids[idx] = expid
        n_hits = len(hits)
        i_lc = i_row = best_absdelta = None

        for j_lc, j_row in hits:
            row = lcs[j_lc][j_row]
            mag = row["magcal_magdep"]
            absdelta = np.abs(mag.filled(np.nan).value - mean_mag)

            if best_absdelta is None or absdelta < best_absdelta:
                best_absdelta = absdelta
                i_lc = j_lc
                i_row = j_row

        det_i_lcs[idx] = i_lc
        det_i_rows[idx] = i_row
        det_n_hits[idx] = n_hits
        # TODO: Preserve info about other candidates?

    # Figure out the timestamps of every output row so that we can sort them. If
    # a row in `origins` is nonnegative, it represents a detection, and indexes
    # into the `det_*` arrays; if it is negative, it is a non-detection, and it
    # indexes into no_dets (with an offset).
    timestamps = np.zeros(n_tot)
    origins = np.zeros(n_tot, dtype=int)
    alloced_expids = set()
    idx = 0

    for i in range(n_det):
        expid = det_expids[i]
        assert expid not in alloced_expids
        alloced_expids.add(expid)

        origins[idx] = i
        timestamps[idx] = lcs[det_i_lcs[i]][det_i_rows[i]]["time"].jd
        idx += 1

    for i, (expid, i_row) in enumerate(no_dets):
        assert expid not in alloced_expids
        alloced_expids.add(expid)

        origins[idx] = -(i + 1)
        timestamps[idx] = lc0[i_row]["time"].jd
        idx += 1

    assert idx == n_tot
    sort_args = np.argsort(timestamps)

    # Now we can populate the column buffers, with appropriate sorting.
    source_id = np.zeros(n_tot, dtype=np.uint64)
    source_row = np.zeros(n_tot, dtype=np.uint64)
    n_hits = np.zeros(n_tot, dtype=np.uint8)

    for sort_idx, presort_idx in enumerate(sort_args):
        origin = origins[presort_idx]

        if origin >= 0:
            # Detection: pull the winning row from whichever input won.
            i_lc = det_i_lcs[origin]
            i_row = det_i_rows[origin]
            i_n_hits = det_n_hits[origin]
        else:
            # Non-detection: fall back to the corresponding row of lc0.
            i_lc = 0
            i_row = (-origin) - 1
            i_n_hits = 0

        row = lcs[i_lc][i_row]
        source_id[sort_idx] = i_lc
        source_row[sort_idx] = i_row
        n_hits[sort_idx] = i_n_hits

        for cname, ctype, cbuf in zip(colnames, coltypes, buffers):
            val = row[cname]

            if ctype == T_SKYCOORD:
                cbuf[0][sort_idx] = val.ra.deg
                cbuf[1][sort_idx] = val.dec.deg
            elif ctype == T_TIME:
                cbuf[sort_idx] = val.jd
            elif ctype == T_COL:
                cbuf[sort_idx] = val
            elif ctype == T_MASKCOL:
                if val is np.ma.masked:
                    cbuf[1][sort_idx] = True
                else:
                    cbuf[0][sort_idx] = val
                    cbuf[1][sort_idx] = False
            elif ctype == T_MASKQUANT:
                cbuf[0][sort_idx] = val.filled(0).value
                cbuf[1][sort_idx] = val.mask
            elif ctype == T_QUANT:
                cbuf[sort_idx] = val.value

    # Build the actual table, and we're done.
    result = Lightcurve(masked=True, meta=lc0.meta)

    for cname, ctype, cbuf in zip(colnames, coltypes, buffers):
        if ctype == T_SKYCOORD:
            result[cname] = SkyCoord(
                ra=cbuf[0] * u.deg, dec=cbuf[1] * u.deg, frame="icrs"
            )
        elif ctype == T_TIME:
            result[cname] = Time(cbuf, format="jd")
        elif ctype == T_COL:
            result[cname] = cbuf
        elif ctype == T_MASKCOL:
            result[cname] = np.ma.array(cbuf[0], mask=cbuf[1])
        elif ctype == T_MASKQUANT:
            result[cname] = Masked(u.Quantity(cbuf[0], lc0[cname].unit), cbuf[1])
        elif ctype == T_QUANT:
            result[cname] = u.Quantity(cbuf, lc0[cname].unit)

    result["source_lc_index"] = source_id
    result["source_lc_row"] = source_row
    result["merge_n_hits"] = n_hits
    result["local_id"] = np.arange(n_tot)
    return result
```