用python模拟电梯程序

时间:2024-05-21 08:37:41

电梯使用起来非常的方便,只要按个按钮就可以去到想去的楼层。那么电梯是如何工作的,它内部的算法是如何起作用的呢?我们又如何编写电梯的程序呢?在写代码之前,我们先要知道电梯的工作原理。
用python模拟电梯程序
如上图所示,电梯的工作是基于多线程的。它由消息线程,状态机线程,以及开关门子线程共同协调完成动作。消息线程产生消息队列(MsgQueue),普通消息(非开门关门消息)按产生的时间顺序依次进入消息队列,状态机线程按顺序不断的读取消息队列里的消息,每读取一条消息后,执行消息改变自身状态,然后读取下一条消息。而如果消息线程中产生了开、关门的消息(ExitFlag),那么此时先判断可行性(比如电梯在运行过程中无法开关门),若可行,则立即中断其它动作,执行此消息。
首先我们给电梯定义几个状态,然后定义各种消息,定义楼层的高度,最后写出状态改变的具体逻辑。如图,我们给消息和状态都编码。

TOP = 6
BOTTOM = 1
STATE = {0:"门停开着",1:"门停关着",2:"电梯上升",3:"电梯下降"}
DIR = {0:"向下", 1:"向上"}
# 消息编码:0 00关门 ,01 开门, 02 有人进
#           1 11去1L ,12去2L,13去三楼,14去4楼,15去五楼,16去六楼
#           2 21一楼有人按上,22二楼有人按上,23三楼有人按上,24四楼有人按上,25五楼有人按上
#           3 32二楼有人按下,33三楼有人按下,34四楼有人按下,35五楼有人按下,36六楼有人按下

状态机线程:

状态机就是状态转移图。举个最简单的例子。人有三个状态健康,感冒,康复中。触发的条件有淋雨(t1),吃药(t2),打针(t3),休息(t4)。所以状态机就是健康-(t4)->健康;健康-(t1)->感冒;感冒-(t3)->健康;感冒-(t2)->康复中;康复中-(t4)->健康,等等。就是这样状态在不同的条件下跳转到自己或不同状态的图。现在,我们来看看电梯的状态机:
用python模拟电梯程序

消息线程:

真正生活中的电梯,它的消息来自于使用者按按钮,然后通过传感器设备采集信号传给中心处理系统,我们这里直接忽略硬件部分,用一些随机数字产生消息代码。首先定义一个消息的class,它包括消息类型type和消息值value。

class Msg:
    def __init__(self,type,value):
        self.type = type
        self.value = value

exitFlag = []
MsgQueue = []


def Msgfunction():

    global  MsgQueue,exitFlag
    for i in range(4):
        type = random.randint(0, 3)
        value = 0
        if type == 0:
            value = random.randint(0, 2)
            if lock.acquire():
                exitFlag.append(1)
                lock.release()
        if type == 2:
            value = random.randint(BOTTOM,TOP-1)
        if type == 3:
            value = random.randint(BOTTOM+1,TOP)
        if type == 1:
            value = random.randint(BOTTOM, TOP)

        TIME = random.randint(1, 8)
        m = Msg(type, value)
        if lock.acquire():
            MsgQueue.append(m)
            print("产生消息编码:"+ str([m.type,m.value]))
            lock.release()
        time.sleep(TIME)

注意到该函数最下面有一个锁死语句,即消息队列在增加消息时,锁死了消息队列,不让其它线程访问。这里主要是因为,状态机线程在完成一个消息动作后,会将已完成的消息弹出消息队列,而这个过程与上面添加消息是互斥的,因为他们都需要访问资源MsgQueue。

开门关门子线程:

相比上面的两个线程,这个线程相对难一点。主要是考虑到实时性。首先,要考虑两个问题。什么时候可以开关门,不能开关门要怎样。电梯只有处在状态“停门关着”才能开门,而只有处在状态 “门停开着”才能关门。如果此时电梯处于其它状态,则直接忽视掉开关门命令即可。但是又有问题了,通常我们会碰到这样的问题,比如门正在关闭中,电梯里面有人按了开门按钮,或者外面有人进来,那么此时电梯的状态要为“门停关着”,也就是说在执行动作关门前,提前将状态转换过去。
此外,开关门线程只在调用开关门函数时起作用,电梯下一次状态的转换必须等待开关门动作完成,而开关门线程运行时,还可以重启开关门线程,譬如“有人进”。

def closeThread():
    global exitFlag
    counter = 3
    print("正在关门...")
    while counter:
        if exitFlag != [1]:
            print("关门终止")
            break
        time.sleep(1)
        counter -=1
    if counter == 0:
        print("已关门")


def closedoor():

    t = threading.Thread(target=closeThread)
    t.start()
    t.join()


def openThread():
    global exitFlag
    counter = 3
    print("正在开门...")
    while counter:
        if exitFlag != [1]:
            print("开门终止")
            break
        time.sleep(1)
        counter -=1
    if counter == 0:
        print("已开门")


def opendoor():

    t = threading.Thread(target=openThread)
    t.start()
    t.join()

主线程:

主线程就简单了,只需要负责启动状态机线程和消息队列线程。运行主线程,电梯就开始运作啦。

if __name__ == "__main__":

    thread1 = threading.Thread(target=Msgfunction)
    thread2 = threading.Thread(target=statemachine)
    thread1.start()
    thread2.start()

好了,源码放在后面了。

源代码:
import time
import threading
import random

TOP = 6
BOTTOM = 1
STATE = {0:"门停开着",1:"门停关着",2:"电梯上升",3:"电梯下降"}
DIR = {0:"向下", 1:"向上"}
# 消息编码:0 00关门 ,01 开门, 02 有人进
#           1 11去1L ,12去2L,13去三楼,14去4楼,15去五楼,16去六楼
#           2 21一楼有人按上,22二楼有人按上,23三楼有人按上,24四楼有人按上,25五楼有人按上
#           3 32二楼有人按下,33三楼有人按下,34四楼有人按下,35五楼有人按下,36六楼有人按下
lock = threading.Lock()
class Msg:
    def __init__(self,type,value):
        self.type = type
        self.value = value

exitFlag = []
MsgQueue = []


def Msgfunction():

    global  MsgQueue,exitFlag
    for i in range(4):
        type = random.randint(0, 3)
        value = 0
        if type == 0:
            value = random.randint(0, 2)
            if lock.acquire():
                exitFlag.append(1)
                lock.release()
        if type == 2:
            value = random.randint(BOTTOM,TOP-1)
        if type == 3:
            value = random.randint(BOTTOM+1,TOP)
        if type == 1:
            value = random.randint(BOTTOM, TOP)

        TIME = random.randint(1, 8)
        m = Msg(type, value)
        if lock.acquire():
            MsgQueue.append(m)
            print("产生消息编码:"+ str([m.type,m.value]))
            lock.release()
        time.sleep(TIME)


def closed(state, cur, d):

    if d == 1:
        if startup(cur,d):
            state = 2
        else:
            d = 0
            if startup(cur,d):
                state = 3
            else:
                return state,cur,d
    else:
        if startup(cur,d):
            state = 3
        else:
            d = 1
            if startup(cur,d):
                state = 2
            else:
                return state,cur,d
    return state,cur,d

def up(state,cur,d):
    while True:
        state = state
        if stop(cur,d):
            state = 1
            print("当前状态:%s,当前楼层:%d,运行方向:%s" % (STATE[state], cur, DIR[d]))
            break
        cur +=1
        print("正在前往第%d层..." % cur)
        time.sleep(2)
    return state,cur,d


def down(state,cur,d):
    while True:
        state = state
        if stop(cur,d):
            state = 1
            print("当前状态:%s,当前楼层:%d,运行方向:%s" % (STATE[state], cur, DIR[d]))
            break
        cur -=1
        print("正在前往第%d层..." % cur)
        time.sleep(2)
        #print("当前状态:%s,当前楼层:%d,运行方向:%d" % (STATE[state], cur, d))
        #time.sleep(10)

    return state,cur,d


def startup(cur,d):
    global MsgQueue
    tmp = False
    if d == 1:
        for m in MsgQueue:
            if m.type == 1 and m.value > cur:
                tmp = True
            if m.type == 2 and m.value > cur:
                tmp = True
            if m.type == 3 and m.value > cur:
                tmp = True
    if d == 0:
        for m in MsgQueue:
            if m.type == 1 and m.value < cur:
                tmp = True
            if m.type == 2 and m.value < cur:
                tmp = True
            if m.type == 3 and m.value < cur:
                tmp = True

    return tmp


def stop(cur,d):
    global MsgQueue
    tmp = False
    if d == 1:
        if cur == TOP:
            tmp = True
        tmplist = MsgQueue[:]
        for m in MsgQueue:
            if m.type == 1 and m.value == cur:
                tmp = True
                tmplist.remove(m)
            if m.type == 2 and m.value == cur:
                tmp = True
                tmplist.remove(m)
        MsgQueue = tmplist[:]
    if d == 0:
        if cur == BOTTOM:
            tmp = True
        tmplist = MsgQueue[:]
        for m in MsgQueue:
            if m.type == 1 and m.value == cur:
                tmp = True
                tmplist.remove(m)
            if m.type == 3 and m.value == cur:
                tmp = True
                tmplist.remove(m)
        MsgQueue = tmplist[:]
    return tmp


def closeThread():
    global exitFlag
    counter = 3
    print("正在关门...")
    while counter:
        if exitFlag != [1]:
            print("关门终止")
            break
        time.sleep(1)
        counter -=1
    if counter == 0:
        print("已关门")



def closedoor():

    t = threading.Thread(target=closeThread)
    t.start()
    t.join()



def openThread():
    global exitFlag
    counter = 3
    print("正在开门...")
    while counter:
        if exitFlag != [1]:
            print("开门终止")
            break
        time.sleep(1)
        counter -=1
    if counter == 0:
        print("已开门")


def opendoor():

    t = threading.Thread(target=openThread)
    t.start()
    t.join()


def statemachine():
    global MsgQueue,exitFlag
    state = 0
    cur = 1
    d = 0
    while True:
        time.sleep(0.3)
        print("当前状态:%s,当前楼层:%d,运行方向:%s" % (STATE[state], cur, DIR[d]))
        if MsgQueue == [] and state == 1:
            continue
        if exitFlag != []:
            tmplist = MsgQueue[:]
            for m in tmplist:
                if m.type == 0 and m.value == 0:
                    if state == 0:
                        state = 1
                        closedoor()
                    exitFlag.pop(0)
                    tmplist.remove(m)
                if m.type == 0 and m.value == 1:
                    if state == 1 or state == 0:
                        state = 0
                        opendoor()
                    exitFlag.pop(0)
                    tmplist.remove(m)
                if m.type == 0 and m.value == 2:
                    if state == 1:
                        state = 0
                        opendoor()
                    exitFlag.pop(0)
                    tmplist.remove(m)
            MsgQueue = tmplist[:]
            continue

        if state == 0:
            counter = 4
            while counter:
                if exitFlag != []:
                    print("超时终止")
                    break
                time.sleep(1)
                counter -= 1
            if counter == 0:
                print("超时")
                exitFlag.append(1)
                closedoor()
                exitFlag.pop(0)
                state = 1
            continue
        if state == 1:
            if MsgQueue == []:
                continue
            state, cur, d = closed(state, cur, d)
            continue
        if state == 2:
            if MsgQueue == []:
                continue
            state,cur,d = up(state, cur, d)
            if state == 1:
                exitFlag.append(1)
                opendoor()
                exitFlag.pop(0)
                state = 0
            continue
        if state == 3:
            if MsgQueue == []:
                continue
            state,cur,d = down(state, cur, d)
            if state == 1:
                exitFlag.append(1)
                opendoor()
                exitFlag.pop(0)
                state = 0
            continue


if __name__ == "__main__":

    thread1 = threading.Thread(target=Msgfunction)
    thread2 = threading.Thread(target=statemachine)
    thread1.start()
    thread2.start()