live video stream from Insta360 One X camera in ROS

I have an Insta360 One X camera. I want to publish the video stream from this camera as a ROS topic.

I use the Open Spherical Camera (OSC) API. I want to get the video stream from it and then republish it using video_stream_opencv.

I have already been using the camera.takePicture command from that API, and it worked fine in a Python script. With that command the camera takes a photo and saves it on the camera's memory card. Then I call camera.getImage to download the picture.

Now I want to use getLivePreview, but I don't understand what the output of this function is or how I can work with it in a Python script. I tried this code:

#!/usr/bin/env python3

import socket
import onex as insta
import sys
import time

from flask import Flask, render_template, Response
from camera import Camera

def executeRequest(kind, url, headers, data, timeout=None):
    """Send an HTTP request and report failures as values instead of raising.

    Args:
        kind: HTTP method selector, either "post" or "get".
        url: Target URL.
        headers: Dict of HTTP headers sent with the request.
        data: Request body, passed through to ``requests`` unchanged.
        timeout: Seconds to wait for the server. When omitted, a module-level
            ``timeout`` is used if one is defined, otherwise 10 seconds.

    Returns:
        ``(responce, None)`` on success, or ``(None, error)`` on failure,
        where ``error`` is a short message or the caught exception.
    """
    if kind not in ("post", "get"):
        # Rejected up front: otherwise no request would be sent and
        # `responce` would be unbound at the final return.
        return None, "Unsupported request kind: {!r}".format(kind)
    if timeout is None:
        # The original body read a bare `timeout` name that is not defined in
        # the visible code; honor a module-level value when there is one.
        timeout = globals().get("timeout", 10)
    try:
        if kind == "post":
            responce = requests.post(url, data=data, headers=headers, timeout=timeout)
        else:
            responce = requests.get(url, data=data, headers=headers, timeout=timeout)
    except requests.exceptions.HTTPError:
        return None, "Incorrect HTTP headers"
    except requests.exceptions.Timeout:
        return None, "Connection time out"
    except requests.exceptions.ConnectionError:
        return None, "Connection error"
    except Exception as error:
        # Last-resort boundary: callers expect an (None, error) pair, never
        # an exception, so hand the exception object back as the error.
        return None, error
    return responce, None

class ONEX:
    def ceepAlive(self):
        # NOTE(review): the loop body is missing from this excerpt, so the
        # method is not valid Python as pasted. Presumably this is a
        # keep-alive loop ("ceepAlive" looks like a misspelling of
        # "keepAlive") — confirm against the full onex module.
        while True:

    def __init__(self, ip, port):
        """Store the connection settings and apply the camera delay options.

        Args:
            ip: Camera IP address or host name.
            port: Camera HTTP port (the note in the original read "port: 80").

        After construction ``self.connected`` tells whether both option
        requests succeeded; on failure ``self.lastError`` holds the error
        returned by the request that failed.
        """
        self.ip = ip
        self.port = port
        self.header = {"Content-Type": "application/json; charset=utf-8"}
        self.pathToSave = "files/"
        # Start pessimistic: the original set connected = True unconditionally
        # at the end, which hid every failure from callers.
        self.connected = False
        self.lastError = None
        # NOTE(review): presumably these keep the camera from powering off or
        # sleeping mid-session — confirm the values against the OSC options.
        for option, value in (("offDelay", 65535), ("sleepDelay", 20)):
            responce, self.lastError = self.setOption(option, value)
            if not responce:
                # Stop at the first failure so its error is not overwritten
                # by the result of the next request.
                return
        self.connected = True

    def getLivePreview(self):
        """Send the ``camera.getLivePreview`` command to the camera.

        The command is posted as JSON to the camera's
        ``/osc/commands/execute`` endpoint.

        Returns:
            ``(reply, None)`` with the decoded JSON reply on success, or
            ``(None, error)`` when the request fails, the reply is not JSON,
            or the camera reports ``"state": "error"``.
        """
        url = "http://{ip}:{port}/osc/commands/execute".format(ip = self.ip, port = self.port)
        data = json.dumps({"name": "camera.getLivePreview"})
        responce, error = executeRequest("post", url, self.header, data)
        # Compare against None explicitly: a requests.Response is falsy for
        # 4xx/5xx replies, which would otherwise be reported as (None, None)
        # and lose the error message carried in the reply body.
        if responce is None:
            return None, error
        # NOTE(review): the OSC spec describes getLivePreview as returning a
        # continuous MJPEG stream (multipart/x-mixed-replace), not JSON. If
        # the camera behaves that way, the request needs stream=True and the
        # frames must be split on the multipart boundary — TODO confirm.
        try:
            reply = responce.json()
        except ValueError as exc:
            return None, "Camera reply is not JSON: {}".format(exc)
        if reply.get("state") == "error":
            return None, reply.get("error", {}).get("message")
        return reply, None

def connectCamera(ip, port):
    """Create an ``insta.ONEX`` camera object and report the outcome.

    Args:
        ip: Camera IP address or host name.
        port: Camera HTTP port.

    Returns:
        The camera object when its ``connected`` flag is set, otherwise None
        (after printing the camera's ``lastError``).
    """
    cam = insta.ONEX(ip, port)
    if not cam.connected:
        # print() call form: the file's shebang targets python3, where the
        # original print statements are a syntax error.
        print("Can't connect to camera", cam.lastError)
        return None
    print("Camera connected")
    return cam

if __name__ == '__main__':
    # The camera address is blank in the posted script.
    cam = connectCamera("", "80")
    if cam is None:
        # connectCamera already printed the reason; calling a method on None
        # would only add an AttributeError on top of it.
        sys.exit(1)
    responce, error = cam.getLivePreview()
    if responce is None:
        print("Can't get live preview:", error)
        sys.exit(1)
    # Wrap the reply in a Flask response using the live-preview boundary.
    r = Response(responce, mimetype='multipart/x-mixed-replace; boundary=---osclivepreview---')
    ##print r
    ##frame = responce.get_frame()
    ##print frame

Asked by june2473 on 2020-01-30 02:24:38 UTC


Where can the onex module be found?

Asked by gurije on 2020-06-02 03:13:56 UTC
