'''
*************************************************************************
*
* CareVision CONFIDENTIAL
# * __________________
# * Authors: Vijay Varada, Ashwin Prasad, CS Raghunandan
# * Created: Feb 2016
# * Copyright (C) 2018 by BetOnIndia Techonology Pvt. Ltd.
*
* NOTICE: All information contained herein is, and remains
# * the property of Betonindia and Fracktal Works.
# * The intellectual and technical concepts contained
# * herein are proprietary to Betonindia and Fracktal Works
# * and may be covered by Patents, patents in process,
# * and are protected by trade secret or copyright law.
# * Dissemination of this information or reproduction of this material
# * is strictly forbidden unless prior written permission is obtained.
# *************************************************************************
# '''
import datetime
import logging
import os
import subprocess
import time
from re import search
import gi
from gi.repository import GObject, Gst
from cleanup import fix_dvr_mp4
from config import CameraConf, DirPaths, Helper, MediaConf
gi.require_version('Gst', '1.0')
Gst.init(None)
logger = logging.getLogger("carevision_log")
class GstPipeline(object):
"""
Record DVR/Event videos, Timelapse images and local/remote stream using
gstreamer pipelines - doesnt record dvr when there is no sd card
"""
def __init__(self, led_object):
self.led_object = led_object
self.date_folder = Helper.get_date_yyyymmdd(int(time.time()))
self.filename = int(time.time())
self.dvr_started = False
self.delay = 2
# Local
self.LOCAL_PORT = os.getenv("LOCAL_LS_PORT")
self.DEFAULT_LOCAL_BITRATE = 1000 * 1000
self.LOCAL_RUN_COUNT = 5 # Count
self.LOCAL_MIN_BITRATE = 600 * 1000
self.LOCAL_MAX_BITRATE = 1500 * 1000
self.LOCAL_STEP_BITRATE = 100 * 1000
# Remote
self.REMOTE_PORT = os.getenv("REMOTE_LS_PORT")
self.DEFAULT_REMOTE_BITRATE = 800 * 1000
self.REMOTE_RUN_COUNT = 5 # Count
self.REMOTE_MIN_BITRATE = 500 * 1000
self.REMOTE_MAX_BITRATE = 1200 * 1000
self.REMOTE_STEP_BITRATE = 100 * 1000
# media parameters
self.ALSA_DEVICE = "device=hw:0"
self.CAPTURE_FPMIN = 30 # Capture a photo every 30 sec
self.TEMP_EVENT_VIDEO = "/dev/shm"
self.tl_framerate_changed = False
self.TL_SENSITIVITY = 3
# Create datetime folders
self.create_folders(self.date_folder)
if MediaConf.DVR_Enabled:
# 480p local, remote stream, DVR Videos and Event videos and
# 480p timelapse image capture (5 sinks)
self.pipeline = Gst.parse_launch(
'v4l2src do-timestamp=true device=/dev/video0 ! video/x-raw,width=848,height=480,framerate=20/1 ! clockoverlay halignment=right valignment=bottom time-format="%d/%m/%Y %H:%M:%S" text=" " font-desc="Sans, 16" shaded-background=true name=clockoverlay_caps ! tee name=t t. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 target-bitrate={8} name="lslocal" ! video/x-h264,stream-format=byte-stream,profile=baseline,quality=baseline ! h264parse ! rtph264pay config-interval=1 pt=96 ! queue2 ! udpsink host=127.0.0.1 port={13} t. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 target-bitrate={9} name="lsremote" ! video/x-h264,stream-format=byte-stream,profile=baseline,quality=baseline ! h264parse ! rtph264pay config-interval=1 pt=96 ! queue2 ! udpsink host=127.0.0.1 port={14} t. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 target-bitrate=1000000 ! video/x-h264,stream-format=byte-stream,profile=baseline,quality=baseline ! h264parse ! tee name=recordings ! queue ! dmuxout.video splitmuxsink muxer="{11}" name=dmuxout location="{2}/{10}/{3}{12}" max-size-time=120000000000 max-size-bytes=0 send-keyframe-requests=true recordings. ! queue ! emuxout.video splitmuxsink muxer="{11}" name=emuxout location="{0}/%07d{12}" max-size-time=10000000000 max-size-bytes=0 send-keyframe-requests=true t. ! queue ! videorate ! capsfilter caps="video/x-raw,framerate=1/{4}" name=timelapse_framerate ! queue ! jpegenc ! queue ! multifilesink post-messages=true name="timelapse_folder" location="{5}/{10}/{6}-%05d.jpg" alsasrc {7} do-timestamp=true ! queue ! audio/x-raw,rate=8000,channels=2,format=S16LE ! audioconvert ! tee name=wrtc ! queue ! opusenc ! rtpopuspay pt=127 ! queue2 ! multiudpsink clients=127.0.0.1:{15},127.0.0.1:{16} wrtc. ! queue ! voaacenc bitrate=65536 ! tee name=at ! queue ! emuxout.audio_%u at. ! queue ! dmuxout.audio_%u'
.format(
self.TEMP_EVENT_VIDEO, self.filename,
DirPaths.DVR_video_path, self.filename, self.CAPTURE_FPMIN,
DirPaths.timelapse_path, self.filename, self.ALSA_DEVICE,
self.DEFAULT_LOCAL_BITRATE, self.DEFAULT_REMOTE_BITRATE,
self.date_folder, MediaConf.MUXER, MediaConf.EXTENSION,
self.LOCAL_PORT, self.REMOTE_PORT,
os.getenv("LOCAL_AUDIO_PORT"),
os.getenv("REMOTE_AUDIO_PORT")))
else:
# 480p local,remote stream, Event videos and
# 480p timelapse images capture (4 sinks)
self.pipeline = Gst.parse_launch(
'v4l2src do-timestamp=true device=/dev/video0 ! video/x-raw,width=848,height=480,framerate=20/1 ! clockoverlay halignment=right valignment=bottom time-format="%d/%m/%Y %H:%M:%S" text=" " font-desc="Sans, 16" shaded-background=true name=clockoverlay_caps ! tee name=t t. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 target-bitrate={8} name="lslocal" ! video/x-h264,stream-format=byte-stream,profile=baseline,quality=baseline ! h264parse ! rtph264pay config-interval=1 pt=96 ! queue2 ! udpsink host=127.0.0.1 port={13} t. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 target-bitrate={9} name="lsremote" ! video/x-h264,stream-format=byte-stream,profile=baseline,quality=baseline ! h264parse ! rtph264pay config-interval=1 pt=96 ! queue2 ! udpsink host=127.0.0.1 port={14} t. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 target-bitrate=1000000 ! video/x-h264,stream-format=byte-stream,profile=baseline,quality=baseline ! h264parse ! queue ! emuxout.video splitmuxsink muxer="{11}" name=emuxout location="{0}/%07d{12}" max-size-time=10000000000 max-size-bytes=0 send-keyframe-requests=true t. ! queue ! videorate ! capsfilter caps="video/x-raw,framerate=1/{4}" name=timelapse_framerate ! queue ! jpegenc ! queue ! multifilesink post-messages=true name="timelapse_folder" location="{5}/{10}/{6}-%05d.jpg" alsasrc {7} do-timestamp=true ! queue ! audio/x-raw,rate=8000,channels=2,format=S16LE ! audioconvert ! tee name=wrtc ! queue ! opusenc ! rtpopuspay pt=127 ! queue2 ! multiudpsink clients=127.0.0.1:{15},127.0.0.1:{16} wrtc. ! queue ! voaacenc bitrate=65536 ! queue ! emuxout.audio_%u'
.format(
self.TEMP_EVENT_VIDEO, self.filename,
DirPaths.DVR_video_path, self.filename, self.CAPTURE_FPMIN,
DirPaths.timelapse_path, self.filename, self.ALSA_DEVICE,
self.DEFAULT_LOCAL_BITRATE, self.DEFAULT_REMOTE_BITRATE,
self.date_folder, MediaConf.MUXER, MediaConf.EXTENSION,
self.LOCAL_PORT, self.REMOTE_PORT,
os.getenv("LOCAL_AUDIO_PORT"),
os.getenv("REMOTE_AUDIO_PORT")))
def check_pipeline_status(self, ret):
if ret == Gst.StateChangeReturn.FAILURE:
logger.critical("Gstreamer: error starting pipeline")
self.bus = self.pipeline.get_bus()
msg = self.bus.pop_filtered(Gst.MessageType.ERROR
| Gst.MessageType.EOS)
if msg:
t = msg.type
if t == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
logger.critical("Gstreamer: Error element -" +
msg.src.get_name() + " - " + err.message)
if dbg:
logger.critical("Gstreamer: debugging info - " + dbg)
elif t == Gst.MessageType.EOS:
logger.critical("Gstreamer: End-Of-Stream reached")
else:
logger.critical("Gstreamer: Unexpected message received ")
def on_message(self, bus, message):
try:
msgStruct = message.get_structure()
msgName = msgStruct.get_name()
if (msgName == "GstMultiFileSink"):
logger.info("Multifilesink: recorded a timelapse image!!")
except Exception:
logger.warning(
"There was an error with processing the Gstreamer message", exc_info=True)
def create_folders(self, date_folder):
if not os.path.isdir("{}/{}".format(DirPaths.timelapse_path,
date_folder)):
os.makedirs("{}/{}".format(DirPaths.timelapse_path, date_folder))
if MediaConf.DVR_Enabled:
if not os.path.isdir("{}/{}".format(DirPaths.DVR_video_path,
date_folder)):
os.makedirs("{}/{}".format(DirPaths.DVR_video_path,
date_folder))
def get_dropped_frames(self, port):
hexport = "%X" % int(port)
with open('/proc/net/udp', 'rb') as udp_file:
lines = udp_file.readlines()
for item in lines:
# print item
if search(hexport, item) is not None:
buffers = item.strip().split()
sl, la, ra, st, tx_rx, tr, retrnsmt, uid, timeout, inode, ref, pointer, drops = buffers
return int(drops)
return 0
@Helper.run_async
def start_pipeline(self):
try:
self.dvr_started = True
self.bus = self.pipeline.get_bus()
self.bus.enable_sync_message_emission()
self.bus.add_signal_watch()
self.bus.connect("message", self.on_message)
ret = self.pipeline.set_state(Gst.State.PLAYING)
self.check_pipeline_status(ret)
if MediaConf.DVR_Enabled:
self.pipeline.get_by_name("dmuxout").connect(
'format-location', self.on_file_change_dvr)
else:
self.pipeline.get_by_name("emuxout").connect(
"format-location", self.on_file_change_tl)
self.loop = GObject.MainLoop()
logger.info("Gstreamer pipeline started")
except Exception:
logger.error("error starting gstreamer pipeline", exc_info=True)
self.led_object.led_status("red")
time.sleep(300)
# reboot linux if there is an error starting the pipelines
logger.critical("Restarting the box since pipelines not running")
subprocess.Popen("sudo reboot", shell=True)
def stop_pipeline(self):
try:
logger.info("Stopping gstreamer pipelines")
ret = self.pipeline.set_state(Gst.State.NULL)
self.check_pipeline_status(ret)
except Exception:
logger.error("error stopping gstreamer pipelines", exc_info=True)
def pause_pipeline(self):
try:
logger.info("Pausing gstreamer pipelines")
ret = self.pipeline.set_state(Gst.State.PAUSED)
self.check_pipeline_status(ret)
except Exception:
logger.error("Error pausing gstreamer pipeline", exc_info=True)
def resume_pipeline(self):
try:
logger.info("Resuming gstreamer pipelines")
ret = self.pipeline.set_state(Gst.State.PLAYING)
self.check_pipeline_status(ret)
except Exception:
logger.error("Error resuming gstreamer pipeline", exc_info=True)
def change_bitrate_local(self, bitrate):
self.pipeline.get_by_name("lslocal").set_property(
"target-bitrate", bitrate)
def change_bitrate_remote(self, bitrate):
self.pipeline.get_by_name("lsremote").set_property(
"target-bitrate", bitrate)
def change_framerate_timelapse(self, framerate):
videorate_caps = Gst.caps_from_string(
'video/x-raw,framerate=1/{}'.format(framerate))
self.pipeline.get_by_name("timelapse_framerate").set_property(
"caps", videorate_caps)
def on_file_change_tl(self, bus, filenumber):
try:
current_timestamp = int(time.time())
current_timestamp_folder = Helper.get_date_yyyymmdd(
current_timestamp)
self.create_folders(current_timestamp_folder)
timelapse_location = "{}/{}/{}-%05d.jpg".format(
DirPaths.timelapse_path, current_timestamp_folder,
current_timestamp)
self.pipeline.get_by_name("timelapse_folder").set_property(
"location", timelapse_location)
self.change_framerate_timelapse(self.CAPTURE_FPMIN)
self.tl_framerate_changed = False
except Exception:
logger.error("Error with timelapse location change", exc_info=True)
def on_file_change_dvr(self, bus, filenumber):
try:
current_timestamp = int(time.time())
current_timestamp_folder = Helper.get_date_yyyymmdd(
current_timestamp)
self.create_folders(current_timestamp_folder)
DVR_path = "{}/{}".format(DirPaths.DVR_video_path,
current_timestamp_folder)
DVR_location = "{}/{}{}".format(DVR_path, current_timestamp,
MediaConf.EXTENSION)
DVR_date_files = Helper.list_all_files_sorted(
DVR_path, '*' + MediaConf.EXTENSION, full=True)
# call function to fix the previously recorded DVR video
if DVR_date_files:
fix_dvr_mp4(DVR_date_files[-1])
timelapse_location = "{}/{}/{}-%05d.jpg".format(
DirPaths.timelapse_path, current_timestamp_folder,
current_timestamp)
self.pipeline.get_by_name("dmuxout").set_property(
"location", DVR_location)
self.pipeline.get_by_name("timelapse_folder").set_property(
"location", timelapse_location)
self.change_framerate_timelapse(self.CAPTURE_FPMIN)
self.tl_framerate_changed = False
DVR_start_time = datetime.datetime.fromtimestamp(current_timestamp)
DVR_end_time = DVR_start_time + datetime.timedelta(minutes=2)
logger.info(
"Recording new DVR video for the duration: {} - {}".format(
DVR_start_time, DVR_end_time))
logger.info("DVR video name: %s" % current_timestamp)
except Exception:
logger.error("Splitmuxsink filename change error", exc_info=True)
@Helper.run_async
def bitrate_monitor_local(self):
local_old_drop = self.get_dropped_frames(self.LOCAL_PORT)
local_count = self.LOCAL_RUN_COUNT
local_current_bitrate = self.LOCAL_MIN_BITRATE
while True:
if CameraConf.livestream_active:
local_drops = self.get_dropped_frames(self.LOCAL_PORT)
if local_drops > local_old_drop:
local_old_drop = local_drops
logger.info(
"drop occurred , decremented local bitrate and reset counter"
)
if local_current_bitrate > self.LOCAL_MIN_BITRATE:
local_current_bitrate -= self.LOCAL_STEP_BITRATE
logger.info(
"Set Bitrate {}".format(local_current_bitrate))
self.change_bitrate_local(local_current_bitrate)
local_count = self.LOCAL_RUN_COUNT
else:
if local_count > 0:
local_count = local_count - 1
if local_count == 0:
if local_current_bitrate < self.LOCAL_MAX_BITRATE:
logger.info(
"reached 10 secs without drop, now increment local bitrate and reset counter"
)
local_current_bitrate += self.LOCAL_STEP_BITRATE
logger.info(
"Set Bitrate {}".format(local_current_bitrate))
self.change_bitrate_local(local_current_bitrate)
local_count = self.LOCAL_RUN_COUNT
if local_current_bitrate == self.LOCAL_MAX_BITRATE:
logger.info("reached max local bitrate: {}".format(
local_current_bitrate))
local_count = self.LOCAL_RUN_COUNT
logger.debug("Local bitrate monitor {}".format(local_count))
time.sleep(self.delay)
@Helper.run_async
def bitrate_monitor_remote(self):
remote_old_drop = self.get_dropped_frames(self.REMOTE_PORT)
remote_count = self.REMOTE_RUN_COUNT
remote_current_bitrate = self.REMOTE_MIN_BITRATE
while True:
if CameraConf.livestream_active:
remote_drops = self.get_dropped_frames(self.REMOTE_PORT)
if remote_drops > remote_old_drop:
remote_old_drop = remote_drops
logger.info(
"drop occurred, decremented remote bitrate and reset counter"
)
if remote_current_bitrate > self.REMOTE_MIN_BITRATE:
remote_current_bitrate -= self.REMOTE_STEP_BITRATE
logger.info(
"Set Bitrate {}".format(remote_current_bitrate))
self.change_bitrate_remote(remote_current_bitrate)
remote_count = self.REMOTE_RUN_COUNT
else:
if remote_count > 0:
remote_count = remote_count - 1
if remote_count == 0:
if remote_current_bitrate < self.REMOTE_MAX_BITRATE:
logger.info(
"reached 10 secs without drop, now increment remote bitrate and reset counter"
)
remote_current_bitrate += self.REMOTE_STEP_BITRATE
logger.info("Set Bitrate {}".format(
remote_current_bitrate))
self.change_bitrate_remote(remote_current_bitrate)
remote_count = self.REMOTE_RUN_COUNT
if remote_current_bitrate == self.REMOTE_MAX_BITRATE:
logger.info("reached max remote bitrate:{}".format(
remote_current_bitrate))
remote_count = self.REMOTE_RUN_COUNT
logger.debug("Remote bitrate monitor {}".format(remote_count))
time.sleep(self.delay)
class CameraManager(GstPipeline):
def __init__(self, led_object):
super(CameraManager, self).__init__(led_object)
def decrement_livestream_counter(self):
if CameraConf.stream_counter > 0:
CameraConf.stream_counter -= 1
logger.debug('stream count decrement : {}'.format(
CameraConf.stream_counter))
def increment_livestream_counter(self):
CameraConf.livestream_active = True
CameraConf.stream_counter += 1
logger.debug('strem count increment : {}'.format(
CameraConf.stream_counter))
def check_camera(self):
try:
os.stat("/dev/video0")
return True
except OSError:
return False
def camera_watchdog(self):
# check if camera is working, if not wait for 3 minutes and try
# again for three times, if not restart the linux system
camera_attempt = 0
while camera_attempt < 3:
if self.check_camera():
logger.debug("Camera is detected at /dev/video0")
break
else:
camera_attempt += 1
logger.critical("Camera not detected or is not working")
time.sleep(180)
if camera_attempt == 3:
logger.critical("Unable to detected camera after 3 attempts")
logger.critical("Restarting the box")
subprocess.Popen("sudo reboot", shell=True)
@Helper.run_async
def motion_timelapse_handler(self):
while True:
if CameraConf.motion_count > self.TL_SENSITIVITY:
if not self.tl_framerate_changed:
CameraConf.motion_count = 0
self.change_framerate_timelapse(10)
logger.info("Changed framerate for timelapse")
self.tl_framerate_changed = True
time.sleep(1)
def run_pipelines(self):
if self.check_camera():
# flip the camera orientation horizontally and vertically
subprocess.call("v4l2-ctl -c vertical_flip=1", shell=True)
subprocess.call("v4l2-ctl -c horizontal_flip=1", shell=True)
# restore the alsa settings
logger.info("restoring alsamixer settings")
subprocess.call(
"alsactl --file /etc/asound.state restore", shell=True)
subprocess.call(
"arecord -D hw:0 -c2 -r 8000 -f S32_LE -t wav -V mono -d 1 -v test.wav",
shell=True)
time.sleep(1)
# Gstreamer pipelines starts here
self.start_pipeline()
self.bitrate_monitor_local()
self.bitrate_monitor_remote()
self.motion_timelapse_handler()
else:
logger.critical("Camera not detected or is not working")
logger.critical("Did not start Gstreamer pieplines")