#!/usr/bin/env python
#
# Copyright (C) 2006 British Broadcasting Corporation and Kamaelia Contributors(1)
# All Rights Reserved.
#
# You may only modify and redistribute this under the terms of any of the
# following licenses(2): Mozilla Public License, V1.1, GNU General
# Public License, V2.0, GNU Lesser General Public License, V2.1
#
# (1) Kamaelia Contributors are listed in the AUTHORS file and at
# http://kamaelia.sourceforge.net/AUTHORS - please extend this file,
# not this notice.
# (2) Reproduced in the COPYING file, and at:
# http://kamaelia.sourceforge.net/COPYING
# Under section 3.5 of the MPL, we are using this text since we deem the MPL
# notice inappropriate for this file. As per MPL/GPL/LGPL removal of this
# notice is prohibited.
#
# Please contact us via: kamaelia-list-owner@lists.sourceforge.net
# to discuss alternative licensing.
# -------------------------------------------------------------------------
# Licensed to the BBC under a Contributor Agreement: RJL
"""\
===========================================
Peer-to-Peer Streaming System (client part)
===========================================
This example demonstrates the use of BitTorrent and HTTP to download, share
and reconstruct a data stream in real-time.
It expects a webserver hosting a folder that contains:
- meta.txt (a file containing the number of chunks/torrents in the stream
so far as a decimal, ASCII string)
- 1.torrent
- 2.torrent
- ...
- 123.torrent (if meta.txt contained "123")
Only this metainfo is downloaded using HTTP. The stream itself is downloaded
(and uploaded to other downloaders) using BitTorrent.
Other users must upload the stream's chunks using BitTorrent for this demo
to work.
To listen to/view the stream, just point your favourite media player
(say, XMMS) at the reconstructed file after it's been downloading for a minute
or so.
"""
import time

from Axon.Component import component
from Axon.Ipc import producerFinished, shutdown
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.File.Writing import SimpleFileWriter
from Kamaelia.File.TriggeredFileReader import TriggeredFileReader
from Kamaelia.Protocol.HTTP.HTTPClient import SimpleHTTPClient
from Kamaelia.Protocol.Torrent.TorrentPatron import TorrentPatron
from Kamaelia.Protocol.Torrent.TorrentIPC import TIPCNewTorrentCreated, TIPCTorrentStatusUpdate
from Kamaelia.Util.Clock import CheapAndCheerfulClock
from Kamaelia.Util.DataSource import TriggeredSource
class StreamReconstructor(component):
    """\
    StreamReconstructor()

    This component receives reports on the status/completion of BitTorrent
    downloads from a TorrentPatron instance. It keeps a record of the
    order in which torrents were started and waits until the first is
    finished. It then outputs the filename of this torrent and removes
    it from its list. Then it waits for the second torrent (now the first
    on the list) to finish downloading, then outputs its filename and so on.

    If later torrents finish before earlier ones, their filenames are not
    output until all their predecessors have finished.

    The purpose of this is to output the names of files whose contents should
    be concatenated to a master file to reconstruct the stream.

    Boxes used:
    - "inbox"   - TIPCNewTorrentCreated and TIPCTorrentStatusUpdate messages
    - "outbox"  - the savefolder of each completed torrent, in start order
    - "control" - shutdown / producerFinished requests
    - "signal"  - producerFinished, sent just before terminating
    """

    def main(self):
        """Main loop: track torrents in start order and emit them as they complete."""
        # Each entry is [torrentid, savefolder, finished], oldest torrent first.
        torrents = []

        while 1:
            yield 1

            while self.dataReady("inbox"):
                msg = self.recv("inbox")
                if isinstance(msg, TIPCNewTorrentCreated):
                    # add the new torrent to the list of known torrents
                    torrents.append([msg.torrentid, msg.savefolder, False])
                elif isinstance(msg, TIPCTorrentStatusUpdate):
                    if msg.statsdictionary.get("fractionDone", 0) == 1:
                        # Remember the completion even when this is not the
                        # oldest outstanding torrent, so that it can be
                        # released as soon as its predecessors are done
                        # rather than waiting for another status update.
                        for entry in torrents:
                            if entry[0] == msg.torrentid:
                                entry[2] = True
                                break

                    # Forward the file names of every finished torrent at the
                    # head of the list, preserving the original start order.
                    while torrents and torrents[0][2]:
                        self.send(torrents.pop(0)[1], "outbox")

            while self.dataReady("control"):
                msg = self.recv("control")
                if isinstance(msg, shutdown) or isinstance(msg, producerFinished):
                    # if we are being told to shutdown then do so
                    self.send(producerFinished(self), "signal")
                    return

            # Nothing left to process: sleep until a new message arrives.
            self.pause()
class PartsFilenameGenerator(component):
    """\
    PartsFilenameGenerator(prefix[, suffix])

    Arguments:
    - prefix - string to prepend to the id of a torrent to make its URL
    - [suffix] - string to append to the id of the torrent to make the URL
                 defaults to ".torrent"

    Generate the URLs of the .torrents that make up the stream
    from reports of the total number of chunks/torrents in the stream
    that are received on "inbox".

    e.g. Assuming it was created as
    PartsFilenameGenerator("http://www.example.com/", ".torrent"),

    Send it "3" and it will output (one message listed per line):
    - "http://www.example.com/1.torrent"
    - "http://www.example.com/2.torrent"
    - "http://www.example.com/3.torrent"

    Then send it "3" again and it will output nothing.

    Now send it "5" and it will output:
    - "http://www.example.com/4.torrent"
    - "http://www.example.com/5.torrent"

    Messages on "inbox" that cannot be converted to an integer are ignored.
    A shutdown or producerFinished message on "control" makes the component
    send producerFinished on "signal" and terminate.
    """

    def __init__(self, prefix, suffix = ".torrent"):
        """Store the URL prefix/suffix and initialise the underlying component."""
        self.prefix = prefix
        self.suffix = suffix
        # super() takes the class first and the instance second.
        super(PartsFilenameGenerator, self).__init__()

    def main(self):
        """Main loop: emit one URL for every torrent number not yet announced."""
        highestsofar = 0 # we have not outputted any torrent URLs so far

        while 1:
            yield 1

            while self.dataReady("inbox"):
                try:
                    total = int(self.recv("inbox"))
                except (TypeError, ValueError):
                    # Not a usable chunk count (e.g. an empty or malformed
                    # response); skip it instead of killing the component.
                    # A later, valid count will still produce every
                    # outstanding URL.
                    continue

                # output the URLs of all the torrents whose numbers are > the
                # number of last torrent output and <= the value of message received
                while highestsofar < total:
                    highestsofar += 1
                    self.send(self.prefix + str(highestsofar) + self.suffix, "outbox")

            while self.dataReady("control"):
                msg = self.recv("control")
                if isinstance(msg, shutdown) or isinstance(msg, producerFinished):
                    self.send(producerFinished(self), "signal")
                    return

            # Nothing left to process: sleep until a new message arrives.
            self.pause()
def P2PStreamer(torrentsfolder):
    """\
    Build and return the Pipeline that acts as the P2P streaming client.

    Arguments:
    - torrentsfolder, e.g. "http://my.server.example.org/radioFoo/"

    The returned Pipeline ends in a TriggeredFileReader fed with the names of
    the downloaded chunks in stream order. It is returned un-started; the
    caller is responsible for linking and running it.
    """
    # URL of the meta file on the torrent hosting website,
    # e.g. "http://my.server.example.org/radioFoo/meta.txt".
    # Its contents are the number of chunks in the stream so far, as a
    # decimal ASCII string.
    meta_url = torrentsfolder + "meta.txt"

    # Stage 1: re-fetch the meta file every 60 seconds and pass its contents on.
    meta_poller = Pipeline(
        # Emits a message every 60 seconds, so the meta file is polled
        # without busy-waiting.
        CheapAndCheerfulClock(60.0),

        # Each tick is turned into the meta file's URL ...
        TriggeredSource(meta_url),

        # ... which SimpleHTTPClient fetches, forwarding the page contents
        # (the current chunk count).
        SimpleHTTPClient()
    )

    # Stage 2: the streaming client proper. It is fed by the poller above and
    # delivers far larger chunks, with longer gaps between them, than a
    # typical streaming client would.
    return Pipeline(
        # Periodic chunk counts from the meta file.
        meta_poller,

        # Turn each new chunk count into the URLs of the .torrent files not
        # yet requested (named 1.torrent, 2.torrent, 3.torrent ... on the
        # server).
        PartsFilenameGenerator(torrentsfolder, ".torrent"),

        # Download each .torrent file over HTTP and forward its contents.
        # NOTE: this is only the metadata describing a section of the
        # stream, not the stream data itself.
        SimpleHTTPClient(),

        # BitTorrent client component: uses that metadata to download the
        # corresponding section of the stream (typically a few MB each).
        TorrentPatron(),

        # Release the name of each chunk once it and all of its
        # predecessors have been downloaded.
        StreamReconstructor(),

        # Read the contents of those chunk files.
        TriggeredFileReader(),
    )
if __name__ == '__main__':
    # Prompt for the website folder that hosts the stream's metadata,
    # e.g. "http://my.server.example.org/radioFoo/"
    torrentsfolder = raw_input("P2P-stream meta folder (URL): ")

    # Source: fetches the stream using BitTorrent and HTTP (see P2PStreamer).
    stream_source = P2PStreamer(torrentsfolder)

    # Sink: writes the reconstructed stream to a file on disk.
    stream_sink = SimpleFileWriter("myreconstructedstream.mp3")

    Pipeline(stream_source, stream_sink).run()