
Commit 183b4ad

redo writers
1 parent 673eb7b commit 183b4ad

File tree

19 files changed: +565 -728 lines changed

core/src/main/scala/org/polars/scala/polars/api/DataFrame.scala

Lines changed: 5 additions & 1 deletion
@@ -58,7 +58,11 @@ class DataFrame private (private[polars] val ptr: Long) {
       )
       .collect(noOptimization = true)
 
-  def sort(exprs: Array[Expression], null_last: Array[Boolean], maintain_order: Boolean): DataFrame =
+  def sort(
+      exprs: Array[Expression],
+      null_last: Array[Boolean],
+      maintain_order: Boolean
+  ): DataFrame =
     toLazy.sort(exprs, null_last, maintain_order).collect(noOptimization = true)
 
   def sort(expr: Expression, null_last: Boolean, maintain_order: Boolean): DataFrame =
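For context, the reformatted overload is called exactly as before; a minimal usage sketch (here `df` is an existing DataFrame and `idCol`, `nameCol` are Expression values obtained elsewhere from the library's expression API; both names are placeholders, not part of this commit):

// Sketch only: sort by two expressions, with one nulls-last flag per expression.
val sorted: DataFrame = df.sort(
  exprs = Array(idCol, nameCol),
  null_last = Array(true, false),
  maintain_order = true
)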

core/src/main/scala/org/polars/scala/polars/api/LazyFrame.scala

Lines changed: 5 additions & 1 deletion
@@ -67,7 +67,11 @@ class LazyFrame private (private[polars] val ptr: Long) {
   ): LazyFrame =
     sort(Array(col), Array(descending), Array(nullLast), maintainOrder = maintainOrder)
 
-  def sort(exprs: Array[Expression], null_last: Array[Boolean], maintainOrder: Boolean): LazyFrame = {
+  def sort(
+      exprs: Array[Expression],
+      null_last: Array[Boolean],
+      maintainOrder: Boolean
+  ): LazyFrame = {
     assert(
       exprs.length == null_last.length,
       s"Length of provided expressions (${exprs.length}) and their " +

core/src/main/scala/org/polars/scala/polars/api/io/Writeable.scala

Lines changed: 13 additions & 117 deletions
@@ -7,152 +7,48 @@ import scala.jdk.CollectionConverters._
 
 import org.polars.scala.polars.internal.jni.io.write._
 
-object WriteCompressions extends Enumeration {
-  type WriteCompression = Value
-
-  private lazy val stringMap: Map[String, WriteCompression] =
-    values.map(v => (v.toString.toLowerCase(Locale.ROOT), v)).toMap
-
-  def fromString(str: String): Option[WriteCompression] =
-    stringMap.get(str.toLowerCase(Locale.ROOT))
-
-  val lz4, uncompressed, snappy, gzip, lzo, brotli, zstd, deflate = Value
-}
-
-object WriteModes extends Enumeration {
-  type WriteMode = Value
-
-  private lazy val stringMap: Map[String, WriteMode] =
-    values.map(v => (v.toString.toLowerCase(Locale.ROOT), v)).toMap
-
-  def fromString(str: String): Option[WriteMode] =
-    stringMap.get(str.toLowerCase(Locale.ROOT))
-
-  val ErrorIfExists, Overwrite = Value
-}
-
 class Writeable private[polars] (ptr: Long) {
   import org.polars.scala.polars.jsonMapper
 
-  private var _mode: String = WriteModes.ErrorIfExists.toString
-  private var _compression: String = WriteCompressions.zstd.toString
-  private var _compressionLevel: Int = -1
-  private val _options: MutableMap[String, String] = MutableMap.empty
+  private val _options: MutableMap[String, String] = MutableMap("writeMode" -> "errorifexists")
 
-  def compression(
-      value: WriteCompressions.WriteCompression,
-      level: Option[Int]
-  ): Writeable = synchronized {
-    _compression = value.toString
-    level match {
-      case Some(value) => _compressionLevel = value
-      case None =>
+  def option(key: String, value: String): Writeable = synchronized {
+    if (Option(key).exists(_.trim.isEmpty) || Option(value).exists(_.trim.isEmpty)) {
+      throw new IllegalArgumentException("Option key or value cannot be null or empty.")
     }
 
+    _options.put(key.trim, value.trim)
     this
   }
 
-  def compression(
-      value: String,
-      level: Option[Int] = None
-  ): Writeable = synchronized {
-    compression(
-      WriteCompressions
-        .fromString(value)
-        .getOrElse(
-          throw new IllegalArgumentException(
-            s"Provided value '$value' is not a valid write mode."
-          )
-        ),
-      level
-    )
-  }
-
-  def mode(value: WriteModes.WriteMode): Writeable = synchronized {
-    _mode = value.toString
-    this
-  }
-
-  def mode(value: String): Writeable = synchronized {
-    mode(
-      WriteModes
-        .fromString(value)
-        .getOrElse(
-          throw new IllegalArgumentException(
-            s"Provided value '$value' is not a valid write mode."
-          )
-        )
-    )
-  }
-
-  def withOption(key: String, value: String): Writeable = synchronized {
-    (key, value) match {
-      case (_, null) | (null, _) | (null, null) =>
-        throw new IllegalArgumentException("Option key or value cannot be null or empty.")
-
-      case (k, v) =>
-        _options.put(k.trim, v.trim)
-        this
-    }
-  }
-
   def options(opts: java.util.Map[String, String]): Writeable = synchronized {
-    opts.asScala.foreach { case (key, value) => withOption(key, value) }
-
+    opts.asScala.foreach { case (key, value) => option(key, value) }
     this
   }
 
   def options(opts: Iterable[(String, String)]): Writeable = synchronized {
-    opts.foreach { case (key, value) => withOption(key, value) }
-
+    opts.foreach { case (key, value) => option(key, value) }
     this
   }
 
-  def parquet(filePath: String, writeStats: Boolean = false): Unit =
+  def parquet(filePath: String): Unit =
     writeParquet(
       ptr = ptr,
       filePath = filePath,
-      writeStats = writeStats,
-      compression = _compression,
-      compressionLevel = _compressionLevel,
-      options = jsonMapper.writeValueAsString(_options),
-      writeMode = _mode
+      options = jsonMapper.writeValueAsString(_options)
    )
 
-  def ipc(filePath: String): Unit = {
-    WriteCompressions.fromString(_compression).get match {
-      case WriteCompressions.zstd | WriteCompressions.uncompressed | WriteCompressions.lz4 =>
-      case v =>
-        throw new IllegalArgumentException(
-          s"Compression for IPC format must be one of {{'uncompressed', 'lz4', 'zstd'}}, got $v"
-        )
-    }
-
+  def ipc(filePath: String): Unit =
     writeIPC(
       ptr = ptr,
       filePath = filePath,
-      compression = _compression,
-      options = jsonMapper.writeValueAsString(_options),
-      writeMode = _mode
+      options = jsonMapper.writeValueAsString(_options)
    )
-  }
-
-  def avro(filePath: String): Unit = {
-    WriteCompressions.fromString(_compression).get match {
-      case WriteCompressions.uncompressed | WriteCompressions.deflate |
-          WriteCompressions.snappy =>
-      case v =>
-        throw new IllegalArgumentException(
-          s"Compression for Avro format must be one of {{'uncompressed', 'deflate', 'snappy'}}, got $v"
-        )
-    }
 
+  def avro(filePath: String): Unit =
     writeAvro(
       ptr = ptr,
       filePath = filePath,
-      compression = _compression,
-      options = jsonMapper.writeValueAsString(_options),
-      writeMode = _mode
+      options = jsonMapper.writeValueAsString(_options)
    )
-  }
 }
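With the enum-based `compression`, `mode` and `withOption` builders removed, every writer setting now flows through the single `option` (or `options`) entry point and reaches the native writer as a JSON-encoded map. A minimal sketch of the new call pattern, using only the option keys that appear in the updated examples of this commit (`write_compression`, `write_mode`, `write_parquet_stats`); the output path is illustrative:

// Sketch only: `df` is an existing DataFrame.
df.write()
  .option("write_compression", "zstd")     // compression is now a plain string option
  .option("write_mode", "overwrite")       // default seeded by Writeable is "errorifexists"
  .option("write_parquet_stats", "full")   // Parquet statistics level
  .parquet("/tmp/output.pq")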

core/src/main/scala/org/polars/scala/polars/api/types/Schema.scala

Lines changed: 4 additions & 6 deletions
@@ -81,7 +81,7 @@ class Schema private (private[polars] val json: String) {
         .map(Option(_))
         .collectFirst { case Some(v) => v } match {
         case Some(listNode) =>
-          val listNodeType = listNode.get("data_type")
+          val listNodeType = listNode.get("dtype")
           Field(name, ListType(toField((name, listNodeType, listNodeType.getNodeType)).dataType))
 
         case None =>
@@ -93,9 +93,9 @@ class Schema private (private[polars] val json: String) {
         val structNode = node.get("Struct")
         val structFields = structNode.iterator().asScala
         val sf = structFields.map {
-          case node: JsonNode if node.hasNonNull("name") && node.hasNonNull("data_type") =>
+          case node: JsonNode if node.hasNonNull("name") && node.hasNonNull("dtype") =>
             val structFieldName: String = node.get("name").textValue()
-            val structFieldType: JsonNode = node.get("data_type")
+            val structFieldType: JsonNode = node.get("dtype")
 
             Field(
               structFieldName,
@@ -119,9 +119,7 @@ class Schema private (private[polars] val json: String) {
       case Some(node: JsonNode) if node.hasNonNull("fields") =>
         val fields = node.get("fields").elements().asScala.toList
         _fields = fields
-          .map(f =>
-            toField(f.get("name").textValue(), f.get("data_type"), f.get("data_type").getNodeType)
-          )
+          .map(f => toField(f.get("name").textValue(), f.get("dtype"), f.get("dtype").getNodeType))
          .toArray
        _fieldNames = fields.map(f => f.get("name").toString).toArray
 
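The rename tracks the schema JSON emitted by the native layer, which now keys each field's type as `dtype` instead of `data_type`. A rough sketch of the payload shape the parser above expects; the exact structure is an assumption inferred from the keys read in this file:

// Assumed schema payload: field entries carry "name" and "dtype" keys.
// Parsed here with a plain Jackson ObjectMapper purely for illustration.
import com.fasterxml.jackson.databind.ObjectMapper

val json = """{"fields":[{"name":"id","dtype":"Int64"},{"name":"name","dtype":"String"}]}"""
val first = new ObjectMapper().readTree(json).get("fields").elements().next()
println(first.get("name").textValue() + ": " + first.get("dtype").textValue())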

core/src/main/scala/org/polars/scala/polars/internal/jni/io/write.scala

Lines changed: 4 additions & 12 deletions
@@ -2,32 +2,24 @@ package org.polars.scala.polars.internal.jni.io
 
 import org.polars.scala.polars.internal.jni.Natively
 
-private[polars] object write extends Natively {
+object write extends Natively {
 
   @native def writeParquet(
       ptr: Long,
       filePath: String,
-      writeStats: Boolean,
-      compression: String,
-      compressionLevel: Int,
-      options: String,
-      writeMode: String
+      options: String
   ): Unit
 
   @native def writeIPC(
       ptr: Long,
       filePath: String,
-      compression: String,
-      options: String,
-      writeMode: String
+      options: String
   ): Unit
 
   @native def writeAvro(
       ptr: Long,
       filePath: String,
-      compression: String,
-      options: String,
-      writeMode: String
+      options: String
   ): Unit
 
 }
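The native writers now take a single JSON-encoded `options` string in place of the dedicated `compression`, `compressionLevel` and `writeMode` arguments. A small sketch of what that payload might look like; serialization is shown with a plain Jackson mapper, whereas the library routes it through its internal `jsonMapper`:

// Sketch: accumulated writer options cross the JNI boundary as one JSON string.
import com.fasterxml.jackson.databind.ObjectMapper

val options = new java.util.HashMap[String, String]()
options.put("writeMode", "errorifexists") // default seeded by Writeable
options.put("write_compression", "zstd")  // user-supplied option
val payload = new ObjectMapper().writeValueAsString(options)
// e.g. {"writeMode":"errorifexists","write_compression":"zstd"} (key order may vary)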

examples/src/main/java/examples/java/io/WritingToFileDatasets.java

Lines changed: 15 additions & 17 deletions
@@ -3,17 +3,13 @@
 import examples.scala.utils.CommonUtils;
 import org.polars.scala.polars.Polars;
 import org.polars.scala.polars.api.DataFrame;
-import org.polars.scala.polars.api.io.WriteCompressions;
-import org.polars.scala.polars.api.io.WriteModes;
-import scala.Some;
 
 /**
  * Polars supports various output file formats like the following,
  *
  * <ul>
- *   <li>{@link org.polars.scala.polars.api.io.Writeable#parquet(String, boolean) Apache Parquet}
- *   <li>{@link org.polars.scala.polars.api.io.Writeable#ipc(String) Apache Arrow IPC}
- *   <li>{@link org.polars.scala.polars.api.io.Writeable#avro(String) Apache Avro}
+ *   <li>{@link org.polars.scala.polars.api.io.Writeable#parquet(String) Apache Parquet}
+ *   <li>{@link org.polars.scala.polars.api.io.Writeable#ipc(String) Apache IPC}
 * </ul>
 *
 * <p>A {@link DataFrame} can be written to an object storage as a file in one of the supported
@@ -40,29 +36,31 @@ public static void main(String[] args) {
 
    /* Write this DataFrame to local filesystem at the provided path */
    String outputPath = CommonUtils.getOutputLocation("output.pq");
-    df.write().parquet(outputPath, false);
+    df.write().parquet(outputPath);
    System.out.printf("File written to location: %s%n%n", outputPath);
 
    /* Overwrite output if already exists */
-    df.write().mode("overwrite").parquet(outputPath, false);
+    df.write().option("write_mode", "overwrite").parquet(outputPath);
    System.out.printf("File overwritten at location: %s%n%n", outputPath);
 
    /* Write output file with compression */
    df.write()
-        .compression(WriteCompressions.zstd(), Some.apply(14))
-        .mode(WriteModes.Overwrite())
-        .parquet(outputPath, true);
+        .option("write_compression", "zstd")
+        .option("write_mode", "overwrite")
+        .option("write_parquet_stats", "full")
+        .parquet(outputPath);
    System.out.printf("File overwritten at location: %s with compression%n%n", outputPath);
 
    /* Write output file to Amazon S3 object store */
    String s3Path = "s3://bucket/output.pq";
    df.write()
-        .withOption("aws_default_region", "us-east-2")
-        .withOption("aws_access_key_id", "ABC")
-        .withOption("aws_secret_access_key", "XYZ")
-        .compression(WriteCompressions.zstd(), Some.apply(14))
-        .mode(WriteModes.Overwrite())
-        .parquet(s3Path, true);
+        .option("write_compression", "zstd")
+        .option("write_mode", "overwrite")
+        .option("write_parquet_stats", "full")
+        .option("aws_default_region", "us-east-2")
+        .option("aws_access_key_id", "ABC")
+        .option("aws_secret_access_key", "XYZ")
+        .parquet(s3Path);
    System.out.printf("File overwritten at location: %s with compression%n%n", s3Path);
  }
 }

examples/src/main/scala/examples/scala/io/WritingToFileDatasets.scala

Lines changed: 14 additions & 10 deletions
@@ -1,14 +1,12 @@
 package examples.scala.io
 
 import org.polars.scala.polars.Polars
-import org.polars.scala.polars.api.io.{WriteCompressions, WriteModes}
 
 import examples.scala.utils.CommonUtils
 
 /** Polars supports various output file formats like the following,
   *   - [[org.polars.scala.polars.api.io.Writeable.parquet Apache Parquet]]
-  *   - [[org.polars.scala.polars.api.io.Writeable.ipc Apache Arrow IPC]]
-  *   - [[org.polars.scala.polars.api.io.Writeable.avro Apache Avro]]
+  *   - [[org.polars.scala.polars.api.io.Writeable.ipc Apache IPC]]
   *
   * A [[org.polars.scala.polars.api.DataFrame DataFrame]] can be written to an object storage as a
   * file in one of the supported formats mentioned above.
@@ -38,29 +36,35 @@ object WritingToFileDatasets {
     printf("File written to location: %s%n%n", outputPath)
 
     /* Overwrite output if already exists */
-    df.write().mode("overwrite").parquet(outputPath)
+    df.write().option("write_mode", "overwrite").parquet(outputPath)
     printf("File overwritten at location: %s%n%n", outputPath)
 
     /* Write output file with compression */
     df.write()
-      .compression(WriteCompressions.zstd, Some.apply(14))
-      .mode(WriteModes.Overwrite)
-      .parquet(outputPath, writeStats = true)
+      .options(
+        Map(
+          "write_compression" -> "zstd",
+          "write_mode" -> "overwrite",
+          "write_parquet_stats" -> "full"
+        )
+      )
+      .parquet(outputPath)
     printf("File overwritten at location: %s with compression%n%n", outputPath)
 
     /* Write output file to Amazon S3 object store */
     val s3Path: String = "s3://bucket/output.pq"
     df.write()
       .options(
         Map(
+          "write_compression" -> "zstd",
+          "write_mode" -> "overwrite",
+          "write_parquet_stats" -> "full",
           "aws_default_region" -> "us-east-2",
           "aws_access_key_id" -> "ABC",
           "aws_secret_access_key" -> "XYZ"
         )
       )
-      .compression(WriteCompressions.zstd, Some.apply(14))
-      .mode(WriteModes.Overwrite)
-      .parquet(s3Path, writeStats = true)
+      .parquet(s3Path)
     printf("File overwritten at location: %s with compression%n%n", s3Path)
   }
 
