
[Bug]: Managed I/O with BIGQUERY cannot insert TIMESTAMP #36553

@cona-bear

Description


What happened?

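How do I define a TIMESTAMP field in a Python schema? Writing rows whose ts field is a datetime.datetime into a BigQuery TIMESTAMP column through Managed I/O fails. Reproduction: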

import datetime
import typing
from typing import NamedTuple

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import managed
from apache_beam.typehints import row_type

opts = GoogleCloudOptions(project="myproject",
                          region="us-west1",
                          temp_location="gs://myproject/temp")


class TestCol(NamedTuple):
    color: str
    size: float


class TestRow(NamedTuple):
    id: str
    name: str
    value: int
    nested: TestCol
    ts: datetime.datetime  # the target BigQuery column is TIMESTAMP


def cast_to_schema(schema_class, data):
    """Recursively convert a dict into an instance of the NamedTuple schema_class."""
    if not data:
        return None

    result = {}
    field_types = typing.get_type_hints(schema_class)
    for field in schema_class._fields:
        field_type = field_types.get(field)
        value = data.get(field)
        # Nested NamedTuple classes expose _fields; convert them recursively.
        if hasattr(field_type, '_fields'):
            result[field] = cast_to_schema(field_type, value)
        else:
            result[field] = value
    return schema_class(**result)


# Row type built from TestRow's annotations; ts is declared as datetime.datetime.
row_schema = row_type.RowTypeConstraint.from_fields(list(TestRow.__annotations__.items()))

with beam.Pipeline(options=opts) as p:
    rows = (
        p
        | "CreateRows" >> beam.Create([
            {"id": "a", "name": "Amy", "value": 100, "nested": {"color": "red", "size": 1.0}, "ts": datetime.datetime(2025, 1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)},
            {"id": "b", "name": "Bob", "value": 200, "nested": {"color": "blue", "size": 2.0}, "ts": datetime.datetime(2025, 1, 1, 2, 0, 0, tzinfo=datetime.timezone.utc)},
        ])
        | "ToRow" >> beam.Map(lambda x: cast_to_schema(TestRow, x)).with_output_types(row_schema)
    )

    _ = rows | "WriteToBQ" >> managed.Write(
        managed.BIGQUERY,
        config={
            "table": "myproject.mydataset.mytable",
            "write_disposition": "WRITE_APPEND",
        },
    )

The write fails with this error from BigQuery:

"errors": [
  {
    "message": "Provided Schema does not match Table xxxxx. Field ts has changed type from TIMESTAMP to INTEGER",
    "reason": "invalid"
  }
],

...

Beam schema entry for the ts field:

name: "ts"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}
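To narrow down which field Beam cannot map natively, the NamedTuples can be inspected without Beam or GCP. The sketch below is a minimal stdlib-only check; `describe_fields` is a hypothetical helper written for this illustration, not part of Beam. It flattens nested NamedTuples and lists each leaf field with its declared type. In the repro every leaf is a plain str, int, or float except ts, which is declared as `datetime.datetime`: the same field the schema dump above shows encoded with the `beam:logical:pythonsdk_any:v1` logical type instead of a timestamp type.

```python
import datetime
import typing
from typing import NamedTuple


class TestCol(NamedTuple):
    color: str
    size: float


class TestRow(NamedTuple):
    id: str
    name: str
    value: int
    nested: TestCol
    ts: datetime.datetime


def describe_fields(schema_class, prefix=""):
    """Hypothetical helper: return (dotted_name, declared_type) for every leaf field.

    Fields whose type is itself a NamedTuple (it has `_fields`) are expanded
    recursively, so nested columns appear as "nested.color" and so on.
    """
    out = []
    for name, hint in typing.get_type_hints(schema_class).items():
        path = prefix + name
        if hasattr(hint, "_fields"):
            out.extend(describe_fields(hint, path + "."))
        else:
            out.append((path, hint))
    return out


# Print each leaf field with the fully qualified name of its declared type.
for path, hint in describe_fields(TestRow):
    print(f"{path}: {hint.__module__}.{hint.__qualname__}")
```

One avenue to try, stated as an unverified assumption: Beam has its own timestamp class, `apache_beam.utils.timestamp.Timestamp` (with `Timestamp.from_utc_datetime` for converting UTC-aware datetimes). Declaring ts with that type instead of `datetime.datetime` may yield a timestamp logical type rather than `pythonsdk_any`, but whether the managed BigQuery sink maps it to TIMESTAMP should be checked against your Beam version.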

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata


Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
