Source code for node_query_handler

# -*- coding: UTF-8 -*-

"""
| This Modules handles all the query to the nodes.
| it will receive the data from the router_handler and it will send the query to the nodes.
| It will also retry sending the message if the node doesn't answer right or if doesn't answer at all.
| If the nodes confirm the command was received then this module will tell onos to set the web object status 
| to reflect the new node status after the command was received.
|
"""


from globalVar import *






[docs]def make_query_to_remote_node(node_serial_number,query,objName,status_to_set,user,priority,mail_report_list): """ | This function make a query to a powerline/ethernet node and wait the answer from the node. | If the answer is positive | it will add to the priorityCmdQueue the command to change the web_object status | from pending to succesfully changed . | If the answer from the node is an error or the node is not responding | the query will be repeated x times before giving up. """ #time.sleep(0.1) print "make_query_to_remote_node() thread executed" print "i try this query:"+query timeout=0.1 html_response="local_error_in_router_handler_cant_connect_to_node" #wait_timeout=1000 for m in range(0,8): #retry n times to get the answer from node node_address=nodeDict[node_serial_number].getNodeAddress() #update the node address ..maybe has changed.. print "connection try number:"+str(m)+"to ip number"+str(node_address) html_response="local_error_in_router_handler_cant_connect_to_node" received_answer="" flag=0 while (wait_because_node_is_talking==1): #the node is talking to onos...wait ...banana to make it for each node.. print "i rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr" time.sleep(0.1) flag=1 if flag==1: time.sleep(0.2) try: s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # to reuse the same address...prevent address already in use error s.connect((node_address,node_webserver_port)) # Protocol exchange - sends and receives time_start=time.time() s.settimeout(4) #timeout of 2 second ,don't change this! s.sendall(query) while (exit==0): print "s.recv(1024aaaa)" resp="" try: resp = s.recv(1024) except Exception, e : print "error_qqqq retry",e errorQueue.put("error0 in make_query_to_remote_node() router_handler class trying to query a node the query was"+query+"the number of try is "+str(m)+"error:"+str(e.args)+"at:" +getErrorTimeString() ) break print "after s.recv(1024)" received_answer=received_answer+resp #if (time.time()> (time_start+wait_timeout) ): # print "timeout waiting for answer from node....................................................................." # errorQueue.put("timeout waiting for answer from node, the query was:"+query) # break if received_answer.find("_#]")!=-1: break print resp if received_answer.find("ok")!=-1: # print "message sent" m=1000 break # Close the connection when completed s.close() print "\ndone" except Exception, e : print "error_i retry",e errorQueue.put("error2 in make_query_to_remote_node() router_handler class trying to query a node the query was"+query+"the number of try is "+str(m) +str(e.args)+"at:" +getErrorTimeString() ) print"the query was"+query+"number of try "+str(m) s.close() if m>5: timeout=2 time.sleep(timeout) continue else: # the connection was succesfull if received_answer.find("ok_#]")!=-1: print "msg sent correctly" html_response=received_answer priorityCmdQueue.put( {"cmd":"setSts","webObjectName":objName,"status_to_set":status_to_set,"write_to_hw":0,"user":user,"priority":priority,"mail_report_list":mail_report_list }) return() #break else: html_response=received_answer print "answer received is wrong:"+received_answer time.sleep(timeout) continue print "great error the node did not answer also if tried :"+str(m)+"times, the node will not be setted anymore(probably is not connected)" errorQueue.put("great error the node did not answer also if tried :"+str(m)+"times, the node will not be setted anymore(probably is not connected)" ) return()
[docs]def handle_new_query_to_remote_node_thread(): """ | This is a thread function that will run until every request in the queryToNodeQueue is done. | It will get each query from queryToNodeQueue and call make_query_to_remote_node() | While the query is running the current_node_handler_list will contain the node serialnumber being queried | In this way onos will avoid to make multiple simultaneos query to the same node. """ print "executed handle_new_query_to_remote_node_thread() " global node_query_threads_executing try: #with lock2_query_threads: node_query_threads_executing=node_query_threads_executing+1 while not queryToNodeQueue.empty(): #banana maybe to implement Queue.PriorityQueue() to give priority to certain queries current_query=queryToNodeQueue.get() #queryToNodeQueue.task_done() #banana maybe to remove because not usefull node_serial_number=current_query["node_serial_number"] if (nodeDict[node_serial_number].getNodeActivity()==0): # the node is inactive print "the node"+node_serial_number+"is inactive ,so I delete its query" queryToNodeQueue.put("the node"+node_serial_number+"is inactive ,so I delete its query") continue ##skip to the next query .. with lock1_current_node_handler_list: if ((node_serial_number not in current_node_handler_list)): #or (node_query_threads_executing==1)): current_node_handler_list.append(node_serial_number) print "handle_new_query_to_remote_node_thread excuted with "+node_serial_number else: print "node is already being contacted:q->",current_query queryToNodeQueue.put(current_query) continue print "node_query_threads_executing:",node_query_threads_executing #address=current_query["address"] query=current_query["query"] objName=current_query["objName"] status_to_set=current_query["status_to_set"] user=current_query["user"] priority=current_query["priority"] mail_report_list=current_query["mail_report_list"] while (wait_because_node_is_talking==1): #the node is talking to onos...wait ...todo to make it for each node.. print "i waiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiit" time.sleep(0.1) #print "wait!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" make_query_to_remote_node(node_serial_number,query,objName,status_to_set,user,priority,mail_report_list) with lock1_current_node_handler_list: print "lock2b from handle_new_query_to_remote_node_thread_remove,query_to_node_dict[node_serial_number]"+node_serial_number current_node_handler_list.remove(node_serial_number) time.sleep(0.1) #delay to not block the node , now the thread will get the next query to execute #here there is no more queries to make except Exception, e : print ("main error in handle_new_query_to_remote_node_thread"+str(e.args)) errorQueue.put("main error in handle_new_query_to_remote_node_thread:"+str(e.args)) with lock1_current_node_handler_list: try: print "lock2c from handle_new_query_to_remote_node_thread_remove,query_to_node_dict[node_serial_number]"+node_serial_number current_node_handler_list.remove(node_serial_number) query_threads_number=query_threads_number-1 except: print "error in current_node_handler_list.remove after main error" if node_query_threads_executing>0: node_query_threads_executing=node_query_threads_executing-1 return()