- 
                Notifications
    
You must be signed in to change notification settings  - Fork 4.4k
 
Open
Description
What happened?
Can you help me define a TIMESTAMP field in a schema in Python?
import apache_beam as beam
from apache_beam import Row
from apache_beam.transforms import managed
from apache_beam.typehints import row_type
import datetime
from google.cloud import bigquery
from numpy._core.numerictypes import int64
# NOTE(review): this BigQuery client is not referenced anywhere else in the
# visible snippet.
client = bigquery.Client()
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
# Google Cloud options (project, region, temp location) that are handed to
# beam.Pipeline(options=opts) further down.
opts = GoogleCloudOptions(project="myproject",
                          region="us-west1",
                          temp_location="gs://myproject/temp")
from typing import NamedTuple
import typing
# Schema of the nested record; used as the type of TestRow.nested.
class TestCol(NamedTuple):
    color: str
    size: float
# Top-level row schema for the elements the pipeline converts and writes.
class TestRow(NamedTuple):
    id: str
    name: str
    value: int
    # Nested record; cast_to_schema turns the incoming dict into a TestCol.
    nested: TestCol
    # NOTE(review): the schema dump reported with this snippet shows "ts"
    # encoded as beam:logical:pythonsdk_any:v1, and BigQuery rejects the write
    # ("changed type from TIMESTAMP to INTEGER"). Presumably datetime.datetime
    # has no native Beam schema mapping -- TODO confirm whether
    # apache_beam.utils.timestamp.Timestamp is the intended field type.
    ts: datetime.datetime 
def cast_to_schema(schema_class, data):
    """Build a ``schema_class`` instance from a dict, recursing into nested schemas.

    Args:
        schema_class: NamedTuple-style class exposing ``_fields`` and type hints.
        data: Mapping of field name to value; keys that are absent yield ``None``.

    Returns:
        A ``schema_class`` instance, or ``None`` when ``data`` is falsy
        (``None`` or an empty mapping).
    """
    hints = typing.get_type_hints(schema_class)
    if not data:
        return None

    def convert(name):
        hint = hints.get(name)
        raw = data.get(name)
        # A hint that itself carries ``_fields`` is treated as a nested schema.
        return cast_to_schema(hint, raw) if hasattr(hint, "_fields") else raw

    return schema_class(**{name: convert(name) for name in schema_class._fields})
# Pipeline: create two sample rows, convert them to TestRow, and write them to
# BigQuery through the managed transform.
with beam.Pipeline(options=opts) as p:
    rows = (
        p
        # Two in-memory sample elements; "ts" is a timezone-aware UTC datetime.
        | "CreateRows" >> beam.Create([
            {"id": "a", "name": "Amy", "value": 100, "nested": {"color": "red", "size": 1.0}, "ts": datetime.datetime(2025, 1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)},
            {"id": "b", "name": "Bob", "value": 200, "nested": {"color": "blue", "size": 2.0}, "ts": datetime.datetime(2025, 1, 1, 2, 0, 0, tzinfo=datetime.timezone.utc)},
        ])
        # Convert each dict into a TestRow and declare the output type as a row
        # type built from TestRow's annotations (so "ts" is declared as
        # datetime.datetime).
        | "ToRow" >> beam.Map(lambda x: cast_to_schema(TestRow, x)).with_output_types(row_type.RowTypeConstraint.from_fields(list(TestRow.__annotations__.items())))
    )
    # Hand the rows to managed.Write for BigQuery with WRITE_APPEND; the result
    # is deliberately discarded.
    _ = rows | "WriteToBQ" >> managed.Write(
        managed.BIGQUERY,
        config={
            "table": "myproject.mydataset.mytable",
            "write_disposition": "WRITE_APPEND",
        },
    )

"errors": [
      {
        "message": "Provided Schema does not match Table xxxxx. Field ts has changed type from TIMESTAMP to INTEGER",
        "reason": "invalid"
      }
    ],
...
name: "ts"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
 - Component: Java SDK
 - Component: Go SDK
 - Component: Typescript SDK
 - Component: IO connector
 - Component: Beam YAML
 - Component: Beam examples
 - Component: Beam playground
 - Component: Beam katas
 - Component: Website
 - Component: Infrastructure
 - Component: Spark Runner
 - Component: Flink Runner
 - Component: Samza Runner
 - Component: Twister2 Runner
 - Component: Hazelcast Jet Runner
 - Component: Google Cloud Dataflow Runner