CameraManager.py

Run Settings
Language: Python
Language Version
Run Command
'''
*************************************************************************
*
* CareVision CONFIDENTIAL
* __________________
* Authors: Vijay Varada, Ashwin Prasad, CS Raghunandan
* Created: Feb 2016
* Copyright (C) 2018 by BetOnIndia Techonology Pvt. Ltd.
*
* NOTICE: All information contained herein is, and remains
* the property of Betonindia and Fracktal Works.
* The intellectual and technical concepts contained
* herein are proprietary to Betonindia and Fracktal Works
* and may be covered by Patents, patents in process,
* and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained.
*************************************************************************
'''
import datetime
import logging
import os
import subprocess
import time
from re import search

import gi

# The version must be pinned BEFORE anything is imported from gi.repository,
# otherwise PyGObject has already picked a version by the time this runs.
gi.require_version('Gst', '1.0')

from gi.repository import GObject, Gst  # noqa: E402

from cleanup import fix_dvr_mp4  # noqa: E402
from config import CameraConf, DirPaths, Helper, MediaConf  # noqa: E402

Gst.init(None)
logger = logging.getLogger("carevision_log")


class GstPipeline(object):
    """
    Record DVR/Event videos, timelapse images and the local/remote live
    streams using a single GStreamer pipeline.

    The DVR recording branch is only built when ``MediaConf.DVR_Enabled``
    is true (original note: DVR is not recorded when there is no SD card).
    """

    def __init__(self, led_object):
        """
        Set the tuning constants, create today's output folders and build
        (but do not start) the GStreamer pipeline.

        :param led_object: object exposing ``led_status(colour)``; it is
            used by ``start_pipeline`` to signal a start-up failure.
        """
        self.led_object = led_object
        self.date_folder = Helper.get_date_yyyymmdd(int(time.time()))
        self.filename = int(time.time())
        self.dvr_started = False
        self.delay = 2  # seconds between two bitrate-monitor iterations

        # Local
        self.LOCAL_PORT = os.getenv("LOCAL_LS_PORT")
        self.DEFAULT_LOCAL_BITRATE = 1000 * 1000
        self.LOCAL_RUN_COUNT = 5  # Count
        self.LOCAL_MIN_BITRATE = 600 * 1000
        self.LOCAL_MAX_BITRATE = 1500 * 1000
        self.LOCAL_STEP_BITRATE = 100 * 1000

        # Remote
        self.REMOTE_PORT = os.getenv("REMOTE_LS_PORT")
        self.DEFAULT_REMOTE_BITRATE = 800 * 1000
        self.REMOTE_RUN_COUNT = 5  # Count
        self.REMOTE_MIN_BITRATE = 500 * 1000
        self.REMOTE_MAX_BITRATE = 1200 * 1000
        self.REMOTE_STEP_BITRATE = 100 * 1000

        # media parameters
        self.ALSA_DEVICE = "device=hw:0"
        self.CAPTURE_FPMIN = 30  # Capture a photo every 30 sec
        self.TEMP_EVENT_VIDEO = "/dev/shm"
        self.tl_framerate_changed = False
        self.TL_SENSITIVITY = 3

        # Create datetime folders
        self.create_folders(self.date_folder)

        # With DVR enabled: local stream, remote stream, DVR videos, event
        # videos and timelapse image capture (5 sinks). Without DVR the DVR
        # muxer is left out (4 sinks).
        self.pipeline = Gst.parse_launch(self._pipeline_description())

    def _pipeline_description(self):
        """
        Assemble the gst-launch description handed to ``Gst.parse_launch``.

        The DVR and non-DVR pipelines share every segment except the
        recording muxers and the tail of the audio branch, so the string is
        built from common parts. Each segment is formatted exactly once and
        the segments are then concatenated.

        :return: the pipeline description string.
        """
        video_source = (
            'v4l2src do-timestamp=true device=/dev/video0 ! '
            'video/x-raw,width=848,height=480,framerate=20/1 ! '
            'clockoverlay halignment=right valignment=bottom '
            'time-format="%d/%m/%Y %H:%M:%S" text=" " '
            'font-desc="Sans, 16" shaded-background=true '
            'name=clockoverlay_caps ! tee name=t ')

        # One H.264/RTP live-stream branch; instantiated for local + remote.
        stream_branch = (
            't. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 '
            'target-bitrate={bitrate} name="{name}" ! '
            'video/x-h264,stream-format=byte-stream,profile=baseline,'
            'quality=baseline ! h264parse ! '
            'rtph264pay config-interval=1 pt=96 ! queue2 ! '
            'udpsink host=127.0.0.1 port={port} ')
        local_stream = stream_branch.format(
            bitrate=self.DEFAULT_LOCAL_BITRATE,
            name="lslocal",
            port=self.LOCAL_PORT)
        remote_stream = stream_branch.format(
            bitrate=self.DEFAULT_REMOTE_BITRATE,
            name="lsremote",
            port=self.REMOTE_PORT)

        recording_encoder = (
            't. ! queue ! omxh264enc periodicty-idr=40 control-rate=1 '
            'target-bitrate=1000000 ! '
            'video/x-h264,stream-format=byte-stream,profile=baseline,'
            'quality=baseline ! h264parse ! ')

        # Event videos: 10 s fragments (max-size-time is in nanoseconds).
        event_muxer = (
            'queue ! emuxout.video splitmuxsink muxer="{muxer}" '
            'name=emuxout location="{event_dir}/%07d{extension}" '
            'max-size-time=10000000000 max-size-bytes=0 '
            'send-keyframe-requests=true ').format(
                muxer=MediaConf.MUXER,
                event_dir=self.TEMP_EVENT_VIDEO,
                extension=MediaConf.EXTENSION)

        # DVR videos: 120 s fragments (max-size-time is in nanoseconds).
        dvr_muxer = (
            'tee name=recordings ! queue ! dmuxout.video '
            'splitmuxsink muxer="{muxer}" name=dmuxout '
            'location="{dvr_dir}/{date_folder}/{filename}{extension}" '
            'max-size-time=120000000000 max-size-bytes=0 '
            'send-keyframe-requests=true recordings. ! ').format(
                muxer=MediaConf.MUXER,
                dvr_dir=DirPaths.DVR_video_path,
                date_folder=self.date_folder,
                filename=self.filename,
                extension=MediaConf.EXTENSION)

        timelapse = (
            't. ! queue ! videorate ! '
            'capsfilter caps="video/x-raw,framerate=1/{interval}" '
            'name=timelapse_framerate ! queue ! jpegenc ! queue ! '
            'multifilesink post-messages=true name="timelapse_folder" '
            'location="{timelapse_dir}/{date_folder}/{filename}-%05d.jpg" '
        ).format(
            interval=self.CAPTURE_FPMIN,
            timelapse_dir=DirPaths.timelapse_path,
            date_folder=self.date_folder,
            filename=self.filename)

        audio = (
            'alsasrc {alsa_device} do-timestamp=true ! queue ! '
            'audio/x-raw,rate=8000,channels=2,format=S16LE ! '
            'audioconvert ! tee name=wrtc ! queue ! opusenc ! '
            'rtpopuspay pt=127 ! queue2 ! '
            'multiudpsink clients=127.0.0.1:{local_audio_port},'
            '127.0.0.1:{remote_audio_port} '
            'wrtc. ! queue ! voaacenc bitrate=65536 ! ').format(
                alsa_device=self.ALSA_DEVICE,
                local_audio_port=os.getenv("LOCAL_AUDIO_PORT"),
                remote_audio_port=os.getenv("REMOTE_AUDIO_PORT"))

        if MediaConf.DVR_Enabled:
            recordings = dvr_muxer + event_muxer
            audio_tail = ('tee name=at ! queue ! emuxout.audio_%u '
                          'at. ! queue ! dmuxout.audio_%u')
        else:
            recordings = event_muxer
            audio_tail = 'queue ! emuxout.audio_%u'

        return (video_source + local_stream + remote_stream +
                recording_encoder + recordings + timelapse + audio +
                audio_tail)

    def check_pipeline_status(self, ret):
        """
        Log diagnostics when a state change reported FAILURE.

        :param ret: the ``Gst.StateChangeReturn`` from ``set_state``.
        """
        if ret != Gst.StateChangeReturn.FAILURE:
            return

        logger.critical("Gstreamer: error starting pipeline")
        self.bus = self.pipeline.get_bus()
        msg = self.bus.pop_filtered(Gst.MessageType.ERROR |
                                    Gst.MessageType.EOS)
        if not msg:
            return

        t = msg.type
        if t == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            logger.critical("Gstreamer: Error element -" +
                            msg.src.get_name() + " - " + err.message)
            if dbg:
                logger.critical("Gstreamer: debugging info - " + dbg)
        elif t == Gst.MessageType.EOS:
            logger.critical("Gstreamer: End-Of-Stream reached")
        else:
            logger.critical("Gstreamer: Unexpected message received ")

    def on_message(self, bus, message):
        """
        Bus "message" handler: log each image written by the timelapse
        multifilesink (which is created with ``post-messages=true``).

        :param bus: the bus that emitted the signal (unused).
        :param message: the ``Gst.Message`` received.
        """
        try:
            msgStruct = message.get_structure()
            # Messages without a structure are not an error; skip them
            # instead of tripping the generic handler below.
            if msgStruct is None:
                return
            msgName = msgStruct.get_name()
            if msgName == "GstMultiFileSink":
                logger.info("Multifilesink: recorded a timelapse image!!")
        except Exception:
            logger.warning(
                "There was an error with processing the Gstreamer message",
                exc_info=True)

    def create_folders(self, date_folder):
        """
        Make sure the per-day output folders exist.

        Creates ``<timelapse_path>/<date_folder>`` and, when DVR is enabled,
        ``<DVR_video_path>/<date_folder>``.

        :param date_folder: folder name for the day (as produced by
            ``Helper.get_date_yyyymmdd``).
        """
        timelapse_dir = "{}/{}".format(DirPaths.timelapse_path, date_folder)
        if not os.path.isdir(timelapse_dir):
            os.makedirs(timelapse_dir)

        if MediaConf.DVR_Enabled:
            dvr_dir = "{}/{}".format(DirPaths.DVR_video_path, date_folder)
            if not os.path.isdir(dvr_dir):
                os.makedirs(dvr_dir)

    def get_dropped_frames(self, port):
        """
        Return the kernel's drop counter for the UDP socket bound to *port*.

        Reads ``/proc/net/udp`` and returns the ``drops`` column of the first
        socket whose LOCAL port equals *port*. The port is compared exactly
        rather than searched for as a substring, so an inode, a remote
        address or a longer port containing the same hex digits cannot match.

        :param port: port number as ``int`` or numeric ``str``.
        :return: number of dropped datagrams, or 0 if no socket matches.
        :raises TypeError, ValueError: if *port* is not a valid integer
            (e.g. the port environment variable is unset).
        """
        wanted_port = int(port)
        # Text mode keeps the parsing identical on Python 2 and 3.
        with open('/proc/net/udp', 'r') as udp_file:
            for row in udp_file:
                # Columns: sl, local_address, rem_address, st, tx:rx queue,
                # tr:tm->when, retrnsmt, uid, timeout, inode, ref, pointer,
                # drops
                fields = row.split()
                if len(fields) < 13:
                    continue
                try:
                    local_port = int(fields[1].rsplit(':', 1)[-1], 16)
                except ValueError:
                    # Header row ("local_address" is not hex).
                    continue
                if local_port == wanted_port:
                    return int(fields[12])
        return 0

    @Helper.run_async
    def start_pipeline(self):
        """
        Start the pipeline and hook up the bus and file-rotation callbacks.

        On any exception the LED is set to red, and after a 5 minute wait
        the box is rebooted.

        NOTE(review): ``Helper.run_async`` presumably runs this in a
        background thread - confirm in ``config.Helper``.
        """
        try:
            self.dvr_started = True
            self.bus = self.pipeline.get_bus()
            self.bus.enable_sync_message_emission()
            self.bus.add_signal_watch()
            self.bus.connect("message", self.on_message)

            ret = self.pipeline.set_state(Gst.State.PLAYING)
            self.check_pipeline_status(ret)

            # A new-fragment notification drives folder/file renaming: the
            # DVR muxer when DVR is on, otherwise the event muxer.
            if MediaConf.DVR_Enabled:
                self.pipeline.get_by_name("dmuxout").connect(
                    'format-location', self.on_file_change_dvr)
            else:
                self.pipeline.get_by_name("emuxout").connect(
                    "format-location", self.on_file_change_tl)

            self.loop = GObject.MainLoop()
            logger.info("Gstreamer pipeline started")
        except Exception:
            logger.error("error starting gstreamer pipeline", exc_info=True)
            self.led_object.led_status("red")
            time.sleep(300)
            # reboot linux if there is an error starting the pipelines
            logger.critical("Restarting the box since pipelines not running")
            subprocess.Popen("sudo reboot", shell=True)

    def stop_pipeline(self):
        """Set the pipeline to NULL, logging (not raising) any failure."""
        try:
            logger.info("Stopping gstreamer pipelines")
            ret = self.pipeline.set_state(Gst.State.NULL)
            self.check_pipeline_status(ret)
        except Exception:
            logger.error("error stopping gstreamer pipelines", exc_info=True)

    def pause_pipeline(self):
        """Set the pipeline to PAUSED, logging (not raising) any failure."""
        try:
            logger.info("Pausing gstreamer pipelines")
            ret = self.pipeline.set_state(Gst.State.PAUSED)
            self.check_pipeline_status(ret)
        except Exception:
            logger.error("Error pausing gstreamer pipeline", exc_info=True)

    def resume_pipeline(self):
        """Set the pipeline to PLAYING, logging (not raising) any failure."""
        try:
            logger.info("Resuming gstreamer pipelines")
            ret = self.pipeline.set_state(Gst.State.PLAYING)
            self.check_pipeline_status(ret)
        except Exception:
            logger.error("Error resuming gstreamer pipeline", exc_info=True)

    def change_bitrate_local(self, bitrate):
        """Set ``target-bitrate`` (bits/s) on the local-stream encoder."""
        self.pipeline.get_by_name("lslocal").set_property(
            "target-bitrate", bitrate)

    def change_bitrate_remote(self, bitrate):
        """Set ``target-bitrate`` (bits/s) on the remote-stream encoder."""
        self.pipeline.get_by_name("lsremote").set_property(
            "target-bitrate", bitrate)

    def change_framerate_timelapse(self, framerate):
        """
        Change the timelapse capture interval.

        :param framerate: seconds per captured frame; the caps are set to
            ``framerate=1/<framerate>``.
        """
        videorate_caps = Gst.caps_from_string(
            'video/x-raw,framerate=1/{}'.format(framerate))
        self.pipeline.get_by_name("timelapse_framerate").set_property(
            "caps", videorate_caps)

    def on_file_change_tl(self, bus, filenumber):
        """
        ``format-location`` handler used when DVR is disabled.

        Points the timelapse sink at a fresh, timestamp-named file pattern
        in today's folder and restores the default capture interval.

        :param bus: first signal argument (the element the handler is
            connected to); unused.
        :param filenumber: fragment index supplied by the signal; unused.
        """
        try:
            current_timestamp = int(time.time())
            current_timestamp_folder = Helper.get_date_yyyymmdd(
                current_timestamp)
            self.create_folders(current_timestamp_folder)

            timelapse_location = "{}/{}/{}-%05d.jpg".format(
                DirPaths.timelapse_path, current_timestamp_folder,
                current_timestamp)
            self.pipeline.get_by_name("timelapse_folder").set_property(
                "location", timelapse_location)

            self.change_framerate_timelapse(self.CAPTURE_FPMIN)
            self.tl_framerate_changed = False
        except Exception:
            logger.error("Error with timelapse location change",
                         exc_info=True)

    def on_file_change_dvr(self, bus, filenumber):
        """
        ``format-location`` handler used when DVR is enabled.

        Runs ``fix_dvr_mp4`` on the newest file already in today's DVR
        folder, then renames the next DVR fragment and the timelapse file
        pattern after the current timestamp and restores the default
        timelapse interval.

        :param bus: first signal argument (the element the handler is
            connected to); unused.
        :param filenumber: fragment index supplied by the signal; unused.
        """
        try:
            current_timestamp = int(time.time())
            current_timestamp_folder = Helper.get_date_yyyymmdd(
                current_timestamp)
            self.create_folders(current_timestamp_folder)

            DVR_path = "{}/{}".format(DirPaths.DVR_video_path,
                                      current_timestamp_folder)
            DVR_location = "{}/{}{}".format(DVR_path, current_timestamp,
                                            MediaConf.EXTENSION)
            DVR_date_files = Helper.list_all_files_sorted(
                DVR_path, '*' + MediaConf.EXTENSION, full=True)
            # call function to fix the previously recorded DVR video
            if DVR_date_files:
                fix_dvr_mp4(DVR_date_files[-1])

            timelapse_location = "{}/{}/{}-%05d.jpg".format(
                DirPaths.timelapse_path, current_timestamp_folder,
                current_timestamp)
            self.pipeline.get_by_name("dmuxout").set_property(
                "location", DVR_location)
            self.pipeline.get_by_name("timelapse_folder").set_property(
                "location", timelapse_location)

            self.change_framerate_timelapse(self.CAPTURE_FPMIN)
            self.tl_framerate_changed = False

            # Fragments are 2 minutes long (dmuxout max-size-time).
            DVR_start_time = datetime.datetime.fromtimestamp(
                current_timestamp)
            DVR_end_time = DVR_start_time + datetime.timedelta(minutes=2)
            logger.info(
                "Recording new DVR video for the duration: {} - {}".format(
                    DVR_start_time, DVR_end_time))
            logger.info("DVR video name: %s" % current_timestamp)
        except Exception:
            logger.error("Splitmuxsink filename change error", exc_info=True)

    @Helper.run_async
    def bitrate_monitor_local(self):
        """
        Adapt the local-stream bitrate to UDP drops, forever.

        Every ``self.delay`` seconds while a livestream is active: a new
        drop on the local port lowers the bitrate one step (not below the
        minimum); ``LOCAL_RUN_COUNT`` consecutive drop-free checks raise it
        one step (not above the maximum).

        NOTE(review): the tracked bitrate starts at LOCAL_MIN_BITRATE while
        the encoder is created with DEFAULT_LOCAL_BITRATE - confirm this is
        intended.
        NOTE(review): the nesting of the counter resets and of the debug log
        below was reconstructed from flattened source - confirm it.
        """
        local_old_drop = self.get_dropped_frames(self.LOCAL_PORT)
        local_count = self.LOCAL_RUN_COUNT
        local_current_bitrate = self.LOCAL_MIN_BITRATE
        while True:
            if CameraConf.livestream_active:
                local_drops = self.get_dropped_frames(self.LOCAL_PORT)
                if local_drops > local_old_drop:
                    local_old_drop = local_drops
                    logger.info(
                        "drop occurred , decremented local bitrate and reset counter"
                    )
                    if local_current_bitrate > self.LOCAL_MIN_BITRATE:
                        local_current_bitrate -= self.LOCAL_STEP_BITRATE
                        logger.info(
                            "Set Bitrate {}".format(local_current_bitrate))
                        self.change_bitrate_local(local_current_bitrate)
                    local_count = self.LOCAL_RUN_COUNT
                else:
                    if local_count > 0:
                        local_count = local_count - 1
                    if local_count == 0:
                        if local_current_bitrate < self.LOCAL_MAX_BITRATE:
                            logger.info(
                                "reached 10 secs without drop, now increment local bitrate and reset counter"
                            )
                            local_current_bitrate += self.LOCAL_STEP_BITRATE
                            logger.info(
                                "Set Bitrate {}".format(local_current_bitrate))
                            self.change_bitrate_local(local_current_bitrate)
                            local_count = self.LOCAL_RUN_COUNT
                        if local_current_bitrate == self.LOCAL_MAX_BITRATE:
                            logger.info("reached max local bitrate: {}".format(
                                local_current_bitrate))
                            local_count = self.LOCAL_RUN_COUNT
                logger.debug("Local bitrate monitor {}".format(local_count))
            time.sleep(self.delay)

    @Helper.run_async
    def bitrate_monitor_remote(self):
        """
        Adapt the remote-stream bitrate to UDP drops, forever.

        Same algorithm as ``bitrate_monitor_local`` but using the REMOTE_*
        constants and the "lsremote" encoder.

        NOTE(review): the tracked bitrate starts at REMOTE_MIN_BITRATE while
        the encoder is created with DEFAULT_REMOTE_BITRATE - confirm this is
        intended.
        NOTE(review): the nesting of the counter resets and of the debug log
        below was reconstructed from flattened source - confirm it.
        """
        remote_old_drop = self.get_dropped_frames(self.REMOTE_PORT)
        remote_count = self.REMOTE_RUN_COUNT
        remote_current_bitrate = self.REMOTE_MIN_BITRATE
        while True:
            if CameraConf.livestream_active:
                remote_drops = self.get_dropped_frames(self.REMOTE_PORT)
                if remote_drops > remote_old_drop:
                    remote_old_drop = remote_drops
                    logger.info(
                        "drop occurred, decremented remote bitrate and reset counter"
                    )
                    if remote_current_bitrate > self.REMOTE_MIN_BITRATE:
                        remote_current_bitrate -= self.REMOTE_STEP_BITRATE
                        logger.info(
                            "Set Bitrate {}".format(remote_current_bitrate))
                        self.change_bitrate_remote(remote_current_bitrate)
                    remote_count = self.REMOTE_RUN_COUNT
                else:
                    if remote_count > 0:
                        remote_count = remote_count - 1
                    if remote_count == 0:
                        if remote_current_bitrate < self.REMOTE_MAX_BITRATE:
                            logger.info(
                                "reached 10 secs without drop, now increment remote bitrate and reset counter"
                            )
                            remote_current_bitrate += self.REMOTE_STEP_BITRATE
                            logger.info("Set Bitrate {}".format(
                                remote_current_bitrate))
                            self.change_bitrate_remote(remote_current_bitrate)
                            remote_count = self.REMOTE_RUN_COUNT
                        if remote_current_bitrate == self.REMOTE_MAX_BITRATE:
                            logger.info("reached max remote bitrate:{}".format(
                                remote_current_bitrate))
                            remote_count = self.REMOTE_RUN_COUNT
                logger.debug("Remote bitrate monitor {}".format(remote_count))
            time.sleep(self.delay)


class CameraManager(GstPipeline):
    """Camera-level controls layered on top of ``GstPipeline``."""

    def __init__(self, led_object):
        """
        :param led_object: forwarded unchanged to ``GstPipeline.__init__``.
        """
        super(CameraManager, self).__init__(led_object)

    def decrement_livestream_counter(self):
        """
        Decrease ``CameraConf.stream_counter`` by one, never below zero.

        NOTE(review): whether the debug log sat inside the ``if`` was lost
        in the flattened source; it is emitted unconditionally here.
        """
        if CameraConf.stream_counter > 0:
            CameraConf.stream_counter -= 1
        logger.debug('stream count decrement : {}'.format(
            CameraConf.stream_counter))

    def increment_livestream_counter(self):
        """Mark the livestream active and bump ``stream_counter``."""
        CameraConf.livestream_active = True
        CameraConf.stream_counter += 1
        logger.debug('strem count increment : {}'.format(
            CameraConf.stream_counter))

    def check_camera(self):
        """
        :return: True if ``/dev/video0`` can be stat'ed, else False.
        """
        try:
            os.stat("/dev/video0")
            return True
        except OSError:
            return False

    def camera_watchdog(self):
        """
        Reboot the box if the camera does not show up.

        Checks for the camera up to three times, sleeping 3 minutes after
        each failed check; after the third failure the system is rebooted.
        """
        camera_attempt = 0
        while camera_attempt < 3:
            if self.check_camera():
                logger.debug("Camera is detected at /dev/video0")
                break
            else:
                camera_attempt += 1
                logger.critical("Camera not detected or is not working")
                time.sleep(180)

        if camera_attempt == 3:
            logger.critical("Unable to detected camera after 3 attempts")
            logger.critical("Restarting the box")
            subprocess.Popen("sudo reboot", shell=True)

    @Helper.run_async
    def motion_timelapse_handler(self):
        """
        Speed up timelapse capture while motion is being reported.

        Polls once a second; when ``CameraConf.motion_count`` exceeds
        ``TL_SENSITIVITY`` and the interval has not already been changed,
        the counter is cleared and the capture interval is set to 10 (one
        frame per 10 s). The flag is cleared again by the file-change
        handlers, which also restore the default interval.
        """
        while True:
            if CameraConf.motion_count > self.TL_SENSITIVITY:
                if not self.tl_framerate_changed:
                    CameraConf.motion_count = 0
                    self.change_framerate_timelapse(10)
                    logger.info("Changed framerate for timelapse")
                    self.tl_framerate_changed = True
            time.sleep(1)

    def run_pipelines(self):
        """
        Prepare the camera/audio devices and start every worker.

        If the camera is present: flips the image, restores the ALSA mixer
        state, makes a one-second test recording, then starts the pipeline,
        both bitrate monitors and the motion/timelapse handler. Otherwise
        only logs that nothing was started.
        """
        if self.check_camera():
            # flip the camera orientation horizontally and vertically
            subprocess.call("v4l2-ctl -c vertical_flip=1", shell=True)
            subprocess.call("v4l2-ctl -c horizontal_flip=1", shell=True)

            # restore the alsa settings
            logger.info("restoring alsamixer settings")
            subprocess.call(
                "alsactl --file /etc/asound.state restore", shell=True)
            subprocess.call(
                "arecord -D hw:0 -c2 -r 8000 -f S32_LE -t wav -V mono -d 1 -v test.wav",
                shell=True)
            time.sleep(1)

            # Gstreamer pipelines starts here
            self.start_pipeline()
            self.bitrate_monitor_local()
            self.bitrate_monitor_remote()
            self.motion_timelapse_handler()
        else:
            logger.critical("Camera not detected or is not working")
            logger.critical("Did not start Gstreamer pieplines")
Editor Settings
Theme
Key bindings
Full width
Lines