@@ -5,8 +5,8 @@ import model.v4_1._
 import genes._
 import api.v4_1._
 import io.GenesIO._
-import io.{ Version, DatedVersionedObject, State }
-import lenses.CombinedPerturbationLenses.safeCellLens
+import io.State
+import lenses.CombinedPerturbationLenses.safeCellDetailsLens
 
 import Common.ParamHandlers._
 import com.dataintuitive.jobserver._
@@ -28,17 +28,14 @@ import org.apache.spark.sql.Encoders
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
-import org.apache.hadoop.fs.{FileSystem, Path}
 
 object initialize extends SparkSessionJob with NamedObjectSupport {
 
   case class JobData(dbs: List[String],
                      geneAnnotations: String,
                      dbVersion: String,
                      partitions: Int,
-                     storageLevel: StorageLevel,
-                     geneFeatures: Map[String, String],
-                     geneDataTypes: Map[String, String])
+                     storageLevel: StorageLevel)
   type JobOutput = collection.Map[String, Any]
 
   override def validate(sparkSession: SparkSession,
@@ -50,10 +47,8 @@ object initialize extends SparkSessionJob with NamedObjectSupport {
     val dbVersion = paramDbVersion(config)
     val partitions = paramPartitions(config)
     val storageLevel = paramStorageLevel(config)
-    val geneFeatures = paramGeneFeatures(config)
-    val geneDataTypes = paramGeneDataTypes(config)
 
-    withGood(db, genes) { JobData(_, _, dbVersion, partitions, storageLevel, geneFeatures, geneDataTypes) }
+    withGood(db, genes) { JobData(_, _, dbVersion, partitions, storageLevel) }
 
   }
 
@@ -81,60 +76,12 @@ object initialize extends SparkSessionJob with NamedObjectSupport {
       .set("fs.s3n.awsSecretAccessKey", fs_s3_awsSecretAccessKey)
 
     // Loading gene annotations and broadcast
-    val genes =
-      loadGenesFromFile(sparkSession.sparkContext, data.geneAnnotations, delimiter="\t", dict = data.geneFeatures, dataTypeDict = data.geneDataTypes)
-    val genesDB = new GenesDB(genes)
+    val genesDB = IO.getGenesDB(sparkSession, data.geneAnnotations)
     val genesBC = sparkSession.sparkContext.broadcast(genesDB)
 
     runtime.namedObjects.update("genes", NamedBroadcast(genesBC))
 
-    // Add inline, should be moved elsewhere --- START
-
-    def allInput(sparkSession: SparkSession, path: List[String]): List[DatedVersionedObject[Path]] = {
-      import sparkSession.implicits._
-
-      val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
-
-      val outputList =
-        path.flatMap(p => {
-          val pp = new Path(p)
-          if (pp.toString.contains(".parquet"))
-            List(pp)
-              .map(x => (x.getName, x.getParent, x))
-          else
-            fs
-              .listStatus(pp)
-              .map(_.getPath)
-              .map(x => (x.getName, x.getParent, x))
-              .filter(_._1.toString() contains ".parquet")
-        })
-      val outputs = outputList.map{ case (name, path, fullPath) =>
-        val p = sparkSession.read.parquet(fullPath.toString).as[Perturbation]
-        val version: Version =
-          p.first
-            .meta
-            .filter{ case MetaInformation(key, value) => key == "version" }
-            .headOption
-            .map(_.value)
-            .map(Version(_))
-            .getOrElse(Version(0,0))
-        val dateStrO =
-          p.first
-            .meta
-            .filter{ case MetaInformation(key, value) => key == "processingDate" }
-            .headOption
-            .map(_.value)
-        val date = dateStrO.map(java.time.LocalDate.parse).getOrElse(java.time.LocalDate.MIN)
-        DatedVersionedObject(date, version, fullPath)
-      }.toList
-
-      outputs
-
-    }
-
-    // END
-
-    val outputs = allInput(sparkSession, data.dbs)
+    val outputs = IO.allInput(sparkSession, data.dbs)
     val state = State(outputs)
 
     val thisVersion = state.state.filter(_.version.major.toString == data.dbVersion)
@@ -165,7 +112,7 @@ object initialize extends SparkSessionJob with NamedObjectSupport {
     val flatDb = db.map( row =>
       FlatDbRow(
         row.id,
-        safeCellLens.get(row),
+        safeCellDetailsLens.get(row).head,
         row.trt.trt_cp.map(_.dose).getOrElse("N/A"),
         row.trtType,
        row.trt.trt.name,