Skip to content

Conversation

@LinuxMainframe
Copy link

@LinuxMainframe LinuxMainframe commented Nov 6, 2025

Adds support for per-scene-item show and hide transitions via new WebSocket RPC requests

Key Changes:

  • Introduced four new non-breaking RPC requests:
    • GetSceneItemShowTransition: Retrieves the current show transition configuration (name, UUID, kind, duration) for a specified scene item.
    • SetSceneItemShowTransition: Sets or updates the show transition for a scene item, supporting parameters like transitionName (null to disable) and transitionDuration (in milliseconds).
    • GetSceneItemHideTransition: Retrieves the hide transition details, similar to the show variant.
    • SetSceneItemHideTransition: Configures the hide transition, with analogous parameters.
  • These requests enable granular control over individual scene item visibility transitions, compatible with scenes and scene groups.
  • Added #include <optional> for full compatibility with OBS Studio v32, ensuring safe handling of nullable parameters.

This implementation extends the API to allow remote management of source appearances, such as fading in/out specific items without affecting the entire scene.

Motivation:
The change addresses a long-standing feature gap where per-source show/hide transitions, available in OBS Studio's UI, were not exposed via WebSocket. This limits automation and dynamic streaming setups, as clients couldn't gracefully introduce or modify sources remotely.

Closes #906 by providing dedicated RPC endpoints for these settings, aligning with the request for integration into source management workflows.

Tested with:
A comprehensive Python test suite using asyncio and websockets library. The script:

  • Connects to OBS WebSocket (v5.6.3) at ws://localhost:4455.
  • Sets up a test scene ("Test_Transitions_Scene") with a color source.
  • Validates Get/SetSceneItemShowTransition and Get/SetSceneItemHideTransition through cycles of enabling (e.g., Fade at 500ms/750ms), updating durations/names, disabling via null, and verifying responses.
  • Tests visibility toggles with SetSceneItemEnabled to confirm transitions apply during show/hide.
  • Covers edge cases: invalid scene names, item IDs, and transition names, ensuring proper error handling (e.g., "No transition was found").
    All tests passed, with outcomes confirming correct UUIDs, kinds (e.g., "fade_transition"), and durations.

Tested OS(s): Linux (Ubuntu-based development environment with OBS Studio v32+).

  • New request/event (non-breaking)

  • I have read the Contributing Guidelines.

  • All commit messages are properly formatted and commits squashed where appropriate.

  • My code is not on master or a release/* branch.

  • The code has been tested.

  • I have included updates to all appropriate documentation.

Commits:

aab18011 and others added 2 commits November 4, 2025 00:25
- Add GetSceneItemShowTransition request to get scene item show transitions
- Add SetSceneItemShowTransition request to set scene item show transitions
- Add GetSceneItemHideTransition request to get scene item hide transitions
- Add SetSceneItemHideTransition request to set scene item hide transitions

These requests expose per-scene-item transition properties through the WebSocket
API, allowing clients to configure and manage show/hide transitions for individual
scene items.

Transitions support:
- Setting custom transitions for show/hide actions
- Configuring duration (in milliseconds) for each transition
- Removing transitions by passing null
- Works with both scenes and scene groups

Addresses issue obsproject#906
In order to fully close the issue of obsproject#906 of obs-websockets, #include <optional> is necessary (v32 of OBS as of this commit).
@LinuxMainframe
Copy link
Author

Test cases for my commit:

#!/usr/bin/env python3
"""
Test set for OBS WebSocket scene item transition functions.
This script tests the functionality for handling remote show/hide transitions
on scene items via the OBS WebSocket API.

AUTHOR: Aidan A. Bradley
DATE: November 4th, 2025
"""

import asyncio
import json
import time
from typing import Optional, Dict, Any
import websockets
import argparse
import base64
import hashlib


class OBSWebSocketTest:
    def __init__(self, host: str = "localhost", port: int = 4455, password: Optional[str] = None):
        """
        Initialize the OBS WebSocket test client.

        :param host: Hostname or IP of the OBS WebSocket server (default: 'localhost')
        :param port: Port of the OBS WebSocket server (default: 4455)
        :param password: Password for authentication if required (default: None)
        """
        self.host = host
        self.port = port
        self.password = password
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.request_id: int = 0

    async def connect(self) -> None:
        """
        Establish a connection to the OBS WebSocket server, handle authentication if required,
        and complete the identification process.
        """
        uri = f"ws://{self.host}:{self.port}"
        print(f"Connecting to {uri}...")
        self.ws = await websockets.connect(uri)

        # Receive the initial Hello message from the server
        hello = json.loads(await self.ws.recv())
        print(f"Received Hello: {hello['op']} - OBS WebSocket version {hello['d']['obsWebSocketVersion']}")

        identify: Dict[str, Any] = {
            "op": 1,
            "d": {
                "rpcVersion": 1,
                "eventSubscriptions": 0
            }
        }

        # Handle authentication if required
        if 'authentication' in hello['d'] and hello['d']['authentication'] is not None:
            if not self.password:
                raise ValueError("Password required but not provided")
            print("Authentication required")
            salt = hello['d']['authentication']['salt']
            challenge = hello['d']['authentication']['challenge']
            # Compute the authentication response using SHA256 and base64
            passhash = hashlib.sha256((self.password + salt).encode('utf-8')).digest()
            secret = base64.b64encode(passhash)
            response_hash = hashlib.sha256(secret + challenge.encode('utf-8')).digest()
            auth_response = base64.b64encode(response_hash).decode('utf-8')
            identify["d"]["authentication"] = auth_response
        else:
            print("No authentication required")

        # Send the Identify message
        await self.ws.send(json.dumps(identify))

        # Receive the Identified message confirming connection
        identified = json.loads(await self.ws.recv())
        print(f"Connected! Negotiated RPC version: {identified['d']['negotiatedRpcVersion']}")

    async def request(self, request_type: str, request_data: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Send a request to the OBS WebSocket server and await the response.

        :param request_type: The type of request (e.g., 'GetSceneItemList')
        :param request_data: Optional data payload for the request
        :return: The response data from the server
        """
        self.request_id += 1
        request: Dict[str, Any] = {
            "op": 6,
            "d": {
                "requestType": request_type,
                "requestId": str(self.request_id),
                "requestData": request_data or {}
            }
        }

        await self.ws.send(json.dumps(request))

        # Loop until the matching response is received
        while True:
            response = json.loads(await self.ws.recv())
            if response['op'] == 7 and response['d']['requestId'] == str(self.request_id):
                return response['d']

    async def close(self) -> None:
        """
        Close the WebSocket connection if it is open.
        """
        if self.ws:
            await self.ws.close()
            print("Connection closed")

    async def setup_test_scene(self) -> tuple[str, int]:
        """
        Set up a test scene and add a color source for testing transitions.

        :return: Tuple containing the scene name and scene item ID
        :raises Exception: If unable to create or retrieve the test scene item
        """
        print("\n=== Setting up test environment ===")

        # Define test scene and source names
        scene_name = "Test_Transitions_Scene"
        print(f"Creating scene: {scene_name}")
        try:
            await self.request("CreateScene", {"sceneName": scene_name})
        except Exception:
            print(f"Scene {scene_name} might already exist, continuing...")

        source_name = "Test_Color_Source"
        print(f"Creating color source: {source_name}")
        try:
            await self.request("CreateInput", {
                "sceneName": scene_name,
                "inputName": source_name,
                "inputKind": "color_source_v3",
                "inputSettings": {
                    "color": 4278190335  # ARGB value for blue
                }
            })
        except Exception as e:
            print(f"Source might already exist: {str(e)}")

        # Retrieve the list of scene items to get the item ID
        response = await self.request("GetSceneItemList", {"sceneName": scene_name})
        if response['requestStatus']['result']:
            scene_items = response['responseData']['sceneItems']
            if scene_items:
                scene_item_id = scene_items[0]['sceneItemId']
                print(f"Found scene item ID: {scene_item_id}")
                return scene_name, scene_item_id

        raise Exception("Failed to create test scene item")

    async def test_get_scene_item_show_transition(self, scene_name: str, scene_item_id: int) -> Dict[str, Any]:
        """
        Test the GetSceneItemShowTransition request to retrieve show transition details.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        :return: The response from the request
        """
        print("\n=== Testing GetSceneItemShowTransition ===")

        response = await self.request("GetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id
        })

        if response['requestStatus']['result']:
            data = response.get('responseData', {})
            is_set = data.get('transitionName') is not None
            print(f"✓ Show transition set: {is_set}")
            print(f"  Transition name: {data.get('transitionName')}")
            print(f"  Transition UUID: {data.get('transitionUuid')}")
            print(f"  Transition kind: {data.get('transitionKind')}")
            print(f"  Transition duration: {data.get('transitionDuration')} ms")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        return response

    async def get_available_transitions(self) -> Optional[str]:
        """
        Retrieve the list of available transitions in OBS and return the first one.

        :return: Name of the first available transition or None if none found
        """
        print("\n=== Getting available transitions ===")
        response = await self.request("GetSceneTransitionList", {})

        if response['requestStatus']['result']:
            transitions = response.get('responseData', {}).get('transitions', [])
            if transitions:
                transition_names = [t.get('transitionName') for t in transitions]
                print(f"Available transitions: {transition_names}")
                return transition_names[0]  # Return the first available transition
            else:
                print("✗ No transitions available")
                return None
        else:
            print(f"✗ Failed to get transitions: {response['requestStatus'].get('comment', 'No error')}")

        return None

    async def test_set_scene_item_show_transition(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test the SetSceneItemShowTransition request, including enabling, updating, and disabling.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing SetSceneItemShowTransition ===")

        # Retrieve a valid transition name
        transition_name = await self.get_available_transitions()
        if not transition_name:
            print("✗ Cannot test without an available transition")
            return

        # Test 1: Enable the show transition with a duration
        print(f"\nTest 1: Enable with {transition_name} transition, 500ms duration")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 500
        })

        if response['requestStatus']['result']:
            print("✓ Successfully set show transition")
        else:
            print(f"✗ Failed: {response['requestStatus']['comment']}")

        # Verify the change by getting the transition details
        await asyncio.sleep(0.1)
        await self.test_get_scene_item_show_transition(scene_name, scene_item_id)

        # Test 2: Update only the duration
        print("\nTest 2: Change duration to 1000ms")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionDuration": 1000
        })

        if response['requestStatus']['result']:
            print("✓ Successfully updated duration")
        else:
            print(f"✗ Failed: {response['requestStatus']['comment']}")

        # Test 3: Disable the show transition by setting name to None
        print("\nTest 3: Disable show transition")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": None
        })

        if response['requestStatus']['result']:
            print("✓ Successfully disabled show transition")
        else:
            print(f"✗ Failed: {response['requestStatus']['comment']}")

    async def test_get_scene_item_hide_transition(self, scene_name: str, scene_item_id: int) -> Dict[str, Any]:
        """
        Test the GetSceneItemHideTransition request to retrieve hide transition details.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        :return: The response from the request
        """
        print("\n=== Testing GetSceneItemHideTransition ===")

        response = await self.request("GetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id
        })

        if response['requestStatus']['result']:
            data = response.get('responseData', {})
            print(f"✓ Response data: {data}")
            print(f"  Transition name: {data.get('transitionName')}")
            print(f"  Transition duration: {data.get('transitionDuration')} ms")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        return response

    async def test_set_scene_item_hide_transition(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test the SetSceneItemHideTransition request, including enabling, updating, and disabling.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing SetSceneItemHideTransition ===")

        # Retrieve a valid transition name
        transition_name = await self.get_available_transitions()
        if not transition_name:
            print("✗ Cannot test without an available transition")
            return

        # Test 1: Enable the hide transition with a duration
        print(f"\nTest 1: Enable with {transition_name} transition, 750ms duration")
        response = await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 750
        })

        if response['requestStatus']['result']:
            print("✓ Successfully set hide transition")
        else:
            print(f"✗ Failed: {response['requestStatus']['comment']}")

        # Verify the change by getting the transition details
        await asyncio.sleep(0.1)
        await self.test_get_scene_item_hide_transition(scene_name, scene_item_id)

        # Test 2: Update only the transition name (using the same for simplicity)
        print(f"\nTest 2: Change transition name to {transition_name}")
        response = await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name
        })

        if response['requestStatus']['result']:
            print("✓ Successfully updated transition name")
        else:
            print(f"✗ Failed: {response['requestStatus']['comment']}")

        # Test 3: Disable the hide transition by setting name to None
        print("\nTest 3: Disable hide transition")
        response = await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": None
        })

        if response['requestStatus']['result']:
            print("✓ Successfully disabled hide transition")
        else:
            print(f"✗ Failed: {response['requestStatus']['comment']}")

    async def test_visibility_transitions(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test visibility changes (show/hide) with transitions enabled to ensure they apply correctly.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing Visibility Changes with Transitions ===")

        # Retrieve a valid transition name
        transition_name = await self.get_available_transitions()
        if not transition_name:
            print("✗ Cannot test without an available transition")
            return

        # Set up transitions for both show and hide
        print(f"Setting up {transition_name} transitions for visibility test...")
        await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 300
        })

        await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 300
        })

        # Test hiding the item
        print("\nHiding scene item (should use hide transition)...")
        await self.request("SetSceneItemEnabled", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "sceneItemEnabled": False
        })
        await asyncio.sleep(0.5)  # Wait for the transition to complete
        print("✓ Hide complete")

        # Test showing the item
        print("\nShowing scene item (should use show transition)...")
        await self.request("SetSceneItemEnabled", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "sceneItemEnabled": True
        })
        await asyncio.sleep(0.5)  # Wait for the transition to complete
        print("✓ Show complete")

    async def test_edge_cases(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test edge cases for error handling, such as invalid scene names, item IDs, and transition names.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing Edge Cases ===")

        # Test 1: Invalid scene name
        print("\nTest 1: Invalid scene name")
        response = await self.request("GetSceneItemShowTransition", {
            "sceneName": "NonExistentScene",
            "sceneItemId": scene_item_id
        })
        if not response['requestStatus']['result']:
            print(f"✓ Correctly failed: {response['requestStatus']['comment']}")
        else:
            print("✗ Should have failed with invalid scene")

        # Test 2: Invalid scene item ID
        print("\nTest 2: Invalid scene item ID")
        response = await self.request("GetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": 99999
        })
        if not response['requestStatus']['result']:
            print(f"✓ Correctly failed: {response['requestStatus']['comment']}")
        else:
            print("✗ Should have failed with invalid item ID")

        # Test 3: Invalid transition name
        print("\nTest 3: Invalid transition name")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": "nonexistent_transition_12345"
        })
        print(f"  Result: {response['requestStatus']['result']}")
        print(f"  Comment: {response['requestStatus'].get('comment', 'No comment')}")


async def main() -> None:
    """
    Main entry point for the script. Parses arguments, connects to OBS, runs tests, and cleans up.
    """
    parser = argparse.ArgumentParser(description='Test OBS WebSocket Scene Item Transitions')
    parser.add_argument('--host', default='localhost', help='OBS WebSocket host')
    parser.add_argument('--port', type=int, default=4455, help='OBS WebSocket port')
    parser.add_argument('--password', help='OBS WebSocket password')
    args = parser.parse_args()

    test = OBSWebSocketTest(host=args.host, port=args.port, password=args.password)

    try:
        await test.connect()
        scene_name, scene_item_id = await test.setup_test_scene()

        # Execute the test suite
        await test.test_get_scene_item_show_transition(scene_name, scene_item_id)
        await test.test_set_scene_item_show_transition(scene_name, scene_item_id)

        await test.test_get_scene_item_hide_transition(scene_name, scene_item_id)
        await test.test_set_scene_item_hide_transition(scene_name, scene_item_id)

        await test.test_visibility_transitions(scene_name, scene_item_id)

        await test.test_edge_cases(scene_name, scene_item_id)

        print("\n" + "=" * 50)
        print("All tests completed!")
        print("=" * 50)

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        await test.close()


if __name__ == '__main__':
    asyncio.run(main())

@LinuxMainframe
Copy link
Author

I added the test case because I want to show what I tried. I am not sure if this covers everything, as I have kind of learned the protocol over time and only recently got into the source code. So if I am missing anything, documentation or otherwise, please say so. I have been programming a while but not with any group nor to open-source, so I am very new.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature Request: Add the Ability to set Source Transition

2 participants