基因数据处理32之Avocado运行记录(人造数据集)
发布时间:2021-03-07 01:14:18 所属栏目:大数据 来源:网络整理
导读:主要是需要数据正确,如果中间缺少记录,avocado一般不会成功 1.代码: Avocado修改: /** * Licensed to Big Data Genomics (BDG) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information …
副标题[/!--empirenews.page--]
主要是需要数据正确,如果中间缺少记录,avocado一般不会成功 /** * Licensed to Big Data Genomics (BDG) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The BDG licenses this file * to you under the Apache License,Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing,software * distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.bdgenomics.avocado.cli import java.nio.file.Files import org.apache.commons.configuration.HierarchicalConfiguration import org.apache.commons.configuration.plist.PropertyListConfiguration import org.apache.hadoop.mapreduce.Job import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext,Logging} import org.kohsuke.args4j.{Option => option,Argument} import org.bdgenomics.adam.models.{VariantContext,ReferenceRegion} import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.avocado.Timers._ import org.bdgenomics.avocado.discovery.Explore import org.bdgenomics.avocado.genotyping.CallGenotypes import org.bdgenomics.avocado.input.Input import org.bdgenomics.avocado.models.Observation import org.bdgenomics.avocado.preprocessing.Preprocessor import org.bdgenomics.avocado.postprocessing.Postprocessor import org.bdgenomics.avocado.stats.AvocadoConfigAndStats import org.bdgenomics.formats.avro.{ Variant,AlignmentRecord,NucleotideContigFragment,Genotype } import org.bdgenomics.utils.cli.{ BDGSparkCommand,BDGCommandCompanion,ParquetArgs,Args4j,Args4jBase } import org.bdgenomics.utils.instrumentation._ object Avocado extends BDGCommandCompanion { val commandName = 
"Avocado" val commandDescription = "Call variants using avocado and the ADAM preprocessing pipeline." def apply(args: Array[String]) = { new Avocado(Args4j[AvocadoArgs](args)) } } class AvocadoArgs extends Args4jBase with ParquetArgs { @Argument(metaVar = "READS",required = true,usage = "ADAM read-oriented data",index = 0) var readInput: String = _ @Argument(metaVar = "REFERENCE",usage = "ADAM or FASTA reference genome data",index = 1) var referenceInput: String = _ @Argument(metaVar = "VARIANTS",usage = "ADAM variant output",index = 2) var variantOutput: String = _ @Argument(metaVar = "CONFIG",usage = "avocado configuration file",index = 3) var configFile: String = _ @option(name = "-debug",usage = "If set,prints a higher level of debug output.") var debug = false @option(required = false,name = "-fragment_length",usage = "Sets maximum fragment length. Default value is 10,000. Values greater than 1e9 should be avoided.") var fragmentLength: Long = 10000L } class Avocado(protected val args: AvocadoArgs) extends BDGSparkCommand[AvocadoArgs] with Logging { // companion object to this class - needed for BDGCommand framework val companion = Avocado /** *********add by xubo 20160528 *******************/ println("companion:" + companion) println("companion.commandName:" + companion.commandName) println("companion.commandDescription:" + companion.commandDescription) // println("companion.commandName:"+companion.) println("AvocadoArgs:") println("args:" + args) println("args.configFile:" + args.configFile) println("args.debug:" + args.debug) println("args.fragmentLength:" + args.fragmentLength) println("args.readInput:" + args.readInput) println("args.referenceInput:" + args.referenceInput) println("args.variantOutput:" + args.variantOutput) println("test endn") /** *********add by xubo 20160528 *******************/ // get config off classpath and load into a temp file... 
val stream = Thread.currentThread.getContextClassLoader.getResourceAsStream(args.configFile) val tempPath = Files.createTempDirectory("config") val tempFilePath = tempPath.resolve("temp.properties") Files.copy(stream,tempFilePath) /** *********add by xubo 20160528 *******************/ println("stream:" + stream) println("tempFilePath:" + tempFilePath) /** *********add by xubo 20160528 *******************/ // load config val config: HierarchicalConfiguration = new PropertyListConfiguration(tempFilePath.toFile) val preprocessorNames = getStringArrayFromConfig("preprocessorNames") val preprocessorAlgorithms = getStringArrayFromConfig("preprocessorAlgorithms") assert(preprocessorNames.length == preprocessorAlgorithms.length,"Must have a 1-to-1 mapping between preprocessor names and algorithms.") val preprocessingStagesZippedWithNames = preprocessorNames.zip(preprocessorAlgorithms) val explorerName = config.getString("explorerName") val explorerAlgorithm = config.getString("explorerAlgorithm") val genotyperName = config.getString("genotyperName") val genotyperAlgorithm = config.getString("genotyperAlgorithm") val postprocessorNames = getStringArrayFromConfig("postprocessorNames") val postprocessorAlgorithms = getStringArrayFromConfig("postprocessorAlgorithms") assert(postprocessorNames.length == postprocessorAlgorithms.length,"Must have a 1-to-1 mapping between postprocessor names and algoritms.") val postprocessorsZipped = postprocessorNames.zip(postprocessorAlgorithms) val debug = args.debug /** *********add by xubo 20160528 *******************/ println("config:" + config) println("preprocessorNames:" + preprocessorNames) preprocessorNames.foreach(println) println("preprocessorAlgorithms:" + preprocessorAlgorithms) preprocessorAlgorithms.foreach(println) println("preprocessingStagesZippedWithNames:" + preprocessingStagesZippedWithNames) preprocessorNames.foreach(println) println("explorerName:" + explorerName) println("explorerAlgorithm:" + explorerAlgorithm) 
println("genotyperName:" + genotyperName) println("genotyperAlgorithm:" + genotyperAlgorithm) println("postprocessorNames:" + postprocessorNames) postprocessorNames.foreach(println) println("postprocessorAlgorithms:" + postprocessorAlgorithms) postprocessorAlgorithms.foreach(println) println("postprocessorsZipped:" + postprocessorsZipped) postprocessorsZipped.foreach(println) println("stream:" + stream) println("stream:" + stream) println("stream:" + stream) println("stream:" + stream) /** *********add by xubo 20160528 *******************/ private def getStringArrayFromConfig(name: String): Array[String] = { config.getStringArray(name).map(_.toString) } /** * Applies several pre-processing steps to the read pipeline. Currently,these are the default * steps in the ADAM processing pipeline. * * @param reads RDD of reads to process. * @return RDD containing reads that have been sorted and deduped. */ def preProcessReads(reads: RDD[AlignmentRecord]): RDD[AlignmentRecord] = PreprocessReads.time { var processedReads = reads //.cache if (debug) { log.info("avocado: Preprocessing " + processedReads.count + " reads.") } // loop over preprocessing stages and apply preprocessingStagesZippedWithNames.foreach(p => { val (stageName,stageAlgorithm) = p log.info("avocado: Running " + stageName) // run this preprocessing stage processedReads = Preprocessor(processedReads,stageName,stageAlgorithm,config) }) // return processed reads processedReads } /** * Applies variant calling algorithms to reads and pileups. Reduces down and returns called variants. * * @param reads * @param stats * @return Joined output of variant calling algorithms. */ def callVariants(reads: RDD[AlignmentRecord],stats: AvocadoConfigAndStats): RDD[VariantContext] = CallVariants.time { val discoveries: RDD[Observation] = Explore(explorerAlgorithm,explorerName,reads,stats,config) CallGenotypes(genotyperAlgorithm,genotyperName,discoveries,config) } /** * Applies variant post-processing methods to called variants. 
Post-processing can * include methods which modify the information in variant calls,or alternatively,* methods that filter out spurious variant calls. * * @param variants RDD of variants to process. * @return Post-processed variants. */ def postProcessVariants(variants: RDD[VariantContext],stats: AvocadoConfigAndStats): RDD[VariantContext] = PostprocessVariants.time { var rdd = variants // loop over post processing steps postprocessorsZipped.foreach(p => { val (ppStageName,ppAlgorithm) = p rdd = Postprocessor(rdd,ppStageName,ppAlgorithm,config) }) rdd } /** * Main method. Implements body of variant caller. SparkContext and Hadoop Job are provided * by the ADAMSparkCommand shell. * * @param sc SparkContext for RDDs. * @param job Hadoop Job container for file I/O. */ def run(sc: SparkContext) { log.info("Starting avocado...") // load in reference from ADAM file val reference: RDD[NucleotideContigFragment] = LoadContigs.time { sc.loadSequence(args.referenceInput,fragmentLength = args.fragmentLength) } log.info("Loading reads in from " + args.readInput) // load in reads from ADAM file val reads: RDD[AlignmentRecord] = LoadReads.time { Input(sc,args.readInput,reference,config) } /** *********add by xubo 20160528 *******************/ println("readInput:") reads.foreach(println) /** *********add by xubo 20160528 *******************/ // create stats/config item val stats = new AvocadoConfigAndStats(sc,args.debug,reference) /** *********add by xubo 20160528 *******************/ println("stats:" + stats) println("stats.contigLengths:" + stats.contigLengths) // println("stats.coverage:" + stats.coverage) println("stats.debug:" + stats.debug) println("stats.referenceObservations:" + stats.referenceObservations) // println("stats.referenceSeq:" + stats.referenceSeq) // stats.referenceSeq.foreach(println) println("stats.samplesInDataset:" + stats.samplesInDataset) stats.samplesInDataset.foreach(println) println("stats.sequenceDict:" + stats.sequenceDict) // 
println("stats.contigLengths:"+stats) /** *********add by xubo 20160528 *******************/ // apply read translation steps log.info("Processing reads.") var cleanedReads = preProcessReads(reads) /** *********add by xubo 20160528 *******************/ println("cleanedReads:" + cleanedReads) println("cleanedReads.count:" + cleanedReads.count()) cleanedReads.foreach(println) val cleanedReads2 = cleanedReads.map { each => each.setRecordGroupSample("hello") each } // cleanedReads.adamGetReadGroupDictionary() // cleanedReads.getSequenc // cleanedReads.foreach(each => println(each.getSequence + " " + each.referenceLength)) println("cleanedReads2:") cleanedReads2.foreach(println) /** *********add by xubo 20160528 *******************/ // call variants on filtered reads and pileups log.info("Calling variants.") // val calledVariants = callVariants(cleanedReads,stats) val calledVariants = callVariants(cleanedReads2,stats) /** *********add by xubo 20160528 *******************/ println("calledVariants:" + calledVariants) println("calledVariants.count:" + calledVariants.count()) // calledVariants.take(10).foreach(each=>println(each.databases+" "+each.position+" "+each.variant+" "=each.genotypes)) // calledVariants.take(10).foreach(println) /** *********add by xubo 20160528 *******************/ // post process variants log.info("Post-processing variants.") val processedGenotypes: RDD[Genotype] = postProcessVariants(calledVariants,stats).flatMap(variantContext => variantContext.genotypes) /** *********add by xubo 20160528 *******************/ println("processedGenotypes:" + calledVariants) println("processedGenotypes.count:" + processedGenotypes.count()) processedGenotypes.foreach(println) /** *********add by xubo 20160528 *******************/ // save variants to output file log.info("Writing calls to disk.") SaveVariants.time { processedGenotypes.adamParquetSave(args.variantOutput,args.blockSize,args.pageSize,args.compressionCodec,args.disableDictionaryEncoding) /** 
***********add ***************/ processedGenotypes.foreach { each => println("processedGenotypes:" + each) } // processedGenotypes. } } } (编辑:广西网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐