您好,欢迎访问一九零五行业门户网

Python使用Redis实现作业调度系统(超简单)

概述
redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的web应用程序的完美解决方案。
redis从它的许多竞争继承来的三个主要特点:
redis数据库完全在内存中,使用磁盘仅用于持久性。
相比许多键值数据存储,redis拥有一套较为丰富的数据类型。
redis可以将数据复制到任意数量的从服务器。
redis 优势
异常快速:redis的速度非常快,每秒能执行约11万集合,每秒约81000+条记录。
支持丰富的数据类型:redis支持最大多数开发人员已经知道像列表,集合,有序集合,散列数据类型。这使得它非常容易解决各种各样的问题,因为我们知道哪些问题是可以处理通过它的数据类型更好。
操作都是原子性:所有redis操作是原子的,这保证了如果两个客户端同时访问的redis服务器将获得更新后的值。
多功能实用工具:redis是一个多实用的工具,可以在多个用例如缓存,消息,队列使用(redis原生支持发布/订阅),任何短暂的数据,应用程序,如web应用程序会话,网页命中计数等。
步入主题:
redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。
下面是实现上的想法
mymaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。
myslave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。
channel channel_dispatch:每个slave节点订阅一个channel,比如“channel_dispatch_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
channel channel_result:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。
master代码
#!/usr/bin/env python# -*- coding: utf-8 -*-import timeimport threadingimport randomimport redisredis_host = 'localhost'redis_port = 6379redis_db = 0channel_dispatch = 'channel_dispatch'channel_result = 'channel_result'class mymaster():def __init__(self):passdef start(self):myserverresulthandlethread().start()myserverdispatchthread().start()class myserverdispatchthread(threading.thread):def __init__(self):threading.thread.__init__(self)def run(self):r = redis.strictredis(host=redis_host, port=redis_port, db=redis_db)for i in range(1, 100):channel = channel_dispatch + '_' + str(random.randint(1, 3))print(dispatch job %s to %s % (str(i), channel))ret = r.publish(channel, str(i))if ret == 0:print(dispatch job %s failed. % str(i))time.sleep(5)class myserverresulthandlethread(threading.thread):def __init__(self):threading.thread.__init__(self)def run(self):r = redis.strictredis(host=redis_host, port=redis_port, db=redis_db)p = r.pubsub()p.subscribe(channel_result)for message in p.listen():if message['type'] != 'message':continueprint(received finished job %s % message['data'])if __name__ == __main__:mymaster().start()time.sleep(10000)
说明
mymaster类 - master主程序,用来启动dispatch和resulthandler的线程
myserverdispatchthread类 - 派发作业线程,产生作业并派发到计算节点
myserverresulthandlethread类 - 作业运行结果处理线程,从channel里获取作业结果并显示
slave代码
#!/usr/bin/env python# -*- coding: utf-8 -*-from datetime import datetimeimport timeimport threadingimport randomimport redisredis_host = 'localhost'redis_port = 6379redis_db = 0channel_dispatch = 'channel_dispatch'channel_result = 'channel_result'class myslave():def __init__(self):passdef start(self):for i in range(1, 4):myjobworkerthread(channel_dispatch + '_' + str(i)).start()class myjobworkerthread(threading.thread):def __init__(self, channel):threading.thread.__init__(self)self.channel = channeldef run(self):r = redis.strictredis(host=redis_host, port=redis_port, db=redis_db)p = r.pubsub()p.subscribe(self.channel)for message in p.listen():if message['type'] != 'message':continueprint(%s: received dispatched job %s % (self.channel, message['data']))print(%s: run dispatched job %s % (self.channel, message['data']))time.sleep(2)print(%s: send finished job %s % (self.channel, message['data']))ret = r.publish(channel_result, message['data'])if ret == 0:print(%s: send finished job %s failed. % (self.channel, message['data']))if __name__ == __main__:myslave().start()time.sleep(10000)
说明
myslave类 - slave节点主程序,用来启动myjobworkerthread的线程
myjobworkerthread类 - 从channel里获取派发的作业并将运行结果发送回master
测试
首先运行myslave来定义派发作业channel。
然后运行mymaster派发作业并显示执行结果。
有关python使用redis实现作业调度系统(超简单),小编就给大家介绍这么多,希望对大家有所帮助!
其它类似信息

推荐信息