Extract avros for a VDS but only include the NEW samples that are in Foxtrot #9105

Draft · wants to merge 9 commits into ah_var_store
2 changes: 1 addition & 1 deletion .dockstore.yml
@@ -343,7 +343,7 @@ workflows:
branches:
- master
- ah_var_store
- vs_1418_ploidy_for_foxtrot_vds
- rc-vs-1576-avro-update
tags:
- /.*/
- name: GvsCallsetStatistics
53 changes: 47 additions & 6 deletions scripts/variantstore/scripts/run_avro_query_for_sample_info.py
@@ -8,6 +8,7 @@
import run_avro_query
import utils

## from create_ranges_cohort_extract_data_table import load_sample_names  ## TODO: can this be imported instead of duplicated below?

def get_number_of_partitions(dataset_name, project_id):
query_labels_map = {
@@ -32,9 +33,40 @@ def get_number_of_partitions(dataset_name, project_id):
return math.ceil(max_table_num[0])


def construct_sample_info_avro_queries(call_set_identifier, dataset_name, project_id, avro_prefix):
def load_sample_names(sample_names_to_extract, project_id, dataset_name):
    # Adapted from load_sample_names in create_ranges_cohort_extract_data_table; assumes the
    # BigQuery `client`, `EXTRACT_SAMPLE_TABLE`, and `TEMP_TABLE_TTL_HOURS` are available in this module.
    schema = [bigquery.SchemaField("sample_name", "STRING", mode="REQUIRED")]
    fq_sample_table = f"{project_id}.{dataset_name}.{EXTRACT_SAMPLE_TABLE}"

    job_labels = client._default_query_job_config.labels
    job_labels["gvs_query_name"] = "load-sample-names"

    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV, skip_leading_rows=0,
                                        schema=schema, labels=job_labels)

    with open(sample_names_to_extract, "rb") as source_file:
        job = client.load_table_from_file(source_file, fq_sample_table, job_config=job_config)

    job.result()  # Waits for the load job to complete.

    # Setting the TTL needs to be done as a second API call.
    table = bigquery.Table(fq_sample_table, schema=schema)
    expiration = datetime.datetime.utcnow() + datetime.timedelta(hours=TEMP_TABLE_TTL_HOURS)
    table.expires = expiration
    client.update_table(table, ["expires"])

    return fq_sample_table
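For reference, the loader treats the sample list as a headerless CSV with a single required sample_name column (skip_leading_rows=0), i.e. one sample name per line. A minimal sketch of producing such a file, using hypothetical sample names:

# Hypothetical example input for --sample_names_to_extract: one sample name per line,
# no header row, matching the single REQUIRED "sample_name" STRING column above.
sample_names = ["SM-EXAMPLE-01", "SM-EXAMPLE-02", "SM-EXAMPLE-03"]  # placeholder names

with open("sample_names_to_extract.txt", "w") as f:
    for name in sample_names:
        f.write(f"{name}\n")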


def construct_sample_info_avro_queries(call_set_identifier, dataset_name, project_id, avro_prefix, sample_names_to_extract=None):
    num_of_tables = get_number_of_partitions(dataset_name, project_id)

    fq_sample_name_table = f"{project_id}.{dataset_name}.sample_info"

    # If we have a file of sample names, load it into a temporary table and restrict the extract
    # to just those samples; otherwise every non-control sample in sample_info is extracted.
    sample_join_clause = ""
    if sample_names_to_extract:
        sample_to_extract_name_table = load_sample_names(sample_names_to_extract, project_id, dataset_name)
        sample_join_clause = f"JOIN `{sample_to_extract_name_table}` USING (sample_name)"

    for i in range(1, num_of_tables + 1):
        file_name = f"*.{i:03}.avro"
        id_where_clause = f"sample_id >= {((i - 1) * 4000) + 1} AND sample_id <= {i * 4000}"
@@ -44,7 +76,8 @@ def construct_sample_info_avro_queries(call_set_identifier, dataset_name, projec
uri='{avro_prefix}/sample_mapping/{file_name}', format='AVRO', compression='SNAPPY', overwrite=true) AS
SELECT sample_id, sample_name, '40',
'gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list' AS intervals_file
FROM `{project_id}.{dataset_name}.sample_info`
FROM `{fq_sample_name_table}`
{sample_join_clause}
WHERE {id_where_clause} AND is_control = false
ORDER BY sample_id"""
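For orientation, each loop iteration above covers one 4,000-sample superpartition: iteration i selects sample_id values ((i - 1) * 4000) + 1 through i * 4000 and writes them to Avro files matching *.{i:03}.avro. A quick illustration of the first few ranges:

# Illustration only: prints the sample_id range and Avro file pattern per superpartition.
for i in range(1, 4):
    lo = ((i - 1) * 4000) + 1
    hi = i * 4000
    print(f"*.{i:03}.avro covers sample_id {lo}..{hi}")
# *.001.avro covers sample_id 1..4000
# *.002.avro covers sample_id 4001..8000
# *.003.avro covers sample_id 8001..12000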

@@ -59,10 +92,18 @@ def construct_sample_info_avro_queries(call_set_identifier, dataset_name, projec
parser.add_argument('--dataset_name',type=str, help='BigQuery dataset name', required=True)
parser.add_argument('--project_id', type=str, help='Google project for the GVS dataset', required=True)
parser.add_argument('--avro_prefix', type=str, help='prefix for the Avro file path', required=True)
parser.add_argument('--sample_names_to_extract', type=str, help='optional file of sample names to extract; if omitted, all samples are extracted', required=False)

args = parser.parse_args()

construct_sample_info_avro_queries(args.call_set_identifier,
args.dataset_name,
args.project_id,
args.avro_prefix)
# sample_names_to_extract defaults to None, in which case all samples are extracted
construct_sample_info_avro_queries(args.call_set_identifier,
                                   args.dataset_name,
                                   args.project_id,
                                   args.avro_prefix,
                                   args.sample_names_to_extract)
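One caveat with the sample-name restriction: the inner JOIN above silently drops any requested sample name that has no row in sample_info. A hedged sketch of a pre-flight check that could surface such samples (hypothetical helper, not part of this diff; it only builds the SQL and assumes the caller runs it with the same BigQuery client used elsewhere in this script):

def build_missing_sample_names_query(fq_sample_info_table, fq_sample_names_table):
    # Hypothetical anti-join: returns requested sample names with no match in sample_info,
    # so a caller could warn or fail fast before the Avro extract begins.
    return f"""
        SELECT requested.sample_name
        FROM `{fq_sample_names_table}` AS requested
        LEFT JOIN `{fq_sample_info_table}` AS info USING (sample_name)
        WHERE info.sample_name IS NULL"""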
65 changes: 64 additions & 1 deletion scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
@@ -2,9 +2,10 @@ version 1.0

import "GvsUtils.wdl" as Utils


workflow GvsExtractAvroFilesForHail {
input {
Array[String]? cohort_sample_names_array
File? cohort_sample_names ## nice-to-have that parallels GvsExtractCohortFromSampleNames
String? git_branch_or_tag
String? git_hash
Boolean go = true
@@ -93,6 +94,27 @@ workflow GvsExtractAvroFilesForHail {
cloud_sdk_docker = effective_cloud_sdk_docker,
}

# writing the array to a file has to be done in a task
# https://support.terra.bio/hc/en-us/community/posts/360071465631-write-lines-write-map-write-tsv-write-json-fail-when-run-in-a-workflow-rather-than-in-a-task
if (defined(cohort_sample_names_array)) {
call write_array_task {
input:
input_array = select_first([cohort_sample_names_array]),
cloud_sdk_docker = effective_cloud_sdk_docker,
}
}

    # Coalesce the two optional sources of sample names into a single optional file.
    File? cohort_sample_names_file = if (defined(cohort_sample_names_array)) then write_array_task.output_file else cohort_sample_names
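In other words, the two optional inputs are alternative sources of the same list: an array provided via cohort_sample_names_array wins, otherwise the optional cohort_sample_names file is used, and if neither is set the extract covers every sample. A rough Python analogue of that selection, for illustration only (names mirror the WDL declarations above):

# Illustration only: mirrors the WDL coalescing of the two optional sample-name inputs.
def choose_sample_names_file(array_derived_file, cohort_sample_names):
    if array_derived_file is not None:  # cohort_sample_names_array was provided and written to a file
        return array_derived_file
    return cohort_sample_names          # may be None, meaning extract all samples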

call Utils.IsUsingCompressedReferences {
input:
@@ -106,6 +128,7 @@
scatter (i in range(scatter_width)) {
call ExtractFromSuperpartitionedTables {
input:
sample_names_to_extract = cohort_sample_names_file,
project_id = project_id,
dataset_name = dataset_name,
call_set_identifier = call_set_identifier,
@@ -310,6 +333,7 @@ task ExtractFromSuperpartitionedTables {
volatile: true
}
input {
File? sample_names_to_extract
String project_id
String dataset_name
String avro_sibling
Expand All @@ -320,11 +344,25 @@ task ExtractFromSuperpartitionedTables {
String variants_docker
Boolean use_compressed_references = false
}
String fq_sample_mapping_table = "~{project_id}.~{dataset_name}.sample_info"
String use_sample_names_file = if (defined(sample_names_to_extract)) then 'true' else 'false'
String sample_list_param = if (defined(sample_names_to_extract)) then '--sample_names_to_extract sample_names_file' else '--fq_cohort_sample_names ' + fq_sample_mapping_table

## TODO:
## 1. Load sample_names_to_extract into a BigQuery table alongside fq_sample_mapping_table
##    (should that happen where we split all the samples out of the sample_info table?)
## 2. Run the Avro query with only the samples present in that table.
## 3. Decide what happens to the subcohort when a requested sample is not in sample_info.


parameter_meta {
avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
num_superpartitions: "Total number of superpartitions requiring extraction"
shard_index: "0-based index of this superpartition extract shard"
num_shards: "Count of all superpartition extract shards"
sample_names_to_extract: {
localization_optional: true
}
}

command <<<
@@ -334,6 +372,13 @@

avro_prefix="$(dirname ~{avro_sibling})/avro"

echo ~{sample_list_param}

if [ ~{use_sample_names_file} = 'true' ]; then
    gsutil cp ~{sample_names_to_extract} sample_names_file
fi

for superpartition in $(seq ~{shard_index + 1} ~{num_shards} ~{num_superpartitions})
do
str_table_index=$(printf "%03d" $superpartition)
@@ -407,3 +452,21 @@ task ExtractFromSuperpartitionedTables {
noAddress: true
}
}

task write_array_task {
input {
Array[String] input_array
String cloud_sdk_docker
}

command <<< ## intentionally empty; mirrors the equivalent write_array_task in GvsExtractCohortFromSampleNames.wdl
>>>

output {
File output_file = write_lines(input_array)
}

runtime {
docker: cloud_sdk_docker
}
}
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsUtils.wdl
@@ -129,7 +129,7 @@ task GetToolVersions {
# GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
# there are a handful of tasks that require the larger GNU libc-based `slim`.
String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2025-03-03-alpine-3de322e309ea"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2025-03-05-alpine-c256afd87f68"
String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2025-03-05-gatkbase-afa18f7ab47f"
String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"