live video stream from Insta360 One X camera in ROS
I have an Insta360 One X camera. I want to publish the video stream from this camera as a ROS topic.
I use the Open Spherical Camera API. I want to get the video stream from it and then republish it using video_stream_opencv.
I was already using the camera.takePicture command from that API, and it worked fine in a Python script. With that command the camera takes a photo and saves it to the camera's memory card. Then I call camera.getImage to download the picture.
Now I want to use getLivePreview, but I don't understand what this function outputs or how I can work with it in a Python script. I tried this code:
#!/usr/bin/env python3
import socket
import onex as insta
import sys
import time
from flask import Flask, render_template, Response
from camera import Camera
def executeRequest(kind, url, headers, data):
    """
    Request handler
    Sends an HTTP request and reports the outcome as a pair.

    kind: "post" or "get" (selects requests.post or requests.get)
    url: full URL of the endpoint
    headers: dict of HTTP headers
    data: request body

    In case of success returns the response and None
    In case of failure returns None and an error (a short message or
    the caught exception)

    NOTE(review): `timeout` is not defined in this snippet; it is
    expected to exist at module level - confirm.
    """
    # An unknown kind used to fall through the if/elif and crash with
    # UnboundLocalError on the final return; report it through the normal
    # (None, error) channel instead.
    if kind not in ("post", "get"):
        return None, "Unsupported request kind: {}".format(kind)
    try:
        send = requests.post if kind == "post" else requests.get
        response = send(url, data=data, headers=headers, timeout=timeout)
    except requests.exceptions.HTTPError:
        return None, "Incorrect HTTP headers"
    except requests.exceptions.Timeout:
        return None, "Connection time out"
    except requests.exceptions.ConnectionError:
        return None, "Connection error"
    except Exception as error:
        # Anything else is handed back as the exception object itself.
        return None, error
    return response, None
class ONEX:
    """
    Client for a camera reachable over HTTP at ip:port, using
    Open Spherical Camera style endpoints (/osc/commands/execute).
    """

    def ceepAlive(self):
        """
        Call self.state() every 300 seconds, forever. Never returns.
        NOTE(review): presumably meant to run in a background thread so
        the camera does not go to sleep - confirm against the caller.
        """
        while True:
            self.state()
            time.sleep(300)

    def __init__(self, ip, port):
        """
        ip: 192.168.42.1
        port: 80

        Sets the "offDelay" and "sleepDelay" options on the camera.
        self.connected is True only if both calls succeed; otherwise it
        is False and self.lastError holds the error of the failed call.
        """
        self.ip = ip
        self.port = port
        self.header = {}
        self.header["Content-Type"] = "application/json; charset=utf-8"
        self.pathToSave = "files/"
        # NOTE(review): 65535 looks like the OSC "never power off" value
        # for offDelay - confirm against the camera documentation.
        for option, value in (("offDelay", 65535), ("sleepDelay", 20)):
            responce, self.lastError = self.setOption(option, value)
            if not responce:
                self.connected = False
                return
        self.connected = True

    def getLivePreview(self):
        """
        Get live preview
        Reference:
        https://developers.google.com/streetview/open-spherical-camera/reference/camera/getlivepreview

        On success the camera answers with an endless
        multipart/x-mixed-replace stream of JPEG frames, not with JSON.
        The request is therefore opened with stream=True and the open
        response object is returned; iterate over it (for example with
        iter_content) to read the frames, and close it when done.

        Returns (response, None) on success and (None, error) on failure.
        If the camera answers with a JSON body that is not an error, the
        parsed JSON is returned instead of the response object.
        """
        url = "http://{ip}:{port}/osc/commands/execute".format(ip = self.ip, port = self.port)
        data = json.dumps({"name": "camera.getLivePreview"})
        try:
            # stream=True: without it requests tries to download the whole
            # (never-ending) body before returning.
            responce = requests.post(url, data = data, headers = self.header,
                                     timeout = timeout, stream = True)
        except requests.exceptions.Timeout:
            return None, "Connection time out"
        except requests.exceptions.ConnectionError:
            return None, "Connection error"
        except Exception as error:
            return None, error

        contentType = responce.headers.get("Content-Type", "")
        if "json" in contentType.lower():
            # JSON bodies are short (typically an error report), so it is
            # safe to read them completely and release the connection.
            try:
                payload = responce.json()
            except ValueError as error:
                return None, error
            finally:
                responce.close()
            if payload.get("state") == "error":
                return None, payload.get("error", {}).get("message", "Unknown camera error")
            return payload, None

        if not responce.ok:
            # A 4xx/5xx reply without a JSON body used to be reported as
            # (None, None); give the caller a usable error instead.
            status = responce.status_code
            responce.close()
            return None, "HTTP error {}".format(status)
        return responce, None
def connectCamera(ip, port):
    """
    Create an insta.ONEX camera object and check that it connected.

    Returns the camera object, or None when the connection failed
    (the camera's lastError is printed in that case).
    """
    cam = insta.ONEX(ip, port)
    if not cam.connected:
        # Single-argument print(...) produces the same text on Python 2
        # and Python 3; the old print statement was a SyntaxError under
        # the python3 shebang.
        print("Can't connect to camera {}".format(cam.lastError))
        return None
    print("Camera connected")
    return cam
if __name__ == '__main__':
    cam = connectCamera("192.168.42.1", "80")
    # connectCamera returns None (after printing the reason) when the
    # camera is unreachable; stop here instead of failing on None below.
    if cam is None:
        sys.exit(1)
    responce, error = cam.getLivePreview()
    # getLivePreview reports failures as (None, error).
    if responce is None:
        print("getLivePreview failed: {}".format(error))
        sys.exit(1)
    # The boundary in the mimetype has to match the one the camera uses
    # for its multipart stream.
    r = Response(responce, mimetype='multipart/x-mixed-replace; boundary=---osclivepreview---')
    ##print r
    ##frame = responce.get_frame()
    ##print frame
    ##time.sleep(100)
Asked by june2473 on 2020-01-30 02:24:38 UTC
Comments
Where can the onex module be found?
Asked by gurije on 2020-06-02 03:13:56 UTC