VaM 1.x Using the Coyote 3 with Toy Controller plugins

LittleBird

I vibe coded a Python script that listens for the UDP output of the toy controller plugins (I have been using the Yoooi ToySerialController plugin) and forwards each message over a WebSocket connection, which CoyoteSocket then uses to control the Coyote 3.

The script uses asyncio, socket, and websockets. asyncio and socket are part of the Python standard library, so only websockets needs to be installed.
Code:
pip install websockets

Update the ports in the script (UDP_PORT and WS_URL) to match your configuration.
Default UDP port is 8000.
Default WS port is 12346.
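
If you would rather not edit the script each time, the ports could be passed on the command line instead. This is only a rough sketch and not part of the script below: the flag names `--udp-port` and `--ws-port` and the helpers `parse_args` and `ws_url` are made up for illustration, with defaults matching the values above.

```python
import argparse


def parse_args(argv=None):
    """Parse bridge settings (hypothetical flags; defaults match the post)."""
    parser = argparse.ArgumentParser(description="UDP to WebSocket bridge")
    parser.add_argument("--udp-port", type=int, default=8000,
                        help="UDP port to listen on")
    parser.add_argument("--ws-port", type=int, default=12346,
                        help="WebSocket port to connect to")
    return parser.parse_args(argv)


def ws_url(ws_port, host="localhost"):
    """Build the WebSocket URL in the same form the script uses."""
    return f"ws://{host}:{ws_port}"
```

In the script, `UDP_PORT` and `WS_URL` would then be set from `parse_args()` and `ws_url(args.ws_port)` instead of being hard-coded.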

### Usage ###

Requires Python 3.10+

Run the script before connecting the plugin.

Code:
python.exe udp2ws.py
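
To check that the bridge is listening without starting VaM, you can send it a UDP packet by hand. A minimal sketch using only the standard library: `send_test_packet` is a hypothetical helper, and the payload is just a placeholder string, not necessarily what the plugin actually sends.

```python
import socket


def send_test_packet(message, host="127.0.0.1", port=8000):
    """Send one UDP datagram to the bridge; return the number of bytes sent."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(message.encode(), (host, port))
```

With udp2ws.py running on the default port, calling `send_test_packet("test")` should make the script log a "UDP received" line for that message.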

#####

# udp2ws.py
Python:
import asyncio
import socket
import websockets

UDP_IP = "0.0.0.0"
UDP_PORT = 8000

WS_URL = "ws://localhost:12346"   # WebSocket server to connect to


async def udp_listener(queue):
    """Listen for UDP packets and push them into an asyncio queue."""
    loop = asyncio.get_running_loop()

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))
    sock.setblocking(False)

    print(f"UDP server listening on {UDP_IP}:{UDP_PORT}")

    while True:
        data = await loop.sock_recv(sock, 4096)
        message = data.decode(errors="ignore").strip()
        print(f"UDP received: {message}")
        await queue.put(message)


async def websocket_sender(queue):
    """Connect to WebSocket server and forward UDP messages."""
    while True:
        try:
            print(f"Connecting to WebSocket server at {WS_URL}...")
            async with websockets.connect(WS_URL) as ws:
                print("Connected to WebSocket server")

                while True:
                    msg = await queue.get()
                    await ws.send(msg)
                    print(f"Sent to WS: {msg}")

        except Exception as e:
            print(f"WebSocket error: {e}")
            print("Retrying in 3 seconds...")
            await asyncio.sleep(3)


async def main():
    queue = asyncio.Queue()

    await asyncio.gather(
        udp_listener(queue),
        websocket_sender(queue)
    )


if __name__ == "__main__":
    asyncio.run(main())

I am sure this could be improved, but it works.
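
One possible improvement, as a sketch only: the queue in the script is unbounded, so while the WebSocket is disconnected the UDP messages pile up and are all sent once it reconnects. Assuming old commands are not worth replaying, a bounded queue that drops the oldest entry would keep only the most recent ones. `put_latest` and the `maxsize=2` value here are hypothetical, not something the script does today.

```python
import asyncio


def put_latest(queue, item):
    """Put item on a bounded queue, discarding the oldest entry if it is full."""
    if queue.full():
        try:
            queue.get_nowait()  # drop the oldest (stalest) message
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(item)


async def demo():
    """Push four messages through a queue that only holds two."""
    queue = asyncio.Queue(maxsize=2)
    for msg in ["a", "b", "c", "d"]:
        put_latest(queue, msg)
    # Only the two most recent messages remain.
    return [queue.get_nowait() for _ in range(queue.qsize())]
```

To try it in the script, `main()` would create the queue with a `maxsize`, and `udp_listener` would call `put_latest(queue, message)` in place of `await queue.put(message)`.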
 